From 0eed84d0b2fe144850a082feb339187bd98a4817 Mon Sep 17 00:00:00 2001 From: mathieui Date: Sat, 23 Jan 2021 15:41:13 +0100 Subject: xmlstream: handle done tasks in wait_until and handle other loops properly --- slixmpp/xmlstream/xmlstream.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/slixmpp/xmlstream/xmlstream.py b/slixmpp/xmlstream/xmlstream.py index 6b890729..c8f797ac 100644 --- a/slixmpp/xmlstream/xmlstream.py +++ b/slixmpp/xmlstream/xmlstream.py @@ -1142,9 +1142,15 @@ class XMLStream(asyncio.BaseProtocol): :param int timeout: Timeout """ fut = asyncio.Future() + def result_handler(event_data): + if not fut.done(): + fut.set_result(event_data) + else: + log.debug("Future registered on event '%s' was alredy done", event) + self.add_event_handler( event, - fut.set_result, + result_handler, disposable=True, ) - return await asyncio.wait_for(fut, timeout) + return await asyncio.wait_for(fut, timeout, loop=self.loop) -- cgit v1.2.3 From efdcd396d8b7314e29212c31869a725a72ef5f78 Mon Sep 17 00:00:00 2001 From: mathieui Date: Mon, 25 Jan 2021 09:56:53 +0100 Subject: xmlstream: fix race conditions on handlers --- slixmpp/xmlstream/xmlstream.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/slixmpp/xmlstream/xmlstream.py b/slixmpp/xmlstream/xmlstream.py index c8f797ac..77bb9e4a 100644 --- a/slixmpp/xmlstream/xmlstream.py +++ b/slixmpp/xmlstream/xmlstream.py @@ -833,7 +833,7 @@ class XMLStream(asyncio.BaseProtocol): """ log.debug("Event triggered: %s", name) - handlers = self.__event_handlers.get(name, []) + handlers = self.__event_handlers.get(name, [])[:] for handler in handlers: handler_callback, disposable = handler old_exception = getattr(data, 'exception', None) -- cgit v1.2.3 From 8700f8d162c4b863f045c2459d68406c04e35bad Mon Sep 17 00:00:00 2001 From: mathieui Date: Sat, 23 Jan 2021 16:14:59 +0100 Subject: xmlstream: do not cancel the send filter task it does not make sense to cancel it, it does not do anything when the sending queue is empty, and clients should not fill the send queue when not connected anyway. --- slixmpp/xmlstream/xmlstream.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/slixmpp/xmlstream/xmlstream.py b/slixmpp/xmlstream/xmlstream.py index 77bb9e4a..5b245e11 100644 --- a/slixmpp/xmlstream/xmlstream.py +++ b/slixmpp/xmlstream/xmlstream.py @@ -272,7 +272,7 @@ class XMLStream(asyncio.BaseProtocol): localhost """ - if self._run_filters is None: + if self._run_filters is None or self._run_filters.done(): self._run_filters = asyncio.ensure_future( self.run_filters(), loop=self.loop, @@ -463,8 +463,6 @@ class XMLStream(asyncio.BaseProtocol): self.parser = None self.transport = None self.socket = None - if self._run_filters: - self._run_filters.cancel() # Fire the events after cleanup if self.end_session_on_disconnect: self.event('session_end') @@ -480,8 +478,6 @@ class XMLStream(asyncio.BaseProtocol): if self._current_connection_attempt: self._current_connection_attempt.cancel() self._current_connection_attempt = None - if self._run_filters: - self._run_filters.cancel() def disconnect(self, wait: float = 2.0, reason: Optional[str] = None, ignore_send_queue: bool = False) -> None: """Close the XML stream and wait for an acknowldgement from the server for @@ -975,7 +971,6 @@ class XMLStream(asyncio.BaseProtocol): else: self.send_raw(data) - async def run_filters(self): """ Background loop that processes stanzas to send. -- cgit v1.2.3 From 9fbd40578ceffdf59ae9d08e0ceebdad7998bbee Mon Sep 17 00:00:00 2001 From: mathieui Date: Sat, 23 Jan 2021 16:16:56 +0100 Subject: xmlstream: purge send queue and pending tasks on session end and keep track of slow tasks --- slixmpp/xmlstream/xmlstream.py | 30 +++++++++++++++++++++++++++--- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/slixmpp/xmlstream/xmlstream.py b/slixmpp/xmlstream/xmlstream.py index 5b245e11..a67c337d 100644 --- a/slixmpp/xmlstream/xmlstream.py +++ b/slixmpp/xmlstream/xmlstream.py @@ -12,7 +12,15 @@ :license: MIT, see LICENSE for more details """ -from typing import Optional, Set, Callable, Any +from typing import ( + Any, + Callable, + Iterable, + List, + Optional, + Set, + Union, +) import functools import logging @@ -21,7 +29,7 @@ import ssl import weakref import uuid -from asyncio import iscoroutinefunction, wait +from asyncio import iscoroutinefunction, wait, Future import xml.etree.ElementTree as ET @@ -228,8 +236,9 @@ class XMLStream(asyncio.BaseProtocol): self.add_event_handler('disconnected', self._remove_schedules) self.add_event_handler('session_start', self._start_keepalive) - + self._run_filters = None + self.__slow_tasks: List[Future] = [] @property def loop(self): @@ -465,6 +474,7 @@ class XMLStream(asyncio.BaseProtocol): self.socket = None # Fire the events after cleanup if self.end_session_on_disconnect: + self._reset_sendq() self.event('session_end') self.event("disconnected", self.disconnect_reason or exception and exception.strerror) @@ -937,6 +947,18 @@ class XMLStream(asyncio.BaseProtocol): """ return xml + def _reset_sendq(self): + """Clear sending tasks on session end""" + # Cancel all pending slow send tasks + log.debug('Cancelling %d slow send tasks', len(self.__slow_tasks)) + for slow_task in self.__slow_tasks: + slow_task.cancel() + self.__slow_tasks.clear() + # Purge pending stanzas + while not self.waiting_queue.empty(): + discarded = self.waiting_queue.get_nowait() + log.debug('Discarded stanza: %s', discarded) + async def _continue_slow_send( self, task: asyncio.Task, @@ -950,6 +972,7 @@ class XMLStream(asyncio.BaseProtocol): :param set already_used: Filters already used on this outgoing stanza """ data = await task + self.__slow_tasks.remove(task) for filter in self.__filters['out']: if filter in already_used: continue @@ -990,6 +1013,7 @@ class XMLStream(asyncio.BaseProtocol): timeout=1, ) if pending: + self.slow_tasks.append(task) asyncio.ensure_future( self._continue_slow_send( task, -- cgit v1.2.3 From a0b6bfcefe3308db6e87cba0342b9d3654e047b3 Mon Sep 17 00:00:00 2001 From: mathieui Date: Sat, 23 Jan 2021 18:39:05 +0100 Subject: xmlstream: change the connection logic * use asyncio wait_for to wait for a disconnected event * abort the connection if the timeout is not enough --- slixmpp/xmlstream/xmlstream.py | 58 ++++++++++++++++++++++++++++++------------ 1 file changed, 42 insertions(+), 16 deletions(-) diff --git a/slixmpp/xmlstream/xmlstream.py b/slixmpp/xmlstream/xmlstream.py index a67c337d..9fb38f46 100644 --- a/slixmpp/xmlstream/xmlstream.py +++ b/slixmpp/xmlstream/xmlstream.py @@ -489,7 +489,7 @@ class XMLStream(asyncio.BaseProtocol): self._current_connection_attempt.cancel() self._current_connection_attempt = None - def disconnect(self, wait: float = 2.0, reason: Optional[str] = None, ignore_send_queue: bool = False) -> None: + def disconnect(self, wait: Union[float, int] = 2.0, reason: Optional[str] = None, ignore_send_queue: bool = False) -> Future: """Close the XML stream and wait for an acknowldgement from the server for at most `wait` seconds. After the given number of seconds has passed without a response from the server, or when the server @@ -497,10 +497,13 @@ class XMLStream(asyncio.BaseProtocol): called. If wait is 0.0, this will call abort() directly without closing the stream. - Does nothing if we are not connected. + Does nothing but trigger the disconnected event if we are not connected. :param wait: Time to wait for a response from the server. - + :param reason: An optional reason for the disconnect. + :param ignore_send_queue: Boolean to toggle if we want to ignore + the in-flight stanzas and disconnect immediately. + :return: A future that ends when all code involved in the disconnect has ended """ # Compat: docs/getting_started/sendlogout.rst has been promoting # `disconnect(wait=True)` for ages. This doesn't mean anything to the @@ -510,39 +513,63 @@ class XMLStream(asyncio.BaseProtocol): wait = 2.0 if self.transport: + self.disconnect_reason = reason if self.waiting_queue.empty() or ignore_send_queue: - self.disconnect_reason = reason self.cancel_connection_attempt() - if wait > 0.0: - self.send_raw(self.stream_footer) - self.schedule('Disconnect wait', wait, - self.abort, repeat=False) + return asyncio.ensure_future( + self._end_stream_wait(wait, reason=reason), + loop=self.loop, + ) else: - asyncio.ensure_future( + return asyncio.ensure_future( self._consume_send_queue_before_disconnecting(reason, wait), loop=self.loop, ) else: self.event("disconnected", reason) + future = Future() + future.set_result(None) + return future async def _consume_send_queue_before_disconnecting(self, reason: Optional[str], wait: float): """Wait until the send queue is empty before disconnecting""" - await self.waiting_queue.join() + try: + await asyncio.wait_for( + self.waiting_queue.join(), + wait, + loop=self.loop + ) + except asyncio.TimeoutError: + wait = 0 # we already consumed the timeout self.disconnect_reason = reason - self.cancel_connection_attempt() - if wait > 0.0: + await self._end_stream_wait(wait) + + async def _end_stream_wait(self, wait: Union[int, float] = 2, reason: Optional[str] = None): + """ + Run abort() if we do not received the disconnected event + after a waiting time. + + :param wait: The waiting time (defaults to 2) + """ + try: self.send_raw(self.stream_footer) - self.schedule('Disconnect wait', wait, - self.abort, repeat=False) + await self.wait_until('disconnected', wait) + except asyncio.TimeoutError: + self.abort() + except NotConnectedError: + # We are not connected when sending the end of stream + # that means the disconnect has already been handled + pass def abort(self): """ Forcibly close the connection """ - self.cancel_connection_attempt() if self.transport: + self.cancel_connection_attempt() self.transport.close() self.transport.abort() + self.transport = None self.event("killed") self.disconnected.set_result(True) self.disconnected = asyncio.Future() @@ -661,7 +688,6 @@ class XMLStream(asyncio.BaseProtocol): def _remove_schedules(self, event): """Remove some schedules that become pointless when disconnected""" self.cancel_schedule('Whitespace Keepalive') - self.cancel_schedule('Disconnect wait') def start_stream_handler(self, xml): """Perform any initialization actions, such as handshakes, -- cgit v1.2.3 From 456dff0b61113e7a1e48eab010cd3f6649b073db Mon Sep 17 00:00:00 2001 From: mathieui Date: Sun, 24 Jan 2021 21:43:35 +0100 Subject: xmlstream: rename run_filters --- slixmpp/xmlstream/xmlstream.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/slixmpp/xmlstream/xmlstream.py b/slixmpp/xmlstream/xmlstream.py index 9fb38f46..57922bbc 100644 --- a/slixmpp/xmlstream/xmlstream.py +++ b/slixmpp/xmlstream/xmlstream.py @@ -237,7 +237,7 @@ class XMLStream(asyncio.BaseProtocol): self.add_event_handler('disconnected', self._remove_schedules) self.add_event_handler('session_start', self._start_keepalive) - self._run_filters = None + self._run_out_filters: Optional[Future] = None self.__slow_tasks: List[Future] = [] @property @@ -281,8 +281,8 @@ class XMLStream(asyncio.BaseProtocol): localhost """ - if self._run_filters is None or self._run_filters.done(): - self._run_filters = asyncio.ensure_future( + if self._run_out_filters is None or self._run_out_filters.done(): + self._run_out_filters = asyncio.ensure_future( self.run_filters(), loop=self.loop, ) -- cgit v1.2.3 From 571774edb424c4102a26262cfc709814c16ac41d Mon Sep 17 00:00:00 2001 From: mathieui Date: Sun, 24 Jan 2021 21:43:49 +0100 Subject: xmlstream: end the parser when the stream has ended --- slixmpp/xmlstream/xmlstream.py | 1 + 1 file changed, 1 insertion(+) diff --git a/slixmpp/xmlstream/xmlstream.py b/slixmpp/xmlstream/xmlstream.py index 57922bbc..c9247cec 100644 --- a/slixmpp/xmlstream/xmlstream.py +++ b/slixmpp/xmlstream/xmlstream.py @@ -431,6 +431,7 @@ class XMLStream(asyncio.BaseProtocol): log.debug("End of stream received") self.disconnect_reason = "End of stream" self.abort() + return elif self.xml_depth == 1: # A stanza is an XML element that is a direct child of # the root element, hence the check of depth == 1 -- cgit v1.2.3 From d227579d56f827a96ac8563f7820e7960f3a92a1 Mon Sep 17 00:00:00 2001 From: mathieui Date: Sun, 24 Jan 2021 21:33:14 +0100 Subject: xmlstream: set disconnected future on event --- slixmpp/xmlstream/xmlstream.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/slixmpp/xmlstream/xmlstream.py b/slixmpp/xmlstream/xmlstream.py index c9247cec..5f675e3f 100644 --- a/slixmpp/xmlstream/xmlstream.py +++ b/slixmpp/xmlstream/xmlstream.py @@ -232,7 +232,7 @@ class XMLStream(asyncio.BaseProtocol): self.disconnect_reason = None #: An asyncio Future being done when the stream is disconnected. - self.disconnected = asyncio.Future() + self.disconnected: Future = Future() self.add_event_handler('disconnected', self._remove_schedules) self.add_event_handler('session_start', self._start_keepalive) @@ -259,6 +259,12 @@ class XMLStream(asyncio.BaseProtocol): """ return uuid.uuid4().hex + def _set_disconnected_future(self): + """Set the self.disconnected future on disconnect""" + if not self.disconnected.done(): + self.disconnected.set_result(True) + self.disconnected = asyncio.Future() + def connect(self, host='', port=0, use_ssl=False, force_starttls=True, disable_starttls=False): """Create a new socket and connect to the server. @@ -477,6 +483,7 @@ class XMLStream(asyncio.BaseProtocol): if self.end_session_on_disconnect: self._reset_sendq() self.event('session_end') + self._set_disconnected_future() self.event("disconnected", self.disconnect_reason or exception and exception.strerror) def cancel_connection_attempt(self): @@ -527,6 +534,7 @@ class XMLStream(asyncio.BaseProtocol): loop=self.loop, ) else: + self._set_disconnected_future() self.event("disconnected", reason) future = Future() future.set_result(None) -- cgit v1.2.3 From b2dfb4c1f3ac2d7d034942bac41b7d026bc90699 Mon Sep 17 00:00:00 2001 From: mathieui Date: Mon, 25 Jan 2021 09:59:24 +0100 Subject: xmlstream: do not touch connection state on abort() leave it to the connection_lost handler --- slixmpp/xmlstream/xmlstream.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/slixmpp/xmlstream/xmlstream.py b/slixmpp/xmlstream/xmlstream.py index 5f675e3f..c84b40e1 100644 --- a/slixmpp/xmlstream/xmlstream.py +++ b/slixmpp/xmlstream/xmlstream.py @@ -578,11 +578,7 @@ class XMLStream(asyncio.BaseProtocol): self.cancel_connection_attempt() self.transport.close() self.transport.abort() - self.transport = None self.event("killed") - self.disconnected.set_result(True) - self.disconnected = asyncio.Future() - self.event("disconnected", self.disconnect_reason) def reconnect(self, wait=2.0, reason="Reconnecting"): """Calls disconnect(), and once we are disconnected (after the timeout, or -- cgit v1.2.3 From f15311bda8e5d1c227174771a30cde7013b0658f Mon Sep 17 00:00:00 2001 From: mathieui Date: Mon, 25 Jan 2021 10:00:40 +0100 Subject: xmlstream: Make the reconnect handler a coroutine --- slixmpp/xmlstream/xmlstream.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/slixmpp/xmlstream/xmlstream.py b/slixmpp/xmlstream/xmlstream.py index c84b40e1..2cc5fe17 100644 --- a/slixmpp/xmlstream/xmlstream.py +++ b/slixmpp/xmlstream/xmlstream.py @@ -585,7 +585,11 @@ class XMLStream(asyncio.BaseProtocol): when the server acknowledgement is received), call connect() """ log.debug("reconnecting...") - self.add_event_handler('disconnected', lambda event: self.connect(), disposable=True) + async def handler(event): + # We yield here to allow synchronous handlers to work first + await asyncio.sleep(0, loop=self.loop) + self.connect() + self.add_event_handler('disconnected', handler, disposable=True) self.disconnect(wait, reason) def configure_socket(self): -- cgit v1.2.3 From 3642e2c7f436f6c0935ebcfa3ee279d4ac22adaf Mon Sep 17 00:00:00 2001 From: mathieui Date: Mon, 25 Jan 2021 10:13:49 +0100 Subject: xmlstream: ensure slow futures are scheduled on this loop --- slixmpp/xmlstream/xmlstream.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/slixmpp/xmlstream/xmlstream.py b/slixmpp/xmlstream/xmlstream.py index 2cc5fe17..37cadc1f 100644 --- a/slixmpp/xmlstream/xmlstream.py +++ b/slixmpp/xmlstream/xmlstream.py @@ -1053,7 +1053,8 @@ class XMLStream(asyncio.BaseProtocol): self._continue_slow_send( task, already_run_filters - ) + ), + loop=self.loop, ) raise Exception("Slow coro, rescheduling") data = task.result() -- cgit v1.2.3 From fc7d7b4eb751daa030169a0b48a411e51325f01d Mon Sep 17 00:00:00 2001 From: mathieui Date: Thu, 28 Jan 2021 18:20:44 +0100 Subject: XEP-0198: Enable SM even if we failed resuming the session And trigger session_end only after we fail the resuming. --- slixmpp/plugins/xep_0198/stream_management.py | 28 +++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/slixmpp/plugins/xep_0198/stream_management.py b/slixmpp/plugins/xep_0198/stream_management.py index 0200646a..30274ecf 100644 --- a/slixmpp/plugins/xep_0198/stream_management.py +++ b/slixmpp/plugins/xep_0198/stream_management.py @@ -198,20 +198,7 @@ class XEP_0198(BasePlugin): # We've already negotiated stream management, # so no need to do it again. return False - if not self.sm_id: - if 'bind' in self.xmpp.features: - enable = stanza.Enable(self.xmpp) - enable['resume'] = self.allow_resume - enable.send() - log.debug("enabling SM") - - waiter = Waiter('enabled_or_failed', - MatchMany([ - MatchXPath(stanza.Enabled.tag_name()), - MatchXPath(stanza.Failed.tag_name())])) - self.xmpp.register_handler(waiter) - result = await waiter.wait() - elif self.sm_id and self.allow_resume and 'bind' not in self.xmpp.features: + if self.sm_id and self.allow_resume and 'bind' not in self.xmpp.features: resume = stanza.Resume(self.xmpp) resume['h'] = self.handled resume['previd'] = self.sm_id @@ -229,6 +216,19 @@ class XEP_0198(BasePlugin): result = await waiter.wait() if result is not None and result.name == 'resumed': return True + self.xmpp.event("session_end") + if 'bind' in self.xmpp.features: + enable = stanza.Enable(self.xmpp) + enable['resume'] = self.allow_resume + enable.send() + log.debug("enabling SM") + + waiter = Waiter('enabled_or_failed', + MatchMany([ + MatchXPath(stanza.Enabled.tag_name()), + MatchXPath(stanza.Failed.tag_name())])) + self.xmpp.register_handler(waiter) + result = await waiter.wait() return False def _handle_enabled(self, stanza): -- cgit v1.2.3 From 3f739e513be8fdb3b00f78665650c6bf3c3f9911 Mon Sep 17 00:00:00 2001 From: mathieui Date: Thu, 28 Jan 2021 18:21:10 +0100 Subject: xmlstream: keep value of "end_session_on_disconnect" That value should be set statically. Worst case is we fail to resume the session. --- slixmpp/xmlstream/xmlstream.py | 1 - 1 file changed, 1 deletion(-) diff --git a/slixmpp/xmlstream/xmlstream.py b/slixmpp/xmlstream/xmlstream.py index 37cadc1f..5074aa8c 100644 --- a/slixmpp/xmlstream/xmlstream.py +++ b/slixmpp/xmlstream/xmlstream.py @@ -433,7 +433,6 @@ class XMLStream(asyncio.BaseProtocol): if self.xml_depth == 0: # The stream's root element has closed, # terminating the stream. - self.end_session_on_disconnect = True log.debug("End of stream received") self.disconnect_reason = "End of stream" self.abort() -- cgit v1.2.3 From f93af07882d19fd60af1696ccfa784ac4c03aa42 Mon Sep 17 00:00:00 2001 From: mathieui Date: Fri, 29 Jan 2021 16:07:44 +0100 Subject: XEP-0198: do not send acks when disconnected --- slixmpp/plugins/xep_0198/stream_management.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/slixmpp/plugins/xep_0198/stream_management.py b/slixmpp/plugins/xep_0198/stream_management.py index 30274ecf..1344235a 100644 --- a/slixmpp/plugins/xep_0198/stream_management.py +++ b/slixmpp/plugins/xep_0198/stream_management.py @@ -174,6 +174,9 @@ class XEP_0198(BasePlugin): def send_ack(self): """Send the current ack count to the server.""" + if not self.xmpp.transport: + log.debug('Disconnected: not sending ack') + return ack = stanza.Ack(self.xmpp) ack['h'] = self.handled self.xmpp.send_raw(str(ack)) -- cgit v1.2.3