summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormathieui <mathieui@mathieui.net>2021-03-09 19:25:26 +0100
committermathieui <mathieui@mathieui.net>2021-03-09 19:25:26 +0100
commit1289cf575c7a2c3db4a425b4ca4ee128098d9966 (patch)
treeeca81fdee93beb2e04b175e69fd3c282f57351a9
parenta568363a6c6e102271c400f28628e3d8de335486 (diff)
parent5a3ab2c5c13aea6761680a9ccc6cf53c133c9ac7 (diff)
downloadslixmpp-1289cf575c7a2c3db4a425b4ca4ee128098d9966.tar.gz
slixmpp-1289cf575c7a2c3db4a425b4ca4ee128098d9966.tar.bz2
slixmpp-1289cf575c7a2c3db4a425b4ca4ee128098d9966.tar.xz
slixmpp-1289cf575c7a2c3db4a425b4ca4ee128098d9966.zip
Merge branch 'rsm-fixes' into 'master'
XEP-0059 (RSM) - Some fixes See merge request poezio/slixmpp!145
-rw-r--r--docs/api/plugins/xep_0059.rst3
-rw-r--r--slixmpp/plugins/xep_0059/rsm.py174
-rw-r--r--tests/test_stream_xep_0030.py82
3 files changed, 170 insertions, 89 deletions
diff --git a/docs/api/plugins/xep_0059.rst b/docs/api/plugins/xep_0059.rst
index e9f7db28..0d5ba32e 100644
--- a/docs/api/plugins/xep_0059.rst
+++ b/docs/api/plugins/xep_0059.rst
@@ -8,6 +8,9 @@ XEP-0059: Result Set Management
:members:
:exclude-members: session_bind, plugin_init, plugin_end
+.. autoclass:: ResultIterator
+ :members:
+ :member-order: bysource
Stanza elements
---------------
diff --git a/slixmpp/plugins/xep_0059/rsm.py b/slixmpp/plugins/xep_0059/rsm.py
index 0fd6b2f9..61752af4 100644
--- a/slixmpp/plugins/xep_0059/rsm.py
+++ b/slixmpp/plugins/xep_0059/rsm.py
@@ -5,9 +5,16 @@
# See the file LICENSE for copying permission.
import logging
-import slixmpp
-from slixmpp import Iq
-from slixmpp.plugins import BasePlugin, register_plugin
+from collections.abc import AsyncIterator
+from typing import (
+ Any,
+ Callable,
+ Dict,
+ Optional,
+)
+
+from slixmpp.stanza import Iq
+from slixmpp.plugins import BasePlugin
from slixmpp.xmlstream import register_stanza_plugin
from slixmpp.plugins.xep_0059 import stanza, Set
from slixmpp.exceptions import XMPPError
@@ -16,41 +23,73 @@ from slixmpp.exceptions import XMPPError
log = logging.getLogger(__name__)
-class ResultIterator:
+class ResultIterator(AsyncIterator):
"""
An iterator for Result Set Management
- """
- def __init__(self, query, interface, results='substanzas', amount=10,
- start=None, reverse=False, recv_interface=None,
- pre_cb=None, post_cb=None):
+ Example:
+
+ .. code-block:: python
+
+ q = Iq()
+ q['to'] = 'pubsub.example.com'
+ q['disco_items']['node'] = 'blog'
+ async for i in ResultIterator(q, 'disco_items', '10'):
+ print(i['disco_items']['items'])
+
+ """
+ #: Template for the RSM query
+ query: Iq
+ #: Substanza of the query to send, e.g. "disco_items"
+ interface: str
+ #: Stanza interface on the query results providing the retrieved
+ #: elements (used to count them)
+ results: str
+ #: From which item id to start
+ start: Optional[str]
+ #: Amount of elements to retrieve for each page
+ amount: int
+ #: If True, page backwards through the results
+ reverse: bool
+ #: Callback to run before sending the stanza
+ pre_cb: Optional[Callable[[Iq], None]]
+ #: Callback to run after receiving the reply
+ post_cb: Optional[Callable[[Iq], None]]
+ #: Optional dict of Iq options (timeout, etc…) for Iq.send()
+ iq_options: Dict[str, Any]
+
+ def __init__(self, query: Iq, interface: str, results: str = 'substanzas',
+ amount: int = 10,
+ start: Optional[str] = None, reverse: bool = False,
+ recv_interface: Optional[str] = None,
+ pre_cb: Optional[Callable[[Iq], None]] = None,
+ post_cb: Optional[Callable[[Iq], None]] = None,
+ iq_options: Optional[Dict[str, Any]] = None):
"""
- Arguments:
- query -- The template query
- interface -- The substanza of the query to send, for example disco_items
- recv_interface -- The substanza of the query to receive, for example disco_items
- results -- The query stanza's interface which provides a
+ :param query: The template query
+ :param interface: The substanza of the query to send, for example
+ disco_items
+ :param recv_interface: The substanza of the query to receive, for
+ example disco_items
+ :param results: The query stanza's interface which provides a
countable list of query results.
- amount -- The max amounts of items to request per iteration
- start -- From which item id to start
- reverse -- If True, page backwards through the results
- pre_cb -- Callback to run before sending the stanza
- post_cb -- Callback to run after receiving the reply
-
- Example:
- q = Iq()
- q['to'] = 'pubsub.example.com'
- q['disco_items']['node'] = 'blog'
- for i in ResultIterator(q, 'disco_items', '10'):
- print i['disco_items']['items']
-
+ :param amount: The max amounts of items to request per iteration
+ :param start: From which item id to start
+ :param reverse: If True, page backwards through the results
+ :param pre_cb: Callback to run before sending the stanza
+ :param post_cb: Callback to run after receiving the reply
+ :param iq_options: Optional dict of parameters for Iq.send
"""
self.query = query
self.amount = amount
self.start = start
+ if iq_options is None:
+ self.iq_options = {}
+ else:
+ self.iq_options = iq_options
self.interface = interface
- if recv_interface:
+ if recv_interface is not None:
self.recv_interface = recv_interface
else:
self.recv_interface = interface
@@ -63,10 +102,10 @@ class ResultIterator:
def __aiter__(self):
return self
- async def __anext__(self):
+ async def __anext__(self) -> Iq:
return await self.next()
- async def next(self):
+ async def next(self) -> Iq:
"""
Return the next page of results from a query.
@@ -76,20 +115,21 @@ class ResultIterator:
"""
if self._stop:
raise StopAsyncIteration
- if self.query[self.interface]['rsm']['before'] is None:
- self.query[self.interface]['rsm']['before'] = self.reverse
self.query['id'] = self.query.stream.new_id()
self.query[self.interface]['rsm']['max'] = str(self.amount)
- if self.start and self.reverse:
- self.query[self.interface]['rsm']['before'] = self.start
- elif self.start:
- self.query[self.interface]['rsm']['after'] = self.start
+ if self.start:
+ if self.reverse:
+ self.query[self.interface]['rsm']['before'] = self.start
+ else:
+ self.query[self.interface]['rsm']['after'] = self.start
+ elif self.reverse:
+ self.query[self.interface]['rsm']['before'] = True
try:
if self.pre_cb:
self.pre_cb(self.query)
- r = await self.query.send()
+ r = await self.query.send(**self.iq_options)
if not r[self.recv_interface]['rsm']['first'] and \
not r[self.recv_interface]['rsm']['last']:
@@ -118,7 +158,7 @@ class ResultIterator:
class XEP_0059(BasePlugin):
"""
- XEP-0050: Result Set Management
+ XEP-0059: Result Set Management
"""
name = 'xep_0059'
@@ -139,34 +179,40 @@ class XEP_0059(BasePlugin):
def session_bind(self, jid):
self.xmpp['xep_0030'].add_feature(Set.namespace)
- def iterate(self, stanza, interface, results='substanzas', amount=10, reverse=False,
- recv_interface=None, pre_cb=None, post_cb=None):
+ def iterate(self, stanza: Iq, interface: str, results: str = 'substanzas',
+ amount: int = 10, reverse: bool = False,
+ recv_interface: Optional[str] = None,
+ pre_cb: Optional[Callable[[Iq], None]] = None,
+ post_cb: Optional[Callable[[Iq], None]] = None,
+ iq_options: Optional[Dict[str, Any]] = None
+ ) -> ResultIterator:
"""
Create a new result set iterator for a given stanza query.
- Arguments:
- stanza -- A stanza object to serve as a template for
- queries made each iteration. For example, a
- basic disco#items query.
- interface -- The name of the substanza to which the
- result set management stanza should be
- appended in the query stanza. For example,
- for disco#items queries the interface
- 'disco_items' should be used.
- recv_interface -- The name of the substanza from which the
- result set management stanza should be
- read in the result stanza. If unspecified,
- it will be set to the same value as the
- ``interface`` parameter.
- pre_cb -- Callback to run before sending each stanza e.g.
- setting the MAM queryid and starting a stanza
- collector.
- post_cb -- Callback to run after receiving each stanza e.g.
- stopping a MAM stanza collector in order to
- gather results.
- results -- The name of the interface containing the
- query results (typically just 'substanzas').
+ :param stanza: A stanza object to serve as a template for
+ queries made each iteration. For example, a
+ basic disco#items query.
+ :param interface: The name of the substanza to which the
+ result set management stanza should be
+ appended in the query stanza. For example,
+ for disco#items queries the interface
+ 'disco_items' should be used.
+ :param recv_interface: The name of the substanza from which the
+ result set management stanza should be
+ read in the result stanza. If unspecified,
+ it will be set to the same value as the
+ ``interface`` parameter.
+ :param pre_cb: Callback to run before sending each stanza e.g.
+ setting the MAM queryid and starting a stanza
+ collector.
+ :param post_cb: Callback to run after receiving each stanza e.g.
+ stopping a MAM stanza collector in order to
+ gather results.
+ :param results: The name of the interface containing the
+ query results (typically just 'substanzas').
+ :param iq_options: Optional dict of parameters for Iq.send
"""
- return ResultIterator(stanza, interface, results, amount, reverse=reverse,
- recv_interface=recv_interface, pre_cb=pre_cb,
- post_cb=post_cb)
+ return ResultIterator(stanza, interface, results, amount,
+ reverse=reverse, recv_interface=recv_interface,
+ pre_cb=pre_cb, post_cb=post_cb,
+ iq_options=iq_options)
diff --git a/tests/test_stream_xep_0030.py b/tests/test_stream_xep_0030.py
index 8cba8280..4cabfe38 100644
--- a/tests/test_stream_xep_0030.py
+++ b/tests/test_stream_xep_0030.py
@@ -512,30 +512,28 @@ class TestStreamDisco(SlixTest):
self.assertEqual(results, items,
"Unexpected items: %s" % results)
- '''
- def testGetItemsIterator(self):
+ def testGetItemsIterators(self):
"""Test interaction between XEP-0030 and XEP-0059 plugins."""
-
- raised_exceptions = []
+ iteration_finished = []
+ jids_found = set()
self.stream_start(mode='client',
plugins=['xep_0030', 'xep_0059'])
- results = self.xmpp['xep_0030'].get_items(jid='foo@localhost',
- node='bar',
- iterator=True)
- results.amount = 10
-
- def run_test():
- try:
- results.next()
- except StopIteration:
- raised_exceptions.append(True)
-
- t = threading.Thread(name="get_items_iterator",
- target=run_test)
- t.start()
-
+ async def run_test():
+ iterator = await self.xmpp['xep_0030'].get_items(
+ jid='foo@localhost',
+ node='bar',
+ iterator=True
+ )
+ iterator.amount = 10
+ async for page in iterator:
+ for item in page['disco_items']['items']:
+ jids_found.add(item[0])
+ iteration_finished.append(True)
+
+ test_run = self.xmpp.wrap(run_test())
+ self.wait_()
self.send("""
<iq id="2" type="get" to="foo@localhost">
<query xmlns="http://jabber.org/protocol/disco#items"
@@ -549,17 +547,51 @@ class TestStreamDisco(SlixTest):
self.recv("""
<iq id="2" type="result" to="tester@localhost">
<query xmlns="http://jabber.org/protocol/disco#items">
+ <item jid="a@b" node="1"/>
+ <item jid="b@b" node="2"/>
+ <item jid="c@b" node="3"/>
+ <item jid="d@b" node="4"/>
+ <item jid="e@b" node="5"/>
<set xmlns="http://jabber.org/protocol/rsm">
+ <first index='0'>a@b</first>
+ <last>e@b</last>
+ <count>10</count>
</set>
</query>
</iq>
""")
-
- t.join()
-
- self.assertEqual(raised_exceptions, [True],
- "StopIteration was not raised: %s" % raised_exceptions)
- '''
+ self.wait_()
+ self.send("""
+ <iq id="3" type="get" to="foo@localhost">
+ <query xmlns="http://jabber.org/protocol/disco#items"
+ node="bar">
+ <set xmlns="http://jabber.org/protocol/rsm">
+ <max>10</max>
+ <after>e@b</after>
+ </set>
+ </query>
+ </iq>
+ """)
+ self.recv("""
+ <iq id="3" type="result" to="tester@localhost">
+ <query xmlns="http://jabber.org/protocol/disco#items">
+ <item jid="f@b" node="6"/>
+ <item jid="g@b" node="7"/>
+ <item jid="h@b" node="8"/>
+ <item jid="i@b" node="9"/>
+ <item jid="j@b" node="10"/>
+ <set xmlns="http://jabber.org/protocol/rsm">
+ <first index='5'>f@b</first>
+ <last>j@b</last>
+ <count>10</count>
+ </set>
+ </query>
+ </iq>
+ """)
+ expected_jids = {'%s@b' % i for i in 'abcdefghij'}
+ self.run_coro(test_run)
+ self.assertEqual(expected_jids, jids_found)
+ self.assertEqual(iteration_finished, [True])
suite = unittest.TestLoader().loadTestsFromTestCase(TestStreamDisco)