diff options
Diffstat (limited to 'sleekxmpp/xmlstream')
-rw-r--r-- | sleekxmpp/xmlstream/cert.py | 4 | ||||
-rw-r--r-- | sleekxmpp/xmlstream/filesocket.py | 9 | ||||
-rw-r--r-- | sleekxmpp/xmlstream/matcher/xmlmask.py | 4 | ||||
-rw-r--r-- | sleekxmpp/xmlstream/resolver.py | 5 | ||||
-rw-r--r-- | sleekxmpp/xmlstream/scheduler.py | 34 | ||||
-rw-r--r-- | sleekxmpp/xmlstream/xmlstream.py | 49 |
6 files changed, 71 insertions, 34 deletions
diff --git a/sleekxmpp/xmlstream/cert.py b/sleekxmpp/xmlstream/cert.py index fa12f794..71146f36 100644 --- a/sleekxmpp/xmlstream/cert.py +++ b/sleekxmpp/xmlstream/cert.py @@ -1,6 +1,10 @@ import logging from datetime import datetime, timedelta +# Make a call to strptime before starting threads to +# prevent thread safety issues. +datetime.strptime('1970-01-01 12:00:00', "%Y-%m-%d %H:%M:%S") + try: from pyasn1.codec.der import decoder, encoder diff --git a/sleekxmpp/xmlstream/filesocket.py b/sleekxmpp/xmlstream/filesocket.py index d4537998..53b83bc7 100644 --- a/sleekxmpp/xmlstream/filesocket.py +++ b/sleekxmpp/xmlstream/filesocket.py @@ -13,6 +13,7 @@ """ from socket import _fileobject +import errno import socket @@ -29,7 +30,13 @@ class FileSocket(_fileobject): """Read data from the socket as if it were a file.""" if self._sock is None: return None - data = self._sock.recv(size) + while True: + try: + data = self._sock.recv(size) + break + except socket.error as serr: + if serr.errno != errno.EINTR: + raise if data is not None: return data diff --git a/sleekxmpp/xmlstream/matcher/xmlmask.py b/sleekxmpp/xmlstream/matcher/xmlmask.py index cb202448..56f728e1 100644 --- a/sleekxmpp/xmlstream/matcher/xmlmask.py +++ b/sleekxmpp/xmlstream/matcher/xmlmask.py @@ -37,11 +37,11 @@ class MatchXMLMask(MatcherBase): object or XML string to use as a mask. """ - def __init__(self, criteria): + def __init__(self, criteria, default_ns='jabber:client'): MatcherBase.__init__(self, criteria) if isinstance(criteria, str): self._criteria = ET.fromstring(self._criteria) - self.default_ns = 'jabber:client' + self.default_ns = default_ns def setDefaultNS(self, ns): """Set the default namespace to use during comparisons. diff --git a/sleekxmpp/xmlstream/resolver.py b/sleekxmpp/xmlstream/resolver.py index 16f8a7ad..6f26797f 100644 --- a/sleekxmpp/xmlstream/resolver.py +++ b/sleekxmpp/xmlstream/resolver.py @@ -202,11 +202,14 @@ def get_AAAA(host, resolver=None): # If not using dnspython, attempt lookup using the OS level # getaddrinfo() method. if resolver is None: + if not socket.has_ipv6: + log.debug("Unable to query %s for AAAA records: IPv6 is not supported", host) + return [] try: recs = socket.getaddrinfo(host, None, socket.AF_INET6, socket.SOCK_STREAM) return [rec[4][0] for rec in recs] - except socket.gaierror: + except (OSError, socket.gaierror): log.debug("DNS: Error retreiving AAAA address " + \ "info for %s." % host) return [] diff --git a/sleekxmpp/xmlstream/scheduler.py b/sleekxmpp/xmlstream/scheduler.py index b3e50983..e6fae37a 100644 --- a/sleekxmpp/xmlstream/scheduler.py +++ b/sleekxmpp/xmlstream/scheduler.py @@ -20,6 +20,11 @@ import itertools from sleekxmpp.util import Queue, QueueEmpty +#: The time in seconds to wait for events from the event queue, and also the +#: time between checks for the process stop signal. +WAIT_TIMEOUT = 1.0 + + log = logging.getLogger(__name__) @@ -76,7 +81,7 @@ class Task(object): """ if self.qpointer is not None: self.qpointer.put(('schedule', self.callback, - self.args, self.name)) + self.args, self.kwargs, self.name)) else: self.callback(*self.args, **self.kwargs) self.reset() @@ -120,6 +125,10 @@ class Scheduler(object): #: Lock for accessing the task queue. self.schedule_lock = threading.RLock() + #: The time in seconds to wait for events from the event queue, + #: and also the time between checks for the process stop signal. + self.wait_timeout = WAIT_TIMEOUT + def process(self, threaded=True, daemon=False): """Begin accepting and processing scheduled tasks. @@ -139,24 +148,25 @@ class Scheduler(object): self.run = True try: while self.run and not self.stop.is_set(): - wait = 0.1 updated = False if self.schedule: wait = self.schedule[0].next - time.time() + else: + wait = self.wait_timeout try: if wait <= 0.0: newtask = self.addq.get(False) else: - if wait >= 3.0: - wait = 3.0 newtask = None - elapsed = 0 - while not self.stop.is_set() and \ + while self.run and \ + not self.stop.is_set() and \ newtask is None and \ - elapsed < wait: - newtask = self.addq.get(True, 0.1) - elapsed += 0.1 - except QueueEmpty: + wait > 0: + try: + newtask = self.addq.get(True, min(wait, self.wait_timeout)) + except QueueEmpty: # Nothing to add, nothing to do. Check run flags and continue waiting. + wait -= self.wait_timeout + except QueueEmpty: # Time to run some tasks, and no new tasks to add. self.schedule_lock.acquire() # select only those tasks which are to be executed now relevant = itertools.takewhile( @@ -174,11 +184,11 @@ class Scheduler(object): # only need to resort tasks if a repeated task has # been kept in the list. updated = True - else: - updated = True + else: # Add new task self.schedule_lock.acquire() if newtask is not None: self.schedule.append(newtask) + updated = True finally: if updated: self.schedule.sort(key=lambda task: task.next) diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py index 1c0b84b9..8242a127 100644 --- a/sleekxmpp/xmlstream/xmlstream.py +++ b/sleekxmpp/xmlstream/xmlstream.py @@ -26,6 +26,7 @@ import time import random import weakref import uuid +import errno from xml.parsers.expat import ExpatError @@ -49,7 +50,7 @@ RESPONSE_TIMEOUT = 30 #: The time in seconds to wait for events from the event queue, and also the #: time between checks for the process stop signal. -WAIT_TIMEOUT = 0.1 +WAIT_TIMEOUT = 1.0 #: The number of threads to use to handle XML stream events. This is not the #: same as the number of custom event handling threads. @@ -461,10 +462,10 @@ class XMLStream(object): time.sleep(0.1) elapsed += 0.1 except KeyboardInterrupt: - self.stop.set() + self.set_stop() return False except SystemExit: - self.stop.set() + self.set_stop() return False if self.default_domain: @@ -706,7 +707,7 @@ class XMLStream(object): self.stream_end_event.set() if not self.auto_reconnect: - self.stop.set() + self.set_stop() if self._disconnect_wait_for_threads: self._wait_for_threads() @@ -723,7 +724,7 @@ class XMLStream(object): def abort(self): self.session_started_event.clear() - self.stop.set() + self.set_stop() if self._disconnect_wait_for_threads: self._wait_for_threads() try: @@ -1017,8 +1018,12 @@ class XMLStream(object): if name is None: name = 'add_handler_%s' % self.new_id() - self.register_handler(XMLCallback(name, MatchXMLMask(mask), pointer, - once=disposable, instream=instream)) + self.register_handler( + XMLCallback(name, + MatchXMLMask(mask, self.default_ns), + pointer, + once=disposable, + instream=instream)) def register_handler(self, handler, before=None, after=None): """Add a stream event handler that will be executed when a matching @@ -1290,6 +1295,9 @@ class XMLStream(object): try: sent += self.socket.send(data[sent:]) count += 1 + except Socket.error as serr: + if serr.errno != errno.EINTR: + raise except ssl.SSLError as serr: if tries >= self.ssl_retry_max: log.debug('SSL error: max retries reached') @@ -1352,6 +1360,13 @@ class XMLStream(object): if self.__thread_count == 0: self.__thread_cond.notify() + def set_stop(self): + self.stop.set() + + # Unlock queues + self.event_queue.put(None) + self.send_queue.put(None) + def _wait_for_threads(self): with self.__thread_cond: if self.__thread_count != 0: @@ -1624,11 +1639,7 @@ class XMLStream(object): log.debug("Loading event runner") try: while not self.stop.is_set(): - try: - wait = self.wait_timeout - event = self.event_queue.get(True, timeout=wait) - except QueueEmpty: - event = None + event = self.event_queue.get() if event is None: continue @@ -1644,10 +1655,10 @@ class XMLStream(object): log.exception(error_msg, handler.name) orig.exception(e) elif etype == 'schedule': - name = args[1] + name = args[2] try: log.debug('Scheduled event: %s: %s', name, args[0]) - handler(*args[0]) + handler(*args[0], **args[1]) except Exception as e: log.exception('Error processing scheduled task') self.exception(e) @@ -1689,14 +1700,13 @@ class XMLStream(object): while not self.stop.is_set(): while not self.stop.is_set() and \ not self.session_started_event.is_set(): - self.session_started_event.wait(timeout=0.1) + self.session_started_event.wait(timeout=0.1) # Wait for session start if self.__failed_send_stanza is not None: data = self.__failed_send_stanza self.__failed_send_stanza = None else: - try: - data = self.send_queue.get(True, 1) - except QueueEmpty: + data = self.send_queue.get() # Wait for data to send + if data is None: continue log.debug("SEND: %s", data) enc_data = data.encode('utf-8') @@ -1711,6 +1721,9 @@ class XMLStream(object): try: sent += self.socket.send(enc_data[sent:]) count += 1 + except Socket.error as serr: + if serr.errno != errno.EINTR: + raise except ssl.SSLError as serr: if tries >= self.ssl_retry_max: log.debug('SSL error: max retries reached') |