diff options
Diffstat (limited to 'slixmpp/plugins/xep_0047')
-rw-r--r-- | slixmpp/plugins/xep_0047/__init__.py | 21 | ||||
-rw-r--r-- | slixmpp/plugins/xep_0047/ibb.py | 183 | ||||
-rw-r--r-- | slixmpp/plugins/xep_0047/stanza.py | 70 | ||||
-rw-r--r-- | slixmpp/plugins/xep_0047/stream.py | 128 |
4 files changed, 402 insertions, 0 deletions
diff --git a/slixmpp/plugins/xep_0047/__init__.py b/slixmpp/plugins/xep_0047/__init__.py new file mode 100644 index 00000000..5bb9e7cc --- /dev/null +++ b/slixmpp/plugins/xep_0047/__init__.py @@ -0,0 +1,21 @@ +""" + Slixmpp: The Slick XMPP Library + Copyright (C) 2012 Nathanael C. Fritz, Lance J.T. Stout + This file is part of Slixmpp. + + See the file LICENSE for copying permission. +""" + +from slixmpp.plugins.base import register_plugin + +from slixmpp.plugins.xep_0047 import stanza +from slixmpp.plugins.xep_0047.stanza import Open, Close, Data +from slixmpp.plugins.xep_0047.stream import IBBytestream +from slixmpp.plugins.xep_0047.ibb import XEP_0047 + + +register_plugin(XEP_0047) + + +# Retain some backwards compatibility +xep_0047 = XEP_0047 diff --git a/slixmpp/plugins/xep_0047/ibb.py b/slixmpp/plugins/xep_0047/ibb.py new file mode 100644 index 00000000..52d7fbe5 --- /dev/null +++ b/slixmpp/plugins/xep_0047/ibb.py @@ -0,0 +1,183 @@ +import asyncio +import uuid +import logging + +from slixmpp import Message, Iq +from slixmpp.exceptions import XMPPError +from slixmpp.xmlstream.handler import Callback +from slixmpp.xmlstream.matcher import StanzaPath +from slixmpp.xmlstream import register_stanza_plugin +from slixmpp.plugins import BasePlugin +from slixmpp.plugins.xep_0047 import stanza, Open, Close, Data, IBBytestream + + +log = logging.getLogger(__name__) + + +class XEP_0047(BasePlugin): + + name = 'xep_0047' + description = 'XEP-0047: In-band Bytestreams' + dependencies = set(['xep_0030']) + stanza = stanza + default_config = { + 'block_size': 4096, + 'max_block_size': 8192, + 'auto_accept': False, + } + + def plugin_init(self): + self._streams = {} + self._preauthed_sids = {} + + register_stanza_plugin(Iq, Open) + register_stanza_plugin(Iq, Close) + register_stanza_plugin(Iq, Data) + register_stanza_plugin(Message, Data) + + self.xmpp.register_handler(Callback( + 'IBB Open', + StanzaPath('iq@type=set/ibb_open'), + self._handle_open_request)) + + self.xmpp.register_handler(Callback( + 'IBB Close', + StanzaPath('iq@type=set/ibb_close'), + self._handle_close)) + + self.xmpp.register_handler(Callback( + 'IBB Data', + StanzaPath('iq@type=set/ibb_data'), + self._handle_data)) + + self.xmpp.register_handler(Callback( + 'IBB Message Data', + StanzaPath('message/ibb_data'), + self._handle_data)) + + self.api.register(self._authorized, 'authorized', default=True) + self.api.register(self._authorized_sid, 'authorized_sid', default=True) + self.api.register(self._preauthorize_sid, 'preauthorize_sid', default=True) + self.api.register(self._get_stream, 'get_stream', default=True) + self.api.register(self._set_stream, 'set_stream', default=True) + self.api.register(self._del_stream, 'del_stream', default=True) + + def plugin_end(self): + self.xmpp.remove_handler('IBB Open') + self.xmpp.remove_handler('IBB Close') + self.xmpp.remove_handler('IBB Data') + self.xmpp.remove_handler('IBB Message Data') + self.xmpp['xep_0030'].del_feature(feature='http://jabber.org/protocol/ibb') + + def session_bind(self, jid): + self.xmpp['xep_0030'].add_feature('http://jabber.org/protocol/ibb') + + def _get_stream(self, jid, sid, peer_jid, data): + return self._streams.get((jid, sid, peer_jid), None) + + def _set_stream(self, jid, sid, peer_jid, stream): + self._streams[(jid, sid, peer_jid)] = stream + + def _del_stream(self, jid, sid, peer_jid, data): + if (jid, sid, peer_jid) in self._streams: + del self._streams[(jid, sid, peer_jid)] + + def _accept_stream(self, iq): + receiver = iq['to'] + sender = iq['from'] + sid = iq['ibb_open']['sid'] + + if self.api['authorized_sid'](receiver, sid, sender, iq): + return True + return self.api['authorized'](receiver, sid, sender, iq) + + def _authorized(self, jid, sid, ifrom, iq): + if self.auto_accept: + return True + return False + + def _authorized_sid(self, jid, sid, ifrom, iq): + if (jid, sid, ifrom) in self._preauthed_sids: + del self._preauthed_sids[(jid, sid, ifrom)] + return True + return False + + def _preauthorize_sid(self, jid, sid, ifrom, data): + self._preauthed_sids[(jid, sid, ifrom)] = True + + def open_stream(self, jid, block_size=None, sid=None, use_messages=False, + ifrom=None, timeout=None, callback=None): + if sid is None: + sid = str(uuid.uuid4()) + if block_size is None: + block_size = self.block_size + + iq = self.xmpp.Iq() + iq['type'] = 'set' + iq['to'] = jid + iq['from'] = ifrom + iq['ibb_open']['block_size'] = block_size + iq['ibb_open']['sid'] = sid + iq['ibb_open']['stanza'] = 'message' if use_messages else 'iq' + + stream = IBBytestream(self.xmpp, sid, block_size, + iq['from'], iq['to'], use_messages) + + stream_future = asyncio.Future() + + def _handle_opened_stream(iq): + log.debug('IBB stream (%s) accepted by %s', stream.sid, iq['from']) + stream.self_jid = iq['to'] + stream.peer_jid = iq['from'] + stream.stream_started = True + self.api['set_stream'](stream.self_jid, stream.sid, stream.peer_jid, stream) + stream_future.set_result(stream) + if callback is not None: + callback(stream) + self.xmpp.event('ibb_stream_start', stream) + self.xmpp.event('stream:%s:%s' % (stream.sid, stream.peer_jid), stream) + + iq.send(timeout=timeout, callback=_handle_opened_stream) + + return stream_future + + def _handle_open_request(self, iq): + sid = iq['ibb_open']['sid'] + size = iq['ibb_open']['block_size'] or self.block_size + + log.debug('Received IBB stream request from %s', iq['from']) + + if not sid: + raise XMPPError(etype='modify', condition='bad-request') + + if not self._accept_stream(iq): + raise XMPPError(etype='cancel', condition='not-acceptable') + + if size > self.max_block_size: + raise XMPPError('resource-constraint') + + stream = IBBytestream(self.xmpp, sid, size, + iq['to'], iq['from']) + stream.stream_started = True + self.api['set_stream'](stream.self_jid, stream.sid, stream.peer_jid, stream) + iq.reply().send() + + self.xmpp.event('ibb_stream_start', stream) + self.xmpp.event('stream:%s:%s' % (sid, stream.peer_jid), stream) + + def _handle_data(self, stanza): + sid = stanza['ibb_data']['sid'] + stream = self.api['get_stream'](stanza['to'], sid, stanza['from']) + if stream is not None and stanza['from'] == stream.peer_jid: + stream._recv_data(stanza) + else: + raise XMPPError('item-not-found') + + def _handle_close(self, iq): + sid = iq['ibb_close']['sid'] + stream = self.api['get_stream'](iq['to'], sid, iq['from']) + if stream is not None and iq['from'] == stream.peer_jid: + stream._closed(iq) + self.api['del_stream'](stream.self_jid, stream.sid, stream.peer_jid) + else: + raise XMPPError('item-not-found') diff --git a/slixmpp/plugins/xep_0047/stanza.py b/slixmpp/plugins/xep_0047/stanza.py new file mode 100644 index 00000000..7f8ff0ba --- /dev/null +++ b/slixmpp/plugins/xep_0047/stanza.py @@ -0,0 +1,70 @@ +import re +import base64 + +from slixmpp.util import bytes +from slixmpp.exceptions import XMPPError +from slixmpp.xmlstream import ElementBase + + +VALID_B64 = re.compile(r'[A-Za-z0-9\+\/]*=*') + + +def to_b64(data): + return bytes(base64.b64encode(bytes(data))).decode('utf-8') + + +def from_b64(data): + return bytes(base64.b64decode(bytes(data))) + + +class Open(ElementBase): + name = 'open' + namespace = 'http://jabber.org/protocol/ibb' + plugin_attrib = 'ibb_open' + interfaces = set(('block_size', 'sid', 'stanza')) + + def get_block_size(self): + return int(self._get_attr('block-size', '0')) + + def set_block_size(self, value): + self._set_attr('block-size', str(value)) + + def del_block_size(self): + self._del_attr('block-size') + + +class Data(ElementBase): + name = 'data' + namespace = 'http://jabber.org/protocol/ibb' + plugin_attrib = 'ibb_data' + interfaces = set(('seq', 'sid', 'data')) + sub_interfaces = set(['data']) + + def get_seq(self): + return int(self._get_attr('seq', '0')) + + def set_seq(self, value): + self._set_attr('seq', str(value)) + + def get_data(self): + text = self.xml.text + if not text: + raise XMPPError('not-acceptable', 'IBB data element is empty.') + b64_data = text.strip() + if VALID_B64.match(b64_data).group() == b64_data: + return from_b64(b64_data) + else: + raise XMPPError('not-acceptable') + + def set_data(self, value): + self.xml.text = to_b64(value) + + def del_data(self): + self.xml.text = '' + + +class Close(ElementBase): + name = 'close' + namespace = 'http://jabber.org/protocol/ibb' + plugin_attrib = 'ibb_close' + interfaces = set(['sid']) diff --git a/slixmpp/plugins/xep_0047/stream.py b/slixmpp/plugins/xep_0047/stream.py new file mode 100644 index 00000000..3be894eb --- /dev/null +++ b/slixmpp/plugins/xep_0047/stream.py @@ -0,0 +1,128 @@ +import asyncio +import socket +import logging + +from slixmpp.stanza import Iq +from slixmpp.exceptions import XMPPError + + +log = logging.getLogger(__name__) + + +class IBBytestream(object): + + def __init__(self, xmpp, sid, block_size, jid, peer, use_messages=False): + self.xmpp = xmpp + self.sid = sid + self.block_size = block_size + self.use_messages = use_messages + + if jid is None: + jid = xmpp.boundjid + self.self_jid = jid + self.peer_jid = peer + + self.send_seq = -1 + self.recv_seq = -1 + + self.stream_started = False + self.stream_in_closed = False + self.stream_out_closed = False + + self.recv_queue = asyncio.Queue() + + @asyncio.coroutine + def send(self, data, timeout=None): + if not self.stream_started or self.stream_out_closed: + raise socket.error + if len(data) > self.block_size: + data = data[:self.block_size] + self.send_seq = (self.send_seq + 1) % 65535 + seq = self.send_seq + if self.use_messages: + msg = self.xmpp.Message() + msg['to'] = self.peer_jid + msg['from'] = self.self_jid + msg['id'] = self.xmpp.new_id() + msg['ibb_data']['sid'] = self.sid + msg['ibb_data']['seq'] = seq + msg['ibb_data']['data'] = data + msg.send() + else: + iq = self.xmpp.Iq() + iq['type'] = 'set' + iq['to'] = self.peer_jid + iq['from'] = self.self_jid + iq['ibb_data']['sid'] = self.sid + iq['ibb_data']['seq'] = seq + iq['ibb_data']['data'] = data + yield from iq.send(timeout=timeout) + return len(data) + + @asyncio.coroutine + def sendall(self, data, timeout=None): + sent_len = 0 + while sent_len < len(data): + sent_len += yield from self.send(data[sent_len:self.block_size], timeout=timeout) + + @asyncio.coroutine + def sendfile(self, file, timeout=None): + while True: + data = file.read(self.block_size) + if not data: + break + yield from self.send(data, timeout=timeout) + + def _recv_data(self, stanza): + new_seq = stanza['ibb_data']['seq'] + if new_seq != (self.recv_seq + 1) % 65535: + self.close() + raise XMPPError('unexpected-request') + self.recv_seq = new_seq + + data = stanza['ibb_data']['data'] + if len(data) > self.block_size: + self.close() + raise XMPPError('not-acceptable') + + self.recv_queue.put_nowait(data) + self.xmpp.event('ibb_stream_data', self) + + if isinstance(stanza, Iq): + stanza.reply().send() + + def recv(self, *args, **kwargs): + return self.read() + + def read(self): + if not self.stream_started or self.stream_in_closed: + raise socket.error + return self.recv_queue.get_nowait() + + def close(self, timeout=None): + iq = self.xmpp.Iq() + iq['type'] = 'set' + iq['to'] = self.peer_jid + iq['from'] = self.self_jid + iq['ibb_close']['sid'] = self.sid + self.stream_out_closed = True + def _close_stream(_): + self.stream_in_closed = True + future = iq.send(timeout=timeout, callback=_close_stream) + self.xmpp.event('ibb_stream_end', self) + return future + + def _closed(self, iq): + self.stream_in_closed = True + self.stream_out_closed = True + iq.reply().send() + self.xmpp.event('ibb_stream_end', self) + + def makefile(self, *args, **kwargs): + return self + + def connect(*args, **kwargs): + return None + + def shutdown(self, *args, **kwargs): + return None |