1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
|
"""
Slixmpp: The Slick XMPP Library
Copyright (C) 2012 Nathanael C. Fritz, Lance J.T. Stout
This file is part of Slixmpp.
See the file LICENSE for copying permission
"""
import logging
from datetime import datetime
from typing import Any, Dict, Callable, Optional, Awaitable
from slixmpp import JID
from slixmpp.stanza import Message, Iq
from slixmpp.xmlstream.handler import Collector
from slixmpp.xmlstream.matcher import MatchXMLMask
from slixmpp.xmlstream import register_stanza_plugin
from slixmpp.plugins import BasePlugin
from slixmpp.plugins.xep_0313 import stanza
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
class XEP_0313(BasePlugin):

    """
    XEP-0313 Message Archive Management

    Provides helpers to query a message archive (``retrieve``), to read
    and write archiving preferences, and to discover the archive's
    configuration commands.
    """

    # Plugin registration metadata.
    name = 'xep_0313'
    description = 'XEP-0313: Message Archive Management'
    # Other plugins this one relies on (disco, ad-hoc commands, RSM,
    # stanza forwarding).
    dependencies = {'xep_0030', 'xep_0050', 'xep_0059', 'xep_0297'}
    # Expose the stanza module on the plugin class.
    stanza = stanza

    def plugin_init(self):
        """
        Register the MAM stanza plugins.

        MAM, Preferences and Fin are attached to Iq, Result is attached
        to Message, the XEP-0297 Forwarded element is attached to Result
        and the XEP-0059 Set element is attached to both MAM and Fin.
        """
        register_stanza_plugin(Iq, stanza.MAM)
        register_stanza_plugin(Iq, stanza.Preferences)
        register_stanza_plugin(Message, stanza.Result)
        register_stanza_plugin(Iq, stanza.Fin)
        register_stanza_plugin(stanza.Result, self.xmpp['xep_0297'].stanza.Forwarded)
        register_stanza_plugin(stanza.MAM, self.xmpp['xep_0059'].stanza.Set)
        register_stanza_plugin(stanza.Fin, self.xmpp['xep_0059'].stanza.Set)

    def retrieve(
        self,
        jid: Optional[JID] = None,
        start: Optional[datetime] = None,
        end: Optional[datetime] = None,
        with_jid: Optional[JID] = None,
        ifrom: Optional[JID] = None,
        reverse: bool = False,
        timeout: Optional[int] = None,
        callback: Optional[Callable[[Iq], None]] = None,
        iterator: bool = False,
        rsm: Optional[Dict[str, Any]] = None
    ) -> Awaitable:
        """
        Send a MAM query and retrieve the results.

        Result messages matching the query are gathered by a Collector
        and stored in ``iq['mam']['results']`` of the IQ response when
        that response has type ``result``.

        :param JID jid: Entity holding the MAM records
        :param datetime start,end: MAM query temporal boundaries
        :param JID with_jid: Filter results on this JID
        :param JID ifrom: To change the from address of the query
        :param bool reverse: Get the results in reverse order
                             (only forwarded to the RSM iterator)
        :param int timeout: IQ timeout (not used when ``iterator`` is set)
        :param func callback: Custom callback for handling results
                              (not used when ``iterator`` is set)
        :param bool iterator: Use RSM and iterate over a paginated query
        :param dict rsm: RSM custom options; each value is stringified
                         into the query, and ``max`` is also used as the
                         page size when iterating (default 10)
        :returns: The value of ``iq.send(...)``, or, when ``iterator`` is
                  set, whatever ``xep_0059.iterate(...)`` returns.
        """
        iq = self.xmpp.Iq()
        query_id = iq['id']

        iq['to'] = jid
        iq['from'] = ifrom
        iq['type'] = 'set'
        iq['mam']['queryid'] = query_id
        iq['mam']['start'] = start
        iq['mam']['end'] = end
        iq['mam']['with'] = with_jid

        amount = 10
        if rsm:
            for key, value in rsm.items():
                iq['mam']['rsm'][key] = str(value)
                if key == 'max':
                    amount = value

        # Shared between pre_cb and post_cb so the latter can stop the
        # collector registered by the former.
        cb_data = {}

        # Build a bare <message/> used as an XML mask for result stanzas.
        stanza_mask = self.xmpp.Message()
        # A new Message may or may not carry an automatic origin-id child;
        # only remove it when present, since find() returns None otherwise
        # and remove(None) raises.
        origin_id = stanza_mask.xml.find('{urn:xmpp:sid:0}origin-id')
        if origin_id is not None:
            stanza_mask.xml.remove(origin_id)
        del stanza_mask['id']
        del stanza_mask['lang']
        stanza_mask['from'] = jid
        stanza_mask['mam_result']['queryid'] = query_id
        xml_mask = str(stanza_mask)

        def pre_cb(query: Iq) -> None:
            # Called by the RSM iterator before each page query: align the
            # mask and the query's queryid with that query's own id, then
            # register a fresh collector for the page.
            stanza_mask['mam_result']['queryid'] = query['id']
            page_mask = str(stanza_mask)
            query['mam']['queryid'] = query['id']
            collector = Collector(
                'MAM_Results_%s' % query_id,
                MatchXMLMask(page_mask))
            self.xmpp.register_handler(collector)
            cb_data['collector'] = collector

        def post_cb(result: Iq) -> None:
            # Called by the RSM iterator after each page: stop collecting
            # and attach the gathered messages to a successful response.
            results = cb_data['collector'].stop()
            if result['type'] == 'result':
                result['mam']['results'] = results

        if iterator:
            return self.xmpp['xep_0059'].iterate(iq, 'mam', 'results', amount=amount,
                                                 reverse=reverse, recv_interface='mam_fin',
                                                 pre_cb=pre_cb, post_cb=post_cb)

        # Single (non-paginated) query: collect until the IQ response.
        collector = Collector(
            'MAM_Results_%s' % query_id,
            MatchXMLMask(xml_mask))
        self.xmpp.register_handler(collector)

        def wrapped_cb(iq: Iq) -> None:
            # Always stop the collector; only attach results on success,
            # then hand the response to the user callback if one was given.
            results = collector.stop()
            if iq['type'] == 'result':
                iq['mam']['results'] = results
            if callback:
                callback(iq)

        return iq.send(timeout=timeout, callback=wrapped_cb)

    def get_preferences(self, timeout=None, callback=None):
        """
        Send an IQ of type ``get`` carrying a ``mam_prefs`` element.

        :param timeout: IQ timeout, passed to ``iq.send``
        :param callback: Callback passed to ``iq.send``
        :returns: The value of ``iq.send(...)``
        """
        iq = self.xmpp.Iq()
        iq['type'] = 'get'
        query_id = iq['id']
        iq['mam_prefs']['query_id'] = query_id
        return iq.send(timeout=timeout, callback=callback)

    def set_preferences(self, jid=None, default=None, always=None, never=None,
                        ifrom=None, timeout=None, callback=None):
        """
        Send an IQ of type ``set`` carrying the given ``mam_prefs`` values.

        :param jid: Recipient of the IQ
        :param default: Value stored in ``mam_prefs['default']``
        :param always: Value stored in ``mam_prefs['always']``
        :param never: Value stored in ``mam_prefs['never']``
        :param ifrom: To change the from address of the IQ
        :param timeout: IQ timeout, passed to ``iq.send``
        :param callback: Callback passed to ``iq.send``
        :returns: The value of ``iq.send(...)``
        """
        iq = self.xmpp.Iq()
        iq['type'] = 'set'
        iq['to'] = jid
        iq['from'] = ifrom
        iq['mam_prefs']['default'] = default
        iq['mam_prefs']['always'] = always
        iq['mam_prefs']['never'] = never
        return iq.send(timeout=timeout, callback=callback)

    def get_configuration_commands(self, jid, **kwargs):
        """
        Query the disco items of the ``urn:xmpp:mam#configure`` node.

        :param jid: Entity to query
        :param kwargs: Extra arguments forwarded to ``xep_0030.get_items``
        :returns: The value of ``xep_0030.get_items(...)``
        """
        return self.xmpp['xep_0030'].get_items(
            jid=jid,
            node='urn:xmpp:mam#configure',
            **kwargs)
|