summaryrefslogtreecommitdiff
path: root/slixmpp/xmlstream/scheduler.py
diff options
context:
space:
mode:
Diffstat (limited to 'slixmpp/xmlstream/scheduler.py')
-rw-r--r--slixmpp/xmlstream/scheduler.py250
1 files changed, 250 insertions, 0 deletions
diff --git a/slixmpp/xmlstream/scheduler.py b/slixmpp/xmlstream/scheduler.py
new file mode 100644
index 00000000..137230b6
--- /dev/null
+++ b/slixmpp/xmlstream/scheduler.py
@@ -0,0 +1,250 @@
+# -*- coding: utf-8 -*-
+"""
+ slixmpp.xmlstream.scheduler
+ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ This module provides a task scheduler that works better
+ with Slixmpp's threading usage than the stock version.
+
+ Part of Slixmpp: The Slick XMPP Library
+
+ :copyright: (c) 2011 Nathanael C. Fritz
+ :license: MIT, see LICENSE for more details
+"""
+
+import time
+import threading
+import logging
+import itertools
+
+from slixmpp.util import Queue, QueueEmpty
+
+
+#: The time in seconds to wait for events from the event queue, and also the
+#: time between checks for the process stop signal.
+WAIT_TIMEOUT = 1.0
+
+
+log = logging.getLogger(__name__)
+
+
+class Task(object):
+
+ """
+ A scheduled task that will be executed by the scheduler
+ after a given time interval has passed.
+
+ :param string name: The name of the task.
+ :param int seconds: The number of seconds to wait before executing.
+ :param callback: The function to execute.
+ :param tuple args: The arguments to pass to the callback.
+ :param dict kwargs: The keyword arguments to pass to the callback.
+ :param bool repeat: Indicates if the task should repeat.
+ Defaults to ``False``.
+ :param pointer: A pointer to an event queue for queuing callback
+ execution instead of executing immediately.
+ """
+
+ def __init__(self, name, seconds, callback, args=None,
+ kwargs=None, repeat=False, qpointer=None):
+ #: The name of the task.
+ self.name = name
+
+ #: The number of seconds to wait before executing.
+ self.seconds = seconds
+
+ #: The function to execute once enough time has passed.
+ self.callback = callback
+
+ #: The arguments to pass to :attr:`callback`.
+ self.args = args or tuple()
+
+ #: The keyword arguments to pass to :attr:`callback`.
+ self.kwargs = kwargs or {}
+
+ #: Indicates if the task should repeat after executing,
+ #: using the same :attr:`seconds` delay.
+ self.repeat = repeat
+
+ #: The time when the task should execute next.
+ self.next = time.time() + self.seconds
+
+ #: The main event queue, which allows for callbacks to
+ #: be queued for execution instead of executing immediately.
+ self.qpointer = qpointer
+
+ def run(self):
+ """Execute the task's callback.
+
+ If an event queue was supplied, place the callback in the queue;
+ otherwise, execute the callback immediately.
+ """
+ if self.qpointer is not None:
+ self.qpointer.put(('schedule', self.callback,
+ self.args, self.kwargs, self.name))
+ else:
+ self.callback(*self.args, **self.kwargs)
+ self.reset()
+ return self.repeat
+
+ def reset(self):
+ """Reset the task's timer so that it will repeat."""
+ self.next = time.time() + self.seconds
+
+
+class Scheduler(object):
+
+ """
+ A threaded scheduler that allows for updates mid-execution unlike the
+ scheduler in the standard library.
+
+ Based on: http://docs.python.org/library/sched.html#module-sched
+
+ :param parentstop: An :class:`~threading.Event` to signal stopping
+ the scheduler.
+ """
+
+ def __init__(self, parentstop=None):
+ #: A queue for storing tasks
+ self.addq = Queue()
+
+ #: A list of tasks in order of execution time.
+ self.schedule = []
+
+ #: If running in threaded mode, this will be the thread processing
+ #: the schedule.
+ self.thread = None
+
+ #: A flag indicating that the scheduler is running.
+ self.run = False
+
+ #: An :class:`~threading.Event` instance for signalling to stop
+ #: the scheduler.
+ self.stop = parentstop
+
+ #: Lock for accessing the task queue.
+ self.schedule_lock = threading.RLock()
+
+ #: The time in seconds to wait for events from the event queue,
+ #: and also the time between checks for the process stop signal.
+ self.wait_timeout = WAIT_TIMEOUT
+
+ def process(self, threaded=True, daemon=False):
+ """Begin accepting and processing scheduled tasks.
+
+ :param bool threaded: Indicates if the scheduler should execute
+ in its own thread. Defaults to ``True``.
+ """
+ if threaded:
+ self.thread = threading.Thread(name='scheduler_process',
+ target=self._process)
+ self.thread.daemon = daemon
+ self.thread.start()
+ else:
+ self._process()
+
+ def _process(self):
+ """Process scheduled tasks."""
+ self.run = True
+ try:
+ while self.run and not self.stop.is_set():
+ updated = False
+ if self.schedule:
+ wait = self.schedule[0].next - time.time()
+ else:
+ wait = self.wait_timeout
+ try:
+ if wait <= 0.0:
+ newtask = self.addq.get(False)
+ else:
+ newtask = None
+ while self.run and \
+ not self.stop.is_set() and \
+ newtask is None and \
+ wait > 0:
+ try:
+ newtask = self.addq.get(True, min(wait, self.wait_timeout))
+ except QueueEmpty: # Nothing to add, nothing to do. Check run flags and continue waiting.
+ wait -= self.wait_timeout
+ except QueueEmpty: # Time to run some tasks, and no new tasks to add.
+ self.schedule_lock.acquire()
+ # select only those tasks which are to be executed now
+ relevant = itertools.takewhile(
+ lambda task: time.time() >= task.next, self.schedule)
+ # run the tasks and keep the return value in a tuple
+ status = map(lambda task: (task, task.run()), relevant)
+ # remove non-repeating tasks
+ for task, doRepeat in status:
+ if not doRepeat:
+ try:
+ self.schedule.remove(task)
+ except ValueError:
+ pass
+ else:
+ # only need to resort tasks if a repeated task has
+ # been kept in the list.
+ updated = True
+ else: # Add new task
+ self.schedule_lock.acquire()
+ if newtask is not None:
+ self.schedule.append(newtask)
+ updated = True
+ finally:
+ if updated:
+ self.schedule.sort(key=lambda task: task.next)
+ self.schedule_lock.release()
+ except KeyboardInterrupt:
+ self.run = False
+ except SystemExit:
+ self.run = False
+ log.debug("Quitting Scheduler thread")
+
+ def add(self, name, seconds, callback, args=None,
+ kwargs=None, repeat=False, qpointer=None):
+ """Schedule a new task.
+
+ :param string name: The name of the task.
+ :param int seconds: The number of seconds to wait before executing.
+ :param callback: The function to execute.
+ :param tuple args: The arguments to pass to the callback.
+ :param dict kwargs: The keyword arguments to pass to the callback.
+ :param bool repeat: Indicates if the task should repeat.
+ Defaults to ``False``.
+ :param pointer: A pointer to an event queue for queuing callback
+ execution instead of executing immediately.
+ """
+ try:
+ self.schedule_lock.acquire()
+ for task in self.schedule:
+ if task.name == name:
+ raise ValueError("Key %s already exists" % name)
+
+ self.addq.put(Task(name, seconds, callback, args,
+ kwargs, repeat, qpointer))
+ except:
+ raise
+ finally:
+ self.schedule_lock.release()
+
+ def remove(self, name):
+ """Remove a scheduled task ahead of schedule, and without
+ executing it.
+
+ :param string name: The name of the task to remove.
+ """
+ try:
+ self.schedule_lock.acquire()
+ the_task = None
+ for task in self.schedule:
+ if task.name == name:
+ the_task = task
+ if the_task is not None:
+ self.schedule.remove(the_task)
+ except:
+ raise
+ finally:
+ self.schedule_lock.release()
+
+ def quit(self):
+ """Shutdown the scheduler."""
+ self.run = False