summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--slixmpp/plugins/xep_0047/stream.py19
1 files changed, 7 insertions, 12 deletions
diff --git a/slixmpp/plugins/xep_0047/stream.py b/slixmpp/plugins/xep_0047/stream.py
index 9b7426a9..3b8c013d 100644
--- a/slixmpp/plugins/xep_0047/stream.py
+++ b/slixmpp/plugins/xep_0047/stream.py
@@ -1,6 +1,6 @@
+import asyncio
import socket
import logging
-from queue import Queue
from slixmpp.stanza import Iq
from slixmpp.exceptions import XMPPError
@@ -29,7 +29,7 @@ class IBBytestream(object):
self.stream_in_closed = False
self.stream_out_closed = False
- self.recv_queue = Queue()
+ self.recv_queue = asyncio.Queue()
def send(self, data):
if not self.stream_started or self.stream_out_closed:
@@ -78,24 +78,19 @@ class IBBytestream(object):
self.close()
raise XMPPError('not-acceptable')
- self.recv_queue.put(data)
+ self.recv_queue.put_nowait(data)
self.xmpp.event('ibb_stream_data', {'stream': self, 'data': data})
if isinstance(stanza, Iq):
stanza.reply().send()
def recv(self, *args, **kwargs):
- return self.read(block=True)
+ return self.read()
- def read(self, block=True, timeout=None, **kwargs):
+ def read(self):
if not self.stream_started or self.stream_in_closed:
raise socket.error
- if timeout is not None:
- block = True
- try:
- return self.recv_queue.get(block, timeout)
- except:
- return None
+ return self.recv_queue.get_nowait()
def close(self):
iq = self.xmpp.Iq()
@@ -106,7 +101,7 @@ class IBBytestream(object):
self.stream_out_closed = True
def _close_stream(_):
self.stream_in_closed = True
- iq.send(block=False, callback=_close_stream)
+ iq.send(callback=_close_stream)
self.xmpp.event('ibb_stream_end', self)
def _closed(self, iq):