From 05c9ea5c1d953637343c9fad07267e7f89b20561 Mon Sep 17 00:00:00 2001 From: Nathan Fritz Date: Mon, 31 Aug 2009 22:46:31 +0000 Subject: * converted sleekxmpp to Python 3.x * sleekxmpp no longer spawns threads for callback handlers -- there are now two threads: one for handlers and one for reading. callback handlers can get results from the read queue directly with the "wait" handler which is used in .send() for the reply catching argument. --- sleekxmpp/__init__.py | 12 +-- sleekxmpp/basexmpp.py | 4 +- sleekxmpp/component_example.py | 6 +- sleekxmpp/plugins/xep_0009.py | 2 +- sleekxmpp/plugins/xep_0030.py | 10 +- sleekxmpp/plugins/xep_0045.py | 2 +- sleekxmpp/plugins/xep_0050.py | 3 +- sleekxmpp/plugins/xep_0060.py | 1 - sleekxmpp/plugins/xep_0078.py | 6 +- sleekxmpp/plugins/xep_0086.py | 2 +- sleekxmpp/plugins/xep_0199.py | 4 +- sleekxmpp/xmlstream/handler/base.py | 3 + sleekxmpp/xmlstream/handler/callback.py | 26 +++-- sleekxmpp/xmlstream/handler/waiter.py | 11 ++- sleekxmpp/xmlstream/handler/xmlcallback.py | 4 +- sleekxmpp/xmlstream/handler/xmlwaiter.py | 4 +- sleekxmpp/xmlstream/xmlstream.py | 153 +++++++++++------------------ 17 files changed, 110 insertions(+), 143 deletions(-) diff --git a/sleekxmpp/__init__.py b/sleekxmpp/__init__.py index 8e664a7f..422b50f7 100644 --- a/sleekxmpp/__init__.py +++ b/sleekxmpp/__init__.py @@ -138,14 +138,14 @@ class ClientXMPP(basexmpp, XMLStream): if result: self.event("connected") else: - print "** Failed to connect -- disconnected" + logging.warning("Failed to connect") self.event("disconnected") return result # overriding reconnect and disconnect so that we can get some events # should events be part of or required by xmlstream? Maybe that would be cleaner def reconnect(self): - print "** Reconnect -- disconnected" + logging.info("Reconnecting") self.event("disconnected") XMLStream.reconnect(self) @@ -192,7 +192,7 @@ class ClientXMPP(basexmpp, XMLStream): def handler_starttls(self, xml): if self.ssl_support: - self.add_handler("", self.handler_tls_start) + self.add_handler("", self.handler_tls_start, instream=True) self.send(xml) return True else: @@ -206,14 +206,14 @@ class ClientXMPP(basexmpp, XMLStream): def handler_sasl_auth(self, xml): logging.debug("Starting SASL Auth") - self.add_handler("", self.handler_auth_success) - self.add_handler("", self.handler_auth_fail) + self.add_handler("", self.handler_auth_success, instream=True) + self.add_handler("", self.handler_auth_fail, instream=True) sasl_mechs = xml.findall('{urn:ietf:params:xml:ns:xmpp-sasl}mechanism') if len(sasl_mechs): for sasl_mech in sasl_mechs: self.features.append("sasl:%s" % sasl_mech.text) if 'sasl:PLAIN' in self.features: - self.send("""%s""" % str(base64.b64encode('\x00' + self.username + '\x00' + self.password))) + self.send("""%s""" % base64.b64encode(b'\x00' + bytes(self.username, 'utf-8') + b'\x00' + bytes(self.password, 'utf-8')).decode('utf-8')) else: logging.error("No appropriate login method.") self.disconnect() diff --git a/sleekxmpp/basexmpp.py b/sleekxmpp/basexmpp.py index 87b60496..e120a71a 100644 --- a/sleekxmpp/basexmpp.py +++ b/sleekxmpp/basexmpp.py @@ -137,9 +137,9 @@ class basexmpp(object): self.id += 1 return self.getId() - def add_handler(self, mask, pointer, disposable=False, threaded=False, filter=False): + def add_handler(self, mask, pointer, disposable=False, threaded=False, filter=False, instream=False): #logging.warning("Deprecated add_handler used for %s: %s." % (mask, pointer)) - self.registerHandler(XMLCallback('add_handler_%s' % self.getNewId(), MatchXMLMask(mask), pointer, threaded, disposable)) + self.registerHandler(XMLCallback('add_handler_%s' % self.getNewId(), MatchXMLMask(mask), pointer, threaded, disposable, instream)) def getId(self): return "%x".upper() % self.id diff --git a/sleekxmpp/component_example.py b/sleekxmpp/component_example.py index 04802370..37b7e96e 100644 --- a/sleekxmpp/component_example.py +++ b/sleekxmpp/component_example.py @@ -6,14 +6,14 @@ import time class Example(sleekxmpp.componentxmpp.ComponentXMPP): def __init__(self, jid, password): - sleekxmpp.componentxmpp.ComponentXMPP.__init__(self, jid, password, 'localhost', 5060) + sleekxmpp.componentxmpp.ComponentXMPP.__init__(self, jid, password, 'vm1', 5230) self.add_event_handler("session_start", self.start) self.add_event_handler("message", self.message) def start(self, event): #self.getRoster() #self.sendPresence(pto='admin@tigase.netflint.net/sarkozy') - self.sendPresence(pto='tigase.netflint.net') + #self.sendPresence(pto='tigase.netflint.net') pass def message(self, event): @@ -30,7 +30,7 @@ if __name__ == '__main__': opts,args = optp.parse_args() logging.basicConfig(level=opts.loglevel, format='%(levelname)-8s %(message)s') - xmpp = Example('component.server.tld', 'asdfasdf') + xmpp = Example('component.vm1', 'secreteating') xmpp.registerPlugin('xep_0004') xmpp.registerPlugin('xep_0030') xmpp.registerPlugin('xep_0060') diff --git a/sleekxmpp/plugins/xep_0009.py b/sleekxmpp/plugins/xep_0009.py index c6b7b5df..e0da8296 100644 --- a/sleekxmpp/plugins/xep_0009.py +++ b/sleekxmpp/plugins/xep_0009.py @@ -2,7 +2,7 @@ XEP-0009 XMPP Remote Procedure Calls """ from __future__ import with_statement -import base +from . import base import logging from xml.etree import cElementTree as ET import copy diff --git a/sleekxmpp/plugins/xep_0030.py b/sleekxmpp/plugins/xep_0030.py index d3795308..fc921020 100644 --- a/sleekxmpp/plugins/xep_0030.py +++ b/sleekxmpp/plugins/xep_0030.py @@ -17,11 +17,9 @@ along with SleekXMPP; if not, write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA """ -from __future__ import absolute_import, with_statement from . import base import logging from xml.etree import cElementTree as ET -import thread class xep_0030(base.base_plugin): """ @@ -36,13 +34,11 @@ class xep_0030(base.base_plugin): self.items = {'main': []} self.xmpp.add_handler("" % self.xmpp.default_ns, self.info_handler) self.xmpp.add_handler("" % self.xmpp.default_ns, self.item_handler) - self.lock = thread.allocate_lock() def add_feature(self, feature, node='main'): - with self.lock: - if not self.features.has_key(node): - self.features[node] = [] - self.features[node].append(feature) + if not self.features.has_key(node): + self.features[node] = [] + self.features[node].append(feature) def add_identity(self, category=None, itype=None, name=None, node='main'): if not self.identities.has_key(node): diff --git a/sleekxmpp/plugins/xep_0045.py b/sleekxmpp/plugins/xep_0045.py index a85bfec8..b0523755 100644 --- a/sleekxmpp/plugins/xep_0045.py +++ b/sleekxmpp/plugins/xep_0045.py @@ -18,7 +18,7 @@ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA """ from __future__ import with_statement -import base +from . import base import logging from xml.etree import cElementTree as ET diff --git a/sleekxmpp/plugins/xep_0050.py b/sleekxmpp/plugins/xep_0050.py index 80025c8b..20e10570 100644 --- a/sleekxmpp/plugins/xep_0050.py +++ b/sleekxmpp/plugins/xep_0050.py @@ -18,12 +18,11 @@ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA """ from __future__ import with_statement -import base +from . import base import logging from xml.etree import cElementTree as ET import traceback import time -import thread class xep_0050(base.base_plugin): """ diff --git a/sleekxmpp/plugins/xep_0060.py b/sleekxmpp/plugins/xep_0060.py index b5e338ad..68c391f6 100644 --- a/sleekxmpp/plugins/xep_0060.py +++ b/sleekxmpp/plugins/xep_0060.py @@ -190,7 +190,6 @@ class xep_0060(base.base_plugin): id = iq.get('id') result = self.xmpp.send(iq, "" % id) if result is None or result.get('type') == 'error': - print "---------- returning false, apparently" return False return True diff --git a/sleekxmpp/plugins/xep_0078.py b/sleekxmpp/plugins/xep_0078.py index 28aaeb20..24afc875 100644 --- a/sleekxmpp/plugins/xep_0078.py +++ b/sleekxmpp/plugins/xep_0078.py @@ -20,8 +20,8 @@ from __future__ import with_statement from xml.etree import cElementTree as ET import logging -import sha -import base +import hashlib +from . import base class xep_0078(base.base_plugin): @@ -66,7 +66,7 @@ class xep_0078(base.base_plugin): else: logging.debug("Authenticating via jabber:iq:auth Digest") digest = ET.Element('digest') - digest.text = sha.sha("%s%s" % (self.streamid, self.xmpp.password)).hexdigest() + digest.text = hashlib.sha1(b"%s%s" % (self.streamid, self.xmpp.password)).hexdigest() query.append(digest) attempt.append(query) result = self.xmpp.send(attempt, self.xmpp.makeIq(self.xmpp.id)) diff --git a/sleekxmpp/plugins/xep_0086.py b/sleekxmpp/plugins/xep_0086.py index 6871ef3f..e6c18c77 100644 --- a/sleekxmpp/plugins/xep_0086.py +++ b/sleekxmpp/plugins/xep_0086.py @@ -1,6 +1,6 @@ from __future__ import with_statement -import base +from . import base import logging from xml.etree import cElementTree as ET import copy diff --git a/sleekxmpp/plugins/xep_0199.py b/sleekxmpp/plugins/xep_0199.py index cab84ac9..57d56c02 100644 --- a/sleekxmpp/plugins/xep_0199.py +++ b/sleekxmpp/plugins/xep_0199.py @@ -31,8 +31,8 @@ class xep_0199(base.base_plugin): self.xep = "0199" self.xmpp.add_handler("" % self.xmpp.default_ns, self.handler_ping) self.running = False - if self.config.get('keepalive', True): - self.xmpp.add_event_handler('session_start', self.handler_pingserver, threaded=True) + #if self.config.get('keepalive', True): + #self.xmpp.add_event_handler('session_start', self.handler_pingserver, threaded=True) def post_init(self): self.xmpp['xep_0030'].add_feature('http://www.xmpp.org/extensions/xep-0199.html#ns') diff --git a/sleekxmpp/xmlstream/handler/base.py b/sleekxmpp/xmlstream/handler/base.py index 810aac91..9a951193 100644 --- a/sleekxmpp/xmlstream/handler/base.py +++ b/sleekxmpp/xmlstream/handler/base.py @@ -11,6 +11,9 @@ class BaseHandler(object): def match(self, xml): return self._matcher.match(xml) + def prerun(self, payload): + self._payload = payload + def run(self, payload): self._payload = payload diff --git a/sleekxmpp/xmlstream/handler/callback.py b/sleekxmpp/xmlstream/handler/callback.py index e3ef8ccc..c618b718 100644 --- a/sleekxmpp/xmlstream/handler/callback.py +++ b/sleekxmpp/xmlstream/handler/callback.py @@ -1,20 +1,26 @@ from . import base -import threading class Callback(base.BaseHandler): - def __init__(self, name, matcher, pointer, thread=False, once=False): + def __init__(self, name, matcher, pointer, thread=False, once=False, instream=False): base.BaseHandler.__init__(self, name, matcher) self._pointer = pointer self._thread = thread self._once = once + self._instream = instream + + def prerun(self, payload): + base.BaseHandler.prerun(self, payload) + if self._instream: + self.run(payload, True) - def run(self, payload): - base.BaseHandler.run(self, payload) - if self._thread: - x = threading.Thread(name="Callback_%s" % self.name, target=self._pointer, args=(payload,)) - x.start() - else: + def run(self, payload, instream=False): + if not self._instream or instream: + base.BaseHandler.run(self, payload) + #if self._thread: + # x = threading.Thread(name="Callback_%s" % self.name, target=self._pointer, args=(payload,)) + # x.start() + #else: self._pointer(payload) - if self._once: - self._destroy = True + if self._once: + self._destroy = True diff --git a/sleekxmpp/xmlstream/handler/waiter.py b/sleekxmpp/xmlstream/handler/waiter.py index 7c06ddf1..e62f330f 100644 --- a/sleekxmpp/xmlstream/handler/waiter.py +++ b/sleekxmpp/xmlstream/handler/waiter.py @@ -1,20 +1,23 @@ from . import base -import Queue +import queue import logging class Waiter(base.BaseHandler): def __init__(self, name, matcher): base.BaseHandler.__init__(self, name, matcher) - self._payload = Queue.Queue() + self._payload = queue.Queue() - def run(self, payload): + def prerun(self, payload): self._payload.put(payload) + + def run(self, payload): + pass def wait(self, timeout=60): try: return self._payload.get(True, timeout) - except Queue.Empty: + except queue.Empty: return False def checkDelete(self): diff --git a/sleekxmpp/xmlstream/handler/xmlcallback.py b/sleekxmpp/xmlstream/handler/xmlcallback.py index 50d3d5fa..ae288ff1 100644 --- a/sleekxmpp/xmlstream/handler/xmlcallback.py +++ b/sleekxmpp/xmlstream/handler/xmlcallback.py @@ -3,5 +3,5 @@ from . callback import Callback class XMLCallback(Callback): - def run(self, payload): - Callback.run(self, payload.xml) + def run(self, payload, instream=False): + Callback.run(self, payload.xml, instream) diff --git a/sleekxmpp/xmlstream/handler/xmlwaiter.py b/sleekxmpp/xmlstream/handler/xmlwaiter.py index 9b2b3394..1e524b02 100644 --- a/sleekxmpp/xmlstream/handler/xmlwaiter.py +++ b/sleekxmpp/xmlstream/handler/xmlwaiter.py @@ -2,5 +2,5 @@ from . waiter import Waiter class XMLWaiter(Waiter): - def run(self, payload): - Waiter.run(self, payload.xml) + def prerun(self, payload): + Waiter.prerun(self, payload.xml) diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py index a3f3d2ee..958a3b6e 100644 --- a/sleekxmpp/xmlstream/xmlstream.py +++ b/sleekxmpp/xmlstream/xmlstream.py @@ -1,12 +1,12 @@ from __future__ import with_statement -import Queue +import queue from . import statemachine from . stanzabase import StanzaBase from xml.etree import cElementTree from xml.parsers import expat import logging import socket -import thread +import threading import time import traceback import types @@ -14,7 +14,7 @@ import xml.sax.saxutils ssl_support = True try: - from tlslite.api import * + import ssl except ImportError: ssl_support = False @@ -27,66 +27,6 @@ class CloseStream(Exception): stanza_extensions = {} -class _fileobject(object): # we still need this because Socket.makefile is broken in python2.5 (but it works fine in 3.0) - - def __init__(self, sock, mode='rb', bufsize=-1): - self._sock = sock - if bufsize <= 0: - bufsize = 1024 - self.bufsize = bufsize - self.softspace = False - - def read(self, size=-1): - if size <= 0: - size = sys.maxint - blocks = [] - #while size > 0: - # b = self._sock.recv(min(size, self.bufsize)) - # size -= len(b) - # if not b: - # break - # blocks.append(b) - # print size - #return "".join(blocks) - buff = self._sock.recv(self.bufsize) - logging.debug("RECV: %s" % buff) - return buff - - def readline(self, size=-1): - return self.read(size) - if size < 0: - size = sys.maxint - blocks = [] - read_size = min(20, size) - found = 0 - while size and not found: - b = self._sock.recv(read_size, MSG_PEEK) - if not b: - break - found = b.find('\n') + 1 - length = found or len(b) - size -= length - blocks.append(self._sock.recv(length)) - read_size = min(read_size * 2, size, self.bufsize) - return "".join(blocks) - - def write(self, data): - self._sock.sendall(str(data)) - - def writelines(self, lines): - # This version mimics the current writelines, which calls - # str() on each line, but comments that we should reject - # non-string non-buffers. Let's omit the next line. - lines = [str(s) for s in lines] - self._sock.sendall(''.join(lines)) - - def flush(self): - pass - - def close(self): - self._sock.close() - - class XMLStream(object): "A connection manager with XML events." @@ -108,13 +48,18 @@ class XMLStream(object): self.__handlers = [] self.__tls_socket = None + self.filesocket = None self.use_ssl = False self.use_tls = False self.stream_header = "" self.stream_footer = "" + self.eventqueue = queue.Queue() + self.namespace_map = {} + + self.run = True def setSocket(self, socket): "Set the socket" @@ -147,10 +92,12 @@ class XMLStream(object): self.socket = ssl.wrap_socket(self.socket) try: self.socket.connect(self.address) + logging.info("creating filesocket") + self.filesocket = self.socket.makefile('rb', 0) self.state.set('connected', True) return True - except socket.error,(errno, strerror): - logging.error("Could not connect. Socket Error #%s: %s" % (errno, strerror)) + except socket.error as serr: + logging.error("Could not connect. Socket Error #%s: %s" % (serr.errno, serr.strerror)) time.sleep(1) def connectUnix(self, filepath): @@ -158,24 +105,24 @@ class XMLStream(object): def startTLS(self): "Handshakes for TLS" - #self.socket = ssl.wrap_socket(self.socket, ssl_version=ssl.PROTOCOL_TLSv1, do_handshake_on_connect=False) - #self.socket.do_handshake() if self.ssl_support: + logging.info("Negotiating TLS") self.realsocket = self.socket - self.socket = TLSConnection(self.socket) - self.socket.handshakeClientCert() - self.file = _fileobject(self.socket) + self.socket = ssl.wrap_socket(self.socket, ssl_version=ssl.PROTOCOL_TLSv1, do_handshake_on_connect=False) + self.socket.do_handshake() + self.filesocket = self.socket.makefile('rb', 0) return True else: - logging.warning("Tried to enable TLS, but tlslite module not found.") + logging.warning("Tried to enable TLS, but ssl module not found.") return False raise RestartStream() def process(self, threaded=True): - #self.__thread['process'] = threading.Thread(name='process', target=self._process) - #self.__thread['process'].start() + self.__thread['eventhandle'] = threading.Thread(name='eventhandle', target=self._eventRunner) + self.__thread['eventhandle'].start() if threaded: - thread.start_new(self._process, tuple()) + self.__thread['process'] = threading.Thread(name='process', target=self._process) + self.__thread['process'].start() else: self._process() @@ -196,12 +143,15 @@ class XMLStream(object): self.state.set('processing', False) self.state.set('reconnect', False) self.disconnect() + self.run = False + self.eventqueue.put(('quit', None, None)) return except CloseStream: return except SystemExit: + self.eventqueue.put(('quit', None, None)) return - except socket.EBADF: + except socket.error: if not self.state.reconnect: return else: @@ -218,6 +168,7 @@ class XMLStream(object): if self.state['reconnect']: self.reconnect() self.state.set('processing', False) + self.eventqueue.put(('quit', None, None)) #self.__thread['readXML'] = threading.Thread(name='readXML', target=self.__readXML) #self.__thread['readXML'].start() #self.__thread['spawnEvents'] = threading.Thread(name='spawnEvents', target=self.__spawnEvents) @@ -226,18 +177,17 @@ class XMLStream(object): def __readXML(self): "Parses the incoming stream, adding to xmlin queue as it goes" #build cElementTree object from expat was we go - #self.filesocket = self.socket.makefile('rb',0) #this is broken in python2.5, but works in python3.0 - self.filesocket = _fileobject(self.socket) + #self.filesocket = self.socket.makefile('rb', 0) edepth = 0 root = None - for (event, xmlobj) in cElementTree.iterparse(self.filesocket, ('end', 'start')): + for (event, xmlobj) in cElementTree.iterparse(self.filesocket, (b'end', b'start')): if edepth == 0: # and xmlobj.tag.split('}', 1)[-1] == self.basetag: - if event == 'start': + if event == b'start': root = xmlobj self.start_stream_handler(root) - if event == 'end': + if event == b'end': edepth += -1 - if edepth == 0 and event == 'end': + if edepth == 0 and event == b'end': return False elif edepth == 1: #self.xmlin.put(xmlobj) @@ -249,15 +199,13 @@ class XMLStream(object): return False if root: root.clear() - if event == 'start': + if event == b'start': edepth += 1 def sendRaw(self, data): logging.debug("SEND: %s" % data) - if type(data) == type(u''): - data = data.encode('utf-8') try: - self.socket.send(data) + self.socket.send(bytes(data, "utf-8")) #except socket.error,(errno, strerror): except: self.state.set('connected', False) @@ -277,7 +225,7 @@ class XMLStream(object): self.socket.close() self.filesocket.close() self.socket.shutdown(socket.SHUT_RDWR) - except socket.error,(errno,strerror): + except socket.error as serr: #logging.warning("Error while disconnecting. Socket Error #%s: %s" % (errno, strerror)) #thread.exit_thread() pass @@ -308,12 +256,28 @@ class XMLStream(object): stanza = StanzaBase(self, xmlobj) for handler in self.__handlers: if handler.match(xmlobj): - handler.run(stanza) + handler.prerun(stanza) + self.eventqueue.put(('stanza', handler, stanza)) if handler.checkDelete(): self.__handlers.pop(self.__handlers.index(handler)) #loop through handlers and test match #spawn threads as necessary, call handlers, sending Stanza + def _eventRunner(self): + logging.debug("Loading event runner") + while self.run: + try: + event = self.eventqueue.get(True, timeout=5) + except queue.Empty: + even = None + if event is not None: + etype, handler, stanza = event + if etype == 'stanza': + handler.run(stanza) + if etype == 'quit': + logging.debug("Quitting eventRunner thread") + return False + def registerHandler(self, handler, before=None, after=None): "Add handler with matcher class and parameters." self.__handlers.append(handler) @@ -386,24 +350,21 @@ class XMLStream(object): return ''.join(newoutput) def xmlesc(self, text): - if type(text) != types.UnicodeType: - text = list(unicode(text, 'utf-8', 'ignore')) - else: - text = list(text) + text = list(text) cc = 0 matches = ('&', '<', '"', '>', "'") for c in text: if c in matches: if c == '&': - text[cc] = u'&' + text[cc] = '&' elif c == '<': - text[cc] = u'<' + text[cc] = '<' elif c == '>': - text[cc] = u'>' + text[cc] = '>' elif c == "'": - text[cc] = u''' + text[cc] = ''' elif self.escape_quotes: - text[cc] = u'"' + text[cc] = '"' cc += 1 return ''.join(text) -- cgit v1.2.3