summaryrefslogtreecommitdiff
path: root/sleekxmpp/xmlstream
diff options
context:
space:
mode:
Diffstat (limited to 'sleekxmpp/xmlstream')
-rw-r--r--sleekxmpp/xmlstream/xmlstream.py31
1 files changed, 23 insertions, 8 deletions
diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py
index f3e3faa8..22469039 100644
--- a/sleekxmpp/xmlstream/xmlstream.py
+++ b/sleekxmpp/xmlstream/xmlstream.py
@@ -267,6 +267,7 @@ class XMLStream(object):
#: A queue of string data to be sent over the stream.
self.send_queue = queue.Queue()
+ self.send_queue_lock = threading.Lock()
#: A :class:`~sleekxmpp.xmlstream.scheduler.Scheduler` instance for
#: executing callbacks in the future based on time delays.
@@ -281,7 +282,7 @@ class XMLStream(object):
self.__handlers = []
self.__event_handlers = {}
self.__event_handlers_lock = threading.Lock()
- self.__filters = {'in': [], 'out': []}
+ self.__filters = {'in': [], 'out': [], 'out_sync': []}
self._id = 0
self._id_lock = threading.Lock()
@@ -1080,7 +1081,7 @@ class XMLStream(object):
"""
return xml
- def send(self, data, mask=None, timeout=None, now=False):
+ def send(self, data, mask=None, timeout=None, now=False, use_filters=True):
"""A wrapper for :meth:`send_raw()` for sending stanza objects.
May optionally block until an expected response is received.
@@ -1098,6 +1099,10 @@ class XMLStream(object):
sending the stanza immediately. Useful mainly
for stream initialization stanzas.
Defaults to ``False``.
+ :param bool use_filters: Indicates if outgoing filters should be
+ applied to the given stanza data. Disabling
+ filters is useful when resending stanzas.
+ Defaults to ``True``.
"""
if timeout is None:
timeout = self.response_timeout
@@ -1105,19 +1110,29 @@ class XMLStream(object):
mask = mask.xml
if isinstance(data, ElementBase):
- for filter in self.__filters['out']:
- if data is not None:
+ if use_filters:
+ for filter in self.__filters['out']:
data = filter(data)
- if data is None:
- return
+ if data is None:
+ return
- data = str(data)
if mask is not None:
log.warning("Use of send mask waiters is deprecated.")
wait_for = Waiter("SendWait_%s" % self.new_id(),
MatchXMLMask(mask))
self.register_handler(wait_for)
- self.send_raw(data, now)
+
+ if isinstance(data, ElementBase):
+ with self.send_queue_lock:
+ if use_filters:
+ for filter in self.__filters['out_sync']:
+ data = filter(data)
+ if data is None:
+ return
+ str_data = str(data)
+ self.send_raw(str_data, now)
+ else:
+ self.send_raw(data, now)
if mask is not None:
return wait_for.wait(timeout)