diff options
Diffstat (limited to 'sleekxmpp/xmlstream/scheduler.py')
-rw-r--r-- | sleekxmpp/xmlstream/scheduler.py | 70 |
1 files changed, 42 insertions, 28 deletions
diff --git a/sleekxmpp/xmlstream/scheduler.py b/sleekxmpp/xmlstream/scheduler.py index f68af081..e6fae37a 100644 --- a/sleekxmpp/xmlstream/scheduler.py +++ b/sleekxmpp/xmlstream/scheduler.py @@ -15,10 +15,14 @@ import time import threading import logging -try: - import queue -except ImportError: - import Queue as queue +import itertools + +from sleekxmpp.util import Queue, QueueEmpty + + +#: The time in seconds to wait for events from the event queue, and also the +#: time between checks for the process stop signal. +WAIT_TIMEOUT = 1.0 log = logging.getLogger(__name__) @@ -77,7 +81,7 @@ class Task(object): """ if self.qpointer is not None: self.qpointer.put(('schedule', self.callback, - self.args, self.name)) + self.args, self.kwargs, self.name)) else: self.callback(*self.args, **self.kwargs) self.reset() @@ -102,7 +106,7 @@ class Scheduler(object): def __init__(self, parentstop=None): #: A queue for storing tasks - self.addq = queue.Queue() + self.addq = Queue() #: A list of tasks in order of execution time. self.schedule = [] @@ -121,6 +125,10 @@ class Scheduler(object): #: Lock for accessing the task queue. self.schedule_lock = threading.RLock() + #: The time in seconds to wait for events from the event queue, + #: and also the time between checks for the process stop signal. + self.wait_timeout = WAIT_TIMEOUT + def process(self, threaded=True, daemon=False): """Begin accepting and processing scheduled tasks. @@ -140,44 +148,50 @@ class Scheduler(object): self.run = True try: while self.run and not self.stop.is_set(): - wait = 0.1 updated = False if self.schedule: wait = self.schedule[0].next - time.time() + else: + wait = self.wait_timeout try: if wait <= 0.0: newtask = self.addq.get(False) else: - if wait >= 3.0: - wait = 3.0 newtask = None - elapsed = 0 - while not self.stop.is_set() and \ + while self.run and \ + not self.stop.is_set() and \ newtask is None and \ - elapsed < wait: - newtask = self.addq.get(True, 0.1) - elapsed += 0.1 - except queue.Empty: - cleanup = [] + wait > 0: + try: + newtask = self.addq.get(True, min(wait, self.wait_timeout)) + except QueueEmpty: # Nothing to add, nothing to do. Check run flags and continue waiting. + wait -= self.wait_timeout + except QueueEmpty: # Time to run some tasks, and no new tasks to add. self.schedule_lock.acquire() - for task in self.schedule: - if time.time() >= task.next: - updated = True - if not task.run(): - cleanup.append(task) + # select only those tasks which are to be executed now + relevant = itertools.takewhile( + lambda task: time.time() >= task.next, self.schedule) + # run the tasks and keep the return value in a tuple + status = map(lambda task: (task, task.run()), relevant) + # remove non-repeating tasks + for task, doRepeat in status: + if not doRepeat: + try: + self.schedule.remove(task) + except ValueError: + pass else: - break - for task in cleanup: - self.schedule.pop(self.schedule.index(task)) - else: - updated = True + # only need to resort tasks if a repeated task has + # been kept in the list. + updated = True + else: # Add new task self.schedule_lock.acquire() if newtask is not None: self.schedule.append(newtask) + updated = True finally: if updated: - self.schedule = sorted(self.schedule, - key=lambda task: task.next) + self.schedule.sort(key=lambda task: task.next) self.schedule_lock.release() except KeyboardInterrupt: self.run = False |