From eb4e09b0ca2f699405c448ae7e4b818ff72ea301 Mon Sep 17 00:00:00 2001 From: Emmanuel Gil Peyrot Date: Tue, 14 Apr 2015 17:46:50 +0200 Subject: XEP-0047: allow only one window over the stream. --- slixmpp/plugins/xep_0047/ibb.py | 9 +++------ slixmpp/plugins/xep_0047/stream.py | 16 +--------------- 2 files changed, 4 insertions(+), 21 deletions(-) diff --git a/slixmpp/plugins/xep_0047/ibb.py b/slixmpp/plugins/xep_0047/ibb.py index de801877..d1c708ed 100644 --- a/slixmpp/plugins/xep_0047/ibb.py +++ b/slixmpp/plugins/xep_0047/ibb.py @@ -22,7 +22,6 @@ class XEP_0047(BasePlugin): default_config = { 'block_size': 4096, 'max_block_size': 8192, - 'window_size': 1, 'auto_accept': False, } @@ -106,7 +105,7 @@ class XEP_0047(BasePlugin): def _preauthorize_sid(self, jid, sid, ifrom, data): self._preauthed_sids[(jid, sid, ifrom)] = True - def open_stream(self, jid, block_size=None, sid=None, window=1, use_messages=False, + def open_stream(self, jid, block_size=None, sid=None, use_messages=False, ifrom=None, timeout=None, callback=None): if sid is None: sid = str(uuid.uuid4()) @@ -122,8 +121,7 @@ class XEP_0047(BasePlugin): iq['ibb_open']['stanza'] = 'message' if use_messages else 'iq' stream = IBBytestream(self.xmpp, sid, block_size, - iq['from'], iq['to'], window, - use_messages) + iq['from'], iq['to'], use_messages) self._pending_streams[iq['id']] = stream @@ -168,8 +166,7 @@ class XEP_0047(BasePlugin): raise XMPPError('resource-constraint') stream = IBBytestream(self.xmpp, sid, size, - iq['to'], iq['from'], - self.window_size) + iq['to'], iq['from']) stream.stream_started.set() self.api['set_stream'](stream.self_jid, stream.sid, stream.peer_jid, stream) iq.reply().send() diff --git a/slixmpp/plugins/xep_0047/stream.py b/slixmpp/plugins/xep_0047/stream.py index e15a66be..9c9d82a5 100644 --- a/slixmpp/plugins/xep_0047/stream.py +++ b/slixmpp/plugins/xep_0047/stream.py @@ -12,11 +12,10 @@ log = logging.getLogger(__name__) class IBBytestream(object): - def __init__(self, xmpp, sid, block_size, jid, peer, window_size=1, use_messages=False): + def __init__(self, xmpp, sid, block_size, jid, peer, use_messages=False): self.xmpp = xmpp self.sid = sid self.block_size = block_size - self.window_size = window_size self.use_messages = use_messages if jid is None: @@ -33,17 +32,11 @@ class IBBytestream(object): self.recv_queue = Queue() - self.send_window = threading.BoundedSemaphore(value=self.window_size) - self.window_ids = set() - self.window_empty = threading.Event() - self.window_empty.set() - def send(self, data): if not self.stream_started.is_set() or \ self.stream_out_closed.is_set(): raise socket.error data = data[0:self.block_size] - self.send_window.acquire() self.send_seq = (self.send_seq + 1) % 65535 seq = self.send_seq if self.use_messages: @@ -55,7 +48,6 @@ class IBBytestream(object): msg['ibb_data']['seq'] = seq msg['ibb_data']['data'] = data msg.send() - self.send_window.release() else: iq = self.xmpp.Iq() iq['type'] = 'set' @@ -64,8 +56,6 @@ class IBBytestream(object): iq['ibb_data']['sid'] = self.sid iq['ibb_data']['seq'] = seq iq['ibb_data']['data'] = data - self.window_empty.clear() - self.window_ids.add(iq['id']) iq.send(callback=self._recv_ack) return len(data) @@ -75,10 +65,6 @@ class IBBytestream(object): sent_len += self.send(data[sent_len:]) def _recv_ack(self, iq): - self.window_ids.remove(iq['id']) - if not self.window_ids: - self.window_empty.set() - self.send_window.release() if iq['type'] == 'error': self.close() -- cgit v1.2.3