diff options
Diffstat (limited to 'sleekxmpp/xmlstream/xmlstream.py')
-rw-r--r-- | sleekxmpp/xmlstream/xmlstream.py | 20 |
1 files changed, 16 insertions, 4 deletions
diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py index cdce1fdf..217367f9 100644 --- a/sleekxmpp/xmlstream/xmlstream.py +++ b/sleekxmpp/xmlstream/xmlstream.py @@ -22,6 +22,7 @@ import time import traceback import types import xml.sax.saxutils +from . import scheduler HANDLER_THREADS = 1 @@ -76,6 +77,7 @@ class XMLStream(object): self.eventqueue = queue.Queue() self.sendqueue = queue.Queue() + self.scheduler = scheduler.Scheduler(self.eventqueue) self.namespace_map = {} @@ -151,6 +153,7 @@ class XMLStream(object): raise RestartStream() def process(self, threaded=True): + self.scheduler.process(threaded=True) for t in range(0, HANDLER_THREADS): th = threading.Thread(name='eventhandle%s' % t, target=self._eventRunner) th.setDaemon(True) @@ -168,8 +171,8 @@ class XMLStream(object): else: self._process() - def schedule(self, seconds, handler, args=None): - threading.Timer(seconds, handler, args).start() + def schedule(self, name, seconds, callback, args=None, kwargs=None, repeat=False): + self.scheduler.add(name, seconds, callback, args, kwargs, repeat, qpointer=self.eventqueue) def _process(self): "Start processing the socket." @@ -189,6 +192,7 @@ class XMLStream(object): self.state.set('reconnect', False) self.disconnect() self.run = False + self.scheduler.run = False self.eventqueue.put(('quit', None, None)) return except CloseStream: @@ -237,6 +241,7 @@ class XMLStream(object): edepth += -1 if edepth == 0 and event == b'end': self.disconnect(reconnect=self.state['reconnect']) + logging.debug("Ending readXML loop") return False elif edepth == 1: #self.xmlin.put(xmlobj) @@ -245,11 +250,13 @@ class XMLStream(object): except RestartStream: return True except CloseStream: + logging.debug("Ending readXML loop") return False if root: root.clear() if event == b'start': edepth += 1 + logging.debug("Ending readXML loop") def _sendThread(self): while self.run: @@ -279,6 +286,7 @@ class XMLStream(object): logging.debug("Disconnecting...") self.state.set('disconnecting', True) self.run = False + self.scheduler.run = False if self.state['connected']: self.sendRaw(self.stream_footer) time.sleep(1) @@ -337,6 +345,9 @@ class XMLStream(object): event = self.eventqueue.get(True, timeout=5) except queue.Empty: event = None + except KeyboardInterrupt: + self.run = False + self.scheduler.run = False if event is not None: etype = event[0] handler = event[1] @@ -348,9 +359,10 @@ class XMLStream(object): except Exception as e: traceback.print_exc() args[0].exception(e) - elif etype == 'sched': + elif etype == 'schedule': try: - handler.run(*args) + logging.debug(args) + handler(*args[0]) except: logging.error(traceback.format_exc()) elif etype == 'quit': |