From f4451fe6b72f7cfb9680ead7a608d5ca1bc7e753 Mon Sep 17 00:00:00 2001 From: Lance Stout Date: Thu, 9 Dec 2010 18:57:27 -0500 Subject: First pass at a new XEP-0030 plugin. Now with dynamic node handling goodness. Some things are not quite working yet, in particular: set_items set_info set_identities set_features And still need more unit tests to round things out. --- tests/test_stanza_xep_0030.py | 504 +++++++++++++++++++++++++++++++++++------- tests/test_stream_xep_0030.py | 467 +++++++++++++++++++++++++++++++++++++- 2 files changed, 878 insertions(+), 93 deletions(-) (limited to 'tests') diff --git a/tests/test_stanza_xep_0030.py b/tests/test_stanza_xep_0030.py index e367c8d9..2d64988d 100644 --- a/tests/test_stanza_xep_0030.py +++ b/tests/test_stanza_xep_0030.py @@ -4,6 +4,11 @@ import sleekxmpp.plugins.xep_0030 as xep_0030 class TestDisco(SleekTest): + """ + Test creating and manipulating the disco#info and + disco#items stanzas from the XEP-0030 plugin. + """ + def setUp(self): register_stanza_plugin(Iq, xep_0030.DiscoInfo) register_stanza_plugin(Iq, xep_0030.DiscoItems) @@ -11,11 +16,10 @@ class TestDisco(SleekTest): def testCreateInfoQueryNoNode(self): """Testing disco#info query with no node.""" iq = self.Iq() - iq['id'] = "0" iq['disco_info']['node'] = '' self.check(iq, """ - + """) @@ -23,23 +27,22 @@ class TestDisco(SleekTest): def testCreateInfoQueryWithNode(self): """Testing disco#info query with a node.""" iq = self.Iq() - iq['id'] = "0" iq['disco_info']['node'] = 'foo' self.check(iq, """ - - + + """) - def testCreateInfoQueryNoNode(self): + def testCreateItemsQueryNoNode(self): """Testing disco#items query with no node.""" iq = self.Iq() - iq['id'] = "0" iq['disco_items']['node'] = '' self.check(iq, """ - + """) @@ -47,130 +50,467 @@ class TestDisco(SleekTest): def testCreateItemsQueryWithNode(self): """Testing disco#items query with a node.""" iq = self.Iq() - iq['id'] = "0" iq['disco_items']['node'] = 'foo' self.check(iq, """ - - + + """) - def testInfoIdentities(self): + def testIdentities(self): """Testing adding identities to disco#info.""" iq = self.Iq() - iq['id'] = "0" - iq['disco_info']['node'] = 'foo' - iq['disco_info'].addIdentity('conference', 'text', 'Chatroom') + iq['disco_info'].add_identity('conference', 'text', + name='Chatroom', + lang='en') + + self.check(iq, """ + + + + + + """) + + def testDuplicateIdentities(self): + """ + Test adding multiple copies of the same category + and type combination. Only the first identity should + be kept. + """ + iq = self.Iq() + iq['disco_info'].add_identity('conference', 'text', + name='Chatroom') + iq['disco_info'].add_identity('conference', 'text', + name='MUC') + self.check(iq, """ + + + + + + """) + + def testDuplicateIdentitiesWithLangs(self): + """ + Test adding multiple copies of the same category, + type, and language combination. Only the first identity + should be kept. + """ + iq = self.Iq() + iq['disco_info'].add_identity('conference', 'text', + name='Chatroom', + lang='en') + iq['disco_info'].add_identity('conference', 'text', + name='MUC', + lang='en') + self.check(iq, """ + + + + + + """) + + def testRemoveIdentitiesNoLang(self): + """Test removing identities from a disco#info stanza.""" + iq = self.Iq() + iq['disco_info'].add_identity('client', 'pc') + iq['disco_info'].add_identity('client', 'bot') + + iq['disco_info'].del_identity('client', 'pc') + + self.check(iq, """ + + + + + + """) + + def testRemoveIdentitiesWithLang(self): + """Test removing identities from a disco#info stanza.""" + iq = self.Iq() + iq['disco_info'].add_identity('client', 'pc') + iq['disco_info'].add_identity('client', 'bot') + iq['disco_info'].add_identity('client', 'pc', lang='no') + + iq['disco_info'].del_identity('client', 'pc') self.check(iq, """ - - - + + + + """) - def testInfoFeatures(self): + def testRemoveAllIdentitiesNoLang(self): + """Test removing all identities from a disco#info stanza.""" + iq = self.Iq() + iq['disco_info'].add_identity('client', 'bot', name='Bot') + iq['disco_info'].add_identity('client', 'bot', lang='no') + iq['disco_info'].add_identity('client', 'console') + + del iq['disco_info']['identities'] + + self.check(iq, """ + + + + """) + + def testRemoveAllIdentitiesWithLang(self): + """Test removing all identities from a disco#info stanza.""" + iq = self.Iq() + iq['disco_info'].add_identity('client', 'bot', name='Bot') + iq['disco_info'].add_identity('client', 'bot', lang='no') + iq['disco_info'].add_identity('client', 'console') + + iq['disco_info'].del_identities(lang='no') + + self.check(iq, """ + + + + + + + """) + + def testAddBatchIdentitiesNoLang(self): + """Test adding multiple identities at once to a disco#info stanza.""" + iq = self.Iq() + identities = [('client', 'pc', 'no', 'PC Client'), + ('client', 'bot', None, 'Bot'), + ('client', 'console', None, None)] + + iq['disco_info']['identities'] = identities + + self.check(iq, """ + + + + + + + + """) + + + def testAddBatchIdentitiesWithLang(self): + """Test selectively replacing identities based on language.""" + iq = self.Iq() + iq['disco_info'].add_identity('client', 'pc', lang='no') + iq['disco_info'].add_identity('client', 'pc', lang='en') + iq['disco_info'].add_identity('client', 'pc', lang='fr') + + identities = [('client', 'bot', 'fr', 'Bot'), + ('client', 'bot', 'en', 'Bot')] + + iq['disco_info'].set_identities(identities, lang='fr') + + self.check(iq, """ + + + + + + + + + """) + + def testGetIdentitiesNoLang(self): + """Test getting all identities from a disco#info stanza.""" + iq = self.Iq() + iq['disco_info'].add_identity('client', 'pc') + iq['disco_info'].add_identity('client', 'pc', lang='no') + iq['disco_info'].add_identity('client', 'pc', lang='en') + iq['disco_info'].add_identity('client', 'pc', lang='fr') + + expected = set([('client', 'pc', None, None), + ('client', 'pc', 'no', None), + ('client', 'pc', 'en', None), + ('client', 'pc', 'fr', None)]) + self.failUnless(iq['disco_info']['identities'] == expected, + "Identities do not match:\n%s\n%s" % ( + expected, + iq['disco_info']['identities'])) + + def testGetIdentitiesWithLang(self): + """ + Test getting all identities of a given + lang from a disco#info stanza. + """ + iq = self.Iq() + iq['disco_info'].add_identity('client', 'pc') + iq['disco_info'].add_identity('client', 'pc', lang='no') + iq['disco_info'].add_identity('client', 'pc', lang='en') + iq['disco_info'].add_identity('client', 'pc', lang='fr') + + expected = set([('client', 'pc', 'no', None)]) + result = iq['disco_info'].get_identities(lang='no') + self.failUnless(result == expected, + "Identities do not match:\n%s\n%s" % ( + expected, result)) + + def testFeatures(self): """Testing adding features to disco#info.""" iq = self.Iq() - iq['id'] = "0" - iq['disco_info']['node'] = 'foo' - iq['disco_info'].addFeature('foo') - iq['disco_info'].addFeature('bar') + iq['disco_info'].add_feature('foo') + iq['disco_info'].add_feature('bar') self.check(iq, """ - - + + """) - def testItems(self): - """Testing adding features to disco#info.""" + def testFeaturesDuplicate(self): + """Test adding duplicate features to disco#info.""" iq = self.Iq() - iq['id'] = "0" - iq['disco_items']['node'] = 'foo' - iq['disco_items'].addItem('user@localhost') - iq['disco_items'].addItem('user@localhost', 'foo') - iq['disco_items'].addItem('user@localhost', 'bar', 'Testing') + iq['disco_info'].add_feature('foo') + iq['disco_info'].add_feature('bar') + iq['disco_info'].add_feature('foo') self.check(iq, """ - - - - - + + + + """) - def testAddRemoveIdentities(self): - """Test adding and removing identities to disco#info stanza""" - ids = [('automation', 'commands', 'AdHoc'), - ('conference', 'text', 'ChatRoom')] + def testRemoveFeature(self): + """Test removing a feature from disco#info.""" + iq = self.Iq() + iq['disco_info'].add_feature('foo') + iq['disco_info'].add_feature('bar') + iq['disco_info'].add_feature('baz') - info = xep_0030.DiscoInfo() - info.addIdentity(*ids[0]) - self.failUnless(info.getIdentities() == [ids[0]]) + iq['disco_info'].del_feature('foo') - info.delIdentity('automation', 'commands') - self.failUnless(info.getIdentities() == []) + self.check(iq, """ + + + + + + + """) - info.setIdentities(ids) - self.failUnless(info.getIdentities() == ids) + def testGetFeatures(self): + """Test getting all features from a disco#info stanza.""" + iq = self.Iq() + iq['disco_info'].add_feature('foo') + iq['disco_info'].add_feature('bar') + iq['disco_info'].add_feature('baz') + + expected = set(['foo', 'bar', 'baz']) + self.failUnless(iq['disco_info']['features'] == expected, + "Features do not match:\n%s\n%s" % ( + expected, + iq['disco_info']['features'])) + + def testRemoveAllFeatures(self): + """Test removing all features from a disco#info stanza.""" + iq = self.Iq() + iq['disco_info'].add_feature('foo') + iq['disco_info'].add_feature('bar') + iq['disco_info'].add_feature('baz') - info.delIdentity('automation', 'commands') - self.failUnless(info.getIdentities() == [ids[1]]) + del iq['disco_info']['features'] - info.delIdentities() - self.failUnless(info.getIdentities() == []) + self.check(iq, """ + + + + """) - def testAddRemoveFeatures(self): - """Test adding and removing features to disco#info stanza""" + def testAddBatchFeatures(self): + """Test adding multiple features at once to a disco#info stanza.""" + iq = self.Iq() features = ['foo', 'bar', 'baz'] - info = xep_0030.DiscoInfo() - info.addFeature(features[0]) - self.failUnless(info.getFeatures() == [features[0]]) + iq['disco_info']['features'] = features + + self.check(iq, """ + + + + + + + + """) + + def testItems(self): + """Testing adding features to disco#info.""" + iq = self.Iq() + iq['disco_items'].add_item('user@localhost') + iq['disco_items'].add_item('user@localhost', 'foo') + iq['disco_items'].add_item('user@localhost', 'bar', name='Testing') + + self.check(iq, """ + + + + + + + + """) + + def testDuplicateItems(self): + """Test adding items with the same JID without any nodes.""" + iq = self.Iq() + iq['disco_items'].add_item('user@localhost', name='First') + iq['disco_items'].add_item('user@localhost', name='Second') + + self.check(iq, """ + + + + + + """) - info.delFeature('foo') - self.failUnless(info.getFeatures() == []) - info.setFeatures(features) - self.failUnless(info.getFeatures() == features) + def testDuplicateItemsWithNodes(self): + """Test adding items with the same JID/node combination.""" + iq = self.Iq() + iq['disco_items'].add_item('user@localhost', + node='foo', + name='First') + iq['disco_items'].add_item('user@localhost', + node='foo', + name='Second') - info.delFeature('bar') - self.failUnless(info.getFeatures() == ['foo', 'baz']) + self.check(iq, """ + + + + + + """) - info.delFeatures() - self.failUnless(info.getFeatures() == []) + def testRemoveItemsNoNode(self): + """Test removing items without nodes from a disco#items stanza.""" + iq = self.Iq() + iq['disco_items'].add_item('user@localhost') + iq['disco_items'].add_item('user@localhost', node='foo') + iq['disco_items'].add_item('test@localhost') - def testAddRemoveItems(self): - """Test adding and removing items to disco#items stanza""" - items = [('user@localhost', None, None), - ('user@localhost', 'foo', None), - ('user@localhost', 'bar', 'Test')] + iq['disco_items'].del_item('user@localhost') - info = xep_0030.DiscoItems() - self.failUnless(True, ""+str(items[0])) + self.check(iq, """ + + + + + + + """) - info.addItem(*(items[0])) - self.failUnless(info.getItems() == [items[0]], info.getItems()) + def testRemoveItemsWithNode(self): + """Test removing items with nodes from a disco#items stanza.""" + iq = self.Iq() + iq['disco_items'].add_item('user@localhost') + iq['disco_items'].add_item('user@localhost', node='foo') + iq['disco_items'].add_item('test@localhost') - info.delItem('user@localhost') - self.failUnless(info.getItems() == []) + iq['disco_items'].del_item('user@localhost', node='foo') - info.setItems(items) - self.failUnless(info.getItems() == items) + self.check(iq, """ + + + + + + + """) - info.delItem('user@localhost', 'foo') - self.failUnless(info.getItems() == [items[0], items[2]]) + def testGetItems(self): + """Test retrieving items from disco#items stanza.""" + iq = self.Iq() + iq['disco_items'].add_item('user@localhost') + iq['disco_items'].add_item('user@localhost', node='foo') + iq['disco_items'].add_item('test@localhost', + node='bar', + name='Tester') + + expected = set([('user@localhost', None, None), + ('user@localhost', 'foo', None), + ('test@localhost', 'bar', 'Tester')]) + self.failUnless(iq['disco_items']['items'] == expected, + "Items do not match:\n%s\n%s" % ( + expected, + iq['disco_items']['items'])) + + def testRemoveAllItems(self): + """Test removing all items from a disco#items stanza.""" + iq = self.Iq() + iq['disco_items'].add_item('user@localhost') + iq['disco_items'].add_item('user@localhost', node='foo') + iq['disco_items'].add_item('test@localhost', + node='bar', + name='Tester') - info.delItems() - self.failUnless(info.getItems() == []) + del iq['disco_items']['items'] + self.check(iq, """ + + + + """) + + def testAddBatchItems(self): + """Test adding multiple items to a disco#items stanza.""" + iq = self.Iq() + items = [('user@localhost', 'foo', 'Test'), + ('test@localhost', None, None), + ('other@localhost', None, 'Other')] + + iq['disco_items']['items'] = items + + self.check(iq, """ + + + + + + + + """) suite = unittest.TestLoader().loadTestsFromTestCase(TestDisco) diff --git a/tests/test_stream_xep_0030.py b/tests/test_stream_xep_0030.py index 5efce788..1f989745 100644 --- a/tests/test_stream_xep_0030.py +++ b/tests/test_stream_xep_0030.py @@ -1,8 +1,11 @@ import time +import threading + from sleekxmpp.test import * class TestStreamDisco(SleekTest): + """ Test using the XEP-0030 plugin. """ @@ -10,15 +13,16 @@ class TestStreamDisco(SleekTest): def tearDown(self): self.stream_close() - def testInfoEmptyNode(self): + def testInfoEmptyDefaultNode(self): """ - Info queries to a node MUST have at least one identity + Info query result from an entity MUST have at least one identity and feature, namely http://jabber.org/protocol/disco#info. Since the XEP-0030 plugin is loaded, a disco response should be generated and not an error result. """ - self.stream_start(plugins=['xep_0030']) + self.stream_start(mode='client', + plugins=['xep_0030']) self.recv(""" @@ -32,13 +36,15 @@ class TestStreamDisco(SleekTest): - """) + + """) - def testInfoEmptyNodeComponent(self): + def testInfoEmptyDefaultNodeComponent(self): """ - Test requesting an empty node using a Component. + Test requesting an empty, default node using a Component. """ self.stream_start(mode='component', + jid='tester.localhost', plugins=['xep_0030']) self.recv(""" @@ -53,19 +59,22 @@ class TestStreamDisco(SleekTest): - """) + + """) def testInfoIncludeNode(self): """ Results for info queries directed to a particular node MUST include the node in the query response. """ - self.stream_start(plugins=['xep_0030']) + self.stream_start(mode='client', + plugins=['xep_0030']) - self.xmpp['xep_0030'].add_node('testing') + + self.xmpp['xep_0030'].static.add_node(node='testing') self.recv(""" - + @@ -76,8 +85,444 @@ class TestStreamDisco(SleekTest): - """, + """, method='mask') + def testItemsIncludeNode(self): + """ + Results for items queries directed to a particular node MUST + include the node in the query response. + """ + self.stream_start(mode='client', + plugins=['xep_0030']) + + + self.xmpp['xep_0030'].static.add_node(node='testing') + + self.recv(""" + + + + """) + + self.send(""" + + + + """, + method='mask') + + def testDynamicInfoJID(self): + """ + Test using a dynamic info handler for a particular JID. + """ + self.stream_start(mode='client', + plugins=['xep_0030']) + + def dynamic_jid(jid, node, iq): + result = self.xmpp['xep_0030'].stanza.DiscoInfo() + result['node'] = node + result.add_identity('client', 'console', name='Dynamic Info') + return result + + self.xmpp['xep_0030'].set_node_handler('get_info', + jid='tester@localhost', + handler=dynamic_jid) + + self.recv(""" + + + + """) + + self.send(""" + + + + + + """) + + def testDynamicInfoGlobal(self): + """ + Test using a dynamic info handler for all requests. + """ + self.stream_start(mode='component', + jid='tester.localhost', + plugins=['xep_0030']) + + def dynamic_global(jid, node, iq): + result = self.xmpp['xep_0030'].stanza.DiscoInfo() + result['node'] = node + result.add_identity('component', 'generic', name='Dynamic Info') + return result + + self.xmpp['xep_0030'].set_node_handler('get_info', + handler=dynamic_global) + + self.recv(""" + + + + """) + + self.send(""" + + + + + + """) + + def testOverrideJIDInfoHandler(self): + """Test overriding a JID info handler.""" + self.stream_start(mode='client', + plugins=['xep_0030']) + + def dynamic_jid(jid, node, iq): + result = self.xmpp['xep_0030'].stanza.DiscoInfo() + result['node'] = node + result.add_identity('client', 'console', name='Dynamic Info') + return result + + self.xmpp['xep_0030'].set_node_handler('get_info', + jid='tester@localhost', + handler=dynamic_jid) + + + self.xmpp['xep_0030'].make_static(jid='tester@localhost', + node='testing') + + self.xmpp['xep_0030'].add_identity(jid='tester@localhost', + node='testing', + category='automation', + itype='command-list') + + self.recv(""" + + + + """) + + self.send(""" + + + + + + """) + + def testOverrideGlobalInfoHandler(self): + """Test overriding the global JID info handler.""" + self.stream_start(mode='component', + jid='tester.localhost', + plugins=['xep_0030']) + + def dynamic_global(jid, node, iq): + result = self.xmpp['xep_0030'].stanza.DiscoInfo() + result['node'] = node + result.add_identity('component', 'generic', name='Dynamic Info') + return result + + self.xmpp['xep_0030'].set_node_handler('get_info', + handler=dynamic_global) + + self.xmpp['xep_0030'].make_static(jid='user@tester.localhost', + node='testing') + + self.xmpp['xep_0030'].add_feature(jid='user@tester.localhost', + node='testing', + feature='urn:xmpp:ping') + + self.recv(""" + + + + """) + + self.send(""" + + + + + + """) + + def testGetInfoRemote(self): + """ + Test sending a disco#info query to another entity + and receiving the result. + """ + self.stream_start(mode='client', + plugins=['xep_0030']) + + events = set() + + def handle_disco_info(iq): + events.add('disco_info') + + + self.xmpp.add_event_handler('disco_info', handle_disco_info) + + t = threading.Thread(name="get_info", + target=self.xmpp['xep_0030'].get_info, + args=('user@localhost', 'foo')) + t.start() + + self.send(""" + + + + """) + + self.recv(""" + + + + + + + """) + + # Wait for disco#info request to be received. + t.join() + + time.sleep(0.1) + + self.assertEqual(events, set(('disco_info',)), + "Disco info event was not triggered: %s" % events) + + def testDynamicItemsJID(self): + """ + Test using a dynamic items handler for a particular JID. + """ + self.stream_start(mode='client', + plugins=['xep_0030']) + + def dynamic_jid(jid, node, iq): + result = self.xmpp['xep_0030'].stanza.DiscoItems() + result['node'] = node + result.add_item('tester@localhost', node='foo', name='JID') + return result + + self.xmpp['xep_0030'].set_node_handler('get_items', + jid='tester@localhost', + handler=dynamic_jid) + + self.recv(""" + + + + """) + + self.send(""" + + + + + + """) + + def testDynamicItemsGlobal(self): + """ + Test using a dynamic items handler for all requests. + """ + self.stream_start(mode='component', + jid='tester.localhost', + plugins=['xep_0030']) + + def dynamic_global(jid, node, iq): + result = self.xmpp['xep_0030'].stanza.DiscoItems() + result['node'] = node + result.add_item('tester@localhost', node='foo', name='Global') + return result + + self.xmpp['xep_0030'].set_node_handler('get_items', + handler=dynamic_global) + + self.recv(""" + + + + """) + + self.send(""" + + + + + + """) + + def testOverrideJIDItemsHandler(self): + """Test overriding a JID items handler.""" + self.stream_start(mode='client', + plugins=['xep_0030']) + + def dynamic_jid(jid, node, iq): + result = self.xmpp['xep_0030'].stanza.DiscoItems() + result['node'] = node + result.add_item('tester@localhost', node='foo', name='Global') + return result + + self.xmpp['xep_0030'].set_node_handler('get_items', + jid='tester@localhost', + handler=dynamic_jid) + + + self.xmpp['xep_0030'].make_static(jid='tester@localhost', + node='testing') + + self.xmpp['xep_0030'].add_item(jid='tester@localhost', + node='testing', + ijid='tester@localhost', + inode='foo', + name='Test') + + self.recv(""" + + + + """) + + self.send(""" + + + + + + """) + + def testOverrideGlobalItemsHandler(self): + """Test overriding the global JID items handler.""" + self.stream_start(mode='component', + jid='tester.localhost', + plugins=['xep_0030']) + + def dynamic_global(jid, node, iq): + result = self.xmpp['xep_0030'].stanza.DiscoItems() + result['node'] = node + result.add_item('tester.localhost', node='foo', name='Global') + return result + + self.xmpp['xep_0030'].set_node_handler('get_items', + handler=dynamic_global) + + self.xmpp['xep_0030'].make_static(jid='user@tester.localhost', + node='testing') + + self.xmpp['xep_0030'].add_item(jid='user@tester.localhost', + node='testing', + ijid='user@tester.localhost', + inode='foo', + name='Test') + + self.recv(""" + + + + """) + + self.send(""" + + + + + + """) + + def testGetItemsRemote(self): + """ + Test sending a disco#items query to another entity + and receiving the result. + """ + self.stream_start(mode='client', + plugins=['xep_0030']) + + events = set() + results = set() + + def handle_disco_items(iq): + events.add('disco_items') + results.update(iq['disco_items']['items']) + + + self.xmpp.add_event_handler('disco_items', handle_disco_items) + + t = threading.Thread(name="get_items", + target=self.xmpp['xep_0030'].get_items, + args=('user@localhost', 'foo')) + t.start() + + self.send(""" + + + + """) + + self.recv(""" + + + + + + + """) + + # Wait for disco#items request to be received. + t.join() + + time.sleep(0.1) + + items = set([('user@localhost', 'bar', 'Test'), + ('user@localhost', 'baz', 'Test 2')]) + self.assertEqual(events, set(('disco_items',)), + "Disco items event was not triggered: %s" % events) + self.assertEqual(results, items, + "Unexpected items: %s" % results) + suite = unittest.TestLoader().loadTestsFromTestCase(TestStreamDisco) -- cgit v1.2.3