From 4c08c9c52495a3e3e5b794fa346724abfb206c43 Mon Sep 17 00:00:00 2001 From: Lance Stout Date: Thu, 25 Aug 2011 13:34:30 -0700 Subject: Update scheduler with locks and ability to remove tasks. Scheduled tasks must have a unique name. --- sleekxmpp/xmlstream/scheduler.py | 72 ++++++++++++++++++++++++++++------------ sleekxmpp/xmlstream/xmlstream.py | 5 +-- 2 files changed, 53 insertions(+), 24 deletions(-) (limited to 'sleekxmpp') diff --git a/sleekxmpp/xmlstream/scheduler.py b/sleekxmpp/xmlstream/scheduler.py index 12deeddf..58219257 100644 --- a/sleekxmpp/xmlstream/scheduler.py +++ b/sleekxmpp/xmlstream/scheduler.py @@ -73,7 +73,8 @@ class Task(object): otherwise, execute the callback immediately. """ if self.qpointer is not None: - self.qpointer.put(('schedule', self.callback, self.args)) + self.qpointer.put(('schedule', self.callback, + self.args, self.name)) else: self.callback(*self.args, **self.kwargs) self.reset() @@ -95,31 +96,32 @@ class Scheduler(object): http://docs.python.org/library/sched.html#module-sched Attributes: - addq -- A queue storing added tasks. - schedule -- A list of tasks in order of execution times. - thread -- If threaded, the thread processing the schedule. - run -- Indicates if the scheduler is running. - parentqueue -- A parent event queue in control of this scheduler. - + addq -- A queue storing added tasks. + schedule -- A list of tasks in order of execution times. + thread -- If threaded, the thread processing the schedule. + run -- Indicates if the scheduler is running. + stop -- Threading event indicating if the main process + has been stopped. Methods: add -- Add a new task to the schedule. process -- Process and schedule tasks. quit -- Stop the scheduler. """ - def __init__(self, parentqueue=None, parentstop=None): + def __init__(self, parentstop=None): """ Create a new scheduler. Arguments: - parentqueue -- A separate event queue controlling this scheduler. + parentstop -- A threading event indicating if the main process has + been stopped. """ self.addq = queue.Queue() self.schedule = [] self.thread = None self.run = False - self.parentqueue = parentqueue - self.parentstop = parentstop + self.stop = parentstop + self.schedule_lock = threading.RLock() def process(self, threaded=True): """ @@ -141,8 +143,7 @@ class Scheduler(object): """Process scheduled tasks.""" self.run = True try: - while self.run and (self.parentstop is None or \ - not self.parentstop.isSet()): + while self.run and not self.stop.isSet(): wait = 1 updated = False if self.schedule: @@ -156,6 +157,7 @@ class Scheduler(object): newtask = self.addq.get(True, wait) except queue.Empty: cleanup = [] + self.schedule_lock.acquire() for task in self.schedule: if time.time() >= task.next: updated = True @@ -167,23 +169,18 @@ class Scheduler(object): x = self.schedule.pop(self.schedule.index(task)) else: updated = True + self.schedule_lock.acquire() self.schedule.append(newtask) finally: if updated: self.schedule = sorted(self.schedule, key=lambda task: task.next) + self.schedule_lock.release() except KeyboardInterrupt: self.run = False - if self.parentstop is not None: - log.debug("stopping parent") - self.parentstop.set() except SystemExit: self.run = False - if self.parentstop is not None: - self.parentstop.set() log.debug("Quitting Scheduler thread") - if self.parentqueue is not None: - self.parentqueue.put(('quit', None, None)) def add(self, name, seconds, callback, args=None, kwargs=None, repeat=False, qpointer=None): @@ -201,8 +198,39 @@ class Scheduler(object): qpointer -- A pointer to an event queue for queuing callback execution instead of executing immediately. """ - self.addq.put(Task(name, seconds, callback, args, - kwargs, repeat, qpointer)) + try: + self.schedule_lock.acquire() + for task in self.schedule: + if task.name == name: + raise ValueError("Key %s already exists" % name) + + self.addq.put(Task(name, seconds, callback, args, + kwargs, repeat, qpointer)) + except: + raise + finally: + self.schedule_lock.release() + + def remove(self, name): + """ + Remove a scheduled task ahead of schedule, and without + executing it. + + Arguments: + name -- The name of the task to remove. + """ + try: + self.schedule_lock.acquire() + the_task = None + for task in self.schedule: + if task.name == name: + the_task = task + if the_task is not None: + self.schedule.remove(the_task) + except: + raise + finally: + self.schedule_lock.release() def quit(self): """Shutdown the scheduler.""" diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py index 3eb263ae..405f76d3 100644 --- a/sleekxmpp/xmlstream/xmlstream.py +++ b/sleekxmpp/xmlstream/xmlstream.py @@ -215,7 +215,7 @@ class XMLStream(object): self.event_queue = queue.Queue() self.send_queue = queue.Queue() self.__failed_send_stanza = None - self.scheduler = Scheduler(self.event_queue, self.stop) + self.scheduler = Scheduler(self.stop) self.namespace_map = {StanzaBase.xml_ns: 'xml'} @@ -1178,8 +1178,9 @@ class XMLStream(object): log.exception(error_msg % handler.name) orig.exception(e) elif etype == 'schedule': + name = args[1] try: - log.debug('Scheduled event: %s' % args) + log.debug('Scheduled event: %s: %s' % (name, args[0])) handler(*args[0]) except Exception as e: log.exception('Error processing scheduled task') -- cgit v1.2.3