From 85c9967b9ce55fd6ea1e2aa59cf065710568661f Mon Sep 17 00:00:00 2001 From: Georg Lukas Date: Sat, 28 Mar 2020 21:18:13 +0100 Subject: XEP-0198: fix race conditions on enable/resume This code splits out the `enabled` property into `enabled_in` and `enabled_out` to reflect that client and server enable 0198 asynchronously. This also moves the actual enabling code into the stanza processing logic, because apparently, `enable.send()` was just added into the end of the send queue, but `enable` got enabled immediately, so that poezio requested ACKs for whatever happened to be in the queue before. Async is hard, let's go get fishing. --- slixmpp/plugins/xep_0198/stream_management.py | 36 +++++++++++++++++++-------- 1 file changed, 26 insertions(+), 10 deletions(-) diff --git a/slixmpp/plugins/xep_0198/stream_management.py b/slixmpp/plugins/xep_0198/stream_management.py index a092736f..2b68faec 100644 --- a/slixmpp/plugins/xep_0198/stream_management.py +++ b/slixmpp/plugins/xep_0198/stream_management.py @@ -71,7 +71,8 @@ class XEP_0198(BasePlugin): self.window_counter = self.window - self.enabled = False + self.enabled_in = False + self.enabled_out = False self.unacked_queue = collections.deque() register_stanza_plugin(StreamFeatures, stanza.StreamManagement) @@ -158,11 +159,17 @@ class XEP_0198(BasePlugin): def disconnected(self, event): """Reset enabled state until we can resume/reenable.""" - self.enabled = False + log.debug("disconnected, disabling SM") + self.xmpp.event('sm_disabled', event) + self.enabled_in = False + self.enabled_out = False def session_end(self, event): """Reset stream management state.""" - self.enabled = False + log.debug("session_end, disabling SM") + self.xmpp.event('sm_disabled', event) + self.enabled_in = False + self.enabled_out = False self.unacked_queue.clear() self.sm_id = None self.handled = 0 @@ -177,6 +184,7 @@ class XEP_0198(BasePlugin): def request_ack(self, e=None): """Request an ack from the server.""" + log.debug("requesting ack") req = stanza.RequestAck(self.xmpp) self.xmpp.send_raw(str(req)) @@ -199,9 +207,7 @@ class XEP_0198(BasePlugin): enable = stanza.Enable(self.xmpp) enable['resume'] = self.allow_resume enable.send() - self.enabled = True - self.handled = 0 - self.unacked_queue.clear() + log.debug("enabling SM") waiter = Waiter('enabled_or_failed', MatchMany([ @@ -210,11 +216,11 @@ class XEP_0198(BasePlugin): self.xmpp.register_handler(waiter) result = await waiter.wait() elif self.sm_id and self.allow_resume and 'bind' not in self.xmpp.features: - self.enabled = True resume = stanza.Resume(self.xmpp) resume['h'] = self.handled resume['previd'] = self.sm_id resume.send() + log.debug("resuming SM") # Wait for a response before allowing stream feature processing # to continue. The actual result processing will be done in the @@ -237,6 +243,8 @@ class XEP_0198(BasePlugin): self.xmpp.features.add('stream_management') if stanza['id']: self.sm_id = stanza['id'] + self.enabled_in = True + self.handled = 0 self.xmpp.event('sm_enabled', stanza) def _handle_resumed(self, stanza): @@ -245,6 +253,7 @@ class XEP_0198(BasePlugin): Raises a :term:`session_resumed` event. """ self.xmpp.features.add('stream_management') + self.enabled_in = True self._handle_ack(stanza) for id, stanza in self.unacked_queue: self.xmpp.send(stanza, use_filters=False) @@ -258,7 +267,8 @@ class XEP_0198(BasePlugin): Raises an :term:`sm_failed` event. """ - self.enabled = False + self.enabled_in = False + self.enabled_out = False self.unacked_queue.clear() self.xmpp.event('sm_failed', stanza) @@ -295,7 +305,7 @@ class XEP_0198(BasePlugin): def _handle_incoming(self, stanza): """Increment the handled counter for each inbound stanza.""" - if not self.enabled: + if not self.enabled_in: return stanza if isinstance(stanza, (Message, Presence, Iq)): @@ -305,7 +315,13 @@ class XEP_0198(BasePlugin): def _handle_outgoing(self, stanza): """Store outgoing stanzas in a queue to be acked.""" - if not self.enabled: + from slixmpp.plugins.xep_0198 import stanza as st + if isinstance(stanza, (st.Enable, st.Resume)): + self.enabled_out = True + self.unacked_queue.clear() + log.debug("enabling outgoing SM: %s" % stanza) + + if not self.enabled_out: return stanza if isinstance(stanza, (Message, Presence, Iq)): -- cgit v1.2.3