diff options
-rw-r--r-- | slixmpp/plugins/xep_0198/stream_management.py | 46 | ||||
-rw-r--r-- | slixmpp/xmlstream/xmlstream.py | 15 |
2 files changed, 46 insertions, 15 deletions
diff --git a/slixmpp/plugins/xep_0198/stream_management.py b/slixmpp/plugins/xep_0198/stream_management.py index 759e82e1..0200646a 100644 --- a/slixmpp/plugins/xep_0198/stream_management.py +++ b/slixmpp/plugins/xep_0198/stream_management.py @@ -71,7 +71,8 @@ class XEP_0198(BasePlugin): self.window_counter = self.window - self.enabled = False + self.enabled_in = False + self.enabled_out = False self.unacked_queue = collections.deque() register_stanza_plugin(StreamFeatures, stanza.StreamManagement) @@ -82,10 +83,6 @@ class XEP_0198(BasePlugin): self.xmpp.register_stanza(stanza.Ack) self.xmpp.register_stanza(stanza.RequestAck) - # Only end the session when a </stream> element is sent, - # not just because the connection has died. - self.xmpp.end_session_on_disconnect = False - # Register the feature twice because it may be ordered two # different ways: enabling after binding and resumption # before binding. @@ -131,6 +128,7 @@ class XEP_0198(BasePlugin): self.xmpp.add_filter('in', self._handle_incoming) self.xmpp.add_filter('out_sync', self._handle_outgoing) + self.xmpp.add_event_handler('disconnected', self.disconnected) self.xmpp.add_event_handler('session_end', self.session_end) def plugin_end(self): @@ -139,6 +137,7 @@ class XEP_0198(BasePlugin): self.xmpp.unregister_feature('sm', self.order) self.xmpp.unregister_feature('sm', self.resume_order) + self.xmpp.del_event_handler('disconnected', self.disconnected) self.xmpp.del_event_handler('session_end', self.session_end) self.xmpp.del_filter('in', self._handle_incoming) self.xmpp.del_filter('out_sync', self._handle_outgoing) @@ -154,9 +153,19 @@ class XEP_0198(BasePlugin): self.xmpp.remove_stanza(stanza.Ack) self.xmpp.remove_stanza(stanza.RequestAck) + def disconnected(self, event): + """Reset enabled state until we can resume/reenable.""" + log.debug("disconnected, disabling SM") + self.xmpp.event('sm_disabled', event) + self.enabled_in = False + self.enabled_out = False + def session_end(self, event): """Reset stream management state.""" - self.enabled = False + log.debug("session_end, disabling SM") + self.xmpp.event('sm_disabled', event) + self.enabled_in = False + self.enabled_out = False self.unacked_queue.clear() self.sm_id = None self.handled = 0 @@ -171,6 +180,7 @@ class XEP_0198(BasePlugin): def request_ack(self, e=None): """Request an ack from the server.""" + log.debug("requesting ack") req = stanza.RequestAck(self.xmpp) self.xmpp.send_raw(str(req)) @@ -193,9 +203,7 @@ class XEP_0198(BasePlugin): enable = stanza.Enable(self.xmpp) enable['resume'] = self.allow_resume enable.send() - self.enabled = True - self.handled = 0 - self.unacked_queue.clear() + log.debug("enabling SM") waiter = Waiter('enabled_or_failed', MatchMany([ @@ -204,11 +212,11 @@ class XEP_0198(BasePlugin): self.xmpp.register_handler(waiter) result = await waiter.wait() elif self.sm_id and self.allow_resume and 'bind' not in self.xmpp.features: - self.enabled = True resume = stanza.Resume(self.xmpp) resume['h'] = self.handled resume['previd'] = self.sm_id resume.send() + log.debug("resuming SM") # Wait for a response before allowing stream feature processing # to continue. The actual result processing will be done in the @@ -231,7 +239,10 @@ class XEP_0198(BasePlugin): self.xmpp.features.add('stream_management') if stanza['id']: self.sm_id = stanza['id'] + self.enabled_in = True + self.handled = 0 self.xmpp.event('sm_enabled', stanza) + self.xmpp.end_session_on_disconnect = False def _handle_resumed(self, stanza): """Finish resuming a stream by resending unacked stanzas. @@ -239,10 +250,12 @@ class XEP_0198(BasePlugin): Raises a :term:`session_resumed` event. """ self.xmpp.features.add('stream_management') + self.enabled_in = True self._handle_ack(stanza) for id, stanza in self.unacked_queue: self.xmpp.send(stanza, use_filters=False) self.xmpp.event('session_resumed', stanza) + self.xmpp.end_session_on_disconnect = False def _handle_failed(self, stanza): """ @@ -252,7 +265,8 @@ class XEP_0198(BasePlugin): Raises an :term:`sm_failed` event. """ - self.enabled = False + self.enabled_in = False + self.enabled_out = False self.unacked_queue.clear() self.xmpp.event('sm_failed', stanza) @@ -289,7 +303,7 @@ class XEP_0198(BasePlugin): def _handle_incoming(self, stanza): """Increment the handled counter for each inbound stanza.""" - if not self.enabled: + if not self.enabled_in: return stanza if isinstance(stanza, (Message, Presence, Iq)): @@ -299,7 +313,13 @@ class XEP_0198(BasePlugin): def _handle_outgoing(self, stanza): """Store outgoing stanzas in a queue to be acked.""" - if not self.enabled: + from slixmpp.plugins.xep_0198 import stanza as st + if isinstance(stanza, (st.Enable, st.Resume)): + self.enabled_out = True + self.unacked_queue.clear() + log.debug("enabling outgoing SM: %s" % stanza) + + if not self.enabled_out: return stanza if isinstance(stanza, (Message, Presence, Iq)): diff --git a/slixmpp/xmlstream/xmlstream.py b/slixmpp/xmlstream/xmlstream.py index dbf515ca..06fa058c 100644 --- a/slixmpp/xmlstream/xmlstream.py +++ b/slixmpp/xmlstream/xmlstream.py @@ -277,6 +277,7 @@ class XMLStream(asyncio.BaseProtocol): ) self.disconnect_reason = None self.cancel_connection_attempt() + self.connect_loop_wait = 0 if host and port: self.address = (host, int(port)) try: @@ -301,6 +302,10 @@ class XMLStream(asyncio.BaseProtocol): async def _connect_routine(self): self.event_when_connected = "connected" + if self.connect_loop_wait > 0: + self.event('reconnect_delay', self.connect_loop_wait) + await asyncio.sleep(self.connect_loop_wait, loop=self.loop) + record = await self.pick_dns_answer(self.default_domain) if record is not None: host, address, dns_port = record @@ -317,7 +322,6 @@ class XMLStream(asyncio.BaseProtocol): else: ssl_context = None - await asyncio.sleep(self.connect_loop_wait, loop=self.loop) if self._current_connection_attempt is None: return try: @@ -376,6 +380,7 @@ class XMLStream(asyncio.BaseProtocol): "ssl_object", default=self.transport.get_extra_info("socket") ) + self._current_connection_attempt = None self.init_parser() self.send_raw(self.stream_header) self.dns_answers = None @@ -434,6 +439,9 @@ class XMLStream(asyncio.BaseProtocol): self.send(error) self.disconnect() + def is_connecting(self): + return self._current_connection_attempt is not None + def is_connected(self): return self.transport is not None @@ -494,6 +502,8 @@ class XMLStream(asyncio.BaseProtocol): self.send_raw(self.stream_footer) self.schedule('Disconnect wait', wait, self.abort, repeat=False) + else: + self.event("disconnected", reason) def abort(self): """ @@ -506,14 +516,15 @@ class XMLStream(asyncio.BaseProtocol): self.event("killed") self.disconnected.set_result(True) self.disconnected = asyncio.Future() + self.event("disconnected", self.disconnect_reason) def reconnect(self, wait=2.0, reason="Reconnecting"): """Calls disconnect(), and once we are disconnected (after the timeout, or when the server acknowledgement is received), call connect() """ log.debug("reconnecting...") - self.disconnect(wait, reason) self.add_event_handler('disconnected', lambda event: self.connect(), disposable=True) + self.disconnect(wait, reason) def configure_socket(self): """Set timeout and other options for self.socket. |