diff options
Diffstat (limited to 'sleekxmpp/xmlstream')
-rw-r--r-- | sleekxmpp/xmlstream/handler/base.py | 3 | ||||
-rw-r--r-- | sleekxmpp/xmlstream/handler/callback.py | 26 | ||||
-rw-r--r-- | sleekxmpp/xmlstream/handler/waiter.py | 11 | ||||
-rw-r--r-- | sleekxmpp/xmlstream/handler/xmlcallback.py | 4 | ||||
-rw-r--r-- | sleekxmpp/xmlstream/handler/xmlwaiter.py | 4 | ||||
-rw-r--r-- | sleekxmpp/xmlstream/xmlstream.py | 153 |
6 files changed, 87 insertions, 114 deletions
diff --git a/sleekxmpp/xmlstream/handler/base.py b/sleekxmpp/xmlstream/handler/base.py index 810aac91..9a951193 100644 --- a/sleekxmpp/xmlstream/handler/base.py +++ b/sleekxmpp/xmlstream/handler/base.py @@ -11,6 +11,9 @@ class BaseHandler(object): def match(self, xml): return self._matcher.match(xml) + def prerun(self, payload): + self._payload = payload + def run(self, payload): self._payload = payload diff --git a/sleekxmpp/xmlstream/handler/callback.py b/sleekxmpp/xmlstream/handler/callback.py index e3ef8ccc..c618b718 100644 --- a/sleekxmpp/xmlstream/handler/callback.py +++ b/sleekxmpp/xmlstream/handler/callback.py @@ -1,20 +1,26 @@ from . import base -import threading class Callback(base.BaseHandler): - def __init__(self, name, matcher, pointer, thread=False, once=False): + def __init__(self, name, matcher, pointer, thread=False, once=False, instream=False): base.BaseHandler.__init__(self, name, matcher) self._pointer = pointer self._thread = thread self._once = once + self._instream = instream + + def prerun(self, payload): + base.BaseHandler.prerun(self, payload) + if self._instream: + self.run(payload, True) - def run(self, payload): - base.BaseHandler.run(self, payload) - if self._thread: - x = threading.Thread(name="Callback_%s" % self.name, target=self._pointer, args=(payload,)) - x.start() - else: + def run(self, payload, instream=False): + if not self._instream or instream: + base.BaseHandler.run(self, payload) + #if self._thread: + # x = threading.Thread(name="Callback_%s" % self.name, target=self._pointer, args=(payload,)) + # x.start() + #else: self._pointer(payload) - if self._once: - self._destroy = True + if self._once: + self._destroy = True diff --git a/sleekxmpp/xmlstream/handler/waiter.py b/sleekxmpp/xmlstream/handler/waiter.py index 7c06ddf1..e62f330f 100644 --- a/sleekxmpp/xmlstream/handler/waiter.py +++ b/sleekxmpp/xmlstream/handler/waiter.py @@ -1,20 +1,23 @@ from . import base -import Queue +import queue import logging class Waiter(base.BaseHandler): def __init__(self, name, matcher): base.BaseHandler.__init__(self, name, matcher) - self._payload = Queue.Queue() + self._payload = queue.Queue() - def run(self, payload): + def prerun(self, payload): self._payload.put(payload) + + def run(self, payload): + pass def wait(self, timeout=60): try: return self._payload.get(True, timeout) - except Queue.Empty: + except queue.Empty: return False def checkDelete(self): diff --git a/sleekxmpp/xmlstream/handler/xmlcallback.py b/sleekxmpp/xmlstream/handler/xmlcallback.py index 50d3d5fa..ae288ff1 100644 --- a/sleekxmpp/xmlstream/handler/xmlcallback.py +++ b/sleekxmpp/xmlstream/handler/xmlcallback.py @@ -3,5 +3,5 @@ from . callback import Callback class XMLCallback(Callback): - def run(self, payload): - Callback.run(self, payload.xml) + def run(self, payload, instream=False): + Callback.run(self, payload.xml, instream) diff --git a/sleekxmpp/xmlstream/handler/xmlwaiter.py b/sleekxmpp/xmlstream/handler/xmlwaiter.py index 9b2b3394..1e524b02 100644 --- a/sleekxmpp/xmlstream/handler/xmlwaiter.py +++ b/sleekxmpp/xmlstream/handler/xmlwaiter.py @@ -2,5 +2,5 @@ from . waiter import Waiter class XMLWaiter(Waiter): - def run(self, payload): - Waiter.run(self, payload.xml) + def prerun(self, payload): + Waiter.prerun(self, payload.xml) diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py index a3f3d2ee..958a3b6e 100644 --- a/sleekxmpp/xmlstream/xmlstream.py +++ b/sleekxmpp/xmlstream/xmlstream.py @@ -1,12 +1,12 @@ from __future__ import with_statement -import Queue +import queue from . import statemachine from . stanzabase import StanzaBase from xml.etree import cElementTree from xml.parsers import expat import logging import socket -import thread +import threading import time import traceback import types @@ -14,7 +14,7 @@ import xml.sax.saxutils ssl_support = True try: - from tlslite.api import * + import ssl except ImportError: ssl_support = False @@ -27,66 +27,6 @@ class CloseStream(Exception): stanza_extensions = {} -class _fileobject(object): # we still need this because Socket.makefile is broken in python2.5 (but it works fine in 3.0) - - def __init__(self, sock, mode='rb', bufsize=-1): - self._sock = sock - if bufsize <= 0: - bufsize = 1024 - self.bufsize = bufsize - self.softspace = False - - def read(self, size=-1): - if size <= 0: - size = sys.maxint - blocks = [] - #while size > 0: - # b = self._sock.recv(min(size, self.bufsize)) - # size -= len(b) - # if not b: - # break - # blocks.append(b) - # print size - #return "".join(blocks) - buff = self._sock.recv(self.bufsize) - logging.debug("RECV: %s" % buff) - return buff - - def readline(self, size=-1): - return self.read(size) - if size < 0: - size = sys.maxint - blocks = [] - read_size = min(20, size) - found = 0 - while size and not found: - b = self._sock.recv(read_size, MSG_PEEK) - if not b: - break - found = b.find('\n') + 1 - length = found or len(b) - size -= length - blocks.append(self._sock.recv(length)) - read_size = min(read_size * 2, size, self.bufsize) - return "".join(blocks) - - def write(self, data): - self._sock.sendall(str(data)) - - def writelines(self, lines): - # This version mimics the current writelines, which calls - # str() on each line, but comments that we should reject - # non-string non-buffers. Let's omit the next line. - lines = [str(s) for s in lines] - self._sock.sendall(''.join(lines)) - - def flush(self): - pass - - def close(self): - self._sock.close() - - class XMLStream(object): "A connection manager with XML events." @@ -108,13 +48,18 @@ class XMLStream(object): self.__handlers = [] self.__tls_socket = None + self.filesocket = None self.use_ssl = False self.use_tls = False self.stream_header = "<stream>" self.stream_footer = "</stream>" + self.eventqueue = queue.Queue() + self.namespace_map = {} + + self.run = True def setSocket(self, socket): "Set the socket" @@ -147,10 +92,12 @@ class XMLStream(object): self.socket = ssl.wrap_socket(self.socket) try: self.socket.connect(self.address) + logging.info("creating filesocket") + self.filesocket = self.socket.makefile('rb', 0) self.state.set('connected', True) return True - except socket.error,(errno, strerror): - logging.error("Could not connect. Socket Error #%s: %s" % (errno, strerror)) + except socket.error as serr: + logging.error("Could not connect. Socket Error #%s: %s" % (serr.errno, serr.strerror)) time.sleep(1) def connectUnix(self, filepath): @@ -158,24 +105,24 @@ class XMLStream(object): def startTLS(self): "Handshakes for TLS" - #self.socket = ssl.wrap_socket(self.socket, ssl_version=ssl.PROTOCOL_TLSv1, do_handshake_on_connect=False) - #self.socket.do_handshake() if self.ssl_support: + logging.info("Negotiating TLS") self.realsocket = self.socket - self.socket = TLSConnection(self.socket) - self.socket.handshakeClientCert() - self.file = _fileobject(self.socket) + self.socket = ssl.wrap_socket(self.socket, ssl_version=ssl.PROTOCOL_TLSv1, do_handshake_on_connect=False) + self.socket.do_handshake() + self.filesocket = self.socket.makefile('rb', 0) return True else: - logging.warning("Tried to enable TLS, but tlslite module not found.") + logging.warning("Tried to enable TLS, but ssl module not found.") return False raise RestartStream() def process(self, threaded=True): - #self.__thread['process'] = threading.Thread(name='process', target=self._process) - #self.__thread['process'].start() + self.__thread['eventhandle'] = threading.Thread(name='eventhandle', target=self._eventRunner) + self.__thread['eventhandle'].start() if threaded: - thread.start_new(self._process, tuple()) + self.__thread['process'] = threading.Thread(name='process', target=self._process) + self.__thread['process'].start() else: self._process() @@ -196,12 +143,15 @@ class XMLStream(object): self.state.set('processing', False) self.state.set('reconnect', False) self.disconnect() + self.run = False + self.eventqueue.put(('quit', None, None)) return except CloseStream: return except SystemExit: + self.eventqueue.put(('quit', None, None)) return - except socket.EBADF: + except socket.error: if not self.state.reconnect: return else: @@ -218,6 +168,7 @@ class XMLStream(object): if self.state['reconnect']: self.reconnect() self.state.set('processing', False) + self.eventqueue.put(('quit', None, None)) #self.__thread['readXML'] = threading.Thread(name='readXML', target=self.__readXML) #self.__thread['readXML'].start() #self.__thread['spawnEvents'] = threading.Thread(name='spawnEvents', target=self.__spawnEvents) @@ -226,18 +177,17 @@ class XMLStream(object): def __readXML(self): "Parses the incoming stream, adding to xmlin queue as it goes" #build cElementTree object from expat was we go - #self.filesocket = self.socket.makefile('rb',0) #this is broken in python2.5, but works in python3.0 - self.filesocket = _fileobject(self.socket) + #self.filesocket = self.socket.makefile('rb', 0) edepth = 0 root = None - for (event, xmlobj) in cElementTree.iterparse(self.filesocket, ('end', 'start')): + for (event, xmlobj) in cElementTree.iterparse(self.filesocket, (b'end', b'start')): if edepth == 0: # and xmlobj.tag.split('}', 1)[-1] == self.basetag: - if event == 'start': + if event == b'start': root = xmlobj self.start_stream_handler(root) - if event == 'end': + if event == b'end': edepth += -1 - if edepth == 0 and event == 'end': + if edepth == 0 and event == b'end': return False elif edepth == 1: #self.xmlin.put(xmlobj) @@ -249,15 +199,13 @@ class XMLStream(object): return False if root: root.clear() - if event == 'start': + if event == b'start': edepth += 1 def sendRaw(self, data): logging.debug("SEND: %s" % data) - if type(data) == type(u''): - data = data.encode('utf-8') try: - self.socket.send(data) + self.socket.send(bytes(data, "utf-8")) #except socket.error,(errno, strerror): except: self.state.set('connected', False) @@ -277,7 +225,7 @@ class XMLStream(object): self.socket.close() self.filesocket.close() self.socket.shutdown(socket.SHUT_RDWR) - except socket.error,(errno,strerror): + except socket.error as serr: #logging.warning("Error while disconnecting. Socket Error #%s: %s" % (errno, strerror)) #thread.exit_thread() pass @@ -308,12 +256,28 @@ class XMLStream(object): stanza = StanzaBase(self, xmlobj) for handler in self.__handlers: if handler.match(xmlobj): - handler.run(stanza) + handler.prerun(stanza) + self.eventqueue.put(('stanza', handler, stanza)) if handler.checkDelete(): self.__handlers.pop(self.__handlers.index(handler)) #loop through handlers and test match #spawn threads as necessary, call handlers, sending Stanza + def _eventRunner(self): + logging.debug("Loading event runner") + while self.run: + try: + event = self.eventqueue.get(True, timeout=5) + except queue.Empty: + even = None + if event is not None: + etype, handler, stanza = event + if etype == 'stanza': + handler.run(stanza) + if etype == 'quit': + logging.debug("Quitting eventRunner thread") + return False + def registerHandler(self, handler, before=None, after=None): "Add handler with matcher class and parameters." self.__handlers.append(handler) @@ -386,24 +350,21 @@ class XMLStream(object): return ''.join(newoutput) def xmlesc(self, text): - if type(text) != types.UnicodeType: - text = list(unicode(text, 'utf-8', 'ignore')) - else: - text = list(text) + text = list(text) cc = 0 matches = ('&', '<', '"', '>', "'") for c in text: if c in matches: if c == '&': - text[cc] = u'&' + text[cc] = '&' elif c == '<': - text[cc] = u'<' + text[cc] = '<' elif c == '>': - text[cc] = u'>' + text[cc] = '>' elif c == "'": - text[cc] = u''' + text[cc] = ''' elif self.escape_quotes: - text[cc] = u'"' + text[cc] = '"' cc += 1 return ''.join(text) |