From 73cabcb6ae130519020a743567dec107a1c9f057 Mon Sep 17 00:00:00 2001 From: Lance Stout Date: Sun, 18 Mar 2012 01:02:19 -0700 Subject: Add initial support for XEP-0198 for stream management. --- setup.py | 1 + sleekxmpp/plugins/__init__.py | 1 + sleekxmpp/plugins/xep_0198/__init__.py | 20 ++ sleekxmpp/plugins/xep_0198/stanza.py | 151 ++++++++++++++ sleekxmpp/plugins/xep_0198/stream_management.py | 266 ++++++++++++++++++++++++ 5 files changed, 439 insertions(+) create mode 100644 sleekxmpp/plugins/xep_0198/__init__.py create mode 100644 sleekxmpp/plugins/xep_0198/stanza.py create mode 100644 sleekxmpp/plugins/xep_0198/stream_management.py diff --git a/setup.py b/setup.py index 021298b6..670c4aff 100755 --- a/setup.py +++ b/setup.py @@ -78,6 +78,7 @@ packages = [ 'sleekxmpp', 'sleekxmpp/plugins/xep_0128', 'sleekxmpp/plugins/xep_0172', 'sleekxmpp/plugins/xep_0184', + 'sleekxmpp/plugins/xep_0198', 'sleekxmpp/plugins/xep_0199', 'sleekxmpp/plugins/xep_0202', 'sleekxmpp/plugins/xep_0203', diff --git a/sleekxmpp/plugins/__init__.py b/sleekxmpp/plugins/__init__.py index 8f8f851a..c374f27b 100644 --- a/sleekxmpp/plugins/__init__.py +++ b/sleekxmpp/plugins/__init__.py @@ -41,6 +41,7 @@ __all__ = [ 'xep_0163', # Personal Eventing Protocol 'xep_0172', # User Nickname 'xep_0184', # Message Receipts + 'xep_0198', # Stream Management 'xep_0199', # Ping 'xep_0202', # Entity Time 'xep_0203', # Delayed Delivery diff --git a/sleekxmpp/plugins/xep_0198/__init__.py b/sleekxmpp/plugins/xep_0198/__init__.py new file mode 100644 index 00000000..db930347 --- /dev/null +++ b/sleekxmpp/plugins/xep_0198/__init__.py @@ -0,0 +1,20 @@ +""" + SleekXMPP: The Sleek XMPP Library + Copyright (C) 2012 Nathanael C. Fritz, Lance J.T. Stout + This file is part of SleekXMPP. + + See the file LICENSE for copying permission. +""" + +from sleekxmpp.plugins.base import register_plugin + +from sleekxmpp.plugins.xep_0198.stanza import Enable, Enabled +from sleekxmpp.plugins.xep_0198.stanza import Resume, Resumed +from sleekxmpp.plugins.xep_0198.stanza import Failed +from sleekxmpp.plugins.xep_0198.stanza import StreamManagement +from sleekxmpp.plugins.xep_0198.stanza import Ack, RequestAck + +from sleekxmpp.plugins.xep_0198.stream_management import XEP_0198 + + +register_plugin(XEP_0198) diff --git a/sleekxmpp/plugins/xep_0198/stanza.py b/sleekxmpp/plugins/xep_0198/stanza.py new file mode 100644 index 00000000..5cf93436 --- /dev/null +++ b/sleekxmpp/plugins/xep_0198/stanza.py @@ -0,0 +1,151 @@ +""" + SleekXMPP: The Sleek XMPP Library + Copyright (C) 2012 Nathanael C. Fritz, Lance J.T. Stout + This file is part of SleekXMPP. + + See the file LICENSE for copying permission. +""" + +from sleekxmpp.stanza import Error +from sleekxmpp.xmlstream import ElementBase, StanzaBase + + +class Enable(StanzaBase): + name = 'enable' + namespace = 'urn:xmpp:sm:3' + interfaces = set(['max', 'resume']) + + def setup(self, xml): + StanzaBase.setup(self, xml) + self.xml.tag = self.tag_name() + + def get_resume(self): + return self._get_attr('resume', 'false').lower() in ('true', '1') + + def set_resume(self, val): + self._del_attr('resume') + self._set_attr('resume', 'true' if val else 'false') + + +class Enabled(StanzaBase): + name = 'enabled' + namespace = 'urn:xmpp:sm:3' + interfaces = set(['id', 'location', 'max', 'resume']) + + def setup(self, xml): + StanzaBase.setup(self, xml) + self.xml.tag = self.tag_name() + + def get_resume(self): + return self._get_attr('resume', 'false').lower() in ('true', '1') + + def set_resume(self, val): + self._del_attr('resume') + self._set_attr('resume', 'true' if val else 'false') + + +class Resume(StanzaBase): + name = 'resume' + namespace = 'urn:xmpp:sm:3' + interfaces = set(['h', 'previd']) + + def setup(self, xml): + StanzaBase.setup(self, xml) + self.xml.tag = self.tag_name() + + def get_h(self): + h = self._get_attr('h', None) + if h: + return int(h) + return None + + def set_h(self, val): + self._set_attr('h', str(val)) + + +class Resumed(StanzaBase): + name = 'resumed' + namespace = 'urn:xmpp:sm:3' + interfaces = set(['h', 'previd']) + + def setup(self, xml): + StanzaBase.setup(self, xml) + self.xml.tag = self.tag_name() + + def get_h(self): + h = self._get_attr('h', None) + if h: + return int(h) + return None + + def set_h(self, val): + self._set_attr('h', str(val)) + + + +class Failed(StanzaBase, Error): + name = 'failed' + namespace = 'urn:xmpp:sm:3' + interfaces = set() + + def setup(self, xml): + StanzaBase.setup(self, xml) + self.xml.tag = self.tag_name() + + +class StreamManagement(ElementBase): + name = 'sm' + namespace = 'urn:xmpp:sm:3' + plugin_attrib = name + interfaces = set(['required', 'optional']) + + def get_required(self): + return self.find('{%s}required' % self.namespace) is not None + + def set_required(self, val): + self.del_required() + if val: + self._set_sub_text('required', '', keep=True) + + def del_required(self): + self._del_sub('required') + + def get_optional(self): + return self.find('{%s}optional' % self.namespace) is not None + + def set_optional(self, val): + self.del_optional() + if val: + self._set_sub_text('optional', '', keep=True) + + def del_optional(self): + self._del_sub('optional') + + +class RequestAck(StanzaBase): + name = 'r' + namespace = 'urn:xmpp:sm:3' + interfaces = set() + + def setup(self, xml): + StanzaBase.setup(self, xml) + self.xml.tag = self.tag_name() + + +class Ack(StanzaBase): + name = 'a' + namespace = 'urn:xmpp:sm:3' + interfaces = set(['h']) + + def setup(self, xml): + StanzaBase.setup(self, xml) + self.xml.tag = self.tag_name() + + def get_h(self): + h = self._get_attr('h', None) + if h: + return int(h) + return None + + def set_h(self, val): + self._set_attr('h', str(val)) diff --git a/sleekxmpp/plugins/xep_0198/stream_management.py b/sleekxmpp/plugins/xep_0198/stream_management.py new file mode 100644 index 00000000..6ed1ea26 --- /dev/null +++ b/sleekxmpp/plugins/xep_0198/stream_management.py @@ -0,0 +1,266 @@ +""" + SleekXMPP: The Sleek XMPP Library + Copyright (C) 2012 Nathanael C. Fritz, Lance J.T. Stout + This file is part of SleekXMPP. + + See the file LICENSE for copying permission. +""" + +import logging +import threading +import collections + +from sleekxmpp.stanza import Message, Presence, Iq, StreamFeatures +from sleekxmpp.xmlstream import register_stanza_plugin +from sleekxmpp.xmlstream.handler import Callback, Waiter +from sleekxmpp.xmlstream.matcher import MatchXPath, MatchMany +from sleekxmpp.plugins.base import BasePlugin +from sleekxmpp.plugins.xep_0198 import stanza + + +log = logging.getLogger(__name__) + + +MAX_SEQ = 2**32 + + +class XEP_0198(BasePlugin): + + """ + XEP-0198: Stream Management + """ + + name = 'xep_0198' + description = 'XEP-0198: Stream Management' + dependencies = set() + stanza = stanza + + def plugin_init(self): + """Start the XEP-0198 plugin.""" + + # Only enable stream management for non-components, + # since components do not yet perform feature negotiation. + if self.xmpp.is_component: + return + + #: The stream management ID for the stream. Knowing this value is + #: required in order to do stream resumption. + self.sm_id = self.config.get('sm_id', None) + + #: A counter of handled incoming stanzas, mod 2^32. + self.handled = self.config.get('handled', 0) + + #: A counter of unacked outgoing stanzas, mod 2^32. + self.seq = self.config.get('seq', 0) + + #: The last ack number received from the server. + self.last_ack = self.config.get('last_ack', 0) + + #: The number of stanzas to wait between sending ack requests to + #: the server. Setting this to ``1`` will send an ack request after + #: every sent stanza. Defaults to ``5``. + self.window = self.config.get('window', 5) + + #: Control whether or not the ability to resume the stream will be + #: requested when enabling stream management. Defaults to ``True``. + self.allow_resume = self.config.get('allow_resume', True) + + self.enabled = threading.Event() + self.unacked_queue = collections.deque() + + self.seq_lock = threading.Lock() + self.handled_lock = threading.Lock() + self.ack_lock = threading.Lock() + + register_stanza_plugin(StreamFeatures, stanza.StreamManagement) + self.xmpp.register_stanza(stanza.Enable) + self.xmpp.register_stanza(stanza.Enabled) + self.xmpp.register_stanza(stanza.Resume) + self.xmpp.register_stanza(stanza.Resumed) + self.xmpp.register_stanza(stanza.Ack) + self.xmpp.register_stanza(stanza.RequestAck) + + # Register the feature twice because it may be ordered two + # different ways: enabling after binding and resumption + # before binding. + self.xmpp.register_feature('sm', + self._handle_sm_feature, + restart=True, + order=self.config.get('order', 10100)) + self.xmpp.register_feature('sm', + self._handle_sm_feature, + restart=True, + order=self.config.get('resume_order', 9000)) + + self.xmpp.register_handler( + Callback('Stream Management Enabled', + MatchXPath(stanza.Enabled.tag_name()), + self._handle_enabled, + instream=True)) + + self.xmpp.register_handler( + Callback('Stream Management Resumed', + MatchXPath(stanza.Resumed.tag_name()), + self._handle_resumed, + instream=True)) + + self.xmpp.register_handler( + Callback('Stream Management Failed', + MatchXPath(stanza.Failed.tag_name()), + self._handle_failed, + instream=True)) + + self.xmpp.register_handler( + Callback('Stream Management Ack', + MatchXPath(stanza.Ack.tag_name()), + self._handle_ack, + instream=True)) + + self.xmpp.register_handler( + Callback('Stream Management Request Ack', + MatchXPath(stanza.RequestAck.tag_name()), + self._handle_request_ack, + instream=True)) + + self.xmpp.add_filter('in', self._handle_incoming) + self.xmpp.add_filter('out_sync', self._handle_outgoing) + + self.xmpp.add_event_handler('need_ack', self.request_ack) + + def send_ack(self): + """Send the current ack count to the server.""" + ack = stanza.Ack(self.xmpp) + with self.handled_lock: + ack['h'] = self.handled + ack.send() + + def request_ack(self, e=None): + """Request an ack from the server.""" + req = stanza.RequestAck(self.xmpp) + req.send() + + def _handle_sm_feature(self, features): + """ + Enable or resume stream management. + + If no SM-ID is stored, and resource binding has taken place, + stream management will be enabled. + + If an SM-ID is known, and the server allows resumption, the + previous stream will be resumed. + """ + if 'stream_management' in self.xmpp.features: + # We've already negotiated stream management, + # so no need to do it again. + return False + if not self.sm_id: + if 'bind' in self.xmpp.features: + self.enabled.set() + enable = stanza.Enable(self.xmpp) + enable['resume'] = self.allow_resume + enable.send() + self.handled = 0 + elif self.sm_id and self.allow_resume: + self.enabled.set() + resume = stanza.Resume(self.xmpp) + resume['h'] = self.handled + resume['previd'] = self.sm_id + resume.send(now=True) + + # Wait for a response before allowing stream feature processing + # to continue. The actual result processing will be done in the + # _handle_resumed() or _handle_failed() methods. + waiter = Waiter('resumed_or_failed', + MatchMany([ + MatchXPath(stanza.Resumed.tag_name()), + MatchXPath(stanza.Failed.tag_name())])) + self.xmpp.register_handler(waiter) + result = waiter.wait() + if result is not None and result.name == 'resumed': + return True + return False + + def _handle_enabled(self, stanza): + """Save the SM-ID, if provided. + + Raises an :term:`sm_enabled` event. + """ + self.xmpp.features.add('stream_management') + if stanza['id']: + self.sm_id = stanza['id'] + self.xmpp.event('sm_enabled', stanza) + + def _handle_resumed(self, stanza): + """Finish resuming a stream by resending unacked stanzas. + + Raises a :term:`session_resumed` event. + """ + self.xmpp.features.add('stream_management') + self._handle_ack(stanza) + for id, stanza in self.unacked_queue: + self.xmpp.send(stanza, now=True, use_filters=False) + self.xmpp.session_started_event.set() + self.xmpp.event('session_resumed', stanza) + + def _handle_failed(self, stanza): + """ + Disable and reset any features used since stream management was + requested (tracked stanzas may have been sent during the interval + between the enable request and the enabled response). + + Raises an :term:`sm_failed` event. + """ + self.enabled.clear() + self.unacked_queue.clear() + self.xmpp.event('sm_failed', stanza) + + def _handle_ack(self, ack): + """Process a server ack by freeing acked stanzas from the queue. + + Raises a :term:`stanza_acked` event for each acked stanza. + """ + if ack['h'] == self.last_ack: + return + + with self.ack_lock: + num_acked = (ack['h'] - self.last_ack) % MAX_SEQ + log.debug("Ack: %s, Last Ack: %s, Num Acked: %s, Unacked: %s", + ack['h'], + self.last_ack, + num_acked, + len(self.unacked_queue)) + for x in range(num_acked): + seq, stanza = self.unacked_queue.popleft() + self.xmpp.event('stanza_acked', stanza) + self.last_ack = ack['h'] + + def _handle_request_ack(self, req): + """Handle an ack request by sending an ack.""" + self.send_ack() + + def _handle_incoming(self, stanza): + """Increment the handled counter for each inbound stanza.""" + if not self.enabled.is_set(): + return stanza + + if isinstance(stanza, (Message, Presence, Iq)): + with self.handled_lock: + # Sequence numbers are mod 2^32 + self.handled = (self.handled + 1) % MAX_SEQ + return stanza + + def _handle_outgoing(self, stanza): + """Store outgoing stanzas in a queue to be acked.""" + if not self.enabled.is_set(): + return stanza + + if isinstance(stanza, (Message, Presence, Iq)): + seq = None + with self.seq_lock: + # Sequence numbers are mod 2^32 + self.seq = (self.seq + 1) % MAX_SEQ + seq = self.seq + self.unacked_queue.append((seq, stanza)) + if len(self.unacked_queue) > self.window: + self.xmpp.event('need_ack') + return stanza -- cgit v1.2.3