diff options
Diffstat (limited to 'sleekxmpp/xmlstream')
23 files changed, 4146 insertions, 0 deletions
diff --git a/sleekxmpp/xmlstream/__init__.py b/sleekxmpp/xmlstream/__init__.py new file mode 100644 index 00000000..67b20c56 --- /dev/null +++ b/sleekxmpp/xmlstream/__init__.py @@ -0,0 +1,19 @@ +""" + SleekXMPP: The Sleek XMPP Library + Copyright (C) 2010 Nathanael C. Fritz + This file is part of SleekXMPP. + + See the file LICENSE for copying permission. +""" + +from sleekxmpp.xmlstream.jid import JID +from sleekxmpp.xmlstream.scheduler import Scheduler +from sleekxmpp.xmlstream.stanzabase import StanzaBase, ElementBase, ET +from sleekxmpp.xmlstream.stanzabase import register_stanza_plugin +from sleekxmpp.xmlstream.tostring import tostring +from sleekxmpp.xmlstream.xmlstream import XMLStream, RESPONSE_TIMEOUT +from sleekxmpp.xmlstream.xmlstream import RestartStream + +__all__ = ['JID', 'Scheduler', 'StanzaBase', 'ElementBase', + 'ET', 'StateMachine', 'tostring', 'XMLStream', + 'RESPONSE_TIMEOUT', 'RestartStream'] diff --git a/sleekxmpp/xmlstream/filesocket.py b/sleekxmpp/xmlstream/filesocket.py new file mode 100644 index 00000000..56554c73 --- /dev/null +++ b/sleekxmpp/xmlstream/filesocket.py @@ -0,0 +1,47 @@ +# -*- coding: utf-8 -*- +""" + sleekxmpp.xmlstream.filesocket + ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + + This module is a shim for correcting deficiencies in the file + socket implementation of Python2.6. + + Part of SleekXMPP: The Sleek XMPP Library + + :copyright: (c) 2011 Nathanael C. Fritz + :license: MIT, see LICENSE for more details +""" + +from socket import _fileobject +import socket + + +class FileSocket(_fileobject): + + """Create a file object wrapper for a socket to work around + issues present in Python 2.6 when using sockets as file objects. + + The parser for :class:`~xml.etree.cElementTree` requires a file, but + we will be reading from the XMPP connection socket instead. + """ + + def read(self, size=4096): + """Read data from the socket as if it were a file.""" + if self._sock is None: + return None + data = self._sock.recv(size) + if data is not None: + return data + + +class Socket26(socket._socketobject): + + """A custom socket implementation that uses our own FileSocket class + to work around issues in Python 2.6 when using sockets as files. + """ + + def makefile(self, mode='r', bufsize=-1): + """makefile([mode[, bufsize]]) -> file object + Return a regular file object corresponding to the socket. The mode + and bufsize arguments are as for the built-in open() function.""" + return FileSocket(self._sock, mode, bufsize) diff --git a/sleekxmpp/xmlstream/handler/__init__.py b/sleekxmpp/xmlstream/handler/__init__.py new file mode 100644 index 00000000..7bcf0b71 --- /dev/null +++ b/sleekxmpp/xmlstream/handler/__init__.py @@ -0,0 +1,14 @@ +""" + SleekXMPP: The Sleek XMPP Library + Copyright (C) 2010 Nathanael C. Fritz + This file is part of SleekXMPP. + + See the file LICENSE for copying permission. +""" + +from sleekxmpp.xmlstream.handler.callback import Callback +from sleekxmpp.xmlstream.handler.waiter import Waiter +from sleekxmpp.xmlstream.handler.xmlcallback import XMLCallback +from sleekxmpp.xmlstream.handler.xmlwaiter import XMLWaiter + +__all__ = ['Callback', 'Waiter', 'XMLCallback', 'XMLWaiter'] diff --git a/sleekxmpp/xmlstream/handler/base.py b/sleekxmpp/xmlstream/handler/base.py new file mode 100644 index 00000000..59dcb306 --- /dev/null +++ b/sleekxmpp/xmlstream/handler/base.py @@ -0,0 +1,84 @@ +# -*- coding: utf-8 -*- +""" + sleekxmpp.xmlstream.handler.base + ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + + Part of SleekXMPP: The Sleek XMPP Library + + :copyright: (c) 2011 Nathanael C. Fritz + :license: MIT, see LICENSE for more details +""" + +import weakref + + +class BaseHandler(object): + + """ + Base class for stream handlers. Stream handlers are matched with + incoming stanzas so that the stanza may be processed in some way. + Stanzas may be matched with multiple handlers. + + Handler execution may take place in two phases: during the incoming + stream processing, and in the main event loop. The :meth:`prerun()` + method is executed in the first case, and :meth:`run()` is called + during the second. + + :param string name: The name of the handler. + :param matcher: A :class:`~sleekxmpp.xmlstream.matcher.base.MatcherBase` + derived object that will be used to determine if a + stanza should be accepted by this handler. + :param stream: The :class:`~sleekxmpp.xmlstream.xmlstream.XMLStream` + instance that the handle will respond to. + """ + + def __init__(self, name, matcher, stream=None): + #: The name of the handler + self.name = name + + #: The XML stream this handler is assigned to + self.stream = None + if stream is not None: + self.stream = weakref.ref(stream) + stream.register_handler(self) + + self._destroy = False + self._payload = None + self._matcher = matcher + + def match(self, xml): + """Compare a stanza or XML object with the handler's matcher. + + :param xml: An XML or + :class:`~sleekxmpp.xmlstream.stanzabase.ElementBase` object + """ + return self._matcher.match(xml) + + def prerun(self, payload): + """Prepare the handler for execution while the XML + stream is being processed. + + :param payload: A :class:`~sleekxmpp.xmlstream.stanzabase.ElementBase` + object. + """ + self._payload = payload + + def run(self, payload): + """Execute the handler after XML stream processing and during the + main event loop. + + :param payload: A :class:`~sleekxmpp.xmlstream.stanzabase.ElementBase` + object. + """ + self._payload = payload + + def check_delete(self): + """Check if the handler should be removed from the list + of stream handlers. + """ + return self._destroy + + +# To comply with PEP8, method names now use underscores. +# Deprecated method names are re-mapped for backwards compatibility. +BaseHandler.checkDelete = BaseHandler.check_delete diff --git a/sleekxmpp/xmlstream/handler/callback.py b/sleekxmpp/xmlstream/handler/callback.py new file mode 100644 index 00000000..37f53335 --- /dev/null +++ b/sleekxmpp/xmlstream/handler/callback.py @@ -0,0 +1,79 @@ +# -*- coding: utf-8 -*- +""" + sleekxmpp.xmlstream.handler.callback + ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + + Part of SleekXMPP: The Sleek XMPP Library + + :copyright: (c) 2011 Nathanael C. Fritz + :license: MIT, see LICENSE for more details +""" + +from sleekxmpp.xmlstream.handler.base import BaseHandler + + +class Callback(BaseHandler): + + """ + The Callback handler will execute a callback function with + matched stanzas. + + The handler may execute the callback either during stream + processing or during the main event loop. + + Callback functions are all executed in the same thread, so be aware if + you are executing functions that will block for extended periods of + time. Typically, you should signal your own events using the SleekXMPP + object's :meth:`~sleekxmpp.xmlstream.xmlstream.XMLStream.event()` + method to pass the stanza off to a threaded event handler for further + processing. + + + :param string name: The name of the handler. + :param matcher: A :class:`~sleekxmpp.xmlstream.matcher.base.MatcherBase` + derived object for matching stanza objects. + :param pointer: The function to execute during callback. + :param bool thread: **DEPRECATED.** Remains only for + backwards compatibility. + :param bool once: Indicates if the handler should be used only + once. Defaults to False. + :param bool instream: Indicates if the callback should be executed + during stream processing instead of in the + main event loop. + :param stream: The :class:`~sleekxmpp.xmlstream.xmlstream.XMLStream` + instance this handler should monitor. + """ + + def __init__(self, name, matcher, pointer, thread=False, + once=False, instream=False, stream=None): + BaseHandler.__init__(self, name, matcher, stream) + self._pointer = pointer + self._once = once + self._instream = instream + + def prerun(self, payload): + """Execute the callback during stream processing, if + the callback was created with ``instream=True``. + + :param payload: The matched + :class:`~sleekxmpp.xmlstream.stanzabase.ElementBase` object. + """ + if self._once: + self._destroy = True + if self._instream: + self.run(payload, True) + + def run(self, payload, instream=False): + """Execute the callback function with the matched stanza payload. + + :param payload: The matched + :class:`~sleekxmpp.xmlstream.stanzabase.ElementBase` object. + :param bool instream: Force the handler to execute during stream + processing. This should only be used by + :meth:`prerun()`. Defaults to ``False``. + """ + if not self._instream or instream: + self._pointer(payload) + if self._once: + self._destroy = True + del self._pointer diff --git a/sleekxmpp/xmlstream/handler/waiter.py b/sleekxmpp/xmlstream/handler/waiter.py new file mode 100644 index 00000000..01ff5d67 --- /dev/null +++ b/sleekxmpp/xmlstream/handler/waiter.py @@ -0,0 +1,87 @@ +# -*- coding: utf-8 -*- +""" + sleekxmpp.xmlstream.handler.waiter + ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + + Part of SleekXMPP: The Sleek XMPP Library + + :copyright: (c) 2011 Nathanael C. Fritz + :license: MIT, see LICENSE for more details +""" + +import logging +try: + import queue +except ImportError: + import Queue as queue + +from sleekxmpp.xmlstream import StanzaBase +from sleekxmpp.xmlstream.handler.base import BaseHandler + + +log = logging.getLogger(__name__) + + +class Waiter(BaseHandler): + + """ + The Waiter handler allows an event handler to block until a + particular stanza has been received. The handler will either be + given the matched stanza, or ``False`` if the waiter has timed out. + + :param string name: The name of the handler. + :param matcher: A :class:`~sleekxmpp.xmlstream.matcher.base.MatcherBase` + derived object for matching stanza objects. + :param stream: The :class:`~sleekxmpp.xmlstream.xmlstream.XMLStream` + instance this handler should monitor. + """ + + def __init__(self, name, matcher, stream=None): + BaseHandler.__init__(self, name, matcher, stream=stream) + self._payload = queue.Queue() + + def prerun(self, payload): + """Store the matched stanza when received during processing. + + :param payload: The matched + :class:`~sleekxmpp.xmlstream.stanzabase.ElementBase` object. + """ + self._payload.put(payload) + + def run(self, payload): + """Do not process this handler during the main event loop.""" + pass + + def wait(self, timeout=None): + """Block an event handler while waiting for a stanza to arrive. + + Be aware that this will impact performance if called from a + non-threaded event handler. + + Will return either the received stanza, or ``False`` if the + waiter timed out. + + :param int timeout: The number of seconds to wait for the stanza + to arrive. Defaults to the the stream's + :class:`~sleekxmpp.xmlstream.xmlstream.XMLStream.response_timeout` + value. + """ + if timeout is None: + timeout = self.stream().response_timeout + + elapsed_time = 0 + stanza = False + while elapsed_time < timeout and not self.stream().stop.is_set(): + try: + stanza = self._payload.get(True, 1) + break + except queue.Empty: + elapsed_time += 1 + if elapsed_time >= timeout: + log.warning("Timed out waiting for %s", self.name) + self.stream().remove_handler(self.name) + return stanza + + def check_delete(self): + """Always remove waiters after use.""" + return True diff --git a/sleekxmpp/xmlstream/handler/xmlcallback.py b/sleekxmpp/xmlstream/handler/xmlcallback.py new file mode 100644 index 00000000..11607ffb --- /dev/null +++ b/sleekxmpp/xmlstream/handler/xmlcallback.py @@ -0,0 +1,36 @@ +""" + SleekXMPP: The Sleek XMPP Library + Copyright (C) 2010 Nathanael C. Fritz + This file is part of SleekXMPP. + + See the file LICENSE for copying permission. +""" + +from sleekxmpp.xmlstream.handler import Callback + + +class XMLCallback(Callback): + + """ + The XMLCallback class is identical to the normal Callback class, + except that XML contents of matched stanzas will be processed instead + of the stanza objects themselves. + + Methods: + run -- Overrides Callback.run + """ + + def run(self, payload, instream=False): + """ + Execute the callback function with the matched stanza's + XML contents, instead of the stanza itself. + + Overrides BaseHandler.run + + Arguments: + payload -- The matched stanza object. + instream -- Force the handler to execute during + stream processing. Used only by prerun. + Defaults to False. + """ + Callback.run(self, payload.xml, instream) diff --git a/sleekxmpp/xmlstream/handler/xmlwaiter.py b/sleekxmpp/xmlstream/handler/xmlwaiter.py new file mode 100644 index 00000000..5201caf3 --- /dev/null +++ b/sleekxmpp/xmlstream/handler/xmlwaiter.py @@ -0,0 +1,33 @@ +""" + SleekXMPP: The Sleek XMPP Library + Copyright (C) 2010 Nathanael C. Fritz + This file is part of SleekXMPP. + + See the file LICENSE for copying permission. +""" + +from sleekxmpp.xmlstream.handler import Waiter + + +class XMLWaiter(Waiter): + + """ + The XMLWaiter class is identical to the normal Waiter class + except that it returns the XML contents of the stanza instead + of the full stanza object itself. + + Methods: + prerun -- Overrides Waiter.prerun + """ + + def prerun(self, payload): + """ + Store the XML contents of the stanza to return to the + waiting event handler. + + Overrides Waiter.prerun + + Arguments: + payload -- The matched stanza object. + """ + Waiter.prerun(self, payload.xml) diff --git a/sleekxmpp/xmlstream/jid.py b/sleekxmpp/xmlstream/jid.py new file mode 100644 index 00000000..281bf4ee --- /dev/null +++ b/sleekxmpp/xmlstream/jid.py @@ -0,0 +1,145 @@ +# -*- coding: utf-8 -*- +""" + sleekxmpp.xmlstream.jid + ~~~~~~~~~~~~~~~~~~~~~~~ + + This module allows for working with Jabber IDs (JIDs) by + providing accessors for the various components of a JID. + + Part of SleekXMPP: The Sleek XMPP Library + + :copyright: (c) 2011 Nathanael C. Fritz + :license: MIT, see LICENSE for more details +""" + +from __future__ import unicode_literals + + +class JID(object): + + """ + A representation of a Jabber ID, or JID. + + Each JID may have three components: a user, a domain, and an optional + resource. For example: user@domain/resource + + When a resource is not used, the JID is called a bare JID. + The JID is a full JID otherwise. + + **JID Properties:** + :jid: Alias for ``full``. + :full: The value of the full JID. + :bare: The value of the bare JID. + :user: The username portion of the JID. + :domain: The domain name portion of the JID. + :server: Alias for ``domain``. + :resource: The resource portion of the JID. + + :param string jid: A string of the form ``'[user@]domain[/resource]'``. + """ + + def __init__(self, jid): + """Initialize a new JID""" + self.reset(jid) + + def reset(self, jid): + """Start fresh from a new JID string. + + :param string jid: A string of the form ``'[user@]domain[/resource]'``. + """ + if isinstance(jid, JID): + jid = jid.full + self._full = self._jid = jid + self._domain = None + self._resource = None + self._user = None + self._bare = None + + def __getattr__(self, name): + """Handle getting the JID values, using cache if available. + + :param name: One of: user, server, domain, resource, + full, or bare. + """ + if name == 'resource': + if self._resource is None and '/' in self._jid: + self._resource = self._jid.split('/', 1)[-1] + return self._resource or "" + elif name == 'user': + if self._user is None: + if '@' in self._jid: + self._user = self._jid.split('@', 1)[0] + else: + self._user = self._user + return self._user or "" + elif name in ('server', 'domain', 'host'): + if self._domain is None: + self._domain = self._jid.split('@', 1)[-1].split('/', 1)[0] + return self._domain or "" + elif name in ('full', 'jid'): + return self._jid or "" + elif name == 'bare': + if self._bare is None: + self._bare = self._jid.split('/', 1)[0] + return self._bare or "" + + def __setattr__(self, name, value): + """Edit a JID by updating it's individual values, resetting the + generated JID in the end. + + Arguments: + name -- The name of the JID part. One of: user, domain, + server, resource, full, jid, or bare. + value -- The new value for the JID part. + """ + if name in ('resource', 'user', 'domain'): + object.__setattr__(self, "_%s" % name, value) + self.regenerate() + elif name in ('server', 'domain', 'host'): + self.domain = value + elif name in ('full', 'jid'): + self.reset(value) + self.regenerate() + elif name == 'bare': + if '@' in value: + u, d = value.split('@', 1) + object.__setattr__(self, "_user", u) + object.__setattr__(self, "_domain", d) + else: + object.__setattr__(self, "_user", '') + object.__setattr__(self, "_domain", value) + self.regenerate() + else: + object.__setattr__(self, name, value) + + def regenerate(self): + """Generate a new JID based on current values, useful after editing.""" + jid = "" + if self.user: + jid = "%s@" % self.user + jid += self.domain + if self.resource: + jid += "/%s" % self.resource + self.reset(jid) + + def __str__(self): + """Use the full JID as the string value.""" + return self.full + + def __repr__(self): + return self.full + + def __eq__(self, other): + """ + Two JIDs are considered equal if they have the same full JID value. + """ + other = JID(other) + return self.full == other.full + + def __ne__(self, other): + """Two JIDs are considered unequal if they are not equal.""" + return not self == other + + def __hash__(self): + """Hash a JID based on the string version of its full JID.""" + return hash(self.full) diff --git a/sleekxmpp/xmlstream/matcher/__init__.py b/sleekxmpp/xmlstream/matcher/__init__.py new file mode 100644 index 00000000..1038d1bd --- /dev/null +++ b/sleekxmpp/xmlstream/matcher/__init__.py @@ -0,0 +1,16 @@ +""" + SleekXMPP: The Sleek XMPP Library + Copyright (C) 2010 Nathanael C. Fritz + This file is part of SleekXMPP. + + See the file LICENSE for copying permission. +""" + +from sleekxmpp.xmlstream.matcher.id import MatcherId +from sleekxmpp.xmlstream.matcher.many import MatchMany +from sleekxmpp.xmlstream.matcher.stanzapath import StanzaPath +from sleekxmpp.xmlstream.matcher.xmlmask import MatchXMLMask +from sleekxmpp.xmlstream.matcher.xpath import MatchXPath + +__all__ = ['MatcherId', 'MatchMany', 'StanzaPath', + 'MatchXMLMask', 'MatchXPath'] diff --git a/sleekxmpp/xmlstream/matcher/base.py b/sleekxmpp/xmlstream/matcher/base.py new file mode 100644 index 00000000..83c26688 --- /dev/null +++ b/sleekxmpp/xmlstream/matcher/base.py @@ -0,0 +1,31 @@ +# -*- coding: utf-8 -*- +""" + sleekxmpp.xmlstream.matcher.base + ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + + Part of SleekXMPP: The Sleek XMPP Library + + :copyright: (c) 2011 Nathanael C. Fritz + :license: MIT, see LICENSE for more details +""" + + +class MatcherBase(object): + + """ + Base class for stanza matchers. Stanza matchers are used to pick + stanzas out of the XML stream and pass them to the appropriate + stream handlers. + + :param criteria: Object to compare some aspect of a stanza against. + """ + + def __init__(self, criteria): + self._criteria = criteria + + def match(self, xml): + """Check if a stanza matches the stored criteria. + + Meant to be overridden. + """ + return False diff --git a/sleekxmpp/xmlstream/matcher/id.py b/sleekxmpp/xmlstream/matcher/id.py new file mode 100644 index 00000000..11ab70bb --- /dev/null +++ b/sleekxmpp/xmlstream/matcher/id.py @@ -0,0 +1,29 @@ +# -*- coding: utf-8 -*- +""" + sleekxmpp.xmlstream.matcher.id + ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + + Part of SleekXMPP: The Sleek XMPP Library + + :copyright: (c) 2011 Nathanael C. Fritz + :license: MIT, see LICENSE for more details +""" + +from sleekxmpp.xmlstream.matcher.base import MatcherBase + + +class MatcherId(MatcherBase): + + """ + The ID matcher selects stanzas that have the same stanza 'id' + interface value as the desired ID. + """ + + def match(self, xml): + """Compare the given stanza's ``'id'`` attribute to the stored + ``id`` value. + + :param xml: The :class:`~sleekxmpp.xmlstream.stanzabase.ElementBase` + stanza to compare against. + """ + return xml['id'] == self._criteria diff --git a/sleekxmpp/xmlstream/matcher/many.py b/sleekxmpp/xmlstream/matcher/many.py new file mode 100644 index 00000000..f470ec9c --- /dev/null +++ b/sleekxmpp/xmlstream/matcher/many.py @@ -0,0 +1,40 @@ +""" + SleekXMPP: The Sleek XMPP Library + Copyright (C) 2010 Nathanael C. Fritz + This file is part of SleekXMPP. + + See the file LICENSE for copying permission. +""" + +from sleekxmpp.xmlstream.matcher.base import MatcherBase + + +class MatchMany(MatcherBase): + + """ + The MatchMany matcher may compare a stanza against multiple + criteria. It is essentially an OR relation combining multiple + matchers. + + Each of the criteria must implement a match() method. + + Methods: + match -- Overrides MatcherBase.match. + """ + + def match(self, xml): + """ + Match a stanza against multiple criteria. The match is successful + if one of the criteria matches. + + Each of the criteria must implement a match() method. + + Overrides MatcherBase.match. + + Arguments: + xml -- The stanza object to compare against. + """ + for m in self._criteria: + if m.match(xml): + return True + return False diff --git a/sleekxmpp/xmlstream/matcher/stanzapath.py b/sleekxmpp/xmlstream/matcher/stanzapath.py new file mode 100644 index 00000000..a4c0fda0 --- /dev/null +++ b/sleekxmpp/xmlstream/matcher/stanzapath.py @@ -0,0 +1,43 @@ +# -*- coding: utf-8 -*- +""" + sleekxmpp.xmlstream.matcher.stanzapath + ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + + Part of SleekXMPP: The Sleek XMPP Library + + :copyright: (c) 2011 Nathanael C. Fritz + :license: MIT, see LICENSE for more details +""" + +from sleekxmpp.xmlstream.matcher.base import MatcherBase +from sleekxmpp.xmlstream.stanzabase import fix_ns + + +class StanzaPath(MatcherBase): + + """ + The StanzaPath matcher selects stanzas that match a given "stanza path", + which is similar to a normal XPath except that it uses the interfaces and + plugins of the stanza instead of the actual, underlying XML. + + :param criteria: Object to compare some aspect of a stanza against. + """ + + def __init__(self, criteria): + self._criteria = fix_ns(criteria, split=True, + propagate_ns=False, + default_ns='jabber:client') + self._raw_criteria = criteria + + def match(self, stanza): + """ + Compare a stanza against a "stanza path". A stanza path is similar to + an XPath expression, but uses the stanza's interfaces and plugins + instead of the underlying XML. See the documentation for the stanza + :meth:`~sleekxmpp.xmlstream.stanzabase.ElementBase.match()` method + for more information. + + :param stanza: The :class:`~sleekxmpp.xmlstream.stanzabase.ElementBase` + stanza to compare against. + """ + return stanza.match(self._criteria) or stanza.match(self._raw_criteria) diff --git a/sleekxmpp/xmlstream/matcher/xmlmask.py b/sleekxmpp/xmlstream/matcher/xmlmask.py new file mode 100644 index 00000000..7977e767 --- /dev/null +++ b/sleekxmpp/xmlstream/matcher/xmlmask.py @@ -0,0 +1,158 @@ +""" + SleekXMPP: The Sleek XMPP Library + Copyright (C) 2010 Nathanael C. Fritz + This file is part of SleekXMPP. + + See the file LICENSE for copying permission. +""" + +import logging + +from xml.parsers.expat import ExpatError + +from sleekxmpp.xmlstream.stanzabase import ET +from sleekxmpp.xmlstream.matcher.base import MatcherBase + + +# Flag indicating if the builtin XPath matcher should be used, which +# uses namespaces, or a custom matcher that ignores namespaces. +# Changing this will affect ALL XMLMask matchers. +IGNORE_NS = False + + +log = logging.getLogger(__name__) + + +class MatchXMLMask(MatcherBase): + + """ + The XMLMask matcher selects stanzas whose XML matches a given + XML pattern, or mask. For example, message stanzas with body elements + could be matched using the mask: + + .. code-block:: xml + + <message xmlns="jabber:client"><body /></message> + + Use of XMLMask is discouraged, and + :class:`~sleekxmpp.xmlstream.matcher.xpath.MatchXPath` or + :class:`~sleekxmpp.xmlstream.matcher.stanzapath.StanzaPath` + should be used instead. + + The use of namespaces in the mask comparison is controlled by + ``IGNORE_NS``. Setting ``IGNORE_NS`` to ``True`` will disable namespace + based matching for ALL XMLMask matchers. + + :param criteria: Either an :class:`~xml.etree.ElementTree.Element` XML + object or XML string to use as a mask. + """ + + def __init__(self, criteria): + MatcherBase.__init__(self, criteria) + if isinstance(criteria, str): + self._criteria = ET.fromstring(self._criteria) + self.default_ns = 'jabber:client' + + def setDefaultNS(self, ns): + """Set the default namespace to use during comparisons. + + :param ns: The new namespace to use as the default. + """ + self.default_ns = ns + + def match(self, xml): + """Compare a stanza object or XML object against the stored XML mask. + + Overrides MatcherBase.match. + + :param xml: The stanza object or XML object to compare against. + """ + if hasattr(xml, 'xml'): + xml = xml.xml + return self._mask_cmp(xml, self._criteria, True) + + def _mask_cmp(self, source, mask, use_ns=False, default_ns='__no_ns__'): + """Compare an XML object against an XML mask. + + :param source: The :class:`~xml.etree.ElementTree.Element` XML object + to compare against the mask. + :param mask: The :class:`~xml.etree.ElementTree.Element` XML object + serving as the mask. + :param use_ns: Indicates if namespaces should be respected during + the comparison. + :default_ns: The default namespace to apply to elements that + do not have a specified namespace. + Defaults to ``"__no_ns__"``. + """ + use_ns = not IGNORE_NS + + if source is None: + # If the element was not found. May happend during recursive calls. + return False + + # Convert the mask to an XML object if it is a string. + if not hasattr(mask, 'attrib'): + try: + mask = ET.fromstring(mask) + except ExpatError: + log.warning("Expat error: %s\nIn parsing: %s", '', mask) + if not use_ns: + # Compare the element without using namespaces. + source_tag = source.tag.split('}', 1)[-1] + mask_tag = mask.tag.split('}', 1)[-1] + if source_tag != mask_tag: + return False + else: + # Compare the element using namespaces + mask_ns_tag = "{%s}%s" % (self.default_ns, mask.tag) + if source.tag not in [mask.tag, mask_ns_tag]: + return False + + # If the mask includes text, compare it. + if mask.text and source.text and \ + source.text.strip() != mask.text.strip(): + return False + + # Compare attributes. The stanza must include the attributes + # defined by the mask, but may include others. + for name, value in mask.attrib.items(): + if source.attrib.get(name, "__None__") != value: + return False + + # Recursively check subelements. + matched_elements = {} + for subelement in mask: + if use_ns: + matched = False + for other in source.findall(subelement.tag): + matched_elements[other] = False + if self._mask_cmp(other, subelement, use_ns): + if not matched_elements.get(other, False): + matched_elements[other] = True + matched = True + if not matched: + return False + else: + if not self._mask_cmp(self._get_child(source, subelement.tag), + subelement, use_ns): + return False + + # Everything matches. + return True + + def _get_child(self, xml, tag): + """Return a child element given its tag, ignoring namespace values. + + Returns ``None`` if the child was not found. + + :param xml: The :class:`~xml.etree.ElementTree.Element` XML object + to search for the given child tag. + :param tag: The name of the subelement to find. + """ + tag = tag.split('}')[-1] + try: + children = [c.tag.split('}')[-1] for c in xml.getchildren()] + index = children.index(tag) + except ValueError: + return None + return xml.getchildren()[index] diff --git a/sleekxmpp/xmlstream/matcher/xpath.py b/sleekxmpp/xmlstream/matcher/xpath.py new file mode 100644 index 00000000..b6af0609 --- /dev/null +++ b/sleekxmpp/xmlstream/matcher/xpath.py @@ -0,0 +1,86 @@ +# -*- coding: utf-8 -*- +""" + sleekxmpp.xmlstream.matcher.xpath + ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + + Part of SleekXMPP: The Sleek XMPP Library + + :copyright: (c) 2011 Nathanael C. Fritz + :license: MIT, see LICENSE for more details +""" + +from sleekxmpp.xmlstream.stanzabase import ET +from sleekxmpp.xmlstream.matcher.base import MatcherBase + + +# Flag indicating if the builtin XPath matcher should be used, which +# uses namespaces, or a custom matcher that ignores namespaces. +# Changing this will affect ALL XPath matchers. +IGNORE_NS = False + + +class MatchXPath(MatcherBase): + + """ + The XPath matcher selects stanzas whose XML contents matches a given + XPath expression. + + .. warning:: + + Using this matcher may not produce expected behavior when using + attribute selectors. For Python 2.6 and 3.1, the ElementTree + :meth:`~xml.etree.ElementTree.Element.find()` method does + not support the use of attribute selectors. If you need to + support Python 2.6 or 3.1, it might be more useful to use a + :class:`~sleekxmpp.xmlstream.matcher.stanzapath.StanzaPath` matcher. + + If the value of :data:`IGNORE_NS` is set to ``True``, then XPath + expressions will be matched without using namespaces. + """ + + def match(self, xml): + """ + Compare a stanza's XML contents to an XPath expression. + + If the value of :data:`IGNORE_NS` is set to ``True``, then XPath + expressions will be matched without using namespaces. + + .. warning:: + + In Python 2.6 and 3.1 the ElementTree + :meth:`~xml.etree.ElementTree.Element.find()` method does not + support attribute selectors in the XPath expression. + + :param xml: The :class:`~sleekxmpp.xmlstream.stanzabase.ElementBase` + stanza to compare against. + """ + if hasattr(xml, 'xml'): + xml = xml.xml + x = ET.Element('x') + x.append(xml) + + if not IGNORE_NS: + # Use builtin, namespace respecting, XPath matcher. + if x.find(self._criteria) is not None: + return True + return False + else: + # Remove namespaces from the XPath expression. + criteria = [] + for ns_block in self._criteria.split('{'): + criteria.extend(ns_block.split('}')[-1].split('/')) + + # Walk the XPath expression. + xml = x + for tag in criteria: + if not tag: + # Skip empty tag name artifacts from the cleanup phase. + continue + + children = [c.tag.split('}')[-1] for c in xml.getchildren()] + try: + index = children.index(tag) + except ValueError: + return False + xml = xml.getchildren()[index] + return True diff --git a/sleekxmpp/xmlstream/scheduler.py b/sleekxmpp/xmlstream/scheduler.py new file mode 100644 index 00000000..4a6f073f --- /dev/null +++ b/sleekxmpp/xmlstream/scheduler.py @@ -0,0 +1,228 @@ +# -*- coding: utf-8 -*- +""" + sleekxmpp.xmlstream.scheduler + ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + + This module provides a task scheduler that works better + with SleekXMPP's threading usage than the stock version. + + Part of SleekXMPP: The Sleek XMPP Library + + :copyright: (c) 2011 Nathanael C. Fritz + :license: MIT, see LICENSE for more details +""" + +import time +import threading +import logging +try: + import queue +except ImportError: + import Queue as queue + + +log = logging.getLogger(__name__) + + +class Task(object): + + """ + A scheduled task that will be executed by the scheduler + after a given time interval has passed. + + :param string name: The name of the task. + :param int seconds: The number of seconds to wait before executing. + :param callback: The function to execute. + :param tuple args: The arguments to pass to the callback. + :param dict kwargs: The keyword arguments to pass to the callback. + :param bool repeat: Indicates if the task should repeat. + Defaults to ``False``. + :param pointer: A pointer to an event queue for queuing callback + execution instead of executing immediately. + """ + + def __init__(self, name, seconds, callback, args=None, + kwargs=None, repeat=False, qpointer=None): + #: The name of the task. + self.name = name + + #: The number of seconds to wait before executing. + self.seconds = seconds + + #: The function to execute once enough time has passed. + self.callback = callback + + #: The arguments to pass to :attr:`callback`. + self.args = args or tuple() + + #: The keyword arguments to pass to :attr:`callback`. + self.kwargs = kwargs or {} + + #: Indicates if the task should repeat after executing, + #: using the same :attr:`seconds` delay. + self.repeat = repeat + + #: The time when the task should execute next. + self.next = time.time() + self.seconds + + #: The main event queue, which allows for callbacks to + #: be queued for execution instead of executing immediately. + self.qpointer = qpointer + + def run(self): + """Execute the task's callback. + + If an event queue was supplied, place the callback in the queue; + otherwise, execute the callback immediately. + """ + if self.qpointer is not None: + self.qpointer.put(('schedule', self.callback, + self.args, self.name)) + else: + self.callback(*self.args, **self.kwargs) + self.reset() + return self.repeat + + def reset(self): + """Reset the task's timer so that it will repeat.""" + self.next = time.time() + self.seconds + + +class Scheduler(object): + + """ + A threaded scheduler that allows for updates mid-execution unlike the + scheduler in the standard library. + + Based on: http://docs.python.org/library/sched.html#module-sched + + :param parentstop: An :class:`~threading.Event` to signal stopping + the scheduler. + """ + + def __init__(self, parentstop=None): + #: A queue for storing tasks + self.addq = queue.Queue() + + #: A list of tasks in order of execution time. + self.schedule = [] + + #: If running in threaded mode, this will be the thread processing + #: the schedule. + self.thread = None + + #: A flag indicating that the scheduler is running. + self.run = False + + #: An :class:`~threading.Event` instance for signalling to stop + #: the scheduler. + self.stop = parentstop + + #: Lock for accessing the task queue. + self.schedule_lock = threading.RLock() + + def process(self, threaded=True): + """Begin accepting and processing scheduled tasks. + + :param bool threaded: Indicates if the scheduler should execute + in its own thread. Defaults to ``True``. + """ + if threaded: + self.thread = threading.Thread(name='scheduler_process', + target=self._process) + self.thread.start() + else: + self._process() + + def _process(self): + """Process scheduled tasks.""" + self.run = True + try: + while self.run and not self.stop.isSet(): + wait = 1 + updated = False + if self.schedule: + wait = self.schedule[0].next - time.time() + try: + if wait <= 0.0: + newtask = self.addq.get(False) + else: + if wait >= 3.0: + wait = 3.0 + newtask = self.addq.get(True, wait) + except queue.Empty: + cleanup = [] + self.schedule_lock.acquire() + for task in self.schedule: + if time.time() >= task.next: + updated = True + if not task.run(): + cleanup.append(task) + else: + break + for task in cleanup: + x = self.schedule.pop(self.schedule.index(task)) + else: + updated = True + self.schedule_lock.acquire() + self.schedule.append(newtask) + finally: + if updated: + self.schedule = sorted(self.schedule, + key=lambda task: task.next) + self.schedule_lock.release() + except KeyboardInterrupt: + self.run = False + except SystemExit: + self.run = False + log.debug("Quitting Scheduler thread") + + def add(self, name, seconds, callback, args=None, + kwargs=None, repeat=False, qpointer=None): + """Schedule a new task. + + :param string name: The name of the task. + :param int seconds: The number of seconds to wait before executing. + :param callback: The function to execute. + :param tuple args: The arguments to pass to the callback. + :param dict kwargs: The keyword arguments to pass to the callback. + :param bool repeat: Indicates if the task should repeat. + Defaults to ``False``. + :param pointer: A pointer to an event queue for queuing callback + execution instead of executing immediately. + """ + try: + self.schedule_lock.acquire() + for task in self.schedule: + if task.name == name: + raise ValueError("Key %s already exists" % name) + + self.addq.put(Task(name, seconds, callback, args, + kwargs, repeat, qpointer)) + except: + raise + finally: + self.schedule_lock.release() + + def remove(self, name): + """Remove a scheduled task ahead of schedule, and without + executing it. + + :param string name: The name of the task to remove. + """ + try: + self.schedule_lock.acquire() + the_task = None + for task in self.schedule: + if task.name == name: + the_task = task + if the_task is not None: + self.schedule.remove(the_task) + except: + raise + finally: + self.schedule_lock.release() + + def quit(self): + """Shutdown the scheduler.""" + self.run = False diff --git a/sleekxmpp/xmlstream/stanzabase.py b/sleekxmpp/xmlstream/stanzabase.py new file mode 100644 index 00000000..dff8c997 --- /dev/null +++ b/sleekxmpp/xmlstream/stanzabase.py @@ -0,0 +1,1323 @@ +# -*- coding: utf-8 -*- +""" + sleekxmpp.xmlstream.stanzabase + ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + + This module implements a wrapper layer for XML objects + that allows them to be treated like dictionaries. + + Part of SleekXMPP: The Sleek XMPP Library + + :copyright: (c) 2011 Nathanael C. Fritz + :license: MIT, see LICENSE for more details +""" + +import copy +import logging +import sys +import weakref +from xml.etree import cElementTree as ET + +from sleekxmpp.xmlstream import JID +from sleekxmpp.xmlstream.tostring import tostring +from sleekxmpp.thirdparty import OrderedDict + + +log = logging.getLogger(__name__) + + +# Used to check if an argument is an XML object. +XML_TYPE = type(ET.Element('xml')) + + +def register_stanza_plugin(stanza, plugin, iterable=False, overrides=False): + """ + Associate a stanza object as a plugin for another stanza. + + >>> from sleekxmpp.xmlstream import register_stanza_plugin + >>> register_stanza_plugin(Iq, CustomStanza) + + :param class stanza: The class of the parent stanza. + :param class plugin: The class of the plugin stanza. + :param bool iterable: Indicates if the plugin stanza should be + included in the parent stanza's iterable + ``'substanzas'`` interface results. + :param bool overrides: Indicates if the plugin should be allowed + to override the interface handlers for + the parent stanza, based on the plugin's + ``overrides`` field. + + .. versionadded:: 1.0-Beta1 + Made ``register_stanza_plugin`` the default name. The prior + ``registerStanzaPlugin`` function name remains as an alias. + """ + tag = "{%s}%s" % (plugin.namespace, plugin.name) + + # Prevent weird memory reference gotchas by ensuring + # that the parent stanza class has its own set of + # plugin info maps and is not using the mappings from + # an ancestor class (like ElementBase). + plugin_info = ('plugin_attrib_map', 'plugin_tag_map', + 'plugin_iterables', 'plugin_overrides') + for attr in plugin_info: + info = getattr(stanza, attr) + setattr(stanza, attr, info.copy()) + + stanza.plugin_attrib_map[plugin.plugin_attrib] = plugin + stanza.plugin_tag_map[tag] = plugin + + if iterable: + stanza.plugin_iterables.add(plugin) + if overrides: + for interface in plugin.overrides: + stanza.plugin_overrides[interface] = plugin.plugin_attrib + + +# To maintain backwards compatibility for now, preserve the camel case name. +registerStanzaPlugin = register_stanza_plugin + + +def fix_ns(xpath, split=False, propagate_ns=True, default_ns=''): + """Apply the stanza's namespace to elements in an XPath expression. + + :param string xpath: The XPath expression to fix with namespaces. + :param bool split: Indicates if the fixed XPath should be left as a + list of element names with namespaces. Defaults to + False, which returns a flat string path. + :param bool propagate_ns: Overrides propagating parent element + namespaces to child elements. Useful if + you wish to simply split an XPath that has + non-specified namespaces, and child and + parent namespaces are known not to always + match. Defaults to True. + """ + fixed = [] + # Split the XPath into a series of blocks, where a block + # is started by an element with a namespace. + ns_blocks = xpath.split('{') + for ns_block in ns_blocks: + if '}' in ns_block: + # Apply the found namespace to following elements + # that do not have namespaces. + namespace = ns_block.split('}')[0] + elements = ns_block.split('}')[1].split('/') + else: + # Apply the stanza's namespace to the following + # elements since no namespace was provided. + namespace = default_ns + elements = ns_block.split('/') + + for element in elements: + if element: + # Skip empty entry artifacts from splitting. + if propagate_ns: + tag = '{%s}%s' % (namespace, element) + else: + tag = element + fixed.append(tag) + if split: + return fixed + return '/'.join(fixed) + + +class ElementBase(object): + + """ + The core of SleekXMPP's stanza XML manipulation and handling is provided + by ElementBase. ElementBase wraps XML cElementTree objects and enables + access to the XML contents through dictionary syntax, similar in style + to the Ruby XMPP library Blather's stanza implementation. + + Stanzas are defined by their name, namespace, and interfaces. For + example, a simplistic Message stanza could be defined as:: + + >>> class Message(ElementBase): + ... name = "message" + ... namespace = "jabber:client" + ... interfaces = set(('to', 'from', 'type', 'body')) + ... sub_interfaces = set(('body',)) + + The resulting Message stanza's contents may be accessed as so:: + + >>> message['to'] = "user@example.com" + >>> message['body'] = "Hi!" + >>> message['body'] + "Hi!" + >>> del message['body'] + >>> message['body'] + "" + + The interface values map to either custom access methods, stanza + XML attributes, or (if the interface is also in sub_interfaces) the + text contents of a stanza's subelement. + + Custom access methods may be created by adding methods of the + form "getInterface", "setInterface", or "delInterface", where + "Interface" is the titlecase version of the interface name. + + Stanzas may be extended through the use of plugins. A plugin + is simply a stanza that has a plugin_attrib value. For example:: + + >>> class MessagePlugin(ElementBase): + ... name = "custom_plugin" + ... namespace = "custom" + ... interfaces = set(('useful_thing', 'custom')) + ... plugin_attrib = "custom" + + The plugin stanza class must be associated with its intended + container stanza by using register_stanza_plugin as so:: + + >>> register_stanza_plugin(Message, MessagePlugin) + + The plugin may then be accessed as if it were built-in to the parent + stanza:: + + >>> message['custom']['useful_thing'] = 'foo' + + If a plugin provides an interface that is the same as the plugin's + plugin_attrib value, then the plugin's interface may be assigned + directly from the parent stanza, as shown below, but retrieving + information will require all interfaces to be used, as so:: + + >>> message['custom'] = 'bar' # Same as using message['custom']['custom'] + >>> message['custom']['custom'] # Must use all interfaces + 'bar' + + If the plugin sets :attr:`is_extension` to ``True``, then both setting + and getting an interface value that is the same as the plugin's + plugin_attrib value will work, as so:: + + >>> message['custom'] = 'bar' # Using is_extension=True + >>> message['custom'] + 'bar' + + + :param xml: Initialize the stanza object with an existing XML object. + :param parent: Optionally specify a parent stanza object will will + contain this substanza. + """ + + #: The XML tag name of the element, not including any namespace + #: prefixes. For example, an :class:`ElementBase` object for ``<message />`` + #: would use ``name = 'message'``. + name = 'stanza' + + #: The XML namespace for the element. Given ``<foo xmlns="bar" />``, + #: then ``namespace = "bar"`` should be used. The default namespace + #: is ``jabber:client`` since this is being used in an XMPP library. + namespace = 'jabber:client' + + #: For :class:`ElementBase` subclasses which are intended to be used + #: as plugins, the ``plugin_attrib`` value defines the plugin name. + #: Plugins may be accessed by using the ``plugin_attrib`` value as + #: the interface. An example using ``plugin_attrib = 'foo'``:: + #: + #: register_stanza_plugin(Message, FooPlugin) + #: msg = Message() + #: msg['foo']['an_interface_from_the_foo_plugin'] + plugin_attrib = 'plugin' + + #: The set of keys that the stanza provides for accessing and + #: manipulating the underlying XML object. This set may be augmented + #: with the :attr:`plugin_attrib` value of any registered + #: stanza plugins. + interfaces = set(('type', 'to', 'from', 'id', 'payload')) + + #: A subset of :attr:`interfaces` which maps interfaces to direct + #: subelements of the underlying XML object. Using this set, the text + #: of these subelements may be set, retrieved, or removed without + #: needing to define custom methods. + sub_interfaces = tuple() + + #: In some cases you may wish to override the behaviour of one of the + #: parent stanza's interfaces. The ``overrides`` list specifies the + #: interface name and access method to be overridden. For example, + #: to override setting the parent's ``'condition'`` interface you + #: would use:: + #: + #: overrides = ['set_condition'] + #: + #: Getting and deleting the ``'condition'`` interface would not + #: be affected. + #: + #: .. versionadded:: 1.0-Beta5 + overrides = [] + + #: If you need to add a new interface to an existing stanza, you + #: can create a plugin and set ``is_extension = True``. Be sure + #: to set the :attr:`plugin_attrib` value to the desired interface + #: name, and that it is the only interface listed in + #: :attr:`interfaces`. Requests for the new interface from the + #: parent stanza will be passed to the plugin directly. + #: + #: .. versionadded:: 1.0-Beta5 + is_extension = False + + #: A map of interface operations to the overriding functions. + #: For example, after overriding the ``set`` operation for + #: the interface ``body``, :attr:`plugin_overrides` would be:: + #: + #: {'set_body': <some function>} + #: + #: .. versionadded: 1.0-Beta5 + plugin_overrides = {} + + #: A mapping of the :attr:`plugin_attrib` values of registered + #: plugins to their respective classes. + plugin_attrib_map = {} + + #: A mapping of root element tag names (in ``'{namespace}elementname'`` + #: format) to the plugin classes responsible for them. + plugin_tag_map = {} + + #: The set of stanza classes that can be iterated over using + #: the 'substanzas' interface. Classes are added to this set + #: when registering a plugin with ``iterable=True``:: + #: + #: register_stanza_plugin(DiscoInfo, DiscoItem, iterable=True) + #: + #: .. versionadded:: 1.0-Beta5 + plugin_iterables = set() + + #: A deprecated version of :attr:`plugin_iterables` that remains + #: for backward compatibility. It required a parent stanza to + #: know beforehand what stanza classes would be iterable:: + #: + #: class DiscoItem(ElementBase): + #: ... + #: + #: class DiscoInfo(ElementBase): + #: subitem = (DiscoItem, ) + #: ... + #: + #: .. deprecated:: 1.0-Beta5 + subitem = set() + + #: The default XML namespace: ``http://www.w3.org/XML/1998/namespace``. + xml_ns = 'http://www.w3.org/XML/1998/namespace' + + def __init__(self, xml=None, parent=None): + self._index = 0 + + #: The underlying XML object for the stanza. It is a standard + #: :class:`xml.etree.cElementTree` object. + self.xml = xml + + #: An ordered dictionary of plugin stanzas, mapped by their + #: :attr:`plugin_attrib` value. + self.plugins = OrderedDict() + + #: A list of child stanzas whose class is included in + #: :attr:`plugin_iterables`. + self.iterables = [] + + #: The name of the tag for the stanza's root element. It is the + #: same as calling :meth:`tag_name()` and is formatted as + #: ``'{namespace}elementname'``. + self.tag = self.tag_name() + + #: A :class:`weakref.weakref` to the parent stanza, if there is one. + #: If not, then :attr:`parent` is ``None``. + self.parent = None + if parent is not None: + self.parent = weakref.ref(parent) + + if self.subitem is not None: + for sub in self.subitem: + self.plugin_iterables.add(sub) + + if self.setup(xml): + # If we generated our own XML, then everything is ready. + return + + # Initialize values using provided XML + for child in self.xml.getchildren(): + if child.tag in self.plugin_tag_map: + plugin_class = self.plugin_tag_map[child.tag] + plugin = plugin_class(child, self) + self.plugins[plugin.plugin_attrib] = plugin + if plugin_class in self.plugin_iterables: + self.iterables.append(plugin) + + def setup(self, xml=None): + """Initialize the stanza's XML contents. + + Will return ``True`` if XML was generated according to the stanza's + definition instead of building a stanza object from an existing + XML object. + + :param xml: An existing XML object to use for the stanza's content + instead of generating new XML. + """ + if self.xml is None: + self.xml = xml + + if self.xml is None: + # Generate XML from the stanza definition + for ename in self.name.split('/'): + new = ET.Element("{%s}%s" % (self.namespace, ename)) + if self.xml is None: + self.xml = new + else: + last_xml.append(new) + last_xml = new + if self.parent is not None: + self.parent().xml.append(self.xml) + + # We had to generate XML + return True + else: + # We did not generate XML + return False + + def enable(self, attrib): + """Enable and initialize a stanza plugin. + + Alias for :meth:`init_plugin`. + + :param string attrib: The :attr:`plugin_attrib` value of the + plugin to enable. + """ + return self.init_plugin(attrib) + + def init_plugin(self, attrib): + """Enable and initialize a stanza plugin. + + :param string attrib: The :attr:`plugin_attrib` value of the + plugin to enable. + """ + if attrib not in self.plugins: + plugin_class = self.plugin_attrib_map[attrib] + existing_xml = self.xml.find(plugin_class.tag_name()) + plugin = plugin_class(parent=self, xml=existing_xml) + self.plugins[attrib] = plugin + if plugin_class in self.plugin_iterables: + self.iterables.append(plugin) + return self + + def _get_stanza_values(self): + """Return A JSON/dictionary version of the XML content + exposed through the stanza's interfaces:: + + >>> msg = Message() + >>> msg.values + {'body': '', 'from': , 'mucnick': '', 'mucroom': '', + 'to': , 'type': 'normal', 'id': '', 'subject': ''} + + Likewise, assigning to :attr:`values` will change the XML + content:: + + >>> msg = Message() + >>> msg.values = {'body': 'Hi!', 'to': 'user@example.com'} + >>> msg + '<message to="user@example.com"><body>Hi!</body></message>' + + .. versionadded:: 1.0-Beta1 + """ + values = {} + for interface in self.interfaces: + values[interface] = self[interface] + for plugin, stanza in self.plugins.items(): + values[plugin] = stanza.values + if self.iterables: + iterables = [] + for stanza in self.iterables: + iterables.append(stanza.values) + iterables[-1]['__childtag__'] = stanza.tag + values['substanzas'] = iterables + return values + + def _set_stanza_values(self, values): + """Set multiple stanza interface values using a dictionary. + + Stanza plugin values may be set using nested dictionaries. + + :param values: A dictionary mapping stanza interface with values. + Plugin interfaces may accept a nested dictionary that + will be used recursively. + + .. versionadded:: 1.0-Beta1 + """ + iterable_interfaces = [p.plugin_attrib for \ + p in self.plugin_iterables] + + for interface, value in values.items(): + if interface == 'substanzas': + # Remove existing substanzas + for stanza in self.iterables: + self.xml.remove(stanza.xml) + self.iterables = [] + + # Add new substanzas + for subdict in value: + if '__childtag__' in subdict: + for subclass in self.plugin_iterables: + child_tag = "{%s}%s" % (subclass.namespace, + subclass.name) + if subdict['__childtag__'] == child_tag: + sub = subclass(parent=self) + sub.values = subdict + self.iterables.append(sub) + break + elif interface in self.interfaces: + self[interface] = value + elif interface in self.plugin_attrib_map: + if interface not in iterable_interfaces: + if interface not in self.plugins: + self.init_plugin(interface) + self.plugins[interface].values = value + return self + + def __getitem__(self, attrib): + """Return the value of a stanza interface using dict-like syntax. + + Example:: + + >>> msg['body'] + 'Message contents' + + Stanza interfaces are typically mapped directly to the underlying XML + object, but can be overridden by the presence of a ``get_attrib`` + method (or ``get_foo`` where the interface is named ``'foo'``, etc). + + The search order for interface value retrieval for an interface + named ``'foo'`` is: + + 1. The list of substanzas (``'substanzas'``) + 2. The result of calling the ``get_foo`` override handler. + 3. The result of calling ``get_foo``. + 4. The result of calling ``getFoo``. + 5. The contents of the ``foo`` subelement, if ``foo`` is listed + in :attr:`sub_interfaces`. + 6. The value of the ``foo`` attribute of the XML object. + 7. The plugin named ``'foo'`` + 8. An empty string. + + :param string attrib: The name of the requested stanza interface. + """ + if attrib == 'substanzas': + return self.iterables + elif attrib in self.interfaces: + get_method = "get_%s" % attrib.lower() + get_method2 = "get%s" % attrib.title() + + if self.plugin_overrides: + plugin = self.plugin_overrides.get(get_method, None) + if plugin: + if plugin not in self.plugins: + self.init_plugin(plugin) + handler = getattr(self.plugins[plugin], get_method, None) + if handler: + return handler() + + if hasattr(self, get_method): + return getattr(self, get_method)() + elif hasattr(self, get_method2): + return getattr(self, get_method2)() + else: + if attrib in self.sub_interfaces: + return self._get_sub_text(attrib) + else: + return self._get_attr(attrib) + elif attrib in self.plugin_attrib_map: + if attrib not in self.plugins: + self.init_plugin(attrib) + if self.plugins[attrib].is_extension: + return self.plugins[attrib][attrib] + return self.plugins[attrib] + else: + return '' + + def __setitem__(self, attrib, value): + """Set the value of a stanza interface using dictionary-like syntax. + + Example:: + + >>> msg['body'] = "Hi!" + >>> msg['body'] + 'Hi!' + + Stanza interfaces are typically mapped directly to the underlying XML + object, but can be overridden by the presence of a ``set_attrib`` + method (or ``set_foo`` where the interface is named ``'foo'``, etc). + + The effect of interface value assignment for an interface + named ``'foo'`` will be one of: + + 1. Delete the interface's contents if the value is None. + 2. Call the ``set_foo`` override handler, if it exists. + 3. Call ``set_foo``, if it exists. + 4. Call ``setFoo``, if it exists. + 5. Set the text of a ``foo`` element, if ``'foo'`` is + in :attr:`sub_interfaces`. + 6. Set the value of a top level XML attribute named ``foo``. + 7. Attempt to pass the value to a plugin named ``'foo'`` using + the plugin's ``'foo'`` interface. + 8. Do nothing. + + :param string attrib: The name of the stanza interface to modify. + :param value: The new value of the stanza interface. + """ + if attrib in self.interfaces: + if value is not None: + set_method = "set_%s" % attrib.lower() + set_method2 = "set%s" % attrib.title() + + if self.plugin_overrides: + plugin = self.plugin_overrides.get(set_method, None) + if plugin: + if plugin not in self.plugins: + self.init_plugin(plugin) + handler = getattr(self.plugins[plugin], + set_method, None) + if handler: + return handler(value) + + if hasattr(self, set_method): + getattr(self, set_method)(value,) + elif hasattr(self, set_method2): + getattr(self, set_method2)(value,) + else: + if attrib in self.sub_interfaces: + return self._set_sub_text(attrib, text=value) + else: + self._set_attr(attrib, value) + else: + self.__delitem__(attrib) + elif attrib in self.plugin_attrib_map: + if attrib not in self.plugins: + self.init_plugin(attrib) + self.plugins[attrib][attrib] = value + return self + + def __delitem__(self, attrib): + """Delete the value of a stanza interface using dict-like syntax. + + Example:: + + >>> msg['body'] = "Hi!" + >>> msg['body'] + 'Hi!' + >>> del msg['body'] + >>> msg['body'] + '' + + Stanza interfaces are typically mapped directly to the underlyig XML + object, but can be overridden by the presence of a ``del_attrib`` + method (or ``del_foo`` where the interface is named ``'foo'``, etc). + + The effect of deleting a stanza interface value named ``foo`` will be + one of: + + 1. Call ``del_foo`` override handler, if it exists. + 2. Call ``del_foo``, if it exists. + 3. Call ``delFoo``, if it exists. + 4. Delete ``foo`` element, if ``'foo'`` is in + :attr:`sub_interfaces`. + 5. Delete top level XML attribute named ``foo``. + 6. Remove the ``foo`` plugin, if it was loaded. + 7. Do nothing. + + :param attrib: The name of the affected stanza interface. + """ + if attrib in self.interfaces: + del_method = "del_%s" % attrib.lower() + del_method2 = "del%s" % attrib.title() + + if self.plugin_overrides: + plugin = self.plugin_overrides.get(del_method, None) + if plugin: + if plugin not in self.plugins: + self.init_plugin(plugin) + handler = getattr(self.plugins[plugin], del_method, None) + if handler: + return handler() + + if hasattr(self, del_method): + getattr(self, del_method)() + elif hasattr(self, del_method2): + getattr(self, del_method2)() + else: + if attrib in self.sub_interfaces: + return self._del_sub(attrib) + else: + self._del_attr(attrib) + elif attrib in self.plugin_attrib_map: + if attrib in self.plugins: + xml = self.plugins[attrib].xml + if self.plugins[attrib].is_extension: + del self.plugins[attrib][attrib] + del self.plugins[attrib] + try: + self.xml.remove(xml) + except: + pass + return self + + def _set_attr(self, name, value): + """Set the value of a top level attribute of the XML object. + + If the new value is None or an empty string, then the attribute will + be removed. + + :param name: The name of the attribute. + :param value: The new value of the attribute, or None or '' to + remove it. + """ + if value is None or value == '': + self.__delitem__(name) + else: + self.xml.attrib[name] = value + + def _del_attr(self, name): + """Remove a top level attribute of the XML object. + + :param name: The name of the attribute. + """ + if name in self.xml.attrib: + del self.xml.attrib[name] + + def _get_attr(self, name, default=''): + """Return the value of a top level attribute of the XML object. + + In case the attribute has not been set, a default value can be + returned instead. An empty string is returned if no other default + is supplied. + + :param name: The name of the attribute. + :param default: Optional value to return if the attribute has not + been set. An empty string is returned otherwise. + """ + return self.xml.attrib.get(name, default) + + def _get_sub_text(self, name, default=''): + """Return the text contents of a sub element. + + In case the element does not exist, or it has no textual content, + a default value can be returned instead. An empty string is returned + if no other default is supplied. + + :param name: The name or XPath expression of the element. + :param default: Optional default to return if the element does + not exists. An empty string is returned otherwise. + """ + name = self._fix_ns(name) + stanza = self.xml.find(name) + if stanza is None or stanza.text is None: + return default + else: + return stanza.text + + def _set_sub_text(self, name, text=None, keep=False): + """Set the text contents of a sub element. + + In case the element does not exist, a element will be created, + and its text contents will be set. + + If the text is set to an empty string, or None, then the + element will be removed, unless keep is set to True. + + :param name: The name or XPath expression of the element. + :param text: The new textual content of the element. If the text + is an empty string or None, the element will be removed + unless the parameter keep is True. + :param keep: Indicates if the element should be kept if its text is + removed. Defaults to False. + """ + path = self._fix_ns(name, split=True) + element = self.xml.find(name) + + if not text and not keep: + return self._del_sub(name) + + if element is None: + # We need to add the element. If the provided name was + # an XPath expression, some of the intermediate elements + # may already exist. If so, we want to use those instead + # of generating new elements. + last_xml = self.xml + walked = [] + for ename in path: + walked.append(ename) + element = self.xml.find("/".join(walked)) + if element is None: + element = ET.Element(ename) + last_xml.append(element) + last_xml = element + element = last_xml + + element.text = text + return element + + def _del_sub(self, name, all=False): + """Remove sub elements that match the given name or XPath. + + If the element is in a path, then any parent elements that become + empty after deleting the element may also be deleted if requested + by setting all=True. + + :param name: The name or XPath expression for the element(s) to remove. + :param bool all: If True, remove all empty elements in the path to the + deleted element. Defaults to False. + """ + path = self._fix_ns(name, split=True) + original_target = path[-1] + + for level, _ in enumerate(path): + # Generate the paths to the target elements and their parent. + element_path = "/".join(path[:len(path) - level]) + parent_path = "/".join(path[:len(path) - level - 1]) + + elements = self.xml.findall(element_path) + parent = self.xml.find(parent_path) + + if elements: + if parent is None: + parent = self.xml + for element in elements: + if element.tag == original_target or \ + not element.getchildren(): + # Only delete the originally requested elements, and + # any parent elements that have become empty. + parent.remove(element) + if not all: + # If we don't want to delete elements up the tree, stop + # after deleting the first level of elements. + return + + def match(self, xpath): + """Compare a stanza object with an XPath-like expression. + + If the XPath matches the contents of the stanza object, the match + is successful. + + The XPath expression may include checks for stanza attributes. + For example:: + + 'presence@show=xa@priority=2/status' + + Would match a presence stanza whose show value is set to ``'xa'``, + has a priority value of ``'2'``, and has a status element. + + :param string xpath: The XPath expression to check against. It + may be either a string or a list of element + names with attribute checks. + """ + if not isinstance(xpath, list): + xpath = self._fix_ns(xpath, split=True, propagate_ns=False) + + # Extract the tag name and attribute checks for the first XPath node. + components = xpath[0].split('@') + tag = components[0] + attributes = components[1:] + + if tag not in (self.name, "{%s}%s" % (self.namespace, self.name)) and \ + tag not in self.plugins and tag not in self.plugin_attrib: + # The requested tag is not in this stanza, so no match. + return False + + # Check the rest of the XPath against any substanzas. + matched_substanzas = False + for substanza in self.iterables: + if xpath[1:] == []: + break + matched_substanzas = substanza.match(xpath[1:]) + if matched_substanzas: + break + + # Check attribute values. + for attribute in attributes: + name, value = attribute.split('=') + if self[name] != value: + return False + + # Check sub interfaces. + if len(xpath) > 1: + next_tag = xpath[1] + if next_tag in self.sub_interfaces and self[next_tag]: + return True + + # Attempt to continue matching the XPath using the stanza's plugins. + if not matched_substanzas and len(xpath) > 1: + # Convert {namespace}tag@attribs to just tag + next_tag = xpath[1].split('@')[0].split('}')[-1] + if next_tag in self.plugins: + return self.plugins[next_tag].match(xpath[1:]) + else: + return False + + # Everything matched. + return True + + def find(self, xpath): + """Find an XML object in this stanza given an XPath expression. + + Exposes ElementTree interface for backwards compatibility. + + .. note:: + + Matching on attribute values is not supported in Python 2.6 + or Python 3.1 + + :param string xpath: An XPath expression matching a single + desired element. + """ + return self.xml.find(xpath) + + def findall(self, xpath): + """Find multiple XML objects in this stanza given an XPath expression. + + Exposes ElementTree interface for backwards compatibility. + + .. note:: + + Matching on attribute values is not supported in Python 2.6 + or Python 3.1. + + :param string xpath: An XPath expression matching multiple + desired elements. + """ + return self.xml.findall(xpath) + + def get(self, key, default=None): + """Return the value of a stanza interface. + + If the found value is None or an empty string, return the supplied + default value. + + Allows stanza objects to be used like dictionaries. + + :param string key: The name of the stanza interface to check. + :param default: Value to return if the stanza interface has a value + of ``None`` or ``""``. Will default to returning None. + """ + value = self[key] + if value is None or value == '': + return default + return value + + def keys(self): + """Return the names of all stanza interfaces provided by the + stanza object. + + Allows stanza objects to be used like dictionaries. + """ + out = [] + out += [x for x in self.interfaces] + out += [x for x in self.plugins] + if self.iterables: + out.append('substanzas') + return out + + def append(self, item): + """Append either an XML object or a substanza to this stanza object. + + If a substanza object is appended, it will be added to the list + of iterable stanzas. + + Allows stanza objects to be used like lists. + + :param item: Either an XML object or a stanza object to add to + this stanza's contents. + """ + if not isinstance(item, ElementBase): + if type(item) == XML_TYPE: + return self.appendxml(item) + else: + raise TypeError + self.xml.append(item.xml) + self.iterables.append(item) + return self + + def appendxml(self, xml): + """Append an XML object to the stanza's XML. + + The added XML will not be included in the list of + iterable substanzas. + + :param XML xml: The XML object to add to the stanza. + """ + self.xml.append(xml) + return self + + def pop(self, index=0): + """Remove and return the last substanza in the list of + iterable substanzas. + + Allows stanza objects to be used like lists. + + :param int index: The index of the substanza to remove. + """ + substanza = self.iterables.pop(index) + self.xml.remove(substanza.xml) + return substanza + + def next(self): + """Return the next iterable substanza.""" + return self.__next__() + + def clear(self): + """Remove all XML element contents and plugins. + + Any attribute values will be preserved. + """ + for child in self.xml.getchildren(): + self.xml.remove(child) + for plugin in list(self.plugins.keys()): + del self.plugins[plugin] + return self + + @classmethod + def tag_name(cls): + """Return the namespaced name of the stanza's root element. + + The format for the tag name is:: + + '{namespace}elementname' + + For example, for the stanza ``<foo xmlns="bar" />``, + ``stanza.tag_name()`` would return ``"{bar}foo"``. + """ + return "{%s}%s" % (cls.namespace, cls.name) + + @property + def attrib(self): + """Return the stanza object itself. + + Older implementations of stanza objects used XML objects directly, + requiring the use of ``.attrib`` to access attribute values. + + Use of the dictionary syntax with the stanza object itself for + accessing stanza interfaces is preferred. + + .. deprecated:: 1.0 + """ + return self + + def _fix_ns(self, xpath, split=False, propagate_ns=True): + return fix_ns(xpath, split=split, + propagate_ns=propagate_ns, + default_ns=self.namespace) + + def __eq__(self, other): + """Compare the stanza object with another to test for equality. + + Stanzas are equal if their interfaces return the same values, + and if they are both instances of ElementBase. + + :param ElementBase other: The stanza object to compare against. + """ + if not isinstance(other, ElementBase): + return False + + # Check that this stanza is a superset of the other stanza. + values = self.values + for key in other.keys(): + if key not in values or values[key] != other[key]: + return False + + # Check that the other stanza is a superset of this stanza. + values = other.values + for key in self.keys(): + if key not in values or values[key] != self[key]: + return False + + # Both stanzas are supersets of each other, therefore they + # must be equal. + return True + + def __ne__(self, other): + """Compare the stanza object with another to test for inequality. + + Stanzas are not equal if their interfaces return different values, + or if they are not both instances of ElementBase. + + :param ElementBase other: The stanza object to compare against. + """ + return not self.__eq__(other) + + def __bool__(self): + """Stanza objects should be treated as True in boolean contexts. + + Python 3.x version. + """ + return True + + def __nonzero__(self): + """Stanza objects should be treated as True in boolean contexts. + + Python 2.x version. + """ + return True + + def __len__(self): + """Return the number of iterable substanzas in this stanza.""" + return len(self.iterables) + + def __iter__(self): + """Return an iterator object for the stanza's substanzas. + + The iterator is the stanza object itself. Attempting to use two + iterators on the same stanza at the same time is discouraged. + """ + self._index = 0 + return self + + def __next__(self): + """Return the next iterable substanza.""" + self._index += 1 + if self._index > len(self.iterables): + self._index = 0 + raise StopIteration + return self.iterables[self._index - 1] + + def __copy__(self): + """Return a copy of the stanza object that does not share the same + underlying XML object. + """ + return self.__class__(xml=copy.deepcopy(self.xml), parent=self.parent) + + def __str__(self, top_level_ns=True): + """Return a string serialization of the underlying XML object. + + .. seealso:: :ref:`tostring` + + :param bool top_level_ns: Display the top-most namespace. + Defaults to True. + """ + stanza_ns = '' if top_level_ns else self.namespace + return tostring(self.xml, xmlns='', + stanza_ns=stanza_ns, + top_level=not top_level_ns) + + def __repr__(self): + """Use the stanza's serialized XML as its representation.""" + return self.__str__() + + +class StanzaBase(ElementBase): + + """ + StanzaBase provides the foundation for all other stanza objects used + by SleekXMPP, and defines a basic set of interfaces common to nearly + all stanzas. These interfaces are the ``'id'``, ``'type'``, ``'to'``, + and ``'from'`` attributes. An additional interface, ``'payload'``, is + available to access the XML contents of the stanza. Most stanza objects + will provided more specific interfaces, however. + + **Stanza Interfaces:** + + :id: An optional id value that can be used to associate stanzas + :to: A JID object representing the recipient's JID. + :from: A JID object representing the sender's JID. + with their replies. + :type: The type of stanza, typically will be ``'normal'``, + ``'error'``, ``'get'``, or ``'set'``, etc. + :payload: The XML contents of the stanza. + + :param XMLStream stream: Optional :class:`sleekxmpp.xmlstream.XMLStream` + object responsible for sending this stanza. + :param XML xml: Optional XML contents to initialize stanza values. + :param string stype: Optional stanza type value. + :param sto: Optional string or :class:`sleekxmpp.xmlstream.JID` + object of the recipient's JID. + :param sfrom: Optional string or :class:`sleekxmpp.xmlstream.JID` + object of the sender's JID. + :param string sid: Optional ID value for the stanza. + """ + + #: The default XMPP client namespace + namespace = 'jabber:client' + + #: There is a small set of attributes which apply to all XMPP stanzas: + #: the stanza type, the to and from JIDs, the stanza ID, and, especially + #: in the case of an Iq stanza, a payload. + interfaces = set(('type', 'to', 'from', 'id', 'payload')) + + #: A basic set of allowed values for the ``'type'`` interface. + types = set(('get', 'set', 'error', None, 'unavailable', 'normal', 'chat')) + + def __init__(self, stream=None, xml=None, stype=None, + sto=None, sfrom=None, sid=None): + self.stream = stream + if stream is not None: + self.namespace = stream.default_ns + ElementBase.__init__(self, xml) + if stype is not None: + self['type'] = stype + if sto is not None: + self['to'] = sto + if sfrom is not None: + self['from'] = sfrom + self.tag = "{%s}%s" % (self.namespace, self.name) + + def set_type(self, value): + """Set the stanza's ``'type'`` attribute. + + Only type values contained in :attr:`types` are accepted. + + :param string value: One of the values contained in :attr:`types` + """ + if value in self.types: + self.xml.attrib['type'] = value + return self + + def get_to(self): + """Return the value of the stanza's ``'to'`` attribute.""" + return JID(self._get_attr('to')) + + def set_to(self, value): + """Set the ``'to'`` attribute of the stanza. + + :param value: A string or :class:`sleekxmpp.xmlstream.JID` object + representing the recipient's JID. + """ + return self._set_attr('to', str(value)) + + def get_from(self): + """Return the value of the stanza's ``'from'`` attribute.""" + return JID(self._get_attr('from')) + + def set_from(self, value): + """Set the 'from' attribute of the stanza. + + Arguments: + from -- A string or JID object representing the sender's JID. + """ + return self._set_attr('from', str(value)) + + def get_payload(self): + """Return a list of XML objects contained in the stanza.""" + return self.xml.getchildren() + + def set_payload(self, value): + """Add XML content to the stanza. + + :param value: Either an XML or a stanza object, or a list + of XML or stanza objects. + """ + if not isinstance(value, list): + value = [value] + for val in value: + self.append(val) + return self + + def del_payload(self): + """Remove the XML contents of the stanza.""" + self.clear() + return self + + def reply(self, clear=True): + """Prepare the stanza for sending a reply. + + Swaps the ``'from'`` and ``'to'`` attributes. + + If ``clear=True``, then also remove the stanza's + contents to make room for the reply content. + + For client streams, the ``'from'`` attribute is removed. + + :param bool clear: Indicates if the stanza's contents should be + removed. Defaults to ``True``. + """ + # if it's a component, use from + if self.stream and hasattr(self.stream, "is_component") and \ + self.stream.is_component: + self['from'], self['to'] = self['to'], self['from'] + else: + self['to'] = self['from'] + del self['from'] + if clear: + self.clear() + return self + + def error(self): + """Set the stanza's type to ``'error'``.""" + self['type'] = 'error' + return self + + def unhandled(self): + """Called if no handlers have been registered to process this stanza. + + Meant to be overridden. + """ + pass + + def exception(self, e): + """Handle exceptions raised during stanza processing. + + Meant to be overridden. + """ + log.exception('Error handling {%s}%s stanza', self.namespace, + self.name) + + def send(self, now=False): + """Queue the stanza to be sent on the XML stream. + + :param bool now: Indicates if the queue should be skipped and the + stanza sent immediately. Useful for stream + initialization. Defaults to ``False``. + """ + self.stream.send(self, now=now) + + def __copy__(self): + """Return a copy of the stanza object that does not share the + same underlying XML object, but does share the same XML stream. + """ + return self.__class__(xml=copy.deepcopy(self.xml), + stream=self.stream) + + def __str__(self, top_level_ns=False): + """Serialize the stanza's XML to a string. + + :param bool top_level_ns: Display the top-most namespace. + Defaults to ``False``. + """ + stanza_ns = '' if top_level_ns else self.namespace + return tostring(self.xml, xmlns='', + stanza_ns=stanza_ns, + stream=self.stream, + top_level=not top_level_ns) + + +#: A JSON/dictionary version of the XML content exposed through +#: the stanza interfaces:: +#: +#: >>> msg = Message() +#: >>> msg.values +#: {'body': '', 'from': , 'mucnick': '', 'mucroom': '', +#: 'to': , 'type': 'normal', 'id': '', 'subject': ''} +#: +#: Likewise, assigning to the :attr:`values` will change the XML +#: content:: +#: +#: >>> msg = Message() +#: >>> msg.values = {'body': 'Hi!', 'to': 'user@example.com'} +#: >>> msg +#: '<message to="user@example.com"><body>Hi!</body></message>' +#: +#: Child stanzas are exposed as nested dictionaries. +ElementBase.values = property(ElementBase._get_stanza_values, + ElementBase._set_stanza_values) + + +# To comply with PEP8, method names now use underscores. +# Deprecated method names are re-mapped for backwards compatibility. +ElementBase.initPlugin = ElementBase.init_plugin +ElementBase._getAttr = ElementBase._get_attr +ElementBase._setAttr = ElementBase._set_attr +ElementBase._delAttr = ElementBase._del_attr +ElementBase._getSubText = ElementBase._get_sub_text +ElementBase._setSubText = ElementBase._set_sub_text +ElementBase._delSub = ElementBase._del_sub +ElementBase.getStanzaValues = ElementBase._get_stanza_values +ElementBase.setStanzaValues = ElementBase._set_stanza_values + +StanzaBase.setType = StanzaBase.set_type +StanzaBase.getTo = StanzaBase.get_to +StanzaBase.setTo = StanzaBase.set_to +StanzaBase.getFrom = StanzaBase.get_from +StanzaBase.setFrom = StanzaBase.set_from +StanzaBase.getPayload = StanzaBase.get_payload +StanzaBase.setPayload = StanzaBase.set_payload +StanzaBase.delPayload = StanzaBase.del_payload diff --git a/sleekxmpp/xmlstream/test.py b/sleekxmpp/xmlstream/test.py new file mode 100644 index 00000000..a45fb8b4 --- /dev/null +++ b/sleekxmpp/xmlstream/test.py @@ -0,0 +1,23 @@ +import xmlstream +import time +import socket +from handler.callback import Callback +from matcher.xpath import MatchXPath + +def server(): + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + s.bind(('localhost', 5228)) + s.listen(1) + servers = [] + while True: + conn, addr = s.accept() + server = xmlstream.XMLStream(conn, 'localhost', 5228) + server.registerHandler(Callback('test', MatchXPath('test'), testHandler)) + server.process() + servers.append(server) + +def testHandler(xml): + print("weeeeeeeee!") + +server() diff --git a/sleekxmpp/xmlstream/test.xml b/sleekxmpp/xmlstream/test.xml new file mode 100644 index 00000000..d20dd82c --- /dev/null +++ b/sleekxmpp/xmlstream/test.xml @@ -0,0 +1,2 @@ +<stream> +</stream> diff --git a/sleekxmpp/xmlstream/testclient.py b/sleekxmpp/xmlstream/testclient.py new file mode 100644 index 00000000..50eb6c50 --- /dev/null +++ b/sleekxmpp/xmlstream/testclient.py @@ -0,0 +1,13 @@ +import socket +import time + +s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +s.connect(('localhost', 5228)) +s.send("<stream>") +#s.flush() +s.send("<test/>") +s.send("<test/>") +s.send("<test/>") +s.send("</stream>") +#s.flush() +s.close() diff --git a/sleekxmpp/xmlstream/tostring.py b/sleekxmpp/xmlstream/tostring.py new file mode 100644 index 00000000..8e729f79 --- /dev/null +++ b/sleekxmpp/xmlstream/tostring.py @@ -0,0 +1,131 @@ +# -*- coding: utf-8 -*- +""" + sleekxmpp.xmlstream.tostring + ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + + This module converts XML objects into Unicode strings and + intelligently includes namespaces only when necessary to + keep the output readable. + + Part of SleekXMPP: The Sleek XMPP Library + + :copyright: (c) 2011 Nathanael C. Fritz + :license: MIT, see LICENSE for more details +""" + +import sys + +if sys.version_info < (3, 0): + import types + + +def tostring(xml=None, xmlns='', stanza_ns='', stream=None, + outbuffer='', top_level=False): + """Serialize an XML object to a Unicode string. + + If namespaces are provided using ``xmlns`` or ``stanza_ns``, then + elements that use those namespaces will not include the xmlns attribute + in the output. + + :param XML xml: The XML object to serialize. + :param string xmlns: Optional namespace of an element wrapping the XML + object. + :param string stanza_ns: The namespace of the stanza object that contains + the XML object. + :param stream: The XML stream that generated the XML object. + :param string outbuffer: Optional buffer for storing serializations + during recursive calls. + :param bool top_level: Indicates that the element is the outermost + element. + + + :type xml: :py:class:`~xml.etree.ElementTree.Element` + :type stream: :class:`~sleekxmpp.xmlstream.xmlstream.XMLStream` + + :rtype: Unicode string + """ + # Add previous results to the start of the output. + output = [outbuffer] + + # Extract the element's tag name. + tag_name = xml.tag.split('}', 1)[-1] + + # Extract the element's namespace if it is defined. + if '}' in xml.tag: + tag_xmlns = xml.tag.split('}', 1)[0][1:] + else: + tag_xmlns = '' + + default_ns = '' + stream_ns = '' + if stream: + default_ns = stream.default_ns + stream_ns = stream.stream_ns + + # Output the tag name and derived namespace of the element. + namespace = '' + if top_level and tag_xmlns not in ['', default_ns, stream_ns] or \ + tag_xmlns not in ['', xmlns, stanza_ns, stream_ns]: + namespace = ' xmlns="%s"' % tag_xmlns + if stream and tag_xmlns in stream.namespace_map: + mapped_namespace = stream.namespace_map[tag_xmlns] + if mapped_namespace: + tag_name = "%s:%s" % (mapped_namespace, tag_name) + output.append("<%s" % tag_name) + output.append(namespace) + + # Output escaped attribute values. + for attrib, value in xml.attrib.items(): + value = xml_escape(value) + if '}' not in attrib: + output.append(' %s="%s"' % (attrib, value)) + else: + attrib_ns = attrib.split('}')[0][1:] + attrib = attrib.split('}')[1] + if stream and attrib_ns in stream.namespace_map: + mapped_ns = stream.namespace_map[attrib_ns] + if mapped_ns: + output.append(' %s:%s="%s"' % (mapped_ns, + attrib, + value)) + + if len(xml) or xml.text: + # If there are additional child elements to serialize. + output.append(">") + if xml.text: + output.append(xml_escape(xml.text)) + if len(xml): + for child in xml.getchildren(): + output.append(tostring(child, tag_xmlns, stanza_ns, stream)) + output.append("</%s>" % tag_name) + elif xml.text: + # If we only have text content. + output.append(">%s</%s>" % (xml_escape(xml.text), tag_name)) + else: + # Empty element. + output.append(" />") + if xml.tail: + # If there is additional text after the element. + output.append(xml_escape(xml.tail)) + return ''.join(output) + + +def xml_escape(text): + """Convert special characters in XML to escape sequences. + + :param string text: The XML text to convert. + :rtype: Unicode string + """ + if sys.version_info < (3, 0): + if type(text) != types.UnicodeType: + text = unicode(text, 'utf-8', 'ignore') + + text = list(text) + escapes = {'&': '&', + '<': '<', + '>': '>', + "'": ''', + '"': '"'} + for i, c in enumerate(text): + text[i] = escapes.get(c, c) + return ''.join(text) diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py new file mode 100644 index 00000000..4c8696b3 --- /dev/null +++ b/sleekxmpp/xmlstream/xmlstream.py @@ -0,0 +1,1479 @@ +# -*- coding: utf-8 -*- +""" + sleekxmpp.xmlstream.xmlstream + ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + + This module provides the module for creating and + interacting with generic XML streams, along with + the necessary eventing infrastructure. + + Part of SleekXMPP: The Sleek XMPP Library + + :copyright: (c) 2011 Nathanael C. Fritz + :license: MIT, see LICENSE for more details +""" + +from __future__ import with_statement, unicode_literals + +import base64 +import copy +import logging +import signal +import socket as Socket +import ssl +import sys +import threading +import time +import types +import random +import weakref +try: + import queue +except ImportError: + import Queue as queue + +import sleekxmpp +from sleekxmpp.thirdparty.statemachine import StateMachine +from sleekxmpp.xmlstream import Scheduler, tostring +from sleekxmpp.xmlstream.stanzabase import StanzaBase, ET, ElementBase +from sleekxmpp.xmlstream.handler import Waiter, XMLCallback +from sleekxmpp.xmlstream.matcher import MatchXMLMask + +# In Python 2.x, file socket objects are broken. A patched socket +# wrapper is provided for this case in filesocket.py. +if sys.version_info < (3, 0): + from sleekxmpp.xmlstream.filesocket import FileSocket, Socket26 + +try: + import dns.resolver +except ImportError: + DNSPYTHON = False +else: + DNSPYTHON = True + + +#: The time in seconds to wait before timing out waiting for response stanzas. +RESPONSE_TIMEOUT = 30 + +#: The time in seconds to wait for events from the event queue, and also the +#: time between checks for the process stop signal. +WAIT_TIMEOUT = 1 + +#: The number of threads to use to handle XML stream events. This is not the +#: same as the number of custom event handling threads. +#: :data:`HANDLER_THREADS` must be at least 1. For Python implementations +#: with a GIL, this should be left at 1, but for implemetnations without +#: a GIL increasing this value can provide better performance. +HANDLER_THREADS = 1 + +#: Flag indicating if the SSL library is available for use. +SSL_SUPPORT = True + +#: The time in seconds to delay between attempts to resend data +#: after an SSL error. +SSL_RETRY_DELAY = 0.5 + +#: The maximum number of times to attempt resending data due to +#: an SSL error. +SSL_RETRY_MAX = 10 + +#: Maximum time to delay between connection attempts is one hour. +RECONNECT_MAX_DELAY = 600 + + +log = logging.getLogger(__name__) + + +class RestartStream(Exception): + """ + Exception to restart stream processing, including + resending the stream header. + """ + + +class XMLStream(object): + """ + An XML stream connection manager and event dispatcher. + + The XMLStream class abstracts away the issues of establishing a + connection with a server and sending and receiving XML "stanzas". + A stanza is a complete XML element that is a direct child of a root + document element. Two streams are used, one for each communication + direction, over the same socket. Once the connection is closed, both + streams should be complete and valid XML documents. + + Three types of events are provided to manage the stream: + :Stream: Triggered based on received stanzas, similar in concept + to events in a SAX XML parser. + :Custom: Triggered manually. + :Scheduled: Triggered based on time delays. + + Typically, stanzas are first processed by a stream event handler which + will then trigger custom events to continue further processing, + especially since custom event handlers may run in individual threads. + + :param socket: Use an existing socket for the stream. Defaults to + ``None`` to generate a new socket. + :param string host: The name of the target server. + :param int port: The port to use for the connection. Defaults to 0. + """ + + def __init__(self, socket=None, host='', port=0): + #: Flag indicating if the SSL library is available for use. + self.ssl_support = SSL_SUPPORT + + #: Most XMPP servers support TLSv1, but OpenFire in particular + #: does not work well with it. For OpenFire, set + #: :attr:`ssl_version` to use ``SSLv23``:: + #: + #: import ssl + #: xmpp.ssl_version = ssl.PROTOCOL_SSLv23 + self.ssl_version = ssl.PROTOCOL_TLSv1 + + #: Path to a file containing certificates for verifying the + #: server SSL certificate. A non-``None`` value will trigger + #: certificate checking. + #: + #: .. note:: + #: + #: On Mac OS X, certificates in the system keyring will + #: be consulted, even if they are not in the provided file. + self.ca_certs = None + + #: The time in seconds to wait for events from the event queue, + #: and also the time between checks for the process stop signal. + self.wait_timeout = WAIT_TIMEOUT + + #: The time in seconds to wait before timing out waiting + #: for response stanzas. + self.response_timeout = RESPONSE_TIMEOUT + + #: The current amount to time to delay attempting to reconnect. + #: This value doubles (with some jitter) with each failed + #: connection attempt up to :attr:`reconnect_max_delay` seconds. + self.reconnect_delay = None + + #: Maximum time to delay between connection attempts is one hour. + self.reconnect_max_delay = RECONNECT_MAX_DELAY + + #: The time in seconds to delay between attempts to resend data + #: after an SSL error. + self.ssl_retry_max = SSL_RETRY_MAX + + #: The maximum number of times to attempt resending data due to + #: an SSL error. + self.ssl_retry_delay = SSL_RETRY_DELAY + + #: The connection state machine tracks if the stream is + #: ``'connected'`` or ``'disconnected'``. + self.state = StateMachine(('disconnected', 'connected')) + self.state._set_state('disconnected') + + #: The default port to return when querying DNS records. + self.default_port = int(port) + + #: The domain to try when querying DNS records. + self.default_domain = '' + + #: The desired, or actual, address of the connected server. + self.address = (host, int(port)) + + #: A file-like wrapper for the socket for use with the + #: :mod:`~xml.etree.ElementTree` module. + self.filesocket = None + self.set_socket(socket) + + if sys.version_info < (3, 0): + self.socket_class = Socket26 + else: + self.socket_class = Socket.socket + + #: Enable connecting to the server directly over SSL, in + #: particular when the service provides two ports: one for + #: non-SSL traffic and another for SSL traffic. + self.use_ssl = False + + #: Enable connecting to the service without using SSL + #: immediately, but allow upgrading the connection later + #: to use SSL. + self.use_tls = False + + #: If set to ``True``, attempt to connect through an HTTP + #: proxy based on the settings in :attr:`proxy_config`. + self.use_proxy = False + + #: An optional dictionary of proxy settings. It may provide: + #: :host: The host offering proxy services. + #: :port: The port for the proxy service. + #: :username: Optional username for accessing the proxy. + #: :password: Optional password for accessing the proxy. + self.proxy_config = {} + + #: The default namespace of the stream content, not of the + #: stream wrapper itself. + self.default_ns = '' + + #: The namespace of the enveloping stream element. + self.stream_ns = '' + + #: The default opening tag for the stream element. + self.stream_header = "<stream>" + + #: The default closing tag for the stream element. + self.stream_footer = "</stream>" + + #: If ``True``, periodically send a whitespace character over the + #: wire to keep the connection alive. Mainly useful for connections + #: traversing NAT. + self.whitespace_keepalive = True + + #: The default interval between keepalive signals when + #: :attr:`whitespace_keepalive` is enabled. + self.whitespace_keepalive_interval = 300 + + #: An :class:`~threading.Event` to signal that the application + #: is stopping, and that all threads should shutdown. + self.stop = threading.Event() + + #: An :class:`~threading.Event` to signal receiving a closing + #: stream tag from the server. + self.stream_end_event = threading.Event() + self.stream_end_event.set() + + #: An :class:`~threading.Event` to signal the start of a stream + #: session. Until this event fires, the send queue is not used + #: and data is sent immediately over the wire. + self.session_started_event = threading.Event() + + #: The default time in seconds to wait for a session to start + #: after connecting before reconnecting and trying again. + self.session_timeout = 45 + + #: A queue of stream, custom, and scheduled events to be processed. + self.event_queue = queue.Queue() + + #: A queue of string data to be sent over the stream. + self.send_queue = queue.Queue() + + #: A :class:`~sleekxmpp.xmlstream.scheduler.Scheduler` instance for + #: executing callbacks in the future based on time delays. + self.scheduler = Scheduler(self.stop) + self.__failed_send_stanza = None + + #: A mapping of XML namespaces to well-known prefixes. + self.namespace_map = {StanzaBase.xml_ns: 'xml'} + + self.__thread = {} + self.__root_stanza = [] + self.__handlers = [] + self.__event_handlers = {} + self.__event_handlers_lock = threading.Lock() + self.__filters = {'in': [], 'out': []} + + self._id = 0 + self._id_lock = threading.Lock() + + #: The :attr:`auto_reconnnect` setting controls whether or not + #: the stream will be restarted in the event of an error. + self.auto_reconnect = True + + #: The :attr:`disconnect_wait` setting is the default value + #: for controlling if the system waits for the send queue to + #: empty before ending the stream. This may be overridden by + #: passing ``wait=True`` or ``wait=False`` to :meth:`disconnect`. + #: The default :attr:`disconnect_wait` value is ``False``. + self.disconnect_wait = False + + #: A list of DNS results that have not yet been tried. + self.dns_answers = [] + + self.add_event_handler('connected', self._handle_connected) + self.add_event_handler('session_start', self._start_keepalive) + self.add_event_handler('session_end', self._end_keepalive) + + def use_signals(self, signals=None): + """Register signal handlers for ``SIGHUP`` and ``SIGTERM``. + + By using signals, a ``'killed'`` event will be raised when the + application is terminated. + + If a signal handler already existed, it will be executed first, + before the ``'killed'`` event is raised. + + :param list signals: A list of signal names to be monitored. + Defaults to ``['SIGHUP', 'SIGTERM']``. + """ + if signals is None: + signals = ['SIGHUP', 'SIGTERM'] + + existing_handlers = {} + for sig_name in signals: + if hasattr(signal, sig_name): + sig = getattr(signal, sig_name) + handler = signal.getsignal(sig) + if handler: + existing_handlers[sig] = handler + + def handle_kill(signum, frame): + """ + Capture kill event and disconnect cleanly after first + spawning the ``'killed'`` event. + """ + + if signum in existing_handlers and \ + existing_handlers[signum] != handle_kill: + existing_handlers[signum](signum, frame) + + self.event("killed", direct=True) + self.disconnect() + + try: + for sig_name in signals: + if hasattr(signal, sig_name): + sig = getattr(signal, sig_name) + signal.signal(sig, handle_kill) + self.__signals_installed = True + except: + log.debug("Can not set interrupt signal handlers. " + \ + "SleekXMPP is not running from a main thread.") + + def new_id(self): + """Generate and return a new stream ID in hexadecimal form. + + Many stanzas, handlers, or matchers may require unique + ID values. Using this method ensures that all new ID values + are unique in this stream. + """ + with self._id_lock: + self._id += 1 + return self.get_id() + + def get_id(self): + """Return the current unique stream ID in hexadecimal form.""" + return "%X" % self._id + + def connect(self, host='', port=0, use_ssl=False, + use_tls=True, reattempt=True): + """Create a new socket and connect to the server. + + Setting ``reattempt`` to ``True`` will cause connection attempts to + be made every second until a successful connection is established. + + :param host: The name of the desired server for the connection. + :param port: Port to connect to on the server. + :param use_ssl: Flag indicating if SSL should be used by connecting + directly to a port using SSL. + :param use_tls: Flag indicating if TLS should be used, allowing for + connecting to a port without using SSL immediately and + later upgrading the connection. + :param reattempt: Flag indicating if the socket should reconnect + after disconnections. + """ + if host and port: + self.address = (host, int(port)) + try: + Socket.inet_aton(self.address[0]) + except Socket.error: + self.default_domain = self.address[0] + + # Respect previous SSL and TLS usage directives. + if use_ssl is not None: + self.use_ssl = use_ssl + if use_tls is not None: + self.use_tls = use_tls + + # Repeatedly attempt to connect until a successful connection + # is established. + connected = self.state.transition('disconnected', 'connected', + func=self._connect) + while reattempt and not connected and not self.stop.is_set(): + connected = self.state.transition('disconnected', 'connected', + func=self._connect) + return connected + + def _connect(self): + self.scheduler.remove('Session timeout check') + self.stop.clear() + if self.default_domain: + self.address = self.pick_dns_answer(self.default_domain, + self.address[1]) + self.socket = self.socket_class(Socket.AF_INET, Socket.SOCK_STREAM) + self.configure_socket() + + if self.reconnect_delay is None: + delay = 1.0 + else: + delay = min(self.reconnect_delay * 2, self.reconnect_max_delay) + delay = random.normalvariate(delay, delay * 0.1) + log.debug('Waiting %s seconds before connecting.', delay) + elapsed = 0 + try: + while elapsed < delay and not self.stop.is_set(): + time.sleep(0.1) + elapsed += 0.1 + except KeyboardInterrupt: + self.stop.set() + return False + except SystemExit: + self.stop.set() + return False + + if self.use_proxy: + connected = self._connect_proxy() + if not connected: + self.reconnect_delay = delay + return False + + if self.use_ssl and self.ssl_support: + log.debug("Socket Wrapped for SSL") + if self.ca_certs is None: + cert_policy = ssl.CERT_NONE + else: + cert_policy = ssl.CERT_REQUIRED + + ssl_socket = ssl.wrap_socket(self.socket, + ca_certs=self.ca_certs, + cert_reqs=cert_policy) + + if hasattr(self.socket, 'socket'): + # We are using a testing socket, so preserve the top + # layer of wrapping. + self.socket.socket = ssl_socket + else: + self.socket = ssl_socket + + try: + if not self.use_proxy: + log.debug("Connecting to %s:%s", *self.address) + self.socket.connect(self.address) + + self.set_socket(self.socket, ignore=True) + #this event is where you should set your application state + self.event("connected", direct=True) + self.reconnect_delay = 1.0 + return True + except Socket.error as serr: + error_msg = "Could not connect to %s:%s. Socket Error #%s: %s" + self.event('socket_error', serr) + log.error(error_msg, self.address[0], self.address[1], + serr.errno, serr.strerror) + self.reconnect_delay = delay + return False + + def _connect_proxy(self): + """Attempt to connect using an HTTP Proxy.""" + + # Extract the proxy address, and optional credentials + address = (self.proxy_config['host'], int(self.proxy_config['port'])) + cred = None + if self.proxy_config['username']: + username = self.proxy_config['username'] + password = self.proxy_config['password'] + + cred = '%s:%s' % (username, password) + if sys.version_info < (3, 0): + cred = bytes(cred) + else: + cred = bytes(cred, 'utf-8') + cred = base64.b64encode(cred).decode('utf-8') + + # Build the HTTP headers for connecting to the XMPP server + headers = ['CONNECT %s:%s HTTP/1.0' % self.address, + 'Host: %s:%s' % self.address, + 'Proxy-Connection: Keep-Alive', + 'Pragma: no-cache', + 'User-Agent: SleekXMPP/%s' % sleekxmpp.__version__] + if cred: + headers.append('Proxy-Authorization: Basic %s' % cred) + headers = '\r\n'.join(headers) + '\r\n\r\n' + + try: + log.debug("Connecting to proxy: %s:%s", address) + self.socket.connect(address) + self.send_raw(headers, now=True) + resp = '' + while '\r\n\r\n' not in resp and not self.stop.is_set(): + resp += self.socket.recv(1024).decode('utf-8') + log.debug('RECV: %s', resp) + + lines = resp.split('\r\n') + if '200' not in lines[0]: + self.event('proxy_error', resp) + log.error('Proxy Error: %s', lines[0]) + return False + + # Proxy connection established, continue connecting + # with the XMPP server. + return True + except Socket.error as serr: + error_msg = "Could not connect to %s:%s. Socket Error #%s: %s" + self.event('socket_error', serr) + log.error(error_msg, self.address[0], self.address[1], + serr.errno, serr.strerror) + return False + + def _handle_connected(self, event=None): + """ + Add check to ensure that a session is established within + a reasonable amount of time. + """ + + def _handle_session_timeout(): + if not self.session_started_event.is_set(): + log.debug("Session start has taken more " + \ + "than %d seconds", self.session_timeout) + self.disconnect(reconnect=self.auto_reconnect) + + self.schedule("Session timeout check", + self.session_timeout, + _handle_session_timeout) + + def disconnect(self, reconnect=False, wait=None): + """Terminate processing and close the XML streams. + + Optionally, the connection may be reconnected and + resume processing afterwards. + + If the disconnect should take place after all items + in the send queue have been sent, use ``wait=True``. + + .. warning:: + + If you are constantly adding items to the queue + such that it is never empty, then the disconnect will + not occur and the call will continue to block. + + :param reconnect: Flag indicating if the connection + and processing should be restarted. + Defaults to ``False``. + :param wait: Flag indicating if the send queue should + be emptied before disconnecting, overriding + :attr:`disconnect_wait`. + """ + self.state.transition('connected', 'disconnected', + func=self._disconnect, args=(reconnect, wait)) + + def _disconnect(self, reconnect=False, wait=None): + self.event('session_end', direct=True) + + # Wait for the send queue to empty. + if wait is not None: + if wait: + self.send_queue.join() + elif self.disconnect_wait: + self.send_queue.join() + + # Send the end of stream marker. + self.send_raw(self.stream_footer, now=True) + self.session_started_event.clear() + # Wait for confirmation that the stream was + # closed in the other direction. + self.auto_reconnect = reconnect + log.debug('Waiting for %s from server', self.stream_footer) + self.stream_end_event.wait(4) + if not self.auto_reconnect: + self.stop.set() + try: + self.socket.shutdown(Socket.SHUT_RDWR) + self.socket.close() + self.filesocket.close() + except Socket.error as serr: + self.event('socket_error', serr) + finally: + #clear your application state + self.event("disconnected", direct=True) + return True + + def reconnect(self, reattempt=True): + """Reset the stream's state and reconnect to the server.""" + log.debug("reconnecting...") + if self.state.ensure('connected'): + self.state.transition('connected', 'disconnected', wait=2.0, + func=self._disconnect, args=(True,)) + + log.debug("connecting...") + connected = self.state.transition('disconnected', 'connected', + wait=2.0, func=self._connect) + while reattempt and not connected and not self.stop.is_set(): + connected = self.state.transition('disconnected', 'connected', + wait=2.0, func=self._connect) + connected = connected or self.state.ensure('connected') + return connected + + def set_socket(self, socket, ignore=False): + """Set the socket to use for the stream. + + The filesocket will be recreated as well. + + :param socket: The new socket object to use. + :param bool ignore: If ``True``, don't set the connection + state to ``'connected'``. + """ + self.socket = socket + if socket is not None: + # ElementTree.iterparse requires a file. + # 0 buffer files have to be binary. + + # Use the correct fileobject type based on the Python + # version to work around a broken implementation in + # Python 2.x. + if sys.version_info < (3, 0): + self.filesocket = FileSocket(self.socket) + else: + self.filesocket = self.socket.makefile('rb', 0) + if not ignore: + self.state._set_state('connected') + + def configure_socket(self): + """Set timeout and other options for self.socket. + + Meant to be overridden. + """ + self.socket.settimeout(None) + + def configure_dns(self, resolver, domain=None, port=None): + """ + Configure and set options for a :class:`~dns.resolver.Resolver` + instance, and other DNS related tasks. For example, you + can also check :meth:`~socket.socket.getaddrinfo` to see + if you need to call out to ``libresolv.so.2`` to + run ``res_init()``. + + Meant to be overridden. + + :param resolver: A :class:`~dns.resolver.Resolver` instance + or ``None`` if ``dnspython`` is not installed. + :param domain: The initial domain under consideration. + :param port: The initial port under consideration. + """ + pass + + def start_tls(self): + """Perform handshakes for TLS. + + If the handshake is successful, the XML stream will need + to be restarted. + """ + if self.ssl_support: + log.info("Negotiating TLS") + log.info("Using SSL version: %s", str(self.ssl_version)) + if self.ca_certs is None: + cert_policy = ssl.CERT_NONE + else: + cert_policy = ssl.CERT_REQUIRED + + ssl_socket = ssl.wrap_socket(self.socket, + ssl_version=self.ssl_version, + do_handshake_on_connect=False, + ca_certs=self.ca_certs, + cert_reqs=cert_policy) + + if hasattr(self.socket, 'socket'): + # We are using a testing socket, so preserve the top + # layer of wrapping. + self.socket.socket = ssl_socket + else: + self.socket = ssl_socket + self.socket.do_handshake() + self.set_socket(self.socket) + return True + else: + log.warning("Tried to enable TLS, but ssl module not found.") + return False + + def _start_keepalive(self, event): + """Begin sending whitespace periodically to keep the connection alive. + + May be disabled by setting:: + + self.whitespace_keepalive = False + + The keepalive interval can be set using:: + + self.whitespace_keepalive_interval = 300 + """ + + def send_keepalive(): + if self.send_queue.empty(): + self.send_raw(' ') + + self.schedule('Whitespace Keepalive', + self.whitespace_keepalive_interval, + send_keepalive, + repeat=True) + + def _end_keepalive(self, event): + """Stop sending whitespace keepalives""" + self.scheduler.remove('Whitespace Keepalive') + + def start_stream_handler(self, xml): + """Perform any initialization actions, such as handshakes, + once the stream header has been sent. + + Meant to be overridden. + """ + pass + + def register_stanza(self, stanza_class): + """Add a stanza object class as a known root stanza. + + A root stanza is one that appears as a direct child of the stream's + root element. + + Stanzas that appear as substanzas of a root stanza do not need to + be registered here. That is done using register_stanza_plugin() from + sleekxmpp.xmlstream.stanzabase. + + Stanzas that are not registered will not be converted into + stanza objects, but may still be processed using handlers and + matchers. + + :param stanza_class: The top-level stanza object's class. + """ + self.__root_stanza.append(stanza_class) + + def remove_stanza(self, stanza_class): + """Remove a stanza from being a known root stanza. + + A root stanza is one that appears as a direct child of the stream's + root element. + + Stanzas that are not registered will not be converted into + stanza objects, but may still be processed using handlers and + matchers. + """ + del self.__root_stanza[stanza_class] + + def add_filter(self, mode, handler, order=None): + """Add a filter for incoming or outgoing stanzas. + + These filters are applied before incoming stanzas are + passed to any handlers, and before outgoing stanzas + are put in the send queue. + + Each filter must accept a single stanza, and return + either a stanza or ``None``. If the filter returns + ``None``, then the stanza will be dropped from being + processed for events or from being sent. + + :param mode: One of ``'in'`` or ``'out'``. + :param handler: The filter function. + :param int order: The position to insert the filter in + the list of active filters. + """ + if order: + self.__filters[mode].insert(order, handler) + else: + self.__filters[mode].append(handler) + + def add_handler(self, mask, pointer, name=None, disposable=False, + threaded=False, filter=False, instream=False): + """A shortcut method for registering a handler using XML masks. + + The use of :meth:`register_handler()` is preferred. + + :param mask: An XML snippet matching the structure of the + stanzas that will be passed to this handler. + :param pointer: The handler function itself. + :parm name: A unique name for the handler. A name will + be generated if one is not provided. + :param disposable: Indicates if the handler should be discarded + after one use. + :param threaded: **DEPRECATED**. + Remains for backwards compatibility. + :param filter: **DEPRECATED**. + Remains for backwards compatibility. + :param instream: Indicates if the handler should execute during + stream processing and not during normal event + processing. + """ + # To prevent circular dependencies, we must load the matcher + # and handler classes here. + + if name is None: + name = 'add_handler_%s' % self.getNewId() + self.registerHandler(XMLCallback(name, MatchXMLMask(mask), pointer, + once=disposable, instream=instream)) + + def register_handler(self, handler, before=None, after=None): + """Add a stream event handler that will be executed when a matching + stanza is received. + + :param handler: The :class:`~sleekxmpp.xmlstream.handler.base.BaseHandler` + derived object to execute. + """ + if handler.stream is None: + self.__handlers.append(handler) + handler.stream = weakref.ref(self) + + def remove_handler(self, name): + """Remove any stream event handlers with the given name. + + :param name: The name of the handler. + """ + idx = 0 + for handler in self.__handlers: + if handler.name == name: + self.__handlers.pop(idx) + return True + idx += 1 + return False + + def get_dns_records(self, domain, port=None): + """Get the DNS records for a domain. + + :param domain: The domain in question. + :param port: If the results don't include a port, use this one. + """ + if port is None: + port = self.default_port + if DNSPYTHON: + resolver = dns.resolver.get_default_resolver() + self.configure_dns(resolver, domain=domain, port=port) + + try: + answers = resolver.query(domain, dns.rdatatype.A) + except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer): + log.warning("No A records for %s", domain) + return [((domain, port), 0, 0)] + except dns.exception.Timeout: + log.warning("DNS resolution timed out " + \ + "for A record of %s", domain) + return [((domain, port), 0, 0)] + else: + return [((ans.address, port), 0, 0) for ans in answers] + else: + log.warning("dnspython is not installed -- " + \ + "relying on OS A record resolution") + self.configure_dns(None, domain=domain, port=port) + return [((domain, port), 0, 0)] + + def pick_dns_answer(self, domain, port=None): + """Pick a server and port from DNS answers. + + Gets DNS answers if none available. + Removes used answer from available answers. + + :param domain: The domain in question. + :param port: If the results don't include a port, use this one. + """ + if not self.dns_answers: + self.dns_answers = self.get_dns_records(domain, port) + addresses = {} + intmax = 0 + topprio = 65535 + for answer in self.dns_answers: + topprio = min(topprio, answer[1]) + for answer in self.dns_answers: + if answer[1] == topprio: + intmax += answer[2] + addresses[intmax] = answer[0] + + #python3 returns a generator for dictionary keys + items = [x for x in addresses.keys()] + items.sort() + + picked = random.randint(0, intmax) + for item in items: + if picked <= item: + address = addresses[item] + break + for idx, answer in enumerate(self.dns_answers): + if self.dns_answers[0] == address: + break + self.dns_answers.pop(idx) + log.debug("Trying to connect to %s:%s", *address) + return address + + def add_event_handler(self, name, pointer, + threaded=False, disposable=False): + """Add a custom event handler that will be executed whenever + its event is manually triggered. + + :param name: The name of the event that will trigger + this handler. + :param pointer: The function to execute. + :param threaded: If set to ``True``, the handler will execute + in its own thread. Defaults to ``False``. + :param disposable: If set to ``True``, the handler will be + discarded after one use. Defaults to ``False``. + """ + if not name in self.__event_handlers: + self.__event_handlers[name] = [] + self.__event_handlers[name].append((pointer, threaded, disposable)) + + def del_event_handler(self, name, pointer): + """Remove a function as a handler for an event. + + :param name: The name of the event. + :param pointer: The function to remove as a handler. + """ + if not name in self.__event_handlers: + return + + # Need to keep handlers that do not use + # the given function pointer + def filter_pointers(handler): + return handler[0] != pointer + + self.__event_handlers[name] = list(filter( + filter_pointers, + self.__event_handlers[name])) + + def event_handled(self, name): + """Returns the number of registered handlers for an event. + + :param name: The name of the event to check. + """ + return len(self.__event_handlers.get(name, [])) + + def event(self, name, data={}, direct=False): + """Manually trigger a custom event. + + :param name: The name of the event to trigger. + :param data: Data that will be passed to each event handler. + Defaults to an empty dictionary, but is usually + a stanza object. + :param direct: Runs the event directly if True, skipping the + event queue. All event handlers will run in the + same thread. + """ + handlers = self.__event_handlers.get(name, []) + for handler in handlers: + #TODO: Data should not be copied, but should be read only, + # but this might break current code so it's left for future. + + out_data = copy.copy(data) if len(handlers) > 1 else data + old_exception = getattr(data, 'exception', None) + if direct: + try: + handler[0](out_data) + except Exception as e: + error_msg = 'Error processing event handler: %s' + log.exception(error_msg, str(handler[0])) + if old_exception: + old_exception(e) + else: + self.exception(e) + else: + self.event_queue.put(('event', handler, out_data)) + if handler[2]: + # If the handler is disposable, we will go ahead and + # remove it now instead of waiting for it to be + # processed in the queue. + with self.__event_handlers_lock: + try: + h_index = self.__event_handlers[name].index(handler) + self.__event_handlers[name].pop(h_index) + except: + pass + + def schedule(self, name, seconds, callback, args=None, + kwargs=None, repeat=False): + """Schedule a callback function to execute after a given delay. + + :param name: A unique name for the scheduled callback. + :param seconds: The time in seconds to wait before executing. + :param callback: A pointer to the function to execute. + :param args: A tuple of arguments to pass to the function. + :param kwargs: A dictionary of keyword arguments to pass to + the function. + :param repeat: Flag indicating if the scheduled event should + be reset and repeat after executing. + """ + self.scheduler.add(name, seconds, callback, args, kwargs, + repeat, qpointer=self.event_queue) + + def incoming_filter(self, xml): + """Filter incoming XML objects before they are processed. + + Possible uses include remapping namespaces, or correcting elements + from sources with incorrect behavior. + + Meant to be overridden. + """ + return xml + + def send(self, data, mask=None, timeout=None, now=False): + """A wrapper for :meth:`send_raw()` for sending stanza objects. + + May optionally block until an expected response is received. + + :param data: The :class:`~sleekxmpp.xmlstream.stanzabase.ElementBase` + stanza to send on the stream. + :param mask: **DEPRECATED** + An XML string snippet matching the structure + of the expected response. Execution will block + in this thread until the response is received + or a timeout occurs. + :param int timeout: Time in seconds to wait for a response before + continuing. Defaults to :attr:`response_timeout`. + :param bool now: Indicates if the send queue should be skipped, + sending the stanza immediately. Useful mainly + for stream initialization stanzas. + Defaults to ``False``. + """ + if timeout is None: + timeout = self.response_timeout + if hasattr(mask, 'xml'): + mask = mask.xml + + if isinstance(data, ElementBase): + for filter in self.__filters['out']: + if data is not None: + data = filter(data) + if data is None: + return + + data = str(data) + if mask is not None: + log.warning("Use of send mask waiters is deprecated.") + wait_for = Waiter("SendWait_%s" % self.new_id(), + MatchXMLMask(mask)) + self.register_handler(wait_for) + self.send_raw(data, now) + if mask is not None: + return wait_for.wait(timeout) + + def send_xml(self, data, mask=None, timeout=None, now=False): + """Send an XML object on the stream, and optionally wait + for a response. + + :param data: The :class:`~xml.etree.ElementTree.Element` XML object + to send on the stream. + :param mask: **DEPRECATED** + An XML string snippet matching the structure + of the expected response. Execution will block + in this thread until the response is received + or a timeout occurs. + :param int timeout: Time in seconds to wait for a response before + continuing. Defaults to :attr:`response_timeout`. + :param bool now: Indicates if the send queue should be skipped, + sending the stanza immediately. Useful mainly + for stream initialization stanzas. + Defaults to ``False``. + """ + if timeout is None: + timeout = self.response_timeout + return self.send(tostring(data), mask, timeout, now) + + def send_raw(self, data, now=False, reconnect=None): + """Send raw data across the stream. + + :param string data: Any string value. + :param bool reconnect: Indicates if the stream should be + restarted if there is an error sending + the stanza. Used mainly for testing. + Defaults to :attr:`auto_reconnect`. + """ + if now: + log.debug("SEND (IMMED): %s", data) + try: + data = data.encode('utf-8') + total = len(data) + sent = 0 + count = 0 + tries = 0 + while sent < total and not self.stop.is_set(): + try: + sent += self.socket.send(data[sent:]) + count += 1 + except ssl.SSLError as serr: + if tries >= self.ssl_retry_max: + log.debug('SSL error - max retries reached') + self.exception(serr) + log.warning("Failed to send %s", data) + if reconnect is None: + reconnect = self.auto_reconnect + self.disconnect(reconnect) + log.warning('SSL write error - reattempting') + time.sleep(self.ssl_retry_delay) + tries += 1 + if count > 1: + log.debug('SENT: %d chunks', count) + except Socket.error as serr: + self.event('socket_error', serr) + log.warning("Failed to send %s", data) + if reconnect is None: + reconnect = self.auto_reconnect + self.disconnect(reconnect) + else: + self.send_queue.put(data) + return True + + def process(self, **kwargs): + """Initialize the XML streams and begin processing events. + + The number of threads used for processing stream events is determined + by :data:`HANDLER_THREADS`. + + :param bool block: If ``False``, then event dispatcher will run + in a separate thread, allowing for the stream to be + used in the background for another application. + Otherwise, ``process(block=True)`` blocks the current + thread. Defaults to ``False``. + :param bool threaded: **DEPRECATED** + If ``True``, then event dispatcher will run + in a separate thread, allowing for the stream to be + used in the background for another application. + Defaults to ``True``. This does **not** mean that no + threads are used at all if ``threaded=False``. + + Regardless of these threading options, these threads will + always exist: + + - The event queue processor + - The send queue processor + - The scheduler + """ + if 'threaded' in kwargs and 'block' in kwargs: + raise ValueError("process() called with both " + \ + "block and threaded arguments") + elif 'block' in kwargs: + threaded = not(kwargs.get('block', False)) + else: + threaded = kwargs.get('threaded', True) + + self.scheduler.process(threaded=True) + + def start_thread(name, target): + self.__thread[name] = threading.Thread(name=name, target=target) + self.__thread[name].start() + + for t in range(0, HANDLER_THREADS): + log.debug("Starting HANDLER THREAD") + start_thread('stream_event_handler_%s' % t, self._event_runner) + + start_thread('send_thread', self._send_thread) + + if threaded: + # Run the XML stream in the background for another application. + start_thread('process', self._process) + else: + self._process() + + def _process(self): + """Start processing the XML streams. + + Processing will continue after any recoverable errors + if reconnections are allowed. + """ + + # The body of this loop will only execute once per connection. + # Additional passes will be made only if an error occurs and + # reconnecting is permitted. + while True: + shutdown = False + try: + # The call to self.__read_xml will block and prevent + # the body of the loop from running until a disconnect + # occurs. After any reconnection, the stream header will + # be resent and processing will resume. + while not self.stop.is_set(): + # Only process the stream while connected to the server + if not self.state.ensure('connected', wait=0.1, + block_on_transition=True): + continue + # Ensure the stream header is sent for any + # new connections. + if not self.session_started_event.is_set(): + self.send_raw(self.stream_header, now=True) + if not self.__read_xml(): + # If the server terminated the stream, end processing + break + except KeyboardInterrupt: + log.debug("Keyboard Escape Detected in _process") + self.event('killed', direct=True) + shutdown = True + except SystemExit: + log.debug("SystemExit in _process") + shutdown = True + except SyntaxError as e: + log.error("Error reading from XML stream.") + shutdown = True + self.exception(e) + except Socket.error as serr: + self.event('socket_error', serr) + log.exception('Socket Error') + except Exception as e: + if not self.stop.is_set(): + log.exception('Connection error.') + self.exception(e) + + if not shutdown and not self.stop.is_set() \ + and self.auto_reconnect: + self.reconnect() + else: + self.disconnect() + break + + def __read_xml(self): + """Parse the incoming XML stream + + Stream events are raised for each received stanza. + """ + depth = 0 + root = None + for event, xml in ET.iterparse(self.filesocket, (b'end', b'start')): + if event == b'start': + if depth == 0: + # We have received the start of the root element. + root = xml + # Perform any stream initialization actions, such + # as handshakes. + self.stream_end_event.clear() + self.start_stream_handler(root) + depth += 1 + if event == b'end': + depth -= 1 + if depth == 0: + # The stream's root element has closed, + # terminating the stream. + log.debug("End of stream recieved") + self.stream_end_event.set() + return False + elif depth == 1: + # We only raise events for stanzas that are direct + # children of the root element. + try: + self.__spawn_event(xml) + except RestartStream: + return True + if root is not None: + # Keep the root element empty of children to + # save on memory use. + root.clear() + log.debug("Ending read XML loop") + + def _build_stanza(self, xml, default_ns=None): + """Create a stanza object from a given XML object. + + If a specialized stanza type is not found for the XML, then + a generic :class:`~sleekxmpp.xmlstream.stanzabase.StanzaBase` + stanza will be returned. + + :param xml: The :class:`~xml.etree.ElementTree.Element` XML object + to convert into a stanza object. + :param default_ns: Optional default namespace to use instead of the + stream's current default namespace. + """ + if default_ns is None: + default_ns = self.default_ns + stanza_type = StanzaBase + for stanza_class in self.__root_stanza: + if xml.tag == "{%s}%s" % (default_ns, stanza_class.name) or \ + xml.tag == stanza_class.tag_name(): + stanza_type = stanza_class + break + stanza = stanza_type(self, xml) + return stanza + + def __spawn_event(self, xml): + """ + Analyze incoming XML stanzas and convert them into stanza + objects if applicable and queue stream events to be processed + by matching handlers. + + :param xml: The :class:`~sleekxmpp.xmlstream.stanzabase.ElementBase` + stanza to analyze. + """ + # Apply any preprocessing filters. + xml = self.incoming_filter(xml) + + # Convert the raw XML object into a stanza object. If no registered + # stanza type applies, a generic StanzaBase stanza will be used. + stanza = self._build_stanza(xml) + + for filter in self.__filters['in']: + if stanza is not None: + stanza = filter(stanza) + if stanza is None: + return + + log.debug("RECV: %s", stanza) + + # Match the stanza against registered handlers. Handlers marked + # to run "in stream" will be executed immediately; the rest will + # be queued. + unhandled = True + matched_handlers = [h for h in self.__handlers if h.match(stanza)] + for handler in matched_handlers: + if len(matched_handlers) > 1: + stanza_copy = copy.copy(stanza) + else: + stanza_copy = stanza + handler.prerun(stanza_copy) + self.event_queue.put(('stanza', handler, stanza_copy)) + try: + if handler.check_delete(): + self.__handlers.remove(handler) + except: + pass # not thread safe + unhandled = False + + # Some stanzas require responses, such as Iq queries. A default + # handler will be executed immediately for this case. + if unhandled: + stanza.unhandled() + + def _threaded_event_wrapper(self, func, args): + """Capture exceptions for event handlers that run + in individual threads. + + :param func: The event handler to execute. + :param args: Arguments to the event handler. + """ + # this is always already copied before this is invoked + orig = args[0] + try: + func(*args) + except Exception as e: + error_msg = 'Error processing event handler: %s' + log.exception(error_msg, str(func)) + if hasattr(orig, 'exception'): + orig.exception(e) + else: + self.exception(e) + + def _event_runner(self): + """Process the event queue and execute handlers. + + The number of event runner threads is controlled by HANDLER_THREADS. + + Stream event handlers will all execute in this thread. Custom event + handlers may be spawned in individual threads. + """ + log.debug("Loading event runner") + try: + while not self.stop.is_set(): + try: + wait = self.wait_timeout + event = self.event_queue.get(True, timeout=wait) + except queue.Empty: + event = None + if event is None: + continue + + etype, handler = event[0:2] + args = event[2:] + orig = copy.copy(args[0]) + + if etype == 'stanza': + try: + handler.run(args[0]) + except Exception as e: + error_msg = 'Error processing stream handler: %s' + log.exception(error_msg, handler.name) + orig.exception(e) + elif etype == 'schedule': + name = args[1] + try: + log.debug('Scheduled event: %s: %s', name, args[0]) + handler(*args[0]) + except Exception as e: + log.exception('Error processing scheduled task') + self.exception(e) + elif etype == 'event': + func, threaded, disposable = handler + try: + if threaded: + x = threading.Thread( + name="Event_%s" % str(func), + target=self._threaded_event_wrapper, + args=(func, args)) + x.start() + else: + func(*args) + except Exception as e: + error_msg = 'Error processing event handler: %s' + log.exception(error_msg, str(func)) + if hasattr(orig, 'exception'): + orig.exception(e) + else: + self.exception(e) + elif etype == 'quit': + log.debug("Quitting event runner thread") + return False + except KeyboardInterrupt: + log.debug("Keyboard Escape Detected in _event_runner") + self.event('killed', direct=True) + self.disconnect() + return + except SystemExit: + self.disconnect() + self.event_queue.put(('quit', None, None)) + return + + def _send_thread(self): + """Extract stanzas from the send queue and send them on the stream.""" + try: + while not self.stop.is_set(): + while not self.stop.is_set and \ + not self.session_started_event.is_set(): + self.session_started_event.wait(timeout=1) + if self.__failed_send_stanza is not None: + data = self.__failed_send_stanza + self.__failed_send_stanza = None + else: + try: + data = self.send_queue.get(True, 1) + except queue.Empty: + continue + log.debug("SEND: %s", data) + enc_data = data.encode('utf-8') + total = len(enc_data) + sent = 0 + count = 0 + tries = 0 + try: + while sent < total and not self.stop.is_set(): + try: + sent += self.socket.send(enc_data[sent:]) + count += 1 + except ssl.SSLError as serr: + if tries >= self.ssl_retry_max: + log.debug('SSL error - max retries reached') + self.exception(serr) + log.warning("Failed to send %s", data) + if reconnect is None: + reconnect = self.auto_reconnect + self.disconnect(reconnect) + log.warning('SSL write error - reattempting') + time.sleep(self.ssl_retry_delay) + tries += 1 + if count > 1: + log.debug('SENT: %d chunks', count) + self.send_queue.task_done() + except Socket.error as serr: + self.event('socket_error', serr) + log.warning("Failed to send %s", data) + self.__failed_send_stanza = data + self.disconnect(self.auto_reconnect) + except Exception as ex: + log.exception('Unexpected error in send thread: %s', ex) + self.exception(ex) + if not self.stop.is_set(): + self.disconnect(self.auto_reconnect) + + def exception(self, exception): + """Process an unknown exception. + + Meant to be overridden. + + :param exception: An unhandled exception object. + """ + pass + + +# To comply with PEP8, method names now use underscores. +# Deprecated method names are re-mapped for backwards compatibility. +XMLStream.startTLS = XMLStream.start_tls +XMLStream.registerStanza = XMLStream.register_stanza +XMLStream.removeStanza = XMLStream.remove_stanza +XMLStream.registerHandler = XMLStream.register_handler +XMLStream.removeHandler = XMLStream.remove_handler +XMLStream.setSocket = XMLStream.set_socket +XMLStream.sendRaw = XMLStream.send_raw +XMLStream.getId = XMLStream.get_id +XMLStream.getNewId = XMLStream.new_id +XMLStream.sendXML = XMLStream.send_xml |