import asyncio import time import unittest from slixmpp.test import SlixTest class TestStreamDisco(SlixTest): """ Test using the XEP-0030 plugin. """ def tearDown(self): self.stream_close() def testInfoEmptyDefaultNode(self): """ Info query result from an entity MUST have at least one identity and feature, namely http://jabber.org/protocol/disco#info. Since the XEP-0030 plugin is loaded, a disco response should be generated and not an error result. """ self.stream_start(mode='client', plugins=['xep_0030']) self.recv(""" """) self.send(""" """) def testInfoEmptyDefaultNodeComponent(self): """ Test requesting an empty, default node using a Component. """ self.stream_start(mode='component', jid='tester.localhost', plugins=['xep_0030']) self.recv(""" """) self.send(""" """) def testInfoIncludeNode(self): """ Results for info queries directed to a particular node MUST include the node in the query response. """ self.stream_start(mode='client', plugins=['xep_0030']) self.xmpp['xep_0030'].static.add_node(node='testing') self.recv(""" """) self.send(""" """, method='mask') def testItemsIncludeNode(self): """ Results for items queries directed to a particular node MUST include the node in the query response. """ self.stream_start(mode='client', plugins=['xep_0030']) self.xmpp['xep_0030'].static.add_node(node='testing') self.recv(""" """) self.send(""" """, method='mask') def testDynamicInfoJID(self): """ Test using a dynamic info handler for a particular JID. """ self.stream_start(mode='client', plugins=['xep_0030']) def dynamic_jid(jid, node, ifrom, iq): result = self.xmpp['xep_0030'].stanza.DiscoInfo() result['node'] = node result.add_identity('client', 'console', name='Dynamic Info') return result self.xmpp['xep_0030'].set_node_handler('get_info', jid='tester@localhost', handler=dynamic_jid) self.recv(""" """) self.send(""" """) def testDynamicInfoGlobal(self): """ Test using a dynamic info handler for all requests. """ self.stream_start(mode='component', jid='tester.localhost', plugins=['xep_0030']) def dynamic_global(jid, node, ifrom, iq): result = self.xmpp['xep_0030'].stanza.DiscoInfo() result['node'] = node result.add_identity('component', 'generic', name='Dynamic Info') return result self.xmpp['xep_0030'].set_node_handler('get_info', handler=dynamic_global) self.recv(""" """) self.send(""" """) def testOverrideJIDInfoHandler(self): """Test overriding a JID info handler.""" self.stream_start(mode='client', plugins=['xep_0030']) def dynamic_jid(jid, node, ifrom, iq): result = self.xmpp['xep_0030'].stanza.DiscoInfo() result['node'] = node result.add_identity('client', 'console', name='Dynamic Info') return result self.xmpp['xep_0030'].set_node_handler('get_info', jid='tester@localhost', handler=dynamic_jid) self.xmpp['xep_0030'].restore_defaults(jid='tester@localhost', node='testing') self.xmpp['xep_0030'].add_identity(jid='tester@localhost', node='testing', category='automation', itype='command-list') self.recv(""" """) self.send(""" """) def testOverrideGlobalInfoHandler(self): """Test overriding the global JID info handler.""" self.stream_start(mode='component', jid='tester.localhost', plugins=['xep_0030']) def dynamic_global(jid, node, ifrom, iq): result = self.xmpp['xep_0030'].stanza.DiscoInfo() result['node'] = node result.add_identity('component', 'generic', name='Dynamic Info') return result self.xmpp['xep_0030'].set_node_handler('get_info', handler=dynamic_global) self.xmpp['xep_0030'].restore_defaults(jid='user@tester.localhost', node='testing') self.xmpp['xep_0030'].add_feature(jid='user@tester.localhost', node='testing', feature='urn:xmpp:ping') self.recv(""" """) self.send(""" """) def testGetInfoRemote(self): """ Test sending a disco#info query to another entity and receiving the result. """ self.stream_start(mode='client', plugins=['xep_0030']) events = set() def handle_disco_info(iq): events.add('disco_info') self.xmpp.add_event_handler('disco_info', handle_disco_info) self.xmpp.wrap(self.xmpp['xep_0030'].get_info('user@localhost', 'foo')) self.wait_() self.send(""" """) self.recv(""" """) self.assertEqual(events, {'disco_info'}, "Disco info event was not triggered: %s" % events) def testDynamicItemsJID(self): """ Test using a dynamic items handler for a particular JID. """ self.stream_start(mode='client', plugins=['xep_0030']) def dynamic_jid(jid, node, ifrom, iq): result = self.xmpp['xep_0030'].stanza.DiscoItems() result['node'] = node result.add_item('tester@localhost', node='foo', name='JID') return result self.xmpp['xep_0030'].set_node_handler('get_items', jid='tester@localhost', handler=dynamic_jid) self.recv(""" """) self.send(""" """) def testDynamicItemsGlobal(self): """ Test using a dynamic items handler for all requests. """ self.stream_start(mode='component', jid='tester.localhost', plugins=['xep_0030']) def dynamic_global(jid, node, ifrom, iq): result = self.xmpp['xep_0030'].stanza.DiscoItems() result['node'] = node result.add_item('tester@localhost', node='foo', name='Global') return result self.xmpp['xep_0030'].set_node_handler('get_items', handler=dynamic_global) self.recv(""" """) self.send(""" """) def testOverrideJIDItemsHandler(self): """Test overriding a JID items handler.""" self.stream_start(mode='client', plugins=['xep_0030']) def dynamic_jid(jid, node, ifrom, iq): result = self.xmpp['xep_0030'].stanza.DiscoItems() result['node'] = node result.add_item('tester@localhost', node='foo', name='Global') return result self.xmpp['xep_0030'].set_node_handler('get_items', jid='tester@localhost', handler=dynamic_jid) self.xmpp['xep_0030'].restore_defaults(jid='tester@localhost', node='testing') self.xmpp['xep_0030'].add_item(ijid='tester@localhost', node='testing', jid='tester@localhost', subnode='foo', name='Test') self.recv(""" """) self.send(""" """) def testOverrideGlobalItemsHandler(self): """Test overriding the global JID items handler.""" self.stream_start(mode='component', jid='tester.localhost', plugins=['xep_0030']) def dynamic_global(jid, node, ifrom, iq): result = self.xmpp['xep_0030'].stanza.DiscoItems() result['node'] = node result.add_item('tester.localhost', node='foo', name='Global') return result self.xmpp['xep_0030'].set_node_handler('get_items', handler=dynamic_global) self.xmpp['xep_0030'].restore_defaults(jid='user@tester.localhost', node='testing') self.xmpp['xep_0030'].add_item(ijid='user@tester.localhost', node='testing', jid='user@tester.localhost', subnode='foo', name='Test') self.recv(""" """) self.send(""" """) def testGetItemsRemote(self): """ Test sending a disco#items query to another entity and receiving the result. """ self.stream_start(mode='client', plugins=['xep_0030']) events = set() results = set() def handle_disco_items(iq): events.add('disco_items') results.update(iq['disco_items']['items']) self.xmpp.add_event_handler('disco_items', handle_disco_items) self.xmpp.wrap(self.xmpp['xep_0030'].get_items('user@localhost', 'foo')) self.wait_() self.send(""" """) self.recv(""" """) items = {('user@localhost', 'bar', 'Test'), ('user@localhost', 'baz', 'Test 2')} self.assertEqual(events, {'disco_items'}, "Disco items event was not triggered: %s" % events) self.assertEqual(results, items, "Unexpected items: %s" % results) ''' def testGetItemsIterator(self): """Test interaction between XEP-0030 and XEP-0059 plugins.""" raised_exceptions = [] self.stream_start(mode='client', plugins=['xep_0030', 'xep_0059']) results = self.xmpp['xep_0030'].get_items(jid='foo@localhost', node='bar', iterator=True) results.amount = 10 def run_test(): try: results.next() except StopIteration: raised_exceptions.append(True) t = threading.Thread(name="get_items_iterator", target=run_test) t.start() self.send(""" 10 """) self.recv(""" """) t.join() self.assertEqual(raised_exceptions, [True], "StopIteration was not raised: %s" % raised_exceptions) ''' suite = unittest.TestLoader().loadTestsFromTestCase(TestStreamDisco)