diff options
author | Lance Stout <lancestout@gmail.com> | 2011-08-25 13:34:30 -0700 |
---|---|---|
committer | Lance Stout <lancestout@gmail.com> | 2011-08-25 13:34:30 -0700 |
commit | 4c08c9c52495a3e3e5b794fa346724abfb206c43 (patch) | |
tree | 2efcf5b106b5eb50e6a4da28c37814e67a0fbf62 /sleekxmpp/xmlstream | |
parent | 63b8444abe94eb2a4bc3922510a89ef0a30a0008 (diff) | |
download | slixmpp-4c08c9c52495a3e3e5b794fa346724abfb206c43.tar.gz slixmpp-4c08c9c52495a3e3e5b794fa346724abfb206c43.tar.bz2 slixmpp-4c08c9c52495a3e3e5b794fa346724abfb206c43.tar.xz slixmpp-4c08c9c52495a3e3e5b794fa346724abfb206c43.zip |
Update scheduler with locks and ability to remove tasks.
Scheduled tasks must have a unique name.
Diffstat (limited to 'sleekxmpp/xmlstream')
-rw-r--r-- | sleekxmpp/xmlstream/scheduler.py | 72 | ||||
-rw-r--r-- | sleekxmpp/xmlstream/xmlstream.py | 5 |
2 files changed, 53 insertions, 24 deletions
diff --git a/sleekxmpp/xmlstream/scheduler.py b/sleekxmpp/xmlstream/scheduler.py index 12deeddf..58219257 100644 --- a/sleekxmpp/xmlstream/scheduler.py +++ b/sleekxmpp/xmlstream/scheduler.py @@ -73,7 +73,8 @@ class Task(object): otherwise, execute the callback immediately. """ if self.qpointer is not None: - self.qpointer.put(('schedule', self.callback, self.args)) + self.qpointer.put(('schedule', self.callback, + self.args, self.name)) else: self.callback(*self.args, **self.kwargs) self.reset() @@ -95,31 +96,32 @@ class Scheduler(object): http://docs.python.org/library/sched.html#module-sched Attributes: - addq -- A queue storing added tasks. - schedule -- A list of tasks in order of execution times. - thread -- If threaded, the thread processing the schedule. - run -- Indicates if the scheduler is running. - parentqueue -- A parent event queue in control of this scheduler. - + addq -- A queue storing added tasks. + schedule -- A list of tasks in order of execution times. + thread -- If threaded, the thread processing the schedule. + run -- Indicates if the scheduler is running. + stop -- Threading event indicating if the main process + has been stopped. Methods: add -- Add a new task to the schedule. process -- Process and schedule tasks. quit -- Stop the scheduler. """ - def __init__(self, parentqueue=None, parentstop=None): + def __init__(self, parentstop=None): """ Create a new scheduler. Arguments: - parentqueue -- A separate event queue controlling this scheduler. + parentstop -- A threading event indicating if the main process has + been stopped. """ self.addq = queue.Queue() self.schedule = [] self.thread = None self.run = False - self.parentqueue = parentqueue - self.parentstop = parentstop + self.stop = parentstop + self.schedule_lock = threading.RLock() def process(self, threaded=True): """ @@ -141,8 +143,7 @@ class Scheduler(object): """Process scheduled tasks.""" self.run = True try: - while self.run and (self.parentstop is None or \ - not self.parentstop.isSet()): + while self.run and not self.stop.isSet(): wait = 1 updated = False if self.schedule: @@ -156,6 +157,7 @@ class Scheduler(object): newtask = self.addq.get(True, wait) except queue.Empty: cleanup = [] + self.schedule_lock.acquire() for task in self.schedule: if time.time() >= task.next: updated = True @@ -167,23 +169,18 @@ class Scheduler(object): x = self.schedule.pop(self.schedule.index(task)) else: updated = True + self.schedule_lock.acquire() self.schedule.append(newtask) finally: if updated: self.schedule = sorted(self.schedule, key=lambda task: task.next) + self.schedule_lock.release() except KeyboardInterrupt: self.run = False - if self.parentstop is not None: - log.debug("stopping parent") - self.parentstop.set() except SystemExit: self.run = False - if self.parentstop is not None: - self.parentstop.set() log.debug("Quitting Scheduler thread") - if self.parentqueue is not None: - self.parentqueue.put(('quit', None, None)) def add(self, name, seconds, callback, args=None, kwargs=None, repeat=False, qpointer=None): @@ -201,8 +198,39 @@ class Scheduler(object): qpointer -- A pointer to an event queue for queuing callback execution instead of executing immediately. """ - self.addq.put(Task(name, seconds, callback, args, - kwargs, repeat, qpointer)) + try: + self.schedule_lock.acquire() + for task in self.schedule: + if task.name == name: + raise ValueError("Key %s already exists" % name) + + self.addq.put(Task(name, seconds, callback, args, + kwargs, repeat, qpointer)) + except: + raise + finally: + self.schedule_lock.release() + + def remove(self, name): + """ + Remove a scheduled task ahead of schedule, and without + executing it. + + Arguments: + name -- The name of the task to remove. + """ + try: + self.schedule_lock.acquire() + the_task = None + for task in self.schedule: + if task.name == name: + the_task = task + if the_task is not None: + self.schedule.remove(the_task) + except: + raise + finally: + self.schedule_lock.release() def quit(self): """Shutdown the scheduler.""" diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py index 3eb263ae..405f76d3 100644 --- a/sleekxmpp/xmlstream/xmlstream.py +++ b/sleekxmpp/xmlstream/xmlstream.py @@ -215,7 +215,7 @@ class XMLStream(object): self.event_queue = queue.Queue() self.send_queue = queue.Queue() self.__failed_send_stanza = None - self.scheduler = Scheduler(self.event_queue, self.stop) + self.scheduler = Scheduler(self.stop) self.namespace_map = {StanzaBase.xml_ns: 'xml'} @@ -1178,8 +1178,9 @@ class XMLStream(object): log.exception(error_msg % handler.name) orig.exception(e) elif etype == 'schedule': + name = args[1] try: - log.debug('Scheduled event: %s' % args) + log.debug('Scheduled event: %s: %s' % (name, args[0])) handler(*args[0]) except Exception as e: log.exception('Error processing scheduled task') |