From a9a7bdc6c3718106f102aaeaf88a9ca59488f7f9 Mon Sep 17 00:00:00 2001 From: mathieui Date: Sun, 22 Nov 2020 18:07:03 +0100 Subject: XEP-0045: update methods for asyncio & stanza, typing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - This rewrites most of the stuff in the plugin by using the newly added elements instead of raw ElementTree stuff with hardcoded namespaces. - Adds methods for affiliation/roles - Adds some type hints - Fix many cases where the call would simply not work since slixmpp exists (and break the API but it was already broken ¯\_(ツ)_/¯) --- slixmpp/plugins/xep_0045/muc.py | 275 +++++++++++++++++++--------------------- 1 file changed, 132 insertions(+), 143 deletions(-) (limited to 'slixmpp/plugins/xep_0045/muc.py') diff --git a/slixmpp/plugins/xep_0045/muc.py b/slixmpp/plugins/xep_0045/muc.py index 2143b400..f310c03e 100644 --- a/slixmpp/plugins/xep_0045/muc.py +++ b/slixmpp/plugins/xep_0045/muc.py @@ -9,6 +9,11 @@ from __future__ import with_statement import logging +from typing import ( + List, + Tuple, + Optional, +) from slixmpp import ( Presence, @@ -38,6 +43,9 @@ from slixmpp.plugins.xep_0045.stanza import ( log = logging.getLogger(__name__) +AFFILIATIONS = ('outcast', 'member', 'admin', 'owner', 'none') +ROLES = ('moderator', 'participant', 'visitor', 'none') + class XEP_0045(BasePlugin): @@ -168,7 +176,6 @@ class XEP_0045(BasePlugin): self.xmpp.event("muc::%s::message_error" % msg['from'].bare, msg) - def handle_groupchat_subject(self, msg: Message) -> None: """ Handle a message coming from a muc indicating a change of subject (or announcing it when joining the room) @@ -179,143 +186,96 @@ class XEP_0045(BasePlugin): return None self.xmpp.event('groupchat_subject', msg) - def jid_in_room(self, room, jid): + def jid_in_room(self, room: JID, jid: JID) -> bool: for nick in self.rooms[room]: entry = self.rooms[room][nick] if entry is not None and entry['jid'].full == jid: return True return False - def get_nick(self, room, jid): + def get_nick(self, room: JID, jid: JID) -> Optional[str]: for nick in self.rooms[room]: entry = self.rooms[room][nick] if entry is not None and entry['jid'].full == jid: return nick - def configure_room(self, room, form=None, ifrom=None): - if form is None: - form = self.get_room_config(room, ifrom=ifrom) - iq = self.xmpp.make_iq_set() - iq['to'] = room - if ifrom is not None: - iq['from'] = ifrom - query = ET.Element('{http://jabber.org/protocol/muc#owner}query') - form['type'] = 'submit' - query.append(form) - iq.append(query) - # For now, swallow errors to preserve existing API - try: - result = iq.send() - except IqError: - return False - except IqTimeout: - return False - return True - - def join_muc(self, room, nick, maxhistory="0", password='', wait=False, pstatus=None, pshow=None, pfrom=None): + def join_muc(self, room: JID, nick: str, maxhistory="0", password='', + pstatus='', pshow='', pfrom=''): """ Join the specified room, requesting 'maxhistory' lines of history. """ - stanza = self.xmpp.make_presence(pto="%s/%s" % (room, nick), pstatus=pstatus, pshow=pshow, pfrom=pfrom) - x = ET.Element('{http://jabber.org/protocol/muc}x') + stanza = self.xmpp.make_presence( + pto="%s/%s" % (room, nick), pstatus=pstatus, + pshow=pshow, pfrom=pfrom + ) + stanza.enable('muc_join') if password: - passelement = ET.Element('{http://jabber.org/protocol/muc}password') - passelement.text = password - x.append(passelement) + stanza['muc_join']['password'] = password if maxhistory: - history = ET.Element('{http://jabber.org/protocol/muc}history') - if maxhistory == "0": - history.attrib['maxchars'] = maxhistory + if maxhistory == "0": + stanza['muc_join']['history']['maxchars'] = '0' else: - history.attrib['maxstanzas'] = maxhistory - x.append(history) - stanza.append(x) - if not wait: - self.xmpp.send(stanza) - else: - #wait for our own room presence back - expect = ET.Element("{%s}presence" % self.xmpp.default_ns, {'from':"%s/%s" % (room, nick)}) - self.xmpp.send(stanza, expect) + stanza['muc_join']['history']['maxstanzas'] = str(maxhistory) + self.xmpp.send(stanza) self.rooms[room] = {} self.our_nicks[room] = nick - def destroy(self, room, reason='', altroom = '', ifrom=None): - iq = self.xmpp.make_iq_set() - if ifrom is not None: - iq['from'] = ifrom - iq['to'] = room - query = ET.Element('{http://jabber.org/protocol/muc#owner}query') - destroy = ET.Element('{http://jabber.org/protocol/muc#owner}destroy') + async def destroy(self, room: JID, reason='', altroom='', *, + ifrom: Optional[JID] = None, **iqkwargs) -> Iq: + iq = self.xmpp.make_iq_set(ifrom=ifrom, ito=room) + iq.enable('mucowner_query') + iq['mucowner_query'].enable('destroy') if altroom: - destroy.attrib['jid'] = altroom - xreason = ET.Element('{http://jabber.org/protocol/muc#owner}reason') - xreason.text = reason - destroy.append(xreason) - query.append(destroy) - iq.append(query) - # For now, swallow errors to preserve existing API - try: - r = iq.send() - except IqError: - return False - except IqTimeout: - return False - return True - - def set_affiliation(self, room, jid=None, nick=None, affiliation='member', ifrom=None): + iq['mucowner_query']['destroy']['jid'] = altroom + if reason: + iq['mucowner_query']['destroy']['reason'] = reason + await iq.send(**iqkwargs) + + async def set_affiliation(self, room: JID, jid: Optional[JID] = None, nick: Optional[str] = None, *, affiliation: str, + ifrom: Optional[JID] = None, **iqkwargs): """ Change room affiliation.""" - if affiliation not in ('outcast', 'member', 'admin', 'owner', 'none'): - raise TypeError - query = ET.Element('{http://jabber.org/protocol/muc#admin}query') - if nick is not None: - item = ET.Element('{http://jabber.org/protocol/muc#admin}item', {'affiliation':affiliation, 'nick':nick}) - else: - item = ET.Element('{http://jabber.org/protocol/muc#admin}item', {'affiliation':affiliation, 'jid':jid}) - query.append(item) - iq = self.xmpp.make_iq_set(query) - iq['to'] = room - iq['from'] = ifrom - # For now, swallow errors to preserve existing API - try: - result = iq.send() - except IqError: - return False - except IqTimeout: - return False - return True - - def set_role(self, room, nick, role): + if affiliation not in AFFILIATIONS: + raise ValueError('%s is not a valid affiliation' % affiliation) + if not any((jid, nick)): + raise ValueError('One of jid or nick must be set') + iq = self.xmpp.make_iq_set(ito=room, ifrom=ifrom) + iq.enable('mucadmin_query') + item = MUCAdminItem() + item['affiliation'] = affiliation + if nick: + item['nick'] = nick + if jid: + item['jid'] = jid + iq['mucadmin_query'].append(item) + await iq.send(**iqkwargs) + + async def set_role(self, room: JID, nick: str, role: str, *, + ifrom: Optional[JID] = None, **iqkwargs) -> Iq: """ Change role property of a nick in a room. Typically, roles are temporary (they last only as long as you are in the room), whereas affiliations are permanent (they last across groupchat sessions). """ - if role not in ('moderator', 'participant', 'visitor', 'none'): - raise TypeError - query = ET.Element('{http://jabber.org/protocol/muc#admin}query') - item = ET.Element('item', {'role':role, 'nick':nick}) - query.append(item) - iq = self.xmpp.make_iq_set(query) - iq['to'] = room - result = iq.send() - if result is False or result['type'] != 'result': - raise ValueError - return True - - def invite(self, room, jid, reason='', mfrom=''): + if role not in ROLES: + raise ValueError("Role %s does not exist" % role) + iq = self.xmpp.make_iq_set(ito=room, ifrom=ifrom) + iq.enable('mucadmin_query') + item = MUCAdminItem() + item['role'] = role + item['nick'] = nick + iq['mucadmin_query'].append(item) + await iq.send(**iqkwargs) + + def invite(self, room: JID, jid: JID, reason='', *, + mfrom: Optional[JID] = None): """ Invite a jid to a room.""" - msg = self.xmpp.make_message(room) - msg['from'] = mfrom - x = ET.Element('{http://jabber.org/protocol/muc#user}x') - invite = ET.Element('{http://jabber.org/protocol/muc#user}invite', {'to': jid}) + msg = self.xmpp.make_message(room, mfrom=mfrom) + msg.enable('muc') + msg['muc']['invite'] = jid if reason: - rxml = ET.Element('{http://jabber.org/protocol/muc#user}reason') - rxml.text = reason - invite.append(rxml) - x.append(invite) - msg.append(x) + msg['muc']['invite']['reason'] = reason self.xmpp.send(msg) - def leave_muc(self, room, nick, msg='', pfrom=None): + def leave_muc(self, room: JID, nick: str, msg='', pfrom=None): """ Leave the specified room. """ if msg: @@ -324,44 +284,77 @@ class XEP_0045(BasePlugin): self.xmpp.send_presence(pshow='unavailable', pto="%s/%s" % (room, nick), pfrom=pfrom) del self.rooms[room] - def get_room_config(self, room, ifrom=''): - iq = self.xmpp.make_iq_get('http://jabber.org/protocol/muc#owner') - iq['to'] = room - iq['from'] = ifrom + + async def get_room_config(self, room: JID, ifrom=''): + """Get the room config form in 0004 plugin format """ + iq = self.xmpp.make_iq_get(stanza.NS_OWNER, ito=room, ifrom=ifrom) # For now, swallow errors to preserve existing API - try: - result = iq.send() - except IqError: - raise ValueError - except IqTimeout: - raise ValueError + result = await iq.send() form = result.xml.find('{http://jabber.org/protocol/muc#owner}query/{jabber:x:data}x') if form is None: - raise ValueError + raise ValueError("Configuration form not found") return self.xmpp.plugin['xep_0004'].build_form(form) - def cancel_config(self, room, ifrom=None): - query = ET.Element('{http://jabber.org/protocol/muc#owner}query') + async def cancel_config(self, room: JID, *, + ifrom: Optional[JID] = None, **iqkwargs) -> Iq: + """Cancel a requested config form""" + query = MUCOwnerQuery() x = ET.Element('{jabber:x:data}x', type='cancel') query.append(x) - iq = self.xmpp.make_iq_set(query) - iq['to'] = room - iq['from'] = ifrom - iq.send() + iq = self.xmpp.make_iq_set(query, ito=room, ifrom=ifrom) + return await iq.send(**iqkwargs) - def set_room_config(self, room, config, ifrom=''): - query = ET.Element('{http://jabber.org/protocol/muc#owner}query') + async def set_room_config(self, room: JID, config, *, + ifrom: Optional[JID] = None, **iqkwargs) -> Iq: + """Send a room config form""" + query = MUCOwnerQuery() config['type'] = 'submit' query.append(config) - iq = self.xmpp.make_iq_set(query) - iq['to'] = room - iq['from'] = ifrom - iq.send() - - def get_joined_rooms(self): + iq = self.xmpp.make_iq_set(query, ito=room, ifrom=ifrom) + return await iq.send(**iqkwargs) + + async def get_affiliation_list(self, room: JID, affiliation: str, *, + ifrom: Optional[JID] = None, **iqkwargs) -> List[JID]: + """"Get a list of JIDs with the specified affiliation""" + iq = self.xmpp.make_iq_get(stanza.NS_ADMIN, ito=room, ifrom=ifrom) + iq['mucadmin_query']['item']['affiliation'] = affiliation + result = await iq.send(**iqkwargs) + return [item['jid'] for item in result['mucadmin_query']] + + async def get_roles_list(self, room: JID, role: str, *, + ifrom: Optional[JID] = None, **iqkwargs) -> List[str]: + """"Get a list of JIDs with the specified role""" + iq = self.xmpp.make_iq_get(stanza.NS_ADMIN, ito=room, ifrom=ifrom) + iq['mucadmin_query']['item']['role'] = role + result = await iq.send(**iqkwargs) + return [item['nick'] for item in result['mucadmin_query']] + + async def send_affiliation_list(self, room: JID, affiliations: List[Tuple[JID, str]], *, + ifrom: Optional[JID] = None, **iqkwargs) -> Iq: + """Send an affiliation delta list""" + iq = self.xmpp.make_iq_set(ito=room, ifrom=ifrom) + for jid, affiliation in affiliations: + item = MUCAdminItem() + item['jid'] = jid + item['affiliation'] = affiliation + iq['mucadmin_query'].append(item) + return await iq.send(**iqkwargs) + + async def send_role_list(self, room: JID, roles: List[Tuple[str, str]], *, + ifrom: Optional[JID], **iqkwargs) -> Iq: + """Send a role delta list""" + iq = self.xmpp.make_iq_set(ito=room, ifrom=ifrom) + for nick, affiliation in roles: + item = MUCAdminItem() + item['nick'] = nick + item['affiliation'] = affiliation + iq['mucadmin_query'].append(item) + return await iq.send(**iqkwargs) + + def get_joined_rooms(self) -> List[JID]: return self.rooms.keys() - def get_our_jid_in_room(self, room_jid): + def get_our_jid_in_room(self, room_jid: JID) -> str: """ Return the jid we're using in a room. """ return "%s/%s" % (room_jid, self.our_nicks[room_jid]) @@ -375,19 +368,15 @@ class XEP_0045(BasePlugin): else: return None - def get_roster(self, room): + def get_roster(self, room: JID) -> List[str]: """ Get the list of nicks in a room. """ if room not in self.rooms.keys(): return None return self.rooms[room].keys() - def get_users_by_affiliation(cls, room, affiliation='member', ifrom=None): - if affiliation not in ('outcast', 'member', 'admin', 'owner', 'none'): + def get_users_by_affiliation(self, room: JID, affiliation='member', *, ifrom: Optional[JID] = None): + # Preserve old API + if affiliation not in AFFILIATIONS: raise TypeError - query = ET.Element('{http://jabber.org/protocol/muc#admin}query') - item = ET.Element('{http://jabber.org/protocol/muc#admin}item', {'affiliation': affiliation}) - query.append(item) - iq = cls.xmpp.Iq(sto=room, sfrom=ifrom, stype='get') - iq.append(query) - return iq.send() + return self.get_affiliation_list(room, affiliation, ifrom=ifrom) -- cgit v1.2.3