From 5ab77c745270d7d5c016c1dc7ef2a82533a4b16e Mon Sep 17 00:00:00 2001 From: Florent Le Coz Date: Thu, 17 Jul 2014 14:19:04 +0200 Subject: Rename to slixmpp --- slixmpp/xmlstream/xmlstream.py | 1808 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 1808 insertions(+) create mode 100644 slixmpp/xmlstream/xmlstream.py (limited to 'slixmpp/xmlstream/xmlstream.py') diff --git a/slixmpp/xmlstream/xmlstream.py b/slixmpp/xmlstream/xmlstream.py new file mode 100644 index 00000000..040f5096 --- /dev/null +++ b/slixmpp/xmlstream/xmlstream.py @@ -0,0 +1,1808 @@ +""" + slixmpp.xmlstream.xmlstream + ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + + This module provides the module for creating and + interacting with generic XML streams, along with + the necessary eventing infrastructure. + + Part of Slixmpp: The Slick XMPP Library + + :copyright: (c) 2011 Nathanael C. Fritz + :license: MIT, see LICENSE for more details +""" + +from __future__ import with_statement, unicode_literals + +import base64 +import copy +import logging +import signal +import socket as Socket +import ssl +import sys +import threading +import time +import random +import weakref +import uuid +import errno + +from xml.parsers.expat import ExpatError + +import slixmpp +from slixmpp.util import Queue, QueueEmpty, safedict +from slixmpp.thirdparty.statemachine import StateMachine +from slixmpp.xmlstream import Scheduler, tostring, cert +from slixmpp.xmlstream.stanzabase import StanzaBase, ET, ElementBase +from slixmpp.xmlstream.handler import Waiter, XMLCallback +from slixmpp.xmlstream.matcher import MatchXMLMask +from slixmpp.xmlstream.resolver import resolve, default_resolver + +# In Python 2.x, file socket objects are broken. A patched socket +# wrapper is provided for this case in filesocket.py. +if sys.version_info < (3, 0): + from slixmpp.xmlstream.filesocket import FileSocket, Socket26 + + +#: The time in seconds to wait before timing out waiting for response stanzas. +RESPONSE_TIMEOUT = 30 + +#: The time in seconds to wait for events from the event queue, and also the +#: time between checks for the process stop signal. +WAIT_TIMEOUT = 1.0 + +#: The number of threads to use to handle XML stream events. This is not the +#: same as the number of custom event handling threads. +#: :data:`HANDLER_THREADS` must be at least 1. For Python implementations +#: with a GIL, this should be left at 1, but for implemetnations without +#: a GIL increasing this value can provide better performance. +HANDLER_THREADS = 1 + +#: The time in seconds to delay between attempts to resend data +#: after an SSL error. +SSL_RETRY_DELAY = 0.5 + +#: The maximum number of times to attempt resending data due to +#: an SSL error. +SSL_RETRY_MAX = 10 + +#: Maximum time to delay between connection attempts is one hour. +RECONNECT_MAX_DELAY = 600 + +#: Maximum number of attempts to connect to the server before quitting +#: and raising a 'connect_failed' event. Setting this to ``None`` will +#: allow infinite reconnection attempts, and using ``0`` will disable +#: reconnections. Defaults to ``None``. +RECONNECT_MAX_ATTEMPTS = None + + +log = logging.getLogger(__name__) + + +class RestartStream(Exception): + """ + Exception to restart stream processing, including + resending the stream header. + """ + + +class XMLStream(object): + """ + An XML stream connection manager and event dispatcher. + + The XMLStream class abstracts away the issues of establishing a + connection with a server and sending and receiving XML "stanzas". + A stanza is a complete XML element that is a direct child of a root + document element. Two streams are used, one for each communication + direction, over the same socket. Once the connection is closed, both + streams should be complete and valid XML documents. + + Three types of events are provided to manage the stream: + :Stream: Triggered based on received stanzas, similar in concept + to events in a SAX XML parser. + :Custom: Triggered manually. + :Scheduled: Triggered based on time delays. + + Typically, stanzas are first processed by a stream event handler which + will then trigger custom events to continue further processing, + especially since custom event handlers may run in individual threads. + + :param socket: Use an existing socket for the stream. Defaults to + ``None`` to generate a new socket. + :param string host: The name of the target server. + :param int port: The port to use for the connection. Defaults to 0. + """ + + def __init__(self, socket=None, host='', port=0): + #: Most XMPP servers support TLSv1, but OpenFire in particular + #: does not work well with it. For OpenFire, set + #: :attr:`ssl_version` to use ``SSLv23``:: + #: + #: import ssl + #: xmpp.ssl_version = ssl.PROTOCOL_SSLv23 + self.ssl_version = ssl.PROTOCOL_TLSv1 + + #: The list of accepted ciphers, in OpenSSL Format. + #: It might be useful to override it for improved security + #: over the python defaults. + self.ciphers = None + + #: Path to a file containing certificates for verifying the + #: server SSL certificate. A non-``None`` value will trigger + #: certificate checking. + #: + #: .. note:: + #: + #: On Mac OS X, certificates in the system keyring will + #: be consulted, even if they are not in the provided file. + self.ca_certs = None + + #: Path to a file containing a client certificate to use for + #: authenticating via SASL EXTERNAL. If set, there must also + #: be a corresponding `:attr:keyfile` value. + self.certfile = None + + #: Path to a file containing the private key for the selected + #: client certificate to use for authenticating via SASL EXTERNAL. + self.keyfile = None + + self._der_cert = None + + #: The time in seconds to wait for events from the event queue, + #: and also the time between checks for the process stop signal. + self.wait_timeout = WAIT_TIMEOUT + + #: The time in seconds to wait before timing out waiting + #: for response stanzas. + self.response_timeout = RESPONSE_TIMEOUT + + #: The current amount to time to delay attempting to reconnect. + #: This value doubles (with some jitter) with each failed + #: connection attempt up to :attr:`reconnect_max_delay` seconds. + self.reconnect_delay = None + + #: Maximum time to delay between connection attempts is one hour. + self.reconnect_max_delay = RECONNECT_MAX_DELAY + + #: Maximum number of attempts to connect to the server before + #: quitting and raising a 'connect_failed' event. Setting to + #: ``None`` allows infinite reattempts, while setting it to ``0`` + #: will disable reconnection attempts. Defaults to ``None``. + self.reconnect_max_attempts = RECONNECT_MAX_ATTEMPTS + + #: The time in seconds to delay between attempts to resend data + #: after an SSL error. + self.ssl_retry_max = SSL_RETRY_MAX + + #: The maximum number of times to attempt resending data due to + #: an SSL error. + self.ssl_retry_delay = SSL_RETRY_DELAY + + #: The connection state machine tracks if the stream is + #: ``'connected'`` or ``'disconnected'``. + self.state = StateMachine(('disconnected', 'connected')) + self.state._set_state('disconnected') + + #: The default port to return when querying DNS records. + self.default_port = int(port) + + #: The domain to try when querying DNS records. + self.default_domain = '' + + #: The expected name of the server, for validation. + self._expected_server_name = '' + self._service_name = '' + + #: The desired, or actual, address of the connected server. + self.address = (host, int(port)) + + #: A file-like wrapper for the socket for use with the + #: :mod:`~xml.etree.ElementTree` module. + self.filesocket = None + self.set_socket(socket) + + if sys.version_info < (3, 0): + self.socket_class = Socket26 + else: + self.socket_class = Socket.socket + + #: Enable connecting to the server directly over SSL, in + #: particular when the service provides two ports: one for + #: non-SSL traffic and another for SSL traffic. + self.use_ssl = False + + #: Enable connecting to the service without using SSL + #: immediately, but allow upgrading the connection later + #: to use SSL. + self.use_tls = False + + #: If set to ``True``, attempt to connect through an HTTP + #: proxy based on the settings in :attr:`proxy_config`. + self.use_proxy = False + + #: If set to ``True``, attempt to use IPv6. + self.use_ipv6 = True + + #: If set to ``True``, allow using the ``dnspython`` DNS library + #: if available. If set to ``False``, the builtin DNS resolver + #: will be used, even if ``dnspython`` is installed. + self.use_dnspython = True + + #: Use CDATA for escaping instead of XML entities. Defaults + #: to ``False``. + self.use_cdata = False + + #: An optional dictionary of proxy settings. It may provide: + #: :host: The host offering proxy services. + #: :port: The port for the proxy service. + #: :username: Optional username for accessing the proxy. + #: :password: Optional password for accessing the proxy. + self.proxy_config = {} + + #: The default namespace of the stream content, not of the + #: stream wrapper itself. + self.default_ns = '' + + self.default_lang = None + self.peer_default_lang = None + + #: The namespace of the enveloping stream element. + self.stream_ns = '' + + #: The default opening tag for the stream element. + self.stream_header = "" + + #: The default closing tag for the stream element. + self.stream_footer = "" + + #: If ``True``, periodically send a whitespace character over the + #: wire to keep the connection alive. Mainly useful for connections + #: traversing NAT. + self.whitespace_keepalive = True + + #: The default interval between keepalive signals when + #: :attr:`whitespace_keepalive` is enabled. + self.whitespace_keepalive_interval = 300 + + #: An :class:`~threading.Event` to signal that the application + #: is stopping, and that all threads should shutdown. + self.stop = threading.Event() + + #: An :class:`~threading.Event` to signal receiving a closing + #: stream tag from the server. + self.stream_end_event = threading.Event() + self.stream_end_event.set() + + #: An :class:`~threading.Event` to signal the start of a stream + #: session. Until this event fires, the send queue is not used + #: and data is sent immediately over the wire. + self.session_started_event = threading.Event() + + #: The default time in seconds to wait for a session to start + #: after connecting before reconnecting and trying again. + self.session_timeout = 45 + + #: Flag for controlling if the session can be considered ended + #: if the connection is terminated. + self.end_session_on_disconnect = True + + #: A queue of stream, custom, and scheduled events to be processed. + self.event_queue = Queue() + + #: A queue of string data to be sent over the stream. + self.send_queue = Queue() + self.send_queue_lock = threading.Lock() + self.send_lock = threading.RLock() + + #: A :class:`~slixmpp.xmlstream.scheduler.Scheduler` instance for + #: executing callbacks in the future based on time delays. + self.scheduler = Scheduler(self.stop) + self.__failed_send_stanza = None + + #: A mapping of XML namespaces to well-known prefixes. + self.namespace_map = {StanzaBase.xml_ns: 'xml'} + + self.__thread = {} + self.__root_stanza = [] + self.__handlers = [] + self.__event_handlers = {} + self.__event_handlers_lock = threading.Lock() + self.__filters = {'in': [], 'out': [], 'out_sync': []} + self.__thread_count = 0 + self.__thread_cond = threading.Condition() + self.__active_threads = set() + self._use_daemons = False + self._disconnect_wait_for_threads = True + + self._id = 0 + self._id_lock = threading.Lock() + + #: We use an ID prefix to ensure that all ID values are unique. + self._id_prefix = '%s-' % uuid.uuid4() + + #: The :attr:`auto_reconnnect` setting controls whether or not + #: the stream will be restarted in the event of an error. + self.auto_reconnect = True + + #: The :attr:`disconnect_wait` setting is the default value + #: for controlling if the system waits for the send queue to + #: empty before ending the stream. This may be overridden by + #: passing ``wait=True`` or ``wait=False`` to :meth:`disconnect`. + #: The default :attr:`disconnect_wait` value is ``False``. + self.disconnect_wait = False + + #: A list of DNS results that have not yet been tried. + self.dns_answers = [] + + #: The service name to check with DNS SRV records. For + #: example, setting this to ``'xmpp-client'`` would query the + #: ``_xmpp-client._tcp`` service. + self.dns_service = None + + self.add_event_handler('connected', self._session_timeout_check) + self.add_event_handler('disconnected', self._remove_schedules) + self.add_event_handler('session_start', self._start_keepalive) + self.add_event_handler('session_start', self._cert_expiration) + + def use_signals(self, signals=None): + """Register signal handlers for ``SIGHUP`` and ``SIGTERM``. + + By using signals, a ``'killed'`` event will be raised when the + application is terminated. + + If a signal handler already existed, it will be executed first, + before the ``'killed'`` event is raised. + + :param list signals: A list of signal names to be monitored. + Defaults to ``['SIGHUP', 'SIGTERM']``. + """ + if signals is None: + signals = ['SIGHUP', 'SIGTERM'] + + existing_handlers = {} + for sig_name in signals: + if hasattr(signal, sig_name): + sig = getattr(signal, sig_name) + handler = signal.getsignal(sig) + if handler: + existing_handlers[sig] = handler + + def handle_kill(signum, frame): + """ + Capture kill event and disconnect cleanly after first + spawning the ``'killed'`` event. + """ + + if signum in existing_handlers and \ + existing_handlers[signum] != handle_kill: + existing_handlers[signum](signum, frame) + + self.event("killed", direct=True) + self.disconnect() + + try: + for sig_name in signals: + if hasattr(signal, sig_name): + sig = getattr(signal, sig_name) + signal.signal(sig, handle_kill) + self.__signals_installed = True + except: + log.debug("Can not set interrupt signal handlers. " + \ + "Slixmpp is not running from a main thread.") + + def new_id(self): + """Generate and return a new stream ID in hexadecimal form. + + Many stanzas, handlers, or matchers may require unique + ID values. Using this method ensures that all new ID values + are unique in this stream. + """ + with self._id_lock: + self._id += 1 + return self.get_id() + + def get_id(self): + """Return the current unique stream ID in hexadecimal form.""" + return "%s%X" % (self._id_prefix, self._id) + + def connect(self, host='', port=0, use_ssl=False, + use_tls=True, reattempt=True): + """Create a new socket and connect to the server. + + Setting ``reattempt`` to ``True`` will cause connection + attempts to be made with an exponential backoff delay (max of + :attr:`reconnect_max_delay` which defaults to 10 minute) until a + successful connection is established. + + :param host: The name of the desired server for the connection. + :param port: Port to connect to on the server. + :param use_ssl: Flag indicating if SSL should be used by connecting + directly to a port using SSL. + :param use_tls: Flag indicating if TLS should be used, allowing for + connecting to a port without using SSL immediately and + later upgrading the connection. + :param reattempt: Flag indicating if the socket should reconnect + after disconnections. + """ + self.stop.clear() + + if host and port: + self.address = (host, int(port)) + try: + Socket.inet_aton(self.address[0]) + except (Socket.error, ssl.SSLError): + self.default_domain = self.address[0] + + # Respect previous SSL and TLS usage directives. + if use_ssl is not None: + self.use_ssl = use_ssl + if use_tls is not None: + self.use_tls = use_tls + + # Repeatedly attempt to connect until a successful connection + # is established. + attempts = self.reconnect_max_attempts + connected = self.state.transition('disconnected', 'connected', + func=self._connect, + args=(reattempt,)) + while reattempt and not connected and not self.stop.is_set(): + connected = self.state.transition('disconnected', 'connected', + func=self._connect) + if not connected: + if attempts is not None: + attempts -= 1 + if attempts <= 0: + self.event('connection_failed', direct=True) + return False + return connected + + def _connect(self, reattempt=True): + self.scheduler.remove('Session timeout check') + + if self.reconnect_delay is None or not reattempt: + delay = 1.0 + else: + delay = min(self.reconnect_delay * 2, self.reconnect_max_delay) + delay = random.normalvariate(delay, delay * 0.1) + log.debug('Waiting %s seconds before connecting.', delay) + elapsed = 0 + try: + while elapsed < delay and not self.stop.is_set(): + time.sleep(0.1) + elapsed += 0.1 + except KeyboardInterrupt: + self.set_stop() + return False + except SystemExit: + self.set_stop() + return False + + if self.default_domain: + try: + host, address, port = self.pick_dns_answer(self.default_domain, + self.address[1]) + self.address = (address, port) + self._service_name = host + except StopIteration: + log.debug("No remaining DNS records to try.") + self.dns_answers = None + if reattempt: + self.reconnect_delay = delay + return False + + af = Socket.AF_INET + proto = 'IPv4' + if ':' in self.address[0]: + af = Socket.AF_INET6 + proto = 'IPv6' + try: + self.socket = self.socket_class(af, Socket.SOCK_STREAM) + except Socket.error: + log.debug("Could not connect using %s", proto) + return False + + self.configure_socket() + + if self.use_proxy: + connected = self._connect_proxy() + if not connected: + if reattempt: + self.reconnect_delay = delay + return False + + if self.use_ssl: + log.debug("Socket Wrapped for SSL") + if self.ca_certs is None: + cert_policy = ssl.CERT_NONE + else: + cert_policy = ssl.CERT_REQUIRED + + ssl_args = safedict({ + 'certfile': self.certfile, + 'keyfile': self.keyfile, + 'ca_certs': self.ca_certs, + 'cert_reqs': cert_policy, + 'do_handshake_on_connect': False + }) + + if sys.version_info >= (2, 7): + ssl_args['ciphers'] = self.ciphers + + ssl_socket = ssl.wrap_socket(self.socket, **ssl_args) + + if hasattr(self.socket, 'socket'): + # We are using a testing socket, so preserve the top + # layer of wrapping. + self.socket.socket = ssl_socket + else: + self.socket = ssl_socket + + try: + if not self.use_proxy: + domain = self.address[0] + if ':' in domain: + domain = '[%s]' % domain + log.debug("Connecting to %s:%s", domain, self.address[1]) + self.socket.connect(self.address) + + if self.use_ssl: + try: + self.socket.do_handshake() + except (Socket.error, ssl.SSLError): + log.error('CERT: Invalid certificate trust chain.') + if not self.event_handled('ssl_invalid_chain'): + self.disconnect(self.auto_reconnect, + send_close=False) + else: + self.event('ssl_invalid_chain', direct=True) + return False + + self._der_cert = self.socket.getpeercert(binary_form=True) + pem_cert = ssl.DER_cert_to_PEM_cert(self._der_cert) + log.debug('CERT: %s', pem_cert) + + self.event('ssl_cert', pem_cert, direct=True) + try: + cert.verify(self._expected_server_name, self._der_cert) + except cert.CertificateError as err: + if not self.event_handled('ssl_invalid_cert'): + log.error(err) + self.disconnect(send_close=False) + else: + self.event('ssl_invalid_cert', + pem_cert, + direct=True) + + self.set_socket(self.socket, ignore=True) + #this event is where you should set your application state + self.event('connected', direct=True) + return True + except (Socket.error, ssl.SSLError) as serr: + error_msg = "Could not connect to %s:%s. Socket Error #%s: %s" + self.event('socket_error', serr, direct=True) + domain = self.address[0] + if ':' in domain: + domain = '[%s]' % domain + log.error(error_msg, domain, self.address[1], + serr.errno, serr.strerror) + return False + + def _connect_proxy(self): + """Attempt to connect using an HTTP Proxy.""" + + # Extract the proxy address, and optional credentials + address = (self.proxy_config['host'], int(self.proxy_config['port'])) + cred = None + if self.proxy_config['username']: + username = self.proxy_config['username'] + password = self.proxy_config['password'] + + cred = '%s:%s' % (username, password) + if sys.version_info < (3, 0): + cred = bytes(cred) + else: + cred = bytes(cred, 'utf-8') + cred = base64.b64encode(cred).decode('utf-8') + + # Build the HTTP headers for connecting to the XMPP server + headers = ['CONNECT %s:%s HTTP/1.0' % self.address, + 'Host: %s:%s' % self.address, + 'Proxy-Connection: Keep-Alive', + 'Pragma: no-cache', + 'User-Agent: Slixmpp/%s' % slixmpp.__version__] + if cred: + headers.append('Proxy-Authorization: Basic %s' % cred) + headers = '\r\n'.join(headers) + '\r\n\r\n' + + try: + log.debug("Connecting to proxy: %s:%s", *address) + self.socket.connect(address) + self.send_raw(headers, now=True) + resp = '' + while '\r\n\r\n' not in resp and not self.stop.is_set(): + resp += self.socket.recv(1024).decode('utf-8') + log.debug('RECV: %s', resp) + + lines = resp.split('\r\n') + if '200' not in lines[0]: + self.event('proxy_error', resp) + self.event('connection_failed', direct=True) + log.error('Proxy Error: %s', lines[0]) + return False + + # Proxy connection established, continue connecting + # with the XMPP server. + return True + except (Socket.error, ssl.SSLError) as serr: + error_msg = "Could not connect to %s:%s. Socket Error #%s: %s" + self.event('socket_error', serr, direct=True) + log.error(error_msg, self.address[0], self.address[1], + serr.errno, serr.strerror) + return False + + def _session_timeout_check(self, event=None): + """ + Add check to ensure that a session is established within + a reasonable amount of time. + """ + + def _handle_session_timeout(): + if not self.session_started_event.is_set(): + log.debug("Session start has taken more " + \ + "than %d seconds", self.session_timeout) + self.disconnect(reconnect=self.auto_reconnect) + + self.schedule("Session timeout check", + self.session_timeout, + _handle_session_timeout) + + def disconnect(self, reconnect=False, wait=None, send_close=True): + """Terminate processing and close the XML streams. + + Optionally, the connection may be reconnected and + resume processing afterwards. + + If the disconnect should take place after all items + in the send queue have been sent, use ``wait=True``. + + .. warning:: + + If you are constantly adding items to the queue + such that it is never empty, then the disconnect will + not occur and the call will continue to block. + + :param reconnect: Flag indicating if the connection + and processing should be restarted. + Defaults to ``False``. + :param wait: Flag indicating if the send queue should + be emptied before disconnecting, overriding + :attr:`disconnect_wait`. + :param send_close: Flag indicating if the stream footer + should be sent before terminating the + connection. Setting this to ``False`` + prevents error loops when trying to + disconnect after a socket error. + """ + self.state.transition('connected', 'disconnected', + wait=2.0, + func=self._disconnect, + args=(reconnect, wait, send_close)) + + def _disconnect(self, reconnect=False, wait=None, send_close=True): + if not reconnect: + self.auto_reconnect = False + + if self.end_session_on_disconnect or send_close: + self.event('session_end', direct=True) + + # Wait for the send queue to empty. + if wait is not None: + if wait: + self.send_queue.join() + elif self.disconnect_wait: + self.send_queue.join() + + # Clearing this event will pause the send loop. + self.session_started_event.clear() + + self.__failed_send_stanza = None + + # Send the end of stream marker. + if send_close: + self.send_raw(self.stream_footer, now=True) + + # Wait for confirmation that the stream was + # closed in the other direction. If we didn't + # send a stream footer we don't need to wait + # since the server won't know to respond. + if send_close: + log.info('Waiting for %s from server', self.stream_footer) + self.stream_end_event.wait(4) + else: + self.stream_end_event.set() + + if not self.auto_reconnect: + self.set_stop() + if self._disconnect_wait_for_threads: + self._wait_for_threads() + + try: + self.socket.shutdown(Socket.SHUT_RDWR) + self.socket.close() + self.filesocket.close() + except (Socket.error, ssl.SSLError) as serr: + self.event('socket_error', serr, direct=True) + finally: + #clear your application state + self.event('disconnected', direct=True) + return True + + def abort(self): + self.session_started_event.clear() + self.set_stop() + if self._disconnect_wait_for_threads: + self._wait_for_threads() + try: + self.socket.shutdown(Socket.SHUT_RDWR) + self.socket.close() + self.filesocket.close() + except Socket.error: + pass + self.state.transition_any(['connected', 'disconnected'], 'disconnected', func=lambda: True) + self.event("killed", direct=True) + + def reconnect(self, reattempt=True, wait=False, send_close=True): + """Reset the stream's state and reconnect to the server.""" + log.debug("reconnecting...") + if self.state.ensure('connected'): + self.state.transition('connected', 'disconnected', + wait=2.0, + func=self._disconnect, + args=(True, wait, send_close)) + + attempts = self.reconnect_max_attempts + + log.debug("connecting...") + connected = self.state.transition('disconnected', 'connected', + wait=2.0, + func=self._connect, + args=(reattempt,)) + while reattempt and not connected and not self.stop.is_set(): + connected = self.state.transition('disconnected', 'connected', + wait=2.0, func=self._connect) + connected = connected or self.state.ensure('connected') + if not connected: + if attempts is not None: + attempts -= 1 + if attempts <= 0: + self.event('connection_failed', direct=True) + return False + return connected + + def set_socket(self, socket, ignore=False): + """Set the socket to use for the stream. + + The filesocket will be recreated as well. + + :param socket: The new socket object to use. + :param bool ignore: If ``True``, don't set the connection + state to ``'connected'``. + """ + self.socket = socket + if socket is not None: + # ElementTree.iterparse requires a file. + # 0 buffer files have to be binary. + + # Use the correct fileobject type based on the Python + # version to work around a broken implementation in + # Python 2.x. + if sys.version_info < (3, 0): + self.filesocket = FileSocket(self.socket) + else: + self.filesocket = self.socket.makefile('rb', 0) + if not ignore: + self.state._set_state('connected') + + def configure_socket(self): + """Set timeout and other options for self.socket. + + Meant to be overridden. + """ + self.socket.settimeout(None) + + def configure_dns(self, resolver, domain=None, port=None): + """ + Configure and set options for a :class:`~dns.resolver.Resolver` + instance, and other DNS related tasks. For example, you + can also check :meth:`~socket.socket.getaddrinfo` to see + if you need to call out to ``libresolv.so.2`` to + run ``res_init()``. + + Meant to be overridden. + + :param resolver: A :class:`~dns.resolver.Resolver` instance + or ``None`` if ``dnspython`` is not installed. + :param domain: The initial domain under consideration. + :param port: The initial port under consideration. + """ + pass + + def start_tls(self): + """Perform handshakes for TLS. + + If the handshake is successful, the XML stream will need + to be restarted. + """ + log.info("Negotiating TLS") + ssl_versions = {3: 'TLS 1.0', 1: 'SSL 3', 2: 'SSL 2/3'} + log.info("Using SSL version: %s", ssl_versions[self.ssl_version]) + if self.ca_certs is None: + cert_policy = ssl.CERT_NONE + else: + cert_policy = ssl.CERT_REQUIRED + + ssl_args = safedict({ + 'certfile': self.certfile, + 'keyfile': self.keyfile, + 'ca_certs': self.ca_certs, + 'cert_reqs': cert_policy, + 'do_handshake_on_connect': False + }) + + if sys.version_info >= (2, 7): + ssl_args['ciphers'] = self.ciphers + + ssl_socket = ssl.wrap_socket(self.socket, **ssl_args); + + if hasattr(self.socket, 'socket'): + # We are using a testing socket, so preserve the top + # layer of wrapping. + self.socket.socket = ssl_socket + else: + self.socket = ssl_socket + + try: + self.socket.do_handshake() + except (Socket.error, ssl.SSLError): + log.error('CERT: Invalid certificate trust chain.') + if not self.event_handled('ssl_invalid_chain'): + self.disconnect(self.auto_reconnect, send_close=False) + else: + self._der_cert = self.socket.getpeercert(binary_form=True) + self.event('ssl_invalid_chain', direct=True) + return False + + self._der_cert = self.socket.getpeercert(binary_form=True) + pem_cert = ssl.DER_cert_to_PEM_cert(self._der_cert) + log.debug('CERT: %s', pem_cert) + self.event('ssl_cert', pem_cert, direct=True) + + try: + cert.verify(self._expected_server_name, self._der_cert) + except cert.CertificateError as err: + if not self.event_handled('ssl_invalid_cert'): + log.error(err) + self.disconnect(self.auto_reconnect, send_close=False) + else: + self.event('ssl_invalid_cert', pem_cert, direct=True) + + self.set_socket(self.socket) + return True + + def _cert_expiration(self, event): + """Schedule an event for when the TLS certificate expires.""" + + if not self.use_tls and not self.use_ssl: + return + + if not self._der_cert: + log.warn("TLS or SSL was enabled, but no certificate was found.") + return + + def restart(): + if not self.event_handled('ssl_expired_cert'): + log.warn("The server certificate has expired. Restarting.") + self.reconnect() + else: + pem_cert = ssl.DER_cert_to_PEM_cert(self._der_cert) + self.event('ssl_expired_cert', pem_cert) + + cert_ttl = cert.get_ttl(self._der_cert) + if cert_ttl is None: + return + + if cert_ttl.days < 0: + log.warn('CERT: Certificate has expired.') + restart() + + try: + total_seconds = cert_ttl.total_seconds() + except AttributeError: + # for Python < 2.7 + total_seconds = (cert_ttl.microseconds + (cert_ttl.seconds + cert_ttl.days * 24 * 3600) * 10**6) / 10**6 + + log.info('CERT: Time until certificate expiration: %s' % cert_ttl) + self.schedule('Certificate Expiration', + total_seconds, + restart) + + def _start_keepalive(self, event): + """Begin sending whitespace periodically to keep the connection alive. + + May be disabled by setting:: + + self.whitespace_keepalive = False + + The keepalive interval can be set using:: + + self.whitespace_keepalive_interval = 300 + """ + self.schedule('Whitespace Keepalive', + self.whitespace_keepalive_interval, + self.send_raw, + args=(' ',), + kwargs={'now': True}, + repeat=True) + + def _remove_schedules(self, event): + """Remove whitespace keepalive and certificate expiration schedules.""" + self.scheduler.remove('Whitespace Keepalive') + self.scheduler.remove('Certificate Expiration') + + def start_stream_handler(self, xml): + """Perform any initialization actions, such as handshakes, + once the stream header has been sent. + + Meant to be overridden. + """ + pass + + def register_stanza(self, stanza_class): + """Add a stanza object class as a known root stanza. + + A root stanza is one that appears as a direct child of the stream's + root element. + + Stanzas that appear as substanzas of a root stanza do not need to + be registered here. That is done using register_stanza_plugin() from + slixmpp.xmlstream.stanzabase. + + Stanzas that are not registered will not be converted into + stanza objects, but may still be processed using handlers and + matchers. + + :param stanza_class: The top-level stanza object's class. + """ + self.__root_stanza.append(stanza_class) + + def remove_stanza(self, stanza_class): + """Remove a stanza from being a known root stanza. + + A root stanza is one that appears as a direct child of the stream's + root element. + + Stanzas that are not registered will not be converted into + stanza objects, but may still be processed using handlers and + matchers. + """ + self.__root_stanza.remove(stanza_class) + + def add_filter(self, mode, handler, order=None): + """Add a filter for incoming or outgoing stanzas. + + These filters are applied before incoming stanzas are + passed to any handlers, and before outgoing stanzas + are put in the send queue. + + Each filter must accept a single stanza, and return + either a stanza or ``None``. If the filter returns + ``None``, then the stanza will be dropped from being + processed for events or from being sent. + + :param mode: One of ``'in'`` or ``'out'``. + :param handler: The filter function. + :param int order: The position to insert the filter in + the list of active filters. + """ + if order: + self.__filters[mode].insert(order, handler) + else: + self.__filters[mode].append(handler) + + def del_filter(self, mode, handler): + """Remove an incoming or outgoing filter.""" + self.__filters[mode].remove(handler) + + def add_handler(self, mask, pointer, name=None, disposable=False, + threaded=False, filter=False, instream=False): + """A shortcut method for registering a handler using XML masks. + + The use of :meth:`register_handler()` is preferred. + + :param mask: An XML snippet matching the structure of the + stanzas that will be passed to this handler. + :param pointer: The handler function itself. + :parm name: A unique name for the handler. A name will + be generated if one is not provided. + :param disposable: Indicates if the handler should be discarded + after one use. + :param threaded: **DEPRECATED**. + Remains for backwards compatibility. + :param filter: **DEPRECATED**. + Remains for backwards compatibility. + :param instream: Indicates if the handler should execute during + stream processing and not during normal event + processing. + """ + # To prevent circular dependencies, we must load the matcher + # and handler classes here. + + if name is None: + name = 'add_handler_%s' % self.new_id() + self.register_handler( + XMLCallback(name, + MatchXMLMask(mask, self.default_ns), + pointer, + once=disposable, + instream=instream)) + + def register_handler(self, handler, before=None, after=None): + """Add a stream event handler that will be executed when a matching + stanza is received. + + :param handler: + The :class:`~slixmpp.xmlstream.handler.base.BaseHandler` + derived object to execute. + """ + if handler.stream is None: + self.__handlers.append(handler) + handler.stream = weakref.ref(self) + + def remove_handler(self, name): + """Remove any stream event handlers with the given name. + + :param name: The name of the handler. + """ + idx = 0 + for handler in self.__handlers: + if handler.name == name: + self.__handlers.pop(idx) + return True + idx += 1 + return False + + def get_dns_records(self, domain, port=None): + """Get the DNS records for a domain. + + :param domain: The domain in question. + :param port: If the results don't include a port, use this one. + """ + if port is None: + port = self.default_port + + resolver = default_resolver() + self.configure_dns(resolver, domain=domain, port=port) + + return resolve(domain, port, service=self.dns_service, + resolver=resolver, + use_ipv6=self.use_ipv6, + use_dnspython=self.use_dnspython) + + def pick_dns_answer(self, domain, port=None): + """Pick a server and port from DNS answers. + + Gets DNS answers if none available. + Removes used answer from available answers. + + :param domain: The domain in question. + :param port: If the results don't include a port, use this one. + """ + if not self.dns_answers: + self.dns_answers = self.get_dns_records(domain, port) + + if sys.version_info < (3, 0): + return self.dns_answers.next() + else: + return next(self.dns_answers) + + def add_event_handler(self, name, pointer, + threaded=False, disposable=False): + """Add a custom event handler that will be executed whenever + its event is manually triggered. + + :param name: The name of the event that will trigger + this handler. + :param pointer: The function to execute. + :param threaded: If set to ``True``, the handler will execute + in its own thread. Defaults to ``False``. + :param disposable: If set to ``True``, the handler will be + discarded after one use. Defaults to ``False``. + """ + if not name in self.__event_handlers: + self.__event_handlers[name] = [] + self.__event_handlers[name].append((pointer, threaded, disposable)) + + def del_event_handler(self, name, pointer): + """Remove a function as a handler for an event. + + :param name: The name of the event. + :param pointer: The function to remove as a handler. + """ + if not name in self.__event_handlers: + return + + # Need to keep handlers that do not use + # the given function pointer + def filter_pointers(handler): + return handler[0] != pointer + + self.__event_handlers[name] = list(filter( + filter_pointers, + self.__event_handlers[name])) + + def event_handled(self, name): + """Returns the number of registered handlers for an event. + + :param name: The name of the event to check. + """ + return len(self.__event_handlers.get(name, [])) + + def event(self, name, data={}, direct=False): + """Manually trigger a custom event. + + :param name: The name of the event to trigger. + :param data: Data that will be passed to each event handler. + Defaults to an empty dictionary, but is usually + a stanza object. + :param direct: Runs the event directly if True, skipping the + event queue. All event handlers will run in the + same thread. + """ + log.debug("Event triggered: " + name) + + handlers = self.__event_handlers.get(name, []) + for handler in handlers: + #TODO: Data should not be copied, but should be read only, + # but this might break current code so it's left for future. + + out_data = copy.copy(data) if len(handlers) > 1 else data + old_exception = getattr(data, 'exception', None) + if direct: + try: + handler[0](out_data) + except Exception as e: + error_msg = 'Error processing event handler: %s' + log.exception(error_msg, str(handler[0])) + if old_exception: + old_exception(e) + else: + self.exception(e) + else: + self.event_queue.put(('event', handler, out_data)) + if handler[2]: + # If the handler is disposable, we will go ahead and + # remove it now instead of waiting for it to be + # processed in the queue. + with self.__event_handlers_lock: + try: + h_index = self.__event_handlers[name].index(handler) + self.__event_handlers[name].pop(h_index) + except: + pass + + def schedule(self, name, seconds, callback, args=None, + kwargs=None, repeat=False): + """Schedule a callback function to execute after a given delay. + + :param name: A unique name for the scheduled callback. + :param seconds: The time in seconds to wait before executing. + :param callback: A pointer to the function to execute. + :param args: A tuple of arguments to pass to the function. + :param kwargs: A dictionary of keyword arguments to pass to + the function. + :param repeat: Flag indicating if the scheduled event should + be reset and repeat after executing. + """ + self.scheduler.add(name, seconds, callback, args, kwargs, + repeat, qpointer=self.event_queue) + + def incoming_filter(self, xml): + """Filter incoming XML objects before they are processed. + + Possible uses include remapping namespaces, or correcting elements + from sources with incorrect behavior. + + Meant to be overridden. + """ + return xml + + def send(self, data, mask=None, timeout=None, now=False, use_filters=True): + """A wrapper for :meth:`send_raw()` for sending stanza objects. + + May optionally block until an expected response is received. + + :param data: The :class:`~slixmpp.xmlstream.stanzabase.ElementBase` + stanza to send on the stream. + :param mask: **DEPRECATED** + An XML string snippet matching the structure + of the expected response. Execution will block + in this thread until the response is received + or a timeout occurs. + :param int timeout: Time in seconds to wait for a response before + continuing. Defaults to :attr:`response_timeout`. + :param bool now: Indicates if the send queue should be skipped, + sending the stanza immediately. Useful mainly + for stream initialization stanzas. + Defaults to ``False``. + :param bool use_filters: Indicates if outgoing filters should be + applied to the given stanza data. Disabling + filters is useful when resending stanzas. + Defaults to ``True``. + """ + if timeout is None: + timeout = self.response_timeout + if hasattr(mask, 'xml'): + mask = mask.xml + + if isinstance(data, ElementBase): + if use_filters: + for filter in self.__filters['out']: + data = filter(data) + if data is None: + return + + if mask is not None: + log.warning("Use of send mask waiters is deprecated.") + wait_for = Waiter("SendWait_%s" % self.new_id(), + MatchXMLMask(mask)) + self.register_handler(wait_for) + + if isinstance(data, ElementBase): + with self.send_queue_lock: + if use_filters: + for filter in self.__filters['out_sync']: + data = filter(data) + if data is None: + return + str_data = tostring(data.xml, xmlns=self.default_ns, + stream=self, + top_level=True) + self.send_raw(str_data, now) + else: + self.send_raw(data, now) + if mask is not None: + return wait_for.wait(timeout) + + def send_xml(self, data, mask=None, timeout=None, now=False): + """Send an XML object on the stream, and optionally wait + for a response. + + :param data: The :class:`~xml.etree.ElementTree.Element` XML object + to send on the stream. + :param mask: **DEPRECATED** + An XML string snippet matching the structure + of the expected response. Execution will block + in this thread until the response is received + or a timeout occurs. + :param int timeout: Time in seconds to wait for a response before + continuing. Defaults to :attr:`response_timeout`. + :param bool now: Indicates if the send queue should be skipped, + sending the stanza immediately. Useful mainly + for stream initialization stanzas. + Defaults to ``False``. + """ + if timeout is None: + timeout = self.response_timeout + return self.send(tostring(data), mask, timeout, now) + + def send_raw(self, data, now=False, reconnect=None): + """Send raw data across the stream. + + :param string data: Any string value. + :param bool reconnect: Indicates if the stream should be + restarted if there is an error sending + the stanza. Used mainly for testing. + Defaults to :attr:`auto_reconnect`. + """ + if now: + log.debug("SEND (IMMED): %s", data) + try: + data = data.encode('utf-8') + total = len(data) + sent = 0 + count = 0 + tries = 0 + with self.send_lock: + while sent < total and not self.stop.is_set(): + try: + sent += self.socket.send(data[sent:]) + count += 1 + except Socket.error as serr: + if serr.errno != errno.EINTR: + raise + except ssl.SSLError as serr: + if tries >= self.ssl_retry_max: + log.debug('SSL error: max retries reached') + self.exception(serr) + log.warning("Failed to send %s", data) + if reconnect is None: + reconnect = self.auto_reconnect + if not self.stop.is_set(): + self.disconnect(reconnect, + send_close=False) + log.warning('SSL write error: retrying') + if not self.stop.is_set(): + time.sleep(self.ssl_retry_delay) + tries += 1 + if count > 1: + log.debug('SENT: %d chunks', count) + except (Socket.error, ssl.SSLError) as serr: + self.event('socket_error', serr, direct=True) + log.warning("Failed to send %s", data) + if reconnect is None: + reconnect = self.auto_reconnect + if not self.stop.is_set(): + self.disconnect(reconnect, send_close=False) + else: + self.send_queue.put(data) + return True + + def _start_thread(self, name, target, track=True): + self.__thread[name] = threading.Thread(name=name, target=target) + self.__thread[name].daemon = self._use_daemons + self.__thread[name].start() + + if track: + self.__active_threads.add(name) + with self.__thread_cond: + self.__thread_count += 1 + + def _end_thread(self, name, early=False): + with self.__thread_cond: + curr_thread = threading.current_thread().name + if curr_thread in self.__active_threads: + self.__thread_count -= 1 + self.__active_threads.remove(curr_thread) + + if early: + log.debug('Threading deadlock prevention!') + log.debug(("Marked %s thread as ended due to " + \ + "disconnect() call. %s threads remain.") % ( + name, self.__thread_count)) + else: + log.debug("Stopped %s thread. %s threads remain." % ( + name, self.__thread_count)) + + else: + log.debug(("Finished exiting %s thread after early " + \ + "termination from disconnect() call. " + \ + "%s threads remain.") % ( + name, self.__thread_count)) + + if self.__thread_count == 0: + self.__thread_cond.notify() + + def set_stop(self): + self.stop.set() + + # Unlock queues + self.event_queue.put(None) + self.send_queue.put(None) + + def _wait_for_threads(self): + with self.__thread_cond: + if self.__thread_count != 0: + log.debug("Waiting for %s threads to exit." % + self.__thread_count) + name = threading.current_thread().name + if name in self.__thread: + self._end_thread(name, early=True) + self.__thread_cond.wait(4) + if self.__thread_count != 0: + log.error("Hanged threads: %s" % threading.enumerate()) + log.error("This may be due to calling disconnect() " + \ + "from a non-threaded event handler. Be " + \ + "sure that event handlers that call " + \ + "disconnect() are registered using: " + \ + "add_event_handler(..., threaded=True)") + + def process(self, **kwargs): + """Initialize the XML streams and begin processing events. + + The number of threads used for processing stream events is determined + by :data:`HANDLER_THREADS`. + + :param bool block: If ``False``, then event dispatcher will run + in a separate thread, allowing for the stream to be + used in the background for another application. + Otherwise, ``process(block=True)`` blocks the current + thread. Defaults to ``False``. + :param bool threaded: **DEPRECATED** + If ``True``, then event dispatcher will run + in a separate thread, allowing for the stream to be + used in the background for another application. + Defaults to ``True``. This does **not** mean that no + threads are used at all if ``threaded=False``. + + Regardless of these threading options, these threads will + always exist: + + - The event queue processor + - The send queue processor + - The scheduler + """ + if 'threaded' in kwargs and 'block' in kwargs: + raise ValueError("process() called with both " + \ + "block and threaded arguments") + elif 'block' in kwargs: + threaded = not(kwargs.get('block', False)) + else: + threaded = kwargs.get('threaded', True) + + for t in range(0, HANDLER_THREADS): + log.debug("Starting HANDLER THREAD") + self._start_thread('event_thread_%s' % t, self._event_runner) + + self._start_thread('send_thread', self._send_thread) + self._start_thread('scheduler_thread', self._scheduler_thread) + + if threaded: + # Run the XML stream in the background for another application. + self._start_thread('read_thread', self._process, track=False) + else: + self._process() + + def _process(self): + """Start processing the XML streams. + + Processing will continue after any recoverable errors + if reconnections are allowed. + """ + + # The body of this loop will only execute once per connection. + # Additional passes will be made only if an error occurs and + # reconnecting is permitted. + while True: + shutdown = False + try: + # The call to self.__read_xml will block and prevent + # the body of the loop from running until a disconnect + # occurs. After any reconnection, the stream header will + # be resent and processing will resume. + while not self.stop.is_set(): + # Only process the stream while connected to the server + if not self.state.ensure('connected', wait=0.1): + break + # Ensure the stream header is sent for any + # new connections. + if not self.session_started_event.is_set(): + self.send_raw(self.stream_header, now=True) + if not self.__read_xml(): + # If the server terminated the stream, end processing + break + except KeyboardInterrupt: + log.debug("Keyboard Escape Detected in _process") + self.event('killed', direct=True) + shutdown = True + except SystemExit: + log.debug("SystemExit in _process") + shutdown = True + except (SyntaxError, ExpatError) as e: + log.error("Error reading from XML stream.") + self.exception(e) + except (Socket.error, ssl.SSLError) as serr: + self.event('socket_error', serr, direct=True) + log.error('Socket Error #%s: %s', serr.errno, serr.strerror) + except ValueError as e: + msg = e.message if hasattr(e, 'message') else e.args[0] + + if 'I/O operation on closed file' in msg: + log.error('Can not read from closed socket.') + else: + self.exception(e) + except Exception as e: + if not self.stop.is_set(): + log.error('Connection error.') + self.exception(e) + + if not shutdown and not self.stop.is_set() \ + and self.auto_reconnect: + self.reconnect() + else: + self.disconnect() + break + + def __read_xml(self): + """Parse the incoming XML stream + + Stream events are raised for each received stanza. + """ + depth = 0 + root = None + for event, xml in ET.iterparse(self.filesocket, (b'end', b'start')): + if event == b'start': + if depth == 0: + # We have received the start of the root element. + root = xml + log.debug('RECV: %s', tostring(root, xmlns=self.default_ns, + stream=self, + top_level=True, + open_only=True)) + # Perform any stream initialization actions, such + # as handshakes. + self.stream_end_event.clear() + self.start_stream_handler(root) + + # We have a successful stream connection, so reset + # exponential backoff for new reconnect attempts. + self.reconnect_delay = 1.0 + depth += 1 + if event == b'end': + depth -= 1 + if depth == 0: + # The stream's root element has closed, + # terminating the stream. + log.debug("End of stream recieved") + self.stream_end_event.set() + return False + elif depth == 1: + # We only raise events for stanzas that are direct + # children of the root element. + try: + self.__spawn_event(xml) + except RestartStream: + return True + if root is not None: + # Keep the root element empty of children to + # save on memory use. + root.clear() + log.debug("Ending read XML loop") + + def _build_stanza(self, xml, default_ns=None): + """Create a stanza object from a given XML object. + + If a specialized stanza type is not found for the XML, then + a generic :class:`~slixmpp.xmlstream.stanzabase.StanzaBase` + stanza will be returned. + + :param xml: The :class:`~xml.etree.ElementTree.Element` XML object + to convert into a stanza object. + :param default_ns: Optional default namespace to use instead of the + stream's current default namespace. + """ + if default_ns is None: + default_ns = self.default_ns + stanza_type = StanzaBase + for stanza_class in self.__root_stanza: + if xml.tag == "{%s}%s" % (default_ns, stanza_class.name) or \ + xml.tag == stanza_class.tag_name(): + stanza_type = stanza_class + break + stanza = stanza_type(self, xml) + if stanza['lang'] is None and self.peer_default_lang: + stanza['lang'] = self.peer_default_lang + return stanza + + def __spawn_event(self, xml): + """ + Analyze incoming XML stanzas and convert them into stanza + objects if applicable and queue stream events to be processed + by matching handlers. + + :param xml: The :class:`~slixmpp.xmlstream.stanzabase.ElementBase` + stanza to analyze. + """ + # Apply any preprocessing filters. + xml = self.incoming_filter(xml) + + # Convert the raw XML object into a stanza object. If no registered + # stanza type applies, a generic StanzaBase stanza will be used. + stanza = self._build_stanza(xml) + + for filter in self.__filters['in']: + if stanza is not None: + stanza = filter(stanza) + if stanza is None: + return + + log.debug("RECV: %s", stanza) + + # Match the stanza against registered handlers. Handlers marked + # to run "in stream" will be executed immediately; the rest will + # be queued. + unhandled = True + matched_handlers = [h for h in self.__handlers if h.match(stanza)] + for handler in matched_handlers: + if len(matched_handlers) > 1: + stanza_copy = copy.copy(stanza) + else: + stanza_copy = stanza + handler.prerun(stanza_copy) + self.event_queue.put(('stanza', handler, stanza_copy)) + try: + if handler.check_delete(): + self.__handlers.remove(handler) + except: + pass # not thread safe + unhandled = False + + # Some stanzas require responses, such as Iq queries. A default + # handler will be executed immediately for this case. + if unhandled: + stanza.unhandled() + + def _threaded_event_wrapper(self, func, args): + """Capture exceptions for event handlers that run + in individual threads. + + :param func: The event handler to execute. + :param args: Arguments to the event handler. + """ + # this is always already copied before this is invoked + orig = args[0] + try: + func(*args) + except Exception as e: + error_msg = 'Error processing event handler: %s' + log.exception(error_msg, str(func)) + if hasattr(orig, 'exception'): + orig.exception(e) + else: + self.exception(e) + + def _event_runner(self): + """Process the event queue and execute handlers. + + The number of event runner threads is controlled by HANDLER_THREADS. + + Stream event handlers will all execute in this thread. Custom event + handlers may be spawned in individual threads. + """ + log.debug("Loading event runner") + try: + while not self.stop.is_set(): + event = self.event_queue.get() + if event is None: + continue + + etype, handler = event[0:2] + args = event[2:] + orig = copy.copy(args[0]) + + if etype == 'stanza': + try: + handler.run(args[0]) + except Exception as e: + error_msg = 'Error processing stream handler: %s' + log.exception(error_msg, handler.name) + orig.exception(e) + elif etype == 'schedule': + name = args[2] + try: + log.debug('Scheduled event: %s: %s', name, args[0]) + handler(*args[0], **args[1]) + except Exception as e: + log.exception('Error processing scheduled task') + self.exception(e) + elif etype == 'event': + func, threaded, disposable = handler + try: + if threaded: + x = threading.Thread( + name="Event_%s" % str(func), + target=self._threaded_event_wrapper, + args=(func, args)) + x.daemon = self._use_daemons + x.start() + else: + func(*args) + except Exception as e: + error_msg = 'Error processing event handler: %s' + log.exception(error_msg, str(func)) + if hasattr(orig, 'exception'): + orig.exception(e) + else: + self.exception(e) + elif etype == 'quit': + log.debug("Quitting event runner thread") + break + except KeyboardInterrupt: + log.debug("Keyboard Escape Detected in _event_runner") + self.event('killed', direct=True) + self.disconnect() + except SystemExit: + self.disconnect() + self.event_queue.put(('quit', None, None)) + + self._end_thread('event runner') + + def _send_thread(self): + """Extract stanzas from the send queue and send them on the stream.""" + try: + while not self.stop.is_set(): + while not self.stop.is_set() and \ + not self.session_started_event.is_set(): + self.session_started_event.wait(timeout=0.1) # Wait for session start + if self.__failed_send_stanza is not None: + data = self.__failed_send_stanza + self.__failed_send_stanza = None + else: + data = self.send_queue.get() # Wait for data to send + if data is None: + continue + log.debug("SEND: %s", data) + enc_data = data.encode('utf-8') + total = len(enc_data) + sent = 0 + count = 0 + tries = 0 + try: + with self.send_lock: + while sent < total and not self.stop.is_set() and \ + self.session_started_event.is_set(): + try: + sent += self.socket.send(enc_data[sent:]) + count += 1 + except Socket.error as serr: + if serr.errno != errno.EINTR: + raise + except ssl.SSLError as serr: + if tries >= self.ssl_retry_max: + log.debug('SSL error: max retries reached') + self.exception(serr) + log.warning("Failed to send %s", data) + if not self.stop.is_set(): + self.disconnect(self.auto_reconnect, + send_close=False) + log.warning('SSL write error: retrying') + if not self.stop.is_set(): + time.sleep(self.ssl_retry_delay) + tries += 1 + if count > 1: + log.debug('SENT: %d chunks', count) + self.send_queue.task_done() + except (Socket.error, ssl.SSLError) as serr: + self.event('socket_error', serr, direct=True) + log.warning("Failed to send %s", data) + if not self.stop.is_set(): + self.__failed_send_stanza = data + self._end_thread('send') + self.disconnect(self.auto_reconnect, send_close=False) + return + except Exception as ex: + log.exception('Unexpected error in send thread: %s', ex) + self.exception(ex) + if not self.stop.is_set(): + self._end_thread('send') + self.disconnect(self.auto_reconnect) + return + + self._end_thread('send') + + def _scheduler_thread(self): + self.scheduler.process(threaded=False) + self._end_thread('scheduler') + + def exception(self, exception): + """Process an unknown exception. + + Meant to be overridden. + + :param exception: An unhandled exception object. + """ + pass + + +# To comply with PEP8, method names now use underscores. +# Deprecated method names are re-mapped for backwards compatibility. +XMLStream.startTLS = XMLStream.start_tls +XMLStream.registerStanza = XMLStream.register_stanza +XMLStream.removeStanza = XMLStream.remove_stanza +XMLStream.registerHandler = XMLStream.register_handler +XMLStream.removeHandler = XMLStream.remove_handler +XMLStream.setSocket = XMLStream.set_socket +XMLStream.sendRaw = XMLStream.send_raw +XMLStream.getId = XMLStream.get_id +XMLStream.getNewId = XMLStream.new_id +XMLStream.sendXML = XMLStream.send_xml -- cgit v1.2.3