diff options
author | mathieui <mathieui@mathieui.net> | 2019-08-23 10:57:20 +0200 |
---|---|---|
committer | mathieui <mathieui@mathieui.net> | 2019-12-27 15:27:29 +0100 |
commit | 110bbf8afc17244ae4ecc9a0cb7ca799bb121834 (patch) | |
tree | 3894e737c6b1917464e695fb049d28c89c2c738c | |
parent | d97efa0bd82daa54d4aca1a3996001d1b71634a7 (diff) | |
download | slixmpp-110bbf8afc17244ae4ecc9a0cb7ca799bb121834.tar.gz slixmpp-110bbf8afc17244ae4ecc9a0cb7ca799bb121834.tar.bz2 slixmpp-110bbf8afc17244ae4ecc9a0cb7ca799bb121834.tar.xz slixmpp-110bbf8afc17244ae4ecc9a0cb7ca799bb121834.zip |
Improve the send queue code a bit
-rw-r--r-- | slixmpp/xmlstream/xmlstream.py | 36 |
1 files changed, 28 insertions, 8 deletions
diff --git a/slixmpp/xmlstream/xmlstream.py b/slixmpp/xmlstream/xmlstream.py index 7909318d..83a43dbc 100644 --- a/slixmpp/xmlstream/xmlstream.py +++ b/slixmpp/xmlstream/xmlstream.py @@ -12,7 +12,7 @@ :license: MIT, see LICENSE for more details """ -from typing import Optional +from typing import Optional, Set, Callable import functools import logging @@ -896,10 +896,18 @@ class XMLStream(asyncio.BaseProtocol): """ return xml - async def continue_slow_send(self, task, already_used): - log.debug('rescheduled task: %s', task) + async def _continue_slow_send(self, + task: asyncio.Task, + already_used: Set[Callable[[ElementBase], Optional[StanzaBase]]] + ) -> None: + """ + Used when an item in the send queue has taken too long to process. + + This is away from the send queue and can take as much time as needed. + :param asyncio.Task task: the Task wrapping the coroutine + :param set already_used: Filters already used on this outgoing stanza + """ data = await task - log.debug('data for rescheduled task %s : %s', task, data) for filter in self.__filters['out']: if filter in already_used: continue @@ -921,10 +929,12 @@ class XMLStream(asyncio.BaseProtocol): else: self.send_raw(data) + async def run_filters(self): """ Background loop that processes stanzas to send. """ + class ContinueQueue(Exception): pass while True: (data, use_filters) = await self.waiting_queue.get() try: @@ -935,9 +945,17 @@ class XMLStream(asyncio.BaseProtocol): already_run_filters.add(filter) if iscoroutinefunction(filter): task = asyncio.create_task(filter(data)) - completed, pending = await wait({task}, timeout=1) + completed, pending = await wait( + {task}, + timeout=1, + ) if pending: - asyncio.ensure_future(self.continue_slow_send(task, already_run_filters)) + asyncio.ensure_future( + self._continue_slow_send( + task, + already_run_filters + ) + ) raise Exception("Slow coro, rescheduling") data = task.result() else: @@ -956,8 +974,10 @@ class XMLStream(asyncio.BaseProtocol): self.send_raw(str_data) else: self.send_raw(data) - except: - log.error('Could not send stanza %s', data, exc_info=True) + except ContinueQueue as exc: + log.info('Stanza in send queue not sent: %s', exc) + except Exception: + log.error('Exception raised in send queue:', exc_info=True) self.waiting_queue.task_done() def send(self, data, use_filters=True): |