import asyncio
import time
import unittest
from slixmpp.test import SlixTest
class TestStreamDisco(SlixTest):
"""
Test using the XEP-0030 plugin.
"""
def tearDown(self):
self.stream_close()
def testInfoEmptyDefaultNode(self):
"""
Info query result from an entity MUST have at least one identity
and feature, namely http://jabber.org/protocol/disco#info.
Since the XEP-0030 plugin is loaded, a disco response should
be generated and not an error result.
"""
self.stream_start(mode='client',
plugins=['xep_0030'])
self.recv("""
""")
self.send("""
""")
def testInfoEmptyDefaultNodeComponent(self):
"""
Test requesting an empty, default node using a Component.
"""
self.stream_start(mode='component',
jid='tester.localhost',
plugins=['xep_0030'])
self.recv("""
""")
self.send("""
""")
def testInfoIncludeNode(self):
"""
Results for info queries directed to a particular node MUST
include the node in the query response.
"""
self.stream_start(mode='client',
plugins=['xep_0030'])
self.xmpp['xep_0030'].static.add_node(node='testing')
self.recv("""
""")
self.send("""
""",
method='mask')
def testItemsIncludeNode(self):
"""
Results for items queries directed to a particular node MUST
include the node in the query response.
"""
self.stream_start(mode='client',
plugins=['xep_0030'])
self.xmpp['xep_0030'].static.add_node(node='testing')
self.recv("""
""")
self.send("""
""",
method='mask')
def testDynamicInfoJID(self):
"""
Test using a dynamic info handler for a particular JID.
"""
self.stream_start(mode='client',
plugins=['xep_0030'])
def dynamic_jid(jid, node, ifrom, iq):
result = self.xmpp['xep_0030'].stanza.DiscoInfo()
result['node'] = node
result.add_identity('client', 'console', name='Dynamic Info')
return result
self.xmpp['xep_0030'].set_node_handler('get_info',
jid='tester@localhost',
handler=dynamic_jid)
self.recv("""
""")
self.send("""
""")
def testDynamicInfoGlobal(self):
"""
Test using a dynamic info handler for all requests.
"""
self.stream_start(mode='component',
jid='tester.localhost',
plugins=['xep_0030'])
def dynamic_global(jid, node, ifrom, iq):
result = self.xmpp['xep_0030'].stanza.DiscoInfo()
result['node'] = node
result.add_identity('component', 'generic', name='Dynamic Info')
return result
self.xmpp['xep_0030'].set_node_handler('get_info',
handler=dynamic_global)
self.recv("""
""")
self.send("""
""")
def testOverrideJIDInfoHandler(self):
"""Test overriding a JID info handler."""
self.stream_start(mode='client',
plugins=['xep_0030'])
def dynamic_jid(jid, node, ifrom, iq):
result = self.xmpp['xep_0030'].stanza.DiscoInfo()
result['node'] = node
result.add_identity('client', 'console', name='Dynamic Info')
return result
self.xmpp['xep_0030'].set_node_handler('get_info',
jid='tester@localhost',
handler=dynamic_jid)
self.xmpp['xep_0030'].restore_defaults(jid='tester@localhost',
node='testing')
self.xmpp['xep_0030'].add_identity(jid='tester@localhost',
node='testing',
category='automation',
itype='command-list')
self.recv("""
""")
self.send("""
""")
def testOverrideGlobalInfoHandler(self):
"""Test overriding the global JID info handler."""
self.stream_start(mode='component',
jid='tester.localhost',
plugins=['xep_0030'])
def dynamic_global(jid, node, ifrom, iq):
result = self.xmpp['xep_0030'].stanza.DiscoInfo()
result['node'] = node
result.add_identity('component', 'generic', name='Dynamic Info')
return result
self.xmpp['xep_0030'].set_node_handler('get_info',
handler=dynamic_global)
self.xmpp['xep_0030'].restore_defaults(jid='user@tester.localhost',
node='testing')
self.xmpp['xep_0030'].add_feature(jid='user@tester.localhost',
node='testing',
feature='urn:xmpp:ping')
self.recv("""
""")
self.send("""
""")
def testGetInfoRemote(self):
"""
Test sending a disco#info query to another entity
and receiving the result.
"""
self.stream_start(mode='client',
plugins=['xep_0030'])
events = set()
def handle_disco_info(iq):
events.add('disco_info')
self.xmpp.add_event_handler('disco_info', handle_disco_info)
self.xmpp.wrap(self.xmpp['xep_0030'].get_info('user@localhost', 'foo'))
self.wait_()
self.send("""
""")
self.recv("""
""")
self.assertEqual(events, {'disco_info'},
"Disco info event was not triggered: %s" % events)
def testDynamicItemsJID(self):
"""
Test using a dynamic items handler for a particular JID.
"""
self.stream_start(mode='client',
plugins=['xep_0030'])
def dynamic_jid(jid, node, ifrom, iq):
result = self.xmpp['xep_0030'].stanza.DiscoItems()
result['node'] = node
result.add_item('tester@localhost', node='foo', name='JID')
return result
self.xmpp['xep_0030'].set_node_handler('get_items',
jid='tester@localhost',
handler=dynamic_jid)
self.recv("""
""")
self.send("""
""")
def testDynamicItemsGlobal(self):
"""
Test using a dynamic items handler for all requests.
"""
self.stream_start(mode='component',
jid='tester.localhost',
plugins=['xep_0030'])
def dynamic_global(jid, node, ifrom, iq):
result = self.xmpp['xep_0030'].stanza.DiscoItems()
result['node'] = node
result.add_item('tester@localhost', node='foo', name='Global')
return result
self.xmpp['xep_0030'].set_node_handler('get_items',
handler=dynamic_global)
self.recv("""
""")
self.send("""
""")
def testOverrideJIDItemsHandler(self):
"""Test overriding a JID items handler."""
self.stream_start(mode='client',
plugins=['xep_0030'])
def dynamic_jid(jid, node, ifrom, iq):
result = self.xmpp['xep_0030'].stanza.DiscoItems()
result['node'] = node
result.add_item('tester@localhost', node='foo', name='Global')
return result
self.xmpp['xep_0030'].set_node_handler('get_items',
jid='tester@localhost',
handler=dynamic_jid)
self.xmpp['xep_0030'].restore_defaults(jid='tester@localhost',
node='testing')
self.xmpp['xep_0030'].add_item(ijid='tester@localhost',
node='testing',
jid='tester@localhost',
subnode='foo',
name='Test')
self.recv("""
""")
self.send("""
""")
def testOverrideGlobalItemsHandler(self):
"""Test overriding the global JID items handler."""
self.stream_start(mode='component',
jid='tester.localhost',
plugins=['xep_0030'])
def dynamic_global(jid, node, ifrom, iq):
result = self.xmpp['xep_0030'].stanza.DiscoItems()
result['node'] = node
result.add_item('tester.localhost', node='foo', name='Global')
return result
self.xmpp['xep_0030'].set_node_handler('get_items',
handler=dynamic_global)
self.xmpp['xep_0030'].restore_defaults(jid='user@tester.localhost',
node='testing')
self.xmpp['xep_0030'].add_item(ijid='user@tester.localhost',
node='testing',
jid='user@tester.localhost',
subnode='foo',
name='Test')
self.recv("""
""")
self.send("""
""")
def testGetItemsRemote(self):
"""
Test sending a disco#items query to another entity
and receiving the result.
"""
self.stream_start(mode='client',
plugins=['xep_0030'])
events = set()
results = set()
def handle_disco_items(iq):
events.add('disco_items')
results.update(iq['disco_items']['items'])
self.xmpp.add_event_handler('disco_items', handle_disco_items)
self.xmpp.wrap(self.xmpp['xep_0030'].get_items('user@localhost', 'foo'))
self.wait_()
self.send("""
""")
self.recv("""
""")
items = {('user@localhost', 'bar', 'Test'),
('user@localhost', 'baz', 'Test 2')}
self.assertEqual(events, {'disco_items'},
"Disco items event was not triggered: %s" % events)
self.assertEqual(results, items,
"Unexpected items: %s" % results)
def testGetItemsIterators(self):
"""Test interaction between XEP-0030 and XEP-0059 plugins."""
iteration_finished = []
jids_found = set()
self.stream_start(mode='client',
plugins=['xep_0030', 'xep_0059'])
async def run_test():
iterator = await self.xmpp['xep_0030'].get_items(
jid='foo@localhost',
node='bar',
iterator=True
)
iterator.amount = 10
async for page in iterator:
for item in page['disco_items']['items']:
jids_found.add(item[0])
iteration_finished.append(True)
test_run = self.xmpp.wrap(run_test())
self.wait_()
self.send("""
10
""")
self.recv("""
a@b
e@b
10
""")
self.wait_()
self.send("""
10
e@b
""")
self.recv("""
f@b
j@b
10
""")
expected_jids = {'%s@b' % i for i in 'abcdefghij'}
self.run_coro(test_run)
self.assertEqual(expected_jids, jids_found)
self.assertEqual(iteration_finished, [True])
suite = unittest.TestLoader().loadTestsFromTestCase(TestStreamDisco)