summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormathieui <mathieui@mathieui.net>2017-07-21 15:01:13 +0200
committermathieui <mathieui@mathieui.net>2017-07-21 15:01:13 +0200
commitb38e229359679a07d0180a0dc3fd00e2a0b4736b (patch)
tree70156da742abca09d443aa24eb8f902d9ee41440
parent9a563f14258f02c03b67042ad953234ad863e912 (diff)
downloadslixmpp-b38e229359679a07d0180a0dc3fd00e2a0b4736b.tar.gz
slixmpp-b38e229359679a07d0180a0dc3fd00e2a0b4736b.tar.bz2
slixmpp-b38e229359679a07d0180a0dc3fd00e2a0b4736b.tar.xz
slixmpp-b38e229359679a07d0180a0dc3fd00e2a0b4736b.zip
Update RSM for asyncio
- Use an async iterator - Add a "recv_interface" parameter in order to differenciate the stanza we send from the stanza we receive (required for MAM) - Add a pre_cb to run before sending the query stanza - Add a post_cb to run after receiving the result stanza
-rw-r--r--slixmpp/plugins/xep_0059/rsm.py81
1 files changed, 55 insertions, 26 deletions
diff --git a/slixmpp/plugins/xep_0059/rsm.py b/slixmpp/plugins/xep_0059/rsm.py
index a02d94e0..8f7f48f0 100644
--- a/slixmpp/plugins/xep_0059/rsm.py
+++ b/slixmpp/plugins/xep_0059/rsm.py
@@ -19,23 +19,27 @@ from slixmpp.exceptions import XMPPError
log = logging.getLogger(__name__)
-class ResultIterator():
+class ResultIterator:
"""
An iterator for Result Set Managment
"""
def __init__(self, query, interface, results='substanzas', amount=10,
- start=None, reverse=False):
+ start=None, reverse=False, recv_interface=None,
+ pre_cb=None, post_cb=None):
"""
Arguments:
query -- The template query
- interface -- The substanza of the query, for example disco_items
+ interface -- The substanza of the query to send, for example disco_items
+ recv_interface -- The substanza of the query to receive, for example disco_items
results -- The query stanza's interface which provides a
countable list of query results.
amount -- The max amounts of items to request per iteration
start -- From which item id to start
reverse -- If True, page backwards through the results
+ pre_cb -- Callback to run before sending the stanza
+ post_cb -- Callback to run after receiving the reply
Example:
q = Iq()
@@ -49,17 +53,23 @@ class ResultIterator():
self.amount = amount
self.start = start
self.interface = interface
+ if recv_interface:
+ self.recv_interface = recv_interface
+ else:
+ self.recv_interface = interface
+ self.pre_cb = pre_cb
+ self.post_cb = post_cb
self.results = results
self.reverse = reverse
self._stop = False
- def __iter__(self):
+ def __aiter__(self):
return self
- def __next__(self):
- return self.next()
+ async def __anext__(self):
+ return await self.next()
- def next(self):
+ async def next(self):
"""
Return the next page of results from a query.
@@ -68,7 +78,7 @@ class ResultIterator():
of items.
"""
if self._stop:
- raise StopIteration
+ raise StopAsyncIteration
self.query[self.interface]['rsm']['before'] = self.reverse
self.query['id'] = self.query.stream.new_id()
self.query[self.interface]['rsm']['max'] = str(self.amount)
@@ -79,28 +89,32 @@ class ResultIterator():
self.query[self.interface]['rsm']['after'] = self.start
try:
- r = self.query.send(block=True)
-
- if not r[self.interface]['rsm']['first'] and \
- not r[self.interface]['rsm']['last']:
- raise StopIteration
-
- if r[self.interface]['rsm']['count'] and \
- r[self.interface]['rsm']['first_index']:
- count = int(r[self.interface]['rsm']['count'])
- first = int(r[self.interface]['rsm']['first_index'])
- num_items = len(r[self.interface][self.results])
+ if self.pre_cb:
+ self.pre_cb(self.query)
+ r = await self.query.send()
+
+ if not r[self.recv_interface]['rsm']['first'] and \
+ not r[self.recv_interface]['rsm']['last']:
+ raise StopAsyncIteration
+
+ if r[self.recv_interface]['rsm']['count'] and \
+ r[self.recv_interface]['rsm']['first_index']:
+ count = int(r[self.recv_interface]['rsm']['count'])
+ first = int(r[self.recv_interface]['rsm']['first_index'])
+ num_items = len(r[self.recv_interface][self.results])
if first + num_items == count:
self._stop = True
if self.reverse:
- self.start = r[self.interface]['rsm']['first']
+ self.start = r[self.recv_interface]['rsm']['first']
else:
- self.start = r[self.interface]['rsm']['last']
+ self.start = r[self.recv_interface]['rsm']['last']
+ if self.post_cb:
+ self.post_cb(r)
return r
except XMPPError:
- raise StopIteration
+ raise StopAsyncIteration
class XEP_0059(BasePlugin):
@@ -127,7 +141,8 @@ class XEP_0059(BasePlugin):
def session_bind(self, jid):
self.xmpp['xep_0030'].add_feature(Set.namespace)
- def iterate(self, stanza, interface, results='substanzas'):
+ def iterate(self, stanza, interface, results='substanzas',
+ recv_interface=None, pre_cb=None, post_cb=None):
"""
Create a new result set iterator for a given stanza query.
@@ -137,9 +152,23 @@ class XEP_0059(BasePlugin):
basic disco#items query.
interface -- The name of the substanza to which the
result set management stanza should be
- appended. For example, for disco#items queries
- the interface 'disco_items' should be used.
+ appended in the query stanza. For example,
+ for disco#items queries the interface
+ 'disco_items' should be used.
+ recv_interface -- The name of the substanza from which the
+ result set management stanza should be
+ read in the result stanza. If unspecified,
+ it will be set to the same value as the
+ ``interface`` parameter.
+ pre_cb -- Callback to run before sending each stanza e.g.
+ setting the MAM queryid and starting a stanza
+ collector.
+ post_cb -- Callback to run after receiving each stanza e.g.
+ stopping a MAM stanza collector in order to
+ gather results.
results -- The name of the interface containing the
query results (typically just 'substanzas').
"""
- return ResultIterator(stanza, interface, results)
+ return ResultIterator(stanza, interface, results,
+ recv_interface=recv_interface, pre_cb=pre_cb,
+ post_cb=post_cb)