summaryrefslogtreecommitdiff
path: root/slixmpp/plugins/xep_0198
diff options
context:
space:
mode:
authorFlorent Le Coz <louiz@louiz.org>2014-07-17 14:19:04 +0200
committerFlorent Le Coz <louiz@louiz.org>2014-07-17 14:19:04 +0200
commit5ab77c745270d7d5c016c1dc7ef2a82533a4b16e (patch)
tree259377cc666f8b9c7954fc4e7b8f7a912bcfe101 /slixmpp/plugins/xep_0198
parente5582694c07236e6830c20361840360a1dde37f3 (diff)
downloadslixmpp-5ab77c745270d7d5c016c1dc7ef2a82533a4b16e.tar.gz
slixmpp-5ab77c745270d7d5c016c1dc7ef2a82533a4b16e.tar.bz2
slixmpp-5ab77c745270d7d5c016c1dc7ef2a82533a4b16e.tar.xz
slixmpp-5ab77c745270d7d5c016c1dc7ef2a82533a4b16e.zip
Rename to slixmpp
Diffstat (limited to 'slixmpp/plugins/xep_0198')
-rw-r--r--slixmpp/plugins/xep_0198/__init__.py20
-rw-r--r--slixmpp/plugins/xep_0198/stanza.py150
-rw-r--r--slixmpp/plugins/xep_0198/stream_management.py314
3 files changed, 484 insertions, 0 deletions
diff --git a/slixmpp/plugins/xep_0198/__init__.py b/slixmpp/plugins/xep_0198/__init__.py
new file mode 100644
index 00000000..bd709041
--- /dev/null
+++ b/slixmpp/plugins/xep_0198/__init__.py
@@ -0,0 +1,20 @@
+"""
+ Slixmpp: The Slick XMPP Library
+ Copyright (C) 2012 Nathanael C. Fritz, Lance J.T. Stout
+ This file is part of Slixmpp.
+
+ See the file LICENSE for copying permission.
+"""
+
+from slixmpp.plugins.base import register_plugin
+
+from slixmpp.plugins.xep_0198.stanza import Enable, Enabled
+from slixmpp.plugins.xep_0198.stanza import Resume, Resumed
+from slixmpp.plugins.xep_0198.stanza import Failed
+from slixmpp.plugins.xep_0198.stanza import StreamManagement
+from slixmpp.plugins.xep_0198.stanza import Ack, RequestAck
+
+from slixmpp.plugins.xep_0198.stream_management import XEP_0198
+
+
+register_plugin(XEP_0198)
diff --git a/slixmpp/plugins/xep_0198/stanza.py b/slixmpp/plugins/xep_0198/stanza.py
new file mode 100644
index 00000000..b1c4c010
--- /dev/null
+++ b/slixmpp/plugins/xep_0198/stanza.py
@@ -0,0 +1,150 @@
+"""
+ Slixmpp: The Slick XMPP Library
+ Copyright (C) 2012 Nathanael C. Fritz, Lance J.T. Stout
+ This file is part of Slixmpp.
+
+ See the file LICENSE for copying permission.
+"""
+
+from slixmpp.stanza import Error
+from slixmpp.xmlstream import ElementBase, StanzaBase
+
+
+class Enable(StanzaBase):
+ name = 'enable'
+ namespace = 'urn:xmpp:sm:3'
+ interfaces = set(['max', 'resume'])
+
+ def setup(self, xml):
+ StanzaBase.setup(self, xml)
+ self.xml.tag = self.tag_name()
+
+ def get_resume(self):
+ return self._get_attr('resume', 'false').lower() in ('true', '1')
+
+ def set_resume(self, val):
+ self._del_attr('resume')
+ self._set_attr('resume', 'true' if val else 'false')
+
+
+class Enabled(StanzaBase):
+ name = 'enabled'
+ namespace = 'urn:xmpp:sm:3'
+ interfaces = set(['id', 'location', 'max', 'resume'])
+
+ def setup(self, xml):
+ StanzaBase.setup(self, xml)
+ self.xml.tag = self.tag_name()
+
+ def get_resume(self):
+ return self._get_attr('resume', 'false').lower() in ('true', '1')
+
+ def set_resume(self, val):
+ self._del_attr('resume')
+ self._set_attr('resume', 'true' if val else 'false')
+
+
+class Resume(StanzaBase):
+ name = 'resume'
+ namespace = 'urn:xmpp:sm:3'
+ interfaces = set(['h', 'previd'])
+
+ def setup(self, xml):
+ StanzaBase.setup(self, xml)
+ self.xml.tag = self.tag_name()
+
+ def get_h(self):
+ h = self._get_attr('h', None)
+ if h:
+ return int(h)
+ return None
+
+ def set_h(self, val):
+ self._set_attr('h', str(val))
+
+
+class Resumed(StanzaBase):
+ name = 'resumed'
+ namespace = 'urn:xmpp:sm:3'
+ interfaces = set(['h', 'previd'])
+
+ def setup(self, xml):
+ StanzaBase.setup(self, xml)
+ self.xml.tag = self.tag_name()
+
+ def get_h(self):
+ h = self._get_attr('h', None)
+ if h:
+ return int(h)
+ return None
+
+ def set_h(self, val):
+ self._set_attr('h', str(val))
+
+
+class Failed(StanzaBase, Error):
+ name = 'failed'
+ namespace = 'urn:xmpp:sm:3'
+ interfaces = set()
+
+ def setup(self, xml):
+ StanzaBase.setup(self, xml)
+ self.xml.tag = self.tag_name()
+
+
+class StreamManagement(ElementBase):
+ name = 'sm'
+ namespace = 'urn:xmpp:sm:3'
+ plugin_attrib = name
+ interfaces = set(['required', 'optional'])
+
+ def get_required(self):
+ return self.find('{%s}required' % self.namespace) is not None
+
+ def set_required(self, val):
+ self.del_required()
+ if val:
+ self._set_sub_text('required', '', keep=True)
+
+ def del_required(self):
+ self._del_sub('required')
+
+ def get_optional(self):
+ return self.find('{%s}optional' % self.namespace) is not None
+
+ def set_optional(self, val):
+ self.del_optional()
+ if val:
+ self._set_sub_text('optional', '', keep=True)
+
+ def del_optional(self):
+ self._del_sub('optional')
+
+
+class RequestAck(StanzaBase):
+ name = 'r'
+ namespace = 'urn:xmpp:sm:3'
+ interfaces = set()
+
+ def setup(self, xml):
+ StanzaBase.setup(self, xml)
+ self.xml.tag = self.tag_name()
+
+
+class Ack(StanzaBase):
+ name = 'a'
+ namespace = 'urn:xmpp:sm:3'
+ interfaces = set(['h'])
+
+ def setup(self, xml):
+ StanzaBase.setup(self, xml)
+ self.xml.tag = self.tag_name()
+
+ def get_h(self):
+ h = self._get_attr('h', None)
+ if h:
+ return int(h)
+ return None
+
+ def set_h(self, val):
+ self._set_attr('h', str(val))
diff --git a/slixmpp/plugins/xep_0198/stream_management.py b/slixmpp/plugins/xep_0198/stream_management.py
new file mode 100644
index 00000000..64052fc5
--- /dev/null
+++ b/slixmpp/plugins/xep_0198/stream_management.py
@@ -0,0 +1,314 @@
+"""
+ Slixmpp: The Slick XMPP Library
+ Copyright (C) 2012 Nathanael C. Fritz, Lance J.T. Stout
+ This file is part of Slixmpp.
+
+ See the file LICENSE for copying permission.
+"""
+
+import logging
+import threading
+import collections
+
+from slixmpp.stanza import Message, Presence, Iq, StreamFeatures
+from slixmpp.xmlstream import register_stanza_plugin
+from slixmpp.xmlstream.handler import Callback, Waiter
+from slixmpp.xmlstream.matcher import MatchXPath, MatchMany
+from slixmpp.plugins.base import BasePlugin
+from slixmpp.plugins.xep_0198 import stanza
+
+
+log = logging.getLogger(__name__)
+
+
+MAX_SEQ = 2 ** 32
+
+
+class XEP_0198(BasePlugin):
+
+ """
+ XEP-0198: Stream Management
+ """
+
+ name = 'xep_0198'
+ description = 'XEP-0198: Stream Management'
+ dependencies = set()
+ stanza = stanza
+ default_config = {
+ #: The last ack number received from the server.
+ 'last_ack': 0,
+
+ #: The number of stanzas to wait between sending ack requests to
+ #: the server. Setting this to ``1`` will send an ack request after
+ #: every sent stanza. Defaults to ``5``.
+ 'window': 5,
+
+ #: The stream management ID for the stream. Knowing this value is
+ #: required in order to do stream resumption.
+ 'sm_id': None,
+
+ #: A counter of handled incoming stanzas, mod 2^32.
+ 'handled': 0,
+
+ #: A counter of unacked outgoing stanzas, mod 2^32.
+ 'seq': 0,
+
+ #: Control whether or not the ability to resume the stream will be
+ #: requested when enabling stream management. Defaults to ``True``.
+ 'allow_resume': True,
+
+ 'order': 10100,
+ 'resume_order': 9000
+ }
+
+ def plugin_init(self):
+ """Start the XEP-0198 plugin."""
+
+ # Only enable stream management for non-components,
+ # since components do not yet perform feature negotiation.
+ if self.xmpp.is_component:
+ return
+
+ self.window_counter = self.window
+ self.window_counter_lock = threading.Lock()
+
+ self.enabled = threading.Event()
+ self.unacked_queue = collections.deque()
+
+ self.seq_lock = threading.Lock()
+ self.handled_lock = threading.Lock()
+ self.ack_lock = threading.Lock()
+
+ register_stanza_plugin(StreamFeatures, stanza.StreamManagement)
+ self.xmpp.register_stanza(stanza.Enable)
+ self.xmpp.register_stanza(stanza.Enabled)
+ self.xmpp.register_stanza(stanza.Resume)
+ self.xmpp.register_stanza(stanza.Resumed)
+ self.xmpp.register_stanza(stanza.Ack)
+ self.xmpp.register_stanza(stanza.RequestAck)
+
+ # Only end the session when a </stream> element is sent,
+ # not just because the connection has died.
+ self.xmpp.end_session_on_disconnect = False
+
+ # Register the feature twice because it may be ordered two
+ # different ways: enabling after binding and resumption
+ # before binding.
+ self.xmpp.register_feature('sm',
+ self._handle_sm_feature,
+ restart=True,
+ order=self.order)
+ self.xmpp.register_feature('sm',
+ self._handle_sm_feature,
+ restart=True,
+ order=self.resume_order)
+
+ self.xmpp.register_handler(
+ Callback('Stream Management Enabled',
+ MatchXPath(stanza.Enabled.tag_name()),
+ self._handle_enabled,
+ instream=True))
+
+ self.xmpp.register_handler(
+ Callback('Stream Management Resumed',
+ MatchXPath(stanza.Resumed.tag_name()),
+ self._handle_resumed,
+ instream=True))
+
+ self.xmpp.register_handler(
+ Callback('Stream Management Failed',
+ MatchXPath(stanza.Failed.tag_name()),
+ self._handle_failed,
+ instream=True))
+
+ self.xmpp.register_handler(
+ Callback('Stream Management Ack',
+ MatchXPath(stanza.Ack.tag_name()),
+ self._handle_ack,
+ instream=True))
+
+ self.xmpp.register_handler(
+ Callback('Stream Management Request Ack',
+ MatchXPath(stanza.RequestAck.tag_name()),
+ self._handle_request_ack,
+ instream=True))
+
+ self.xmpp.add_filter('in', self._handle_incoming)
+ self.xmpp.add_filter('out_sync', self._handle_outgoing)
+
+ self.xmpp.add_event_handler('session_end', self.session_end)
+
+ def plugin_end(self):
+ if self.xmpp.is_component:
+ return
+
+ self.xmpp.unregister_feature('sm', self.order)
+ self.xmpp.unregister_feature('sm', self.resume_order)
+ self.xmpp.del_event_handler('session_end', self.session_end)
+ self.xmpp.del_filter('in', self._handle_incoming)
+ self.xmpp.del_filter('out_sync', self._handle_outgoing)
+ self.xmpp.remove_handler('Stream Management Enabled')
+ self.xmpp.remove_handler('Stream Management Resumed')
+ self.xmpp.remove_handler('Stream Management Failed')
+ self.xmpp.remove_handler('Stream Management Ack')
+ self.xmpp.remove_handler('Stream Management Request Ack')
+ self.xmpp.remove_stanza(stanza.Enable)
+ self.xmpp.remove_stanza(stanza.Enabled)
+ self.xmpp.remove_stanza(stanza.Resume)
+ self.xmpp.remove_stanza(stanza.Resumed)
+ self.xmpp.remove_stanza(stanza.Ack)
+ self.xmpp.remove_stanza(stanza.RequestAck)
+
+ def session_end(self, event):
+ """Reset stream management state."""
+ self.enabled.clear()
+ self.unacked_queue.clear()
+ self.sm_id = None
+ self.handled = 0
+ self.seq = 0
+ self.last_ack = 0
+
+ def send_ack(self):
+ """Send the current ack count to the server."""
+ ack = stanza.Ack(self.xmpp)
+ with self.handled_lock:
+ ack['h'] = self.handled
+ self.xmpp.send_raw(str(ack), now=True)
+
+ def request_ack(self, e=None):
+ """Request an ack from the server."""
+ req = stanza.RequestAck(self.xmpp)
+ self.xmpp.send_queue.put(str(req))
+
+ def _handle_sm_feature(self, features):
+ """
+ Enable or resume stream management.
+
+ If no SM-ID is stored, and resource binding has taken place,
+ stream management will be enabled.
+
+ If an SM-ID is known, and the server allows resumption, the
+ previous stream will be resumed.
+ """
+ if 'stream_management' in self.xmpp.features:
+ # We've already negotiated stream management,
+ # so no need to do it again.
+ return False
+ if not self.sm_id:
+ if 'bind' in self.xmpp.features:
+ self.enabled.set()
+ enable = stanza.Enable(self.xmpp)
+ enable['resume'] = self.allow_resume
+ enable.send(now=True)
+ self.handled = 0
+ elif self.sm_id and self.allow_resume:
+ self.enabled.set()
+ resume = stanza.Resume(self.xmpp)
+ resume['h'] = self.handled
+ resume['previd'] = self.sm_id
+ resume.send(now=True)
+
+ # Wait for a response before allowing stream feature processing
+ # to continue. The actual result processing will be done in the
+ # _handle_resumed() or _handle_failed() methods.
+ waiter = Waiter('resumed_or_failed',
+ MatchMany([
+ MatchXPath(stanza.Resumed.tag_name()),
+ MatchXPath(stanza.Failed.tag_name())]))
+ self.xmpp.register_handler(waiter)
+ result = waiter.wait()
+ if result is not None and result.name == 'resumed':
+ return True
+ return False
+
+ def _handle_enabled(self, stanza):
+ """Save the SM-ID, if provided.
+
+ Raises an :term:`sm_enabled` event.
+ """
+ self.xmpp.features.add('stream_management')
+ if stanza['id']:
+ self.sm_id = stanza['id']
+ self.xmpp.event('sm_enabled', stanza)
+
+ def _handle_resumed(self, stanza):
+ """Finish resuming a stream by resending unacked stanzas.
+
+ Raises a :term:`session_resumed` event.
+ """
+ self.xmpp.features.add('stream_management')
+ self._handle_ack(stanza)
+ for id, stanza in self.unacked_queue:
+ self.xmpp.send(stanza, now=True, use_filters=False)
+ self.xmpp.session_started_event.set()
+ self.xmpp.event('session_resumed', stanza)
+
+ def _handle_failed(self, stanza):
+ """
+ Disable and reset any features used since stream management was
+ requested (tracked stanzas may have been sent during the interval
+ between the enable request and the enabled response).
+
+ Raises an :term:`sm_failed` event.
+ """
+ self.enabled.clear()
+ self.unacked_queue.clear()
+ self.xmpp.event('sm_failed', stanza)
+
+ def _handle_ack(self, ack):
+ """Process a server ack by freeing acked stanzas from the queue.
+
+ Raises a :term:`stanza_acked` event for each acked stanza.
+ """
+ if ack['h'] == self.last_ack:
+ return
+
+ with self.ack_lock:
+ num_acked = (ack['h'] - self.last_ack) % MAX_SEQ
+ num_unacked = len(self.unacked_queue)
+ log.debug("Ack: %s, Last Ack: %s, " + \
+ "Unacked: %s, Num Acked: %s, " + \
+ "Remaining: %s",
+ ack['h'],
+ self.last_ack,
+ num_unacked,
+ num_acked,
+ num_unacked - num_acked)
+ for x in range(num_acked):
+ seq, stanza = self.unacked_queue.popleft()
+ self.xmpp.event('stanza_acked', stanza)
+ self.last_ack = ack['h']
+
+ def _handle_request_ack(self, req):
+ """Handle an ack request by sending an ack."""
+ self.send_ack()
+
+ def _handle_incoming(self, stanza):
+ """Increment the handled counter for each inbound stanza."""
+ if not self.enabled.is_set():
+ return stanza
+
+ if isinstance(stanza, (Message, Presence, Iq)):
+ with self.handled_lock:
+ # Sequence numbers are mod 2^32
+ self.handled = (self.handled + 1) % MAX_SEQ
+ return stanza
+
+ def _handle_outgoing(self, stanza):
+ """Store outgoing stanzas in a queue to be acked."""
+ if not self.enabled.is_set():
+ return stanza
+
+ if isinstance(stanza, (Message, Presence, Iq)):
+ seq = None
+ with self.seq_lock:
+ # Sequence numbers are mod 2^32
+ self.seq = (self.seq + 1) % MAX_SEQ
+ seq = self.seq
+ self.unacked_queue.append((seq, stanza))
+ with self.window_counter_lock:
+ self.window_counter -= 1
+ if self.window_counter == 0:
+ self.window_counter = self.window
+ self.request_ack()
+ return stanza