diff options
Diffstat (limited to 'slixmpp/plugins/xep_0047/stream.py')
-rw-r--r-- | slixmpp/plugins/xep_0047/stream.py | 86 |
1 files changed, 75 insertions, 11 deletions
diff --git a/slixmpp/plugins/xep_0047/stream.py b/slixmpp/plugins/xep_0047/stream.py index 535ba82b..f020ea68 100644 --- a/slixmpp/plugins/xep_0047/stream.py +++ b/slixmpp/plugins/xep_0047/stream.py @@ -1,17 +1,32 @@ +# Slixmpp: The Slick XMPP Library +# This file is part of Slixmpp +# See the file LICENSE for copying permission import asyncio import socket import logging -from slixmpp.stanza import Iq -from slixmpp.exceptions import XMPPError +from typing import ( + Optional, + IO, + Union, +) + +from slixmpp import JID +from slixmpp.stanza import Iq, Message +from slixmpp.exceptions import XMPPError, IqTimeout log = logging.getLogger(__name__) class IBBytestream(object): + """XEP-0047 Stream abstraction. Created by the ibb plugin automatically. + + Provides send methods and triggers :term:`ibb_stream_data` events. + """ - def __init__(self, xmpp, sid, block_size, jid, peer, use_messages=False): + def __init__(self, xmpp, sid: str, block_size: int, jid: JID, peer: JID, + use_messages: bool = False): self.xmpp = xmpp self.sid = sid self.block_size = block_size @@ -31,7 +46,12 @@ class IBBytestream(object): self.recv_queue = asyncio.Queue() - async def send(self, data, timeout=None): + async def send(self, data: bytes, timeout: Optional[int] = None) -> int: + """Send a single block of data. + + :param data: Data to send (will be truncated if above block size). + :returns: Number of bytes sent. + """ if not self.stream_started or self.stream_out_closed: raise socket.error if len(data) > self.block_size: @@ -58,19 +78,62 @@ class IBBytestream(object): await iq.send(timeout=timeout) return len(data) - async def sendall(self, data, timeout=None): + async def sendall(self, data: bytes, timeout: Optional[int] = None): + """Send all the contents of ``data`` in chunks. + + :param data: Raw data to send. + """ sent_len = 0 while sent_len < len(data): - sent_len += await self.send(data[sent_len:self.block_size], timeout=timeout) - - async def sendfile(self, file, timeout=None): + sent_len += await self.send(data[sent_len:sent_len+self.block_size], timeout=timeout) + + async def gather(self, max_data: Optional[int] = None, timeout: int = 3600) -> bytes: + """Gather all data sent on a stream until it is closed, and return it. + + .. versionadded:: 1.8.0 + + :param max_data: Max number of bytes to receive. (received data may be + over this limit depending on block_size) + :param timeout: Timeout after which an error will be raised. + :raises .IqTimeout: If the timeout is reached. + :returns: All bytes accumulated in the stream. + """ + result = b'' + end_future = asyncio.Future() + + def on_close(stream): + if stream is self: + end_future.set_result(True) + + def on_data(stream): + nonlocal result + if stream is self: + result += stream.read() + if max_data and len(result) > max_data: + end_future.set_result(True) + + self.xmpp.add_event_handler('ibb_stream_end', on_close) + self.xmpp.add_event_handler('ibb_stream_data', on_data) + try: + await asyncio.wait_for(end_future, timeout, loop=self.xmpp.loop) + except asyncio.TimeoutError: + raise IqTimeout(result) + finally: + self.xmpp.del_event_handler('ibb_stream_end', on_close) + self.xmpp.del_event_handler('ibb_stream_data', on_data) + return result + + async def sendfile(self, file: IO[bytes], timeout: Optional[int] = None): + """Send the contents of a file over the wire, in chunks. + + :param file: The opened file (or file-like) object, in bytes mode.""" while True: data = file.read(self.block_size) if not data: break await self.send(data, timeout=timeout) - def _recv_data(self, stanza): + def _recv_data(self, stanza: Union[Message, Iq]): new_seq = stanza['ibb_data']['seq'] if new_seq != (self.recv_seq + 1) % 65536: self.close() @@ -96,7 +159,8 @@ class IBBytestream(object): raise socket.error return self.recv_queue.get_nowait() - def close(self, timeout=None): + def close(self, timeout: Optional[int] = None) -> asyncio.Future: + """Close the stream.""" iq = self.xmpp.Iq() iq['type'] = 'set' iq['to'] = self.peer_jid @@ -109,7 +173,7 @@ class IBBytestream(object): self.xmpp.event('ibb_stream_end', self) return future - def _closed(self, iq): + def _closed(self, iq: Iq): self.stream_in_closed = True self.stream_out_closed = True iq.reply().send() |