diff options
Diffstat (limited to 'sleekxmpp/xmlstream/xmlstream.py')
-rw-r--r-- | sleekxmpp/xmlstream/xmlstream.py | 28 |
1 files changed, 24 insertions, 4 deletions
diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py index cdce1fdf..5650386e 100644 --- a/sleekxmpp/xmlstream/xmlstream.py +++ b/sleekxmpp/xmlstream/xmlstream.py @@ -22,6 +22,7 @@ import time import traceback import types import xml.sax.saxutils +from . import scheduler HANDLER_THREADS = 1 @@ -76,6 +77,7 @@ class XMLStream(object): self.eventqueue = queue.Queue() self.sendqueue = queue.Queue() + self.scheduler = scheduler.Scheduler(self.eventqueue) self.namespace_map = {} @@ -151,7 +153,9 @@ class XMLStream(object): raise RestartStream() def process(self, threaded=True): + self.scheduler.process(threaded=True) for t in range(0, HANDLER_THREADS): +<<<<<<< HEAD th = threading.Thread(name='eventhandle%s' % t, target=self._eventRunner) th.setDaemon(True) self.__thread['eventhandle%s' % t] = th @@ -160,6 +164,13 @@ class XMLStream(object): th.setDaemon(True) self.__thread['sendthread'] = th th.start() +======= + logging.debug("Starting HANDLER THREAD") + self.__thread['eventhandle%s' % t] = threading.Thread(name='eventhandle%s' % t, target=self._eventRunner) + self.__thread['eventhandle%s' % t].start() + self.__thread['sendthread'] = threading.Thread(name='sendthread', target=self._sendThread) + self.__thread['sendthread'].start() +>>>>>>> master if threaded: th = threading.Thread(name='process', target=self._process) th.setDaemon(True) @@ -168,8 +179,8 @@ class XMLStream(object): else: self._process() - def schedule(self, seconds, handler, args=None): - threading.Timer(seconds, handler, args).start() + def schedule(self, name, seconds, callback, args=None, kwargs=None, repeat=False): + self.scheduler.add(name, seconds, callback, args, kwargs, repeat, qpointer=self.eventqueue) def _process(self): "Start processing the socket." @@ -189,6 +200,7 @@ class XMLStream(object): self.state.set('reconnect', False) self.disconnect() self.run = False + self.scheduler.run = False self.eventqueue.put(('quit', None, None)) return except CloseStream: @@ -237,6 +249,7 @@ class XMLStream(object): edepth += -1 if edepth == 0 and event == b'end': self.disconnect(reconnect=self.state['reconnect']) + logging.debug("Ending readXML loop") return False elif edepth == 1: #self.xmlin.put(xmlobj) @@ -245,11 +258,13 @@ class XMLStream(object): except RestartStream: return True except CloseStream: + logging.debug("Ending readXML loop") return False if root: root.clear() if event == b'start': edepth += 1 + logging.debug("Ending readXML loop") def _sendThread(self): while self.run: @@ -279,6 +294,7 @@ class XMLStream(object): logging.debug("Disconnecting...") self.state.set('disconnecting', True) self.run = False + self.scheduler.run = False if self.state['connected']: self.sendRaw(self.stream_footer) time.sleep(1) @@ -337,6 +353,9 @@ class XMLStream(object): event = self.eventqueue.get(True, timeout=5) except queue.Empty: event = None + except KeyboardInterrupt: + self.run = False + self.scheduler.run = False if event is not None: etype = event[0] handler = event[1] @@ -348,9 +367,10 @@ class XMLStream(object): except Exception as e: traceback.print_exc() args[0].exception(e) - elif etype == 'sched': + elif etype == 'schedule': try: - handler.run(*args) + logging.debug(args) + handler(*args[0]) except: logging.error(traceback.format_exc()) elif etype == 'quit': |