summaryrefslogtreecommitdiff
path: root/slixmpp/plugins/xep_0356/privilege.py
blob: a0bdb6a7de4af9dbb23e79fdb47c05c5b92e49e3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import logging
import typing

from slixmpp import Message, JID, Iq
from slixmpp.plugins.base import BasePlugin
from slixmpp.xmlstream.matcher import StanzaPath
from slixmpp.xmlstream.handler import Callback
from slixmpp.xmlstream import register_stanza_plugin

from slixmpp.plugins.xep_0356 import stanza, Privilege, Perm


log = logging.getLogger(__name__)


class XEP_0356(BasePlugin):
    """
    XEP-0356: Privileged Entity

    Events:

    ::

        privileges_advertised  -- Received message/privilege from the server
    """

    name = "xep_0356"
    description = "XEP-0356: Privileged Entity"
    dependencies = {"xep_0297"}
    stanza = stanza

    granted_privileges = {"roster": "none", "message": "none", "presence": "none"}

    def plugin_init(self):
        if not self.xmpp.is_component:
            log.error("XEP 0356 is only available for components")
            return

        stanza.register()

        self.xmpp.register_handler(
            Callback(
                "Privileges",
                StanzaPath("message/privilege"),
                self._handle_privilege,
            )
        )

    def plugin_end(self):
        self.xmpp.remove_handler("Privileges")

    def _handle_privilege(self, msg: Message):
        """
        Called when the XMPP server advertise the component's privileges.

        Stores the privileges in this instance's granted_privileges attribute (a dict)
        and raises the privileges_advertised event
        """
        for perm in msg["privilege"]["perms"]:
            self.granted_privileges[perm["access"]] = perm["type"]
        log.debug(f"Privileges: {self.granted_privileges}")
        self.xmpp.event("privileges_advertised")

    def send_privileged_message(self, msg: Message):
        if self.granted_privileges["message"] == "outgoing":
            self._make_privileged_message(msg).send()
        else:
            log.error(
                "The server hasn't authorized us to send messages on behalf of other users"
            )

    def _make_privileged_message(self, msg: Message):
        stanza = self.xmpp.make_message(
            mto=self.xmpp.server_host, mfrom=self.xmpp.boundjid.bare
        )
        stanza["privilege"]["forwarded"].append(msg)
        return stanza

    def _make_get_roster(self, jid: typing.Union[JID, str], **iq_kwargs):
        return self.xmpp.make_iq_get(
            queryxmlns="jabber:iq:roster",
            ifrom=self.xmpp.boundjid.bare,
            ito=jid,
            **iq_kwargs,
        )

    def _make_set_roster(
        self,
        jid: typing.Union[JID, str],
        roster_items: dict,
        **iq_kwargs,
    ):
        iq = self.xmpp.make_iq_set(
            ifrom=self.xmpp.boundjid.bare,
            ito=jid,
            **iq_kwargs,
        )
        iq["roster"]["items"] = roster_items
        return iq

    async def get_roster(self, jid: typing.Union[JID, str], **send_kwargs) -> Iq:
        """
        Return the roster of user on the server the component has privileged access to.

        Raises ValueError if the server did not advertise the corresponding privileges

        :param jid: user we want to fetch the roster from
        """
        if self.granted_privileges["roster"] not in ("get", "both"):
            log.error("The server did not grant us privileges to get rosters")
            raise ValueError
        else:
            return await self._make_get_roster(jid).send(**send_kwargs)

    async def set_roster(
        self, jid: typing.Union[JID, str], roster_items: dict, **send_kwargs
    ) -> Iq:
        """
        Return the roster of user on the server the component has privileged access to.

        Raises ValueError if the server did not advertise the corresponding privileges

        :param jid: user we want to add or modify roster items
        :param roster_items: a dict containing the roster items' JIDs as keys and
            nested dicts containing names, subscriptions and groups.
            Example:
            {
                "friend1@example.com": {
                    "name": "Friend 1",
                    "subscription": "both",
                    "groups": ["group1", "group2"],
                },
                "friend2@example.com": {
                    "name": "Friend 2",
                    "subscription": "from",
                    "groups": ["group3"],
            },
        }
        """
        if self.granted_privileges["roster"] not in ("set", "both"):
            log.error("The server did not grant us privileges to set rosters")
            raise ValueError
        else:
            return await self._make_set_roster(jid, roster_items).send(**send_kwargs)