diff options
Diffstat (limited to 'sleekxmpp/xmlstream')
24 files changed, 0 insertions, 5267 deletions
diff --git a/sleekxmpp/xmlstream/__init__.py b/sleekxmpp/xmlstream/__init__.py deleted file mode 100644 index 5a1ea1be..00000000 --- a/sleekxmpp/xmlstream/__init__.py +++ /dev/null @@ -1,19 +0,0 @@ -""" - SleekXMPP: The Sleek XMPP Library - Copyright (C) 2010 Nathanael C. Fritz - This file is part of SleekXMPP. - - See the file LICENSE for copying permission. -""" - -from sleekxmpp.jid import JID -from sleekxmpp.xmlstream.scheduler import Scheduler -from sleekxmpp.xmlstream.stanzabase import StanzaBase, ElementBase, ET -from sleekxmpp.xmlstream.stanzabase import register_stanza_plugin -from sleekxmpp.xmlstream.tostring import tostring -from sleekxmpp.xmlstream.xmlstream import XMLStream, RESPONSE_TIMEOUT -from sleekxmpp.xmlstream.xmlstream import RestartStream - -__all__ = ['JID', 'Scheduler', 'StanzaBase', 'ElementBase', - 'ET', 'StateMachine', 'tostring', 'XMLStream', - 'RESPONSE_TIMEOUT', 'RestartStream'] diff --git a/sleekxmpp/xmlstream/cert.py b/sleekxmpp/xmlstream/cert.py deleted file mode 100644 index d357b326..00000000 --- a/sleekxmpp/xmlstream/cert.py +++ /dev/null @@ -1,184 +0,0 @@ -import logging -from datetime import datetime, timedelta - -# Make a call to strptime before starting threads to -# prevent thread safety issues. -datetime.strptime('1970-01-01 12:00:00', "%Y-%m-%d %H:%M:%S") - - -try: - from pyasn1.codec.der import decoder, encoder - from pyasn1.type.univ import Any, ObjectIdentifier, OctetString - from pyasn1.type.char import BMPString, IA5String, UTF8String - from pyasn1.type.useful import GeneralizedTime - from pyasn1_modules.rfc2459 import (Certificate, DirectoryString, - SubjectAltName, GeneralNames, - GeneralName) - from pyasn1_modules.rfc2459 import id_ce_subjectAltName as SUBJECT_ALT_NAME - from pyasn1_modules.rfc2459 import id_at_commonName as COMMON_NAME - - XMPP_ADDR = ObjectIdentifier('1.3.6.1.5.5.7.8.5') - SRV_NAME = ObjectIdentifier('1.3.6.1.5.5.7.8.7') - - HAVE_PYASN1 = True -except ImportError: - HAVE_PYASN1 = False - - -log = logging.getLogger(__name__) - - -class CertificateError(Exception): - pass - - -def decode_str(data): - encoding = 'utf-16-be' if isinstance(data, BMPString) else 'utf-8' - return bytes(data).decode(encoding) - - -def extract_names(raw_cert): - results = {'CN': set(), - 'DNS': set(), - 'SRV': set(), - 'URI': set(), - 'XMPPAddr': set()} - - cert = decoder.decode(raw_cert, asn1Spec=Certificate())[0] - tbs = cert.getComponentByName('tbsCertificate') - subject = tbs.getComponentByName('subject') - extensions = tbs.getComponentByName('extensions') or [] - - # Extract the CommonName(s) from the cert. - for rdnss in subject: - for rdns in rdnss: - for name in rdns: - oid = name.getComponentByName('type') - value = name.getComponentByName('value') - - if oid != COMMON_NAME: - continue - - value = decoder.decode(value, asn1Spec=DirectoryString())[0] - value = decode_str(value.getComponent()) - results['CN'].add(value) - - # Extract the Subject Alternate Names (DNS, SRV, URI, XMPPAddr) - for extension in extensions: - oid = extension.getComponentByName('extnID') - if oid != SUBJECT_ALT_NAME: - continue - - value = decoder.decode(extension.getComponentByName('extnValue'), - asn1Spec=OctetString())[0] - sa_names = decoder.decode(value, asn1Spec=SubjectAltName())[0] - for name in sa_names: - name_type = name.getName() - if name_type == 'dNSName': - results['DNS'].add(decode_str(name.getComponent())) - if name_type == 'uniformResourceIdentifier': - value = decode_str(name.getComponent()) - if value.startswith('xmpp:'): - results['URI'].add(value[5:]) - elif name_type == 'otherName': - name = name.getComponent() - - oid = name.getComponentByName('type-id') - value = name.getComponentByName('value') - - if oid == XMPP_ADDR: - value = decoder.decode(value, asn1Spec=UTF8String())[0] - results['XMPPAddr'].add(decode_str(value)) - elif oid == SRV_NAME: - value = decoder.decode(value, asn1Spec=IA5String())[0] - results['SRV'].add(decode_str(value)) - - return results - - -def extract_dates(raw_cert): - if not HAVE_PYASN1: - log.warning("Could not find pyasn1 and pyasn1_modules. " + \ - "SSL certificate expiration COULD NOT BE VERIFIED.") - return None, None - - cert = decoder.decode(raw_cert, asn1Spec=Certificate())[0] - tbs = cert.getComponentByName('tbsCertificate') - validity = tbs.getComponentByName('validity') - - not_before = validity.getComponentByName('notBefore') - not_before = str(not_before.getComponent()) - - not_after = validity.getComponentByName('notAfter') - not_after = str(not_after.getComponent()) - - if isinstance(not_before, GeneralizedTime): - not_before = datetime.strptime(not_before, '%Y%m%d%H%M%SZ') - else: - not_before = datetime.strptime(not_before, '%y%m%d%H%M%SZ') - - if isinstance(not_after, GeneralizedTime): - not_after = datetime.strptime(not_after, '%Y%m%d%H%M%SZ') - else: - not_after = datetime.strptime(not_after, '%y%m%d%H%M%SZ') - - return not_before, not_after - - -def get_ttl(raw_cert): - not_before, not_after = extract_dates(raw_cert) - if not_after is None: - return None - return not_after - datetime.utcnow() - - -def verify(expected, raw_cert): - if not HAVE_PYASN1: - log.warning("Could not find pyasn1 and pyasn1_modules. " + \ - "SSL certificate COULD NOT BE VERIFIED.") - return - - not_before, not_after = extract_dates(raw_cert) - cert_names = extract_names(raw_cert) - - now = datetime.utcnow() - - if not_before > now: - raise CertificateError( - 'Certificate has not entered its valid date range.') - - if not_after <= now: - raise CertificateError( - 'Certificate has expired.') - - if '.' in expected: - expected_wild = expected[expected.index('.'):] - else: - expected_wild = expected - expected_srv = '_xmpp-client.%s' % expected - - for name in cert_names['XMPPAddr']: - if name == expected: - return True - for name in cert_names['SRV']: - if name == expected_srv or name == expected: - return True - for name in cert_names['DNS']: - if name == expected: - return True - if name.startswith('*'): - if '.' in name: - name_wild = name[name.index('.'):] - else: - name_wild = name - if expected_wild == name_wild: - return True - for name in cert_names['URI']: - if name == expected: - return True - for name in cert_names['CN']: - if name == expected: - return True - - raise CertificateError( - 'Could not match certificate against hostname: %s' % expected) diff --git a/sleekxmpp/xmlstream/filesocket.py b/sleekxmpp/xmlstream/filesocket.py deleted file mode 100644 index 53b83bc7..00000000 --- a/sleekxmpp/xmlstream/filesocket.py +++ /dev/null @@ -1,54 +0,0 @@ -# -*- coding: utf-8 -*- -""" - sleekxmpp.xmlstream.filesocket - ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - - This module is a shim for correcting deficiencies in the file - socket implementation of Python2.6. - - Part of SleekXMPP: The Sleek XMPP Library - - :copyright: (c) 2011 Nathanael C. Fritz - :license: MIT, see LICENSE for more details -""" - -from socket import _fileobject -import errno -import socket - - -class FileSocket(_fileobject): - - """Create a file object wrapper for a socket to work around - issues present in Python 2.6 when using sockets as file objects. - - The parser for :class:`~xml.etree.cElementTree` requires a file, but - we will be reading from the XMPP connection socket instead. - """ - - def read(self, size=4096): - """Read data from the socket as if it were a file.""" - if self._sock is None: - return None - while True: - try: - data = self._sock.recv(size) - break - except socket.error as serr: - if serr.errno != errno.EINTR: - raise - if data is not None: - return data - - -class Socket26(socket.socket): - - """A custom socket implementation that uses our own FileSocket class - to work around issues in Python 2.6 when using sockets as files. - """ - - def makefile(self, mode='r', bufsize=-1): - """makefile([mode[, bufsize]]) -> file object - Return a regular file object corresponding to the socket. The mode - and bufsize arguments are as for the built-in open() function.""" - return FileSocket(self._sock, mode, bufsize) diff --git a/sleekxmpp/xmlstream/handler/__init__.py b/sleekxmpp/xmlstream/handler/__init__.py deleted file mode 100644 index 83c87f01..00000000 --- a/sleekxmpp/xmlstream/handler/__init__.py +++ /dev/null @@ -1,15 +0,0 @@ -""" - SleekXMPP: The Sleek XMPP Library - Copyright (C) 2010 Nathanael C. Fritz - This file is part of SleekXMPP. - - See the file LICENSE for copying permission. -""" - -from sleekxmpp.xmlstream.handler.callback import Callback -from sleekxmpp.xmlstream.handler.collector import Collector -from sleekxmpp.xmlstream.handler.waiter import Waiter -from sleekxmpp.xmlstream.handler.xmlcallback import XMLCallback -from sleekxmpp.xmlstream.handler.xmlwaiter import XMLWaiter - -__all__ = ['Callback', 'Waiter', 'XMLCallback', 'XMLWaiter'] diff --git a/sleekxmpp/xmlstream/handler/base.py b/sleekxmpp/xmlstream/handler/base.py deleted file mode 100644 index 01c1991a..00000000 --- a/sleekxmpp/xmlstream/handler/base.py +++ /dev/null @@ -1,84 +0,0 @@ -# -*- coding: utf-8 -*- -""" - sleekxmpp.xmlstream.handler.base - ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - - Part of SleekXMPP: The Sleek XMPP Library - - :copyright: (c) 2011 Nathanael C. Fritz - :license: MIT, see LICENSE for more details -""" - -import weakref - - -class BaseHandler(object): - - """ - Base class for stream handlers. Stream handlers are matched with - incoming stanzas so that the stanza may be processed in some way. - Stanzas may be matched with multiple handlers. - - Handler execution may take place in two phases: during the incoming - stream processing, and in the main event loop. The :meth:`prerun()` - method is executed in the first case, and :meth:`run()` is called - during the second. - - :param string name: The name of the handler. - :param matcher: A :class:`~sleekxmpp.xmlstream.matcher.base.MatcherBase` - derived object that will be used to determine if a - stanza should be accepted by this handler. - :param stream: The :class:`~sleekxmpp.xmlstream.xmlstream.XMLStream` - instance that the handle will respond to. - """ - - def __init__(self, name, matcher, stream=None): - #: The name of the handler - self.name = name - - #: The XML stream this handler is assigned to - self.stream = None - if stream is not None: - self.stream = weakref.ref(stream) - stream.register_handler(self) - - self._destroy = False - self._payload = None - self._matcher = matcher - - def match(self, xml): - """Compare a stanza or XML object with the handler's matcher. - - :param xml: An XML or - :class:`~sleekxmpp.xmlstream.stanzabase.ElementBase` object - """ - return self._matcher.match(xml) - - def prerun(self, payload): - """Prepare the handler for execution while the XML - stream is being processed. - - :param payload: A :class:`~sleekxmpp.xmlstream.stanzabase.ElementBase` - object. - """ - self._payload = payload - - def run(self, payload): - """Execute the handler after XML stream processing and during the - main event loop. - - :param payload: A :class:`~sleekxmpp.xmlstream.stanzabase.ElementBase` - object. - """ - self._payload = payload - - def check_delete(self): - """Check if the handler should be removed from the list - of stream handlers. - """ - return self._destroy - - -# To comply with PEP8, method names now use underscores. -# Deprecated method names are re-mapped for backwards compatibility. -BaseHandler.checkDelete = BaseHandler.check_delete diff --git a/sleekxmpp/xmlstream/handler/callback.py b/sleekxmpp/xmlstream/handler/callback.py deleted file mode 100644 index 7e3388f1..00000000 --- a/sleekxmpp/xmlstream/handler/callback.py +++ /dev/null @@ -1,79 +0,0 @@ -# -*- coding: utf-8 -*- -""" - sleekxmpp.xmlstream.handler.callback - ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - - Part of SleekXMPP: The Sleek XMPP Library - - :copyright: (c) 2011 Nathanael C. Fritz - :license: MIT, see LICENSE for more details -""" - -from sleekxmpp.xmlstream.handler.base import BaseHandler - - -class Callback(BaseHandler): - - """ - The Callback handler will execute a callback function with - matched stanzas. - - The handler may execute the callback either during stream - processing or during the main event loop. - - Callback functions are all executed in the same thread, so be aware if - you are executing functions that will block for extended periods of - time. Typically, you should signal your own events using the SleekXMPP - object's :meth:`~sleekxmpp.xmlstream.xmlstream.XMLStream.event()` - method to pass the stanza off to a threaded event handler for further - processing. - - - :param string name: The name of the handler. - :param matcher: A :class:`~sleekxmpp.xmlstream.matcher.base.MatcherBase` - derived object for matching stanza objects. - :param pointer: The function to execute during callback. - :param bool thread: **DEPRECATED.** Remains only for - backwards compatibility. - :param bool once: Indicates if the handler should be used only - once. Defaults to False. - :param bool instream: Indicates if the callback should be executed - during stream processing instead of in the - main event loop. - :param stream: The :class:`~sleekxmpp.xmlstream.xmlstream.XMLStream` - instance this handler should monitor. - """ - - def __init__(self, name, matcher, pointer, thread=False, - once=False, instream=False, stream=None): - BaseHandler.__init__(self, name, matcher, stream) - self._pointer = pointer - self._once = once - self._instream = instream - - def prerun(self, payload): - """Execute the callback during stream processing, if - the callback was created with ``instream=True``. - - :param payload: The matched - :class:`~sleekxmpp.xmlstream.stanzabase.ElementBase` object. - """ - if self._once: - self._destroy = True - if self._instream: - self.run(payload, True) - - def run(self, payload, instream=False): - """Execute the callback function with the matched stanza payload. - - :param payload: The matched - :class:`~sleekxmpp.xmlstream.stanzabase.ElementBase` object. - :param bool instream: Force the handler to execute during stream - processing. This should only be used by - :meth:`prerun()`. Defaults to ``False``. - """ - if not self._instream or instream: - self._pointer(payload) - if self._once: - self._destroy = True - del self._pointer diff --git a/sleekxmpp/xmlstream/handler/collector.py b/sleekxmpp/xmlstream/handler/collector.py deleted file mode 100644 index 8f02f8c3..00000000 --- a/sleekxmpp/xmlstream/handler/collector.py +++ /dev/null @@ -1,66 +0,0 @@ -# -*- coding: utf-8 -*- -""" - sleekxmpp.xmlstream.handler.collector - ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - - Part of SleekXMPP: The Sleek XMPP Library - - :copyright: (c) 2012 Nathanael C. Fritz, Lance J.T. Stout - :license: MIT, see LICENSE for more details -""" - -import logging - -from sleekxmpp.util import Queue, QueueEmpty -from sleekxmpp.xmlstream.handler.base import BaseHandler - - -log = logging.getLogger(__name__) - - -class Collector(BaseHandler): - - """ - The Collector handler allows for collecting a set of stanzas - that match a given pattern. Unlike the Waiter handler, a - Collector does not block execution, and will continue to - accumulate matching stanzas until told to stop. - - :param string name: The name of the handler. - :param matcher: A :class:`~sleekxmpp.xmlstream.matcher.base.MatcherBase` - derived object for matching stanza objects. - :param stream: The :class:`~sleekxmpp.xmlstream.xmlstream.XMLStream` - instance this handler should monitor. - """ - - def __init__(self, name, matcher, stream=None): - BaseHandler.__init__(self, name, matcher, stream=stream) - self._payload = Queue() - - def prerun(self, payload): - """Store the matched stanza when received during processing. - - :param payload: The matched - :class:`~sleekxmpp.xmlstream.stanzabase.ElementBase` object. - """ - self._payload.put(payload) - - def run(self, payload): - """Do not process this handler during the main event loop.""" - pass - - def stop(self): - """ - Stop collection of matching stanzas, and return the ones that - have been stored so far. - """ - self._destroy = True - results = [] - try: - while True: - results.append(self._payload.get(False)) - except QueueEmpty: - pass - - self.stream().remove_handler(self.name) - return results diff --git a/sleekxmpp/xmlstream/handler/waiter.py b/sleekxmpp/xmlstream/handler/waiter.py deleted file mode 100644 index 66e14496..00000000 --- a/sleekxmpp/xmlstream/handler/waiter.py +++ /dev/null @@ -1,83 +0,0 @@ -# -*- coding: utf-8 -*- -""" - sleekxmpp.xmlstream.handler.waiter - ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - - Part of SleekXMPP: The Sleek XMPP Library - - :copyright: (c) 2011 Nathanael C. Fritz - :license: MIT, see LICENSE for more details -""" - -import logging - -from sleekxmpp.util import Queue, QueueEmpty -from sleekxmpp.xmlstream.handler.base import BaseHandler - - -log = logging.getLogger(__name__) - - -class Waiter(BaseHandler): - - """ - The Waiter handler allows an event handler to block until a - particular stanza has been received. The handler will either be - given the matched stanza, or ``False`` if the waiter has timed out. - - :param string name: The name of the handler. - :param matcher: A :class:`~sleekxmpp.xmlstream.matcher.base.MatcherBase` - derived object for matching stanza objects. - :param stream: The :class:`~sleekxmpp.xmlstream.xmlstream.XMLStream` - instance this handler should monitor. - """ - - def __init__(self, name, matcher, stream=None): - BaseHandler.__init__(self, name, matcher, stream=stream) - self._payload = Queue() - - def prerun(self, payload): - """Store the matched stanza when received during processing. - - :param payload: The matched - :class:`~sleekxmpp.xmlstream.stanzabase.ElementBase` object. - """ - self._payload.put(payload) - - def run(self, payload): - """Do not process this handler during the main event loop.""" - pass - - def wait(self, timeout=None): - """Block an event handler while waiting for a stanza to arrive. - - Be aware that this will impact performance if called from a - non-threaded event handler. - - Will return either the received stanza, or ``False`` if the - waiter timed out. - - :param int timeout: The number of seconds to wait for the stanza - to arrive. Defaults to the the stream's - :class:`~sleekxmpp.xmlstream.xmlstream.XMLStream.response_timeout` - value. - """ - if timeout is None: - timeout = self.stream().response_timeout - - elapsed_time = 0 - stanza = False - while elapsed_time < timeout and not self.stream().stop.is_set(): - try: - stanza = self._payload.get(True, 1) - break - except QueueEmpty: - elapsed_time += 1 - if elapsed_time >= timeout: - log.warning("Timed out waiting for %s", self.name) - self.stream().remove_handler(self.name) - return stanza - - def check_delete(self): - """Always remove waiters after use.""" - return True diff --git a/sleekxmpp/xmlstream/handler/xmlcallback.py b/sleekxmpp/xmlstream/handler/xmlcallback.py deleted file mode 100644 index 11607ffb..00000000 --- a/sleekxmpp/xmlstream/handler/xmlcallback.py +++ /dev/null @@ -1,36 +0,0 @@ -""" - SleekXMPP: The Sleek XMPP Library - Copyright (C) 2010 Nathanael C. Fritz - This file is part of SleekXMPP. - - See the file LICENSE for copying permission. -""" - -from sleekxmpp.xmlstream.handler import Callback - - -class XMLCallback(Callback): - - """ - The XMLCallback class is identical to the normal Callback class, - except that XML contents of matched stanzas will be processed instead - of the stanza objects themselves. - - Methods: - run -- Overrides Callback.run - """ - - def run(self, payload, instream=False): - """ - Execute the callback function with the matched stanza's - XML contents, instead of the stanza itself. - - Overrides BaseHandler.run - - Arguments: - payload -- The matched stanza object. - instream -- Force the handler to execute during - stream processing. Used only by prerun. - Defaults to False. - """ - Callback.run(self, payload.xml, instream) diff --git a/sleekxmpp/xmlstream/handler/xmlwaiter.py b/sleekxmpp/xmlstream/handler/xmlwaiter.py deleted file mode 100644 index 5201caf3..00000000 --- a/sleekxmpp/xmlstream/handler/xmlwaiter.py +++ /dev/null @@ -1,33 +0,0 @@ -""" - SleekXMPP: The Sleek XMPP Library - Copyright (C) 2010 Nathanael C. Fritz - This file is part of SleekXMPP. - - See the file LICENSE for copying permission. -""" - -from sleekxmpp.xmlstream.handler import Waiter - - -class XMLWaiter(Waiter): - - """ - The XMLWaiter class is identical to the normal Waiter class - except that it returns the XML contents of the stanza instead - of the full stanza object itself. - - Methods: - prerun -- Overrides Waiter.prerun - """ - - def prerun(self, payload): - """ - Store the XML contents of the stanza to return to the - waiting event handler. - - Overrides Waiter.prerun - - Arguments: - payload -- The matched stanza object. - """ - Waiter.prerun(self, payload.xml) diff --git a/sleekxmpp/xmlstream/jid.py b/sleekxmpp/xmlstream/jid.py deleted file mode 100644 index 2b59db47..00000000 --- a/sleekxmpp/xmlstream/jid.py +++ /dev/null @@ -1,5 +0,0 @@ -import logging - -logging.warning('Deprecated: sleekxmpp.xmlstream.jid is moving to sleekxmpp.jid') - -from sleekxmpp.jid import JID diff --git a/sleekxmpp/xmlstream/matcher/__init__.py b/sleekxmpp/xmlstream/matcher/__init__.py deleted file mode 100644 index aa74c434..00000000 --- a/sleekxmpp/xmlstream/matcher/__init__.py +++ /dev/null @@ -1,17 +0,0 @@ -""" - SleekXMPP: The Sleek XMPP Library - Copyright (C) 2010 Nathanael C. Fritz - This file is part of SleekXMPP. - - See the file LICENSE for copying permission. -""" - -from sleekxmpp.xmlstream.matcher.id import MatcherId -from sleekxmpp.xmlstream.matcher.idsender import MatchIDSender -from sleekxmpp.xmlstream.matcher.many import MatchMany -from sleekxmpp.xmlstream.matcher.stanzapath import StanzaPath -from sleekxmpp.xmlstream.matcher.xmlmask import MatchXMLMask -from sleekxmpp.xmlstream.matcher.xpath import MatchXPath - -__all__ = ['MatcherId', 'MatchMany', 'StanzaPath', - 'MatchXMLMask', 'MatchXPath'] diff --git a/sleekxmpp/xmlstream/matcher/base.py b/sleekxmpp/xmlstream/matcher/base.py deleted file mode 100644 index 83c26688..00000000 --- a/sleekxmpp/xmlstream/matcher/base.py +++ /dev/null @@ -1,31 +0,0 @@ -# -*- coding: utf-8 -*- -""" - sleekxmpp.xmlstream.matcher.base - ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - - Part of SleekXMPP: The Sleek XMPP Library - - :copyright: (c) 2011 Nathanael C. Fritz - :license: MIT, see LICENSE for more details -""" - - -class MatcherBase(object): - - """ - Base class for stanza matchers. Stanza matchers are used to pick - stanzas out of the XML stream and pass them to the appropriate - stream handlers. - - :param criteria: Object to compare some aspect of a stanza against. - """ - - def __init__(self, criteria): - self._criteria = criteria - - def match(self, xml): - """Check if a stanza matches the stored criteria. - - Meant to be overridden. - """ - return False diff --git a/sleekxmpp/xmlstream/matcher/id.py b/sleekxmpp/xmlstream/matcher/id.py deleted file mode 100644 index 11ab70bb..00000000 --- a/sleekxmpp/xmlstream/matcher/id.py +++ /dev/null @@ -1,29 +0,0 @@ -# -*- coding: utf-8 -*- -""" - sleekxmpp.xmlstream.matcher.id - ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - - Part of SleekXMPP: The Sleek XMPP Library - - :copyright: (c) 2011 Nathanael C. Fritz - :license: MIT, see LICENSE for more details -""" - -from sleekxmpp.xmlstream.matcher.base import MatcherBase - - -class MatcherId(MatcherBase): - - """ - The ID matcher selects stanzas that have the same stanza 'id' - interface value as the desired ID. - """ - - def match(self, xml): - """Compare the given stanza's ``'id'`` attribute to the stored - ``id`` value. - - :param xml: The :class:`~sleekxmpp.xmlstream.stanzabase.ElementBase` - stanza to compare against. - """ - return xml['id'] == self._criteria diff --git a/sleekxmpp/xmlstream/matcher/idsender.py b/sleekxmpp/xmlstream/matcher/idsender.py deleted file mode 100644 index 5c2c1f51..00000000 --- a/sleekxmpp/xmlstream/matcher/idsender.py +++ /dev/null @@ -1,47 +0,0 @@ -# -*- coding: utf-8 -*- -""" - sleekxmpp.xmlstream.matcher.id - ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - - Part of SleekXMPP: The Sleek XMPP Library - - :copyright: (c) 2011 Nathanael C. Fritz - :license: MIT, see LICENSE for more details -""" - -from sleekxmpp.xmlstream.matcher.base import MatcherBase - - -class MatchIDSender(MatcherBase): - - """ - The IDSender matcher selects stanzas that have the same stanza 'id' - interface value as the desired ID, and that the 'from' value is one - of a set of approved entities that can respond to a request. - """ - - def match(self, xml): - """Compare the given stanza's ``'id'`` attribute to the stored - ``id`` value, and verify the sender's JID. - - :param xml: The :class:`~sleekxmpp.xmlstream.stanzabase.ElementBase` - stanza to compare against. - """ - - selfjid = self._criteria['self'] - peerjid = self._criteria['peer'] - - allowed = {} - allowed[''] = True - allowed[selfjid.bare] = True - allowed[selfjid.host] = True - allowed[peerjid.full] = True - allowed[peerjid.bare] = True - allowed[peerjid.host] = True - - _from = xml['from'] - - try: - return xml['id'] == self._criteria['id'] and allowed[_from] - except KeyError: - return False diff --git a/sleekxmpp/xmlstream/matcher/many.py b/sleekxmpp/xmlstream/matcher/many.py deleted file mode 100644 index f470ec9c..00000000 --- a/sleekxmpp/xmlstream/matcher/many.py +++ /dev/null @@ -1,40 +0,0 @@ -""" - SleekXMPP: The Sleek XMPP Library - Copyright (C) 2010 Nathanael C. Fritz - This file is part of SleekXMPP. - - See the file LICENSE for copying permission. -""" - -from sleekxmpp.xmlstream.matcher.base import MatcherBase - - -class MatchMany(MatcherBase): - - """ - The MatchMany matcher may compare a stanza against multiple - criteria. It is essentially an OR relation combining multiple - matchers. - - Each of the criteria must implement a match() method. - - Methods: - match -- Overrides MatcherBase.match. - """ - - def match(self, xml): - """ - Match a stanza against multiple criteria. The match is successful - if one of the criteria matches. - - Each of the criteria must implement a match() method. - - Overrides MatcherBase.match. - - Arguments: - xml -- The stanza object to compare against. - """ - for m in self._criteria: - if m.match(xml): - return True - return False diff --git a/sleekxmpp/xmlstream/matcher/stanzapath.py b/sleekxmpp/xmlstream/matcher/stanzapath.py deleted file mode 100644 index a4c0fda0..00000000 --- a/sleekxmpp/xmlstream/matcher/stanzapath.py +++ /dev/null @@ -1,43 +0,0 @@ -# -*- coding: utf-8 -*- -""" - sleekxmpp.xmlstream.matcher.stanzapath - ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - - Part of SleekXMPP: The Sleek XMPP Library - - :copyright: (c) 2011 Nathanael C. Fritz - :license: MIT, see LICENSE for more details -""" - -from sleekxmpp.xmlstream.matcher.base import MatcherBase -from sleekxmpp.xmlstream.stanzabase import fix_ns - - -class StanzaPath(MatcherBase): - - """ - The StanzaPath matcher selects stanzas that match a given "stanza path", - which is similar to a normal XPath except that it uses the interfaces and - plugins of the stanza instead of the actual, underlying XML. - - :param criteria: Object to compare some aspect of a stanza against. - """ - - def __init__(self, criteria): - self._criteria = fix_ns(criteria, split=True, - propagate_ns=False, - default_ns='jabber:client') - self._raw_criteria = criteria - - def match(self, stanza): - """ - Compare a stanza against a "stanza path". A stanza path is similar to - an XPath expression, but uses the stanza's interfaces and plugins - instead of the underlying XML. See the documentation for the stanza - :meth:`~sleekxmpp.xmlstream.stanzabase.ElementBase.match()` method - for more information. - - :param stanza: The :class:`~sleekxmpp.xmlstream.stanzabase.ElementBase` - stanza to compare against. - """ - return stanza.match(self._criteria) or stanza.match(self._raw_criteria) diff --git a/sleekxmpp/xmlstream/matcher/xmlmask.py b/sleekxmpp/xmlstream/matcher/xmlmask.py deleted file mode 100644 index 56f728e1..00000000 --- a/sleekxmpp/xmlstream/matcher/xmlmask.py +++ /dev/null @@ -1,117 +0,0 @@ -""" - SleekXMPP: The Sleek XMPP Library - Copyright (C) 2010 Nathanael C. Fritz - This file is part of SleekXMPP. - - See the file LICENSE for copying permission. -""" - -import logging - -from xml.parsers.expat import ExpatError - -from sleekxmpp.xmlstream.stanzabase import ET -from sleekxmpp.xmlstream.matcher.base import MatcherBase - - -log = logging.getLogger(__name__) - - -class MatchXMLMask(MatcherBase): - - """ - The XMLMask matcher selects stanzas whose XML matches a given - XML pattern, or mask. For example, message stanzas with body elements - could be matched using the mask: - - .. code-block:: xml - - <message xmlns="jabber:client"><body /></message> - - Use of XMLMask is discouraged, and - :class:`~sleekxmpp.xmlstream.matcher.xpath.MatchXPath` or - :class:`~sleekxmpp.xmlstream.matcher.stanzapath.StanzaPath` - should be used instead. - - :param criteria: Either an :class:`~xml.etree.ElementTree.Element` XML - object or XML string to use as a mask. - """ - - def __init__(self, criteria, default_ns='jabber:client'): - MatcherBase.__init__(self, criteria) - if isinstance(criteria, str): - self._criteria = ET.fromstring(self._criteria) - self.default_ns = default_ns - - def setDefaultNS(self, ns): - """Set the default namespace to use during comparisons. - - :param ns: The new namespace to use as the default. - """ - self.default_ns = ns - - def match(self, xml): - """Compare a stanza object or XML object against the stored XML mask. - - Overrides MatcherBase.match. - - :param xml: The stanza object or XML object to compare against. - """ - if hasattr(xml, 'xml'): - xml = xml.xml - return self._mask_cmp(xml, self._criteria, True) - - def _mask_cmp(self, source, mask, use_ns=False, default_ns='__no_ns__'): - """Compare an XML object against an XML mask. - - :param source: The :class:`~xml.etree.ElementTree.Element` XML object - to compare against the mask. - :param mask: The :class:`~xml.etree.ElementTree.Element` XML object - serving as the mask. - :param use_ns: Indicates if namespaces should be respected during - the comparison. - :default_ns: The default namespace to apply to elements that - do not have a specified namespace. - Defaults to ``"__no_ns__"``. - """ - if source is None: - # If the element was not found. May happend during recursive calls. - return False - - # Convert the mask to an XML object if it is a string. - if not hasattr(mask, 'attrib'): - try: - mask = ET.fromstring(mask) - except ExpatError: - log.warning("Expat error: %s\nIn parsing: %s", '', mask) - - mask_ns_tag = "{%s}%s" % (self.default_ns, mask.tag) - if source.tag not in [mask.tag, mask_ns_tag]: - return False - - # If the mask includes text, compare it. - if mask.text and source.text and \ - source.text.strip() != mask.text.strip(): - return False - - # Compare attributes. The stanza must include the attributes - # defined by the mask, but may include others. - for name, value in mask.attrib.items(): - if source.attrib.get(name, "__None__") != value: - return False - - # Recursively check subelements. - matched_elements = {} - for subelement in mask: - matched = False - for other in source.findall(subelement.tag): - matched_elements[other] = False - if self._mask_cmp(other, subelement, use_ns): - if not matched_elements.get(other, False): - matched_elements[other] = True - matched = True - if not matched: - return False - - # Everything matches. - return True diff --git a/sleekxmpp/xmlstream/matcher/xpath.py b/sleekxmpp/xmlstream/matcher/xpath.py deleted file mode 100644 index f3d28429..00000000 --- a/sleekxmpp/xmlstream/matcher/xpath.py +++ /dev/null @@ -1,59 +0,0 @@ -# -*- coding: utf-8 -*- -""" - sleekxmpp.xmlstream.matcher.xpath - ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - - Part of SleekXMPP: The Sleek XMPP Library - - :copyright: (c) 2011 Nathanael C. Fritz - :license: MIT, see LICENSE for more details -""" - -from sleekxmpp.xmlstream.stanzabase import ET, fix_ns -from sleekxmpp.xmlstream.matcher.base import MatcherBase - - -class MatchXPath(MatcherBase): - - """ - The XPath matcher selects stanzas whose XML contents matches a given - XPath expression. - - .. warning:: - - Using this matcher may not produce expected behavior when using - attribute selectors. For Python 2.6 and 3.1, the ElementTree - :meth:`~xml.etree.ElementTree.Element.find()` method does - not support the use of attribute selectors. If you need to - support Python 2.6 or 3.1, it might be more useful to use a - :class:`~sleekxmpp.xmlstream.matcher.stanzapath.StanzaPath` matcher. - - If the value of :data:`IGNORE_NS` is set to ``True``, then XPath - expressions will be matched without using namespaces. - """ - - def __init__(self, criteria): - self._criteria = fix_ns(criteria) - - def match(self, xml): - """ - Compare a stanza's XML contents to an XPath expression. - - If the value of :data:`IGNORE_NS` is set to ``True``, then XPath - expressions will be matched without using namespaces. - - .. warning:: - - In Python 2.6 and 3.1 the ElementTree - :meth:`~xml.etree.ElementTree.Element.find()` method does not - support attribute selectors in the XPath expression. - - :param xml: The :class:`~sleekxmpp.xmlstream.stanzabase.ElementBase` - stanza to compare against. - """ - if hasattr(xml, 'xml'): - xml = xml.xml - x = ET.Element('x') - x.append(xml) - - return x.find(self._criteria) is not None diff --git a/sleekxmpp/xmlstream/resolver.py b/sleekxmpp/xmlstream/resolver.py deleted file mode 100644 index 188e5ac7..00000000 --- a/sleekxmpp/xmlstream/resolver.py +++ /dev/null @@ -1,333 +0,0 @@ -# -*- encoding: utf-8 -*- - -""" - sleekxmpp.xmlstream.dns - ~~~~~~~~~~~~~~~~~~~~~~~ - - :copyright: (c) 2012 Nathanael C. Fritz - :license: MIT, see LICENSE for more details -""" - -import socket -import logging -import random - - -log = logging.getLogger(__name__) - - -#: Global flag indicating the availability of the ``dnspython`` package. -#: Installing ``dnspython`` can be done via: -#: -#: .. code-block:: sh -#: -#: pip install dnspython -#: -#: For Python3, installation may require installing from source using -#: the ``python3`` branch: -#: -#: .. code-block:: sh -#: -#: git clone http://github.com/rthalley/dnspython -#: cd dnspython -#: git checkout python3 -#: python3 setup.py install -DNSPYTHON_AVAILABLE = False -try: - import dns.resolver - DNSPYTHON_AVAILABLE = True -except ImportError as e: - log.debug("Could not find dnspython package. " + \ - "Not all features will be available") - - -def default_resolver(): - """Return a basic DNS resolver object. - - :returns: A :class:`dns.resolver.Resolver` object if dnspython - is available. Otherwise, ``None``. - """ - if DNSPYTHON_AVAILABLE: - return dns.resolver.get_default_resolver() - return None - - -def resolve(host, port=None, service=None, proto='tcp', - resolver=None, use_ipv6=True, use_dnspython=True): - """Peform DNS resolution for a given hostname. - - Resolution may perform SRV record lookups if a service and protocol - are specified. The returned addresses will be sorted according to - the SRV priorities and weights. - - If no resolver is provided, the dnspython resolver will be used if - available. Otherwise the built-in socket facilities will be used, - but those do not provide SRV support. - - If SRV records were used, queries to resolve alternative hosts will - be made as needed instead of all at once. - - :param host: The hostname to resolve. - :param port: A default port to connect with. SRV records may - dictate use of a different port. - :param service: Optional SRV service name without leading underscore. - :param proto: Optional SRV protocol name without leading underscore. - :param resolver: Optionally provide a DNS resolver object that has - been custom configured. - :param use_ipv6: Optionally control the use of IPv6 in situations - where it is either not available, or performance - is degraded. Defaults to ``True``. - :param use_dnspython: Optionally control if dnspython is used to make - the DNS queries instead of the built-in DNS - library. - - :type host: string - :type port: int - :type service: string - :type proto: string - :type resolver: :class:`dns.resolver.Resolver` - :type use_ipv6: bool - :type use_dnspython: bool - - :return: An iterable of IP address, port pairs in the order - dictated by SRV priorities and weights, if applicable. - """ - - if not use_dnspython: - if DNSPYTHON_AVAILABLE: - log.debug("DNS: Not using dnspython, but dnspython is installed.") - else: - log.debug("DNS: Not using dnspython.") - - if not use_ipv6: - log.debug("DNS: Use of IPv6 has been disabled.") - - if resolver is None and DNSPYTHON_AVAILABLE and use_dnspython: - resolver = dns.resolver.get_default_resolver() - - # An IPv6 literal is allowed to be enclosed in square brackets, but - # the brackets must be stripped in order to process the literal; - # otherwise, things break. - host = host.strip('[]') - - try: - # If `host` is an IPv4 literal, we can return it immediately. - ipv4 = socket.inet_aton(host) - yield (host, host, port) - except socket.error: - pass - - if use_ipv6: - try: - # Likewise, If `host` is an IPv6 literal, we can return - # it immediately. - if hasattr(socket, 'inet_pton'): - ipv6 = socket.inet_pton(socket.AF_INET6, host) - yield (host, host, port) - except (socket.error, ValueError): - pass - - # If no service was provided, then we can just do A/AAAA lookups on the - # provided host. Otherwise we need to get an ordered list of hosts to - # resolve based on SRV records. - if not service: - hosts = [(host, port)] - else: - hosts = get_SRV(host, port, service, proto, - resolver=resolver, - use_dnspython=use_dnspython) - - for host, port in hosts: - results = [] - if host == 'localhost': - if use_ipv6: - results.append((host, '::1', port)) - results.append((host, '127.0.0.1', port)) - if use_ipv6: - for address in get_AAAA(host, resolver=resolver, - use_dnspython=use_dnspython): - results.append((host, address, port)) - for address in get_A(host, resolver=resolver, - use_dnspython=use_dnspython): - results.append((host, address, port)) - - for host, address, port in results: - yield host, address, port - - -def get_A(host, resolver=None, use_dnspython=True): - """Lookup DNS A records for a given host. - - If ``resolver`` is not provided, or is ``None``, then resolution will - be performed using the built-in :mod:`socket` module. - - :param host: The hostname to resolve for A record IPv4 addresses. - :param resolver: Optional DNS resolver object to use for the query. - :param use_dnspython: Optionally control if dnspython is used to make - the DNS queries instead of the built-in DNS - library. - - :type host: string - :type resolver: :class:`dns.resolver.Resolver` or ``None`` - :type use_dnspython: bool - - :return: A list of IPv4 literals. - """ - log.debug("DNS: Querying %s for A records." % host) - - # If not using dnspython, attempt lookup using the OS level - # getaddrinfo() method. - if resolver is None or not use_dnspython: - try: - recs = socket.getaddrinfo(host, None, socket.AF_INET, - socket.SOCK_STREAM) - return [rec[4][0] for rec in recs] - except socket.gaierror: - log.debug("DNS: Error retreiving A address info for %s." % host) - return [] - - # Using dnspython: - try: - recs = resolver.query(host, dns.rdatatype.A) - return [rec.to_text() for rec in recs] - except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer): - log.debug("DNS: No A records for %s" % host) - return [] - except dns.exception.Timeout: - log.debug("DNS: A record resolution timed out for %s" % host) - return [] - except dns.exception.DNSException as e: - log.debug("DNS: Error querying A records for %s" % host) - log.exception(e) - return [] - - -def get_AAAA(host, resolver=None, use_dnspython=True): - """Lookup DNS AAAA records for a given host. - - If ``resolver`` is not provided, or is ``None``, then resolution will - be performed using the built-in :mod:`socket` module. - - :param host: The hostname to resolve for AAAA record IPv6 addresses. - :param resolver: Optional DNS resolver object to use for the query. - :param use_dnspython: Optionally control if dnspython is used to make - the DNS queries instead of the built-in DNS - library. - - :type host: string - :type resolver: :class:`dns.resolver.Resolver` or ``None`` - :type use_dnspython: bool - - :return: A list of IPv6 literals. - """ - log.debug("DNS: Querying %s for AAAA records." % host) - - # If not using dnspython, attempt lookup using the OS level - # getaddrinfo() method. - if resolver is None or not use_dnspython: - if not socket.has_ipv6: - log.debug("Unable to query %s for AAAA records: IPv6 is not supported", host) - return [] - try: - recs = socket.getaddrinfo(host, None, socket.AF_INET6, - socket.SOCK_STREAM) - return [rec[4][0] for rec in recs] - except (OSError, socket.gaierror): - log.debug("DNS: Error retreiving AAAA address " + \ - "info for %s." % host) - return [] - - # Using dnspython: - try: - recs = resolver.query(host, dns.rdatatype.AAAA) - return [rec.to_text() for rec in recs] - except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer): - log.debug("DNS: No AAAA records for %s" % host) - return [] - except dns.exception.Timeout: - log.debug("DNS: AAAA record resolution timed out for %s" % host) - return [] - except dns.exception.DNSException as e: - log.debug("DNS: Error querying AAAA records for %s" % host) - log.exception(e) - return [] - - -def get_SRV(host, port, service, proto='tcp', resolver=None, use_dnspython=True): - """Perform SRV record resolution for a given host. - - .. note:: - - This function requires the use of the ``dnspython`` package. Calling - :func:`get_SRV` without ``dnspython`` will return the provided host - and port without performing any DNS queries. - - :param host: The hostname to resolve. - :param port: A default port to connect with. SRV records may - dictate use of a different port. - :param service: Optional SRV service name without leading underscore. - :param proto: Optional SRV protocol name without leading underscore. - :param resolver: Optionally provide a DNS resolver object that has - been custom configured. - - :type host: string - :type port: int - :type service: string - :type proto: string - :type resolver: :class:`dns.resolver.Resolver` - - :return: A list of hostname, port pairs in the order dictacted - by SRV priorities and weights. - """ - if resolver is None or not use_dnspython: - log.warning("DNS: dnspython not found. Can not use SRV lookup.") - return [(host, port)] - - log.debug("DNS: Querying SRV records for %s" % host) - try: - recs = resolver.query('_%s._%s.%s' % (service, proto, host), - dns.rdatatype.SRV) - except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer): - log.debug("DNS: No SRV records for %s." % host) - return [(host, port)] - except dns.exception.Timeout: - log.debug("DNS: SRV record resolution timed out for %s." % host) - return [(host, port)] - except dns.exception.DNSException as e: - log.debug("DNS: Error querying SRV records for %s." % host) - log.exception(e) - return [(host, port)] - - if len(recs) == 1 and recs[0].target == '.': - return [(host, port)] - - answers = {} - for rec in recs: - if rec.priority not in answers: - answers[rec.priority] = [] - if rec.weight == 0: - answers[rec.priority].insert(0, rec) - else: - answers[rec.priority].append(rec) - - sorted_recs = [] - for priority in sorted(answers.keys()): - while answers[priority]: - running_sum = 0 - sums = {} - for rec in answers[priority]: - running_sum += rec.weight - sums[running_sum] = rec - - selected = random.randint(0, running_sum + 1) - for running_sum in sums: - if running_sum >= selected: - rec = sums[running_sum] - host = rec.target.to_text() - if host.endswith('.'): - host = host[:-1] - sorted_recs.append((host, rec.port)) - answers[priority].remove(rec) - break - - return sorted_recs diff --git a/sleekxmpp/xmlstream/scheduler.py b/sleekxmpp/xmlstream/scheduler.py deleted file mode 100644 index e6fae37a..00000000 --- a/sleekxmpp/xmlstream/scheduler.py +++ /dev/null @@ -1,250 +0,0 @@ -# -*- coding: utf-8 -*- -""" - sleekxmpp.xmlstream.scheduler - ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - - This module provides a task scheduler that works better - with SleekXMPP's threading usage than the stock version. - - Part of SleekXMPP: The Sleek XMPP Library - - :copyright: (c) 2011 Nathanael C. Fritz - :license: MIT, see LICENSE for more details -""" - -import time -import threading -import logging -import itertools - -from sleekxmpp.util import Queue, QueueEmpty - - -#: The time in seconds to wait for events from the event queue, and also the -#: time between checks for the process stop signal. -WAIT_TIMEOUT = 1.0 - - -log = logging.getLogger(__name__) - - -class Task(object): - - """ - A scheduled task that will be executed by the scheduler - after a given time interval has passed. - - :param string name: The name of the task. - :param int seconds: The number of seconds to wait before executing. - :param callback: The function to execute. - :param tuple args: The arguments to pass to the callback. - :param dict kwargs: The keyword arguments to pass to the callback. - :param bool repeat: Indicates if the task should repeat. - Defaults to ``False``. - :param pointer: A pointer to an event queue for queuing callback - execution instead of executing immediately. - """ - - def __init__(self, name, seconds, callback, args=None, - kwargs=None, repeat=False, qpointer=None): - #: The name of the task. - self.name = name - - #: The number of seconds to wait before executing. - self.seconds = seconds - - #: The function to execute once enough time has passed. - self.callback = callback - - #: The arguments to pass to :attr:`callback`. - self.args = args or tuple() - - #: The keyword arguments to pass to :attr:`callback`. - self.kwargs = kwargs or {} - - #: Indicates if the task should repeat after executing, - #: using the same :attr:`seconds` delay. - self.repeat = repeat - - #: The time when the task should execute next. - self.next = time.time() + self.seconds - - #: The main event queue, which allows for callbacks to - #: be queued for execution instead of executing immediately. - self.qpointer = qpointer - - def run(self): - """Execute the task's callback. - - If an event queue was supplied, place the callback in the queue; - otherwise, execute the callback immediately. - """ - if self.qpointer is not None: - self.qpointer.put(('schedule', self.callback, - self.args, self.kwargs, self.name)) - else: - self.callback(*self.args, **self.kwargs) - self.reset() - return self.repeat - - def reset(self): - """Reset the task's timer so that it will repeat.""" - self.next = time.time() + self.seconds - - -class Scheduler(object): - - """ - A threaded scheduler that allows for updates mid-execution unlike the - scheduler in the standard library. - - Based on: http://docs.python.org/library/sched.html#module-sched - - :param parentstop: An :class:`~threading.Event` to signal stopping - the scheduler. - """ - - def __init__(self, parentstop=None): - #: A queue for storing tasks - self.addq = Queue() - - #: A list of tasks in order of execution time. - self.schedule = [] - - #: If running in threaded mode, this will be the thread processing - #: the schedule. - self.thread = None - - #: A flag indicating that the scheduler is running. - self.run = False - - #: An :class:`~threading.Event` instance for signalling to stop - #: the scheduler. - self.stop = parentstop - - #: Lock for accessing the task queue. - self.schedule_lock = threading.RLock() - - #: The time in seconds to wait for events from the event queue, - #: and also the time between checks for the process stop signal. - self.wait_timeout = WAIT_TIMEOUT - - def process(self, threaded=True, daemon=False): - """Begin accepting and processing scheduled tasks. - - :param bool threaded: Indicates if the scheduler should execute - in its own thread. Defaults to ``True``. - """ - if threaded: - self.thread = threading.Thread(name='scheduler_process', - target=self._process) - self.thread.daemon = daemon - self.thread.start() - else: - self._process() - - def _process(self): - """Process scheduled tasks.""" - self.run = True - try: - while self.run and not self.stop.is_set(): - updated = False - if self.schedule: - wait = self.schedule[0].next - time.time() - else: - wait = self.wait_timeout - try: - if wait <= 0.0: - newtask = self.addq.get(False) - else: - newtask = None - while self.run and \ - not self.stop.is_set() and \ - newtask is None and \ - wait > 0: - try: - newtask = self.addq.get(True, min(wait, self.wait_timeout)) - except QueueEmpty: # Nothing to add, nothing to do. Check run flags and continue waiting. - wait -= self.wait_timeout - except QueueEmpty: # Time to run some tasks, and no new tasks to add. - self.schedule_lock.acquire() - # select only those tasks which are to be executed now - relevant = itertools.takewhile( - lambda task: time.time() >= task.next, self.schedule) - # run the tasks and keep the return value in a tuple - status = map(lambda task: (task, task.run()), relevant) - # remove non-repeating tasks - for task, doRepeat in status: - if not doRepeat: - try: - self.schedule.remove(task) - except ValueError: - pass - else: - # only need to resort tasks if a repeated task has - # been kept in the list. - updated = True - else: # Add new task - self.schedule_lock.acquire() - if newtask is not None: - self.schedule.append(newtask) - updated = True - finally: - if updated: - self.schedule.sort(key=lambda task: task.next) - self.schedule_lock.release() - except KeyboardInterrupt: - self.run = False - except SystemExit: - self.run = False - log.debug("Quitting Scheduler thread") - - def add(self, name, seconds, callback, args=None, - kwargs=None, repeat=False, qpointer=None): - """Schedule a new task. - - :param string name: The name of the task. - :param int seconds: The number of seconds to wait before executing. - :param callback: The function to execute. - :param tuple args: The arguments to pass to the callback. - :param dict kwargs: The keyword arguments to pass to the callback. - :param bool repeat: Indicates if the task should repeat. - Defaults to ``False``. - :param pointer: A pointer to an event queue for queuing callback - execution instead of executing immediately. - """ - try: - self.schedule_lock.acquire() - for task in self.schedule: - if task.name == name: - raise ValueError("Key %s already exists" % name) - - self.addq.put(Task(name, seconds, callback, args, - kwargs, repeat, qpointer)) - except: - raise - finally: - self.schedule_lock.release() - - def remove(self, name): - """Remove a scheduled task ahead of schedule, and without - executing it. - - :param string name: The name of the task to remove. - """ - try: - self.schedule_lock.acquire() - the_task = None - for task in self.schedule: - if task.name == name: - the_task = task - if the_task is not None: - self.schedule.remove(the_task) - except: - raise - finally: - self.schedule_lock.release() - - def quit(self): - """Shutdown the scheduler.""" - self.run = False diff --git a/sleekxmpp/xmlstream/stanzabase.py b/sleekxmpp/xmlstream/stanzabase.py deleted file mode 100644 index c2e0f718..00000000 --- a/sleekxmpp/xmlstream/stanzabase.py +++ /dev/null @@ -1,1654 +0,0 @@ -# -*- coding: utf-8 -*- -""" - sleekxmpp.xmlstream.stanzabase - ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - - module implements a wrapper layer for XML objects - that allows them to be treated like dictionaries. - - Part of SleekXMPP: The Sleek XMPP Library - - :copyright: (c) 2011 Nathanael C. Fritz - :license: MIT, see LICENSE for more details -""" - -from __future__ import with_statement, unicode_literals - -import copy -import logging -import weakref -from xml.etree import cElementTree as ET - -from sleekxmpp.util import safedict -from sleekxmpp.xmlstream import JID -from sleekxmpp.xmlstream.tostring import tostring -from sleekxmpp.thirdparty import OrderedDict - - -log = logging.getLogger(__name__) - - -# Used to check if an argument is an XML object. -XML_TYPE = type(ET.Element('xml')) - - -XML_NS = 'http://www.w3.org/XML/1998/namespace' - - -def register_stanza_plugin(stanza, plugin, iterable=False, overrides=False): - """ - Associate a stanza object as a plugin for another stanza. - - >>> from sleekxmpp.xmlstream import register_stanza_plugin - >>> register_stanza_plugin(Iq, CustomStanza) - - Plugin stanzas marked as iterable will be included in the list of - substanzas for the parent, using ``parent['substanzas']``. If the - attribute ``plugin_multi_attrib`` was defined for the plugin, then - the substanza set can be filtered to only instances of the plugin - class. For example, given a plugin class ``Foo`` with - ``plugin_multi_attrib = 'foos'`` then:: - - parent['foos'] - - would return a collection of all ``Foo`` substanzas. - - :param class stanza: The class of the parent stanza. - :param class plugin: The class of the plugin stanza. - :param bool iterable: Indicates if the plugin stanza should be - included in the parent stanza's iterable - ``'substanzas'`` interface results. - :param bool overrides: Indicates if the plugin should be allowed - to override the interface handlers for - the parent stanza, based on the plugin's - ``overrides`` field. - - .. versionadded:: 1.0-Beta1 - Made ``register_stanza_plugin`` the default name. The prior - ``registerStanzaPlugin`` function name remains as an alias. - """ - tag = "{%s}%s" % (plugin.namespace, plugin.name) - - # Prevent weird memory reference gotchas by ensuring - # that the parent stanza class has its own set of - # plugin info maps and is not using the mappings from - # an ancestor class (like ElementBase). - plugin_info = ('plugin_attrib_map', 'plugin_tag_map', - 'plugin_iterables', 'plugin_overrides') - for attr in plugin_info: - info = getattr(stanza, attr) - setattr(stanza, attr, info.copy()) - - stanza.plugin_attrib_map[plugin.plugin_attrib] = plugin - stanza.plugin_tag_map[tag] = plugin - - if iterable: - stanza.plugin_iterables.add(plugin) - if plugin.plugin_multi_attrib: - multiplugin = multifactory(plugin, plugin.plugin_multi_attrib) - register_stanza_plugin(stanza, multiplugin) - if overrides: - for interface in plugin.overrides: - stanza.plugin_overrides[interface] = plugin.plugin_attrib - - -# To maintain backwards compatibility for now, preserve the camel case name. -registerStanzaPlugin = register_stanza_plugin - - -def multifactory(stanza, plugin_attrib): - """ - Returns a ElementBase class for handling reoccuring child stanzas - """ - - def plugin_filter(self): - return lambda x: isinstance(x, self._multistanza) - - def plugin_lang_filter(self, lang): - return lambda x: isinstance(x, self._multistanza) and \ - x['lang'] == lang - - class Multi(ElementBase): - """ - Template class for multifactory - """ - def setup(self, xml=None): - self.xml = ET.Element('') - - def get_multi(self, lang=None): - parent = self.parent() - if not lang or lang == '*': - res = filter(plugin_filter(self), parent) - else: - res = filter(plugin_filter(self, lang), parent) - return list(res) - - def set_multi(self, val, lang=None): - parent = self.parent() - del_multi = getattr(self, 'del_%s' % plugin_attrib) - del_multi(lang) - for sub in val: - parent.append(sub) - - def del_multi(self, lang=None): - parent = self.parent() - if not lang or lang == '*': - res = filter(plugin_filter(self), parent) - else: - res = filter(plugin_filter(self, lang), parent) - res = list(res) - if not res: - del parent.plugins[(plugin_attrib, None)] - parent.loaded_plugins.remove(plugin_attrib) - try: - parent.xml.remove(self.xml) - except ValueError: - pass - else: - for stanza in list(res): - parent.iterables.remove(stanza) - parent.xml.remove(stanza.xml) - - Multi.is_extension = True - Multi.plugin_attrib = plugin_attrib - Multi._multistanza = stanza - Multi.interfaces = set([plugin_attrib]) - Multi.lang_interfaces = set([plugin_attrib]) - setattr(Multi, "get_%s" % plugin_attrib, get_multi) - setattr(Multi, "set_%s" % plugin_attrib, set_multi) - setattr(Multi, "del_%s" % plugin_attrib, del_multi) - return Multi - - -def fix_ns(xpath, split=False, propagate_ns=True, default_ns=''): - """Apply the stanza's namespace to elements in an XPath expression. - - :param string xpath: The XPath expression to fix with namespaces. - :param bool split: Indicates if the fixed XPath should be left as a - list of element names with namespaces. Defaults to - False, which returns a flat string path. - :param bool propagate_ns: Overrides propagating parent element - namespaces to child elements. Useful if - you wish to simply split an XPath that has - non-specified namespaces, and child and - parent namespaces are known not to always - match. Defaults to True. - """ - fixed = [] - # Split the XPath into a series of blocks, where a block - # is started by an element with a namespace. - ns_blocks = xpath.split('{') - for ns_block in ns_blocks: - if '}' in ns_block: - # Apply the found namespace to following elements - # that do not have namespaces. - namespace = ns_block.split('}')[0] - elements = ns_block.split('}')[1].split('/') - else: - # Apply the stanza's namespace to the following - # elements since no namespace was provided. - namespace = default_ns - elements = ns_block.split('/') - - for element in elements: - if element: - # Skip empty entry artifacts from splitting. - if propagate_ns and element[0] != '*': - tag = '{%s}%s' % (namespace, element) - else: - tag = element - fixed.append(tag) - if split: - return fixed - return '/'.join(fixed) - - -class ElementBase(object): - - """ - The core of SleekXMPP's stanza XML manipulation and handling is provided - by ElementBase. ElementBase wraps XML cElementTree objects and enables - access to the XML contents through dictionary syntax, similar in style - to the Ruby XMPP library Blather's stanza implementation. - - Stanzas are defined by their name, namespace, and interfaces. For - example, a simplistic Message stanza could be defined as:: - - >>> class Message(ElementBase): - ... name = "message" - ... namespace = "jabber:client" - ... interfaces = set(('to', 'from', 'type', 'body')) - ... sub_interfaces = set(('body',)) - - The resulting Message stanza's contents may be accessed as so:: - - >>> message['to'] = "user@example.com" - >>> message['body'] = "Hi!" - >>> message['body'] - "Hi!" - >>> del message['body'] - >>> message['body'] - "" - - The interface values map to either custom access methods, stanza - XML attributes, or (if the interface is also in sub_interfaces) the - text contents of a stanza's subelement. - - Custom access methods may be created by adding methods of the - form "getInterface", "setInterface", or "delInterface", where - "Interface" is the titlecase version of the interface name. - - Stanzas may be extended through the use of plugins. A plugin - is simply a stanza that has a plugin_attrib value. For example:: - - >>> class MessagePlugin(ElementBase): - ... name = "custom_plugin" - ... namespace = "custom" - ... interfaces = set(('useful_thing', 'custom')) - ... plugin_attrib = "custom" - - The plugin stanza class must be associated with its intended - container stanza by using register_stanza_plugin as so:: - - >>> register_stanza_plugin(Message, MessagePlugin) - - The plugin may then be accessed as if it were built-in to the parent - stanza:: - - >>> message['custom']['useful_thing'] = 'foo' - - If a plugin provides an interface that is the same as the plugin's - plugin_attrib value, then the plugin's interface may be assigned - directly from the parent stanza, as shown below, but retrieving - information will require all interfaces to be used, as so:: - - >>> # Same as using message['custom']['custom'] - >>> message['custom'] = 'bar' - >>> # Must use all interfaces - >>> message['custom']['custom'] - 'bar' - - If the plugin sets :attr:`is_extension` to ``True``, then both setting - and getting an interface value that is the same as the plugin's - plugin_attrib value will work, as so:: - - >>> message['custom'] = 'bar' # Using is_extension=True - >>> message['custom'] - 'bar' - - - :param xml: Initialize the stanza object with an existing XML object. - :param parent: Optionally specify a parent stanza object will - contain this substanza. - """ - - #: The XML tag name of the element, not including any namespace - #: prefixes. For example, an :class:`ElementBase` object for - #: ``<message />`` would use ``name = 'message'``. - name = 'stanza' - - #: The XML namespace for the element. Given ``<foo xmlns="bar" />``, - #: then ``namespace = "bar"`` should be used. The default namespace - #: is ``jabber:client`` since this is being used in an XMPP library. - namespace = 'jabber:client' - - #: For :class:`ElementBase` subclasses which are intended to be used - #: as plugins, the ``plugin_attrib`` value defines the plugin name. - #: Plugins may be accessed by using the ``plugin_attrib`` value as - #: the interface. An example using ``plugin_attrib = 'foo'``:: - #: - #: register_stanza_plugin(Message, FooPlugin) - #: msg = Message() - #: msg['foo']['an_interface_from_the_foo_plugin'] - plugin_attrib = 'plugin' - - #: For :class:`ElementBase` subclasses that are intended to be an - #: iterable group of items, the ``plugin_multi_attrib`` value defines - #: an interface for the parent stanza which returns the entire group - #: of matching substanzas. So the following are equivalent:: - #: - #: # Given stanza class Foo, with plugin_multi_attrib = 'foos' - #: parent['foos'] - #: filter(isinstance(item, Foo), parent['substanzas']) - plugin_multi_attrib = '' - - #: The set of keys that the stanza provides for accessing and - #: manipulating the underlying XML object. This set may be augmented - #: with the :attr:`plugin_attrib` value of any registered - #: stanza plugins. - interfaces = set(('type', 'to', 'from', 'id', 'payload')) - - #: A subset of :attr:`interfaces` which maps interfaces to direct - #: subelements of the underlying XML object. Using this set, the text - #: of these subelements may be set, retrieved, or removed without - #: needing to define custom methods. - sub_interfaces = set() - - #: A subset of :attr:`interfaces` which maps the presence of - #: subelements to boolean values. Using this set allows for quickly - #: checking for the existence of empty subelements like ``<required />``. - #: - #: .. versionadded:: 1.1 - bool_interfaces = set() - - #: .. versionadded:: 1.1.2 - lang_interfaces = set() - - #: In some cases you may wish to override the behaviour of one of the - #: parent stanza's interfaces. The ``overrides`` list specifies the - #: interface name and access method to be overridden. For example, - #: to override setting the parent's ``'condition'`` interface you - #: would use:: - #: - #: overrides = ['set_condition'] - #: - #: Getting and deleting the ``'condition'`` interface would not - #: be affected. - #: - #: .. versionadded:: 1.0-Beta5 - overrides = [] - - #: If you need to add a new interface to an existing stanza, you - #: can create a plugin and set ``is_extension = True``. Be sure - #: to set the :attr:`plugin_attrib` value to the desired interface - #: name, and that it is the only interface listed in - #: :attr:`interfaces`. Requests for the new interface from the - #: parent stanza will be passed to the plugin directly. - #: - #: .. versionadded:: 1.0-Beta5 - is_extension = False - - #: A map of interface operations to the overriding functions. - #: For example, after overriding the ``set`` operation for - #: the interface ``body``, :attr:`plugin_overrides` would be:: - #: - #: {'set_body': <some function>} - #: - #: .. versionadded: 1.0-Beta5 - plugin_overrides = {} - - #: A mapping of the :attr:`plugin_attrib` values of registered - #: plugins to their respective classes. - plugin_attrib_map = {} - - #: A mapping of root element tag names (in ``'{namespace}elementname'`` - #: format) to the plugin classes responsible for them. - plugin_tag_map = {} - - #: The set of stanza classes that can be iterated over using - #: the 'substanzas' interface. Classes are added to this set - #: when registering a plugin with ``iterable=True``:: - #: - #: register_stanza_plugin(DiscoInfo, DiscoItem, iterable=True) - #: - #: .. versionadded:: 1.0-Beta5 - plugin_iterables = set() - - #: A deprecated version of :attr:`plugin_iterables` that remains - #: for backward compatibility. It required a parent stanza to - #: know beforehand what stanza classes would be iterable:: - #: - #: class DiscoItem(ElementBase): - #: ... - #: - #: class DiscoInfo(ElementBase): - #: subitem = (DiscoItem, ) - #: ... - #: - #: .. deprecated:: 1.0-Beta5 - subitem = set() - - #: The default XML namespace: ``http://www.w3.org/XML/1998/namespace``. - xml_ns = XML_NS - - def __init__(self, xml=None, parent=None): - self._index = 0 - - #: The underlying XML object for the stanza. It is a standard - #: :class:`xml.etree.cElementTree` object. - self.xml = xml - - #: An ordered dictionary of plugin stanzas, mapped by their - #: :attr:`plugin_attrib` value. - self.plugins = OrderedDict() - self.loaded_plugins = set() - - #: A list of child stanzas whose class is included in - #: :attr:`plugin_iterables`. - self.iterables = [] - - #: The name of the tag for the stanza's root element. It is the - #: same as calling :meth:`tag_name()` and is formatted as - #: ``'{namespace}elementname'``. - self.tag = self.tag_name() - - #: A :class:`weakref.weakref` to the parent stanza, if there is one. - #: If not, then :attr:`parent` is ``None``. - self.parent = None - if parent is not None: - if not isinstance(parent, weakref.ReferenceType): - self.parent = weakref.ref(parent) - else: - self.parent = parent - - if self.subitem is not None: - for sub in self.subitem: - self.plugin_iterables.add(sub) - - if self.setup(xml): - # If we generated our own XML, then everything is ready. - return - - # Initialize values using provided XML - for child in self.xml: - if child.tag in self.plugin_tag_map: - plugin_class = self.plugin_tag_map[child.tag] - self.init_plugin(plugin_class.plugin_attrib, - existing_xml=child, - reuse=False) - - def setup(self, xml=None): - """Initialize the stanza's XML contents. - - Will return ``True`` if XML was generated according to the stanza's - definition instead of building a stanza object from an existing - XML object. - - :param xml: An existing XML object to use for the stanza's content - instead of generating new XML. - """ - if self.xml is None: - self.xml = xml - - last_xml = self.xml - if self.xml is None: - # Generate XML from the stanza definition - for ename in self.name.split('/'): - new = ET.Element("{%s}%s" % (self.namespace, ename)) - if self.xml is None: - self.xml = new - else: - last_xml.append(new) - last_xml = new - if self.parent is not None: - self.parent().xml.append(self.xml) - - # We had to generate XML - return True - else: - # We did not generate XML - return False - - def enable(self, attrib, lang=None): - """Enable and initialize a stanza plugin. - - Alias for :meth:`init_plugin`. - - :param string attrib: The :attr:`plugin_attrib` value of the - plugin to enable. - """ - return self.init_plugin(attrib, lang) - - def _get_plugin(self, name, lang=None, check=False): - if lang is None: - lang = self.get_lang() - - if name not in self.plugin_attrib_map: - return None - - plugin_class = self.plugin_attrib_map[name] - - if plugin_class.is_extension: - if (name, None) in self.plugins: - return self.plugins[(name, None)] - else: - return None if check else self.init_plugin(name, lang) - else: - if (name, lang) in self.plugins: - return self.plugins[(name, lang)] - else: - return None if check else self.init_plugin(name, lang) - - def init_plugin(self, attrib, lang=None, existing_xml=None, reuse=True): - """Enable and initialize a stanza plugin. - - :param string attrib: The :attr:`plugin_attrib` value of the - plugin to enable. - """ - default_lang = self.get_lang() - if not lang: - lang = default_lang - - plugin_class = self.plugin_attrib_map[attrib] - - if plugin_class.is_extension and (attrib, None) in self.plugins: - return self.plugins[(attrib, None)] - if reuse and (attrib, lang) in self.plugins: - return self.plugins[(attrib, lang)] - - plugin = plugin_class(parent=self, xml=existing_xml) - - if plugin.is_extension: - self.plugins[(attrib, None)] = plugin - else: - if lang != default_lang: - plugin['lang'] = lang - self.plugins[(attrib, lang)] = plugin - - if plugin_class in self.plugin_iterables: - self.iterables.append(plugin) - if plugin_class.plugin_multi_attrib: - self.init_plugin(plugin_class.plugin_multi_attrib) - - self.loaded_plugins.add(attrib) - - return plugin - - def _get_stanza_values(self): - """Return A JSON/dictionary version of the XML content - exposed through the stanza's interfaces:: - - >>> msg = Message() - >>> msg.values - {'body': '', 'from': , 'mucnick': '', 'mucroom': '', - 'to': , 'type': 'normal', 'id': '', 'subject': ''} - - Likewise, assigning to :attr:`values` will change the XML - content:: - - >>> msg = Message() - >>> msg.values = {'body': 'Hi!', 'to': 'user@example.com'} - >>> msg - '<message to="user@example.com"><body>Hi!</body></message>' - - .. versionadded:: 1.0-Beta1 - """ - values = OrderedDict() - values['lang'] = self['lang'] - for interface in self.interfaces: - if isinstance(self[interface], JID): - values[interface] = self[interface].jid - else: - values[interface] = self[interface] - if interface in self.lang_interfaces: - values['%s|*' % interface] = self['%s|*' % interface] - for plugin, stanza in self.plugins.items(): - lang = stanza['lang'] - if lang: - values['%s|%s' % (plugin[0], lang)] = stanza.values - else: - values[plugin[0]] = stanza.values - if self.iterables: - iterables = [] - for stanza in self.iterables: - iterables.append(stanza.values) - iterables[-1]['__childtag__'] = stanza.tag - values['substanzas'] = iterables - return values - - def _set_stanza_values(self, values): - """Set multiple stanza interface values using a dictionary. - - Stanza plugin values may be set using nested dictionaries. - - :param values: A dictionary mapping stanza interface with values. - Plugin interfaces may accept a nested dictionary that - will be used recursively. - - .. versionadded:: 1.0-Beta1 - """ - iterable_interfaces = [p.plugin_attrib for \ - p in self.plugin_iterables] - - if 'lang' in values: - self['lang'] = values['lang'] - - if 'substanzas' in values: - # Remove existing substanzas - for stanza in self.iterables: - try: - self.xml.remove(stanza.xml) - except ValueError: - pass - self.iterables = [] - - # Add new substanzas - for subdict in values['substanzas']: - if '__childtag__' in subdict: - for subclass in self.plugin_iterables: - child_tag = "{%s}%s" % (subclass.namespace, - subclass.name) - if subdict['__childtag__'] == child_tag: - sub = subclass(parent=self) - sub.values = subdict - self.iterables.append(sub) - - for interface, value in values.items(): - full_interface = interface - interface_lang = ('%s|' % interface).split('|') - interface = interface_lang[0] - lang = interface_lang[1] or self.get_lang() - - if interface == 'lang': - continue - elif interface == 'substanzas': - continue - elif interface in self.interfaces: - self[full_interface] = value - elif interface in self.plugin_attrib_map: - if interface not in iterable_interfaces: - plugin = self._get_plugin(interface, lang) - if plugin: - plugin.values = value - return self - - def __getitem__(self, attrib): - """Return the value of a stanza interface using dict-like syntax. - - Example:: - - >>> msg['body'] - 'Message contents' - - Stanza interfaces are typically mapped directly to the underlying XML - object, but can be overridden by the presence of a ``get_attrib`` - method (or ``get_foo`` where the interface is named ``'foo'``, etc). - - The search order for interface value retrieval for an interface - named ``'foo'`` is: - - 1. The list of substanzas (``'substanzas'``) - 2. The result of calling the ``get_foo`` override handler. - 3. The result of calling ``get_foo``. - 4. The result of calling ``getFoo``. - 5. The contents of the ``foo`` subelement, if ``foo`` is listed - in :attr:`sub_interfaces`. - 6. True or False depending on the existence of a ``foo`` - subelement and ``foo`` is in :attr:`bool_interfaces`. - 7. The value of the ``foo`` attribute of the XML object. - 8. The plugin named ``'foo'`` - 9. An empty string. - - :param string attrib: The name of the requested stanza interface. - """ - full_attrib = attrib - attrib_lang = ('%s|' % attrib).split('|') - attrib = attrib_lang[0] - lang = attrib_lang[1] or None - - kwargs = {} - if lang and attrib in self.lang_interfaces: - kwargs['lang'] = lang - - kwargs = safedict(kwargs) - - if attrib == 'substanzas': - return self.iterables - elif attrib in self.interfaces or attrib == 'lang': - get_method = "get_%s" % attrib.lower() - get_method2 = "get%s" % attrib.title() - - if self.plugin_overrides: - name = self.plugin_overrides.get(get_method, None) - if name: - plugin = self._get_plugin(name, lang) - if plugin: - handler = getattr(plugin, get_method, None) - if handler: - return handler(**kwargs) - - if hasattr(self, get_method): - return getattr(self, get_method)(**kwargs) - elif hasattr(self, get_method2): - return getattr(self, get_method2)(**kwargs) - else: - if attrib in self.sub_interfaces: - return self._get_sub_text(attrib, lang=lang) - elif attrib in self.bool_interfaces: - elem = self.xml.find('{%s}%s' % (self.namespace, attrib)) - return elem is not None - else: - return self._get_attr(attrib) - elif attrib in self.plugin_attrib_map: - plugin = self._get_plugin(attrib, lang) - if plugin and plugin.is_extension: - return plugin[full_attrib] - return plugin - else: - return '' - - def __setitem__(self, attrib, value): - """Set the value of a stanza interface using dictionary-like syntax. - - Example:: - - >>> msg['body'] = "Hi!" - >>> msg['body'] - 'Hi!' - - Stanza interfaces are typically mapped directly to the underlying XML - object, but can be overridden by the presence of a ``set_attrib`` - method (or ``set_foo`` where the interface is named ``'foo'``, etc). - - The effect of interface value assignment for an interface - named ``'foo'`` will be one of: - - 1. Delete the interface's contents if the value is None. - 2. Call the ``set_foo`` override handler, if it exists. - 3. Call ``set_foo``, if it exists. - 4. Call ``setFoo``, if it exists. - 5. Set the text of a ``foo`` element, if ``'foo'`` is - in :attr:`sub_interfaces`. - 6. Add or remove an empty subelement ``foo`` - if ``foo`` is in :attr:`bool_interfaces`. - 7. Set the value of a top level XML attribute named ``foo``. - 8. Attempt to pass the value to a plugin named ``'foo'`` using - the plugin's ``'foo'`` interface. - 9. Do nothing. - - :param string attrib: The name of the stanza interface to modify. - :param value: The new value of the stanza interface. - """ - full_attrib = attrib - attrib_lang = ('%s|' % attrib).split('|') - attrib = attrib_lang[0] - lang = attrib_lang[1] or None - - kwargs = {} - if lang and attrib in self.lang_interfaces: - kwargs['lang'] = lang - - kwargs = safedict(kwargs) - - if attrib in self.interfaces or attrib == 'lang': - if value is not None: - set_method = "set_%s" % attrib.lower() - set_method2 = "set%s" % attrib.title() - - if self.plugin_overrides: - name = self.plugin_overrides.get(set_method, None) - if name: - plugin = self._get_plugin(name, lang) - if plugin: - handler = getattr(plugin, set_method, None) - if handler: - return handler(value, **kwargs) - - if hasattr(self, set_method): - getattr(self, set_method)(value, **kwargs) - elif hasattr(self, set_method2): - getattr(self, set_method2)(value, **kwargs) - else: - if attrib in self.sub_interfaces: - if lang == '*': - return self._set_all_sub_text(attrib, - value, - lang='*') - return self._set_sub_text(attrib, text=value, - lang=lang) - elif attrib in self.bool_interfaces: - if value: - return self._set_sub_text(attrib, '', - keep=True, - lang=lang) - else: - return self._set_sub_text(attrib, '', - keep=False, - lang=lang) - else: - self._set_attr(attrib, value) - else: - self.__delitem__(attrib) - elif attrib in self.plugin_attrib_map: - plugin = self._get_plugin(attrib, lang) - if plugin: - plugin[full_attrib] = value - return self - - def __delitem__(self, attrib): - """Delete the value of a stanza interface using dict-like syntax. - - Example:: - - >>> msg['body'] = "Hi!" - >>> msg['body'] - 'Hi!' - >>> del msg['body'] - >>> msg['body'] - '' - - Stanza interfaces are typically mapped directly to the underlyig XML - object, but can be overridden by the presence of a ``del_attrib`` - method (or ``del_foo`` where the interface is named ``'foo'``, etc). - - The effect of deleting a stanza interface value named ``foo`` will be - one of: - - 1. Call ``del_foo`` override handler, if it exists. - 2. Call ``del_foo``, if it exists. - 3. Call ``delFoo``, if it exists. - 4. Delete ``foo`` element, if ``'foo'`` is in - :attr:`sub_interfaces`. - 5. Remove ``foo`` element if ``'foo'`` is in - :attr:`bool_interfaces`. - 6. Delete top level XML attribute named ``foo``. - 7. Remove the ``foo`` plugin, if it was loaded. - 8. Do nothing. - - :param attrib: The name of the affected stanza interface. - """ - full_attrib = attrib - attrib_lang = ('%s|' % attrib).split('|') - attrib = attrib_lang[0] - lang = attrib_lang[1] or None - - kwargs = {} - if lang and attrib in self.lang_interfaces: - kwargs['lang'] = lang - - kwargs = safedict(kwargs) - - if attrib in self.interfaces or attrib == 'lang': - del_method = "del_%s" % attrib.lower() - del_method2 = "del%s" % attrib.title() - - if self.plugin_overrides: - name = self.plugin_overrides.get(del_method, None) - if name: - plugin = self._get_plugin(attrib, lang) - if plugin: - handler = getattr(plugin, del_method, None) - if handler: - return handler(**kwargs) - - if hasattr(self, del_method): - getattr(self, del_method)(**kwargs) - elif hasattr(self, del_method2): - getattr(self, del_method2)(**kwargs) - else: - if attrib in self.sub_interfaces: - return self._del_sub(attrib, lang=lang) - elif attrib in self.bool_interfaces: - return self._del_sub(attrib, lang=lang) - else: - self._del_attr(attrib) - elif attrib in self.plugin_attrib_map: - plugin = self._get_plugin(attrib, lang, check=True) - if not plugin: - return self - if plugin.is_extension: - del plugin[full_attrib] - del self.plugins[(attrib, None)] - else: - del self.plugins[(attrib, plugin['lang'])] - self.loaded_plugins.remove(attrib) - try: - self.xml.remove(plugin.xml) - except ValueError: - pass - return self - - def _set_attr(self, name, value): - """Set the value of a top level attribute of the XML object. - - If the new value is None or an empty string, then the attribute will - be removed. - - :param name: The name of the attribute. - :param value: The new value of the attribute, or None or '' to - remove it. - """ - if value is None or value == '': - self.__delitem__(name) - else: - self.xml.attrib[name] = value - - def _del_attr(self, name): - """Remove a top level attribute of the XML object. - - :param name: The name of the attribute. - """ - if name in self.xml.attrib: - del self.xml.attrib[name] - - def _get_attr(self, name, default=''): - """Return the value of a top level attribute of the XML object. - - In case the attribute has not been set, a default value can be - returned instead. An empty string is returned if no other default - is supplied. - - :param name: The name of the attribute. - :param default: Optional value to return if the attribute has not - been set. An empty string is returned otherwise. - """ - return self.xml.attrib.get(name, default) - - def _get_sub_text(self, name, default='', lang=None): - """Return the text contents of a sub element. - - In case the element does not exist, or it has no textual content, - a default value can be returned instead. An empty string is returned - if no other default is supplied. - - :param name: The name or XPath expression of the element. - :param default: Optional default to return if the element does - not exists. An empty string is returned otherwise. - """ - name = self._fix_ns(name) - if lang == '*': - return self._get_all_sub_text(name, default, None) - - default_lang = self.get_lang() - if not lang: - lang = default_lang - - stanzas = self.xml.findall(name) - if not stanzas: - return default - for stanza in stanzas: - if stanza.attrib.get('{%s}lang' % XML_NS, default_lang) == lang: - if stanza.text is None: - return default - return stanza.text - return default - - def _get_all_sub_text(self, name, default='', lang=None): - name = self._fix_ns(name) - - default_lang = self.get_lang() - results = OrderedDict() - stanzas = self.xml.findall(name) - if stanzas: - for stanza in stanzas: - stanza_lang = stanza.attrib.get('{%s}lang' % XML_NS, - default_lang) - if not lang or lang == '*' or stanza_lang == lang: - results[stanza_lang] = stanza.text - return results - - def _set_sub_text(self, name, text=None, keep=False, lang=None): - """Set the text contents of a sub element. - - In case the element does not exist, a element will be created, - and its text contents will be set. - - If the text is set to an empty string, or None, then the - element will be removed, unless keep is set to True. - - :param name: The name or XPath expression of the element. - :param text: The new textual content of the element. If the text - is an empty string or None, the element will be removed - unless the parameter keep is True. - :param keep: Indicates if the element should be kept if its text is - removed. Defaults to False. - """ - default_lang = self.get_lang() - if lang is None: - lang = default_lang - - if not text and not keep: - return self._del_sub(name, lang=lang) - - path = self._fix_ns(name, split=True) - name = path[-1] - parent = self.xml - - # The first goal is to find the parent of the subelement, or, if - # we can't find that, the closest grandparent element. - missing_path = [] - search_order = path[:-1] - while search_order: - parent = self.xml.find('/'.join(search_order)) - ename = search_order.pop() - if parent is not None: - break - else: - missing_path.append(ename) - missing_path.reverse() - - # Find all existing elements that match the desired - # element path (there may be multiples due to different - # languages values). - if parent is not None: - elements = self.xml.findall('/'.join(path)) - else: - parent = self.xml - elements = [] - - # Insert the remaining grandparent elements that don't exist yet. - for ename in missing_path: - element = ET.Element(ename) - parent.append(element) - parent = element - - # Re-use an existing element with the proper language, if one exists. - for element in elements: - elang = element.attrib.get('{%s}lang' % XML_NS, default_lang) - if not lang and elang == default_lang or lang and lang == elang: - element.text = text - return element - - # No useable element exists, so create a new one. - element = ET.Element(name) - element.text = text - if lang and lang != default_lang: - element.attrib['{%s}lang' % XML_NS] = lang - parent.append(element) - return element - - def _set_all_sub_text(self, name, values, keep=False, lang=None): - self._del_sub(name, lang) - for value_lang, value in values.items(): - if not lang or lang == '*' or value_lang == lang: - self._set_sub_text(name, text=value, - keep=keep, - lang=value_lang) - - def _del_sub(self, name, all=False, lang=None): - """Remove sub elements that match the given name or XPath. - - If the element is in a path, then any parent elements that become - empty after deleting the element may also be deleted if requested - by setting all=True. - - :param name: The name or XPath expression for the element(s) to remove. - :param bool all: If True, remove all empty elements in the path to the - deleted element. Defaults to False. - """ - path = self._fix_ns(name, split=True) - original_target = path[-1] - - default_lang = self.get_lang() - if not lang: - lang = default_lang - - for level, _ in enumerate(path): - # Generate the paths to the target elements and their parent. - element_path = "/".join(path[:len(path) - level]) - parent_path = "/".join(path[:len(path) - level - 1]) - - elements = self.xml.findall(element_path) - parent = self.xml.find(parent_path) - - if elements: - if parent is None: - parent = self.xml - for element in elements: - if element.tag == original_target or not list(element): - # Only delete the originally requested elements, and - # any parent elements that have become empty. - elem_lang = element.attrib.get('{%s}lang' % XML_NS, - default_lang) - if lang == '*' or elem_lang == lang: - parent.remove(element) - if not all: - # If we don't want to delete elements up the tree, stop - # after deleting the first level of elements. - return - - def match(self, xpath): - """Compare a stanza object with an XPath-like expression. - - If the XPath matches the contents of the stanza object, the match - is successful. - - The XPath expression may include checks for stanza attributes. - For example:: - - 'presence@show=xa@priority=2/status' - - Would match a presence stanza whose show value is set to ``'xa'``, - has a priority value of ``'2'``, and has a status element. - - :param string xpath: The XPath expression to check against. It - may be either a string or a list of element - names with attribute checks. - """ - if not isinstance(xpath, list): - xpath = self._fix_ns(xpath, split=True, propagate_ns=False) - - # Extract the tag name and attribute checks for the first XPath node. - components = xpath[0].split('@') - tag = components[0] - attributes = components[1:] - - if tag not in (self.name, "{%s}%s" % (self.namespace, self.name)) and \ - tag not in self.loaded_plugins and tag not in self.plugin_attrib: - # The requested tag is not in this stanza, so no match. - return False - - # Check the rest of the XPath against any substanzas. - matched_substanzas = False - for substanza in self.iterables: - if xpath[1:] == []: - break - matched_substanzas = substanza.match(xpath[1:]) - if matched_substanzas: - break - - # Check attribute values. - for attribute in attributes: - name, value = attribute.split('=') - if self[name] != value: - return False - - # Check sub interfaces. - if len(xpath) > 1: - next_tag = xpath[1] - if next_tag in self.sub_interfaces and self[next_tag]: - return True - - # Attempt to continue matching the XPath using the stanza's plugins. - if not matched_substanzas and len(xpath) > 1: - # Convert {namespace}tag@attribs to just tag - next_tag = xpath[1].split('@')[0].split('}')[-1] - langs = [name[1] for name in self.plugins if name[0] == next_tag] - for lang in langs: - plugin = self._get_plugin(next_tag, lang) - if plugin and plugin.match(xpath[1:]): - return True - return False - - # Everything matched. - return True - - def find(self, xpath): - """Find an XML object in this stanza given an XPath expression. - - Exposes ElementTree interface for backwards compatibility. - - .. note:: - - Matching on attribute values is not supported in Python 2.6 - or Python 3.1 - - :param string xpath: An XPath expression matching a single - desired element. - """ - return self.xml.find(xpath) - - def findall(self, xpath): - """Find multiple XML objects in this stanza given an XPath expression. - - Exposes ElementTree interface for backwards compatibility. - - .. note:: - - Matching on attribute values is not supported in Python 2.6 - or Python 3.1. - - :param string xpath: An XPath expression matching multiple - desired elements. - """ - return self.xml.findall(xpath) - - def get(self, key, default=None): - """Return the value of a stanza interface. - - If the found value is None or an empty string, return the supplied - default value. - - Allows stanza objects to be used like dictionaries. - - :param string key: The name of the stanza interface to check. - :param default: Value to return if the stanza interface has a value - of ``None`` or ``""``. Will default to returning None. - """ - value = self[key] - if value is None or value == '': - return default - return value - - def keys(self): - """Return the names of all stanza interfaces provided by the - stanza object. - - Allows stanza objects to be used like dictionaries. - """ - out = [] - out += [x for x in self.interfaces] - out += [x for x in self.loaded_plugins] - out.append('lang') - if self.iterables: - out.append('substanzas') - return out - - def append(self, item): - """Append either an XML object or a substanza to this stanza object. - - If a substanza object is appended, it will be added to the list - of iterable stanzas. - - Allows stanza objects to be used like lists. - - :param item: Either an XML object or a stanza object to add to - this stanza's contents. - """ - if not isinstance(item, ElementBase): - if type(item) == XML_TYPE: - return self.appendxml(item) - else: - raise TypeError - self.xml.append(item.xml) - self.iterables.append(item) - if item.__class__ in self.plugin_iterables: - if item.__class__.plugin_multi_attrib: - self.init_plugin(item.__class__.plugin_multi_attrib) - elif item.__class__ == self.plugin_tag_map.get(item.tag_name(), None): - self.init_plugin(item.plugin_attrib, - existing_xml=item.xml, - reuse=False) - return self - - def appendxml(self, xml): - """Append an XML object to the stanza's XML. - - The added XML will not be included in the list of - iterable substanzas. - - :param XML xml: The XML object to add to the stanza. - """ - self.xml.append(xml) - return self - - def pop(self, index=0): - """Remove and return the last substanza in the list of - iterable substanzas. - - Allows stanza objects to be used like lists. - - :param int index: The index of the substanza to remove. - """ - substanza = self.iterables.pop(index) - self.xml.remove(substanza.xml) - return substanza - - def next(self): - """Return the next iterable substanza.""" - return self.__next__() - - def clear(self): - """Remove all XML element contents and plugins. - - Any attribute values will be preserved. - """ - for child in list(self.xml): - self.xml.remove(child) - - for plugin in list(self.plugins.keys()): - del self.plugins[plugin] - return self - - @classmethod - def tag_name(cls): - """Return the namespaced name of the stanza's root element. - - The format for the tag name is:: - - '{namespace}elementname' - - For example, for the stanza ``<foo xmlns="bar" />``, - ``stanza.tag_name()`` would return ``"{bar}foo"``. - """ - return "{%s}%s" % (cls.namespace, cls.name) - - def get_lang(self, lang=None): - result = self.xml.attrib.get('{%s}lang' % XML_NS, '') - if not result and self.parent and self.parent(): - return self.parent()['lang'] - return result - - def set_lang(self, lang): - self.del_lang() - attr = '{%s}lang' % XML_NS - if lang: - self.xml.attrib[attr] = lang - - def del_lang(self): - attr = '{%s}lang' % XML_NS - if attr in self.xml.attrib: - del self.xml.attrib[attr] - - @property - def attrib(self): - """Return the stanza object itself. - - Older implementations of stanza objects used XML objects directly, - requiring the use of ``.attrib`` to access attribute values. - - Use of the dictionary syntax with the stanza object itself for - accessing stanza interfaces is preferred. - - .. deprecated:: 1.0 - """ - return self - - def _fix_ns(self, xpath, split=False, propagate_ns=True): - return fix_ns(xpath, split=split, - propagate_ns=propagate_ns, - default_ns=self.namespace) - - def __eq__(self, other): - """Compare the stanza object with another to test for equality. - - Stanzas are equal if their interfaces return the same values, - and if they are both instances of ElementBase. - - :param ElementBase other: The stanza object to compare against. - """ - if not isinstance(other, ElementBase): - return False - - # Check that this stanza is a superset of the other stanza. - values = self.values - for key in other.keys(): - if key not in values or values[key] != other[key]: - return False - - # Check that the other stanza is a superset of this stanza. - values = other.values - for key in self.keys(): - if key not in values or values[key] != self[key]: - return False - - # Both stanzas are supersets of each other, therefore they - # must be equal. - return True - - def __ne__(self, other): - """Compare the stanza object with another to test for inequality. - - Stanzas are not equal if their interfaces return different values, - or if they are not both instances of ElementBase. - - :param ElementBase other: The stanza object to compare against. - """ - return not self.__eq__(other) - - def __bool__(self): - """Stanza objects should be treated as True in boolean contexts. - - Python 3.x version. - """ - return True - - def __nonzero__(self): - """Stanza objects should be treated as True in boolean contexts. - - Python 2.x version. - """ - return True - - def __len__(self): - """Return the number of iterable substanzas in this stanza.""" - return len(self.iterables) - - def __iter__(self): - """Return an iterator object for the stanza's substanzas. - - The iterator is the stanza object itself. Attempting to use two - iterators on the same stanza at the same time is discouraged. - """ - self._index = 0 - return self - - def __next__(self): - """Return the next iterable substanza.""" - self._index += 1 - if self._index > len(self.iterables): - self._index = 0 - raise StopIteration - return self.iterables[self._index - 1] - - def __copy__(self): - """Return a copy of the stanza object that does not share the same - underlying XML object. - """ - return self.__class__(xml=copy.deepcopy(self.xml), parent=self.parent) - - def __str__(self, top_level_ns=True): - """Return a string serialization of the underlying XML object. - - .. seealso:: :ref:`tostring` - - :param bool top_level_ns: Display the top-most namespace. - Defaults to True. - """ - return tostring(self.xml, xmlns='', - top_level=True) - - def __repr__(self): - """Use the stanza's serialized XML as its representation.""" - return self.__str__() - - -class StanzaBase(ElementBase): - - """ - StanzaBase provides the foundation for all other stanza objects used - by SleekXMPP, and defines a basic set of interfaces common to nearly - all stanzas. These interfaces are the ``'id'``, ``'type'``, ``'to'``, - and ``'from'`` attributes. An additional interface, ``'payload'``, is - available to access the XML contents of the stanza. Most stanza objects - will provided more specific interfaces, however. - - **Stanza Interfaces:** - - :id: An optional id value that can be used to associate stanzas - :to: A JID object representing the recipient's JID. - :from: A JID object representing the sender's JID. - with their replies. - :type: The type of stanza, typically will be ``'normal'``, - ``'error'``, ``'get'``, or ``'set'``, etc. - :payload: The XML contents of the stanza. - - :param XMLStream stream: Optional :class:`sleekxmpp.xmlstream.XMLStream` - object responsible for sending this stanza. - :param XML xml: Optional XML contents to initialize stanza values. - :param string stype: Optional stanza type value. - :param sto: Optional string or :class:`sleekxmpp.xmlstream.JID` - object of the recipient's JID. - :param sfrom: Optional string or :class:`sleekxmpp.xmlstream.JID` - object of the sender's JID. - :param string sid: Optional ID value for the stanza. - :param parent: Optionally specify a parent stanza object will - contain this substanza. - """ - - #: The default XMPP client namespace - namespace = 'jabber:client' - - #: There is a small set of attributes which apply to all XMPP stanzas: - #: the stanza type, the to and from JIDs, the stanza ID, and, especially - #: in the case of an Iq stanza, a payload. - interfaces = set(('type', 'to', 'from', 'id', 'payload')) - - #: A basic set of allowed values for the ``'type'`` interface. - types = set(('get', 'set', 'error', None, 'unavailable', 'normal', 'chat')) - - def __init__(self, stream=None, xml=None, stype=None, - sto=None, sfrom=None, sid=None, parent=None): - self.stream = stream - if stream is not None: - self.namespace = stream.default_ns - ElementBase.__init__(self, xml, parent) - if stype is not None: - self['type'] = stype - if sto is not None: - self['to'] = sto - if sfrom is not None: - self['from'] = sfrom - if sid is not None: - self['id'] = sid - self.tag = "{%s}%s" % (self.namespace, self.name) - - def set_type(self, value): - """Set the stanza's ``'type'`` attribute. - - Only type values contained in :attr:`types` are accepted. - - :param string value: One of the values contained in :attr:`types` - """ - if value in self.types: - self.xml.attrib['type'] = value - return self - - def get_to(self): - """Return the value of the stanza's ``'to'`` attribute.""" - return JID(self._get_attr('to')) - - def set_to(self, value): - """Set the ``'to'`` attribute of the stanza. - - :param value: A string or :class:`sleekxmpp.xmlstream.JID` object - representing the recipient's JID. - """ - return self._set_attr('to', str(value)) - - def get_from(self): - """Return the value of the stanza's ``'from'`` attribute.""" - return JID(self._get_attr('from')) - - def set_from(self, value): - """Set the 'from' attribute of the stanza. - - Arguments: - from -- A string or JID object representing the sender's JID. - """ - return self._set_attr('from', str(value)) - - def get_payload(self): - """Return a list of XML objects contained in the stanza.""" - return list(self.xml) - - def set_payload(self, value): - """Add XML content to the stanza. - - :param value: Either an XML or a stanza object, or a list - of XML or stanza objects. - """ - if not isinstance(value, list): - value = [value] - for val in value: - self.append(val) - return self - - def del_payload(self): - """Remove the XML contents of the stanza.""" - self.clear() - return self - - def reply(self, clear=True): - """Prepare the stanza for sending a reply. - - Swaps the ``'from'`` and ``'to'`` attributes. - - If ``clear=True``, then also remove the stanza's - contents to make room for the reply content. - - For client streams, the ``'from'`` attribute is removed. - - :param bool clear: Indicates if the stanza's contents should be - removed. Defaults to ``True``. - """ - # if it's a component, use from - if self.stream and hasattr(self.stream, "is_component") and \ - self.stream.is_component: - self['from'], self['to'] = self['to'], self['from'] - else: - self['to'] = self['from'] - del self['from'] - if clear: - self.clear() - return self - - def error(self): - """Set the stanza's type to ``'error'``.""" - self['type'] = 'error' - return self - - def unhandled(self): - """Called if no handlers have been registered to process this stanza. - - Meant to be overridden. - """ - pass - - def exception(self, e): - """Handle exceptions raised during stanza processing. - - Meant to be overridden. - """ - log.exception('Error handling {%s}%s stanza', self.namespace, - self.name) - - def send(self, now=False): - """Queue the stanza to be sent on the XML stream. - - :param bool now: Indicates if the queue should be skipped and the - stanza sent immediately. Useful for stream - initialization. Defaults to ``False``. - """ - self.stream.send(self, now=now) - - def __copy__(self): - """Return a copy of the stanza object that does not share the - same underlying XML object, but does share the same XML stream. - """ - return self.__class__(xml=copy.deepcopy(self.xml), - stream=self.stream) - - def __str__(self, top_level_ns=False): - """Serialize the stanza's XML to a string. - - :param bool top_level_ns: Display the top-most namespace. - Defaults to ``False``. - """ - xmlns = self.stream.default_ns if self.stream else '' - return tostring(self.xml, xmlns=xmlns, - stream=self.stream, - top_level=(self.stream is None)) - - -#: A JSON/dictionary version of the XML content exposed through -#: the stanza interfaces:: -#: -#: >>> msg = Message() -#: >>> msg.values -#: {'body': '', 'from': , 'mucnick': '', 'mucroom': '', -#: 'to': , 'type': 'normal', 'id': '', 'subject': ''} -#: -#: Likewise, assigning to the :attr:`values` will change the XML -#: content:: -#: -#: >>> msg = Message() -#: >>> msg.values = {'body': 'Hi!', 'to': 'user@example.com'} -#: >>> msg -#: '<message to="user@example.com"><body>Hi!</body></message>' -#: -#: Child stanzas are exposed as nested dictionaries. -ElementBase.values = property(ElementBase._get_stanza_values, - ElementBase._set_stanza_values) - - -# To comply with PEP8, method names now use underscores. -# Deprecated method names are re-mapped for backwards compatibility. -ElementBase.initPlugin = ElementBase.init_plugin -ElementBase._getAttr = ElementBase._get_attr -ElementBase._setAttr = ElementBase._set_attr -ElementBase._delAttr = ElementBase._del_attr -ElementBase._getSubText = ElementBase._get_sub_text -ElementBase._setSubText = ElementBase._set_sub_text -ElementBase._delSub = ElementBase._del_sub -ElementBase.getStanzaValues = ElementBase._get_stanza_values -ElementBase.setStanzaValues = ElementBase._set_stanza_values - -StanzaBase.setType = StanzaBase.set_type -StanzaBase.getTo = StanzaBase.get_to -StanzaBase.setTo = StanzaBase.set_to -StanzaBase.getFrom = StanzaBase.get_from -StanzaBase.setFrom = StanzaBase.set_from -StanzaBase.getPayload = StanzaBase.get_payload -StanzaBase.setPayload = StanzaBase.set_payload -StanzaBase.delPayload = StanzaBase.del_payload diff --git a/sleekxmpp/xmlstream/tostring.py b/sleekxmpp/xmlstream/tostring.py deleted file mode 100644 index c49abd3e..00000000 --- a/sleekxmpp/xmlstream/tostring.py +++ /dev/null @@ -1,172 +0,0 @@ -# -*- coding: utf-8 -*- -""" - sleekxmpp.xmlstream.tostring - ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - - This module converts XML objects into Unicode strings and - intelligently includes namespaces only when necessary to - keep the output readable. - - Part of SleekXMPP: The Sleek XMPP Library - - :copyright: (c) 2011 Nathanael C. Fritz - :license: MIT, see LICENSE for more details -""" - -from __future__ import unicode_literals - -import sys - -if sys.version_info < (3, 0): - import types - - -XML_NS = 'http://www.w3.org/XML/1998/namespace' - - -def tostring(xml=None, xmlns='', stream=None, outbuffer='', - top_level=False, open_only=False, namespaces=None): - """Serialize an XML object to a Unicode string. - - If an outer xmlns is provided using ``xmlns``, then the current element's - namespace will not be included if it matches the outer namespace. An - exception is made for elements that have an attached stream, and appear - at the stream root. - - :param XML xml: The XML object to serialize. - :param string xmlns: Optional namespace of an element wrapping the XML - object. - :param stream: The XML stream that generated the XML object. - :param string outbuffer: Optional buffer for storing serializations - during recursive calls. - :param bool top_level: Indicates that the element is the outermost - element. - :param set namespaces: Track which namespaces are in active use so - that new ones can be declared when needed. - - :type xml: :py:class:`~xml.etree.ElementTree.Element` - :type stream: :class:`~sleekxmpp.xmlstream.xmlstream.XMLStream` - - :rtype: Unicode string - """ - # Add previous results to the start of the output. - output = [outbuffer] - - # Extract the element's tag name. - tag_name = xml.tag.split('}', 1)[-1] - - # Extract the element's namespace if it is defined. - if '}' in xml.tag: - tag_xmlns = xml.tag.split('}', 1)[0][1:] - else: - tag_xmlns = '' - - default_ns = '' - stream_ns = '' - use_cdata = False - - if stream: - default_ns = stream.default_ns - stream_ns = stream.stream_ns - use_cdata = stream.use_cdata - - # Output the tag name and derived namespace of the element. - namespace = '' - if tag_xmlns: - if top_level and tag_xmlns not in [default_ns, xmlns, stream_ns] \ - or not top_level and tag_xmlns != xmlns: - namespace = ' xmlns="%s"' % tag_xmlns - if stream and tag_xmlns in stream.namespace_map: - mapped_namespace = stream.namespace_map[tag_xmlns] - if mapped_namespace: - tag_name = "%s:%s" % (mapped_namespace, tag_name) - output.append("<%s" % tag_name) - output.append(namespace) - - # Output escaped attribute values. - new_namespaces = set() - for attrib, value in xml.attrib.items(): - value = escape(value, use_cdata) - if '}' not in attrib: - output.append(' %s="%s"' % (attrib, value)) - else: - attrib_ns = attrib.split('}')[0][1:] - attrib = attrib.split('}')[1] - if attrib_ns == XML_NS: - output.append(' xml:%s="%s"' % (attrib, value)) - elif stream and attrib_ns in stream.namespace_map: - mapped_ns = stream.namespace_map[attrib_ns] - if mapped_ns: - if namespaces is None: - namespaces = set() - if attrib_ns not in namespaces: - namespaces.add(attrib_ns) - new_namespaces.add(attrib_ns) - output.append(' xmlns:%s="%s"' % ( - mapped_ns, attrib_ns)) - output.append(' %s:%s="%s"' % ( - mapped_ns, attrib, value)) - - if open_only: - # Only output the opening tag, regardless of content. - output.append(">") - return ''.join(output) - - if len(xml) or xml.text: - # If there are additional child elements to serialize. - output.append(">") - if xml.text: - output.append(escape(xml.text, use_cdata)) - if len(xml): - for child in xml: - output.append(tostring(child, tag_xmlns, stream, - namespaces=namespaces)) - output.append("</%s>" % tag_name) - elif xml.text: - # If we only have text content. - output.append(">%s</%s>" % (escape(xml.text, use_cdata), tag_name)) - else: - # Empty element. - output.append(" />") - if xml.tail: - # If there is additional text after the element. - output.append(escape(xml.tail, use_cdata)) - for ns in new_namespaces: - # Remove namespaces introduced in this context. This is necessary - # because the namespaces object continues to be shared with other - # contexts. - namespaces.remove(ns) - return ''.join(output) - - -def escape(text, use_cdata=False): - """Convert special characters in XML to escape sequences. - - :param string text: The XML text to convert. - :rtype: Unicode string - """ - if sys.version_info < (3, 0): - if type(text) != types.UnicodeType: - text = unicode(text, 'utf-8', 'ignore') - - escapes = {'&': '&', - '<': '<', - '>': '>', - "'": ''', - '"': '"'} - - if not use_cdata: - text = list(text) - for i, c in enumerate(text): - text[i] = escapes.get(c, c) - return ''.join(text) - else: - escape_needed = False - for c in text: - if c in escapes: - escape_needed = True - break - if escape_needed: - escaped = map(lambda x : "<![CDATA[%s]]>" % x, text.split("]]>")) - return "<![CDATA[]]]><![CDATA[]>]]>".join(escaped) - return text diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py deleted file mode 100644 index 62d46100..00000000 --- a/sleekxmpp/xmlstream/xmlstream.py +++ /dev/null @@ -1,1817 +0,0 @@ -""" - sleekxmpp.xmlstream.xmlstream - ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - - This module provides the module for creating and - interacting with generic XML streams, along with - the necessary eventing infrastructure. - - Part of SleekXMPP: The Sleek XMPP Library - - :copyright: (c) 2011 Nathanael C. Fritz - :license: MIT, see LICENSE for more details -""" - -from __future__ import with_statement, unicode_literals - -import base64 -import copy -import logging -import signal -import socket as Socket -import ssl -import sys -import threading -import time -import random -import weakref -import uuid -import errno - -from xml.parsers.expat import ExpatError - -import sleekxmpp -from sleekxmpp.util import Queue, QueueEmpty, safedict -from sleekxmpp.thirdparty.statemachine import StateMachine -from sleekxmpp.xmlstream import Scheduler, tostring, cert -from sleekxmpp.xmlstream.stanzabase import StanzaBase, ET, ElementBase -from sleekxmpp.xmlstream.handler import Waiter, XMLCallback -from sleekxmpp.xmlstream.matcher import MatchXMLMask -from sleekxmpp.xmlstream.resolver import resolve, default_resolver - -# In Python 2.x, file socket objects are broken. A patched socket -# wrapper is provided for this case in filesocket.py. -if sys.version_info < (3, 0): - from sleekxmpp.xmlstream.filesocket import FileSocket, Socket26 - - -#: The time in seconds to wait before timing out waiting for response stanzas. -RESPONSE_TIMEOUT = 30 - -#: The time in seconds to wait for events from the event queue, and also the -#: time between checks for the process stop signal. -WAIT_TIMEOUT = 1.0 - -#: The number of threads to use to handle XML stream events. This is not the -#: same as the number of custom event handling threads. -#: :data:`HANDLER_THREADS` must be at least 1. For Python implementations -#: with a GIL, this should be left at 1, but for implemetnations without -#: a GIL increasing this value can provide better performance. -HANDLER_THREADS = 1 - -#: The time in seconds to delay between attempts to resend data -#: after an SSL error. -SSL_RETRY_DELAY = 0.5 - -#: The maximum number of times to attempt resending data due to -#: an SSL error. -SSL_RETRY_MAX = 10 - -#: Maximum time to delay between connection attempts is one hour. -RECONNECT_MAX_DELAY = 600 - -#: Maximum number of attempts to connect to the server before quitting -#: and raising a 'connect_failed' event. Setting this to ``None`` will -#: allow infinite reconnection attempts, and using ``0`` will disable -#: reconnections. Defaults to ``None``. -RECONNECT_MAX_ATTEMPTS = None - - -log = logging.getLogger(__name__) - - -class RestartStream(Exception): - """ - Exception to restart stream processing, including - resending the stream header. - """ - - -class XMLStream(object): - """ - An XML stream connection manager and event dispatcher. - - The XMLStream class abstracts away the issues of establishing a - connection with a server and sending and receiving XML "stanzas". - A stanza is a complete XML element that is a direct child of a root - document element. Two streams are used, one for each communication - direction, over the same socket. Once the connection is closed, both - streams should be complete and valid XML documents. - - Three types of events are provided to manage the stream: - :Stream: Triggered based on received stanzas, similar in concept - to events in a SAX XML parser. - :Custom: Triggered manually. - :Scheduled: Triggered based on time delays. - - Typically, stanzas are first processed by a stream event handler which - will then trigger custom events to continue further processing, - especially since custom event handlers may run in individual threads. - - :param socket: Use an existing socket for the stream. Defaults to - ``None`` to generate a new socket. - :param string host: The name of the target server. - :param int port: The port to use for the connection. Defaults to 0. - """ - - def __init__(self, socket=None, host='', port=0, certfile=None, - keyfile=None, ca_certs=None, **kwargs): - #: Most XMPP servers support TLSv1, but OpenFire in particular - #: does not work well with it. For OpenFire, set - #: :attr:`ssl_version` to use ``SSLv23``:: - #: - #: import ssl - #: xmpp.ssl_version = ssl.PROTOCOL_SSLv23 - self.ssl_version = ssl.PROTOCOL_TLSv1 - - #: The list of accepted ciphers, in OpenSSL Format. - #: It might be useful to override it for improved security - #: over the python defaults. - self.ciphers = None - - #: Path to a file containing certificates for verifying the - #: server SSL certificate. A non-``None`` value will trigger - #: certificate checking. - #: - #: .. note:: - #: - #: On Mac OS X, certificates in the system keyring will - #: be consulted, even if they are not in the provided file. - self.ca_certs = ca_certs - - #: Path to a file containing a client certificate to use for - #: authenticating via SASL EXTERNAL. If set, there must also - #: be a corresponding `:attr:keyfile` value. - self.certfile = certfile - - #: Path to a file containing the private key for the selected - #: client certificate to use for authenticating via SASL EXTERNAL. - self.keyfile = keyfile - - self._der_cert = None - - #: The time in seconds to wait for events from the event queue, - #: and also the time between checks for the process stop signal. - self.wait_timeout = WAIT_TIMEOUT - - #: The time in seconds to wait before timing out waiting - #: for response stanzas. - self.response_timeout = RESPONSE_TIMEOUT - - #: The current amount to time to delay attempting to reconnect. - #: This value doubles (with some jitter) with each failed - #: connection attempt up to :attr:`reconnect_max_delay` seconds. - self.reconnect_delay = None - - #: Maximum time to delay between connection attempts is one hour. - self.reconnect_max_delay = RECONNECT_MAX_DELAY - - #: Maximum number of attempts to connect to the server before - #: quitting and raising a 'connect_failed' event. Setting to - #: ``None`` allows infinite reattempts, while setting it to ``0`` - #: will disable reconnection attempts. Defaults to ``None``. - self.reconnect_max_attempts = RECONNECT_MAX_ATTEMPTS - - #: The time in seconds to delay between attempts to resend data - #: after an SSL error. - self.ssl_retry_max = SSL_RETRY_MAX - - #: The maximum number of times to attempt resending data due to - #: an SSL error. - self.ssl_retry_delay = SSL_RETRY_DELAY - - #: The connection state machine tracks if the stream is - #: ``'connected'`` or ``'disconnected'``. - self.state = StateMachine(('disconnected', 'connected')) - self.state._set_state('disconnected') - - #: The default port to return when querying DNS records. - self.default_port = int(port) - - #: The domain to try when querying DNS records. - self.default_domain = '' - - #: The expected name of the server, for validation. - self._expected_server_name = '' - self._service_name = '' - - #: The desired, or actual, address of the connected server. - self.address = (host, int(port)) - - #: A file-like wrapper for the socket for use with the - #: :mod:`~xml.etree.ElementTree` module. - self.filesocket = None - self.set_socket(socket) - - if sys.version_info < (3, 0): - self.socket_class = Socket26 - else: - self.socket_class = Socket.socket - - #: Enable connecting to the server directly over SSL, in - #: particular when the service provides two ports: one for - #: non-SSL traffic and another for SSL traffic. - self.use_ssl = False - - #: Enable connecting to the service without using SSL - #: immediately, but allow upgrading the connection later - #: to use SSL. - self.use_tls = False - - #: If set to ``True``, attempt to connect through an HTTP - #: proxy based on the settings in :attr:`proxy_config`. - self.use_proxy = False - - #: If set to ``True``, attempt to use IPv6. - self.use_ipv6 = True - - #: If set to ``True``, allow using the ``dnspython`` DNS library - #: if available. If set to ``False``, the builtin DNS resolver - #: will be used, even if ``dnspython`` is installed. - self.use_dnspython = True - - #: Use CDATA for escaping instead of XML entities. Defaults - #: to ``False``. - self.use_cdata = False - - #: An optional dictionary of proxy settings. It may provide: - #: :host: The host offering proxy services. - #: :port: The port for the proxy service. - #: :username: Optional username for accessing the proxy. - #: :password: Optional password for accessing the proxy. - self.proxy_config = {} - - #: The default namespace of the stream content, not of the - #: stream wrapper itself. - self.default_ns = '' - - self.default_lang = None - self.peer_default_lang = None - - #: The namespace of the enveloping stream element. - self.stream_ns = '' - - #: The default opening tag for the stream element. - self.stream_header = "<stream>" - - #: The default closing tag for the stream element. - self.stream_footer = "</stream>" - - #: If ``True``, periodically send a whitespace character over the - #: wire to keep the connection alive. Mainly useful for connections - #: traversing NAT. - self.whitespace_keepalive = True - - #: The default interval between keepalive signals when - #: :attr:`whitespace_keepalive` is enabled. - self.whitespace_keepalive_interval = 300 - - #: An :class:`~threading.Event` to signal that the application - #: is stopping, and that all threads should shutdown. - self.stop = threading.Event() - - #: An :class:`~threading.Event` to signal receiving a closing - #: stream tag from the server. - self.stream_end_event = threading.Event() - self.stream_end_event.set() - - #: An :class:`~threading.Event` to signal the start of a stream - #: session. Until this event fires, the send queue is not used - #: and data is sent immediately over the wire. - self.session_started_event = threading.Event() - - #: The default time in seconds to wait for a session to start - #: after connecting before reconnecting and trying again. - self.session_timeout = 45 - - #: Flag for controlling if the session can be considered ended - #: if the connection is terminated. - self.end_session_on_disconnect = True - - #: A queue of stream, custom, and scheduled events to be processed. - self.event_queue = Queue() - - #: A queue of string data to be sent over the stream. - self.send_queue = Queue(maxsize=256) - self.send_queue_lock = threading.Lock() - self.send_lock = threading.RLock() - - #: A :class:`~sleekxmpp.xmlstream.scheduler.Scheduler` instance for - #: executing callbacks in the future based on time delays. - self.scheduler = Scheduler(self.stop) - self.__failed_send_stanza = None - - #: A mapping of XML namespaces to well-known prefixes. - self.namespace_map = {StanzaBase.xml_ns: 'xml'} - - self.__thread = {} - self.__root_stanza = [] - self.__handlers = [] - self.__event_handlers = {} - self.__event_handlers_lock = threading.Lock() - self.__filters = {'in': [], 'out': [], 'out_sync': []} - self.__thread_count = 0 - self.__thread_cond = threading.Condition() - self.__active_threads = set() - self._use_daemons = False - self._disconnect_wait_for_threads = True - - self._id = 0 - self._id_lock = threading.Lock() - - #: We use an ID prefix to ensure that all ID values are unique. - self._id_prefix = '%s-' % uuid.uuid4() - - #: The :attr:`auto_reconnnect` setting controls whether or not - #: the stream will be restarted in the event of an error. - self.auto_reconnect = True - - #: The :attr:`disconnect_wait` setting is the default value - #: for controlling if the system waits for the send queue to - #: empty before ending the stream. This may be overridden by - #: passing ``wait=True`` or ``wait=False`` to :meth:`disconnect`. - #: The default :attr:`disconnect_wait` value is ``False``. - self.disconnect_wait = False - - #: A list of DNS results that have not yet been tried. - self.dns_answers = [] - - #: The service name to check with DNS SRV records. For - #: example, setting this to ``'xmpp-client'`` would query the - #: ``_xmpp-client._tcp`` service. - self.dns_service = None - - self.add_event_handler('connected', self._session_timeout_check) - self.add_event_handler('disconnected', self._remove_schedules) - self.add_event_handler('session_start', self._start_keepalive) - self.add_event_handler('session_start', self._cert_expiration) - - def use_signals(self, signals=None): - """Register signal handlers for ``SIGHUP`` and ``SIGTERM``. - - By using signals, a ``'killed'`` event will be raised when the - application is terminated. - - If a signal handler already existed, it will be executed first, - before the ``'killed'`` event is raised. - - :param list signals: A list of signal names to be monitored. - Defaults to ``['SIGHUP', 'SIGTERM']``. - """ - if signals is None: - signals = ['SIGHUP', 'SIGTERM'] - - existing_handlers = {} - for sig_name in signals: - if hasattr(signal, sig_name): - sig = getattr(signal, sig_name) - handler = signal.getsignal(sig) - if handler: - existing_handlers[sig] = handler - - def handle_kill(signum, frame): - """ - Capture kill event and disconnect cleanly after first - spawning the ``'killed'`` event. - """ - - if signum in existing_handlers and \ - existing_handlers[signum] != handle_kill: - existing_handlers[signum](signum, frame) - - self.event("killed", direct=True) - self.disconnect() - - try: - for sig_name in signals: - if hasattr(signal, sig_name): - sig = getattr(signal, sig_name) - signal.signal(sig, handle_kill) - self.__signals_installed = True - except: - log.debug("Can not set interrupt signal handlers. " + \ - "SleekXMPP is not running from a main thread.") - - def new_id(self): - """Generate and return a new stream ID in hexadecimal form. - - Many stanzas, handlers, or matchers may require unique - ID values. Using this method ensures that all new ID values - are unique in this stream. - """ - with self._id_lock: - self._id += 1 - return self.get_id() - - def get_id(self): - """Return the current unique stream ID in hexadecimal form.""" - return "%s%X" % (self._id_prefix, self._id) - - def connect(self, host='', port=0, use_ssl=False, - use_tls=True, reattempt=True): - """Create a new socket and connect to the server. - - Setting ``reattempt`` to ``True`` will cause connection - attempts to be made with an exponential backoff delay (max of - :attr:`reconnect_max_delay` which defaults to 10 minute) until a - successful connection is established. - - :param host: The name of the desired server for the connection. - :param port: Port to connect to on the server. - :param use_ssl: Flag indicating if SSL should be used by connecting - directly to a port using SSL. - :param use_tls: Flag indicating if TLS should be used, allowing for - connecting to a port without using SSL immediately and - later upgrading the connection. - :param reattempt: Flag indicating if the socket should reconnect - after disconnections. - """ - self.stop.clear() - - if host and port: - self.address = (host, int(port)) - try: - Socket.inet_aton(self.address[0]) - except (Socket.error, ssl.SSLError): - self.default_domain = self.address[0] - - # Respect previous SSL and TLS usage directives. - if use_ssl is not None: - self.use_ssl = use_ssl - if use_tls is not None: - self.use_tls = use_tls - - # Repeatedly attempt to connect until a successful connection - # is established. - attempts = self.reconnect_max_attempts - connected = self.state.transition('disconnected', 'connected', - func=self._connect, - args=(reattempt,)) - while reattempt and not connected and not self.stop.is_set(): - connected = self.state.transition('disconnected', 'connected', - func=self._connect) - if not connected: - if attempts is not None: - attempts -= 1 - if attempts <= 0: - self.event('connection_failed', direct=True) - return False - return connected - - def _connect(self, reattempt=True): - self.scheduler.remove('Session timeout check') - - if self.reconnect_delay is None: - delay = 1.0 - self.reconnect_delay = delay - - if reattempt: - delay = min(self.reconnect_delay * 2, self.reconnect_max_delay) - delay = random.normalvariate(delay, delay * 0.1) - log.debug('Waiting %s seconds before connecting.', delay) - elapsed = 0 - try: - while elapsed < delay and not self.stop.is_set(): - time.sleep(0.1) - elapsed += 0.1 - except KeyboardInterrupt: - self.set_stop() - return False - except SystemExit: - self.set_stop() - return False - - if self.default_domain: - try: - host, address, port = self.pick_dns_answer(self.default_domain, - self.address[1]) - self.address = (address, port) - self._service_name = host - except StopIteration: - log.debug("No remaining DNS records to try.") - self.dns_answers = None - if reattempt: - self.reconnect_delay = delay - return False - - af = Socket.AF_INET - proto = 'IPv4' - if ':' in self.address[0]: - af = Socket.AF_INET6 - proto = 'IPv6' - try: - self.socket = self.socket_class(af, Socket.SOCK_STREAM) - except Socket.error: - log.debug("Could not connect using %s", proto) - return False - - self.configure_socket() - - if self.use_proxy: - connected = self._connect_proxy() - if not connected: - if reattempt: - self.reconnect_delay = delay - return False - - if self.use_ssl: - log.debug("Socket Wrapped for SSL") - if self.ca_certs is None: - cert_policy = ssl.CERT_NONE - else: - cert_policy = ssl.CERT_REQUIRED - - ssl_args = safedict({ - 'certfile': self.certfile, - 'keyfile': self.keyfile, - 'ca_certs': self.ca_certs, - 'cert_reqs': cert_policy, - 'do_handshake_on_connect': False, - "ssl_version": self.ssl_version - }) - - if sys.version_info >= (2, 7): - ssl_args['ciphers'] = self.ciphers - - ssl_socket = ssl.wrap_socket(self.socket, **ssl_args) - - if hasattr(self.socket, 'socket'): - # We are using a testing socket, so preserve the top - # layer of wrapping. - self.socket.socket = ssl_socket - else: - self.socket = ssl_socket - - try: - if not self.use_proxy: - domain = self.address[0] - if ':' in domain: - domain = '[%s]' % domain - log.debug("Connecting to %s:%s", domain, self.address[1]) - self.socket.connect(self.address) - - if self.use_ssl: - try: - self.socket.do_handshake() - except (Socket.error, ssl.SSLError): - log.error('CERT: Invalid certificate trust chain.') - if not self.event_handled('ssl_invalid_chain'): - self.disconnect(self.auto_reconnect, - send_close=False) - else: - self.event('ssl_invalid_chain', direct=True) - return False - - self._der_cert = self.socket.getpeercert(binary_form=True) - pem_cert = ssl.DER_cert_to_PEM_cert(self._der_cert) - log.debug('CERT: %s', pem_cert) - - self.event('ssl_cert', pem_cert, direct=True) - try: - cert.verify(self._expected_server_name, self._der_cert) - except cert.CertificateError as err: - if not self.event_handled('ssl_invalid_cert'): - log.error(err) - self.disconnect(send_close=False) - else: - self.event('ssl_invalid_cert', - pem_cert, - direct=True) - - self.set_socket(self.socket, ignore=True) - #this event is where you should set your application state - self.event('connected', direct=True) - return True - except (Socket.error, ssl.SSLError) as serr: - error_msg = "Could not connect to %s:%s. Socket Error #%s: %s" - self.event('socket_error', serr, direct=True) - domain = self.address[0] - if ':' in domain: - domain = '[%s]' % domain - log.error(error_msg, domain, self.address[1], - serr.errno, serr.strerror) - return False - - def _connect_proxy(self): - """Attempt to connect using an HTTP Proxy.""" - - # Extract the proxy address, and optional credentials - address = (self.proxy_config['host'], int(self.proxy_config['port'])) - cred = None - if self.proxy_config['username']: - username = self.proxy_config['username'] - password = self.proxy_config['password'] - - cred = '%s:%s' % (username, password) - if sys.version_info < (3, 0): - cred = bytes(cred) - else: - cred = bytes(cred, 'utf-8') - cred = base64.b64encode(cred).decode('utf-8') - - # Build the HTTP headers for connecting to the XMPP server - headers = ['CONNECT %s:%s HTTP/1.0' % self.address, - 'Host: %s:%s' % self.address, - 'Proxy-Connection: Keep-Alive', - 'Pragma: no-cache', - 'User-Agent: SleekXMPP/%s' % sleekxmpp.__version__] - if cred: - headers.append('Proxy-Authorization: Basic %s' % cred) - headers = '\r\n'.join(headers) + '\r\n\r\n' - - try: - log.debug("Connecting to proxy: %s:%s", *address) - self.socket.connect(address) - self.send_raw(headers, now=True) - resp = '' - while '\r\n\r\n' not in resp and not self.stop.is_set(): - resp += self.socket.recv(1024).decode('utf-8') - log.debug('RECV: %s', resp) - - lines = resp.split('\r\n') - if '200' not in lines[0]: - self.event('proxy_error', resp) - self.event('connection_failed', direct=True) - log.error('Proxy Error: %s', lines[0]) - return False - - # Proxy connection established, continue connecting - # with the XMPP server. - return True - except (Socket.error, ssl.SSLError) as serr: - error_msg = "Could not connect to %s:%s. Socket Error #%s: %s" - self.event('socket_error', serr, direct=True) - log.error(error_msg, self.address[0], self.address[1], - serr.errno, serr.strerror) - return False - - def _session_timeout_check(self, event=None): - """ - Add check to ensure that a session is established within - a reasonable amount of time. - """ - - def _handle_session_timeout(): - if not self.session_started_event.is_set(): - log.debug("Session start has taken more " + \ - "than %d seconds", self.session_timeout) - self.disconnect(reconnect=self.auto_reconnect) - - self.schedule("Session timeout check", - self.session_timeout, - _handle_session_timeout) - - def disconnect(self, reconnect=False, wait=None, send_close=True): - """Terminate processing and close the XML streams. - - Optionally, the connection may be reconnected and - resume processing afterwards. - - If the disconnect should take place after all items - in the send queue have been sent, use ``wait=True``. - - .. warning:: - - If you are constantly adding items to the queue - such that it is never empty, then the disconnect will - not occur and the call will continue to block. - - :param reconnect: Flag indicating if the connection - and processing should be restarted. - Defaults to ``False``. - :param wait: Flag indicating if the send queue should - be emptied before disconnecting, overriding - :attr:`disconnect_wait`. - :param send_close: Flag indicating if the stream footer - should be sent before terminating the - connection. Setting this to ``False`` - prevents error loops when trying to - disconnect after a socket error. - """ - self.state.transition('connected', 'disconnected', - wait=2.0, - func=self._disconnect, - args=(reconnect, wait, send_close)) - - def _disconnect(self, reconnect=False, wait=None, send_close=True): - if not reconnect: - self.auto_reconnect = False - - if self.end_session_on_disconnect or send_close: - self.event('session_end', direct=True) - - # Wait for the send queue to empty. - if wait is not None: - if wait: - self.send_queue.join() - elif self.disconnect_wait: - self.send_queue.join() - - # Clearing this event will pause the send loop. - self.session_started_event.clear() - - self.__failed_send_stanza = None - - # Send the end of stream marker. - if send_close: - self.send_raw(self.stream_footer, now=True) - - # Wait for confirmation that the stream was - # closed in the other direction. If we didn't - # send a stream footer we don't need to wait - # since the server won't know to respond. - if send_close: - log.info('Waiting for %s from server', self.stream_footer) - self.stream_end_event.wait(4) - else: - self.stream_end_event.set() - - if not self.auto_reconnect: - self.set_stop() - if self._disconnect_wait_for_threads: - self._wait_for_threads() - - try: - self.socket.shutdown(Socket.SHUT_RDWR) - self.socket.close() - self.filesocket.close() - except (Socket.error, ssl.SSLError) as serr: - self.event('socket_error', serr, direct=True) - finally: - #clear your application state - self.event('disconnected', direct=True) - return True - - def abort(self): - self.session_started_event.clear() - self.set_stop() - if self._disconnect_wait_for_threads: - self._wait_for_threads() - try: - self.socket.shutdown(Socket.SHUT_RDWR) - self.socket.close() - self.filesocket.close() - except Socket.error: - pass - self.state.transition_any(['connected', 'disconnected'], 'disconnected', func=lambda: True) - self.event("killed", direct=True) - - def reconnect(self, reattempt=True, wait=False, send_close=True): - """Reset the stream's state and reconnect to the server.""" - log.debug("reconnecting...") - if self.state.ensure('connected'): - self.state.transition('connected', 'disconnected', - wait=2.0, - func=self._disconnect, - args=(True, wait, send_close)) - - attempts = self.reconnect_max_attempts - - log.debug("connecting...") - connected = self.state.transition('disconnected', 'connected', - wait=2.0, - func=self._connect, - args=(reattempt,)) - while reattempt and not connected and not self.stop.is_set(): - connected = self.state.transition('disconnected', 'connected', - wait=2.0, func=self._connect) - connected = connected or self.state.ensure('connected') - if not connected: - if attempts is not None: - attempts -= 1 - if attempts <= 0: - self.event('connection_failed', direct=True) - return False - return connected - - def set_socket(self, socket, ignore=False): - """Set the socket to use for the stream. - - The filesocket will be recreated as well. - - :param socket: The new socket object to use. - :param bool ignore: If ``True``, don't set the connection - state to ``'connected'``. - """ - self.socket = socket - if socket is not None: - # ElementTree.iterparse requires a file. - # 0 buffer files have to be binary. - - # Use the correct fileobject type based on the Python - # version to work around a broken implementation in - # Python 2.x. - if sys.version_info < (3, 0): - self.filesocket = FileSocket(self.socket) - else: - self.filesocket = self.socket.makefile('rb', 0) - if not ignore: - self.state._set_state('connected') - - def configure_socket(self): - """Set timeout and other options for self.socket. - - Meant to be overridden. - """ - self.socket.settimeout(None) - - def configure_dns(self, resolver, domain=None, port=None): - """ - Configure and set options for a :class:`~dns.resolver.Resolver` - instance, and other DNS related tasks. For example, you - can also check :meth:`~socket.socket.getaddrinfo` to see - if you need to call out to ``libresolv.so.2`` to - run ``res_init()``. - - Meant to be overridden. - - :param resolver: A :class:`~dns.resolver.Resolver` instance - or ``None`` if ``dnspython`` is not installed. - :param domain: The initial domain under consideration. - :param port: The initial port under consideration. - """ - pass - - def start_tls(self): - """Perform handshakes for TLS. - - If the handshake is successful, the XML stream will need - to be restarted. - """ - log.info("Negotiating TLS") - ssl_versions = {3: 'TLS 1.0', 1: 'SSL 3', 2: 'SSL 2/3'} - log.info("Using SSL version: %s", ssl_versions[self.ssl_version]) - if self.ca_certs is None: - cert_policy = ssl.CERT_NONE - else: - cert_policy = ssl.CERT_REQUIRED - - ssl_args = safedict({ - 'certfile': self.certfile, - 'keyfile': self.keyfile, - 'ca_certs': self.ca_certs, - 'cert_reqs': cert_policy, - 'do_handshake_on_connect': False, - "ssl_version": self.ssl_version - }) - - if sys.version_info >= (2, 7): - ssl_args['ciphers'] = self.ciphers - - ssl_socket = ssl.wrap_socket(self.socket, **ssl_args) - - if hasattr(self.socket, 'socket'): - # We are using a testing socket, so preserve the top - # layer of wrapping. - self.socket.socket = ssl_socket - else: - self.socket = ssl_socket - - try: - self.socket.do_handshake() - except (Socket.error, ssl.SSLError): - log.error('CERT: Invalid certificate trust chain.') - if not self.event_handled('ssl_invalid_chain'): - self.disconnect(self.auto_reconnect, send_close=False) - else: - self._der_cert = self.socket.getpeercert(binary_form=True) - self.event('ssl_invalid_chain', direct=True) - return False - - self._der_cert = self.socket.getpeercert(binary_form=True) - pem_cert = ssl.DER_cert_to_PEM_cert(self._der_cert) - log.debug('CERT: %s', pem_cert) - self.event('ssl_cert', pem_cert, direct=True) - - try: - cert.verify(self._expected_server_name, self._der_cert) - except cert.CertificateError as err: - if not self.event_handled('ssl_invalid_cert'): - log.error(err) - self.disconnect(self.auto_reconnect, send_close=False) - else: - self.event('ssl_invalid_cert', pem_cert, direct=True) - - self.set_socket(self.socket) - return True - - def _cert_expiration(self, event): - """Schedule an event for when the TLS certificate expires.""" - - if not self.use_tls and not self.use_ssl: - return - - if not self._der_cert: - log.warn("TLS or SSL was enabled, but no certificate was found.") - return - - def restart(): - if not self.event_handled('ssl_expired_cert'): - log.warn("The server certificate has expired. Restarting.") - self.reconnect() - else: - pem_cert = ssl.DER_cert_to_PEM_cert(self._der_cert) - self.event('ssl_expired_cert', pem_cert) - - cert_ttl = cert.get_ttl(self._der_cert) - if cert_ttl is None: - return - - if cert_ttl.days < 0: - log.warn('CERT: Certificate has expired.') - restart() - - try: - total_seconds = cert_ttl.total_seconds() - except AttributeError: - # for Python < 2.7 - total_seconds = (cert_ttl.microseconds + (cert_ttl.seconds + cert_ttl.days * 24 * 3600) * 10**6) / 10**6 - - log.info('CERT: Time until certificate expiration: %s' % cert_ttl) - self.schedule('Certificate Expiration', - total_seconds, - restart) - - def _start_keepalive(self, event): - """Begin sending whitespace periodically to keep the connection alive. - - May be disabled by setting:: - - self.whitespace_keepalive = False - - The keepalive interval can be set using:: - - self.whitespace_keepalive_interval = 300 - """ - if self.whitespace_keepalive: - self.schedule('Whitespace Keepalive', - self.whitespace_keepalive_interval, - self.send_raw, - args=(' ',), - kwargs={'now': True}, - repeat=True) - - def _remove_schedules(self, event): - """Remove whitespace keepalive and certificate expiration schedules.""" - self.scheduler.remove('Whitespace Keepalive') - self.scheduler.remove('Certificate Expiration') - - def start_stream_handler(self, xml): - """Perform any initialization actions, such as handshakes, - once the stream header has been sent. - - Meant to be overridden. - """ - pass - - def register_stanza(self, stanza_class): - """Add a stanza object class as a known root stanza. - - A root stanza is one that appears as a direct child of the stream's - root element. - - Stanzas that appear as substanzas of a root stanza do not need to - be registered here. That is done using register_stanza_plugin() from - sleekxmpp.xmlstream.stanzabase. - - Stanzas that are not registered will not be converted into - stanza objects, but may still be processed using handlers and - matchers. - - :param stanza_class: The top-level stanza object's class. - """ - self.__root_stanza.append(stanza_class) - - def remove_stanza(self, stanza_class): - """Remove a stanza from being a known root stanza. - - A root stanza is one that appears as a direct child of the stream's - root element. - - Stanzas that are not registered will not be converted into - stanza objects, but may still be processed using handlers and - matchers. - """ - self.__root_stanza.remove(stanza_class) - - def add_filter(self, mode, handler, order=None): - """Add a filter for incoming or outgoing stanzas. - - These filters are applied before incoming stanzas are - passed to any handlers, and before outgoing stanzas - are put in the send queue. - - Each filter must accept a single stanza, and return - either a stanza or ``None``. If the filter returns - ``None``, then the stanza will be dropped from being - processed for events or from being sent. - - :param mode: One of ``'in'`` or ``'out'``. - :param handler: The filter function. - :param int order: The position to insert the filter in - the list of active filters. - """ - if order: - self.__filters[mode].insert(order, handler) - else: - self.__filters[mode].append(handler) - - def del_filter(self, mode, handler): - """Remove an incoming or outgoing filter.""" - self.__filters[mode].remove(handler) - - def add_handler(self, mask, pointer, name=None, disposable=False, - threaded=False, filter=False, instream=False): - """A shortcut method for registering a handler using XML masks. - - The use of :meth:`register_handler()` is preferred. - - :param mask: An XML snippet matching the structure of the - stanzas that will be passed to this handler. - :param pointer: The handler function itself. - :parm name: A unique name for the handler. A name will - be generated if one is not provided. - :param disposable: Indicates if the handler should be discarded - after one use. - :param threaded: **DEPRECATED**. - Remains for backwards compatibility. - :param filter: **DEPRECATED**. - Remains for backwards compatibility. - :param instream: Indicates if the handler should execute during - stream processing and not during normal event - processing. - """ - # To prevent circular dependencies, we must load the matcher - # and handler classes here. - - if name is None: - name = 'add_handler_%s' % self.new_id() - self.register_handler( - XMLCallback(name, - MatchXMLMask(mask, self.default_ns), - pointer, - once=disposable, - instream=instream)) - - def register_handler(self, handler, before=None, after=None): - """Add a stream event handler that will be executed when a matching - stanza is received. - - :param handler: - The :class:`~sleekxmpp.xmlstream.handler.base.BaseHandler` - derived object to execute. - """ - if handler.stream is None: - self.__handlers.append(handler) - handler.stream = weakref.ref(self) - - def remove_handler(self, name): - """Remove any stream event handlers with the given name. - - :param name: The name of the handler. - """ - idx = 0 - for handler in self.__handlers: - if handler.name == name: - self.__handlers.pop(idx) - return True - idx += 1 - return False - - def get_dns_records(self, domain, port=None): - """Get the DNS records for a domain. - - :param domain: The domain in question. - :param port: If the results don't include a port, use this one. - """ - if port is None: - port = self.default_port - - resolver = default_resolver() - self.configure_dns(resolver, domain=domain, port=port) - - return resolve(domain, port, service=self.dns_service, - resolver=resolver, - use_ipv6=self.use_ipv6, - use_dnspython=self.use_dnspython) - - def pick_dns_answer(self, domain, port=None): - """Pick a server and port from DNS answers. - - Gets DNS answers if none available. - Removes used answer from available answers. - - :param domain: The domain in question. - :param port: If the results don't include a port, use this one. - """ - if not self.dns_answers: - self.dns_answers = self.get_dns_records(domain, port) - - if sys.version_info < (3, 0): - return self.dns_answers.next() - else: - return next(self.dns_answers) - - def add_event_handler(self, name, pointer, - threaded=False, disposable=False): - """Add a custom event handler that will be executed whenever - its event is manually triggered. - - :param name: The name of the event that will trigger - this handler. - :param pointer: The function to execute. - :param threaded: If set to ``True``, the handler will execute - in its own thread. Defaults to ``False``. - :param disposable: If set to ``True``, the handler will be - discarded after one use. Defaults to ``False``. - """ - if not name in self.__event_handlers: - self.__event_handlers[name] = [] - self.__event_handlers[name].append((pointer, threaded, disposable)) - - def del_event_handler(self, name, pointer): - """Remove a function as a handler for an event. - - :param name: The name of the event. - :param pointer: The function to remove as a handler. - """ - if not name in self.__event_handlers: - return - - # Need to keep handlers that do not use - # the given function pointer - def filter_pointers(handler): - return handler[0] != pointer - - self.__event_handlers[name] = list(filter( - filter_pointers, - self.__event_handlers[name])) - - def event_handled(self, name): - """Returns the number of registered handlers for an event. - - :param name: The name of the event to check. - """ - return len(self.__event_handlers.get(name, [])) - - def event(self, name, data=None, direct=False): - """Manually trigger a custom event. - - :param name: The name of the event to trigger. - :param data: Data that will be passed to each event handler. - Defaults to an empty dictionary, but is usually - a stanza object. - :param direct: Runs the event directly if True, skipping the - event queue. All event handlers will run in the - same thread. - """ - if not data: - data = {} - - log.debug("Event triggered: " + name) - - handlers = self.__event_handlers.get(name, []) - for handler in handlers: - #TODO: Data should not be copied, but should be read only, - # but this might break current code so it's left for future. - - out_data = copy.copy(data) if len(handlers) > 1 else data - old_exception = getattr(data, 'exception', None) - if direct: - try: - handler[0](out_data) - except Exception as e: - error_msg = 'Error processing event handler: %s' - log.exception(error_msg, str(handler[0])) - if old_exception: - old_exception(e) - else: - self.exception(e) - else: - self.event_queue.put(('event', handler, out_data)) - if handler[2]: - # If the handler is disposable, we will go ahead and - # remove it now instead of waiting for it to be - # processed in the queue. - with self.__event_handlers_lock: - try: - h_index = self.__event_handlers[name].index(handler) - self.__event_handlers[name].pop(h_index) - except: - pass - - def schedule(self, name, seconds, callback, args=None, - kwargs=None, repeat=False): - """Schedule a callback function to execute after a given delay. - - :param name: A unique name for the scheduled callback. - :param seconds: The time in seconds to wait before executing. - :param callback: A pointer to the function to execute. - :param args: A tuple of arguments to pass to the function. - :param kwargs: A dictionary of keyword arguments to pass to - the function. - :param repeat: Flag indicating if the scheduled event should - be reset and repeat after executing. - """ - self.scheduler.add(name, seconds, callback, args, kwargs, - repeat, qpointer=self.event_queue) - - def incoming_filter(self, xml): - """Filter incoming XML objects before they are processed. - - Possible uses include remapping namespaces, or correcting elements - from sources with incorrect behavior. - - Meant to be overridden. - """ - return xml - - def send(self, data, mask=None, timeout=None, now=False, use_filters=True): - """A wrapper for :meth:`send_raw()` for sending stanza objects. - - May optionally block until an expected response is received. - - :param data: The :class:`~sleekxmpp.xmlstream.stanzabase.ElementBase` - stanza to send on the stream. - :param mask: **DEPRECATED** - An XML string snippet matching the structure - of the expected response. Execution will block - in this thread until the response is received - or a timeout occurs. - :param int timeout: Time in seconds to wait for a response before - continuing. Defaults to :attr:`response_timeout`. - :param bool now: Indicates if the send queue should be skipped, - sending the stanza immediately. Useful mainly - for stream initialization stanzas. - Defaults to ``False``. - :param bool use_filters: Indicates if outgoing filters should be - applied to the given stanza data. Disabling - filters is useful when resending stanzas. - Defaults to ``True``. - """ - if timeout is None: - timeout = self.response_timeout - if hasattr(mask, 'xml'): - mask = mask.xml - - if isinstance(data, ElementBase): - if use_filters: - for filter in self.__filters['out']: - data = filter(data) - if data is None: - return - - if mask is not None: - log.warning("Use of send mask waiters is deprecated.") - wait_for = Waiter("SendWait_%s" % self.new_id(), - MatchXMLMask(mask)) - self.register_handler(wait_for) - - if isinstance(data, ElementBase): - with self.send_queue_lock: - if use_filters: - for filter in self.__filters['out_sync']: - data = filter(data) - if data is None: - return - str_data = tostring(data.xml, xmlns=self.default_ns, - stream=self, - top_level=True) - self.send_raw(str_data, now) - else: - self.send_raw(data, now) - if mask is not None: - return wait_for.wait(timeout) - - def send_xml(self, data, mask=None, timeout=None, now=False): - """Send an XML object on the stream, and optionally wait - for a response. - - :param data: The :class:`~xml.etree.ElementTree.Element` XML object - to send on the stream. - :param mask: **DEPRECATED** - An XML string snippet matching the structure - of the expected response. Execution will block - in this thread until the response is received - or a timeout occurs. - :param int timeout: Time in seconds to wait for a response before - continuing. Defaults to :attr:`response_timeout`. - :param bool now: Indicates if the send queue should be skipped, - sending the stanza immediately. Useful mainly - for stream initialization stanzas. - Defaults to ``False``. - """ - if timeout is None: - timeout = self.response_timeout - return self.send(tostring(data), mask, timeout, now) - - def send_raw(self, data, now=False, reconnect=None): - """Send raw data across the stream. - - :param string data: Any string value. - :param bool reconnect: Indicates if the stream should be - restarted if there is an error sending - the stanza. Used mainly for testing. - Defaults to :attr:`auto_reconnect`. - """ - if now: - log.debug("SEND (IMMED): %s", data) - try: - data = data.encode('utf-8') - total = len(data) - sent = 0 - count = 0 - tries = 0 - with self.send_lock: - while sent < total and not self.stop.is_set(): - try: - sent += self.socket.send(data[sent:]) - count += 1 - except ssl.SSLError as serr: - if tries >= self.ssl_retry_max: - log.debug('SSL error: max retries reached') - self.exception(serr) - log.warning("Failed to send %s", data) - if reconnect is None: - reconnect = self.auto_reconnect - if not self.stop.is_set(): - self.disconnect(reconnect, - send_close=False) - log.warning('SSL write error: retrying') - if not self.stop.is_set(): - time.sleep(self.ssl_retry_delay) - tries += 1 - except Socket.error as serr: - if serr.errno != errno.EINTR: - raise - if count > 1: - log.debug('SENT: %d chunks', count) - except (Socket.error, ssl.SSLError) as serr: - self.event('socket_error', serr, direct=True) - log.warning("Failed to send %s", data) - if reconnect is None: - reconnect = self.auto_reconnect - if not self.stop.is_set(): - self.disconnect(reconnect, send_close=False) - else: - self.send_queue.put(data) - return True - - def _start_thread(self, name, target, track=True): - self.__thread[name] = threading.Thread(name=name, target=target) - self.__thread[name].daemon = self._use_daemons - self.__thread[name].start() - - if track: - self.__active_threads.add(name) - with self.__thread_cond: - self.__thread_count += 1 - - def _end_thread(self, name, early=False): - with self.__thread_cond: - curr_thread = threading.current_thread().name - if curr_thread in self.__active_threads: - self.__thread_count -= 1 - self.__active_threads.remove(curr_thread) - - if early: - log.debug('Threading deadlock prevention!') - log.debug(("Marked %s thread as ended due to " + \ - "disconnect() call. %s threads remain.") % ( - name, self.__thread_count)) - else: - log.debug("Stopped %s thread. %s threads remain." % ( - name, self.__thread_count)) - - else: - log.debug(("Finished exiting %s thread after early " + \ - "termination from disconnect() call. " + \ - "%s threads remain.") % ( - name, self.__thread_count)) - - if self.__thread_count == 0: - self.__thread_cond.notify() - - def set_stop(self): - self.stop.set() - - # Unlock queues - self.event_queue.put(None) - self.send_queue.put(None) - - def _wait_for_threads(self): - with self.__thread_cond: - if self.__thread_count != 0: - log.debug("Waiting for %s threads to exit." % - self.__thread_count) - name = threading.current_thread().name - if name in self.__thread: - self._end_thread(name, early=True) - self.__thread_cond.wait(4) - if self.__thread_count != 0: - log.error("Hanged threads: %s" % threading.enumerate()) - log.error("This may be due to calling disconnect() " + \ - "from a non-threaded event handler. Be " + \ - "sure that event handlers that call " + \ - "disconnect() are registered using: " + \ - "add_event_handler(..., threaded=True)") - - def process(self, **kwargs): - """Initialize the XML streams and begin processing events. - - The number of threads used for processing stream events is determined - by :data:`HANDLER_THREADS`. - - :param bool block: If ``False``, then event dispatcher will run - in a separate thread, allowing for the stream to be - used in the background for another application. - Otherwise, ``process(block=True)`` blocks the current - thread. Defaults to ``False``. - :param bool threaded: **DEPRECATED** - If ``True``, then event dispatcher will run - in a separate thread, allowing for the stream to be - used in the background for another application. - Defaults to ``True``. This does **not** mean that no - threads are used at all if ``threaded=False``. - - Regardless of these threading options, these threads will - always exist: - - - The event queue processor - - The send queue processor - - The scheduler - """ - if 'threaded' in kwargs and 'block' in kwargs: - raise ValueError("process() called with both " + \ - "block and threaded arguments") - elif 'block' in kwargs: - threaded = not(kwargs.get('block', False)) - else: - threaded = kwargs.get('threaded', True) - - for t in range(0, HANDLER_THREADS): - log.debug("Starting HANDLER THREAD") - self._start_thread('event_thread_%s' % t, self._event_runner) - - self._start_thread('send_thread', self._send_thread) - self._start_thread('scheduler_thread', self._scheduler_thread) - - if threaded: - # Run the XML stream in the background for another application. - self._start_thread('read_thread', self._process, track=False) - else: - self._process() - - def _process(self): - """Start processing the XML streams. - - Processing will continue after any recoverable errors - if reconnections are allowed. - """ - - # The body of this loop will only execute once per connection. - # Additional passes will be made only if an error occurs and - # reconnecting is permitted. - while True: - shutdown = False - try: - # The call to self.__read_xml will block and prevent - # the body of the loop from running until a disconnect - # occurs. After any reconnection, the stream header will - # be resent and processing will resume. - while not self.stop.is_set(): - # Only process the stream while connected to the server - if not self.state.ensure('connected', wait=0.1): - break - # Ensure the stream header is sent for any - # new connections. - if not self.session_started_event.is_set(): - self.send_raw(self.stream_header, now=True) - if not self.__read_xml(): - # If the server terminated the stream, end processing - break - except KeyboardInterrupt: - log.debug("Keyboard Escape Detected in _process") - self.event('killed', direct=True) - shutdown = True - except SystemExit: - log.debug("SystemExit in _process") - shutdown = True - except (SyntaxError, ExpatError) as e: - log.error("Error reading from XML stream.") - self.exception(e) - except (Socket.error, ssl.SSLError) as serr: - self.event('socket_error', serr, direct=True) - log.error('Socket Error #%s: %s', serr.errno, serr.strerror) - except ValueError as e: - msg = e.message if hasattr(e, 'message') else e.args[0] - - if 'I/O operation on closed file' in msg: - log.error('Can not read from closed socket.') - else: - self.exception(e) - except Exception as e: - if not self.stop.is_set(): - log.error('Connection error.') - self.exception(e) - - if not shutdown and not self.stop.is_set() \ - and self.auto_reconnect: - self.reconnect() - else: - self.disconnect() - break - - def __read_xml(self): - """Parse the incoming XML stream - - Stream events are raised for each received stanza. - """ - depth = 0 - root = None - for event, xml in ET.iterparse(self.filesocket, (b'end', b'start')): - if event == b'start': - if depth == 0: - # We have received the start of the root element. - root = xml - log.debug('RECV: %s', tostring(root, xmlns=self.default_ns, - stream=self, - top_level=True, - open_only=True)) - # Perform any stream initialization actions, such - # as handshakes. - self.stream_end_event.clear() - self.start_stream_handler(root) - - # We have a successful stream connection, so reset - # exponential backoff for new reconnect attempts. - self.reconnect_delay = 1.0 - depth += 1 - if event == b'end': - depth -= 1 - if depth == 0: - # The stream's root element has closed, - # terminating the stream. - log.debug("End of stream recieved") - self.stream_end_event.set() - return False - elif depth == 1: - # We only raise events for stanzas that are direct - # children of the root element. - try: - self.__spawn_event(xml) - except RestartStream: - return True - if root is not None: - # Keep the root element empty of children to - # save on memory use. - root.clear() - log.debug("Ending read XML loop") - - def _build_stanza(self, xml, default_ns=None): - """Create a stanza object from a given XML object. - - If a specialized stanza type is not found for the XML, then - a generic :class:`~sleekxmpp.xmlstream.stanzabase.StanzaBase` - stanza will be returned. - - :param xml: The :class:`~xml.etree.ElementTree.Element` XML object - to convert into a stanza object. - :param default_ns: Optional default namespace to use instead of the - stream's current default namespace. - """ - if default_ns is None: - default_ns = self.default_ns - stanza_type = StanzaBase - for stanza_class in self.__root_stanza: - if xml.tag == "{%s}%s" % (default_ns, stanza_class.name) or \ - xml.tag == stanza_class.tag_name(): - stanza_type = stanza_class - break - stanza = stanza_type(self, xml) - if stanza['lang'] is None and self.peer_default_lang: - stanza['lang'] = self.peer_default_lang - return stanza - - def __spawn_event(self, xml): - """ - Analyze incoming XML stanzas and convert them into stanza - objects if applicable and queue stream events to be processed - by matching handlers. - - :param xml: The :class:`~sleekxmpp.xmlstream.stanzabase.ElementBase` - stanza to analyze. - """ - # Apply any preprocessing filters. - xml = self.incoming_filter(xml) - - # Convert the raw XML object into a stanza object. If no registered - # stanza type applies, a generic StanzaBase stanza will be used. - stanza = self._build_stanza(xml) - - for filter in self.__filters['in']: - if stanza is not None: - stanza = filter(stanza) - if stanza is None: - return - - log.debug("RECV: %s", stanza) - - # Match the stanza against registered handlers. Handlers marked - # to run "in stream" will be executed immediately; the rest will - # be queued. - unhandled = True - matched_handlers = [h for h in self.__handlers if h.match(stanza)] - for handler in matched_handlers: - if len(matched_handlers) > 1: - stanza_copy = copy.copy(stanza) - else: - stanza_copy = stanza - handler.prerun(stanza_copy) - self.event_queue.put(('stanza', handler, stanza_copy)) - try: - if handler.check_delete(): - self.__handlers.remove(handler) - except: - pass # not thread safe - unhandled = False - - # Some stanzas require responses, such as Iq queries. A default - # handler will be executed immediately for this case. - if unhandled: - stanza.unhandled() - - def _threaded_event_wrapper(self, func, args): - """Capture exceptions for event handlers that run - in individual threads. - - :param func: The event handler to execute. - :param args: Arguments to the event handler. - """ - # this is always already copied before this is invoked - orig = args[0] - try: - func(*args) - except Exception as e: - error_msg = 'Error processing event handler: %s' - log.exception(error_msg, str(func)) - if hasattr(orig, 'exception'): - orig.exception(e) - else: - self.exception(e) - - def _event_runner(self): - """Process the event queue and execute handlers. - - The number of event runner threads is controlled by HANDLER_THREADS. - - Stream event handlers will all execute in this thread. Custom event - handlers may be spawned in individual threads. - """ - log.debug("Loading event runner") - try: - while not self.stop.is_set(): - event = self.event_queue.get() - if event is None: - continue - - etype, handler = event[0:2] - args = event[2:] - orig = copy.copy(args[0]) - - if etype == 'stanza': - try: - handler.run(args[0]) - except Exception as e: - error_msg = 'Error processing stream handler: %s' - log.exception(error_msg, handler.name) - orig.exception(e) - elif etype == 'schedule': - name = args[2] - try: - log.debug('Scheduled event: %s: %s', name, args[0]) - handler(*args[0], **args[1]) - except Exception as e: - log.exception('Error processing scheduled task') - self.exception(e) - elif etype == 'event': - func, threaded, disposable = handler - try: - if threaded: - x = threading.Thread( - name="Event_%s" % str(func), - target=self._threaded_event_wrapper, - args=(func, args)) - x.daemon = self._use_daemons - x.start() - else: - func(*args) - except Exception as e: - error_msg = 'Error processing event handler: %s' - log.exception(error_msg, str(func)) - if hasattr(orig, 'exception'): - orig.exception(e) - else: - self.exception(e) - elif etype == 'quit': - log.debug("Quitting event runner thread") - break - except KeyboardInterrupt: - log.debug("Keyboard Escape Detected in _event_runner") - self.event('killed', direct=True) - self.disconnect() - except SystemExit: - self.disconnect() - self.event_queue.put(('quit', None, None)) - - self._end_thread('event runner') - - def _send_thread(self): - """Extract stanzas from the send queue and send them on the stream.""" - try: - while not self.stop.is_set(): - while not self.stop.is_set() and \ - not self.session_started_event.is_set(): - self.session_started_event.wait(timeout=0.1) # Wait for session start - if self.__failed_send_stanza is not None: - data = self.__failed_send_stanza - self.__failed_send_stanza = None - else: - data = self.send_queue.get() # Wait for data to send - if data is None: - continue - log.debug("SEND: %s", data) - enc_data = data.encode('utf-8') - total = len(enc_data) - sent = 0 - count = 0 - tries = 0 - try: - with self.send_lock: - while sent < total and not self.stop.is_set() and \ - self.session_started_event.is_set(): - try: - sent += self.socket.send(enc_data[sent:]) - count += 1 - except ssl.SSLError as serr: - if tries >= self.ssl_retry_max: - log.debug('SSL error: max retries reached') - self.exception(serr) - log.warning("Failed to send %s", data) - if not self.stop.is_set(): - self.disconnect(self.auto_reconnect, - send_close=False) - log.warning('SSL write error: retrying') - if not self.stop.is_set(): - time.sleep(self.ssl_retry_delay) - tries += 1 - except Socket.error as serr: - if serr.errno != errno.EINTR: - raise - if count > 1: - log.debug('SENT: %d chunks', count) - self.send_queue.task_done() - except (Socket.error, ssl.SSLError) as serr: - self.event('socket_error', serr, direct=True) - log.warning("Failed to send %s", data) - if not self.stop.is_set(): - self.__failed_send_stanza = data - self._end_thread('send') - self.disconnect(self.auto_reconnect, send_close=False) - return - except Exception as ex: - log.exception('Unexpected error in send thread: %s', ex) - self.exception(ex) - if not self.stop.is_set(): - self._end_thread('send') - self.disconnect(self.auto_reconnect) - return - - self._end_thread('send') - - def _scheduler_thread(self): - self.scheduler.process(threaded=False) - self._end_thread('scheduler') - - def exception(self, exception): - """Process an unknown exception. - - Meant to be overridden. - - :param exception: An unhandled exception object. - """ - pass - - -# To comply with PEP8, method names now use underscores. -# Deprecated method names are re-mapped for backwards compatibility. -XMLStream.startTLS = XMLStream.start_tls -XMLStream.registerStanza = XMLStream.register_stanza -XMLStream.removeStanza = XMLStream.remove_stanza -XMLStream.registerHandler = XMLStream.register_handler -XMLStream.removeHandler = XMLStream.remove_handler -XMLStream.setSocket = XMLStream.set_socket -XMLStream.sendRaw = XMLStream.send_raw -XMLStream.getId = XMLStream.get_id -XMLStream.getNewId = XMLStream.new_id -XMLStream.sendXML = XMLStream.send_xml |