From b38e229359679a07d0180a0dc3fd00e2a0b4736b Mon Sep 17 00:00:00 2001 From: mathieui Date: Fri, 21 Jul 2017 15:01:13 +0200 Subject: Update RSM for asyncio - Use an async iterator - Add a "recv_interface" parameter in order to differenciate the stanza we send from the stanza we receive (required for MAM) - Add a pre_cb to run before sending the query stanza - Add a post_cb to run after receiving the result stanza --- slixmpp/plugins/xep_0059/rsm.py | 81 ++++++++++++++++++++++++++++------------- 1 file changed, 55 insertions(+), 26 deletions(-) diff --git a/slixmpp/plugins/xep_0059/rsm.py b/slixmpp/plugins/xep_0059/rsm.py index a02d94e0..8f7f48f0 100644 --- a/slixmpp/plugins/xep_0059/rsm.py +++ b/slixmpp/plugins/xep_0059/rsm.py @@ -19,23 +19,27 @@ from slixmpp.exceptions import XMPPError log = logging.getLogger(__name__) -class ResultIterator(): +class ResultIterator: """ An iterator for Result Set Managment """ def __init__(self, query, interface, results='substanzas', amount=10, - start=None, reverse=False): + start=None, reverse=False, recv_interface=None, + pre_cb=None, post_cb=None): """ Arguments: query -- The template query - interface -- The substanza of the query, for example disco_items + interface -- The substanza of the query to send, for example disco_items + recv_interface -- The substanza of the query to receive, for example disco_items results -- The query stanza's interface which provides a countable list of query results. amount -- The max amounts of items to request per iteration start -- From which item id to start reverse -- If True, page backwards through the results + pre_cb -- Callback to run before sending the stanza + post_cb -- Callback to run after receiving the reply Example: q = Iq() @@ -49,17 +53,23 @@ class ResultIterator(): self.amount = amount self.start = start self.interface = interface + if recv_interface: + self.recv_interface = recv_interface + else: + self.recv_interface = interface + self.pre_cb = pre_cb + self.post_cb = post_cb self.results = results self.reverse = reverse self._stop = False - def __iter__(self): + def __aiter__(self): return self - def __next__(self): - return self.next() + async def __anext__(self): + return await self.next() - def next(self): + async def next(self): """ Return the next page of results from a query. @@ -68,7 +78,7 @@ class ResultIterator(): of items. """ if self._stop: - raise StopIteration + raise StopAsyncIteration self.query[self.interface]['rsm']['before'] = self.reverse self.query['id'] = self.query.stream.new_id() self.query[self.interface]['rsm']['max'] = str(self.amount) @@ -79,28 +89,32 @@ class ResultIterator(): self.query[self.interface]['rsm']['after'] = self.start try: - r = self.query.send(block=True) - - if not r[self.interface]['rsm']['first'] and \ - not r[self.interface]['rsm']['last']: - raise StopIteration - - if r[self.interface]['rsm']['count'] and \ - r[self.interface]['rsm']['first_index']: - count = int(r[self.interface]['rsm']['count']) - first = int(r[self.interface]['rsm']['first_index']) - num_items = len(r[self.interface][self.results]) + if self.pre_cb: + self.pre_cb(self.query) + r = await self.query.send() + + if not r[self.recv_interface]['rsm']['first'] and \ + not r[self.recv_interface]['rsm']['last']: + raise StopAsyncIteration + + if r[self.recv_interface]['rsm']['count'] and \ + r[self.recv_interface]['rsm']['first_index']: + count = int(r[self.recv_interface]['rsm']['count']) + first = int(r[self.recv_interface]['rsm']['first_index']) + num_items = len(r[self.recv_interface][self.results]) if first + num_items == count: self._stop = True if self.reverse: - self.start = r[self.interface]['rsm']['first'] + self.start = r[self.recv_interface]['rsm']['first'] else: - self.start = r[self.interface]['rsm']['last'] + self.start = r[self.recv_interface]['rsm']['last'] + if self.post_cb: + self.post_cb(r) return r except XMPPError: - raise StopIteration + raise StopAsyncIteration class XEP_0059(BasePlugin): @@ -127,7 +141,8 @@ class XEP_0059(BasePlugin): def session_bind(self, jid): self.xmpp['xep_0030'].add_feature(Set.namespace) - def iterate(self, stanza, interface, results='substanzas'): + def iterate(self, stanza, interface, results='substanzas', + recv_interface=None, pre_cb=None, post_cb=None): """ Create a new result set iterator for a given stanza query. @@ -137,9 +152,23 @@ class XEP_0059(BasePlugin): basic disco#items query. interface -- The name of the substanza to which the result set management stanza should be - appended. For example, for disco#items queries - the interface 'disco_items' should be used. + appended in the query stanza. For example, + for disco#items queries the interface + 'disco_items' should be used. + recv_interface -- The name of the substanza from which the + result set management stanza should be + read in the result stanza. If unspecified, + it will be set to the same value as the + ``interface`` parameter. + pre_cb -- Callback to run before sending each stanza e.g. + setting the MAM queryid and starting a stanza + collector. + post_cb -- Callback to run after receiving each stanza e.g. + stopping a MAM stanza collector in order to + gather results. results -- The name of the interface containing the query results (typically just 'substanzas'). """ - return ResultIterator(stanza, interface, results) + return ResultIterator(stanza, interface, results, + recv_interface=recv_interface, pre_cb=pre_cb, + post_cb=post_cb) -- cgit v1.2.3