summaryrefslogtreecommitdiff
path: root/sleekxmpp/xmlstream/scheduler.py
diff options
context:
space:
mode:
authorNathan Fritz <nathan@andyet.net>2010-10-13 18:15:21 -0700
committerNathan Fritz <nathan@andyet.net>2010-10-13 18:15:21 -0700
commit7ad7a29a8f08ff815164731eec50ff1cba46b07a (patch)
tree444a2d89993072a377155d8f131899a19d18dffa /sleekxmpp/xmlstream/scheduler.py
parentb0e036d03c5e850af3a3c1589af7333becf54a86 (diff)
downloadslixmpp-7ad7a29a8f08ff815164731eec50ff1cba46b07a.tar.gz
slixmpp-7ad7a29a8f08ff815164731eec50ff1cba46b07a.tar.bz2
slixmpp-7ad7a29a8f08ff815164731eec50ff1cba46b07a.tar.xz
slixmpp-7ad7a29a8f08ff815164731eec50ff1cba46b07a.zip
new state machine in place
Diffstat (limited to 'sleekxmpp/xmlstream/scheduler.py')
-rw-r--r--sleekxmpp/xmlstream/scheduler.py68
1 files changed, 36 insertions, 32 deletions
diff --git a/sleekxmpp/xmlstream/scheduler.py b/sleekxmpp/xmlstream/scheduler.py
index 0e3d30b3..0af1f508 100644
--- a/sleekxmpp/xmlstream/scheduler.py
+++ b/sleekxmpp/xmlstream/scheduler.py
@@ -104,7 +104,7 @@ class Scheduler(object):
quit -- Stop the scheduler.
"""
- def __init__(self, parentqueue=None):
+ def __init__(self, parentqueue=None, parentstop=None):
"""
Create a new scheduler.
@@ -116,6 +116,7 @@ class Scheduler(object):
self.thread = None
self.run = False
self.parentqueue = parentqueue
+ self.parentstop = parentstop
def process(self, threaded=True):
"""
@@ -135,38 +136,41 @@ class Scheduler(object):
def _process(self):
"""Process scheduled tasks."""
self.run = True
- while self.run:
- try:
- wait = 1
- updated = False
- if self.schedule:
- wait = self.schedule[0].next - time.time()
- try:
- if wait <= 0.0:
- newtask = self.addq.get(False)
- else:
- newtask = self.addq.get(True, wait)
- except queue.Empty:
- cleanup = []
- for task in self.schedule:
- if time.time() >= task.next:
- updated = True
- if not task.run():
- cleanup.append(task)
+ try:
+ while self.run:
+ wait = 1
+ updated = False
+ if self.schedule:
+ wait = self.schedule[0].next - time.time()
+ try:
+ if wait <= 0.0:
+ newtask = self.addq.get(False)
else:
- break
- for task in cleanup:
- x = self.schedule.pop(self.schedule.index(task))
- else:
- updated = True
- self.schedule.append(newtask)
- finally:
- if updated:
- self.schedule = sorted(self.schedule,
- key=lambda task: task.next)
- except KeyboardInterrupt:
- self.run = False
-
+ newtask = self.addq.get(True, wait)
+ except queue.Empty:
+ cleanup = []
+ for task in self.schedule:
+ if time.time() >= task.next:
+ updated = True
+ if not task.run():
+ cleanup.append(task)
+ else:
+ break
+ for task in cleanup:
+ x = self.schedule.pop(self.schedule.index(task))
+ else:
+ updated = True
+ self.schedule.append(newtask)
+ finally:
+ if updated:
+ self.schedule = sorted(self.schedule,
+ key=lambda task: task.next)
+ except KeyboardInterrupt:
+ self.run = False
+ if self.parentstop is not None: self.parentstop.set()
+ except SystemExit:
+ self.run = False
+ if self.parentstop is not None: self.parentstop.set()
logging.debug("Qutting Scheduler thread")
if self.parentqueue is not None:
self.parentqueue.put(('quit', None, None))