summaryrefslogtreecommitdiff
path: root/sleekxmpp/plugins/xep_0198
diff options
context:
space:
mode:
Diffstat (limited to 'sleekxmpp/plugins/xep_0198')
-rw-r--r--sleekxmpp/plugins/xep_0198/__init__.py20
-rw-r--r--sleekxmpp/plugins/xep_0198/stanza.py150
-rw-r--r--sleekxmpp/plugins/xep_0198/stream_management.py314
3 files changed, 0 insertions, 484 deletions
diff --git a/sleekxmpp/plugins/xep_0198/__init__.py b/sleekxmpp/plugins/xep_0198/__init__.py
deleted file mode 100644
index db930347..00000000
--- a/sleekxmpp/plugins/xep_0198/__init__.py
+++ /dev/null
@@ -1,20 +0,0 @@
-"""
- SleekXMPP: The Sleek XMPP Library
- Copyright (C) 2012 Nathanael C. Fritz, Lance J.T. Stout
- This file is part of SleekXMPP.
-
- See the file LICENSE for copying permission.
-"""
-
-from sleekxmpp.plugins.base import register_plugin
-
-from sleekxmpp.plugins.xep_0198.stanza import Enable, Enabled
-from sleekxmpp.plugins.xep_0198.stanza import Resume, Resumed
-from sleekxmpp.plugins.xep_0198.stanza import Failed
-from sleekxmpp.plugins.xep_0198.stanza import StreamManagement
-from sleekxmpp.plugins.xep_0198.stanza import Ack, RequestAck
-
-from sleekxmpp.plugins.xep_0198.stream_management import XEP_0198
-
-
-register_plugin(XEP_0198)
diff --git a/sleekxmpp/plugins/xep_0198/stanza.py b/sleekxmpp/plugins/xep_0198/stanza.py
deleted file mode 100644
index 6461d766..00000000
--- a/sleekxmpp/plugins/xep_0198/stanza.py
+++ /dev/null
@@ -1,150 +0,0 @@
-"""
- SleekXMPP: The Sleek XMPP Library
- Copyright (C) 2012 Nathanael C. Fritz, Lance J.T. Stout
- This file is part of SleekXMPP.
-
- See the file LICENSE for copying permission.
-"""
-
-from sleekxmpp.stanza import Error
-from sleekxmpp.xmlstream import ElementBase, StanzaBase
-
-
-class Enable(StanzaBase):
- name = 'enable'
- namespace = 'urn:xmpp:sm:3'
- interfaces = set(['max', 'resume'])
-
- def setup(self, xml):
- StanzaBase.setup(self, xml)
- self.xml.tag = self.tag_name()
-
- def get_resume(self):
- return self._get_attr('resume', 'false').lower() in ('true', '1')
-
- def set_resume(self, val):
- self._del_attr('resume')
- self._set_attr('resume', 'true' if val else 'false')
-
-
-class Enabled(StanzaBase):
- name = 'enabled'
- namespace = 'urn:xmpp:sm:3'
- interfaces = set(['id', 'location', 'max', 'resume'])
-
- def setup(self, xml):
- StanzaBase.setup(self, xml)
- self.xml.tag = self.tag_name()
-
- def get_resume(self):
- return self._get_attr('resume', 'false').lower() in ('true', '1')
-
- def set_resume(self, val):
- self._del_attr('resume')
- self._set_attr('resume', 'true' if val else 'false')
-
-
-class Resume(StanzaBase):
- name = 'resume'
- namespace = 'urn:xmpp:sm:3'
- interfaces = set(['h', 'previd'])
-
- def setup(self, xml):
- StanzaBase.setup(self, xml)
- self.xml.tag = self.tag_name()
-
- def get_h(self):
- h = self._get_attr('h', None)
- if h:
- return int(h)
- return None
-
- def set_h(self, val):
- self._set_attr('h', str(val))
-
-
-class Resumed(StanzaBase):
- name = 'resumed'
- namespace = 'urn:xmpp:sm:3'
- interfaces = set(['h', 'previd'])
-
- def setup(self, xml):
- StanzaBase.setup(self, xml)
- self.xml.tag = self.tag_name()
-
- def get_h(self):
- h = self._get_attr('h', None)
- if h:
- return int(h)
- return None
-
- def set_h(self, val):
- self._set_attr('h', str(val))
-
-
-class Failed(StanzaBase, Error):
- name = 'failed'
- namespace = 'urn:xmpp:sm:3'
- interfaces = set()
-
- def setup(self, xml):
- StanzaBase.setup(self, xml)
- self.xml.tag = self.tag_name()
-
-
-class StreamManagement(ElementBase):
- name = 'sm'
- namespace = 'urn:xmpp:sm:3'
- plugin_attrib = name
- interfaces = set(['required', 'optional'])
-
- def get_required(self):
- return self.find('{%s}required' % self.namespace) is not None
-
- def set_required(self, val):
- self.del_required()
- if val:
- self._set_sub_text('required', '', keep=True)
-
- def del_required(self):
- self._del_sub('required')
-
- def get_optional(self):
- return self.find('{%s}optional' % self.namespace) is not None
-
- def set_optional(self, val):
- self.del_optional()
- if val:
- self._set_sub_text('optional', '', keep=True)
-
- def del_optional(self):
- self._del_sub('optional')
-
-
-class RequestAck(StanzaBase):
- name = 'r'
- namespace = 'urn:xmpp:sm:3'
- interfaces = set()
-
- def setup(self, xml):
- StanzaBase.setup(self, xml)
- self.xml.tag = self.tag_name()
-
-
-class Ack(StanzaBase):
- name = 'a'
- namespace = 'urn:xmpp:sm:3'
- interfaces = set(['h'])
-
- def setup(self, xml):
- StanzaBase.setup(self, xml)
- self.xml.tag = self.tag_name()
-
- def get_h(self):
- h = self._get_attr('h', None)
- if h:
- return int(h)
- return None
-
- def set_h(self, val):
- self._set_attr('h', str(val))
diff --git a/sleekxmpp/plugins/xep_0198/stream_management.py b/sleekxmpp/plugins/xep_0198/stream_management.py
deleted file mode 100644
index 48029913..00000000
--- a/sleekxmpp/plugins/xep_0198/stream_management.py
+++ /dev/null
@@ -1,314 +0,0 @@
-"""
- SleekXMPP: The Sleek XMPP Library
- Copyright (C) 2012 Nathanael C. Fritz, Lance J.T. Stout
- This file is part of SleekXMPP.
-
- See the file LICENSE for copying permission.
-"""
-
-import logging
-import threading
-import collections
-
-from sleekxmpp.stanza import Message, Presence, Iq, StreamFeatures
-from sleekxmpp.xmlstream import register_stanza_plugin
-from sleekxmpp.xmlstream.handler import Callback, Waiter
-from sleekxmpp.xmlstream.matcher import MatchXPath, MatchMany
-from sleekxmpp.plugins.base import BasePlugin
-from sleekxmpp.plugins.xep_0198 import stanza
-
-
-log = logging.getLogger(__name__)
-
-
-MAX_SEQ = 2 ** 32
-
-
-class XEP_0198(BasePlugin):
-
- """
- XEP-0198: Stream Management
- """
-
- name = 'xep_0198'
- description = 'XEP-0198: Stream Management'
- dependencies = set()
- stanza = stanza
- default_config = {
- #: The last ack number received from the server.
- 'last_ack': 0,
-
- #: The number of stanzas to wait between sending ack requests to
- #: the server. Setting this to ``1`` will send an ack request after
- #: every sent stanza. Defaults to ``5``.
- 'window': 5,
-
- #: The stream management ID for the stream. Knowing this value is
- #: required in order to do stream resumption.
- 'sm_id': None,
-
- #: A counter of handled incoming stanzas, mod 2^32.
- 'handled': 0,
-
- #: A counter of unacked outgoing stanzas, mod 2^32.
- 'seq': 0,
-
- #: Control whether or not the ability to resume the stream will be
- #: requested when enabling stream management. Defaults to ``True``.
- 'allow_resume': True,
-
- 'order': 10100,
- 'resume_order': 9000
- }
-
- def plugin_init(self):
- """Start the XEP-0198 plugin."""
-
- # Only enable stream management for non-components,
- # since components do not yet perform feature negotiation.
- if self.xmpp.is_component:
- return
-
- self.window_counter = self.window
- self.window_counter_lock = threading.Lock()
-
- self.enabled = threading.Event()
- self.unacked_queue = collections.deque()
-
- self.seq_lock = threading.Lock()
- self.handled_lock = threading.Lock()
- self.ack_lock = threading.Lock()
-
- register_stanza_plugin(StreamFeatures, stanza.StreamManagement)
- self.xmpp.register_stanza(stanza.Enable)
- self.xmpp.register_stanza(stanza.Enabled)
- self.xmpp.register_stanza(stanza.Resume)
- self.xmpp.register_stanza(stanza.Resumed)
- self.xmpp.register_stanza(stanza.Ack)
- self.xmpp.register_stanza(stanza.RequestAck)
-
- # Only end the session when a </stream> element is sent,
- # not just because the connection has died.
- self.xmpp.end_session_on_disconnect = False
-
- # Register the feature twice because it may be ordered two
- # different ways: enabling after binding and resumption
- # before binding.
- self.xmpp.register_feature('sm',
- self._handle_sm_feature,
- restart=True,
- order=self.order)
- self.xmpp.register_feature('sm',
- self._handle_sm_feature,
- restart=True,
- order=self.resume_order)
-
- self.xmpp.register_handler(
- Callback('Stream Management Enabled',
- MatchXPath(stanza.Enabled.tag_name()),
- self._handle_enabled,
- instream=True))
-
- self.xmpp.register_handler(
- Callback('Stream Management Resumed',
- MatchXPath(stanza.Resumed.tag_name()),
- self._handle_resumed,
- instream=True))
-
- self.xmpp.register_handler(
- Callback('Stream Management Failed',
- MatchXPath(stanza.Failed.tag_name()),
- self._handle_failed,
- instream=True))
-
- self.xmpp.register_handler(
- Callback('Stream Management Ack',
- MatchXPath(stanza.Ack.tag_name()),
- self._handle_ack,
- instream=True))
-
- self.xmpp.register_handler(
- Callback('Stream Management Request Ack',
- MatchXPath(stanza.RequestAck.tag_name()),
- self._handle_request_ack,
- instream=True))
-
- self.xmpp.add_filter('in', self._handle_incoming)
- self.xmpp.add_filter('out_sync', self._handle_outgoing)
-
- self.xmpp.add_event_handler('session_end', self.session_end)
-
- def plugin_end(self):
- if self.xmpp.is_component:
- return
-
- self.xmpp.unregister_feature('sm', self.order)
- self.xmpp.unregister_feature('sm', self.resume_order)
- self.xmpp.del_event_handler('session_end', self.session_end)
- self.xmpp.del_filter('in', self._handle_incoming)
- self.xmpp.del_filter('out_sync', self._handle_outgoing)
- self.xmpp.remove_handler('Stream Management Enabled')
- self.xmpp.remove_handler('Stream Management Resumed')
- self.xmpp.remove_handler('Stream Management Failed')
- self.xmpp.remove_handler('Stream Management Ack')
- self.xmpp.remove_handler('Stream Management Request Ack')
- self.xmpp.remove_stanza(stanza.Enable)
- self.xmpp.remove_stanza(stanza.Enabled)
- self.xmpp.remove_stanza(stanza.Resume)
- self.xmpp.remove_stanza(stanza.Resumed)
- self.xmpp.remove_stanza(stanza.Ack)
- self.xmpp.remove_stanza(stanza.RequestAck)
-
- def session_end(self, event):
- """Reset stream management state."""
- self.enabled.clear()
- self.unacked_queue.clear()
- self.sm_id = None
- self.handled = 0
- self.seq = 0
- self.last_ack = 0
-
- def send_ack(self):
- """Send the current ack count to the server."""
- ack = stanza.Ack(self.xmpp)
- with self.handled_lock:
- ack['h'] = self.handled
- self.xmpp.send_raw(str(ack), now=True)
-
- def request_ack(self, e=None):
- """Request an ack from the server."""
- req = stanza.RequestAck(self.xmpp)
- self.xmpp.send_queue.put(str(req))
-
- def _handle_sm_feature(self, features):
- """
- Enable or resume stream management.
-
- If no SM-ID is stored, and resource binding has taken place,
- stream management will be enabled.
-
- If an SM-ID is known, and the server allows resumption, the
- previous stream will be resumed.
- """
- if 'stream_management' in self.xmpp.features:
- # We've already negotiated stream management,
- # so no need to do it again.
- return False
- if not self.sm_id:
- if 'bind' in self.xmpp.features:
- self.enabled.set()
- enable = stanza.Enable(self.xmpp)
- enable['resume'] = self.allow_resume
- enable.send(now=True)
- self.handled = 0
- elif self.sm_id and self.allow_resume:
- self.enabled.set()
- resume = stanza.Resume(self.xmpp)
- resume['h'] = self.handled
- resume['previd'] = self.sm_id
- resume.send(now=True)
-
- # Wait for a response before allowing stream feature processing
- # to continue. The actual result processing will be done in the
- # _handle_resumed() or _handle_failed() methods.
- waiter = Waiter('resumed_or_failed',
- MatchMany([
- MatchXPath(stanza.Resumed.tag_name()),
- MatchXPath(stanza.Failed.tag_name())]))
- self.xmpp.register_handler(waiter)
- result = waiter.wait()
- if result is not None and result.name == 'resumed':
- return True
- return False
-
- def _handle_enabled(self, stanza):
- """Save the SM-ID, if provided.
-
- Raises an :term:`sm_enabled` event.
- """
- self.xmpp.features.add('stream_management')
- if stanza['id']:
- self.sm_id = stanza['id']
- self.xmpp.event('sm_enabled', stanza)
-
- def _handle_resumed(self, stanza):
- """Finish resuming a stream by resending unacked stanzas.
-
- Raises a :term:`session_resumed` event.
- """
- self.xmpp.features.add('stream_management')
- self._handle_ack(stanza)
- for id, stanza in self.unacked_queue:
- self.xmpp.send(stanza, now=True, use_filters=False)
- self.xmpp.session_started_event.set()
- self.xmpp.event('session_resumed', stanza)
-
- def _handle_failed(self, stanza):
- """
- Disable and reset any features used since stream management was
- requested (tracked stanzas may have been sent during the interval
- between the enable request and the enabled response).
-
- Raises an :term:`sm_failed` event.
- """
- self.enabled.clear()
- self.unacked_queue.clear()
- self.xmpp.event('sm_failed', stanza)
-
- def _handle_ack(self, ack):
- """Process a server ack by freeing acked stanzas from the queue.
-
- Raises a :term:`stanza_acked` event for each acked stanza.
- """
- if ack['h'] == self.last_ack:
- return
-
- with self.ack_lock:
- num_acked = (ack['h'] - self.last_ack) % MAX_SEQ
- num_unacked = len(self.unacked_queue)
- log.debug("Ack: %s, Last Ack: %s, " + \
- "Unacked: %s, Num Acked: %s, " + \
- "Remaining: %s",
- ack['h'],
- self.last_ack,
- num_unacked,
- num_acked,
- num_unacked - num_acked)
- for x in range(num_acked):
- seq, stanza = self.unacked_queue.popleft()
- self.xmpp.event('stanza_acked', stanza)
- self.last_ack = ack['h']
-
- def _handle_request_ack(self, req):
- """Handle an ack request by sending an ack."""
- self.send_ack()
-
- def _handle_incoming(self, stanza):
- """Increment the handled counter for each inbound stanza."""
- if not self.enabled.is_set():
- return stanza
-
- if isinstance(stanza, (Message, Presence, Iq)):
- with self.handled_lock:
- # Sequence numbers are mod 2^32
- self.handled = (self.handled + 1) % MAX_SEQ
- return stanza
-
- def _handle_outgoing(self, stanza):
- """Store outgoing stanzas in a queue to be acked."""
- if not self.enabled.is_set():
- return stanza
-
- if isinstance(stanza, (Message, Presence, Iq)):
- seq = None
- with self.seq_lock:
- # Sequence numbers are mod 2^32
- self.seq = (self.seq + 1) % MAX_SEQ
- seq = self.seq
- self.unacked_queue.append((seq, stanza))
- with self.window_counter_lock:
- self.window_counter -= 1
- if self.window_counter == 0:
- self.window_counter = self.window
- self.request_ack()
- return stanza