diff options
-rw-r--r-- | slixmpp/plugins/xep_0047/stream.py | 19 |
1 files changed, 7 insertions, 12 deletions
diff --git a/slixmpp/plugins/xep_0047/stream.py b/slixmpp/plugins/xep_0047/stream.py index 9b7426a9..3b8c013d 100644 --- a/slixmpp/plugins/xep_0047/stream.py +++ b/slixmpp/plugins/xep_0047/stream.py @@ -1,6 +1,6 @@ +import asyncio import socket import logging -from queue import Queue from slixmpp.stanza import Iq from slixmpp.exceptions import XMPPError @@ -29,7 +29,7 @@ class IBBytestream(object): self.stream_in_closed = False self.stream_out_closed = False - self.recv_queue = Queue() + self.recv_queue = asyncio.Queue() def send(self, data): if not self.stream_started or self.stream_out_closed: @@ -78,24 +78,19 @@ class IBBytestream(object): self.close() raise XMPPError('not-acceptable') - self.recv_queue.put(data) + self.recv_queue.put_nowait(data) self.xmpp.event('ibb_stream_data', {'stream': self, 'data': data}) if isinstance(stanza, Iq): stanza.reply().send() def recv(self, *args, **kwargs): - return self.read(block=True) + return self.read() - def read(self, block=True, timeout=None, **kwargs): + def read(self): if not self.stream_started or self.stream_in_closed: raise socket.error - if timeout is not None: - block = True - try: - return self.recv_queue.get(block, timeout) - except: - return None + return self.recv_queue.get_nowait() def close(self): iq = self.xmpp.Iq() @@ -106,7 +101,7 @@ class IBBytestream(object): self.stream_out_closed = True def _close_stream(_): self.stream_in_closed = True - iq.send(block=False, callback=_close_stream) + iq.send(callback=_close_stream) self.xmpp.event('ibb_stream_end', self) def _closed(self, iq): |