From e5f4794a36a75709c4d21d274f2defdcc50f1018 Mon Sep 17 00:00:00 2001 From: mathieui Date: Mon, 8 Mar 2021 21:50:01 +0100 Subject: XEP-0313: Update stanza for completeness, and more docs --- slixmpp/plugins/xep_0313/mam.py | 11 +- slixmpp/plugins/xep_0313/stanza.py | 204 +++++++++++++++++++++++++++---------- 2 files changed, 162 insertions(+), 53 deletions(-) diff --git a/slixmpp/plugins/xep_0313/mam.py b/slixmpp/plugins/xep_0313/mam.py index eaa598a6..ba4c02bb 100644 --- a/slixmpp/plugins/xep_0313/mam.py +++ b/slixmpp/plugins/xep_0313/mam.py @@ -15,6 +15,7 @@ from slixmpp.xmlstream.matcher import MatchXMLMask from slixmpp.xmlstream import register_stanza_plugin from slixmpp.plugins import BasePlugin from slixmpp.plugins.xep_0313 import stanza +from slixmpp.plugins.xep_0004.stanza import Form log = logging.getLogger(__name__) @@ -28,15 +29,21 @@ class XEP_0313(BasePlugin): name = 'xep_0313' description = 'XEP-0313: Message Archive Management' - dependencies = {'xep_0030', 'xep_0050', 'xep_0059', 'xep_0297'} + dependencies = { + 'xep_0004', 'xep_0030', 'xep_0050', 'xep_0059', 'xep_0297' + } stanza = stanza def plugin_init(self): + register_stanza_plugin(stanza.MAM, Form) register_stanza_plugin(Iq, stanza.MAM) register_stanza_plugin(Iq, stanza.Preferences) register_stanza_plugin(Message, stanza.Result) register_stanza_plugin(Iq, stanza.Fin) - register_stanza_plugin(stanza.Result, self.xmpp['xep_0297'].stanza.Forwarded) + register_stanza_plugin( + stanza.Result, + self.xmpp['xep_0297'].stanza.Forwarded + ) register_stanza_plugin(stanza.MAM, self.xmpp['xep_0059'].stanza.Set) register_stanza_plugin(stanza.Fin, self.xmpp['xep_0059'].stanza.Set) diff --git a/slixmpp/plugins/xep_0313/stanza.py b/slixmpp/plugins/xep_0313/stanza.py index 4e43eeba..3021f1ad 100644 --- a/slixmpp/plugins/xep_0313/stanza.py +++ b/slixmpp/plugins/xep_0313/stanza.py @@ -1,90 +1,168 @@ - # Slixmpp: The Slick XMPP Library # Copyright (C) 2012 Nathanael C. Fritz, Lance J.T. Stout # This file is part of Slixmpp. # See the file LICENSE for copying permissio -import datetime as dt - +from datetime import datetime +from typing import ( + Any, + Iterable, + List, + Optional, + Set, + Union, +) + +from slixmpp.stanza import Message from slixmpp.jid import JID from slixmpp.xmlstream import ElementBase, ET -from slixmpp.plugins import xep_0082, xep_0004 +from slixmpp.plugins import xep_0082 class MAM(ElementBase): + """A MAM Query element. + + .. code-block:: xml + + + + + + urn:xmpp:mam:2 + + + juliet@capulet.lit + + + + + + """ name = 'query' namespace = 'urn:xmpp:mam:2' plugin_attrib = 'mam' - interfaces = {'queryid', 'start', 'end', 'with', 'results'} - sub_interfaces = {'start', 'end', 'with'} + #: Available interfaces: + #: + #: - ``queryid``: The MAM query id + #: - ``start`` and ``end``: Temporal boundaries of the query + #: - ``with``: JID of the other entity the conversation is with + #: - ``after_id``: Fetch stanzas after this specific ID + #: - ``before_id``: Fetch stanzas before this specific ID + #: - ``ids``: Fetch the stanzas matching those IDs + #: - ``results``: pseudo-interface used to accumulate MAM results during + #: fetch, not relevant for the stanza itself. + interfaces = { + 'queryid', 'start', 'end', 'with', 'results', + 'before_id', 'after_id', 'ids', + } + sub_interfaces = {'start', 'end', 'with', 'before_id', 'after_id', 'ids'} def setup(self, xml=None): ElementBase.setup(self, xml) - self._form = xep_0004.stanza.Form() - self._form['type'] = 'submit' - field = self._form.add_field(var='FORM_TYPE', ftype='hidden', - value='urn:xmpp:mam:2') - self.append(self._form) - self._results = [] - - def __get_fields(self): - return self._form.get_fields() - - def get_start(self): - fields = self.__get_fields() + self._results: List[Message] = [] + + def _setup_form(self): + found = self.xml.find( + '{jabber:x:data}x/' + '{jabber:x:data}field[@var="FORM_TYPE"]/' + "{jabber:x:data}value[.='urn:xmpp:mam:2']" + ) + if found is None: + self['form']['type'] = 'submit' + self['form'].add_field( + var='FORM_TYPE', ftype='hidden', value='urn:xmpp:mam:2' + ) + + def get_fields(self): + form = self.get_plugin('form', check=True) + if not form: + return {} + return form.get_fields() + + def get_start(self) -> Optional[datetime]: + fields = self.get_fields() field = fields.get('start') if field: return xep_0082.parse(field['value']) + return None - def set_start(self, value): - if isinstance(value, dt.datetime): + def set_start(self, value: Union[str, datetime]): + self._setup_form() + if isinstance(value, datetime): value = xep_0082.format_datetime(value) - fields = self.__get_fields() - field = fields.get('start') - if field: - field['value'] = value - else: - field = self._form.add_field(var='start') - field['value'] = value + self.set_custom_field('start', value) - def get_end(self): - fields = self.__get_fields() + def get_end(self) -> Optional[datetime]: + fields = self.get_fields() field = fields.get('end') if field: return xep_0082.parse(field['value']) + return None - def set_end(self, value): - if isinstance(value, dt.datetime): + def set_end(self, value: Union[str, datetime]): + if isinstance(value, datetime): value = xep_0082.format_datetime(value) - fields = self.__get_fields() - field = fields.get('end') - if field: - field['value'] = value - else: - field = self._form.add_field(var='end') - field['value'] = value + self.set_custom_field('end', value) - def get_with(self): - fields = self.__get_fields() + def get_with(self) -> Optional[JID]: + fields = self.get_fields() field = fields.get('with') if field: return JID(field['value']) + return None - def set_with(self, value): - fields = self.__get_fields() - field = fields.get('with') + def set_with(self, value: JID): + self.set_custom_field('with', value) + + def set_custom_field(self, fieldname: str, value: Any): + self._setup_form() + fields = self.get_fields() + field = fields.get(fieldname) if field: - field['with'] = str(value) + field['value'] = str(value) else: - field = self._form.add_field(var='with') + field = self['form'].add_field(var=fieldname) field['value'] = str(value) + + def get_custom_field(self, fieldname: str) -> Optional[str]: + fields = self.get_fields() + field = fields.get(fieldname) + if field: + return field['value'] + return None + + def set_before_id(self, value: str): + self.set_custom_field('before-id', value) + + def get_before_id(self): + self.get_custom_field('before-id') + + def set_after_id(self, value: str): + self.set_custom_field('after-id', value) + + def get_after_id(self): + self.get_custom_field('after-id') + + def set_ids(self, value: List[str]): + self._setup_form() + fields = self.get_fields() + field = fields.get('ids') + if field: + field['ids'] = value + else: + field = self['form'].add_field(var='ids') + field['value'] = value + + def get_ids(self): + self.get_custom_field('id') + # The results interface is meant only as an easy # way to access the set of collected message responses # from the query. - def get_results(self): + def get_results(self) -> List[Message]: return self._results - def set_results(self, values): + def set_results(self, values: List[Message]): self._results = values def del_results(self): @@ -92,13 +170,37 @@ class MAM(ElementBase): class Preferences(ElementBase): + """MAM Preferences payload. + + .. code-block:: xml + + + + + romeo@montague.lit + + + montague@montague.lit + + + + + """ name = 'prefs' namespace = 'urn:xmpp:mam:2' plugin_attrib = 'mam_prefs' + #: Available interfaces: + #: + #: - ``default``: Default MAM policy (must be one of 'roster', 'always', + #: 'never' + #: - ``always`` (``List[JID]``): list of JIDs to always store + #: conversations with. + #: - ``never`` (``List[JID]``): list of JIDs to never store + #: conversations with. interfaces = {'default', 'always', 'never'} sub_interfaces = {'always', 'never'} - def get_always(self): + def get_always(self) -> Set[JID]: results = set() jids = self.xml.findall('{%s}always/{%s}jid' % ( @@ -109,7 +211,7 @@ class Preferences(ElementBase): return results - def set_always(self, value): + def set_always(self, value: Iterable[JID]): self._set_sub_text('always', '', keep=True) always = self.xml.find('{%s}always' % self.namespace) always.clear() @@ -122,7 +224,7 @@ class Preferences(ElementBase): jid_xml.text = str(jid) always.append(jid_xml) - def get_never(self): + def get_never(self) -> Set[JID]: results = set() jids = self.xml.findall('{%s}never/{%s}jid' % ( @@ -133,7 +235,7 @@ class Preferences(ElementBase): return results - def set_never(self, value): + def set_never(self, value: Iterable[JID]): self._set_sub_text('never', '', keep=True) never = self.xml.find('{%s}never' % self.namespace) never.clear() -- cgit v1.2.3