summaryrefslogtreecommitdiff
path: root/slixmpp/plugins/xep_0047/stream.py
blob: f020ea68ccc56f4ec949e0a951feb2029b6b5bca (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
# Slixmpp: The Slick XMPP Library
# This file is part of Slixmpp
# See the file LICENSE for copying permission
import asyncio
import socket
import logging

from typing import (
    Optional,
    IO,
    Union,
)

from slixmpp import JID
from slixmpp.stanza import Iq, Message
from slixmpp.exceptions import XMPPError, IqTimeout


log = logging.getLogger(__name__)


class IBBytestream(object):
    """XEP-0047 Stream abstraction. Created by the ibb plugin automatically.

    Provides send methods and triggers :term:`ibb_stream_data` events.
    """

    def __init__(self, xmpp, sid: str, block_size: int, jid: JID, peer: JID,
                 use_messages: bool = False):
        self.xmpp = xmpp
        self.sid = sid
        self.block_size = block_size
        self.use_messages = use_messages

        if jid is None:
            jid = xmpp.boundjid
        self.self_jid = jid
        self.peer_jid = peer

        self.send_seq = -1
        self.recv_seq = -1

        self.stream_started = False
        self.stream_in_closed = False
        self.stream_out_closed = False

        self.recv_queue = asyncio.Queue()

    async def send(self, data: bytes, timeout: Optional[int] = None) -> int:
        """Send a single block of data.

        :param data: Data to send (will be truncated if above block size).
        :returns: Number of bytes sent.
        """
        if not self.stream_started or self.stream_out_closed:
            raise socket.error
        if len(data) > self.block_size:
            data = data[:self.block_size]
        self.send_seq = (self.send_seq + 1) % 65536
        seq = self.send_seq
        if self.use_messages:
            msg = self.xmpp.Message()
            msg['to'] = self.peer_jid
            msg['from'] = self.self_jid
            msg['id'] = self.xmpp.new_id()
            msg['ibb_data']['sid'] = self.sid
            msg['ibb_data']['seq'] = seq
            msg['ibb_data']['data'] = data
            msg.send()
        else:
            iq = self.xmpp.Iq()
            iq['type'] = 'set'
            iq['to'] = self.peer_jid
            iq['from'] = self.self_jid
            iq['ibb_data']['sid'] = self.sid
            iq['ibb_data']['seq'] = seq
            iq['ibb_data']['data'] = data
            await iq.send(timeout=timeout)
        return len(data)

    async def sendall(self, data: bytes, timeout: Optional[int] = None):
        """Send all the contents of ``data`` in chunks.

        :param data: Raw data to send.
        """
        sent_len = 0
        while sent_len < len(data):
            sent_len += await self.send(data[sent_len:sent_len+self.block_size], timeout=timeout)

    async def gather(self, max_data: Optional[int] = None, timeout: int = 3600) -> bytes:
        """Gather all data sent on a stream until it is closed, and return it.

        .. versionadded:: 1.8.0

        :param max_data: Max number of bytes to receive. (received data may be
                         over this limit depending on block_size)
        :param timeout: Timeout after which an error will be raised.
        :raises .IqTimeout: If the timeout is reached.
        :returns: All bytes accumulated in the stream.
        """
        result = b''
        end_future = asyncio.Future()

        def on_close(stream):
            if stream is self:
                end_future.set_result(True)

        def on_data(stream):
            nonlocal result
            if stream is self:
                result += stream.read()
                if max_data and len(result) > max_data:
                    end_future.set_result(True)

        self.xmpp.add_event_handler('ibb_stream_end', on_close)
        self.xmpp.add_event_handler('ibb_stream_data', on_data)
        try:
            await asyncio.wait_for(end_future, timeout, loop=self.xmpp.loop)
        except asyncio.TimeoutError:
            raise IqTimeout(result)
        finally:
            self.xmpp.del_event_handler('ibb_stream_end', on_close)
            self.xmpp.del_event_handler('ibb_stream_data', on_data)
        return result

    async def sendfile(self, file: IO[bytes], timeout: Optional[int] = None):
        """Send the contents of a file over the wire, in chunks.

        :param file: The opened file (or file-like) object, in bytes mode."""
        while True:
            data = file.read(self.block_size)
            if not data:
                break
            await self.send(data, timeout=timeout)

    def _recv_data(self, stanza: Union[Message, Iq]):
        new_seq = stanza['ibb_data']['seq']
        if new_seq != (self.recv_seq + 1) % 65536:
            self.close()
            raise XMPPError('unexpected-request')
        self.recv_seq = new_seq

        data = stanza['ibb_data']['data']
        if len(data) > self.block_size:
            self.close()
            raise XMPPError('not-acceptable')

        self.recv_queue.put_nowait(data)
        self.xmpp.event('ibb_stream_data', self)

        if isinstance(stanza, Iq):
            stanza.reply().send()

    def recv(self, *args, **kwargs):
        return self.read()

    def read(self):
        if not self.stream_started or self.stream_in_closed:
            raise socket.error
        return self.recv_queue.get_nowait()

    def close(self, timeout: Optional[int] = None) -> asyncio.Future:
        """Close the stream."""
        iq = self.xmpp.Iq()
        iq['type'] = 'set'
        iq['to'] = self.peer_jid
        iq['from'] = self.self_jid
        iq['ibb_close']['sid'] = self.sid
        self.stream_out_closed = True
        def _close_stream(_):
            self.stream_in_closed = True
        future = iq.send(timeout=timeout, callback=_close_stream)
        self.xmpp.event('ibb_stream_end', self)
        return future

    def _closed(self, iq: Iq):
        self.stream_in_closed = True
        self.stream_out_closed = True
        iq.reply().send()
        self.xmpp.event('ibb_stream_end', self)

    def makefile(self, *args, **kwargs):
        return self

    def connect(*args, **kwargs):
        return None

    def shutdown(self, *args, **kwargs):
        return None