summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormathieui <mathieui@mathieui.net>2019-08-23 10:57:20 +0200
committermathieui <mathieui@mathieui.net>2019-12-27 15:27:29 +0100
commit110bbf8afc17244ae4ecc9a0cb7ca799bb121834 (patch)
tree3894e737c6b1917464e695fb049d28c89c2c738c
parentd97efa0bd82daa54d4aca1a3996001d1b71634a7 (diff)
downloadslixmpp-110bbf8afc17244ae4ecc9a0cb7ca799bb121834.tar.gz
slixmpp-110bbf8afc17244ae4ecc9a0cb7ca799bb121834.tar.bz2
slixmpp-110bbf8afc17244ae4ecc9a0cb7ca799bb121834.tar.xz
slixmpp-110bbf8afc17244ae4ecc9a0cb7ca799bb121834.zip
Improve the send queue code a bit
-rw-r--r--slixmpp/xmlstream/xmlstream.py36
1 files changed, 28 insertions, 8 deletions
diff --git a/slixmpp/xmlstream/xmlstream.py b/slixmpp/xmlstream/xmlstream.py
index 7909318d..83a43dbc 100644
--- a/slixmpp/xmlstream/xmlstream.py
+++ b/slixmpp/xmlstream/xmlstream.py
@@ -12,7 +12,7 @@
:license: MIT, see LICENSE for more details
"""
-from typing import Optional
+from typing import Optional, Set, Callable
import functools
import logging
@@ -896,10 +896,18 @@ class XMLStream(asyncio.BaseProtocol):
"""
return xml
- async def continue_slow_send(self, task, already_used):
- log.debug('rescheduled task: %s', task)
+ async def _continue_slow_send(self,
+ task: asyncio.Task,
+ already_used: Set[Callable[[ElementBase], Optional[StanzaBase]]]
+ ) -> None:
+ """
+ Used when an item in the send queue has taken too long to process.
+
+ This is away from the send queue and can take as much time as needed.
+ :param asyncio.Task task: the Task wrapping the coroutine
+ :param set already_used: Filters already used on this outgoing stanza
+ """
data = await task
- log.debug('data for rescheduled task %s : %s', task, data)
for filter in self.__filters['out']:
if filter in already_used:
continue
@@ -921,10 +929,12 @@ class XMLStream(asyncio.BaseProtocol):
else:
self.send_raw(data)
+
async def run_filters(self):
"""
Background loop that processes stanzas to send.
"""
+ class ContinueQueue(Exception): pass
while True:
(data, use_filters) = await self.waiting_queue.get()
try:
@@ -935,9 +945,17 @@ class XMLStream(asyncio.BaseProtocol):
already_run_filters.add(filter)
if iscoroutinefunction(filter):
task = asyncio.create_task(filter(data))
- completed, pending = await wait({task}, timeout=1)
+ completed, pending = await wait(
+ {task},
+ timeout=1,
+ )
if pending:
- asyncio.ensure_future(self.continue_slow_send(task, already_run_filters))
+ asyncio.ensure_future(
+ self._continue_slow_send(
+ task,
+ already_run_filters
+ )
+ )
raise Exception("Slow coro, rescheduling")
data = task.result()
else:
@@ -956,8 +974,10 @@ class XMLStream(asyncio.BaseProtocol):
self.send_raw(str_data)
else:
self.send_raw(data)
- except:
- log.error('Could not send stanza %s', data, exc_info=True)
+ except ContinueQueue as exc:
+ log.info('Stanza in send queue not sent: %s', exc)
+ except Exception:
+ log.error('Exception raised in send queue:', exc_info=True)
self.waiting_queue.task_done()
def send(self, data, use_filters=True):