summaryrefslogtreecommitdiff
path: root/sleekxmpp/xmlstream
diff options
context:
space:
mode:
Diffstat (limited to 'sleekxmpp/xmlstream')
-rw-r--r--sleekxmpp/xmlstream/handler/base.py3
-rw-r--r--sleekxmpp/xmlstream/handler/callback.py26
-rw-r--r--sleekxmpp/xmlstream/handler/waiter.py11
-rw-r--r--sleekxmpp/xmlstream/handler/xmlcallback.py4
-rw-r--r--sleekxmpp/xmlstream/handler/xmlwaiter.py4
-rw-r--r--sleekxmpp/xmlstream/xmlstream.py153
6 files changed, 87 insertions, 114 deletions
diff --git a/sleekxmpp/xmlstream/handler/base.py b/sleekxmpp/xmlstream/handler/base.py
index 810aac91..9a951193 100644
--- a/sleekxmpp/xmlstream/handler/base.py
+++ b/sleekxmpp/xmlstream/handler/base.py
@@ -11,6 +11,9 @@ class BaseHandler(object):
def match(self, xml):
return self._matcher.match(xml)
+ def prerun(self, payload):
+ self._payload = payload
+
def run(self, payload):
self._payload = payload
diff --git a/sleekxmpp/xmlstream/handler/callback.py b/sleekxmpp/xmlstream/handler/callback.py
index e3ef8ccc..c618b718 100644
--- a/sleekxmpp/xmlstream/handler/callback.py
+++ b/sleekxmpp/xmlstream/handler/callback.py
@@ -1,20 +1,26 @@
from . import base
-import threading
class Callback(base.BaseHandler):
- def __init__(self, name, matcher, pointer, thread=False, once=False):
+ def __init__(self, name, matcher, pointer, thread=False, once=False, instream=False):
base.BaseHandler.__init__(self, name, matcher)
self._pointer = pointer
self._thread = thread
self._once = once
+ self._instream = instream
+
+ def prerun(self, payload):
+ base.BaseHandler.prerun(self, payload)
+ if self._instream:
+ self.run(payload, True)
- def run(self, payload):
- base.BaseHandler.run(self, payload)
- if self._thread:
- x = threading.Thread(name="Callback_%s" % self.name, target=self._pointer, args=(payload,))
- x.start()
- else:
+ def run(self, payload, instream=False):
+ if not self._instream or instream:
+ base.BaseHandler.run(self, payload)
+ #if self._thread:
+ # x = threading.Thread(name="Callback_%s" % self.name, target=self._pointer, args=(payload,))
+ # x.start()
+ #else:
self._pointer(payload)
- if self._once:
- self._destroy = True
+ if self._once:
+ self._destroy = True
diff --git a/sleekxmpp/xmlstream/handler/waiter.py b/sleekxmpp/xmlstream/handler/waiter.py
index 7c06ddf1..e62f330f 100644
--- a/sleekxmpp/xmlstream/handler/waiter.py
+++ b/sleekxmpp/xmlstream/handler/waiter.py
@@ -1,20 +1,23 @@
from . import base
-import Queue
+import queue
import logging
class Waiter(base.BaseHandler):
def __init__(self, name, matcher):
base.BaseHandler.__init__(self, name, matcher)
- self._payload = Queue.Queue()
+ self._payload = queue.Queue()
- def run(self, payload):
+ def prerun(self, payload):
self._payload.put(payload)
+
+ def run(self, payload):
+ pass
def wait(self, timeout=60):
try:
return self._payload.get(True, timeout)
- except Queue.Empty:
+ except queue.Empty:
return False
def checkDelete(self):
diff --git a/sleekxmpp/xmlstream/handler/xmlcallback.py b/sleekxmpp/xmlstream/handler/xmlcallback.py
index 50d3d5fa..ae288ff1 100644
--- a/sleekxmpp/xmlstream/handler/xmlcallback.py
+++ b/sleekxmpp/xmlstream/handler/xmlcallback.py
@@ -3,5 +3,5 @@ from . callback import Callback
class XMLCallback(Callback):
- def run(self, payload):
- Callback.run(self, payload.xml)
+ def run(self, payload, instream=False):
+ Callback.run(self, payload.xml, instream)
diff --git a/sleekxmpp/xmlstream/handler/xmlwaiter.py b/sleekxmpp/xmlstream/handler/xmlwaiter.py
index 9b2b3394..1e524b02 100644
--- a/sleekxmpp/xmlstream/handler/xmlwaiter.py
+++ b/sleekxmpp/xmlstream/handler/xmlwaiter.py
@@ -2,5 +2,5 @@ from . waiter import Waiter
class XMLWaiter(Waiter):
- def run(self, payload):
- Waiter.run(self, payload.xml)
+ def prerun(self, payload):
+ Waiter.prerun(self, payload.xml)
diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py
index a3f3d2ee..958a3b6e 100644
--- a/sleekxmpp/xmlstream/xmlstream.py
+++ b/sleekxmpp/xmlstream/xmlstream.py
@@ -1,12 +1,12 @@
from __future__ import with_statement
-import Queue
+import queue
from . import statemachine
from . stanzabase import StanzaBase
from xml.etree import cElementTree
from xml.parsers import expat
import logging
import socket
-import thread
+import threading
import time
import traceback
import types
@@ -14,7 +14,7 @@ import xml.sax.saxutils
ssl_support = True
try:
- from tlslite.api import *
+ import ssl
except ImportError:
ssl_support = False
@@ -27,66 +27,6 @@ class CloseStream(Exception):
stanza_extensions = {}
-class _fileobject(object): # we still need this because Socket.makefile is broken in python2.5 (but it works fine in 3.0)
-
- def __init__(self, sock, mode='rb', bufsize=-1):
- self._sock = sock
- if bufsize <= 0:
- bufsize = 1024
- self.bufsize = bufsize
- self.softspace = False
-
- def read(self, size=-1):
- if size <= 0:
- size = sys.maxint
- blocks = []
- #while size > 0:
- # b = self._sock.recv(min(size, self.bufsize))
- # size -= len(b)
- # if not b:
- # break
- # blocks.append(b)
- # print size
- #return "".join(blocks)
- buff = self._sock.recv(self.bufsize)
- logging.debug("RECV: %s" % buff)
- return buff
-
- def readline(self, size=-1):
- return self.read(size)
- if size < 0:
- size = sys.maxint
- blocks = []
- read_size = min(20, size)
- found = 0
- while size and not found:
- b = self._sock.recv(read_size, MSG_PEEK)
- if not b:
- break
- found = b.find('\n') + 1
- length = found or len(b)
- size -= length
- blocks.append(self._sock.recv(length))
- read_size = min(read_size * 2, size, self.bufsize)
- return "".join(blocks)
-
- def write(self, data):
- self._sock.sendall(str(data))
-
- def writelines(self, lines):
- # This version mimics the current writelines, which calls
- # str() on each line, but comments that we should reject
- # non-string non-buffers. Let's omit the next line.
- lines = [str(s) for s in lines]
- self._sock.sendall(''.join(lines))
-
- def flush(self):
- pass
-
- def close(self):
- self._sock.close()
-
-
class XMLStream(object):
"A connection manager with XML events."
@@ -108,13 +48,18 @@ class XMLStream(object):
self.__handlers = []
self.__tls_socket = None
+ self.filesocket = None
self.use_ssl = False
self.use_tls = False
self.stream_header = "<stream>"
self.stream_footer = "</stream>"
+ self.eventqueue = queue.Queue()
+
self.namespace_map = {}
+
+ self.run = True
def setSocket(self, socket):
"Set the socket"
@@ -147,10 +92,12 @@ class XMLStream(object):
self.socket = ssl.wrap_socket(self.socket)
try:
self.socket.connect(self.address)
+ logging.info("creating filesocket")
+ self.filesocket = self.socket.makefile('rb', 0)
self.state.set('connected', True)
return True
- except socket.error,(errno, strerror):
- logging.error("Could not connect. Socket Error #%s: %s" % (errno, strerror))
+ except socket.error as serr:
+ logging.error("Could not connect. Socket Error #%s: %s" % (serr.errno, serr.strerror))
time.sleep(1)
def connectUnix(self, filepath):
@@ -158,24 +105,24 @@ class XMLStream(object):
def startTLS(self):
"Handshakes for TLS"
- #self.socket = ssl.wrap_socket(self.socket, ssl_version=ssl.PROTOCOL_TLSv1, do_handshake_on_connect=False)
- #self.socket.do_handshake()
if self.ssl_support:
+ logging.info("Negotiating TLS")
self.realsocket = self.socket
- self.socket = TLSConnection(self.socket)
- self.socket.handshakeClientCert()
- self.file = _fileobject(self.socket)
+ self.socket = ssl.wrap_socket(self.socket, ssl_version=ssl.PROTOCOL_TLSv1, do_handshake_on_connect=False)
+ self.socket.do_handshake()
+ self.filesocket = self.socket.makefile('rb', 0)
return True
else:
- logging.warning("Tried to enable TLS, but tlslite module not found.")
+ logging.warning("Tried to enable TLS, but ssl module not found.")
return False
raise RestartStream()
def process(self, threaded=True):
- #self.__thread['process'] = threading.Thread(name='process', target=self._process)
- #self.__thread['process'].start()
+ self.__thread['eventhandle'] = threading.Thread(name='eventhandle', target=self._eventRunner)
+ self.__thread['eventhandle'].start()
if threaded:
- thread.start_new(self._process, tuple())
+ self.__thread['process'] = threading.Thread(name='process', target=self._process)
+ self.__thread['process'].start()
else:
self._process()
@@ -196,12 +143,15 @@ class XMLStream(object):
self.state.set('processing', False)
self.state.set('reconnect', False)
self.disconnect()
+ self.run = False
+ self.eventqueue.put(('quit', None, None))
return
except CloseStream:
return
except SystemExit:
+ self.eventqueue.put(('quit', None, None))
return
- except socket.EBADF:
+ except socket.error:
if not self.state.reconnect:
return
else:
@@ -218,6 +168,7 @@ class XMLStream(object):
if self.state['reconnect']:
self.reconnect()
self.state.set('processing', False)
+ self.eventqueue.put(('quit', None, None))
#self.__thread['readXML'] = threading.Thread(name='readXML', target=self.__readXML)
#self.__thread['readXML'].start()
#self.__thread['spawnEvents'] = threading.Thread(name='spawnEvents', target=self.__spawnEvents)
@@ -226,18 +177,17 @@ class XMLStream(object):
def __readXML(self):
"Parses the incoming stream, adding to xmlin queue as it goes"
#build cElementTree object from expat was we go
- #self.filesocket = self.socket.makefile('rb',0) #this is broken in python2.5, but works in python3.0
- self.filesocket = _fileobject(self.socket)
+ #self.filesocket = self.socket.makefile('rb', 0)
edepth = 0
root = None
- for (event, xmlobj) in cElementTree.iterparse(self.filesocket, ('end', 'start')):
+ for (event, xmlobj) in cElementTree.iterparse(self.filesocket, (b'end', b'start')):
if edepth == 0: # and xmlobj.tag.split('}', 1)[-1] == self.basetag:
- if event == 'start':
+ if event == b'start':
root = xmlobj
self.start_stream_handler(root)
- if event == 'end':
+ if event == b'end':
edepth += -1
- if edepth == 0 and event == 'end':
+ if edepth == 0 and event == b'end':
return False
elif edepth == 1:
#self.xmlin.put(xmlobj)
@@ -249,15 +199,13 @@ class XMLStream(object):
return False
if root:
root.clear()
- if event == 'start':
+ if event == b'start':
edepth += 1
def sendRaw(self, data):
logging.debug("SEND: %s" % data)
- if type(data) == type(u''):
- data = data.encode('utf-8')
try:
- self.socket.send(data)
+ self.socket.send(bytes(data, "utf-8"))
#except socket.error,(errno, strerror):
except:
self.state.set('connected', False)
@@ -277,7 +225,7 @@ class XMLStream(object):
self.socket.close()
self.filesocket.close()
self.socket.shutdown(socket.SHUT_RDWR)
- except socket.error,(errno,strerror):
+ except socket.error as serr:
#logging.warning("Error while disconnecting. Socket Error #%s: %s" % (errno, strerror))
#thread.exit_thread()
pass
@@ -308,12 +256,28 @@ class XMLStream(object):
stanza = StanzaBase(self, xmlobj)
for handler in self.__handlers:
if handler.match(xmlobj):
- handler.run(stanza)
+ handler.prerun(stanza)
+ self.eventqueue.put(('stanza', handler, stanza))
if handler.checkDelete(): self.__handlers.pop(self.__handlers.index(handler))
#loop through handlers and test match
#spawn threads as necessary, call handlers, sending Stanza
+ def _eventRunner(self):
+ logging.debug("Loading event runner")
+ while self.run:
+ try:
+ event = self.eventqueue.get(True, timeout=5)
+ except queue.Empty:
+ even = None
+ if event is not None:
+ etype, handler, stanza = event
+ if etype == 'stanza':
+ handler.run(stanza)
+ if etype == 'quit':
+ logging.debug("Quitting eventRunner thread")
+ return False
+
def registerHandler(self, handler, before=None, after=None):
"Add handler with matcher class and parameters."
self.__handlers.append(handler)
@@ -386,24 +350,21 @@ class XMLStream(object):
return ''.join(newoutput)
def xmlesc(self, text):
- if type(text) != types.UnicodeType:
- text = list(unicode(text, 'utf-8', 'ignore'))
- else:
- text = list(text)
+ text = list(text)
cc = 0
matches = ('&', '<', '"', '>', "'")
for c in text:
if c in matches:
if c == '&':
- text[cc] = u'&amp;'
+ text[cc] = '&amp;'
elif c == '<':
- text[cc] = u'&lt;'
+ text[cc] = '&lt;'
elif c == '>':
- text[cc] = u'&gt;'
+ text[cc] = '&gt;'
elif c == "'":
- text[cc] = u'&apos;'
+ text[cc] = '&apos;'
elif self.escape_quotes:
- text[cc] = u'&quot;'
+ text[cc] = '&quot;'
cc += 1
return ''.join(text)