From 9165cbf7f6839ee8ba2a4514b297f27fb019098a Mon Sep 17 00:00:00 2001 From: Lance Stout Date: Wed, 23 Jan 2013 02:18:27 -0800 Subject: Cleanup and expand XEP-0065 plugin. --- sleekxmpp/plugins/xep_0065/__init__.py | 2 + sleekxmpp/plugins/xep_0065/proxy.py | 319 ++++++++++++++++----------------- sleekxmpp/plugins/xep_0065/stanza.py | 54 +++--- 3 files changed, 186 insertions(+), 189 deletions(-) diff --git a/sleekxmpp/plugins/xep_0065/__init__.py b/sleekxmpp/plugins/xep_0065/__init__.py index c577d859..feca2ef1 100644 --- a/sleekxmpp/plugins/xep_0065/__init__.py +++ b/sleekxmpp/plugins/xep_0065/__init__.py @@ -1,4 +1,6 @@ from sleekxmpp.plugins.base import register_plugin + +from sleekxmpp.plugins.xep_0065.stanza import Socks5 from sleekxmpp.plugins.xep_0065.proxy import XEP_0065 diff --git a/sleekxmpp/plugins/xep_0065/proxy.py b/sleekxmpp/plugins/xep_0065/proxy.py index dec37612..473dd033 100644 --- a/sleekxmpp/plugins/xep_0065/proxy.py +++ b/sleekxmpp/plugins/xep_0065/proxy.py @@ -1,210 +1,195 @@ import logging +import threading +import socket from hashlib import sha1 from uuid import uuid4 -from sleekxmpp.plugins.xep_0065 import stanza +from sleekxmpp.thirdparty.socks import socksocket, PROXY_TYPE_SOCKS5 -from sleekxmpp.plugins.base import base_plugin +from sleekxmpp.stanza import Iq +from sleekxmpp.exceptions import XMPPError +from sleekxmpp.xmlstream import register_stanza_plugin from sleekxmpp.xmlstream.handler import Callback from sleekxmpp.xmlstream.matcher import StanzaPath -from sleekxmpp.thirdparty.socks import socksocket, PROXY_TYPE_SOCKS5 +from sleekxmpp.plugins.base import base_plugin + +from sleekxmpp.plugins.xep_0065 import stanza, Socks5 + -# Registers the sleekxmpp logger log = logging.getLogger(__name__) class XEP_0065(base_plugin): - """ - XEP-0065 Socks5 Bytestreams - """ - description = "Socks5 Bytestreams" - dependencies = set(['xep_0030', ]) - xep = '0065' name = 'xep_0065' - - # A dict contains for each SID, the proxy thread currently - # running. - proxies = {} + description = "Socks5 Bytestreams" + dependencies = set(['xep_0030']) def plugin_init(self): - """ Initializes the xep_0065 plugin and all event callbacks. - """ + register_stanza_plugin(Iq, Socks5) - # Shortcuts to access to the xep_0030 plugin. - self.disco = self.xmpp['xep_0030'] + self._proxies = {} + self._sessions = {} + self._sessions_lock = threading.Lock() - # Handler for the streamhost stanza. - self.xmpp.registerHandler( + self.xmpp.register_handler( Callback('Socks5 Bytestreams', StanzaPath('iq@type=set/socks/streamhost'), self._handle_streamhost)) - # Handler for the streamhost-used stanza. - self.xmpp.registerHandler( - Callback('Socks5 Bytestreams', - StanzaPath('iq@type=result/socks/streamhost-used'), - self._handle_streamhost_used)) + def session_bind(self, jid): + self.xmpp['xep_0030'].add_feature(Socks5.namespace) - def get_socket(self, sid): - """ Returns the socket associated to the SID. - """ + def plugin_end(self): + self.xmpp.remove_handler('Socks5 Bytestreams') + self.xmpp.remove_handler('Socks5 Streamhost Used') + self.xmpp['xep_0030'].del_feature(feature=Socks5.namespace) - proxy = self.proxies.get(sid) - if proxy: - return proxy + def get_socket(self, sid): + """Returns the socket associated to the SID.""" + return self._sessions.get(sid, None) - def handshake(self, to, streamer=None): + def handshake(self, to, ifrom=None, timeout=None): """ Starts the handshake to establish the socks5 bytestreams connection. """ + if not self._proxies: + self._proxies = self.discover_proxies() - # Discovers the proxy. - self.streamer = streamer or self.discover_proxy() + sid = uuid4().hex - # Requester requests network address from the proxy. - streamhost = self.get_network_address(self.streamer) - self.proxy_host = streamhost['socks']['streamhost']['host'] - self.proxy_port = streamhost['socks']['streamhost']['port'] + used = self.request_stream(to, sid=sid, ifrom=ifrom, timeout=timeout) + proxy = used['socks']['streamhost_used']['jid'] - # Generates the SID for this new handshake. - sid = uuid4().hex + if proxy not in self._proxies: + log.warning('Received unknown SOCKS5 proxy: %s', proxy) + return + + with self._sessions_lock: + self._sessions[sid] = self._connect_proxy( + sid, + self.xmpp.boundjid, + to, + self._proxies[proxy][0], + self._proxies[proxy][1]) + + # Request that the proxy activate the session with the target. + self.activate(proxy, sid, to, timeout=timeout) + return self.get_socket(sid) - # Requester initiates S5B negotation with Target by sending + def request_stream(self, to, sid=None, ifrom=None, block=True, timeout=None, callback=None): + if sid is None: + sid = uuid4().hex + + # Requester initiates S5B negotiation with Target by sending # IQ-set that includes the JabberID and network address of # StreamHost as well as the StreamID (SID) of the proposed # bytestream. - iq = self.xmpp.Iq(sto=to, stype='set') + iq = self.xmpp.Iq() + iq['to'] = to + iq['from'] = ifrom + iq['type'] = 'set' iq['socks']['sid'] = sid - iq['socks']['streamhost']['jid'] = self.streamer - iq['socks']['streamhost']['host'] = self.proxy_host - iq['socks']['streamhost']['port'] = self.proxy_port - - # Sends the new IQ. - return iq.send() + for proxy, (host, port) in self._proxies.items(): + iq['socks'].add_streamhost(proxy, host, port) + return iq.send(block=block, timeout=timeout, callback=callback) - def discover_proxy(self): - """ Auto-discovers (using XEP 0030) the available bytestream - proxy on the XMPP server. + def discover_proxies(self, jid=None, ifrom=None, timeout=None): + """Auto-discover the JIDs of SOCKS5 proxies on an XMPP server.""" + if jid is None: + if self.xmpp.is_component: + jid = self.xmpp.server + else: + jid = self.xmpp.boundjid.server - Returns the JID of the proxy. - """ + discovered = set() - # Gets all disco items. - disco_items = self.disco.get_items(self.xmpp.server) + disco_items = self.xmpp['xep_0030'].get_items(jid, timeout=timeout) for item in disco_items['disco_items']['items']: - # For each items, gets the disco info. - disco_info = self.disco.get_info(item[0]) - - # Gets and verifies if the identity is a bytestream proxy. - identities = disco_info['disco_info']['identities'] - for identity in identities: - if identity[0] == 'proxy' and identity[1] == 'bytestreams': - # Returns when the first occurence is found. - return '%s' % disco_info['from'] - - def get_network_address(self, streamer): - """ Gets the streamhost information of the proxy. - - streamer : The jid of the proxy. - """ - - iq = self.xmpp.Iq(sto=streamer, stype='get') - iq['socks'] # Adds the query eleme to the iq. - - return iq.send() + try: + disco_info = self.xmpp['xep_0030'].get_info(item[0], timeout=timeout) + except XMPPError: + continue + else: + # Verify that the identity is a bytestream proxy. + identities = disco_info['disco_info']['identities'] + for identity in identities: + if identity[0] == 'proxy' and identity[1] == 'bytestreams': + discovered.add(disco_info['from']) + + for jid in discovered: + try: + addr = self.get_network_address(jid, ifrom=ifrom, timeout=timeout) + self._proxies[jid] = (addr['socks']['streamhost']['host'], + addr['socks']['streamhost']['port']) + except XMPPError: + continue + + return self._proxies + + def get_network_address(self, proxy, ifrom=None, block=True, timeout=None, callback=None): + """Get the network information of a proxy.""" + iq = self.xmpp.Iq(sto=proxy, stype='get', sfrom=ifrom) + iq.enable('socks') + return iq.send(block=block, timeout=timeout, callback=callback) def _handle_streamhost(self, iq): - """ Handles all streamhost stanzas. - """ - - # Registers the streamhost info. - self.streamer = iq['socks']['streamhost']['jid'] - self.proxy_host = iq['socks']['streamhost']['host'] - self.proxy_port = iq['socks']['streamhost']['port'] - - # Sets the SID, the requester and the target. - sid = iq['socks']['sid'] - requester = '%s' % iq['from'] - target = '%s' % self.xmpp.boundjid - - # Next the Target attempts to open a standard TCP socket on - # the network address of the Proxy. - self.proxy = self._connect_proxy(sid, requester, target, - self.proxy_host, self.proxy_port) - - # Registers the new proxy to the proxies dict. - self.proxies[sid] = self.proxy - - # Replies to the incoming iq with a streamhost-used stanza. - res_iq = iq.reply() - res_iq['socks']['sid'] = sid - res_iq['socks']['streamhost-used']['jid'] = self.streamer - - # Sends the IQ - return res_iq.send() - - def _handle_streamhost_used(self, iq): - """ Handles all streamhost-used stanzas. - """ - - # Sets the SID, the requester and the target. + """Handle incoming SOCKS5 session request.""" sid = iq['socks']['sid'] - requester = '%s' % self.xmpp.boundjid - target = '%s' % iq['from'] - - # The Requester will establish a connection to the SOCKS5 - # proxy in the same way the Target did. - self.proxy = self._connect_proxy(sid, requester, target, - self.proxy_host, self.proxy_port) - - # Registers the new thread in the proxy_thread dict. - self.proxies[sid] = self.proxy - - # Requester sends IQ-set to StreamHost requesting that StreamHost - # activate the bytestream associated with the StreamID. - self.activate(iq['socks']['sid'], target) - - def activate(self, sid, to): - """ IQ-set to StreamHost requesting that StreamHost activate - the bytestream associated with the StreamID. - """ - - # Creates the activate IQ. - act_iq = self.xmpp.Iq(sto=self.streamer, stype='set') - act_iq['socks']['sid'] = sid - act_iq['socks']['activate'] = to + if not sid: + raise XMPPError(etype='modify', condition='not-acceptable') + + streamhosts = iq['socks']['streamhosts'] + conn = None + used_streamhost = None + + for streamhost in streamhosts: + try: + conn = self._connect_proxy(sid, + iq['from'], + self.xmpp.boundjid, + streamhost['host'], + streamhost['port']) + used_streamhost = streamhost['jid'] + break + except socket.error: + continue + else: + raise XMPPError(etype='cancel', condition='item-not-found') + + iq.reply() + with self._sessions_lock: + self._sessions[sid] = conn + iq['socks']['sid'] = sid + iq['socks']['streamhost_used']['jid'] = used_streamhost + iq.send() - # Send the IQ. - act_iq.send() + def activate(self, proxy, sid, target, ifrom=None, block=True, timeout=None, callback=None): + """Activate the socks5 session that has been negotiated.""" + iq = self.xmpp.Iq(sto=proxy, stype='set', sfrom=ifrom) + iq['socks']['sid'] = sid + iq['socks']['activate'] = target + iq.send(block=block, timeout=timeout, callback=callback) def deactivate(self, sid): - """ Closes the Proxy thread associated to this SID. - """ - - proxy = self.proxies.get(sid) - if proxy: - proxy.s.close() - del self.proxies[sid] + """Closes the proxy socket associated with this SID.""" + sock = self._sessions.get(sid) + if sock: + try: + sock.close() + except socket.error: + pass + with self._sessions_lock: + del self._sessions[sid] def close(self): - """ Closes all Proxy threads. - """ - - for sid, proxy in self.proxies.items(): - proxy.close() - del self.proxies[sid] - - def send(self, sid, data): - """ Sends the data over the Proxy socket associated to the - SID. - """ - - proxy = self.get_socket(sid) - if proxy: - proxy.sendall(data) + """Closes all proxy sockets.""" + for sid, sock in self._sessions.items(): + sock.close() + with self._sessions_lock: + self._sessions = {} def _connect_proxy(self, sid, requester, target, proxy, proxy_port): """ Establishes a connection between the client and the server-side @@ -216,31 +201,35 @@ class XEP_0065(base_plugin): proxy_host : The hostname or the IP of the proxy. proxy_port : The port of the proxy. or """ - # Because the xep_0065 plugin uses the proxy_port as string, # the Proxy class accepts the proxy_port argument as a string # or an integer. Here, we force to use the port as an integer. proxy_port = int(proxy_port) - # Creates the socks5 proxy socket sock = socksocket() sock.setproxy(PROXY_TYPE_SOCKS5, proxy, port=proxy_port) # The hostname MUST be SHA1(SID + Requester JID + Target JID) # where the output is hexadecimal-encoded (not binary). digest = sha1() - digest.update(sid) # SID - digest.update(requester) # Requester JID - digest.update(target) # Target JID + digest.update(sid) + digest.update(str(requester)) + digest.update(str(target)) - # Computes the digest in hex. - dest = '%s' % digest.hexdigest() + dest = digest.hexdigest() # The port MUST be 0. sock.connect((dest, 0)) log.info('Socket connected.') - # Send the XMPP event. + _close = sock.close + def close(*args, **kwargs): + with self._sessions_lock: + if sid in self._sessions: + del self._sessions[sid] + _close() + sock.close = close + self.xmpp.event('socks_connected', sid) return sock diff --git a/sleekxmpp/plugins/xep_0065/stanza.py b/sleekxmpp/plugins/xep_0065/stanza.py index ae57aba8..e48bf1b5 100644 --- a/sleekxmpp/plugins/xep_0065/stanza.py +++ b/sleekxmpp/plugins/xep_0065/stanza.py @@ -1,41 +1,47 @@ -from sleekxmpp import Iq +from sleekxmpp.jid import JID from sleekxmpp.xmlstream import ElementBase, register_stanza_plugin -# The protocol namespace defined in the Socks5Bytestream (0065) spec. -namespace = 'http://jabber.org/protocol/bytestreams' +class Socks5(ElementBase): + name = 'query' + namespace = 'http://jabber.org/protocol/bytestreams' + plugin_attrib = 'socks' + interfaces = set(['sid', 'activate']) + sub_interfaces = set(['activate']) + def add_streamhost(self, jid, host, port): + sh = StreamHost(parent=self) + sh['jid'] = jid + sh['host'] = host + sh['port'] = port -class StreamHost(ElementBase): - """ The streamhost xml element. - """ - namespace = namespace +class StreamHost(ElementBase): name = 'streamhost' + namespace = 'http://jabber.org/protocol/bytestreams' plugin_attrib = 'streamhost' - interfaces = set(('host', 'jid', 'port')) + plugin_multi_attrib = 'streamhosts' + interfaces = set(['host', 'jid', 'port']) + def set_jid(self, value): + return self._set_attr('jid', str(value)) -class StreamHostUsed(ElementBase): - """ The streamhost-used xml element. - """ + def get_jid(self): + return JID(self._get_attr('jid')) - namespace = namespace + +class StreamHostUsed(ElementBase): name = 'streamhost-used' - plugin_attrib = 'streamhost-used' - interfaces = set(('jid',)) + namespace = 'http://jabber.org/protocol/bytestreams' + plugin_attrib = 'streamhost_used' + interfaces = set(['jid']) + def set_jid(self, value): + return self._set_attr('jid', str(value)) -class Socks5(ElementBase): - """ The query xml element. - """ + def get_jid(self): + return JID(self._get_attr('jid')) - namespace = namespace - name = 'query' - plugin_attrib = 'socks' - interfaces = set(('sid', 'activate')) - sub_interfaces = set(('activate',)) -register_stanza_plugin(Iq, Socks5) -register_stanza_plugin(Socks5, StreamHost) +register_stanza_plugin(Socks5, StreamHost, iterable=True) register_stanza_plugin(Socks5, StreamHostUsed) -- cgit v1.2.3