diff options
Diffstat (limited to 'sleekxmpp/xmlstream/xmlstream.py')
-rw-r--r-- | sleekxmpp/xmlstream/xmlstream.py | 67 |
1 files changed, 43 insertions, 24 deletions
diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py index bea6e88f..8242a127 100644 --- a/sleekxmpp/xmlstream/xmlstream.py +++ b/sleekxmpp/xmlstream/xmlstream.py @@ -26,6 +26,7 @@ import time import random import weakref import uuid +import errno from xml.parsers.expat import ExpatError @@ -49,7 +50,7 @@ RESPONSE_TIMEOUT = 30 #: The time in seconds to wait for events from the event queue, and also the #: time between checks for the process stop signal. -WAIT_TIMEOUT = 0.1 +WAIT_TIMEOUT = 1.0 #: The number of threads to use to handle XML stream events. This is not the #: same as the number of custom event handling threads. @@ -461,10 +462,10 @@ class XMLStream(object): time.sleep(0.1) elapsed += 0.1 except KeyboardInterrupt: - self.stop.set() + self.set_stop() return False except SystemExit: - self.stop.set() + self.set_stop() return False if self.default_domain: @@ -550,7 +551,7 @@ class XMLStream(object): cert.verify(self._expected_server_name, self._der_cert) except cert.CertificateError as err: if not self.event_handled('ssl_invalid_cert'): - log.error(err.message) + log.error(err) self.disconnect(send_close=False) else: self.event('ssl_invalid_cert', @@ -559,8 +560,7 @@ class XMLStream(object): self.set_socket(self.socket, ignore=True) #this event is where you should set your application state - self.event("connected", direct=True) - self.reconnect_delay = 1.0 + self.event('connected', direct=True) return True except (Socket.error, ssl.SSLError) as serr: error_msg = "Could not connect to %s:%s. Socket Error #%s: %s" @@ -611,6 +611,7 @@ class XMLStream(object): lines = resp.split('\r\n') if '200' not in lines[0]: self.event('proxy_error', resp) + self.event('connection_failed', direct=True) log.error('Proxy Error: %s', lines[0]) return False @@ -706,7 +707,7 @@ class XMLStream(object): self.stream_end_event.set() if not self.auto_reconnect: - self.stop.set() + self.set_stop() if self._disconnect_wait_for_threads: self._wait_for_threads() @@ -718,12 +719,12 @@ class XMLStream(object): self.event('socket_error', serr, direct=True) finally: #clear your application state - self.event("disconnected", direct=True) + self.event('disconnected', direct=True) return True def abort(self): self.session_started_event.clear() - self.stop.set() + self.set_stop() if self._disconnect_wait_for_threads: self._wait_for_threads() try: @@ -859,7 +860,7 @@ class XMLStream(object): cert.verify(self._expected_server_name, self._der_cert) except cert.CertificateError as err: if not self.event_handled('ssl_invalid_cert'): - log.error(err.message) + log.error(err) self.disconnect(self.auto_reconnect, send_close=False) else: self.event('ssl_invalid_cert', pem_cert, direct=True) @@ -1016,9 +1017,13 @@ class XMLStream(object): # and handler classes here. if name is None: - name = 'add_handler_%s' % self.getNewId() - self.registerHandler(XMLCallback(name, MatchXMLMask(mask), pointer, - once=disposable, instream=instream)) + name = 'add_handler_%s' % self.new_id() + self.register_handler( + XMLCallback(name, + MatchXMLMask(mask, self.default_ns), + pointer, + once=disposable, + instream=instream)) def register_handler(self, handler, before=None, after=None): """Add a stream event handler that will be executed when a matching @@ -1131,6 +1136,8 @@ class XMLStream(object): event queue. All event handlers will run in the same thread. """ + log.debug("Event triggered: " + name) + handlers = self.__event_handlers.get(name, []) for handler in handlers: #TODO: Data should not be copied, but should be read only, @@ -1288,6 +1295,9 @@ class XMLStream(object): try: sent += self.socket.send(data[sent:]) count += 1 + except Socket.error as serr: + if serr.errno != errno.EINTR: + raise except ssl.SSLError as serr: if tries >= self.ssl_retry_max: log.debug('SSL error: max retries reached') @@ -1350,6 +1360,13 @@ class XMLStream(object): if self.__thread_count == 0: self.__thread_cond.notify() + def set_stop(self): + self.stop.set() + + # Unlock queues + self.event_queue.put(None) + self.send_queue.put(None) + def _wait_for_threads(self): with self.__thread_cond: if self.__thread_count != 0: @@ -1493,6 +1510,10 @@ class XMLStream(object): # as handshakes. self.stream_end_event.clear() self.start_stream_handler(root) + + # We have a successful stream connection, so reset + # exponential backoff for new reconnect attempts. + self.reconnect_delay = 1.0 depth += 1 if event == b'end': depth -= 1 @@ -1618,11 +1639,7 @@ class XMLStream(object): log.debug("Loading event runner") try: while not self.stop.is_set(): - try: - wait = self.wait_timeout - event = self.event_queue.get(True, timeout=wait) - except QueueEmpty: - event = None + event = self.event_queue.get() if event is None: continue @@ -1638,10 +1655,10 @@ class XMLStream(object): log.exception(error_msg, handler.name) orig.exception(e) elif etype == 'schedule': - name = args[1] + name = args[2] try: log.debug('Scheduled event: %s: %s', name, args[0]) - handler(*args[0]) + handler(*args[0], **args[1]) except Exception as e: log.exception('Error processing scheduled task') self.exception(e) @@ -1683,14 +1700,13 @@ class XMLStream(object): while not self.stop.is_set(): while not self.stop.is_set() and \ not self.session_started_event.is_set(): - self.session_started_event.wait(timeout=0.1) + self.session_started_event.wait(timeout=0.1) # Wait for session start if self.__failed_send_stanza is not None: data = self.__failed_send_stanza self.__failed_send_stanza = None else: - try: - data = self.send_queue.get(True, 1) - except QueueEmpty: + data = self.send_queue.get() # Wait for data to send + if data is None: continue log.debug("SEND: %s", data) enc_data = data.encode('utf-8') @@ -1705,6 +1721,9 @@ class XMLStream(object): try: sent += self.socket.send(enc_data[sent:]) count += 1 + except Socket.error as serr: + if serr.errno != errno.EINTR: + raise except ssl.SSLError as serr: if tries >= self.ssl_retry_max: log.debug('SSL error: max retries reached') |