diff options
Diffstat (limited to 'slixmpp')
-rw-r--r-- | slixmpp/plugins/xep_0356/__init__.py | 7 | ||||
-rw-r--r-- | slixmpp/plugins/xep_0356/privilege.py | 144 | ||||
-rw-r--r-- | slixmpp/plugins/xep_0356/stanza.py | 47 |
3 files changed, 198 insertions, 0 deletions
diff --git a/slixmpp/plugins/xep_0356/__init__.py b/slixmpp/plugins/xep_0356/__init__.py new file mode 100644 index 00000000..d457a06b --- /dev/null +++ b/slixmpp/plugins/xep_0356/__init__.py @@ -0,0 +1,7 @@ +from slixmpp.plugins.base import register_plugin + +from slixmpp.plugins.xep_0356 import stanza +from slixmpp.plugins.xep_0356.stanza import Perm, Privilege +from slixmpp.plugins.xep_0356.privilege import XEP_0356 + +register_plugin(XEP_0356) diff --git a/slixmpp/plugins/xep_0356/privilege.py b/slixmpp/plugins/xep_0356/privilege.py new file mode 100644 index 00000000..a0bdb6a7 --- /dev/null +++ b/slixmpp/plugins/xep_0356/privilege.py @@ -0,0 +1,144 @@ +import logging +import typing + +from slixmpp import Message, JID, Iq +from slixmpp.plugins.base import BasePlugin +from slixmpp.xmlstream.matcher import StanzaPath +from slixmpp.xmlstream.handler import Callback +from slixmpp.xmlstream import register_stanza_plugin + +from slixmpp.plugins.xep_0356 import stanza, Privilege, Perm + + +log = logging.getLogger(__name__) + + +class XEP_0356(BasePlugin): + """ + XEP-0356: Privileged Entity + + Events: + + :: + + privileges_advertised -- Received message/privilege from the server + """ + + name = "xep_0356" + description = "XEP-0356: Privileged Entity" + dependencies = {"xep_0297"} + stanza = stanza + + granted_privileges = {"roster": "none", "message": "none", "presence": "none"} + + def plugin_init(self): + if not self.xmpp.is_component: + log.error("XEP 0356 is only available for components") + return + + stanza.register() + + self.xmpp.register_handler( + Callback( + "Privileges", + StanzaPath("message/privilege"), + self._handle_privilege, + ) + ) + + def plugin_end(self): + self.xmpp.remove_handler("Privileges") + + def _handle_privilege(self, msg: Message): + """ + Called when the XMPP server advertise the component's privileges. + + Stores the privileges in this instance's granted_privileges attribute (a dict) + and raises the privileges_advertised event + """ + for perm in msg["privilege"]["perms"]: + self.granted_privileges[perm["access"]] = perm["type"] + log.debug(f"Privileges: {self.granted_privileges}") + self.xmpp.event("privileges_advertised") + + def send_privileged_message(self, msg: Message): + if self.granted_privileges["message"] == "outgoing": + self._make_privileged_message(msg).send() + else: + log.error( + "The server hasn't authorized us to send messages on behalf of other users" + ) + + def _make_privileged_message(self, msg: Message): + stanza = self.xmpp.make_message( + mto=self.xmpp.server_host, mfrom=self.xmpp.boundjid.bare + ) + stanza["privilege"]["forwarded"].append(msg) + return stanza + + def _make_get_roster(self, jid: typing.Union[JID, str], **iq_kwargs): + return self.xmpp.make_iq_get( + queryxmlns="jabber:iq:roster", + ifrom=self.xmpp.boundjid.bare, + ito=jid, + **iq_kwargs, + ) + + def _make_set_roster( + self, + jid: typing.Union[JID, str], + roster_items: dict, + **iq_kwargs, + ): + iq = self.xmpp.make_iq_set( + ifrom=self.xmpp.boundjid.bare, + ito=jid, + **iq_kwargs, + ) + iq["roster"]["items"] = roster_items + return iq + + async def get_roster(self, jid: typing.Union[JID, str], **send_kwargs) -> Iq: + """ + Return the roster of user on the server the component has privileged access to. + + Raises ValueError if the server did not advertise the corresponding privileges + + :param jid: user we want to fetch the roster from + """ + if self.granted_privileges["roster"] not in ("get", "both"): + log.error("The server did not grant us privileges to get rosters") + raise ValueError + else: + return await self._make_get_roster(jid).send(**send_kwargs) + + async def set_roster( + self, jid: typing.Union[JID, str], roster_items: dict, **send_kwargs + ) -> Iq: + """ + Return the roster of user on the server the component has privileged access to. + + Raises ValueError if the server did not advertise the corresponding privileges + + :param jid: user we want to add or modify roster items + :param roster_items: a dict containing the roster items' JIDs as keys and + nested dicts containing names, subscriptions and groups. + Example: + { + "friend1@example.com": { + "name": "Friend 1", + "subscription": "both", + "groups": ["group1", "group2"], + }, + "friend2@example.com": { + "name": "Friend 2", + "subscription": "from", + "groups": ["group3"], + }, + } + """ + if self.granted_privileges["roster"] not in ("set", "both"): + log.error("The server did not grant us privileges to set rosters") + raise ValueError + else: + return await self._make_set_roster(jid, roster_items).send(**send_kwargs) diff --git a/slixmpp/plugins/xep_0356/stanza.py b/slixmpp/plugins/xep_0356/stanza.py new file mode 100644 index 00000000..ef01ee3e --- /dev/null +++ b/slixmpp/plugins/xep_0356/stanza.py @@ -0,0 +1,47 @@ +from slixmpp.stanza import Message +from slixmpp.xmlstream import ( + ElementBase, + register_stanza_plugin, +) +from slixmpp.plugins.xep_0297 import Forwarded + + +class Privilege(ElementBase): + namespace = "urn:xmpp:privilege:1" + name = "privilege" + plugin_attrib = "privilege" + + def permission(self, access): + for perm in self["perms"]: + if perm["access"] == access: + return perm["type"] + + def roster(self): + return self.permission("roster") + + def message(self): + return self.permission("message") + + def presence(self): + return self.permission("presence") + + def add_perm(self, access, type): + # This should only be needed for servers, so maybe out of scope for slixmpp + perm = Perm() + perm["type"] = type + perm["access"] = access + self.append(perm) + + +class Perm(ElementBase): + namespace = "urn:xmpp:privilege:1" + name = "perm" + plugin_attrib = "perm" + plugin_multi_attrib = "perms" + interfaces = {"type", "access"} + + +def register(): + register_stanza_plugin(Message, Privilege) + register_stanza_plugin(Privilege, Forwarded) + register_stanza_plugin(Privilege, Perm, iterable=True)
\ No newline at end of file |