summaryrefslogtreecommitdiff
path: root/sleekxmpp/xmlstream/xmlstream.py
diff options
context:
space:
mode:
Diffstat (limited to 'sleekxmpp/xmlstream/xmlstream.py')
-rw-r--r--sleekxmpp/xmlstream/xmlstream.py907
1 files changed, 488 insertions, 419 deletions
diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py
index b6d013b0..fb9f91bc 100644
--- a/sleekxmpp/xmlstream/xmlstream.py
+++ b/sleekxmpp/xmlstream/xmlstream.py
@@ -1,9 +1,16 @@
+# -*- coding: utf-8 -*-
"""
- SleekXMPP: The Sleek XMPP Library
- Copyright (C) 2010 Nathanael C. Fritz
- This file is part of SleekXMPP.
+ sleekxmpp.xmlstream.xmlstream
+ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
- See the file LICENSE for copying permission.
+ This module provides the module for creating and
+ interacting with generic XML streams, along with
+ the necessary eventing infrastructure.
+
+ Part of SleekXMPP: The Sleek XMPP Library
+
+ :copyright: (c) 2011 Nathanael C. Fritz
+ :license: MIT, see LICENSE for more details
"""
from __future__ import with_statement, unicode_literals
@@ -45,24 +52,35 @@ else:
DNSPYTHON = True
-# The time in seconds to wait before timing out waiting for response stanzas.
+#: The time in seconds to wait before timing out waiting for response stanzas.
RESPONSE_TIMEOUT = 30
-# The time in seconds to wait for events from the event queue, and also the
-# time between checks for the process stop signal.
+#: The time in seconds to wait for events from the event queue, and also the
+#: time between checks for the process stop signal.
WAIT_TIMEOUT = 1
-# The number of threads to use to handle XML stream events. This is not the
-# same as the number of custom event handling threads. HANDLER_THREADS must
-# be at least 1.
+#: The number of threads to use to handle XML stream events. This is not the
+#: same as the number of custom event handling threads.
+#: :data:`HANDLER_THREADS` must be at least 1. For Python implementations
+#: with a GIL, this should be left at 1, but for implemetnations without
+#: a GIL increasing this value can provide better performance.
HANDLER_THREADS = 1
-# Flag indicating if the SSL library is available for use.
+#: Flag indicating if the SSL library is available for use.
SSL_SUPPORT = True
-# Maximum time to delay between connection attempts is one hour.
+#: The time in seconds to delay between attempts to resend data
+#: after an SSL error.
+SSL_RETRY_DELAY = 0.5
+
+#: The maximum number of times to attempt resending data due to
+#: an SSL error.
+SSL_RETRY_MAX = 10
+
+#: Maximum time to delay between connection attempts is one hour.
RECONNECT_MAX_DELAY = 600
+
log = logging.getLogger(__name__)
@@ -85,115 +103,83 @@ class XMLStream(object):
streams should be complete and valid XML documents.
Three types of events are provided to manage the stream:
- Stream -- Triggered based on received stanzas, similar in concept
- to events in a SAX XML parser.
- Custom -- Triggered manually.
- Scheduled -- Triggered based on time delays.
+ :Stream: Triggered based on received stanzas, similar in concept
+ to events in a SAX XML parser.
+ :Custom: Triggered manually.
+ :Scheduled: Triggered based on time delays.
Typically, stanzas are first processed by a stream event handler which
will then trigger custom events to continue further processing,
especially since custom event handlers may run in individual threads.
-
- Attributes:
- address -- The hostname and port of the server.
- default_ns -- The default XML namespace that will be applied
- to all non-namespaced stanzas.
- event_queue -- A queue of stream, custom, and scheduled
- events to be processed.
- filesocket -- A filesocket created from the main connection socket.
- Required for ElementTree.iterparse.
- default_port -- Default port to connect to.
- namespace_map -- Optional mapping of namespaces to namespace prefixes.
- scheduler -- A scheduler object for triggering events
- after a given period of time.
- send_queue -- A queue of stanzas to be sent on the stream.
- socket -- The connection to the server.
- ssl_support -- Indicates if a SSL library is available for use.
- ssl_version -- The version of the SSL protocol to use.
- Defaults to ssl.PROTOCOL_TLSv1.
- ca_certs -- File path to a CA certificate to verify the
- server's identity.
- state -- A state machine for managing the stream's
- connection state.
- stream_footer -- The start tag and any attributes for the stream's
- root element.
- stream_header -- The closing tag of the stream's root element.
- use_ssl -- Flag indicating if SSL should be used.
- use_tls -- Flag indicating if TLS should be used.
- use_proxy -- Flag indicating that an HTTP Proxy should be used.
- stop -- threading Event used to stop all threads.
- proxy_config -- An optional dictionary with the following entries:
- host -- The host offering proxy services.
- port -- The port for the proxy service.
- username -- Optional username for the proxy.
- password -- Optional password for the proxy.
-
- auto_reconnect -- Flag to determine whether we auto reconnect.
- reconnect_max_delay -- Maximum time to delay between connection
- attempts. Defaults to RECONNECT_MAX_DELAY,
- which is one hour.
- dns_answers -- List of dns answers not yet used to connect.
-
- Methods:
- add_event_handler -- Add a handler for a custom event.
- add_handler -- Shortcut method for registerHandler.
- connect -- Connect to the given server.
- del_event_handler -- Remove a handler for a custom event.
- disconnect -- Disconnect from the server and terminate
- processing.
- event -- Trigger a custom event.
- get_id -- Return the current stream ID.
- incoming_filter -- Optionally filter stanzas before processing.
- new_id -- Generate a new, unique ID value.
- process -- Read XML stanzas from the stream and apply
- matching stream handlers.
- reconnect -- Reestablish a connection to the server.
- register_handler -- Add a handler for a stream event.
- register_stanza -- Add a new stanza object type that may appear
- as a direct child of the stream's root.
- remove_handler -- Remove a stream handler.
- remove_stanza -- Remove a stanza object type.
- schedule -- Schedule an event handler to execute after a
- given delay.
- send -- Send a stanza object on the stream.
- send_raw -- Send a raw string on the stream.
- send_xml -- Send an XML string on the stream.
- set_socket -- Set the stream's socket and generate a new
- filesocket.
- start_stream_handler -- Perform any stream initialization such
- as handshakes.
- start_tls -- Establish a TLS connection and restart
- the stream.
+ :param socket: Use an existing socket for the stream. Defaults to
+ ``None`` to generate a new socket.
+ :param string host: The name of the target server.
+ :param int port: The port to use for the connection. Defaults to 0.
"""
def __init__(self, socket=None, host='', port=0):
- """
- Establish a new XML stream.
-
- Arguments:
- socket -- Use an existing socket for the stream.
- Defaults to None to generate a new socket.
- host -- The name of the target server.
- Defaults to the empty string.
- port -- The port to use for the connection.
- Defaults to 0.
- """
+ #: Flag indicating if the SSL library is available for use.
self.ssl_support = SSL_SUPPORT
+
+ #: Most XMPP servers support TLSv1, but OpenFire in particular
+ #: does not work well with it. For OpenFire, set
+ #: :attr:`ssl_version` to use ``SSLv23``::
+ #:
+ #: import ssl
+ #: xmpp.ssl_version = ssl.PROTOCOL_SSLv23
self.ssl_version = ssl.PROTOCOL_TLSv1
+
+ #: Path to a file containing certificates for verifying the
+ #: server SSL certificate. A non-``None`` value will trigger
+ #: certificate checking.
+ #:
+ #: .. note::
+ #:
+ #: On Mac OS X, certificates in the system keyring will
+ #: be consulted, even if they are not in the provided file.
self.ca_certs = None
+ #: The time in seconds to wait for events from the event queue,
+ #: and also the time between checks for the process stop signal.
self.wait_timeout = WAIT_TIMEOUT
+
+ #: The time in seconds to wait before timing out waiting
+ #: for response stanzas.
self.response_timeout = RESPONSE_TIMEOUT
+
+ #: The current amount to time to delay attempting to reconnect.
+ #: This value doubles (with some jitter) with each failed
+ #: connection attempt up to :attr:`reconnect_max_delay` seconds.
self.reconnect_delay = None
+
+ #: Maximum time to delay between connection attempts is one hour.
self.reconnect_max_delay = RECONNECT_MAX_DELAY
+ #: The time in seconds to delay between attempts to resend data
+ #: after an SSL error.
+ self.ssl_retry_max = SSL_RETRY_MAX
+
+ #: The maximum number of times to attempt resending data due to
+ #: an SSL error.
+ self.ssl_retry_delay = SSL_RETRY_DELAY
+
+ #: The connection state machine tracks if the stream is
+ #: ``'connected'`` or ``'disconnected'``.
self.state = StateMachine(('disconnected', 'connected'))
self.state._set_state('disconnected')
+ #: The default port to return when querying DNS records.
self.default_port = int(port)
+
+ #: The domain to try when querying DNS records.
self.default_domain = ''
+
+ #: The desired, or actual, address of the connected server.
self.address = (host, int(port))
+
+ #: A file-like wrapper for the socket for use with the
+ #: :mod:`~xml.etree.ElementTree` module.
self.filesocket = None
self.set_socket(socket)
@@ -202,31 +188,79 @@ class XMLStream(object):
else:
self.socket_class = Socket.socket
+ #: Enable connecting to the server directly over SSL, in
+ #: particular when the service provides two ports: one for
+ #: non-SSL traffic and another for SSL traffic.
self.use_ssl = False
+
+ #: Enable connecting to the service without using SSL
+ #: immediately, but allow upgrading the connection later
+ #: to use SSL.
self.use_tls = False
+
+ #: If set to ``True``, attempt to connect through an HTTP
+ #: proxy based on the settings in :attr:`proxy_config`.
self.use_proxy = False
+ #: An optional dictionary of proxy settings. It may provide:
+ #: :host: The host offering proxy services.
+ #: :port: The port for the proxy service.
+ #: :username: Optional username for accessing the proxy.
+ #: :password: Optional password for accessing the proxy.
self.proxy_config = {}
+ #: The default namespace of the stream content, not of the
+ #: stream wrapper itself.
self.default_ns = ''
+
+ #: The namespace of the enveloping stream element.
self.stream_ns = ''
+
+ #: The default opening tag for the stream element.
self.stream_header = "<stream>"
+
+ #: The default closing tag for the stream element.
self.stream_footer = "</stream>"
+ #: If ``True``, periodically send a whitespace character over the
+ #: wire to keep the connection alive. Mainly useful for connections
+ #: traversing NAT.
self.whitespace_keepalive = True
+
+ #: The default interval between keepalive signals when
+ #: :attr:`whitespace_keepalive` is enabled.
self.whitespace_keepalive_interval = 300
+ #: An :class:`~threading.Event` to signal that the application
+ #: is stopping, and that all threads should shutdown.
self.stop = threading.Event()
+
+ #: An :class:`~threading.Event` to signal receiving a closing
+ #: stream tag from the server.
self.stream_end_event = threading.Event()
self.stream_end_event.set()
+
+ #: An :class:`~threading.Event` to signal the start of a stream
+ #: session. Until this event fires, the send queue is not used
+ #: and data is sent immediately over the wire.
self.session_started_event = threading.Event()
+
+ #: The default time in seconds to wait for a session to start
+ #: after connecting before reconnecting and trying again.
self.session_timeout = 45
+ #: A queue of stream, custom, and scheduled events to be processed.
self.event_queue = queue.Queue()
+
+ #: A queue of string data to be sent over the stream.
self.send_queue = queue.Queue()
- self.__failed_send_stanza = None
+
+ #: A :class:`~sleekxmpp.xmlstream.scheduler.Scheduler` instance for
+ #: executing callbacks in the future based on time delays.
self.scheduler = Scheduler(self.stop)
+ self.__failed_send_stanza = None
+ #: A mapping of XML namespaces to well-known prefixes.
self.namespace_map = {StanzaBase.xml_ns: 'xml'}
self.__thread = {}
@@ -238,7 +272,18 @@ class XMLStream(object):
self._id = 0
self._id_lock = threading.Lock()
+ #: The :attr:`auto_reconnnect` setting controls whether or not
+ #: the stream will be restarted in the event of an error.
self.auto_reconnect = True
+
+ #: The :attr:`disconnect_wait` setting is the default value
+ #: for controlling if the system waits for the send queue to
+ #: empty before ending the stream. This may be overridden by
+ #: passing ``wait=True`` or ``wait=False`` to :meth:`disconnect`.
+ #: The default :attr:`disconnect_wait` value is ``False``.
+ self.disconnect_wait = False
+
+ #: A list of DNS results that have not yet been tried.
self.dns_answers = []
self.add_event_handler('connected', self._handle_connected)
@@ -246,17 +291,16 @@ class XMLStream(object):
self.add_event_handler('session_end', self._end_keepalive)
def use_signals(self, signals=None):
- """
- Register signal handlers for SIGHUP and SIGTERM, if possible,
- which will raise a "killed" event when the application is
- terminated.
+ """Register signal handlers for ``SIGHUP`` and ``SIGTERM``.
+
+ By using signals, a ``'killed'`` event will be raised when the
+ application is terminated.
If a signal handler already existed, it will be executed first,
- before the "killed" event is raised.
+ before the ``'killed'`` event is raised.
- Arguments:
- signals -- A list of signal names to be monitored.
- Defaults to ['SIGHUP', 'SIGTERM'].
+ :param list signals: A list of signal names to be monitored.
+ Defaults to ``['SIGHUP', 'SIGTERM']``.
"""
if signals is None:
signals = ['SIGHUP', 'SIGTERM']
@@ -272,7 +316,7 @@ class XMLStream(object):
def handle_kill(signum, frame):
"""
Capture kill event and disconnect cleanly after first
- spawning the "killed" event.
+ spawning the ``'killed'`` event.
"""
if signum in existing_handlers and \
@@ -293,8 +337,7 @@ class XMLStream(object):
"SleekXMPP is not running from a main thread.")
def new_id(self):
- """
- Generate and return a new stream ID in hexadecimal form.
+ """Generate and return a new stream ID in hexadecimal form.
Many stanzas, handlers, or matchers may require unique
ID values. Using this method ensures that all new ID values
@@ -305,26 +348,25 @@ class XMLStream(object):
return self.get_id()
def get_id(self):
- """
- Return the current unique stream ID in hexadecimal form.
- """
+ """Return the current unique stream ID in hexadecimal form."""
return "%X" % self._id
def connect(self, host='', port=0, use_ssl=False,
use_tls=True, reattempt=True):
- """
- Create a new socket and connect to the server.
-
- Setting reattempt to True will cause connection attempts to be made
- every second until a successful connection is established.
-
- Arguments:
- host -- The name of the desired server for the connection.
- port -- Port to connect to on the server.
- use_ssl -- Flag indicating if SSL should be used.
- use_tls -- Flag indicating if TLS should be used.
- reattempt -- Flag indicating if the socket should reconnect
- after disconnections.
+ """Create a new socket and connect to the server.
+
+ Setting ``reattempt`` to ``True`` will cause connection attempts to
+ be made every second until a successful connection is established.
+
+ :param host: The name of the desired server for the connection.
+ :param port: Port to connect to on the server.
+ :param use_ssl: Flag indicating if SSL should be used by connecting
+ directly to a port using SSL.
+ :param use_tls: Flag indicating if TLS should be used, allowing for
+ connecting to a port without using SSL immediately and
+ later upgrading the connection.
+ :param reattempt: Flag indicating if the socket should reconnect
+ after disconnections.
"""
if host and port:
self.address = (host, int(port))
@@ -343,7 +385,7 @@ class XMLStream(object):
# is established.
connected = self.state.transition('disconnected', 'connected',
func=self._connect)
- while reattempt and not connected:
+ while reattempt and not connected and not self.stop.is_set():
connected = self.state.transition('disconnected', 'connected',
func=self._connect)
return connected
@@ -362,8 +404,18 @@ class XMLStream(object):
else:
delay = min(self.reconnect_delay * 2, self.reconnect_max_delay)
delay = random.normalvariate(delay, delay * 0.1)
- log.debug('Waiting %s seconds before connecting.' % delay)
- time.sleep(delay)
+ log.debug('Waiting %s seconds before connecting.', delay)
+ elapsed = 0
+ try:
+ while elapsed < delay and not self.stop.is_set():
+ time.sleep(0.1)
+ elapsed += 0.1
+ except KeyboardInterrupt:
+ self.stop.set()
+ return False
+ except SystemExit:
+ self.stop.set()
+ return False
if self.use_proxy:
connected = self._connect_proxy()
@@ -391,7 +443,7 @@ class XMLStream(object):
try:
if not self.use_proxy:
- log.debug("Connecting to %s:%s" % self.address)
+ log.debug("Connecting to %s:%s", *self.address)
self.socket.connect(self.address)
self.set_socket(self.socket, ignore=True)
@@ -402,8 +454,8 @@ class XMLStream(object):
except Socket.error as serr:
error_msg = "Could not connect to %s:%s. Socket Error #%s: %s"
self.event('socket_error', serr)
- log.error(error_msg % (self.address[0], self.address[1],
- serr.errno, serr.strerror))
+ log.error(error_msg, self.address[0], self.address[1],
+ serr.errno, serr.strerror)
self.reconnect_delay = delay
return False
@@ -435,18 +487,18 @@ class XMLStream(object):
headers = '\r\n'.join(headers) + '\r\n\r\n'
try:
- log.debug("Connecting to proxy: %s:%s" % address)
+ log.debug("Connecting to proxy: %s:%s", address)
self.socket.connect(address)
self.send_raw(headers, now=True)
resp = ''
- while '\r\n\r\n' not in resp:
+ while '\r\n\r\n' not in resp and not self.stop.is_set():
resp += self.socket.recv(1024).decode('utf-8')
- log.debug('RECV: %s' % resp)
+ log.debug('RECV: %s', resp)
lines = resp.split('\r\n')
if '200' not in lines[0]:
self.event('proxy_error', resp)
- log.error('Proxy Error: %s' % lines[0])
+ log.error('Proxy Error: %s', lines[0])
return False
# Proxy connection established, continue connecting
@@ -455,8 +507,8 @@ class XMLStream(object):
except Socket.error as serr:
error_msg = "Could not connect to %s:%s. Socket Error #%s: %s"
self.event('socket_error', serr)
- log.error(error_msg % (self.address[0], self.address[1],
- serr.errno, serr.strerror))
+ log.error(error_msg, self.address[0], self.address[1],
+ serr.errno, serr.strerror)
return False
def _handle_connected(self, event=None):
@@ -466,42 +518,48 @@ class XMLStream(object):
"""
def _handle_session_timeout():
- if not self.session_started_event.isSet():
+ if not self.session_started_event.is_set():
log.debug("Session start has taken more " + \
- "than %d seconds" % self.session_timeout)
+ "than %d seconds", self.session_timeout)
self.disconnect(reconnect=self.auto_reconnect)
self.schedule("Session timeout check",
self.session_timeout,
_handle_session_timeout)
-
- def disconnect(self, reconnect=False, wait=False):
- """
- Terminate processing and close the XML streams.
+ def disconnect(self, reconnect=False, wait=None):
+ """Terminate processing and close the XML streams.
Optionally, the connection may be reconnected and
resume processing afterwards.
If the disconnect should take place after all items
- in the send queue have been sent, use wait=True. However,
- take note: If you are constantly adding items to the queue
- such that it is never empty, then the disconnect will
- not occur and the call will continue to block.
-
- Arguments:
- reconnect -- Flag indicating if the connection
- and processing should be restarted.
- Defaults to False.
- wait -- Flag indicating if the send queue should
- be emptied before disconnecting.
- """
- self.state.transition('connected', 'disconnected', wait=0.0,
+ in the send queue have been sent, use ``wait=True``.
+
+ .. warning::
+
+ If you are constantly adding items to the queue
+ such that it is never empty, then the disconnect will
+ not occur and the call will continue to block.
+
+ :param reconnect: Flag indicating if the connection
+ and processing should be restarted.
+ Defaults to ``False``.
+ :param wait: Flag indicating if the send queue should
+ be emptied before disconnecting, overriding
+ :attr:`disconnect_wait`.
+ """
+ self.state.transition('connected', 'disconnected',
func=self._disconnect, args=(reconnect, wait))
- def _disconnect(self, reconnect=False, wait=False):
+ def _disconnect(self, reconnect=False, wait=None):
+ self.event('session_end', direct=True)
+
# Wait for the send queue to empty.
- if wait:
+ if wait is not None:
+ if wait:
+ self.send_queue.join()
+ elif self.disconnect_wait:
self.send_queue.join()
# Send the end of stream marker.
@@ -510,7 +568,7 @@ class XMLStream(object):
# Wait for confirmation that the stream was
# closed in the other direction.
self.auto_reconnect = reconnect
- log.debug('Waiting for %s from server' % self.stream_footer)
+ log.debug('Waiting for %s from server', self.stream_footer)
self.stream_end_event.wait(4)
if not self.auto_reconnect:
self.stop.set()
@@ -522,35 +580,33 @@ class XMLStream(object):
self.event('socket_error', serr)
finally:
#clear your application state
- self.event('session_end', direct=True)
self.event("disconnected", direct=True)
return True
- def reconnect(self):
- """
- Reset the stream's state and reconnect to the server.
- """
+ def reconnect(self, reattempt=True):
+ """Reset the stream's state and reconnect to the server."""
log.debug("reconnecting...")
- self.state.transition('connected', 'disconnected', wait=2.0,
- func=self._disconnect, args=(True,))
+ if self.state.ensure('connected'):
+ self.state.transition('connected', 'disconnected', wait=2.0,
+ func=self._disconnect, args=(True,))
log.debug("connecting...")
connected = self.state.transition('disconnected', 'connected',
wait=2.0, func=self._connect)
- while not connected:
+ while reattempt and not connected and not self.stop.is_set():
connected = self.state.transition('disconnected', 'connected',
wait=2.0, func=self._connect)
+ connected = connected or self.state.ensure('connected')
return connected
def set_socket(self, socket, ignore=False):
- """
- Set the socket to use for the stream.
+ """Set the socket to use for the stream.
The filesocket will be recreated as well.
- Arguments:
- socket -- The new socket to use.
- ignore -- don't set the state
+ :param socket: The new socket object to use.
+ :param bool ignore: If ``True``, don't set the connection
+ state to ``'connected'``.
"""
self.socket = socket
if socket is not None:
@@ -568,8 +624,7 @@ class XMLStream(object):
self.state._set_state('connected')
def configure_socket(self):
- """
- Set timeout and other options for self.socket.
+ """Set timeout and other options for self.socket.
Meant to be overridden.
"""
@@ -577,31 +632,30 @@ class XMLStream(object):
def configure_dns(self, resolver, domain=None, port=None):
"""
- Configure and set options for a dns.resolver.Resolver
+ Configure and set options for a :class:`~dns.resolver.Resolver`
instance, and other DNS related tasks. For example, you
- can also check Socket.getaddrinfo to see if you need to
- call out to libresolv.so.2 to run res_init().
+ can also check :meth:`~socket.socket.getaddrinfo` to see
+ if you need to call out to ``libresolv.so.2`` to
+ run ``res_init()``.
Meant to be overridden.
- Arguments:
- resolver -- A dns.resolver.Resolver instance, or None
- if dnspython is not installed.
- domain -- The initial domain under consideration.
- port -- The initial port under consideration.
+ :param resolver: A :class:`~dns.resolver.Resolver` instance
+ or ``None`` if ``dnspython`` is not installed.
+ :param domain: The initial domain under consideration.
+ :param port: The initial port under consideration.
"""
pass
def start_tls(self):
- """
- Perform handshakes for TLS.
+ """Perform handshakes for TLS.
If the handshake is successful, the XML stream will need
to be restarted.
"""
if self.ssl_support:
log.info("Negotiating TLS")
- log.info("Using SSL version: %s" % str(self.ssl_version))
+ log.info("Using SSL version: %s", str(self.ssl_version))
if self.ca_certs is None:
cert_policy = ssl.CERT_NONE
else:
@@ -627,13 +681,14 @@ class XMLStream(object):
return False
def _start_keepalive(self, event):
- """
- Begin sending whitespace periodically to keep the connection alive.
+ """Begin sending whitespace periodically to keep the connection alive.
+
+ May be disabled by setting::
- May be disabled by setting:
self.whitespace_keepalive = False
- The keepalive interval can be set using:
+ The keepalive interval can be set using::
+
self.whitespace_keepalive_interval = 300
"""
@@ -651,18 +706,18 @@ class XMLStream(object):
self.scheduler.remove('Whitespace Keepalive')
def start_stream_handler(self, xml):
- """
- Perform any initialization actions, such as handshakes, once the
- stream header has been sent.
+ """Perform any initialization actions, such as handshakes,
+ once the stream header has been sent.
Meant to be overridden.
"""
pass
def register_stanza(self, stanza_class):
- """
- Add a stanza object class as a known root stanza. A root stanza is
- one that appears as a direct child of the stream's root element.
+ """Add a stanza object class as a known root stanza.
+
+ A root stanza is one that appears as a direct child of the stream's
+ root element.
Stanzas that appear as substanzas of a root stanza do not need to
be registered here. That is done using register_stanza_plugin() from
@@ -672,15 +727,15 @@ class XMLStream(object):
stanza objects, but may still be processed using handlers and
matchers.
- Arguments:
- stanza_class -- The top-level stanza object's class.
+ :param stanza_class: The top-level stanza object's class.
"""
self.__root_stanza.append(stanza_class)
def remove_stanza(self, stanza_class):
- """
- Remove a stanza from being a known root stanza. A root stanza is
- one that appears as a direct child of the stream's root element.
+ """Remove a stanza from being a known root stanza.
+
+ A root stanza is one that appears as a direct child of the stream's
+ root element.
Stanzas that are not registered will not be converted into
stanza objects, but may still be processed using handlers and
@@ -690,22 +745,24 @@ class XMLStream(object):
def add_handler(self, mask, pointer, name=None, disposable=False,
threaded=False, filter=False, instream=False):
- """
- A shortcut method for registering a handler using XML masks.
-
- Arguments:
- mask -- An XML snippet matching the structure of the
- stanzas that will be passed to this handler.
- pointer -- The handler function itself.
- name -- A unique name for the handler. A name will
- be generated if one is not provided.
- disposable -- Indicates if the handler should be discarded
- after one use.
- threaded -- Deprecated. Remains for backwards compatibility.
- filter -- Deprecated. Remains for backwards compatibility.
- instream -- Indicates if the handler should execute during
- stream processing and not during normal event
- processing.
+ """A shortcut method for registering a handler using XML masks.
+
+ The use of :meth:`register_handler()` is preferred.
+
+ :param mask: An XML snippet matching the structure of the
+ stanzas that will be passed to this handler.
+ :param pointer: The handler function itself.
+ :parm name: A unique name for the handler. A name will
+ be generated if one is not provided.
+ :param disposable: Indicates if the handler should be discarded
+ after one use.
+ :param threaded: **DEPRECATED**.
+ Remains for backwards compatibility.
+ :param filter: **DEPRECATED**.
+ Remains for backwards compatibility.
+ :param instream: Indicates if the handler should execute during
+ stream processing and not during normal event
+ processing.
"""
# To prevent circular dependencies, we must load the matcher
# and handler classes here.
@@ -716,23 +773,20 @@ class XMLStream(object):
once=disposable, instream=instream))
def register_handler(self, handler, before=None, after=None):
- """
- Add a stream event handler that will be executed when a matching
+ """Add a stream event handler that will be executed when a matching
stanza is received.
- Arguments:
- handler -- The handler object to execute.
+ :param handler: The :class:`~sleekxmpp.xmlstream.handler.base.BaseHandler`
+ derived object to execute.
"""
if handler.stream is None:
self.__handlers.append(handler)
handler.stream = weakref.ref(self)
def remove_handler(self, name):
- """
- Remove any stream event handlers with the given name.
+ """Remove any stream event handlers with the given name.
- Arguments:
- name -- The name of the handler.
+ :param name: The name of the handler.
"""
idx = 0
for handler in self.__handlers:
@@ -743,12 +797,10 @@ class XMLStream(object):
return False
def get_dns_records(self, domain, port=None):
- """
- Get the DNS records for a domain.
+ """Get the DNS records for a domain.
- Arguments:
- domain -- The domain in question.
- port -- If the results don't include a port, use this one.
+ :param domain: The domain in question.
+ :param port: If the results don't include a port, use this one.
"""
if port is None:
port = self.default_port
@@ -759,11 +811,11 @@ class XMLStream(object):
try:
answers = resolver.query(domain, dns.rdatatype.A)
except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer):
- log.warning("No A records for %s" % domain)
+ log.warning("No A records for %s", domain)
return [((domain, port), 0, 0)]
except dns.exception.Timeout:
log.warning("DNS resolution timed out " + \
- "for A record of %s" % domain)
+ "for A record of %s", domain)
return [((domain, port), 0, 0)]
else:
return [((ans.address, port), 0, 0) for ans in answers]
@@ -774,14 +826,13 @@ class XMLStream(object):
return [((domain, port), 0, 0)]
def pick_dns_answer(self, domain, port=None):
- """
- Pick a server and port from DNS answers.
+ """Pick a server and port from DNS answers.
+
Gets DNS answers if none available.
Removes used answer from available answers.
- Arguments:
- domain -- The domain in question.
- port -- If the results don't include a port, use this one.
+ :param domain: The domain in question.
+ :param port: If the results don't include a port, use this one.
"""
if not self.dns_answers:
self.dns_answers = self.get_dns_records(domain, port)
@@ -808,35 +859,31 @@ class XMLStream(object):
if self.dns_answers[0] == address:
break
self.dns_answers.pop(idx)
- log.debug("Trying to connect to %s:%s" % address)
+ log.debug("Trying to connect to %s:%s", *address)
return address
def add_event_handler(self, name, pointer,
threaded=False, disposable=False):
- """
- Add a custom event handler that will be executed whenever
+ """Add a custom event handler that will be executed whenever
its event is manually triggered.
- Arguments:
- name -- The name of the event that will trigger
- this handler.
- pointer -- The function to execute.
- threaded -- If set to True, the handler will execute
- in its own thread. Defaults to False.
- disposable -- If set to True, the handler will be
- discarded after one use. Defaults to False.
+ :param name: The name of the event that will trigger
+ this handler.
+ :param pointer: The function to execute.
+ :param threaded: If set to ``True``, the handler will execute
+ in its own thread. Defaults to ``False``.
+ :param disposable: If set to ``True``, the handler will be
+ discarded after one use. Defaults to ``False``.
"""
if not name in self.__event_handlers:
self.__event_handlers[name] = []
self.__event_handlers[name].append((pointer, threaded, disposable))
def del_event_handler(self, name, pointer):
- """
- Remove a function as a handler for an event.
+ """Remove a function as a handler for an event.
- Arguments:
- name -- The name of the event.
- pointer -- The function to remove as a handler.
+ :param name: The name of the event.
+ :param pointer: The function to remove as a handler.
"""
if not name in self.__event_handlers:
return
@@ -851,42 +898,42 @@ class XMLStream(object):
self.__event_handlers[name]))
def event_handled(self, name):
- """
- Indicates if an event has any associated handlers.
-
- Returns the number of registered handlers.
+ """Returns the number of registered handlers for an event.
- Arguments:
- name -- The name of the event to check.
+ :param name: The name of the event to check.
"""
return len(self.__event_handlers.get(name, []))
def event(self, name, data={}, direct=False):
- """
- Manually trigger a custom event.
-
- Arguments:
- name -- The name of the event to trigger.
- data -- Data that will be passed to each event handler.
- Defaults to an empty dictionary.
- direct -- Runs the event directly if True, skipping the
- event queue. All event handlers will run in the
- same thread.
- """
- for handler in self.__event_handlers.get(name, []):
+ """Manually trigger a custom event.
+
+ :param name: The name of the event to trigger.
+ :param data: Data that will be passed to each event handler.
+ Defaults to an empty dictionary, but is usually
+ a stanza object.
+ :param direct: Runs the event directly if True, skipping the
+ event queue. All event handlers will run in the
+ same thread.
+ """
+ handlers = self.__event_handlers.get(name, [])
+ for handler in handlers:
+ #TODO: Data should not be copied, but should be read only,
+ # but this might break current code so it's left for future.
+
+ out_data = copy.copy(data) if len(handlers) > 1 else data
+ old_exception = getattr(data, 'exception', None)
if direct:
try:
- handler[0](copy.copy(data))
+ handler[0](out_data)
except Exception as e:
error_msg = 'Error processing event handler: %s'
- log.exception(error_msg % str(handler[0]))
- if hasattr(data, 'exception'):
- data.exception(e)
+ log.exception(error_msg, str(handler[0]))
+ if old_exception:
+ old_exception(e)
else:
self.exception(e)
else:
- self.event_queue.put(('event', handler, copy.copy(data)))
-
+ self.event_queue.put(('event', handler, out_data))
if handler[2]:
# If the handler is disposable, we will go ahead and
# remove it now instead of waiting for it to be
@@ -900,25 +947,22 @@ class XMLStream(object):
def schedule(self, name, seconds, callback, args=None,
kwargs=None, repeat=False):
- """
- Schedule a callback function to execute after a given delay.
-
- Arguments:
- name -- A unique name for the scheduled callback.
- seconds -- The time in seconds to wait before executing.
- callback -- A pointer to the function to execute.
- args -- A tuple of arguments to pass to the function.
- kwargs -- A dictionary of keyword arguments to pass to
- the function.
- repeat -- Flag indicating if the scheduled event should
- be reset and repeat after executing.
+ """Schedule a callback function to execute after a given delay.
+
+ :param name: A unique name for the scheduled callback.
+ :param seconds: The time in seconds to wait before executing.
+ :param callback: A pointer to the function to execute.
+ :param args: A tuple of arguments to pass to the function.
+ :param kwargs: A dictionary of keyword arguments to pass to
+ the function.
+ :param repeat: Flag indicating if the scheduled event should
+ be reset and repeat after executing.
"""
self.scheduler.add(name, seconds, callback, args, kwargs,
repeat, qpointer=self.event_queue)
def incoming_filter(self, xml):
- """
- Filter incoming XML objects before they are processed.
+ """Filter incoming XML objects before they are processed.
Possible uses include remapping namespaces, or correcting elements
from sources with incorrect behavior.
@@ -928,23 +972,23 @@ class XMLStream(object):
return xml
def send(self, data, mask=None, timeout=None, now=False):
- """
- A wrapper for send_raw for sending stanza objects.
+ """A wrapper for :meth:`send_raw()` for sending stanza objects.
May optionally block until an expected response is received.
- Arguments:
- data -- The stanza object to send on the stream.
- mask -- Deprecated. An XML snippet matching the structure
- of the expected response. Execution will block
- in this thread until the response is received
- or a timeout occurs.
- timeout -- Time in seconds to wait for a response before
- continuing. Defaults to RESPONSE_TIMEOUT.
- now -- Indicates if the send queue should be skipped,
- sending the stanza immediately. Useful mainly
- for stream initialization stanzas.
- Defaults to False.
+ :param data: The :class:`~sleekxmpp.xmlstream.stanzabase.ElementBase`
+ stanza to send on the stream.
+ :param mask: **DEPRECATED**
+ An XML string snippet matching the structure
+ of the expected response. Execution will block
+ in this thread until the response is received
+ or a timeout occurs.
+ :param int timeout: Time in seconds to wait for a response before
+ continuing. Defaults to :attr:`response_timeout`.
+ :param bool now: Indicates if the send queue should be skipped,
+ sending the stanza immediately. Useful mainly
+ for stream initialization stanzas.
+ Defaults to ``False``.
"""
if timeout is None:
timeout = self.response_timeout
@@ -961,53 +1005,64 @@ class XMLStream(object):
return wait_for.wait(timeout)
def send_xml(self, data, mask=None, timeout=None, now=False):
- """
- Send an XML object on the stream, and optionally wait
+ """Send an XML object on the stream, and optionally wait
for a response.
- Arguments:
- data -- The XML object to send on the stream.
- mask -- Deprecated. An XML snippet matching the structure
- of the expected response. Execution will block
- in this thread until the response is received
- or a timeout occurs.
- timeout -- Time in seconds to wait for a response before
- continuing. Defaults to RESPONSE_TIMEOUT.
- now -- Indicates if the send queue should be skipped,
- sending the stanza immediately. Useful mainly
- for stream initialization stanzas.
- Defaults to False.
+ :param data: The :class:`~xml.etree.ElementTree.Element` XML object
+ to send on the stream.
+ :param mask: **DEPRECATED**
+ An XML string snippet matching the structure
+ of the expected response. Execution will block
+ in this thread until the response is received
+ or a timeout occurs.
+ :param int timeout: Time in seconds to wait for a response before
+ continuing. Defaults to :attr:`response_timeout`.
+ :param bool now: Indicates if the send queue should be skipped,
+ sending the stanza immediately. Useful mainly
+ for stream initialization stanzas.
+ Defaults to ``False``.
"""
if timeout is None:
timeout = self.response_timeout
return self.send(tostring(data), mask, timeout, now)
def send_raw(self, data, now=False, reconnect=None):
- """
- Send raw data across the stream.
-
- Arguments:
- data -- Any string value.
- reconnect -- Indicates if the stream should be
- restarted if there is an error sending
- the stanza. Used mainly for testing.
- Defaults to self.auto_reconnect.
+ """Send raw data across the stream.
+
+ :param string data: Any string value.
+ :param bool reconnect: Indicates if the stream should be
+ restarted if there is an error sending
+ the stanza. Used mainly for testing.
+ Defaults to :attr:`auto_reconnect`.
"""
if now:
- log.debug("SEND (IMMED): %s" % data)
+ log.debug("SEND (IMMED): %s", data)
try:
data = data.encode('utf-8')
total = len(data)
sent = 0
count = 0
+ tries = 0
while sent < total and not self.stop.is_set():
- sent += self.socket.send(data[sent:])
- count += 1
+ try:
+ sent += self.socket.send(data[sent:])
+ count += 1
+ except ssl.SSLError as serr:
+ if tries >= self.ssl_retry_max:
+ log.debug('SSL error - max retries reached')
+ self.exception(serr)
+ log.warning("Failed to send %s", data)
+ if reconnect is None:
+ reconnect = self.auto_reconnect
+ self.disconnect(reconnect)
+ log.warning('SSL write error - reattempting')
+ time.sleep(self.ssl_retry_delay)
+ tries += 1
if count > 1:
- log.debug('SENT: %d chunks' % count)
+ log.debug('SENT: %d chunks', count)
except Socket.error as serr:
self.event('socket_error', serr)
- log.warning("Failed to send %s" % data)
+ log.warning("Failed to send %s", data)
if reconnect is None:
reconnect = self.auto_reconnect
self.disconnect(reconnect)
@@ -1016,27 +1071,29 @@ class XMLStream(object):
return True
def process(self, **kwargs):
- """
- Initialize the XML streams and begin processing events.
+ """Initialize the XML streams and begin processing events.
The number of threads used for processing stream events is determined
- by HANDLER_THREADS.
-
- Arguments:
- block -- If block=False then event dispatcher will run
- in a separate thread, allowing for the stream to be
- used in the background for another application.
- Otherwise, process(block=True) blocks the current thread.
- Defaults to False.
-
- **threaded is deprecated and included for API compatibility**
- threaded -- If threaded=True then event dispatcher will run
- in a separate thread, allowing for the stream to be
- used in the background for another application.
- Defaults to True.
-
- Event handlers and the send queue will be threaded
- regardless of these parameters.
+ by :data:`HANDLER_THREADS`.
+
+ :param bool block: If ``False``, then event dispatcher will run
+ in a separate thread, allowing for the stream to be
+ used in the background for another application.
+ Otherwise, ``process(block=True)`` blocks the current
+ thread. Defaults to ``False``.
+ :param bool threaded: **DEPRECATED**
+ If ``True``, then event dispatcher will run
+ in a separate thread, allowing for the stream to be
+ used in the background for another application.
+ Defaults to ``True``. This does **not** mean that no
+ threads are used at all if ``threaded=False``.
+
+ Regardless of these threading options, these threads will
+ always exist:
+
+ - The event queue processor
+ - The send queue processor
+ - The scheduler
"""
if 'threaded' in kwargs and 'block' in kwargs:
raise ValueError("process() called with both " + \
@@ -1050,7 +1107,6 @@ class XMLStream(object):
def start_thread(name, target):
self.__thread[name] = threading.Thread(name=name, target=target)
- self.__thread[name].daemon = True
self.__thread[name].start()
for t in range(0, HANDLER_THREADS):
@@ -1066,8 +1122,7 @@ class XMLStream(object):
self._process()
def _process(self):
- """
- Start processing the XML streams.
+ """Start processing the XML streams.
Processing will continue after any recoverable errors
if reconnections are allowed.
@@ -1077,6 +1132,7 @@ class XMLStream(object):
# Additional passes will be made only if an error occurs and
# reconnecting is permitted.
while True:
+ shutdown = False
try:
# The call to self.__read_xml will block and prevent
# the body of the loop from running until a disconnect
@@ -1094,33 +1150,36 @@ class XMLStream(object):
if not self.__read_xml():
# If the server terminated the stream, end processing
break
- except SyntaxError as e:
- log.error("Error reading from XML stream.")
- self.exception(e)
except KeyboardInterrupt:
log.debug("Keyboard Escape Detected in _process")
- self.stop.set()
+ self.event('killed', direct=True)
+ shutdown = True
except SystemExit:
log.debug("SystemExit in _process")
- self.stop.set()
- self.scheduler.quit()
+ shutdown = True
+ except SyntaxError as e:
+ log.error("Error reading from XML stream.")
+ shutdown = True
+ self.exception(e)
except Socket.error as serr:
self.event('socket_error', serr)
log.exception('Socket Error')
- except:
+ except Exception as e:
if not self.stop.is_set():
log.exception('Connection error.')
+ self.exception(e)
- if not self.stop.is_set() and self.auto_reconnect:
+ if not shutdown and not self.stop.is_set() \
+ and self.auto_reconnect:
self.reconnect()
else:
self.disconnect()
break
def __read_xml(self):
- """
- Parse the incoming XML stream, raising stream events for
- each received stanza.
+ """Parse the incoming XML stream
+
+ Stream events are raised for each received stanza.
"""
depth = 0
root = None
@@ -1156,16 +1215,16 @@ class XMLStream(object):
log.debug("Ending read XML loop")
def _build_stanza(self, xml, default_ns=None):
- """
- Create a stanza object from a given XML object.
+ """Create a stanza object from a given XML object.
If a specialized stanza type is not found for the XML, then
- a generic StanzaBase stanza will be returned.
+ a generic :class:`~sleekxmpp.xmlstream.stanzabase.StanzaBase`
+ stanza will be returned.
- Arguments:
- xml -- The XML object to convert into a stanza object.
- default_ns -- Optional default namespace to use instead of the
- stream's current default namespace.
+ :param xml: The :class:`~xml.etree.ElementTree.Element` XML object
+ to convert into a stanza object.
+ :param default_ns: Optional default namespace to use instead of the
+ stream's current default namespace.
"""
if default_ns is None:
default_ns = self.default_ns
@@ -1184,11 +1243,10 @@ class XMLStream(object):
objects if applicable and queue stream events to be processed
by matching handlers.
- Arguments:
- xml -- The XML stanza to analyze.
+ :param xml: The :class:`~sleekxmpp.xmlstream.stanzabase.ElementBase`
+ stanza to analyze.
"""
- log.debug("RECV: %s" % tostring(xml,
- xmlns=self.default_ns,
+ log.debug("RECV: %s", tostring(xml, xmlns=self.default_ns,
stream=self))
# Apply any preprocessing filters.
xml = self.incoming_filter(xml)
@@ -1201,17 +1259,20 @@ class XMLStream(object):
# to run "in stream" will be executed immediately; the rest will
# be queued.
unhandled = True
- for handler in self.__handlers:
- if handler.match(stanza):
+ matched_handlers = [h for h in self.__handlers if h.match(stanza)]
+ for handler in matched_handlers:
+ if len(matched_handlers) > 1:
stanza_copy = copy.copy(stanza)
- handler.prerun(stanza_copy)
- self.event_queue.put(('stanza', handler, stanza_copy))
- try:
- if handler.check_delete():
- self.__handlers.remove(handler)
- except:
- pass # not thread safe
- unhandled = False
+ else:
+ stanza_copy = stanza
+ handler.prerun(stanza_copy)
+ self.event_queue.put(('stanza', handler, stanza_copy))
+ try:
+ if handler.check_delete():
+ self.__handlers.remove(handler)
+ except:
+ pass # not thread safe
+ unhandled = False
# Some stanzas require responses, such as Iq queries. A default
# handler will be executed immediately for this case.
@@ -1219,28 +1280,26 @@ class XMLStream(object):
stanza.unhandled()
def _threaded_event_wrapper(self, func, args):
- """
- Capture exceptions for event handlers that run
+ """Capture exceptions for event handlers that run
in individual threads.
- Arguments:
- func -- The event handler to execute.
- args -- Arguments to the event handler.
+ :param func: The event handler to execute.
+ :param args: Arguments to the event handler.
"""
- orig = copy.copy(args[0])
+ # this is always already copied before this is invoked
+ orig = args[0]
try:
func(*args)
except Exception as e:
error_msg = 'Error processing event handler: %s'
- log.exception(error_msg % str(func))
+ log.exception(error_msg, str(func))
if hasattr(orig, 'exception'):
orig.exception(e)
else:
self.exception(e)
def _event_runner(self):
- """
- Process the event queue and execute handlers.
+ """Process the event queue and execute handlers.
The number of event runner threads is controlled by HANDLER_THREADS.
@@ -1249,7 +1308,7 @@ class XMLStream(object):
"""
log.debug("Loading event runner")
try:
- while not self.stop.isSet():
+ while not self.stop.is_set():
try:
wait = self.wait_timeout
event = self.event_queue.get(True, timeout=wait)
@@ -1267,19 +1326,18 @@ class XMLStream(object):
handler.run(args[0])
except Exception as e:
error_msg = 'Error processing stream handler: %s'
- log.exception(error_msg % handler.name)
+ log.exception(error_msg, handler.name)
orig.exception(e)
elif etype == 'schedule':
name = args[1]
try:
- log.debug('Scheduled event: %s: %s' % (name, args[0]))
+ log.debug('Scheduled event: %s: %s', name, args[0])
handler(*args[0])
except Exception as e:
log.exception('Error processing scheduled task')
self.exception(e)
elif etype == 'event':
func, threaded, disposable = handler
- orig = copy.copy(args[0])
try:
if threaded:
x = threading.Thread(
@@ -1291,7 +1349,7 @@ class XMLStream(object):
func(*args)
except Exception as e:
error_msg = 'Error processing event handler: %s'
- log.exception(error_msg % str(func))
+ log.exception(error_msg, str(func))
if hasattr(orig, 'exception'):
orig.exception(e)
else:
@@ -1310,12 +1368,12 @@ class XMLStream(object):
return
def _send_thread(self):
- """
- Extract stanzas from the send queue and send them on the stream.
- """
+ """Extract stanzas from the send queue and send them on the stream."""
try:
while not self.stop.is_set():
- self.session_started_event.wait()
+ while not self.stop.is_set and \
+ not self.session_started_event.is_set():
+ self.session_started_event.wait(timeout=1)
if self.__failed_send_stanza is not None:
data = self.__failed_send_stanza
self.__failed_send_stanza = None
@@ -1324,37 +1382,48 @@ class XMLStream(object):
data = self.send_queue.get(True, 1)
except queue.Empty:
continue
- log.debug("SEND: %s" % data)
+ log.debug("SEND: %s", data)
+ enc_data = data.encode('utf-8')
+ total = len(enc_data)
+ sent = 0
+ count = 0
+ tries = 0
try:
- enc_data = data.encode('utf-8')
- total = len(enc_data)
- sent = 0
- count = 0
while sent < total and not self.stop.is_set():
- sent += self.socket.send(enc_data[sent:])
- count += 1
+ try:
+ sent += self.socket.send(enc_data[sent:])
+ count += 1
+ except ssl.SSLError as serr:
+ if tries >= self.ssl_retry_max:
+ log.debug('SSL error - max retries reached')
+ self.exception(serr)
+ log.warning("Failed to send %s", data)
+ if reconnect is None:
+ reconnect = self.auto_reconnect
+ self.disconnect(reconnect)
+ log.warning('SSL write error - reattempting')
+ time.sleep(self.ssl_retry_delay)
+ tries += 1
if count > 1:
- log.debug('SENT: %d chunks' % count)
+ log.debug('SENT: %d chunks', count)
self.send_queue.task_done()
except Socket.error as serr:
self.event('socket_error', serr)
- log.warning("Failed to send %s" % data)
+ log.warning("Failed to send %s", data)
self.__failed_send_stanza = data
self.disconnect(self.auto_reconnect)
except Exception as ex:
- log.exception('Unexpected error in send thread: %s' % ex)
+ log.exception('Unexpected error in send thread: %s', ex)
self.exception(ex)
if not self.stop.is_set():
self.disconnect(self.auto_reconnect)
def exception(self, exception):
- """
- Process an unknown exception.
+ """Process an unknown exception.
Meant to be overridden.
- Arguments:
- exception -- An unhandled exception object.
+ :param exception: An unhandled exception object.
"""
pass