From ab25301953138343d3d295aaa8872de9c5bc2cf9 Mon Sep 17 00:00:00 2001 From: Lance Stout Date: Thu, 18 Nov 2010 15:50:45 -0500 Subject: Adding stream tests for XEP-0030. Fixed some errors when responding to disco requests. --- tests/test_stream_xep_0030.py | 83 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 83 insertions(+) create mode 100644 tests/test_stream_xep_0030.py (limited to 'tests/test_stream_xep_0030.py') diff --git a/tests/test_stream_xep_0030.py b/tests/test_stream_xep_0030.py new file mode 100644 index 00000000..5efce788 --- /dev/null +++ b/tests/test_stream_xep_0030.py @@ -0,0 +1,83 @@ +import time +from sleekxmpp.test import * + + +class TestStreamDisco(SleekTest): + """ + Test using the XEP-0030 plugin. + """ + + def tearDown(self): + self.stream_close() + + def testInfoEmptyNode(self): + """ + Info queries to a node MUST have at least one identity + and feature, namely http://jabber.org/protocol/disco#info. + + Since the XEP-0030 plugin is loaded, a disco response should + be generated and not an error result. + """ + self.stream_start(plugins=['xep_0030']) + + self.recv(""" + + + + """) + + self.send(""" + + + + + + """) + + def testInfoEmptyNodeComponent(self): + """ + Test requesting an empty node using a Component. + """ + self.stream_start(mode='component', + plugins=['xep_0030']) + + self.recv(""" + + + + """) + + self.send(""" + + + + + + """) + + def testInfoIncludeNode(self): + """ + Results for info queries directed to a particular node MUST + include the node in the query response. + """ + self.stream_start(plugins=['xep_0030']) + + self.xmpp['xep_0030'].add_node('testing') + + self.recv(""" + + + + """) + + self.send(""" + + + + """, + method='mask') + + +suite = unittest.TestLoader().loadTestsFromTestCase(TestStreamDisco) -- cgit v1.2.3 From f4451fe6b72f7cfb9680ead7a608d5ca1bc7e753 Mon Sep 17 00:00:00 2001 From: Lance Stout Date: Thu, 9 Dec 2010 18:57:27 -0500 Subject: First pass at a new XEP-0030 plugin. Now with dynamic node handling goodness. Some things are not quite working yet, in particular: set_items set_info set_identities set_features And still need more unit tests to round things out. --- tests/test_stream_xep_0030.py | 467 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 456 insertions(+), 11 deletions(-) (limited to 'tests/test_stream_xep_0030.py') diff --git a/tests/test_stream_xep_0030.py b/tests/test_stream_xep_0030.py index 5efce788..1f989745 100644 --- a/tests/test_stream_xep_0030.py +++ b/tests/test_stream_xep_0030.py @@ -1,8 +1,11 @@ import time +import threading + from sleekxmpp.test import * class TestStreamDisco(SleekTest): + """ Test using the XEP-0030 plugin. """ @@ -10,15 +13,16 @@ class TestStreamDisco(SleekTest): def tearDown(self): self.stream_close() - def testInfoEmptyNode(self): + def testInfoEmptyDefaultNode(self): """ - Info queries to a node MUST have at least one identity + Info query result from an entity MUST have at least one identity and feature, namely http://jabber.org/protocol/disco#info. Since the XEP-0030 plugin is loaded, a disco response should be generated and not an error result. """ - self.stream_start(plugins=['xep_0030']) + self.stream_start(mode='client', + plugins=['xep_0030']) self.recv(""" @@ -32,13 +36,15 @@ class TestStreamDisco(SleekTest): - """) + + """) - def testInfoEmptyNodeComponent(self): + def testInfoEmptyDefaultNodeComponent(self): """ - Test requesting an empty node using a Component. + Test requesting an empty, default node using a Component. """ self.stream_start(mode='component', + jid='tester.localhost', plugins=['xep_0030']) self.recv(""" @@ -53,19 +59,22 @@ class TestStreamDisco(SleekTest): - """) + + """) def testInfoIncludeNode(self): """ Results for info queries directed to a particular node MUST include the node in the query response. """ - self.stream_start(plugins=['xep_0030']) + self.stream_start(mode='client', + plugins=['xep_0030']) - self.xmpp['xep_0030'].add_node('testing') + + self.xmpp['xep_0030'].static.add_node(node='testing') self.recv(""" - + @@ -76,8 +85,444 @@ class TestStreamDisco(SleekTest): - """, + """, method='mask') + def testItemsIncludeNode(self): + """ + Results for items queries directed to a particular node MUST + include the node in the query response. + """ + self.stream_start(mode='client', + plugins=['xep_0030']) + + + self.xmpp['xep_0030'].static.add_node(node='testing') + + self.recv(""" + + + + """) + + self.send(""" + + + + """, + method='mask') + + def testDynamicInfoJID(self): + """ + Test using a dynamic info handler for a particular JID. + """ + self.stream_start(mode='client', + plugins=['xep_0030']) + + def dynamic_jid(jid, node, iq): + result = self.xmpp['xep_0030'].stanza.DiscoInfo() + result['node'] = node + result.add_identity('client', 'console', name='Dynamic Info') + return result + + self.xmpp['xep_0030'].set_node_handler('get_info', + jid='tester@localhost', + handler=dynamic_jid) + + self.recv(""" + + + + """) + + self.send(""" + + + + + + """) + + def testDynamicInfoGlobal(self): + """ + Test using a dynamic info handler for all requests. + """ + self.stream_start(mode='component', + jid='tester.localhost', + plugins=['xep_0030']) + + def dynamic_global(jid, node, iq): + result = self.xmpp['xep_0030'].stanza.DiscoInfo() + result['node'] = node + result.add_identity('component', 'generic', name='Dynamic Info') + return result + + self.xmpp['xep_0030'].set_node_handler('get_info', + handler=dynamic_global) + + self.recv(""" + + + + """) + + self.send(""" + + + + + + """) + + def testOverrideJIDInfoHandler(self): + """Test overriding a JID info handler.""" + self.stream_start(mode='client', + plugins=['xep_0030']) + + def dynamic_jid(jid, node, iq): + result = self.xmpp['xep_0030'].stanza.DiscoInfo() + result['node'] = node + result.add_identity('client', 'console', name='Dynamic Info') + return result + + self.xmpp['xep_0030'].set_node_handler('get_info', + jid='tester@localhost', + handler=dynamic_jid) + + + self.xmpp['xep_0030'].make_static(jid='tester@localhost', + node='testing') + + self.xmpp['xep_0030'].add_identity(jid='tester@localhost', + node='testing', + category='automation', + itype='command-list') + + self.recv(""" + + + + """) + + self.send(""" + + + + + + """) + + def testOverrideGlobalInfoHandler(self): + """Test overriding the global JID info handler.""" + self.stream_start(mode='component', + jid='tester.localhost', + plugins=['xep_0030']) + + def dynamic_global(jid, node, iq): + result = self.xmpp['xep_0030'].stanza.DiscoInfo() + result['node'] = node + result.add_identity('component', 'generic', name='Dynamic Info') + return result + + self.xmpp['xep_0030'].set_node_handler('get_info', + handler=dynamic_global) + + self.xmpp['xep_0030'].make_static(jid='user@tester.localhost', + node='testing') + + self.xmpp['xep_0030'].add_feature(jid='user@tester.localhost', + node='testing', + feature='urn:xmpp:ping') + + self.recv(""" + + + + """) + + self.send(""" + + + + + + """) + + def testGetInfoRemote(self): + """ + Test sending a disco#info query to another entity + and receiving the result. + """ + self.stream_start(mode='client', + plugins=['xep_0030']) + + events = set() + + def handle_disco_info(iq): + events.add('disco_info') + + + self.xmpp.add_event_handler('disco_info', handle_disco_info) + + t = threading.Thread(name="get_info", + target=self.xmpp['xep_0030'].get_info, + args=('user@localhost', 'foo')) + t.start() + + self.send(""" + + + + """) + + self.recv(""" + + + + + + + """) + + # Wait for disco#info request to be received. + t.join() + + time.sleep(0.1) + + self.assertEqual(events, set(('disco_info',)), + "Disco info event was not triggered: %s" % events) + + def testDynamicItemsJID(self): + """ + Test using a dynamic items handler for a particular JID. + """ + self.stream_start(mode='client', + plugins=['xep_0030']) + + def dynamic_jid(jid, node, iq): + result = self.xmpp['xep_0030'].stanza.DiscoItems() + result['node'] = node + result.add_item('tester@localhost', node='foo', name='JID') + return result + + self.xmpp['xep_0030'].set_node_handler('get_items', + jid='tester@localhost', + handler=dynamic_jid) + + self.recv(""" + + + + """) + + self.send(""" + + + + + + """) + + def testDynamicItemsGlobal(self): + """ + Test using a dynamic items handler for all requests. + """ + self.stream_start(mode='component', + jid='tester.localhost', + plugins=['xep_0030']) + + def dynamic_global(jid, node, iq): + result = self.xmpp['xep_0030'].stanza.DiscoItems() + result['node'] = node + result.add_item('tester@localhost', node='foo', name='Global') + return result + + self.xmpp['xep_0030'].set_node_handler('get_items', + handler=dynamic_global) + + self.recv(""" + + + + """) + + self.send(""" + + + + + + """) + + def testOverrideJIDItemsHandler(self): + """Test overriding a JID items handler.""" + self.stream_start(mode='client', + plugins=['xep_0030']) + + def dynamic_jid(jid, node, iq): + result = self.xmpp['xep_0030'].stanza.DiscoItems() + result['node'] = node + result.add_item('tester@localhost', node='foo', name='Global') + return result + + self.xmpp['xep_0030'].set_node_handler('get_items', + jid='tester@localhost', + handler=dynamic_jid) + + + self.xmpp['xep_0030'].make_static(jid='tester@localhost', + node='testing') + + self.xmpp['xep_0030'].add_item(jid='tester@localhost', + node='testing', + ijid='tester@localhost', + inode='foo', + name='Test') + + self.recv(""" + + + + """) + + self.send(""" + + + + + + """) + + def testOverrideGlobalItemsHandler(self): + """Test overriding the global JID items handler.""" + self.stream_start(mode='component', + jid='tester.localhost', + plugins=['xep_0030']) + + def dynamic_global(jid, node, iq): + result = self.xmpp['xep_0030'].stanza.DiscoItems() + result['node'] = node + result.add_item('tester.localhost', node='foo', name='Global') + return result + + self.xmpp['xep_0030'].set_node_handler('get_items', + handler=dynamic_global) + + self.xmpp['xep_0030'].make_static(jid='user@tester.localhost', + node='testing') + + self.xmpp['xep_0030'].add_item(jid='user@tester.localhost', + node='testing', + ijid='user@tester.localhost', + inode='foo', + name='Test') + + self.recv(""" + + + + """) + + self.send(""" + + + + + + """) + + def testGetItemsRemote(self): + """ + Test sending a disco#items query to another entity + and receiving the result. + """ + self.stream_start(mode='client', + plugins=['xep_0030']) + + events = set() + results = set() + + def handle_disco_items(iq): + events.add('disco_items') + results.update(iq['disco_items']['items']) + + + self.xmpp.add_event_handler('disco_items', handle_disco_items) + + t = threading.Thread(name="get_items", + target=self.xmpp['xep_0030'].get_items, + args=('user@localhost', 'foo')) + t.start() + + self.send(""" + + + + """) + + self.recv(""" + + + + + + + """) + + # Wait for disco#items request to be received. + t.join() + + time.sleep(0.1) + + items = set([('user@localhost', 'bar', 'Test'), + ('user@localhost', 'baz', 'Test 2')]) + self.assertEqual(events, set(('disco_items',)), + "Disco items event was not triggered: %s" % events) + self.assertEqual(results, items, + "Unexpected items: %s" % results) + suite = unittest.TestLoader().loadTestsFromTestCase(TestStreamDisco) -- cgit v1.2.3