import sys import time import threading from sleekxmpp.test import * class TestStreamDisco(SleekTest): """ Test using the XEP-0030 plugin. """ def tearDown(self): self.stream_close() def testInfoEmptyDefaultNode(self): """ Info query result from an entity MUST have at least one identity and feature, namely http://jabber.org/protocol/disco#info. Since the XEP-0030 plugin is loaded, a disco response should be generated and not an error result. """ self.stream_start(mode='client', plugins=['xep_0030']) self.recv(""" """) self.send(""" """) def testInfoEmptyDefaultNodeComponent(self): """ Test requesting an empty, default node using a Component. """ self.stream_start(mode='component', jid='tester.localhost', plugins=['xep_0030']) self.recv(""" """) self.send(""" """) def testInfoIncludeNode(self): """ Results for info queries directed to a particular node MUST include the node in the query response. """ self.stream_start(mode='client', plugins=['xep_0030']) self.xmpp['xep_0030'].static.add_node(node='testing') self.recv(""" """) self.send(""" """, method='mask') def testItemsIncludeNode(self): """ Results for items queries directed to a particular node MUST include the node in the query response. """ self.stream_start(mode='client', plugins=['xep_0030']) self.xmpp['xep_0030'].static.add_node(node='testing') self.recv(""" """) self.send(""" """, method='mask') def testDynamicInfoJID(self): """ Test using a dynamic info handler for a particular JID. """ self.stream_start(mode='client', plugins=['xep_0030']) def dynamic_jid(jid, node, ifrom, iq): result = self.xmpp['xep_0030'].stanza.DiscoInfo() result['node'] = node result.add_identity('client', 'console', name='Dynamic Info') return result self.xmpp['xep_0030'].set_node_handler('get_info', jid='tester@localhost', handler=dynamic_jid) self.recv(""" """) self.send(""" """) def testDynamicInfoGlobal(self): """ Test using a dynamic info handler for all requests. """ self.stream_start(mode='component', jid='tester.localhost', plugins=['xep_0030']) def dynamic_global(jid, node, ifrom, iq): result = self.xmpp['xep_0030'].stanza.DiscoInfo() result['node'] = node result.add_identity('component', 'generic', name='Dynamic Info') return result self.xmpp['xep_0030'].set_node_handler('get_info', handler=dynamic_global) self.recv(""" """) self.send(""" """) def testOverrideJIDInfoHandler(self): """Test overriding a JID info handler.""" self.stream_start(mode='client', plugins=['xep_0030']) def dynamic_jid(jid, node, ifrom, iq): result = self.xmpp['xep_0030'].stanza.DiscoInfo() result['node'] = node result.add_identity('client', 'console', name='Dynamic Info') return result self.xmpp['xep_0030'].set_node_handler('get_info', jid='tester@localhost', handler=dynamic_jid) self.xmpp['xep_0030'].make_static(jid='tester@localhost', node='testing') self.xmpp['xep_0030'].add_identity(jid='tester@localhost', node='testing', category='automation', itype='command-list') self.recv(""" """) self.send(""" """) def testOverrideGlobalInfoHandler(self): """Test overriding the global JID info handler.""" self.stream_start(mode='component', jid='tester.localhost', plugins=['xep_0030']) def dynamic_global(jid, node, ifrom, iq): result = self.xmpp['xep_0030'].stanza.DiscoInfo() result['node'] = node result.add_identity('component', 'generic', name='Dynamic Info') return result self.xmpp['xep_0030'].set_node_handler('get_info', handler=dynamic_global) self.xmpp['xep_0030'].make_static(jid='user@tester.localhost', node='testing') self.xmpp['xep_0030'].add_feature(jid='user@tester.localhost', node='testing', feature='urn:xmpp:ping') self.recv(""" """) self.send(""" """) def testGetInfoRemote(self): """ Test sending a disco#info query to another entity and receiving the result. """ self.stream_start(mode='client', plugins=['xep_0030']) events = set() def handle_disco_info(iq): events.add('disco_info') self.xmpp.add_event_handler('disco_info', handle_disco_info) t = threading.Thread(name="get_info", target=self.xmpp['xep_0030'].get_info, args=('user@localhost', 'foo')) t.start() self.send(""" """) self.recv(""" """) # Wait for disco#info request to be received. t.join() time.sleep(0.1) self.assertEqual(events, set(('disco_info',)), "Disco info event was not triggered: %s" % events) def testDynamicItemsJID(self): """ Test using a dynamic items handler for a particular JID. """ self.stream_start(mode='client', plugins=['xep_0030']) def dynamic_jid(jid, node, ifrom, iq): result = self.xmpp['xep_0030'].stanza.DiscoItems() result['node'] = node result.add_item('tester@localhost', node='foo', name='JID') return result self.xmpp['xep_0030'].set_node_handler('get_items', jid='tester@localhost', handler=dynamic_jid) self.recv(""" """) self.send(""" """) def testDynamicItemsGlobal(self): """ Test using a dynamic items handler for all requests. """ self.stream_start(mode='component', jid='tester.localhost', plugins=['xep_0030']) def dynamic_global(jid, node, ifrom, iq): result = self.xmpp['xep_0030'].stanza.DiscoItems() result['node'] = node result.add_item('tester@localhost', node='foo', name='Global') return result self.xmpp['xep_0030'].set_node_handler('get_items', handler=dynamic_global) self.recv(""" """) self.send(""" """) def testOverrideJIDItemsHandler(self): """Test overriding a JID items handler.""" self.stream_start(mode='client', plugins=['xep_0030']) def dynamic_jid(jid, node, ifrom, iq): result = self.xmpp['xep_0030'].stanza.DiscoItems() result['node'] = node result.add_item('tester@localhost', node='foo', name='Global') return result self.xmpp['xep_0030'].set_node_handler('get_items', jid='tester@localhost', handler=dynamic_jid) self.xmpp['xep_0030'].make_static(jid='tester@localhost', node='testing') self.xmpp['xep_0030'].add_item(ijid='tester@localhost', node='testing', jid='tester@localhost', subnode='foo', name='Test') self.recv(""" """) self.send(""" """) def testOverrideGlobalItemsHandler(self): """Test overriding the global JID items handler.""" self.stream_start(mode='component', jid='tester.localhost', plugins=['xep_0030']) def dynamic_global(jid, node, ifrom, iq): result = self.xmpp['xep_0030'].stanza.DiscoItems() result['node'] = node result.add_item('tester.localhost', node='foo', name='Global') return result self.xmpp['xep_0030'].set_node_handler('get_items', handler=dynamic_global) self.xmpp['xep_0030'].make_static(jid='user@tester.localhost', node='testing') self.xmpp['xep_0030'].add_item(ijid='user@tester.localhost', node='testing', jid='user@tester.localhost', subnode='foo', name='Test') self.recv(""" """) self.send(""" """) def testGetItemsRemote(self): """ Test sending a disco#items query to another entity and receiving the result. """ self.stream_start(mode='client', plugins=['xep_0030']) events = set() results = set() def handle_disco_items(iq): events.add('disco_items') results.update(iq['disco_items']['items']) self.xmpp.add_event_handler('disco_items', handle_disco_items) t = threading.Thread(name="get_items", target=self.xmpp['xep_0030'].get_items, args=('user@localhost', 'foo')) t.start() self.send(""" """) self.recv(""" """) # Wait for disco#items request to be received. t.join() time.sleep(0.1) items = set([('user@localhost', 'bar', 'Test'), ('user@localhost', 'baz', 'Test 2')]) self.assertEqual(events, set(('disco_items',)), "Disco items event was not triggered: %s" % events) self.assertEqual(results, items, "Unexpected items: %s" % results) def testGetItemsIterator(self): """Test interaction between XEP-0030 and XEP-0059 plugins.""" raised_exceptions = [] self.stream_start(mode='client', plugins=['xep_0030', 'xep_0059']) results = self.xmpp['xep_0030'].get_items(jid='foo@localhost', node='bar', iterator=True) results.amount = 10 def run_test(): try: results.next() except StopIteration: raised_exceptions.append(True) t = threading.Thread(name="get_items_iterator", target=run_test) t.start() self.send(""" 10 """) self.recv(""" """) t.join() self.assertEqual(raised_exceptions, [True], "StopIteration was not raised: %s" % raised_exceptions) suite = unittest.TestLoader().loadTestsFromTestCase(TestStreamDisco)