summaryrefslogtreecommitdiff
path: root/sleekxmpp
diff options
context:
space:
mode:
authorLance Stout <lancestout@gmail.com>2011-08-25 13:34:30 -0700
committerLance Stout <lancestout@gmail.com>2011-08-25 13:34:30 -0700
commit4c08c9c52495a3e3e5b794fa346724abfb206c43 (patch)
tree2efcf5b106b5eb50e6a4da28c37814e67a0fbf62 /sleekxmpp
parent63b8444abe94eb2a4bc3922510a89ef0a30a0008 (diff)
downloadslixmpp-4c08c9c52495a3e3e5b794fa346724abfb206c43.tar.gz
slixmpp-4c08c9c52495a3e3e5b794fa346724abfb206c43.tar.bz2
slixmpp-4c08c9c52495a3e3e5b794fa346724abfb206c43.tar.xz
slixmpp-4c08c9c52495a3e3e5b794fa346724abfb206c43.zip
Update scheduler with locks and ability to remove tasks.
Scheduled tasks must have a unique name.
Diffstat (limited to 'sleekxmpp')
-rw-r--r--sleekxmpp/xmlstream/scheduler.py72
-rw-r--r--sleekxmpp/xmlstream/xmlstream.py5
2 files changed, 53 insertions, 24 deletions
diff --git a/sleekxmpp/xmlstream/scheduler.py b/sleekxmpp/xmlstream/scheduler.py
index 12deeddf..58219257 100644
--- a/sleekxmpp/xmlstream/scheduler.py
+++ b/sleekxmpp/xmlstream/scheduler.py
@@ -73,7 +73,8 @@ class Task(object):
otherwise, execute the callback immediately.
"""
if self.qpointer is not None:
- self.qpointer.put(('schedule', self.callback, self.args))
+ self.qpointer.put(('schedule', self.callback,
+ self.args, self.name))
else:
self.callback(*self.args, **self.kwargs)
self.reset()
@@ -95,31 +96,32 @@ class Scheduler(object):
http://docs.python.org/library/sched.html#module-sched
Attributes:
- addq -- A queue storing added tasks.
- schedule -- A list of tasks in order of execution times.
- thread -- If threaded, the thread processing the schedule.
- run -- Indicates if the scheduler is running.
- parentqueue -- A parent event queue in control of this scheduler.
-
+ addq -- A queue storing added tasks.
+ schedule -- A list of tasks in order of execution times.
+ thread -- If threaded, the thread processing the schedule.
+ run -- Indicates if the scheduler is running.
+ stop -- Threading event indicating if the main process
+ has been stopped.
Methods:
add -- Add a new task to the schedule.
process -- Process and schedule tasks.
quit -- Stop the scheduler.
"""
- def __init__(self, parentqueue=None, parentstop=None):
+ def __init__(self, parentstop=None):
"""
Create a new scheduler.
Arguments:
- parentqueue -- A separate event queue controlling this scheduler.
+ parentstop -- A threading event indicating if the main process has
+ been stopped.
"""
self.addq = queue.Queue()
self.schedule = []
self.thread = None
self.run = False
- self.parentqueue = parentqueue
- self.parentstop = parentstop
+ self.stop = parentstop
+ self.schedule_lock = threading.RLock()
def process(self, threaded=True):
"""
@@ -141,8 +143,7 @@ class Scheduler(object):
"""Process scheduled tasks."""
self.run = True
try:
- while self.run and (self.parentstop is None or \
- not self.parentstop.isSet()):
+ while self.run and not self.stop.isSet():
wait = 1
updated = False
if self.schedule:
@@ -156,6 +157,7 @@ class Scheduler(object):
newtask = self.addq.get(True, wait)
except queue.Empty:
cleanup = []
+ self.schedule_lock.acquire()
for task in self.schedule:
if time.time() >= task.next:
updated = True
@@ -167,23 +169,18 @@ class Scheduler(object):
x = self.schedule.pop(self.schedule.index(task))
else:
updated = True
+ self.schedule_lock.acquire()
self.schedule.append(newtask)
finally:
if updated:
self.schedule = sorted(self.schedule,
key=lambda task: task.next)
+ self.schedule_lock.release()
except KeyboardInterrupt:
self.run = False
- if self.parentstop is not None:
- log.debug("stopping parent")
- self.parentstop.set()
except SystemExit:
self.run = False
- if self.parentstop is not None:
- self.parentstop.set()
log.debug("Quitting Scheduler thread")
- if self.parentqueue is not None:
- self.parentqueue.put(('quit', None, None))
def add(self, name, seconds, callback, args=None,
kwargs=None, repeat=False, qpointer=None):
@@ -201,8 +198,39 @@ class Scheduler(object):
qpointer -- A pointer to an event queue for queuing callback
execution instead of executing immediately.
"""
- self.addq.put(Task(name, seconds, callback, args,
- kwargs, repeat, qpointer))
+ try:
+ self.schedule_lock.acquire()
+ for task in self.schedule:
+ if task.name == name:
+ raise ValueError("Key %s already exists" % name)
+
+ self.addq.put(Task(name, seconds, callback, args,
+ kwargs, repeat, qpointer))
+ except:
+ raise
+ finally:
+ self.schedule_lock.release()
+
+ def remove(self, name):
+ """
+ Remove a scheduled task ahead of schedule, and without
+ executing it.
+
+ Arguments:
+ name -- The name of the task to remove.
+ """
+ try:
+ self.schedule_lock.acquire()
+ the_task = None
+ for task in self.schedule:
+ if task.name == name:
+ the_task = task
+ if the_task is not None:
+ self.schedule.remove(the_task)
+ except:
+ raise
+ finally:
+ self.schedule_lock.release()
def quit(self):
"""Shutdown the scheduler."""
diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py
index 3eb263ae..405f76d3 100644
--- a/sleekxmpp/xmlstream/xmlstream.py
+++ b/sleekxmpp/xmlstream/xmlstream.py
@@ -215,7 +215,7 @@ class XMLStream(object):
self.event_queue = queue.Queue()
self.send_queue = queue.Queue()
self.__failed_send_stanza = None
- self.scheduler = Scheduler(self.event_queue, self.stop)
+ self.scheduler = Scheduler(self.stop)
self.namespace_map = {StanzaBase.xml_ns: 'xml'}
@@ -1178,8 +1178,9 @@ class XMLStream(object):
log.exception(error_msg % handler.name)
orig.exception(e)
elif etype == 'schedule':
+ name = args[1]
try:
- log.debug('Scheduled event: %s' % args)
+ log.debug('Scheduled event: %s: %s' % (name, args[0]))
handler(*args[0])
except Exception as e:
log.exception('Error processing scheduled task')