diff options
-rw-r--r-- | sleekxmpp/test/sleektest.py | 1 | ||||
-rw-r--r-- | sleekxmpp/xmlstream/scheduler.py | 70 | ||||
-rw-r--r-- | sleekxmpp/xmlstream/xmlstream.py | 24 |
3 files changed, 57 insertions, 38 deletions
diff --git a/sleekxmpp/test/sleektest.py b/sleekxmpp/test/sleektest.py index bc83472e..ba79dce8 100644 --- a/sleekxmpp/test/sleektest.py +++ b/sleekxmpp/test/sleektest.py @@ -332,6 +332,7 @@ class SleekTest(unittest.TestCase): # Remove unique ID prefix to make it easier to test self.xmpp._id_prefix = '' + self.xmpp._disconnect_wait_for_threads = False # We will use this to wait for the session_start event # for live connections. diff --git a/sleekxmpp/xmlstream/scheduler.py b/sleekxmpp/xmlstream/scheduler.py index 6f6e0278..cf47c164 100644 --- a/sleekxmpp/xmlstream/scheduler.py +++ b/sleekxmpp/xmlstream/scheduler.py @@ -139,39 +139,45 @@ class Scheduler(object): """Process scheduled tasks.""" self.run = True try: - while self.run and not self.stop.isSet(): - wait = 1 - updated = False - if self.schedule: - wait = self.schedule[0].next - time.time() - try: - if wait <= 0.0: - newtask = self.addq.get(False) - else: - if wait >= 3.0: - wait = 3.0 - newtask = self.addq.get(True, wait) - except queue.Empty: - cleanup = [] - self.schedule_lock.acquire() - for task in self.schedule: - if time.time() >= task.next: - updated = True - if not task.run(): - cleanup.append(task) - else: - break - for task in cleanup: - self.schedule.pop(self.schedule.index(task)) + while self.run and not self.stop.is_set(): + wait = 0.1 + updated = False + if self.schedule: + wait = self.schedule[0].next - time.time() + try: + if wait <= 0.0: + newtask = self.addq.get(False) else: - updated = True - self.schedule_lock.acquire() - self.schedule.append(newtask) - finally: - if updated: - self.schedule = sorted(self.schedule, - key=lambda task: task.next) - self.schedule_lock.release() + if wait >= 3.0: + wait = 3.0 + newtask = None + elapsed = 0 + while not self.stop.is_set() and \ + newtask is None and \ + elapsed < wait: + newtask = self.addq.get(True, 0.1) + elapsed += 0.1 + except queue.Empty: + cleanup = [] + self.schedule_lock.acquire() + for task in self.schedule: + if time.time() >= task.next: + updated = True + if not task.run(): + cleanup.append(task) + else: + break + for task in cleanup: + self.schedule.pop(self.schedule.index(task)) + else: + updated = True + self.schedule_lock.acquire() + self.schedule.append(newtask) + finally: + if updated: + self.schedule = sorted(self.schedule, + key=lambda task: task.next) + self.schedule_lock.release() except KeyboardInterrupt: self.run = False except SystemExit: diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py index 7b42b460..b039ebd1 100644 --- a/sleekxmpp/xmlstream/xmlstream.py +++ b/sleekxmpp/xmlstream/xmlstream.py @@ -52,7 +52,7 @@ RESPONSE_TIMEOUT = 30 #: The time in seconds to wait for events from the event queue, and also the #: time between checks for the process stop signal. -WAIT_TIMEOUT = 1 +WAIT_TIMEOUT = 0.1 #: The number of threads to use to handle XML stream events. This is not the #: same as the number of custom event handling threads. @@ -285,6 +285,7 @@ class XMLStream(object): self.__thread_count = 0 self.__thread_cond = threading.Condition() self._use_daemons = False + self._disconnect_wait_for_threads = True self._id = 0 self._id_lock = threading.Lock() @@ -652,7 +653,8 @@ class XMLStream(object): if not self.auto_reconnect: self.stop.set() - self._wait_for_threads() + if self._disconnect_wait_for_threads: + self._wait_for_threads() try: self.socket.shutdown(Socket.SHUT_RDWR) @@ -1183,8 +1185,9 @@ class XMLStream(object): self.__thread[name].daemon = self._use_daemons self.__thread[name].start() - with self.__thread_cond: - self.__thread_count += 1 + if track: + with self.__thread_cond: + self.__thread_count += 1 def _end_thread(self, name): with self.__thread_cond: @@ -1196,7 +1199,12 @@ class XMLStream(object): def _wait_for_threads(self): with self.__thread_cond: - self.__thread_cond.wait() + if self.__thread_count != 0: + log.debug("Waiting for %s threads to exit." % + self.__thread_count) + self.__thread_cond.wait() + if self.__thread_count != 0: + raise Exception("Hanged threads: %s" % threading.enumerate()) def process(self, **kwargs): """Initialize the XML streams and begin processing events. @@ -1501,7 +1509,7 @@ class XMLStream(object): while not self.stop.is_set(): while not self.stop.is_set() and \ not self.session_started_event.is_set(): - self.session_started_event.wait(timeout=1) + self.session_started_event.wait(timeout=0.1) if self.__failed_send_stanza is not None: data = self.__failed_send_stanza self.__failed_send_stanza = None @@ -1541,12 +1549,16 @@ class XMLStream(object): log.warning("Failed to send %s", data) if not self.stop.is_set(): self.__failed_send_stanza = data + self._end_thread('send') self.disconnect(self.auto_reconnect, send_close=False) + return except Exception as ex: log.exception('Unexpected error in send thread: %s', ex) self.exception(ex) if not self.stop.is_set(): + self._end_thread('send') self.disconnect(self.auto_reconnect) + return self._end_thread('send') |