From 384e1a92b716250c168f5dedc1f9693111f81423 Mon Sep 17 00:00:00 2001 From: Lance Stout Date: Fri, 27 May 2011 11:01:30 -0700 Subject: Added support for testind disconnect errors. --- sleekxmpp/xmlstream/xmlstream.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'sleekxmpp/xmlstream/xmlstream.py') diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py index 1c165562..468db032 100644 --- a/sleekxmpp/xmlstream/xmlstream.py +++ b/sleekxmpp/xmlstream/xmlstream.py @@ -978,7 +978,8 @@ class XMLStream(object): log.debug("SEND: %s" % data) try: self.socket.send(data.encode('utf-8')) - except: + except Socket.error as serr: + self.event('socket_error', serr) log.warning("Failed to send %s" % data) self.disconnect(self.auto_reconnect) except KeyboardInterrupt: -- cgit v1.2.3 From b81ab979006956134e5d924640936fe8cc20dbf3 Mon Sep 17 00:00:00 2001 From: Lance Stout Date: Fri, 27 May 2011 14:42:40 -0700 Subject: Add exponential backoff to connection attempts. Delay will approximately double between attempts (random variation). See issue #67. --- sleekxmpp/xmlstream/xmlstream.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) (limited to 'sleekxmpp/xmlstream/xmlstream.py') diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py index 468db032..6bf70fbf 100644 --- a/sleekxmpp/xmlstream/xmlstream.py +++ b/sleekxmpp/xmlstream/xmlstream.py @@ -17,6 +17,7 @@ import sys import threading import time import types +import random try: import queue except ImportError: @@ -45,6 +46,9 @@ HANDLER_THREADS = 1 # Flag indicating if the SSL library is available for use. SSL_SUPPORT = True +# Maximum time to delay between connection attempts is one hour. +RECONNECT_MAX_DELAY = 3600 + log = logging.getLogger(__name__) @@ -104,7 +108,11 @@ class XMLStream(object): use_ssl -- Flag indicating if SSL should be used. use_tls -- Flag indicating if TLS should be used. stop -- threading Event used to stop all threads. - auto_reconnect-- Flag to determine whether we auto reconnect. + + auto_reconnect -- Flag to determine whether we auto reconnect. + reconnect_max_delay -- Maximum time to delay between connection + attempts. Defaults to RECONNECT_MAX_DELAY, + which is one hour. Methods: add_event_handler -- Add a handler for a custom event. @@ -155,6 +163,7 @@ class XMLStream(object): self.ca_certs = None self.response_timeout = RESPONSE_TIMEOUT + self.reconnect_max_delay = RECONNECT_MAX_DELAY self.state = StateMachine(('disconnected', 'connected')) self.state._set_state('disconnected') @@ -291,9 +300,14 @@ class XMLStream(object): # is established. connected = self.state.transition('disconnected', 'connected', func=self._connect) + delay = 1.0 while reattempt and not connected: connected = self.state.transition('disconnected', 'connected', func=self._connect) + delay = min(delay * 2, self.reconnect_max_delay) + delay = random.normalvariate(delay, delay * 0.1) + log.debug('Waiting %s seconds before reconnecting.' % delay) + time.sleep(delay) return connected def _connect(self): -- cgit v1.2.3 From 6997b2fbf87a080a12334b348653ed4cb30f9218 Mon Sep 17 00:00:00 2001 From: Lance Stout Date: Fri, 27 May 2011 16:39:45 -0700 Subject: Fix typo for SSL certificate use. --- sleekxmpp/xmlstream/xmlstream.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'sleekxmpp/xmlstream/xmlstream.py') diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py index 6bf70fbf..1dc2d430 100644 --- a/sleekxmpp/xmlstream/xmlstream.py +++ b/sleekxmpp/xmlstream/xmlstream.py @@ -323,7 +323,7 @@ class XMLStream(object): ssl_socket = ssl.wrap_socket(self.socket, ca_certs=self.ca_certs, - certs_reqs=cert_policy) + cert_reqs=cert_policy) if hasattr(self.socket, 'socket'): # We are using a testing socket, so preserve the top -- cgit v1.2.3 From 1735c194cdf83b61850bba45044070db6c42d0ac Mon Sep 17 00:00:00 2001 From: Lance Stout Date: Fri, 27 May 2011 16:59:52 -0700 Subject: Don't use the send queue for stream initialization. Use the parameter now=True to skip the queue when sending Iq stanzas, or using xmpp.send(). --- sleekxmpp/xmlstream/xmlstream.py | 62 ++++++++++++++++++++++++++++------------ 1 file changed, 44 insertions(+), 18 deletions(-) (limited to 'sleekxmpp/xmlstream/xmlstream.py') diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py index 1dc2d430..9d00ee8c 100644 --- a/sleekxmpp/xmlstream/xmlstream.py +++ b/sleekxmpp/xmlstream/xmlstream.py @@ -187,6 +187,8 @@ class XMLStream(object): self.stop = threading.Event() self.stream_end_event = threading.Event() self.stream_end_event.set() + self.session_started_event = threading.Event() + self.event_queue = queue.Queue() self.send_queue = queue.Queue() self.scheduler = Scheduler(self.event_queue, self.stop) @@ -364,7 +366,8 @@ class XMLStream(object): def _disconnect(self, reconnect=False): # Send the end of stream marker. - self.send_raw(self.stream_footer) + self.send_raw(self.stream_footer, now=True) + self.session_started_event.clear() # Wait for confirmation that the stream was # closed in the other direction. self.auto_reconnect = reconnect @@ -657,7 +660,7 @@ class XMLStream(object): """ return xml - def send(self, data, mask=None, timeout=None): + def send(self, data, mask=None, timeout=None, now=False): """ A wrapper for send_raw for sending stanza objects. @@ -671,10 +674,13 @@ class XMLStream(object): or a timeout occurs. timeout -- Time in seconds to wait for a response before continuing. Defaults to RESPONSE_TIMEOUT. + now -- Indicates if the send queue should be skipped, + sending the stanza immediately. Useful mainly + for stream initialization stanzas. + Defaults to False. """ if timeout is None: timeout = self.response_timeout - if hasattr(mask, 'xml'): mask = mask.xml data = str(data) @@ -683,21 +689,11 @@ class XMLStream(object): wait_for = Waiter("SendWait_%s" % self.new_id(), MatchXMLMask(mask)) self.register_handler(wait_for) - self.send_raw(data) + self.send_raw(data, now) if mask is not None: return wait_for.wait(timeout) - def send_raw(self, data): - """ - Send raw data across the stream. - - Arguments: - data -- Any string value. - """ - self.send_queue.put(data) - return True - - def send_xml(self, data, mask=None, timeout=None): + def send_xml(self, data, mask=None, timeout=None, now=False): """ Send an XML object on the stream, and optionally wait for a response. @@ -710,10 +706,39 @@ class XMLStream(object): or a timeout occurs. timeout -- Time in seconds to wait for a response before continuing. Defaults to RESPONSE_TIMEOUT. + now -- Indicates if the send queue should be skipped, + sending the stanza immediately. Useful mainly + for stream initialization stanzas. + Defaults to False. """ if timeout is None: timeout = self.response_timeout - return self.send(tostring(data), mask, timeout) + return self.send(tostring(data), mask, timeout, now) + + def send_raw(self, data, now=False, reconnect=None): + """ + Send raw data across the stream. + + Arguments: + data -- Any string value. + reconnect -- Indicates if the stream should be + restarted if there is an error sending + the stanza. Used mainly for testing. + Defaults to self.auto_reconnect. + """ + if now: + log.debug("SEND: %s" % data) + try: + self.socket.send(data.encode('utf-8')) + except Socket.error as serr: + self.event('socket_error', serr) + log.warning("Failed to send %s" % data) + if reconnect is None: + reconnect = self.auto_reconnect + self.disconnect(reconnect) + else: + self.send_queue.put(data) + return True def process(self, threaded=True): """ @@ -767,7 +792,7 @@ class XMLStream(object): firstrun = False try: if self.is_client: - self.send_raw(self.stream_header) + self.send_raw(self.stream_header, now=True) # The call to self.__read_xml will block and prevent # the body of the loop from running until a disconnect # occurs. After any reconnection, the stream header will @@ -776,7 +801,7 @@ class XMLStream(object): # Ensure the stream header is sent for any # new connections. if self.is_client: - self.send_raw(self.stream_header) + self.send_raw(self.stream_header, now=True) except KeyboardInterrupt: log.debug("Keyboard Escape Detected in _process") self.stop.set() @@ -985,6 +1010,7 @@ class XMLStream(object): """ try: while not self.stop.isSet(): + self.session_started_event.wait() try: data = self.send_queue.get(True, 1) except queue.Empty: -- cgit v1.2.3 From 8080b4cae2000ccd5be2eaa442b903d1b180273b Mon Sep 17 00:00:00 2001 From: Lance Stout Date: Tue, 31 May 2011 10:23:05 -0700 Subject: Cleanup logging and exception handling. The syntax and attribute errors raised during a disconnect/reconnect attempt are now caught and produce nicer log messages. --- sleekxmpp/xmlstream/xmlstream.py | 64 +++++++++++++++++++++------------------- 1 file changed, 34 insertions(+), 30 deletions(-) (limited to 'sleekxmpp/xmlstream/xmlstream.py') diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py index 9d00ee8c..121e5978 100644 --- a/sleekxmpp/xmlstream/xmlstream.py +++ b/sleekxmpp/xmlstream/xmlstream.py @@ -727,7 +727,7 @@ class XMLStream(object): Defaults to self.auto_reconnect. """ if now: - log.debug("SEND: %s" % data) + log.debug("SEND (IMMED): %s" % data) try: self.socket.send(data.encode('utf-8')) except Socket.error as serr: @@ -829,35 +829,39 @@ class XMLStream(object): """ depth = 0 root = None - for (event, xml) in ET.iterparse(self.filesocket, (b'end', b'start')): - if event == b'start': - if depth == 0: - # We have received the start of the root element. - root = xml - # Perform any stream initialization actions, such - # as handshakes. - self.stream_end_event.clear() - self.start_stream_handler(root) - depth += 1 - if event == b'end': - depth -= 1 - if depth == 0: - # The stream's root element has closed, - # terminating the stream. - log.debug("End of stream recieved") - self.stream_end_event.set() - return False - elif depth == 1: - # We only raise events for stanzas that are direct - # children of the root element. - try: - self.__spawn_event(xml) - except RestartStream: - return True - if root: - # Keep the root element empty of children to - # save on memory use. - root.clear() + try: + for (event, xml) in ET.iterparse(self.filesocket, + (b'end', b'start')): + if event == b'start': + if depth == 0: + # We have received the start of the root element. + root = xml + # Perform any stream initialization actions, such + # as handshakes. + self.stream_end_event.clear() + self.start_stream_handler(root) + depth += 1 + if event == b'end': + depth -= 1 + if depth == 0: + # The stream's root element has closed, + # terminating the stream. + log.debug("End of stream recieved") + self.stream_end_event.set() + return False + elif depth == 1: + # We only raise events for stanzas that are direct + # children of the root element. + try: + self.__spawn_event(xml) + except RestartStream: + return True + if root: + # Keep the root element empty of children to + # save on memory use. + root.clear() + except SyntaxError: + log.error("Error reading from XML stream.") log.debug("Ending read XML loop") def _build_stanza(self, xml, default_ns=None): -- cgit v1.2.3 From a81162edd2434756e21d7f9a79d71d770a43db7b Mon Sep 17 00:00:00 2001 From: Lance Stout Date: Tue, 31 May 2011 10:53:14 -0700 Subject: Apply connection backoff to reconnect attempts. Backoff was only being done for the initial connection attempt before. Now any reconnection will start with a minimum 1 sec delay which will approximately double between attempts. --- sleekxmpp/xmlstream/xmlstream.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) (limited to 'sleekxmpp/xmlstream/xmlstream.py') diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py index 121e5978..2d72de5f 100644 --- a/sleekxmpp/xmlstream/xmlstream.py +++ b/sleekxmpp/xmlstream/xmlstream.py @@ -163,6 +163,7 @@ class XMLStream(object): self.ca_certs = None self.response_timeout = RESPONSE_TIMEOUT + self.reconnect_delay = None self.reconnect_max_delay = RECONNECT_MAX_DELAY self.state = StateMachine(('disconnected', 'connected')) @@ -302,20 +303,24 @@ class XMLStream(object): # is established. connected = self.state.transition('disconnected', 'connected', func=self._connect) - delay = 1.0 while reattempt and not connected: connected = self.state.transition('disconnected', 'connected', func=self._connect) - delay = min(delay * 2, self.reconnect_max_delay) - delay = random.normalvariate(delay, delay * 0.1) - log.debug('Waiting %s seconds before reconnecting.' % delay) - time.sleep(delay) return connected def _connect(self): self.stop.clear() self.socket = self.socket_class(Socket.AF_INET, Socket.SOCK_STREAM) self.socket.settimeout(None) + + if self.reconnect_delay is None: + delay = 1.0 + else: + delay = min(self.reconnect_delay * 2, self.reconnect_max_delay) + delay = random.normalvariate(delay, delay * 0.1) + log.debug('Waiting %s seconds before connecting.' % delay) + time.sleep(delay) + if self.use_ssl and self.ssl_support: log.debug("Socket Wrapped for SSL") if self.ca_certs is None: @@ -340,13 +345,14 @@ class XMLStream(object): self.set_socket(self.socket, ignore=True) #this event is where you should set your application state self.event("connected", direct=True) + self.reconnect_delay = 1.0 return True except Socket.error as serr: error_msg = "Could not connect to %s:%s. Socket Error #%s: %s" self.event('socket_error', serr) log.error(error_msg % (self.address[0], self.address[1], serr.errno, serr.strerror)) - time.sleep(1) + self.reconnect_delay = delay return False def disconnect(self, reconnect=False): -- cgit v1.2.3