summaryrefslogtreecommitdiff
path: root/sleekxmpp/xmlstream/xmlstream.py
diff options
context:
space:
mode:
Diffstat (limited to 'sleekxmpp/xmlstream/xmlstream.py')
-rw-r--r--sleekxmpp/xmlstream/xmlstream.py128
1 files changed, 73 insertions, 55 deletions
diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py
index 40ae38c9..4e7b4050 100644
--- a/sleekxmpp/xmlstream/xmlstream.py
+++ b/sleekxmpp/xmlstream/xmlstream.py
@@ -55,7 +55,7 @@ RESPONSE_TIMEOUT = 30
WAIT_TIMEOUT = 0.1
#: The number of threads to use to handle XML stream events. This is not the
-#: same as the number of custom event handling threads.
+#: same as the number of custom event handling threads.
#: :data:`HANDLER_THREADS` must be at least 1. For Python implementations
#: with a GIL, this should be left at 1, but for implemetnations without
#: a GIL increasing this value can provide better performance.
@@ -124,7 +124,7 @@ class XMLStream(object):
self.ssl_support = SSL_SUPPORT
#: Most XMPP servers support TLSv1, but OpenFire in particular
- #: does not work well with it. For OpenFire, set
+ #: does not work well with it. For OpenFire, set
#: :attr:`ssl_version` to use ``SSLv23``::
#:
#: import ssl
@@ -134,30 +134,30 @@ class XMLStream(object):
#: Path to a file containing certificates for verifying the
#: server SSL certificate. A non-``None`` value will trigger
#: certificate checking.
- #:
+ #:
#: .. note::
#:
#: On Mac OS X, certificates in the system keyring will
#: be consulted, even if they are not in the provided file.
self.ca_certs = None
- #: The time in seconds to wait for events from the event queue,
+ #: The time in seconds to wait for events from the event queue,
#: and also the time between checks for the process stop signal.
self.wait_timeout = WAIT_TIMEOUT
- #: The time in seconds to wait before timing out waiting
+ #: The time in seconds to wait before timing out waiting
#: for response stanzas.
self.response_timeout = RESPONSE_TIMEOUT
- #: The current amount to time to delay attempting to reconnect.
+ #: The current amount to time to delay attempting to reconnect.
#: This value doubles (with some jitter) with each failed
#: connection attempt up to :attr:`reconnect_max_delay` seconds.
self.reconnect_delay = None
-
+
#: Maximum time to delay between connection attempts is one hour.
self.reconnect_max_delay = RECONNECT_MAX_DELAY
- #: Maximum number of attempts to connect to the server before
+ #: Maximum number of attempts to connect to the server before
#: quitting and raising a 'connect_failed' event. Setting to
#: ``None`` allows infinite reattempts, while setting it to ``0``
#: will disable reconnection attempts. Defaults to ``None``.
@@ -178,16 +178,16 @@ class XMLStream(object):
#: The default port to return when querying DNS records.
self.default_port = int(port)
-
- #: The domain to try when querying DNS records.
+
+ #: The domain to try when querying DNS records.
self.default_domain = ''
#: The expected name of the server, for validation.
self._expected_server_name = ''
-
+
#: The desired, or actual, address of the connected server.
self.address = (host, int(port))
-
+
#: A file-like wrapper for the socket for use with the
#: :mod:`~xml.etree.ElementTree` module.
self.filesocket = None
@@ -223,6 +223,9 @@ class XMLStream(object):
#: stream wrapper itself.
self.default_ns = ''
+ self.default_lang = None
+ self.peer_default_lang = None
+
#: The namespace of the enveloping stream element.
self.stream_ns = ''
@@ -255,7 +258,7 @@ class XMLStream(object):
#: and data is sent immediately over the wire.
self.session_started_event = threading.Event()
- #: The default time in seconds to wait for a session to start
+ #: The default time in seconds to wait for a session to start
#: after connecting before reconnecting and trying again.
self.session_timeout = 45
@@ -414,12 +417,12 @@ class XMLStream(object):
if use_tls is not None:
self.use_tls = use_tls
-
# Repeatedly attempt to connect until a successful connection
# is established.
attempts = self.reconnect_max_attempts
connected = self.state.transition('disconnected', 'connected',
- func=self._connect, args=(reattempt,))
+ func=self._connect,
+ args=(reattempt,))
while reattempt and not connected and not self.stop.is_set():
connected = self.state.transition('disconnected', 'connected',
func=self._connect)
@@ -434,7 +437,7 @@ class XMLStream(object):
def _connect(self, reattempt=True):
self.scheduler.remove('Session timeout check')
self.stop.clear()
-
+
if self.reconnect_delay is None or not reattempt:
delay = 1.0
else:
@@ -480,7 +483,7 @@ class XMLStream(object):
if self.use_proxy:
connected = self._connect_proxy()
if not connected:
- if reattempt:
+ if reattempt:
self.reconnect_delay = delay
return False
@@ -517,7 +520,8 @@ class XMLStream(object):
except (Socket.error, ssl.SSLError):
log.error('CERT: Invalid certificate trust chain.')
if not self.event_handled('ssl_invalid_chain'):
- self.disconnect(self.auto_reconnect, send_close=False)
+ self.disconnect(self.auto_reconnect,
+ send_close=False)
else:
self.event('ssl_invalid_chain', direct=True)
return False
@@ -525,7 +529,7 @@ class XMLStream(object):
self._der_cert = self.socket.getpeercert(binary_form=True)
pem_cert = ssl.DER_cert_to_PEM_cert(self._der_cert)
log.debug('CERT: %s', pem_cert)
-
+
self.event('ssl_cert', pem_cert, direct=True)
try:
cert.verify(self._expected_server_name, self._der_cert)
@@ -534,7 +538,9 @@ class XMLStream(object):
if not self.event_handled('ssl_invalid_cert'):
self.disconnect(send_close=False)
else:
- self.event('ssl_invalid_cert', pem_cert, direct=True)
+ self.event('ssl_invalid_cert',
+ pem_cert,
+ direct=True)
self.set_socket(self.socket, ignore=True)
#this event is where you should set your application state
@@ -627,7 +633,7 @@ class XMLStream(object):
If the disconnect should take place after all items
in the send queue have been sent, use ``wait=True``.
-
+
.. warning::
If you are constantly adding items to the queue
@@ -648,7 +654,7 @@ class XMLStream(object):
"""
self.state.transition('connected', 'disconnected',
wait=2.0,
- func=self._disconnect,
+ func=self._disconnect,
args=(reconnect, wait, send_close))
def _disconnect(self, reconnect=False, wait=None, send_close=True):
@@ -702,16 +708,18 @@ class XMLStream(object):
"""Reset the stream's state and reconnect to the server."""
log.debug("reconnecting...")
if self.state.ensure('connected'):
- self.state.transition('connected', 'disconnected',
+ self.state.transition('connected', 'disconnected',
wait=2.0,
- func=self._disconnect,
+ func=self._disconnect,
args=(True, wait, send_close))
attempts = self.reconnect_max_attempts
log.debug("connecting...")
connected = self.state.transition('disconnected', 'connected',
- wait=2.0, func=self._connect, args=(reattempt,))
+ wait=2.0,
+ func=self._connect,
+ args=(reattempt,))
while reattempt and not connected and not self.stop.is_set():
connected = self.state.transition('disconnected', 'connected',
wait=2.0, func=self._connect)
@@ -759,8 +767,8 @@ class XMLStream(object):
"""
Configure and set options for a :class:`~dns.resolver.Resolver`
instance, and other DNS related tasks. For example, you
- can also check :meth:`~socket.socket.getaddrinfo` to see
- if you need to call out to ``libresolv.so.2`` to
+ can also check :meth:`~socket.socket.getaddrinfo` to see
+ if you need to call out to ``libresolv.so.2`` to
run ``res_init()``.
Meant to be overridden.
@@ -814,7 +822,7 @@ class XMLStream(object):
log.debug('CERT: %s', pem_cert)
self.event('ssl_cert', pem_cert, direct=True)
- try:
+ try:
cert.verify(self._expected_server_name, self._der_cert)
except cert.CertificateError as err:
log.error(err.message)
@@ -874,8 +882,8 @@ class XMLStream(object):
self.schedule('Whitespace Keepalive',
self.whitespace_keepalive_interval,
self.send_raw,
- args = (' ',),
- kwargs = {'now': True},
+ args=(' ',),
+ kwargs={'now': True},
repeat=True)
def _remove_schedules(self, event):
@@ -884,7 +892,7 @@ class XMLStream(object):
self.scheduler.remove('Certificate Expiration')
def start_stream_handler(self, xml):
- """Perform any initialization actions, such as handshakes,
+ """Perform any initialization actions, such as handshakes,
once the stream header has been sent.
Meant to be overridden.
@@ -892,8 +900,8 @@ class XMLStream(object):
pass
def register_stanza(self, stanza_class):
- """Add a stanza object class as a known root stanza.
-
+ """Add a stanza object class as a known root stanza.
+
A root stanza is one that appears as a direct child of the stream's
root element.
@@ -910,8 +918,8 @@ class XMLStream(object):
self.__root_stanza.append(stanza_class)
def remove_stanza(self, stanza_class):
- """Remove a stanza from being a known root stanza.
-
+ """Remove a stanza from being a known root stanza.
+
A root stanza is one that appears as a direct child of the stream's
root element.
@@ -976,8 +984,9 @@ class XMLStream(object):
"""Add a stream event handler that will be executed when a matching
stanza is received.
- :param handler: The :class:`~sleekxmpp.xmlstream.handler.base.BaseHandler`
- derived object to execute.
+ :param handler:
+ The :class:`~sleekxmpp.xmlstream.handler.base.BaseHandler`
+ derived object to execute.
"""
if handler.stream is None:
self.__handlers.append(handler)
@@ -1004,11 +1013,12 @@ class XMLStream(object):
"""
if port is None:
port = self.default_port
-
+
resolver = default_resolver()
self.configure_dns(resolver, domain=domain, port=port)
- return resolve(domain, port, service=self.dns_service, resolver=resolver)
+ return resolve(domain, port, service=self.dns_service,
+ resolver=resolver)
def pick_dns_answer(self, domain, port=None):
"""Pick a server and port from DNS answers.
@@ -1026,7 +1036,7 @@ class XMLStream(object):
return self.dns_answers.next()
else:
return next(self.dns_answers)
-
+
def add_event_handler(self, name, pointer,
threaded=False, disposable=False):
"""Add a custom event handler that will be executed whenever
@@ -1141,9 +1151,9 @@ class XMLStream(object):
May optionally block until an expected response is received.
- :param data: The :class:`~sleekxmpp.xmlstream.stanzabase.ElementBase`
+ :param data: The :class:`~sleekxmpp.xmlstream.stanzabase.ElementBase`
stanza to send on the stream.
- :param mask: **DEPRECATED**
+ :param mask: **DEPRECATED**
An XML string snippet matching the structure
of the expected response. Execution will block
in this thread until the response is received
@@ -1195,9 +1205,9 @@ class XMLStream(object):
"""Send an XML object on the stream, and optionally wait
for a response.
- :param data: The :class:`~xml.etree.ElementTree.Element` XML object
+ :param data: The :class:`~xml.etree.ElementTree.Element` XML object
to send on the stream.
- :param mask: **DEPRECATED**
+ :param mask: **DEPRECATED**
An XML string snippet matching the structure
of the expected response. Execution will block
in this thread until the response is received
@@ -1237,14 +1247,15 @@ class XMLStream(object):
count += 1
except ssl.SSLError as serr:
if tries >= self.ssl_retry_max:
- log.debug('SSL error - max retries reached')
+ log.debug('SSL error: max retries reached')
self.exception(serr)
log.warning("Failed to send %s", data)
if reconnect is None:
reconnect = self.auto_reconnect
if not self.stop.is_set():
- self.disconnect(reconnect, send_close=False)
- log.warning('SSL write error - reattempting')
+ self.disconnect(reconnect,
+ send_close=False)
+ log.warning('SSL write error: retrying')
if not self.stop.is_set():
time.sleep(self.ssl_retry_delay)
tries += 1
@@ -1299,7 +1310,7 @@ class XMLStream(object):
def _wait_for_threads(self):
with self.__thread_cond:
if self.__thread_count != 0:
- log.debug("Waiting for %s threads to exit." %
+ log.debug("Waiting for %s threads to exit." %
self.__thread_count)
name = threading.current_thread().name
if name in self.__thread:
@@ -1331,7 +1342,7 @@ class XMLStream(object):
Defaults to ``True``. This does **not** mean that no
threads are used at all if ``threaded=False``.
- Regardless of these threading options, these threads will
+ Regardless of these threading options, these threads will
always exist:
- The event queue processor
@@ -1421,7 +1432,7 @@ class XMLStream(object):
def __read_xml(self):
"""Parse the incoming XML stream
-
+
Stream events are raised for each received stanza.
"""
depth = 0
@@ -1431,6 +1442,10 @@ class XMLStream(object):
if depth == 0:
# We have received the start of the root element.
root = xml
+ log.debug('RECV: %s', tostring(root, xmlns=self.default_ns,
+ stream=self,
+ top_level=True,
+ open_only=True))
# Perform any stream initialization actions, such
# as handshakes.
self.stream_end_event.clear()
@@ -1461,10 +1476,10 @@ class XMLStream(object):
"""Create a stanza object from a given XML object.
If a specialized stanza type is not found for the XML, then
- a generic :class:`~sleekxmpp.xmlstream.stanzabase.StanzaBase`
+ a generic :class:`~sleekxmpp.xmlstream.stanzabase.StanzaBase`
stanza will be returned.
- :param xml: The :class:`~xml.etree.ElementTree.Element` XML object
+ :param xml: The :class:`~xml.etree.ElementTree.Element` XML object
to convert into a stanza object.
:param default_ns: Optional default namespace to use instead of the
stream's current default namespace.
@@ -1478,6 +1493,8 @@ class XMLStream(object):
stanza_type = stanza_class
break
stanza = stanza_type(self, xml)
+ if stanza['lang'] is None and self.peer_default_lang:
+ stanza['lang'] = self.peer_default_lang
return stanza
def __spawn_event(self, xml):
@@ -1647,12 +1664,13 @@ class XMLStream(object):
count += 1
except ssl.SSLError as serr:
if tries >= self.ssl_retry_max:
- log.debug('SSL error - max retries reached')
+ log.debug('SSL error: max retries reached')
self.exception(serr)
log.warning("Failed to send %s", data)
if not self.stop.is_set():
- self.disconnect(self.auto_reconnect, send_close=False)
- log.warning('SSL write error - reattempting')
+ self.disconnect(self.auto_reconnect,
+ send_close=False)
+ log.warning('SSL write error: retrying')
if not self.stop.is_set():
time.sleep(self.ssl_retry_delay)
tries += 1