diff options
Diffstat (limited to 'sleekxmpp/xmlstream/xmlstream.py')
-rw-r--r-- | sleekxmpp/xmlstream/xmlstream.py | 1808 |
1 files changed, 0 insertions, 1808 deletions
diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py deleted file mode 100644 index 66985f3d..00000000 --- a/sleekxmpp/xmlstream/xmlstream.py +++ /dev/null @@ -1,1808 +0,0 @@ -""" - sleekxmpp.xmlstream.xmlstream - ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - - This module provides the module for creating and - interacting with generic XML streams, along with - the necessary eventing infrastructure. - - Part of SleekXMPP: The Sleek XMPP Library - - :copyright: (c) 2011 Nathanael C. Fritz - :license: MIT, see LICENSE for more details -""" - -from __future__ import with_statement, unicode_literals - -import base64 -import copy -import logging -import signal -import socket as Socket -import ssl -import sys -import threading -import time -import random -import weakref -import uuid -import errno - -from xml.parsers.expat import ExpatError - -import sleekxmpp -from sleekxmpp.util import Queue, QueueEmpty, safedict -from sleekxmpp.thirdparty.statemachine import StateMachine -from sleekxmpp.xmlstream import Scheduler, tostring, cert -from sleekxmpp.xmlstream.stanzabase import StanzaBase, ET, ElementBase -from sleekxmpp.xmlstream.handler import Waiter, XMLCallback -from sleekxmpp.xmlstream.matcher import MatchXMLMask -from sleekxmpp.xmlstream.resolver import resolve, default_resolver - -# In Python 2.x, file socket objects are broken. A patched socket -# wrapper is provided for this case in filesocket.py. -if sys.version_info < (3, 0): - from sleekxmpp.xmlstream.filesocket import FileSocket, Socket26 - - -#: The time in seconds to wait before timing out waiting for response stanzas. -RESPONSE_TIMEOUT = 30 - -#: The time in seconds to wait for events from the event queue, and also the -#: time between checks for the process stop signal. -WAIT_TIMEOUT = 1.0 - -#: The number of threads to use to handle XML stream events. This is not the -#: same as the number of custom event handling threads. -#: :data:`HANDLER_THREADS` must be at least 1. For Python implementations -#: with a GIL, this should be left at 1, but for implemetnations without -#: a GIL increasing this value can provide better performance. -HANDLER_THREADS = 1 - -#: The time in seconds to delay between attempts to resend data -#: after an SSL error. -SSL_RETRY_DELAY = 0.5 - -#: The maximum number of times to attempt resending data due to -#: an SSL error. -SSL_RETRY_MAX = 10 - -#: Maximum time to delay between connection attempts is one hour. -RECONNECT_MAX_DELAY = 600 - -#: Maximum number of attempts to connect to the server before quitting -#: and raising a 'connect_failed' event. Setting this to ``None`` will -#: allow infinite reconnection attempts, and using ``0`` will disable -#: reconnections. Defaults to ``None``. -RECONNECT_MAX_ATTEMPTS = None - - -log = logging.getLogger(__name__) - - -class RestartStream(Exception): - """ - Exception to restart stream processing, including - resending the stream header. - """ - - -class XMLStream(object): - """ - An XML stream connection manager and event dispatcher. - - The XMLStream class abstracts away the issues of establishing a - connection with a server and sending and receiving XML "stanzas". - A stanza is a complete XML element that is a direct child of a root - document element. Two streams are used, one for each communication - direction, over the same socket. Once the connection is closed, both - streams should be complete and valid XML documents. - - Three types of events are provided to manage the stream: - :Stream: Triggered based on received stanzas, similar in concept - to events in a SAX XML parser. - :Custom: Triggered manually. - :Scheduled: Triggered based on time delays. - - Typically, stanzas are first processed by a stream event handler which - will then trigger custom events to continue further processing, - especially since custom event handlers may run in individual threads. - - :param socket: Use an existing socket for the stream. Defaults to - ``None`` to generate a new socket. - :param string host: The name of the target server. - :param int port: The port to use for the connection. Defaults to 0. - """ - - def __init__(self, socket=None, host='', port=0): - #: Most XMPP servers support TLSv1, but OpenFire in particular - #: does not work well with it. For OpenFire, set - #: :attr:`ssl_version` to use ``SSLv23``:: - #: - #: import ssl - #: xmpp.ssl_version = ssl.PROTOCOL_SSLv23 - self.ssl_version = ssl.PROTOCOL_TLSv1 - - #: The list of accepted ciphers, in OpenSSL Format. - #: It might be useful to override it for improved security - #: over the python defaults. - self.ciphers = None - - #: Path to a file containing certificates for verifying the - #: server SSL certificate. A non-``None`` value will trigger - #: certificate checking. - #: - #: .. note:: - #: - #: On Mac OS X, certificates in the system keyring will - #: be consulted, even if they are not in the provided file. - self.ca_certs = None - - #: Path to a file containing a client certificate to use for - #: authenticating via SASL EXTERNAL. If set, there must also - #: be a corresponding `:attr:keyfile` value. - self.certfile = None - - #: Path to a file containing the private key for the selected - #: client certificate to use for authenticating via SASL EXTERNAL. - self.keyfile = None - - self._der_cert = None - - #: The time in seconds to wait for events from the event queue, - #: and also the time between checks for the process stop signal. - self.wait_timeout = WAIT_TIMEOUT - - #: The time in seconds to wait before timing out waiting - #: for response stanzas. - self.response_timeout = RESPONSE_TIMEOUT - - #: The current amount to time to delay attempting to reconnect. - #: This value doubles (with some jitter) with each failed - #: connection attempt up to :attr:`reconnect_max_delay` seconds. - self.reconnect_delay = None - - #: Maximum time to delay between connection attempts is one hour. - self.reconnect_max_delay = RECONNECT_MAX_DELAY - - #: Maximum number of attempts to connect to the server before - #: quitting and raising a 'connect_failed' event. Setting to - #: ``None`` allows infinite reattempts, while setting it to ``0`` - #: will disable reconnection attempts. Defaults to ``None``. - self.reconnect_max_attempts = RECONNECT_MAX_ATTEMPTS - - #: The time in seconds to delay between attempts to resend data - #: after an SSL error. - self.ssl_retry_max = SSL_RETRY_MAX - - #: The maximum number of times to attempt resending data due to - #: an SSL error. - self.ssl_retry_delay = SSL_RETRY_DELAY - - #: The connection state machine tracks if the stream is - #: ``'connected'`` or ``'disconnected'``. - self.state = StateMachine(('disconnected', 'connected')) - self.state._set_state('disconnected') - - #: The default port to return when querying DNS records. - self.default_port = int(port) - - #: The domain to try when querying DNS records. - self.default_domain = '' - - #: The expected name of the server, for validation. - self._expected_server_name = '' - self._service_name = '' - - #: The desired, or actual, address of the connected server. - self.address = (host, int(port)) - - #: A file-like wrapper for the socket for use with the - #: :mod:`~xml.etree.ElementTree` module. - self.filesocket = None - self.set_socket(socket) - - if sys.version_info < (3, 0): - self.socket_class = Socket26 - else: - self.socket_class = Socket.socket - - #: Enable connecting to the server directly over SSL, in - #: particular when the service provides two ports: one for - #: non-SSL traffic and another for SSL traffic. - self.use_ssl = False - - #: Enable connecting to the service without using SSL - #: immediately, but allow upgrading the connection later - #: to use SSL. - self.use_tls = False - - #: If set to ``True``, attempt to connect through an HTTP - #: proxy based on the settings in :attr:`proxy_config`. - self.use_proxy = False - - #: If set to ``True``, attempt to use IPv6. - self.use_ipv6 = True - - #: If set to ``True``, allow using the ``dnspython`` DNS library - #: if available. If set to ``False``, the builtin DNS resolver - #: will be used, even if ``dnspython`` is installed. - self.use_dnspython = True - - #: Use CDATA for escaping instead of XML entities. Defaults - #: to ``False``. - self.use_cdata = False - - #: An optional dictionary of proxy settings. It may provide: - #: :host: The host offering proxy services. - #: :port: The port for the proxy service. - #: :username: Optional username for accessing the proxy. - #: :password: Optional password for accessing the proxy. - self.proxy_config = {} - - #: The default namespace of the stream content, not of the - #: stream wrapper itself. - self.default_ns = '' - - self.default_lang = None - self.peer_default_lang = None - - #: The namespace of the enveloping stream element. - self.stream_ns = '' - - #: The default opening tag for the stream element. - self.stream_header = "<stream>" - - #: The default closing tag for the stream element. - self.stream_footer = "</stream>" - - #: If ``True``, periodically send a whitespace character over the - #: wire to keep the connection alive. Mainly useful for connections - #: traversing NAT. - self.whitespace_keepalive = True - - #: The default interval between keepalive signals when - #: :attr:`whitespace_keepalive` is enabled. - self.whitespace_keepalive_interval = 300 - - #: An :class:`~threading.Event` to signal that the application - #: is stopping, and that all threads should shutdown. - self.stop = threading.Event() - - #: An :class:`~threading.Event` to signal receiving a closing - #: stream tag from the server. - self.stream_end_event = threading.Event() - self.stream_end_event.set() - - #: An :class:`~threading.Event` to signal the start of a stream - #: session. Until this event fires, the send queue is not used - #: and data is sent immediately over the wire. - self.session_started_event = threading.Event() - - #: The default time in seconds to wait for a session to start - #: after connecting before reconnecting and trying again. - self.session_timeout = 45 - - #: Flag for controlling if the session can be considered ended - #: if the connection is terminated. - self.end_session_on_disconnect = True - - #: A queue of stream, custom, and scheduled events to be processed. - self.event_queue = Queue() - - #: A queue of string data to be sent over the stream. - self.send_queue = Queue() - self.send_queue_lock = threading.Lock() - self.send_lock = threading.RLock() - - #: A :class:`~sleekxmpp.xmlstream.scheduler.Scheduler` instance for - #: executing callbacks in the future based on time delays. - self.scheduler = Scheduler(self.stop) - self.__failed_send_stanza = None - - #: A mapping of XML namespaces to well-known prefixes. - self.namespace_map = {StanzaBase.xml_ns: 'xml'} - - self.__thread = {} - self.__root_stanza = [] - self.__handlers = [] - self.__event_handlers = {} - self.__event_handlers_lock = threading.Lock() - self.__filters = {'in': [], 'out': [], 'out_sync': []} - self.__thread_count = 0 - self.__thread_cond = threading.Condition() - self.__active_threads = set() - self._use_daemons = False - self._disconnect_wait_for_threads = True - - self._id = 0 - self._id_lock = threading.Lock() - - #: We use an ID prefix to ensure that all ID values are unique. - self._id_prefix = '%s-' % uuid.uuid4() - - #: The :attr:`auto_reconnnect` setting controls whether or not - #: the stream will be restarted in the event of an error. - self.auto_reconnect = True - - #: The :attr:`disconnect_wait` setting is the default value - #: for controlling if the system waits for the send queue to - #: empty before ending the stream. This may be overridden by - #: passing ``wait=True`` or ``wait=False`` to :meth:`disconnect`. - #: The default :attr:`disconnect_wait` value is ``False``. - self.disconnect_wait = False - - #: A list of DNS results that have not yet been tried. - self.dns_answers = [] - - #: The service name to check with DNS SRV records. For - #: example, setting this to ``'xmpp-client'`` would query the - #: ``_xmpp-client._tcp`` service. - self.dns_service = None - - self.add_event_handler('connected', self._session_timeout_check) - self.add_event_handler('disconnected', self._remove_schedules) - self.add_event_handler('session_start', self._start_keepalive) - self.add_event_handler('session_start', self._cert_expiration) - - def use_signals(self, signals=None): - """Register signal handlers for ``SIGHUP`` and ``SIGTERM``. - - By using signals, a ``'killed'`` event will be raised when the - application is terminated. - - If a signal handler already existed, it will be executed first, - before the ``'killed'`` event is raised. - - :param list signals: A list of signal names to be monitored. - Defaults to ``['SIGHUP', 'SIGTERM']``. - """ - if signals is None: - signals = ['SIGHUP', 'SIGTERM'] - - existing_handlers = {} - for sig_name in signals: - if hasattr(signal, sig_name): - sig = getattr(signal, sig_name) - handler = signal.getsignal(sig) - if handler: - existing_handlers[sig] = handler - - def handle_kill(signum, frame): - """ - Capture kill event and disconnect cleanly after first - spawning the ``'killed'`` event. - """ - - if signum in existing_handlers and \ - existing_handlers[signum] != handle_kill: - existing_handlers[signum](signum, frame) - - self.event("killed", direct=True) - self.disconnect() - - try: - for sig_name in signals: - if hasattr(signal, sig_name): - sig = getattr(signal, sig_name) - signal.signal(sig, handle_kill) - self.__signals_installed = True - except: - log.debug("Can not set interrupt signal handlers. " + \ - "SleekXMPP is not running from a main thread.") - - def new_id(self): - """Generate and return a new stream ID in hexadecimal form. - - Many stanzas, handlers, or matchers may require unique - ID values. Using this method ensures that all new ID values - are unique in this stream. - """ - with self._id_lock: - self._id += 1 - return self.get_id() - - def get_id(self): - """Return the current unique stream ID in hexadecimal form.""" - return "%s%X" % (self._id_prefix, self._id) - - def connect(self, host='', port=0, use_ssl=False, - use_tls=True, reattempt=True): - """Create a new socket and connect to the server. - - Setting ``reattempt`` to ``True`` will cause connection - attempts to be made with an exponential backoff delay (max of - :attr:`reconnect_max_delay` which defaults to 10 minute) until a - successful connection is established. - - :param host: The name of the desired server for the connection. - :param port: Port to connect to on the server. - :param use_ssl: Flag indicating if SSL should be used by connecting - directly to a port using SSL. - :param use_tls: Flag indicating if TLS should be used, allowing for - connecting to a port without using SSL immediately and - later upgrading the connection. - :param reattempt: Flag indicating if the socket should reconnect - after disconnections. - """ - self.stop.clear() - - if host and port: - self.address = (host, int(port)) - try: - Socket.inet_aton(self.address[0]) - except (Socket.error, ssl.SSLError): - self.default_domain = self.address[0] - - # Respect previous SSL and TLS usage directives. - if use_ssl is not None: - self.use_ssl = use_ssl - if use_tls is not None: - self.use_tls = use_tls - - # Repeatedly attempt to connect until a successful connection - # is established. - attempts = self.reconnect_max_attempts - connected = self.state.transition('disconnected', 'connected', - func=self._connect, - args=(reattempt,)) - while reattempt and not connected and not self.stop.is_set(): - connected = self.state.transition('disconnected', 'connected', - func=self._connect) - if not connected: - if attempts is not None: - attempts -= 1 - if attempts <= 0: - self.event('connection_failed', direct=True) - return False - return connected - - def _connect(self, reattempt=True): - self.scheduler.remove('Session timeout check') - - if self.reconnect_delay is None or not reattempt: - delay = 1.0 - else: - delay = min(self.reconnect_delay * 2, self.reconnect_max_delay) - delay = random.normalvariate(delay, delay * 0.1) - log.debug('Waiting %s seconds before connecting.', delay) - elapsed = 0 - try: - while elapsed < delay and not self.stop.is_set(): - time.sleep(0.1) - elapsed += 0.1 - except KeyboardInterrupt: - self.set_stop() - return False - except SystemExit: - self.set_stop() - return False - - if self.default_domain: - try: - host, address, port = self.pick_dns_answer(self.default_domain, - self.address[1]) - self.address = (address, port) - self._service_name = host - except StopIteration: - log.debug("No remaining DNS records to try.") - self.dns_answers = None - if reattempt: - self.reconnect_delay = delay - return False - - af = Socket.AF_INET - proto = 'IPv4' - if ':' in self.address[0]: - af = Socket.AF_INET6 - proto = 'IPv6' - try: - self.socket = self.socket_class(af, Socket.SOCK_STREAM) - except Socket.error: - log.debug("Could not connect using %s", proto) - return False - - self.configure_socket() - - if self.use_proxy: - connected = self._connect_proxy() - if not connected: - if reattempt: - self.reconnect_delay = delay - return False - - if self.use_ssl: - log.debug("Socket Wrapped for SSL") - if self.ca_certs is None: - cert_policy = ssl.CERT_NONE - else: - cert_policy = ssl.CERT_REQUIRED - - ssl_args = safedict({ - 'certfile': self.certfile, - 'keyfile': self.keyfile, - 'ca_certs': self.ca_certs, - 'cert_reqs': cert_policy, - 'do_handshake_on_connect': False - }) - - if sys.version_info >= (2, 7): - ssl_args['ciphers'] = self.ciphers - - ssl_socket = ssl.wrap_socket(self.socket, **ssl_args) - - if hasattr(self.socket, 'socket'): - # We are using a testing socket, so preserve the top - # layer of wrapping. - self.socket.socket = ssl_socket - else: - self.socket = ssl_socket - - try: - if not self.use_proxy: - domain = self.address[0] - if ':' in domain: - domain = '[%s]' % domain - log.debug("Connecting to %s:%s", domain, self.address[1]) - self.socket.connect(self.address) - - if self.use_ssl: - try: - self.socket.do_handshake() - except (Socket.error, ssl.SSLError): - log.error('CERT: Invalid certificate trust chain.') - if not self.event_handled('ssl_invalid_chain'): - self.disconnect(self.auto_reconnect, - send_close=False) - else: - self.event('ssl_invalid_chain', direct=True) - return False - - self._der_cert = self.socket.getpeercert(binary_form=True) - pem_cert = ssl.DER_cert_to_PEM_cert(self._der_cert) - log.debug('CERT: %s', pem_cert) - - self.event('ssl_cert', pem_cert, direct=True) - try: - cert.verify(self._expected_server_name, self._der_cert) - except cert.CertificateError as err: - if not self.event_handled('ssl_invalid_cert'): - log.error(err) - self.disconnect(send_close=False) - else: - self.event('ssl_invalid_cert', - pem_cert, - direct=True) - - self.set_socket(self.socket, ignore=True) - #this event is where you should set your application state - self.event('connected', direct=True) - return True - except (Socket.error, ssl.SSLError) as serr: - error_msg = "Could not connect to %s:%s. Socket Error #%s: %s" - self.event('socket_error', serr, direct=True) - domain = self.address[0] - if ':' in domain: - domain = '[%s]' % domain - log.error(error_msg, domain, self.address[1], - serr.errno, serr.strerror) - return False - - def _connect_proxy(self): - """Attempt to connect using an HTTP Proxy.""" - - # Extract the proxy address, and optional credentials - address = (self.proxy_config['host'], int(self.proxy_config['port'])) - cred = None - if self.proxy_config['username']: - username = self.proxy_config['username'] - password = self.proxy_config['password'] - - cred = '%s:%s' % (username, password) - if sys.version_info < (3, 0): - cred = bytes(cred) - else: - cred = bytes(cred, 'utf-8') - cred = base64.b64encode(cred).decode('utf-8') - - # Build the HTTP headers for connecting to the XMPP server - headers = ['CONNECT %s:%s HTTP/1.0' % self.address, - 'Host: %s:%s' % self.address, - 'Proxy-Connection: Keep-Alive', - 'Pragma: no-cache', - 'User-Agent: SleekXMPP/%s' % sleekxmpp.__version__] - if cred: - headers.append('Proxy-Authorization: Basic %s' % cred) - headers = '\r\n'.join(headers) + '\r\n\r\n' - - try: - log.debug("Connecting to proxy: %s:%s", *address) - self.socket.connect(address) - self.send_raw(headers, now=True) - resp = '' - while '\r\n\r\n' not in resp and not self.stop.is_set(): - resp += self.socket.recv(1024).decode('utf-8') - log.debug('RECV: %s', resp) - - lines = resp.split('\r\n') - if '200' not in lines[0]: - self.event('proxy_error', resp) - self.event('connection_failed', direct=True) - log.error('Proxy Error: %s', lines[0]) - return False - - # Proxy connection established, continue connecting - # with the XMPP server. - return True - except (Socket.error, ssl.SSLError) as serr: - error_msg = "Could not connect to %s:%s. Socket Error #%s: %s" - self.event('socket_error', serr, direct=True) - log.error(error_msg, self.address[0], self.address[1], - serr.errno, serr.strerror) - return False - - def _session_timeout_check(self, event=None): - """ - Add check to ensure that a session is established within - a reasonable amount of time. - """ - - def _handle_session_timeout(): - if not self.session_started_event.is_set(): - log.debug("Session start has taken more " + \ - "than %d seconds", self.session_timeout) - self.disconnect(reconnect=self.auto_reconnect) - - self.schedule("Session timeout check", - self.session_timeout, - _handle_session_timeout) - - def disconnect(self, reconnect=False, wait=None, send_close=True): - """Terminate processing and close the XML streams. - - Optionally, the connection may be reconnected and - resume processing afterwards. - - If the disconnect should take place after all items - in the send queue have been sent, use ``wait=True``. - - .. warning:: - - If you are constantly adding items to the queue - such that it is never empty, then the disconnect will - not occur and the call will continue to block. - - :param reconnect: Flag indicating if the connection - and processing should be restarted. - Defaults to ``False``. - :param wait: Flag indicating if the send queue should - be emptied before disconnecting, overriding - :attr:`disconnect_wait`. - :param send_close: Flag indicating if the stream footer - should be sent before terminating the - connection. Setting this to ``False`` - prevents error loops when trying to - disconnect after a socket error. - """ - self.state.transition('connected', 'disconnected', - wait=2.0, - func=self._disconnect, - args=(reconnect, wait, send_close)) - - def _disconnect(self, reconnect=False, wait=None, send_close=True): - if not reconnect: - self.auto_reconnect = False - - if self.end_session_on_disconnect or send_close: - self.event('session_end', direct=True) - - # Wait for the send queue to empty. - if wait is not None: - if wait: - self.send_queue.join() - elif self.disconnect_wait: - self.send_queue.join() - - # Clearing this event will pause the send loop. - self.session_started_event.clear() - - self.__failed_send_stanza = None - - # Send the end of stream marker. - if send_close: - self.send_raw(self.stream_footer, now=True) - - # Wait for confirmation that the stream was - # closed in the other direction. If we didn't - # send a stream footer we don't need to wait - # since the server won't know to respond. - if send_close: - log.info('Waiting for %s from server', self.stream_footer) - self.stream_end_event.wait(4) - else: - self.stream_end_event.set() - - if not self.auto_reconnect: - self.set_stop() - if self._disconnect_wait_for_threads: - self._wait_for_threads() - - try: - self.socket.shutdown(Socket.SHUT_RDWR) - self.socket.close() - self.filesocket.close() - except (Socket.error, ssl.SSLError) as serr: - self.event('socket_error', serr, direct=True) - finally: - #clear your application state - self.event('disconnected', direct=True) - return True - - def abort(self): - self.session_started_event.clear() - self.set_stop() - if self._disconnect_wait_for_threads: - self._wait_for_threads() - try: - self.socket.shutdown(Socket.SHUT_RDWR) - self.socket.close() - self.filesocket.close() - except Socket.error: - pass - self.state.transition_any(['connected', 'disconnected'], 'disconnected', func=lambda: True) - self.event("killed", direct=True) - - def reconnect(self, reattempt=True, wait=False, send_close=True): - """Reset the stream's state and reconnect to the server.""" - log.debug("reconnecting...") - if self.state.ensure('connected'): - self.state.transition('connected', 'disconnected', - wait=2.0, - func=self._disconnect, - args=(True, wait, send_close)) - - attempts = self.reconnect_max_attempts - - log.debug("connecting...") - connected = self.state.transition('disconnected', 'connected', - wait=2.0, - func=self._connect, - args=(reattempt,)) - while reattempt and not connected and not self.stop.is_set(): - connected = self.state.transition('disconnected', 'connected', - wait=2.0, func=self._connect) - connected = connected or self.state.ensure('connected') - if not connected: - if attempts is not None: - attempts -= 1 - if attempts <= 0: - self.event('connection_failed', direct=True) - return False - return connected - - def set_socket(self, socket, ignore=False): - """Set the socket to use for the stream. - - The filesocket will be recreated as well. - - :param socket: The new socket object to use. - :param bool ignore: If ``True``, don't set the connection - state to ``'connected'``. - """ - self.socket = socket - if socket is not None: - # ElementTree.iterparse requires a file. - # 0 buffer files have to be binary. - - # Use the correct fileobject type based on the Python - # version to work around a broken implementation in - # Python 2.x. - if sys.version_info < (3, 0): - self.filesocket = FileSocket(self.socket) - else: - self.filesocket = self.socket.makefile('rb', 0) - if not ignore: - self.state._set_state('connected') - - def configure_socket(self): - """Set timeout and other options for self.socket. - - Meant to be overridden. - """ - self.socket.settimeout(None) - - def configure_dns(self, resolver, domain=None, port=None): - """ - Configure and set options for a :class:`~dns.resolver.Resolver` - instance, and other DNS related tasks. For example, you - can also check :meth:`~socket.socket.getaddrinfo` to see - if you need to call out to ``libresolv.so.2`` to - run ``res_init()``. - - Meant to be overridden. - - :param resolver: A :class:`~dns.resolver.Resolver` instance - or ``None`` if ``dnspython`` is not installed. - :param domain: The initial domain under consideration. - :param port: The initial port under consideration. - """ - pass - - def start_tls(self): - """Perform handshakes for TLS. - - If the handshake is successful, the XML stream will need - to be restarted. - """ - log.info("Negotiating TLS") - ssl_versions = {3: 'TLS 1.0', 1: 'SSL 3', 2: 'SSL 2/3'} - log.info("Using SSL version: %s", ssl_versions[self.ssl_version]) - if self.ca_certs is None: - cert_policy = ssl.CERT_NONE - else: - cert_policy = ssl.CERT_REQUIRED - - ssl_args = safedict({ - 'certfile': self.certfile, - 'keyfile': self.keyfile, - 'ca_certs': self.ca_certs, - 'cert_reqs': cert_policy, - 'do_handshake_on_connect': False - }) - - if sys.version_info >= (2, 7): - ssl_args['ciphers'] = self.ciphers - - ssl_socket = ssl.wrap_socket(self.socket, **ssl_args); - - if hasattr(self.socket, 'socket'): - # We are using a testing socket, so preserve the top - # layer of wrapping. - self.socket.socket = ssl_socket - else: - self.socket = ssl_socket - - try: - self.socket.do_handshake() - except (Socket.error, ssl.SSLError): - log.error('CERT: Invalid certificate trust chain.') - if not self.event_handled('ssl_invalid_chain'): - self.disconnect(self.auto_reconnect, send_close=False) - else: - self._der_cert = self.socket.getpeercert(binary_form=True) - self.event('ssl_invalid_chain', direct=True) - return False - - self._der_cert = self.socket.getpeercert(binary_form=True) - pem_cert = ssl.DER_cert_to_PEM_cert(self._der_cert) - log.debug('CERT: %s', pem_cert) - self.event('ssl_cert', pem_cert, direct=True) - - try: - cert.verify(self._expected_server_name, self._der_cert) - except cert.CertificateError as err: - if not self.event_handled('ssl_invalid_cert'): - log.error(err) - self.disconnect(self.auto_reconnect, send_close=False) - else: - self.event('ssl_invalid_cert', pem_cert, direct=True) - - self.set_socket(self.socket) - return True - - def _cert_expiration(self, event): - """Schedule an event for when the TLS certificate expires.""" - - if not self.use_tls and not self.use_ssl: - return - - if not self._der_cert: - log.warn("TLS or SSL was enabled, but no certificate was found.") - return - - def restart(): - if not self.event_handled('ssl_expired_cert'): - log.warn("The server certificate has expired. Restarting.") - self.reconnect() - else: - pem_cert = ssl.DER_cert_to_PEM_cert(self._der_cert) - self.event('ssl_expired_cert', pem_cert) - - cert_ttl = cert.get_ttl(self._der_cert) - if cert_ttl is None: - return - - if cert_ttl.days < 0: - log.warn('CERT: Certificate has expired.') - restart() - - try: - total_seconds = cert_ttl.total_seconds() - except AttributeError: - # for Python < 2.7 - total_seconds = (cert_ttl.microseconds + (cert_ttl.seconds + cert_ttl.days * 24 * 3600) * 10**6) / 10**6 - - log.info('CERT: Time until certificate expiration: %s' % cert_ttl) - self.schedule('Certificate Expiration', - total_seconds, - restart) - - def _start_keepalive(self, event): - """Begin sending whitespace periodically to keep the connection alive. - - May be disabled by setting:: - - self.whitespace_keepalive = False - - The keepalive interval can be set using:: - - self.whitespace_keepalive_interval = 300 - """ - self.schedule('Whitespace Keepalive', - self.whitespace_keepalive_interval, - self.send_raw, - args=(' ',), - kwargs={'now': True}, - repeat=True) - - def _remove_schedules(self, event): - """Remove whitespace keepalive and certificate expiration schedules.""" - self.scheduler.remove('Whitespace Keepalive') - self.scheduler.remove('Certificate Expiration') - - def start_stream_handler(self, xml): - """Perform any initialization actions, such as handshakes, - once the stream header has been sent. - - Meant to be overridden. - """ - pass - - def register_stanza(self, stanza_class): - """Add a stanza object class as a known root stanza. - - A root stanza is one that appears as a direct child of the stream's - root element. - - Stanzas that appear as substanzas of a root stanza do not need to - be registered here. That is done using register_stanza_plugin() from - sleekxmpp.xmlstream.stanzabase. - - Stanzas that are not registered will not be converted into - stanza objects, but may still be processed using handlers and - matchers. - - :param stanza_class: The top-level stanza object's class. - """ - self.__root_stanza.append(stanza_class) - - def remove_stanza(self, stanza_class): - """Remove a stanza from being a known root stanza. - - A root stanza is one that appears as a direct child of the stream's - root element. - - Stanzas that are not registered will not be converted into - stanza objects, but may still be processed using handlers and - matchers. - """ - self.__root_stanza.remove(stanza_class) - - def add_filter(self, mode, handler, order=None): - """Add a filter for incoming or outgoing stanzas. - - These filters are applied before incoming stanzas are - passed to any handlers, and before outgoing stanzas - are put in the send queue. - - Each filter must accept a single stanza, and return - either a stanza or ``None``. If the filter returns - ``None``, then the stanza will be dropped from being - processed for events or from being sent. - - :param mode: One of ``'in'`` or ``'out'``. - :param handler: The filter function. - :param int order: The position to insert the filter in - the list of active filters. - """ - if order: - self.__filters[mode].insert(order, handler) - else: - self.__filters[mode].append(handler) - - def del_filter(self, mode, handler): - """Remove an incoming or outgoing filter.""" - self.__filters[mode].remove(handler) - - def add_handler(self, mask, pointer, name=None, disposable=False, - threaded=False, filter=False, instream=False): - """A shortcut method for registering a handler using XML masks. - - The use of :meth:`register_handler()` is preferred. - - :param mask: An XML snippet matching the structure of the - stanzas that will be passed to this handler. - :param pointer: The handler function itself. - :parm name: A unique name for the handler. A name will - be generated if one is not provided. - :param disposable: Indicates if the handler should be discarded - after one use. - :param threaded: **DEPRECATED**. - Remains for backwards compatibility. - :param filter: **DEPRECATED**. - Remains for backwards compatibility. - :param instream: Indicates if the handler should execute during - stream processing and not during normal event - processing. - """ - # To prevent circular dependencies, we must load the matcher - # and handler classes here. - - if name is None: - name = 'add_handler_%s' % self.new_id() - self.register_handler( - XMLCallback(name, - MatchXMLMask(mask, self.default_ns), - pointer, - once=disposable, - instream=instream)) - - def register_handler(self, handler, before=None, after=None): - """Add a stream event handler that will be executed when a matching - stanza is received. - - :param handler: - The :class:`~sleekxmpp.xmlstream.handler.base.BaseHandler` - derived object to execute. - """ - if handler.stream is None: - self.__handlers.append(handler) - handler.stream = weakref.ref(self) - - def remove_handler(self, name): - """Remove any stream event handlers with the given name. - - :param name: The name of the handler. - """ - idx = 0 - for handler in self.__handlers: - if handler.name == name: - self.__handlers.pop(idx) - return True - idx += 1 - return False - - def get_dns_records(self, domain, port=None): - """Get the DNS records for a domain. - - :param domain: The domain in question. - :param port: If the results don't include a port, use this one. - """ - if port is None: - port = self.default_port - - resolver = default_resolver() - self.configure_dns(resolver, domain=domain, port=port) - - return resolve(domain, port, service=self.dns_service, - resolver=resolver, - use_ipv6=self.use_ipv6, - use_dnspython=self.use_dnspython) - - def pick_dns_answer(self, domain, port=None): - """Pick a server and port from DNS answers. - - Gets DNS answers if none available. - Removes used answer from available answers. - - :param domain: The domain in question. - :param port: If the results don't include a port, use this one. - """ - if not self.dns_answers: - self.dns_answers = self.get_dns_records(domain, port) - - if sys.version_info < (3, 0): - return self.dns_answers.next() - else: - return next(self.dns_answers) - - def add_event_handler(self, name, pointer, - threaded=False, disposable=False): - """Add a custom event handler that will be executed whenever - its event is manually triggered. - - :param name: The name of the event that will trigger - this handler. - :param pointer: The function to execute. - :param threaded: If set to ``True``, the handler will execute - in its own thread. Defaults to ``False``. - :param disposable: If set to ``True``, the handler will be - discarded after one use. Defaults to ``False``. - """ - if not name in self.__event_handlers: - self.__event_handlers[name] = [] - self.__event_handlers[name].append((pointer, threaded, disposable)) - - def del_event_handler(self, name, pointer): - """Remove a function as a handler for an event. - - :param name: The name of the event. - :param pointer: The function to remove as a handler. - """ - if not name in self.__event_handlers: - return - - # Need to keep handlers that do not use - # the given function pointer - def filter_pointers(handler): - return handler[0] != pointer - - self.__event_handlers[name] = list(filter( - filter_pointers, - self.__event_handlers[name])) - - def event_handled(self, name): - """Returns the number of registered handlers for an event. - - :param name: The name of the event to check. - """ - return len(self.__event_handlers.get(name, [])) - - def event(self, name, data={}, direct=False): - """Manually trigger a custom event. - - :param name: The name of the event to trigger. - :param data: Data that will be passed to each event handler. - Defaults to an empty dictionary, but is usually - a stanza object. - :param direct: Runs the event directly if True, skipping the - event queue. All event handlers will run in the - same thread. - """ - log.debug("Event triggered: " + name) - - handlers = self.__event_handlers.get(name, []) - for handler in handlers: - #TODO: Data should not be copied, but should be read only, - # but this might break current code so it's left for future. - - out_data = copy.copy(data) if len(handlers) > 1 else data - old_exception = getattr(data, 'exception', None) - if direct: - try: - handler[0](out_data) - except Exception as e: - error_msg = 'Error processing event handler: %s' - log.exception(error_msg, str(handler[0])) - if old_exception: - old_exception(e) - else: - self.exception(e) - else: - self.event_queue.put(('event', handler, out_data)) - if handler[2]: - # If the handler is disposable, we will go ahead and - # remove it now instead of waiting for it to be - # processed in the queue. - with self.__event_handlers_lock: - try: - h_index = self.__event_handlers[name].index(handler) - self.__event_handlers[name].pop(h_index) - except: - pass - - def schedule(self, name, seconds, callback, args=None, - kwargs=None, repeat=False): - """Schedule a callback function to execute after a given delay. - - :param name: A unique name for the scheduled callback. - :param seconds: The time in seconds to wait before executing. - :param callback: A pointer to the function to execute. - :param args: A tuple of arguments to pass to the function. - :param kwargs: A dictionary of keyword arguments to pass to - the function. - :param repeat: Flag indicating if the scheduled event should - be reset and repeat after executing. - """ - self.scheduler.add(name, seconds, callback, args, kwargs, - repeat, qpointer=self.event_queue) - - def incoming_filter(self, xml): - """Filter incoming XML objects before they are processed. - - Possible uses include remapping namespaces, or correcting elements - from sources with incorrect behavior. - - Meant to be overridden. - """ - return xml - - def send(self, data, mask=None, timeout=None, now=False, use_filters=True): - """A wrapper for :meth:`send_raw()` for sending stanza objects. - - May optionally block until an expected response is received. - - :param data: The :class:`~sleekxmpp.xmlstream.stanzabase.ElementBase` - stanza to send on the stream. - :param mask: **DEPRECATED** - An XML string snippet matching the structure - of the expected response. Execution will block - in this thread until the response is received - or a timeout occurs. - :param int timeout: Time in seconds to wait for a response before - continuing. Defaults to :attr:`response_timeout`. - :param bool now: Indicates if the send queue should be skipped, - sending the stanza immediately. Useful mainly - for stream initialization stanzas. - Defaults to ``False``. - :param bool use_filters: Indicates if outgoing filters should be - applied to the given stanza data. Disabling - filters is useful when resending stanzas. - Defaults to ``True``. - """ - if timeout is None: - timeout = self.response_timeout - if hasattr(mask, 'xml'): - mask = mask.xml - - if isinstance(data, ElementBase): - if use_filters: - for filter in self.__filters['out']: - data = filter(data) - if data is None: - return - - if mask is not None: - log.warning("Use of send mask waiters is deprecated.") - wait_for = Waiter("SendWait_%s" % self.new_id(), - MatchXMLMask(mask)) - self.register_handler(wait_for) - - if isinstance(data, ElementBase): - with self.send_queue_lock: - if use_filters: - for filter in self.__filters['out_sync']: - data = filter(data) - if data is None: - return - str_data = tostring(data.xml, xmlns=self.default_ns, - stream=self, - top_level=True) - self.send_raw(str_data, now) - else: - self.send_raw(data, now) - if mask is not None: - return wait_for.wait(timeout) - - def send_xml(self, data, mask=None, timeout=None, now=False): - """Send an XML object on the stream, and optionally wait - for a response. - - :param data: The :class:`~xml.etree.ElementTree.Element` XML object - to send on the stream. - :param mask: **DEPRECATED** - An XML string snippet matching the structure - of the expected response. Execution will block - in this thread until the response is received - or a timeout occurs. - :param int timeout: Time in seconds to wait for a response before - continuing. Defaults to :attr:`response_timeout`. - :param bool now: Indicates if the send queue should be skipped, - sending the stanza immediately. Useful mainly - for stream initialization stanzas. - Defaults to ``False``. - """ - if timeout is None: - timeout = self.response_timeout - return self.send(tostring(data), mask, timeout, now) - - def send_raw(self, data, now=False, reconnect=None): - """Send raw data across the stream. - - :param string data: Any string value. - :param bool reconnect: Indicates if the stream should be - restarted if there is an error sending - the stanza. Used mainly for testing. - Defaults to :attr:`auto_reconnect`. - """ - if now: - log.debug("SEND (IMMED): %s", data) - try: - data = data.encode('utf-8') - total = len(data) - sent = 0 - count = 0 - tries = 0 - with self.send_lock: - while sent < total and not self.stop.is_set(): - try: - sent += self.socket.send(data[sent:]) - count += 1 - except Socket.error as serr: - if serr.errno != errno.EINTR: - raise - except ssl.SSLError as serr: - if tries >= self.ssl_retry_max: - log.debug('SSL error: max retries reached') - self.exception(serr) - log.warning("Failed to send %s", data) - if reconnect is None: - reconnect = self.auto_reconnect - if not self.stop.is_set(): - self.disconnect(reconnect, - send_close=False) - log.warning('SSL write error: retrying') - if not self.stop.is_set(): - time.sleep(self.ssl_retry_delay) - tries += 1 - if count > 1: - log.debug('SENT: %d chunks', count) - except (Socket.error, ssl.SSLError) as serr: - self.event('socket_error', serr, direct=True) - log.warning("Failed to send %s", data) - if reconnect is None: - reconnect = self.auto_reconnect - if not self.stop.is_set(): - self.disconnect(reconnect, send_close=False) - else: - self.send_queue.put(data) - return True - - def _start_thread(self, name, target, track=True): - self.__thread[name] = threading.Thread(name=name, target=target) - self.__thread[name].daemon = self._use_daemons - self.__thread[name].start() - - if track: - self.__active_threads.add(name) - with self.__thread_cond: - self.__thread_count += 1 - - def _end_thread(self, name, early=False): - with self.__thread_cond: - curr_thread = threading.current_thread().name - if curr_thread in self.__active_threads: - self.__thread_count -= 1 - self.__active_threads.remove(curr_thread) - - if early: - log.debug('Threading deadlock prevention!') - log.debug(("Marked %s thread as ended due to " + \ - "disconnect() call. %s threads remain.") % ( - name, self.__thread_count)) - else: - log.debug("Stopped %s thread. %s threads remain." % ( - name, self.__thread_count)) - - else: - log.debug(("Finished exiting %s thread after early " + \ - "termination from disconnect() call. " + \ - "%s threads remain.") % ( - name, self.__thread_count)) - - if self.__thread_count == 0: - self.__thread_cond.notify() - - def set_stop(self): - self.stop.set() - - # Unlock queues - self.event_queue.put(None) - self.send_queue.put(None) - - def _wait_for_threads(self): - with self.__thread_cond: - if self.__thread_count != 0: - log.debug("Waiting for %s threads to exit." % - self.__thread_count) - name = threading.current_thread().name - if name in self.__thread: - self._end_thread(name, early=True) - self.__thread_cond.wait(4) - if self.__thread_count != 0: - log.error("Hanged threads: %s" % threading.enumerate()) - log.error("This may be due to calling disconnect() " + \ - "from a non-threaded event handler. Be " + \ - "sure that event handlers that call " + \ - "disconnect() are registered using: " + \ - "add_event_handler(..., threaded=True)") - - def process(self, **kwargs): - """Initialize the XML streams and begin processing events. - - The number of threads used for processing stream events is determined - by :data:`HANDLER_THREADS`. - - :param bool block: If ``False``, then event dispatcher will run - in a separate thread, allowing for the stream to be - used in the background for another application. - Otherwise, ``process(block=True)`` blocks the current - thread. Defaults to ``False``. - :param bool threaded: **DEPRECATED** - If ``True``, then event dispatcher will run - in a separate thread, allowing for the stream to be - used in the background for another application. - Defaults to ``True``. This does **not** mean that no - threads are used at all if ``threaded=False``. - - Regardless of these threading options, these threads will - always exist: - - - The event queue processor - - The send queue processor - - The scheduler - """ - if 'threaded' in kwargs and 'block' in kwargs: - raise ValueError("process() called with both " + \ - "block and threaded arguments") - elif 'block' in kwargs: - threaded = not(kwargs.get('block', False)) - else: - threaded = kwargs.get('threaded', True) - - for t in range(0, HANDLER_THREADS): - log.debug("Starting HANDLER THREAD") - self._start_thread('event_thread_%s' % t, self._event_runner) - - self._start_thread('send_thread', self._send_thread) - self._start_thread('scheduler_thread', self._scheduler_thread) - - if threaded: - # Run the XML stream in the background for another application. - self._start_thread('read_thread', self._process, track=False) - else: - self._process() - - def _process(self): - """Start processing the XML streams. - - Processing will continue after any recoverable errors - if reconnections are allowed. - """ - - # The body of this loop will only execute once per connection. - # Additional passes will be made only if an error occurs and - # reconnecting is permitted. - while True: - shutdown = False - try: - # The call to self.__read_xml will block and prevent - # the body of the loop from running until a disconnect - # occurs. After any reconnection, the stream header will - # be resent and processing will resume. - while not self.stop.is_set(): - # Only process the stream while connected to the server - if not self.state.ensure('connected', wait=0.1): - break - # Ensure the stream header is sent for any - # new connections. - if not self.session_started_event.is_set(): - self.send_raw(self.stream_header, now=True) - if not self.__read_xml(): - # If the server terminated the stream, end processing - break - except KeyboardInterrupt: - log.debug("Keyboard Escape Detected in _process") - self.event('killed', direct=True) - shutdown = True - except SystemExit: - log.debug("SystemExit in _process") - shutdown = True - except (SyntaxError, ExpatError) as e: - log.error("Error reading from XML stream.") - self.exception(e) - except (Socket.error, ssl.SSLError) as serr: - self.event('socket_error', serr, direct=True) - log.error('Socket Error #%s: %s', serr.errno, serr.strerror) - except ValueError as e: - msg = e.message if hasattr(e, 'message') else e.args[0] - - if 'I/O operation on closed file' in msg: - log.error('Can not read from closed socket.') - else: - self.exception(e) - except Exception as e: - if not self.stop.is_set(): - log.error('Connection error.') - self.exception(e) - - if not shutdown and not self.stop.is_set() \ - and self.auto_reconnect: - self.reconnect() - else: - self.disconnect() - break - - def __read_xml(self): - """Parse the incoming XML stream - - Stream events are raised for each received stanza. - """ - depth = 0 - root = None - for event, xml in ET.iterparse(self.filesocket, (b'end', b'start')): - if event == b'start': - if depth == 0: - # We have received the start of the root element. - root = xml - log.debug('RECV: %s', tostring(root, xmlns=self.default_ns, - stream=self, - top_level=True, - open_only=True)) - # Perform any stream initialization actions, such - # as handshakes. - self.stream_end_event.clear() - self.start_stream_handler(root) - - # We have a successful stream connection, so reset - # exponential backoff for new reconnect attempts. - self.reconnect_delay = 1.0 - depth += 1 - if event == b'end': - depth -= 1 - if depth == 0: - # The stream's root element has closed, - # terminating the stream. - log.debug("End of stream recieved") - self.stream_end_event.set() - return False - elif depth == 1: - # We only raise events for stanzas that are direct - # children of the root element. - try: - self.__spawn_event(xml) - except RestartStream: - return True - if root is not None: - # Keep the root element empty of children to - # save on memory use. - root.clear() - log.debug("Ending read XML loop") - - def _build_stanza(self, xml, default_ns=None): - """Create a stanza object from a given XML object. - - If a specialized stanza type is not found for the XML, then - a generic :class:`~sleekxmpp.xmlstream.stanzabase.StanzaBase` - stanza will be returned. - - :param xml: The :class:`~xml.etree.ElementTree.Element` XML object - to convert into a stanza object. - :param default_ns: Optional default namespace to use instead of the - stream's current default namespace. - """ - if default_ns is None: - default_ns = self.default_ns - stanza_type = StanzaBase - for stanza_class in self.__root_stanza: - if xml.tag == "{%s}%s" % (default_ns, stanza_class.name) or \ - xml.tag == stanza_class.tag_name(): - stanza_type = stanza_class - break - stanza = stanza_type(self, xml) - if stanza['lang'] is None and self.peer_default_lang: - stanza['lang'] = self.peer_default_lang - return stanza - - def __spawn_event(self, xml): - """ - Analyze incoming XML stanzas and convert them into stanza - objects if applicable and queue stream events to be processed - by matching handlers. - - :param xml: The :class:`~sleekxmpp.xmlstream.stanzabase.ElementBase` - stanza to analyze. - """ - # Apply any preprocessing filters. - xml = self.incoming_filter(xml) - - # Convert the raw XML object into a stanza object. If no registered - # stanza type applies, a generic StanzaBase stanza will be used. - stanza = self._build_stanza(xml) - - for filter in self.__filters['in']: - if stanza is not None: - stanza = filter(stanza) - if stanza is None: - return - - log.debug("RECV: %s", stanza) - - # Match the stanza against registered handlers. Handlers marked - # to run "in stream" will be executed immediately; the rest will - # be queued. - unhandled = True - matched_handlers = [h for h in self.__handlers if h.match(stanza)] - for handler in matched_handlers: - if len(matched_handlers) > 1: - stanza_copy = copy.copy(stanza) - else: - stanza_copy = stanza - handler.prerun(stanza_copy) - self.event_queue.put(('stanza', handler, stanza_copy)) - try: - if handler.check_delete(): - self.__handlers.remove(handler) - except: - pass # not thread safe - unhandled = False - - # Some stanzas require responses, such as Iq queries. A default - # handler will be executed immediately for this case. - if unhandled: - stanza.unhandled() - - def _threaded_event_wrapper(self, func, args): - """Capture exceptions for event handlers that run - in individual threads. - - :param func: The event handler to execute. - :param args: Arguments to the event handler. - """ - # this is always already copied before this is invoked - orig = args[0] - try: - func(*args) - except Exception as e: - error_msg = 'Error processing event handler: %s' - log.exception(error_msg, str(func)) - if hasattr(orig, 'exception'): - orig.exception(e) - else: - self.exception(e) - - def _event_runner(self): - """Process the event queue and execute handlers. - - The number of event runner threads is controlled by HANDLER_THREADS. - - Stream event handlers will all execute in this thread. Custom event - handlers may be spawned in individual threads. - """ - log.debug("Loading event runner") - try: - while not self.stop.is_set(): - event = self.event_queue.get() - if event is None: - continue - - etype, handler = event[0:2] - args = event[2:] - orig = copy.copy(args[0]) - - if etype == 'stanza': - try: - handler.run(args[0]) - except Exception as e: - error_msg = 'Error processing stream handler: %s' - log.exception(error_msg, handler.name) - orig.exception(e) - elif etype == 'schedule': - name = args[2] - try: - log.debug('Scheduled event: %s: %s', name, args[0]) - handler(*args[0], **args[1]) - except Exception as e: - log.exception('Error processing scheduled task') - self.exception(e) - elif etype == 'event': - func, threaded, disposable = handler - try: - if threaded: - x = threading.Thread( - name="Event_%s" % str(func), - target=self._threaded_event_wrapper, - args=(func, args)) - x.daemon = self._use_daemons - x.start() - else: - func(*args) - except Exception as e: - error_msg = 'Error processing event handler: %s' - log.exception(error_msg, str(func)) - if hasattr(orig, 'exception'): - orig.exception(e) - else: - self.exception(e) - elif etype == 'quit': - log.debug("Quitting event runner thread") - break - except KeyboardInterrupt: - log.debug("Keyboard Escape Detected in _event_runner") - self.event('killed', direct=True) - self.disconnect() - except SystemExit: - self.disconnect() - self.event_queue.put(('quit', None, None)) - - self._end_thread('event runner') - - def _send_thread(self): - """Extract stanzas from the send queue and send them on the stream.""" - try: - while not self.stop.is_set(): - while not self.stop.is_set() and \ - not self.session_started_event.is_set(): - self.session_started_event.wait(timeout=0.1) # Wait for session start - if self.__failed_send_stanza is not None: - data = self.__failed_send_stanza - self.__failed_send_stanza = None - else: - data = self.send_queue.get() # Wait for data to send - if data is None: - continue - log.debug("SEND: %s", data) - enc_data = data.encode('utf-8') - total = len(enc_data) - sent = 0 - count = 0 - tries = 0 - try: - with self.send_lock: - while sent < total and not self.stop.is_set() and \ - self.session_started_event.is_set(): - try: - sent += self.socket.send(enc_data[sent:]) - count += 1 - except Socket.error as serr: - if serr.errno != errno.EINTR: - raise - except ssl.SSLError as serr: - if tries >= self.ssl_retry_max: - log.debug('SSL error: max retries reached') - self.exception(serr) - log.warning("Failed to send %s", data) - if not self.stop.is_set(): - self.disconnect(self.auto_reconnect, - send_close=False) - log.warning('SSL write error: retrying') - if not self.stop.is_set(): - time.sleep(self.ssl_retry_delay) - tries += 1 - if count > 1: - log.debug('SENT: %d chunks', count) - self.send_queue.task_done() - except (Socket.error, ssl.SSLError) as serr: - self.event('socket_error', serr, direct=True) - log.warning("Failed to send %s", data) - if not self.stop.is_set(): - self.__failed_send_stanza = data - self._end_thread('send') - self.disconnect(self.auto_reconnect, send_close=False) - return - except Exception as ex: - log.exception('Unexpected error in send thread: %s', ex) - self.exception(ex) - if not self.stop.is_set(): - self._end_thread('send') - self.disconnect(self.auto_reconnect) - return - - self._end_thread('send') - - def _scheduler_thread(self): - self.scheduler.process(threaded=False) - self._end_thread('scheduler') - - def exception(self, exception): - """Process an unknown exception. - - Meant to be overridden. - - :param exception: An unhandled exception object. - """ - pass - - -# To comply with PEP8, method names now use underscores. -# Deprecated method names are re-mapped for backwards compatibility. -XMLStream.startTLS = XMLStream.start_tls -XMLStream.registerStanza = XMLStream.register_stanza -XMLStream.removeStanza = XMLStream.remove_stanza -XMLStream.registerHandler = XMLStream.register_handler -XMLStream.removeHandler = XMLStream.remove_handler -XMLStream.setSocket = XMLStream.set_socket -XMLStream.sendRaw = XMLStream.send_raw -XMLStream.getId = XMLStream.get_id -XMLStream.getNewId = XMLStream.new_id -XMLStream.sendXML = XMLStream.send_xml |