From f12c241dcae5ac29a9a5ab5041f45bede27d93fd Mon Sep 17 00:00:00 2001 From: Anton Ryzhov Date: Mon, 1 Jul 2013 00:38:56 +0400 Subject: Blocking queue get --- sleekxmpp/xmlstream/xmlstream.py | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) (limited to 'sleekxmpp/xmlstream/xmlstream.py') diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py index 0beaa499..8242a127 100644 --- a/sleekxmpp/xmlstream/xmlstream.py +++ b/sleekxmpp/xmlstream/xmlstream.py @@ -462,10 +462,10 @@ class XMLStream(object): time.sleep(0.1) elapsed += 0.1 except KeyboardInterrupt: - self.stop.set() + self.set_stop() return False except SystemExit: - self.stop.set() + self.set_stop() return False if self.default_domain: @@ -707,7 +707,7 @@ class XMLStream(object): self.stream_end_event.set() if not self.auto_reconnect: - self.stop.set() + self.set_stop() if self._disconnect_wait_for_threads: self._wait_for_threads() @@ -724,7 +724,7 @@ class XMLStream(object): def abort(self): self.session_started_event.clear() - self.stop.set() + self.set_stop() if self._disconnect_wait_for_threads: self._wait_for_threads() try: @@ -1360,6 +1360,13 @@ class XMLStream(object): if self.__thread_count == 0: self.__thread_cond.notify() + def set_stop(self): + self.stop.set() + + # Unlock queues + self.event_queue.put(None) + self.send_queue.put(None) + def _wait_for_threads(self): with self.__thread_cond: if self.__thread_count != 0: @@ -1632,10 +1639,7 @@ class XMLStream(object): log.debug("Loading event runner") try: while not self.stop.is_set(): - try: - event = self.event_queue.get(True, timeout=self.wait_timeout) - except QueueEmpty: - event = None + event = self.event_queue.get() if event is None: continue @@ -1701,9 +1705,8 @@ class XMLStream(object): data = self.__failed_send_stanza self.__failed_send_stanza = None else: - try: - data = self.send_queue.get(True, timeout=self.wait_timeout) # Wait for data to send - except QueueEmpty: + data = self.send_queue.get() # Wait for data to send + if data is None: continue log.debug("SEND: %s", data) enc_data = data.encode('utf-8') -- cgit v1.2.3