summaryrefslogtreecommitdiff
path: root/src/fixes.py
blob: cb992e881181049c0a2c3eaf1fda4777f6a05896 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
"""
Module used to provide fixes for sleekxmpp functions not yet fixed
upstream.

TODO: Check that they are fixed and remove those hacks
"""

from sleekxmpp.stanza import Message
from sleekxmpp.xmlstream import ET

import logging

# used to avoid doing numerous useless disco#info requests
# especially with message receipts
IQ_ERRORS = set()

log = logging.getLogger(__name__)

def has_identity(xmpp, jid, identity):
    try:
        iq = xmpp.plugin['xep_0030'].get_info(jid=jid, block=True, timeout=1)
        ident = lambda x: x[0]
        return identity in map(ident, iq['disco_info']['identities'])
    except:
        log.debug('Traceback while retrieving identity', exc_info=True)
    return False

def get_version(xmpp, jid, callback=None, **kwargs):
    def handle_result(res):
        if res and res['type'] != 'error':
            ret = res['software_version'].values
        else:
            ret = False
        if callback:
            callback(ret)
        return ret
    iq = xmpp.make_iq_get(ito=jid)
    iq['query'] = 'jabber:iq:version'
    result = iq.send(callback=handle_result if callback else None)
    if not callback:
        return handle_result(result)


def get_room_form(xmpp, room):
    iq = xmpp.make_iq_get(ito=room)
    query = ET.Element('{http://jabber.org/protocol/muc#owner}query')
    iq.append(query)
    try:
        result = iq.send()
    except:
        return False
    xform = result.xml.find('{http://jabber.org/protocol/muc#owner}query/{jabber:x:data}x')
    if xform is None:
        return False
    form = xmpp.plugin['xep_0004'].buildForm(xform)
    return form


def _filter_add_receipt_request(self, stanza):
    """
    Auto add receipt requests to outgoing messages, if:

        - ``self.auto_request`` is set to ``True``
        - The message is not for groupchat
        - The message does not contain a receipt acknowledgment
        - The recipient is a bare JID or, if a full JID, one
          that has the ``urn:xmpp:receipts`` feature enabled
        - The message has a body

    The disco cache is checked if a full JID is specified in
    the outgoing message, which may mean a round-trip disco#info
    delay for the first message sent to the JID if entity caps
    are not used.
    """

    if not self.auto_request:
        return stanza

    if not isinstance(stanza, Message):
        return stanza

    if stanza['request_receipt']:
        return stanza

    if not stanza['type'] in self.ack_types:
        return stanza

    if stanza['receipt']:
        return stanza

    if not stanza['body']:
        return stanza

    if stanza['to'].resource:
        if not self.xmpp['xep_0030'].supports(stanza['to'],
                feature='urn:xmpp:receipts',
                cached=True):
            return stanza

    stanza['request_receipt'] = True
    return stanza

def xep_30_supports(self, jid, node, ifrom, data):
    """
    Check if a JID supports a given feature.

    The data parameter may provide:
        feature  -- The feature to check for support.
        local    -- If true, then the query is for a JID/node
                    combination handled by this Sleek instance and
                    no stanzas need to be sent.
                    Otherwise, a disco stanza must be sent to the
                    remove JID to retrieve the info.
        cached   -- If true, then look for the disco info data from
                    the local cache system. If no results are found,
                    send the query as usual. The self.use_cache
                    setting must be set to true for this option to
                    be useful. If set to false, then the cache will
                    be skipped, even if a result has already been
                    cached. Defaults to false.
    """
    feature = data.get('feature', None)

    data = {'local': data.get('local', False),
            'cached': data.get('cached', True)}

    if not feature or jid.full in IQ_ERRORS:
        return False

    try:
        info = self.disco.get_info(jid=jid, node=node,
                                   ifrom=ifrom, **data)
        info = self.disco._wrap(ifrom, jid, info, True)
        features = info['disco_info']['features']
        return feature in features
    except:
        IQ_ERRORS.add(jid.full)
        log.debug('%s added to the list of entities that do'
                  'not honor disco#info', jid.full)
        return False

def xep_115_supports(self, jid, node, ifrom, data):
    """
    Check if a JID supports a given feature.

    The data parameter may provide:
        feature  -- The feature to check for support.
        local    -- If true, then the query is for a JID/node
                    combination handled by this Sleek instance and
                    no stanzas need to be sent.
                    Otherwise, a disco stanza must be sent to the
                    remove JID to retrieve the info.
        cached   -- If true, then look for the disco info data from
                    the local cache system. If no results are found,
                    send the query as usual. The self.use_cache
                    setting must be set to true for this option to
                    be useful. If set to false, then the cache will
                    be skipped, even if a result has already been
                    cached. Defaults to false.
    """
    feature = data.get('feature', None)

    data = {'local': data.get('local', False),
            'cached': data.get('cached', True)}

    if not feature or jid.full in IQ_ERRORS:
        return False

    if node in (None, ''):
        info = self.caps.get_caps(jid)
        if info and feature in info['features']:
            return True

    try:
        info = self.disco.get_info(jid=jid, node=node,
                                   ifrom=ifrom, **data)
        info = self.disco._wrap(ifrom, jid, info, True)
        return feature in info['disco_info']['features']
    except:
        IQ_ERRORS.add(jid.full)
        log.debug('%s added to the list of entities that do'
                  'not honor disco#info', jid.full)
        return False

def reset_iq_errors():
    "reset the iq error cache"
    IQ_ERRORS.clear()