summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.gitignore5
-rw-r--r--sleekxmpp/plugins/xep_0065/proxy.py177
2 files changed, 36 insertions, 146 deletions
diff --git a/.gitignore b/.gitignore
index e7f6bd09..9a90daeb 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,4 @@
-*.pyc
+*.py[co]
build/
dist/
MANIFEST
@@ -8,3 +8,6 @@ docs/_build/
.coverage
sleekxmpp.egg-info/
.ropeproject/
+4913
+*~
+.baboon/
diff --git a/sleekxmpp/plugins/xep_0065/proxy.py b/sleekxmpp/plugins/xep_0065/proxy.py
index b027e4e0..dec37612 100644
--- a/sleekxmpp/plugins/xep_0065/proxy.py
+++ b/sleekxmpp/plugins/xep_0065/proxy.py
@@ -1,10 +1,6 @@
-import sys
import logging
-import struct
-from threading import Thread, Event
from hashlib import sha1
-from select import select
from uuid import uuid4
from sleekxmpp.plugins.xep_0065 import stanza
@@ -30,7 +26,7 @@ class XEP_0065(base_plugin):
# A dict contains for each SID, the proxy thread currently
# running.
- proxy_threads = {}
+ proxies = {}
def plugin_init(self):
""" Initializes the xep_0065 plugin and all event callbacks.
@@ -55,9 +51,9 @@ class XEP_0065(base_plugin):
""" Returns the socket associated to the SID.
"""
- proxy = self.proxy_threads.get(sid)
+ proxy = self.proxies.get(sid)
if proxy:
- return proxy.s
+ return proxy
def handshake(self, to, streamer=None):
""" Starts the handshake to establish the socks5 bytestreams
@@ -136,15 +132,11 @@ class XEP_0065(base_plugin):
# Next the Target attempts to open a standard TCP socket on
# the network address of the Proxy.
- self.proxy_thread = Proxy(sid, requester, target, self.proxy_host,
- self.proxy_port, self.on_recv)
- self.proxy_thread.start()
+ self.proxy = self._connect_proxy(sid, requester, target,
+ self.proxy_host, self.proxy_port)
- # Registers the new thread in the proxy_thread dict.
- self.proxy_threads[sid] = self.proxy_thread
-
- # Wait until the proxy is connected
- self.proxy_thread.connected.wait()
+ # Registers the new proxy to the proxies dict.
+ self.proxies[sid] = self.proxy
# Replies to the incoming iq with a streamhost-used stanza.
res_iq = iq.reply()
@@ -161,23 +153,18 @@ class XEP_0065(base_plugin):
# Sets the SID, the requester and the target.
sid = iq['socks']['sid']
requester = '%s' % self.xmpp.boundjid
- target = '%s' % iq['from']
+ target = '%s' % iq['from']
# The Requester will establish a connection to the SOCKS5
# proxy in the same way the Target did.
- self.proxy_thread = Proxy(sid, requester, target, self.proxy_host,
- self.proxy_port, self.on_recv)
- self.proxy_thread.start()
+ self.proxy = self._connect_proxy(sid, requester, target,
+ self.proxy_host, self.proxy_port)
# Registers the new thread in the proxy_thread dict.
- self.proxy_threads[sid] = self.proxy_thread
-
- # Wait until the proxy is connected
- self.proxy_thread.connected.wait()
+ self.proxies[sid] = self.proxy
- # Requester sends IQ-set to StreamHost requesting that
- # StreamHost activate the bytestream associated with the
- # StreamID.
+ # Requester sends IQ-set to StreamHost requesting that StreamHost
+ # activate the bytestream associated with the StreamID.
self.activate(iq['socks']['sid'], target)
def activate(self, sid, to):
@@ -197,163 +184,63 @@ class XEP_0065(base_plugin):
""" Closes the Proxy thread associated to this SID.
"""
- proxy = self.proxy_threads.get(sid)
+ proxy = self.proxies.get(sid)
if proxy:
proxy.s.close()
- del self.proxy_threads[sid]
+ del self.proxies[sid]
def close(self):
""" Closes all Proxy threads.
"""
- for sid, proxy in self.proxy_threads.items():
- proxy.s.close()
- del self.proxy_threads[sid]
+ for sid, proxy in self.proxies.items():
+ proxy.close()
+ del self.proxies[sid]
def send(self, sid, data):
""" Sends the data over the Proxy socket associated to the
SID.
"""
- proxy = self.proxy_threads.get(sid)
+ proxy = self.get_socket(sid)
if proxy:
- proxy.s.sendall(data)
-
- def on_recv(self, sid, data):
- """ Calls when data is recv from the Proxy socket associated
- to the SID.
+ proxy.sendall(data)
- Triggers a socks_closed event if the socket is closed. The sid
- is passed to this event.
-
- Triggers a socks_recv event if there's available data. A dict
- that contains the sid and the data is passed to this event.
- """
-
- proxy = self.proxy_threads.get(sid)
- if proxy:
- if not data:
- self.xmpp.event('socks_closed', sid)
- else:
- self.xmpp.event('socks_recv', {'sid': sid, 'data': data})
-
-
-class Proxy(Thread):
- """ Establishes in a thread a connection between the client and
- the server-side Socks5 proxy.
- """
-
- def __init__(self, sid, requester, target, proxy, proxy_port,
- on_recv):
- """ Initializes the proxy thread.
+ def _connect_proxy(self, sid, requester, target, proxy, proxy_port):
+ """ Establishes a connection between the client and the server-side
+ Socks5 proxy.
sid : The StreamID. <str>
requester : The JID of the requester. <str>
target : The JID of the target. <str>
proxy_host : The hostname or the IP of the proxy. <str>
proxy_port : The port of the proxy. <str> or <int>
- on_recv : A callback called when data are received from the
- socket. <Callable>
"""
- # Initializes the thread.
- Thread.__init__(self)
-
# Because the xep_0065 plugin uses the proxy_port as string,
# the Proxy class accepts the proxy_port argument as a string
# or an integer. Here, we force to use the port as an integer.
proxy_port = int(proxy_port)
- # Creates a connected event to warn when to proxy is
- # connected.
- self.connected = Event()
-
- # Registers the arguments.
- self.sid = sid
- self.requester = requester
- self.target = target
- self.proxy = proxy
- self.proxy_port = proxy_port
- self.on_recv = on_recv
-
- def run(self):
- """ Starts the thread.
- """
-
# Creates the socks5 proxy socket
- self.s = socksocket()
- self.s.setproxy(PROXY_TYPE_SOCKS5, self.proxy, port=self.proxy_port)
+ sock = socksocket()
+ sock.setproxy(PROXY_TYPE_SOCKS5, proxy, port=proxy_port)
# The hostname MUST be SHA1(SID + Requester JID + Target JID)
# where the output is hexadecimal-encoded (not binary).
digest = sha1()
- digest.update(self.sid) # SID
- digest.update(self.requester) # Requester JID
- digest.update(self.target) # Target JID
+ digest.update(sid) # SID
+ digest.update(requester) # Requester JID
+ digest.update(target) # Target JID
# Computes the digest in hex.
dest = '%s' % digest.hexdigest()
# The port MUST be 0.
- self.s.connect((dest, 0))
+ sock.connect((dest, 0))
log.info('Socket connected.')
- self.connected.set()
-
- # Blocks until the socket need to be closed.
- self.listen()
- # Closes the socket.
- self.s.close()
- log.info('Socket closed.')
-
- def listen(self):
- """ Listen for data on the socket. When receiving data, call
- the callback on_recv callable.
- """
+ # Send the XMPP event.
+ self.xmpp.event('socks_connected', sid)
- socket_open = True
- while socket_open:
- ins = []
- try:
- # Wait any read available data on socket. Timeout
- # after 5 secs.
- ins, out, err = select([self.s, ], [], [], 5)
- except Exception as e:
- # There's an error with the socket (maybe the socket
- # has been closed and the file descriptor is bad).
- log.debug('Socket error: %s' % e)
- break
-
- for s in ins:
- data = self.recv_size(self.s)
- if not data:
- socket_open = False
-
- self.on_recv(self.sid, data)
-
- def recv_size(self, the_socket):
- total_len = 0
- total_data = []
- size = sys.maxint
- size_data = sock_data = ''
- recv_size = 8192
-
- while total_len < size:
- sock_data = the_socket.recv(recv_size)
- if not sock_data:
- return ''.join(total_data)
-
- if not total_data:
- if len(sock_data) > 4:
- size_data += sock_data
- size = struct.unpack('>i', size_data[:4])[0]
- recv_size = size
- if recv_size > 524288:
- recv_size = 524288
- total_data.append(size_data[4:])
- else:
- size_data += sock_data
- else:
- total_data.append(sock_data)
- total_len = sum([len(i) for i in total_data])
- return ''.join(total_data)
+ return sock