summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sleekxmpp/test/sleektest.py1
-rw-r--r--sleekxmpp/xmlstream/scheduler.py70
-rw-r--r--sleekxmpp/xmlstream/xmlstream.py24
3 files changed, 57 insertions, 38 deletions
diff --git a/sleekxmpp/test/sleektest.py b/sleekxmpp/test/sleektest.py
index bc83472e..ba79dce8 100644
--- a/sleekxmpp/test/sleektest.py
+++ b/sleekxmpp/test/sleektest.py
@@ -332,6 +332,7 @@ class SleekTest(unittest.TestCase):
# Remove unique ID prefix to make it easier to test
self.xmpp._id_prefix = ''
+ self.xmpp._disconnect_wait_for_threads = False
# We will use this to wait for the session_start event
# for live connections.
diff --git a/sleekxmpp/xmlstream/scheduler.py b/sleekxmpp/xmlstream/scheduler.py
index 6f6e0278..cf47c164 100644
--- a/sleekxmpp/xmlstream/scheduler.py
+++ b/sleekxmpp/xmlstream/scheduler.py
@@ -139,39 +139,45 @@ class Scheduler(object):
"""Process scheduled tasks."""
self.run = True
try:
- while self.run and not self.stop.isSet():
- wait = 1
- updated = False
- if self.schedule:
- wait = self.schedule[0].next - time.time()
- try:
- if wait <= 0.0:
- newtask = self.addq.get(False)
- else:
- if wait >= 3.0:
- wait = 3.0
- newtask = self.addq.get(True, wait)
- except queue.Empty:
- cleanup = []
- self.schedule_lock.acquire()
- for task in self.schedule:
- if time.time() >= task.next:
- updated = True
- if not task.run():
- cleanup.append(task)
- else:
- break
- for task in cleanup:
- self.schedule.pop(self.schedule.index(task))
+ while self.run and not self.stop.is_set():
+ wait = 0.1
+ updated = False
+ if self.schedule:
+ wait = self.schedule[0].next - time.time()
+ try:
+ if wait <= 0.0:
+ newtask = self.addq.get(False)
else:
- updated = True
- self.schedule_lock.acquire()
- self.schedule.append(newtask)
- finally:
- if updated:
- self.schedule = sorted(self.schedule,
- key=lambda task: task.next)
- self.schedule_lock.release()
+ if wait >= 3.0:
+ wait = 3.0
+ newtask = None
+ elapsed = 0
+ while not self.stop.is_set() and \
+ newtask is None and \
+ elapsed < wait:
+ newtask = self.addq.get(True, 0.1)
+ elapsed += 0.1
+ except queue.Empty:
+ cleanup = []
+ self.schedule_lock.acquire()
+ for task in self.schedule:
+ if time.time() >= task.next:
+ updated = True
+ if not task.run():
+ cleanup.append(task)
+ else:
+ break
+ for task in cleanup:
+ self.schedule.pop(self.schedule.index(task))
+ else:
+ updated = True
+ self.schedule_lock.acquire()
+ self.schedule.append(newtask)
+ finally:
+ if updated:
+ self.schedule = sorted(self.schedule,
+ key=lambda task: task.next)
+ self.schedule_lock.release()
except KeyboardInterrupt:
self.run = False
except SystemExit:
diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py
index 7b42b460..b039ebd1 100644
--- a/sleekxmpp/xmlstream/xmlstream.py
+++ b/sleekxmpp/xmlstream/xmlstream.py
@@ -52,7 +52,7 @@ RESPONSE_TIMEOUT = 30
#: The time in seconds to wait for events from the event queue, and also the
#: time between checks for the process stop signal.
-WAIT_TIMEOUT = 1
+WAIT_TIMEOUT = 0.1
#: The number of threads to use to handle XML stream events. This is not the
#: same as the number of custom event handling threads.
@@ -285,6 +285,7 @@ class XMLStream(object):
self.__thread_count = 0
self.__thread_cond = threading.Condition()
self._use_daemons = False
+ self._disconnect_wait_for_threads = True
self._id = 0
self._id_lock = threading.Lock()
@@ -652,7 +653,8 @@ class XMLStream(object):
if not self.auto_reconnect:
self.stop.set()
- self._wait_for_threads()
+ if self._disconnect_wait_for_threads:
+ self._wait_for_threads()
try:
self.socket.shutdown(Socket.SHUT_RDWR)
@@ -1183,8 +1185,9 @@ class XMLStream(object):
self.__thread[name].daemon = self._use_daemons
self.__thread[name].start()
- with self.__thread_cond:
- self.__thread_count += 1
+ if track:
+ with self.__thread_cond:
+ self.__thread_count += 1
def _end_thread(self, name):
with self.__thread_cond:
@@ -1196,7 +1199,12 @@ class XMLStream(object):
def _wait_for_threads(self):
with self.__thread_cond:
- self.__thread_cond.wait()
+ if self.__thread_count != 0:
+ log.debug("Waiting for %s threads to exit." %
+ self.__thread_count)
+ self.__thread_cond.wait()
+ if self.__thread_count != 0:
+ raise Exception("Hanged threads: %s" % threading.enumerate())
def process(self, **kwargs):
"""Initialize the XML streams and begin processing events.
@@ -1501,7 +1509,7 @@ class XMLStream(object):
while not self.stop.is_set():
while not self.stop.is_set() and \
not self.session_started_event.is_set():
- self.session_started_event.wait(timeout=1)
+ self.session_started_event.wait(timeout=0.1)
if self.__failed_send_stanza is not None:
data = self.__failed_send_stanza
self.__failed_send_stanza = None
@@ -1541,12 +1549,16 @@ class XMLStream(object):
log.warning("Failed to send %s", data)
if not self.stop.is_set():
self.__failed_send_stanza = data
+ self._end_thread('send')
self.disconnect(self.auto_reconnect, send_close=False)
+ return
except Exception as ex:
log.exception('Unexpected error in send thread: %s', ex)
self.exception(ex)
if not self.stop.is_set():
+ self._end_thread('send')
self.disconnect(self.auto_reconnect)
+ return
self._end_thread('send')