summaryrefslogtreecommitdiff
path: root/slixmpp/plugins/xep_0030/static.py
blob: 6863830fff8fb02e20d48509f726c7c7ccde300c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
# Slixmpp: The Slick XMPP Library
# Copyright (C) 2010 Nathanael C. Fritz, Lance J.T. Stout
# This file is part of Slixmpp.
# See the file LICENSE for copying permission.
from __future__ import annotations

import logging

from typing import (
    Optional,
    Any,
    Dict,
    Tuple,
    TYPE_CHECKING,
    Union,
    Collection,
)

from slixmpp import BaseXMPP, JID
from slixmpp.stanza import Iq
from slixmpp.types import TypedDict, OptJidStr, OptJid
from slixmpp.exceptions import XMPPError, IqError, IqTimeout
from slixmpp.plugins.xep_0030 import DiscoInfo, DiscoItems


# Module-level logger, named after this module.
log = logging.getLogger(__name__)

if TYPE_CHECKING:
    from slixmpp.plugins.xep_0030 import XEP_0030


class NodeType(TypedDict):
    """Pair of disco stanzas stored for a single node entry."""
    # disco#info stanza kept for the node
    info: DiscoInfo
    # disco#items stanza kept for the node
    items: DiscoItems


# Storage layout of StaticDisco.nodes: every key is the
# (jid, node, ifrom) triple of strings built by StaticDisco.add_node().
NodesType = Dict[
    Tuple[str, str, str],
    NodeType
]


