summaryrefslogtreecommitdiff
path: root/sleekxmpp/xmlstream
diff options
context:
space:
mode:
Diffstat (limited to 'sleekxmpp/xmlstream')
-rw-r--r--sleekxmpp/xmlstream/handler/waiter.py5
-rw-r--r--sleekxmpp/xmlstream/matcher/xmlmask.py8
-rw-r--r--sleekxmpp/xmlstream/scheduler.py9
-rw-r--r--sleekxmpp/xmlstream/stanzabase.py5
-rw-r--r--sleekxmpp/xmlstream/xmlstream.py122
5 files changed, 110 insertions, 39 deletions
diff --git a/sleekxmpp/xmlstream/handler/waiter.py b/sleekxmpp/xmlstream/handler/waiter.py
index 80720226..a4bc3545 100644
--- a/sleekxmpp/xmlstream/handler/waiter.py
+++ b/sleekxmpp/xmlstream/handler/waiter.py
@@ -16,6 +16,9 @@ from sleekxmpp.xmlstream import StanzaBase, RESPONSE_TIMEOUT
from sleekxmpp.xmlstream.handler.base import BaseHandler
+log = logging.getLogger(__name__)
+
+
class Waiter(BaseHandler):
"""
@@ -85,7 +88,7 @@ class Waiter(BaseHandler):
stanza = self._payload.get(True, timeout)
except queue.Empty:
stanza = False
- logging.warning("Timed out waiting for %s" % self.name)
+ log.warning("Timed out waiting for %s" % self.name)
self.stream.removeHandler(self.name)
return stanza
diff --git a/sleekxmpp/xmlstream/matcher/xmlmask.py b/sleekxmpp/xmlstream/matcher/xmlmask.py
index 2967a2af..6ebb437d 100644
--- a/sleekxmpp/xmlstream/matcher/xmlmask.py
+++ b/sleekxmpp/xmlstream/matcher/xmlmask.py
@@ -6,6 +6,8 @@
See the file LICENSE for copying permission.
"""
+import logging
+
from xml.parsers.expat import ExpatError
from sleekxmpp.xmlstream.stanzabase import ET
@@ -18,6 +20,9 @@ from sleekxmpp.xmlstream.matcher.base import MatcherBase
IGNORE_NS = False
+log = logging.getLogger(__name__)
+
+
class MatchXMLMask(MatcherBase):
"""
@@ -97,8 +102,7 @@ class MatchXMLMask(MatcherBase):
try:
mask = ET.fromstring(mask)
except ExpatError:
- logging.log(logging.WARNING,
- "Expat error: %s\nIn parsing: %s" % ('', mask))
+ log.warning("Expat error: %s\nIn parsing: %s" % ('', mask))
if not use_ns:
# Compare the element without using namespaces.
diff --git a/sleekxmpp/xmlstream/scheduler.py b/sleekxmpp/xmlstream/scheduler.py
index 240d4a4b..14359102 100644
--- a/sleekxmpp/xmlstream/scheduler.py
+++ b/sleekxmpp/xmlstream/scheduler.py
@@ -15,6 +15,9 @@ except ImportError:
import Queue as queue
+log = logging.getLogger(__name__)
+
+
class Task(object):
"""
@@ -146,6 +149,8 @@ class Scheduler(object):
if wait <= 0.0:
newtask = self.addq.get(False)
else:
+ if wait >= 3.0:
+ wait = 3.0
newtask = self.addq.get(True, wait)
except queue.Empty:
cleanup = []
@@ -168,13 +173,13 @@ class Scheduler(object):
except KeyboardInterrupt:
self.run = False
if self.parentstop is not None:
- logging.debug("stopping parent")
+ log.debug("stopping parent")
self.parentstop.set()
except SystemExit:
self.run = False
if self.parentstop is not None:
self.parentstop.set()
- logging.debug("Quitting Scheduler thread")
+ log.debug("Quitting Scheduler thread")
if self.parentqueue is not None:
self.parentqueue.put(('quit', None, None))
diff --git a/sleekxmpp/xmlstream/stanzabase.py b/sleekxmpp/xmlstream/stanzabase.py
index f4d66aa8..aabd3864 100644
--- a/sleekxmpp/xmlstream/stanzabase.py
+++ b/sleekxmpp/xmlstream/stanzabase.py
@@ -16,6 +16,9 @@ from sleekxmpp.xmlstream import JID
from sleekxmpp.xmlstream.tostring import tostring
+log = logging.getLogger(__name__)
+
+
# Used to check if an argument is an XML object.
XML_TYPE = type(ET.Element('xml'))
@@ -1140,7 +1143,7 @@ class StanzaBase(ElementBase):
Meant to be overridden.
"""
- logging.exception('Error handling {%s}%s stanza' % (self.namespace,
+ log.exception('Error handling {%s}%s stanza' % (self.namespace,
self.name))
def send(self):
diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py
index ace93cc3..30b76ce7 100644
--- a/sleekxmpp/xmlstream/xmlstream.py
+++ b/sleekxmpp/xmlstream/xmlstream.py
@@ -44,6 +44,9 @@ HANDLER_THREADS = 1
SSL_SUPPORT = True
+log = logging.getLogger(__name__)
+
+
class RestartStream(Exception):
"""
Exception to restart stream processing, including
@@ -87,6 +90,8 @@ class XMLStream(object):
send_queue -- A queue of stanzas to be sent on the stream.
socket -- The connection to the server.
ssl_support -- Indicates if a SSL library is available for use.
+ ssl_version -- The version of the SSL protocol to use.
+ Defaults to ssl.PROTOCOL_TLSv1.
state -- A state machine for managing the stream's
connection state.
stream_footer -- The start tag and any attributes for the stream's
@@ -155,6 +160,7 @@ class XMLStream(object):
self.sendXML = self.send_xml
self.ssl_support = SSL_SUPPORT
+ self.ssl_version = ssl.PROTOCOL_TLSv1
self.state = StateMachine(('disconnected', 'connected'))
self.state._set_state('disconnected')
@@ -196,8 +202,15 @@ class XMLStream(object):
self.auto_reconnect = True
self.is_client = False
- signal.signal(signal.SIGHUP, self._handle_kill)
- signal.signal(signal.SIGTERM, self._handle_kill) # used in Windows
+ try:
+ if hasattr(signal, 'SIGHUP'):
+ signal.signal(signal.SIGHUP, self._handle_kill)
+ if hasattr(signal, 'SIGTERM'):
+ # Used in Windows
+ signal.signal(signal.SIGTERM, self._handle_kill)
+ except:
+ log.debug("Can not set interrupt signal handlers. " + \
+ "SleekXMPP is not running from a main thread.")
def _handle_kill(self, signum, frame):
"""
@@ -265,7 +278,7 @@ class XMLStream(object):
self.socket = self.socket_class(Socket.AF_INET, Socket.SOCK_STREAM)
self.socket.settimeout(None)
if self.use_ssl and self.ssl_support:
- logging.debug("Socket Wrapped for SSL")
+ log.debug("Socket Wrapped for SSL")
ssl_socket = ssl.wrap_socket(self.socket)
if hasattr(self.socket, 'socket'):
# We are using a testing socket, so preserve the top
@@ -275,7 +288,7 @@ class XMLStream(object):
self.socket = ssl_socket
try:
- logging.debug("Connecting to %s:%s" % self.address)
+ log.debug("Connecting to %s:%s" % self.address)
self.socket.connect(self.address)
self.set_socket(self.socket, ignore=True)
#this event is where you should set your application state
@@ -283,7 +296,7 @@ class XMLStream(object):
return True
except Socket.error as serr:
error_msg = "Could not connect to %s:%s. Socket Error #%s: %s"
- logging.error(error_msg % (self.address[0], self.address[1],
+ log.error(error_msg % (self.address[0], self.address[1],
serr.errno, serr.strerror))
time.sleep(1)
return False
@@ -328,10 +341,10 @@ class XMLStream(object):
"""
Reset the stream's state and reconnect to the server.
"""
- logging.debug("reconnecting...")
+ log.debug("reconnecting...")
self.state.transition('connected', 'disconnected', wait=2.0,
func=self._disconnect, args=(True,))
- logging.debug("connecting...")
+ log.debug("connecting...")
return self.state.transition('disconnected', 'connected',
wait=2.0, func=self._connect)
@@ -368,9 +381,10 @@ class XMLStream(object):
to be restarted.
"""
if self.ssl_support:
- logging.info("Negotiating TLS")
+ log.info("Negotiating TLS")
+ log.info("Using SSL version: %s" % str(self.ssl_version))
ssl_socket = ssl.wrap_socket(self.socket,
- ssl_version=ssl.PROTOCOL_TLSv1,
+ ssl_version=self.ssl_version,
do_handshake_on_connect=False)
if hasattr(self.socket, 'socket'):
# We are using a testing socket, so preserve the top
@@ -382,7 +396,7 @@ class XMLStream(object):
self.set_socket(self.socket)
return True
else:
- logging.warning("Tried to enable TLS, but ssl module not found.")
+ log.warning("Tried to enable TLS, but ssl module not found.")
return False
def start_stream_handler(self, xml):
@@ -517,6 +531,17 @@ class XMLStream(object):
self.__event_handlers[name] = filter(filter_pointers,
self.__event_handlers[name])
+ def event_handled(self, name):
+ """
+ Indicates if an event has any associated handlers.
+
+ Returns the number of registered handlers.
+
+ Arguments:
+ name -- The name of the event to check.
+ """
+ return len(self.__event_handlers.get(name, []))
+
def event(self, name, data={}, direct=False):
"""
Manually trigger a custom event.
@@ -525,13 +550,22 @@ class XMLStream(object):
name -- The name of the event to trigger.
data -- Data that will be passed to each event handler.
Defaults to an empty dictionary.
- direct -- Runs the event directly if True.
+ direct -- Runs the event directly if True, skipping the
+ event queue. All event handlers will run in the
+ same thread.
"""
for handler in self.__event_handlers.get(name, []):
if direct:
- handler[0](copy.copy(data))
+ try:
+ handler[0](copy.copy(data))
+ except Exception as e:
+ error_msg = 'Error processing event handler: %s'
+ log.exception(error_msg % str(handler[0]))
+ if hasattr(data, 'exception'):
+ data.exception(e)
else:
self.event_queue.put(('event', handler, copy.copy(data)))
+
if handler[2]:
# If the handler is disposable, we will go ahead and
# remove it now instead of waiting for it to be
@@ -591,7 +625,7 @@ class XMLStream(object):
mask = mask.xml
data = str(data)
if mask is not None:
- logging.warning("Use of send mask waiters is deprecated.")
+ log.warning("Use of send mask waiters is deprecated.")
wait_for = Waiter("SendWait_%s" % self.new_id(),
MatchXMLMask(mask))
self.register_handler(wait_for)
@@ -648,7 +682,7 @@ class XMLStream(object):
self.__thread[name].start()
for t in range(0, HANDLER_THREADS):
- logging.debug("Starting HANDLER THREAD")
+ log.debug("Starting HANDLER THREAD")
start_thread('stream_event_handler_%s' % t, self._event_runner)
start_thread('send_thread', self._send_thread)
@@ -686,16 +720,16 @@ class XMLStream(object):
if self.is_client:
self.send_raw(self.stream_header)
except KeyboardInterrupt:
- logging.debug("Keyboard Escape Detected in _process")
+ log.debug("Keyboard Escape Detected in _process")
self.stop.set()
except SystemExit:
- logging.debug("SystemExit in _process")
+ log.debug("SystemExit in _process")
self.stop.set()
except Socket.error:
- logging.exception('Socket Error')
+ log.exception('Socket Error')
except:
if not self.stop.isSet():
- logging.exception('Connection error.')
+ log.exception('Connection error.')
if not self.stop.isSet() and self.auto_reconnect:
self.reconnect()
else:
@@ -725,7 +759,7 @@ class XMLStream(object):
if depth == 0:
# The stream's root element has closed,
# terminating the stream.
- logging.debug("End of stream recieved")
+ log.debug("End of stream recieved")
self.stream_end_event.set()
return False
elif depth == 1:
@@ -739,7 +773,29 @@ class XMLStream(object):
# Keep the root element empty of children to
# save on memory use.
root.clear()
- logging.debug("Ending read XML loop")
+ log.debug("Ending read XML loop")
+
+ def _build_stanza(self, xml, default_ns=None):
+ """
+ Create a stanza object from a given XML object.
+
+ If a specialized stanza type is not found for the XML, then
+ a generic StanzaBase stanza will be returned.
+
+ Arguments:
+ xml -- The XML object to convert into a stanza object.
+ default_ns -- Optional default namespace to use instead of the
+ stream's current default namespace.
+ """
+ if default_ns is None:
+ default_ns = self.default_ns
+ stanza_type = StanzaBase
+ for stanza_class in self.__root_stanza:
+ if xml.tag == "{%s}%s" % (default_ns, stanza_class.name):
+ stanza_type = stanza_class
+ break
+ stanza = stanza_type(self, xml)
+ return stanza
def __spawn_event(self, xml):
"""
@@ -750,7 +806,7 @@ class XMLStream(object):
Arguments:
xml -- The XML stanza to analyze.
"""
- logging.debug("RECV: %s" % tostring(xml,
+ log.debug("RECV: %s" % tostring(xml,
xmlns=self.default_ns,
stream=self))
# Apply any preprocessing filters.
@@ -788,7 +844,7 @@ class XMLStream(object):
def _threaded_event_wrapper(self, func, args):
"""
- Capture exceptions for event handlers that run
+ Capture exceptions for event handlers that run
in individual threads.
Arguments:
@@ -799,7 +855,7 @@ class XMLStream(object):
func(*args)
except Exception as e:
error_msg = 'Error processing event handler: %s'
- logging.exception(error_msg % str(func))
+ log.exception(error_msg % str(func))
if hasattr(args[0], 'exception'):
args[0].exception(e)
@@ -812,7 +868,7 @@ class XMLStream(object):
Stream event handlers will all execute in this thread. Custom event
handlers may be spawned in individual threads.
"""
- logging.debug("Loading event runner")
+ log.debug("Loading event runner")
try:
while not self.stop.isSet():
try:
@@ -830,14 +886,14 @@ class XMLStream(object):
handler.run(args[0])
except Exception as e:
error_msg = 'Error processing stream handler: %s'
- logging.exception(error_msg % handler.name)
+ log.exception(error_msg % handler.name)
args[0].exception(e)
elif etype == 'schedule':
try:
- logging.debug(args)
+ log.debug(args)
handler(*args[0])
except:
- logging.exception('Error processing scheduled task')
+ log.exception('Error processing scheduled task')
elif etype == 'event':
func, threaded, disposable = handler
try:
@@ -851,14 +907,14 @@ class XMLStream(object):
func(*args)
except Exception as e:
error_msg = 'Error processing event handler: %s'
- logging.exception(error_msg % str(func))
+ log.exception(error_msg % str(func))
if hasattr(args[0], 'exception'):
args[0].exception(e)
elif etype == 'quit':
- logging.debug("Quitting event runner thread")
+ log.debug("Quitting event runner thread")
return False
except KeyboardInterrupt:
- logging.debug("Keyboard Escape Detected in _event_runner")
+ log.debug("Keyboard Escape Detected in _event_runner")
self.disconnect()
return
except SystemExit:
@@ -876,14 +932,14 @@ class XMLStream(object):
data = self.send_queue.get(True, 1)
except queue.Empty:
continue
- logging.debug("SEND: %s" % data)
+ log.debug("SEND: %s" % data)
try:
self.socket.send(data.encode('utf-8'))
except:
- logging.warning("Failed to send %s" % data)
+ log.warning("Failed to send %s" % data)
self.disconnect(self.auto_reconnect)
except KeyboardInterrupt:
- logging.debug("Keyboard Escape Detected in _send_thread")
+ log.debug("Keyboard Escape Detected in _send_thread")
self.disconnect()
return
except SystemExit: