From 02e0afbf0f5e56b15eec481bb4750422715b9562 Mon Sep 17 00:00:00 2001 From: mathieui Date: Sat, 13 Feb 2021 16:13:30 +0100 Subject: XEP-0047: Better typing, docs, coroutine. - Add a gather() shortcut to buffer all data received in a stream - Fix a bug in sendall that happens if the data is above the block size. --- docs/api/plugins/xep_0047.rst | 4 ++ slixmpp/plugins/xep_0047/ibb.py | 89 ++++++++++++++++++++++++++------------ slixmpp/plugins/xep_0047/stanza.py | 3 ++ slixmpp/plugins/xep_0047/stream.py | 86 +++++++++++++++++++++++++++++++----- 4 files changed, 144 insertions(+), 38 deletions(-) diff --git a/docs/api/plugins/xep_0047.rst b/docs/api/plugins/xep_0047.rst index 4efded9b..c8aea741 100644 --- a/docs/api/plugins/xep_0047.rst +++ b/docs/api/plugins/xep_0047.rst @@ -8,6 +8,10 @@ XEP-0047: In-band Bytestreams :members: :exclude-members: session_bind, plugin_init, plugin_end +.. module:: slixmpp.plugins.xep_0047 + +.. autoclass:: IBBytestream + :members: Stanza elements --------------- diff --git a/slixmpp/plugins/xep_0047/ibb.py b/slixmpp/plugins/xep_0047/ibb.py index bd96eca2..ec08a8b3 100644 --- a/slixmpp/plugins/xep_0047/ibb.py +++ b/slixmpp/plugins/xep_0047/ibb.py @@ -1,8 +1,17 @@ +# Slixmpp: The Slick XMPP Library +# This file is part of Slixmpp +# See the file LICENSE for copying permission import asyncio import uuid import logging -from slixmpp import Message, Iq +from typing import ( + Optional, + Union, +) + +from slixmpp import JID +from slixmpp.stanza import Message, Iq from slixmpp.exceptions import XMPPError from slixmpp.xmlstream.handler import Callback from slixmpp.xmlstream.matcher import StanzaPath @@ -15,9 +24,27 @@ log = logging.getLogger(__name__) class XEP_0047(BasePlugin): + """ + XEP-0047: In-Band Bytestreams + + Events registered by this plugin: + + - :term:`ibb_stream_start` + - :term:`ibb_stream_end` + - :term:`ibb_stream_data` + - :term:`stream:[stream id]:[peer jid]` + + Plugin Parameters: + + - ``block_size`` (default: ``4096``): default block size to negociate + - ``max_block_size`` (default: ``8192``): max block size to accept + - ``auto_accept`` (default: ``False``): if incoming streams should be + accepted automatically. + + """ name = 'xep_0047' - description = 'XEP-0047: In-band Bytestreams' + description = 'XEP-0047: In-Band Bytestreams' dependencies = {'xep_0030'} stanza = stanza default_config = { @@ -105,17 +132,29 @@ class XEP_0047(BasePlugin): def _preauthorize_sid(self, jid, sid, ifrom, data): self._preauthed_sids[(jid, sid, ifrom)] = True - def open_stream(self, jid, block_size=None, sid=None, use_messages=False, - ifrom=None, timeout=None, callback=None): + async def open_stream(self, jid: JID, *, block_size: Optional[int] = None, + sid: Optional[str] = None, use_messages: bool = False, + ifrom: Optional[JID] = None, + **iqkwargs) -> IBBytestream: + """Open an IBB stream with a peer JID. + + .. versionchanged:: 1.8.0 + This function is now a coroutine and must be awaited. + All parameters except ``jid`` are keyword-args only. + + :param jid: The remote JID to initiate the stream with. + :param block_size: The block size to advertise. + :param sid: The IBB stream id (if not provided, will be auto-generated). + :param use_messages: If the stream should use message stanzas instead of iqs. + :returns: The opened byte stream with the remote JID + :raises .IqError: When the remote entity denied the stream. + """ if sid is None: sid = str(uuid.uuid4()) if block_size is None: block_size = self.block_size - iq = self.xmpp.Iq() - iq['type'] = 'set' - iq['to'] = jid - iq['from'] = ifrom + iq = self.xmpp.make_iq_set(ito=jid, ifrom=ifrom) iq['ibb_open']['block_size'] = block_size iq['ibb_open']['sid'] = sid iq['ibb_open']['stanza'] = 'message' if use_messages else 'iq' @@ -123,25 +162,21 @@ class XEP_0047(BasePlugin): stream = IBBytestream(self.xmpp, sid, block_size, iq['from'], iq['to'], use_messages) - stream_future = asyncio.Future() + callback = iqkwargs.pop('callback', None) + result = await iq.send(**iqkwargs) - def _handle_opened_stream(iq): - log.debug('IBB stream (%s) accepted by %s', stream.sid, iq['from']) - stream.self_jid = iq['to'] - stream.peer_jid = iq['from'] - stream.stream_started = True - self.api['set_stream'](stream.self_jid, stream.sid, stream.peer_jid, stream) - stream_future.set_result(stream) - if callback is not None: - callback(stream) - self.xmpp.event('ibb_stream_start', stream) - self.xmpp.event('stream:%s:%s' % (stream.sid, stream.peer_jid), stream) - - iq.send(timeout=timeout, callback=_handle_opened_stream) - - return stream_future + log.debug('IBB stream (%s) accepted by %s', stream.sid, result['from']) + stream.self_jid = result['to'] + stream.peer_jid = result['from'] + stream.stream_started = True + self.api['set_stream'](stream.self_jid, stream.sid, stream.peer_jid, stream) + if callback is not None: + self.xmpp.add_event_handler('ibb_stream_start', callback, disposable=True) + self.xmpp.event('ibb_stream_start', stream) + self.xmpp.event('stream:%s:%s' % (stream.sid, stream.peer_jid), stream) + return stream - def _handle_open_request(self, iq): + def _handle_open_request(self, iq: Iq): sid = iq['ibb_open']['sid'] size = iq['ibb_open']['block_size'] or self.block_size @@ -165,7 +200,7 @@ class XEP_0047(BasePlugin): self.xmpp.event('ibb_stream_start', stream) self.xmpp.event('stream:%s:%s' % (sid, stream.peer_jid), stream) - def _handle_data(self, stanza): + def _handle_data(self, stanza: Union[Iq, Message]): sid = stanza['ibb_data']['sid'] stream = self.api['get_stream'](stanza['to'], sid, stanza['from']) if stream is not None and stanza['from'] == stream.peer_jid: @@ -173,7 +208,7 @@ class XEP_0047(BasePlugin): else: raise XMPPError('item-not-found') - def _handle_close(self, iq): + def _handle_close(self, iq: Iq): sid = iq['ibb_close']['sid'] stream = self.api['get_stream'](iq['to'], sid, iq['from']) if stream is not None and iq['from'] == stream.peer_jid: diff --git a/slixmpp/plugins/xep_0047/stanza.py b/slixmpp/plugins/xep_0047/stanza.py index 5c47508a..ac2935ac 100644 --- a/slixmpp/plugins/xep_0047/stanza.py +++ b/slixmpp/plugins/xep_0047/stanza.py @@ -1,3 +1,6 @@ +# Slixmpp: The Slick XMPP Library +# This file is part of Slixmpp +# See the file LICENSE for copying permission import re import base64 diff --git a/slixmpp/plugins/xep_0047/stream.py b/slixmpp/plugins/xep_0047/stream.py index 535ba82b..f020ea68 100644 --- a/slixmpp/plugins/xep_0047/stream.py +++ b/slixmpp/plugins/xep_0047/stream.py @@ -1,17 +1,32 @@ +# Slixmpp: The Slick XMPP Library +# This file is part of Slixmpp +# See the file LICENSE for copying permission import asyncio import socket import logging -from slixmpp.stanza import Iq -from slixmpp.exceptions import XMPPError +from typing import ( + Optional, + IO, + Union, +) + +from slixmpp import JID +from slixmpp.stanza import Iq, Message +from slixmpp.exceptions import XMPPError, IqTimeout log = logging.getLogger(__name__) class IBBytestream(object): + """XEP-0047 Stream abstraction. Created by the ibb plugin automatically. + + Provides send methods and triggers :term:`ibb_stream_data` events. + """ - def __init__(self, xmpp, sid, block_size, jid, peer, use_messages=False): + def __init__(self, xmpp, sid: str, block_size: int, jid: JID, peer: JID, + use_messages: bool = False): self.xmpp = xmpp self.sid = sid self.block_size = block_size @@ -31,7 +46,12 @@ class IBBytestream(object): self.recv_queue = asyncio.Queue() - async def send(self, data, timeout=None): + async def send(self, data: bytes, timeout: Optional[int] = None) -> int: + """Send a single block of data. + + :param data: Data to send (will be truncated if above block size). + :returns: Number of bytes sent. + """ if not self.stream_started or self.stream_out_closed: raise socket.error if len(data) > self.block_size: @@ -58,19 +78,62 @@ class IBBytestream(object): await iq.send(timeout=timeout) return len(data) - async def sendall(self, data, timeout=None): + async def sendall(self, data: bytes, timeout: Optional[int] = None): + """Send all the contents of ``data`` in chunks. + + :param data: Raw data to send. + """ sent_len = 0 while sent_len < len(data): - sent_len += await self.send(data[sent_len:self.block_size], timeout=timeout) - - async def sendfile(self, file, timeout=None): + sent_len += await self.send(data[sent_len:sent_len+self.block_size], timeout=timeout) + + async def gather(self, max_data: Optional[int] = None, timeout: int = 3600) -> bytes: + """Gather all data sent on a stream until it is closed, and return it. + + .. versionadded:: 1.8.0 + + :param max_data: Max number of bytes to receive. (received data may be + over this limit depending on block_size) + :param timeout: Timeout after which an error will be raised. + :raises .IqTimeout: If the timeout is reached. + :returns: All bytes accumulated in the stream. + """ + result = b'' + end_future = asyncio.Future() + + def on_close(stream): + if stream is self: + end_future.set_result(True) + + def on_data(stream): + nonlocal result + if stream is self: + result += stream.read() + if max_data and len(result) > max_data: + end_future.set_result(True) + + self.xmpp.add_event_handler('ibb_stream_end', on_close) + self.xmpp.add_event_handler('ibb_stream_data', on_data) + try: + await asyncio.wait_for(end_future, timeout, loop=self.xmpp.loop) + except asyncio.TimeoutError: + raise IqTimeout(result) + finally: + self.xmpp.del_event_handler('ibb_stream_end', on_close) + self.xmpp.del_event_handler('ibb_stream_data', on_data) + return result + + async def sendfile(self, file: IO[bytes], timeout: Optional[int] = None): + """Send the contents of a file over the wire, in chunks. + + :param file: The opened file (or file-like) object, in bytes mode.""" while True: data = file.read(self.block_size) if not data: break await self.send(data, timeout=timeout) - def _recv_data(self, stanza): + def _recv_data(self, stanza: Union[Message, Iq]): new_seq = stanza['ibb_data']['seq'] if new_seq != (self.recv_seq + 1) % 65536: self.close() @@ -96,7 +159,8 @@ class IBBytestream(object): raise socket.error return self.recv_queue.get_nowait() - def close(self, timeout=None): + def close(self, timeout: Optional[int] = None) -> asyncio.Future: + """Close the stream.""" iq = self.xmpp.Iq() iq['type'] = 'set' iq['to'] = self.peer_jid @@ -109,7 +173,7 @@ class IBBytestream(object): self.xmpp.event('ibb_stream_end', self) return future - def _closed(self, iq): + def _closed(self, iq: Iq): self.stream_in_closed = True self.stream_out_closed = True iq.reply().send() -- cgit v1.2.3