summaryrefslogtreecommitdiff
path: root/sleekxmpp/xmlstream/scheduler.py
diff options
context:
space:
mode:
Diffstat (limited to 'sleekxmpp/xmlstream/scheduler.py')
-rw-r--r--sleekxmpp/xmlstream/scheduler.py70
1 files changed, 42 insertions, 28 deletions
diff --git a/sleekxmpp/xmlstream/scheduler.py b/sleekxmpp/xmlstream/scheduler.py
index f68af081..e6fae37a 100644
--- a/sleekxmpp/xmlstream/scheduler.py
+++ b/sleekxmpp/xmlstream/scheduler.py
@@ -15,10 +15,14 @@
import time
import threading
import logging
-try:
- import queue
-except ImportError:
- import Queue as queue
+import itertools
+
+from sleekxmpp.util import Queue, QueueEmpty
+
+
+#: The time in seconds to wait for events from the event queue, and also the
+#: time between checks for the process stop signal.
+WAIT_TIMEOUT = 1.0
log = logging.getLogger(__name__)
@@ -77,7 +81,7 @@ class Task(object):
"""
if self.qpointer is not None:
self.qpointer.put(('schedule', self.callback,
- self.args, self.name))
+ self.args, self.kwargs, self.name))
else:
self.callback(*self.args, **self.kwargs)
self.reset()
@@ -102,7 +106,7 @@ class Scheduler(object):
def __init__(self, parentstop=None):
#: A queue for storing tasks
- self.addq = queue.Queue()
+ self.addq = Queue()
#: A list of tasks in order of execution time.
self.schedule = []
@@ -121,6 +125,10 @@ class Scheduler(object):
#: Lock for accessing the task queue.
self.schedule_lock = threading.RLock()
+ #: The time in seconds to wait for events from the event queue,
+ #: and also the time between checks for the process stop signal.
+ self.wait_timeout = WAIT_TIMEOUT
+
def process(self, threaded=True, daemon=False):
"""Begin accepting and processing scheduled tasks.
@@ -140,44 +148,50 @@ class Scheduler(object):
self.run = True
try:
while self.run and not self.stop.is_set():
- wait = 0.1
updated = False
if self.schedule:
wait = self.schedule[0].next - time.time()
+ else:
+ wait = self.wait_timeout
try:
if wait <= 0.0:
newtask = self.addq.get(False)
else:
- if wait >= 3.0:
- wait = 3.0
newtask = None
- elapsed = 0
- while not self.stop.is_set() and \
+ while self.run and \
+ not self.stop.is_set() and \
newtask is None and \
- elapsed < wait:
- newtask = self.addq.get(True, 0.1)
- elapsed += 0.1
- except queue.Empty:
- cleanup = []
+ wait > 0:
+ try:
+ newtask = self.addq.get(True, min(wait, self.wait_timeout))
+ except QueueEmpty: # Nothing to add, nothing to do. Check run flags and continue waiting.
+ wait -= self.wait_timeout
+ except QueueEmpty: # Time to run some tasks, and no new tasks to add.
self.schedule_lock.acquire()
- for task in self.schedule:
- if time.time() >= task.next:
- updated = True
- if not task.run():
- cleanup.append(task)
+ # select only those tasks which are to be executed now
+ relevant = itertools.takewhile(
+ lambda task: time.time() >= task.next, self.schedule)
+ # run the tasks and keep the return value in a tuple
+ status = map(lambda task: (task, task.run()), relevant)
+ # remove non-repeating tasks
+ for task, doRepeat in status:
+ if not doRepeat:
+ try:
+ self.schedule.remove(task)
+ except ValueError:
+ pass
else:
- break
- for task in cleanup:
- self.schedule.pop(self.schedule.index(task))
- else:
- updated = True
+ # only need to resort tasks if a repeated task has
+ # been kept in the list.
+ updated = True
+ else: # Add new task
self.schedule_lock.acquire()
if newtask is not None:
self.schedule.append(newtask)
+ updated = True
finally:
if updated:
- self.schedule = sorted(self.schedule,
- key=lambda task: task.next)
+ self.schedule.sort(key=lambda task: task.next)
self.schedule_lock.release()
except KeyboardInterrupt:
self.run = False