diff options
Diffstat (limited to 'slixmpp')
-rw-r--r-- | slixmpp/plugins/xep_0047/ibb.py | 86 | ||||
-rw-r--r-- | slixmpp/plugins/xep_0047/stanza.py | 7 | ||||
-rw-r--r-- | slixmpp/plugins/xep_0047/stream.py | 108 |
3 files changed, 80 insertions, 121 deletions
diff --git a/slixmpp/plugins/xep_0047/ibb.py b/slixmpp/plugins/xep_0047/ibb.py index 87cd1f51..52d7fbe5 100644 --- a/slixmpp/plugins/xep_0047/ibb.py +++ b/slixmpp/plugins/xep_0047/ibb.py @@ -1,6 +1,6 @@ +import asyncio import uuid import logging -import threading from slixmpp import Message, Iq from slixmpp.exceptions import XMPPError @@ -23,17 +23,11 @@ class XEP_0047(BasePlugin): default_config = { 'block_size': 4096, 'max_block_size': 8192, - 'window_size': 1, 'auto_accept': False, } def plugin_init(self): self._streams = {} - self._pending_streams = {} - self._pending_lock = threading.Lock() - self._stream_lock = threading.Lock() - - self._preauthed_sids_lock = threading.Lock() self._preauthed_sids = {} register_stanza_plugin(Iq, Open) @@ -85,9 +79,8 @@ class XEP_0047(BasePlugin): self._streams[(jid, sid, peer_jid)] = stream def _del_stream(self, jid, sid, peer_jid, data): - with self._stream_lock: - if (jid, sid, peer_jid) in self._streams: - del self._streams[(jid, sid, peer_jid)] + if (jid, sid, peer_jid) in self._streams: + del self._streams[(jid, sid, peer_jid)] def _accept_stream(self, iq): receiver = iq['to'] @@ -100,22 +93,19 @@ class XEP_0047(BasePlugin): def _authorized(self, jid, sid, ifrom, iq): if self.auto_accept: - if iq['ibb_open']['block_size'] <= self.max_block_size: - return True + return True return False def _authorized_sid(self, jid, sid, ifrom, iq): - with self._preauthed_sids_lock: - if (jid, sid, ifrom) in self._preauthed_sids: - del self._preauthed_sids[(jid, sid, ifrom)] - return True - return False + if (jid, sid, ifrom) in self._preauthed_sids: + del self._preauthed_sids[(jid, sid, ifrom)] + return True + return False def _preauthorize_sid(self, jid, sid, ifrom, data): - with self._preauthed_sids_lock: - self._preauthed_sids[(jid, sid, ifrom)] = True + self._preauthed_sids[(jid, sid, ifrom)] = True - def open_stream(self, jid, block_size=None, sid=None, window=1, use_messages=False, + def open_stream(self, jid, block_size=None, sid=None, use_messages=False, ifrom=None, timeout=None, callback=None): if sid is None: sid = str(uuid.uuid4()) @@ -128,43 +118,28 @@ class XEP_0047(BasePlugin): iq['from'] = ifrom iq['ibb_open']['block_size'] = block_size iq['ibb_open']['sid'] = sid - iq['ibb_open']['stanza'] = 'iq' + iq['ibb_open']['stanza'] = 'message' if use_messages else 'iq' stream = IBBytestream(self.xmpp, sid, block_size, - iq['from'], iq['to'], window, - use_messages) + iq['from'], iq['to'], use_messages) - with self._stream_lock: - self._pending_streams[iq['id']] = stream + stream_future = asyncio.Future() - self._pending_streams[iq['id']] = stream + def _handle_opened_stream(iq): + log.debug('IBB stream (%s) accepted by %s', stream.sid, iq['from']) + stream.self_jid = iq['to'] + stream.peer_jid = iq['from'] + stream.stream_started = True + self.api['set_stream'](stream.self_jid, stream.sid, stream.peer_jid, stream) + stream_future.set_result(stream) + if callback is not None: + callback(stream) + self.xmpp.event('ibb_stream_start', stream) + self.xmpp.event('stream:%s:%s' % (stream.sid, stream.peer_jid), stream) - cb = None - if callback is not None: - def chained(resp): - self._handle_opened_stream(resp) - callback(resp) - cb = chained - else: - cb = self._handle_opened_stream - return iq.send(timeout=timeout, callback=cb) - - def _handle_opened_stream(self, iq): - if iq['type'] == 'result': - with self._stream_lock: - stream = self._pending_streams.get(iq['id'], None) - if stream is not None: - log.debug('IBB stream (%s) accepted by %s', stream.sid, iq['from']) - stream.self_jid = iq['to'] - stream.peer_jid = iq['from'] - stream.stream_started.set() - self.api['set_stream'](stream.self_jid, stream.sid, stream.peer_jid, stream) - self.xmpp.event('ibb_stream_start', stream) - self.xmpp.event('stream:%s:%s' % (stream.sid, stream.peer_jid), stream) - - with self._stream_lock: - if iq['id'] in self._pending_streams: - del self._pending_streams[iq['id']] + iq.send(timeout=timeout, callback=_handle_opened_stream) + + return stream_future def _handle_open_request(self, iq): sid = iq['ibb_open']['sid'] @@ -176,15 +151,14 @@ class XEP_0047(BasePlugin): raise XMPPError(etype='modify', condition='bad-request') if not self._accept_stream(iq): - raise XMPPError(etype='modify', condition='not-acceptable') + raise XMPPError(etype='cancel', condition='not-acceptable') if size > self.max_block_size: raise XMPPError('resource-constraint') stream = IBBytestream(self.xmpp, sid, size, - iq['to'], iq['from'], - self.window_size) - stream.stream_started.set() + iq['to'], iq['from']) + stream.stream_started = True self.api['set_stream'](stream.self_jid, stream.sid, stream.peer_jid, stream) iq.reply().send() diff --git a/slixmpp/plugins/xep_0047/stanza.py b/slixmpp/plugins/xep_0047/stanza.py index 62199077..7f8ff0ba 100644 --- a/slixmpp/plugins/xep_0047/stanza.py +++ b/slixmpp/plugins/xep_0047/stanza.py @@ -24,7 +24,7 @@ class Open(ElementBase): interfaces = set(('block_size', 'sid', 'stanza')) def get_block_size(self): - return int(self._get_attr('block-size')) + return int(self._get_attr('block-size', '0')) def set_block_size(self, value): self._set_attr('block-size', str(value)) @@ -47,7 +47,10 @@ class Data(ElementBase): self._set_attr('seq', str(value)) def get_data(self): - b64_data = self.xml.text.strip() + text = self.xml.text + if not text: + raise XMPPError('not-acceptable', 'IBB data element is empty.') + b64_data = text.strip() if VALID_B64.match(b64_data).group() == b64_data: return from_b64(b64_data) else: diff --git a/slixmpp/plugins/xep_0047/stream.py b/slixmpp/plugins/xep_0047/stream.py index 817f96a1..3be894eb 100644 --- a/slixmpp/plugins/xep_0047/stream.py +++ b/slixmpp/plugins/xep_0047/stream.py @@ -1,7 +1,6 @@ +import asyncio import socket -import threading import logging -from queue import Queue from slixmpp.stanza import Iq from slixmpp.exceptions import XMPPError @@ -12,11 +11,10 @@ log = logging.getLogger(__name__) class IBBytestream(object): - def __init__(self, xmpp, sid, block_size, jid, peer, window_size=1, use_messages=False): + def __init__(self, xmpp, sid, block_size, jid, peer, use_messages=False): self.xmpp = xmpp self.sid = sid self.block_size = block_size - self.window_size = window_size self.use_messages = use_messages if jid is None: @@ -27,29 +25,20 @@ class IBBytestream(object): self.send_seq = -1 self.recv_seq = -1 - self._send_seq_lock = threading.Lock() - self._recv_seq_lock = threading.Lock() + self.stream_started = False + self.stream_in_closed = False + self.stream_out_closed = False - self.stream_started = threading.Event() - self.stream_in_closed = threading.Event() - self.stream_out_closed = threading.Event() + self.recv_queue = asyncio.Queue() - self.recv_queue = Queue() - - self.send_window = threading.BoundedSemaphore(value=self.window_size) - self.window_ids = set() - self.window_empty = threading.Event() - self.window_empty.set() - - def send(self, data): - if not self.stream_started.is_set() or \ - self.stream_out_closed.is_set(): + @asyncio.coroutine + def send(self, data, timeout=None): + if not self.stream_started or self.stream_out_closed: raise socket.error - data = data[0:self.block_size] - self.send_window.acquire() - with self._send_seq_lock: - self.send_seq = (self.send_seq + 1) % 65535 - seq = self.send_seq + if len(data) > self.block_size: + data = data[:self.block_size] + self.send_seq = (self.send_seq + 1) % 65535 + seq = self.send_seq if self.use_messages: msg = self.xmpp.Message() msg['to'] = self.peer_jid @@ -59,7 +48,6 @@ class IBBytestream(object): msg['ibb_data']['seq'] = seq msg['ibb_data']['data'] = data msg.send() - self.send_window.release() else: iq = self.xmpp.Iq() iq['type'] = 'set' @@ -68,71 +56,65 @@ class IBBytestream(object): iq['ibb_data']['sid'] = self.sid iq['ibb_data']['seq'] = seq iq['ibb_data']['data'] = data - self.window_empty.clear() - self.window_ids.add(iq['id']) - iq.send(callback=self._recv_ack) + yield from iq.send(timeout=timeout) return len(data) - def sendall(self, data): + @asyncio.coroutine + def sendall(self, data, timeout=None): sent_len = 0 while sent_len < len(data): - sent_len += self.send(data[sent_len:]) - - def _recv_ack(self, iq): - self.window_ids.remove(iq['id']) - if not self.window_ids: - self.window_empty.set() - self.send_window.release() - if iq['type'] == 'error': - self.close() + sent_len += yield from self.send(data[sent_len:self.block_size], timeout=timeout) + + @asyncio.coroutine + def sendfile(self, file, timeout=None): + while True: + data = file.read(self.block_size) + if not data: + break + yield from self.send(data, timeout=timeout) def _recv_data(self, stanza): - with self._recv_seq_lock: - new_seq = stanza['ibb_data']['seq'] - if new_seq != (self.recv_seq + 1) % 65535: - self.close() - raise XMPPError('unexpected-request') - self.recv_seq = new_seq + new_seq = stanza['ibb_data']['seq'] + if new_seq != (self.recv_seq + 1) % 65535: + self.close() + raise XMPPError('unexpected-request') + self.recv_seq = new_seq data = stanza['ibb_data']['data'] if len(data) > self.block_size: self.close() raise XMPPError('not-acceptable') - self.recv_queue.put(data) - self.xmpp.event('ibb_stream_data', {'stream': self, 'data': data}) + self.recv_queue.put_nowait(data) + self.xmpp.event('ibb_stream_data', self) if isinstance(stanza, Iq): stanza.reply().send() def recv(self, *args, **kwargs): - return self.read(block=True) + return self.read() - def read(self, block=True, timeout=None, **kwargs): - if not self.stream_started.is_set() or \ - self.stream_in_closed.is_set(): + def read(self): + if not self.stream_started or self.stream_in_closed: raise socket.error - if timeout is not None: - block = True - try: - return self.recv_queue.get(block, timeout) - except: - return None - - def close(self): + return self.recv_queue.get_nowait() + + def close(self, timeout=None): iq = self.xmpp.Iq() iq['type'] = 'set' iq['to'] = self.peer_jid iq['from'] = self.self_jid iq['ibb_close']['sid'] = self.sid - self.stream_out_closed.set() - iq.send(block=False, - callback=lambda x: self.stream_in_closed.set()) + self.stream_out_closed = True + def _close_stream(_): + self.stream_in_closed = True + future = iq.send(timeout=timeout, callback=_close_stream) self.xmpp.event('ibb_stream_end', self) + return future def _closed(self, iq): - self.stream_in_closed.set() - self.stream_out_closed.set() + self.stream_in_closed = True + self.stream_out_closed = True iq.reply().send() self.xmpp.event('ibb_stream_end', self) |