summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sleekxmpp/plugins/xep_0065/proxy.py213
1 files changed, 32 insertions, 181 deletions
diff --git a/sleekxmpp/plugins/xep_0065/proxy.py b/sleekxmpp/plugins/xep_0065/proxy.py
index 73c3c63d..dec37612 100644
--- a/sleekxmpp/plugins/xep_0065/proxy.py
+++ b/sleekxmpp/plugins/xep_0065/proxy.py
@@ -1,12 +1,6 @@
-import sys
import logging
-import struct
-import pickle
-import socket
-from threading import Thread, Event
from hashlib import sha1
-from select import select
from uuid import uuid4
from sleekxmpp.plugins.xep_0065 import stanza
@@ -32,7 +26,7 @@ class XEP_0065(base_plugin):
# A dict contains for each SID, the proxy thread currently
# running.
- proxy_threads = {}
+ proxies = {}
def plugin_init(self):
""" Initializes the xep_0065 plugin and all event callbacks.
@@ -57,9 +51,9 @@ class XEP_0065(base_plugin):
""" Returns the socket associated to the SID.
"""
- proxy = self.proxy_threads.get(sid)
+ proxy = self.proxies.get(sid)
if proxy:
- return proxy.s
+ return proxy
def handshake(self, to, streamer=None):
""" Starts the handshake to establish the socks5 bytestreams
@@ -138,15 +132,11 @@ class XEP_0065(base_plugin):
# Next the Target attempts to open a standard TCP socket on
# the network address of the Proxy.
- self.proxy_thread = Proxy(sid, requester, target, self.proxy_host,
- self.proxy_port, self.on_recv)
- self.proxy_thread.start()
+ self.proxy = self._connect_proxy(sid, requester, target,
+ self.proxy_host, self.proxy_port)
- # Registers the new thread in the proxy_thread dict.
- self.proxy_threads[sid] = self.proxy_thread
-
- # Wait until the proxy is connected
- self.proxy_thread.connected.wait()
+ # Registers the new proxy to the proxies dict.
+ self.proxies[sid] = self.proxy
# Replies to the incoming iq with a streamhost-used stanza.
res_iq = iq.reply()
@@ -163,23 +153,18 @@ class XEP_0065(base_plugin):
# Sets the SID, the requester and the target.
sid = iq['socks']['sid']
requester = '%s' % self.xmpp.boundjid
- target = '%s' % iq['from']
+ target = '%s' % iq['from']
# The Requester will establish a connection to the SOCKS5
# proxy in the same way the Target did.
- self.proxy_thread = Proxy(sid, requester, target, self.proxy_host,
- self.proxy_port, self.on_recv)
- self.proxy_thread.start()
+ self.proxy = self._connect_proxy(sid, requester, target,
+ self.proxy_host, self.proxy_port)
# Registers the new thread in the proxy_thread dict.
- self.proxy_threads[sid] = self.proxy_thread
+ self.proxies[sid] = self.proxy
- # Wait until the proxy is connected
- self.proxy_thread.connected.wait()
-
- # Requester sends IQ-set to StreamHost requesting that
- # StreamHost activate the bytestream associated with the
- # StreamID.
+ # Requester sends IQ-set to StreamHost requesting that StreamHost
+ # activate the bytestream associated with the StreamID.
self.activate(iq['socks']['sid'], target)
def activate(self, sid, to):
@@ -199,197 +184,63 @@ class XEP_0065(base_plugin):
""" Closes the Proxy thread associated to this SID.
"""
- proxy = self.proxy_threads.get(sid)
+ proxy = self.proxies.get(sid)
if proxy:
proxy.s.close()
- del self.proxy_threads[sid]
+ del self.proxies[sid]
def close(self):
""" Closes all Proxy threads.
"""
- for sid, proxy in self.proxy_threads.items():
- proxy.s.close()
- del self.proxy_threads[sid]
+ for sid, proxy in self.proxies.items():
+ proxy.close()
+ del self.proxies[sid]
def send(self, sid, data):
""" Sends the data over the Proxy socket associated to the
SID.
"""
- proxy = self.proxy_threads.get(sid)
+ proxy = self.get_socket(sid)
if proxy:
- proxy.send(data)
+ proxy.sendall(data)
- def on_recv(self, sid, data):
- """ Calls when data is recv from the Proxy socket associated
- to the SID.
-
- Triggers a socks_closed event if the socket is closed. The sid
- is passed to this event.
-
- Triggers a socks_recv event if there's available data. A dict
- that contains the sid and the data is passed to this event.
- """
-
- proxy = self.proxy_threads.get(sid)
- if proxy:
- if not data:
- self.xmpp.event('socks_closed', sid)
- else:
- self.xmpp.event('socks_recv', {'sid': sid, 'data': data})
-
-
-class Proxy(Thread):
- """ Establishes in a thread a connection between the client and
- the server-side Socks5 proxy.
- """
-
- def __init__(self, sid, requester, target, proxy, proxy_port,
- on_recv):
- """ Initializes the proxy thread.
+ def _connect_proxy(self, sid, requester, target, proxy, proxy_port):
+ """ Establishes a connection between the client and the server-side
+ Socks5 proxy.
sid : The StreamID. <str>
requester : The JID of the requester. <str>
target : The JID of the target. <str>
proxy_host : The hostname or the IP of the proxy. <str>
proxy_port : The port of the proxy. <str> or <int>
- on_recv : A callback called when data are received from the
- socket. <Callable>
"""
- # Initializes the thread.
- Thread.__init__(self)
-
# Because the xep_0065 plugin uses the proxy_port as string,
# the Proxy class accepts the proxy_port argument as a string
# or an integer. Here, we force to use the port as an integer.
proxy_port = int(proxy_port)
- # Creates a connected event to warn when to proxy is
- # connected.
- self.connected = Event()
-
- # Registers the arguments.
- self.sid = sid
- self.requester = requester
- self.target = target
- self.proxy = proxy
- self.proxy_port = proxy_port
- self.on_recv = on_recv
-
- def run(self):
- """ Starts the thread.
- """
-
# Creates the socks5 proxy socket
- self.s = socksocket()
- self.s.setproxy(PROXY_TYPE_SOCKS5, self.proxy, port=self.proxy_port)
+ sock = socksocket()
+ sock.setproxy(PROXY_TYPE_SOCKS5, proxy, port=proxy_port)
# The hostname MUST be SHA1(SID + Requester JID + Target JID)
# where the output is hexadecimal-encoded (not binary).
digest = sha1()
- digest.update(self.sid) # SID
- digest.update(self.requester) # Requester JID
- digest.update(self.target) # Target JID
+ digest.update(sid) # SID
+ digest.update(requester) # Requester JID
+ digest.update(target) # Target JID
# Computes the digest in hex.
dest = '%s' % digest.hexdigest()
# The port MUST be 0.
- self.s.connect((dest, 0))
+ sock.connect((dest, 0))
log.info('Socket connected.')
- self.connected.set()
-
- # Blocks until the socket need to be closed.
- self.listen()
-
- # Closes the socket.
- self.s.close()
- log.info('Socket closed.')
-
- def send(self, data):
- """ Send data through the socket.
- """
-
- try:
- packed_data = self._pack(data)
- self.s.sendall(packed_data)
- except pickle.PickleError as err:
- log.error(err)
- def _pack(self, data):
- """ Packs the data.
- """
-
- # The data format is: `len_data`+`data`. Useful to receive all the data
- # at once (avoid splitted data) thanks to the recv_size method.
- data = pickle.dumps(data)
- return struct.pack('>i', len(data)) + data
-
- def _unpack(self, data):
- """ Unpacks the data. On error, log an error message and returns None.
- """
-
- try:
- return pickle.loads(data)
- except Exception as err:
- log.error(err)
-
- def listen(self):
- """ Listen for data on the socket. When receiving data, call
- the callback on_recv callable.
- """
+ # Send the XMPP event.
+ self.xmpp.event('socks_connected', sid)
- socket_open = True
- while socket_open:
- ins = []
- try:
- # Wait any read available data on socket. Timeout
- # after 5 secs.
- ins, out, err = select([self.s, ], [], [], 5)
- except socket.error as (errno, err):
- # 9 means the socket is closed. It can be normal. Otherwise,
- # log the error.
- if errno != 9:
- log.debug('Socket error: %s' % err)
- break
- except Exception as e:
- log.debug(e)
- break
-
- for s in ins:
- data = self.recv_size(self.s)
- if not data:
- socket_open = False
- else:
- unpacked_data = self._unpack(data)
- if unpacked_data:
- self.on_recv(self.sid, unpacked_data)
-
- def recv_size(self, the_socket):
- total_len = 0
- total_data = []
- size = sys.maxint
- size_data = sock_data = ''
- recv_size = 8192
-
- while total_len < size:
- sock_data = the_socket.recv(recv_size)
- if not sock_data:
- return ''.join(total_data)
-
- if not total_data:
- if len(sock_data) > 4:
- size_data += sock_data
- size = struct.unpack('>i', size_data[:4])[0]
- recv_size = size
- if recv_size > 524288:
- recv_size = 524288
- total_data.append(size_data[4:])
- else:
- size_data += sock_data
- else:
- total_data.append(sock_data)
- total_len = sum([len(i) for i in total_data])
- return ''.join(total_data)
+ return sock