diff options
Diffstat (limited to 'sleekxmpp/xmlstream/xmlstream.py')
-rw-r--r-- | sleekxmpp/xmlstream/xmlstream.py | 123 |
1 files changed, 66 insertions, 57 deletions
diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py index 14c13e72..4e7b4050 100644 --- a/sleekxmpp/xmlstream/xmlstream.py +++ b/sleekxmpp/xmlstream/xmlstream.py @@ -55,7 +55,7 @@ RESPONSE_TIMEOUT = 30 WAIT_TIMEOUT = 0.1 #: The number of threads to use to handle XML stream events. This is not the -#: same as the number of custom event handling threads. +#: same as the number of custom event handling threads. #: :data:`HANDLER_THREADS` must be at least 1. For Python implementations #: with a GIL, this should be left at 1, but for implemetnations without #: a GIL increasing this value can provide better performance. @@ -124,7 +124,7 @@ class XMLStream(object): self.ssl_support = SSL_SUPPORT #: Most XMPP servers support TLSv1, but OpenFire in particular - #: does not work well with it. For OpenFire, set + #: does not work well with it. For OpenFire, set #: :attr:`ssl_version` to use ``SSLv23``:: #: #: import ssl @@ -134,30 +134,30 @@ class XMLStream(object): #: Path to a file containing certificates for verifying the #: server SSL certificate. A non-``None`` value will trigger #: certificate checking. - #: + #: #: .. note:: #: #: On Mac OS X, certificates in the system keyring will #: be consulted, even if they are not in the provided file. self.ca_certs = None - #: The time in seconds to wait for events from the event queue, + #: The time in seconds to wait for events from the event queue, #: and also the time between checks for the process stop signal. self.wait_timeout = WAIT_TIMEOUT - #: The time in seconds to wait before timing out waiting + #: The time in seconds to wait before timing out waiting #: for response stanzas. self.response_timeout = RESPONSE_TIMEOUT - #: The current amount to time to delay attempting to reconnect. + #: The current amount to time to delay attempting to reconnect. #: This value doubles (with some jitter) with each failed #: connection attempt up to :attr:`reconnect_max_delay` seconds. self.reconnect_delay = None - + #: Maximum time to delay between connection attempts is one hour. self.reconnect_max_delay = RECONNECT_MAX_DELAY - #: Maximum number of attempts to connect to the server before + #: Maximum number of attempts to connect to the server before #: quitting and raising a 'connect_failed' event. Setting to #: ``None`` allows infinite reattempts, while setting it to ``0`` #: will disable reconnection attempts. Defaults to ``None``. @@ -178,16 +178,16 @@ class XMLStream(object): #: The default port to return when querying DNS records. self.default_port = int(port) - - #: The domain to try when querying DNS records. + + #: The domain to try when querying DNS records. self.default_domain = '' #: The expected name of the server, for validation. self._expected_server_name = '' - + #: The desired, or actual, address of the connected server. self.address = (host, int(port)) - + #: A file-like wrapper for the socket for use with the #: :mod:`~xml.etree.ElementTree` module. self.filesocket = None @@ -258,7 +258,7 @@ class XMLStream(object): #: and data is sent immediately over the wire. self.session_started_event = threading.Event() - #: The default time in seconds to wait for a session to start + #: The default time in seconds to wait for a session to start #: after connecting before reconnecting and trying again. self.session_timeout = 45 @@ -417,12 +417,12 @@ class XMLStream(object): if use_tls is not None: self.use_tls = use_tls - # Repeatedly attempt to connect until a successful connection # is established. attempts = self.reconnect_max_attempts connected = self.state.transition('disconnected', 'connected', - func=self._connect, args=(reattempt,)) + func=self._connect, + args=(reattempt,)) while reattempt and not connected and not self.stop.is_set(): connected = self.state.transition('disconnected', 'connected', func=self._connect) @@ -437,7 +437,7 @@ class XMLStream(object): def _connect(self, reattempt=True): self.scheduler.remove('Session timeout check') self.stop.clear() - + if self.reconnect_delay is None or not reattempt: delay = 1.0 else: @@ -483,7 +483,7 @@ class XMLStream(object): if self.use_proxy: connected = self._connect_proxy() if not connected: - if reattempt: + if reattempt: self.reconnect_delay = delay return False @@ -520,7 +520,8 @@ class XMLStream(object): except (Socket.error, ssl.SSLError): log.error('CERT: Invalid certificate trust chain.') if not self.event_handled('ssl_invalid_chain'): - self.disconnect(self.auto_reconnect, send_close=False) + self.disconnect(self.auto_reconnect, + send_close=False) else: self.event('ssl_invalid_chain', direct=True) return False @@ -528,7 +529,7 @@ class XMLStream(object): self._der_cert = self.socket.getpeercert(binary_form=True) pem_cert = ssl.DER_cert_to_PEM_cert(self._der_cert) log.debug('CERT: %s', pem_cert) - + self.event('ssl_cert', pem_cert, direct=True) try: cert.verify(self._expected_server_name, self._der_cert) @@ -537,7 +538,9 @@ class XMLStream(object): if not self.event_handled('ssl_invalid_cert'): self.disconnect(send_close=False) else: - self.event('ssl_invalid_cert', pem_cert, direct=True) + self.event('ssl_invalid_cert', + pem_cert, + direct=True) self.set_socket(self.socket, ignore=True) #this event is where you should set your application state @@ -630,7 +633,7 @@ class XMLStream(object): If the disconnect should take place after all items in the send queue have been sent, use ``wait=True``. - + .. warning:: If you are constantly adding items to the queue @@ -651,7 +654,7 @@ class XMLStream(object): """ self.state.transition('connected', 'disconnected', wait=2.0, - func=self._disconnect, + func=self._disconnect, args=(reconnect, wait, send_close)) def _disconnect(self, reconnect=False, wait=None, send_close=True): @@ -705,16 +708,18 @@ class XMLStream(object): """Reset the stream's state and reconnect to the server.""" log.debug("reconnecting...") if self.state.ensure('connected'): - self.state.transition('connected', 'disconnected', + self.state.transition('connected', 'disconnected', wait=2.0, - func=self._disconnect, + func=self._disconnect, args=(True, wait, send_close)) attempts = self.reconnect_max_attempts log.debug("connecting...") connected = self.state.transition('disconnected', 'connected', - wait=2.0, func=self._connect, args=(reattempt,)) + wait=2.0, + func=self._connect, + args=(reattempt,)) while reattempt and not connected and not self.stop.is_set(): connected = self.state.transition('disconnected', 'connected', wait=2.0, func=self._connect) @@ -762,8 +767,8 @@ class XMLStream(object): """ Configure and set options for a :class:`~dns.resolver.Resolver` instance, and other DNS related tasks. For example, you - can also check :meth:`~socket.socket.getaddrinfo` to see - if you need to call out to ``libresolv.so.2`` to + can also check :meth:`~socket.socket.getaddrinfo` to see + if you need to call out to ``libresolv.so.2`` to run ``res_init()``. Meant to be overridden. @@ -817,7 +822,7 @@ class XMLStream(object): log.debug('CERT: %s', pem_cert) self.event('ssl_cert', pem_cert, direct=True) - try: + try: cert.verify(self._expected_server_name, self._der_cert) except cert.CertificateError as err: log.error(err.message) @@ -877,8 +882,8 @@ class XMLStream(object): self.schedule('Whitespace Keepalive', self.whitespace_keepalive_interval, self.send_raw, - args = (' ',), - kwargs = {'now': True}, + args=(' ',), + kwargs={'now': True}, repeat=True) def _remove_schedules(self, event): @@ -887,7 +892,7 @@ class XMLStream(object): self.scheduler.remove('Certificate Expiration') def start_stream_handler(self, xml): - """Perform any initialization actions, such as handshakes, + """Perform any initialization actions, such as handshakes, once the stream header has been sent. Meant to be overridden. @@ -895,8 +900,8 @@ class XMLStream(object): pass def register_stanza(self, stanza_class): - """Add a stanza object class as a known root stanza. - + """Add a stanza object class as a known root stanza. + A root stanza is one that appears as a direct child of the stream's root element. @@ -913,8 +918,8 @@ class XMLStream(object): self.__root_stanza.append(stanza_class) def remove_stanza(self, stanza_class): - """Remove a stanza from being a known root stanza. - + """Remove a stanza from being a known root stanza. + A root stanza is one that appears as a direct child of the stream's root element. @@ -979,8 +984,9 @@ class XMLStream(object): """Add a stream event handler that will be executed when a matching stanza is received. - :param handler: The :class:`~sleekxmpp.xmlstream.handler.base.BaseHandler` - derived object to execute. + :param handler: + The :class:`~sleekxmpp.xmlstream.handler.base.BaseHandler` + derived object to execute. """ if handler.stream is None: self.__handlers.append(handler) @@ -1007,11 +1013,12 @@ class XMLStream(object): """ if port is None: port = self.default_port - + resolver = default_resolver() self.configure_dns(resolver, domain=domain, port=port) - return resolve(domain, port, service=self.dns_service, resolver=resolver) + return resolve(domain, port, service=self.dns_service, + resolver=resolver) def pick_dns_answer(self, domain, port=None): """Pick a server and port from DNS answers. @@ -1029,7 +1036,7 @@ class XMLStream(object): return self.dns_answers.next() else: return next(self.dns_answers) - + def add_event_handler(self, name, pointer, threaded=False, disposable=False): """Add a custom event handler that will be executed whenever @@ -1144,9 +1151,9 @@ class XMLStream(object): May optionally block until an expected response is received. - :param data: The :class:`~sleekxmpp.xmlstream.stanzabase.ElementBase` + :param data: The :class:`~sleekxmpp.xmlstream.stanzabase.ElementBase` stanza to send on the stream. - :param mask: **DEPRECATED** + :param mask: **DEPRECATED** An XML string snippet matching the structure of the expected response. Execution will block in this thread until the response is received @@ -1198,9 +1205,9 @@ class XMLStream(object): """Send an XML object on the stream, and optionally wait for a response. - :param data: The :class:`~xml.etree.ElementTree.Element` XML object + :param data: The :class:`~xml.etree.ElementTree.Element` XML object to send on the stream. - :param mask: **DEPRECATED** + :param mask: **DEPRECATED** An XML string snippet matching the structure of the expected response. Execution will block in this thread until the response is received @@ -1240,14 +1247,15 @@ class XMLStream(object): count += 1 except ssl.SSLError as serr: if tries >= self.ssl_retry_max: - log.debug('SSL error - max retries reached') + log.debug('SSL error: max retries reached') self.exception(serr) log.warning("Failed to send %s", data) if reconnect is None: reconnect = self.auto_reconnect if not self.stop.is_set(): - self.disconnect(reconnect, send_close=False) - log.warning('SSL write error - reattempting') + self.disconnect(reconnect, + send_close=False) + log.warning('SSL write error: retrying') if not self.stop.is_set(): time.sleep(self.ssl_retry_delay) tries += 1 @@ -1302,7 +1310,7 @@ class XMLStream(object): def _wait_for_threads(self): with self.__thread_cond: if self.__thread_count != 0: - log.debug("Waiting for %s threads to exit." % + log.debug("Waiting for %s threads to exit." % self.__thread_count) name = threading.current_thread().name if name in self.__thread: @@ -1334,7 +1342,7 @@ class XMLStream(object): Defaults to ``True``. This does **not** mean that no threads are used at all if ``threaded=False``. - Regardless of these threading options, these threads will + Regardless of these threading options, these threads will always exist: - The event queue processor @@ -1424,7 +1432,7 @@ class XMLStream(object): def __read_xml(self): """Parse the incoming XML stream - + Stream events are raised for each received stanza. """ depth = 0 @@ -1435,8 +1443,8 @@ class XMLStream(object): # We have received the start of the root element. root = xml log.debug('RECV: %s', tostring(root, xmlns=self.default_ns, - stream=self, - top_level=True, + stream=self, + top_level=True, open_only=True)) # Perform any stream initialization actions, such # as handshakes. @@ -1468,10 +1476,10 @@ class XMLStream(object): """Create a stanza object from a given XML object. If a specialized stanza type is not found for the XML, then - a generic :class:`~sleekxmpp.xmlstream.stanzabase.StanzaBase` + a generic :class:`~sleekxmpp.xmlstream.stanzabase.StanzaBase` stanza will be returned. - :param xml: The :class:`~xml.etree.ElementTree.Element` XML object + :param xml: The :class:`~xml.etree.ElementTree.Element` XML object to convert into a stanza object. :param default_ns: Optional default namespace to use instead of the stream's current default namespace. @@ -1656,12 +1664,13 @@ class XMLStream(object): count += 1 except ssl.SSLError as serr: if tries >= self.ssl_retry_max: - log.debug('SSL error - max retries reached') + log.debug('SSL error: max retries reached') self.exception(serr) log.warning("Failed to send %s", data) if not self.stop.is_set(): - self.disconnect(self.auto_reconnect, send_close=False) - log.warning('SSL write error - reattempting') + self.disconnect(self.auto_reconnect, + send_close=False) + log.warning('SSL write error: retrying') if not self.stop.is_set(): time.sleep(self.ssl_retry_delay) tries += 1 |