summaryrefslogtreecommitdiff
path: root/sleekxmpp/xmlstream
diff options
context:
space:
mode:
authorNathan Fritz <fritzy@netflint.net>2009-08-31 22:46:31 +0000
committerNathan Fritz <fritzy@netflint.net>2009-08-31 22:46:31 +0000
commit05c9ea5c1d953637343c9fad07267e7f89b20561 (patch)
treed4b0d432870bb195a4f14ec07ce889fcd080a357 /sleekxmpp/xmlstream
parent00d46ee2b0fe4c0d76525d284dcc7ed588e701af (diff)
downloadslixmpp-05c9ea5c1d953637343c9fad07267e7f89b20561.tar.gz
slixmpp-05c9ea5c1d953637343c9fad07267e7f89b20561.tar.bz2
slixmpp-05c9ea5c1d953637343c9fad07267e7f89b20561.tar.xz
slixmpp-05c9ea5c1d953637343c9fad07267e7f89b20561.zip
* converted sleekxmpp to Python 3.x
* sleekxmpp no longer spawns threads for callback handlers -- there are now two threads: one for handlers and one for reading. callback handlers can get results from the read queue directly with the "wait" handler which is used in .send() for the reply catching argument.
Diffstat (limited to 'sleekxmpp/xmlstream')
-rw-r--r--sleekxmpp/xmlstream/handler/base.py3
-rw-r--r--sleekxmpp/xmlstream/handler/callback.py26
-rw-r--r--sleekxmpp/xmlstream/handler/waiter.py11
-rw-r--r--sleekxmpp/xmlstream/handler/xmlcallback.py4
-rw-r--r--sleekxmpp/xmlstream/handler/xmlwaiter.py4
-rw-r--r--sleekxmpp/xmlstream/xmlstream.py153
6 files changed, 87 insertions, 114 deletions
diff --git a/sleekxmpp/xmlstream/handler/base.py b/sleekxmpp/xmlstream/handler/base.py
index 810aac91..9a951193 100644
--- a/sleekxmpp/xmlstream/handler/base.py
+++ b/sleekxmpp/xmlstream/handler/base.py
@@ -11,6 +11,9 @@ class BaseHandler(object):
def match(self, xml):
return self._matcher.match(xml)
+ def prerun(self, payload):
+ self._payload = payload
+
def run(self, payload):
self._payload = payload
diff --git a/sleekxmpp/xmlstream/handler/callback.py b/sleekxmpp/xmlstream/handler/callback.py
index e3ef8ccc..c618b718 100644
--- a/sleekxmpp/xmlstream/handler/callback.py
+++ b/sleekxmpp/xmlstream/handler/callback.py
@@ -1,20 +1,26 @@
from . import base
-import threading
class Callback(base.BaseHandler):
- def __init__(self, name, matcher, pointer, thread=False, once=False):
+ def __init__(self, name, matcher, pointer, thread=False, once=False, instream=False):
base.BaseHandler.__init__(self, name, matcher)
self._pointer = pointer
self._thread = thread
self._once = once
+ self._instream = instream
+
+ def prerun(self, payload):
+ base.BaseHandler.prerun(self, payload)
+ if self._instream:
+ self.run(payload, True)
- def run(self, payload):
- base.BaseHandler.run(self, payload)
- if self._thread:
- x = threading.Thread(name="Callback_%s" % self.name, target=self._pointer, args=(payload,))
- x.start()
- else:
+ def run(self, payload, instream=False):
+ if not self._instream or instream:
+ base.BaseHandler.run(self, payload)
+ #if self._thread:
+ # x = threading.Thread(name="Callback_%s" % self.name, target=self._pointer, args=(payload,))
+ # x.start()
+ #else:
self._pointer(payload)
- if self._once:
- self._destroy = True
+ if self._once:
+ self._destroy = True
diff --git a/sleekxmpp/xmlstream/handler/waiter.py b/sleekxmpp/xmlstream/handler/waiter.py
index 7c06ddf1..e62f330f 100644
--- a/sleekxmpp/xmlstream/handler/waiter.py
+++ b/sleekxmpp/xmlstream/handler/waiter.py
@@ -1,20 +1,23 @@
from . import base
-import Queue
+import queue
import logging
class Waiter(base.BaseHandler):
def __init__(self, name, matcher):
base.BaseHandler.__init__(self, name, matcher)
- self._payload = Queue.Queue()
+ self._payload = queue.Queue()
- def run(self, payload):
+ def prerun(self, payload):
self._payload.put(payload)
+
+ def run(self, payload):
+ pass
def wait(self, timeout=60):
try:
return self._payload.get(True, timeout)
- except Queue.Empty:
+ except queue.Empty:
return False
def checkDelete(self):
diff --git a/sleekxmpp/xmlstream/handler/xmlcallback.py b/sleekxmpp/xmlstream/handler/xmlcallback.py
index 50d3d5fa..ae288ff1 100644
--- a/sleekxmpp/xmlstream/handler/xmlcallback.py
+++ b/sleekxmpp/xmlstream/handler/xmlcallback.py
@@ -3,5 +3,5 @@ from . callback import Callback
class XMLCallback(Callback):
- def run(self, payload):
- Callback.run(self, payload.xml)
+ def run(self, payload, instream=False):
+ Callback.run(self, payload.xml, instream)
diff --git a/sleekxmpp/xmlstream/handler/xmlwaiter.py b/sleekxmpp/xmlstream/handler/xmlwaiter.py
index 9b2b3394..1e524b02 100644
--- a/sleekxmpp/xmlstream/handler/xmlwaiter.py
+++ b/sleekxmpp/xmlstream/handler/xmlwaiter.py
@@ -2,5 +2,5 @@ from . waiter import Waiter
class XMLWaiter(Waiter):
- def run(self, payload):
- Waiter.run(self, payload.xml)
+ def prerun(self, payload):
+ Waiter.prerun(self, payload.xml)
diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py
index a3f3d2ee..958a3b6e 100644
--- a/sleekxmpp/xmlstream/xmlstream.py
+++ b/sleekxmpp/xmlstream/xmlstream.py
@@ -1,12 +1,12 @@
from __future__ import with_statement
-import Queue
+import queue
from . import statemachine
from . stanzabase import StanzaBase
from xml.etree import cElementTree
from xml.parsers import expat
import logging
import socket
-import thread
+import threading
import time
import traceback
import types
@@ -14,7 +14,7 @@ import xml.sax.saxutils
ssl_support = True
try:
- from tlslite.api import *
+ import ssl
except ImportError:
ssl_support = False
@@ -27,66 +27,6 @@ class CloseStream(Exception):
stanza_extensions = {}
-class _fileobject(object): # we still need this because Socket.makefile is broken in python2.5 (but it works fine in 3.0)
-
- def __init__(self, sock, mode='rb', bufsize=-1):
- self._sock = sock
- if bufsize <= 0:
- bufsize = 1024
- self.bufsize = bufsize
- self.softspace = False
-
- def read(self, size=-1):
- if size <= 0:
- size = sys.maxint
- blocks = []
- #while size > 0:
- # b = self._sock.recv(min(size, self.bufsize))
- # size -= len(b)
- # if not b:
- # break
- # blocks.append(b)
- # print size
- #return "".join(blocks)
- buff = self._sock.recv(self.bufsize)
- logging.debug("RECV: %s" % buff)
- return buff
-
- def readline(self, size=-1):
- return self.read(size)
- if size < 0:
- size = sys.maxint
- blocks = []
- read_size = min(20, size)
- found = 0
- while size and not found:
- b = self._sock.recv(read_size, MSG_PEEK)
- if not b:
- break
- found = b.find('\n') + 1
- length = found or len(b)
- size -= length
- blocks.append(self._sock.recv(length))
- read_size = min(read_size * 2, size, self.bufsize)
- return "".join(blocks)
-
- def write(self, data):
- self._sock.sendall(str(data))
-
- def writelines(self, lines):
- # This version mimics the current writelines, which calls
- # str() on each line, but comments that we should reject
- # non-string non-buffers. Let's omit the next line.
- lines = [str(s) for s in lines]
- self._sock.sendall(''.join(lines))
-
- def flush(self):
- pass
-
- def close(self):
- self._sock.close()
-
-
class XMLStream(object):
"A connection manager with XML events."
@@ -108,13 +48,18 @@ class XMLStream(object):
self.__handlers = []
self.__tls_socket = None
+ self.filesocket = None
self.use_ssl = False
self.use_tls = False
self.stream_header = "<stream>"
self.stream_footer = "</stream>"
+ self.eventqueue = queue.Queue()
+
self.namespace_map = {}
+
+ self.run = True
def setSocket(self, socket):
"Set the socket"
@@ -147,10 +92,12 @@ class XMLStream(object):
self.socket = ssl.wrap_socket(self.socket)
try:
self.socket.connect(self.address)
+ logging.info("creating filesocket")
+ self.filesocket = self.socket.makefile('rb', 0)
self.state.set('connected', True)
return True
- except socket.error,(errno, strerror):
- logging.error("Could not connect. Socket Error #%s: %s" % (errno, strerror))
+ except socket.error as serr:
+ logging.error("Could not connect. Socket Error #%s: %s" % (serr.errno, serr.strerror))
time.sleep(1)
def connectUnix(self, filepath):
@@ -158,24 +105,24 @@ class XMLStream(object):
def startTLS(self):
"Handshakes for TLS"
- #self.socket = ssl.wrap_socket(self.socket, ssl_version=ssl.PROTOCOL_TLSv1, do_handshake_on_connect=False)
- #self.socket.do_handshake()
if self.ssl_support:
+ logging.info("Negotiating TLS")
self.realsocket = self.socket
- self.socket = TLSConnection(self.socket)
- self.socket.handshakeClientCert()
- self.file = _fileobject(self.socket)
+ self.socket = ssl.wrap_socket(self.socket, ssl_version=ssl.PROTOCOL_TLSv1, do_handshake_on_connect=False)
+ self.socket.do_handshake()
+ self.filesocket = self.socket.makefile('rb', 0)
return True
else:
- logging.warning("Tried to enable TLS, but tlslite module not found.")
+ logging.warning("Tried to enable TLS, but ssl module not found.")
return False
raise RestartStream()
def process(self, threaded=True):
- #self.__thread['process'] = threading.Thread(name='process', target=self._process)
- #self.__thread['process'].start()
+ self.__thread['eventhandle'] = threading.Thread(name='eventhandle', target=self._eventRunner)
+ self.__thread['eventhandle'].start()
if threaded:
- thread.start_new(self._process, tuple())
+ self.__thread['process'] = threading.Thread(name='process', target=self._process)
+ self.__thread['process'].start()
else:
self._process()
@@ -196,12 +143,15 @@ class XMLStream(object):
self.state.set('processing', False)
self.state.set('reconnect', False)
self.disconnect()
+ self.run = False
+ self.eventqueue.put(('quit', None, None))
return
except CloseStream:
return
except SystemExit:
+ self.eventqueue.put(('quit', None, None))
return
- except socket.EBADF:
+ except socket.error:
if not self.state.reconnect:
return
else:
@@ -218,6 +168,7 @@ class XMLStream(object):
if self.state['reconnect']:
self.reconnect()
self.state.set('processing', False)
+ self.eventqueue.put(('quit', None, None))
#self.__thread['readXML'] = threading.Thread(name='readXML', target=self.__readXML)
#self.__thread['readXML'].start()
#self.__thread['spawnEvents'] = threading.Thread(name='spawnEvents', target=self.__spawnEvents)
@@ -226,18 +177,17 @@ class XMLStream(object):
def __readXML(self):
"Parses the incoming stream, adding to xmlin queue as it goes"
#build cElementTree object from expat was we go
- #self.filesocket = self.socket.makefile('rb',0) #this is broken in python2.5, but works in python3.0
- self.filesocket = _fileobject(self.socket)
+ #self.filesocket = self.socket.makefile('rb', 0)
edepth = 0
root = None
- for (event, xmlobj) in cElementTree.iterparse(self.filesocket, ('end', 'start')):
+ for (event, xmlobj) in cElementTree.iterparse(self.filesocket, (b'end', b'start')):
if edepth == 0: # and xmlobj.tag.split('}', 1)[-1] == self.basetag:
- if event == 'start':
+ if event == b'start':
root = xmlobj
self.start_stream_handler(root)
- if event == 'end':
+ if event == b'end':
edepth += -1
- if edepth == 0 and event == 'end':
+ if edepth == 0 and event == b'end':
return False
elif edepth == 1:
#self.xmlin.put(xmlobj)
@@ -249,15 +199,13 @@ class XMLStream(object):
return False
if root:
root.clear()
- if event == 'start':
+ if event == b'start':
edepth += 1
def sendRaw(self, data):
logging.debug("SEND: %s" % data)
- if type(data) == type(u''):
- data = data.encode('utf-8')
try:
- self.socket.send(data)
+ self.socket.send(bytes(data, "utf-8"))
#except socket.error,(errno, strerror):
except:
self.state.set('connected', False)
@@ -277,7 +225,7 @@ class XMLStream(object):
self.socket.close()
self.filesocket.close()
self.socket.shutdown(socket.SHUT_RDWR)
- except socket.error,(errno,strerror):
+ except socket.error as serr:
#logging.warning("Error while disconnecting. Socket Error #%s: %s" % (errno, strerror))
#thread.exit_thread()
pass
@@ -308,12 +256,28 @@ class XMLStream(object):
stanza = StanzaBase(self, xmlobj)
for handler in self.__handlers:
if handler.match(xmlobj):
- handler.run(stanza)
+ handler.prerun(stanza)
+ self.eventqueue.put(('stanza', handler, stanza))
if handler.checkDelete(): self.__handlers.pop(self.__handlers.index(handler))
#loop through handlers and test match
#spawn threads as necessary, call handlers, sending Stanza
+ def _eventRunner(self):
+ logging.debug("Loading event runner")
+ while self.run:
+ try:
+ event = self.eventqueue.get(True, timeout=5)
+ except queue.Empty:
+ even = None
+ if event is not None:
+ etype, handler, stanza = event
+ if etype == 'stanza':
+ handler.run(stanza)
+ if etype == 'quit':
+ logging.debug("Quitting eventRunner thread")
+ return False
+
def registerHandler(self, handler, before=None, after=None):
"Add handler with matcher class and parameters."
self.__handlers.append(handler)
@@ -386,24 +350,21 @@ class XMLStream(object):
return ''.join(newoutput)
def xmlesc(self, text):
- if type(text) != types.UnicodeType:
- text = list(unicode(text, 'utf-8', 'ignore'))
- else:
- text = list(text)
+ text = list(text)
cc = 0
matches = ('&', '<', '"', '>', "'")
for c in text:
if c in matches:
if c == '&':
- text[cc] = u'&amp;'
+ text[cc] = '&amp;'
elif c == '<':
- text[cc] = u'&lt;'
+ text[cc] = '&lt;'
elif c == '>':
- text[cc] = u'&gt;'
+ text[cc] = '&gt;'
elif c == "'":
- text[cc] = u'&apos;'
+ text[cc] = '&apos;'
elif self.escape_quotes:
- text[cc] = u'&quot;'
+ text[cc] = '&quot;'
cc += 1
return ''.join(text)