From 31f6ef681401b7c54c07212ad2741f9eca324e48 Mon Sep 17 00:00:00 2001 From: mathieui Date: Wed, 21 Aug 2019 21:18:24 +0200 Subject: Run the send queue in a separate coroutine To be able to run async stream filters --- slixmpp/xmlstream/xmlstream.py | 63 ++++++++++++++++++++++++++++-------------- 1 file changed, 42 insertions(+), 21 deletions(-) diff --git a/slixmpp/xmlstream/xmlstream.py b/slixmpp/xmlstream/xmlstream.py index 9f6f3083..46b39392 100644 --- a/slixmpp/xmlstream/xmlstream.py +++ b/slixmpp/xmlstream/xmlstream.py @@ -21,6 +21,8 @@ import ssl import weakref import uuid +from asyncio import iscoroutinefunction + import xml.etree.ElementTree as ET from slixmpp.xmlstream.asyncio import asyncio @@ -83,6 +85,8 @@ class XMLStream(asyncio.BaseProtocol): self.force_starttls = None self.disable_starttls = None + self.waiting_queue = asyncio.Queue() + # A dict of {name: handle} self.scheduled_events = {} @@ -263,6 +267,10 @@ class XMLStream(asyncio.BaseProtocol): localhost """ + asyncio.ensure_future( + self.run_filters(), + loop=self.loop, + ) self.disconnect_reason = None self.cancel_connection_attempt() if host and port: @@ -789,7 +797,7 @@ class XMLStream(asyncio.BaseProtocol): # If the callback is a coroutine, schedule it instead of # running it directly - if asyncio.iscoroutinefunction(handler_callback): + if iscoroutinefunction(handler_callback): async def handler_callback_routine(cb): try: await cb(data) @@ -888,11 +896,41 @@ class XMLStream(asyncio.BaseProtocol): """ return xml + async def run_filters(self): + """ + Background loop that processes stanzas to send. + """ + while True: + (data, use_filters) = await self.waiting_queue.get() + if isinstance(data, ElementBase): + if use_filters: + for filter in self.__filters['out']: + if iscoroutinefunction(filter): + data = await filter(data) + else: + data = filter(data) + if data is None: + return + + if isinstance(data, ElementBase): + if use_filters: + for filter in self.__filters['out_sync']: + if iscoroutinefunction(filter): + data = await filter(data) + else: + data = filter(data) + if data is None: + return + str_data = tostring(data.xml, xmlns=self.default_ns, + stream=self, top_level=True) + self.send_raw(str_data) + else: + self.send_raw(data) + self.waiting_queue.task_done() + def send(self, data, use_filters=True): """A wrapper for :meth:`send_raw()` for sending stanza objects. - May optionally block until an expected response is received. - :param data: The :class:`~slixmpp.xmlstream.stanzabase.ElementBase` stanza to send on the stream. :param bool use_filters: Indicates if outgoing filters should be @@ -900,24 +938,7 @@ class XMLStream(asyncio.BaseProtocol): filters is useful when resending stanzas. Defaults to ``True``. """ - if isinstance(data, ElementBase): - if use_filters: - for filter in self.__filters['out']: - data = filter(data) - if data is None: - return - - if isinstance(data, ElementBase): - if use_filters: - for filter in self.__filters['out_sync']: - data = filter(data) - if data is None: - return - str_data = tostring(data.xml, xmlns=self.default_ns, - stream=self, top_level=True) - self.send_raw(str_data) - else: - self.send_raw(data) + self.waiting_queue.put_nowait((data, use_filters)) def send_xml(self, data): """Send an XML object on the stream -- cgit v1.2.3