summaryrefslogtreecommitdiff
path: root/sleekxmpp/xmlstream
diff options
context:
space:
mode:
Diffstat (limited to 'sleekxmpp/xmlstream')
-rw-r--r--sleekxmpp/xmlstream/cert.py4
-rw-r--r--sleekxmpp/xmlstream/filesocket.py9
-rw-r--r--sleekxmpp/xmlstream/matcher/xmlmask.py4
-rw-r--r--sleekxmpp/xmlstream/resolver.py5
-rw-r--r--sleekxmpp/xmlstream/scheduler.py34
-rw-r--r--sleekxmpp/xmlstream/xmlstream.py49
6 files changed, 71 insertions, 34 deletions
diff --git a/sleekxmpp/xmlstream/cert.py b/sleekxmpp/xmlstream/cert.py
index fa12f794..71146f36 100644
--- a/sleekxmpp/xmlstream/cert.py
+++ b/sleekxmpp/xmlstream/cert.py
@@ -1,6 +1,10 @@
import logging
from datetime import datetime, timedelta
+# Make a call to strptime before starting threads to
+# prevent thread safety issues.
+datetime.strptime('1970-01-01 12:00:00', "%Y-%m-%d %H:%M:%S")
+
try:
from pyasn1.codec.der import decoder, encoder
diff --git a/sleekxmpp/xmlstream/filesocket.py b/sleekxmpp/xmlstream/filesocket.py
index d4537998..53b83bc7 100644
--- a/sleekxmpp/xmlstream/filesocket.py
+++ b/sleekxmpp/xmlstream/filesocket.py
@@ -13,6 +13,7 @@
"""
from socket import _fileobject
+import errno
import socket
@@ -29,7 +30,13 @@ class FileSocket(_fileobject):
"""Read data from the socket as if it were a file."""
if self._sock is None:
return None
- data = self._sock.recv(size)
+ while True:
+ try:
+ data = self._sock.recv(size)
+ break
+ except socket.error as serr:
+ if serr.errno != errno.EINTR:
+ raise
if data is not None:
return data
diff --git a/sleekxmpp/xmlstream/matcher/xmlmask.py b/sleekxmpp/xmlstream/matcher/xmlmask.py
index cb202448..56f728e1 100644
--- a/sleekxmpp/xmlstream/matcher/xmlmask.py
+++ b/sleekxmpp/xmlstream/matcher/xmlmask.py
@@ -37,11 +37,11 @@ class MatchXMLMask(MatcherBase):
object or XML string to use as a mask.
"""
- def __init__(self, criteria):
+ def __init__(self, criteria, default_ns='jabber:client'):
MatcherBase.__init__(self, criteria)
if isinstance(criteria, str):
self._criteria = ET.fromstring(self._criteria)
- self.default_ns = 'jabber:client'
+ self.default_ns = default_ns
def setDefaultNS(self, ns):
"""Set the default namespace to use during comparisons.
diff --git a/sleekxmpp/xmlstream/resolver.py b/sleekxmpp/xmlstream/resolver.py
index 16f8a7ad..6f26797f 100644
--- a/sleekxmpp/xmlstream/resolver.py
+++ b/sleekxmpp/xmlstream/resolver.py
@@ -202,11 +202,14 @@ def get_AAAA(host, resolver=None):
# If not using dnspython, attempt lookup using the OS level
# getaddrinfo() method.
if resolver is None:
+ if not socket.has_ipv6:
+ log.debug("Unable to query %s for AAAA records: IPv6 is not supported", host)
+ return []
try:
recs = socket.getaddrinfo(host, None, socket.AF_INET6,
socket.SOCK_STREAM)
return [rec[4][0] for rec in recs]
- except socket.gaierror:
+ except (OSError, socket.gaierror):
log.debug("DNS: Error retreiving AAAA address " + \
"info for %s." % host)
return []
diff --git a/sleekxmpp/xmlstream/scheduler.py b/sleekxmpp/xmlstream/scheduler.py
index b3e50983..e6fae37a 100644
--- a/sleekxmpp/xmlstream/scheduler.py
+++ b/sleekxmpp/xmlstream/scheduler.py
@@ -20,6 +20,11 @@ import itertools
from sleekxmpp.util import Queue, QueueEmpty
+#: The time in seconds to wait for events from the event queue, and also the
+#: time between checks for the process stop signal.
+WAIT_TIMEOUT = 1.0
+
+
log = logging.getLogger(__name__)
@@ -76,7 +81,7 @@ class Task(object):
"""
if self.qpointer is not None:
self.qpointer.put(('schedule', self.callback,
- self.args, self.name))
+ self.args, self.kwargs, self.name))
else:
self.callback(*self.args, **self.kwargs)
self.reset()
@@ -120,6 +125,10 @@ class Scheduler(object):
#: Lock for accessing the task queue.
self.schedule_lock = threading.RLock()
+ #: The time in seconds to wait for events from the event queue,
+ #: and also the time between checks for the process stop signal.
+ self.wait_timeout = WAIT_TIMEOUT
+
def process(self, threaded=True, daemon=False):
"""Begin accepting and processing scheduled tasks.
@@ -139,24 +148,25 @@ class Scheduler(object):
self.run = True
try:
while self.run and not self.stop.is_set():
- wait = 0.1
updated = False
if self.schedule:
wait = self.schedule[0].next - time.time()
+ else:
+ wait = self.wait_timeout
try:
if wait <= 0.0:
newtask = self.addq.get(False)
else:
- if wait >= 3.0:
- wait = 3.0
newtask = None
- elapsed = 0
- while not self.stop.is_set() and \
+ while self.run and \
+ not self.stop.is_set() and \
newtask is None and \
- elapsed < wait:
- newtask = self.addq.get(True, 0.1)
- elapsed += 0.1
- except QueueEmpty:
+ wait > 0:
+ try:
+ newtask = self.addq.get(True, min(wait, self.wait_timeout))
+ except QueueEmpty: # Nothing to add, nothing to do. Check run flags and continue waiting.
+ wait -= self.wait_timeout
+ except QueueEmpty: # Time to run some tasks, and no new tasks to add.
self.schedule_lock.acquire()
# select only those tasks which are to be executed now
relevant = itertools.takewhile(
@@ -174,11 +184,11 @@ class Scheduler(object):
# only need to resort tasks if a repeated task has
# been kept in the list.
updated = True
- else:
- updated = True
+ else: # Add new task
self.schedule_lock.acquire()
if newtask is not None:
self.schedule.append(newtask)
+ updated = True
finally:
if updated:
self.schedule.sort(key=lambda task: task.next)
diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py
index 1c0b84b9..8242a127 100644
--- a/sleekxmpp/xmlstream/xmlstream.py
+++ b/sleekxmpp/xmlstream/xmlstream.py
@@ -26,6 +26,7 @@ import time
import random
import weakref
import uuid
+import errno
from xml.parsers.expat import ExpatError
@@ -49,7 +50,7 @@ RESPONSE_TIMEOUT = 30
#: The time in seconds to wait for events from the event queue, and also the
#: time between checks for the process stop signal.
-WAIT_TIMEOUT = 0.1
+WAIT_TIMEOUT = 1.0
#: The number of threads to use to handle XML stream events. This is not the
#: same as the number of custom event handling threads.
@@ -461,10 +462,10 @@ class XMLStream(object):
time.sleep(0.1)
elapsed += 0.1
except KeyboardInterrupt:
- self.stop.set()
+ self.set_stop()
return False
except SystemExit:
- self.stop.set()
+ self.set_stop()
return False
if self.default_domain:
@@ -706,7 +707,7 @@ class XMLStream(object):
self.stream_end_event.set()
if not self.auto_reconnect:
- self.stop.set()
+ self.set_stop()
if self._disconnect_wait_for_threads:
self._wait_for_threads()
@@ -723,7 +724,7 @@ class XMLStream(object):
def abort(self):
self.session_started_event.clear()
- self.stop.set()
+ self.set_stop()
if self._disconnect_wait_for_threads:
self._wait_for_threads()
try:
@@ -1017,8 +1018,12 @@ class XMLStream(object):
if name is None:
name = 'add_handler_%s' % self.new_id()
- self.register_handler(XMLCallback(name, MatchXMLMask(mask), pointer,
- once=disposable, instream=instream))
+ self.register_handler(
+ XMLCallback(name,
+ MatchXMLMask(mask, self.default_ns),
+ pointer,
+ once=disposable,
+ instream=instream))
def register_handler(self, handler, before=None, after=None):
"""Add a stream event handler that will be executed when a matching
@@ -1290,6 +1295,9 @@ class XMLStream(object):
try:
sent += self.socket.send(data[sent:])
count += 1
+ except Socket.error as serr:
+ if serr.errno != errno.EINTR:
+ raise
except ssl.SSLError as serr:
if tries >= self.ssl_retry_max:
log.debug('SSL error: max retries reached')
@@ -1352,6 +1360,13 @@ class XMLStream(object):
if self.__thread_count == 0:
self.__thread_cond.notify()
+ def set_stop(self):
+ self.stop.set()
+
+ # Unlock queues
+ self.event_queue.put(None)
+ self.send_queue.put(None)
+
def _wait_for_threads(self):
with self.__thread_cond:
if self.__thread_count != 0:
@@ -1624,11 +1639,7 @@ class XMLStream(object):
log.debug("Loading event runner")
try:
while not self.stop.is_set():
- try:
- wait = self.wait_timeout
- event = self.event_queue.get(True, timeout=wait)
- except QueueEmpty:
- event = None
+ event = self.event_queue.get()
if event is None:
continue
@@ -1644,10 +1655,10 @@ class XMLStream(object):
log.exception(error_msg, handler.name)
orig.exception(e)
elif etype == 'schedule':
- name = args[1]
+ name = args[2]
try:
log.debug('Scheduled event: %s: %s', name, args[0])
- handler(*args[0])
+ handler(*args[0], **args[1])
except Exception as e:
log.exception('Error processing scheduled task')
self.exception(e)
@@ -1689,14 +1700,13 @@ class XMLStream(object):
while not self.stop.is_set():
while not self.stop.is_set() and \
not self.session_started_event.is_set():
- self.session_started_event.wait(timeout=0.1)
+ self.session_started_event.wait(timeout=0.1) # Wait for session start
if self.__failed_send_stanza is not None:
data = self.__failed_send_stanza
self.__failed_send_stanza = None
else:
- try:
- data = self.send_queue.get(True, 1)
- except QueueEmpty:
+ data = self.send_queue.get() # Wait for data to send
+ if data is None:
continue
log.debug("SEND: %s", data)
enc_data = data.encode('utf-8')
@@ -1711,6 +1721,9 @@ class XMLStream(object):
try:
sent += self.socket.send(enc_data[sent:])
count += 1
+ except Socket.error as serr:
+ if serr.errno != errno.EINTR:
+ raise
except ssl.SSLError as serr:
if tries >= self.ssl_retry_max:
log.debug('SSL error: max retries reached')