summaryrefslogtreecommitdiff
path: root/sleekxmpp/xmlstream/xmlstream.py
diff options
context:
space:
mode:
authorLance Stout <lancestout@gmail.com>2012-04-21 10:34:09 -0700
committerLance Stout <lancestout@gmail.com>2012-04-21 10:36:39 -0700
commit913738444eab8e574a3de5cbbd422bf7d45120e5 (patch)
tree1cde7967174e14283806df7a8b569d644fb7fa4a /sleekxmpp/xmlstream/xmlstream.py
parent8ee30179eae3b6697629aad50ca31df9daa5ef6e (diff)
downloadslixmpp-913738444eab8e574a3de5cbbd422bf7d45120e5.tar.gz
slixmpp-913738444eab8e574a3de5cbbd422bf7d45120e5.tar.bz2
slixmpp-913738444eab8e574a3de5cbbd422bf7d45120e5.tar.xz
slixmpp-913738444eab8e574a3de5cbbd422bf7d45120e5.zip
Count and track the main threads, so we can delay disconnecting until all have quit.
Diffstat (limited to 'sleekxmpp/xmlstream/xmlstream.py')
-rw-r--r--sleekxmpp/xmlstream/xmlstream.py50
1 files changed, 37 insertions, 13 deletions
diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py
index c5c8581b..7b42b460 100644
--- a/sleekxmpp/xmlstream/xmlstream.py
+++ b/sleekxmpp/xmlstream/xmlstream.py
@@ -282,6 +282,8 @@ class XMLStream(object):
self.__event_handlers = {}
self.__event_handlers_lock = threading.Lock()
self.__filters = {'in': [], 'out': [], 'out_sync': []}
+ self.__thread_count = 0
+ self.__thread_cond = threading.Condition()
self._use_daemons = False
self._id = 0
@@ -650,6 +652,8 @@ class XMLStream(object):
if not self.auto_reconnect:
self.stop.set()
+ self._wait_for_threads()
+
try:
self.socket.shutdown(Socket.SHUT_RDWR)
self.socket.close()
@@ -1174,6 +1178,26 @@ class XMLStream(object):
self.send_queue.put(data)
return True
+ def _start_thread(self, name, target, track=True):
+ self.__thread[name] = threading.Thread(name=name, target=target)
+ self.__thread[name].daemon = self._use_daemons
+ self.__thread[name].start()
+
+ with self.__thread_cond:
+ self.__thread_count += 1
+
+ def _end_thread(self, name):
+ with self.__thread_cond:
+ self.__thread_count -= 1
+ log.debug("Stopped %s thread. %s threads remain." % (
+ name, self.__thread_count))
+ if self.__thread_count == 0:
+ self.__thread_cond.notify()
+
+ def _wait_for_threads(self):
+ with self.__thread_cond:
+ self.__thread_cond.wait()
+
def process(self, **kwargs):
"""Initialize the XML streams and begin processing events.
@@ -1207,22 +1231,16 @@ class XMLStream(object):
else:
threaded = kwargs.get('threaded', True)
- self.scheduler.process(threaded=True, daemon=self._use_daemons)
-
- def start_thread(name, target):
- self.__thread[name] = threading.Thread(name=name, target=target)
- self.__thread[name].daemon = self._use_daemons
- self.__thread[name].start()
-
for t in range(0, HANDLER_THREADS):
log.debug("Starting HANDLER THREAD")
- start_thread('stream_event_handler_%s' % t, self._event_runner)
+ self._start_thread('event_thread_%s' % t, self._event_runner)
- start_thread('send_thread', self._send_thread)
+ self._start_thread('send_thread', self._send_thread)
+ self._start_thread('scheduler_thread', self._scheduler_thread)
if threaded:
# Run the XML stream in the background for another application.
- start_thread('process', self._process)
+ self._start_thread('read_thread', self._process, track=False)
else:
self._process()
@@ -1466,16 +1484,16 @@ class XMLStream(object):
self.exception(e)
elif etype == 'quit':
log.debug("Quitting event runner thread")
- return False
+ break
except KeyboardInterrupt:
log.debug("Keyboard Escape Detected in _event_runner")
self.event('killed', direct=True)
self.disconnect()
- return
except SystemExit:
self.disconnect()
self.event_queue.put(('quit', None, None))
- return
+
+ self._end_thread('event runner')
def _send_thread(self):
"""Extract stanzas from the send queue and send them on the stream."""
@@ -1530,6 +1548,12 @@ class XMLStream(object):
if not self.stop.is_set():
self.disconnect(self.auto_reconnect)
+ self._end_thread('send')
+
+ def _scheduler_thread(self):
+ self.scheduler.process(threaded=False)
+ self._end_thread('scheduler')
+
def exception(self, exception):
"""Process an unknown exception.