diff options
author | mathieui <mathieui@mathieui.net> | 2021-03-09 19:25:26 +0100 |
---|---|---|
committer | mathieui <mathieui@mathieui.net> | 2021-03-09 19:25:26 +0100 |
commit | 1289cf575c7a2c3db4a425b4ca4ee128098d9966 (patch) | |
tree | eca81fdee93beb2e04b175e69fd3c282f57351a9 | |
parent | a568363a6c6e102271c400f28628e3d8de335486 (diff) | |
parent | 5a3ab2c5c13aea6761680a9ccc6cf53c133c9ac7 (diff) | |
download | slixmpp-1289cf575c7a2c3db4a425b4ca4ee128098d9966.tar.gz slixmpp-1289cf575c7a2c3db4a425b4ca4ee128098d9966.tar.bz2 slixmpp-1289cf575c7a2c3db4a425b4ca4ee128098d9966.tar.xz slixmpp-1289cf575c7a2c3db4a425b4ca4ee128098d9966.zip |
Merge branch 'rsm-fixes' into 'master'
XEP-0059 (RSM) - Some fixes
See merge request poezio/slixmpp!145
-rw-r--r-- | docs/api/plugins/xep_0059.rst | 3 | ||||
-rw-r--r-- | slixmpp/plugins/xep_0059/rsm.py | 174 | ||||
-rw-r--r-- | tests/test_stream_xep_0030.py | 82 |
3 files changed, 170 insertions, 89 deletions
diff --git a/docs/api/plugins/xep_0059.rst b/docs/api/plugins/xep_0059.rst index e9f7db28..0d5ba32e 100644 --- a/docs/api/plugins/xep_0059.rst +++ b/docs/api/plugins/xep_0059.rst @@ -8,6 +8,9 @@ XEP-0059: Result Set Management :members: :exclude-members: session_bind, plugin_init, plugin_end +.. autoclass:: ResultIterator + :members: + :member-order: bysource Stanza elements --------------- diff --git a/slixmpp/plugins/xep_0059/rsm.py b/slixmpp/plugins/xep_0059/rsm.py index 0fd6b2f9..61752af4 100644 --- a/slixmpp/plugins/xep_0059/rsm.py +++ b/slixmpp/plugins/xep_0059/rsm.py @@ -5,9 +5,16 @@ # See the file LICENSE for copying permission. import logging -import slixmpp -from slixmpp import Iq -from slixmpp.plugins import BasePlugin, register_plugin +from collections.abc import AsyncIterator +from typing import ( + Any, + Callable, + Dict, + Optional, +) + +from slixmpp.stanza import Iq +from slixmpp.plugins import BasePlugin from slixmpp.xmlstream import register_stanza_plugin from slixmpp.plugins.xep_0059 import stanza, Set from slixmpp.exceptions import XMPPError @@ -16,41 +23,73 @@ from slixmpp.exceptions import XMPPError log = logging.getLogger(__name__) -class ResultIterator: +class ResultIterator(AsyncIterator): """ An iterator for Result Set Management - """ - def __init__(self, query, interface, results='substanzas', amount=10, - start=None, reverse=False, recv_interface=None, - pre_cb=None, post_cb=None): + Example: + + .. code-block:: python + + q = Iq() + q['to'] = 'pubsub.example.com' + q['disco_items']['node'] = 'blog' + async for i in ResultIterator(q, 'disco_items', '10'): + print(i['disco_items']['items']) + + """ + #: Template for the RSM query + query: Iq + #: Substanza of the query to send, e.g. "disco_items" + interface: str + #: Stanza interface on the query results providing the retrieved + #: elements (used to count them) + results: str + #: From which item id to start + start: Optional[str] + #: Amount of elements to retrieve for each page + amount: int + #: If True, page backwards through the results + reverse: bool + #: Callback to run before sending the stanza + pre_cb: Optional[Callable[[Iq], None]] + #: Callback to run after receiving the reply + post_cb: Optional[Callable[[Iq], None]] + #: Optional dict of Iq options (timeout, etc…) for Iq.send() + iq_options: Dict[str, Any] + + def __init__(self, query: Iq, interface: str, results: str = 'substanzas', + amount: int = 10, + start: Optional[str] = None, reverse: bool = False, + recv_interface: Optional[str] = None, + pre_cb: Optional[Callable[[Iq], None]] = None, + post_cb: Optional[Callable[[Iq], None]] = None, + iq_options: Optional[Dict[str, Any]] = None): """ - Arguments: - query -- The template query - interface -- The substanza of the query to send, for example disco_items - recv_interface -- The substanza of the query to receive, for example disco_items - results -- The query stanza's interface which provides a + :param query: The template query + :param interface: The substanza of the query to send, for example + disco_items + :param recv_interface: The substanza of the query to receive, for + example disco_items + :param results: The query stanza's interface which provides a countable list of query results. - amount -- The max amounts of items to request per iteration - start -- From which item id to start - reverse -- If True, page backwards through the results - pre_cb -- Callback to run before sending the stanza - post_cb -- Callback to run after receiving the reply - - Example: - q = Iq() - q['to'] = 'pubsub.example.com' - q['disco_items']['node'] = 'blog' - for i in ResultIterator(q, 'disco_items', '10'): - print i['disco_items']['items'] - + :param amount: The max amounts of items to request per iteration + :param start: From which item id to start + :param reverse: If True, page backwards through the results + :param pre_cb: Callback to run before sending the stanza + :param post_cb: Callback to run after receiving the reply + :param iq_options: Optional dict of parameters for Iq.send """ self.query = query self.amount = amount self.start = start + if iq_options is None: + self.iq_options = {} + else: + self.iq_options = iq_options self.interface = interface - if recv_interface: + if recv_interface is not None: self.recv_interface = recv_interface else: self.recv_interface = interface @@ -63,10 +102,10 @@ class ResultIterator: def __aiter__(self): return self - async def __anext__(self): + async def __anext__(self) -> Iq: return await self.next() - async def next(self): + async def next(self) -> Iq: """ Return the next page of results from a query. @@ -76,20 +115,21 @@ class ResultIterator: """ if self._stop: raise StopAsyncIteration - if self.query[self.interface]['rsm']['before'] is None: - self.query[self.interface]['rsm']['before'] = self.reverse self.query['id'] = self.query.stream.new_id() self.query[self.interface]['rsm']['max'] = str(self.amount) - if self.start and self.reverse: - self.query[self.interface]['rsm']['before'] = self.start - elif self.start: - self.query[self.interface]['rsm']['after'] = self.start + if self.start: + if self.reverse: + self.query[self.interface]['rsm']['before'] = self.start + else: + self.query[self.interface]['rsm']['after'] = self.start + elif self.reverse: + self.query[self.interface]['rsm']['before'] = True try: if self.pre_cb: self.pre_cb(self.query) - r = await self.query.send() + r = await self.query.send(**self.iq_options) if not r[self.recv_interface]['rsm']['first'] and \ not r[self.recv_interface]['rsm']['last']: @@ -118,7 +158,7 @@ class ResultIterator: class XEP_0059(BasePlugin): """ - XEP-0050: Result Set Management + XEP-0059: Result Set Management """ name = 'xep_0059' @@ -139,34 +179,40 @@ class XEP_0059(BasePlugin): def session_bind(self, jid): self.xmpp['xep_0030'].add_feature(Set.namespace) - def iterate(self, stanza, interface, results='substanzas', amount=10, reverse=False, - recv_interface=None, pre_cb=None, post_cb=None): + def iterate(self, stanza: Iq, interface: str, results: str = 'substanzas', + amount: int = 10, reverse: bool = False, + recv_interface: Optional[str] = None, + pre_cb: Optional[Callable[[Iq], None]] = None, + post_cb: Optional[Callable[[Iq], None]] = None, + iq_options: Optional[Dict[str, Any]] = None + ) -> ResultIterator: """ Create a new result set iterator for a given stanza query. - Arguments: - stanza -- A stanza object to serve as a template for - queries made each iteration. For example, a - basic disco#items query. - interface -- The name of the substanza to which the - result set management stanza should be - appended in the query stanza. For example, - for disco#items queries the interface - 'disco_items' should be used. - recv_interface -- The name of the substanza from which the - result set management stanza should be - read in the result stanza. If unspecified, - it will be set to the same value as the - ``interface`` parameter. - pre_cb -- Callback to run before sending each stanza e.g. - setting the MAM queryid and starting a stanza - collector. - post_cb -- Callback to run after receiving each stanza e.g. - stopping a MAM stanza collector in order to - gather results. - results -- The name of the interface containing the - query results (typically just 'substanzas'). + :param stanza: A stanza object to serve as a template for + queries made each iteration. For example, a + basic disco#items query. + :param interface: The name of the substanza to which the + result set management stanza should be + appended in the query stanza. For example, + for disco#items queries the interface + 'disco_items' should be used. + :param recv_interface: The name of the substanza from which the + result set management stanza should be + read in the result stanza. If unspecified, + it will be set to the same value as the + ``interface`` parameter. + :param pre_cb: Callback to run before sending each stanza e.g. + setting the MAM queryid and starting a stanza + collector. + :param post_cb: Callback to run after receiving each stanza e.g. + stopping a MAM stanza collector in order to + gather results. + :param results: The name of the interface containing the + query results (typically just 'substanzas'). + :param iq_options: Optional dict of parameters for Iq.send """ - return ResultIterator(stanza, interface, results, amount, reverse=reverse, - recv_interface=recv_interface, pre_cb=pre_cb, - post_cb=post_cb) + return ResultIterator(stanza, interface, results, amount, + reverse=reverse, recv_interface=recv_interface, + pre_cb=pre_cb, post_cb=post_cb, + iq_options=iq_options) diff --git a/tests/test_stream_xep_0030.py b/tests/test_stream_xep_0030.py index 8cba8280..4cabfe38 100644 --- a/tests/test_stream_xep_0030.py +++ b/tests/test_stream_xep_0030.py @@ -512,30 +512,28 @@ class TestStreamDisco(SlixTest): self.assertEqual(results, items, "Unexpected items: %s" % results) - ''' - def testGetItemsIterator(self): + def testGetItemsIterators(self): """Test interaction between XEP-0030 and XEP-0059 plugins.""" - - raised_exceptions = [] + iteration_finished = [] + jids_found = set() self.stream_start(mode='client', plugins=['xep_0030', 'xep_0059']) - results = self.xmpp['xep_0030'].get_items(jid='foo@localhost', - node='bar', - iterator=True) - results.amount = 10 - - def run_test(): - try: - results.next() - except StopIteration: - raised_exceptions.append(True) - - t = threading.Thread(name="get_items_iterator", - target=run_test) - t.start() - + async def run_test(): + iterator = await self.xmpp['xep_0030'].get_items( + jid='foo@localhost', + node='bar', + iterator=True + ) + iterator.amount = 10 + async for page in iterator: + for item in page['disco_items']['items']: + jids_found.add(item[0]) + iteration_finished.append(True) + + test_run = self.xmpp.wrap(run_test()) + self.wait_() self.send(""" <iq id="2" type="get" to="foo@localhost"> <query xmlns="http://jabber.org/protocol/disco#items" @@ -549,17 +547,51 @@ class TestStreamDisco(SlixTest): self.recv(""" <iq id="2" type="result" to="tester@localhost"> <query xmlns="http://jabber.org/protocol/disco#items"> + <item jid="a@b" node="1"/> + <item jid="b@b" node="2"/> + <item jid="c@b" node="3"/> + <item jid="d@b" node="4"/> + <item jid="e@b" node="5"/> <set xmlns="http://jabber.org/protocol/rsm"> + <first index='0'>a@b</first> + <last>e@b</last> + <count>10</count> </set> </query> </iq> """) - - t.join() - - self.assertEqual(raised_exceptions, [True], - "StopIteration was not raised: %s" % raised_exceptions) - ''' + self.wait_() + self.send(""" + <iq id="3" type="get" to="foo@localhost"> + <query xmlns="http://jabber.org/protocol/disco#items" + node="bar"> + <set xmlns="http://jabber.org/protocol/rsm"> + <max>10</max> + <after>e@b</after> + </set> + </query> + </iq> + """) + self.recv(""" + <iq id="3" type="result" to="tester@localhost"> + <query xmlns="http://jabber.org/protocol/disco#items"> + <item jid="f@b" node="6"/> + <item jid="g@b" node="7"/> + <item jid="h@b" node="8"/> + <item jid="i@b" node="9"/> + <item jid="j@b" node="10"/> + <set xmlns="http://jabber.org/protocol/rsm"> + <first index='5'>f@b</first> + <last>j@b</last> + <count>10</count> + </set> + </query> + </iq> + """) + expected_jids = {'%s@b' % i for i in 'abcdefghij'} + self.run_coro(test_run) + self.assertEqual(expected_jids, jids_found) + self.assertEqual(iteration_finished, [True]) suite = unittest.TestLoader().loadTestsFromTestCase(TestStreamDisco) |