From 7430a8ca401a4ece1b9e717e3f45c9c93dda6960 Mon Sep 17 00:00:00 2001 From: Anton Ryzhov Date: Thu, 20 Jun 2013 12:41:46 +0400 Subject: Some optimizations in scheduler --- sleekxmpp/xmlstream/scheduler.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/sleekxmpp/xmlstream/scheduler.py b/sleekxmpp/xmlstream/scheduler.py index bef8f5e5..d57b2271 100644 --- a/sleekxmpp/xmlstream/scheduler.py +++ b/sleekxmpp/xmlstream/scheduler.py @@ -139,24 +139,26 @@ class Scheduler(object): self.run = True try: while self.run and not self.stop.is_set(): - wait = 0.1 updated = False if self.schedule: wait = self.schedule[0].next - time.time() + else: + wait = 0.1 try: if wait <= 0.0: newtask = self.addq.get(False) else: - if wait >= 3.0: + if wait > 3.0: wait = 3.0 newtask = None elapsed = 0 - while not self.stop.is_set() and \ + while self.run and \ + not self.stop.is_set() and \ newtask is None and \ elapsed < wait: newtask = self.addq.get(True, 0.1) elapsed += 0.1 - except QueueEmpty: + except QueueEmpty: # Time to run some tasks, and no new tasks to add. self.schedule_lock.acquire() # select only those tasks which are to be executed now relevant = itertools.takewhile( @@ -174,11 +176,11 @@ class Scheduler(object): # only need to resort tasks if a repeated task has # been kept in the list. updated = True - else: - updated = True + else: # Add new task self.schedule_lock.acquire() if newtask is not None: self.schedule.append(newtask) + updated = True finally: if updated: self.schedule.sort(key=lambda task: task.next) -- cgit v1.2.3 From 805f1c0e392b29dd335ea6b62b6c08a17a7f6e2c Mon Sep 17 00:00:00 2001 From: Anton Ryzhov Date: Thu, 20 Jun 2013 13:07:57 +0400 Subject: Use timeout constants instead of magic numbers in scheduler and event loop Set default wait timeout as max() of previous values --- sleekxmpp/xmlstream/scheduler.py | 15 ++++++++++++--- sleekxmpp/xmlstream/xmlstream.py | 9 ++++----- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/sleekxmpp/xmlstream/scheduler.py b/sleekxmpp/xmlstream/scheduler.py index d57b2271..2efa7d1e 100644 --- a/sleekxmpp/xmlstream/scheduler.py +++ b/sleekxmpp/xmlstream/scheduler.py @@ -20,6 +20,11 @@ import itertools from sleekxmpp.util import Queue, QueueEmpty +#: The time in seconds to wait for events from the event queue, and also the +#: time between checks for the process stop signal. +WAIT_TIMEOUT = 1.0 + + log = logging.getLogger(__name__) @@ -120,6 +125,10 @@ class Scheduler(object): #: Lock for accessing the task queue. self.schedule_lock = threading.RLock() + #: The time in seconds to wait for events from the event queue, + #: and also the time between checks for the process stop signal. + self.wait_timeout = WAIT_TIMEOUT + def process(self, threaded=True, daemon=False): """Begin accepting and processing scheduled tasks. @@ -143,7 +152,7 @@ class Scheduler(object): if self.schedule: wait = self.schedule[0].next - time.time() else: - wait = 0.1 + wait = self.wait_timeout try: if wait <= 0.0: newtask = self.addq.get(False) @@ -156,8 +165,8 @@ class Scheduler(object): not self.stop.is_set() and \ newtask is None and \ elapsed < wait: - newtask = self.addq.get(True, 0.1) - elapsed += 0.1 + newtask = self.addq.get(True, self.wait_timeout) + elapsed += self.wait_timeout except QueueEmpty: # Time to run some tasks, and no new tasks to add. self.schedule_lock.acquire() # select only those tasks which are to be executed now diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py index 478bd9c0..bf2e01fa 100644 --- a/sleekxmpp/xmlstream/xmlstream.py +++ b/sleekxmpp/xmlstream/xmlstream.py @@ -49,7 +49,7 @@ RESPONSE_TIMEOUT = 30 #: The time in seconds to wait for events from the event queue, and also the #: time between checks for the process stop signal. -WAIT_TIMEOUT = 0.1 +WAIT_TIMEOUT = 1.0 #: The number of threads to use to handle XML stream events. This is not the #: same as the number of custom event handling threads. @@ -1629,8 +1629,7 @@ class XMLStream(object): try: while not self.stop.is_set(): try: - wait = self.wait_timeout - event = self.event_queue.get(True, timeout=wait) + event = self.event_queue.get(True, timeout=self.wait_timeout) except QueueEmpty: event = None if event is None: @@ -1693,13 +1692,13 @@ class XMLStream(object): while not self.stop.is_set(): while not self.stop.is_set() and \ not self.session_started_event.is_set(): - self.session_started_event.wait(timeout=0.1) + self.session_started_event.wait(timeout=0.1) # Wait for session start if self.__failed_send_stanza is not None: data = self.__failed_send_stanza self.__failed_send_stanza = None else: try: - data = self.send_queue.get(True, 1) + data = self.send_queue.get(True, timeout=self.wait_timeout) # Wait for data to send except QueueEmpty: continue log.debug("SEND: %s", data) -- cgit v1.2.3 From a3606d9e4d3dcdf404949c5293545ac0d9ee5803 Mon Sep 17 00:00:00 2001 From: Anton Ryzhov Date: Thu, 20 Jun 2013 14:03:49 +0400 Subject: Fixed scheduler wait loop Do fastloop wait until task run time --- sleekxmpp/xmlstream/scheduler.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/sleekxmpp/xmlstream/scheduler.py b/sleekxmpp/xmlstream/scheduler.py index 2efa7d1e..e6fae37a 100644 --- a/sleekxmpp/xmlstream/scheduler.py +++ b/sleekxmpp/xmlstream/scheduler.py @@ -157,16 +157,15 @@ class Scheduler(object): if wait <= 0.0: newtask = self.addq.get(False) else: - if wait > 3.0: - wait = 3.0 newtask = None - elapsed = 0 while self.run and \ not self.stop.is_set() and \ newtask is None and \ - elapsed < wait: - newtask = self.addq.get(True, self.wait_timeout) - elapsed += self.wait_timeout + wait > 0: + try: + newtask = self.addq.get(True, min(wait, self.wait_timeout)) + except QueueEmpty: # Nothing to add, nothing to do. Check run flags and continue waiting. + wait -= self.wait_timeout except QueueEmpty: # Time to run some tasks, and no new tasks to add. self.schedule_lock.acquire() # select only those tasks which are to be executed now -- cgit v1.2.3