summaryrefslogtreecommitdiff
path: root/sleekxmpp/xmlstream
diff options
context:
space:
mode:
Diffstat (limited to 'sleekxmpp/xmlstream')
-rw-r--r--sleekxmpp/xmlstream/scheduler.py51
-rw-r--r--sleekxmpp/xmlstream/xmlstream.py6
2 files changed, 35 insertions, 22 deletions
diff --git a/sleekxmpp/xmlstream/scheduler.py b/sleekxmpp/xmlstream/scheduler.py
index 7aa59f3d..18d9229e 100644
--- a/sleekxmpp/xmlstream/scheduler.py
+++ b/sleekxmpp/xmlstream/scheduler.py
@@ -31,11 +31,12 @@ class Task(object):
class Scheduler(object):
"""Threaded scheduler that allows for updates mid-execution unlike http://docs.python.org/library/sched.html#module-sched"""
- def __init__(self):
+ def __init__(self, parentqueue=None):
self.addq = queue.Queue()
self.schedule = []
self.thread = None
self.run = False
+ self.parentqueue = parentqueue
def process(self, threaded=True):
if threaded:
@@ -47,29 +48,37 @@ class Scheduler(object):
def _process(self):
self.run = True
while self.run:
- wait = 5
- updated = False
- if self.schedule:
- wait = self.schedule[0].next - time.time()
try:
- newtask = self.addq.get(True, wait)
- except queue.Empty:
- cleanup = []
- for task in self.schedule:
- if time.time() >= task.next:
- updated = True
- if not task.run():
- cleanup.append(task)
+ wait = 5
+ updated = False
+ if self.schedule:
+ wait = self.schedule[0].next - time.time()
+ try:
+ if wait <= 0.0:
+ newtask = self.addq.get(False)
else:
- break
- for task in cleanup:
- x = self.schedule.pop(self.schedule.index(task))
- else:
- updated = True
- self.schedule.append(newtask)
- finally:
- if updated: self.schedule = sorted(self.schedule, key=lambda task: task.next)
+ newtask = self.addq.get(True, wait)
+ except queue.Empty:
+ cleanup = []
+ for task in self.schedule:
+ if time.time() >= task.next:
+ updated = True
+ if not task.run():
+ cleanup.append(task)
+ else:
+ break
+ for task in cleanup:
+ x = self.schedule.pop(self.schedule.index(task))
+ else:
+ updated = True
+ self.schedule.append(newtask)
+ finally:
+ if updated: self.schedule = sorted(self.schedule, key=lambda task: task.next)
+ except KeyboardInterrupt:
+ self.run = False
logging.debug("Qutting Scheduler thread")
+ if self.parentqueue is not None:
+ self.parentqueue.put(('quit', None, None))
def add(self, name, seconds, callback, args=None, kwargs=None, repeat=False, qpointer=None):
self.addq.put(Task(name, seconds, callback, args, kwargs, repeat, qpointer))
diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py
index 006f3876..cea300a7 100644
--- a/sleekxmpp/xmlstream/xmlstream.py
+++ b/sleekxmpp/xmlstream/xmlstream.py
@@ -76,7 +76,7 @@ class XMLStream(object):
self.eventqueue = queue.Queue()
self.sendqueue = queue.Queue()
- self.scheduler = scheduler.Scheduler()
+ self.scheduler = scheduler.Scheduler(self.eventqueue)
self.namespace_map = {}
@@ -149,6 +149,7 @@ class XMLStream(object):
def process(self, threaded=True):
self.scheduler.process(threaded=True)
for t in range(0, HANDLER_THREADS):
+ logging.debug("Starting HANDLER THREAD")
self.__thread['eventhandle%s' % t] = threading.Thread(name='eventhandle%s' % t, target=self._eventRunner)
self.__thread['eventhandle%s' % t].start()
self.__thread['sendthread'] = threading.Thread(name='sendthread', target=self._sendThread)
@@ -333,6 +334,9 @@ class XMLStream(object):
event = self.eventqueue.get(True, timeout=5)
except queue.Empty:
event = None
+ except KeyboardInterrupt:
+ self.run = False
+ self.scheduler.run = False
if event is not None:
etype = event[0]
handler = event[1]