From 1735c194cdf83b61850bba45044070db6c42d0ac Mon Sep 17 00:00:00 2001 From: Lance Stout Date: Fri, 27 May 2011 16:59:52 -0700 Subject: Don't use the send queue for stream initialization. Use the parameter now=True to skip the queue when sending Iq stanzas, or using xmpp.send(). --- sleekxmpp/clientxmpp.py | 15 +++++----- sleekxmpp/componentxmpp.py | 1 + sleekxmpp/stanza/iq.py | 11 ++++--- sleekxmpp/test/sleektest.py | 2 ++ sleekxmpp/xmlstream/stanzabase.py | 12 ++++++-- sleekxmpp/xmlstream/xmlstream.py | 62 +++++++++++++++++++++++++++------------ 6 files changed, 70 insertions(+), 33 deletions(-) diff --git a/sleekxmpp/clientxmpp.py b/sleekxmpp/clientxmpp.py index 92186e91..fb5b2087 100644 --- a/sleekxmpp/clientxmpp.py +++ b/sleekxmpp/clientxmpp.py @@ -75,9 +75,6 @@ class ClientXMPP(BaseXMPP): self.plugin_whitelist = plugin_whitelist self.srv_support = SRV_SUPPORT - self.session_started_event = threading.Event() - self.session_started_event.clear() - self.stream_header = "" % ( self.boundjid.host, "xmlns:stream='%s'" % self.stream_ns, @@ -313,7 +310,7 @@ class ClientXMPP(BaseXMPP): self._handle_tls_start, name='TLS Proceed', instream=True) - self.send_xml(xml) + self.send_xml(xml, now=True) return True else: log.warning("The module tlslite is required to log in" +\ @@ -369,11 +366,13 @@ class ClientXMPP(BaseXMPP): self.send("%s" % ( sasl_ns, - auth)) + auth), + now=True) elif 'sasl:ANONYMOUS' in self.features and not self.boundjid.user: self.send("" % ( sasl_ns, - 'ANONYMOUS')) + 'ANONYMOUS'), + now=True) else: log.error("No appropriate login method.") self.disconnect() @@ -416,7 +415,7 @@ class ClientXMPP(BaseXMPP): res.text = self.boundjid.resource xml.append(res) iq.append(xml) - response = iq.send() + response = iq.send(now=True) bind_ns = 'urn:ietf:params:xml:ns:xmpp-bind' self.set_jid(response.xml.find('{%s}bind/{%s}jid' % (bind_ns, @@ -439,7 +438,7 @@ class ClientXMPP(BaseXMPP): """ if self.authenticated and self.bound: iq = self.makeIqSet(xml) - response = iq.send() + response = iq.send(now=True) log.debug("Established Session") self.sessionstarted = True self.session_started_event.set() diff --git a/sleekxmpp/componentxmpp.py b/sleekxmpp/componentxmpp.py index ae58c5f2..8c380ddc 100644 --- a/sleekxmpp/componentxmpp.py +++ b/sleekxmpp/componentxmpp.py @@ -138,4 +138,5 @@ class ComponentXMPP(BaseXMPP): Arguments: xml -- The reply handshake stanza. """ + self.session_started_event.set() self.event("session_start") diff --git a/sleekxmpp/stanza/iq.py b/sleekxmpp/stanza/iq.py index 82ab13ec..4a12a87e 100644 --- a/sleekxmpp/stanza/iq.py +++ b/sleekxmpp/stanza/iq.py @@ -154,7 +154,7 @@ class Iq(RootStanza): StanzaBase.reply(self, clear) return self - def send(self, block=True, timeout=None, callback=None): + def send(self, block=True, timeout=None, callback=None, now=False): """ Send an stanza over the XML stream. @@ -178,6 +178,9 @@ class Iq(RootStanza): Defaults to sleekxmpp.xmlstream.RESPONSE_TIMEOUT callback -- Optional reference to a stream handler function. Will be executed when a reply stanza is received. + now -- Indicates if the send queue should be skipped and send + the stanza immediately. Used during stream + initialization. Defaults to False. """ if timeout is None: timeout = self.stream.response_timeout @@ -188,15 +191,15 @@ class Iq(RootStanza): callback, once=True) self.stream.register_handler(handler) - StanzaBase.send(self) + StanzaBase.send(self, now=now) return handler_name elif block and self['type'] in ('get', 'set'): waitfor = Waiter('IqWait_%s' % self['id'], MatcherId(self['id'])) self.stream.register_handler(waitfor) - StanzaBase.send(self) + StanzaBase.send(self, now=now) return waitfor.wait(timeout) else: - return StanzaBase.send(self) + return StanzaBase.send(self, now=now) def _set_stanza_values(self, values): """ diff --git a/sleekxmpp/test/sleektest.py b/sleekxmpp/test/sleektest.py index 8cf7b70d..7802a9bc 100644 --- a/sleekxmpp/test/sleektest.py +++ b/sleekxmpp/test/sleektest.py @@ -334,6 +334,8 @@ class SleekTest(unittest.TestCase): self.xmpp.process(threaded=True) if skip: if socket != 'live': + # Mark send queue as usable + self.xmpp.session_started_event.set() # Clear startup stanzas self.xmpp.socket.next_sent(timeout=1) if mode == 'component': diff --git a/sleekxmpp/xmlstream/stanzabase.py b/sleekxmpp/xmlstream/stanzabase.py index b8a7ceaa..d9a4636a 100644 --- a/sleekxmpp/xmlstream/stanzabase.py +++ b/sleekxmpp/xmlstream/stanzabase.py @@ -1253,9 +1253,15 @@ class StanzaBase(ElementBase): log.exception('Error handling {%s}%s stanza' % (self.namespace, self.name)) - def send(self): - """Queue the stanza to be sent on the XML stream.""" - self.stream.sendRaw(self.__str__()) + def send(self, now=False): + """ + Queue the stanza to be sent on the XML stream. + Arguments: + now -- Indicates if the queue should be skipped and the + stanza sent immediately. Useful for stream + initialization. Defaults to False. + """ + self.stream.send_raw(self.__str__(), now=now) def __copy__(self): """ diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py index 1dc2d430..9d00ee8c 100644 --- a/sleekxmpp/xmlstream/xmlstream.py +++ b/sleekxmpp/xmlstream/xmlstream.py @@ -187,6 +187,8 @@ class XMLStream(object): self.stop = threading.Event() self.stream_end_event = threading.Event() self.stream_end_event.set() + self.session_started_event = threading.Event() + self.event_queue = queue.Queue() self.send_queue = queue.Queue() self.scheduler = Scheduler(self.event_queue, self.stop) @@ -364,7 +366,8 @@ class XMLStream(object): def _disconnect(self, reconnect=False): # Send the end of stream marker. - self.send_raw(self.stream_footer) + self.send_raw(self.stream_footer, now=True) + self.session_started_event.clear() # Wait for confirmation that the stream was # closed in the other direction. self.auto_reconnect = reconnect @@ -657,7 +660,7 @@ class XMLStream(object): """ return xml - def send(self, data, mask=None, timeout=None): + def send(self, data, mask=None, timeout=None, now=False): """ A wrapper for send_raw for sending stanza objects. @@ -671,10 +674,13 @@ class XMLStream(object): or a timeout occurs. timeout -- Time in seconds to wait for a response before continuing. Defaults to RESPONSE_TIMEOUT. + now -- Indicates if the send queue should be skipped, + sending the stanza immediately. Useful mainly + for stream initialization stanzas. + Defaults to False. """ if timeout is None: timeout = self.response_timeout - if hasattr(mask, 'xml'): mask = mask.xml data = str(data) @@ -683,21 +689,11 @@ class XMLStream(object): wait_for = Waiter("SendWait_%s" % self.new_id(), MatchXMLMask(mask)) self.register_handler(wait_for) - self.send_raw(data) + self.send_raw(data, now) if mask is not None: return wait_for.wait(timeout) - def send_raw(self, data): - """ - Send raw data across the stream. - - Arguments: - data -- Any string value. - """ - self.send_queue.put(data) - return True - - def send_xml(self, data, mask=None, timeout=None): + def send_xml(self, data, mask=None, timeout=None, now=False): """ Send an XML object on the stream, and optionally wait for a response. @@ -710,10 +706,39 @@ class XMLStream(object): or a timeout occurs. timeout -- Time in seconds to wait for a response before continuing. Defaults to RESPONSE_TIMEOUT. + now -- Indicates if the send queue should be skipped, + sending the stanza immediately. Useful mainly + for stream initialization stanzas. + Defaults to False. """ if timeout is None: timeout = self.response_timeout - return self.send(tostring(data), mask, timeout) + return self.send(tostring(data), mask, timeout, now) + + def send_raw(self, data, now=False, reconnect=None): + """ + Send raw data across the stream. + + Arguments: + data -- Any string value. + reconnect -- Indicates if the stream should be + restarted if there is an error sending + the stanza. Used mainly for testing. + Defaults to self.auto_reconnect. + """ + if now: + log.debug("SEND: %s" % data) + try: + self.socket.send(data.encode('utf-8')) + except Socket.error as serr: + self.event('socket_error', serr) + log.warning("Failed to send %s" % data) + if reconnect is None: + reconnect = self.auto_reconnect + self.disconnect(reconnect) + else: + self.send_queue.put(data) + return True def process(self, threaded=True): """ @@ -767,7 +792,7 @@ class XMLStream(object): firstrun = False try: if self.is_client: - self.send_raw(self.stream_header) + self.send_raw(self.stream_header, now=True) # The call to self.__read_xml will block and prevent # the body of the loop from running until a disconnect # occurs. After any reconnection, the stream header will @@ -776,7 +801,7 @@ class XMLStream(object): # Ensure the stream header is sent for any # new connections. if self.is_client: - self.send_raw(self.stream_header) + self.send_raw(self.stream_header, now=True) except KeyboardInterrupt: log.debug("Keyboard Escape Detected in _process") self.stop.set() @@ -985,6 +1010,7 @@ class XMLStream(object): """ try: while not self.stop.isSet(): + self.session_started_event.wait() try: data = self.send_queue.get(True, 1) except queue.Empty: -- cgit v1.2.3