diff options
Diffstat (limited to 'slixmpp/plugins/xep_0313/mam.py')
-rw-r--r-- | slixmpp/plugins/xep_0313/mam.py | 215 |
1 files changed, 171 insertions, 44 deletions
diff --git a/slixmpp/plugins/xep_0313/mam.py b/slixmpp/plugins/xep_0313/mam.py index eaa598a6..02efd3ce 100644 --- a/slixmpp/plugins/xep_0313/mam.py +++ b/slixmpp/plugins/xep_0313/mam.py @@ -5,8 +5,17 @@ # See the file LICENSE for copying permission import logging +from asyncio import Future +from collections.abc import AsyncGenerator from datetime import datetime -from typing import Any, Dict, Callable, Optional, Awaitable +from typing import ( + Any, + Awaitable, + Callable, + Dict, + Optional, + Tuple, +) from slixmpp import JID from slixmpp.stanza import Message, Iq @@ -15,6 +24,7 @@ from slixmpp.xmlstream.matcher import MatchXMLMask from slixmpp.xmlstream import register_stanza_plugin from slixmpp.plugins import BasePlugin from slixmpp.plugins.xep_0313 import stanza +from slixmpp.plugins.xep_0004.stanza import Form log = logging.getLogger(__name__) @@ -28,17 +38,25 @@ class XEP_0313(BasePlugin): name = 'xep_0313' description = 'XEP-0313: Message Archive Management' - dependencies = {'xep_0030', 'xep_0050', 'xep_0059', 'xep_0297'} + dependencies = { + 'xep_0004', 'xep_0030', 'xep_0050', 'xep_0059', 'xep_0297' + } stanza = stanza def plugin_init(self): + register_stanza_plugin(stanza.MAM, Form) register_stanza_plugin(Iq, stanza.MAM) - register_stanza_plugin(Iq, stanza.Preferences) register_stanza_plugin(Message, stanza.Result) register_stanza_plugin(Iq, stanza.Fin) - register_stanza_plugin(stanza.Result, self.xmpp['xep_0297'].stanza.Forwarded) + register_stanza_plugin( + stanza.Result, + self.xmpp['xep_0297'].stanza.Forwarded + ) register_stanza_plugin(stanza.MAM, self.xmpp['xep_0059'].stanza.Set) register_stanza_plugin(stanza.Fin, self.xmpp['xep_0059'].stanza.Set) + register_stanza_plugin(Iq, stanza.Metadata) + register_stanza_plugin(stanza.Metadata, stanza.Start) + register_stanza_plugin(stanza.Metadata, stanza.End) def retrieve( self, @@ -66,16 +84,10 @@ class XEP_0313(BasePlugin): :param bool iterator: Use RSM and iterate over a paginated query :param dict rsm: RSM custom options """ - iq = self.xmpp.Iq() + iq, stanza_mask = self._pre_mam_retrieve( + jid, start, end, with_jid, ifrom + ) query_id = iq['id'] - - iq['to'] = jid - iq['from'] = ifrom - iq['type'] = 'set' - iq['mam']['queryid'] = query_id - iq['mam']['start'] = start - iq['mam']['end'] = end - iq['mam']['with'] = with_jid amount = 10 if rsm: for key, value in rsm.items(): @@ -84,12 +96,6 @@ class XEP_0313(BasePlugin): amount = value cb_data = {} - stanza_mask = self.xmpp.Message() - stanza_mask.xml.remove(stanza_mask.xml.find('{urn:xmpp:sid:0}origin-id')) - del stanza_mask['id'] - del stanza_mask['lang'] - stanza_mask['from'] = jid - stanza_mask['mam_result']['queryid'] = query_id xml_mask = str(stanza_mask) def pre_cb(query: Iq) -> None: @@ -106,11 +112,14 @@ class XEP_0313(BasePlugin): results = cb_data['collector'].stop() if result['type'] == 'result': result['mam']['results'] = results + result['mam_fin']['results'] = results if iterator: - return self.xmpp['xep_0059'].iterate(iq, 'mam', 'results', amount=amount, - reverse=reverse, recv_interface='mam_fin', - pre_cb=pre_cb, post_cb=post_cb) + return self.xmpp['xep_0059'].iterate( + iq, 'mam', 'results', amount=amount, + reverse=reverse, recv_interface='mam_fin', + pre_cb=pre_cb, post_cb=post_cb + ) collector = Collector( 'MAM_Results_%s' % query_id, @@ -126,26 +135,144 @@ class XEP_0313(BasePlugin): return iq.send(timeout=timeout, callback=wrapped_cb) - def get_preferences(self, timeout=None, callback=None): - iq = self.xmpp.Iq() - iq['type'] = 'get' + async def iterate( + self, + jid: Optional[JID] = None, + start: Optional[datetime] = None, + end: Optional[datetime] = None, + with_jid: Optional[JID] = None, + ifrom: Optional[JID] = None, + reverse: bool = False, + rsm: Optional[Dict[str, Any]] = None, + total: Optional[int] = None, + ) -> AsyncGenerator: + """ + Iterate over each message of MAM query. + + :param jid: Entity holding the MAM records + :param start: MAM query start time + :param end: MAM query end time + :param with_jid: Filter results on this JID + :param ifrom: To change the from address of the query + :param reverse: Get the results in reverse order + :param rsm: RSM custom options + :param total: A number of messages received after which the query + should stop. + """ + iq, stanza_mask = self._pre_mam_retrieve( + jid, start, end, with_jid, ifrom + ) query_id = iq['id'] - iq['mam_prefs']['query_id'] = query_id - return iq.send(timeout=timeout, callback=callback) - - def set_preferences(self, jid=None, default=None, always=None, never=None, - ifrom=None, timeout=None, callback=None): - iq = self.xmpp.Iq() - iq['type'] = 'set' - iq['to'] = jid - iq['from'] = ifrom - iq['mam_prefs']['default'] = default - iq['mam_prefs']['always'] = always - iq['mam_prefs']['never'] = never - return iq.send(timeout=timeout, callback=callback) - - def get_configuration_commands(self, jid, **kwargs): - return self.xmpp['xep_0030'].get_items( - jid=jid, - node='urn:xmpp:mam#configure', - **kwargs) + amount = 10 + + if rsm: + for key, value in rsm.items(): + iq['mam']['rsm'][key] = str(value) + if key == 'max': + amount = value + cb_data = {} + + def pre_cb(query: Iq) -> None: + stanza_mask['mam_result']['queryid'] = query['id'] + xml_mask = str(stanza_mask) + query['mam']['queryid'] = query['id'] + collector = Collector( + 'MAM_Results_%s' % query_id, + MatchXMLMask(xml_mask)) + self.xmpp.register_handler(collector) + cb_data['collector'] = collector + + def post_cb(result: Iq) -> None: + results = cb_data['collector'].stop() + if result['type'] == 'result': + result['mam']['results'] = results + result['mam_fin']['results'] = results + + iterator = self.xmpp['xep_0059'].iterate( + iq, 'mam', 'results', amount=amount, + reverse=reverse, recv_interface='mam_fin', + pre_cb=pre_cb, post_cb=post_cb + ) + recv_count = 0 + + async for page in iterator: + messages = [message for message in page['mam']['results']] + if reverse: + messages.reverse() + for message in messages: + yield message + recv_count += 1 + if total is not None and recv_count >= total: + break + if total is not None and recv_count >= total: + break + + def _pre_mam_retrieve( + self, + jid: Optional[JID] = None, + start: Optional[datetime] = None, + end: Optional[datetime] = None, + with_jid: Optional[JID] = None, + ifrom: Optional[JID] = None, + ) -> Tuple[Iq, Message]: + """Build the IQ and stanza mask for MAM results + """ + iq = self.xmpp.make_iq_set(ito=jid, ifrom=ifrom) + query_id = iq['id'] + iq['mam']['queryid'] = query_id + iq['mam']['start'] = start + iq['mam']['end'] = end + iq['mam']['with'] = with_jid + + stanza_mask = self.xmpp.Message() + + auto_origin = stanza_mask.xml.find('{urn:xmpp:sid:0}origin-id') + if auto_origin is not None: + stanza_mask.xml.remove(auto_origin) + del stanza_mask['id'] + del stanza_mask['lang'] + stanza_mask['from'] = jid + stanza_mask['mam_result']['queryid'] = query_id + + return (iq, stanza_mask) + + async def get_fields(self, jid: Optional[JID] = None, **iqkwargs) -> Form: + """Get MAM query fields. + + .. versionaddedd:: 1.8.0 + + :param jid: JID to retrieve the policy from. + :return: The Form of allowed options + """ + ifrom = iqkwargs.pop('ifrom', None) + iq = self.xmpp.make_iq_get(ito=jid, ifrom=ifrom) + iq.enable('mam') + result = await iq.send(**iqkwargs) + return result['mam']['form'] + + async def get_configuration_commands(self, jid: Optional[JID], + **discokwargs) -> Future: + """Get the list of MAM advanced configuration commands. + + .. versionchanged:: 1.8.0 + + :param jid: JID to get the commands from. + """ + if jid is None: + jid = self.xmpp.boundjid.bare + return await self.xmpp['xep_0030'].get_items( + jid=jid, + node='urn:xmpp:mam#configure', + **discokwargs + ) + + def get_archive_metadata(self, jid: Optional[JID] = None, + **iqkwargs) -> Future: + """Get the archive metadata from a JID. + + :param jid: JID to get the metadata from. + """ + ifrom = iqkwargs.pop('ifrom', None) + iq = self.xmpp.make_iq_get(ito=jid, ifrom=ifrom) + iq.enable('mam_metadata') + return iq.send(**iqkwargs) |