summaryrefslogtreecommitdiff
path: root/sleekxmpp/xmlstream/xmlstream.py
diff options
context:
space:
mode:
authorAnton Ryzhov <anton@ryzhov.me>2013-07-01 00:38:56 +0400
committerAnton Ryzhov <anton@ryzhov.me>2013-07-01 10:30:43 +0400
commitf12c241dcae5ac29a9a5ab5041f45bede27d93fd (patch)
tree015a0266ac41514cd11f0026ba433b8fea6669df /sleekxmpp/xmlstream/xmlstream.py
parentcedc9dd175afba02e7beba21dc9eb4de1e63623d (diff)
downloadslixmpp-f12c241dcae5ac29a9a5ab5041f45bede27d93fd.tar.gz
slixmpp-f12c241dcae5ac29a9a5ab5041f45bede27d93fd.tar.bz2
slixmpp-f12c241dcae5ac29a9a5ab5041f45bede27d93fd.tar.xz
slixmpp-f12c241dcae5ac29a9a5ab5041f45bede27d93fd.zip
Blocking queue get
Diffstat (limited to 'sleekxmpp/xmlstream/xmlstream.py')
-rw-r--r--sleekxmpp/xmlstream/xmlstream.py25
1 files changed, 14 insertions, 11 deletions
diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py
index 0beaa499..8242a127 100644
--- a/sleekxmpp/xmlstream/xmlstream.py
+++ b/sleekxmpp/xmlstream/xmlstream.py
@@ -462,10 +462,10 @@ class XMLStream(object):
time.sleep(0.1)
elapsed += 0.1
except KeyboardInterrupt:
- self.stop.set()
+ self.set_stop()
return False
except SystemExit:
- self.stop.set()
+ self.set_stop()
return False
if self.default_domain:
@@ -707,7 +707,7 @@ class XMLStream(object):
self.stream_end_event.set()
if not self.auto_reconnect:
- self.stop.set()
+ self.set_stop()
if self._disconnect_wait_for_threads:
self._wait_for_threads()
@@ -724,7 +724,7 @@ class XMLStream(object):
def abort(self):
self.session_started_event.clear()
- self.stop.set()
+ self.set_stop()
if self._disconnect_wait_for_threads:
self._wait_for_threads()
try:
@@ -1360,6 +1360,13 @@ class XMLStream(object):
if self.__thread_count == 0:
self.__thread_cond.notify()
+ def set_stop(self):
+ self.stop.set()
+
+ # Unlock queues
+ self.event_queue.put(None)
+ self.send_queue.put(None)
+
def _wait_for_threads(self):
with self.__thread_cond:
if self.__thread_count != 0:
@@ -1632,10 +1639,7 @@ class XMLStream(object):
log.debug("Loading event runner")
try:
while not self.stop.is_set():
- try:
- event = self.event_queue.get(True, timeout=self.wait_timeout)
- except QueueEmpty:
- event = None
+ event = self.event_queue.get()
if event is None:
continue
@@ -1701,9 +1705,8 @@ class XMLStream(object):
data = self.__failed_send_stanza
self.__failed_send_stanza = None
else:
- try:
- data = self.send_queue.get(True, timeout=self.wait_timeout) # Wait for data to send
- except QueueEmpty:
+ data = self.send_queue.get() # Wait for data to send
+ if data is None:
continue
log.debug("SEND: %s", data)
enc_data = data.encode('utf-8')