diff options
Diffstat (limited to 'sleekxmpp/xmlstream')
-rw-r--r-- | sleekxmpp/xmlstream/filesocket.py | 2 | ||||
-rw-r--r-- | sleekxmpp/xmlstream/stanzabase.py | 12 | ||||
-rw-r--r-- | sleekxmpp/xmlstream/xmlstream.py | 153 |
3 files changed, 113 insertions, 54 deletions
diff --git a/sleekxmpp/xmlstream/filesocket.py b/sleekxmpp/xmlstream/filesocket.py index 441ff875..fd81864b 100644 --- a/sleekxmpp/xmlstream/filesocket.py +++ b/sleekxmpp/xmlstream/filesocket.py @@ -22,6 +22,8 @@ class FileSocket(_fileobject): def read(self, size=4096): """Read data from the socket as if it were a file.""" + if self._sock is None: + return None data = self._sock.recv(size) if data is not None: return data diff --git a/sleekxmpp/xmlstream/stanzabase.py b/sleekxmpp/xmlstream/stanzabase.py index 28f78f3c..f1a9e1f5 100644 --- a/sleekxmpp/xmlstream/stanzabase.py +++ b/sleekxmpp/xmlstream/stanzabase.py @@ -1255,9 +1255,15 @@ class StanzaBase(ElementBase): log.exception('Error handling {%s}%s stanza' % (self.namespace, self.name)) - def send(self): - """Queue the stanza to be sent on the XML stream.""" - self.stream.sendRaw(self.__str__()) + def send(self, now=False): + """ + Queue the stanza to be sent on the XML stream. + Arguments: + now -- Indicates if the queue should be skipped and the + stanza sent immediately. Useful for stream + initialization. Defaults to False. + """ + self.stream.send_raw(self.__str__(), now=now) def __copy__(self): """ diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py index 1c165562..2d72de5f 100644 --- a/sleekxmpp/xmlstream/xmlstream.py +++ b/sleekxmpp/xmlstream/xmlstream.py @@ -17,6 +17,7 @@ import sys import threading import time import types +import random try: import queue except ImportError: @@ -45,6 +46,9 @@ HANDLER_THREADS = 1 # Flag indicating if the SSL library is available for use. SSL_SUPPORT = True +# Maximum time to delay between connection attempts is one hour. +RECONNECT_MAX_DELAY = 3600 + log = logging.getLogger(__name__) @@ -104,7 +108,11 @@ class XMLStream(object): use_ssl -- Flag indicating if SSL should be used. use_tls -- Flag indicating if TLS should be used. stop -- threading Event used to stop all threads. - auto_reconnect-- Flag to determine whether we auto reconnect. + + auto_reconnect -- Flag to determine whether we auto reconnect. + reconnect_max_delay -- Maximum time to delay between connection + attempts. Defaults to RECONNECT_MAX_DELAY, + which is one hour. Methods: add_event_handler -- Add a handler for a custom event. @@ -155,6 +163,8 @@ class XMLStream(object): self.ca_certs = None self.response_timeout = RESPONSE_TIMEOUT + self.reconnect_delay = None + self.reconnect_max_delay = RECONNECT_MAX_DELAY self.state = StateMachine(('disconnected', 'connected')) self.state._set_state('disconnected') @@ -178,6 +188,8 @@ class XMLStream(object): self.stop = threading.Event() self.stream_end_event = threading.Event() self.stream_end_event.set() + self.session_started_event = threading.Event() + self.event_queue = queue.Queue() self.send_queue = queue.Queue() self.scheduler = Scheduler(self.event_queue, self.stop) @@ -300,6 +312,15 @@ class XMLStream(object): self.stop.clear() self.socket = self.socket_class(Socket.AF_INET, Socket.SOCK_STREAM) self.socket.settimeout(None) + + if self.reconnect_delay is None: + delay = 1.0 + else: + delay = min(self.reconnect_delay * 2, self.reconnect_max_delay) + delay = random.normalvariate(delay, delay * 0.1) + log.debug('Waiting %s seconds before connecting.' % delay) + time.sleep(delay) + if self.use_ssl and self.ssl_support: log.debug("Socket Wrapped for SSL") if self.ca_certs is None: @@ -309,7 +330,7 @@ class XMLStream(object): ssl_socket = ssl.wrap_socket(self.socket, ca_certs=self.ca_certs, - certs_reqs=cert_policy) + cert_reqs=cert_policy) if hasattr(self.socket, 'socket'): # We are using a testing socket, so preserve the top @@ -324,13 +345,14 @@ class XMLStream(object): self.set_socket(self.socket, ignore=True) #this event is where you should set your application state self.event("connected", direct=True) + self.reconnect_delay = 1.0 return True except Socket.error as serr: error_msg = "Could not connect to %s:%s. Socket Error #%s: %s" self.event('socket_error', serr) log.error(error_msg % (self.address[0], self.address[1], serr.errno, serr.strerror)) - time.sleep(1) + self.reconnect_delay = delay return False def disconnect(self, reconnect=False): @@ -350,7 +372,8 @@ class XMLStream(object): def _disconnect(self, reconnect=False): # Send the end of stream marker. - self.send_raw(self.stream_footer) + self.send_raw(self.stream_footer, now=True) + self.session_started_event.clear() # Wait for confirmation that the stream was # closed in the other direction. self.auto_reconnect = reconnect @@ -643,7 +666,7 @@ class XMLStream(object): """ return xml - def send(self, data, mask=None, timeout=None): + def send(self, data, mask=None, timeout=None, now=False): """ A wrapper for send_raw for sending stanza objects. @@ -657,10 +680,13 @@ class XMLStream(object): or a timeout occurs. timeout -- Time in seconds to wait for a response before continuing. Defaults to RESPONSE_TIMEOUT. + now -- Indicates if the send queue should be skipped, + sending the stanza immediately. Useful mainly + for stream initialization stanzas. + Defaults to False. """ if timeout is None: timeout = self.response_timeout - if hasattr(mask, 'xml'): mask = mask.xml data = str(data) @@ -669,21 +695,11 @@ class XMLStream(object): wait_for = Waiter("SendWait_%s" % self.new_id(), MatchXMLMask(mask)) self.register_handler(wait_for) - self.send_raw(data) + self.send_raw(data, now) if mask is not None: return wait_for.wait(timeout) - def send_raw(self, data): - """ - Send raw data across the stream. - - Arguments: - data -- Any string value. - """ - self.send_queue.put(data) - return True - - def send_xml(self, data, mask=None, timeout=None): + def send_xml(self, data, mask=None, timeout=None, now=False): """ Send an XML object on the stream, and optionally wait for a response. @@ -696,10 +712,39 @@ class XMLStream(object): or a timeout occurs. timeout -- Time in seconds to wait for a response before continuing. Defaults to RESPONSE_TIMEOUT. + now -- Indicates if the send queue should be skipped, + sending the stanza immediately. Useful mainly + for stream initialization stanzas. + Defaults to False. """ if timeout is None: timeout = self.response_timeout - return self.send(tostring(data), mask, timeout) + return self.send(tostring(data), mask, timeout, now) + + def send_raw(self, data, now=False, reconnect=None): + """ + Send raw data across the stream. + + Arguments: + data -- Any string value. + reconnect -- Indicates if the stream should be + restarted if there is an error sending + the stanza. Used mainly for testing. + Defaults to self.auto_reconnect. + """ + if now: + log.debug("SEND (IMMED): %s" % data) + try: + self.socket.send(data.encode('utf-8')) + except Socket.error as serr: + self.event('socket_error', serr) + log.warning("Failed to send %s" % data) + if reconnect is None: + reconnect = self.auto_reconnect + self.disconnect(reconnect) + else: + self.send_queue.put(data) + return True def process(self, threaded=True): """ @@ -753,7 +798,7 @@ class XMLStream(object): firstrun = False try: if self.is_client: - self.send_raw(self.stream_header) + self.send_raw(self.stream_header, now=True) # The call to self.__read_xml will block and prevent # the body of the loop from running until a disconnect # occurs. After any reconnection, the stream header will @@ -762,7 +807,7 @@ class XMLStream(object): # Ensure the stream header is sent for any # new connections. if self.is_client: - self.send_raw(self.stream_header) + self.send_raw(self.stream_header, now=True) except KeyboardInterrupt: log.debug("Keyboard Escape Detected in _process") self.stop.set() @@ -790,35 +835,39 @@ class XMLStream(object): """ depth = 0 root = None - for (event, xml) in ET.iterparse(self.filesocket, (b'end', b'start')): - if event == b'start': - if depth == 0: - # We have received the start of the root element. - root = xml - # Perform any stream initialization actions, such - # as handshakes. - self.stream_end_event.clear() - self.start_stream_handler(root) - depth += 1 - if event == b'end': - depth -= 1 - if depth == 0: - # The stream's root element has closed, - # terminating the stream. - log.debug("End of stream recieved") - self.stream_end_event.set() - return False - elif depth == 1: - # We only raise events for stanzas that are direct - # children of the root element. - try: - self.__spawn_event(xml) - except RestartStream: - return True - if root: - # Keep the root element empty of children to - # save on memory use. - root.clear() + try: + for (event, xml) in ET.iterparse(self.filesocket, + (b'end', b'start')): + if event == b'start': + if depth == 0: + # We have received the start of the root element. + root = xml + # Perform any stream initialization actions, such + # as handshakes. + self.stream_end_event.clear() + self.start_stream_handler(root) + depth += 1 + if event == b'end': + depth -= 1 + if depth == 0: + # The stream's root element has closed, + # terminating the stream. + log.debug("End of stream recieved") + self.stream_end_event.set() + return False + elif depth == 1: + # We only raise events for stanzas that are direct + # children of the root element. + try: + self.__spawn_event(xml) + except RestartStream: + return True + if root: + # Keep the root element empty of children to + # save on memory use. + root.clear() + except SyntaxError: + log.error("Error reading from XML stream.") log.debug("Ending read XML loop") def _build_stanza(self, xml, default_ns=None): @@ -971,6 +1020,7 @@ class XMLStream(object): """ try: while not self.stop.isSet(): + self.session_started_event.wait() try: data = self.send_queue.get(True, 1) except queue.Empty: @@ -978,7 +1028,8 @@ class XMLStream(object): log.debug("SEND: %s" % data) try: self.socket.send(data.encode('utf-8')) - except: + except Socket.error as serr: + self.event('socket_error', serr) log.warning("Failed to send %s" % data) self.disconnect(self.auto_reconnect) except KeyboardInterrupt: |