summaryrefslogtreecommitdiff
path: root/slixmpp/xmlstream/xmlstream.py
diff options
context:
space:
mode:
Diffstat (limited to 'slixmpp/xmlstream/xmlstream.py')
-rw-r--r--slixmpp/xmlstream/xmlstream.py34
1 files changed, 26 insertions, 8 deletions
diff --git a/slixmpp/xmlstream/xmlstream.py b/slixmpp/xmlstream/xmlstream.py
index 06fa058c..3aac8c8e 100644
--- a/slixmpp/xmlstream/xmlstream.py
+++ b/slixmpp/xmlstream/xmlstream.py
@@ -475,7 +475,7 @@ class XMLStream(asyncio.BaseProtocol):
self._current_connection_attempt.cancel()
self._current_connection_attempt = None
- def disconnect(self, wait: float = 2.0, reason: Optional[str] = None) -> None:
+ def disconnect(self, wait: float = 2.0, reason: Optional[str] = None, ignore_send_queue: bool = False) -> None:
"""Close the XML stream and wait for an acknowldgement from the server for
at most `wait` seconds. After the given number of seconds has
passed without a response from the server, or when the server
@@ -495,16 +495,32 @@ class XMLStream(asyncio.BaseProtocol):
if wait == True:
wait = 2.0
- self.disconnect_reason = reason
- self.cancel_connection_attempt()
if self.transport:
- if wait > 0.0:
- self.send_raw(self.stream_footer)
- self.schedule('Disconnect wait', wait,
- self.abort, repeat=False)
+ if self.waiting_queue.empty() or ignore_send_queue:
+ self.disconnect_reason = reason
+ self.cancel_connection_attempt()
+ if wait > 0.0:
+ self.send_raw(self.stream_footer)
+ self.schedule('Disconnect wait', wait,
+ self.abort, repeat=False)
+ else:
+ asyncio.ensure_future(
+ self._consume_send_queue_before_disconnecting(reason, wait),
+ loop=self.loop,
+ )
else:
self.event("disconnected", reason)
+ async def _consume_send_queue_before_disconnecting(self, reason: Optional[str], wait: float):
+ """Wait until the send queue is empty before disconnecting"""
+ await self.waiting_queue.join()
+ self.disconnect_reason = reason
+ self.cancel_connection_attempt()
+ if wait > 0.0:
+ self.send_raw(self.stream_footer)
+ self.schedule('Disconnect wait', wait,
+ self.abort, repeat=False)
+
def abort(self):
"""
Forcibly close the connection
@@ -899,7 +915,9 @@ class XMLStream(asyncio.BaseProtocol):
Execute the callback and remove the handler for it.
"""
self._safe_cb_run(name, cb)
- del self.scheduled_events[name]
+ # workaround for specific events which unschedule themselves
+ if name in self.scheduled_events:
+ del self.scheduled_events[name]
def incoming_filter(self, xml):
"""Filter incoming XML objects before they are processed.