class StaticDisco:

    """
    While components will likely require fully dynamic handling
    of service discovery information, most clients and simple bots
    only need to manage a few disco nodes that will remain mostly
    static.

    StaticDisco provides a set of node handlers that will store
    static sets of disco info and items in memory.

    :var nodes: A dictionary mapping (JID, node, ifrom) tuples to a
                dict containing a disco#info and a disco#items stanza.
    :var xmpp: The main Slixmpp object.
    :var disco: The instance of the XEP-0030 plugin.
    """

    def __init__(self, xmpp: 'BaseXMPP', disco: 'XEP_0030'):
        """
        Create a static disco interface. Sets of disco#info and
        disco#items are maintained for every given JID and node
        combination. These stanzas are used to store disco
        information in memory without any additional processing.

        :param xmpp: The main Slixmpp object.
        :param disco: The XEP-0030 plugin.
        """
        self.nodes: NodesType = {}
        self.xmpp: BaseXMPP = xmpp
        self.disco: 'XEP_0030' = disco

    def add_node(self, jid: OptJidStr = None, node: Optional[str] = None,
                 ifrom: OptJidStr = None) -> NodeType:
        if jid is None:
            node_jid = self.xmpp.boundjid.full
        elif isinstance(jid, JID):
            node_jid = jid.full
        if ifrom is None:
            node_ifrom = ''
        elif isinstance(ifrom, JID):
            node_ifrom = ifrom.full
        else:
            node_ifrom = ifrom
        if node is None:
            node = ''
        if (node_jid, node, node_ifrom) not in self.nodes:
            node_dict: NodeType = {
                'info': DiscoInfo(),
                'items': DiscoItems(),
            }
            node_dict['info']['node'] = node
            node_dict['items']['node'] = node
            self.nodes[(node_jid, node, node_ifrom)] = node_dict
        return self.nodes[(node_jid, node, node_ifrom)]

    def get_node(self, jid: OptJidStr = None, node: Optional[str] = None,
                 ifrom: OptJidStr = None) -> NodeType:
        if jid is None:
            node_jid = self.xmpp.boundjid.full
        elif isinstance(jid, JID):
            node_jid = jid.full
        else:
            node_jid = jid
        if node is None:
            node = ''
        if ifrom is None:
            node_ifrom = ''
        elif isinstance(ifrom, JID):
            node_ifrom = ifrom.full
        else:
            node_ifrom = ifrom
        if (node_jid, node, node_ifrom) not in self.nodes:
            self.add_node(node_jid, node, node_ifrom)
        return self.nodes[(node_jid, node, node_ifrom)]

    def node_exists(self, jid: OptJidStr = None, node: Optional[str] = None,
                    ifrom: OptJidStr = None) -> bool:
        if jid is None:
            node_jid = self.xmpp.boundjid.full
        elif isinstance(jid, JID):
            node_jid = jid.full
        else:
            node_jid = jid
        if node is None:
            node = ''
        if ifrom is None:
            node_ifrom = ''
        elif isinstance(ifrom, JID):
            node_ifrom = ifrom.full
        else:
            node_ifrom = ifrom
        return (node_jid, node, node_ifrom) in self.nodes

    # =================================================================
    # Node Handlers
    #
    # Each handler accepts four arguments: jid, node, ifrom, and data.
    # The jid and node parameters together determine the set of info
    # and items stanzas that will be retrieved or added. Additionally,
    # the ifrom value allows for cached results when results vary based
    # on the requester's JID. The data parameter is a dictionary with
    # additional parameters that will be passed to other calls.
    #
    # This implementation does not allow different responses based on
    # the requester's JID, except for cached results. To do that,
    # register a custom node handler.

    async def supports(self, jid: OptJid, node: Optional[str], ifrom: OptJid,
                       data: Any) -> Optional[bool]:
        """
        Check if a JID supports a given feature.

        The data parameter may provide:

        :param feature: The feature to check for support.
        :param local:   If true, then the query is for a JID/node
                        combination handled by this Slixmpp instance and
                        no stanzas need to be sent.
                        Otherwise, a disco stanza must be sent to the
                        remove JID to retrieve the info.
        :param cached:  If true, then look for the disco info data from
                        the local cache system. If no results are found,
                        send the query as usual. The self.use_cache
                        setting must be set to true for this option to
                        be useful. If set to false, then the cache will
                        be skipped, even if a result has already been
                        cached. Defaults to false.
        """
        feature = data.get('feature', None)

        data = {'local': data.get('local', False),
                'cached': data.get('cached', True)}

        if not feature:
            return False

        try:
            info = await self.disco.get_info(jid=jid, node=node,
                                             ifrom=ifrom, **data)
            info = self.disco._wrap(ifrom, jid, info, True)
            features = info['disco_info']['features']
            return feature in features
        except IqError:
            return False
        except IqTimeout:
            return None

    async def has_identity(self, jid: OptJid, node: Optional[str],
                           ifrom: OptJid, data: Dict[str, Any]
                           ) -> Optional[bool]:
        """
        Check if a JID has a given identity.

        The data parameter may provide:

        :param category: The category of the identity to check.
        :param itype:    The type of the identity to check.
        :param lang:     The language of the identity to check.
        :param local:    If true, then the query is for a JID/node
                         combination handled by this Slixmpp instance and
                         no stanzas need to be sent.
                         Otherwise, a disco stanza must be sent to the
                         remove JID to retrieve the info.
        :param cached:   If true, then look for the disco info data from
                         the local cache system. If no results are found,
                         send the query as usual. The self.use_cache
                         setting must be set to true for this option to
                         be useful. If set to false, then the cache will
                         be skipped, even if a result has already been
                         cached. Defaults to false.
        """
        identity = (data.get('category', None),
                    data.get('itype', None),
                    data.get('lang', None))

        data = {'local': data.get('local', False),
                'cached': data.get('cached', True)}

        try:
            info = await self.disco.get_info(jid=jid, node=node,
                                             ifrom=ifrom, **data)
            info = self.disco._wrap(ifrom, jid, info, True)

            def trunc(i):
                return (i[0], i[1], i[2])
            return identity in map(trunc, info['disco_info']['identities'])
        except IqError:
            return False
        except IqTimeout:
            return None

    def get_info(self, jid: OptJid, node: Optional[str], ifrom: OptJid,
                 data: Any) -> Optional[DiscoInfo]:
        """
        Return the stored info data for the requested JID/node combination.

        The data parameter is not used.
        """
        if not self.node_exists(jid, node):
            if not node:
                return DiscoInfo()
            else:
                raise XMPPError(condition='item-not-found')
        else:
            return self.get_node(jid, node)['info']

    def set_info(self, jid: OptJid, node: Optional[str], ifrom: OptJid,
                 data: DiscoInfo):
        """
        Set the entire info stanza for a JID/node at once.

        The data parameter is a disco#info substanza.
        """
        new_node = self.add_node(jid, node)
        new_node['info'] = data

    def del_info(self, jid: OptJid, node: Optional[str], ifrom: OptJid,
                 data: Any):
        """
        Reset the info stanza for a given JID/node combination.

        The data parameter is not used.
        """
        if self.node_exists(jid, node):
            self.get_node(jid, node)['info'] = DiscoInfo()

    def get_items(self, jid: OptJid, node: Optional[str], ifrom: OptJid,
                  data: Any) -> Optional[DiscoItems]:
        """
        Return the stored items data for the requested JID/node combination.

        The data parameter is not used.
        """
        if not self.node_exists(jid, node):
            if not node:
                return DiscoItems()
            else:
                raise XMPPError(condition='item-not-found')
        else:
            return self.get_node(jid, node)['items']

    def set_items(self, jid: OptJid, node: Optional[str], ifrom: OptJid,
                  data: Dict[str, Collection[Tuple]]):
        """
        Replace the stored items data for a JID/node combination.

        The data parameter may provide:
            items -- A set of items in tuple format.
        """
        items = data.get('items', set())
        new_node = self.add_node(jid, node)
        new_node['items']['items'] = items

    def del_items(self, jid: OptJid, node: Optional[str], ifrom: OptJid,
                  data: Any):
        """
        Reset the items stanza for a given JID/node combination.

        The data parameter is not used.
        """
        if self.node_exists(jid, node):
            self.get_node(jid, node)['items'] = DiscoItems()

    def add_identity(self, jid: OptJid, node: Optional[str], ifrom: OptJid,
                     data: Dict[str, Optional[str]]):
        """
        Add a new identity to the JID/node combination.

        The data parameter may provide:

        :param category: The general category to which the agent belongs.
        :param itype: A more specific designation with the category.
        :param name: Optional human readable name for this identity.
        :param lang: Optional standard xml:lang value.
        """
        new_node = self.add_node(jid, node)
        new_node['info'].add_identity(
                data.get('category', ''),
                data.get('itype', ''),
                data.get('name', None),
                data.get('lang', None))

    def set_identities(self, jid: OptJid, node: Optional[str], ifrom: OptJid,
                       data: Dict[str, Collection[str]]):
        """
        Add or replace all identities for a JID/node combination.

        The data parameter should include:

        :param identities: A list of identities in tuple form:
                           (category, type, name, lang)
        """
        identities = data.get('identities', set())
        new_node = self.add_node(jid, node)
        new_node['info']['identities'] = identities

    def del_identity(self, jid: OptJid, node: Optional[str], ifrom: OptJid,
                     data: Dict[str, Optional[str]]):
        """
        Remove an identity from a JID/node combination.

        The data parameter may provide:

        :param category: The general category to which the agent belonged.
        :param itype: A more specific designation with the category.
        :param name: Optional human readable name for this identity.
        :param lang: Optional, standard xml:lang value.
        """
        if self.node_exists(jid, node):
            self.get_node(jid, node)['info'].del_identity(
                    data.get('category', ''),
                    data.get('itype', ''),
                    data.get('name', None),
                    data.get('lang', None))

    def del_identities(self, jid: OptJid, node: Optional[str], ifrom: OptJid,
                       data: Any):
        """
        Remove all identities from a JID/node combination.

        The data parameter is not used.
        """
        if self.node_exists(jid, node):
            del self.get_node(jid, node)['info']['identities']

    def add_feature(self, jid: OptJid, node: Optional[str], ifrom: OptJid,
                    data: Dict[str, str]):
        """
        Add a feature to a JID/node combination.

        The data parameter should include:

        :param feature: The namespace of the supported feature.
        """
        new_node = self.add_node(jid, node)
        new_node['info'].add_feature(
                data.get('feature', ''))

    def set_features(self, jid: OptJid, node: Optional[str], ifrom: OptJid,
                     data: Dict[str, Collection[str]]):
        """
        Add or replace all features for a JID/node combination.

        The data parameter should include:

        :param features: The new set of supported features.
        """
        features = data.get('features', set())
        new_node = self.add_node(jid, node)
        new_node['info']['features'] = features

    def del_feature(self, jid: OptJid, node: Optional[str], ifrom: OptJid,
                    data: Dict[str, str]):
        """
        Remove a feature from a JID/node combination.

        The data parameter should include:

        :param feature: The namespace of the removed feature.
        """
        if self.node_exists(jid, node):
            self.get_node(jid, node)['info'].del_feature(
                    data.get('feature', ''))

    def del_features(self, jid: OptJid, node: Optional[str], ifrom: OptJid,
                     data: Any):
        """
        Remove all features from a JID/node combination.

        The data parameter is not used.
        """
        if not self.node_exists(jid, node):
            return
        del self.get_node(jid, node)['info']['features']

    def add_item(self, jid: OptJid, node: Optional[str], ifrom: OptJid,
                 data: Dict[str, str]):
        """
        Add an item to a JID/node combination.

        The data parameter may include:

        :param ijid: The JID for the item.
        :param inode: Optional additional information to reference
                      non-addressable items.
        :param name: Optional human readable name for the item.
        """
        new_node = self.add_node(jid, node)
        new_node['items'].add_item(
                data.get('ijid', ''),
                node=data.get('inode', ''),
                name=data.get('name', ''))

    def del_item(self, jid: OptJid, node: Optional[str], ifrom: OptJid,
                 data: Dict[str, str]):
        """
        Remove an item from a JID/node combination.

        The data parameter may include:

        :param ijid: JID of the item to remove.
        :param inode: Optional extra identifying information.
        """
        if self.node_exists(jid, node):
            self.get_node(jid, node)['items'].del_item(
                    data.get('ijid', ''),
                    node=data.get('inode', None))

    def cache_info(self, jid: OptJid, node: Optional[str], ifrom: OptJid,
                   data: Union[Iq, DiscoInfo]):
        """
        Remember disco information for a JID/node/ifrom combination.

        The data parameter is either the Iq result stanza containing
        the disco info to cache, in which case its ``disco_info``
        payload is stored, or the disco#info substanza itself.
        The entry is created when it does not exist yet.
        """
        payload = data['disco_info'] if isinstance(data, Iq) else data
        self.add_node(jid, node, ifrom)['info'] = payload

    def get_cached_info(self, jid: OptJid, node: Optional[str], ifrom: OptJid,
                        data: Any) -> Optional[DiscoInfo]:
        """
        Retrieve cached disco info data.

        The data parameter is not used.
        """
        if not self.node_exists(jid, node, ifrom):
            return None
        return self.get_node(jid, node, ifrom)['info']