From 766d0dfd405fd1e0c6892d1bd3be9b599648f5d2 Mon Sep 17 00:00:00 2001 From: Emmanuel Gil Peyrot Date: Tue, 14 Apr 2015 18:52:30 +0200 Subject: =?UTF-8?q?XEP-0047:=20use=20asyncio=E2=80=99s=20Queue=20implement?= =?UTF-8?q?ation,=20to=20prevent=20any=20possibility=20of=20deadlock.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- slixmpp/plugins/xep_0047/stream.py | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) (limited to 'slixmpp/plugins') diff --git a/slixmpp/plugins/xep_0047/stream.py b/slixmpp/plugins/xep_0047/stream.py index 9b7426a9..3b8c013d 100644 --- a/slixmpp/plugins/xep_0047/stream.py +++ b/slixmpp/plugins/xep_0047/stream.py @@ -1,6 +1,6 @@ +import asyncio import socket import logging -from queue import Queue from slixmpp.stanza import Iq from slixmpp.exceptions import XMPPError @@ -29,7 +29,7 @@ class IBBytestream(object): self.stream_in_closed = False self.stream_out_closed = False - self.recv_queue = Queue() + self.recv_queue = asyncio.Queue() def send(self, data): if not self.stream_started or self.stream_out_closed: @@ -78,24 +78,19 @@ class IBBytestream(object): self.close() raise XMPPError('not-acceptable') - self.recv_queue.put(data) + self.recv_queue.put_nowait(data) self.xmpp.event('ibb_stream_data', {'stream': self, 'data': data}) if isinstance(stanza, Iq): stanza.reply().send() def recv(self, *args, **kwargs): - return self.read(block=True) + return self.read() - def read(self, block=True, timeout=None, **kwargs): + def read(self): if not self.stream_started or self.stream_in_closed: raise socket.error - if timeout is not None: - block = True - try: - return self.recv_queue.get(block, timeout) - except: - return None + return self.recv_queue.get_nowait() def close(self): iq = self.xmpp.Iq() @@ -106,7 +101,7 @@ class IBBytestream(object): self.stream_out_closed = True def _close_stream(_): self.stream_in_closed = True - iq.send(block=False, callback=_close_stream) + iq.send(callback=_close_stream) self.xmpp.event('ibb_stream_end', self) def _closed(self, iq): -- cgit v1.2.3