summaryrefslogtreecommitdiff
path: root/slixmpp/xmlstream
diff options
context:
space:
mode:
authorFlorent Le Coz <louiz@louiz.org>2014-07-17 14:19:04 +0200
committerFlorent Le Coz <louiz@louiz.org>2014-07-17 14:19:04 +0200
commit5ab77c745270d7d5c016c1dc7ef2a82533a4b16e (patch)
tree259377cc666f8b9c7954fc4e7b8f7a912bcfe101 /slixmpp/xmlstream
parente5582694c07236e6830c20361840360a1dde37f3 (diff)
downloadslixmpp-5ab77c745270d7d5c016c1dc7ef2a82533a4b16e.tar.gz
slixmpp-5ab77c745270d7d5c016c1dc7ef2a82533a4b16e.tar.bz2
slixmpp-5ab77c745270d7d5c016c1dc7ef2a82533a4b16e.tar.xz
slixmpp-5ab77c745270d7d5c016c1dc7ef2a82533a4b16e.zip
Rename to slixmpp
Diffstat (limited to 'slixmpp/xmlstream')
-rw-r--r--slixmpp/xmlstream/__init__.py19
-rw-r--r--slixmpp/xmlstream/cert.py184
-rw-r--r--slixmpp/xmlstream/filesocket.py54
-rw-r--r--slixmpp/xmlstream/handler/__init__.py15
-rw-r--r--slixmpp/xmlstream/handler/base.py84
-rw-r--r--slixmpp/xmlstream/handler/callback.py79
-rw-r--r--slixmpp/xmlstream/handler/collector.py66
-rw-r--r--slixmpp/xmlstream/handler/waiter.py83
-rw-r--r--slixmpp/xmlstream/handler/xmlcallback.py36
-rw-r--r--slixmpp/xmlstream/handler/xmlwaiter.py33
-rw-r--r--slixmpp/xmlstream/jid.py5
-rw-r--r--slixmpp/xmlstream/matcher/__init__.py17
-rw-r--r--slixmpp/xmlstream/matcher/base.py31
-rw-r--r--slixmpp/xmlstream/matcher/id.py29
-rw-r--r--slixmpp/xmlstream/matcher/idsender.py47
-rw-r--r--slixmpp/xmlstream/matcher/many.py40
-rw-r--r--slixmpp/xmlstream/matcher/stanzapath.py43
-rw-r--r--slixmpp/xmlstream/matcher/xmlmask.py117
-rw-r--r--slixmpp/xmlstream/matcher/xpath.py59
-rw-r--r--slixmpp/xmlstream/resolver.py333
-rw-r--r--slixmpp/xmlstream/scheduler.py250
-rw-r--r--slixmpp/xmlstream/stanzabase.py1644
-rw-r--r--slixmpp/xmlstream/tostring.py172
-rw-r--r--slixmpp/xmlstream/xmlstream.py1808
24 files changed, 5248 insertions, 0 deletions
diff --git a/slixmpp/xmlstream/__init__.py b/slixmpp/xmlstream/__init__.py
new file mode 100644
index 00000000..6b04d35c
--- /dev/null
+++ b/slixmpp/xmlstream/__init__.py
@@ -0,0 +1,19 @@
+"""
+ Slixmpp: The Slick XMPP Library
+ Copyright (C) 2010 Nathanael C. Fritz
+ This file is part of Slixmpp.
+
+ See the file LICENSE for copying permission.
+"""
+
+from slixmpp.jid import JID
+from slixmpp.xmlstream.scheduler import Scheduler
+from slixmpp.xmlstream.stanzabase import StanzaBase, ElementBase, ET
+from slixmpp.xmlstream.stanzabase import register_stanza_plugin
+from slixmpp.xmlstream.tostring import tostring
+from slixmpp.xmlstream.xmlstream import XMLStream, RESPONSE_TIMEOUT
+from slixmpp.xmlstream.xmlstream import RestartStream
+
+__all__ = ['JID', 'Scheduler', 'StanzaBase', 'ElementBase',
+ 'ET', 'StateMachine', 'tostring', 'XMLStream',
+ 'RESPONSE_TIMEOUT', 'RestartStream']
diff --git a/slixmpp/xmlstream/cert.py b/slixmpp/xmlstream/cert.py
new file mode 100644
index 00000000..71146f36
--- /dev/null
+++ b/slixmpp/xmlstream/cert.py
@@ -0,0 +1,184 @@
+import logging
+from datetime import datetime, timedelta
+
+# Make a call to strptime before starting threads to
+# prevent thread safety issues.
+datetime.strptime('1970-01-01 12:00:00', "%Y-%m-%d %H:%M:%S")
+
+
+try:
+ from pyasn1.codec.der import decoder, encoder
+ from pyasn1.type.univ import Any, ObjectIdentifier, OctetString
+ from pyasn1.type.char import BMPString, IA5String, UTF8String
+ from pyasn1.type.useful import GeneralizedTime
+ from pyasn1_modules.rfc2459 import (Certificate, DirectoryString,
+ SubjectAltName, GeneralNames,
+ GeneralName)
+ from pyasn1_modules.rfc2459 import id_ce_subjectAltName as SUBJECT_ALT_NAME
+ from pyasn1_modules.rfc2459 import id_at_commonName as COMMON_NAME
+
+ XMPP_ADDR = ObjectIdentifier('1.3.6.1.5.5.7.8.5')
+ SRV_NAME = ObjectIdentifier('1.3.6.1.5.5.7.8.7')
+
+ HAVE_PYASN1 = True
+except ImportError:
+ HAVE_PYASN1 = False
+
+
+log = logging.getLogger(__name__)
+
+
+class CertificateError(Exception):
+ pass
+
+
+def decode_str(data):
+ encoding = 'utf-16-be' if isinstance(data, BMPString) else 'utf-8'
+ return bytes(data).decode(encoding)
+
+
+def extract_names(raw_cert):
+ results = {'CN': set(),
+ 'DNS': set(),
+ 'SRV': set(),
+ 'URI': set(),
+ 'XMPPAddr': set()}
+
+ cert = decoder.decode(raw_cert, asn1Spec=Certificate())[0]
+ tbs = cert.getComponentByName('tbsCertificate')
+ subject = tbs.getComponentByName('subject')
+ extensions = tbs.getComponentByName('extensions') or []
+
+ # Extract the CommonName(s) from the cert.
+ for rdnss in subject:
+ for rdns in rdnss:
+ for name in rdns:
+ oid = name.getComponentByName('type')
+ value = name.getComponentByName('value')
+
+ if oid != COMMON_NAME:
+ continue
+
+ value = decoder.decode(value, asn1Spec=DirectoryString())[0]
+ value = decode_str(value.getComponent())
+ results['CN'].add(value)
+
+ # Extract the Subject Alternate Names (DNS, SRV, URI, XMPPAddr)
+ for extension in extensions:
+ oid = extension.getComponentByName('extnID')
+ if oid != SUBJECT_ALT_NAME:
+ continue
+
+ value = decoder.decode(extension.getComponentByName('extnValue'),
+ asn1Spec=OctetString())[0]
+ sa_names = decoder.decode(value, asn1Spec=SubjectAltName())[0]
+ for name in sa_names:
+ name_type = name.getName()
+ if name_type == 'dNSName':
+ results['DNS'].add(decode_str(name.getComponent()))
+ if name_type == 'uniformResourceIdentifier':
+ value = decode_str(name.getComponent())
+ if value.startswith('xmpp:'):
+ results['URI'].add(value[5:])
+ elif name_type == 'otherName':
+ name = name.getComponent()
+
+ oid = name.getComponentByName('type-id')
+ value = name.getComponentByName('value')
+
+ if oid == XMPP_ADDR:
+ value = decoder.decode(value, asn1Spec=UTF8String())[0]
+ results['XMPPAddr'].add(decode_str(value))
+ elif oid == SRV_NAME:
+ value = decoder.decode(value, asn1Spec=IA5String())[0]
+ results['SRV'].add(decode_str(value))
+
+ return results
+
+
+def extract_dates(raw_cert):
+ if not HAVE_PYASN1:
+ log.warning("Could not find pyasn1 and pyasn1_modules. " + \
+ "SSL certificate expiration COULD NOT BE VERIFIED.")
+ return None, None
+
+ cert = decoder.decode(raw_cert, asn1Spec=Certificate())[0]
+ tbs = cert.getComponentByName('tbsCertificate')
+ validity = tbs.getComponentByName('validity')
+
+ not_before = validity.getComponentByName('notBefore')
+ not_before = str(not_before.getComponent())
+
+ not_after = validity.getComponentByName('notAfter')
+ not_after = str(not_after.getComponent())
+
+ if isinstance(not_before, GeneralizedTime):
+ not_before = datetime.strptime(not_before, '%Y%m%d%H%M%SZ')
+ else:
+ not_before = datetime.strptime(not_before, '%y%m%d%H%M%SZ')
+
+ if isinstance(not_after, GeneralizedTime):
+ not_after = datetime.strptime(not_after, '%Y%m%d%H%M%SZ')
+ else:
+ not_after = datetime.strptime(not_after, '%y%m%d%H%M%SZ')
+
+ return not_before, not_after
+
+
+def get_ttl(raw_cert):
+ not_before, not_after = extract_dates(raw_cert)
+ if not_after is None:
+ return None
+ return not_after - datetime.utcnow()
+
+
+def verify(expected, raw_cert):
+ if not HAVE_PYASN1:
+ log.warning("Could not find pyasn1 and pyasn1_modules. " + \
+ "SSL certificate COULD NOT BE VERIFIED.")
+ return
+
+ not_before, not_after = extract_dates(raw_cert)
+ cert_names = extract_names(raw_cert)
+
+ now = datetime.utcnow()
+
+ if not_before > now:
+ raise CertificateError(
+ 'Certificate has not entered its valid date range.')
+
+ if not_after <= now:
+ raise CertificateError(
+ 'Certificate has expired.')
+
+ if '.' in expected:
+ expected_wild = expected[expected.index('.'):]
+ else:
+ expected_wild = expected
+ expected_srv = '_xmpp-client.%s' % expected
+
+ for name in cert_names['XMPPAddr']:
+ if name == expected:
+ return True
+ for name in cert_names['SRV']:
+ if name == expected_srv or name == expected:
+ return True
+ for name in cert_names['DNS']:
+ if name == expected:
+ return True
+ if name.startswith('*'):
+ if '.' in name:
+ name_wild = name[name.index('.'):]
+ else:
+ name_wild = name
+ if expected_wild == name_wild:
+ return True
+ for name in cert_names['URI']:
+ if name == expected:
+ return True
+ for name in cert_names['CN']:
+ if name == expected:
+ return True
+
+ raise CertificateError(
+ 'Could not match certficate against hostname: %s' % expected)
diff --git a/slixmpp/xmlstream/filesocket.py b/slixmpp/xmlstream/filesocket.py
new file mode 100644
index 00000000..1b52251e
--- /dev/null
+++ b/slixmpp/xmlstream/filesocket.py
@@ -0,0 +1,54 @@
+# -*- coding: utf-8 -*-
+"""
+ slixmpp.xmlstream.filesocket
+ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ This module is a shim for correcting deficiencies in the file
+ socket implementation of Python2.6.
+
+ Part of Slixmpp: The Slick XMPP Library
+
+ :copyright: (c) 2011 Nathanael C. Fritz
+ :license: MIT, see LICENSE for more details
+"""
+
+from socket import _fileobject
+import errno
+import socket
+
+
+class FileSocket(_fileobject):
+
+ """Create a file object wrapper for a socket to work around
+ issues present in Python 2.6 when using sockets as file objects.
+
+ The parser for :class:`~xml.etree.cElementTree` requires a file, but
+ we will be reading from the XMPP connection socket instead.
+ """
+
+ def read(self, size=4096):
+ """Read data from the socket as if it were a file."""
+ if self._sock is None:
+ return None
+ while True:
+ try:
+ data = self._sock.recv(size)
+ break
+ except socket.error as serr:
+ if serr.errno != errno.EINTR:
+ raise
+ if data is not None:
+ return data
+
+
+class Socket26(socket.socket):
+
+ """A custom socket implementation that uses our own FileSocket class
+ to work around issues in Python 2.6 when using sockets as files.
+ """
+
+ def makefile(self, mode='r', bufsize=-1):
+ """makefile([mode[, bufsize]]) -> file object
+ Return a regular file object corresponding to the socket. The mode
+ and bufsize arguments are as for the built-in open() function."""
+ return FileSocket(self._sock, mode, bufsize)
diff --git a/slixmpp/xmlstream/handler/__init__.py b/slixmpp/xmlstream/handler/__init__.py
new file mode 100644
index 00000000..31de9dfc
--- /dev/null
+++ b/slixmpp/xmlstream/handler/__init__.py
@@ -0,0 +1,15 @@
+"""
+ Slixmpp: The Slick XMPP Library
+ Copyright (C) 2010 Nathanael C. Fritz
+ This file is part of Slixmpp.
+
+ See the file LICENSE for copying permission.
+"""
+
+from slixmpp.xmlstream.handler.callback import Callback
+from slixmpp.xmlstream.handler.collector import Collector
+from slixmpp.xmlstream.handler.waiter import Waiter
+from slixmpp.xmlstream.handler.xmlcallback import XMLCallback
+from slixmpp.xmlstream.handler.xmlwaiter import XMLWaiter
+
+__all__ = ['Callback', 'Waiter', 'XMLCallback', 'XMLWaiter']
diff --git a/slixmpp/xmlstream/handler/base.py b/slixmpp/xmlstream/handler/base.py
new file mode 100644
index 00000000..36723597
--- /dev/null
+++ b/slixmpp/xmlstream/handler/base.py
@@ -0,0 +1,84 @@
+# -*- coding: utf-8 -*-
+"""
+ slixmpp.xmlstream.handler.base
+ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ Part of Slixmpp: The Slick XMPP Library
+
+ :copyright: (c) 2011 Nathanael C. Fritz
+ :license: MIT, see LICENSE for more details
+"""
+
+import weakref
+
+
+class BaseHandler(object):
+
+ """
+ Base class for stream handlers. Stream handlers are matched with
+ incoming stanzas so that the stanza may be processed in some way.
+ Stanzas may be matched with multiple handlers.
+
+ Handler execution may take place in two phases: during the incoming
+ stream processing, and in the main event loop. The :meth:`prerun()`
+ method is executed in the first case, and :meth:`run()` is called
+ during the second.
+
+ :param string name: The name of the handler.
+ :param matcher: A :class:`~slixmpp.xmlstream.matcher.base.MatcherBase`
+ derived object that will be used to determine if a
+ stanza should be accepted by this handler.
+ :param stream: The :class:`~slixmpp.xmlstream.xmlstream.XMLStream`
+ instance that the handle will respond to.
+ """
+
+ def __init__(self, name, matcher, stream=None):
+ #: The name of the handler
+ self.name = name
+
+ #: The XML stream this handler is assigned to
+ self.stream = None
+ if stream is not None:
+ self.stream = weakref.ref(stream)
+ stream.register_handler(self)
+
+ self._destroy = False
+ self._payload = None
+ self._matcher = matcher
+
+ def match(self, xml):
+ """Compare a stanza or XML object with the handler's matcher.
+
+ :param xml: An XML or
+ :class:`~slixmpp.xmlstream.stanzabase.ElementBase` object
+ """
+ return self._matcher.match(xml)
+
+ def prerun(self, payload):
+ """Prepare the handler for execution while the XML
+ stream is being processed.
+
+ :param payload: A :class:`~slixmpp.xmlstream.stanzabase.ElementBase`
+ object.
+ """
+ self._payload = payload
+
+ def run(self, payload):
+ """Execute the handler after XML stream processing and during the
+ main event loop.
+
+ :param payload: A :class:`~slixmpp.xmlstream.stanzabase.ElementBase`
+ object.
+ """
+ self._payload = payload
+
+ def check_delete(self):
+ """Check if the handler should be removed from the list
+ of stream handlers.
+ """
+ return self._destroy
+
+
+# To comply with PEP8, method names now use underscores.
+# Deprecated method names are re-mapped for backwards compatibility.
+BaseHandler.checkDelete = BaseHandler.check_delete
diff --git a/slixmpp/xmlstream/handler/callback.py b/slixmpp/xmlstream/handler/callback.py
new file mode 100644
index 00000000..4cb329af
--- /dev/null
+++ b/slixmpp/xmlstream/handler/callback.py
@@ -0,0 +1,79 @@
+# -*- coding: utf-8 -*-
+"""
+ slixmpp.xmlstream.handler.callback
+ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ Part of Slixmpp: The Slick XMPP Library
+
+ :copyright: (c) 2011 Nathanael C. Fritz
+ :license: MIT, see LICENSE for more details
+"""
+
+from slixmpp.xmlstream.handler.base import BaseHandler
+
+
+class Callback(BaseHandler):
+
+ """
+ The Callback handler will execute a callback function with
+ matched stanzas.
+
+ The handler may execute the callback either during stream
+ processing or during the main event loop.
+
+ Callback functions are all executed in the same thread, so be aware if
+ you are executing functions that will block for extended periods of
+ time. Typically, you should signal your own events using the Slixmpp
+ object's :meth:`~slixmpp.xmlstream.xmlstream.XMLStream.event()`
+ method to pass the stanza off to a threaded event handler for further
+ processing.
+
+
+ :param string name: The name of the handler.
+ :param matcher: A :class:`~slixmpp.xmlstream.matcher.base.MatcherBase`
+ derived object for matching stanza objects.
+ :param pointer: The function to execute during callback.
+ :param bool thread: **DEPRECATED.** Remains only for
+ backwards compatibility.
+ :param bool once: Indicates if the handler should be used only
+ once. Defaults to False.
+ :param bool instream: Indicates if the callback should be executed
+ during stream processing instead of in the
+ main event loop.
+ :param stream: The :class:`~slixmpp.xmlstream.xmlstream.XMLStream`
+ instance this handler should monitor.
+ """
+
+ def __init__(self, name, matcher, pointer, thread=False,
+ once=False, instream=False, stream=None):
+ BaseHandler.__init__(self, name, matcher, stream)
+ self._pointer = pointer
+ self._once = once
+ self._instream = instream
+
+ def prerun(self, payload):
+ """Execute the callback during stream processing, if
+ the callback was created with ``instream=True``.
+
+ :param payload: The matched
+ :class:`~slixmpp.xmlstream.stanzabase.ElementBase` object.
+ """
+ if self._once:
+ self._destroy = True
+ if self._instream:
+ self.run(payload, True)
+
+ def run(self, payload, instream=False):
+ """Execute the callback function with the matched stanza payload.
+
+ :param payload: The matched
+ :class:`~slixmpp.xmlstream.stanzabase.ElementBase` object.
+ :param bool instream: Force the handler to execute during stream
+ processing. This should only be used by
+ :meth:`prerun()`. Defaults to ``False``.
+ """
+ if not self._instream or instream:
+ self._pointer(payload)
+ if self._once:
+ self._destroy = True
+ del self._pointer
diff --git a/slixmpp/xmlstream/handler/collector.py b/slixmpp/xmlstream/handler/collector.py
new file mode 100644
index 00000000..03868187
--- /dev/null
+++ b/slixmpp/xmlstream/handler/collector.py
@@ -0,0 +1,66 @@
+# -*- coding: utf-8 -*-
+"""
+ slixmpp.xmlstream.handler.collector
+ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ Part of Slixmpp: The Slick XMPP Library
+
+ :copyright: (c) 2012 Nathanael C. Fritz, Lance J.T. Stout
+ :license: MIT, see LICENSE for more details
+"""
+
+import logging
+
+from slixmpp.util import Queue, QueueEmpty
+from slixmpp.xmlstream.handler.base import BaseHandler
+
+
+log = logging.getLogger(__name__)
+
+
+class Collector(BaseHandler):
+
+ """
+ The Collector handler allows for collecting a set of stanzas
+ that match a given pattern. Unlike the Waiter handler, a
+ Collector does not block execution, and will continue to
+ accumulate matching stanzas until told to stop.
+
+ :param string name: The name of the handler.
+ :param matcher: A :class:`~slixmpp.xmlstream.matcher.base.MatcherBase`
+ derived object for matching stanza objects.
+ :param stream: The :class:`~slixmpp.xmlstream.xmlstream.XMLStream`
+ instance this handler should monitor.
+ """
+
+ def __init__(self, name, matcher, stream=None):
+ BaseHandler.__init__(self, name, matcher, stream=stream)
+ self._payload = Queue()
+
+ def prerun(self, payload):
+ """Store the matched stanza when received during processing.
+
+ :param payload: The matched
+ :class:`~slixmpp.xmlstream.stanzabase.ElementBase` object.
+ """
+ self._payload.put(payload)
+
+ def run(self, payload):
+ """Do not process this handler during the main event loop."""
+ pass
+
+ def stop(self):
+ """
+ Stop collection of matching stanzas, and return the ones that
+ have been stored so far.
+ """
+ self._destroy = True
+ results = []
+ try:
+ while True:
+ results.append(self._payload.get(False))
+ except QueueEmpty:
+ pass
+
+ self.stream().remove_handler(self.name)
+ return results
diff --git a/slixmpp/xmlstream/handler/waiter.py b/slixmpp/xmlstream/handler/waiter.py
new file mode 100644
index 00000000..c8bc2e55
--- /dev/null
+++ b/slixmpp/xmlstream/handler/waiter.py
@@ -0,0 +1,83 @@
+# -*- coding: utf-8 -*-
+"""
+ slixmpp.xmlstream.handler.waiter
+ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ Part of Slixmpp: The Slick XMPP Library
+
+ :copyright: (c) 2011 Nathanael C. Fritz
+ :license: MIT, see LICENSE for more details
+"""
+
+import logging
+
+from slixmpp.util import Queue, QueueEmpty
+from slixmpp.xmlstream.handler.base import BaseHandler
+
+
+log = logging.getLogger(__name__)
+
+
+class Waiter(BaseHandler):
+
+ """
+ The Waiter handler allows an event handler to block until a
+ particular stanza has been received. The handler will either be
+ given the matched stanza, or ``False`` if the waiter has timed out.
+
+ :param string name: The name of the handler.
+ :param matcher: A :class:`~slixmpp.xmlstream.matcher.base.MatcherBase`
+ derived object for matching stanza objects.
+ :param stream: The :class:`~slixmpp.xmlstream.xmlstream.XMLStream`
+ instance this handler should monitor.
+ """
+
+ def __init__(self, name, matcher, stream=None):
+ BaseHandler.__init__(self, name, matcher, stream=stream)
+ self._payload = Queue()
+
+ def prerun(self, payload):
+ """Store the matched stanza when received during processing.
+
+ :param payload: The matched
+ :class:`~slixmpp.xmlstream.stanzabase.ElementBase` object.
+ """
+ self._payload.put(payload)
+
+ def run(self, payload):
+ """Do not process this handler during the main event loop."""
+ pass
+
+ def wait(self, timeout=None):
+ """Block an event handler while waiting for a stanza to arrive.
+
+ Be aware that this will impact performance if called from a
+ non-threaded event handler.
+
+ Will return either the received stanza, or ``False`` if the
+ waiter timed out.
+
+ :param int timeout: The number of seconds to wait for the stanza
+ to arrive. Defaults to the the stream's
+ :class:`~slixmpp.xmlstream.xmlstream.XMLStream.response_timeout`
+ value.
+ """
+ if timeout is None:
+ timeout = self.stream().response_timeout
+
+ elapsed_time = 0
+ stanza = False
+ while elapsed_time < timeout and not self.stream().stop.is_set():
+ try:
+ stanza = self._payload.get(True, 1)
+ break
+ except QueueEmpty:
+ elapsed_time += 1
+ if elapsed_time >= timeout:
+ log.warning("Timed out waiting for %s", self.name)
+ self.stream().remove_handler(self.name)
+ return stanza
+
+ def check_delete(self):
+ """Always remove waiters after use."""
+ return True
diff --git a/slixmpp/xmlstream/handler/xmlcallback.py b/slixmpp/xmlstream/handler/xmlcallback.py
new file mode 100644
index 00000000..60ccbaed
--- /dev/null
+++ b/slixmpp/xmlstream/handler/xmlcallback.py
@@ -0,0 +1,36 @@
+"""
+ Slixmpp: The Slick XMPP Library
+ Copyright (C) 2010 Nathanael C. Fritz
+ This file is part of Slixmpp.
+
+ See the file LICENSE for copying permission.
+"""
+
+from slixmpp.xmlstream.handler import Callback
+
+
+class XMLCallback(Callback):
+
+ """
+ The XMLCallback class is identical to the normal Callback class,
+ except that XML contents of matched stanzas will be processed instead
+ of the stanza objects themselves.
+
+ Methods:
+ run -- Overrides Callback.run
+ """
+
+ def run(self, payload, instream=False):
+ """
+ Execute the callback function with the matched stanza's
+ XML contents, instead of the stanza itself.
+
+ Overrides BaseHandler.run
+
+ Arguments:
+ payload -- The matched stanza object.
+ instream -- Force the handler to execute during
+ stream processing. Used only by prerun.
+ Defaults to False.
+ """
+ Callback.run(self, payload.xml, instream)
diff --git a/slixmpp/xmlstream/handler/xmlwaiter.py b/slixmpp/xmlstream/handler/xmlwaiter.py
new file mode 100644
index 00000000..dc014da0
--- /dev/null
+++ b/slixmpp/xmlstream/handler/xmlwaiter.py
@@ -0,0 +1,33 @@
+"""
+ Slixmpp: The Slick XMPP Library
+ Copyright (C) 2010 Nathanael C. Fritz
+ This file is part of Slixmpp.
+
+ See the file LICENSE for copying permission.
+"""
+
+from slixmpp.xmlstream.handler import Waiter
+
+
+class XMLWaiter(Waiter):
+
+ """
+ The XMLWaiter class is identical to the normal Waiter class
+ except that it returns the XML contents of the stanza instead
+ of the full stanza object itself.
+
+ Methods:
+ prerun -- Overrides Waiter.prerun
+ """
+
+ def prerun(self, payload):
+ """
+ Store the XML contents of the stanza to return to the
+ waiting event handler.
+
+ Overrides Waiter.prerun
+
+ Arguments:
+ payload -- The matched stanza object.
+ """
+ Waiter.prerun(self, payload.xml)
diff --git a/slixmpp/xmlstream/jid.py b/slixmpp/xmlstream/jid.py
new file mode 100644
index 00000000..94841d98
--- /dev/null
+++ b/slixmpp/xmlstream/jid.py
@@ -0,0 +1,5 @@
+import logging
+
+logging.warning('Deprecated: slixmpp.xmlstream.jid is moving to slixmpp.jid')
+
+from slixmpp.jid import JID
diff --git a/slixmpp/xmlstream/matcher/__init__.py b/slixmpp/xmlstream/matcher/__init__.py
new file mode 100644
index 00000000..47487d4a
--- /dev/null
+++ b/slixmpp/xmlstream/matcher/__init__.py
@@ -0,0 +1,17 @@
+"""
+ Slixmpp: The Slick XMPP Library
+ Copyright (C) 2010 Nathanael C. Fritz
+ This file is part of Slixmpp.
+
+ See the file LICENSE for copying permission.
+"""
+
+from slixmpp.xmlstream.matcher.id import MatcherId
+from slixmpp.xmlstream.matcher.idsender import MatchIDSender
+from slixmpp.xmlstream.matcher.many import MatchMany
+from slixmpp.xmlstream.matcher.stanzapath import StanzaPath
+from slixmpp.xmlstream.matcher.xmlmask import MatchXMLMask
+from slixmpp.xmlstream.matcher.xpath import MatchXPath
+
+__all__ = ['MatcherId', 'MatchMany', 'StanzaPath',
+ 'MatchXMLMask', 'MatchXPath']
diff --git a/slixmpp/xmlstream/matcher/base.py b/slixmpp/xmlstream/matcher/base.py
new file mode 100644
index 00000000..4f15c63d
--- /dev/null
+++ b/slixmpp/xmlstream/matcher/base.py
@@ -0,0 +1,31 @@
+# -*- coding: utf-8 -*-
+"""
+ slixmpp.xmlstream.matcher.base
+ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ Part of Slixmpp: The Slick XMPP Library
+
+ :copyright: (c) 2011 Nathanael C. Fritz
+ :license: MIT, see LICENSE for more details
+"""
+
+
+class MatcherBase(object):
+
+ """
+ Base class for stanza matchers. Stanza matchers are used to pick
+ stanzas out of the XML stream and pass them to the appropriate
+ stream handlers.
+
+ :param criteria: Object to compare some aspect of a stanza against.
+ """
+
+ def __init__(self, criteria):
+ self._criteria = criteria
+
+ def match(self, xml):
+ """Check if a stanza matches the stored criteria.
+
+ Meant to be overridden.
+ """
+ return False
diff --git a/slixmpp/xmlstream/matcher/id.py b/slixmpp/xmlstream/matcher/id.py
new file mode 100644
index 00000000..ddef75dc
--- /dev/null
+++ b/slixmpp/xmlstream/matcher/id.py
@@ -0,0 +1,29 @@
+# -*- coding: utf-8 -*-
+"""
+ slixmpp.xmlstream.matcher.id
+ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ Part of Slixmpp: The Slick XMPP Library
+
+ :copyright: (c) 2011 Nathanael C. Fritz
+ :license: MIT, see LICENSE for more details
+"""
+
+from slixmpp.xmlstream.matcher.base import MatcherBase
+
+
+class MatcherId(MatcherBase):
+
+ """
+ The ID matcher selects stanzas that have the same stanza 'id'
+ interface value as the desired ID.
+ """
+
+ def match(self, xml):
+ """Compare the given stanza's ``'id'`` attribute to the stored
+ ``id`` value.
+
+ :param xml: The :class:`~slixmpp.xmlstream.stanzabase.ElementBase`
+ stanza to compare against.
+ """
+ return xml['id'] == self._criteria
diff --git a/slixmpp/xmlstream/matcher/idsender.py b/slixmpp/xmlstream/matcher/idsender.py
new file mode 100644
index 00000000..79f73911
--- /dev/null
+++ b/slixmpp/xmlstream/matcher/idsender.py
@@ -0,0 +1,47 @@
+# -*- coding: utf-8 -*-
+"""
+ slixmpp.xmlstream.matcher.id
+ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ Part of Slixmpp: The Slick XMPP Library
+
+ :copyright: (c) 2011 Nathanael C. Fritz
+ :license: MIT, see LICENSE for more details
+"""
+
+from slixmpp.xmlstream.matcher.base import MatcherBase
+
+
+class MatchIDSender(MatcherBase):
+
+ """
+ The IDSender matcher selects stanzas that have the same stanza 'id'
+ interface value as the desired ID, and that the 'from' value is one
+ of a set of approved entities that can respond to a request.
+ """
+
+ def match(self, xml):
+ """Compare the given stanza's ``'id'`` attribute to the stored
+ ``id`` value, and verify the sender's JID.
+
+ :param xml: The :class:`~slixmpp.xmlstream.stanzabase.ElementBase`
+ stanza to compare against.
+ """
+
+ selfjid = self._criteria['self']
+ peerjid = self._criteria['peer']
+
+ allowed = {}
+ allowed[''] = True
+ allowed[selfjid.bare] = True
+ allowed[selfjid.host] = True
+ allowed[peerjid.full] = True
+ allowed[peerjid.bare] = True
+ allowed[peerjid.host] = True
+
+ _from = xml['from']
+
+ try:
+ return xml['id'] == self._criteria['id'] and allowed[_from]
+ except KeyError:
+ return False
diff --git a/slixmpp/xmlstream/matcher/many.py b/slixmpp/xmlstream/matcher/many.py
new file mode 100644
index 00000000..ef6a64d3
--- /dev/null
+++ b/slixmpp/xmlstream/matcher/many.py
@@ -0,0 +1,40 @@
+"""
+ Slixmpp: The Slick XMPP Library
+ Copyright (C) 2010 Nathanael C. Fritz
+ This file is part of Slixmpp.
+
+ See the file LICENSE for copying permission.
+"""
+
+from slixmpp.xmlstream.matcher.base import MatcherBase
+
+
+class MatchMany(MatcherBase):
+
+ """
+ The MatchMany matcher may compare a stanza against multiple
+ criteria. It is essentially an OR relation combining multiple
+ matchers.
+
+ Each of the criteria must implement a match() method.
+
+ Methods:
+ match -- Overrides MatcherBase.match.
+ """
+
+ def match(self, xml):
+ """
+ Match a stanza against multiple criteria. The match is successful
+ if one of the criteria matches.
+
+ Each of the criteria must implement a match() method.
+
+ Overrides MatcherBase.match.
+
+ Arguments:
+ xml -- The stanza object to compare against.
+ """
+ for m in self._criteria:
+ if m.match(xml):
+ return True
+ return False
diff --git a/slixmpp/xmlstream/matcher/stanzapath.py b/slixmpp/xmlstream/matcher/stanzapath.py
new file mode 100644
index 00000000..c9f245e1
--- /dev/null
+++ b/slixmpp/xmlstream/matcher/stanzapath.py
@@ -0,0 +1,43 @@
+# -*- coding: utf-8 -*-
+"""
+ slixmpp.xmlstream.matcher.stanzapath
+ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ Part of Slixmpp: The Slick XMPP Library
+
+ :copyright: (c) 2011 Nathanael C. Fritz
+ :license: MIT, see LICENSE for more details
+"""
+
+from slixmpp.xmlstream.matcher.base import MatcherBase
+from slixmpp.xmlstream.stanzabase import fix_ns
+
+
+class StanzaPath(MatcherBase):
+
+ """
+ The StanzaPath matcher selects stanzas that match a given "stanza path",
+ which is similar to a normal XPath except that it uses the interfaces and
+ plugins of the stanza instead of the actual, underlying XML.
+
+ :param criteria: Object to compare some aspect of a stanza against.
+ """
+
+ def __init__(self, criteria):
+ self._criteria = fix_ns(criteria, split=True,
+ propagate_ns=False,
+ default_ns='jabber:client')
+ self._raw_criteria = criteria
+
+ def match(self, stanza):
+ """
+ Compare a stanza against a "stanza path". A stanza path is similar to
+ an XPath expression, but uses the stanza's interfaces and plugins
+ instead of the underlying XML. See the documentation for the stanza
+ :meth:`~slixmpp.xmlstream.stanzabase.ElementBase.match()` method
+ for more information.
+
+ :param stanza: The :class:`~slixmpp.xmlstream.stanzabase.ElementBase`
+ stanza to compare against.
+ """
+ return stanza.match(self._criteria) or stanza.match(self._raw_criteria)
diff --git a/slixmpp/xmlstream/matcher/xmlmask.py b/slixmpp/xmlstream/matcher/xmlmask.py
new file mode 100644
index 00000000..7e26abe2
--- /dev/null
+++ b/slixmpp/xmlstream/matcher/xmlmask.py
@@ -0,0 +1,117 @@
+"""
+ Slixmpp: The Slick XMPP Library
+ Copyright (C) 2010 Nathanael C. Fritz
+ This file is part of Slixmpp.
+
+ See the file LICENSE for copying permission.
+"""
+
+import logging
+
+from xml.parsers.expat import ExpatError
+
+from slixmpp.xmlstream.stanzabase import ET
+from slixmpp.xmlstream.matcher.base import MatcherBase
+
+
+log = logging.getLogger(__name__)
+
+
+class MatchXMLMask(MatcherBase):
+
+ """
+ The XMLMask matcher selects stanzas whose XML matches a given
+ XML pattern, or mask. For example, message stanzas with body elements
+ could be matched using the mask:
+
+ .. code-block:: xml
+
+ <message xmlns="jabber:client"><body /></message>
+
+ Use of XMLMask is discouraged, and
+ :class:`~slixmpp.xmlstream.matcher.xpath.MatchXPath` or
+ :class:`~slixmpp.xmlstream.matcher.stanzapath.StanzaPath`
+ should be used instead.
+
+ :param criteria: Either an :class:`~xml.etree.ElementTree.Element` XML
+ object or XML string to use as a mask.
+ """
+
+ def __init__(self, criteria, default_ns='jabber:client'):
+ MatcherBase.__init__(self, criteria)
+ if isinstance(criteria, str):
+ self._criteria = ET.fromstring(self._criteria)
+ self.default_ns = default_ns
+
+ def setDefaultNS(self, ns):
+ """Set the default namespace to use during comparisons.
+
+ :param ns: The new namespace to use as the default.
+ """
+ self.default_ns = ns
+
+ def match(self, xml):
+ """Compare a stanza object or XML object against the stored XML mask.
+
+ Overrides MatcherBase.match.
+
+ :param xml: The stanza object or XML object to compare against.
+ """
+ if hasattr(xml, 'xml'):
+ xml = xml.xml
+ return self._mask_cmp(xml, self._criteria, True)
+
+ def _mask_cmp(self, source, mask, use_ns=False, default_ns='__no_ns__'):
+ """Compare an XML object against an XML mask.
+
+ :param source: The :class:`~xml.etree.ElementTree.Element` XML object
+ to compare against the mask.
+ :param mask: The :class:`~xml.etree.ElementTree.Element` XML object
+ serving as the mask.
+ :param use_ns: Indicates if namespaces should be respected during
+ the comparison.
+ :default_ns: The default namespace to apply to elements that
+ do not have a specified namespace.
+ Defaults to ``"__no_ns__"``.
+ """
+ if source is None:
+ # If the element was not found. May happend during recursive calls.
+ return False
+
+ # Convert the mask to an XML object if it is a string.
+ if not hasattr(mask, 'attrib'):
+ try:
+ mask = ET.fromstring(mask)
+ except ExpatError:
+ log.warning("Expat error: %s\nIn parsing: %s", '', mask)
+
+ mask_ns_tag = "{%s}%s" % (self.default_ns, mask.tag)
+ if source.tag not in [mask.tag, mask_ns_tag]:
+ return False
+
+ # If the mask includes text, compare it.
+ if mask.text and source.text and \
+ source.text.strip() != mask.text.strip():
+ return False
+
+ # Compare attributes. The stanza must include the attributes
+ # defined by the mask, but may include others.
+ for name, value in mask.attrib.items():
+ if source.attrib.get(name, "__None__") != value:
+ return False
+
+ # Recursively check subelements.
+ matched_elements = {}
+ for subelement in mask:
+ matched = False
+ for other in source.findall(subelement.tag):
+ matched_elements[other] = False
+ if self._mask_cmp(other, subelement, use_ns):
+ if not matched_elements.get(other, False):
+ matched_elements[other] = True
+ matched = True
+ if not matched:
+ return False
+
+ # Everything matches.
+ return True
diff --git a/slixmpp/xmlstream/matcher/xpath.py b/slixmpp/xmlstream/matcher/xpath.py
new file mode 100644
index 00000000..31ab1b8c
--- /dev/null
+++ b/slixmpp/xmlstream/matcher/xpath.py
@@ -0,0 +1,59 @@
+# -*- coding: utf-8 -*-
+"""
+ slixmpp.xmlstream.matcher.xpath
+ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ Part of Slixmpp: The Slick XMPP Library
+
+ :copyright: (c) 2011 Nathanael C. Fritz
+ :license: MIT, see LICENSE for more details
+"""
+
+from slixmpp.xmlstream.stanzabase import ET, fix_ns
+from slixmpp.xmlstream.matcher.base import MatcherBase
+
+
+class MatchXPath(MatcherBase):
+
+ """
+ The XPath matcher selects stanzas whose XML contents matches a given
+ XPath expression.
+
+ .. warning::
+
+ Using this matcher may not produce expected behavior when using
+ attribute selectors. For Python 2.6 and 3.1, the ElementTree
+ :meth:`~xml.etree.ElementTree.Element.find()` method does
+ not support the use of attribute selectors. If you need to
+ support Python 2.6 or 3.1, it might be more useful to use a
+ :class:`~slixmpp.xmlstream.matcher.stanzapath.StanzaPath` matcher.
+
+ If the value of :data:`IGNORE_NS` is set to ``True``, then XPath
+ expressions will be matched without using namespaces.
+ """
+
+ def __init__(self, criteria):
+ self._criteria = fix_ns(criteria)
+
+ def match(self, xml):
+ """
+ Compare a stanza's XML contents to an XPath expression.
+
+ If the value of :data:`IGNORE_NS` is set to ``True``, then XPath
+ expressions will be matched without using namespaces.
+
+ .. warning::
+
+ In Python 2.6 and 3.1 the ElementTree
+ :meth:`~xml.etree.ElementTree.Element.find()` method does not
+ support attribute selectors in the XPath expression.
+
+ :param xml: The :class:`~slixmpp.xmlstream.stanzabase.ElementBase`
+ stanza to compare against.
+ """
+ if hasattr(xml, 'xml'):
+ xml = xml.xml
+ x = ET.Element('x')
+ x.append(xml)
+
+ return x.find(self._criteria) is not None
diff --git a/slixmpp/xmlstream/resolver.py b/slixmpp/xmlstream/resolver.py
new file mode 100644
index 00000000..ed899569
--- /dev/null
+++ b/slixmpp/xmlstream/resolver.py
@@ -0,0 +1,333 @@
+# -*- encoding: utf-8 -*-
+
+"""
+ slixmpp.xmlstream.dns
+ ~~~~~~~~~~~~~~~~~~~~~~~
+
+ :copyright: (c) 2012 Nathanael C. Fritz
+ :license: MIT, see LICENSE for more details
+"""
+
+import socket
+import logging
+import random
+
+
+log = logging.getLogger(__name__)
+
+
+#: Global flag indicating the availability of the ``dnspython`` package.
+#: Installing ``dnspython`` can be done via:
+#:
+#: .. code-block:: sh
+#:
+#: pip install dnspython
+#:
+#: For Python3, installation may require installing from source using
+#: the ``python3`` branch:
+#:
+#: .. code-block:: sh
+#:
+#: git clone http://github.com/rthalley/dnspython
+#: cd dnspython
+#: git checkout python3
+#: python3 setup.py install
+DNSPYTHON_AVAILABLE = False
+try:
+ import dns.resolver
+ DNSPYTHON_AVAILABLE = True
+except ImportError as e:
+ log.debug("Could not find dnspython package. " + \
+ "Not all features will be available")
+
+
+def default_resolver():
+ """Return a basic DNS resolver object.
+
+ :returns: A :class:`dns.resolver.Resolver` object if dnspython
+ is available. Otherwise, ``None``.
+ """
+ if DNSPYTHON_AVAILABLE:
+ return dns.resolver.get_default_resolver()
+ return None
+
+
+def resolve(host, port=None, service=None, proto='tcp',
+ resolver=None, use_ipv6=True, use_dnspython=True):
+ """Peform DNS resolution for a given hostname.
+
+ Resolution may perform SRV record lookups if a service and protocol
+ are specified. The returned addresses will be sorted according to
+ the SRV priorities and weights.
+
+ If no resolver is provided, the dnspython resolver will be used if
+ available. Otherwise the built-in socket facilities will be used,
+ but those do not provide SRV support.
+
+ If SRV records were used, queries to resolve alternative hosts will
+ be made as needed instead of all at once.
+
+ :param host: The hostname to resolve.
+ :param port: A default port to connect with. SRV records may
+ dictate use of a different port.
+ :param service: Optional SRV service name without leading underscore.
+ :param proto: Optional SRV protocol name without leading underscore.
+ :param resolver: Optionally provide a DNS resolver object that has
+ been custom configured.
+ :param use_ipv6: Optionally control the use of IPv6 in situations
+ where it is either not available, or performance
+ is degraded. Defaults to ``True``.
+ :param use_dnspython: Optionally control if dnspython is used to make
+ the DNS queries instead of the built-in DNS
+ library.
+
+ :type host: string
+ :type port: int
+ :type service: string
+ :type proto: string
+ :type resolver: :class:`dns.resolver.Resolver`
+ :type use_ipv6: bool
+ :type use_dnspython: bool
+
+ :return: An iterable of IP address, port pairs in the order
+ dictated by SRV priorities and weights, if applicable.
+ """
+
+ if not use_dnspython:
+ if DNSPYTHON_AVAILABLE:
+ log.debug("DNS: Not using dnspython, but dnspython is installed.")
+ else:
+ log.debug("DNS: Not using dnspython.")
+
+ if not use_ipv6:
+ log.debug("DNS: Use of IPv6 has been disabled.")
+
+ if resolver is None and DNSPYTHON_AVAILABLE and use_dnspython:
+ resolver = dns.resolver.get_default_resolver()
+
+ # An IPv6 literal is allowed to be enclosed in square brackets, but
+ # the brackets must be stripped in order to process the literal;
+ # otherwise, things break.
+ host = host.strip('[]')
+
+ try:
+ # If `host` is an IPv4 literal, we can return it immediately.
+ ipv4 = socket.inet_aton(host)
+ yield (host, host, port)
+ except socket.error:
+ pass
+
+ if use_ipv6:
+ try:
+ # Likewise, If `host` is an IPv6 literal, we can return
+ # it immediately.
+ if hasattr(socket, 'inet_pton'):
+ ipv6 = socket.inet_pton(socket.AF_INET6, host)
+ yield (host, host, port)
+ except (socket.error, ValueError):
+ pass
+
+ # If no service was provided, then we can just do A/AAAA lookups on the
+ # provided host. Otherwise we need to get an ordered list of hosts to
+ # resolve based on SRV records.
+ if not service:
+ hosts = [(host, port)]
+ else:
+ hosts = get_SRV(host, port, service, proto,
+ resolver=resolver,
+ use_dnspython=use_dnspython)
+
+ for host, port in hosts:
+ results = []
+ if host == 'localhost':
+ if use_ipv6:
+ results.append((host, '::1', port))
+ results.append((host, '127.0.0.1', port))
+ if use_ipv6:
+ for address in get_AAAA(host, resolver=resolver,
+ use_dnspython=use_dnspython):
+ results.append((host, address, port))
+ for address in get_A(host, resolver=resolver,
+ use_dnspython=use_dnspython):
+ results.append((host, address, port))
+
+ for host, address, port in results:
+ yield host, address, port
+
+
+def get_A(host, resolver=None, use_dnspython=True):
+ """Lookup DNS A records for a given host.
+
+ If ``resolver`` is not provided, or is ``None``, then resolution will
+ be performed using the built-in :mod:`socket` module.
+
+ :param host: The hostname to resolve for A record IPv4 addresses.
+ :param resolver: Optional DNS resolver object to use for the query.
+ :param use_dnspython: Optionally control if dnspython is used to make
+ the DNS queries instead of the built-in DNS
+ library.
+
+ :type host: string
+ :type resolver: :class:`dns.resolver.Resolver` or ``None``
+ :type use_dnspython: bool
+
+ :return: A list of IPv4 literals.
+ """
+ log.debug("DNS: Querying %s for A records." % host)
+
+ # If not using dnspython, attempt lookup using the OS level
+ # getaddrinfo() method.
+ if resolver is None or not use_dnspython:
+ try:
+ recs = socket.getaddrinfo(host, None, socket.AF_INET,
+ socket.SOCK_STREAM)
+ return [rec[4][0] for rec in recs]
+ except socket.gaierror:
+ log.debug("DNS: Error retreiving A address info for %s." % host)
+ return []
+
+ # Using dnspython:
+ try:
+ recs = resolver.query(host, dns.rdatatype.A)
+ return [rec.to_text() for rec in recs]
+ except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer):
+ log.debug("DNS: No A records for %s" % host)
+ return []
+ except dns.exception.Timeout:
+ log.debug("DNS: A record resolution timed out for %s" % host)
+ return []
+ except dns.exception.DNSException as e:
+ log.debug("DNS: Error querying A records for %s" % host)
+ log.exception(e)
+ return []
+
+
+def get_AAAA(host, resolver=None, use_dnspython=True):
+ """Lookup DNS AAAA records for a given host.
+
+ If ``resolver`` is not provided, or is ``None``, then resolution will
+ be performed using the built-in :mod:`socket` module.
+
+ :param host: The hostname to resolve for AAAA record IPv6 addresses.
+ :param resolver: Optional DNS resolver object to use for the query.
+ :param use_dnspython: Optionally control if dnspython is used to make
+ the DNS queries instead of the built-in DNS
+ library.
+
+ :type host: string
+ :type resolver: :class:`dns.resolver.Resolver` or ``None``
+ :type use_dnspython: bool
+
+ :return: A list of IPv6 literals.
+ """
+ log.debug("DNS: Querying %s for AAAA records." % host)
+
+ # If not using dnspython, attempt lookup using the OS level
+ # getaddrinfo() method.
+ if resolver is None or not use_dnspython:
+ if not socket.has_ipv6:
+ log.debug("Unable to query %s for AAAA records: IPv6 is not supported", host)
+ return []
+ try:
+ recs = socket.getaddrinfo(host, None, socket.AF_INET6,
+ socket.SOCK_STREAM)
+ return [rec[4][0] for rec in recs]
+ except (OSError, socket.gaierror):
+ log.debug("DNS: Error retreiving AAAA address " + \
+ "info for %s." % host)
+ return []
+
+ # Using dnspython:
+ try:
+ recs = resolver.query(host, dns.rdatatype.AAAA)
+ return [rec.to_text() for rec in recs]
+ except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer):
+ log.debug("DNS: No AAAA records for %s" % host)
+ return []
+ except dns.exception.Timeout:
+ log.debug("DNS: AAAA record resolution timed out for %s" % host)
+ return []
+ except dns.exception.DNSException as e:
+ log.debug("DNS: Error querying AAAA records for %s" % host)
+ log.exception(e)
+ return []
+
+
+def get_SRV(host, port, service, proto='tcp', resolver=None, use_dnspython=True):
+ """Perform SRV record resolution for a given host.
+
+ .. note::
+
+ This function requires the use of the ``dnspython`` package. Calling
+ :func:`get_SRV` without ``dnspython`` will return the provided host
+ and port without performing any DNS queries.
+
+ :param host: The hostname to resolve.
+ :param port: A default port to connect with. SRV records may
+ dictate use of a different port.
+ :param service: Optional SRV service name without leading underscore.
+ :param proto: Optional SRV protocol name without leading underscore.
+ :param resolver: Optionally provide a DNS resolver object that has
+ been custom configured.
+
+ :type host: string
+ :type port: int
+ :type service: string
+ :type proto: string
+ :type resolver: :class:`dns.resolver.Resolver`
+
+ :return: A list of hostname, port pairs in the order dictacted
+ by SRV priorities and weights.
+ """
+ if resolver is None or not use_dnspython:
+ log.warning("DNS: dnspython not found. Can not use SRV lookup.")
+ return [(host, port)]
+
+ log.debug("DNS: Querying SRV records for %s" % host)
+ try:
+ recs = resolver.query('_%s._%s.%s' % (service, proto, host),
+ dns.rdatatype.SRV)
+ except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer):
+ log.debug("DNS: No SRV records for %s." % host)
+ return [(host, port)]
+ except dns.exception.Timeout:
+ log.debug("DNS: SRV record resolution timed out for %s." % host)
+ return [(host, port)]
+ except dns.exception.DNSException as e:
+ log.debug("DNS: Error querying SRV records for %s." % host)
+ log.exception(e)
+ return [(host, port)]
+
+ if len(recs) == 1 and recs[0].target == '.':
+ return [(host, port)]
+
+ answers = {}
+ for rec in recs:
+ if rec.priority not in answers:
+ answers[rec.priority] = []
+ if rec.weight == 0:
+ answers[rec.priority].insert(0, rec)
+ else:
+ answers[rec.priority].append(rec)
+
+ sorted_recs = []
+ for priority in sorted(answers.keys()):
+ while answers[priority]:
+ running_sum = 0
+ sums = {}
+ for rec in answers[priority]:
+ running_sum += rec.weight
+ sums[running_sum] = rec
+
+ selected = random.randint(0, running_sum + 1)
+ for running_sum in sums:
+ if running_sum >= selected:
+ rec = sums[running_sum]
+ host = rec.target.to_text()
+ if host.endswith('.'):
+ host = host[:-1]
+ sorted_recs.append((host, rec.port))
+ answers[priority].remove(rec)
+ break
+
+ return sorted_recs
diff --git a/slixmpp/xmlstream/scheduler.py b/slixmpp/xmlstream/scheduler.py
new file mode 100644
index 00000000..137230b6
--- /dev/null
+++ b/slixmpp/xmlstream/scheduler.py
@@ -0,0 +1,250 @@
+# -*- coding: utf-8 -*-
+"""
+ slixmpp.xmlstream.scheduler
+ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ This module provides a task scheduler that works better
+ with Slixmpp's threading usage than the stock version.
+
+ Part of Slixmpp: The Slick XMPP Library
+
+ :copyright: (c) 2011 Nathanael C. Fritz
+ :license: MIT, see LICENSE for more details
+"""
+
+import time
+import threading
+import logging
+import itertools
+
+from slixmpp.util import Queue, QueueEmpty
+
+
+#: The time in seconds to wait for events from the event queue, and also the
+#: time between checks for the process stop signal.
+WAIT_TIMEOUT = 1.0
+
+
+log = logging.getLogger(__name__)
+
+
+class Task(object):
+
+ """
+ A scheduled task that will be executed by the scheduler
+ after a given time interval has passed.
+
+ :param string name: The name of the task.
+ :param int seconds: The number of seconds to wait before executing.
+ :param callback: The function to execute.
+ :param tuple args: The arguments to pass to the callback.
+ :param dict kwargs: The keyword arguments to pass to the callback.
+ :param bool repeat: Indicates if the task should repeat.
+ Defaults to ``False``.
+ :param pointer: A pointer to an event queue for queuing callback
+ execution instead of executing immediately.
+ """
+
+ def __init__(self, name, seconds, callback, args=None,
+ kwargs=None, repeat=False, qpointer=None):
+ #: The name of the task.
+ self.name = name
+
+ #: The number of seconds to wait before executing.
+ self.seconds = seconds
+
+ #: The function to execute once enough time has passed.
+ self.callback = callback
+
+ #: The arguments to pass to :attr:`callback`.
+ self.args = args or tuple()
+
+ #: The keyword arguments to pass to :attr:`callback`.
+ self.kwargs = kwargs or {}
+
+ #: Indicates if the task should repeat after executing,
+ #: using the same :attr:`seconds` delay.
+ self.repeat = repeat
+
+ #: The time when the task should execute next.
+ self.next = time.time() + self.seconds
+
+ #: The main event queue, which allows for callbacks to
+ #: be queued for execution instead of executing immediately.
+ self.qpointer = qpointer
+
+ def run(self):
+ """Execute the task's callback.
+
+ If an event queue was supplied, place the callback in the queue;
+ otherwise, execute the callback immediately.
+ """
+ if self.qpointer is not None:
+ self.qpointer.put(('schedule', self.callback,
+ self.args, self.kwargs, self.name))
+ else:
+ self.callback(*self.args, **self.kwargs)
+ self.reset()
+ return self.repeat
+
+ def reset(self):
+ """Reset the task's timer so that it will repeat."""
+ self.next = time.time() + self.seconds
+
+
+class Scheduler(object):
+
+ """
+ A threaded scheduler that allows for updates mid-execution unlike the
+ scheduler in the standard library.
+
+ Based on: http://docs.python.org/library/sched.html#module-sched
+
+ :param parentstop: An :class:`~threading.Event` to signal stopping
+ the scheduler.
+ """
+
+ def __init__(self, parentstop=None):
+ #: A queue for storing tasks
+ self.addq = Queue()
+
+ #: A list of tasks in order of execution time.
+ self.schedule = []
+
+ #: If running in threaded mode, this will be the thread processing
+ #: the schedule.
+ self.thread = None
+
+ #: A flag indicating that the scheduler is running.
+ self.run = False
+
+ #: An :class:`~threading.Event` instance for signalling to stop
+ #: the scheduler.
+ self.stop = parentstop
+
+ #: Lock for accessing the task queue.
+ self.schedule_lock = threading.RLock()
+
+ #: The time in seconds to wait for events from the event queue,
+ #: and also the time between checks for the process stop signal.
+ self.wait_timeout = WAIT_TIMEOUT
+
+ def process(self, threaded=True, daemon=False):
+ """Begin accepting and processing scheduled tasks.
+
+ :param bool threaded: Indicates if the scheduler should execute
+ in its own thread. Defaults to ``True``.
+ """
+ if threaded:
+ self.thread = threading.Thread(name='scheduler_process',
+ target=self._process)
+ self.thread.daemon = daemon
+ self.thread.start()
+ else:
+ self._process()
+
+ def _process(self):
+ """Process scheduled tasks."""
+ self.run = True
+ try:
+ while self.run and not self.stop.is_set():
+ updated = False
+ if self.schedule:
+ wait = self.schedule[0].next - time.time()
+ else:
+ wait = self.wait_timeout
+ try:
+ if wait <= 0.0:
+ newtask = self.addq.get(False)
+ else:
+ newtask = None
+ while self.run and \
+ not self.stop.is_set() and \
+ newtask is None and \
+ wait > 0:
+ try:
+ newtask = self.addq.get(True, min(wait, self.wait_timeout))
+ except QueueEmpty: # Nothing to add, nothing to do. Check run flags and continue waiting.
+ wait -= self.wait_timeout
+ except QueueEmpty: # Time to run some tasks, and no new tasks to add.
+ self.schedule_lock.acquire()
+ # select only those tasks which are to be executed now
+ relevant = itertools.takewhile(
+ lambda task: time.time() >= task.next, self.schedule)
+ # run the tasks and keep the return value in a tuple
+ status = map(lambda task: (task, task.run()), relevant)
+ # remove non-repeating tasks
+ for task, doRepeat in status:
+ if not doRepeat:
+ try:
+ self.schedule.remove(task)
+ except ValueError:
+ pass
+ else:
+ # only need to resort tasks if a repeated task has
+ # been kept in the list.
+ updated = True
+ else: # Add new task
+ self.schedule_lock.acquire()
+ if newtask is not None:
+ self.schedule.append(newtask)
+ updated = True
+ finally:
+ if updated:
+ self.schedule.sort(key=lambda task: task.next)
+ self.schedule_lock.release()
+ except KeyboardInterrupt:
+ self.run = False
+ except SystemExit:
+ self.run = False
+ log.debug("Quitting Scheduler thread")
+
+ def add(self, name, seconds, callback, args=None,
+ kwargs=None, repeat=False, qpointer=None):
+ """Schedule a new task.
+
+ :param string name: The name of the task.
+ :param int seconds: The number of seconds to wait before executing.
+ :param callback: The function to execute.
+ :param tuple args: The arguments to pass to the callback.
+ :param dict kwargs: The keyword arguments to pass to the callback.
+ :param bool repeat: Indicates if the task should repeat.
+ Defaults to ``False``.
+ :param pointer: A pointer to an event queue for queuing callback
+ execution instead of executing immediately.
+ """
+ try:
+ self.schedule_lock.acquire()
+ for task in self.schedule:
+ if task.name == name:
+ raise ValueError("Key %s already exists" % name)
+
+ self.addq.put(Task(name, seconds, callback, args,
+ kwargs, repeat, qpointer))
+ except:
+ raise
+ finally:
+ self.schedule_lock.release()
+
+ def remove(self, name):
+ """Remove a scheduled task ahead of schedule, and without
+ executing it.
+
+ :param string name: The name of the task to remove.
+ """
+ try:
+ self.schedule_lock.acquire()
+ the_task = None
+ for task in self.schedule:
+ if task.name == name:
+ the_task = task
+ if the_task is not None:
+ self.schedule.remove(the_task)
+ except:
+ raise
+ finally:
+ self.schedule_lock.release()
+
+ def quit(self):
+ """Shutdown the scheduler."""
+ self.run = False
diff --git a/slixmpp/xmlstream/stanzabase.py b/slixmpp/xmlstream/stanzabase.py
new file mode 100644
index 00000000..0921dde4
--- /dev/null
+++ b/slixmpp/xmlstream/stanzabase.py
@@ -0,0 +1,1644 @@
+# -*- coding: utf-8 -*-
+"""
+ slixmpp.xmlstream.stanzabase
+ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ module implements a wrapper layer for XML objects
+ that allows them to be treated like dictionaries.
+
+ Part of Slixmpp: The Slick XMPP Library
+
+ :copyright: (c) 2011 Nathanael C. Fritz
+ :license: MIT, see LICENSE for more details
+"""
+
+from __future__ import with_statement, unicode_literals
+
+import copy
+import logging
+import weakref
+from xml.etree import cElementTree as ET
+
+from slixmpp.xmlstream import JID
+from slixmpp.xmlstream.tostring import tostring
+from slixmpp.thirdparty import OrderedDict
+
+
+log = logging.getLogger(__name__)
+
+
+# Used to check if an argument is an XML object.
+XML_TYPE = type(ET.Element('xml'))
+
+
+XML_NS = 'http://www.w3.org/XML/1998/namespace'
+
+
+def register_stanza_plugin(stanza, plugin, iterable=False, overrides=False):
+ """
+ Associate a stanza object as a plugin for another stanza.
+
+ >>> from slixmpp.xmlstream import register_stanza_plugin
+ >>> register_stanza_plugin(Iq, CustomStanza)
+
+ Plugin stanzas marked as iterable will be included in the list of
+ substanzas for the parent, using ``parent['substanzas']``. If the
+ attribute ``plugin_multi_attrib`` was defined for the plugin, then
+ the substanza set can be filtered to only instances of the plugin
+ class. For example, given a plugin class ``Foo`` with
+ ``plugin_multi_attrib = 'foos'`` then::
+
+ parent['foos']
+
+ would return a collection of all ``Foo`` substanzas.
+
+ :param class stanza: The class of the parent stanza.
+ :param class plugin: The class of the plugin stanza.
+ :param bool iterable: Indicates if the plugin stanza should be
+ included in the parent stanza's iterable
+ ``'substanzas'`` interface results.
+ :param bool overrides: Indicates if the plugin should be allowed
+ to override the interface handlers for
+ the parent stanza, based on the plugin's
+ ``overrides`` field.
+
+ .. versionadded:: 1.0-Beta1
+ Made ``register_stanza_plugin`` the default name. The prior
+ ``registerStanzaPlugin`` function name remains as an alias.
+ """
+ tag = "{%s}%s" % (plugin.namespace, plugin.name)
+
+ # Prevent weird memory reference gotchas by ensuring
+ # that the parent stanza class has its own set of
+ # plugin info maps and is not using the mappings from
+ # an ancestor class (like ElementBase).
+ plugin_info = ('plugin_attrib_map', 'plugin_tag_map',
+ 'plugin_iterables', 'plugin_overrides')
+ for attr in plugin_info:
+ info = getattr(stanza, attr)
+ setattr(stanza, attr, info.copy())
+
+ stanza.plugin_attrib_map[plugin.plugin_attrib] = plugin
+ stanza.plugin_tag_map[tag] = plugin
+
+ if iterable:
+ stanza.plugin_iterables.add(plugin)
+ if plugin.plugin_multi_attrib:
+ multiplugin = multifactory(plugin, plugin.plugin_multi_attrib)
+ register_stanza_plugin(stanza, multiplugin)
+ if overrides:
+ for interface in plugin.overrides:
+ stanza.plugin_overrides[interface] = plugin.plugin_attrib
+
+
+# To maintain backwards compatibility for now, preserve the camel case name.
+registerStanzaPlugin = register_stanza_plugin
+
+
+def multifactory(stanza, plugin_attrib):
+ """
+ Returns a ElementBase class for handling reoccuring child stanzas
+ """
+
+ def plugin_filter(self):
+ return lambda x: isinstance(x, self._multistanza)
+
+ def plugin_lang_filter(self, lang):
+ return lambda x: isinstance(x, self._multistanza) and \
+ x['lang'] == lang
+
+ class Multi(ElementBase):
+ """
+ Template class for multifactory
+ """
+ def setup(self, xml=None):
+ self.xml = ET.Element('')
+
+ def get_multi(self, lang=None):
+ parent = self.parent()
+ if not lang or lang == '*':
+ res = filter(plugin_filter(self), parent)
+ else:
+ res = filter(plugin_filter(self, lang), parent)
+ return list(res)
+
+ def set_multi(self, val, lang=None):
+ parent = self.parent()
+ del_multi = getattr(self, 'del_%s' % plugin_attrib)
+ del_multi(lang)
+ for sub in val:
+ parent.append(sub)
+
+ def del_multi(self, lang=None):
+ parent = self.parent()
+ if not lang or lang == '*':
+ res = filter(plugin_filter(self), parent)
+ else:
+ res = filter(plugin_filter(self, lang), parent)
+ res = list(res)
+ if not res:
+ del parent.plugins[(plugin_attrib, None)]
+ parent.loaded_plugins.remove(plugin_attrib)
+ try:
+ parent.xml.remove(self.xml)
+ except ValueError:
+ pass
+ else:
+ for stanza in list(res):
+ parent.iterables.remove(stanza)
+ parent.xml.remove(stanza.xml)
+
+ Multi.is_extension = True
+ Multi.plugin_attrib = plugin_attrib
+ Multi._multistanza = stanza
+ Multi.interfaces = set([plugin_attrib])
+ Multi.lang_interfaces = set([plugin_attrib])
+ setattr(Multi, "get_%s" % plugin_attrib, get_multi)
+ setattr(Multi, "set_%s" % plugin_attrib, set_multi)
+ setattr(Multi, "del_%s" % plugin_attrib, del_multi)
+ return Multi
+
+
+def fix_ns(xpath, split=False, propagate_ns=True, default_ns=''):
+ """Apply the stanza's namespace to elements in an XPath expression.
+
+ :param string xpath: The XPath expression to fix with namespaces.
+ :param bool split: Indicates if the fixed XPath should be left as a
+ list of element names with namespaces. Defaults to
+ False, which returns a flat string path.
+ :param bool propagate_ns: Overrides propagating parent element
+ namespaces to child elements. Useful if
+ you wish to simply split an XPath that has
+ non-specified namespaces, and child and
+ parent namespaces are known not to always
+ match. Defaults to True.
+ """
+ fixed = []
+ # Split the XPath into a series of blocks, where a block
+ # is started by an element with a namespace.
+ ns_blocks = xpath.split('{')
+ for ns_block in ns_blocks:
+ if '}' in ns_block:
+ # Apply the found namespace to following elements
+ # that do not have namespaces.
+ namespace = ns_block.split('}')[0]
+ elements = ns_block.split('}')[1].split('/')
+ else:
+ # Apply the stanza's namespace to the following
+ # elements since no namespace was provided.
+ namespace = default_ns
+ elements = ns_block.split('/')
+
+ for element in elements:
+ if element:
+ # Skip empty entry artifacts from splitting.
+ if propagate_ns and element[0] != '*':
+ tag = '{%s}%s' % (namespace, element)
+ else:
+ tag = element
+ fixed.append(tag)
+ if split:
+ return fixed
+ return '/'.join(fixed)
+
+
+class ElementBase(object):
+
+ """
+ The core of Slixmpp's stanza XML manipulation and handling is provided
+ by ElementBase. ElementBase wraps XML cElementTree objects and enables
+ access to the XML contents through dictionary syntax, similar in style
+ to the Ruby XMPP library Blather's stanza implementation.
+
+ Stanzas are defined by their name, namespace, and interfaces. For
+ example, a simplistic Message stanza could be defined as::
+
+ >>> class Message(ElementBase):
+ ... name = "message"
+ ... namespace = "jabber:client"
+ ... interfaces = set(('to', 'from', 'type', 'body'))
+ ... sub_interfaces = set(('body',))
+
+ The resulting Message stanza's contents may be accessed as so::
+
+ >>> message['to'] = "user@example.com"
+ >>> message['body'] = "Hi!"
+ >>> message['body']
+ "Hi!"
+ >>> del message['body']
+ >>> message['body']
+ ""
+
+ The interface values map to either custom access methods, stanza
+ XML attributes, or (if the interface is also in sub_interfaces) the
+ text contents of a stanza's subelement.
+
+ Custom access methods may be created by adding methods of the
+ form "getInterface", "setInterface", or "delInterface", where
+ "Interface" is the titlecase version of the interface name.
+
+ Stanzas may be extended through the use of plugins. A plugin
+ is simply a stanza that has a plugin_attrib value. For example::
+
+ >>> class MessagePlugin(ElementBase):
+ ... name = "custom_plugin"
+ ... namespace = "custom"
+ ... interfaces = set(('useful_thing', 'custom'))
+ ... plugin_attrib = "custom"
+
+ The plugin stanza class must be associated with its intended
+ container stanza by using register_stanza_plugin as so::
+
+ >>> register_stanza_plugin(Message, MessagePlugin)
+
+ The plugin may then be accessed as if it were built-in to the parent
+ stanza::
+
+ >>> message['custom']['useful_thing'] = 'foo'
+
+ If a plugin provides an interface that is the same as the plugin's
+ plugin_attrib value, then the plugin's interface may be assigned
+ directly from the parent stanza, as shown below, but retrieving
+ information will require all interfaces to be used, as so::
+
+ >>> # Same as using message['custom']['custom']
+ >>> message['custom'] = 'bar'
+ >>> # Must use all interfaces
+ >>> message['custom']['custom']
+ 'bar'
+
+ If the plugin sets :attr:`is_extension` to ``True``, then both setting
+ and getting an interface value that is the same as the plugin's
+ plugin_attrib value will work, as so::
+
+ >>> message['custom'] = 'bar' # Using is_extension=True
+ >>> message['custom']
+ 'bar'
+
+
+ :param xml: Initialize the stanza object with an existing XML object.
+ :param parent: Optionally specify a parent stanza object will
+ contain this substanza.
+ """
+
+ #: The XML tag name of the element, not including any namespace
+ #: prefixes. For example, an :class:`ElementBase` object for
+ #: ``<message />`` would use ``name = 'message'``.
+ name = 'stanza'
+
+ #: The XML namespace for the element. Given ``<foo xmlns="bar" />``,
+ #: then ``namespace = "bar"`` should be used. The default namespace
+ #: is ``jabber:client`` since this is being used in an XMPP library.
+ namespace = 'jabber:client'
+
+ #: For :class:`ElementBase` subclasses which are intended to be used
+ #: as plugins, the ``plugin_attrib`` value defines the plugin name.
+ #: Plugins may be accessed by using the ``plugin_attrib`` value as
+ #: the interface. An example using ``plugin_attrib = 'foo'``::
+ #:
+ #: register_stanza_plugin(Message, FooPlugin)
+ #: msg = Message()
+ #: msg['foo']['an_interface_from_the_foo_plugin']
+ plugin_attrib = 'plugin'
+
+ #: For :class:`ElementBase` subclasses that are intended to be an
+ #: iterable group of items, the ``plugin_multi_attrib`` value defines
+ #: an interface for the parent stanza which returns the entire group
+ #: of matching substanzas. So the following are equivalent::
+ #:
+ #: # Given stanza class Foo, with plugin_multi_attrib = 'foos'
+ #: parent['foos']
+ #: filter(isinstance(item, Foo), parent['substanzas'])
+ plugin_multi_attrib = ''
+
+ #: The set of keys that the stanza provides for accessing and
+ #: manipulating the underlying XML object. This set may be augmented
+ #: with the :attr:`plugin_attrib` value of any registered
+ #: stanza plugins.
+ interfaces = set(('type', 'to', 'from', 'id', 'payload'))
+
+ #: A subset of :attr:`interfaces` which maps interfaces to direct
+ #: subelements of the underlying XML object. Using this set, the text
+ #: of these subelements may be set, retrieved, or removed without
+ #: needing to define custom methods.
+ sub_interfaces = set()
+
+ #: A subset of :attr:`interfaces` which maps the presence of
+ #: subelements to boolean values. Using this set allows for quickly
+ #: checking for the existence of empty subelements like ``<required />``.
+ #:
+ #: .. versionadded:: 1.1
+ bool_interfaces = set()
+
+ #: .. versionadded:: 1.1.2
+ lang_interfaces = set()
+
+ #: In some cases you may wish to override the behaviour of one of the
+ #: parent stanza's interfaces. The ``overrides`` list specifies the
+ #: interface name and access method to be overridden. For example,
+ #: to override setting the parent's ``'condition'`` interface you
+ #: would use::
+ #:
+ #: overrides = ['set_condition']
+ #:
+ #: Getting and deleting the ``'condition'`` interface would not
+ #: be affected.
+ #:
+ #: .. versionadded:: 1.0-Beta5
+ overrides = []
+
+ #: If you need to add a new interface to an existing stanza, you
+ #: can create a plugin and set ``is_extension = True``. Be sure
+ #: to set the :attr:`plugin_attrib` value to the desired interface
+ #: name, and that it is the only interface listed in
+ #: :attr:`interfaces`. Requests for the new interface from the
+ #: parent stanza will be passed to the plugin directly.
+ #:
+ #: .. versionadded:: 1.0-Beta5
+ is_extension = False
+
+ #: A map of interface operations to the overriding functions.
+ #: For example, after overriding the ``set`` operation for
+ #: the interface ``body``, :attr:`plugin_overrides` would be::
+ #:
+ #: {'set_body': <some function>}
+ #:
+ #: .. versionadded: 1.0-Beta5
+ plugin_overrides = {}
+
+ #: A mapping of the :attr:`plugin_attrib` values of registered
+ #: plugins to their respective classes.
+ plugin_attrib_map = {}
+
+ #: A mapping of root element tag names (in ``'{namespace}elementname'``
+ #: format) to the plugin classes responsible for them.
+ plugin_tag_map = {}
+
+ #: The set of stanza classes that can be iterated over using
+ #: the 'substanzas' interface. Classes are added to this set
+ #: when registering a plugin with ``iterable=True``::
+ #:
+ #: register_stanza_plugin(DiscoInfo, DiscoItem, iterable=True)
+ #:
+ #: .. versionadded:: 1.0-Beta5
+ plugin_iterables = set()
+
+ #: A deprecated version of :attr:`plugin_iterables` that remains
+ #: for backward compatibility. It required a parent stanza to
+ #: know beforehand what stanza classes would be iterable::
+ #:
+ #: class DiscoItem(ElementBase):
+ #: ...
+ #:
+ #: class DiscoInfo(ElementBase):
+ #: subitem = (DiscoItem, )
+ #: ...
+ #:
+ #: .. deprecated:: 1.0-Beta5
+ subitem = set()
+
+ #: The default XML namespace: ``http://www.w3.org/XML/1998/namespace``.
+ xml_ns = XML_NS
+
+ def __init__(self, xml=None, parent=None):
+ self._index = 0
+
+ #: The underlying XML object for the stanza. It is a standard
+ #: :class:`xml.etree.cElementTree` object.
+ self.xml = xml
+
+ #: An ordered dictionary of plugin stanzas, mapped by their
+ #: :attr:`plugin_attrib` value.
+ self.plugins = OrderedDict()
+ self.loaded_plugins = set()
+
+ #: A list of child stanzas whose class is included in
+ #: :attr:`plugin_iterables`.
+ self.iterables = []
+
+ #: The name of the tag for the stanza's root element. It is the
+ #: same as calling :meth:`tag_name()` and is formatted as
+ #: ``'{namespace}elementname'``.
+ self.tag = self.tag_name()
+
+ #: A :class:`weakref.weakref` to the parent stanza, if there is one.
+ #: If not, then :attr:`parent` is ``None``.
+ self.parent = None
+ if parent is not None:
+ if not isinstance(parent, weakref.ReferenceType):
+ self.parent = weakref.ref(parent)
+ else:
+ self.parent = parent
+
+ if self.subitem is not None:
+ for sub in self.subitem:
+ self.plugin_iterables.add(sub)
+
+ if self.setup(xml):
+ # If we generated our own XML, then everything is ready.
+ return
+
+ # Initialize values using provided XML
+ for child in self.xml:
+ if child.tag in self.plugin_tag_map:
+ plugin_class = self.plugin_tag_map[child.tag]
+ self.init_plugin(plugin_class.plugin_attrib,
+ existing_xml=child,
+ reuse=False)
+
+ def setup(self, xml=None):
+ """Initialize the stanza's XML contents.
+
+ Will return ``True`` if XML was generated according to the stanza's
+ definition instead of building a stanza object from an existing
+ XML object.
+
+ :param xml: An existing XML object to use for the stanza's content
+ instead of generating new XML.
+ """
+ if self.xml is None:
+ self.xml = xml
+
+ last_xml = self.xml
+ if self.xml is None:
+ # Generate XML from the stanza definition
+ for ename in self.name.split('/'):
+ new = ET.Element("{%s}%s" % (self.namespace, ename))
+ if self.xml is None:
+ self.xml = new
+ else:
+ last_xml.append(new)
+ last_xml = new
+ if self.parent is not None:
+ self.parent().xml.append(self.xml)
+
+ # We had to generate XML
+ return True
+ else:
+ # We did not generate XML
+ return False
+
+ def enable(self, attrib, lang=None):
+ """Enable and initialize a stanza plugin.
+
+ Alias for :meth:`init_plugin`.
+
+ :param string attrib: The :attr:`plugin_attrib` value of the
+ plugin to enable.
+ """
+ return self.init_plugin(attrib, lang)
+
+ def _get_plugin(self, name, lang=None, check=False):
+ if lang is None:
+ lang = self.get_lang()
+
+ if name not in self.plugin_attrib_map:
+ return None
+
+ plugin_class = self.plugin_attrib_map[name]
+
+ if plugin_class.is_extension:
+ if (name, None) in self.plugins:
+ return self.plugins[(name, None)]
+ else:
+ return None if check else self.init_plugin(name, lang)
+ else:
+ if (name, lang) in self.plugins:
+ return self.plugins[(name, lang)]
+ else:
+ return None if check else self.init_plugin(name, lang)
+
+ def init_plugin(self, attrib, lang=None, existing_xml=None, reuse=True):
+ """Enable and initialize a stanza plugin.
+
+ :param string attrib: The :attr:`plugin_attrib` value of the
+ plugin to enable.
+ """
+ default_lang = self.get_lang()
+ if not lang:
+ lang = default_lang
+
+ plugin_class = self.plugin_attrib_map[attrib]
+
+ if plugin_class.is_extension and (attrib, None) in self.plugins:
+ return self.plugins[(attrib, None)]
+ if reuse and (attrib, lang) in self.plugins:
+ return self.plugins[(attrib, lang)]
+
+ plugin = plugin_class(parent=self, xml=existing_xml)
+
+ if plugin.is_extension:
+ self.plugins[(attrib, None)] = plugin
+ else:
+ if lang != default_lang:
+ plugin['lang'] = lang
+ self.plugins[(attrib, lang)] = plugin
+
+ if plugin_class in self.plugin_iterables:
+ self.iterables.append(plugin)
+ if plugin_class.plugin_multi_attrib:
+ self.init_plugin(plugin_class.plugin_multi_attrib)
+
+ self.loaded_plugins.add(attrib)
+
+ return plugin
+
+ def _get_stanza_values(self):
+ """Return A JSON/dictionary version of the XML content
+ exposed through the stanza's interfaces::
+
+ >>> msg = Message()
+ >>> msg.values
+ {'body': '', 'from': , 'mucnick': '', 'mucroom': '',
+ 'to': , 'type': 'normal', 'id': '', 'subject': ''}
+
+ Likewise, assigning to :attr:`values` will change the XML
+ content::
+
+ >>> msg = Message()
+ >>> msg.values = {'body': 'Hi!', 'to': 'user@example.com'}
+ >>> msg
+ '<message to="user@example.com"><body>Hi!</body></message>'
+
+ .. versionadded:: 1.0-Beta1
+ """
+ values = {}
+ values['lang'] = self['lang']
+ for interface in self.interfaces:
+ values[interface] = self[interface]
+ if interface in self.lang_interfaces:
+ values['%s|*' % interface] = self['%s|*' % interface]
+ for plugin, stanza in self.plugins.items():
+ lang = stanza['lang']
+ if lang:
+ values['%s|%s' % (plugin[0], lang)] = stanza.values
+ else:
+ values[plugin[0]] = stanza.values
+ if self.iterables:
+ iterables = []
+ for stanza in self.iterables:
+ iterables.append(stanza.values)
+ iterables[-1]['__childtag__'] = stanza.tag
+ values['substanzas'] = iterables
+ return values
+
+ def _set_stanza_values(self, values):
+ """Set multiple stanza interface values using a dictionary.
+
+ Stanza plugin values may be set using nested dictionaries.
+
+ :param values: A dictionary mapping stanza interface with values.
+ Plugin interfaces may accept a nested dictionary that
+ will be used recursively.
+
+ .. versionadded:: 1.0-Beta1
+ """
+ iterable_interfaces = [p.plugin_attrib for \
+ p in self.plugin_iterables]
+
+ if 'lang' in values:
+ self['lang'] = values['lang']
+
+ if 'substanzas' in values:
+ # Remove existing substanzas
+ for stanza in self.iterables:
+ try:
+ self.xml.remove(stanza.xml)
+ except ValueError:
+ pass
+ self.iterables = []
+
+ # Add new substanzas
+ for subdict in values['substanzas']:
+ if '__childtag__' in subdict:
+ for subclass in self.plugin_iterables:
+ child_tag = "{%s}%s" % (subclass.namespace,
+ subclass.name)
+ if subdict['__childtag__'] == child_tag:
+ sub = subclass(parent=self)
+ sub.values = subdict
+ self.iterables.append(sub)
+
+ for interface, value in values.items():
+ full_interface = interface
+ interface_lang = ('%s|' % interface).split('|')
+ interface = interface_lang[0]
+ lang = interface_lang[1] or self.get_lang()
+
+ if interface == 'lang':
+ continue
+ elif interface == 'substanzas':
+ continue
+ elif interface in self.interfaces:
+ self[full_interface] = value
+ elif interface in self.plugin_attrib_map:
+ if interface not in iterable_interfaces:
+ plugin = self._get_plugin(interface, lang)
+ if plugin:
+ plugin.values = value
+ return self
+
+ def __getitem__(self, attrib):
+ """Return the value of a stanza interface using dict-like syntax.
+
+ Example::
+
+ >>> msg['body']
+ 'Message contents'
+
+ Stanza interfaces are typically mapped directly to the underlying XML
+ object, but can be overridden by the presence of a ``get_attrib``
+ method (or ``get_foo`` where the interface is named ``'foo'``, etc).
+
+ The search order for interface value retrieval for an interface
+ named ``'foo'`` is:
+
+ 1. The list of substanzas (``'substanzas'``)
+ 2. The result of calling the ``get_foo`` override handler.
+ 3. The result of calling ``get_foo``.
+ 4. The result of calling ``getFoo``.
+ 5. The contents of the ``foo`` subelement, if ``foo`` is listed
+ in :attr:`sub_interfaces`.
+ 6. True or False depending on the existence of a ``foo``
+ subelement and ``foo`` is in :attr:`bool_interfaces`.
+ 7. The value of the ``foo`` attribute of the XML object.
+ 8. The plugin named ``'foo'``
+ 9. An empty string.
+
+ :param string attrib: The name of the requested stanza interface.
+ """
+ full_attrib = attrib
+ attrib_lang = ('%s|' % attrib).split('|')
+ attrib = attrib_lang[0]
+ lang = attrib_lang[1] or None
+
+ kwargs = {}
+ if lang and attrib in self.lang_interfaces:
+ kwargs['lang'] = lang
+
+ if attrib == 'substanzas':
+ return self.iterables
+ elif attrib in self.interfaces or attrib == 'lang':
+ get_method = "get_%s" % attrib.lower()
+ get_method2 = "get%s" % attrib.title()
+
+ if self.plugin_overrides:
+ name = self.plugin_overrides.get(get_method, None)
+ if name:
+ plugin = self._get_plugin(name, lang)
+ if plugin:
+ handler = getattr(plugin, get_method, None)
+ if handler:
+ return handler(**kwargs)
+
+ if hasattr(self, get_method):
+ return getattr(self, get_method)(**kwargs)
+ elif hasattr(self, get_method2):
+ return getattr(self, get_method2)(**kwargs)
+ else:
+ if attrib in self.sub_interfaces:
+ return self._get_sub_text(attrib, lang=lang)
+ elif attrib in self.bool_interfaces:
+ elem = self.xml.find('{%s}%s' % (self.namespace, attrib))
+ return elem is not None
+ else:
+ return self._get_attr(attrib)
+ elif attrib in self.plugin_attrib_map:
+ plugin = self._get_plugin(attrib, lang)
+ if plugin and plugin.is_extension:
+ return plugin[full_attrib]
+ return plugin
+ else:
+ return ''
+
+ def __setitem__(self, attrib, value):
+ """Set the value of a stanza interface using dictionary-like syntax.
+
+ Example::
+
+ >>> msg['body'] = "Hi!"
+ >>> msg['body']
+ 'Hi!'
+
+ Stanza interfaces are typically mapped directly to the underlying XML
+ object, but can be overridden by the presence of a ``set_attrib``
+ method (or ``set_foo`` where the interface is named ``'foo'``, etc).
+
+ The effect of interface value assignment for an interface
+ named ``'foo'`` will be one of:
+
+ 1. Delete the interface's contents if the value is None.
+ 2. Call the ``set_foo`` override handler, if it exists.
+ 3. Call ``set_foo``, if it exists.
+ 4. Call ``setFoo``, if it exists.
+ 5. Set the text of a ``foo`` element, if ``'foo'`` is
+ in :attr:`sub_interfaces`.
+ 6. Add or remove an empty subelement ``foo``
+ if ``foo`` is in :attr:`bool_interfaces`.
+ 7. Set the value of a top level XML attribute named ``foo``.
+ 8. Attempt to pass the value to a plugin named ``'foo'`` using
+ the plugin's ``'foo'`` interface.
+ 9. Do nothing.
+
+ :param string attrib: The name of the stanza interface to modify.
+ :param value: The new value of the stanza interface.
+ """
+ full_attrib = attrib
+ attrib_lang = ('%s|' % attrib).split('|')
+ attrib = attrib_lang[0]
+ lang = attrib_lang[1] or None
+
+ kwargs = {}
+ if lang and attrib in self.lang_interfaces:
+ kwargs['lang'] = lang
+
+ if attrib in self.interfaces or attrib == 'lang':
+ if value is not None:
+ set_method = "set_%s" % attrib.lower()
+ set_method2 = "set%s" % attrib.title()
+
+ if self.plugin_overrides:
+ name = self.plugin_overrides.get(set_method, None)
+ if name:
+ plugin = self._get_plugin(name, lang)
+ if plugin:
+ handler = getattr(plugin, set_method, None)
+ if handler:
+ return handler(value, **kwargs)
+
+ if hasattr(self, set_method):
+ getattr(self, set_method)(value, **kwargs)
+ elif hasattr(self, set_method2):
+ getattr(self, set_method2)(value, **kwargs)
+ else:
+ if attrib in self.sub_interfaces:
+ if lang == '*':
+ return self._set_all_sub_text(attrib,
+ value,
+ lang='*')
+ return self._set_sub_text(attrib, text=value,
+ lang=lang)
+ elif attrib in self.bool_interfaces:
+ if value:
+ return self._set_sub_text(attrib, '',
+ keep=True,
+ lang=lang)
+ else:
+ return self._set_sub_text(attrib, '',
+ keep=False,
+ lang=lang)
+ else:
+ self._set_attr(attrib, value)
+ else:
+ self.__delitem__(attrib)
+ elif attrib in self.plugin_attrib_map:
+ plugin = self._get_plugin(attrib, lang)
+ if plugin:
+ plugin[full_attrib] = value
+ return self
+
+ def __delitem__(self, attrib):
+ """Delete the value of a stanza interface using dict-like syntax.
+
+ Example::
+
+ >>> msg['body'] = "Hi!"
+ >>> msg['body']
+ 'Hi!'
+ >>> del msg['body']
+ >>> msg['body']
+ ''
+
+ Stanza interfaces are typically mapped directly to the underlyig XML
+ object, but can be overridden by the presence of a ``del_attrib``
+ method (or ``del_foo`` where the interface is named ``'foo'``, etc).
+
+ The effect of deleting a stanza interface value named ``foo`` will be
+ one of:
+
+ 1. Call ``del_foo`` override handler, if it exists.
+ 2. Call ``del_foo``, if it exists.
+ 3. Call ``delFoo``, if it exists.
+ 4. Delete ``foo`` element, if ``'foo'`` is in
+ :attr:`sub_interfaces`.
+ 5. Remove ``foo`` element if ``'foo'`` is in
+ :attr:`bool_interfaces`.
+ 6. Delete top level XML attribute named ``foo``.
+ 7. Remove the ``foo`` plugin, if it was loaded.
+ 8. Do nothing.
+
+ :param attrib: The name of the affected stanza interface.
+ """
+ full_attrib = attrib
+ attrib_lang = ('%s|' % attrib).split('|')
+ attrib = attrib_lang[0]
+ lang = attrib_lang[1] or None
+
+ kwargs = {}
+ if lang and attrib in self.lang_interfaces:
+ kwargs['lang'] = lang
+
+ if attrib in self.interfaces or attrib == 'lang':
+ del_method = "del_%s" % attrib.lower()
+ del_method2 = "del%s" % attrib.title()
+
+ if self.plugin_overrides:
+ name = self.plugin_overrides.get(del_method, None)
+ if name:
+ plugin = self._get_plugin(attrib, lang)
+ if plugin:
+ handler = getattr(plugin, del_method, None)
+ if handler:
+ return handler(**kwargs)
+
+ if hasattr(self, del_method):
+ getattr(self, del_method)(**kwargs)
+ elif hasattr(self, del_method2):
+ getattr(self, del_method2)(**kwargs)
+ else:
+ if attrib in self.sub_interfaces:
+ return self._del_sub(attrib, lang=lang)
+ elif attrib in self.bool_interfaces:
+ return self._del_sub(attrib, lang=lang)
+ else:
+ self._del_attr(attrib)
+ elif attrib in self.plugin_attrib_map:
+ plugin = self._get_plugin(attrib, lang, check=True)
+ if not plugin:
+ return self
+ if plugin.is_extension:
+ del plugin[full_attrib]
+ del self.plugins[(attrib, None)]
+ else:
+ del self.plugins[(attrib, plugin['lang'])]
+ self.loaded_plugins.remove(attrib)
+ try:
+ self.xml.remove(plugin.xml)
+ except ValueError:
+ pass
+ return self
+
+ def _set_attr(self, name, value):
+ """Set the value of a top level attribute of the XML object.
+
+ If the new value is None or an empty string, then the attribute will
+ be removed.
+
+ :param name: The name of the attribute.
+ :param value: The new value of the attribute, or None or '' to
+ remove it.
+ """
+ if value is None or value == '':
+ self.__delitem__(name)
+ else:
+ self.xml.attrib[name] = value
+
+ def _del_attr(self, name):
+ """Remove a top level attribute of the XML object.
+
+ :param name: The name of the attribute.
+ """
+ if name in self.xml.attrib:
+ del self.xml.attrib[name]
+
+ def _get_attr(self, name, default=''):
+ """Return the value of a top level attribute of the XML object.
+
+ In case the attribute has not been set, a default value can be
+ returned instead. An empty string is returned if no other default
+ is supplied.
+
+ :param name: The name of the attribute.
+ :param default: Optional value to return if the attribute has not
+ been set. An empty string is returned otherwise.
+ """
+ return self.xml.attrib.get(name, default)
+
+ def _get_sub_text(self, name, default='', lang=None):
+ """Return the text contents of a sub element.
+
+ In case the element does not exist, or it has no textual content,
+ a default value can be returned instead. An empty string is returned
+ if no other default is supplied.
+
+ :param name: The name or XPath expression of the element.
+ :param default: Optional default to return if the element does
+ not exists. An empty string is returned otherwise.
+ """
+ name = self._fix_ns(name)
+ if lang == '*':
+ return self._get_all_sub_text(name, default, None)
+
+ default_lang = self.get_lang()
+ if not lang:
+ lang = default_lang
+
+ stanzas = self.xml.findall(name)
+ if not stanzas:
+ return default
+ for stanza in stanzas:
+ if stanza.attrib.get('{%s}lang' % XML_NS, default_lang) == lang:
+ if stanza.text is None:
+ return default
+ return stanza.text
+ return default
+
+ def _get_all_sub_text(self, name, default='', lang=None):
+ name = self._fix_ns(name)
+
+ default_lang = self.get_lang()
+ results = OrderedDict()
+ stanzas = self.xml.findall(name)
+ if stanzas:
+ for stanza in stanzas:
+ stanza_lang = stanza.attrib.get('{%s}lang' % XML_NS,
+ default_lang)
+ if not lang or lang == '*' or stanza_lang == lang:
+ results[stanza_lang] = stanza.text
+ return results
+
+ def _set_sub_text(self, name, text=None, keep=False, lang=None):
+ """Set the text contents of a sub element.
+
+ In case the element does not exist, a element will be created,
+ and its text contents will be set.
+
+ If the text is set to an empty string, or None, then the
+ element will be removed, unless keep is set to True.
+
+ :param name: The name or XPath expression of the element.
+ :param text: The new textual content of the element. If the text
+ is an empty string or None, the element will be removed
+ unless the parameter keep is True.
+ :param keep: Indicates if the element should be kept if its text is
+ removed. Defaults to False.
+ """
+ default_lang = self.get_lang()
+ if lang is None:
+ lang = default_lang
+
+ if not text and not keep:
+ return self._del_sub(name, lang=lang)
+
+ path = self._fix_ns(name, split=True)
+ name = path[-1]
+ parent = self.xml
+
+ # The first goal is to find the parent of the subelement, or, if
+ # we can't find that, the closest grandparent element.
+ missing_path = []
+ search_order = path[:-1]
+ while search_order:
+ parent = self.xml.find('/'.join(search_order))
+ ename = search_order.pop()
+ if parent is not None:
+ break
+ else:
+ missing_path.append(ename)
+ missing_path.reverse()
+
+ # Find all existing elements that match the desired
+ # element path (there may be multiples due to different
+ # languages values).
+ if parent is not None:
+ elements = self.xml.findall('/'.join(path))
+ else:
+ parent = self.xml
+ elements = []
+
+ # Insert the remaining grandparent elements that don't exist yet.
+ for ename in missing_path:
+ element = ET.Element(ename)
+ parent.append(element)
+ parent = element
+
+ # Re-use an existing element with the proper language, if one exists.
+ for element in elements:
+ elang = element.attrib.get('{%s}lang' % XML_NS, default_lang)
+ if not lang and elang == default_lang or lang and lang == elang:
+ element.text = text
+ return element
+
+ # No useable element exists, so create a new one.
+ element = ET.Element(name)
+ element.text = text
+ if lang and lang != default_lang:
+ element.attrib['{%s}lang' % XML_NS] = lang
+ parent.append(element)
+ return element
+
+ def _set_all_sub_text(self, name, values, keep=False, lang=None):
+ self._del_sub(name, lang)
+ for value_lang, value in values.items():
+ if not lang or lang == '*' or value_lang == lang:
+ self._set_sub_text(name, text=value,
+ keep=keep,
+ lang=value_lang)
+
+ def _del_sub(self, name, all=False, lang=None):
+ """Remove sub elements that match the given name or XPath.
+
+ If the element is in a path, then any parent elements that become
+ empty after deleting the element may also be deleted if requested
+ by setting all=True.
+
+ :param name: The name or XPath expression for the element(s) to remove.
+ :param bool all: If True, remove all empty elements in the path to the
+ deleted element. Defaults to False.
+ """
+ path = self._fix_ns(name, split=True)
+ original_target = path[-1]
+
+ default_lang = self.get_lang()
+ if not lang:
+ lang = default_lang
+
+ for level, _ in enumerate(path):
+ # Generate the paths to the target elements and their parent.
+ element_path = "/".join(path[:len(path) - level])
+ parent_path = "/".join(path[:len(path) - level - 1])
+
+ elements = self.xml.findall(element_path)
+ parent = self.xml.find(parent_path)
+
+ if elements:
+ if parent is None:
+ parent = self.xml
+ for element in elements:
+ if element.tag == original_target or not list(element):
+ # Only delete the originally requested elements, and
+ # any parent elements that have become empty.
+ elem_lang = element.attrib.get('{%s}lang' % XML_NS,
+ default_lang)
+ if lang == '*' or elem_lang == lang:
+ parent.remove(element)
+ if not all:
+ # If we don't want to delete elements up the tree, stop
+ # after deleting the first level of elements.
+ return
+
+ def match(self, xpath):
+ """Compare a stanza object with an XPath-like expression.
+
+ If the XPath matches the contents of the stanza object, the match
+ is successful.
+
+ The XPath expression may include checks for stanza attributes.
+ For example::
+
+ 'presence@show=xa@priority=2/status'
+
+ Would match a presence stanza whose show value is set to ``'xa'``,
+ has a priority value of ``'2'``, and has a status element.
+
+ :param string xpath: The XPath expression to check against. It
+ may be either a string or a list of element
+ names with attribute checks.
+ """
+ if not isinstance(xpath, list):
+ xpath = self._fix_ns(xpath, split=True, propagate_ns=False)
+
+ # Extract the tag name and attribute checks for the first XPath node.
+ components = xpath[0].split('@')
+ tag = components[0]
+ attributes = components[1:]
+
+ if tag not in (self.name, "{%s}%s" % (self.namespace, self.name)) and \
+ tag not in self.loaded_plugins and tag not in self.plugin_attrib:
+ # The requested tag is not in this stanza, so no match.
+ return False
+
+ # Check the rest of the XPath against any substanzas.
+ matched_substanzas = False
+ for substanza in self.iterables:
+ if xpath[1:] == []:
+ break
+ matched_substanzas = substanza.match(xpath[1:])
+ if matched_substanzas:
+ break
+
+ # Check attribute values.
+ for attribute in attributes:
+ name, value = attribute.split('=')
+ if self[name] != value:
+ return False
+
+ # Check sub interfaces.
+ if len(xpath) > 1:
+ next_tag = xpath[1]
+ if next_tag in self.sub_interfaces and self[next_tag]:
+ return True
+
+ # Attempt to continue matching the XPath using the stanza's plugins.
+ if not matched_substanzas and len(xpath) > 1:
+ # Convert {namespace}tag@attribs to just tag
+ next_tag = xpath[1].split('@')[0].split('}')[-1]
+ langs = [name[1] for name in self.plugins if name[0] == next_tag]
+ for lang in langs:
+ plugin = self._get_plugin(next_tag, lang)
+ if plugin and plugin.match(xpath[1:]):
+ return True
+ return False
+
+ # Everything matched.
+ return True
+
+ def find(self, xpath):
+ """Find an XML object in this stanza given an XPath expression.
+
+ Exposes ElementTree interface for backwards compatibility.
+
+ .. note::
+
+ Matching on attribute values is not supported in Python 2.6
+ or Python 3.1
+
+ :param string xpath: An XPath expression matching a single
+ desired element.
+ """
+ return self.xml.find(xpath)
+
+ def findall(self, xpath):
+ """Find multiple XML objects in this stanza given an XPath expression.
+
+ Exposes ElementTree interface for backwards compatibility.
+
+ .. note::
+
+ Matching on attribute values is not supported in Python 2.6
+ or Python 3.1.
+
+ :param string xpath: An XPath expression matching multiple
+ desired elements.
+ """
+ return self.xml.findall(xpath)
+
+ def get(self, key, default=None):
+ """Return the value of a stanza interface.
+
+ If the found value is None or an empty string, return the supplied
+ default value.
+
+ Allows stanza objects to be used like dictionaries.
+
+ :param string key: The name of the stanza interface to check.
+ :param default: Value to return if the stanza interface has a value
+ of ``None`` or ``""``. Will default to returning None.
+ """
+ value = self[key]
+ if value is None or value == '':
+ return default
+ return value
+
+ def keys(self):
+ """Return the names of all stanza interfaces provided by the
+ stanza object.
+
+ Allows stanza objects to be used like dictionaries.
+ """
+ out = []
+ out += [x for x in self.interfaces]
+ out += [x for x in self.loaded_plugins]
+ out.append('lang')
+ if self.iterables:
+ out.append('substanzas')
+ return out
+
+ def append(self, item):
+ """Append either an XML object or a substanza to this stanza object.
+
+ If a substanza object is appended, it will be added to the list
+ of iterable stanzas.
+
+ Allows stanza objects to be used like lists.
+
+ :param item: Either an XML object or a stanza object to add to
+ this stanza's contents.
+ """
+ if not isinstance(item, ElementBase):
+ if type(item) == XML_TYPE:
+ return self.appendxml(item)
+ else:
+ raise TypeError
+ self.xml.append(item.xml)
+ self.iterables.append(item)
+ if item.__class__ in self.plugin_iterables:
+ if item.__class__.plugin_multi_attrib:
+ self.init_plugin(item.__class__.plugin_multi_attrib)
+ elif item.__class__ == self.plugin_tag_map.get(item.tag_name(), None):
+ self.init_plugin(item.plugin_attrib,
+ existing_xml=item.xml,
+ reuse=False)
+ return self
+
+ def appendxml(self, xml):
+ """Append an XML object to the stanza's XML.
+
+ The added XML will not be included in the list of
+ iterable substanzas.
+
+ :param XML xml: The XML object to add to the stanza.
+ """
+ self.xml.append(xml)
+ return self
+
+ def pop(self, index=0):
+ """Remove and return the last substanza in the list of
+ iterable substanzas.
+
+ Allows stanza objects to be used like lists.
+
+ :param int index: The index of the substanza to remove.
+ """
+ substanza = self.iterables.pop(index)
+ self.xml.remove(substanza.xml)
+ return substanza
+
+ def next(self):
+ """Return the next iterable substanza."""
+ return self.__next__()
+
+ def clear(self):
+ """Remove all XML element contents and plugins.
+
+ Any attribute values will be preserved.
+ """
+ for child in list(self.xml):
+ self.xml.remove(child)
+
+ for plugin in list(self.plugins.keys()):
+ del self.plugins[plugin]
+ return self
+
+ @classmethod
+ def tag_name(cls):
+ """Return the namespaced name of the stanza's root element.
+
+ The format for the tag name is::
+
+ '{namespace}elementname'
+
+ For example, for the stanza ``<foo xmlns="bar" />``,
+ ``stanza.tag_name()`` would return ``"{bar}foo"``.
+ """
+ return "{%s}%s" % (cls.namespace, cls.name)
+
+ def get_lang(self, lang=None):
+ result = self.xml.attrib.get('{%s}lang' % XML_NS, '')
+ if not result and self.parent and self.parent():
+ return self.parent()['lang']
+ return result
+
+ def set_lang(self, lang):
+ self.del_lang()
+ attr = '{%s}lang' % XML_NS
+ if lang:
+ self.xml.attrib[attr] = lang
+
+ def del_lang(self):
+ attr = '{%s}lang' % XML_NS
+ if attr in self.xml.attrib:
+ del self.xml.attrib[attr]
+
+ @property
+ def attrib(self):
+ """Return the stanza object itself.
+
+ Older implementations of stanza objects used XML objects directly,
+ requiring the use of ``.attrib`` to access attribute values.
+
+ Use of the dictionary syntax with the stanza object itself for
+ accessing stanza interfaces is preferred.
+
+ .. deprecated:: 1.0
+ """
+ return self
+
+ def _fix_ns(self, xpath, split=False, propagate_ns=True):
+ return fix_ns(xpath, split=split,
+ propagate_ns=propagate_ns,
+ default_ns=self.namespace)
+
+ def __eq__(self, other):
+ """Compare the stanza object with another to test for equality.
+
+ Stanzas are equal if their interfaces return the same values,
+ and if they are both instances of ElementBase.
+
+ :param ElementBase other: The stanza object to compare against.
+ """
+ if not isinstance(other, ElementBase):
+ return False
+
+ # Check that this stanza is a superset of the other stanza.
+ values = self.values
+ for key in other.keys():
+ if key not in values or values[key] != other[key]:
+ return False
+
+ # Check that the other stanza is a superset of this stanza.
+ values = other.values
+ for key in self.keys():
+ if key not in values or values[key] != self[key]:
+ return False
+
+ # Both stanzas are supersets of each other, therefore they
+ # must be equal.
+ return True
+
+ def __ne__(self, other):
+ """Compare the stanza object with another to test for inequality.
+
+ Stanzas are not equal if their interfaces return different values,
+ or if they are not both instances of ElementBase.
+
+ :param ElementBase other: The stanza object to compare against.
+ """
+ return not self.__eq__(other)
+
+ def __bool__(self):
+ """Stanza objects should be treated as True in boolean contexts.
+
+ Python 3.x version.
+ """
+ return True
+
+ def __nonzero__(self):
+ """Stanza objects should be treated as True in boolean contexts.
+
+ Python 2.x version.
+ """
+ return True
+
+ def __len__(self):
+ """Return the number of iterable substanzas in this stanza."""
+ return len(self.iterables)
+
+ def __iter__(self):
+ """Return an iterator object for the stanza's substanzas.
+
+ The iterator is the stanza object itself. Attempting to use two
+ iterators on the same stanza at the same time is discouraged.
+ """
+ self._index = 0
+ return self
+
+ def __next__(self):
+ """Return the next iterable substanza."""
+ self._index += 1
+ if self._index > len(self.iterables):
+ self._index = 0
+ raise StopIteration
+ return self.iterables[self._index - 1]
+
+ def __copy__(self):
+ """Return a copy of the stanza object that does not share the same
+ underlying XML object.
+ """
+ return self.__class__(xml=copy.deepcopy(self.xml), parent=self.parent)
+
+ def __str__(self, top_level_ns=True):
+ """Return a string serialization of the underlying XML object.
+
+ .. seealso:: :ref:`tostring`
+
+ :param bool top_level_ns: Display the top-most namespace.
+ Defaults to True.
+ """
+ return tostring(self.xml, xmlns='',
+ top_level=True)
+
+ def __repr__(self):
+ """Use the stanza's serialized XML as its representation."""
+ return self.__str__()
+
+
+class StanzaBase(ElementBase):
+
+ """
+ StanzaBase provides the foundation for all other stanza objects used
+ by Slixmpp, and defines a basic set of interfaces common to nearly
+ all stanzas. These interfaces are the ``'id'``, ``'type'``, ``'to'``,
+ and ``'from'`` attributes. An additional interface, ``'payload'``, is
+ available to access the XML contents of the stanza. Most stanza objects
+ will provided more specific interfaces, however.
+
+ **Stanza Interfaces:**
+
+ :id: An optional id value that can be used to associate stanzas
+ :to: A JID object representing the recipient's JID.
+ :from: A JID object representing the sender's JID.
+ with their replies.
+ :type: The type of stanza, typically will be ``'normal'``,
+ ``'error'``, ``'get'``, or ``'set'``, etc.
+ :payload: The XML contents of the stanza.
+
+ :param XMLStream stream: Optional :class:`slixmpp.xmlstream.XMLStream`
+ object responsible for sending this stanza.
+ :param XML xml: Optional XML contents to initialize stanza values.
+ :param string stype: Optional stanza type value.
+ :param sto: Optional string or :class:`slixmpp.xmlstream.JID`
+ object of the recipient's JID.
+ :param sfrom: Optional string or :class:`slixmpp.xmlstream.JID`
+ object of the sender's JID.
+ :param string sid: Optional ID value for the stanza.
+ :param parent: Optionally specify a parent stanza object will
+ contain this substanza.
+ """
+
+ #: The default XMPP client namespace
+ namespace = 'jabber:client'
+
+ #: There is a small set of attributes which apply to all XMPP stanzas:
+ #: the stanza type, the to and from JIDs, the stanza ID, and, especially
+ #: in the case of an Iq stanza, a payload.
+ interfaces = set(('type', 'to', 'from', 'id', 'payload'))
+
+ #: A basic set of allowed values for the ``'type'`` interface.
+ types = set(('get', 'set', 'error', None, 'unavailable', 'normal', 'chat'))
+
+ def __init__(self, stream=None, xml=None, stype=None,
+ sto=None, sfrom=None, sid=None, parent=None):
+ self.stream = stream
+ if stream is not None:
+ self.namespace = stream.default_ns
+ ElementBase.__init__(self, xml, parent)
+ if stype is not None:
+ self['type'] = stype
+ if sto is not None:
+ self['to'] = sto
+ if sfrom is not None:
+ self['from'] = sfrom
+ if sid is not None:
+ self['id'] = sid
+ self.tag = "{%s}%s" % (self.namespace, self.name)
+
+ def set_type(self, value):
+ """Set the stanza's ``'type'`` attribute.
+
+ Only type values contained in :attr:`types` are accepted.
+
+ :param string value: One of the values contained in :attr:`types`
+ """
+ if value in self.types:
+ self.xml.attrib['type'] = value
+ return self
+
+ def get_to(self):
+ """Return the value of the stanza's ``'to'`` attribute."""
+ return JID(self._get_attr('to'))
+
+ def set_to(self, value):
+ """Set the ``'to'`` attribute of the stanza.
+
+ :param value: A string or :class:`slixmpp.xmlstream.JID` object
+ representing the recipient's JID.
+ """
+ return self._set_attr('to', str(value))
+
+ def get_from(self):
+ """Return the value of the stanza's ``'from'`` attribute."""
+ return JID(self._get_attr('from'))
+
+ def set_from(self, value):
+ """Set the 'from' attribute of the stanza.
+
+ Arguments:
+ from -- A string or JID object representing the sender's JID.
+ """
+ return self._set_attr('from', str(value))
+
+ def get_payload(self):
+ """Return a list of XML objects contained in the stanza."""
+ return list(self.xml)
+
+ def set_payload(self, value):
+ """Add XML content to the stanza.
+
+ :param value: Either an XML or a stanza object, or a list
+ of XML or stanza objects.
+ """
+ if not isinstance(value, list):
+ value = [value]
+ for val in value:
+ self.append(val)
+ return self
+
+ def del_payload(self):
+ """Remove the XML contents of the stanza."""
+ self.clear()
+ return self
+
+ def reply(self, clear=True):
+ """Prepare the stanza for sending a reply.
+
+ Swaps the ``'from'`` and ``'to'`` attributes.
+
+ If ``clear=True``, then also remove the stanza's
+ contents to make room for the reply content.
+
+ For client streams, the ``'from'`` attribute is removed.
+
+ :param bool clear: Indicates if the stanza's contents should be
+ removed. Defaults to ``True``.
+ """
+ # if it's a component, use from
+ if self.stream and hasattr(self.stream, "is_component") and \
+ self.stream.is_component:
+ self['from'], self['to'] = self['to'], self['from']
+ else:
+ self['to'] = self['from']
+ del self['from']
+ if clear:
+ self.clear()
+ return self
+
+ def error(self):
+ """Set the stanza's type to ``'error'``."""
+ self['type'] = 'error'
+ return self
+
+ def unhandled(self):
+ """Called if no handlers have been registered to process this stanza.
+
+ Meant to be overridden.
+ """
+ pass
+
+ def exception(self, e):
+ """Handle exceptions raised during stanza processing.
+
+ Meant to be overridden.
+ """
+ log.exception('Error handling {%s}%s stanza', self.namespace,
+ self.name)
+
+ def send(self, now=False):
+ """Queue the stanza to be sent on the XML stream.
+
+ :param bool now: Indicates if the queue should be skipped and the
+ stanza sent immediately. Useful for stream
+ initialization. Defaults to ``False``.
+ """
+ self.stream.send(self, now=now)
+
+ def __copy__(self):
+ """Return a copy of the stanza object that does not share the
+ same underlying XML object, but does share the same XML stream.
+ """
+ return self.__class__(xml=copy.deepcopy(self.xml),
+ stream=self.stream)
+
+ def __str__(self, top_level_ns=False):
+ """Serialize the stanza's XML to a string.
+
+ :param bool top_level_ns: Display the top-most namespace.
+ Defaults to ``False``.
+ """
+ xmlns = self.stream.default_ns if self.stream else ''
+ return tostring(self.xml, xmlns=xmlns,
+ stream=self.stream,
+ top_level=(self.stream is None))
+
+
+#: A JSON/dictionary version of the XML content exposed through
+#: the stanza interfaces::
+#:
+#: >>> msg = Message()
+#: >>> msg.values
+#: {'body': '', 'from': , 'mucnick': '', 'mucroom': '',
+#: 'to': , 'type': 'normal', 'id': '', 'subject': ''}
+#:
+#: Likewise, assigning to the :attr:`values` will change the XML
+#: content::
+#:
+#: >>> msg = Message()
+#: >>> msg.values = {'body': 'Hi!', 'to': 'user@example.com'}
+#: >>> msg
+#: '<message to="user@example.com"><body>Hi!</body></message>'
+#:
+#: Child stanzas are exposed as nested dictionaries.
+ElementBase.values = property(ElementBase._get_stanza_values,
+ ElementBase._set_stanza_values)
+
+
+# To comply with PEP8, method names now use underscores.
+# Deprecated method names are re-mapped for backwards compatibility.
+ElementBase.initPlugin = ElementBase.init_plugin
+ElementBase._getAttr = ElementBase._get_attr
+ElementBase._setAttr = ElementBase._set_attr
+ElementBase._delAttr = ElementBase._del_attr
+ElementBase._getSubText = ElementBase._get_sub_text
+ElementBase._setSubText = ElementBase._set_sub_text
+ElementBase._delSub = ElementBase._del_sub
+ElementBase.getStanzaValues = ElementBase._get_stanza_values
+ElementBase.setStanzaValues = ElementBase._set_stanza_values
+
+StanzaBase.setType = StanzaBase.set_type
+StanzaBase.getTo = StanzaBase.get_to
+StanzaBase.setTo = StanzaBase.set_to
+StanzaBase.getFrom = StanzaBase.get_from
+StanzaBase.setFrom = StanzaBase.set_from
+StanzaBase.getPayload = StanzaBase.get_payload
+StanzaBase.setPayload = StanzaBase.set_payload
+StanzaBase.delPayload = StanzaBase.del_payload
diff --git a/slixmpp/xmlstream/tostring.py b/slixmpp/xmlstream/tostring.py
new file mode 100644
index 00000000..c1e4911a
--- /dev/null
+++ b/slixmpp/xmlstream/tostring.py
@@ -0,0 +1,172 @@
+# -*- coding: utf-8 -*-
+"""
+ slixmpp.xmlstream.tostring
+ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ This module converts XML objects into Unicode strings and
+ intelligently includes namespaces only when necessary to
+ keep the output readable.
+
+ Part of Slixmpp: The Slick XMPP Library
+
+ :copyright: (c) 2011 Nathanael C. Fritz
+ :license: MIT, see LICENSE for more details
+"""
+
+from __future__ import unicode_literals
+
+import sys
+
+if sys.version_info < (3, 0):
+ import types
+
+
+XML_NS = 'http://www.w3.org/XML/1998/namespace'
+
+
+def tostring(xml=None, xmlns='', stream=None, outbuffer='',
+ top_level=False, open_only=False, namespaces=None):
+ """Serialize an XML object to a Unicode string.
+
+ If an outer xmlns is provided using ``xmlns``, then the current element's
+ namespace will not be included if it matches the outer namespace. An
+ exception is made for elements that have an attached stream, and appear
+ at the stream root.
+
+ :param XML xml: The XML object to serialize.
+ :param string xmlns: Optional namespace of an element wrapping the XML
+ object.
+ :param stream: The XML stream that generated the XML object.
+ :param string outbuffer: Optional buffer for storing serializations
+ during recursive calls.
+ :param bool top_level: Indicates that the element is the outermost
+ element.
+ :param set namespaces: Track which namespaces are in active use so
+ that new ones can be declared when needed.
+
+ :type xml: :py:class:`~xml.etree.ElementTree.Element`
+ :type stream: :class:`~slixmpp.xmlstream.xmlstream.XMLStream`
+
+ :rtype: Unicode string
+ """
+ # Add previous results to the start of the output.
+ output = [outbuffer]
+
+ # Extract the element's tag name.
+ tag_name = xml.tag.split('}', 1)[-1]
+
+ # Extract the element's namespace if it is defined.
+ if '}' in xml.tag:
+ tag_xmlns = xml.tag.split('}', 1)[0][1:]
+ else:
+ tag_xmlns = ''
+
+ default_ns = ''
+ stream_ns = ''
+ use_cdata = False
+
+ if stream:
+ default_ns = stream.default_ns
+ stream_ns = stream.stream_ns
+ use_cdata = stream.use_cdata
+
+ # Output the tag name and derived namespace of the element.
+ namespace = ''
+ if tag_xmlns:
+ if top_level and tag_xmlns not in [default_ns, xmlns, stream_ns] \
+ or not top_level and tag_xmlns != xmlns:
+ namespace = ' xmlns="%s"' % tag_xmlns
+ if stream and tag_xmlns in stream.namespace_map:
+ mapped_namespace = stream.namespace_map[tag_xmlns]
+ if mapped_namespace:
+ tag_name = "%s:%s" % (mapped_namespace, tag_name)
+ output.append("<%s" % tag_name)
+ output.append(namespace)
+
+ # Output escaped attribute values.
+ new_namespaces = set()
+ for attrib, value in xml.attrib.items():
+ value = escape(value, use_cdata)
+ if '}' not in attrib:
+ output.append(' %s="%s"' % (attrib, value))
+ else:
+ attrib_ns = attrib.split('}')[0][1:]
+ attrib = attrib.split('}')[1]
+ if attrib_ns == XML_NS:
+ output.append(' xml:%s="%s"' % (attrib, value))
+ elif stream and attrib_ns in stream.namespace_map:
+ mapped_ns = stream.namespace_map[attrib_ns]
+ if mapped_ns:
+ if namespaces is None:
+ namespaces = set()
+ if attrib_ns not in namespaces:
+ namespaces.add(attrib_ns)
+ new_namespaces.add(attrib_ns)
+ output.append(' xmlns:%s="%s"' % (
+ mapped_ns, attrib_ns))
+ output.append(' %s:%s="%s"' % (
+ mapped_ns, attrib, value))
+
+ if open_only:
+ # Only output the opening tag, regardless of content.
+ output.append(">")
+ return ''.join(output)
+
+ if len(xml) or xml.text:
+ # If there are additional child elements to serialize.
+ output.append(">")
+ if xml.text:
+ output.append(escape(xml.text, use_cdata))
+ if len(xml):
+ for child in xml:
+ output.append(tostring(child, tag_xmlns, stream,
+ namespaces=namespaces))
+ output.append("</%s>" % tag_name)
+ elif xml.text:
+ # If we only have text content.
+ output.append(">%s</%s>" % (escape(xml.text, use_cdata), tag_name))
+ else:
+ # Empty element.
+ output.append(" />")
+ if xml.tail:
+ # If there is additional text after the element.
+ output.append(escape(xml.tail, use_cdata))
+ for ns in new_namespaces:
+ # Remove namespaces introduced in this context. This is necessary
+ # because the namespaces object continues to be shared with other
+ # contexts.
+ namespaces.remove(ns)
+ return ''.join(output)
+
+
+def escape(text, use_cdata=False):
+ """Convert special characters in XML to escape sequences.
+
+ :param string text: The XML text to convert.
+ :rtype: Unicode string
+ """
+ if sys.version_info < (3, 0):
+ if type(text) != types.UnicodeType:
+ text = unicode(text, 'utf-8', 'ignore')
+
+ escapes = {'&': '&amp;',
+ '<': '&lt;',
+ '>': '&gt;',
+ "'": '&apos;',
+ '"': '&quot;'}
+
+ if not use_cdata:
+ text = list(text)
+ for i, c in enumerate(text):
+ text[i] = escapes.get(c, c)
+ return ''.join(text)
+ else:
+ escape_needed = False
+ for c in text:
+ if c in escapes:
+ escape_needed = True
+ break
+ if escape_needed:
+ escaped = map(lambda x : "<![CDATA[%s]]>" % x, text.split("]]>"))
+ return "<![CDATA[]]]><![CDATA[]>]]>".join(escaped)
+ return text
diff --git a/slixmpp/xmlstream/xmlstream.py b/slixmpp/xmlstream/xmlstream.py
new file mode 100644
index 00000000..040f5096
--- /dev/null
+++ b/slixmpp/xmlstream/xmlstream.py
@@ -0,0 +1,1808 @@
+"""
+ slixmpp.xmlstream.xmlstream
+ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ This module provides the module for creating and
+ interacting with generic XML streams, along with
+ the necessary eventing infrastructure.
+
+ Part of Slixmpp: The Slick XMPP Library
+
+ :copyright: (c) 2011 Nathanael C. Fritz
+ :license: MIT, see LICENSE for more details
+"""
+
+from __future__ import with_statement, unicode_literals
+
+import base64
+import copy
+import logging
+import signal
+import socket as Socket
+import ssl
+import sys
+import threading
+import time
+import random
+import weakref
+import uuid
+import errno
+
+from xml.parsers.expat import ExpatError
+
+import slixmpp
+from slixmpp.util import Queue, QueueEmpty, safedict
+from slixmpp.thirdparty.statemachine import StateMachine
+from slixmpp.xmlstream import Scheduler, tostring, cert
+from slixmpp.xmlstream.stanzabase import StanzaBase, ET, ElementBase
+from slixmpp.xmlstream.handler import Waiter, XMLCallback
+from slixmpp.xmlstream.matcher import MatchXMLMask
+from slixmpp.xmlstream.resolver import resolve, default_resolver
+
+# In Python 2.x, file socket objects are broken. A patched socket
+# wrapper is provided for this case in filesocket.py.
+if sys.version_info < (3, 0):
+ from slixmpp.xmlstream.filesocket import FileSocket, Socket26
+
+
+#: The time in seconds to wait before timing out waiting for response stanzas.
+RESPONSE_TIMEOUT = 30
+
+#: The time in seconds to wait for events from the event queue, and also the
+#: time between checks for the process stop signal.
+WAIT_TIMEOUT = 1.0
+
+#: The number of threads to use to handle XML stream events. This is not the
+#: same as the number of custom event handling threads.
+#: :data:`HANDLER_THREADS` must be at least 1. For Python implementations
+#: with a GIL, this should be left at 1, but for implemetnations without
+#: a GIL increasing this value can provide better performance.
+HANDLER_THREADS = 1
+
+#: The time in seconds to delay between attempts to resend data
+#: after an SSL error.
+SSL_RETRY_DELAY = 0.5
+
+#: The maximum number of times to attempt resending data due to
+#: an SSL error.
+SSL_RETRY_MAX = 10
+
+#: Maximum time to delay between connection attempts is one hour.
+RECONNECT_MAX_DELAY = 600
+
+#: Maximum number of attempts to connect to the server before quitting
+#: and raising a 'connect_failed' event. Setting this to ``None`` will
+#: allow infinite reconnection attempts, and using ``0`` will disable
+#: reconnections. Defaults to ``None``.
+RECONNECT_MAX_ATTEMPTS = None
+
+
+log = logging.getLogger(__name__)
+
+
+class RestartStream(Exception):
+ """
+ Exception to restart stream processing, including
+ resending the stream header.
+ """
+
+
+class XMLStream(object):
+ """
+ An XML stream connection manager and event dispatcher.
+
+ The XMLStream class abstracts away the issues of establishing a
+ connection with a server and sending and receiving XML "stanzas".
+ A stanza is a complete XML element that is a direct child of a root
+ document element. Two streams are used, one for each communication
+ direction, over the same socket. Once the connection is closed, both
+ streams should be complete and valid XML documents.
+
+ Three types of events are provided to manage the stream:
+ :Stream: Triggered based on received stanzas, similar in concept
+ to events in a SAX XML parser.
+ :Custom: Triggered manually.
+ :Scheduled: Triggered based on time delays.
+
+ Typically, stanzas are first processed by a stream event handler which
+ will then trigger custom events to continue further processing,
+ especially since custom event handlers may run in individual threads.
+
+ :param socket: Use an existing socket for the stream. Defaults to
+ ``None`` to generate a new socket.
+ :param string host: The name of the target server.
+ :param int port: The port to use for the connection. Defaults to 0.
+ """
+
+ def __init__(self, socket=None, host='', port=0):
+ #: Most XMPP servers support TLSv1, but OpenFire in particular
+ #: does not work well with it. For OpenFire, set
+ #: :attr:`ssl_version` to use ``SSLv23``::
+ #:
+ #: import ssl
+ #: xmpp.ssl_version = ssl.PROTOCOL_SSLv23
+ self.ssl_version = ssl.PROTOCOL_TLSv1
+
+ #: The list of accepted ciphers, in OpenSSL Format.
+ #: It might be useful to override it for improved security
+ #: over the python defaults.
+ self.ciphers = None
+
+ #: Path to a file containing certificates for verifying the
+ #: server SSL certificate. A non-``None`` value will trigger
+ #: certificate checking.
+ #:
+ #: .. note::
+ #:
+ #: On Mac OS X, certificates in the system keyring will
+ #: be consulted, even if they are not in the provided file.
+ self.ca_certs = None
+
+ #: Path to a file containing a client certificate to use for
+ #: authenticating via SASL EXTERNAL. If set, there must also
+ #: be a corresponding `:attr:keyfile` value.
+ self.certfile = None
+
+ #: Path to a file containing the private key for the selected
+ #: client certificate to use for authenticating via SASL EXTERNAL.
+ self.keyfile = None
+
+ self._der_cert = None
+
+ #: The time in seconds to wait for events from the event queue,
+ #: and also the time between checks for the process stop signal.
+ self.wait_timeout = WAIT_TIMEOUT
+
+ #: The time in seconds to wait before timing out waiting
+ #: for response stanzas.
+ self.response_timeout = RESPONSE_TIMEOUT
+
+ #: The current amount to time to delay attempting to reconnect.
+ #: This value doubles (with some jitter) with each failed
+ #: connection attempt up to :attr:`reconnect_max_delay` seconds.
+ self.reconnect_delay = None
+
+ #: Maximum time to delay between connection attempts is one hour.
+ self.reconnect_max_delay = RECONNECT_MAX_DELAY
+
+ #: Maximum number of attempts to connect to the server before
+ #: quitting and raising a 'connect_failed' event. Setting to
+ #: ``None`` allows infinite reattempts, while setting it to ``0``
+ #: will disable reconnection attempts. Defaults to ``None``.
+ self.reconnect_max_attempts = RECONNECT_MAX_ATTEMPTS
+
+ #: The time in seconds to delay between attempts to resend data
+ #: after an SSL error.
+ self.ssl_retry_max = SSL_RETRY_MAX
+
+ #: The maximum number of times to attempt resending data due to
+ #: an SSL error.
+ self.ssl_retry_delay = SSL_RETRY_DELAY
+
+ #: The connection state machine tracks if the stream is
+ #: ``'connected'`` or ``'disconnected'``.
+ self.state = StateMachine(('disconnected', 'connected'))
+ self.state._set_state('disconnected')
+
+ #: The default port to return when querying DNS records.
+ self.default_port = int(port)
+
+ #: The domain to try when querying DNS records.
+ self.default_domain = ''
+
+ #: The expected name of the server, for validation.
+ self._expected_server_name = ''
+ self._service_name = ''
+
+ #: The desired, or actual, address of the connected server.
+ self.address = (host, int(port))
+
+ #: A file-like wrapper for the socket for use with the
+ #: :mod:`~xml.etree.ElementTree` module.
+ self.filesocket = None
+ self.set_socket(socket)
+
+ if sys.version_info < (3, 0):
+ self.socket_class = Socket26
+ else:
+ self.socket_class = Socket.socket
+
+ #: Enable connecting to the server directly over SSL, in
+ #: particular when the service provides two ports: one for
+ #: non-SSL traffic and another for SSL traffic.
+ self.use_ssl = False
+
+ #: Enable connecting to the service without using SSL
+ #: immediately, but allow upgrading the connection later
+ #: to use SSL.
+ self.use_tls = False
+
+ #: If set to ``True``, attempt to connect through an HTTP
+ #: proxy based on the settings in :attr:`proxy_config`.
+ self.use_proxy = False
+
+ #: If set to ``True``, attempt to use IPv6.
+ self.use_ipv6 = True
+
+ #: If set to ``True``, allow using the ``dnspython`` DNS library
+ #: if available. If set to ``False``, the builtin DNS resolver
+ #: will be used, even if ``dnspython`` is installed.
+ self.use_dnspython = True
+
+ #: Use CDATA for escaping instead of XML entities. Defaults
+ #: to ``False``.
+ self.use_cdata = False
+
+ #: An optional dictionary of proxy settings. It may provide:
+ #: :host: The host offering proxy services.
+ #: :port: The port for the proxy service.
+ #: :username: Optional username for accessing the proxy.
+ #: :password: Optional password for accessing the proxy.
+ self.proxy_config = {}
+
+ #: The default namespace of the stream content, not of the
+ #: stream wrapper itself.
+ self.default_ns = ''
+
+ self.default_lang = None
+ self.peer_default_lang = None
+
+ #: The namespace of the enveloping stream element.
+ self.stream_ns = ''
+
+ #: The default opening tag for the stream element.
+ self.stream_header = "<stream>"
+
+ #: The default closing tag for the stream element.
+ self.stream_footer = "</stream>"
+
+ #: If ``True``, periodically send a whitespace character over the
+ #: wire to keep the connection alive. Mainly useful for connections
+ #: traversing NAT.
+ self.whitespace_keepalive = True
+
+ #: The default interval between keepalive signals when
+ #: :attr:`whitespace_keepalive` is enabled.
+ self.whitespace_keepalive_interval = 300
+
+ #: An :class:`~threading.Event` to signal that the application
+ #: is stopping, and that all threads should shutdown.
+ self.stop = threading.Event()
+
+ #: An :class:`~threading.Event` to signal receiving a closing
+ #: stream tag from the server.
+ self.stream_end_event = threading.Event()
+ self.stream_end_event.set()
+
+ #: An :class:`~threading.Event` to signal the start of a stream
+ #: session. Until this event fires, the send queue is not used
+ #: and data is sent immediately over the wire.
+ self.session_started_event = threading.Event()
+
+ #: The default time in seconds to wait for a session to start
+ #: after connecting before reconnecting and trying again.
+ self.session_timeout = 45
+
+ #: Flag for controlling if the session can be considered ended
+ #: if the connection is terminated.
+ self.end_session_on_disconnect = True
+
+ #: A queue of stream, custom, and scheduled events to be processed.
+ self.event_queue = Queue()
+
+ #: A queue of string data to be sent over the stream.
+ self.send_queue = Queue()
+ self.send_queue_lock = threading.Lock()
+ self.send_lock = threading.RLock()
+
+ #: A :class:`~slixmpp.xmlstream.scheduler.Scheduler` instance for
+ #: executing callbacks in the future based on time delays.
+ self.scheduler = Scheduler(self.stop)
+ self.__failed_send_stanza = None
+
+ #: A mapping of XML namespaces to well-known prefixes.
+ self.namespace_map = {StanzaBase.xml_ns: 'xml'}
+
+ self.__thread = {}
+ self.__root_stanza = []
+ self.__handlers = []
+ self.__event_handlers = {}
+ self.__event_handlers_lock = threading.Lock()
+ self.__filters = {'in': [], 'out': [], 'out_sync': []}
+ self.__thread_count = 0
+ self.__thread_cond = threading.Condition()
+ self.__active_threads = set()
+ self._use_daemons = False
+ self._disconnect_wait_for_threads = True
+
+ self._id = 0
+ self._id_lock = threading.Lock()
+
+ #: We use an ID prefix to ensure that all ID values are unique.
+ self._id_prefix = '%s-' % uuid.uuid4()
+
+ #: The :attr:`auto_reconnnect` setting controls whether or not
+ #: the stream will be restarted in the event of an error.
+ self.auto_reconnect = True
+
+ #: The :attr:`disconnect_wait` setting is the default value
+ #: for controlling if the system waits for the send queue to
+ #: empty before ending the stream. This may be overridden by
+ #: passing ``wait=True`` or ``wait=False`` to :meth:`disconnect`.
+ #: The default :attr:`disconnect_wait` value is ``False``.
+ self.disconnect_wait = False
+
+ #: A list of DNS results that have not yet been tried.
+ self.dns_answers = []
+
+ #: The service name to check with DNS SRV records. For
+ #: example, setting this to ``'xmpp-client'`` would query the
+ #: ``_xmpp-client._tcp`` service.
+ self.dns_service = None
+
+ self.add_event_handler('connected', self._session_timeout_check)
+ self.add_event_handler('disconnected', self._remove_schedules)
+ self.add_event_handler('session_start', self._start_keepalive)
+ self.add_event_handler('session_start', self._cert_expiration)
+
+ def use_signals(self, signals=None):
+ """Register signal handlers for ``SIGHUP`` and ``SIGTERM``.
+
+ By using signals, a ``'killed'`` event will be raised when the
+ application is terminated.
+
+ If a signal handler already existed, it will be executed first,
+ before the ``'killed'`` event is raised.
+
+ :param list signals: A list of signal names to be monitored.
+ Defaults to ``['SIGHUP', 'SIGTERM']``.
+ """
+ if signals is None:
+ signals = ['SIGHUP', 'SIGTERM']
+
+ existing_handlers = {}
+ for sig_name in signals:
+ if hasattr(signal, sig_name):
+ sig = getattr(signal, sig_name)
+ handler = signal.getsignal(sig)
+ if handler:
+ existing_handlers[sig] = handler
+
+ def handle_kill(signum, frame):
+ """
+ Capture kill event and disconnect cleanly after first
+ spawning the ``'killed'`` event.
+ """
+
+ if signum in existing_handlers and \
+ existing_handlers[signum] != handle_kill:
+ existing_handlers[signum](signum, frame)
+
+ self.event("killed", direct=True)
+ self.disconnect()
+
+ try:
+ for sig_name in signals:
+ if hasattr(signal, sig_name):
+ sig = getattr(signal, sig_name)
+ signal.signal(sig, handle_kill)
+ self.__signals_installed = True
+ except:
+ log.debug("Can not set interrupt signal handlers. " + \
+ "Slixmpp is not running from a main thread.")
+
+ def new_id(self):
+ """Generate and return a new stream ID in hexadecimal form.
+
+ Many stanzas, handlers, or matchers may require unique
+ ID values. Using this method ensures that all new ID values
+ are unique in this stream.
+ """
+ with self._id_lock:
+ self._id += 1
+ return self.get_id()
+
+ def get_id(self):
+ """Return the current unique stream ID in hexadecimal form."""
+ return "%s%X" % (self._id_prefix, self._id)
+
+ def connect(self, host='', port=0, use_ssl=False,
+ use_tls=True, reattempt=True):
+ """Create a new socket and connect to the server.
+
+ Setting ``reattempt`` to ``True`` will cause connection
+ attempts to be made with an exponential backoff delay (max of
+ :attr:`reconnect_max_delay` which defaults to 10 minute) until a
+ successful connection is established.
+
+ :param host: The name of the desired server for the connection.
+ :param port: Port to connect to on the server.
+ :param use_ssl: Flag indicating if SSL should be used by connecting
+ directly to a port using SSL.
+ :param use_tls: Flag indicating if TLS should be used, allowing for
+ connecting to a port without using SSL immediately and
+ later upgrading the connection.
+ :param reattempt: Flag indicating if the socket should reconnect
+ after disconnections.
+ """
+ self.stop.clear()
+
+ if host and port:
+ self.address = (host, int(port))
+ try:
+ Socket.inet_aton(self.address[0])
+ except (Socket.error, ssl.SSLError):
+ self.default_domain = self.address[0]
+
+ # Respect previous SSL and TLS usage directives.
+ if use_ssl is not None:
+ self.use_ssl = use_ssl
+ if use_tls is not None:
+ self.use_tls = use_tls
+
+ # Repeatedly attempt to connect until a successful connection
+ # is established.
+ attempts = self.reconnect_max_attempts
+ connected = self.state.transition('disconnected', 'connected',
+ func=self._connect,
+ args=(reattempt,))
+ while reattempt and not connected and not self.stop.is_set():
+ connected = self.state.transition('disconnected', 'connected',
+ func=self._connect)
+ if not connected:
+ if attempts is not None:
+ attempts -= 1
+ if attempts <= 0:
+ self.event('connection_failed', direct=True)
+ return False
+ return connected
+
+ def _connect(self, reattempt=True):
+ self.scheduler.remove('Session timeout check')
+
+ if self.reconnect_delay is None or not reattempt:
+ delay = 1.0
+ else:
+ delay = min(self.reconnect_delay * 2, self.reconnect_max_delay)
+ delay = random.normalvariate(delay, delay * 0.1)
+ log.debug('Waiting %s seconds before connecting.', delay)
+ elapsed = 0
+ try:
+ while elapsed < delay and not self.stop.is_set():
+ time.sleep(0.1)
+ elapsed += 0.1
+ except KeyboardInterrupt:
+ self.set_stop()
+ return False
+ except SystemExit:
+ self.set_stop()
+ return False
+
+ if self.default_domain:
+ try:
+ host, address, port = self.pick_dns_answer(self.default_domain,
+ self.address[1])
+ self.address = (address, port)
+ self._service_name = host
+ except StopIteration:
+ log.debug("No remaining DNS records to try.")
+ self.dns_answers = None
+ if reattempt:
+ self.reconnect_delay = delay
+ return False
+
+ af = Socket.AF_INET
+ proto = 'IPv4'
+ if ':' in self.address[0]:
+ af = Socket.AF_INET6
+ proto = 'IPv6'
+ try:
+ self.socket = self.socket_class(af, Socket.SOCK_STREAM)
+ except Socket.error:
+ log.debug("Could not connect using %s", proto)
+ return False
+
+ self.configure_socket()
+
+ if self.use_proxy:
+ connected = self._connect_proxy()
+ if not connected:
+ if reattempt:
+ self.reconnect_delay = delay
+ return False
+
+ if self.use_ssl:
+ log.debug("Socket Wrapped for SSL")
+ if self.ca_certs is None:
+ cert_policy = ssl.CERT_NONE
+ else:
+ cert_policy = ssl.CERT_REQUIRED
+
+ ssl_args = safedict({
+ 'certfile': self.certfile,
+ 'keyfile': self.keyfile,
+ 'ca_certs': self.ca_certs,
+ 'cert_reqs': cert_policy,
+ 'do_handshake_on_connect': False
+ })
+
+ if sys.version_info >= (2, 7):
+ ssl_args['ciphers'] = self.ciphers
+
+ ssl_socket = ssl.wrap_socket(self.socket, **ssl_args)
+
+ if hasattr(self.socket, 'socket'):
+ # We are using a testing socket, so preserve the top
+ # layer of wrapping.
+ self.socket.socket = ssl_socket
+ else:
+ self.socket = ssl_socket
+
+ try:
+ if not self.use_proxy:
+ domain = self.address[0]
+ if ':' in domain:
+ domain = '[%s]' % domain
+ log.debug("Connecting to %s:%s", domain, self.address[1])
+ self.socket.connect(self.address)
+
+ if self.use_ssl:
+ try:
+ self.socket.do_handshake()
+ except (Socket.error, ssl.SSLError):
+ log.error('CERT: Invalid certificate trust chain.')
+ if not self.event_handled('ssl_invalid_chain'):
+ self.disconnect(self.auto_reconnect,
+ send_close=False)
+ else:
+ self.event('ssl_invalid_chain', direct=True)
+ return False
+
+ self._der_cert = self.socket.getpeercert(binary_form=True)
+ pem_cert = ssl.DER_cert_to_PEM_cert(self._der_cert)
+ log.debug('CERT: %s', pem_cert)
+
+ self.event('ssl_cert', pem_cert, direct=True)
+ try:
+ cert.verify(self._expected_server_name, self._der_cert)
+ except cert.CertificateError as err:
+ if not self.event_handled('ssl_invalid_cert'):
+ log.error(err)
+ self.disconnect(send_close=False)
+ else:
+ self.event('ssl_invalid_cert',
+ pem_cert,
+ direct=True)
+
+ self.set_socket(self.socket, ignore=True)
+ #this event is where you should set your application state
+ self.event('connected', direct=True)
+ return True
+ except (Socket.error, ssl.SSLError) as serr:
+ error_msg = "Could not connect to %s:%s. Socket Error #%s: %s"
+ self.event('socket_error', serr, direct=True)
+ domain = self.address[0]
+ if ':' in domain:
+ domain = '[%s]' % domain
+ log.error(error_msg, domain, self.address[1],
+ serr.errno, serr.strerror)
+ return False
+
+ def _connect_proxy(self):
+ """Attempt to connect using an HTTP Proxy."""
+
+ # Extract the proxy address, and optional credentials
+ address = (self.proxy_config['host'], int(self.proxy_config['port']))
+ cred = None
+ if self.proxy_config['username']:
+ username = self.proxy_config['username']
+ password = self.proxy_config['password']
+
+ cred = '%s:%s' % (username, password)
+ if sys.version_info < (3, 0):
+ cred = bytes(cred)
+ else:
+ cred = bytes(cred, 'utf-8')
+ cred = base64.b64encode(cred).decode('utf-8')
+
+ # Build the HTTP headers for connecting to the XMPP server
+ headers = ['CONNECT %s:%s HTTP/1.0' % self.address,
+ 'Host: %s:%s' % self.address,
+ 'Proxy-Connection: Keep-Alive',
+ 'Pragma: no-cache',
+ 'User-Agent: Slixmpp/%s' % slixmpp.__version__]
+ if cred:
+ headers.append('Proxy-Authorization: Basic %s' % cred)
+ headers = '\r\n'.join(headers) + '\r\n\r\n'
+
+ try:
+ log.debug("Connecting to proxy: %s:%s", *address)
+ self.socket.connect(address)
+ self.send_raw(headers, now=True)
+ resp = ''
+ while '\r\n\r\n' not in resp and not self.stop.is_set():
+ resp += self.socket.recv(1024).decode('utf-8')
+ log.debug('RECV: %s', resp)
+
+ lines = resp.split('\r\n')
+ if '200' not in lines[0]:
+ self.event('proxy_error', resp)
+ self.event('connection_failed', direct=True)
+ log.error('Proxy Error: %s', lines[0])
+ return False
+
+ # Proxy connection established, continue connecting
+ # with the XMPP server.
+ return True
+ except (Socket.error, ssl.SSLError) as serr:
+ error_msg = "Could not connect to %s:%s. Socket Error #%s: %s"
+ self.event('socket_error', serr, direct=True)
+ log.error(error_msg, self.address[0], self.address[1],
+ serr.errno, serr.strerror)
+ return False
+
+ def _session_timeout_check(self, event=None):
+ """
+ Add check to ensure that a session is established within
+ a reasonable amount of time.
+ """
+
+ def _handle_session_timeout():
+ if not self.session_started_event.is_set():
+ log.debug("Session start has taken more " + \
+ "than %d seconds", self.session_timeout)
+ self.disconnect(reconnect=self.auto_reconnect)
+
+ self.schedule("Session timeout check",
+ self.session_timeout,
+ _handle_session_timeout)
+
+ def disconnect(self, reconnect=False, wait=None, send_close=True):
+ """Terminate processing and close the XML streams.
+
+ Optionally, the connection may be reconnected and
+ resume processing afterwards.
+
+ If the disconnect should take place after all items
+ in the send queue have been sent, use ``wait=True``.
+
+ .. warning::
+
+ If you are constantly adding items to the queue
+ such that it is never empty, then the disconnect will
+ not occur and the call will continue to block.
+
+ :param reconnect: Flag indicating if the connection
+ and processing should be restarted.
+ Defaults to ``False``.
+ :param wait: Flag indicating if the send queue should
+ be emptied before disconnecting, overriding
+ :attr:`disconnect_wait`.
+ :param send_close: Flag indicating if the stream footer
+ should be sent before terminating the
+ connection. Setting this to ``False``
+ prevents error loops when trying to
+ disconnect after a socket error.
+ """
+ self.state.transition('connected', 'disconnected',
+ wait=2.0,
+ func=self._disconnect,
+ args=(reconnect, wait, send_close))
+
+ def _disconnect(self, reconnect=False, wait=None, send_close=True):
+ if not reconnect:
+ self.auto_reconnect = False
+
+ if self.end_session_on_disconnect or send_close:
+ self.event('session_end', direct=True)
+
+ # Wait for the send queue to empty.
+ if wait is not None:
+ if wait:
+ self.send_queue.join()
+ elif self.disconnect_wait:
+ self.send_queue.join()
+
+ # Clearing this event will pause the send loop.
+ self.session_started_event.clear()
+
+ self.__failed_send_stanza = None
+
+ # Send the end of stream marker.
+ if send_close:
+ self.send_raw(self.stream_footer, now=True)
+
+ # Wait for confirmation that the stream was
+ # closed in the other direction. If we didn't
+ # send a stream footer we don't need to wait
+ # since the server won't know to respond.
+ if send_close:
+ log.info('Waiting for %s from server', self.stream_footer)
+ self.stream_end_event.wait(4)
+ else:
+ self.stream_end_event.set()
+
+ if not self.auto_reconnect:
+ self.set_stop()
+ if self._disconnect_wait_for_threads:
+ self._wait_for_threads()
+
+ try:
+ self.socket.shutdown(Socket.SHUT_RDWR)
+ self.socket.close()
+ self.filesocket.close()
+ except (Socket.error, ssl.SSLError) as serr:
+ self.event('socket_error', serr, direct=True)
+ finally:
+ #clear your application state
+ self.event('disconnected', direct=True)
+ return True
+
+ def abort(self):
+ self.session_started_event.clear()
+ self.set_stop()
+ if self._disconnect_wait_for_threads:
+ self._wait_for_threads()
+ try:
+ self.socket.shutdown(Socket.SHUT_RDWR)
+ self.socket.close()
+ self.filesocket.close()
+ except Socket.error:
+ pass
+ self.state.transition_any(['connected', 'disconnected'], 'disconnected', func=lambda: True)
+ self.event("killed", direct=True)
+
+ def reconnect(self, reattempt=True, wait=False, send_close=True):
+ """Reset the stream's state and reconnect to the server."""
+ log.debug("reconnecting...")
+ if self.state.ensure('connected'):
+ self.state.transition('connected', 'disconnected',
+ wait=2.0,
+ func=self._disconnect,
+ args=(True, wait, send_close))
+
+ attempts = self.reconnect_max_attempts
+
+ log.debug("connecting...")
+ connected = self.state.transition('disconnected', 'connected',
+ wait=2.0,
+ func=self._connect,
+ args=(reattempt,))
+ while reattempt and not connected and not self.stop.is_set():
+ connected = self.state.transition('disconnected', 'connected',
+ wait=2.0, func=self._connect)
+ connected = connected or self.state.ensure('connected')
+ if not connected:
+ if attempts is not None:
+ attempts -= 1
+ if attempts <= 0:
+ self.event('connection_failed', direct=True)
+ return False
+ return connected
+
+ def set_socket(self, socket, ignore=False):
+ """Set the socket to use for the stream.
+
+ The filesocket will be recreated as well.
+
+ :param socket: The new socket object to use.
+ :param bool ignore: If ``True``, don't set the connection
+ state to ``'connected'``.
+ """
+ self.socket = socket
+ if socket is not None:
+ # ElementTree.iterparse requires a file.
+ # 0 buffer files have to be binary.
+
+ # Use the correct fileobject type based on the Python
+ # version to work around a broken implementation in
+ # Python 2.x.
+ if sys.version_info < (3, 0):
+ self.filesocket = FileSocket(self.socket)
+ else:
+ self.filesocket = self.socket.makefile('rb', 0)
+ if not ignore:
+ self.state._set_state('connected')
+
+ def configure_socket(self):
+ """Set timeout and other options for self.socket.
+
+ Meant to be overridden.
+ """
+ self.socket.settimeout(None)
+
+ def configure_dns(self, resolver, domain=None, port=None):
+ """
+ Configure and set options for a :class:`~dns.resolver.Resolver`
+ instance, and other DNS related tasks. For example, you
+ can also check :meth:`~socket.socket.getaddrinfo` to see
+ if you need to call out to ``libresolv.so.2`` to
+ run ``res_init()``.
+
+ Meant to be overridden.
+
+ :param resolver: A :class:`~dns.resolver.Resolver` instance
+ or ``None`` if ``dnspython`` is not installed.
+ :param domain: The initial domain under consideration.
+ :param port: The initial port under consideration.
+ """
+ pass
+
+ def start_tls(self):
+ """Perform handshakes for TLS.
+
+ If the handshake is successful, the XML stream will need
+ to be restarted.
+ """
+ log.info("Negotiating TLS")
+ ssl_versions = {3: 'TLS 1.0', 1: 'SSL 3', 2: 'SSL 2/3'}
+ log.info("Using SSL version: %s", ssl_versions[self.ssl_version])
+ if self.ca_certs is None:
+ cert_policy = ssl.CERT_NONE
+ else:
+ cert_policy = ssl.CERT_REQUIRED
+
+ ssl_args = safedict({
+ 'certfile': self.certfile,
+ 'keyfile': self.keyfile,
+ 'ca_certs': self.ca_certs,
+ 'cert_reqs': cert_policy,
+ 'do_handshake_on_connect': False
+ })
+
+ if sys.version_info >= (2, 7):
+ ssl_args['ciphers'] = self.ciphers
+
+ ssl_socket = ssl.wrap_socket(self.socket, **ssl_args);
+
+ if hasattr(self.socket, 'socket'):
+ # We are using a testing socket, so preserve the top
+ # layer of wrapping.
+ self.socket.socket = ssl_socket
+ else:
+ self.socket = ssl_socket
+
+ try:
+ self.socket.do_handshake()
+ except (Socket.error, ssl.SSLError):
+ log.error('CERT: Invalid certificate trust chain.')
+ if not self.event_handled('ssl_invalid_chain'):
+ self.disconnect(self.auto_reconnect, send_close=False)
+ else:
+ self._der_cert = self.socket.getpeercert(binary_form=True)
+ self.event('ssl_invalid_chain', direct=True)
+ return False
+
+ self._der_cert = self.socket.getpeercert(binary_form=True)
+ pem_cert = ssl.DER_cert_to_PEM_cert(self._der_cert)
+ log.debug('CERT: %s', pem_cert)
+ self.event('ssl_cert', pem_cert, direct=True)
+
+ try:
+ cert.verify(self._expected_server_name, self._der_cert)
+ except cert.CertificateError as err:
+ if not self.event_handled('ssl_invalid_cert'):
+ log.error(err)
+ self.disconnect(self.auto_reconnect, send_close=False)
+ else:
+ self.event('ssl_invalid_cert', pem_cert, direct=True)
+
+ self.set_socket(self.socket)
+ return True
+
+ def _cert_expiration(self, event):
+ """Schedule an event for when the TLS certificate expires."""
+
+ if not self.use_tls and not self.use_ssl:
+ return
+
+ if not self._der_cert:
+ log.warn("TLS or SSL was enabled, but no certificate was found.")
+ return
+
+ def restart():
+ if not self.event_handled('ssl_expired_cert'):
+ log.warn("The server certificate has expired. Restarting.")
+ self.reconnect()
+ else:
+ pem_cert = ssl.DER_cert_to_PEM_cert(self._der_cert)
+ self.event('ssl_expired_cert', pem_cert)
+
+ cert_ttl = cert.get_ttl(self._der_cert)
+ if cert_ttl is None:
+ return
+
+ if cert_ttl.days < 0:
+ log.warn('CERT: Certificate has expired.')
+ restart()
+
+ try:
+ total_seconds = cert_ttl.total_seconds()
+ except AttributeError:
+ # for Python < 2.7
+ total_seconds = (cert_ttl.microseconds + (cert_ttl.seconds + cert_ttl.days * 24 * 3600) * 10**6) / 10**6
+
+ log.info('CERT: Time until certificate expiration: %s' % cert_ttl)
+ self.schedule('Certificate Expiration',
+ total_seconds,
+ restart)
+
+ def _start_keepalive(self, event):
+ """Begin sending whitespace periodically to keep the connection alive.
+
+ May be disabled by setting::
+
+ self.whitespace_keepalive = False
+
+ The keepalive interval can be set using::
+
+ self.whitespace_keepalive_interval = 300
+ """
+ self.schedule('Whitespace Keepalive',
+ self.whitespace_keepalive_interval,
+ self.send_raw,
+ args=(' ',),
+ kwargs={'now': True},
+ repeat=True)
+
+ def _remove_schedules(self, event):
+ """Remove whitespace keepalive and certificate expiration schedules."""
+ self.scheduler.remove('Whitespace Keepalive')
+ self.scheduler.remove('Certificate Expiration')
+
+ def start_stream_handler(self, xml):
+ """Perform any initialization actions, such as handshakes,
+ once the stream header has been sent.
+
+ Meant to be overridden.
+ """
+ pass
+
+ def register_stanza(self, stanza_class):
+ """Add a stanza object class as a known root stanza.
+
+ A root stanza is one that appears as a direct child of the stream's
+ root element.
+
+ Stanzas that appear as substanzas of a root stanza do not need to
+ be registered here. That is done using register_stanza_plugin() from
+ slixmpp.xmlstream.stanzabase.
+
+ Stanzas that are not registered will not be converted into
+ stanza objects, but may still be processed using handlers and
+ matchers.
+
+ :param stanza_class: The top-level stanza object's class.
+ """
+ self.__root_stanza.append(stanza_class)
+
+ def remove_stanza(self, stanza_class):
+ """Remove a stanza from being a known root stanza.
+
+ A root stanza is one that appears as a direct child of the stream's
+ root element.
+
+ Stanzas that are not registered will not be converted into
+ stanza objects, but may still be processed using handlers and
+ matchers.
+ """
+ self.__root_stanza.remove(stanza_class)
+
+ def add_filter(self, mode, handler, order=None):
+ """Add a filter for incoming or outgoing stanzas.
+
+ These filters are applied before incoming stanzas are
+ passed to any handlers, and before outgoing stanzas
+ are put in the send queue.
+
+ Each filter must accept a single stanza, and return
+ either a stanza or ``None``. If the filter returns
+ ``None``, then the stanza will be dropped from being
+ processed for events or from being sent.
+
+ :param mode: One of ``'in'`` or ``'out'``.
+ :param handler: The filter function.
+ :param int order: The position to insert the filter in
+ the list of active filters.
+ """
+ if order:
+ self.__filters[mode].insert(order, handler)
+ else:
+ self.__filters[mode].append(handler)
+
+ def del_filter(self, mode, handler):
+ """Remove an incoming or outgoing filter."""
+ self.__filters[mode].remove(handler)
+
+ def add_handler(self, mask, pointer, name=None, disposable=False,
+ threaded=False, filter=False, instream=False):
+ """A shortcut method for registering a handler using XML masks.
+
+ The use of :meth:`register_handler()` is preferred.
+
+ :param mask: An XML snippet matching the structure of the
+ stanzas that will be passed to this handler.
+ :param pointer: The handler function itself.
+ :parm name: A unique name for the handler. A name will
+ be generated if one is not provided.
+ :param disposable: Indicates if the handler should be discarded
+ after one use.
+ :param threaded: **DEPRECATED**.
+ Remains for backwards compatibility.
+ :param filter: **DEPRECATED**.
+ Remains for backwards compatibility.
+ :param instream: Indicates if the handler should execute during
+ stream processing and not during normal event
+ processing.
+ """
+ # To prevent circular dependencies, we must load the matcher
+ # and handler classes here.
+
+ if name is None:
+ name = 'add_handler_%s' % self.new_id()
+ self.register_handler(
+ XMLCallback(name,
+ MatchXMLMask(mask, self.default_ns),
+ pointer,
+ once=disposable,
+ instream=instream))
+
+ def register_handler(self, handler, before=None, after=None):
+ """Add a stream event handler that will be executed when a matching
+ stanza is received.
+
+ :param handler:
+ The :class:`~slixmpp.xmlstream.handler.base.BaseHandler`
+ derived object to execute.
+ """
+ if handler.stream is None:
+ self.__handlers.append(handler)
+ handler.stream = weakref.ref(self)
+
+ def remove_handler(self, name):
+ """Remove any stream event handlers with the given name.
+
+ :param name: The name of the handler.
+ """
+ idx = 0
+ for handler in self.__handlers:
+ if handler.name == name:
+ self.__handlers.pop(idx)
+ return True
+ idx += 1
+ return False
+
+ def get_dns_records(self, domain, port=None):
+ """Get the DNS records for a domain.
+
+ :param domain: The domain in question.
+ :param port: If the results don't include a port, use this one.
+ """
+ if port is None:
+ port = self.default_port
+
+ resolver = default_resolver()
+ self.configure_dns(resolver, domain=domain, port=port)
+
+ return resolve(domain, port, service=self.dns_service,
+ resolver=resolver,
+ use_ipv6=self.use_ipv6,
+ use_dnspython=self.use_dnspython)
+
+ def pick_dns_answer(self, domain, port=None):
+ """Pick a server and port from DNS answers.
+
+ Gets DNS answers if none available.
+ Removes used answer from available answers.
+
+ :param domain: The domain in question.
+ :param port: If the results don't include a port, use this one.
+ """
+ if not self.dns_answers:
+ self.dns_answers = self.get_dns_records(domain, port)
+
+ if sys.version_info < (3, 0):
+ return self.dns_answers.next()
+ else:
+ return next(self.dns_answers)
+
+ def add_event_handler(self, name, pointer,
+ threaded=False, disposable=False):
+ """Add a custom event handler that will be executed whenever
+ its event is manually triggered.
+
+ :param name: The name of the event that will trigger
+ this handler.
+ :param pointer: The function to execute.
+ :param threaded: If set to ``True``, the handler will execute
+ in its own thread. Defaults to ``False``.
+ :param disposable: If set to ``True``, the handler will be
+ discarded after one use. Defaults to ``False``.
+ """
+ if not name in self.__event_handlers:
+ self.__event_handlers[name] = []
+ self.__event_handlers[name].append((pointer, threaded, disposable))
+
+ def del_event_handler(self, name, pointer):
+ """Remove a function as a handler for an event.
+
+ :param name: The name of the event.
+ :param pointer: The function to remove as a handler.
+ """
+ if not name in self.__event_handlers:
+ return
+
+ # Need to keep handlers that do not use
+ # the given function pointer
+ def filter_pointers(handler):
+ return handler[0] != pointer
+
+ self.__event_handlers[name] = list(filter(
+ filter_pointers,
+ self.__event_handlers[name]))
+
+ def event_handled(self, name):
+ """Returns the number of registered handlers for an event.
+
+ :param name: The name of the event to check.
+ """
+ return len(self.__event_handlers.get(name, []))
+
+ def event(self, name, data={}, direct=False):
+ """Manually trigger a custom event.
+
+ :param name: The name of the event to trigger.
+ :param data: Data that will be passed to each event handler.
+ Defaults to an empty dictionary, but is usually
+ a stanza object.
+ :param direct: Runs the event directly if True, skipping the
+ event queue. All event handlers will run in the
+ same thread.
+ """
+ log.debug("Event triggered: " + name)
+
+ handlers = self.__event_handlers.get(name, [])
+ for handler in handlers:
+ #TODO: Data should not be copied, but should be read only,
+ # but this might break current code so it's left for future.
+
+ out_data = copy.copy(data) if len(handlers) > 1 else data
+ old_exception = getattr(data, 'exception', None)
+ if direct:
+ try:
+ handler[0](out_data)
+ except Exception as e:
+ error_msg = 'Error processing event handler: %s'
+ log.exception(error_msg, str(handler[0]))
+ if old_exception:
+ old_exception(e)
+ else:
+ self.exception(e)
+ else:
+ self.event_queue.put(('event', handler, out_data))
+ if handler[2]:
+ # If the handler is disposable, we will go ahead and
+ # remove it now instead of waiting for it to be
+ # processed in the queue.
+ with self.__event_handlers_lock:
+ try:
+ h_index = self.__event_handlers[name].index(handler)
+ self.__event_handlers[name].pop(h_index)
+ except:
+ pass
+
+ def schedule(self, name, seconds, callback, args=None,
+ kwargs=None, repeat=False):
+ """Schedule a callback function to execute after a given delay.
+
+ :param name: A unique name for the scheduled callback.
+ :param seconds: The time in seconds to wait before executing.
+ :param callback: A pointer to the function to execute.
+ :param args: A tuple of arguments to pass to the function.
+ :param kwargs: A dictionary of keyword arguments to pass to
+ the function.
+ :param repeat: Flag indicating if the scheduled event should
+ be reset and repeat after executing.
+ """
+ self.scheduler.add(name, seconds, callback, args, kwargs,
+ repeat, qpointer=self.event_queue)
+
+ def incoming_filter(self, xml):
+ """Filter incoming XML objects before they are processed.
+
+ Possible uses include remapping namespaces, or correcting elements
+ from sources with incorrect behavior.
+
+ Meant to be overridden.
+ """
+ return xml
+
+ def send(self, data, mask=None, timeout=None, now=False, use_filters=True):
+ """A wrapper for :meth:`send_raw()` for sending stanza objects.
+
+ May optionally block until an expected response is received.
+
+ :param data: The :class:`~slixmpp.xmlstream.stanzabase.ElementBase`
+ stanza to send on the stream.
+ :param mask: **DEPRECATED**
+ An XML string snippet matching the structure
+ of the expected response. Execution will block
+ in this thread until the response is received
+ or a timeout occurs.
+ :param int timeout: Time in seconds to wait for a response before
+ continuing. Defaults to :attr:`response_timeout`.
+ :param bool now: Indicates if the send queue should be skipped,
+ sending the stanza immediately. Useful mainly
+ for stream initialization stanzas.
+ Defaults to ``False``.
+ :param bool use_filters: Indicates if outgoing filters should be
+ applied to the given stanza data. Disabling
+ filters is useful when resending stanzas.
+ Defaults to ``True``.
+ """
+ if timeout is None:
+ timeout = self.response_timeout
+ if hasattr(mask, 'xml'):
+ mask = mask.xml
+
+ if isinstance(data, ElementBase):
+ if use_filters:
+ for filter in self.__filters['out']:
+ data = filter(data)
+ if data is None:
+ return
+
+ if mask is not None:
+ log.warning("Use of send mask waiters is deprecated.")
+ wait_for = Waiter("SendWait_%s" % self.new_id(),
+ MatchXMLMask(mask))
+ self.register_handler(wait_for)
+
+ if isinstance(data, ElementBase):
+ with self.send_queue_lock:
+ if use_filters:
+ for filter in self.__filters['out_sync']:
+ data = filter(data)
+ if data is None:
+ return
+ str_data = tostring(data.xml, xmlns=self.default_ns,
+ stream=self,
+ top_level=True)
+ self.send_raw(str_data, now)
+ else:
+ self.send_raw(data, now)
+ if mask is not None:
+ return wait_for.wait(timeout)
+
+ def send_xml(self, data, mask=None, timeout=None, now=False):
+ """Send an XML object on the stream, and optionally wait
+ for a response.
+
+ :param data: The :class:`~xml.etree.ElementTree.Element` XML object
+ to send on the stream.
+ :param mask: **DEPRECATED**
+ An XML string snippet matching the structure
+ of the expected response. Execution will block
+ in this thread until the response is received
+ or a timeout occurs.
+ :param int timeout: Time in seconds to wait for a response before
+ continuing. Defaults to :attr:`response_timeout`.
+ :param bool now: Indicates if the send queue should be skipped,
+ sending the stanza immediately. Useful mainly
+ for stream initialization stanzas.
+ Defaults to ``False``.
+ """
+ if timeout is None:
+ timeout = self.response_timeout
+ return self.send(tostring(data), mask, timeout, now)
+
+ def send_raw(self, data, now=False, reconnect=None):
+ """Send raw data across the stream.
+
+ :param string data: Any string value.
+ :param bool reconnect: Indicates if the stream should be
+ restarted if there is an error sending
+ the stanza. Used mainly for testing.
+ Defaults to :attr:`auto_reconnect`.
+ """
+ if now:
+ log.debug("SEND (IMMED): %s", data)
+ try:
+ data = data.encode('utf-8')
+ total = len(data)
+ sent = 0
+ count = 0
+ tries = 0
+ with self.send_lock:
+ while sent < total and not self.stop.is_set():
+ try:
+ sent += self.socket.send(data[sent:])
+ count += 1
+ except Socket.error as serr:
+ if serr.errno != errno.EINTR:
+ raise
+ except ssl.SSLError as serr:
+ if tries >= self.ssl_retry_max:
+ log.debug('SSL error: max retries reached')
+ self.exception(serr)
+ log.warning("Failed to send %s", data)
+ if reconnect is None:
+ reconnect = self.auto_reconnect
+ if not self.stop.is_set():
+ self.disconnect(reconnect,
+ send_close=False)
+ log.warning('SSL write error: retrying')
+ if not self.stop.is_set():
+ time.sleep(self.ssl_retry_delay)
+ tries += 1
+ if count > 1:
+ log.debug('SENT: %d chunks', count)
+ except (Socket.error, ssl.SSLError) as serr:
+ self.event('socket_error', serr, direct=True)
+ log.warning("Failed to send %s", data)
+ if reconnect is None:
+ reconnect = self.auto_reconnect
+ if not self.stop.is_set():
+ self.disconnect(reconnect, send_close=False)
+ else:
+ self.send_queue.put(data)
+ return True
+
+ def _start_thread(self, name, target, track=True):
+ self.__thread[name] = threading.Thread(name=name, target=target)
+ self.__thread[name].daemon = self._use_daemons
+ self.__thread[name].start()
+
+ if track:
+ self.__active_threads.add(name)
+ with self.__thread_cond:
+ self.__thread_count += 1
+
+ def _end_thread(self, name, early=False):
+ with self.__thread_cond:
+ curr_thread = threading.current_thread().name
+ if curr_thread in self.__active_threads:
+ self.__thread_count -= 1
+ self.__active_threads.remove(curr_thread)
+
+ if early:
+ log.debug('Threading deadlock prevention!')
+ log.debug(("Marked %s thread as ended due to " + \
+ "disconnect() call. %s threads remain.") % (
+ name, self.__thread_count))
+ else:
+ log.debug("Stopped %s thread. %s threads remain." % (
+ name, self.__thread_count))
+
+ else:
+ log.debug(("Finished exiting %s thread after early " + \
+ "termination from disconnect() call. " + \
+ "%s threads remain.") % (
+ name, self.__thread_count))
+
+ if self.__thread_count == 0:
+ self.__thread_cond.notify()
+
+ def set_stop(self):
+ self.stop.set()
+
+ # Unlock queues
+ self.event_queue.put(None)
+ self.send_queue.put(None)
+
+ def _wait_for_threads(self):
+ with self.__thread_cond:
+ if self.__thread_count != 0:
+ log.debug("Waiting for %s threads to exit." %
+ self.__thread_count)
+ name = threading.current_thread().name
+ if name in self.__thread:
+ self._end_thread(name, early=True)
+ self.__thread_cond.wait(4)
+ if self.__thread_count != 0:
+ log.error("Hanged threads: %s" % threading.enumerate())
+ log.error("This may be due to calling disconnect() " + \
+ "from a non-threaded event handler. Be " + \
+ "sure that event handlers that call " + \
+ "disconnect() are registered using: " + \
+ "add_event_handler(..., threaded=True)")
+
+ def process(self, **kwargs):
+ """Initialize the XML streams and begin processing events.
+
+ The number of threads used for processing stream events is determined
+ by :data:`HANDLER_THREADS`.
+
+ :param bool block: If ``False``, then event dispatcher will run
+ in a separate thread, allowing for the stream to be
+ used in the background for another application.
+ Otherwise, ``process(block=True)`` blocks the current
+ thread. Defaults to ``False``.
+ :param bool threaded: **DEPRECATED**
+ If ``True``, then event dispatcher will run
+ in a separate thread, allowing for the stream to be
+ used in the background for another application.
+ Defaults to ``True``. This does **not** mean that no
+ threads are used at all if ``threaded=False``.
+
+ Regardless of these threading options, these threads will
+ always exist:
+
+ - The event queue processor
+ - The send queue processor
+ - The scheduler
+ """
+ if 'threaded' in kwargs and 'block' in kwargs:
+ raise ValueError("process() called with both " + \
+ "block and threaded arguments")
+ elif 'block' in kwargs:
+ threaded = not(kwargs.get('block', False))
+ else:
+ threaded = kwargs.get('threaded', True)
+
+ for t in range(0, HANDLER_THREADS):
+ log.debug("Starting HANDLER THREAD")
+ self._start_thread('event_thread_%s' % t, self._event_runner)
+
+ self._start_thread('send_thread', self._send_thread)
+ self._start_thread('scheduler_thread', self._scheduler_thread)
+
+ if threaded:
+ # Run the XML stream in the background for another application.
+ self._start_thread('read_thread', self._process, track=False)
+ else:
+ self._process()
+
+ def _process(self):
+ """Start processing the XML streams.
+
+ Processing will continue after any recoverable errors
+ if reconnections are allowed.
+ """
+
+ # The body of this loop will only execute once per connection.
+ # Additional passes will be made only if an error occurs and
+ # reconnecting is permitted.
+ while True:
+ shutdown = False
+ try:
+ # The call to self.__read_xml will block and prevent
+ # the body of the loop from running until a disconnect
+ # occurs. After any reconnection, the stream header will
+ # be resent and processing will resume.
+ while not self.stop.is_set():
+ # Only process the stream while connected to the server
+ if not self.state.ensure('connected', wait=0.1):
+ break
+ # Ensure the stream header is sent for any
+ # new connections.
+ if not self.session_started_event.is_set():
+ self.send_raw(self.stream_header, now=True)
+ if not self.__read_xml():
+ # If the server terminated the stream, end processing
+ break
+ except KeyboardInterrupt:
+ log.debug("Keyboard Escape Detected in _process")
+ self.event('killed', direct=True)
+ shutdown = True
+ except SystemExit:
+ log.debug("SystemExit in _process")
+ shutdown = True
+ except (SyntaxError, ExpatError) as e:
+ log.error("Error reading from XML stream.")
+ self.exception(e)
+ except (Socket.error, ssl.SSLError) as serr:
+ self.event('socket_error', serr, direct=True)
+ log.error('Socket Error #%s: %s', serr.errno, serr.strerror)
+ except ValueError as e:
+ msg = e.message if hasattr(e, 'message') else e.args[0]
+
+ if 'I/O operation on closed file' in msg:
+ log.error('Can not read from closed socket.')
+ else:
+ self.exception(e)
+ except Exception as e:
+ if not self.stop.is_set():
+ log.error('Connection error.')
+ self.exception(e)
+
+ if not shutdown and not self.stop.is_set() \
+ and self.auto_reconnect:
+ self.reconnect()
+ else:
+ self.disconnect()
+ break
+
+ def __read_xml(self):
+ """Parse the incoming XML stream
+
+ Stream events are raised for each received stanza.
+ """
+ depth = 0
+ root = None
+ for event, xml in ET.iterparse(self.filesocket, (b'end', b'start')):
+ if event == b'start':
+ if depth == 0:
+ # We have received the start of the root element.
+ root = xml
+ log.debug('RECV: %s', tostring(root, xmlns=self.default_ns,
+ stream=self,
+ top_level=True,
+ open_only=True))
+ # Perform any stream initialization actions, such
+ # as handshakes.
+ self.stream_end_event.clear()
+ self.start_stream_handler(root)
+
+ # We have a successful stream connection, so reset
+ # exponential backoff for new reconnect attempts.
+ self.reconnect_delay = 1.0
+ depth += 1
+ if event == b'end':
+ depth -= 1
+ if depth == 0:
+ # The stream's root element has closed,
+ # terminating the stream.
+ log.debug("End of stream recieved")
+ self.stream_end_event.set()
+ return False
+ elif depth == 1:
+ # We only raise events for stanzas that are direct
+ # children of the root element.
+ try:
+ self.__spawn_event(xml)
+ except RestartStream:
+ return True
+ if root is not None:
+ # Keep the root element empty of children to
+ # save on memory use.
+ root.clear()
+ log.debug("Ending read XML loop")
+
+ def _build_stanza(self, xml, default_ns=None):
+ """Create a stanza object from a given XML object.
+
+ If a specialized stanza type is not found for the XML, then
+ a generic :class:`~slixmpp.xmlstream.stanzabase.StanzaBase`
+ stanza will be returned.
+
+ :param xml: The :class:`~xml.etree.ElementTree.Element` XML object
+ to convert into a stanza object.
+ :param default_ns: Optional default namespace to use instead of the
+ stream's current default namespace.
+ """
+ if default_ns is None:
+ default_ns = self.default_ns
+ stanza_type = StanzaBase
+ for stanza_class in self.__root_stanza:
+ if xml.tag == "{%s}%s" % (default_ns, stanza_class.name) or \
+ xml.tag == stanza_class.tag_name():
+ stanza_type = stanza_class
+ break
+ stanza = stanza_type(self, xml)
+ if stanza['lang'] is None and self.peer_default_lang:
+ stanza['lang'] = self.peer_default_lang
+ return stanza
+
+ def __spawn_event(self, xml):
+ """
+ Analyze incoming XML stanzas and convert them into stanza
+ objects if applicable and queue stream events to be processed
+ by matching handlers.
+
+ :param xml: The :class:`~slixmpp.xmlstream.stanzabase.ElementBase`
+ stanza to analyze.
+ """
+ # Apply any preprocessing filters.
+ xml = self.incoming_filter(xml)
+
+ # Convert the raw XML object into a stanza object. If no registered
+ # stanza type applies, a generic StanzaBase stanza will be used.
+ stanza = self._build_stanza(xml)
+
+ for filter in self.__filters['in']:
+ if stanza is not None:
+ stanza = filter(stanza)
+ if stanza is None:
+ return
+
+ log.debug("RECV: %s", stanza)
+
+ # Match the stanza against registered handlers. Handlers marked
+ # to run "in stream" will be executed immediately; the rest will
+ # be queued.
+ unhandled = True
+ matched_handlers = [h for h in self.__handlers if h.match(stanza)]
+ for handler in matched_handlers:
+ if len(matched_handlers) > 1:
+ stanza_copy = copy.copy(stanza)
+ else:
+ stanza_copy = stanza
+ handler.prerun(stanza_copy)
+ self.event_queue.put(('stanza', handler, stanza_copy))
+ try:
+ if handler.check_delete():
+ self.__handlers.remove(handler)
+ except:
+ pass # not thread safe
+ unhandled = False
+
+ # Some stanzas require responses, such as Iq queries. A default
+ # handler will be executed immediately for this case.
+ if unhandled:
+ stanza.unhandled()
+
+ def _threaded_event_wrapper(self, func, args):
+ """Capture exceptions for event handlers that run
+ in individual threads.
+
+ :param func: The event handler to execute.
+ :param args: Arguments to the event handler.
+ """
+ # this is always already copied before this is invoked
+ orig = args[0]
+ try:
+ func(*args)
+ except Exception as e:
+ error_msg = 'Error processing event handler: %s'
+ log.exception(error_msg, str(func))
+ if hasattr(orig, 'exception'):
+ orig.exception(e)
+ else:
+ self.exception(e)
+
+ def _event_runner(self):
+ """Process the event queue and execute handlers.
+
+ The number of event runner threads is controlled by HANDLER_THREADS.
+
+ Stream event handlers will all execute in this thread. Custom event
+ handlers may be spawned in individual threads.
+ """
+ log.debug("Loading event runner")
+ try:
+ while not self.stop.is_set():
+ event = self.event_queue.get()
+ if event is None:
+ continue
+
+ etype, handler = event[0:2]
+ args = event[2:]
+ orig = copy.copy(args[0])
+
+ if etype == 'stanza':
+ try:
+ handler.run(args[0])
+ except Exception as e:
+ error_msg = 'Error processing stream handler: %s'
+ log.exception(error_msg, handler.name)
+ orig.exception(e)
+ elif etype == 'schedule':
+ name = args[2]
+ try:
+ log.debug('Scheduled event: %s: %s', name, args[0])
+ handler(*args[0], **args[1])
+ except Exception as e:
+ log.exception('Error processing scheduled task')
+ self.exception(e)
+ elif etype == 'event':
+ func, threaded, disposable = handler
+ try:
+ if threaded:
+ x = threading.Thread(
+ name="Event_%s" % str(func),
+ target=self._threaded_event_wrapper,
+ args=(func, args))
+ x.daemon = self._use_daemons
+ x.start()
+ else:
+ func(*args)
+ except Exception as e:
+ error_msg = 'Error processing event handler: %s'
+ log.exception(error_msg, str(func))
+ if hasattr(orig, 'exception'):
+ orig.exception(e)
+ else:
+ self.exception(e)
+ elif etype == 'quit':
+ log.debug("Quitting event runner thread")
+ break
+ except KeyboardInterrupt:
+ log.debug("Keyboard Escape Detected in _event_runner")
+ self.event('killed', direct=True)
+ self.disconnect()
+ except SystemExit:
+ self.disconnect()
+ self.event_queue.put(('quit', None, None))
+
+ self._end_thread('event runner')
+
+ def _send_thread(self):
+ """Extract stanzas from the send queue and send them on the stream."""
+ try:
+ while not self.stop.is_set():
+ while not self.stop.is_set() and \
+ not self.session_started_event.is_set():
+ self.session_started_event.wait(timeout=0.1) # Wait for session start
+ if self.__failed_send_stanza is not None:
+ data = self.__failed_send_stanza
+ self.__failed_send_stanza = None
+ else:
+ data = self.send_queue.get() # Wait for data to send
+ if data is None:
+ continue
+ log.debug("SEND: %s", data)
+ enc_data = data.encode('utf-8')
+ total = len(enc_data)
+ sent = 0
+ count = 0
+ tries = 0
+ try:
+ with self.send_lock:
+ while sent < total and not self.stop.is_set() and \
+ self.session_started_event.is_set():
+ try:
+ sent += self.socket.send(enc_data[sent:])
+ count += 1
+ except Socket.error as serr:
+ if serr.errno != errno.EINTR:
+ raise
+ except ssl.SSLError as serr:
+ if tries >= self.ssl_retry_max:
+ log.debug('SSL error: max retries reached')
+ self.exception(serr)
+ log.warning("Failed to send %s", data)
+ if not self.stop.is_set():
+ self.disconnect(self.auto_reconnect,
+ send_close=False)
+ log.warning('SSL write error: retrying')
+ if not self.stop.is_set():
+ time.sleep(self.ssl_retry_delay)
+ tries += 1
+ if count > 1:
+ log.debug('SENT: %d chunks', count)
+ self.send_queue.task_done()
+ except (Socket.error, ssl.SSLError) as serr:
+ self.event('socket_error', serr, direct=True)
+ log.warning("Failed to send %s", data)
+ if not self.stop.is_set():
+ self.__failed_send_stanza = data
+ self._end_thread('send')
+ self.disconnect(self.auto_reconnect, send_close=False)
+ return
+ except Exception as ex:
+ log.exception('Unexpected error in send thread: %s', ex)
+ self.exception(ex)
+ if not self.stop.is_set():
+ self._end_thread('send')
+ self.disconnect(self.auto_reconnect)
+ return
+
+ self._end_thread('send')
+
+ def _scheduler_thread(self):
+ self.scheduler.process(threaded=False)
+ self._end_thread('scheduler')
+
+ def exception(self, exception):
+ """Process an unknown exception.
+
+ Meant to be overridden.
+
+ :param exception: An unhandled exception object.
+ """
+ pass
+
+
+# To comply with PEP8, method names now use underscores.
+# Deprecated method names are re-mapped for backwards compatibility.
+XMLStream.startTLS = XMLStream.start_tls
+XMLStream.registerStanza = XMLStream.register_stanza
+XMLStream.removeStanza = XMLStream.remove_stanza
+XMLStream.registerHandler = XMLStream.register_handler
+XMLStream.removeHandler = XMLStream.remove_handler
+XMLStream.setSocket = XMLStream.set_socket
+XMLStream.sendRaw = XMLStream.send_raw
+XMLStream.getId = XMLStream.get_id
+XMLStream.getNewId = XMLStream.new_id
+XMLStream.sendXML = XMLStream.send_xml