From 032d41dbb827a9cdf907b6724743bf7373575f2d Mon Sep 17 00:00:00 2001 From: Sandro Munda Date: Sun, 4 Nov 2012 11:38:57 +0100 Subject: Adapted the xep_0065 plugin to be compatible with all kind of others XMPP client. Sent a 'socks_connected' xmpp event when the streamer is connected. --- sleekxmpp/plugins/xep_0065/proxy.py | 213 ++++++------------------------------ 1 file changed, 32 insertions(+), 181 deletions(-) (limited to 'sleekxmpp/plugins') diff --git a/sleekxmpp/plugins/xep_0065/proxy.py b/sleekxmpp/plugins/xep_0065/proxy.py index 73c3c63d..dec37612 100644 --- a/sleekxmpp/plugins/xep_0065/proxy.py +++ b/sleekxmpp/plugins/xep_0065/proxy.py @@ -1,12 +1,6 @@ -import sys import logging -import struct -import pickle -import socket -from threading import Thread, Event from hashlib import sha1 -from select import select from uuid import uuid4 from sleekxmpp.plugins.xep_0065 import stanza @@ -32,7 +26,7 @@ class XEP_0065(base_plugin): # A dict contains for each SID, the proxy thread currently # running. - proxy_threads = {} + proxies = {} def plugin_init(self): """ Initializes the xep_0065 plugin and all event callbacks. @@ -57,9 +51,9 @@ class XEP_0065(base_plugin): """ Returns the socket associated to the SID. """ - proxy = self.proxy_threads.get(sid) + proxy = self.proxies.get(sid) if proxy: - return proxy.s + return proxy def handshake(self, to, streamer=None): """ Starts the handshake to establish the socks5 bytestreams @@ -138,15 +132,11 @@ class XEP_0065(base_plugin): # Next the Target attempts to open a standard TCP socket on # the network address of the Proxy. - self.proxy_thread = Proxy(sid, requester, target, self.proxy_host, - self.proxy_port, self.on_recv) - self.proxy_thread.start() + self.proxy = self._connect_proxy(sid, requester, target, + self.proxy_host, self.proxy_port) - # Registers the new thread in the proxy_thread dict. - self.proxy_threads[sid] = self.proxy_thread - - # Wait until the proxy is connected - self.proxy_thread.connected.wait() + # Registers the new proxy to the proxies dict. + self.proxies[sid] = self.proxy # Replies to the incoming iq with a streamhost-used stanza. res_iq = iq.reply() @@ -163,23 +153,18 @@ class XEP_0065(base_plugin): # Sets the SID, the requester and the target. sid = iq['socks']['sid'] requester = '%s' % self.xmpp.boundjid - target = '%s' % iq['from'] + target = '%s' % iq['from'] # The Requester will establish a connection to the SOCKS5 # proxy in the same way the Target did. - self.proxy_thread = Proxy(sid, requester, target, self.proxy_host, - self.proxy_port, self.on_recv) - self.proxy_thread.start() + self.proxy = self._connect_proxy(sid, requester, target, + self.proxy_host, self.proxy_port) # Registers the new thread in the proxy_thread dict. - self.proxy_threads[sid] = self.proxy_thread + self.proxies[sid] = self.proxy - # Wait until the proxy is connected - self.proxy_thread.connected.wait() - - # Requester sends IQ-set to StreamHost requesting that - # StreamHost activate the bytestream associated with the - # StreamID. + # Requester sends IQ-set to StreamHost requesting that StreamHost + # activate the bytestream associated with the StreamID. self.activate(iq['socks']['sid'], target) def activate(self, sid, to): @@ -199,197 +184,63 @@ class XEP_0065(base_plugin): """ Closes the Proxy thread associated to this SID. """ - proxy = self.proxy_threads.get(sid) + proxy = self.proxies.get(sid) if proxy: proxy.s.close() - del self.proxy_threads[sid] + del self.proxies[sid] def close(self): """ Closes all Proxy threads. """ - for sid, proxy in self.proxy_threads.items(): - proxy.s.close() - del self.proxy_threads[sid] + for sid, proxy in self.proxies.items(): + proxy.close() + del self.proxies[sid] def send(self, sid, data): """ Sends the data over the Proxy socket associated to the SID. """ - proxy = self.proxy_threads.get(sid) + proxy = self.get_socket(sid) if proxy: - proxy.send(data) + proxy.sendall(data) - def on_recv(self, sid, data): - """ Calls when data is recv from the Proxy socket associated - to the SID. - - Triggers a socks_closed event if the socket is closed. The sid - is passed to this event. - - Triggers a socks_recv event if there's available data. A dict - that contains the sid and the data is passed to this event. - """ - - proxy = self.proxy_threads.get(sid) - if proxy: - if not data: - self.xmpp.event('socks_closed', sid) - else: - self.xmpp.event('socks_recv', {'sid': sid, 'data': data}) - - -class Proxy(Thread): - """ Establishes in a thread a connection between the client and - the server-side Socks5 proxy. - """ - - def __init__(self, sid, requester, target, proxy, proxy_port, - on_recv): - """ Initializes the proxy thread. + def _connect_proxy(self, sid, requester, target, proxy, proxy_port): + """ Establishes a connection between the client and the server-side + Socks5 proxy. sid : The StreamID. requester : The JID of the requester. target : The JID of the target. proxy_host : The hostname or the IP of the proxy. proxy_port : The port of the proxy. or - on_recv : A callback called when data are received from the - socket. """ - # Initializes the thread. - Thread.__init__(self) - # Because the xep_0065 plugin uses the proxy_port as string, # the Proxy class accepts the proxy_port argument as a string # or an integer. Here, we force to use the port as an integer. proxy_port = int(proxy_port) - # Creates a connected event to warn when to proxy is - # connected. - self.connected = Event() - - # Registers the arguments. - self.sid = sid - self.requester = requester - self.target = target - self.proxy = proxy - self.proxy_port = proxy_port - self.on_recv = on_recv - - def run(self): - """ Starts the thread. - """ - # Creates the socks5 proxy socket - self.s = socksocket() - self.s.setproxy(PROXY_TYPE_SOCKS5, self.proxy, port=self.proxy_port) + sock = socksocket() + sock.setproxy(PROXY_TYPE_SOCKS5, proxy, port=proxy_port) # The hostname MUST be SHA1(SID + Requester JID + Target JID) # where the output is hexadecimal-encoded (not binary). digest = sha1() - digest.update(self.sid) # SID - digest.update(self.requester) # Requester JID - digest.update(self.target) # Target JID + digest.update(sid) # SID + digest.update(requester) # Requester JID + digest.update(target) # Target JID # Computes the digest in hex. dest = '%s' % digest.hexdigest() # The port MUST be 0. - self.s.connect((dest, 0)) + sock.connect((dest, 0)) log.info('Socket connected.') - self.connected.set() - - # Blocks until the socket need to be closed. - self.listen() - - # Closes the socket. - self.s.close() - log.info('Socket closed.') - - def send(self, data): - """ Send data through the socket. - """ - - try: - packed_data = self._pack(data) - self.s.sendall(packed_data) - except pickle.PickleError as err: - log.error(err) - def _pack(self, data): - """ Packs the data. - """ - - # The data format is: `len_data`+`data`. Useful to receive all the data - # at once (avoid splitted data) thanks to the recv_size method. - data = pickle.dumps(data) - return struct.pack('>i', len(data)) + data - - def _unpack(self, data): - """ Unpacks the data. On error, log an error message and returns None. - """ - - try: - return pickle.loads(data) - except Exception as err: - log.error(err) - - def listen(self): - """ Listen for data on the socket. When receiving data, call - the callback on_recv callable. - """ + # Send the XMPP event. + self.xmpp.event('socks_connected', sid) - socket_open = True - while socket_open: - ins = [] - try: - # Wait any read available data on socket. Timeout - # after 5 secs. - ins, out, err = select([self.s, ], [], [], 5) - except socket.error as (errno, err): - # 9 means the socket is closed. It can be normal. Otherwise, - # log the error. - if errno != 9: - log.debug('Socket error: %s' % err) - break - except Exception as e: - log.debug(e) - break - - for s in ins: - data = self.recv_size(self.s) - if not data: - socket_open = False - else: - unpacked_data = self._unpack(data) - if unpacked_data: - self.on_recv(self.sid, unpacked_data) - - def recv_size(self, the_socket): - total_len = 0 - total_data = [] - size = sys.maxint - size_data = sock_data = '' - recv_size = 8192 - - while total_len < size: - sock_data = the_socket.recv(recv_size) - if not sock_data: - return ''.join(total_data) - - if not total_data: - if len(sock_data) > 4: - size_data += sock_data - size = struct.unpack('>i', size_data[:4])[0] - recv_size = size - if recv_size > 524288: - recv_size = 524288 - total_data.append(size_data[4:]) - else: - size_data += sock_data - else: - total_data.append(sock_data) - total_len = sum([len(i) for i in total_data]) - return ''.join(total_data) + return sock -- cgit v1.2.3