summaryrefslogtreecommitdiff
path: root/sleekxmpp/xmlstream
diff options
context:
space:
mode:
Diffstat (limited to 'sleekxmpp/xmlstream')
-rw-r--r--sleekxmpp/xmlstream/stanzabase.py12
-rw-r--r--sleekxmpp/xmlstream/xmlstream.py62
2 files changed, 53 insertions, 21 deletions
diff --git a/sleekxmpp/xmlstream/stanzabase.py b/sleekxmpp/xmlstream/stanzabase.py
index b8a7ceaa..d9a4636a 100644
--- a/sleekxmpp/xmlstream/stanzabase.py
+++ b/sleekxmpp/xmlstream/stanzabase.py
@@ -1253,9 +1253,15 @@ class StanzaBase(ElementBase):
log.exception('Error handling {%s}%s stanza' % (self.namespace,
self.name))
- def send(self):
- """Queue the stanza to be sent on the XML stream."""
- self.stream.sendRaw(self.__str__())
+ def send(self, now=False):
+ """
+ Queue the stanza to be sent on the XML stream.
+ Arguments:
+ now -- Indicates if the queue should be skipped and the
+ stanza sent immediately. Useful for stream
+ initialization. Defaults to False.
+ """
+ self.stream.send_raw(self.__str__(), now=now)
def __copy__(self):
"""
diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py
index 1dc2d430..9d00ee8c 100644
--- a/sleekxmpp/xmlstream/xmlstream.py
+++ b/sleekxmpp/xmlstream/xmlstream.py
@@ -187,6 +187,8 @@ class XMLStream(object):
self.stop = threading.Event()
self.stream_end_event = threading.Event()
self.stream_end_event.set()
+ self.session_started_event = threading.Event()
+
self.event_queue = queue.Queue()
self.send_queue = queue.Queue()
self.scheduler = Scheduler(self.event_queue, self.stop)
@@ -364,7 +366,8 @@ class XMLStream(object):
def _disconnect(self, reconnect=False):
# Send the end of stream marker.
- self.send_raw(self.stream_footer)
+ self.send_raw(self.stream_footer, now=True)
+ self.session_started_event.clear()
# Wait for confirmation that the stream was
# closed in the other direction.
self.auto_reconnect = reconnect
@@ -657,7 +660,7 @@ class XMLStream(object):
"""
return xml
- def send(self, data, mask=None, timeout=None):
+ def send(self, data, mask=None, timeout=None, now=False):
"""
A wrapper for send_raw for sending stanza objects.
@@ -671,10 +674,13 @@ class XMLStream(object):
or a timeout occurs.
timeout -- Time in seconds to wait for a response before
continuing. Defaults to RESPONSE_TIMEOUT.
+ now -- Indicates if the send queue should be skipped,
+ sending the stanza immediately. Useful mainly
+ for stream initialization stanzas.
+ Defaults to False.
"""
if timeout is None:
timeout = self.response_timeout
-
if hasattr(mask, 'xml'):
mask = mask.xml
data = str(data)
@@ -683,21 +689,11 @@ class XMLStream(object):
wait_for = Waiter("SendWait_%s" % self.new_id(),
MatchXMLMask(mask))
self.register_handler(wait_for)
- self.send_raw(data)
+ self.send_raw(data, now)
if mask is not None:
return wait_for.wait(timeout)
- def send_raw(self, data):
- """
- Send raw data across the stream.
-
- Arguments:
- data -- Any string value.
- """
- self.send_queue.put(data)
- return True
-
- def send_xml(self, data, mask=None, timeout=None):
+ def send_xml(self, data, mask=None, timeout=None, now=False):
"""
Send an XML object on the stream, and optionally wait
for a response.
@@ -710,10 +706,39 @@ class XMLStream(object):
or a timeout occurs.
timeout -- Time in seconds to wait for a response before
continuing. Defaults to RESPONSE_TIMEOUT.
+ now -- Indicates if the send queue should be skipped,
+ sending the stanza immediately. Useful mainly
+ for stream initialization stanzas.
+ Defaults to False.
"""
if timeout is None:
timeout = self.response_timeout
- return self.send(tostring(data), mask, timeout)
+ return self.send(tostring(data), mask, timeout, now)
+
+ def send_raw(self, data, now=False, reconnect=None):
+ """
+ Send raw data across the stream.
+
+ Arguments:
+ data -- Any string value.
+ reconnect -- Indicates if the stream should be
+ restarted if there is an error sending
+ the stanza. Used mainly for testing.
+ Defaults to self.auto_reconnect.
+ """
+ if now:
+ log.debug("SEND: %s" % data)
+ try:
+ self.socket.send(data.encode('utf-8'))
+ except Socket.error as serr:
+ self.event('socket_error', serr)
+ log.warning("Failed to send %s" % data)
+ if reconnect is None:
+ reconnect = self.auto_reconnect
+ self.disconnect(reconnect)
+ else:
+ self.send_queue.put(data)
+ return True
def process(self, threaded=True):
"""
@@ -767,7 +792,7 @@ class XMLStream(object):
firstrun = False
try:
if self.is_client:
- self.send_raw(self.stream_header)
+ self.send_raw(self.stream_header, now=True)
# The call to self.__read_xml will block and prevent
# the body of the loop from running until a disconnect
# occurs. After any reconnection, the stream header will
@@ -776,7 +801,7 @@ class XMLStream(object):
# Ensure the stream header is sent for any
# new connections.
if self.is_client:
- self.send_raw(self.stream_header)
+ self.send_raw(self.stream_header, now=True)
except KeyboardInterrupt:
log.debug("Keyboard Escape Detected in _process")
self.stop.set()
@@ -985,6 +1010,7 @@ class XMLStream(object):
"""
try:
while not self.stop.isSet():
+ self.session_started_event.wait()
try:
data = self.send_queue.get(True, 1)
except queue.Empty: