# Source: sleekxmpp/xmlstream/scheduler.py (blob 2efa7d1e706f1cceefca0837ec4c9f3ea2cf87b9)
# -*- coding: utf-8 -*-
"""
    sleekxmpp.xmlstream.scheduler
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    This module provides a task scheduler that works better
    with SleekXMPP's threading usage than the stock version.

    Part of SleekXMPP: The Sleek XMPP Library

    :copyright: (c) 2011 Nathanael C. Fritz
    :license: MIT, see LICENSE for more details
"""

import time
import threading
import logging
import itertools

from sleekxmpp.util import Queue, QueueEmpty


#: Default for :attr:`Scheduler.wait_timeout`: the time in seconds to wait
#: for events from the event queue, and also the time between checks for
#: the process stop signal.
WAIT_TIMEOUT = 1.0


#: Module-level logger, named after this module.
log = logging.getLogger(__name__)


class Task(object):

    """
    A unit of work that the scheduler executes once a delay has elapsed.

    :param string name: The name of the task.
    :param int seconds: The number of seconds to wait before executing.
    :param callback: The function to execute.
    :param tuple args: Positional arguments handed to the callback.
    :param dict kwargs: Keyword arguments handed to the callback.
    :param bool repeat: Whether the task should be run again after it
                        has executed. Defaults to ``False``.
    :param qpointer: Optional event queue. When given, the callback is
                     queued on it instead of being called directly.
    """

    def __init__(self, name, seconds, callback, args=None,
                 kwargs=None, repeat=False, qpointer=None):
        #: The name of the task.
        self.name = name

        #: Delay, in seconds, between (re)scheduling and execution.
        self.seconds = seconds

        #: The function to execute once enough time has passed.
        self.callback = callback

        #: Positional arguments for :attr:`callback` (empty tuple if
        #: none, or a falsy value, was supplied).
        self.args = args if args else tuple()

        #: Keyword arguments for :attr:`callback` (empty dict if none,
        #: or a falsy value, was supplied).
        self.kwargs = kwargs if kwargs else {}

        #: Whether the task repeats after executing, re-using the
        #: same :attr:`seconds` delay.
        self.repeat = repeat

        #: The main event queue; when set, callbacks are queued on it
        #: for execution rather than being executed immediately.
        self.qpointer = qpointer

        #: Timestamp (as given by ``time.time()``) of the next execution.
        self.next = time.time() + self.seconds

    def run(self):
        """Fire the task.

        Without an event queue the callback is invoked right away; with
        one, a ``('schedule', callback, args, kwargs, name)`` tuple is
        put on the queue instead. Either way the timer is reset
        afterwards.

        :returns: The value of :attr:`repeat`.
        """
        event_queue = self.qpointer
        if event_queue is None:
            self.callback(*self.args, **self.kwargs)
        else:
            event = ('schedule', self.callback,
                     self.args, self.kwargs, self.name)
            event_queue.put(event)
        self.reset()
        return self.repeat

    def reset(self):
        """Restart the countdown: the task is next due
        :attr:`seconds` from now."""
        self.next = time.time() + self.seconds


