From 3732139fc3f364e6246c637441c9f5fd65c37bfb Mon Sep 17 00:00:00 2001 From: Lance Stout Date: Tue, 19 Feb 2013 01:00:04 -0800 Subject: Save progress on SI file transfer --- sleekxmpp/plugins/xep_0047/ibb.py | 59 +++++-- sleekxmpp/plugins/xep_0047/stream.py | 23 ++- sleekxmpp/plugins/xep_0065/proxy.py | 64 +++++++- sleekxmpp/plugins/xep_0095/__init__.py | 16 ++ sleekxmpp/plugins/xep_0095/stanza.py | 25 +++ sleekxmpp/plugins/xep_0095/stream_initiation.py | 197 ++++++++++++++++++++++++ sleekxmpp/plugins/xep_0096/__init__.py | 16 ++ sleekxmpp/plugins/xep_0096/file_transfer.py | 58 +++++++ sleekxmpp/plugins/xep_0096/stanza.py | 48 ++++++ 9 files changed, 474 insertions(+), 32 deletions(-) create mode 100644 sleekxmpp/plugins/xep_0095/__init__.py create mode 100644 sleekxmpp/plugins/xep_0095/stanza.py create mode 100644 sleekxmpp/plugins/xep_0095/stream_initiation.py create mode 100644 sleekxmpp/plugins/xep_0096/__init__.py create mode 100644 sleekxmpp/plugins/xep_0096/file_transfer.py create mode 100644 sleekxmpp/plugins/xep_0096/stanza.py diff --git a/sleekxmpp/plugins/xep_0047/ibb.py b/sleekxmpp/plugins/xep_0047/ibb.py index e341433f..1960308e 100644 --- a/sleekxmpp/plugins/xep_0047/ibb.py +++ b/sleekxmpp/plugins/xep_0047/ibb.py @@ -21,10 +21,10 @@ class XEP_0047(BasePlugin): dependencies = set(['xep_0030']) stanza = stanza default_config = { + 'block_size': 4096, 'max_block_size': 8192, 'window_size': 1, - 'auto_accept': True, - 'accept_stream': None + 'auto_accept': False, } def plugin_init(self): @@ -33,6 +33,9 @@ class XEP_0047(BasePlugin): self.pending_close_streams = {} self._stream_lock = threading.Lock() + self._preauthed_sids_lock = threading.Lock() + self._preauthed_sids = {} + register_stanza_plugin(Iq, Open) register_stanza_plugin(Iq, Close) register_stanza_plugin(Iq, Data) @@ -58,6 +61,10 @@ class XEP_0047(BasePlugin): StanzaPath('message/ibb_data'), self._handle_data)) + self.api.register(self._authorized, 'authorized', default=True) + self.api.register(self._authorized_sid, 'authorized_sid', default=True) + self.api.register(self._preauthorize_sid, 'preauthorize_sid', default=True) + def plugin_end(self): self.xmpp.remove_handler('IBB Open') self.xmpp.remove_handler('IBB Close') @@ -69,17 +76,37 @@ class XEP_0047(BasePlugin): self.xmpp['xep_0030'].add_feature('http://jabber.org/protocol/ibb') def _accept_stream(self, iq): - if self.accept_stream is not None: - return self.accept_stream(iq) + receiver = iq['to'] + sender = iq['from'] + sid = iq['ibb_open']['sid'] + + if self.api['authorized_sid'](receiver, sid, sender, iq): + return True + return self.api['authorized'](receiver, sid, sender, iq) + + def _authorized(self, jid, sid, ifrom, iq): if self.auto_accept: if iq['ibb_open']['block_size'] <= self.max_block_size: return True return False - def open_stream(self, jid, block_size=4096, sid=None, window=1, use_messages=False, + def _authorized_sid(self, jid, sid, ifrom, iq): + with self._preauthed_sids_lock: + if (jid, sid, ifrom) in self._preauthed_sids: + del self._preauthed_sids[(jid, sid, ifrom)] + return True + return False + + def _preauthorize_sid(self, jid, sid, ifrom, data): + with self._preauthed_sids_lock: + self._preauthed_sids[(jid, sid, ifrom)] = True + + def open_stream(self, jid, block_size=None, sid=None, window=1, use_messages=False, ifrom=None, block=True, timeout=None, callback=None): if sid is None: sid = str(uuid.uuid4()) + if block_size is None: + block_size = self.block_size iq = self.xmpp.Iq() iq['type'] = 'set' @@ -90,7 +117,7 @@ class XEP_0047(BasePlugin): iq['ibb_open']['stanza'] = 'iq' stream = IBBytestream(self.xmpp, sid, block_size, - iq['to'], iq['from'], window, + iq['from'], iq['to'], window, use_messages) with self._stream_lock: @@ -118,11 +145,12 @@ class XEP_0047(BasePlugin): with self._stream_lock: stream = self.pending_streams.get(iq['id'], None) if stream is not None: - stream.sender = iq['to'] - stream.receiver = iq['from'] + stream.self_jid = iq['to'] + stream.peer_jid = iq['from'] stream.stream_started.set() self.streams[stream.sid] = stream self.xmpp.event('ibb_stream_start', stream) + self.xmpp.event('stream:%s:%s' % (sid, stream.peer_jid), stream) with self._stream_lock: if iq['id'] in self.pending_streams: @@ -130,15 +158,19 @@ class XEP_0047(BasePlugin): def _handle_open_request(self, iq): sid = iq['ibb_open']['sid'] - size = iq['ibb_open']['block_size'] + size = iq['ibb_open']['block_size'] or self.block_size + + if not sid: + raise XMPPError(etype='modify', condition='bad-request') + if not self._accept_stream(iq): - raise XMPPError('not-acceptable') + raise XMPPError(etype='modify', condition='not-acceptable') if size > self.max_block_size: raise XMPPError('resource-constraint') stream = IBBytestream(self.xmpp, sid, size, - iq['from'], iq['to'], + iq['to'], iq['from'], self.window_size) stream.stream_started.set() self.streams[sid] = stream @@ -146,11 +178,12 @@ class XEP_0047(BasePlugin): iq.send() self.xmpp.event('ibb_stream_start', stream) + self.xmpp.event('stream:%s:%s' % (sid, stream.peer_jid), stream) def _handle_data(self, stanza): sid = stanza['ibb_data']['sid'] stream = self.streams.get(sid, None) - if stream is not None and stanza['from'] != stream.sender: + if stream is not None and stanza['from'] == stream.peer_jid: stream._recv_data(stanza) else: raise XMPPError('item-not-found') @@ -158,7 +191,7 @@ class XEP_0047(BasePlugin): def _handle_close(self, iq): sid = iq['ibb_close']['sid'] stream = self.streams.get(sid, None) - if stream is not None and iq['from'] != stream.sender: + if stream is not None and iq['from'] == stream.peer_jid: stream._closed(iq) else: raise XMPPError('item-not-found') diff --git a/sleekxmpp/plugins/xep_0047/stream.py b/sleekxmpp/plugins/xep_0047/stream.py index adc86450..9651edf8 100644 --- a/sleekxmpp/plugins/xep_0047/stream.py +++ b/sleekxmpp/plugins/xep_0047/stream.py @@ -12,15 +12,17 @@ log = logging.getLogger(__name__) class IBBytestream(object): - def __init__(self, xmpp, sid, block_size, to, ifrom, window_size=1, use_messages=False): + def __init__(self, xmpp, sid, block_size, jid, peer, window_size=1, use_messages=False): self.xmpp = xmpp self.sid = sid self.block_size = block_size self.window_size = window_size self.use_messages = use_messages - self.receiver = to - self.sender = ifrom + if jid is None: + jid = xmpp.boundjid + self.self_jid = jid + self.peer_jid = peer self.send_seq = -1 self.recv_seq = -1 @@ -50,8 +52,8 @@ class IBBytestream(object): seq = self.send_seq if self.use_messages: msg = self.xmpp.Message() - msg['to'] = self.receiver - msg['from'] = self.sender + msg['to'] = self.peer_jid + msg['from'] = self.self_jid msg['id'] = self.xmpp.new_id() msg['ibb_data']['sid'] = self.sid msg['ibb_data']['seq'] = seq @@ -61,8 +63,8 @@ class IBBytestream(object): else: iq = self.xmpp.Iq() iq['type'] = 'set' - iq['to'] = self.receiver - iq['from'] = self.sender + iq['to'] = self.peer_jid + iq['from'] = self.self_jid iq['ibb_data']['sid'] = self.sid iq['ibb_data']['seq'] = seq iq['ibb_data']['data'] = data @@ -121,8 +123,8 @@ class IBBytestream(object): def close(self): iq = self.xmpp.Iq() iq['type'] = 'set' - iq['to'] = self.receiver - iq['from'] = self.sender + iq['to'] = self.peer_jid + iq['from'] = self.self_jid iq['ibb_close']['sid'] = self.sid self.stream_out_closed.set() iq.send(block=False, @@ -132,9 +134,6 @@ class IBBytestream(object): def _closed(self, iq): self.stream_in_closed.set() self.stream_out_closed.set() - while not self.window_empty.is_set(): - log.info('waiting for send window to empty') - self.window_empty.wait(timeout=1) iq.reply() iq.send() self.xmpp.event('ibb_stream_end', self) diff --git a/sleekxmpp/plugins/xep_0065/proxy.py b/sleekxmpp/plugins/xep_0065/proxy.py index 473dd033..265d3030 100644 --- a/sleekxmpp/plugins/xep_0065/proxy.py +++ b/sleekxmpp/plugins/xep_0065/proxy.py @@ -25,6 +25,9 @@ class XEP_0065(base_plugin): name = 'xep_0065' description = "Socks5 Bytestreams" dependencies = set(['xep_0030']) + default_config = { + 'auto_accept': False + } def plugin_init(self): register_stanza_plugin(Iq, Socks5) @@ -33,11 +36,18 @@ class XEP_0065(base_plugin): self._sessions = {} self._sessions_lock = threading.Lock() + self._preauthed_sids_lock = threading.Lock() + self._preauthed_sids = {} + self.xmpp.register_handler( Callback('Socks5 Bytestreams', StanzaPath('iq@type=set/socks/streamhost'), self._handle_streamhost)) + self.api.register(self._authorized, 'authorized', default=True) + self.api.register(self._authorized_sid, 'authorized_sid', default=True) + self.api.register(self._preauthorize_sid, 'preauthorize_sid', default=True) + def session_bind(self, jid): self.xmpp['xep_0030'].add_feature(Socks5.namespace) @@ -50,14 +60,15 @@ class XEP_0065(base_plugin): """Returns the socket associated to the SID.""" return self._sessions.get(sid, None) - def handshake(self, to, ifrom=None, timeout=None): + def handshake(self, to, ifrom=None, sid=None, timeout=None): """ Starts the handshake to establish the socks5 bytestreams connection. """ if not self._proxies: self._proxies = self.discover_proxies() - sid = uuid4().hex + if sid is None: + sid = uuid4().hex used = self.request_stream(to, sid=sid, ifrom=ifrom, timeout=timeout) proxy = used['socks']['streamhost_used']['jid'] @@ -72,10 +83,12 @@ class XEP_0065(base_plugin): self.xmpp.boundjid, to, self._proxies[proxy][0], - self._proxies[proxy][1]) + self._proxies[proxy][1], + peer=to) # Request that the proxy activate the session with the target. self.activate(proxy, sid, to, timeout=timeout) + self.xmpp.event('stream:%s:%s' % (sid, conn.peer_jid), conn) return self.get_socket(sid) def request_stream(self, to, sid=None, ifrom=None, block=True, timeout=None, callback=None): @@ -139,19 +152,24 @@ class XEP_0065(base_plugin): """Handle incoming SOCKS5 session request.""" sid = iq['socks']['sid'] if not sid: + raise XMPPError(etype='modify', condition='bad-request') + + if not self._accept_stream(iq): raise XMPPError(etype='modify', condition='not-acceptable') streamhosts = iq['socks']['streamhosts'] conn = None used_streamhost = None + sender = iq['from'] for streamhost in streamhosts: try: conn = self._connect_proxy(sid, - iq['from'], + sender, self.xmpp.boundjid, streamhost['host'], - streamhost['port']) + streamhost['port'], + peer=sender) used_streamhost = streamhost['jid'] break except socket.error: @@ -165,6 +183,8 @@ class XEP_0065(base_plugin): iq['socks']['sid'] = sid iq['socks']['streamhost_used']['jid'] = used_streamhost iq.send() + self.xmpp.event('socks5_stream', conn) + self.xmpp.event('stream:%s:%s' % (sid, conn.peer_jid), conn) def activate(self, proxy, sid, target, ifrom=None, block=True, timeout=None, callback=None): """Activate the socks5 session that has been negotiated.""" @@ -191,7 +211,7 @@ class XEP_0065(base_plugin): with self._sessions_lock: self._sessions = {} - def _connect_proxy(self, sid, requester, target, proxy, proxy_port): + def _connect_proxy(self, sid, requester, target, proxy, proxy_port, peer=None): """ Establishes a connection between the client and the server-side Socks5 proxy. @@ -200,6 +220,8 @@ class XEP_0065(base_plugin): target : The JID of the target. proxy_host : The hostname or the IP of the proxy. proxy_port : The port of the proxy. or + peer : The JID for the other side of the stream, regardless + of target or requester status. """ # Because the xep_0065 plugin uses the proxy_port as string, # the Proxy class accepts the proxy_port argument as a string @@ -230,6 +252,34 @@ class XEP_0065(base_plugin): _close() sock.close = close - self.xmpp.event('socks_connected', sid) + sock.peer_jid = peer + sock.self_jid = target if requester == peer else requester + self.xmpp.event('socks_connected', sid) return sock + + def _accept_stream(self, iq): + receiver = iq['to'] + sender = iq['from'] + sid = iq['socks']['sid'] + + if self.api['authorized_sid'](receiver, sid, sender, iq): + return True + return self.api['authorized'](receiver, sid, sender, iq) + + def _authorized(self, jid, sid, ifrom, iq): + return self.auto_accept + + def _authorized_sid(self, jid, sid, ifrom, iq): + with self._preauthed_sids_lock: + log.debug('>>> authed sids: %s', self._preauthed_sids) + log.debug('>>> lookup: %s %s %s', jid, sid, ifrom) + if (jid, sid, ifrom) in self._preauthed_sids: + del self._preauthed_sids[(jid, sid, ifrom)] + return True + return False + + def _preauthorize_sid(self, jid, sid, ifrom, data): + log.debug('>>>> %s %s %s %s', jid, sid, ifrom, data) + with self._preauthed_sids_lock: + self._preauthed_sids[(jid, sid, ifrom)] = True diff --git a/sleekxmpp/plugins/xep_0095/__init__.py b/sleekxmpp/plugins/xep_0095/__init__.py new file mode 100644 index 00000000..4465ef5c --- /dev/null +++ b/sleekxmpp/plugins/xep_0095/__init__.py @@ -0,0 +1,16 @@ +""" + SleekXMPP: The Sleek XMPP Library + Copyright (C) 2013 Nathanael C. Fritz, Lance J.T. Stout + This file is part of SleekXMPP. + + See the file LICENSE for copying permission. +""" + +from sleekxmpp.plugins.base import register_plugin + +from sleekxmpp.plugins.xep_0095 import stanza +from sleekxmpp.plugins.xep_0095.stanza import SI +from sleekxmpp.plugins.xep_0095.stream_initiation import XEP_0095 + + +register_plugin(XEP_0095) diff --git a/sleekxmpp/plugins/xep_0095/stanza.py b/sleekxmpp/plugins/xep_0095/stanza.py new file mode 100644 index 00000000..34999a11 --- /dev/null +++ b/sleekxmpp/plugins/xep_0095/stanza.py @@ -0,0 +1,25 @@ +""" + SleekXMPP: The Sleek XMPP Library + Copyright (C) 2013 Nathanael C. Fritz, Lance J.T. Stout + This file is part of SleekXMPP. + + See the file LICENSE for copying permission. +""" + +from sleekxmpp.xmlstream import ElementBase + + +class SI(ElementBase): + name = 'si' + namespace = 'http://jabber.org/protocol/si' + plugin_attrib = 'si' + interfaces = set(['id', 'mime_type', 'profile']) + + def get_mime_type(self): + return self._get_attr('mime-type', 'application/octet-stream') + + def set_mime_type(self, value): + self._set_attr('mime-type', value) + + def del_mime_type(self): + self._del_attr('mime-type') diff --git a/sleekxmpp/plugins/xep_0095/stream_initiation.py b/sleekxmpp/plugins/xep_0095/stream_initiation.py new file mode 100644 index 00000000..cfe6deb5 --- /dev/null +++ b/sleekxmpp/plugins/xep_0095/stream_initiation.py @@ -0,0 +1,197 @@ +""" + SleekXMPP: The Sleek XMPP Library + Copyright (C) 2013 Nathanael C. Fritz, Lance J.T. Stout + This file is part of SleekXMPP. + + See the file LICENSE for copying permission. +""" + +import logging +import threading + +from uuid import uuid4 + +from sleekxmpp import Iq, Message +from sleekxmpp.exceptions import XMPPError +from sleekxmpp.plugins import BasePlugin +from sleekxmpp.xmlstream.handler import Callback +from sleekxmpp.xmlstream.matcher import StanzaPath +from sleekxmpp.xmlstream import register_stanza_plugin, JID +from sleekxmpp.plugins.xep_0095 import stanza, SI + + +log = logging.getLogger(__name__) + + +SOCKS5 = 'http://jabber.org/protocol/bytestreams' +IBB = 'http://jabber.org/protocol/ibb' + + +class XEP_0095(BasePlugin): + + name = 'xep_0095' + description = 'XEP-0095: Stream Initiation' + dependencies = set(['xep_0020', 'xep_0030', 'xep_0047', 'xep_0065']) + stanza = stanza + + def plugin_init(self): + self._profiles = {} + self._methods = {} + self._pending_lock = threading.Lock() + self._pending= {} + + self.register_method(SOCKS5, 'xep_0065') + self.register_method(IBB, 'xep_0047') + + register_stanza_plugin(Iq, SI) + register_stanza_plugin(SI, self.xmpp['xep_0020'].stanza.FeatureNegotiation) + + self.xmpp.register_handler( + Callback('SI Request', + StanzaPath('iq@type=set/si'), + self._handle_request)) + + self.api.register(self._add_pending, 'add_pending', default=True) + self.api.register(self._get_pending, 'get_pending', default=True) + self.api.register(self._del_pending, 'del_pending', default=True) + + def session_bind(self, jid): + self.xmpp['xep_0030'].add_feature(SI.namespace) + + def plugin_end(self): + self.xmpp.remove_handler('SI Request') + self.xmpp['xep_0030'].del_feature(feature=SI.namespace) + + def register_profile(self, profile_name, plugin): + self._profiles[profile_name] = plugin + + def unregister_profile(self, profile_name): + try: + del self._profiles[profile_name] + except KeyError: + pass + + def register_method(self, method, plugin_name): + self._methods[method] = plugin_name + + def _handle_request(self, iq): + profile = iq['si']['profile'] + sid = iq['si']['id'] + + if not sid: + raise XMPPError(etype='modify', condition='bad-request') + if profile not in self._profiles: + raise XMPPError( + etype='modify', + condition='bad-request', + extension='bad-profile', + extension_ns=SI.namespace) + + neg = iq['si']['feature_neg']['form']['fields'] + options = neg['stream-method']['options'] or [] + methods = [] + for opt in options: + methods.append(opt['value']) + for method in methods: + if method in self._methods: + supported = True + break + else: + raise XMPPError('bad-request', + extension='no-valid-streams', + extension_ns=SI.namespace) + + selected_method = SOCKS5 if SOCKS5 in methods else IBB + receiver = iq['to'] + sender = iq['from'] + + self.api['add_pending'](receiver, sid, sender, { + 'response_id': iq['id'], + 'method': selected_method, + 'profile': profile + }) + self.xmpp.event('si_request', iq) + + def offer(self, jid, sid=None, mime_type=None, profile=None, + methods=None, payload=None, ifrom=None, + **iqargs): + if sid is None: + sid = uuid4().hex + if methods is None: + methods = list(self._methods.keys()) + if not isinstance(methods, (list, tuple, set)): + methods = [methods] + + si = self.xmpp.Iq() + si['to'] = jid + si['from'] = ifrom + si['type'] = 'set' + si['si']['id'] = sid + si['si']['mime_type'] = mime_type + si['si']['profile'] = profile + if not isinstance(payload, (list, tuple, set)): + payload = [payload] + for item in payload: + si['si'].append(item) + si['si']['feature_neg']['form'].add_field( + var='stream-method', + ftype='list-single', + options=methods) + return si.send(**iqargs) + + def accept(self, jid, sid, payload=None, ifrom=None, stream_handler=None): + stream = self.api['get_pending'](ifrom, sid, jid) + iq = self.xmpp.Iq() + iq['id'] = stream['response_id'] + iq['to'] = jid + iq['from'] = ifrom + iq['type'] = 'result' + if payload: + iq['si'].append(payload) + iq['si']['feature_neg']['form']['type'] = 'submit' + iq['si']['feature_neg']['form'].add_field( + var='stream-method', + ftype='list-single', + value=stream['method']) + + if ifrom is None: + ifrom = self.xmpp.boundjid + + method_plugin = self._methods[stream['method']] + self.xmpp[method_plugin].api['preauthorize_sid'](ifrom, sid, jid) + + self.api['del_pending'](ifrom, sid, jid) + + if stream_handler: + self.xmpp.add_event_handler('stream:%s:%s' % (sid, jid), + stream_handler, + threaded=True, + disposable=True) + return iq.send() + + def decline(self, jid, sid, ifrom=None): + stream = self.api['get_pending'](ifrom, sid, jid) + if not stream: + return + iq = self.xmpp.Iq() + iq['id'] = stream['response_id'] + iq['to'] = jid + iq['from'] = ifrom + iq['type'] = 'error' + iq['error']['condition'] = 'forbidden' + iq['error']['text'] = 'Offer declined' + self.api['del_pending'](ifrom, sid, jid) + return iq.send() + + def _add_pending(self, jid, node, ifrom, data): + with self._pending_lock: + self._pending[(jid, node, ifrom)] = data + + def _get_pending(self, jid, node, ifrom, data): + with self._pending_lock: + return self._pending.get((jid, node, ifrom), None) + + def _del_pending(self, jid, node, ifrom, data): + with self._pending_lock: + if (jid, node, ifrom) in self._pending: + del self._pending[(jid, node, ifrom)] diff --git a/sleekxmpp/plugins/xep_0096/__init__.py b/sleekxmpp/plugins/xep_0096/__init__.py new file mode 100644 index 00000000..5f836169 --- /dev/null +++ b/sleekxmpp/plugins/xep_0096/__init__.py @@ -0,0 +1,16 @@ +""" + SleekXMPP: The Sleek XMPP Library + Copyright (C) 2013 Nathanael C. Fritz, Lance J.T. Stout + This file is part of SleekXMPP. + + See the file LICENSE for copying permission. +""" + +from sleekxmpp.plugins.base import register_plugin + +from sleekxmpp.plugins.xep_0096 import stanza +from sleekxmpp.plugins.xep_0096.stanza import File +from sleekxmpp.plugins.xep_0096.file_transfer import XEP_0096 + + +register_plugin(XEP_0096) diff --git a/sleekxmpp/plugins/xep_0096/file_transfer.py b/sleekxmpp/plugins/xep_0096/file_transfer.py new file mode 100644 index 00000000..6873c7f5 --- /dev/null +++ b/sleekxmpp/plugins/xep_0096/file_transfer.py @@ -0,0 +1,58 @@ +""" + SleekXMPP: The Sleek XMPP Library + Copyright (C) 2013 Nathanael C. Fritz, Lance J.T. Stout + This file is part of SleekXMPP. + + See the file LICENSE for copying permission. +""" + +import logging + +from sleekxmpp import Iq, Message +from sleekxmpp.plugins import BasePlugin +from sleekxmpp.xmlstream.handler import Callback +from sleekxmpp.xmlstream.matcher import StanzaPath +from sleekxmpp.xmlstream import register_stanza_plugin, JID +from sleekxmpp.plugins.xep_0096 import stanza, File + + +log = logging.getLogger(__name__) + + +class XEP_0096(BasePlugin): + + name = 'xep_0096' + description = 'XEP-0096: SI File Transfer' + dependencies = set(['xep_0095']) + stanza = stanza + + def plugin_init(self): + register_stanza_plugin(self.xmpp['xep_0095'].stanza.SI, File) + + self.xmpp['xep_0095'].register_profile(File.namespace, self) + + def session_bind(self, jid): + self.xmpp['xep_0030'].add_feature(File.namespace) + + def plugin_end(self): + self.xmpp['xep_0030'].del_feature(feature=File.namespace) + self.xmpp['xep_0095'].unregister_profile(File.namespace, self) + + def request_file_transfer(self, jid, sid=None, name=None, size=None, + desc=None, hash=None, date=None, + allow_ranged=False, mime_type=None, + **iqargs): + data = File() + data['name'] = name + data['size'] = size + data['date'] = date + data['desc'] = desc + if allow_ranged: + data.enable('range') + + return self.xmpp['xep_0095'].offer(jid, + sid=sid, + mime_type=mime_type, + profile=File.namespace, + payload=data, + **iqargs) diff --git a/sleekxmpp/plugins/xep_0096/stanza.py b/sleekxmpp/plugins/xep_0096/stanza.py new file mode 100644 index 00000000..65eb5bc5 --- /dev/null +++ b/sleekxmpp/plugins/xep_0096/stanza.py @@ -0,0 +1,48 @@ +""" + SleekXMPP: The Sleek XMPP Library + Copyright (C) 2013 Nathanael C. Fritz, Lance J.T. Stout + This file is part of SleekXMPP. + + See the file LICENSE for copying permission. +""" + +import datetime as dt + +from sleekxmpp.xmlstream import ElementBase, register_stanza_plugin +from sleekxmpp.plugins import xep_0082 + + +class File(ElementBase): + name = 'file' + namespace = 'http://jabber.org/protocol/si/profile/file-transfer' + plugin_attrib = 'file' + interfaces = set(['name', 'size', 'date', 'hash', 'desc']) + sub_interfaces = set(['desc']) + + def set_size(self, value): + self._set_attr('size', str(value)) + + def get_date(self): + timestamp = self._get_attr('date') + return xep_0082.parse(timestamp) + + def set_date(self, value): + if isinstance(value, dt.datetime): + value = xep_0082.format_datetime(value) + self._set_attr('date', value) + + +class Range(ElementBase): + name = 'range' + namespace = 'http://jabber.org/protocol/si/profile/file-transfer' + plugin_attrib = 'range' + interfaces = set(['length', 'offset']) + + def set_length(self, value): + self._set_attr('length', str(value)) + + def set_offset(self, value): + self._set_attr('offset', str(value)) + + +register_stanza_plugin(File, Range) -- cgit v1.2.3 From fae39e1ab4e516c7c7b6cef81188359e38fe0531 Mon Sep 17 00:00:00 2001 From: Lance Stout Date: Fri, 29 Mar 2013 13:14:22 -0700 Subject: Fix some errors in the IBB plugin. --- examples/ibb_transfer/ibb_receiver.py | 10 +++---- sleekxmpp/plugins/xep_0047/ibb.py | 54 +++++++++++++++++++++++------------ 2 files changed, 40 insertions(+), 24 deletions(-) diff --git a/examples/ibb_transfer/ibb_receiver.py b/examples/ibb_transfer/ibb_receiver.py index 0169d63d..6aba98e3 100755 --- a/examples/ibb_transfer/ibb_receiver.py +++ b/examples/ibb_transfer/ibb_receiver.py @@ -38,7 +38,7 @@ class IBBReceiver(sleekxmpp.ClientXMPP): self.register_plugin('xep_0030') # Service Discovery self.register_plugin('xep_0047', { - 'accept_stream': self.accept_stream + 'auto_accept': True }) # In-band Bytestreams # The session_start event will be triggered when @@ -48,7 +48,7 @@ class IBBReceiver(sleekxmpp.ClientXMPP): # our roster. self.add_event_handler("session_start", self.start) - self.add_event_handler("ibb_stream_start", self.stream_opened) + self.add_event_handler("ibb_stream_start", self.stream_opened, threaded=True) self.add_event_handler("ibb_stream_data", self.stream_data) def start(self, event): @@ -69,7 +69,7 @@ class IBBReceiver(sleekxmpp.ClientXMPP): def accept_stream(self, iq): """ - Check that it is ok to accept a stream request. + Check that it is ok to accept a stream request. Controlling stream acceptance can be done via either: - setting 'auto_accept' to False in the plugin @@ -83,9 +83,7 @@ class IBBReceiver(sleekxmpp.ClientXMPP): return True def stream_opened(self, stream): - # NOTE: IBB streams are bi-directional, so the original sender is - # now the opened stream's receiver. - print('Stream opened: %s from %s' % (stream.sid, stream.receiver)) + print('Stream opened: %s from %s' % (stream.sid, stream.peer_jid)) # You could run a loop reading from the stream using stream.recv(), # or use the ibb_stream_data event. diff --git a/sleekxmpp/plugins/xep_0047/ibb.py b/sleekxmpp/plugins/xep_0047/ibb.py index 1960308e..6110b26c 100644 --- a/sleekxmpp/plugins/xep_0047/ibb.py +++ b/sleekxmpp/plugins/xep_0047/ibb.py @@ -28,9 +28,9 @@ class XEP_0047(BasePlugin): } def plugin_init(self): - self.streams = {} - self.pending_streams = {} - self.pending_close_streams = {} + self._streams = {} + self._pending_streams = {} + self._pending_lock = threading.Lock() self._stream_lock = threading.Lock() self._preauthed_sids_lock = threading.Lock() @@ -64,6 +64,9 @@ class XEP_0047(BasePlugin): self.api.register(self._authorized, 'authorized', default=True) self.api.register(self._authorized_sid, 'authorized_sid', default=True) self.api.register(self._preauthorize_sid, 'preauthorize_sid', default=True) + self.api.register(self._get_stream, 'get_stream', default=True) + self.api.register(self._set_stream, 'set_stream', default=True) + self.api.register(self._del_stream, 'del_stream', default=True) def plugin_end(self): self.xmpp.remove_handler('IBB Open') @@ -75,6 +78,17 @@ class XEP_0047(BasePlugin): def session_bind(self, jid): self.xmpp['xep_0030'].add_feature('http://jabber.org/protocol/ibb') + def _get_stream(self, jid, sid, peer_jid, data): + return self._streams.get((jid, sid, peer_jid), None) + + def _set_stream(self, jid, sid, peer_jid, stream): + self._streams[(jid, sid, peer_jid)] = stream + + def _del_stream(self, jid, sid, peer_jid, data): + with self._streams_lock: + if (jid, sid, peer_jid) in self._streams: + del self._streams[(jid, sid, peer_jid)] + def _accept_stream(self, iq): receiver = iq['to'] sender = iq['from'] @@ -121,9 +135,9 @@ class XEP_0047(BasePlugin): use_messages) with self._stream_lock: - self.pending_streams[iq['id']] = stream + self._pending_streams[iq['id']] = stream - self.pending_streams[iq['id']] = stream + self._pending_streams[iq['id']] = stream if block: resp = iq.send(timeout=timeout) @@ -143,23 +157,26 @@ class XEP_0047(BasePlugin): def _handle_opened_stream(self, iq): if iq['type'] == 'result': with self._stream_lock: - stream = self.pending_streams.get(iq['id'], None) - if stream is not None: - stream.self_jid = iq['to'] - stream.peer_jid = iq['from'] - stream.stream_started.set() - self.streams[stream.sid] = stream - self.xmpp.event('ibb_stream_start', stream) - self.xmpp.event('stream:%s:%s' % (sid, stream.peer_jid), stream) + stream = self._pending_streams.get(iq['id'], None) + if stream is not None: + log.debug('IBB stream (%s) accepted by %s', stream.sid, iq['from']) + stream.self_jid = iq['to'] + stream.peer_jid = iq['from'] + stream.stream_started.set() + self.api['set_stream'](stream.self_jid, stream.sid, stream.peer_jid, stream) + self.xmpp.event('ibb_stream_start', stream) + self.xmpp.event('stream:%s:%s' % (stream.sid, stream.peer_jid), stream) with self._stream_lock: - if iq['id'] in self.pending_streams: - del self.pending_streams[iq['id']] + if iq['id'] in self._pending_streams: + del self._pending_streams[iq['id']] def _handle_open_request(self, iq): sid = iq['ibb_open']['sid'] size = iq['ibb_open']['block_size'] or self.block_size + log.debug('Received IBB stream request from %s', iq['from']) + if not sid: raise XMPPError(etype='modify', condition='bad-request') @@ -173,7 +190,7 @@ class XEP_0047(BasePlugin): iq['to'], iq['from'], self.window_size) stream.stream_started.set() - self.streams[sid] = stream + self.api['set_stream'](stream.self_jid, stream.sid, stream.peer_jid, stream) iq.reply() iq.send() @@ -182,7 +199,7 @@ class XEP_0047(BasePlugin): def _handle_data(self, stanza): sid = stanza['ibb_data']['sid'] - stream = self.streams.get(sid, None) + stream = self.api['get_stream'](stanza['to'], sid, stanza['from']) if stream is not None and stanza['from'] == stream.peer_jid: stream._recv_data(stanza) else: @@ -190,8 +207,9 @@ class XEP_0047(BasePlugin): def _handle_close(self, iq): sid = iq['ibb_close']['sid'] - stream = self.streams.get(sid, None) + stream = self.api['get_stream'](stanza['to'], sid, stanza['from']) if stream is not None and iq['from'] == stream.peer_jid: stream._closed(iq) + self.api['del_stream'](stream.self_jid, stream.sid, stream.peer_jid) else: raise XMPPError('item-not-found') -- cgit v1.2.3