summaryrefslogtreecommitdiff
path: root/sleekxmpp/xmlstream/xmlstream.py
diff options
context:
space:
mode:
Diffstat (limited to 'sleekxmpp/xmlstream/xmlstream.py')
-rw-r--r--sleekxmpp/xmlstream/xmlstream.py49
1 files changed, 31 insertions, 18 deletions
diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py
index 1c0b84b9..8242a127 100644
--- a/sleekxmpp/xmlstream/xmlstream.py
+++ b/sleekxmpp/xmlstream/xmlstream.py
@@ -26,6 +26,7 @@ import time
import random
import weakref
import uuid
+import errno
from xml.parsers.expat import ExpatError
@@ -49,7 +50,7 @@ RESPONSE_TIMEOUT = 30
#: The time in seconds to wait for events from the event queue, and also the
#: time between checks for the process stop signal.
-WAIT_TIMEOUT = 0.1
+WAIT_TIMEOUT = 1.0
#: The number of threads to use to handle XML stream events. This is not the
#: same as the number of custom event handling threads.
@@ -461,10 +462,10 @@ class XMLStream(object):
time.sleep(0.1)
elapsed += 0.1
except KeyboardInterrupt:
- self.stop.set()
+ self.set_stop()
return False
except SystemExit:
- self.stop.set()
+ self.set_stop()
return False
if self.default_domain:
@@ -706,7 +707,7 @@ class XMLStream(object):
self.stream_end_event.set()
if not self.auto_reconnect:
- self.stop.set()
+ self.set_stop()
if self._disconnect_wait_for_threads:
self._wait_for_threads()
@@ -723,7 +724,7 @@ class XMLStream(object):
def abort(self):
self.session_started_event.clear()
- self.stop.set()
+ self.set_stop()
if self._disconnect_wait_for_threads:
self._wait_for_threads()
try:
@@ -1017,8 +1018,12 @@ class XMLStream(object):
if name is None:
name = 'add_handler_%s' % self.new_id()
- self.register_handler(XMLCallback(name, MatchXMLMask(mask), pointer,
- once=disposable, instream=instream))
+ self.register_handler(
+ XMLCallback(name,
+ MatchXMLMask(mask, self.default_ns),
+ pointer,
+ once=disposable,
+ instream=instream))
def register_handler(self, handler, before=None, after=None):
"""Add a stream event handler that will be executed when a matching
@@ -1290,6 +1295,9 @@ class XMLStream(object):
try:
sent += self.socket.send(data[sent:])
count += 1
+ except Socket.error as serr:
+ if serr.errno != errno.EINTR:
+ raise
except ssl.SSLError as serr:
if tries >= self.ssl_retry_max:
log.debug('SSL error: max retries reached')
@@ -1352,6 +1360,13 @@ class XMLStream(object):
if self.__thread_count == 0:
self.__thread_cond.notify()
+ def set_stop(self):
+ self.stop.set()
+
+ # Unlock queues
+ self.event_queue.put(None)
+ self.send_queue.put(None)
+
def _wait_for_threads(self):
with self.__thread_cond:
if self.__thread_count != 0:
@@ -1624,11 +1639,7 @@ class XMLStream(object):
log.debug("Loading event runner")
try:
while not self.stop.is_set():
- try:
- wait = self.wait_timeout
- event = self.event_queue.get(True, timeout=wait)
- except QueueEmpty:
- event = None
+ event = self.event_queue.get()
if event is None:
continue
@@ -1644,10 +1655,10 @@ class XMLStream(object):
log.exception(error_msg, handler.name)
orig.exception(e)
elif etype == 'schedule':
- name = args[1]
+ name = args[2]
try:
log.debug('Scheduled event: %s: %s', name, args[0])
- handler(*args[0])
+ handler(*args[0], **args[1])
except Exception as e:
log.exception('Error processing scheduled task')
self.exception(e)
@@ -1689,14 +1700,13 @@ class XMLStream(object):
while not self.stop.is_set():
while not self.stop.is_set() and \
not self.session_started_event.is_set():
- self.session_started_event.wait(timeout=0.1)
+ self.session_started_event.wait(timeout=0.1) # Wait for session start
if self.__failed_send_stanza is not None:
data = self.__failed_send_stanza
self.__failed_send_stanza = None
else:
- try:
- data = self.send_queue.get(True, 1)
- except QueueEmpty:
+ data = self.send_queue.get() # Wait for data to send
+ if data is None:
continue
log.debug("SEND: %s", data)
enc_data = data.encode('utf-8')
@@ -1711,6 +1721,9 @@ class XMLStream(object):
try:
sent += self.socket.send(enc_data[sent:])
count += 1
+ except Socket.error as serr:
+ if serr.errno != errno.EINTR:
+ raise
except ssl.SSLError as serr:
if tries >= self.ssl_retry_max:
log.debug('SSL error: max retries reached')