class Scheduler(object):

    """
    A threaded scheduler that allows for updates mid-execution unlike the
    scheduler in the standard library.

    Based on: http://docs.python.org/library/sched.html#module-sched

    New tasks are handed over through :attr:`addq` and merged into
    :attr:`schedule` by the processing loop, which also runs the tasks
    that have become due.

    :param parentstop: An :class:`~threading.Event` to signal stopping
                       the scheduler. If omitted, the scheduler creates
                       its own event so that it can still be processed
                       and stopped via :meth:`quit`.
    """

    def __init__(self, parentstop=None):
        #: A queue for storing tasks
        self.addq = Queue()

        #: A list of tasks in order of execution time.
        self.schedule = []

        #: If running in threaded mode, this will be the thread processing
        #: the schedule.
        self.thread = None

        #: A flag indicating that the scheduler is running.
        self.run = False

        #: An :class:`~threading.Event` instance for signalling to stop
        #: the scheduler. The processing loop calls ``is_set()`` on it
        #: unconditionally, so it must never be ``None``.
        if parentstop is None:
            parentstop = threading.Event()
        self.stop = parentstop

        #: Lock for accessing the task queue.
        self.schedule_lock = threading.RLock()

        #: The time in seconds to wait for events from the event queue,
        #: and also the time between checks for the process stop signal.
        self.wait_timeout = WAIT_TIMEOUT

    def process(self, threaded=True, daemon=False):
        """Begin accepting and processing scheduled tasks.

        :param bool threaded: Indicates if the scheduler should execute
                              in its own thread. Defaults to ``True``.
                              When ``False`` this call blocks until the
                              processing loop ends.
        :param bool daemon: Value assigned to the ``daemon`` attribute of
                            the processing thread. Only used when
                            ``threaded`` is true. Defaults to ``False``.
        """
        if threaded:
            self.thread = threading.Thread(name='scheduler_process',
                                           target=self._process)
            self.thread.daemon = daemon
            self.thread.start()
        else:
            self._process()

    def _process(self):
        """Process scheduled tasks.

        Runs until :meth:`quit` is called, the stop event is set, or a
        ``KeyboardInterrupt``/``SystemExit`` is raised. Each pass either
        merges one newly added task into the schedule or, if none
        arrived, runs the tasks that are due.
        """
        self.run = True
        try:
            while self.run and not self.stop.is_set():
                # Read the head of the schedule under the lock so that a
                # concurrent remove() cannot empty the list in between
                # the emptiness check and the indexing.
                with self.schedule_lock:
                    if self.schedule:
                        wait = self.schedule[0].next - time.time()
                    else:
                        wait = self.wait_timeout
                try:
                    newtask = self._wait_for_new_task(wait)
                except QueueEmpty:
                    # Time to run some tasks, and no new tasks to add.
                    with self.schedule_lock:
                        if self._run_due_tasks():
                            self.schedule.sort(key=lambda task: task.next)
                else:
                    # Add new task
                    if newtask is not None:
                        with self.schedule_lock:
                            self.schedule.append(newtask)
                            self.schedule.sort(key=lambda task: task.next)
        except (KeyboardInterrupt, SystemExit):
            self.run = False
        log.debug("Quitting Scheduler thread")

    def _wait_for_new_task(self, wait):
        """Fetch the next task from :attr:`addq`.

        :param float wait: Seconds until the earliest scheduled task is
                           due; zero or negative if one is already due.
        :returns: The new task, or ``None`` if the scheduler was asked
                  to stop before waiting.
        :raises QueueEmpty: If no new task arrived, which tells the
                            caller to run the due tasks instead.
        """
        if wait <= 0.0:
            # Something is already due: only pick up a task that is
            # waiting right now, do not block.
            return self.addq.get(False)
        if not self.run or self.stop.is_set():
            return None
        # Never sleep past the next due time, and never longer than
        # wait_timeout so the stop signal keeps being checked.
        return self.addq.get(True, min(wait, self.wait_timeout))

    def _run_due_tasks(self):
        """Run every task that is due. Must be called with
        :attr:`schedule_lock` held.

        :returns: ``True`` if a repeating task was kept in the schedule
                  (its next run time changed, so a resort is needed).
        """
        # Snapshot the due tasks first: finished tasks are removed from
        # self.schedule below, and removing from a list that is being
        # iterated lazily would skip the task following each removal.
        due = list(itertools.takewhile(
            lambda task: time.time() >= task.next, self.schedule))
        resort = False
        for task in due:
            if task not in self.schedule:
                # Removed by an earlier callback of this batch (the lock
                # is reentrant, so callbacks may call remove()); a
                # removed task must not be executed.
                continue
            if task.run():
                # only need to resort tasks if a repeated task has
                # been kept in the list.
                resort = True
            else:
                # remove non-repeating tasks
                try:
                    self.schedule.remove(task)
                except ValueError:
                    # The task's own callback already removed it.
                    pass
        return resort

    def add(self, name, seconds, callback, args=None,
            kwargs=None, repeat=False, qpointer=None):
        """Schedule a new task.

        The task is placed on :attr:`addq`; the processing loop moves it
        into :attr:`schedule`.

        :param string name: The name of the task.
        :param int seconds: The number of seconds to wait before executing.
        :param callback: The function to execute.
        :param tuple args: The arguments to pass to the callback.
        :param dict kwargs: The keyword arguments to pass to the callback.
        :param bool repeat: Indicates if the task should repeat.
                            Defaults to ``False``.
        :param qpointer: A pointer to an event queue for queuing callback
                         execution instead of executing immediately.
        :raises ValueError: If a task with the same name is already in
                            :attr:`schedule`.
        """
        with self.schedule_lock:
            if any(task.name == name for task in self.schedule):
                raise ValueError("Key %s already exists" % name)

            self.addq.put(Task(name, seconds, callback, args,
                               kwargs, repeat, qpointer))

    def remove(self, name):
        """Remove a scheduled task ahead of schedule, and without
        executing it.

        Only tasks already in :attr:`schedule` are considered. Unknown
        names are ignored.

        :param string name: The name of the task to remove.
        """
        with self.schedule_lock:
            matches = [task for task in self.schedule if task.name == name]
            if matches:
                self.schedule.remove(matches[-1])

    def quit(self):
        """Shutdown the scheduler."""
        self.run = False