summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--slixmpp/xmlstream/xmlstream.py63
1 files changed, 42 insertions, 21 deletions
diff --git a/slixmpp/xmlstream/xmlstream.py b/slixmpp/xmlstream/xmlstream.py
index 9f6f3083..46b39392 100644
--- a/slixmpp/xmlstream/xmlstream.py
+++ b/slixmpp/xmlstream/xmlstream.py
@@ -21,6 +21,8 @@ import ssl
import weakref
import uuid
+from asyncio import iscoroutinefunction
+
import xml.etree.ElementTree as ET
from slixmpp.xmlstream.asyncio import asyncio
@@ -83,6 +85,8 @@ class XMLStream(asyncio.BaseProtocol):
self.force_starttls = None
self.disable_starttls = None
+ self.waiting_queue = asyncio.Queue()
+
# A dict of {name: handle}
self.scheduled_events = {}
@@ -263,6 +267,10 @@ class XMLStream(asyncio.BaseProtocol):
localhost
"""
+ asyncio.ensure_future(
+ self.run_filters(),
+ loop=self.loop,
+ )
self.disconnect_reason = None
self.cancel_connection_attempt()
if host and port:
@@ -789,7 +797,7 @@ class XMLStream(asyncio.BaseProtocol):
# If the callback is a coroutine, schedule it instead of
# running it directly
- if asyncio.iscoroutinefunction(handler_callback):
+ if iscoroutinefunction(handler_callback):
async def handler_callback_routine(cb):
try:
await cb(data)
@@ -888,11 +896,41 @@ class XMLStream(asyncio.BaseProtocol):
"""
return xml
+ async def run_filters(self):
+ """
+ Background loop that processes stanzas to send.
+ """
+ while True:
+ (data, use_filters) = await self.waiting_queue.get()
+ if isinstance(data, ElementBase):
+ if use_filters:
+ for filter in self.__filters['out']:
+ if iscoroutinefunction(filter):
+ data = await filter(data)
+ else:
+ data = filter(data)
+ if data is None:
+ return
+
+ if isinstance(data, ElementBase):
+ if use_filters:
+ for filter in self.__filters['out_sync']:
+ if iscoroutinefunction(filter):
+ data = await filter(data)
+ else:
+ data = filter(data)
+ if data is None:
+ return
+ str_data = tostring(data.xml, xmlns=self.default_ns,
+ stream=self, top_level=True)
+ self.send_raw(str_data)
+ else:
+ self.send_raw(data)
+ self.waiting_queue.task_done()
+
def send(self, data, use_filters=True):
"""A wrapper for :meth:`send_raw()` for sending stanza objects.
- May optionally block until an expected response is received.
-
:param data: The :class:`~slixmpp.xmlstream.stanzabase.ElementBase`
stanza to send on the stream.
:param bool use_filters: Indicates if outgoing filters should be
@@ -900,24 +938,7 @@ class XMLStream(asyncio.BaseProtocol):
filters is useful when resending stanzas.
Defaults to ``True``.
"""
- if isinstance(data, ElementBase):
- if use_filters:
- for filter in self.__filters['out']:
- data = filter(data)
- if data is None:
- return
-
- if isinstance(data, ElementBase):
- if use_filters:
- for filter in self.__filters['out_sync']:
- data = filter(data)
- if data is None:
- return
- str_data = tostring(data.xml, xmlns=self.default_ns,
- stream=self, top_level=True)
- self.send_raw(str_data)
- else:
- self.send_raw(data)
+ self.waiting_queue.put_nowait((data, use_filters))
def send_xml(self, data):
"""Send an XML object on the stream