diff options
Diffstat (limited to 'sleekxmpp/xmlstream')
-rw-r--r-- | sleekxmpp/xmlstream/stanzabase.py | 12 | ||||
-rw-r--r-- | sleekxmpp/xmlstream/xmlstream.py | 62 |
2 files changed, 53 insertions, 21 deletions
diff --git a/sleekxmpp/xmlstream/stanzabase.py b/sleekxmpp/xmlstream/stanzabase.py index b8a7ceaa..d9a4636a 100644 --- a/sleekxmpp/xmlstream/stanzabase.py +++ b/sleekxmpp/xmlstream/stanzabase.py @@ -1253,9 +1253,15 @@ class StanzaBase(ElementBase): log.exception('Error handling {%s}%s stanza' % (self.namespace, self.name)) - def send(self): - """Queue the stanza to be sent on the XML stream.""" - self.stream.sendRaw(self.__str__()) + def send(self, now=False): + """ + Queue the stanza to be sent on the XML stream. + Arguments: + now -- Indicates if the queue should be skipped and the + stanza sent immediately. Useful for stream + initialization. Defaults to False. + """ + self.stream.send_raw(self.__str__(), now=now) def __copy__(self): """ diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py index 1dc2d430..9d00ee8c 100644 --- a/sleekxmpp/xmlstream/xmlstream.py +++ b/sleekxmpp/xmlstream/xmlstream.py @@ -187,6 +187,8 @@ class XMLStream(object): self.stop = threading.Event() self.stream_end_event = threading.Event() self.stream_end_event.set() + self.session_started_event = threading.Event() + self.event_queue = queue.Queue() self.send_queue = queue.Queue() self.scheduler = Scheduler(self.event_queue, self.stop) @@ -364,7 +366,8 @@ class XMLStream(object): def _disconnect(self, reconnect=False): # Send the end of stream marker. - self.send_raw(self.stream_footer) + self.send_raw(self.stream_footer, now=True) + self.session_started_event.clear() # Wait for confirmation that the stream was # closed in the other direction. self.auto_reconnect = reconnect @@ -657,7 +660,7 @@ class XMLStream(object): """ return xml - def send(self, data, mask=None, timeout=None): + def send(self, data, mask=None, timeout=None, now=False): """ A wrapper for send_raw for sending stanza objects. @@ -671,10 +674,13 @@ class XMLStream(object): or a timeout occurs. timeout -- Time in seconds to wait for a response before continuing. Defaults to RESPONSE_TIMEOUT. + now -- Indicates if the send queue should be skipped, + sending the stanza immediately. Useful mainly + for stream initialization stanzas. + Defaults to False. """ if timeout is None: timeout = self.response_timeout - if hasattr(mask, 'xml'): mask = mask.xml data = str(data) @@ -683,21 +689,11 @@ class XMLStream(object): wait_for = Waiter("SendWait_%s" % self.new_id(), MatchXMLMask(mask)) self.register_handler(wait_for) - self.send_raw(data) + self.send_raw(data, now) if mask is not None: return wait_for.wait(timeout) - def send_raw(self, data): - """ - Send raw data across the stream. - - Arguments: - data -- Any string value. - """ - self.send_queue.put(data) - return True - - def send_xml(self, data, mask=None, timeout=None): + def send_xml(self, data, mask=None, timeout=None, now=False): """ Send an XML object on the stream, and optionally wait for a response. @@ -710,10 +706,39 @@ class XMLStream(object): or a timeout occurs. timeout -- Time in seconds to wait for a response before continuing. Defaults to RESPONSE_TIMEOUT. + now -- Indicates if the send queue should be skipped, + sending the stanza immediately. Useful mainly + for stream initialization stanzas. + Defaults to False. """ if timeout is None: timeout = self.response_timeout - return self.send(tostring(data), mask, timeout) + return self.send(tostring(data), mask, timeout, now) + + def send_raw(self, data, now=False, reconnect=None): + """ + Send raw data across the stream. + + Arguments: + data -- Any string value. + reconnect -- Indicates if the stream should be + restarted if there is an error sending + the stanza. Used mainly for testing. + Defaults to self.auto_reconnect. + """ + if now: + log.debug("SEND: %s" % data) + try: + self.socket.send(data.encode('utf-8')) + except Socket.error as serr: + self.event('socket_error', serr) + log.warning("Failed to send %s" % data) + if reconnect is None: + reconnect = self.auto_reconnect + self.disconnect(reconnect) + else: + self.send_queue.put(data) + return True def process(self, threaded=True): """ @@ -767,7 +792,7 @@ class XMLStream(object): firstrun = False try: if self.is_client: - self.send_raw(self.stream_header) + self.send_raw(self.stream_header, now=True) # The call to self.__read_xml will block and prevent # the body of the loop from running until a disconnect # occurs. After any reconnection, the stream header will @@ -776,7 +801,7 @@ class XMLStream(object): # Ensure the stream header is sent for any # new connections. if self.is_client: - self.send_raw(self.stream_header) + self.send_raw(self.stream_header, now=True) except KeyboardInterrupt: log.debug("Keyboard Escape Detected in _process") self.stop.set() @@ -985,6 +1010,7 @@ class XMLStream(object): """ try: while not self.stop.isSet(): + self.session_started_event.wait() try: data = self.send_queue.get(True, 1) except queue.Empty: |