diff options
author | Emmanuel Gil Peyrot <linkmauve@linkmauve.fr> | 2015-04-14 18:52:30 +0200 |
---|---|---|
committer | Emmanuel Gil Peyrot <linkmauve@linkmauve.fr> | 2015-04-19 20:48:01 +0200 |
commit | 766d0dfd405fd1e0c6892d1bd3be9b599648f5d2 (patch) | |
tree | 257d846875a1641e35061d9d4a78cdf2ab3796b1 | |
parent | ac31913a653e42d288a0ae477866ac06ad9bf0a3 (diff) | |
download | slixmpp-766d0dfd405fd1e0c6892d1bd3be9b599648f5d2.tar.gz slixmpp-766d0dfd405fd1e0c6892d1bd3be9b599648f5d2.tar.bz2 slixmpp-766d0dfd405fd1e0c6892d1bd3be9b599648f5d2.tar.xz slixmpp-766d0dfd405fd1e0c6892d1bd3be9b599648f5d2.zip |
XEP-0047: use asyncio’s Queue implementation, to prevent any possibility of deadlock.
-rw-r--r-- | slixmpp/plugins/xep_0047/stream.py | 19 |
1 files changed, 7 insertions, 12 deletions
diff --git a/slixmpp/plugins/xep_0047/stream.py b/slixmpp/plugins/xep_0047/stream.py index 9b7426a9..3b8c013d 100644 --- a/slixmpp/plugins/xep_0047/stream.py +++ b/slixmpp/plugins/xep_0047/stream.py @@ -1,6 +1,6 @@ +import asyncio import socket import logging -from queue import Queue from slixmpp.stanza import Iq from slixmpp.exceptions import XMPPError @@ -29,7 +29,7 @@ class IBBytestream(object): self.stream_in_closed = False self.stream_out_closed = False - self.recv_queue = Queue() + self.recv_queue = asyncio.Queue() def send(self, data): if not self.stream_started or self.stream_out_closed: @@ -78,24 +78,19 @@ class IBBytestream(object): self.close() raise XMPPError('not-acceptable') - self.recv_queue.put(data) + self.recv_queue.put_nowait(data) self.xmpp.event('ibb_stream_data', {'stream': self, 'data': data}) if isinstance(stanza, Iq): stanza.reply().send() def recv(self, *args, **kwargs): - return self.read(block=True) + return self.read() - def read(self, block=True, timeout=None, **kwargs): + def read(self): if not self.stream_started or self.stream_in_closed: raise socket.error - if timeout is not None: - block = True - try: - return self.recv_queue.get(block, timeout) - except: - return None + return self.recv_queue.get_nowait() def close(self): iq = self.xmpp.Iq() @@ -106,7 +101,7 @@ class IBBytestream(object): self.stream_out_closed = True def _close_stream(_): self.stream_in_closed = True - iq.send(block=False, callback=_close_stream) + iq.send(callback=_close_stream) self.xmpp.event('ibb_stream_end', self) def _closed(self, iq): |