summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/test_stanza_xep_0030.py504
-rw-r--r--tests/test_stream_xep_0030.py467
2 files changed, 878 insertions, 93 deletions
diff --git a/tests/test_stanza_xep_0030.py b/tests/test_stanza_xep_0030.py
index e367c8d9..2d64988d 100644
--- a/tests/test_stanza_xep_0030.py
+++ b/tests/test_stanza_xep_0030.py
@@ -4,6 +4,11 @@ import sleekxmpp.plugins.xep_0030 as xep_0030
class TestDisco(SleekTest):
+ """
+ Test creating and manipulating the disco#info and
+ disco#items stanzas from the XEP-0030 plugin.
+ """
+
def setUp(self):
register_stanza_plugin(Iq, xep_0030.DiscoInfo)
register_stanza_plugin(Iq, xep_0030.DiscoItems)
@@ -11,11 +16,10 @@ class TestDisco(SleekTest):
def testCreateInfoQueryNoNode(self):
"""Testing disco#info query with no node."""
iq = self.Iq()
- iq['id'] = "0"
iq['disco_info']['node'] = ''
self.check(iq, """
- <iq id="0">
+ <iq>
<query xmlns="http://jabber.org/protocol/disco#info" />
</iq>
""")
@@ -23,23 +27,22 @@ class TestDisco(SleekTest):
def testCreateInfoQueryWithNode(self):
"""Testing disco#info query with a node."""
iq = self.Iq()
- iq['id'] = "0"
iq['disco_info']['node'] = 'foo'
self.check(iq, """
- <iq id="0">
- <query xmlns="http://jabber.org/protocol/disco#info" node="foo" />
+ <iq>
+ <query xmlns="http://jabber.org/protocol/disco#info"
+ node="foo" />
</iq>
""")
- def testCreateInfoQueryNoNode(self):
+ def testCreateItemsQueryNoNode(self):
"""Testing disco#items query with no node."""
iq = self.Iq()
- iq['id'] = "0"
iq['disco_items']['node'] = ''
self.check(iq, """
- <iq id="0">
+ <iq>
<query xmlns="http://jabber.org/protocol/disco#items" />
</iq>
""")
@@ -47,130 +50,467 @@ class TestDisco(SleekTest):
def testCreateItemsQueryWithNode(self):
"""Testing disco#items query with a node."""
iq = self.Iq()
- iq['id'] = "0"
iq['disco_items']['node'] = 'foo'
self.check(iq, """
- <iq id="0">
- <query xmlns="http://jabber.org/protocol/disco#items" node="foo" />
+ <iq>
+ <query xmlns="http://jabber.org/protocol/disco#items"
+ node="foo" />
</iq>
""")
- def testInfoIdentities(self):
+ def testIdentities(self):
"""Testing adding identities to disco#info."""
iq = self.Iq()
- iq['id'] = "0"
- iq['disco_info']['node'] = 'foo'
- iq['disco_info'].addIdentity('conference', 'text', 'Chatroom')
+ iq['disco_info'].add_identity('conference', 'text',
+ name='Chatroom',
+ lang='en')
+
+ self.check(iq, """
+ <iq>
+ <query xmlns="http://jabber.org/protocol/disco#info">
+ <identity category="conference"
+ type="text"
+ name="Chatroom"
+ xml:lang="en" />
+ </query>
+ </iq>
+ """)
+
+ def testDuplicateIdentities(self):
+ """
+ Test adding multiple copies of the same category
+ and type combination. Only the first identity should
+ be kept.
+ """
+ iq = self.Iq()
+ iq['disco_info'].add_identity('conference', 'text',
+ name='Chatroom')
+ iq['disco_info'].add_identity('conference', 'text',
+ name='MUC')
+ self.check(iq, """
+ <iq>
+ <query xmlns="http://jabber.org/protocol/disco#info">
+ <identity category="conference"
+ type="text"
+ name="Chatroom" />
+ </query>
+ </iq>
+ """)
+
+ def testDuplicateIdentitiesWithLangs(self):
+ """
+ Test adding multiple copies of the same category,
+ type, and language combination. Only the first identity
+ should be kept.
+ """
+ iq = self.Iq()
+ iq['disco_info'].add_identity('conference', 'text',
+ name='Chatroom',
+ lang='en')
+ iq['disco_info'].add_identity('conference', 'text',
+ name='MUC',
+ lang='en')
+ self.check(iq, """
+ <iq>
+ <query xmlns="http://jabber.org/protocol/disco#info">
+ <identity category="conference"
+ type="text"
+ name="Chatroom"
+ xml:lang="en" />
+ </query>
+ </iq>
+ """)
+
+ def testRemoveIdentitiesNoLang(self):
+ """Test removing identities from a disco#info stanza."""
+ iq = self.Iq()
+ iq['disco_info'].add_identity('client', 'pc')
+ iq['disco_info'].add_identity('client', 'bot')
+
+ iq['disco_info'].del_identity('client', 'pc')
+
+ self.check(iq, """
+ <iq>
+ <query xmlns="http://jabber.org/protocol/disco#info">
+ <identity category="client" type="bot" />
+ </query>
+ </iq>
+ """)
+
+ def testRemoveIdentitiesWithLang(self):
+ """Test removing identities from a disco#info stanza."""
+ iq = self.Iq()
+ iq['disco_info'].add_identity('client', 'pc')
+ iq['disco_info'].add_identity('client', 'bot')
+ iq['disco_info'].add_identity('client', 'pc', lang='no')
+
+ iq['disco_info'].del_identity('client', 'pc')
self.check(iq, """
- <iq id="0">
- <query xmlns="http://jabber.org/protocol/disco#info" node="foo">
- <identity category="conference" type="text" name="Chatroom" />
+ <iq>
+ <query xmlns="http://jabber.org/protocol/disco#info">
+ <identity category="client" type="bot" />
+ <identity category="client"
+ type="pc"
+ xml:lang="no" />
</query>
</iq>
""")
- def testInfoFeatures(self):
+ def testRemoveAllIdentitiesNoLang(self):
+ """Test removing all identities from a disco#info stanza."""
+ iq = self.Iq()
+ iq['disco_info'].add_identity('client', 'bot', name='Bot')
+ iq['disco_info'].add_identity('client', 'bot', lang='no')
+ iq['disco_info'].add_identity('client', 'console')
+
+ del iq['disco_info']['identities']
+
+ self.check(iq, """
+ <iq>
+ <query xmlns="http://jabber.org/protocol/disco#info" />
+ </iq>
+ """)
+
+ def testRemoveAllIdentitiesWithLang(self):
+ """Test removing all identities from a disco#info stanza."""
+ iq = self.Iq()
+ iq['disco_info'].add_identity('client', 'bot', name='Bot')
+ iq['disco_info'].add_identity('client', 'bot', lang='no')
+ iq['disco_info'].add_identity('client', 'console')
+
+ iq['disco_info'].del_identities(lang='no')
+
+ self.check(iq, """
+ <iq>
+ <query xmlns="http://jabber.org/protocol/disco#info">
+ <identity category="client" type="bot" name="Bot" />
+ <identity category="client" type="console" />
+ </query>
+ </iq>
+ """)
+
+ def testAddBatchIdentitiesNoLang(self):
+ """Test adding multiple identities at once to a disco#info stanza."""
+ iq = self.Iq()
+ identities = [('client', 'pc', 'no', 'PC Client'),
+ ('client', 'bot', None, 'Bot'),
+ ('client', 'console', None, None)]
+
+ iq['disco_info']['identities'] = identities
+
+ self.check(iq, """
+ <iq>
+ <query xmlns="http://jabber.org/protocol/disco#info">
+ <identity category="client"
+ type="pc"
+ xml:lang="no"
+ name="PC Client" />
+ <identity category="client" type="bot" name="Bot" />
+ <identity category="client" type="console" />
+ </query>
+ </iq>
+ """)
+
+
+ def testAddBatchIdentitiesWithLang(self):
+ """Test selectively replacing identities based on language."""
+ iq = self.Iq()
+ iq['disco_info'].add_identity('client', 'pc', lang='no')
+ iq['disco_info'].add_identity('client', 'pc', lang='en')
+ iq['disco_info'].add_identity('client', 'pc', lang='fr')
+
+ identities = [('client', 'bot', 'fr', 'Bot'),
+ ('client', 'bot', 'en', 'Bot')]
+
+ iq['disco_info'].set_identities(identities, lang='fr')
+
+ self.check(iq, """
+ <iq>
+ <query xmlns="http://jabber.org/protocol/disco#info">
+ <identity category="client" type="pc" xml:lang="no" />
+ <identity category="client" type="pc" xml:lang="en" />
+ <identity category="client"
+ type="bot"
+ xml:lang="fr"
+ name="Bot" />
+ <identity category="client"
+ type="bot"
+ xml:lang="en"
+ name="Bot" />
+ </query>
+ </iq>
+ """)
+
+ def testGetIdentitiesNoLang(self):
+ """Test getting all identities from a disco#info stanza."""
+ iq = self.Iq()
+ iq['disco_info'].add_identity('client', 'pc')
+ iq['disco_info'].add_identity('client', 'pc', lang='no')
+ iq['disco_info'].add_identity('client', 'pc', lang='en')
+ iq['disco_info'].add_identity('client', 'pc', lang='fr')
+
+ expected = set([('client', 'pc', None, None),
+ ('client', 'pc', 'no', None),
+ ('client', 'pc', 'en', None),
+ ('client', 'pc', 'fr', None)])
+ self.failUnless(iq['disco_info']['identities'] == expected,
+ "Identities do not match:\n%s\n%s" % (
+ expected,
+ iq['disco_info']['identities']))
+
+ def testGetIdentitiesWithLang(self):
+ """
+ Test getting all identities of a given
+ lang from a disco#info stanza.
+ """
+ iq = self.Iq()
+ iq['disco_info'].add_identity('client', 'pc')
+ iq['disco_info'].add_identity('client', 'pc', lang='no')
+ iq['disco_info'].add_identity('client', 'pc', lang='en')
+ iq['disco_info'].add_identity('client', 'pc', lang='fr')
+
+ expected = set([('client', 'pc', 'no', None)])
+ result = iq['disco_info'].get_identities(lang='no')
+ self.failUnless(result == expected,
+ "Identities do not match:\n%s\n%s" % (
+ expected, result))
+
+ def testFeatures(self):
"""Testing adding features to disco#info."""
iq = self.Iq()
- iq['id'] = "0"
- iq['disco_info']['node'] = 'foo'
- iq['disco_info'].addFeature('foo')
- iq['disco_info'].addFeature('bar')
+ iq['disco_info'].add_feature('foo')
+ iq['disco_info'].add_feature('bar')
self.check(iq, """
- <iq id="0">
- <query xmlns="http://jabber.org/protocol/disco#info" node="foo">
+ <iq>
+ <query xmlns="http://jabber.org/protocol/disco#info">
<feature var="foo" />
<feature var="bar" />
</query>
</iq>
""")
- def testItems(self):
- """Testing adding features to disco#info."""
+ def testFeaturesDuplicate(self):
+ """Test adding duplicate features to disco#info."""
iq = self.Iq()
- iq['id'] = "0"
- iq['disco_items']['node'] = 'foo'
- iq['disco_items'].addItem('user@localhost')
- iq['disco_items'].addItem('user@localhost', 'foo')
- iq['disco_items'].addItem('user@localhost', 'bar', 'Testing')
+ iq['disco_info'].add_feature('foo')
+ iq['disco_info'].add_feature('bar')
+ iq['disco_info'].add_feature('foo')
self.check(iq, """
- <iq id="0">
- <query xmlns="http://jabber.org/protocol/disco#items" node="foo">
- <item jid="user@localhost" />
- <item node="foo" jid="user@localhost" />
- <item node="bar" jid="user@localhost" name="Testing" />
+ <iq>
+ <query xmlns="http://jabber.org/protocol/disco#info">
+ <feature var="foo" />
+ <feature var="bar" />
</query>
</iq>
""")
- def testAddRemoveIdentities(self):
- """Test adding and removing identities to disco#info stanza"""
- ids = [('automation', 'commands', 'AdHoc'),
- ('conference', 'text', 'ChatRoom')]
+ def testRemoveFeature(self):
+ """Test removing a feature from disco#info."""
+ iq = self.Iq()
+ iq['disco_info'].add_feature('foo')
+ iq['disco_info'].add_feature('bar')
+ iq['disco_info'].add_feature('baz')
- info = xep_0030.DiscoInfo()
- info.addIdentity(*ids[0])
- self.failUnless(info.getIdentities() == [ids[0]])
+ iq['disco_info'].del_feature('foo')
- info.delIdentity('automation', 'commands')
- self.failUnless(info.getIdentities() == [])
+ self.check(iq, """
+ <iq>
+ <query xmlns="http://jabber.org/protocol/disco#info">
+ <feature var="bar" />
+ <feature var="baz" />
+ </query>
+ </iq>
+ """)
- info.setIdentities(ids)
- self.failUnless(info.getIdentities() == ids)
+ def testGetFeatures(self):
+ """Test getting all features from a disco#info stanza."""
+ iq = self.Iq()
+ iq['disco_info'].add_feature('foo')
+ iq['disco_info'].add_feature('bar')
+ iq['disco_info'].add_feature('baz')
+
+ expected = set(['foo', 'bar', 'baz'])
+ self.failUnless(iq['disco_info']['features'] == expected,
+ "Features do not match:\n%s\n%s" % (
+ expected,
+ iq['disco_info']['features']))
+
+ def testRemoveAllFeatures(self):
+ """Test removing all features from a disco#info stanza."""
+ iq = self.Iq()
+ iq['disco_info'].add_feature('foo')
+ iq['disco_info'].add_feature('bar')
+ iq['disco_info'].add_feature('baz')
- info.delIdentity('automation', 'commands')
- self.failUnless(info.getIdentities() == [ids[1]])
+ del iq['disco_info']['features']
- info.delIdentities()
- self.failUnless(info.getIdentities() == [])
+ self.check(iq, """
+ <iq>
+ <query xmlns="http://jabber.org/protocol/disco#info" />
+ </iq>
+ """)
- def testAddRemoveFeatures(self):
- """Test adding and removing features to disco#info stanza"""
+ def testAddBatchFeatures(self):
+ """Test adding multiple features at once to a disco#info stanza."""
+ iq = self.Iq()
features = ['foo', 'bar', 'baz']
- info = xep_0030.DiscoInfo()
- info.addFeature(features[0])
- self.failUnless(info.getFeatures() == [features[0]])
+ iq['disco_info']['features'] = features
+
+ self.check(iq, """
+ <iq>
+ <query xmlns="http://jabber.org/protocol/disco#info">
+ <feature var="foo" />
+ <feature var="bar" />
+ <feature var="baz" />
+ </query>
+ </iq>
+ """)
+
+ def testItems(self):
+ """Testing adding features to disco#info."""
+ iq = self.Iq()
+ iq['disco_items'].add_item('user@localhost')
+ iq['disco_items'].add_item('user@localhost', 'foo')
+ iq['disco_items'].add_item('user@localhost', 'bar', name='Testing')
+
+ self.check(iq, """
+ <iq>
+ <query xmlns="http://jabber.org/protocol/disco#items">
+ <item jid="user@localhost" />
+ <item jid="user@localhost"
+ node="foo" />
+ <item jid="user@localhost"
+ node="bar"
+ name="Testing" />
+ </query>
+ </iq>
+ """)
+
+ def testDuplicateItems(self):
+ """Test adding items with the same JID without any nodes."""
+ iq = self.Iq()
+ iq['disco_items'].add_item('user@localhost', name='First')
+ iq['disco_items'].add_item('user@localhost', name='Second')
+
+ self.check(iq, """
+ <iq>
+ <query xmlns="http://jabber.org/protocol/disco#items">
+ <item jid="user@localhost" name="First" />
+ </query>
+ </iq>
+ """)
- info.delFeature('foo')
- self.failUnless(info.getFeatures() == [])
- info.setFeatures(features)
- self.failUnless(info.getFeatures() == features)
+ def testDuplicateItemsWithNodes(self):
+ """Test adding items with the same JID/node combination."""
+ iq = self.Iq()
+ iq['disco_items'].add_item('user@localhost',
+ node='foo',
+ name='First')
+ iq['disco_items'].add_item('user@localhost',
+ node='foo',
+ name='Second')
- info.delFeature('bar')
- self.failUnless(info.getFeatures() == ['foo', 'baz'])
+ self.check(iq, """
+ <iq>
+ <query xmlns="http://jabber.org/protocol/disco#items">
+ <item jid="user@localhost" node="foo" name="First" />
+ </query>
+ </iq>
+ """)
- info.delFeatures()
- self.failUnless(info.getFeatures() == [])
+ def testRemoveItemsNoNode(self):
+ """Test removing items without nodes from a disco#items stanza."""
+ iq = self.Iq()
+ iq['disco_items'].add_item('user@localhost')
+ iq['disco_items'].add_item('user@localhost', node='foo')
+ iq['disco_items'].add_item('test@localhost')
- def testAddRemoveItems(self):
- """Test adding and removing items to disco#items stanza"""
- items = [('user@localhost', None, None),
- ('user@localhost', 'foo', None),
- ('user@localhost', 'bar', 'Test')]
+ iq['disco_items'].del_item('user@localhost')
- info = xep_0030.DiscoItems()
- self.failUnless(True, ""+str(items[0]))
+ self.check(iq, """
+ <iq>
+ <query xmlns="http://jabber.org/protocol/disco#items">
+ <item jid="user@localhost" node="foo" />
+ <item jid="test@localhost" />
+ </query>
+ </iq>
+ """)
- info.addItem(*(items[0]))
- self.failUnless(info.getItems() == [items[0]], info.getItems())
+ def testRemoveItemsWithNode(self):
+ """Test removing items with nodes from a disco#items stanza."""
+ iq = self.Iq()
+ iq['disco_items'].add_item('user@localhost')
+ iq['disco_items'].add_item('user@localhost', node='foo')
+ iq['disco_items'].add_item('test@localhost')
- info.delItem('user@localhost')
- self.failUnless(info.getItems() == [])
+ iq['disco_items'].del_item('user@localhost', node='foo')
- info.setItems(items)
- self.failUnless(info.getItems() == items)
+ self.check(iq, """
+ <iq>
+ <query xmlns="http://jabber.org/protocol/disco#items">
+ <item jid="user@localhost" />
+ <item jid="test@localhost" />
+ </query>
+ </iq>
+ """)
- info.delItem('user@localhost', 'foo')
- self.failUnless(info.getItems() == [items[0], items[2]])
+ def testGetItems(self):
+ """Test retrieving items from disco#items stanza."""
+ iq = self.Iq()
+ iq['disco_items'].add_item('user@localhost')
+ iq['disco_items'].add_item('user@localhost', node='foo')
+ iq['disco_items'].add_item('test@localhost',
+ node='bar',
+ name='Tester')
+
+ expected = set([('user@localhost', None, None),
+ ('user@localhost', 'foo', None),
+ ('test@localhost', 'bar', 'Tester')])
+ self.failUnless(iq['disco_items']['items'] == expected,
+ "Items do not match:\n%s\n%s" % (
+ expected,
+ iq['disco_items']['items']))
+
+ def testRemoveAllItems(self):
+ """Test removing all items from a disco#items stanza."""
+ iq = self.Iq()
+ iq['disco_items'].add_item('user@localhost')
+ iq['disco_items'].add_item('user@localhost', node='foo')
+ iq['disco_items'].add_item('test@localhost',
+ node='bar',
+ name='Tester')
- info.delItems()
- self.failUnless(info.getItems() == [])
+ del iq['disco_items']['items']
+ self.check(iq, """
+ <iq>
+ <query xmlns="http://jabber.org/protocol/disco#items" />
+ </iq>
+ """)
+
+ def testAddBatchItems(self):
+ """Test adding multiple items to a disco#items stanza."""
+ iq = self.Iq()
+ items = [('user@localhost', 'foo', 'Test'),
+ ('test@localhost', None, None),
+ ('other@localhost', None, 'Other')]
+
+ iq['disco_items']['items'] = items
+
+ self.check(iq, """
+ <iq>
+ <query xmlns="http://jabber.org/protocol/disco#items">
+ <item jid="user@localhost" node="foo" name="Test" />
+ <item jid="test@localhost" />
+ <item jid="other@localhost" name="Other" />
+ </query>
+ </iq>
+ """)
suite = unittest.TestLoader().loadTestsFromTestCase(TestDisco)
diff --git a/tests/test_stream_xep_0030.py b/tests/test_stream_xep_0030.py
index 5efce788..1f989745 100644
--- a/tests/test_stream_xep_0030.py
+++ b/tests/test_stream_xep_0030.py
@@ -1,8 +1,11 @@
import time
+import threading
+
from sleekxmpp.test import *
class TestStreamDisco(SleekTest):
+
"""
Test using the XEP-0030 plugin.
"""
@@ -10,15 +13,16 @@ class TestStreamDisco(SleekTest):
def tearDown(self):
self.stream_close()
- def testInfoEmptyNode(self):
+ def testInfoEmptyDefaultNode(self):
"""
- Info queries to a node MUST have at least one identity
+ Info query result from an entity MUST have at least one identity
and feature, namely http://jabber.org/protocol/disco#info.
Since the XEP-0030 plugin is loaded, a disco response should
be generated and not an error result.
"""
- self.stream_start(plugins=['xep_0030'])
+ self.stream_start(mode='client',
+ plugins=['xep_0030'])
self.recv("""
<iq type="get" id="test">
@@ -32,13 +36,15 @@ class TestStreamDisco(SleekTest):
<identity category="client" type="bot" />
<feature var="http://jabber.org/protocol/disco#info" />
</query>
- </iq>""")
+ </iq>
+ """)
- def testInfoEmptyNodeComponent(self):
+ def testInfoEmptyDefaultNodeComponent(self):
"""
- Test requesting an empty node using a Component.
+ Test requesting an empty, default node using a Component.
"""
self.stream_start(mode='component',
+ jid='tester.localhost',
plugins=['xep_0030'])
self.recv("""
@@ -53,19 +59,22 @@ class TestStreamDisco(SleekTest):
<identity category="component" type="generic" />
<feature var="http://jabber.org/protocol/disco#info" />
</query>
- </iq>""")
+ </iq>
+ """)
def testInfoIncludeNode(self):
"""
Results for info queries directed to a particular node MUST
include the node in the query response.
"""
- self.stream_start(plugins=['xep_0030'])
+ self.stream_start(mode='client',
+ plugins=['xep_0030'])
- self.xmpp['xep_0030'].add_node('testing')
+
+ self.xmpp['xep_0030'].static.add_node(node='testing')
self.recv("""
- <iq type="get" id="test">
+ <iq to="tester@localhost" type="get" id="test">
<query xmlns="http://jabber.org/protocol/disco#info"
node="testing" />
</iq>
@@ -76,8 +85,444 @@ class TestStreamDisco(SleekTest):
<query xmlns="http://jabber.org/protocol/disco#info"
node="testing">
</query>
- </iq>""",
+ </iq>""",
method='mask')
+ def testItemsIncludeNode(self):
+ """
+ Results for items queries directed to a particular node MUST
+ include the node in the query response.
+ """
+ self.stream_start(mode='client',
+ plugins=['xep_0030'])
+
+
+ self.xmpp['xep_0030'].static.add_node(node='testing')
+
+ self.recv("""
+ <iq to="tester@localhost" type="get" id="test">
+ <query xmlns="http://jabber.org/protocol/disco#items"
+ node="testing" />
+ </iq>
+ """)
+
+ self.send("""
+ <iq type="result" id="test">
+ <query xmlns="http://jabber.org/protocol/disco#items"
+ node="testing">
+ </query>
+ </iq>""",
+ method='mask')
+
+ def testDynamicInfoJID(self):
+ """
+ Test using a dynamic info handler for a particular JID.
+ """
+ self.stream_start(mode='client',
+ plugins=['xep_0030'])
+
+ def dynamic_jid(jid, node, iq):
+ result = self.xmpp['xep_0030'].stanza.DiscoInfo()
+ result['node'] = node
+ result.add_identity('client', 'console', name='Dynamic Info')
+ return result
+
+ self.xmpp['xep_0030'].set_node_handler('get_info',
+ jid='tester@localhost',
+ handler=dynamic_jid)
+
+ self.recv("""
+ <iq type="get" id="test" to="tester@localhost">
+ <query xmlns="http://jabber.org/protocol/disco#info"
+ node="testing" />
+ </iq>
+ """)
+
+ self.send("""
+ <iq type="result" id="test">
+ <query xmlns="http://jabber.org/protocol/disco#info"
+ node="testing">
+ <identity category="client"
+ type="console"
+ name="Dynamic Info" />
+ </query>
+ </iq>
+ """)
+
+ def testDynamicInfoGlobal(self):
+ """
+ Test using a dynamic info handler for all requests.
+ """
+ self.stream_start(mode='component',
+ jid='tester.localhost',
+ plugins=['xep_0030'])
+
+ def dynamic_global(jid, node, iq):
+ result = self.xmpp['xep_0030'].stanza.DiscoInfo()
+ result['node'] = node
+ result.add_identity('component', 'generic', name='Dynamic Info')
+ return result
+
+ self.xmpp['xep_0030'].set_node_handler('get_info',
+ handler=dynamic_global)
+
+ self.recv("""
+ <iq type="get" id="test"
+ to="user@tester.localhost"
+ from="tester@localhost">
+ <query xmlns="http://jabber.org/protocol/disco#info"
+ node="testing" />
+ </iq>
+ """)
+
+ self.send("""
+ <iq type="result" id="test"
+ to="tester@localhost"
+ from="user@tester.localhost">
+ <query xmlns="http://jabber.org/protocol/disco#info"
+ node="testing">
+ <identity category="component"
+ type="generic"
+ name="Dynamic Info" />
+ </query>
+ </iq>
+ """)
+
+ def testOverrideJIDInfoHandler(self):
+ """Test overriding a JID info handler."""
+ self.stream_start(mode='client',
+ plugins=['xep_0030'])
+
+ def dynamic_jid(jid, node, iq):
+ result = self.xmpp['xep_0030'].stanza.DiscoInfo()
+ result['node'] = node
+ result.add_identity('client', 'console', name='Dynamic Info')
+ return result
+
+ self.xmpp['xep_0030'].set_node_handler('get_info',
+ jid='tester@localhost',
+ handler=dynamic_jid)
+
+
+ self.xmpp['xep_0030'].make_static(jid='tester@localhost',
+ node='testing')
+
+ self.xmpp['xep_0030'].add_identity(jid='tester@localhost',
+ node='testing',
+ category='automation',
+ itype='command-list')
+
+ self.recv("""
+ <iq type="get" id="test" to="tester@localhost">
+ <query xmlns="http://jabber.org/protocol/disco#info"
+ node="testing" />
+ </iq>
+ """)
+
+ self.send("""
+ <iq type="result" id="test">
+ <query xmlns="http://jabber.org/protocol/disco#info"
+ node="testing">
+ <identity category="automation"
+ type="command-list" />
+ </query>
+ </iq>
+ """)
+
+ def testOverrideGlobalInfoHandler(self):
+ """Test overriding the global JID info handler."""
+ self.stream_start(mode='component',
+ jid='tester.localhost',
+ plugins=['xep_0030'])
+
+ def dynamic_global(jid, node, iq):
+ result = self.xmpp['xep_0030'].stanza.DiscoInfo()
+ result['node'] = node
+ result.add_identity('component', 'generic', name='Dynamic Info')
+ return result
+
+ self.xmpp['xep_0030'].set_node_handler('get_info',
+ handler=dynamic_global)
+
+ self.xmpp['xep_0030'].make_static(jid='user@tester.localhost',
+ node='testing')
+
+ self.xmpp['xep_0030'].add_feature(jid='user@tester.localhost',
+ node='testing',
+ feature='urn:xmpp:ping')
+
+ self.recv("""
+ <iq type="get" id="test"
+ to="user@tester.localhost"
+ from="tester@localhost">
+ <query xmlns="http://jabber.org/protocol/disco#info"
+ node="testing" />
+ </iq>
+ """)
+
+ self.send("""
+ <iq type="result" id="test"
+ to="tester@localhost"
+ from="user@tester.localhost">
+ <query xmlns="http://jabber.org/protocol/disco#info"
+ node="testing">
+ <feature var="urn:xmpp:ping" />
+ </query>
+ </iq>
+ """)
+
+ def testGetInfoRemote(self):
+ """
+ Test sending a disco#info query to another entity
+ and receiving the result.
+ """
+ self.stream_start(mode='client',
+ plugins=['xep_0030'])
+
+ events = set()
+
+ def handle_disco_info(iq):
+ events.add('disco_info')
+
+
+ self.xmpp.add_event_handler('disco_info', handle_disco_info)
+
+ t = threading.Thread(name="get_info",
+ target=self.xmpp['xep_0030'].get_info,
+ args=('user@localhost', 'foo'))
+ t.start()
+
+ self.send("""
+ <iq type="get" to="user@localhost" id="1">
+ <query xmlns="http://jabber.org/protocol/disco#info"
+ node="foo" />
+ </iq>
+ """)
+
+ self.recv("""
+ <iq type="result" to="tester@localhost" id="1">
+ <query xmlns="http://jabber.org/protocol/disco#info"
+ node="foo">
+ <identity category="client" type="bot" />
+ <feature var="urn:xmpp:ping" />
+ </query>
+ </iq>
+ """)
+
+ # Wait for disco#info request to be received.
+ t.join()
+
+ time.sleep(0.1)
+
+ self.assertEqual(events, set(('disco_info',)),
+ "Disco info event was not triggered: %s" % events)
+
+ def testDynamicItemsJID(self):
+ """
+ Test using a dynamic items handler for a particular JID.
+ """
+ self.stream_start(mode='client',
+ plugins=['xep_0030'])
+
+ def dynamic_jid(jid, node, iq):
+ result = self.xmpp['xep_0030'].stanza.DiscoItems()
+ result['node'] = node
+ result.add_item('tester@localhost', node='foo', name='JID')
+ return result
+
+ self.xmpp['xep_0030'].set_node_handler('get_items',
+ jid='tester@localhost',
+ handler=dynamic_jid)
+
+ self.recv("""
+ <iq type="get" id="test" to="tester@localhost">
+ <query xmlns="http://jabber.org/protocol/disco#items"
+ node="testing" />
+ </iq>
+ """)
+
+ self.send("""
+ <iq type="result" id="test">
+ <query xmlns="http://jabber.org/protocol/disco#items"
+ node="testing">
+ <item jid="tester@localhost" node="foo" name="JID" />
+ </query>
+ </iq>
+ """)
+
+ def testDynamicItemsGlobal(self):
+ """
+ Test using a dynamic items handler for all requests.
+ """
+ self.stream_start(mode='component',
+ jid='tester.localhost',
+ plugins=['xep_0030'])
+
+ def dynamic_global(jid, node, iq):
+ result = self.xmpp['xep_0030'].stanza.DiscoItems()
+ result['node'] = node
+ result.add_item('tester@localhost', node='foo', name='Global')
+ return result
+
+ self.xmpp['xep_0030'].set_node_handler('get_items',
+ handler=dynamic_global)
+
+ self.recv("""
+ <iq type="get" id="test"
+ to="user@tester.localhost"
+ from="tester@localhost">
+ <query xmlns="http://jabber.org/protocol/disco#items"
+ node="testing" />
+ </iq>
+ """)
+
+ self.send("""
+ <iq type="result" id="test"
+ to="tester@localhost"
+ from="user@tester.localhost">
+ <query xmlns="http://jabber.org/protocol/disco#items"
+ node="testing">
+ <item jid="tester@localhost" node="foo" name="Global" />
+ </query>
+ </iq>
+ """)
+
+ def testOverrideJIDItemsHandler(self):
+ """Test overriding a JID items handler."""
+ self.stream_start(mode='client',
+ plugins=['xep_0030'])
+
+ def dynamic_jid(jid, node, iq):
+ result = self.xmpp['xep_0030'].stanza.DiscoItems()
+ result['node'] = node
+ result.add_item('tester@localhost', node='foo', name='Global')
+ return result
+
+ self.xmpp['xep_0030'].set_node_handler('get_items',
+ jid='tester@localhost',
+ handler=dynamic_jid)
+
+
+ self.xmpp['xep_0030'].make_static(jid='tester@localhost',
+ node='testing')
+
+ self.xmpp['xep_0030'].add_item(jid='tester@localhost',
+ node='testing',
+ ijid='tester@localhost',
+ inode='foo',
+ name='Test')
+
+ self.recv("""
+ <iq type="get" id="test" to="tester@localhost">
+ <query xmlns="http://jabber.org/protocol/disco#items"
+ node="testing" />
+ </iq>
+ """)
+
+ self.send("""
+ <iq type="result" id="test">
+ <query xmlns="http://jabber.org/protocol/disco#items"
+ node="testing">
+ <item jid="tester@localhost" node="foo" name="Test" />
+ </query>
+ </iq>
+ """)
+
+ def testOverrideGlobalItemsHandler(self):
+ """Test overriding the global JID items handler."""
+ self.stream_start(mode='component',
+ jid='tester.localhost',
+ plugins=['xep_0030'])
+
+ def dynamic_global(jid, node, iq):
+ result = self.xmpp['xep_0030'].stanza.DiscoItems()
+ result['node'] = node
+ result.add_item('tester.localhost', node='foo', name='Global')
+ return result
+
+ self.xmpp['xep_0030'].set_node_handler('get_items',
+ handler=dynamic_global)
+
+ self.xmpp['xep_0030'].make_static(jid='user@tester.localhost',
+ node='testing')
+
+ self.xmpp['xep_0030'].add_item(jid='user@tester.localhost',
+ node='testing',
+ ijid='user@tester.localhost',
+ inode='foo',
+ name='Test')
+
+ self.recv("""
+ <iq type="get" id="test"
+ to="user@tester.localhost"
+ from="tester@localhost">
+ <query xmlns="http://jabber.org/protocol/disco#items"
+ node="testing" />
+ </iq>
+ """)
+
+ self.send("""
+ <iq type="result" id="test"
+ to="tester@localhost"
+ from="user@tester.localhost">
+ <query xmlns="http://jabber.org/protocol/disco#items"
+ node="testing">
+ <item jid="user@tester.localhost" node="foo" name="Test" />
+ </query>
+ </iq>
+ """)
+
+ def testGetItemsRemote(self):
+ """
+ Test sending a disco#items query to another entity
+ and receiving the result.
+ """
+ self.stream_start(mode='client',
+ plugins=['xep_0030'])
+
+ events = set()
+ results = set()
+
+ def handle_disco_items(iq):
+ events.add('disco_items')
+ results.update(iq['disco_items']['items'])
+
+
+ self.xmpp.add_event_handler('disco_items', handle_disco_items)
+
+ t = threading.Thread(name="get_items",
+ target=self.xmpp['xep_0030'].get_items,
+ args=('user@localhost', 'foo'))
+ t.start()
+
+ self.send("""
+ <iq type="get" to="user@localhost" id="1">
+ <query xmlns="http://jabber.org/protocol/disco#items"
+ node="foo" />
+ </iq>
+ """)
+
+ self.recv("""
+ <iq type="result" to="tester@localhost" id="1">
+ <query xmlns="http://jabber.org/protocol/disco#items"
+ node="foo">
+ <item jid="user@localhost" node="bar" name="Test" />
+ <item jid="user@localhost" node="baz" name="Test 2" />
+ </query>
+ </iq>
+ """)
+
+ # Wait for disco#items request to be received.
+ t.join()
+
+ time.sleep(0.1)
+
+ items = set([('user@localhost', 'bar', 'Test'),
+ ('user@localhost', 'baz', 'Test 2')])
+ self.assertEqual(events, set(('disco_items',)),
+ "Disco items event was not triggered: %s" % events)
+ self.assertEqual(results, items,
+ "Unexpected items: %s" % results)
+
suite = unittest.TestLoader().loadTestsFromTestCase(TestStreamDisco)