diff options
-rw-r--r-- | itests/test_slow_filters.py | 49 | ||||
-rw-r--r-- | slixmpp/xmlstream/xmlstream.py | 12 |
2 files changed, 57 insertions, 4 deletions
diff --git a/itests/test_slow_filters.py b/itests/test_slow_filters.py new file mode 100644 index 00000000..254a6b03 --- /dev/null +++ b/itests/test_slow_filters.py @@ -0,0 +1,49 @@ +import asyncio +import unittest +from slixmpp.test.integration import SlixIntegration +from slixmpp import Message + + +class TestSlowFilter(SlixIntegration): + async def asyncSetUp(self): + await super().asyncSetUp() + self.add_client( + self.envjid('CI_ACCOUNT1'), + self.envstr('CI_ACCOUNT1_PASSWORD'), + ) + self.add_client( + self.envjid('CI_ACCOUNT2'), + self.envstr('CI_ACCOUNT2_PASSWORD'), + ) + await self.connect_clients() + + async def test_filters(self): + """Make sure filters work""" + def add_a(stanza): + if isinstance(stanza, Message): + stanza['body'] = stanza['body'] + ' a' + return stanza + + async def add_b(stanza): + if isinstance(stanza, Message): + stanza['body'] = stanza['body'] + ' b' + return stanza + + async def add_c_wait(stanza): + if isinstance(stanza, Message): + await asyncio.sleep(2) + stanza['body'] = stanza['body'] + ' c' + return stanza + self.clients[0].add_filter('out', add_a) + self.clients[0].add_filter('out', add_b) + self.clients[0].add_filter('out', add_c_wait) + body = 'Msg body' + msg = self.clients[0].make_message( + mto=self.clients[1].boundjid, mbody=body, + ) + msg.send() + message = await self.clients[1].wait_until('message') + self.assertEqual(message['body'], body + ' a b c') + + +suite = unittest.TestLoader().loadTestsFromTestCase(TestSlowFilter) diff --git a/slixmpp/xmlstream/xmlstream.py b/slixmpp/xmlstream/xmlstream.py index 7a94bf50..ab9b781d 100644 --- a/slixmpp/xmlstream/xmlstream.py +++ b/slixmpp/xmlstream/xmlstream.py @@ -1053,11 +1053,13 @@ class XMLStream(asyncio.BaseProtocol): """ data = await task self.__slow_tasks.remove(task) - for filter in self.__filters['out']: + if data is None: + return + for filter in self.__filters['out'][:]: if filter in already_used: continue if iscoroutinefunction(filter): - data = await task + data = await filter(data) else: data = filter(data) if data is None: @@ -1093,7 +1095,7 @@ class XMLStream(asyncio.BaseProtocol): timeout=1, ) if pending: - self.slow_tasks.append(task) + self.__slow_tasks.append(task) asyncio.ensure_future( self._continue_slow_send( task, @@ -1101,7 +1103,9 @@ class XMLStream(asyncio.BaseProtocol): ), loop=self.loop, ) - raise Exception("Slow coro, rescheduling") + raise ContinueQueue( + "Slow coroutine, rescheduling filters" + ) data = task.result() else: data = filter(data) |