summaryrefslogtreecommitdiff
path: root/sleekxmpp/xmlstream/matcher/xmlmask.py
blob: 2967a2af9e6fbf6ef7e7cd28f6e0518721dee94a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
"""
    SleekXMPP: The Sleek XMPP Library
    Copyright (C) 2010  Nathanael C. Fritz
    This file is part of SleekXMPP.

    See the file LICENSE for copying permission.
"""

from xml.parsers.expat import ExpatError

from sleekxmpp.xmlstream.stanzabase import ET
from sleekxmpp.xmlstream.matcher.base import MatcherBase


# Flag indicating if the builtin XPath matcher should be used, which
# uses namespaces, or a custom matcher that ignores namespaces.
# Changing this will affect ALL XMLMask matchers.
IGNORE_NS = False


class MatchXMLMask(MatcherBase):

    """
    The XMLMask matcher selects stanzas whose XML matches a given
    XML pattern, or mask. For example, message stanzas with body elements
    could be matched using the mask:

        <message xmlns="jabber:client"><body /></message>

    Use of XMLMask is discouraged, and XPath or StanzaPath should be used
    instead.

    The use of namespaces in the mask comparison is controlled by
    IGNORE_NS. Setting IGNORE_NS to True will disable namespace based matching
    for ALL XMLMask matchers.

    Methods:
        match        -- Overrides MatcherBase.match.
        setDefaultNS -- Set the default namespace for the mask.
    """

    def __init__(self, criteria):
        """
        Create a new XMLMask matcher.

        Arguments:
            criteria -- Either an XML object or XML string to use as a mask.
        """
        MatcherBase.__init__(self, criteria)
        if isinstance(criteria, str):
            self._criteria = ET.fromstring(self._criteria)
        self.default_ns = 'jabber:client'

    def setDefaultNS(self, ns):
        """
        Set the default namespace to use during comparisons.

        Arguments:
            ns -- The new namespace to use as the default.
        """
        self.default_ns = ns

    def match(self, xml):
        """
        Compare a stanza object or XML object against the stored XML mask.

        Overrides MatcherBase.match.

        Arguments:
            xml -- The stanza object or XML object to compare against.
        """
        if hasattr(xml, 'xml'):
            xml = xml.xml
        return self._mask_cmp(xml, self._criteria, True)

    def _mask_cmp(self, source, mask, use_ns=False, default_ns='__no_ns__'):
        """
        Compare an XML object against an XML mask.

        Arguments:
            source     -- The XML object to compare against the mask.
            mask       -- The XML object serving as the mask.
            use_ns     -- Indicates if namespaces should be respected during
                          the comparison.
            default_ns -- The default namespace to apply to elements that
                          do not have a specified namespace.
                          Defaults to "__no_ns__".
        """
        use_ns = not IGNORE_NS

        if source is None:
            # If the element was not found. May happend during recursive calls.
            return False

        # Convert the mask to an XML object if it is a string.
        if not hasattr(mask, 'attrib'):
            try:
                mask = ET.fromstring(mask)
            except ExpatError:
                logging.log(logging.WARNING,
                            "Expat error: %s\nIn parsing: %s" % ('', mask))

        if not use_ns:
            # Compare the element without using namespaces.
            source_tag = source.tag.split('}', 1)[-1]
            mask_tag = mask.tag.split('}', 1)[-1]
            if source_tag != mask_tag:
                return False
        else:
            # Compare the element using namespaces
            mask_ns_tag = "{%s}%s" % (self.default_ns, mask.tag)
            if source.tag not in [mask.tag, mask_ns_tag]:
                return False

        # If the mask includes text, compare it.
        if mask.text and source.text != mask.text:
            return False

        # Compare attributes. The stanza must include the attributes
        # defined by the mask, but may include others.
        for name, value in mask.attrib.items():
            if source.attrib.get(name, "__None__") != value:
                return False

        # Recursively check subelements.
        for subelement in mask:
            if use_ns:
                if not self._mask_cmp(source.find(subelement.tag),
                                      subelement, use_ns):
                    return False
            else:
                if not self._mask_cmp(self._get_child(source, subelement.tag),
                                      subelement, use_ns):
                    return False

        # Everything matches.
        return True

    def _get_child(self, xml, tag):
        """
        Return a child element given its tag, ignoring namespace values.

        Returns None if the child was not found.

        Arguments:
            xml -- The XML object to search for the given child tag.
            tag -- The name of the subelement to find.
        """
        tag = tag.split('}')[-1]
        try:
            children = [c.tag.split('}')[-1] for c in xml.getchildren()]
            index = children.index(tag)
        except ValueError:
            return None
        return xml.getchildren()[index]