From ede9dcd18f81dd094e587105071d19ca35d00a3e Mon Sep 17 00:00:00 2001 From: Florent Le Coz Date: Tue, 22 Jul 2014 02:58:34 +0200 Subject: An other cleanup of xmlstream.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Remove some useless things (like handling signals, managing the threads, etc), add some comment to recently added/fixed methods… --- slixmpp/xmlstream/xmlstream.py | 230 +++++++---------------------------------- 1 file changed, 39 insertions(+), 191 deletions(-) diff --git a/slixmpp/xmlstream/xmlstream.py b/slixmpp/xmlstream/xmlstream.py index 1540ae7a..8552dc6b 100644 --- a/slixmpp/xmlstream/xmlstream.py +++ b/slixmpp/xmlstream/xmlstream.py @@ -23,7 +23,6 @@ import signal import socket as Socket import ssl import sys -import threading import time import random import weakref @@ -46,7 +45,6 @@ RESPONSE_TIMEOUT = 30 log = logging.getLogger(__name__) - class NotConnectedError(Exception): """ Raised when we try to send something over the wire but we are not @@ -205,28 +203,15 @@ class XMLStream(object): #: if the connection is terminated. self.end_session_on_disconnect = True - #: A queue of string data to be sent over the stream. - self.send_queue = Queue() - self.send_queue_lock = threading.Lock() - self.send_lock = threading.RLock() - - self.__failed_send_stanza = None - #: A mapping of XML namespaces to well-known prefixes. self.namespace_map = {StanzaBase.xml_ns: 'xml'} - self.__thread = {} self.__root_stanza = [] self.__handlers = [] self.__event_handlers = {} self.__filters = {'in': [], 'out': [], 'out_sync': []} - self.__thread_count = 0 - self.__thread_cond = threading.Condition() - self.__active_threads = set() - self._use_daemons = False self._id = 0 - self._id_lock = threading.Lock() #: We use an ID prefix to ensure that all ID values are unique. self._id_prefix = '%s-' % uuid.uuid4() @@ -241,53 +226,6 @@ class XMLStream(object): self.add_event_handler('disconnected', self._remove_schedules) self.add_event_handler('session_start', self._start_keepalive) - self.add_event_handler('session_start', self._cert_expiration) - - def use_signals(self, signals=None): - """Register signal handlers for ``SIGHUP`` and ``SIGTERM``. - - By using signals, a ``'killed'`` event will be raised when the - application is terminated. - - If a signal handler already existed, it will be executed first, - before the ``'killed'`` event is raised. - - :param list signals: A list of signal names to be monitored. - Defaults to ``['SIGHUP', 'SIGTERM']``. - """ - if signals is None: - signals = ['SIGHUP', 'SIGTERM'] - - existing_handlers = {} - for sig_name in signals: - if hasattr(signal, sig_name): - sig = getattr(signal, sig_name) - handler = signal.getsignal(sig) - if handler: - existing_handlers[sig] = handler - - def handle_kill(signum, frame): - """ - Capture kill event and disconnect cleanly after first - spawning the ``'killed'`` event. - """ - - if signum in existing_handlers and \ - existing_handlers[signum] != handle_kill: - existing_handlers[signum](signum, frame) - - self.event("killed", direct=True) - self.disconnect() - - try: - for sig_name in signals: - if hasattr(signal, sig_name): - sig = getattr(signal, sig_name) - signal.signal(sig, handle_kill) - self.__signals_installed = True - except: - log.debug("Can not set interrupt signal handlers. " + \ - "Slixmpp is not running from a main thread.") def new_id(self): """Generate and return a new stream ID in hexadecimal form. @@ -296,9 +234,8 @@ class XMLStream(object): ID values. Using this method ensures that all new ID values are unique in this stream. """ - with self._id_lock: - self._id += 1 - return self.get_id() + self._id += 1 + return self.get_id() def get_id(self): """Return the current unique stream ID in hexadecimal form.""" @@ -350,11 +287,16 @@ class XMLStream(object): asyncio.async(connect_routine) def init_parser(self): + """init the XML parser. The parser must always be reset for each new + connexion + """ self.xml_depth = 0 self.xml_root = None self.parser = xml.etree.ElementTree.XMLPullParser(("start", "end")) def connection_made(self, transport): + """Called when the TCP connection has been established with the server + """ self.event("connected") self.transport = transport self.socket = self.transport.get_extra_info("socket") @@ -362,6 +304,12 @@ class XMLStream(object): self.send_raw(self.stream_header) def data_received(self, data): + """Called when incoming data is received on the socket. + + We feed that data to the parser and the see if this produced any XML + event. This could trigger one or more event (a stanza is received, + the stream is opened, etc). + """ self.parser.feed(data) for event, xml in self.parser.read_events(): if event == 'start': @@ -372,6 +320,7 @@ class XMLStream(object): stream=self, top_level=True, open_only=True)) + self.start_stream_handler(self.xml_root) self.xml_depth += 1 if event == 'end': self.xml_depth -= 1 @@ -381,8 +330,8 @@ class XMLStream(object): log.debug("End of stream received") self.abort() elif self.xml_depth == 1: - # We only raise events for stanzas that are direct - # children of the root element. + # A stanza is an XML element that is a direct child of + # the root element, hence the check of depth == 1 self.__spawn_event(xml) if self.xml_root is not None: # Keep the root element empty of children to @@ -390,28 +339,31 @@ class XMLStream(object): self.xml_root.clear() def eof_received(self): - """ - When the TCP connection is properly closed by the remote end + """When the TCP connection is properly closed by the remote end """ log.debug("eof_received") def connection_lost(self, exception): + """On any kind of disconnection, initiated by us or not. This signals the + closure of the TCP connection """ - On any kind of disconnection - """ - log.warning("connection_lost: %s", (exception,)) + log.info("connection_lost: %s", (exception,)) + self.event("disconnected") if self.end_session_on_disconnect: self.event('session_end') + # All these objects are associated with one TCP connection. Since + # we are not connected anymore, destroy them self.parser = None self.transport = None self.socket = None - self.event("disconnected") def disconnect(self, wait=2.0): """Close the XML stream and wait for an acknowldgement from the server for at most `wait` seconds. After the given number of seconds has - passed without a response from the serveur, abort() is called. If - wait is 0.0, this is equivalent to calling abort() directly. + passed without a response from the serveur, or when the server + successfuly responds with a closure of its own stream, abort() is + called. If wait is 0.0, this is almost equivalent to calling abort() + directly. Does nothing if we are not connected. @@ -429,7 +381,7 @@ class XMLStream(object): """ if self.transport: self.transport.abort() - self.event("killed", direct=True) + self.event("killed") def reconnect(self, wait=2.0): """Calls disconnect(), and once we are disconnected (after the timeout, or @@ -475,40 +427,6 @@ class XMLStream(object): server_hostname=self.address[0]) asyncio.async(ssl_connect_routine) - def _cert_expiration(self, event): - """Schedule an event for when the TLS certificate expires.""" - - if not self._der_cert: - log.warn("TLS or SSL was enabled, but no certificate was found.") - return - - def restart(): - if not self.event_handled('ssl_expired_cert'): - log.warn("The server certificate has expired. Restarting.") - self.reconnect() - else: - pem_cert = ssl.DER_cert_to_PEM_cert(self._der_cert) - self.event('ssl_expired_cert', pem_cert) - - cert_ttl = cert.get_ttl(self._der_cert) - if cert_ttl is None: - return - - if cert_ttl.days < 0: - log.warn('CERT: Certificate has expired.') - restart() - - try: - total_seconds = cert_ttl.total_seconds() - except AttributeError: - # for Python < 2.7 - total_seconds = (cert_ttl.microseconds + (cert_ttl.seconds + cert_ttl.days * 24 * 3600) * 10**6) / 10**6 - - log.info('CERT: Time until certificate expiration: %s' % cert_ttl) - self.schedule('Certificate Expiration', - total_seconds, - restart) - def _start_keepalive(self, event): """Begin sending whitespace periodically to keep the connection alive. @@ -596,39 +514,6 @@ class XMLStream(object): """Remove an incoming or outgoing filter.""" self.__filters[mode].remove(handler) - def add_handler(self, mask, pointer, name=None, disposable=False, - threaded=False, filter=False, instream=False): - """A shortcut method for registering a handler using XML masks. - - The use of :meth:`register_handler()` is preferred. - - :param mask: An XML snippet matching the structure of the - stanzas that will be passed to this handler. - :param pointer: The handler function itself. - :parm name: A unique name for the handler. A name will - be generated if one is not provided. - :param disposable: Indicates if the handler should be discarded - after one use. - :param threaded: **DEPRECATED**. - Remains for backwards compatibility. - :param filter: **DEPRECATED**. - Remains for backwards compatibility. - :param instream: Indicates if the handler should execute during - stream processing and not during normal event - processing. - """ - # To prevent circular dependencies, we must load the matcher - # and handler classes here. - - if name is None: - name = 'add_handler_%s' % self.new_id() - self.register_handler( - XMLCallback(name, - MatchXMLMask(mask, self.default_ns), - pointer, - once=disposable, - instream=instream)) - def register_handler(self, handler, before=None, after=None): """Add a stream event handler that will be executed when a matching stanza is received. @@ -688,22 +573,19 @@ class XMLStream(object): else: return next(self.dns_answers) - def add_event_handler(self, name, pointer, - threaded=False, disposable=False): + def add_event_handler(self, name, pointer, disposable=False): """Add a custom event handler that will be executed whenever its event is manually triggered. :param name: The name of the event that will trigger this handler. :param pointer: The function to execute. - :param threaded: If set to ``True``, the handler will execute - in its own thread. Defaults to ``False``. :param disposable: If set to ``True``, the handler will be discarded after one use. Defaults to ``False``. """ if not name in self.__event_handlers: self.__event_handlers[name] = [] - self.__event_handlers[name].append((pointer, threaded, disposable)) + self.__event_handlers[name].append((pointer, disposable)) def del_event_handler(self, name, pointer): """Remove a function as a handler for an event. @@ -747,30 +629,32 @@ class XMLStream(object): for handler in handlers: #TODO: Data should not be copied, but should be read only, # but this might break current code so it's left for future. + handler_callback, disposable = handler out_data = copy.copy(data) if len(handlers) > 1 else data old_exception = getattr(data, 'exception', None) if direct: try: - handler[0](out_data) + handler_callback(out_data) except Exception as e: error_msg = 'Error processing event handler: %s' - log.exception(error_msg, str(handler[0])) + log.exception(error_msg, str(handler_callback)) if old_exception: old_exception(e) else: self.exception(e) else: self.run_event(('event', handler, out_data)) - if handler[2]: + if disposable: # If the handler is disposable, we will go ahead and # remove it now instead of waiting for it to be # processed in the queue. try: h_index = self.__event_handlers[name].index(handler) - self.__event_handlers[name].pop(h_index) except: pass + else: + self.__event_handlers[name].pop(h_index) def schedule(self, name, seconds, callback, args=tuple(), kwargs={}, repeat=False): @@ -877,22 +761,13 @@ class XMLStream(object): :param string data: Any bytes or utf-8 string value. """ + print("SEND: %s" % (data)) if not self.transport: raise NotConnectedError() if isinstance(data, str): data = data.encode('utf-8') self.transport.write(data) - def _start_thread(self, name, target, track=True): - self.__thread[name] = threading.Thread(name=name, target=target) - self.__thread[name].daemon = self._use_daemons - self.__thread[name].start() - - if track: - self.__active_threads.add(name) - with self.__thread_cond: - self.__thread_count += 1 - def _build_stanza(self, xml, default_ns=None): """Create a stanza object from a given XML object. @@ -965,25 +840,6 @@ class XMLStream(object): if unhandled: stanza.unhandled() - def _threaded_event_wrapper(self, func, args): - """Capture exceptions for event handlers that run - in individual threads. - - :param func: The event handler to execute. - :param args: Arguments to the event handler. - """ - # this is always already copied before this is invoked - orig = args[0] - try: - func(*args) - except Exception as e: - error_msg = 'Error processing event handler: %s' - log.exception(error_msg, str(func)) - if hasattr(orig, 'exception'): - orig.exception(e) - else: - self.exception(e) - def run_event(self, event): etype, handler = event[0:2] args = event[2:] @@ -1005,17 +861,9 @@ class XMLStream(object): log.exception('Error processing scheduled task') self.exception(e) elif etype == 'event': - func, threaded, disposable = handler + func, disposable = handler try: - if threaded: - x = threading.Thread( - name="Event_%s" % str(func), - target=self._threaded_event_wrapper, - args=(func, args)) - x.daemon = self._use_daemons - x.start() - else: - func(*args) + func(*args) except Exception as e: error_msg = 'Error processing event handler: %s' log.exception(error_msg, str(func)) -- cgit v1.2.3