summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--slixmpp/plugins/xep_0198/stream_management.py46
-rw-r--r--slixmpp/xmlstream/xmlstream.py15
2 files changed, 46 insertions, 15 deletions
diff --git a/slixmpp/plugins/xep_0198/stream_management.py b/slixmpp/plugins/xep_0198/stream_management.py
index 759e82e1..0200646a 100644
--- a/slixmpp/plugins/xep_0198/stream_management.py
+++ b/slixmpp/plugins/xep_0198/stream_management.py
@@ -71,7 +71,8 @@ class XEP_0198(BasePlugin):
self.window_counter = self.window
- self.enabled = False
+ self.enabled_in = False
+ self.enabled_out = False
self.unacked_queue = collections.deque()
register_stanza_plugin(StreamFeatures, stanza.StreamManagement)
@@ -82,10 +83,6 @@ class XEP_0198(BasePlugin):
self.xmpp.register_stanza(stanza.Ack)
self.xmpp.register_stanza(stanza.RequestAck)
- # Only end the session when a </stream> element is sent,
- # not just because the connection has died.
- self.xmpp.end_session_on_disconnect = False
-
# Register the feature twice because it may be ordered two
# different ways: enabling after binding and resumption
# before binding.
@@ -131,6 +128,7 @@ class XEP_0198(BasePlugin):
self.xmpp.add_filter('in', self._handle_incoming)
self.xmpp.add_filter('out_sync', self._handle_outgoing)
+ self.xmpp.add_event_handler('disconnected', self.disconnected)
self.xmpp.add_event_handler('session_end', self.session_end)
def plugin_end(self):
@@ -139,6 +137,7 @@ class XEP_0198(BasePlugin):
self.xmpp.unregister_feature('sm', self.order)
self.xmpp.unregister_feature('sm', self.resume_order)
+ self.xmpp.del_event_handler('disconnected', self.disconnected)
self.xmpp.del_event_handler('session_end', self.session_end)
self.xmpp.del_filter('in', self._handle_incoming)
self.xmpp.del_filter('out_sync', self._handle_outgoing)
@@ -154,9 +153,19 @@ class XEP_0198(BasePlugin):
self.xmpp.remove_stanza(stanza.Ack)
self.xmpp.remove_stanza(stanza.RequestAck)
+ def disconnected(self, event):
+ """Reset enabled state until we can resume/reenable."""
+ log.debug("disconnected, disabling SM")
+ self.xmpp.event('sm_disabled', event)
+ self.enabled_in = False
+ self.enabled_out = False
+
def session_end(self, event):
"""Reset stream management state."""
- self.enabled = False
+ log.debug("session_end, disabling SM")
+ self.xmpp.event('sm_disabled', event)
+ self.enabled_in = False
+ self.enabled_out = False
self.unacked_queue.clear()
self.sm_id = None
self.handled = 0
@@ -171,6 +180,7 @@ class XEP_0198(BasePlugin):
def request_ack(self, e=None):
"""Request an ack from the server."""
+ log.debug("requesting ack")
req = stanza.RequestAck(self.xmpp)
self.xmpp.send_raw(str(req))
@@ -193,9 +203,7 @@ class XEP_0198(BasePlugin):
enable = stanza.Enable(self.xmpp)
enable['resume'] = self.allow_resume
enable.send()
- self.enabled = True
- self.handled = 0
- self.unacked_queue.clear()
+ log.debug("enabling SM")
waiter = Waiter('enabled_or_failed',
MatchMany([
@@ -204,11 +212,11 @@ class XEP_0198(BasePlugin):
self.xmpp.register_handler(waiter)
result = await waiter.wait()
elif self.sm_id and self.allow_resume and 'bind' not in self.xmpp.features:
- self.enabled = True
resume = stanza.Resume(self.xmpp)
resume['h'] = self.handled
resume['previd'] = self.sm_id
resume.send()
+ log.debug("resuming SM")
# Wait for a response before allowing stream feature processing
# to continue. The actual result processing will be done in the
@@ -231,7 +239,10 @@ class XEP_0198(BasePlugin):
self.xmpp.features.add('stream_management')
if stanza['id']:
self.sm_id = stanza['id']
+ self.enabled_in = True
+ self.handled = 0
self.xmpp.event('sm_enabled', stanza)
+ self.xmpp.end_session_on_disconnect = False
def _handle_resumed(self, stanza):
"""Finish resuming a stream by resending unacked stanzas.
@@ -239,10 +250,12 @@ class XEP_0198(BasePlugin):
Raises a :term:`session_resumed` event.
"""
self.xmpp.features.add('stream_management')
+ self.enabled_in = True
self._handle_ack(stanza)
for id, stanza in self.unacked_queue:
self.xmpp.send(stanza, use_filters=False)
self.xmpp.event('session_resumed', stanza)
+ self.xmpp.end_session_on_disconnect = False
def _handle_failed(self, stanza):
"""
@@ -252,7 +265,8 @@ class XEP_0198(BasePlugin):
Raises an :term:`sm_failed` event.
"""
- self.enabled = False
+ self.enabled_in = False
+ self.enabled_out = False
self.unacked_queue.clear()
self.xmpp.event('sm_failed', stanza)
@@ -289,7 +303,7 @@ class XEP_0198(BasePlugin):
def _handle_incoming(self, stanza):
"""Increment the handled counter for each inbound stanza."""
- if not self.enabled:
+ if not self.enabled_in:
return stanza
if isinstance(stanza, (Message, Presence, Iq)):
@@ -299,7 +313,13 @@ class XEP_0198(BasePlugin):
def _handle_outgoing(self, stanza):
"""Store outgoing stanzas in a queue to be acked."""
- if not self.enabled:
+ from slixmpp.plugins.xep_0198 import stanza as st
+ if isinstance(stanza, (st.Enable, st.Resume)):
+ self.enabled_out = True
+ self.unacked_queue.clear()
+ log.debug("enabling outgoing SM: %s" % stanza)
+
+ if not self.enabled_out:
return stanza
if isinstance(stanza, (Message, Presence, Iq)):
diff --git a/slixmpp/xmlstream/xmlstream.py b/slixmpp/xmlstream/xmlstream.py
index dbf515ca..06fa058c 100644
--- a/slixmpp/xmlstream/xmlstream.py
+++ b/slixmpp/xmlstream/xmlstream.py
@@ -277,6 +277,7 @@ class XMLStream(asyncio.BaseProtocol):
)
self.disconnect_reason = None
self.cancel_connection_attempt()
+ self.connect_loop_wait = 0
if host and port:
self.address = (host, int(port))
try:
@@ -301,6 +302,10 @@ class XMLStream(asyncio.BaseProtocol):
async def _connect_routine(self):
self.event_when_connected = "connected"
+ if self.connect_loop_wait > 0:
+ self.event('reconnect_delay', self.connect_loop_wait)
+ await asyncio.sleep(self.connect_loop_wait, loop=self.loop)
+
record = await self.pick_dns_answer(self.default_domain)
if record is not None:
host, address, dns_port = record
@@ -317,7 +322,6 @@ class XMLStream(asyncio.BaseProtocol):
else:
ssl_context = None
- await asyncio.sleep(self.connect_loop_wait, loop=self.loop)
if self._current_connection_attempt is None:
return
try:
@@ -376,6 +380,7 @@ class XMLStream(asyncio.BaseProtocol):
"ssl_object",
default=self.transport.get_extra_info("socket")
)
+ self._current_connection_attempt = None
self.init_parser()
self.send_raw(self.stream_header)
self.dns_answers = None
@@ -434,6 +439,9 @@ class XMLStream(asyncio.BaseProtocol):
self.send(error)
self.disconnect()
+ def is_connecting(self):
+ return self._current_connection_attempt is not None
+
def is_connected(self):
return self.transport is not None
@@ -494,6 +502,8 @@ class XMLStream(asyncio.BaseProtocol):
self.send_raw(self.stream_footer)
self.schedule('Disconnect wait', wait,
self.abort, repeat=False)
+ else:
+ self.event("disconnected", reason)
def abort(self):
"""
@@ -506,14 +516,15 @@ class XMLStream(asyncio.BaseProtocol):
self.event("killed")
self.disconnected.set_result(True)
self.disconnected = asyncio.Future()
+ self.event("disconnected", self.disconnect_reason)
def reconnect(self, wait=2.0, reason="Reconnecting"):
"""Calls disconnect(), and once we are disconnected (after the timeout, or
when the server acknowledgement is received), call connect()
"""
log.debug("reconnecting...")
- self.disconnect(wait, reason)
self.add_event_handler('disconnected', lambda event: self.connect(), disposable=True)
+ self.disconnect(wait, reason)
def configure_socket(self):
"""Set timeout and other options for self.socket.