summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--slixmpp/plugins/xep_0198/stream_management.py36
1 files changed, 26 insertions, 10 deletions
diff --git a/slixmpp/plugins/xep_0198/stream_management.py b/slixmpp/plugins/xep_0198/stream_management.py
index a092736f..2b68faec 100644
--- a/slixmpp/plugins/xep_0198/stream_management.py
+++ b/slixmpp/plugins/xep_0198/stream_management.py
@@ -71,7 +71,8 @@ class XEP_0198(BasePlugin):
self.window_counter = self.window
- self.enabled = False
+ self.enabled_in = False
+ self.enabled_out = False
self.unacked_queue = collections.deque()
register_stanza_plugin(StreamFeatures, stanza.StreamManagement)
@@ -158,11 +159,17 @@ class XEP_0198(BasePlugin):
def disconnected(self, event):
"""Reset enabled state until we can resume/reenable."""
- self.enabled = False
+ log.debug("disconnected, disabling SM")
+ self.xmpp.event('sm_disabled', event)
+ self.enabled_in = False
+ self.enabled_out = False
def session_end(self, event):
"""Reset stream management state."""
- self.enabled = False
+ log.debug("session_end, disabling SM")
+ self.xmpp.event('sm_disabled', event)
+ self.enabled_in = False
+ self.enabled_out = False
self.unacked_queue.clear()
self.sm_id = None
self.handled = 0
@@ -177,6 +184,7 @@ class XEP_0198(BasePlugin):
def request_ack(self, e=None):
"""Request an ack from the server."""
+ log.debug("requesting ack")
req = stanza.RequestAck(self.xmpp)
self.xmpp.send_raw(str(req))
@@ -199,9 +207,7 @@ class XEP_0198(BasePlugin):
enable = stanza.Enable(self.xmpp)
enable['resume'] = self.allow_resume
enable.send()
- self.enabled = True
- self.handled = 0
- self.unacked_queue.clear()
+ log.debug("enabling SM")
waiter = Waiter('enabled_or_failed',
MatchMany([
@@ -210,11 +216,11 @@ class XEP_0198(BasePlugin):
self.xmpp.register_handler(waiter)
result = await waiter.wait()
elif self.sm_id and self.allow_resume and 'bind' not in self.xmpp.features:
- self.enabled = True
resume = stanza.Resume(self.xmpp)
resume['h'] = self.handled
resume['previd'] = self.sm_id
resume.send()
+ log.debug("resuming SM")
# Wait for a response before allowing stream feature processing
# to continue. The actual result processing will be done in the
@@ -237,6 +243,8 @@ class XEP_0198(BasePlugin):
self.xmpp.features.add('stream_management')
if stanza['id']:
self.sm_id = stanza['id']
+ self.enabled_in = True
+ self.handled = 0
self.xmpp.event('sm_enabled', stanza)
def _handle_resumed(self, stanza):
@@ -245,6 +253,7 @@ class XEP_0198(BasePlugin):
Raises a :term:`session_resumed` event.
"""
self.xmpp.features.add('stream_management')
+ self.enabled_in = True
self._handle_ack(stanza)
for id, stanza in self.unacked_queue:
self.xmpp.send(stanza, use_filters=False)
@@ -258,7 +267,8 @@ class XEP_0198(BasePlugin):
Raises an :term:`sm_failed` event.
"""
- self.enabled = False
+ self.enabled_in = False
+ self.enabled_out = False
self.unacked_queue.clear()
self.xmpp.event('sm_failed', stanza)
@@ -295,7 +305,7 @@ class XEP_0198(BasePlugin):
def _handle_incoming(self, stanza):
"""Increment the handled counter for each inbound stanza."""
- if not self.enabled:
+ if not self.enabled_in:
return stanza
if isinstance(stanza, (Message, Presence, Iq)):
@@ -305,7 +315,13 @@ class XEP_0198(BasePlugin):
def _handle_outgoing(self, stanza):
"""Store outgoing stanzas in a queue to be acked."""
- if not self.enabled:
+ from slixmpp.plugins.xep_0198 import stanza as st
+ if isinstance(stanza, (st.Enable, st.Resume)):
+ self.enabled_out = True
+ self.unacked_queue.clear()
+ log.debug("enabling outgoing SM: %s" % stanza)
+
+ if not self.enabled_out:
return stanza
if isinstance(stanza, (Message, Presence, Iq)):