summaryrefslogtreecommitdiff
path: root/sleekxmpp/xmlstream/xmlstream.py
diff options
context:
space:
mode:
Diffstat (limited to 'sleekxmpp/xmlstream/xmlstream.py')
-rw-r--r--sleekxmpp/xmlstream/xmlstream.py28
1 files changed, 24 insertions, 4 deletions
diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py
index cdce1fdf..5650386e 100644
--- a/sleekxmpp/xmlstream/xmlstream.py
+++ b/sleekxmpp/xmlstream/xmlstream.py
@@ -22,6 +22,7 @@ import time
import traceback
import types
import xml.sax.saxutils
+from . import scheduler
HANDLER_THREADS = 1
@@ -76,6 +77,7 @@ class XMLStream(object):
self.eventqueue = queue.Queue()
self.sendqueue = queue.Queue()
+ self.scheduler = scheduler.Scheduler(self.eventqueue)
self.namespace_map = {}
@@ -151,7 +153,9 @@ class XMLStream(object):
raise RestartStream()
def process(self, threaded=True):
+ self.scheduler.process(threaded=True)
for t in range(0, HANDLER_THREADS):
+<<<<<<< HEAD
th = threading.Thread(name='eventhandle%s' % t, target=self._eventRunner)
th.setDaemon(True)
self.__thread['eventhandle%s' % t] = th
@@ -160,6 +164,13 @@ class XMLStream(object):
th.setDaemon(True)
self.__thread['sendthread'] = th
th.start()
+=======
+ logging.debug("Starting HANDLER THREAD")
+ self.__thread['eventhandle%s' % t] = threading.Thread(name='eventhandle%s' % t, target=self._eventRunner)
+ self.__thread['eventhandle%s' % t].start()
+ self.__thread['sendthread'] = threading.Thread(name='sendthread', target=self._sendThread)
+ self.__thread['sendthread'].start()
+>>>>>>> master
if threaded:
th = threading.Thread(name='process', target=self._process)
th.setDaemon(True)
@@ -168,8 +179,8 @@ class XMLStream(object):
else:
self._process()
- def schedule(self, seconds, handler, args=None):
- threading.Timer(seconds, handler, args).start()
+ def schedule(self, name, seconds, callback, args=None, kwargs=None, repeat=False):
+ self.scheduler.add(name, seconds, callback, args, kwargs, repeat, qpointer=self.eventqueue)
def _process(self):
"Start processing the socket."
@@ -189,6 +200,7 @@ class XMLStream(object):
self.state.set('reconnect', False)
self.disconnect()
self.run = False
+ self.scheduler.run = False
self.eventqueue.put(('quit', None, None))
return
except CloseStream:
@@ -237,6 +249,7 @@ class XMLStream(object):
edepth += -1
if edepth == 0 and event == b'end':
self.disconnect(reconnect=self.state['reconnect'])
+ logging.debug("Ending readXML loop")
return False
elif edepth == 1:
#self.xmlin.put(xmlobj)
@@ -245,11 +258,13 @@ class XMLStream(object):
except RestartStream:
return True
except CloseStream:
+ logging.debug("Ending readXML loop")
return False
if root:
root.clear()
if event == b'start':
edepth += 1
+ logging.debug("Ending readXML loop")
def _sendThread(self):
while self.run:
@@ -279,6 +294,7 @@ class XMLStream(object):
logging.debug("Disconnecting...")
self.state.set('disconnecting', True)
self.run = False
+ self.scheduler.run = False
if self.state['connected']:
self.sendRaw(self.stream_footer)
time.sleep(1)
@@ -337,6 +353,9 @@ class XMLStream(object):
event = self.eventqueue.get(True, timeout=5)
except queue.Empty:
event = None
+ except KeyboardInterrupt:
+ self.run = False
+ self.scheduler.run = False
if event is not None:
etype = event[0]
handler = event[1]
@@ -348,9 +367,10 @@ class XMLStream(object):
except Exception as e:
traceback.print_exc()
args[0].exception(e)
- elif etype == 'sched':
+ elif etype == 'schedule':
try:
- handler.run(*args)
+ logging.debug(args)
+ handler(*args[0])
except:
logging.error(traceback.format_exc())
elif etype == 'quit':