diff options
Diffstat (limited to 'sleekxmpp/xmlstream/xmlstream.py')
-rw-r--r-- | sleekxmpp/xmlstream/xmlstream.py | 680 |
1 files changed, 339 insertions, 341 deletions
diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py index d9b78129..8fcf2b5f 100644 --- a/sleekxmpp/xmlstream/xmlstream.py +++ b/sleekxmpp/xmlstream/xmlstream.py @@ -1,9 +1,16 @@ +# -*- coding: utf-8 -*- """ - SleekXMPP: The Sleek XMPP Library - Copyright (C) 2010 Nathanael C. Fritz - This file is part of SleekXMPP. + sleekxmpp.xmlstream.xmlstream + ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - See the file LICENSE for copying permission. + This module provides the module for creating and + interacting with generic XML streams, along with + the necessary eventing infrastructure. + + Part of SleekXMPP: The Sleek XMPP Library + + :copyright: (c) 2011 Nathanael C. Fritz + :license: MIT, see LICENSE for more details """ from __future__ import with_statement, unicode_literals @@ -45,32 +52,35 @@ else: DNSPYTHON = True -# The time in seconds to wait before timing out waiting for response stanzas. +#: The time in seconds to wait before timing out waiting for response stanzas. RESPONSE_TIMEOUT = 30 -# The time in seconds to wait for events from the event queue, and also the -# time between checks for the process stop signal. +#: The time in seconds to wait for events from the event queue, and also the +#: time between checks for the process stop signal. WAIT_TIMEOUT = 1 -# The number of threads to use to handle XML stream events. This is not the -# same as the number of custom event handling threads. HANDLER_THREADS must -# be at least 1. +#: The number of threads to use to handle XML stream events. This is not the +#: same as the number of custom event handling threads. +#: :data:`HANDLER_THREADS` must be at least 1. For Python implementations +#: with a GIL, this should be left at 1, but for implemetnations without +#: a GIL increasing this value can provide better performance. HANDLER_THREADS = 1 -# Flag indicating if the SSL library is available for use. +#: Flag indicating if the SSL library is available for use. SSL_SUPPORT = True -# The time in seconds to delay between attempts to resend data -# after an SSL error. +#: The time in seconds to delay between attempts to resend data +#: after an SSL error. SSL_RETRY_DELAY = 0.5 -# The maximum number of times to attempt resending data due to -# an SSL error. +#: The maximum number of times to attempt resending data due to +#: an SSL error. SSL_RETRY_MAX = 10 -# Maximum time to delay between connection attempts is one hour. +#: Maximum time to delay between connection attempts is one hour. RECONNECT_MAX_DELAY = 600 + log = logging.getLogger(__name__) @@ -93,117 +103,83 @@ class XMLStream(object): streams should be complete and valid XML documents. Three types of events are provided to manage the stream: - Stream -- Triggered based on received stanzas, similar in concept - to events in a SAX XML parser. - Custom -- Triggered manually. - Scheduled -- Triggered based on time delays. + :Stream: Triggered based on received stanzas, similar in concept + to events in a SAX XML parser. + :Custom: Triggered manually. + :Scheduled: Triggered based on time delays. Typically, stanzas are first processed by a stream event handler which will then trigger custom events to continue further processing, especially since custom event handlers may run in individual threads. - - Attributes: - address -- The hostname and port of the server. - default_ns -- The default XML namespace that will be applied - to all non-namespaced stanzas. - event_queue -- A queue of stream, custom, and scheduled - events to be processed. - filesocket -- A filesocket created from the main connection socket. - Required for ElementTree.iterparse. - default_port -- Default port to connect to. - namespace_map -- Optional mapping of namespaces to namespace prefixes. - scheduler -- A scheduler object for triggering events - after a given period of time. - send_queue -- A queue of stanzas to be sent on the stream. - socket -- The connection to the server. - ssl_support -- Indicates if a SSL library is available for use. - ssl_version -- The version of the SSL protocol to use. - Defaults to ssl.PROTOCOL_TLSv1. - ca_certs -- File path to a CA certificate to verify the - server's identity. - state -- A state machine for managing the stream's - connection state. - stream_footer -- The start tag and any attributes for the stream's - root element. - stream_header -- The closing tag of the stream's root element. - use_ssl -- Flag indicating if SSL should be used. - use_tls -- Flag indicating if TLS should be used. - use_proxy -- Flag indicating that an HTTP Proxy should be used. - stop -- threading Event used to stop all threads. - proxy_config -- An optional dictionary with the following entries: - host -- The host offering proxy services. - port -- The port for the proxy service. - username -- Optional username for the proxy. - password -- Optional password for the proxy. - - auto_reconnect -- Flag to determine whether we auto reconnect. - reconnect_max_delay -- Maximum time to delay between connection - attempts. Defaults to RECONNECT_MAX_DELAY, - which is one hour. - dns_answers -- List of dns answers not yet used to connect. - - Methods: - add_event_handler -- Add a handler for a custom event. - add_handler -- Shortcut method for registerHandler. - connect -- Connect to the given server. - del_event_handler -- Remove a handler for a custom event. - disconnect -- Disconnect from the server and terminate - processing. - event -- Trigger a custom event. - get_id -- Return the current stream ID. - incoming_filter -- Optionally filter stanzas before processing. - new_id -- Generate a new, unique ID value. - process -- Read XML stanzas from the stream and apply - matching stream handlers. - reconnect -- Reestablish a connection to the server. - register_handler -- Add a handler for a stream event. - register_stanza -- Add a new stanza object type that may appear - as a direct child of the stream's root. - remove_handler -- Remove a stream handler. - remove_stanza -- Remove a stanza object type. - schedule -- Schedule an event handler to execute after a - given delay. - send -- Send a stanza object on the stream. - send_raw -- Send a raw string on the stream. - send_xml -- Send an XML string on the stream. - set_socket -- Set the stream's socket and generate a new - filesocket. - start_stream_handler -- Perform any stream initialization such - as handshakes. - start_tls -- Establish a TLS connection and restart - the stream. + :param socket: Use an existing socket for the stream. Defaults to + ``None`` to generate a new socket. + :param string host: The name of the target server. + :param int port: The port to use for the connection. Defaults to 0. """ def __init__(self, socket=None, host='', port=0): - """ - Establish a new XML stream. - - Arguments: - socket -- Use an existing socket for the stream. - Defaults to None to generate a new socket. - host -- The name of the target server. - Defaults to the empty string. - port -- The port to use for the connection. - Defaults to 0. - """ + #: Flag indicating if the SSL library is available for use. self.ssl_support = SSL_SUPPORT + + #: Most XMPP servers support TLSv1, but OpenFire in particular + #: does not work well with it. For OpenFire, set + #: :attr:`ssl_version` to use ``SSLv23``:: + #: + #: import ssl + #: xmpp.ssl_version = ssl.PROTOCOL_SSLv23 self.ssl_version = ssl.PROTOCOL_TLSv1 + + #: Path to a file containing certificates for verifying the + #: server SSL certificate. A non-``None`` value will trigger + #: certificate checking. + #: + #: .. note:: + #: + #: On Mac OS X, certificates in the system keyring will + #: be consulted, even if they are not in the provided file. self.ca_certs = None + #: The time in seconds to wait for events from the event queue, + #: and also the time between checks for the process stop signal. self.wait_timeout = WAIT_TIMEOUT + + #: The time in seconds to wait before timing out waiting + #: for response stanzas. self.response_timeout = RESPONSE_TIMEOUT + + #: The current amount to time to delay attempting to reconnect. + #: This value doubles (with some jitter) with each failed + #: connection attempt up to :attr:`reconnect_max_delay` seconds. self.reconnect_delay = None + + #: Maximum time to delay between connection attempts is one hour. self.reconnect_max_delay = RECONNECT_MAX_DELAY + + #: The time in seconds to delay between attempts to resend data + #: after an SSL error. self.ssl_retry_max = SSL_RETRY_MAX + + #: The maximum number of times to attempt resending data due to + #: an SSL error. self.ssl_retry_delay = SSL_RETRY_DELAY + #: The connection state machine tracks if the stream is + #: ``'connected'`` or ``'disconnected'``. self.state = StateMachine(('disconnected', 'connected')) self.state._set_state('disconnected') + #: The default port to return when querying DNS records. self.default_port = int(port) + + #: The domain to try when querying DNS records. self.default_domain = '' + + #: The desired, or actual, address of the connected server. self.address = (host, int(port)) + + #: A file-like wrapper for the socket for use with the + #: :mod:`~xml.etree.ElementTree` module. self.filesocket = None self.set_socket(socket) @@ -212,31 +188,79 @@ class XMLStream(object): else: self.socket_class = Socket.socket + #: Enable connecting to the server directly over SSL, in + #: particular when the service provides two ports: one for + #: non-SSL traffic and another for SSL traffic. self.use_ssl = False + + #: Enable connecting to the service without using SSL + #: immediately, but allow upgrading the connection later + #: to use SSL. self.use_tls = False + + #: If set to ``True``, attempt to connect through an HTTP + #: proxy based on the settings in :attr:`proxy_config`. self.use_proxy = False + #: An optional dictionary of proxy settings. It may provide: + #: :host: The host offering proxy services. + #: :port: The port for the proxy service. + #: :username: Optional username for accessing the proxy. + #: :password: Optional password for accessing the proxy. self.proxy_config = {} + #: The default namespace of the stream content, not of the + #: stream wrapper itself. self.default_ns = '' + + #: The namespace of the enveloping stream element. self.stream_ns = '' + + #: The default opening tag for the stream element. self.stream_header = "<stream>" + + #: The default closing tag for the stream element. self.stream_footer = "</stream>" + #: If ``True``, periodically send a whitespace character over the + #: wire to keep the connection alive. Mainly useful for connections + #: traversing NAT. self.whitespace_keepalive = True + + #: The default interval between keepalive signals when + #: :attr:`whitespace_keepalive` is enabled. self.whitespace_keepalive_interval = 300 + #: An :class:`~threading.Event` to signal that the application + #: is stopping, and that all threads should shutdown. self.stop = threading.Event() + + #: An :class:`~threading.Event` to signal receiving a closing + #: stream tag from the server. self.stream_end_event = threading.Event() self.stream_end_event.set() + + #: An :class:`~threading.Event` to signal the start of a stream + #: session. Until this event fires, the send queue is not used + #: and data is sent immediately over the wire. self.session_started_event = threading.Event() + + #: The default time in seconds to wait for a session to start + #: after connecting before reconnecting and trying again. self.session_timeout = 45 + #: A queue of stream, custom, and scheduled events to be processed. self.event_queue = queue.Queue() + + #: A queue of string data to be sent over the stream. self.send_queue = queue.Queue() - self.__failed_send_stanza = None + + #: A :class:`~sleekxmpp.xmlstream.scheduler.Scheduler` instance for + #: executing callbacks in the future based on time delays. self.scheduler = Scheduler(self.stop) + self.__failed_send_stanza = None + #: A mapping of XML namespaces to well-known prefixes. self.namespace_map = {StanzaBase.xml_ns: 'xml'} self.__thread = {} @@ -248,7 +272,11 @@ class XMLStream(object): self._id = 0 self._id_lock = threading.Lock() + #: The :attr:`auto_reconnnect` setting controls whether or not + #: the stream will be restarted in the event of an error. self.auto_reconnect = True + + #: A list of DNS results that have not yet been tried. self.dns_answers = [] self.add_event_handler('connected', self._handle_connected) @@ -256,17 +284,16 @@ class XMLStream(object): self.add_event_handler('session_end', self._end_keepalive) def use_signals(self, signals=None): - """ - Register signal handlers for SIGHUP and SIGTERM, if possible, - which will raise a "killed" event when the application is - terminated. + """Register signal handlers for ``SIGHUP`` and ``SIGTERM``. + + By using signals, a ``'killed'`` event will be raised when the + application is terminated. If a signal handler already existed, it will be executed first, - before the "killed" event is raised. + before the ``'killed'`` event is raised. - Arguments: - signals -- A list of signal names to be monitored. - Defaults to ['SIGHUP', 'SIGTERM']. + :param list signals: A list of signal names to be monitored. + Defaults to ``['SIGHUP', 'SIGTERM']``. """ if signals is None: signals = ['SIGHUP', 'SIGTERM'] @@ -282,7 +309,7 @@ class XMLStream(object): def handle_kill(signum, frame): """ Capture kill event and disconnect cleanly after first - spawning the "killed" event. + spawning the ``'killed'`` event. """ if signum in existing_handlers and \ @@ -303,8 +330,7 @@ class XMLStream(object): "SleekXMPP is not running from a main thread.") def new_id(self): - """ - Generate and return a new stream ID in hexadecimal form. + """Generate and return a new stream ID in hexadecimal form. Many stanzas, handlers, or matchers may require unique ID values. Using this method ensures that all new ID values @@ -315,26 +341,25 @@ class XMLStream(object): return self.get_id() def get_id(self): - """ - Return the current unique stream ID in hexadecimal form. - """ + """Return the current unique stream ID in hexadecimal form.""" return "%X" % self._id def connect(self, host='', port=0, use_ssl=False, use_tls=True, reattempt=True): - """ - Create a new socket and connect to the server. - - Setting reattempt to True will cause connection attempts to be made - every second until a successful connection is established. - - Arguments: - host -- The name of the desired server for the connection. - port -- Port to connect to on the server. - use_ssl -- Flag indicating if SSL should be used. - use_tls -- Flag indicating if TLS should be used. - reattempt -- Flag indicating if the socket should reconnect - after disconnections. + """Create a new socket and connect to the server. + + Setting ``reattempt`` to ``True`` will cause connection attempts to + be made every second until a successful connection is established. + + :param host: The name of the desired server for the connection. + :param port: Port to connect to on the server. + :param use_ssl: Flag indicating if SSL should be used by connecting + directly to a port using SSL. + :param use_tls: Flag indicating if TLS should be used, allowing for + connecting to a port without using SSL immediately and + later upgrading the connection. + :param reattempt: Flag indicating if the socket should reconnect + after disconnections. """ if host and port: self.address = (host, int(port)) @@ -486,24 +511,25 @@ class XMLStream(object): _handle_session_timeout) def disconnect(self, reconnect=False, wait=False): - """ - Terminate processing and close the XML streams. + """Terminate processing and close the XML streams. Optionally, the connection may be reconnected and resume processing afterwards. If the disconnect should take place after all items - in the send queue have been sent, use wait=True. However, - take note: If you are constantly adding items to the queue - such that it is never empty, then the disconnect will - not occur and the call will continue to block. - - Arguments: - reconnect -- Flag indicating if the connection - and processing should be restarted. - Defaults to False. - wait -- Flag indicating if the send queue should - be emptied before disconnecting. + in the send queue have been sent, use ``wait=True``. + + .. warning:: + + If you are constantly adding items to the queue + such that it is never empty, then the disconnect will + not occur and the call will continue to block. + + :param reconnect: Flag indicating if the connection + and processing should be restarted. + Defaults to ``False``. + :param wait: Flag indicating if the send queue should + be emptied before disconnecting. """ self.state.transition('connected', 'disconnected', func=self._disconnect, args=(reconnect, wait)) @@ -536,9 +562,7 @@ class XMLStream(object): return True def reconnect(self, reattempt=True): - """ - Reset the stream's state and reconnect to the server. - """ + """Reset the stream's state and reconnect to the server.""" log.debug("reconnecting...") if self.state.ensure('connected'): self.state.transition('connected', 'disconnected', wait=2.0, @@ -554,14 +578,13 @@ class XMLStream(object): return connected def set_socket(self, socket, ignore=False): - """ - Set the socket to use for the stream. + """Set the socket to use for the stream. The filesocket will be recreated as well. - Arguments: - socket -- The new socket to use. - ignore -- don't set the state + :param socket: The new socket object to use. + :param bool ignore: If ``True``, don't set the connection + state to ``'connected'``. """ self.socket = socket if socket is not None: @@ -579,8 +602,7 @@ class XMLStream(object): self.state._set_state('connected') def configure_socket(self): - """ - Set timeout and other options for self.socket. + """Set timeout and other options for self.socket. Meant to be overridden. """ @@ -588,24 +610,23 @@ class XMLStream(object): def configure_dns(self, resolver, domain=None, port=None): """ - Configure and set options for a dns.resolver.Resolver + Configure and set options for a :class:`~dns.resolver.Resolver` instance, and other DNS related tasks. For example, you - can also check Socket.getaddrinfo to see if you need to - call out to libresolv.so.2 to run res_init(). + can also check :meth:`~socket.socket.getaddrinfo` to see + if you need to call out to ``libresolv.so.2`` to + run ``res_init()``. Meant to be overridden. - Arguments: - resolver -- A dns.resolver.Resolver instance, or None - if dnspython is not installed. - domain -- The initial domain under consideration. - port -- The initial port under consideration. + :param resolver: A :class:`~dns.resolver.Resolver` instance + or ``None`` if ``dnspython`` is not installed. + :param domain: The initial domain under consideration. + :param port: The initial port under consideration. """ pass def start_tls(self): - """ - Perform handshakes for TLS. + """Perform handshakes for TLS. If the handshake is successful, the XML stream will need to be restarted. @@ -638,13 +659,14 @@ class XMLStream(object): return False def _start_keepalive(self, event): - """ - Begin sending whitespace periodically to keep the connection alive. + """Begin sending whitespace periodically to keep the connection alive. + + May be disabled by setting:: - May be disabled by setting: self.whitespace_keepalive = False - The keepalive interval can be set using: + The keepalive interval can be set using:: + self.whitespace_keepalive_interval = 300 """ @@ -662,18 +684,18 @@ class XMLStream(object): self.scheduler.remove('Whitespace Keepalive') def start_stream_handler(self, xml): - """ - Perform any initialization actions, such as handshakes, once the - stream header has been sent. + """Perform any initialization actions, such as handshakes, + once the stream header has been sent. Meant to be overridden. """ pass def register_stanza(self, stanza_class): - """ - Add a stanza object class as a known root stanza. A root stanza is - one that appears as a direct child of the stream's root element. + """Add a stanza object class as a known root stanza. + + A root stanza is one that appears as a direct child of the stream's + root element. Stanzas that appear as substanzas of a root stanza do not need to be registered here. That is done using register_stanza_plugin() from @@ -683,15 +705,15 @@ class XMLStream(object): stanza objects, but may still be processed using handlers and matchers. - Arguments: - stanza_class -- The top-level stanza object's class. + :param stanza_class: The top-level stanza object's class. """ self.__root_stanza.append(stanza_class) def remove_stanza(self, stanza_class): - """ - Remove a stanza from being a known root stanza. A root stanza is - one that appears as a direct child of the stream's root element. + """Remove a stanza from being a known root stanza. + + A root stanza is one that appears as a direct child of the stream's + root element. Stanzas that are not registered will not be converted into stanza objects, but may still be processed using handlers and @@ -701,22 +723,24 @@ class XMLStream(object): def add_handler(self, mask, pointer, name=None, disposable=False, threaded=False, filter=False, instream=False): - """ - A shortcut method for registering a handler using XML masks. - - Arguments: - mask -- An XML snippet matching the structure of the - stanzas that will be passed to this handler. - pointer -- The handler function itself. - name -- A unique name for the handler. A name will - be generated if one is not provided. - disposable -- Indicates if the handler should be discarded - after one use. - threaded -- Deprecated. Remains for backwards compatibility. - filter -- Deprecated. Remains for backwards compatibility. - instream -- Indicates if the handler should execute during - stream processing and not during normal event - processing. + """A shortcut method for registering a handler using XML masks. + + The use of :meth:`register_handler()` is preferred. + + :param mask: An XML snippet matching the structure of the + stanzas that will be passed to this handler. + :param pointer: The handler function itself. + :parm name: A unique name for the handler. A name will + be generated if one is not provided. + :param disposable: Indicates if the handler should be discarded + after one use. + :param threaded: **DEPRECATED**. + Remains for backwards compatibility. + :param filter: **DEPRECATED**. + Remains for backwards compatibility. + :param instream: Indicates if the handler should execute during + stream processing and not during normal event + processing. """ # To prevent circular dependencies, we must load the matcher # and handler classes here. @@ -727,23 +751,20 @@ class XMLStream(object): once=disposable, instream=instream)) def register_handler(self, handler, before=None, after=None): - """ - Add a stream event handler that will be executed when a matching + """Add a stream event handler that will be executed when a matching stanza is received. - Arguments: - handler -- The handler object to execute. + :param handler: The :class:`~sleekxmpp.xmlstream.handler.base.BaseHandler` + derived object to execute. """ if handler.stream is None: self.__handlers.append(handler) handler.stream = weakref.ref(self) def remove_handler(self, name): - """ - Remove any stream event handlers with the given name. + """Remove any stream event handlers with the given name. - Arguments: - name -- The name of the handler. + :param name: The name of the handler. """ idx = 0 for handler in self.__handlers: @@ -754,12 +775,10 @@ class XMLStream(object): return False def get_dns_records(self, domain, port=None): - """ - Get the DNS records for a domain. + """Get the DNS records for a domain. - Arguments: - domain -- The domain in question. - port -- If the results don't include a port, use this one. + :param domain: The domain in question. + :param port: If the results don't include a port, use this one. """ if port is None: port = self.default_port @@ -785,14 +804,13 @@ class XMLStream(object): return [((domain, port), 0, 0)] def pick_dns_answer(self, domain, port=None): - """ - Pick a server and port from DNS answers. + """Pick a server and port from DNS answers. + Gets DNS answers if none available. Removes used answer from available answers. - Arguments: - domain -- The domain in question. - port -- If the results don't include a port, use this one. + :param domain: The domain in question. + :param port: If the results don't include a port, use this one. """ if not self.dns_answers: self.dns_answers = self.get_dns_records(domain, port) @@ -824,30 +842,26 @@ class XMLStream(object): def add_event_handler(self, name, pointer, threaded=False, disposable=False): - """ - Add a custom event handler that will be executed whenever + """Add a custom event handler that will be executed whenever its event is manually triggered. - Arguments: - name -- The name of the event that will trigger - this handler. - pointer -- The function to execute. - threaded -- If set to True, the handler will execute - in its own thread. Defaults to False. - disposable -- If set to True, the handler will be - discarded after one use. Defaults to False. + :param name: The name of the event that will trigger + this handler. + :param pointer: The function to execute. + :param threaded: If set to ``True``, the handler will execute + in its own thread. Defaults to ``False``. + :param disposable: If set to ``True``, the handler will be + discarded after one use. Defaults to ``False``. """ if not name in self.__event_handlers: self.__event_handlers[name] = [] self.__event_handlers[name].append((pointer, threaded, disposable)) def del_event_handler(self, name, pointer): - """ - Remove a function as a handler for an event. + """Remove a function as a handler for an event. - Arguments: - name -- The name of the event. - pointer -- The function to remove as a handler. + :param name: The name of the event. + :param pointer: The function to remove as a handler. """ if not name in self.__event_handlers: return @@ -862,27 +876,22 @@ class XMLStream(object): self.__event_handlers[name])) def event_handled(self, name): - """ - Indicates if an event has any associated handlers. + """Returns the number of registered handlers for an event. - Returns the number of registered handlers. - - Arguments: - name -- The name of the event to check. + :param name: The name of the event to check. """ return len(self.__event_handlers.get(name, [])) def event(self, name, data={}, direct=False): - """ - Manually trigger a custom event. - - Arguments: - name -- The name of the event to trigger. - data -- Data that will be passed to each event handler. - Defaults to an empty dictionary. - direct -- Runs the event directly if True, skipping the - event queue. All event handlers will run in the - same thread. + """Manually trigger a custom event. + + :param name: The name of the event to trigger. + :param data: Data that will be passed to each event handler. + Defaults to an empty dictionary, but is usually + a stanza object. + :param direct: Runs the event directly if True, skipping the + event queue. All event handlers will run in the + same thread. """ handlers = self.__event_handlers.get(name, []) for handler in handlers: @@ -916,25 +925,22 @@ class XMLStream(object): def schedule(self, name, seconds, callback, args=None, kwargs=None, repeat=False): - """ - Schedule a callback function to execute after a given delay. - - Arguments: - name -- A unique name for the scheduled callback. - seconds -- The time in seconds to wait before executing. - callback -- A pointer to the function to execute. - args -- A tuple of arguments to pass to the function. - kwargs -- A dictionary of keyword arguments to pass to - the function. - repeat -- Flag indicating if the scheduled event should - be reset and repeat after executing. + """Schedule a callback function to execute after a given delay. + + :param name: A unique name for the scheduled callback. + :param seconds: The time in seconds to wait before executing. + :param callback: A pointer to the function to execute. + :param args: A tuple of arguments to pass to the function. + :param kwargs: A dictionary of keyword arguments to pass to + the function. + :param repeat: Flag indicating if the scheduled event should + be reset and repeat after executing. """ self.scheduler.add(name, seconds, callback, args, kwargs, repeat, qpointer=self.event_queue) def incoming_filter(self, xml): - """ - Filter incoming XML objects before they are processed. + """Filter incoming XML objects before they are processed. Possible uses include remapping namespaces, or correcting elements from sources with incorrect behavior. @@ -944,23 +950,23 @@ class XMLStream(object): return xml def send(self, data, mask=None, timeout=None, now=False): - """ - A wrapper for send_raw for sending stanza objects. + """A wrapper for :meth:`send_raw()` for sending stanza objects. May optionally block until an expected response is received. - Arguments: - data -- The stanza object to send on the stream. - mask -- Deprecated. An XML snippet matching the structure - of the expected response. Execution will block - in this thread until the response is received - or a timeout occurs. - timeout -- Time in seconds to wait for a response before - continuing. Defaults to RESPONSE_TIMEOUT. - now -- Indicates if the send queue should be skipped, - sending the stanza immediately. Useful mainly - for stream initialization stanzas. - Defaults to False. + :param data: The :class:`~sleekxmpp.xmlstream.stanzabase.ElementBase` + stanza to send on the stream. + :param mask: **DEPRECATED** + An XML string snippet matching the structure + of the expected response. Execution will block + in this thread until the response is received + or a timeout occurs. + :param int timeout: Time in seconds to wait for a response before + continuing. Defaults to :attr:`response_timeout`. + :param bool now: Indicates if the send queue should be skipped, + sending the stanza immediately. Useful mainly + for stream initialization stanzas. + Defaults to ``False``. """ if timeout is None: timeout = self.response_timeout @@ -977,37 +983,35 @@ class XMLStream(object): return wait_for.wait(timeout) def send_xml(self, data, mask=None, timeout=None, now=False): - """ - Send an XML object on the stream, and optionally wait + """Send an XML object on the stream, and optionally wait for a response. - Arguments: - data -- The XML object to send on the stream. - mask -- Deprecated. An XML snippet matching the structure - of the expected response. Execution will block - in this thread until the response is received - or a timeout occurs. - timeout -- Time in seconds to wait for a response before - continuing. Defaults to RESPONSE_TIMEOUT. - now -- Indicates if the send queue should be skipped, - sending the stanza immediately. Useful mainly - for stream initialization stanzas. - Defaults to False. + :param data: The :class:`~xml.etree.ElementTree.Element` XML object + to send on the stream. + :param mask: **DEPRECATED** + An XML string snippet matching the structure + of the expected response. Execution will block + in this thread until the response is received + or a timeout occurs. + :param int timeout: Time in seconds to wait for a response before + continuing. Defaults to :attr:`response_timeout`. + :param bool now: Indicates if the send queue should be skipped, + sending the stanza immediately. Useful mainly + for stream initialization stanzas. + Defaults to ``False``. """ if timeout is None: timeout = self.response_timeout return self.send(tostring(data), mask, timeout, now) def send_raw(self, data, now=False, reconnect=None): - """ - Send raw data across the stream. - - Arguments: - data -- Any string value. - reconnect -- Indicates if the stream should be - restarted if there is an error sending - the stanza. Used mainly for testing. - Defaults to self.auto_reconnect. + """Send raw data across the stream. + + :param string data: Any string value. + :param bool reconnect: Indicates if the stream should be + restarted if there is an error sending + the stanza. Used mainly for testing. + Defaults to :attr:`auto_reconnect`. """ if now: log.debug("SEND (IMMED): %s", data) @@ -1045,27 +1049,29 @@ class XMLStream(object): return True def process(self, **kwargs): - """ - Initialize the XML streams and begin processing events. + """Initialize the XML streams and begin processing events. The number of threads used for processing stream events is determined - by HANDLER_THREADS. - - Arguments: - block -- If block=False then event dispatcher will run - in a separate thread, allowing for the stream to be - used in the background for another application. - Otherwise, process(block=True) blocks the current thread. - Defaults to False. - - **threaded is deprecated and included for API compatibility** - threaded -- If threaded=True then event dispatcher will run - in a separate thread, allowing for the stream to be - used in the background for another application. - Defaults to True. - - Event handlers and the send queue will be threaded - regardless of these parameters. + by :data:`HANDLER_THREADS`. + + :param bool block: If ``False``, then event dispatcher will run + in a separate thread, allowing for the stream to be + used in the background for another application. + Otherwise, ``process(block=True)`` blocks the current + thread. Defaults to ``False``. + :param bool threaded: **DEPRECATED** + If ``True``, then event dispatcher will run + in a separate thread, allowing for the stream to be + used in the background for another application. + Defaults to ``True``. This does **not** mean that no + threads are used at all if ``threaded=False``. + + Regardless of these threading options, these threads will + always exist: + + - The event queue processor + - The send queue processor + - The scheduler """ if 'threaded' in kwargs and 'block' in kwargs: raise ValueError("process() called with both " + \ @@ -1094,8 +1100,7 @@ class XMLStream(object): self._process() def _process(self): - """ - Start processing the XML streams. + """Start processing the XML streams. Processing will continue after any recoverable errors if reconnections are allowed. @@ -1147,9 +1152,9 @@ class XMLStream(object): break def __read_xml(self): - """ - Parse the incoming XML stream, raising stream events for - each received stanza. + """Parse the incoming XML stream + + Stream events are raised for each received stanza. """ depth = 0 root = None @@ -1185,16 +1190,16 @@ class XMLStream(object): log.debug("Ending read XML loop") def _build_stanza(self, xml, default_ns=None): - """ - Create a stanza object from a given XML object. + """Create a stanza object from a given XML object. If a specialized stanza type is not found for the XML, then - a generic StanzaBase stanza will be returned. + a generic :class:`~sleekxmpp.xmlstream.stanzabase.StanzaBase` + stanza will be returned. - Arguments: - xml -- The XML object to convert into a stanza object. - default_ns -- Optional default namespace to use instead of the - stream's current default namespace. + :param xml: The :class:`~xml.etree.ElementTree.Element` XML object + to convert into a stanza object. + :param default_ns: Optional default namespace to use instead of the + stream's current default namespace. """ if default_ns is None: default_ns = self.default_ns @@ -1213,8 +1218,8 @@ class XMLStream(object): objects if applicable and queue stream events to be processed by matching handlers. - Arguments: - xml -- The XML stanza to analyze. + :param xml: The :class:`~sleekxmpp.xmlstream.stanzabase.ElementBase` + stanza to analyze. """ log.debug("RECV: %s", tostring(xml, xmlns=self.default_ns, stream=self)) @@ -1250,13 +1255,11 @@ class XMLStream(object): stanza.unhandled() def _threaded_event_wrapper(self, func, args): - """ - Capture exceptions for event handlers that run + """Capture exceptions for event handlers that run in individual threads. - Arguments: - func -- The event handler to execute. - args -- Arguments to the event handler. + :param func: The event handler to execute. + :param args: Arguments to the event handler. """ # this is always already copied before this is invoked orig = args[0] @@ -1271,8 +1274,7 @@ class XMLStream(object): self.exception(e) def _event_runner(self): - """ - Process the event queue and execute handlers. + """Process the event queue and execute handlers. The number of event runner threads is controlled by HANDLER_THREADS. @@ -1341,9 +1343,7 @@ class XMLStream(object): return def _send_thread(self): - """ - Extract stanzas from the send queue and send them on the stream. - """ + """Extract stanzas from the send queue and send them on the stream.""" try: while not self.stop.is_set(): while not self.stop.is_set and \ @@ -1394,13 +1394,11 @@ class XMLStream(object): self.disconnect(self.auto_reconnect) def exception(self, exception): - """ - Process an unknown exception. + """Process an unknown exception. Meant to be overridden. - Arguments: - exception -- An unhandled exception object. + :param exception: An unhandled exception object. """ pass |