From 5ab77c745270d7d5c016c1dc7ef2a82533a4b16e Mon Sep 17 00:00:00 2001 From: Florent Le Coz Date: Thu, 17 Jul 2014 14:19:04 +0200 Subject: Rename to slixmpp --- slixmpp/plugins/xep_0065/proxy.py | 292 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 292 insertions(+) create mode 100644 slixmpp/plugins/xep_0065/proxy.py (limited to 'slixmpp/plugins/xep_0065/proxy.py') diff --git a/slixmpp/plugins/xep_0065/proxy.py b/slixmpp/plugins/xep_0065/proxy.py new file mode 100644 index 00000000..654d9dce --- /dev/null +++ b/slixmpp/plugins/xep_0065/proxy.py @@ -0,0 +1,292 @@ +import logging +import threading +import socket + +from hashlib import sha1 +from uuid import uuid4 + +from slixmpp.thirdparty.socks import socksocket, PROXY_TYPE_SOCKS5 + +from slixmpp.stanza import Iq +from slixmpp.exceptions import XMPPError +from slixmpp.xmlstream import register_stanza_plugin +from slixmpp.xmlstream.handler import Callback +from slixmpp.xmlstream.matcher import StanzaPath +from slixmpp.plugins.base import base_plugin + +from slixmpp.plugins.xep_0065 import stanza, Socks5 + + +log = logging.getLogger(__name__) + + +class XEP_0065(base_plugin): + + name = 'xep_0065' + description = "Socks5 Bytestreams" + dependencies = set(['xep_0030']) + default_config = { + 'auto_accept': False + } + + def plugin_init(self): + register_stanza_plugin(Iq, Socks5) + + self._proxies = {} + self._sessions = {} + self._sessions_lock = threading.Lock() + + self._preauthed_sids_lock = threading.Lock() + self._preauthed_sids = {} + + self.xmpp.register_handler( + Callback('Socks5 Bytestreams', + StanzaPath('iq@type=set/socks/streamhost'), + self._handle_streamhost)) + + self.api.register(self._authorized, 'authorized', default=True) + self.api.register(self._authorized_sid, 'authorized_sid', default=True) + self.api.register(self._preauthorize_sid, 'preauthorize_sid', default=True) + + def session_bind(self, jid): + self.xmpp['xep_0030'].add_feature(Socks5.namespace) + + def plugin_end(self): + self.xmpp.remove_handler('Socks5 Bytestreams') + self.xmpp.remove_handler('Socks5 Streamhost Used') + self.xmpp['xep_0030'].del_feature(feature=Socks5.namespace) + + def get_socket(self, sid): + """Returns the socket associated to the SID.""" + return self._sessions.get(sid, None) + + def handshake(self, to, ifrom=None, sid=None, timeout=None): + """ Starts the handshake to establish the socks5 bytestreams + connection. + """ + if not self._proxies: + self._proxies = self.discover_proxies() + + if sid is None: + sid = uuid4().hex + + used = self.request_stream(to, sid=sid, ifrom=ifrom, timeout=timeout) + proxy = used['socks']['streamhost_used']['jid'] + + if proxy not in self._proxies: + log.warning('Received unknown SOCKS5 proxy: %s', proxy) + return + + with self._sessions_lock: + self._sessions[sid] = self._connect_proxy( + sid, + self.xmpp.boundjid, + to, + self._proxies[proxy][0], + self._proxies[proxy][1], + peer=to) + + # Request that the proxy activate the session with the target. + self.activate(proxy, sid, to, timeout=timeout) + socket = self.get_socket(sid) + self.xmpp.event('stream:%s:%s' % (sid, to), socket) + return socket + + def request_stream(self, to, sid=None, ifrom=None, block=True, timeout=None, callback=None): + if sid is None: + sid = uuid4().hex + + # Requester initiates S5B negotiation with Target by sending + # IQ-set that includes the JabberID and network address of + # StreamHost as well as the StreamID (SID) of the proposed + # bytestream. + iq = self.xmpp.Iq() + iq['to'] = to + iq['from'] = ifrom + iq['type'] = 'set' + iq['socks']['sid'] = sid + for proxy, (host, port) in self._proxies.items(): + iq['socks'].add_streamhost(proxy, host, port) + return iq.send(block=block, timeout=timeout, callback=callback) + + def discover_proxies(self, jid=None, ifrom=None, timeout=None): + """Auto-discover the JIDs of SOCKS5 proxies on an XMPP server.""" + if jid is None: + if self.xmpp.is_component: + jid = self.xmpp.server + else: + jid = self.xmpp.boundjid.server + + discovered = set() + + disco_items = self.xmpp['xep_0030'].get_items(jid, timeout=timeout) + + for item in disco_items['disco_items']['items']: + try: + disco_info = self.xmpp['xep_0030'].get_info(item[0], timeout=timeout) + except XMPPError: + continue + else: + # Verify that the identity is a bytestream proxy. + identities = disco_info['disco_info']['identities'] + for identity in identities: + if identity[0] == 'proxy' and identity[1] == 'bytestreams': + discovered.add(disco_info['from']) + + for jid in discovered: + try: + addr = self.get_network_address(jid, ifrom=ifrom, timeout=timeout) + self._proxies[jid] = (addr['socks']['streamhost']['host'], + addr['socks']['streamhost']['port']) + except XMPPError: + continue + + return self._proxies + + def get_network_address(self, proxy, ifrom=None, block=True, timeout=None, callback=None): + """Get the network information of a proxy.""" + iq = self.xmpp.Iq(sto=proxy, stype='get', sfrom=ifrom) + iq.enable('socks') + return iq.send(block=block, timeout=timeout, callback=callback) + + def _handle_streamhost(self, iq): + """Handle incoming SOCKS5 session request.""" + sid = iq['socks']['sid'] + if not sid: + raise XMPPError(etype='modify', condition='bad-request') + + if not self._accept_stream(iq): + raise XMPPError(etype='modify', condition='not-acceptable') + + streamhosts = iq['socks']['streamhosts'] + conn = None + used_streamhost = None + + sender = iq['from'] + for streamhost in streamhosts: + try: + conn = self._connect_proxy(sid, + sender, + self.xmpp.boundjid, + streamhost['host'], + streamhost['port'], + peer=sender) + used_streamhost = streamhost['jid'] + break + except socket.error: + continue + else: + raise XMPPError(etype='cancel', condition='item-not-found') + + iq.reply() + with self._sessions_lock: + self._sessions[sid] = conn + iq['socks']['sid'] = sid + iq['socks']['streamhost_used']['jid'] = used_streamhost + iq.send() + self.xmpp.event('socks5_stream', conn) + self.xmpp.event('stream:%s:%s' % (sid, conn.peer_jid), conn) + + def activate(self, proxy, sid, target, ifrom=None, block=True, timeout=None, callback=None): + """Activate the socks5 session that has been negotiated.""" + iq = self.xmpp.Iq(sto=proxy, stype='set', sfrom=ifrom) + iq['socks']['sid'] = sid + iq['socks']['activate'] = target + iq.send(block=block, timeout=timeout, callback=callback) + + def deactivate(self, sid): + """Closes the proxy socket associated with this SID.""" + sock = self._sessions.get(sid) + if sock: + try: + # sock.close() will also delete sid from self._sessions (see _connect_proxy) + sock.close() + except socket.error: + pass + # Though this should not be neccessary remove the closed session anyway + with self._sessions_lock: + if sid in self._sessions: + log.warn(('SOCKS5 session with sid = "%s" was not ' + + 'removed from _sessions by sock.close()') % sid) + del self._sessions[sid] + + def close(self): + """Closes all proxy sockets.""" + for sid, sock in self._sessions.items(): + sock.close() + with self._sessions_lock: + self._sessions = {} + + def _connect_proxy(self, sid, requester, target, proxy, proxy_port, peer=None): + """ Establishes a connection between the client and the server-side + Socks5 proxy. + + sid : The StreamID. + requester : The JID of the requester. + target : The JID of the target. + proxy_host : The hostname or the IP of the proxy. + proxy_port : The port of the proxy. or + peer : The JID for the other side of the stream, regardless + of target or requester status. + """ + # Because the xep_0065 plugin uses the proxy_port as string, + # the Proxy class accepts the proxy_port argument as a string + # or an integer. Here, we force to use the port as an integer. + proxy_port = int(proxy_port) + + sock = socksocket() + sock.setproxy(PROXY_TYPE_SOCKS5, proxy, port=proxy_port) + + # The hostname MUST be SHA1(SID + Requester JID + Target JID) + # where the output is hexadecimal-encoded (not binary). + digest = sha1() + digest.update(sid) + digest.update(str(requester)) + digest.update(str(target)) + + dest = digest.hexdigest() + + # The port MUST be 0. + sock.connect((dest, 0)) + log.info('Socket connected.') + + _close = sock.close + def close(*args, **kwargs): + with self._sessions_lock: + if sid in self._sessions: + del self._sessions[sid] + _close() + log.info('Socket closed.') + sock.close = close + + sock.peer_jid = peer + sock.self_jid = target if requester == peer else requester + + self.xmpp.event('socks_connected', sid) + return sock + + def _accept_stream(self, iq): + receiver = iq['to'] + sender = iq['from'] + sid = iq['socks']['sid'] + + if self.api['authorized_sid'](receiver, sid, sender, iq): + return True + return self.api['authorized'](receiver, sid, sender, iq) + + def _authorized(self, jid, sid, ifrom, iq): + return self.auto_accept + + def _authorized_sid(self, jid, sid, ifrom, iq): + with self._preauthed_sids_lock: + log.debug('>>> authed sids: %s', self._preauthed_sids) + log.debug('>>> lookup: %s %s %s', jid, sid, ifrom) + if (jid, sid, ifrom) in self._preauthed_sids: + del self._preauthed_sids[(jid, sid, ifrom)] + return True + return False + + def _preauthorize_sid(self, jid, sid, ifrom, data): + log.debug('>>>> %s %s %s %s', jid, sid, ifrom, data) + with self._preauthed_sids_lock: + self._preauthed_sids[(jid, sid, ifrom)] = True -- cgit v1.2.3