diff options
Diffstat (limited to 'sleekxmpp/plugins/xep_0065/proxy.py')
-rw-r--r-- | sleekxmpp/plugins/xep_0065/proxy.py | 235 |
1 files changed, 235 insertions, 0 deletions
diff --git a/sleekxmpp/plugins/xep_0065/proxy.py b/sleekxmpp/plugins/xep_0065/proxy.py new file mode 100644 index 00000000..473dd033 --- /dev/null +++ b/sleekxmpp/plugins/xep_0065/proxy.py @@ -0,0 +1,235 @@ +import logging +import threading +import socket + +from hashlib import sha1 +from uuid import uuid4 + +from sleekxmpp.thirdparty.socks import socksocket, PROXY_TYPE_SOCKS5 + +from sleekxmpp.stanza import Iq +from sleekxmpp.exceptions import XMPPError +from sleekxmpp.xmlstream import register_stanza_plugin +from sleekxmpp.xmlstream.handler import Callback +from sleekxmpp.xmlstream.matcher import StanzaPath +from sleekxmpp.plugins.base import base_plugin + +from sleekxmpp.plugins.xep_0065 import stanza, Socks5 + + +log = logging.getLogger(__name__) + + +class XEP_0065(base_plugin): + + name = 'xep_0065' + description = "Socks5 Bytestreams" + dependencies = set(['xep_0030']) + + def plugin_init(self): + register_stanza_plugin(Iq, Socks5) + + self._proxies = {} + self._sessions = {} + self._sessions_lock = threading.Lock() + + self.xmpp.register_handler( + Callback('Socks5 Bytestreams', + StanzaPath('iq@type=set/socks/streamhost'), + self._handle_streamhost)) + + def session_bind(self, jid): + self.xmpp['xep_0030'].add_feature(Socks5.namespace) + + def plugin_end(self): + self.xmpp.remove_handler('Socks5 Bytestreams') + self.xmpp.remove_handler('Socks5 Streamhost Used') + self.xmpp['xep_0030'].del_feature(feature=Socks5.namespace) + + def get_socket(self, sid): + """Returns the socket associated to the SID.""" + return self._sessions.get(sid, None) + + def handshake(self, to, ifrom=None, timeout=None): + """ Starts the handshake to establish the socks5 bytestreams + connection. + """ + if not self._proxies: + self._proxies = self.discover_proxies() + + sid = uuid4().hex + + used = self.request_stream(to, sid=sid, ifrom=ifrom, timeout=timeout) + proxy = used['socks']['streamhost_used']['jid'] + + if proxy not in self._proxies: + log.warning('Received unknown SOCKS5 proxy: %s', proxy) + return + + with self._sessions_lock: + self._sessions[sid] = self._connect_proxy( + sid, + self.xmpp.boundjid, + to, + self._proxies[proxy][0], + self._proxies[proxy][1]) + + # Request that the proxy activate the session with the target. + self.activate(proxy, sid, to, timeout=timeout) + return self.get_socket(sid) + + def request_stream(self, to, sid=None, ifrom=None, block=True, timeout=None, callback=None): + if sid is None: + sid = uuid4().hex + + # Requester initiates S5B negotiation with Target by sending + # IQ-set that includes the JabberID and network address of + # StreamHost as well as the StreamID (SID) of the proposed + # bytestream. + iq = self.xmpp.Iq() + iq['to'] = to + iq['from'] = ifrom + iq['type'] = 'set' + iq['socks']['sid'] = sid + for proxy, (host, port) in self._proxies.items(): + iq['socks'].add_streamhost(proxy, host, port) + return iq.send(block=block, timeout=timeout, callback=callback) + + def discover_proxies(self, jid=None, ifrom=None, timeout=None): + """Auto-discover the JIDs of SOCKS5 proxies on an XMPP server.""" + if jid is None: + if self.xmpp.is_component: + jid = self.xmpp.server + else: + jid = self.xmpp.boundjid.server + + discovered = set() + + disco_items = self.xmpp['xep_0030'].get_items(jid, timeout=timeout) + + for item in disco_items['disco_items']['items']: + try: + disco_info = self.xmpp['xep_0030'].get_info(item[0], timeout=timeout) + except XMPPError: + continue + else: + # Verify that the identity is a bytestream proxy. + identities = disco_info['disco_info']['identities'] + for identity in identities: + if identity[0] == 'proxy' and identity[1] == 'bytestreams': + discovered.add(disco_info['from']) + + for jid in discovered: + try: + addr = self.get_network_address(jid, ifrom=ifrom, timeout=timeout) + self._proxies[jid] = (addr['socks']['streamhost']['host'], + addr['socks']['streamhost']['port']) + except XMPPError: + continue + + return self._proxies + + def get_network_address(self, proxy, ifrom=None, block=True, timeout=None, callback=None): + """Get the network information of a proxy.""" + iq = self.xmpp.Iq(sto=proxy, stype='get', sfrom=ifrom) + iq.enable('socks') + return iq.send(block=block, timeout=timeout, callback=callback) + + def _handle_streamhost(self, iq): + """Handle incoming SOCKS5 session request.""" + sid = iq['socks']['sid'] + if not sid: + raise XMPPError(etype='modify', condition='not-acceptable') + + streamhosts = iq['socks']['streamhosts'] + conn = None + used_streamhost = None + + for streamhost in streamhosts: + try: + conn = self._connect_proxy(sid, + iq['from'], + self.xmpp.boundjid, + streamhost['host'], + streamhost['port']) + used_streamhost = streamhost['jid'] + break + except socket.error: + continue + else: + raise XMPPError(etype='cancel', condition='item-not-found') + + iq.reply() + with self._sessions_lock: + self._sessions[sid] = conn + iq['socks']['sid'] = sid + iq['socks']['streamhost_used']['jid'] = used_streamhost + iq.send() + + def activate(self, proxy, sid, target, ifrom=None, block=True, timeout=None, callback=None): + """Activate the socks5 session that has been negotiated.""" + iq = self.xmpp.Iq(sto=proxy, stype='set', sfrom=ifrom) + iq['socks']['sid'] = sid + iq['socks']['activate'] = target + iq.send(block=block, timeout=timeout, callback=callback) + + def deactivate(self, sid): + """Closes the proxy socket associated with this SID.""" + sock = self._sessions.get(sid) + if sock: + try: + sock.close() + except socket.error: + pass + with self._sessions_lock: + del self._sessions[sid] + + def close(self): + """Closes all proxy sockets.""" + for sid, sock in self._sessions.items(): + sock.close() + with self._sessions_lock: + self._sessions = {} + + def _connect_proxy(self, sid, requester, target, proxy, proxy_port): + """ Establishes a connection between the client and the server-side + Socks5 proxy. + + sid : The StreamID. <str> + requester : The JID of the requester. <str> + target : The JID of the target. <str> + proxy_host : The hostname or the IP of the proxy. <str> + proxy_port : The port of the proxy. <str> or <int> + """ + # Because the xep_0065 plugin uses the proxy_port as string, + # the Proxy class accepts the proxy_port argument as a string + # or an integer. Here, we force to use the port as an integer. + proxy_port = int(proxy_port) + + sock = socksocket() + sock.setproxy(PROXY_TYPE_SOCKS5, proxy, port=proxy_port) + + # The hostname MUST be SHA1(SID + Requester JID + Target JID) + # where the output is hexadecimal-encoded (not binary). + digest = sha1() + digest.update(sid) + digest.update(str(requester)) + digest.update(str(target)) + + dest = digest.hexdigest() + + # The port MUST be 0. + sock.connect((dest, 0)) + log.info('Socket connected.') + + _close = sock.close + def close(*args, **kwargs): + with self._sessions_lock: + if sid in self._sessions: + del self._sessions[sid] + _close() + sock.close = close + + self.xmpp.event('socks_connected', sid) + + return sock |