diff options
Diffstat (limited to 'sleekxmpp/xmlstream')
-rw-r--r-- | sleekxmpp/xmlstream/handler/waiter.py | 5 | ||||
-rw-r--r-- | sleekxmpp/xmlstream/matcher/xmlmask.py | 8 | ||||
-rw-r--r-- | sleekxmpp/xmlstream/scheduler.py | 9 | ||||
-rw-r--r-- | sleekxmpp/xmlstream/stanzabase.py | 5 | ||||
-rw-r--r-- | sleekxmpp/xmlstream/xmlstream.py | 122 |
5 files changed, 110 insertions, 39 deletions
diff --git a/sleekxmpp/xmlstream/handler/waiter.py b/sleekxmpp/xmlstream/handler/waiter.py index 80720226..a4bc3545 100644 --- a/sleekxmpp/xmlstream/handler/waiter.py +++ b/sleekxmpp/xmlstream/handler/waiter.py @@ -16,6 +16,9 @@ from sleekxmpp.xmlstream import StanzaBase, RESPONSE_TIMEOUT from sleekxmpp.xmlstream.handler.base import BaseHandler +log = logging.getLogger(__name__) + + class Waiter(BaseHandler): """ @@ -85,7 +88,7 @@ class Waiter(BaseHandler): stanza = self._payload.get(True, timeout) except queue.Empty: stanza = False - logging.warning("Timed out waiting for %s" % self.name) + log.warning("Timed out waiting for %s" % self.name) self.stream.removeHandler(self.name) return stanza diff --git a/sleekxmpp/xmlstream/matcher/xmlmask.py b/sleekxmpp/xmlstream/matcher/xmlmask.py index 2967a2af..6ebb437d 100644 --- a/sleekxmpp/xmlstream/matcher/xmlmask.py +++ b/sleekxmpp/xmlstream/matcher/xmlmask.py @@ -6,6 +6,8 @@ See the file LICENSE for copying permission. """ +import logging + from xml.parsers.expat import ExpatError from sleekxmpp.xmlstream.stanzabase import ET @@ -18,6 +20,9 @@ from sleekxmpp.xmlstream.matcher.base import MatcherBase IGNORE_NS = False +log = logging.getLogger(__name__) + + class MatchXMLMask(MatcherBase): """ @@ -97,8 +102,7 @@ class MatchXMLMask(MatcherBase): try: mask = ET.fromstring(mask) except ExpatError: - logging.log(logging.WARNING, - "Expat error: %s\nIn parsing: %s" % ('', mask)) + log.warning("Expat error: %s\nIn parsing: %s" % ('', mask)) if not use_ns: # Compare the element without using namespaces. diff --git a/sleekxmpp/xmlstream/scheduler.py b/sleekxmpp/xmlstream/scheduler.py index 240d4a4b..14359102 100644 --- a/sleekxmpp/xmlstream/scheduler.py +++ b/sleekxmpp/xmlstream/scheduler.py @@ -15,6 +15,9 @@ except ImportError: import Queue as queue +log = logging.getLogger(__name__) + + class Task(object): """ @@ -146,6 +149,8 @@ class Scheduler(object): if wait <= 0.0: newtask = self.addq.get(False) else: + if wait >= 3.0: + wait = 3.0 newtask = self.addq.get(True, wait) except queue.Empty: cleanup = [] @@ -168,13 +173,13 @@ class Scheduler(object): except KeyboardInterrupt: self.run = False if self.parentstop is not None: - logging.debug("stopping parent") + log.debug("stopping parent") self.parentstop.set() except SystemExit: self.run = False if self.parentstop is not None: self.parentstop.set() - logging.debug("Quitting Scheduler thread") + log.debug("Quitting Scheduler thread") if self.parentqueue is not None: self.parentqueue.put(('quit', None, None)) diff --git a/sleekxmpp/xmlstream/stanzabase.py b/sleekxmpp/xmlstream/stanzabase.py index f4d66aa8..aabd3864 100644 --- a/sleekxmpp/xmlstream/stanzabase.py +++ b/sleekxmpp/xmlstream/stanzabase.py @@ -16,6 +16,9 @@ from sleekxmpp.xmlstream import JID from sleekxmpp.xmlstream.tostring import tostring +log = logging.getLogger(__name__) + + # Used to check if an argument is an XML object. XML_TYPE = type(ET.Element('xml')) @@ -1140,7 +1143,7 @@ class StanzaBase(ElementBase): Meant to be overridden. """ - logging.exception('Error handling {%s}%s stanza' % (self.namespace, + log.exception('Error handling {%s}%s stanza' % (self.namespace, self.name)) def send(self): diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py index ace93cc3..30b76ce7 100644 --- a/sleekxmpp/xmlstream/xmlstream.py +++ b/sleekxmpp/xmlstream/xmlstream.py @@ -44,6 +44,9 @@ HANDLER_THREADS = 1 SSL_SUPPORT = True +log = logging.getLogger(__name__) + + class RestartStream(Exception): """ Exception to restart stream processing, including @@ -87,6 +90,8 @@ class XMLStream(object): send_queue -- A queue of stanzas to be sent on the stream. socket -- The connection to the server. ssl_support -- Indicates if a SSL library is available for use. + ssl_version -- The version of the SSL protocol to use. + Defaults to ssl.PROTOCOL_TLSv1. state -- A state machine for managing the stream's connection state. stream_footer -- The start tag and any attributes for the stream's @@ -155,6 +160,7 @@ class XMLStream(object): self.sendXML = self.send_xml self.ssl_support = SSL_SUPPORT + self.ssl_version = ssl.PROTOCOL_TLSv1 self.state = StateMachine(('disconnected', 'connected')) self.state._set_state('disconnected') @@ -196,8 +202,15 @@ class XMLStream(object): self.auto_reconnect = True self.is_client = False - signal.signal(signal.SIGHUP, self._handle_kill) - signal.signal(signal.SIGTERM, self._handle_kill) # used in Windows + try: + if hasattr(signal, 'SIGHUP'): + signal.signal(signal.SIGHUP, self._handle_kill) + if hasattr(signal, 'SIGTERM'): + # Used in Windows + signal.signal(signal.SIGTERM, self._handle_kill) + except: + log.debug("Can not set interrupt signal handlers. " + \ + "SleekXMPP is not running from a main thread.") def _handle_kill(self, signum, frame): """ @@ -265,7 +278,7 @@ class XMLStream(object): self.socket = self.socket_class(Socket.AF_INET, Socket.SOCK_STREAM) self.socket.settimeout(None) if self.use_ssl and self.ssl_support: - logging.debug("Socket Wrapped for SSL") + log.debug("Socket Wrapped for SSL") ssl_socket = ssl.wrap_socket(self.socket) if hasattr(self.socket, 'socket'): # We are using a testing socket, so preserve the top @@ -275,7 +288,7 @@ class XMLStream(object): self.socket = ssl_socket try: - logging.debug("Connecting to %s:%s" % self.address) + log.debug("Connecting to %s:%s" % self.address) self.socket.connect(self.address) self.set_socket(self.socket, ignore=True) #this event is where you should set your application state @@ -283,7 +296,7 @@ class XMLStream(object): return True except Socket.error as serr: error_msg = "Could not connect to %s:%s. Socket Error #%s: %s" - logging.error(error_msg % (self.address[0], self.address[1], + log.error(error_msg % (self.address[0], self.address[1], serr.errno, serr.strerror)) time.sleep(1) return False @@ -328,10 +341,10 @@ class XMLStream(object): """ Reset the stream's state and reconnect to the server. """ - logging.debug("reconnecting...") + log.debug("reconnecting...") self.state.transition('connected', 'disconnected', wait=2.0, func=self._disconnect, args=(True,)) - logging.debug("connecting...") + log.debug("connecting...") return self.state.transition('disconnected', 'connected', wait=2.0, func=self._connect) @@ -368,9 +381,10 @@ class XMLStream(object): to be restarted. """ if self.ssl_support: - logging.info("Negotiating TLS") + log.info("Negotiating TLS") + log.info("Using SSL version: %s" % str(self.ssl_version)) ssl_socket = ssl.wrap_socket(self.socket, - ssl_version=ssl.PROTOCOL_TLSv1, + ssl_version=self.ssl_version, do_handshake_on_connect=False) if hasattr(self.socket, 'socket'): # We are using a testing socket, so preserve the top @@ -382,7 +396,7 @@ class XMLStream(object): self.set_socket(self.socket) return True else: - logging.warning("Tried to enable TLS, but ssl module not found.") + log.warning("Tried to enable TLS, but ssl module not found.") return False def start_stream_handler(self, xml): @@ -517,6 +531,17 @@ class XMLStream(object): self.__event_handlers[name] = filter(filter_pointers, self.__event_handlers[name]) + def event_handled(self, name): + """ + Indicates if an event has any associated handlers. + + Returns the number of registered handlers. + + Arguments: + name -- The name of the event to check. + """ + return len(self.__event_handlers.get(name, [])) + def event(self, name, data={}, direct=False): """ Manually trigger a custom event. @@ -525,13 +550,22 @@ class XMLStream(object): name -- The name of the event to trigger. data -- Data that will be passed to each event handler. Defaults to an empty dictionary. - direct -- Runs the event directly if True. + direct -- Runs the event directly if True, skipping the + event queue. All event handlers will run in the + same thread. """ for handler in self.__event_handlers.get(name, []): if direct: - handler[0](copy.copy(data)) + try: + handler[0](copy.copy(data)) + except Exception as e: + error_msg = 'Error processing event handler: %s' + log.exception(error_msg % str(handler[0])) + if hasattr(data, 'exception'): + data.exception(e) else: self.event_queue.put(('event', handler, copy.copy(data))) + if handler[2]: # If the handler is disposable, we will go ahead and # remove it now instead of waiting for it to be @@ -591,7 +625,7 @@ class XMLStream(object): mask = mask.xml data = str(data) if mask is not None: - logging.warning("Use of send mask waiters is deprecated.") + log.warning("Use of send mask waiters is deprecated.") wait_for = Waiter("SendWait_%s" % self.new_id(), MatchXMLMask(mask)) self.register_handler(wait_for) @@ -648,7 +682,7 @@ class XMLStream(object): self.__thread[name].start() for t in range(0, HANDLER_THREADS): - logging.debug("Starting HANDLER THREAD") + log.debug("Starting HANDLER THREAD") start_thread('stream_event_handler_%s' % t, self._event_runner) start_thread('send_thread', self._send_thread) @@ -686,16 +720,16 @@ class XMLStream(object): if self.is_client: self.send_raw(self.stream_header) except KeyboardInterrupt: - logging.debug("Keyboard Escape Detected in _process") + log.debug("Keyboard Escape Detected in _process") self.stop.set() except SystemExit: - logging.debug("SystemExit in _process") + log.debug("SystemExit in _process") self.stop.set() except Socket.error: - logging.exception('Socket Error') + log.exception('Socket Error') except: if not self.stop.isSet(): - logging.exception('Connection error.') + log.exception('Connection error.') if not self.stop.isSet() and self.auto_reconnect: self.reconnect() else: @@ -725,7 +759,7 @@ class XMLStream(object): if depth == 0: # The stream's root element has closed, # terminating the stream. - logging.debug("End of stream recieved") + log.debug("End of stream recieved") self.stream_end_event.set() return False elif depth == 1: @@ -739,7 +773,29 @@ class XMLStream(object): # Keep the root element empty of children to # save on memory use. root.clear() - logging.debug("Ending read XML loop") + log.debug("Ending read XML loop") + + def _build_stanza(self, xml, default_ns=None): + """ + Create a stanza object from a given XML object. + + If a specialized stanza type is not found for the XML, then + a generic StanzaBase stanza will be returned. + + Arguments: + xml -- The XML object to convert into a stanza object. + default_ns -- Optional default namespace to use instead of the + stream's current default namespace. + """ + if default_ns is None: + default_ns = self.default_ns + stanza_type = StanzaBase + for stanza_class in self.__root_stanza: + if xml.tag == "{%s}%s" % (default_ns, stanza_class.name): + stanza_type = stanza_class + break + stanza = stanza_type(self, xml) + return stanza def __spawn_event(self, xml): """ @@ -750,7 +806,7 @@ class XMLStream(object): Arguments: xml -- The XML stanza to analyze. """ - logging.debug("RECV: %s" % tostring(xml, + log.debug("RECV: %s" % tostring(xml, xmlns=self.default_ns, stream=self)) # Apply any preprocessing filters. @@ -788,7 +844,7 @@ class XMLStream(object): def _threaded_event_wrapper(self, func, args): """ - Capture exceptions for event handlers that run + Capture exceptions for event handlers that run in individual threads. Arguments: @@ -799,7 +855,7 @@ class XMLStream(object): func(*args) except Exception as e: error_msg = 'Error processing event handler: %s' - logging.exception(error_msg % str(func)) + log.exception(error_msg % str(func)) if hasattr(args[0], 'exception'): args[0].exception(e) @@ -812,7 +868,7 @@ class XMLStream(object): Stream event handlers will all execute in this thread. Custom event handlers may be spawned in individual threads. """ - logging.debug("Loading event runner") + log.debug("Loading event runner") try: while not self.stop.isSet(): try: @@ -830,14 +886,14 @@ class XMLStream(object): handler.run(args[0]) except Exception as e: error_msg = 'Error processing stream handler: %s' - logging.exception(error_msg % handler.name) + log.exception(error_msg % handler.name) args[0].exception(e) elif etype == 'schedule': try: - logging.debug(args) + log.debug(args) handler(*args[0]) except: - logging.exception('Error processing scheduled task') + log.exception('Error processing scheduled task') elif etype == 'event': func, threaded, disposable = handler try: @@ -851,14 +907,14 @@ class XMLStream(object): func(*args) except Exception as e: error_msg = 'Error processing event handler: %s' - logging.exception(error_msg % str(func)) + log.exception(error_msg % str(func)) if hasattr(args[0], 'exception'): args[0].exception(e) elif etype == 'quit': - logging.debug("Quitting event runner thread") + log.debug("Quitting event runner thread") return False except KeyboardInterrupt: - logging.debug("Keyboard Escape Detected in _event_runner") + log.debug("Keyboard Escape Detected in _event_runner") self.disconnect() return except SystemExit: @@ -876,14 +932,14 @@ class XMLStream(object): data = self.send_queue.get(True, 1) except queue.Empty: continue - logging.debug("SEND: %s" % data) + log.debug("SEND: %s" % data) try: self.socket.send(data.encode('utf-8')) except: - logging.warning("Failed to send %s" % data) + log.warning("Failed to send %s" % data) self.disconnect(self.auto_reconnect) except KeyboardInterrupt: - logging.debug("Keyboard Escape Detected in _send_thread") + log.debug("Keyboard Escape Detected in _send_thread") self.disconnect() return except SystemExit: |