summaryrefslogtreecommitdiff
path: root/sleekxmpp/xmlstream/scheduler.py
diff options
context:
space:
mode:
Diffstat (limited to 'sleekxmpp/xmlstream/scheduler.py')
-rw-r--r--sleekxmpp/xmlstream/scheduler.py70
1 files changed, 38 insertions, 32 deletions
diff --git a/sleekxmpp/xmlstream/scheduler.py b/sleekxmpp/xmlstream/scheduler.py
index 6f6e0278..cf47c164 100644
--- a/sleekxmpp/xmlstream/scheduler.py
+++ b/sleekxmpp/xmlstream/scheduler.py
@@ -139,39 +139,45 @@ class Scheduler(object):
"""Process scheduled tasks."""
self.run = True
try:
- while self.run and not self.stop.isSet():
- wait = 1
- updated = False
- if self.schedule:
- wait = self.schedule[0].next - time.time()
- try:
- if wait <= 0.0:
- newtask = self.addq.get(False)
- else:
- if wait >= 3.0:
- wait = 3.0
- newtask = self.addq.get(True, wait)
- except queue.Empty:
- cleanup = []
- self.schedule_lock.acquire()
- for task in self.schedule:
- if time.time() >= task.next:
- updated = True
- if not task.run():
- cleanup.append(task)
- else:
- break
- for task in cleanup:
- self.schedule.pop(self.schedule.index(task))
+ while self.run and not self.stop.is_set():
+ wait = 0.1
+ updated = False
+ if self.schedule:
+ wait = self.schedule[0].next - time.time()
+ try:
+ if wait <= 0.0:
+ newtask = self.addq.get(False)
else:
- updated = True
- self.schedule_lock.acquire()
- self.schedule.append(newtask)
- finally:
- if updated:
- self.schedule = sorted(self.schedule,
- key=lambda task: task.next)
- self.schedule_lock.release()
+ if wait >= 3.0:
+ wait = 3.0
+ newtask = None
+ elapsed = 0
+ while not self.stop.is_set() and \
+ newtask is None and \
+ elapsed < wait:
+ newtask = self.addq.get(True, 0.1)
+ elapsed += 0.1
+ except queue.Empty:
+ cleanup = []
+ self.schedule_lock.acquire()
+ for task in self.schedule:
+ if time.time() >= task.next:
+ updated = True
+ if not task.run():
+ cleanup.append(task)
+ else:
+ break
+ for task in cleanup:
+ self.schedule.pop(self.schedule.index(task))
+ else:
+ updated = True
+ self.schedule_lock.acquire()
+ self.schedule.append(newtask)
+ finally:
+ if updated:
+ self.schedule = sorted(self.schedule,
+ key=lambda task: task.next)
+ self.schedule_lock.release()
except KeyboardInterrupt:
self.run = False
except SystemExit: