diff options
author | mathieui <mathieui@mathieui.net> | 2019-08-21 21:18:24 +0200 |
---|---|---|
committer | mathieui <mathieui@mathieui.net> | 2019-12-27 15:27:25 +0100 |
commit | 31f6ef681401b7c54c07212ad2741f9eca324e48 (patch) | |
tree | 56c10143075d08f0764fd9b8b75d0c48ae8d8ae5 | |
parent | 9b3874b5dfecdbfa4170430f826413a19a336d7f (diff) | |
download | slixmpp-31f6ef681401b7c54c07212ad2741f9eca324e48.tar.gz slixmpp-31f6ef681401b7c54c07212ad2741f9eca324e48.tar.bz2 slixmpp-31f6ef681401b7c54c07212ad2741f9eca324e48.tar.xz slixmpp-31f6ef681401b7c54c07212ad2741f9eca324e48.zip |
Run the send queue in a separate coroutine
To be able to run async stream filters
-rw-r--r-- | slixmpp/xmlstream/xmlstream.py | 63 |
1 files changed, 42 insertions, 21 deletions
diff --git a/slixmpp/xmlstream/xmlstream.py b/slixmpp/xmlstream/xmlstream.py index 9f6f3083..46b39392 100644 --- a/slixmpp/xmlstream/xmlstream.py +++ b/slixmpp/xmlstream/xmlstream.py @@ -21,6 +21,8 @@ import ssl import weakref import uuid +from asyncio import iscoroutinefunction + import xml.etree.ElementTree as ET from slixmpp.xmlstream.asyncio import asyncio @@ -83,6 +85,8 @@ class XMLStream(asyncio.BaseProtocol): self.force_starttls = None self.disable_starttls = None + self.waiting_queue = asyncio.Queue() + # A dict of {name: handle} self.scheduled_events = {} @@ -263,6 +267,10 @@ class XMLStream(asyncio.BaseProtocol): localhost """ + asyncio.ensure_future( + self.run_filters(), + loop=self.loop, + ) self.disconnect_reason = None self.cancel_connection_attempt() if host and port: @@ -789,7 +797,7 @@ class XMLStream(asyncio.BaseProtocol): # If the callback is a coroutine, schedule it instead of # running it directly - if asyncio.iscoroutinefunction(handler_callback): + if iscoroutinefunction(handler_callback): async def handler_callback_routine(cb): try: await cb(data) @@ -888,11 +896,41 @@ class XMLStream(asyncio.BaseProtocol): """ return xml + async def run_filters(self): + """ + Background loop that processes stanzas to send. + """ + while True: + (data, use_filters) = await self.waiting_queue.get() + if isinstance(data, ElementBase): + if use_filters: + for filter in self.__filters['out']: + if iscoroutinefunction(filter): + data = await filter(data) + else: + data = filter(data) + if data is None: + return + + if isinstance(data, ElementBase): + if use_filters: + for filter in self.__filters['out_sync']: + if iscoroutinefunction(filter): + data = await filter(data) + else: + data = filter(data) + if data is None: + return + str_data = tostring(data.xml, xmlns=self.default_ns, + stream=self, top_level=True) + self.send_raw(str_data) + else: + self.send_raw(data) + self.waiting_queue.task_done() + def send(self, data, use_filters=True): """A wrapper for :meth:`send_raw()` for sending stanza objects. - May optionally block until an expected response is received. - :param data: The :class:`~slixmpp.xmlstream.stanzabase.ElementBase` stanza to send on the stream. :param bool use_filters: Indicates if outgoing filters should be @@ -900,24 +938,7 @@ class XMLStream(asyncio.BaseProtocol): filters is useful when resending stanzas. Defaults to ``True``. """ - if isinstance(data, ElementBase): - if use_filters: - for filter in self.__filters['out']: - data = filter(data) - if data is None: - return - - if isinstance(data, ElementBase): - if use_filters: - for filter in self.__filters['out_sync']: - data = filter(data) - if data is None: - return - str_data = tostring(data.xml, xmlns=self.default_ns, - stream=self, top_level=True) - self.send_raw(str_data) - else: - self.send_raw(data) + self.waiting_queue.put_nowait((data, use_filters)) def send_xml(self, data): """Send an XML object on the stream |