summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sleekxmpp/xmlstream/filesocket.py9
-rw-r--r--sleekxmpp/xmlstream/scheduler.py32
-rw-r--r--sleekxmpp/xmlstream/xmlstream.py15
3 files changed, 39 insertions, 17 deletions
diff --git a/sleekxmpp/xmlstream/filesocket.py b/sleekxmpp/xmlstream/filesocket.py
index d4537998..53b83bc7 100644
--- a/sleekxmpp/xmlstream/filesocket.py
+++ b/sleekxmpp/xmlstream/filesocket.py
@@ -13,6 +13,7 @@
"""
from socket import _fileobject
+import errno
import socket
@@ -29,7 +30,13 @@ class FileSocket(_fileobject):
"""Read data from the socket as if it were a file."""
if self._sock is None:
return None
- data = self._sock.recv(size)
+ while True:
+ try:
+ data = self._sock.recv(size)
+ break
+ except socket.error as serr:
+ if serr.errno != errno.EINTR:
+ raise
if data is not None:
return data
diff --git a/sleekxmpp/xmlstream/scheduler.py b/sleekxmpp/xmlstream/scheduler.py
index bef8f5e5..e6fae37a 100644
--- a/sleekxmpp/xmlstream/scheduler.py
+++ b/sleekxmpp/xmlstream/scheduler.py
@@ -20,6 +20,11 @@ import itertools
from sleekxmpp.util import Queue, QueueEmpty
+#: The time in seconds to wait for events from the event queue, and also the
+#: time between checks for the process stop signal.
+WAIT_TIMEOUT = 1.0
+
+
log = logging.getLogger(__name__)
@@ -120,6 +125,10 @@ class Scheduler(object):
#: Lock for accessing the task queue.
self.schedule_lock = threading.RLock()
+ #: The time in seconds to wait for events from the event queue,
+ #: and also the time between checks for the process stop signal.
+ self.wait_timeout = WAIT_TIMEOUT
+
def process(self, threaded=True, daemon=False):
"""Begin accepting and processing scheduled tasks.
@@ -139,24 +148,25 @@ class Scheduler(object):
self.run = True
try:
while self.run and not self.stop.is_set():
- wait = 0.1
updated = False
if self.schedule:
wait = self.schedule[0].next - time.time()
+ else:
+ wait = self.wait_timeout
try:
if wait <= 0.0:
newtask = self.addq.get(False)
else:
- if wait >= 3.0:
- wait = 3.0
newtask = None
- elapsed = 0
- while not self.stop.is_set() and \
+ while self.run and \
+ not self.stop.is_set() and \
newtask is None and \
- elapsed < wait:
- newtask = self.addq.get(True, 0.1)
- elapsed += 0.1
- except QueueEmpty:
+ wait > 0:
+ try:
+ newtask = self.addq.get(True, min(wait, self.wait_timeout))
+ except QueueEmpty: # Nothing to add, nothing to do. Check run flags and continue waiting.
+ wait -= self.wait_timeout
+ except QueueEmpty: # Time to run some tasks, and no new tasks to add.
self.schedule_lock.acquire()
# select only those tasks which are to be executed now
relevant = itertools.takewhile(
@@ -174,11 +184,11 @@ class Scheduler(object):
# only need to resort tasks if a repeated task has
# been kept in the list.
updated = True
- else:
- updated = True
+ else: # Add new task
self.schedule_lock.acquire()
if newtask is not None:
self.schedule.append(newtask)
+ updated = True
finally:
if updated:
self.schedule.sort(key=lambda task: task.next)
diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py
index 478bd9c0..54bc7cdf 100644
--- a/sleekxmpp/xmlstream/xmlstream.py
+++ b/sleekxmpp/xmlstream/xmlstream.py
@@ -49,7 +49,7 @@ RESPONSE_TIMEOUT = 30
#: The time in seconds to wait for events from the event queue, and also the
#: time between checks for the process stop signal.
-WAIT_TIMEOUT = 0.1
+WAIT_TIMEOUT = 1.0
#: The number of threads to use to handle XML stream events. This is not the
#: same as the number of custom event handling threads.
@@ -1294,6 +1294,9 @@ class XMLStream(object):
try:
sent += self.socket.send(data[sent:])
count += 1
+ except Socket.error as serr:
+ if serr.errno != errno.EINTR:
+ raise
except ssl.SSLError as serr:
if tries >= self.ssl_retry_max:
log.debug('SSL error: max retries reached')
@@ -1629,8 +1632,7 @@ class XMLStream(object):
try:
while not self.stop.is_set():
try:
- wait = self.wait_timeout
- event = self.event_queue.get(True, timeout=wait)
+ event = self.event_queue.get(True, timeout=self.wait_timeout)
except QueueEmpty:
event = None
if event is None:
@@ -1693,13 +1695,13 @@ class XMLStream(object):
while not self.stop.is_set():
while not self.stop.is_set() and \
not self.session_started_event.is_set():
- self.session_started_event.wait(timeout=0.1)
+ self.session_started_event.wait(timeout=0.1) # Wait for session start
if self.__failed_send_stanza is not None:
data = self.__failed_send_stanza
self.__failed_send_stanza = None
else:
try:
- data = self.send_queue.get(True, 1)
+ data = self.send_queue.get(True, timeout=self.wait_timeout) # Wait for data to send
except QueueEmpty:
continue
log.debug("SEND: %s", data)
@@ -1715,6 +1717,9 @@ class XMLStream(object):
try:
sent += self.socket.send(enc_data[sent:])
count += 1
+ except Socket.error as serr:
+ if serr.errno != errno.EINTR:
+ raise
except ssl.SSLError as serr:
if tries >= self.ssl_retry_max:
log.debug('SSL error: max retries reached')