summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/__init__.py0
-rw-r--r--tests/live_multiple_streams.py57
-rw-r--r--tests/live_test.py100
-rw-r--r--tests/test_events.py95
-rw-r--r--tests/test_jid.py141
-rw-r--r--tests/test_overall.py29
-rw-r--r--tests/test_stanza_base.py79
-rw-r--r--tests/test_stanza_element.py746
-rw-r--r--tests/test_stanza_error.py81
-rw-r--r--tests/test_stanza_gmail.py88
-rw-r--r--tests/test_stanza_iq.py90
-rw-r--r--tests/test_stanza_message.py57
-rw-r--r--tests/test_stanza_presence.py67
-rw-r--r--tests/test_stanza_roster.py88
-rw-r--r--tests/test_stanza_xep_0004.py198
-rw-r--r--tests/test_stanza_xep_0009.py288
-rw-r--r--tests/test_stanza_xep_0030.py516
-rw-r--r--tests/test_stanza_xep_0033.py111
-rw-r--r--tests/test_stanza_xep_0050.py114
-rw-r--r--tests/test_stanza_xep_0059.py106
-rw-r--r--tests/test_stanza_xep_0060.py575
-rw-r--r--tests/test_stanza_xep_0085.py41
-rw-r--r--tests/test_stream.py79
-rw-r--r--tests/test_stream_exceptions.py274
-rw-r--r--tests/test_stream_filters.py88
-rw-r--r--tests/test_stream_handlers.py201
-rw-r--r--tests/test_stream_presence.py380
-rw-r--r--tests/test_stream_roster.py231
-rw-r--r--tests/test_stream_xep_0030.py576
-rw-r--r--tests/test_stream_xep_0050.py726
-rw-r--r--tests/test_stream_xep_0059.py162
-rw-r--r--tests/test_stream_xep_0060.py794
-rw-r--r--tests/test_stream_xep_0066.py44
-rw-r--r--tests/test_stream_xep_0085.py59
-rw-r--r--tests/test_stream_xep_0092.py69
-rw-r--r--tests/test_stream_xep_0128.py105
-rw-r--r--tests/test_stream_xep_0249.py63
-rw-r--r--tests/test_tostring.py132
38 files changed, 7650 insertions, 0 deletions
diff --git a/tests/__init__.py b/tests/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/tests/__init__.py
diff --git a/tests/live_multiple_streams.py b/tests/live_multiple_streams.py
new file mode 100644
index 00000000..69ee74c4
--- /dev/null
+++ b/tests/live_multiple_streams.py
@@ -0,0 +1,57 @@
+import logging
+
+from sleekxmpp.test import *
+
+
+class TestMultipleStreams(SleekTest):
+ """
+ Test that we can test a live stanza stream.
+ """
+
+ def setUp(self):
+ self.client1 = SleekTest()
+ self.client2 = SleekTest()
+
+ def tearDown(self):
+ self.client1.stream_close()
+ self.client2.stream_close()
+
+ def testMultipleStreams(self):
+ """Test that we can interact with multiple live ClientXMPP instance."""
+
+ client1 = self.client1
+ client2 = self.client2
+
+ client1.stream_start(mode='client',
+ socket='live',
+ skip=True,
+ jid='user@localhost/test1',
+ password='user')
+ client2.stream_start(mode='client',
+ socket='live',
+ skip=True,
+ jid='user@localhost/test2',
+ password='user')
+
+ client1.xmpp.send_message(mto='user@localhost/test2',
+ mbody='test')
+
+ client1.send('message@body=test', method='stanzapath')
+ client2.recv('message@body=test', method='stanzapath')
+
+
+suite = unittest.TestLoader().loadTestsFromTestCase(TestMultipleStreams)
+
+if __name__ == '__main__':
+ logging.basicConfig(level=logging.DEBUG,
+ format='%(levelname)-8s %(message)s')
+
+ tests = unittest.TestSuite([suite])
+ result = unittest.TextTestRunner(verbosity=2).run(tests)
+ test_ns = 'http://andyet.net/protocol/tests'
+ print("<tests xmlns='%s' %s %s %s %s />" % (
+ test_ns,
+ 'ran="%s"' % result.testsRun,
+ 'errors="%s"' % len(result.errors),
+ 'fails="%s"' % len(result.failures),
+ 'success="%s"' % result.wasSuccessful()))
diff --git a/tests/live_test.py b/tests/live_test.py
new file mode 100644
index 00000000..b71930af
--- /dev/null
+++ b/tests/live_test.py
@@ -0,0 +1,100 @@
+import logging
+
+from sleekxmpp.test import *
+
+
+class TestLiveStream(SleekTest):
+ """
+ Test that we can test a live stanza stream.
+ """
+
+ def tearDown(self):
+ self.stream_close()
+
+ def testClientConnection(self):
+ """Test that we can interact with a live ClientXMPP instance."""
+ self.stream_start(mode='client',
+ socket='live',
+ skip=False,
+ jid='user@localhost/test',
+ password='user')
+
+ # Use sid=None to ignore any id sent by the server since
+ # we can't know it in advance.
+ self.recv_header(sfrom='localhost', sid=None)
+ self.send_header(sto='localhost')
+ self.recv_feature("""
+ <stream:features>
+ <starttls xmlns="urn:ietf:params:xml:ns:xmpp-tls" />
+ <mechanisms xmlns="urn:ietf:params:xml:ns:xmpp-sasl">
+ <mechanism>DIGEST-MD5</mechanism>
+ <mechanism>PLAIN</mechanism>
+ </mechanisms>
+ </stream:features>
+ """)
+ self.send_feature("""
+ <starttls xmlns="urn:ietf:params:xml:ns:xmpp-tls" />
+ """)
+ self.recv_feature("""
+ <proceed xmlns="urn:ietf:params:xml:ns:xmpp-tls" />
+ """)
+ self.send_header(sto='localhost')
+ self.recv_header(sfrom='localhost', sid=None)
+ self.recv_feature("""
+ <stream:features>
+ <mechanisms xmlns="urn:ietf:params:xml:ns:xmpp-sasl">
+ <mechanism>DIGEST-MD5</mechanism>
+ <mechanism>PLAIN</mechanism>
+ </mechanisms>
+ </stream:features>
+ """)
+ self.send_feature("""
+ <auth xmlns="urn:ietf:params:xml:ns:xmpp-sasl"
+ mechanism="PLAIN">AHVzZXIAdXNlcg==</auth>
+ """)
+ self.recv_feature("""
+ <success xmlns="urn:ietf:params:xml:ns:xmpp-sasl" />
+ """)
+ self.send_header(sto='localhost')
+ self.recv_header(sfrom='localhost', sid=None)
+ self.recv_feature("""
+ <stream:features>
+ <bind xmlns="urn:ietf:params:xml:ns:xmpp-bind" />
+ <session xmlns="urn:ietf:params:xml:ns:xmpp-session" />
+ </stream:features>
+ """)
+
+ # Should really use send, but our Iq stanza objects
+ # can't handle bind element payloads yet.
+ self.send_feature("""
+ <iq type="set" id="1">
+ <bind xmlns="urn:ietf:params:xml:ns:xmpp-bind">
+ <resource>test</resource>
+ </bind>
+ </iq>
+ """)
+ self.recv_feature("""
+ <iq type="result" id="1">
+ <bind xmlns="urn:ietf:params:xml:ns:xmpp-bind">
+ <jid>user@localhost/test</jid>
+ </bind>
+ </iq>
+ """)
+ self.stream_close()
+
+
+suite = unittest.TestLoader().loadTestsFromTestCase(TestLiveStream)
+
+if __name__ == '__main__':
+ logging.basicConfig(level=logging.DEBUG,
+ format='%(levelname)-8s %(message)s')
+
+ tests = unittest.TestSuite([suite])
+ result = unittest.TextTestRunner(verbosity=2).run(tests)
+ test_ns = 'http://andyet.net/protocol/tests'
+ print("<tests xmlns='%s' %s %s %s %s />" % (
+ test_ns,
+ 'ran="%s"' % result.testsRun,
+ 'errors="%s"' % len(result.errors),
+ 'fails="%s"' % len(result.failures),
+ 'success="%s"' % result.wasSuccessful()))
diff --git a/tests/test_events.py b/tests/test_events.py
new file mode 100644
index 00000000..fb34be30
--- /dev/null
+++ b/tests/test_events.py
@@ -0,0 +1,95 @@
+import time
+from sleekxmpp.test import *
+
+
+class TestEvents(SleekTest):
+
+ def setUp(self):
+ self.stream_start()
+
+ def tearDown(self):
+ self.stream_close()
+
+ def testEventHappening(self):
+ """Test handler working"""
+ happened = []
+
+ def handletestevent(event):
+ happened.append(True)
+
+ self.xmpp.add_event_handler("test_event", handletestevent)
+ self.xmpp.event("test_event")
+ self.xmpp.event("test_event")
+
+ # Give the event queue time to process.
+ time.sleep(0.1)
+
+ msg = "Event was not triggered the correct number of times: %s"
+ self.failUnless(happened == [True, True], msg)
+
+ def testDelEvent(self):
+ """Test handler working, then deleted and not triggered"""
+ happened = []
+
+ def handletestevent(event):
+ happened.append(True)
+
+ self.xmpp.add_event_handler("test_event", handletestevent)
+ self.xmpp.event("test_event", {})
+
+ self.xmpp.del_event_handler("test_event", handletestevent)
+
+ # Should not trigger because it was deleted
+ self.xmpp.event("test_event", {})
+
+ # Give the event queue time to process.
+ time.sleep(0.1)
+
+ msg = "Event was not triggered the correct number of times: %s"
+ self.failUnless(happened == [True], msg % happened)
+
+ def testAddDelAddEvent(self):
+ """Test adding, then removing, then adding an event handler."""
+ happened = []
+
+ def handletestevent(event):
+ happened.append(True)
+
+ self.xmpp.add_event_handler("test_event", handletestevent)
+ self.xmpp.event("test_event", {})
+
+ self.xmpp.del_event_handler("test_event", handletestevent)
+ # Should not trigger because it was deleted
+ self.xmpp.event("test_event", {})
+
+ self.xmpp.add_event_handler("test_event", handletestevent)
+ self.xmpp.event("test_event", {})
+
+ # Give the event queue time to process.
+ time.sleep(0.1)
+
+ msg = "Event was not triggered the correct number of times: %s"
+ self.failUnless(happened == [True, True], msg % happened)
+
+ def testDisposableEvent(self):
+ """Test disposable handler working, then not being triggered again."""
+ happened = []
+
+ def handletestevent(event):
+ happened.append(True)
+
+ self.xmpp.add_event_handler("test_event", handletestevent,
+ disposable=True)
+ self.xmpp.event("test_event", {})
+
+ # Should not trigger because it was deleted
+ self.xmpp.event("test_event", {})
+
+ # Give the event queue time to process.
+ time.sleep(0.1)
+
+ msg = "Event was not triggered the correct number of times: %s"
+ self.failUnless(happened == [True], msg % happened)
+
+
+suite = unittest.TestLoader().loadTestsFromTestCase(TestEvents)
diff --git a/tests/test_jid.py b/tests/test_jid.py
new file mode 100644
index 00000000..ef1145d3
--- /dev/null
+++ b/tests/test_jid.py
@@ -0,0 +1,141 @@
+from sleekxmpp.test import *
+from sleekxmpp.xmlstream.jid import JID
+
+
+class TestJIDClass(SleekTest):
+
+ """Verify that the JID class can parse and manipulate JIDs."""
+
+ def testJIDFromFull(self):
+ """Test using JID of the form 'user@server/resource/with/slashes'."""
+ self.check_jid(JID('user@someserver/some/resource'),
+ 'user',
+ 'someserver',
+ 'some/resource',
+ 'user@someserver',
+ 'user@someserver/some/resource',
+ 'user@someserver/some/resource')
+
+ def testJIDchange(self):
+ """Test changing JID of the form 'user@server/resource/with/slashes'"""
+ j = JID('user1@someserver1/some1/resource1')
+ j.user = 'user'
+ j.domain = 'someserver'
+ j.resource = 'some/resource'
+ self.check_jid(j,
+ 'user',
+ 'someserver',
+ 'some/resource',
+ 'user@someserver',
+ 'user@someserver/some/resource',
+ 'user@someserver/some/resource')
+
+ def testJIDaliases(self):
+ """Test changing JID using aliases for domain."""
+ j = JID('user@someserver/resource')
+ j.server = 'anotherserver'
+ self.check_jid(j, domain='anotherserver')
+ j.host = 'yetanother'
+ self.check_jid(j, domain='yetanother')
+
+ def testJIDSetFullWithUser(self):
+ """Test setting the full JID with a user portion."""
+ j = JID('user@domain/resource')
+ j.full = 'otheruser@otherdomain/otherresource'
+ self.check_jid(j,
+ 'otheruser',
+ 'otherdomain',
+ 'otherresource',
+ 'otheruser@otherdomain',
+ 'otheruser@otherdomain/otherresource',
+ 'otheruser@otherdomain/otherresource')
+
+ def testJIDFullNoUserWithResource(self):
+ """
+ Test setting the full JID without a user
+ portion and with a resource.
+ """
+ j = JID('user@domain/resource')
+ j.full = 'otherdomain/otherresource'
+ self.check_jid(j,
+ '',
+ 'otherdomain',
+ 'otherresource',
+ 'otherdomain',
+ 'otherdomain/otherresource',
+ 'otherdomain/otherresource')
+
+ def testJIDFullNoUserNoResource(self):
+ """
+ Test setting the full JID without a user
+ portion and without a resource.
+ """
+ j = JID('user@domain/resource')
+ j.full = 'otherdomain'
+ self.check_jid(j,
+ '',
+ 'otherdomain',
+ '',
+ 'otherdomain',
+ 'otherdomain',
+ 'otherdomain')
+
+ def testJIDBareUser(self):
+ """Test setting the bare JID with a user."""
+ j = JID('user@domain/resource')
+ j.bare = 'otheruser@otherdomain'
+ self.check_jid(j,
+ 'otheruser',
+ 'otherdomain',
+ 'resource',
+ 'otheruser@otherdomain',
+ 'otheruser@otherdomain/resource',
+ 'otheruser@otherdomain/resource')
+
+ def testJIDBareNoUser(self):
+ """Test setting the bare JID without a user."""
+ j = JID('user@domain/resource')
+ j.bare = 'otherdomain'
+ self.check_jid(j,
+ '',
+ 'otherdomain',
+ 'resource',
+ 'otherdomain',
+ 'otherdomain/resource',
+ 'otherdomain/resource')
+
+ def testJIDNoResource(self):
+ """Test using JID of the form 'user@domain'."""
+ self.check_jid(JID('user@someserver'),
+ 'user',
+ 'someserver',
+ '',
+ 'user@someserver',
+ 'user@someserver',
+ 'user@someserver')
+
+ def testJIDNoUser(self):
+ """Test JID of the form 'component.domain.tld'."""
+ self.check_jid(JID('component.someserver'),
+ '',
+ 'component.someserver',
+ '',
+ 'component.someserver',
+ 'component.someserver',
+ 'component.someserver')
+
+ def testJIDEquality(self):
+ """Test that JIDs with the same content are equal."""
+ jid1 = JID('user@domain/resource')
+ jid2 = JID('user@domain/resource')
+ self.assertTrue(jid1 == jid2, "Same JIDs are not considered equal")
+ self.assertFalse(jid1 != jid2, "Same JIDs are considered not equal")
+
+ def testJIDInequality(self):
+ jid1 = JID('user@domain/resource')
+ jid2 = JID('otheruser@domain/resource')
+ self.assertFalse(jid1 == jid2, "Same JIDs are not considered equal")
+ self.assertTrue(jid1 != jid2, "Same JIDs are considered not equal")
+
+
+suite = unittest.TestLoader().loadTestsFromTestCase(TestJIDClass)
diff --git a/tests/test_overall.py b/tests/test_overall.py
new file mode 100644
index 00000000..05fdc6d8
--- /dev/null
+++ b/tests/test_overall.py
@@ -0,0 +1,29 @@
+import os
+import re
+import sys
+import unittest
+import tabnanny
+import compileall
+
+class TestOverall(unittest.TestCase):
+
+ """
+ Test overall package health by compiling and checking
+ code style.
+ """
+
+ def testModules(self):
+ """Testing all modules by compiling them"""
+ src = '.%ssleekxmpp' % os.sep
+ if sys.version_info < (3, 0):
+ rx = re.compile('/[.]svn')
+ else:
+ rx = re.compile('/[.]svn|.*26.*')
+ self.failUnless(compileall.compile_dir(src, rx=rx, quiet=True))
+
+ def testTabNanny(self):
+ """Testing that indentation is consistent"""
+ self.failIf(tabnanny.check('..%ssleekxmpp' % os.sep))
+
+
+suite = unittest.TestLoader().loadTestsFromTestCase(TestOverall)
diff --git a/tests/test_stanza_base.py b/tests/test_stanza_base.py
new file mode 100644
index 00000000..9bd326b6
--- /dev/null
+++ b/tests/test_stanza_base.py
@@ -0,0 +1,79 @@
+from sleekxmpp.test import *
+from sleekxmpp.xmlstream.stanzabase import ET, StanzaBase
+
+
+class TestStanzaBase(SleekTest):
+
+ def testTo(self):
+ """Test the 'to' interface of StanzaBase."""
+ stanza = StanzaBase()
+ stanza['to'] = 'user@example.com'
+ self.failUnless(str(stanza['to']) == 'user@example.com',
+ "Setting and retrieving stanza 'to' attribute did not work.")
+
+ def testFrom(self):
+ """Test the 'from' interface of StanzaBase."""
+ stanza = StanzaBase()
+ stanza['from'] = 'user@example.com'
+ self.failUnless(str(stanza['from']) == 'user@example.com',
+ "Setting and retrieving stanza 'from' attribute did not work.")
+
+ def testPayload(self):
+ """Test the 'payload' interface of StanzaBase."""
+ stanza = StanzaBase()
+ self.failUnless(stanza['payload'] == [],
+ "Empty stanza does not have an empty payload.")
+
+ stanza['payload'] = ET.Element("{foo}foo")
+ self.failUnless(len(stanza['payload']) == 1,
+ "Stanza contents and payload do not match.")
+
+ stanza['payload'] = ET.Element('{bar}bar')
+ self.failUnless(len(stanza['payload']) == 2,
+ "Stanza payload was not appended.")
+
+ del stanza['payload']
+ self.failUnless(stanza['payload'] == [],
+ "Stanza payload not cleared after deletion.")
+
+ stanza['payload'] = [ET.Element('{foo}foo'),
+ ET.Element('{bar}bar')]
+ self.failUnless(len(stanza['payload']) == 2,
+ "Adding multiple elements to stanza's payload did not work.")
+
+ def testClear(self):
+ """Test clearing a stanza."""
+ stanza = StanzaBase()
+ stanza['to'] = 'user@example.com'
+ stanza['payload'] = ET.Element("{foo}foo")
+ stanza.clear()
+
+ self.failUnless(stanza['payload'] == [],
+ "Stanza payload was not cleared after calling .clear()")
+ self.failUnless(str(stanza['to']) == "user@example.com",
+ "Stanza attributes were not preserved after calling .clear()")
+
+ def testReply(self):
+ """Test creating a reply stanza."""
+ stanza = StanzaBase()
+ stanza['to'] = "recipient@example.com"
+ stanza['from'] = "sender@example.com"
+ stanza['payload'] = ET.Element("{foo}foo")
+
+ stanza.reply()
+
+ self.failUnless(str(stanza['to'] == "sender@example.com"),
+ "Stanza reply did not change 'to' attribute.")
+ self.failUnless(stanza['payload'] == [],
+ "Stanza reply did not empty stanza payload.")
+
+ def testError(self):
+ """Test marking a stanza as an error."""
+ stanza = StanzaBase()
+ stanza['type'] = 'get'
+ stanza.error()
+ self.failUnless(stanza['type'] == 'error',
+ "Stanza type is not 'error' after calling error()")
+
+
+suite = unittest.TestLoader().loadTestsFromTestCase(TestStanzaBase)
diff --git a/tests/test_stanza_element.py b/tests/test_stanza_element.py
new file mode 100644
index 00000000..f7ec59c0
--- /dev/null
+++ b/tests/test_stanza_element.py
@@ -0,0 +1,746 @@
+from sleekxmpp.test import *
+from sleekxmpp.xmlstream.stanzabase import ElementBase
+
+
+class TestElementBase(SleekTest):
+
+ def testFixNs(self):
+ """Test fixing namespaces in an XPath expression."""
+
+ e = ElementBase()
+ ns = "http://jabber.org/protocol/disco#items"
+ result = e._fix_ns("{%s}foo/bar/{abc}baz/{%s}more" % (ns, ns))
+
+ expected = "/".join(["{%s}foo" % ns,
+ "{%s}bar" % ns,
+ "{abc}baz",
+ "{%s}more" % ns])
+ self.failUnless(expected == result,
+ "Incorrect namespace fixing result: %s" % str(result))
+
+
+ def testExtendedName(self):
+ """Test element names of the form tag1/tag2/tag3."""
+
+ class TestStanza(ElementBase):
+ name = "foo/bar/baz"
+ namespace = "test"
+
+ stanza = TestStanza()
+ self.check(stanza, """
+ <foo xmlns="test">
+ <bar>
+ <baz />
+ </bar>
+ </foo>
+ """)
+
+ def testGetStanzaValues(self):
+ """Test getStanzaValues using plugins and substanzas."""
+
+ class TestStanzaPlugin(ElementBase):
+ name = "foo2"
+ namespace = "foo"
+ interfaces = set(('bar', 'baz'))
+ plugin_attrib = "foo2"
+
+ class TestSubStanza(ElementBase):
+ name = "subfoo"
+ namespace = "foo"
+ interfaces = set(('bar', 'baz'))
+
+ class TestStanza(ElementBase):
+ name = "foo"
+ namespace = "foo"
+ interfaces = set(('bar', 'baz'))
+
+ register_stanza_plugin(TestStanza, TestStanzaPlugin, iterable=True)
+
+ stanza = TestStanza()
+ stanza['bar'] = 'a'
+ stanza['foo2']['baz'] = 'b'
+ substanza = TestSubStanza()
+ substanza['bar'] = 'c'
+ stanza.append(substanza)
+
+ values = stanza.getStanzaValues()
+ expected = {'bar': 'a',
+ 'baz': '',
+ 'foo2': {'bar': '',
+ 'baz': 'b'},
+ 'substanzas': [{'__childtag__': '{foo}foo2',
+ 'bar': '',
+ 'baz': 'b'},
+ {'__childtag__': '{foo}subfoo',
+ 'bar': 'c',
+ 'baz': ''}]}
+ self.failUnless(values == expected,
+ "Unexpected stanza values:\n%s\n%s" % (str(expected), str(values)))
+
+
+ def testSetStanzaValues(self):
+ """Test using setStanzaValues with substanzas and plugins."""
+
+ class TestStanzaPlugin(ElementBase):
+ name = "pluginfoo"
+ namespace = "foo"
+ interfaces = set(('bar', 'baz'))
+ plugin_attrib = "plugin_foo"
+
+ class TestStanzaPlugin2(ElementBase):
+ name = "pluginfoo2"
+ namespace = "foo"
+ interfaces = set(('bar', 'baz'))
+ plugin_attrib = "plugin_foo2"
+
+ class TestSubStanza(ElementBase):
+ name = "subfoo"
+ namespace = "foo"
+ interfaces = set(('bar', 'baz'))
+
+ class TestStanza(ElementBase):
+ name = "foo"
+ namespace = "foo"
+ interfaces = set(('bar', 'baz'))
+
+ register_stanza_plugin(TestStanza, TestSubStanza, iterable=True)
+ register_stanza_plugin(TestStanza, TestStanzaPlugin)
+ register_stanza_plugin(TestStanza, TestStanzaPlugin2)
+
+ stanza = TestStanza()
+ values = {'bar': 'a',
+ 'baz': '',
+ 'plugin_foo': {'bar': '',
+ 'baz': 'b'},
+ 'plugin_foo2': {'bar': 'd',
+ 'baz': 'e'},
+ 'substanzas': [{'__childtag__': '{foo}subfoo',
+ 'bar': 'c',
+ 'baz': ''}]}
+ stanza.values = values
+
+ self.check(stanza, """
+ <foo xmlns="foo" bar="a">
+ <pluginfoo baz="b" />
+ <pluginfoo2 bar="d" baz="e" />
+ <subfoo bar="c" />
+ </foo>
+ """)
+
+ def testGetItem(self):
+ """Test accessing stanza interfaces."""
+
+ class TestStanza(ElementBase):
+ name = "foo"
+ namespace = "foo"
+ interfaces = set(('bar', 'baz', 'qux'))
+ sub_interfaces = set(('baz',))
+
+ def getQux(self):
+ return 'qux'
+
+ class TestStanzaPlugin(ElementBase):
+ name = "foobar"
+ namespace = "foo"
+ plugin_attrib = "foobar"
+ interfaces = set(('fizz',))
+
+ register_stanza_plugin(TestStanza, TestStanza, iterable=True)
+ register_stanza_plugin(TestStanza, TestStanzaPlugin)
+
+ stanza = TestStanza()
+ substanza = TestStanza()
+ stanza.append(substanza)
+ stanza.setStanzaValues({'bar': 'a',
+ 'baz': 'b',
+ 'qux': 42,
+ 'foobar': {'fizz': 'c'}})
+
+ # Test non-plugin interfaces
+ expected = {'substanzas': [substanza],
+ 'bar': 'a',
+ 'baz': 'b',
+ 'qux': 'qux',
+ 'meh': ''}
+ for interface, value in expected.items():
+ result = stanza[interface]
+ self.failUnless(result == value,
+ "Incorrect stanza interface access result: %s" % result)
+
+ # Test plugin interfaces
+ self.failUnless(isinstance(stanza['foobar'], TestStanzaPlugin),
+ "Incorrect plugin object result.")
+ self.failUnless(stanza['foobar']['fizz'] == 'c',
+ "Incorrect plugin subvalue result.")
+
+ def testSetItem(self):
+ """Test assigning to stanza interfaces."""
+
+ class TestStanza(ElementBase):
+ name = "foo"
+ namespace = "foo"
+ interfaces = set(('bar', 'baz', 'qux'))
+ sub_interfaces = set(('baz',))
+
+ def setQux(self, value):
+ pass
+
+ class TestStanzaPlugin(ElementBase):
+ name = "foobar"
+ namespace = "foo"
+ plugin_attrib = "foobar"
+ interfaces = set(('foobar',))
+
+ register_stanza_plugin(TestStanza, TestStanzaPlugin)
+
+ stanza = TestStanza()
+
+ stanza['bar'] = 'attribute!'
+ stanza['baz'] = 'element!'
+ stanza['qux'] = 'overridden'
+ stanza['foobar'] = 'plugin'
+
+ self.check(stanza, """
+ <foo xmlns="foo" bar="attribute!">
+ <baz>element!</baz>
+ <foobar foobar="plugin" />
+ </foo>
+ """)
+
+ def testDelItem(self):
+ """Test deleting stanza interface values."""
+
+ class TestStanza(ElementBase):
+ name = "foo"
+ namespace = "foo"
+ interfaces = set(('bar', 'baz', 'qux'))
+ sub_interfaces = set(('bar',))
+
+ def delQux(self):
+ pass
+
+ class TestStanzaPlugin(ElementBase):
+ name = "foobar"
+ namespace = "foo"
+ plugin_attrib = "foobar"
+ interfaces = set(('foobar',))
+
+ register_stanza_plugin(TestStanza, TestStanzaPlugin)
+
+ stanza = TestStanza()
+ stanza['bar'] = 'a'
+ stanza['baz'] = 'b'
+ stanza['qux'] = 'c'
+ stanza['foobar']['foobar'] = 'd'
+
+ self.check(stanza, """
+ <foo xmlns="foo" baz="b" qux="c">
+ <bar>a</bar>
+ <foobar foobar="d" />
+ </foo>
+ """)
+
+ del stanza['bar']
+ del stanza['baz']
+ del stanza['qux']
+ del stanza['foobar']
+
+ self.check(stanza, """
+ <foo xmlns="foo" qux="c" />
+ """)
+
+ def testModifyingAttributes(self):
+ """Test modifying top level attributes of a stanza's XML object."""
+
+ class TestStanza(ElementBase):
+ name = "foo"
+ namespace = "foo"
+ interfaces = set(('bar', 'baz'))
+
+ stanza = TestStanza()
+
+ self.check(stanza, """
+ <foo xmlns="foo" />
+ """)
+
+ self.failUnless(stanza._get_attr('bar') == '',
+ "Incorrect value returned for an unset XML attribute.")
+
+ stanza._set_attr('bar', 'a')
+ stanza._set_attr('baz', 'b')
+
+ self.check(stanza, """
+ <foo xmlns="foo" bar="a" baz="b" />
+ """)
+
+ self.failUnless(stanza._get_attr('bar') == 'a',
+ "Retrieved XML attribute value is incorrect.")
+
+ stanza._set_attr('bar', None)
+ stanza._del_attr('baz')
+
+ self.check(stanza, """
+ <foo xmlns="foo" />
+ """)
+
+ self.failUnless(stanza._get_attr('bar', 'c') == 'c',
+ "Incorrect default value returned for an unset XML attribute.")
+
+ def testGetSubText(self):
+ """Test retrieving the contents of a sub element."""
+
+ class TestStanza(ElementBase):
+ name = "foo"
+ namespace = "foo"
+ interfaces = set(('bar',))
+
+ def setBar(self, value):
+ wrapper = ET.Element("{foo}wrapper")
+ bar = ET.Element("{foo}bar")
+ bar.text = value
+ wrapper.append(bar)
+ self.xml.append(wrapper)
+
+ def getBar(self):
+ return self._get_sub_text("wrapper/bar", default="not found")
+
+ stanza = TestStanza()
+ self.failUnless(stanza['bar'] == 'not found',
+ "Default _get_sub_text value incorrect.")
+
+ stanza['bar'] = 'found'
+ self.check(stanza, """
+ <foo xmlns="foo">
+ <wrapper>
+ <bar>found</bar>
+ </wrapper>
+ </foo>
+ """)
+ self.failUnless(stanza['bar'] == 'found',
+ "_get_sub_text value incorrect: %s." % stanza['bar'])
+
+ def testSubElement(self):
+ """Test setting the contents of a sub element."""
+
+ class TestStanza(ElementBase):
+ name = "foo"
+ namespace = "foo"
+ interfaces = set(('bar', 'baz'))
+
+ def setBaz(self, value):
+ self._set_sub_text("wrapper/baz", text=value)
+
+ def getBaz(self):
+ return self._get_sub_text("wrapper/baz")
+
+ def setBar(self, value):
+ self._set_sub_text("wrapper/bar", text=value)
+
+ def getBar(self):
+ return self._get_sub_text("wrapper/bar")
+
+ stanza = TestStanza()
+ stanza['bar'] = 'a'
+ stanza['baz'] = 'b'
+ self.check(stanza, """
+ <foo xmlns="foo">
+ <wrapper>
+ <bar>a</bar>
+ <baz>b</baz>
+ </wrapper>
+ </foo>
+ """)
+ stanza._set_sub_text('wrapper/bar', text='', keep=True)
+ self.check(stanza, """
+ <foo xmlns="foo">
+ <wrapper>
+ <bar />
+ <baz>b</baz>
+ </wrapper>
+ </foo>
+ """, use_values=False)
+
+ stanza['bar'] = 'a'
+ stanza._set_sub_text('wrapper/bar', text='')
+ self.check(stanza, """
+ <foo xmlns="foo">
+ <wrapper>
+ <baz>b</baz>
+ </wrapper>
+ </foo>
+ """)
+
+ def testDelSub(self):
+ """Test removing sub elements."""
+
+ class TestStanza(ElementBase):
+ name = "foo"
+ namespace = "foo"
+ interfaces = set(('bar', 'baz'))
+
+ def setBar(self, value):
+ self._set_sub_text("path/to/only/bar", value);
+
+ def getBar(self):
+ return self._get_sub_text("path/to/only/bar")
+
+ def delBar(self):
+ self._del_sub("path/to/only/bar")
+
+ def setBaz(self, value):
+ self._set_sub_text("path/to/just/baz", value);
+
+ def getBaz(self):
+ return self._get_sub_text("path/to/just/baz")
+
+ def delBaz(self):
+ self._del_sub("path/to/just/baz")
+
+ stanza = TestStanza()
+ stanza['bar'] = 'a'
+ stanza['baz'] = 'b'
+
+ self.check(stanza, """
+ <foo xmlns="foo">
+ <path>
+ <to>
+ <only>
+ <bar>a</bar>
+ </only>
+ <just>
+ <baz>b</baz>
+ </just>
+ </to>
+ </path>
+ </foo>
+ """)
+
+ del stanza['bar']
+ del stanza['baz']
+
+ self.check(stanza, """
+ <foo xmlns="foo">
+ <path>
+ <to>
+ <only />
+ <just />
+ </to>
+ </path>
+ </foo>
+ """, use_values=False)
+
+ stanza['bar'] = 'a'
+ stanza['baz'] = 'b'
+
+ stanza._del_sub('path/to/only/bar', all=True)
+
+ self.check(stanza, """
+ <foo xmlns="foo">
+ <path>
+ <to>
+ <just>
+ <baz>b</baz>
+ </just>
+ </to>
+ </path>
+ </foo>
+ """)
+
+ def testMatch(self):
+ """Test matching a stanza against an XPath expression."""
+
+ class TestSubStanza(ElementBase):
+ name = "sub"
+ namespace = "baz"
+ interfaces = set(('attrib',))
+
+ class TestStanza(ElementBase):
+ name = "foo"
+ namespace = "foo"
+ interfaces = set(('bar','baz', 'qux'))
+ sub_interfaces = set(('qux',))
+
+ def setQux(self, value):
+ self._set_sub_text('qux', text=value)
+
+ def getQux(self):
+ return self._get_sub_text('qux')
+
+ class TestStanzaPlugin(ElementBase):
+ name = "plugin"
+ namespace = "http://test/slash/bar"
+ interfaces = set(('attrib',))
+
+ register_stanza_plugin(TestStanza, TestSubStanza, iterable=True)
+ register_stanza_plugin(TestStanza, TestStanzaPlugin)
+
+ stanza = TestStanza()
+ self.failUnless(stanza.match("foo"),
+ "Stanza did not match its own tag name.")
+
+ self.failUnless(stanza.match("{foo}foo"),
+ "Stanza did not match its own namespaced name.")
+
+ stanza['bar'] = 'a'
+ self.failUnless(stanza.match("foo@bar=a"),
+ "Stanza did not match its own name with attribute value check.")
+
+ stanza['baz'] = 'b'
+ self.failUnless(stanza.match("foo@bar=a@baz=b"),
+ "Stanza did not match its own name with multiple attributes.")
+
+ stanza['qux'] = 'c'
+ self.failUnless(stanza.match("foo/qux"),
+ "Stanza did not match with subelements.")
+
+ stanza['qux'] = ''
+ self.failUnless(stanza.match("foo/qux") == False,
+ "Stanza matched missing subinterface element.")
+
+ self.failUnless(stanza.match("foo/bar") == False,
+ "Stanza matched nonexistent element.")
+
+ stanza['plugin']['attrib'] = 'c'
+ self.failUnless(stanza.match("foo/plugin@attrib=c"),
+ "Stanza did not match with plugin and attribute.")
+
+ self.failUnless(stanza.match("foo/{http://test/slash/bar}plugin"),
+ "Stanza did not match with namespaced plugin.")
+
+ substanza = TestSubStanza()
+ substanza['attrib'] = 'd'
+ stanza.append(substanza)
+ self.failUnless(stanza.match("foo/sub@attrib=d"),
+ "Stanza did not match with substanzas and attribute.")
+
+ self.failUnless(stanza.match("foo/{baz}sub"),
+ "Stanza did not match with namespaced substanza.")
+
+ def testComparisons(self):
+ """Test comparing ElementBase objects."""
+
+ class TestStanza(ElementBase):
+ name = "foo"
+ namespace = "foo"
+ interfaces = set(('bar', 'baz'))
+
+ stanza1 = TestStanza()
+ stanza1['bar'] = 'a'
+
+ self.failUnless(stanza1,
+ "Stanza object does not evaluate to True")
+
+ stanza2 = TestStanza()
+ stanza2['baz'] = 'b'
+
+ self.failUnless(stanza1 != stanza2,
+ "Different stanza objects incorrectly compared equal.")
+
+ stanza1['baz'] = 'b'
+ stanza2['bar'] = 'a'
+
+ self.failUnless(stanza1 == stanza2,
+ "Equal stanzas incorrectly compared inequal.")
+
+ def testKeys(self):
+ """Test extracting interface names from a stanza object."""
+
+ class TestStanza(ElementBase):
+ name = "foo"
+ namespace = "foo"
+ interfaces = set(('bar', 'baz'))
+ plugin_attrib = 'qux'
+
+ register_stanza_plugin(TestStanza, TestStanza)
+
+ stanza = TestStanza()
+
+ self.failUnless(set(stanza.keys()) == set(('bar', 'baz')),
+ "Returned set of interface keys does not match expected.")
+
+ stanza.enable('qux')
+
+ self.failUnless(set(stanza.keys()) == set(('bar', 'baz', 'qux')),
+ "Incorrect set of interface and plugin keys.")
+
+ def testGet(self):
+ """Test accessing stanza interfaces using get()."""
+
+ class TestStanza(ElementBase):
+ name = "foo"
+ namespace = "foo"
+ interfaces = set(('bar', 'baz'))
+
+ stanza = TestStanza()
+ stanza['bar'] = 'a'
+
+ self.failUnless(stanza.get('bar') == 'a',
+ "Incorrect value returned by stanza.get")
+
+ self.failUnless(stanza.get('baz', 'b') == 'b',
+ "Incorrect default value returned by stanza.get")
+
+ def testSubStanzas(self):
+ """Test manipulating substanzas of a stanza object."""
+
+ class TestSubStanza(ElementBase):
+ name = "foobar"
+ namespace = "foo"
+ interfaces = set(('qux',))
+
+ class TestStanza(ElementBase):
+ name = "foo"
+ namespace = "foo"
+ interfaces = set(('bar', 'baz'))
+
+ register_stanza_plugin(TestStanza, TestSubStanza, iterable=True)
+
+ stanza = TestStanza()
+ substanza1 = TestSubStanza()
+ substanza2 = TestSubStanza()
+ substanza1['qux'] = 'a'
+ substanza2['qux'] = 'b'
+
+ # Test appending substanzas
+ self.failUnless(len(stanza) == 0,
+ "Incorrect empty stanza size.")
+
+ stanza.append(substanza1)
+ self.check(stanza, """
+ <foo xmlns="foo">
+ <foobar qux="a" />
+ </foo>
+ """, use_values=False)
+ self.failUnless(len(stanza) == 1,
+ "Incorrect stanza size with 1 substanza.")
+
+ stanza.append(substanza2)
+ self.check(stanza, """
+ <foo xmlns="foo">
+ <foobar qux="a" />
+ <foobar qux="b" />
+ </foo>
+ """, use_values=False)
+ self.failUnless(len(stanza) == 2,
+ "Incorrect stanza size with 2 substanzas.")
+
+ # Test popping substanzas
+ stanza.pop(0)
+ self.check(stanza, """
+ <foo xmlns="foo">
+ <foobar qux="b" />
+ </foo>
+ """, use_values=False)
+
+ # Test iterating over substanzas
+ stanza.append(substanza1)
+ results = []
+ for substanza in stanza:
+ results.append(substanza['qux'])
+ self.failUnless(results == ['b', 'a'],
+ "Iteration over substanzas failed: %s." % str(results))
+
+ def testCopy(self):
+ """Test copying stanza objects."""
+
+ class TestStanza(ElementBase):
+ name = "foo"
+ namespace = "foo"
+ interfaces = set(('bar', 'baz'))
+
+ stanza1 = TestStanza()
+ stanza1['bar'] = 'a'
+
+ stanza2 = stanza1.__copy__()
+
+ self.failUnless(stanza1 == stanza2,
+ "Copied stanzas are not equal to each other.")
+
+ stanza1['baz'] = 'b'
+ self.failUnless(stanza1 != stanza2,
+ "Divergent stanza copies incorrectly compared equal.")
+
+ def testExtension(self):
+ """Testing using is_extension."""
+
+ class TestStanza(ElementBase):
+ name = "foo"
+ namespace = "foo"
+ interfaces = set(('bar', 'baz'))
+
+ class TestExtension(ElementBase):
+ name = 'extended'
+ namespace = 'foo'
+ plugin_attrib = name
+ interfaces = set((name,))
+ is_extension = True
+
+ def set_extended(self, value):
+ self.xml.text = value
+
+ def get_extended(self):
+ return self.xml.text
+
+ def del_extended(self):
+ self.parent().xml.remove(self.xml)
+
+ register_stanza_plugin(TestStanza, TestExtension)
+
+ stanza = TestStanza()
+ stanza['extended'] = 'testing'
+
+ self.check(stanza, """
+ <foo xmlns="foo">
+ <extended>testing</extended>
+ </foo>
+ """)
+
+ self.failUnless(stanza['extended'] == 'testing',
+ "Could not retrieve stanza extension value.")
+
+ del stanza['extended']
+ self.check(stanza, """
+ <foo xmlns="foo" />
+ """)
+
+ def testOverrides(self):
+ """Test using interface overrides."""
+
+ class TestStanza(ElementBase):
+ name = "foo"
+ namespace = "foo"
+ interfaces = set(('bar', 'baz'))
+
+ class TestOverride(ElementBase):
+ name = 'overrider'
+ namespace = 'foo'
+ plugin_attrib = name
+ interfaces = set(('bar',))
+ overrides = ['set_bar']
+
+ def setup(self, xml):
+ # Don't create XML for the plugin
+ self.xml = ET.Element('')
+
+ def set_bar(self, value):
+ if not value.startswith('override-'):
+ self.parent()._set_attr('bar', 'override-%s' % value)
+ else:
+ self.parent()._set_attr('bar', value)
+
+ stanza = TestStanza()
+ stanza['bar'] = 'foo'
+ self.check(stanza, """
+ <foo xmlns="foo" bar="foo" />
+ """)
+
+ register_stanza_plugin(TestStanza, TestOverride, overrides=True)
+
+ stanza = TestStanza()
+ stanza['bar'] = 'foo'
+ self.check(stanza, """
+ <foo xmlns="foo" bar="override-foo" />
+ """)
+
+
+suite = unittest.TestLoader().loadTestsFromTestCase(TestElementBase)
diff --git a/tests/test_stanza_error.py b/tests/test_stanza_error.py
new file mode 100644
index 00000000..a41bf4bf
--- /dev/null
+++ b/tests/test_stanza_error.py
@@ -0,0 +1,81 @@
+from sleekxmpp.test import *
+
+
+class TestErrorStanzas(SleekTest):
+
+ def setUp(self):
+ # Ensure that the XEP-0086 plugin has been loaded.
+ self.stream_start()
+ self.stream_close()
+
+ def testSetup(self):
+ """Test setting initial values in error stanza."""
+ msg = self.Message()
+ msg.enable('error')
+ self.check(msg, """
+ <message type="error">
+ <error type="cancel" code="501">
+ <feature-not-implemented xmlns="urn:ietf:params:xml:ns:xmpp-stanzas" />
+ </error>
+ </message>
+ """)
+
+ def testCondition(self):
+ """Test modifying the error condition."""
+ msg = self.Message()
+ msg['error']['condition'] = 'item-not-found'
+
+ self.check(msg, """
+ <message type="error">
+ <error type="cancel" code="404">
+ <item-not-found xmlns="urn:ietf:params:xml:ns:xmpp-stanzas" />
+ </error>
+ </message>
+ """)
+
+ self.failUnless(msg['error']['condition'] == 'item-not-found', "Error condition doesn't match.")
+
+ msg['error']['condition'] = 'resource-constraint'
+
+ self.check(msg, """
+ <message type="error">
+ <error type="wait" code="500">
+ <resource-constraint xmlns="urn:ietf:params:xml:ns:xmpp-stanzas" />
+ </error>
+ </message>
+ """)
+
+ def testDelCondition(self):
+ """Test that deleting error conditions doesn't remove extra elements."""
+ msg = self.Message()
+ msg['error']['text'] = 'Error!'
+ msg['error']['condition'] = 'internal-server-error'
+
+ del msg['error']['condition']
+
+ self.check(msg, """
+ <message type="error">
+ <error type="wait" code="500">
+ <text xmlns="urn:ietf:params:xml:ns:xmpp-stanzas">Error!</text>
+ </error>
+ </message>
+ """, use_values=False)
+
+ def testDelText(self):
+ """Test deleting the text of an error."""
+ msg = self.Message()
+ msg['error']['test'] = 'Error!'
+ msg['error']['condition'] = 'internal-server-error'
+
+ del msg['error']['text']
+
+ self.check(msg, """
+ <message type="error">
+ <error type="wait" code="500">
+ <internal-server-error xmlns="urn:ietf:params:xml:ns:xmpp-stanzas" />
+ </error>
+ </message>
+ """)
+
+
+suite = unittest.TestLoader().loadTestsFromTestCase(TestErrorStanzas)
diff --git a/tests/test_stanza_gmail.py b/tests/test_stanza_gmail.py
new file mode 100644
index 00000000..6190c608
--- /dev/null
+++ b/tests/test_stanza_gmail.py
@@ -0,0 +1,88 @@
+from sleekxmpp.test import *
+import sleekxmpp.plugins.gmail_notify as gmail
+
+
+class TestGmail(SleekTest):
+
+ def setUp(self):
+ register_stanza_plugin(Iq, gmail.GmailQuery)
+ register_stanza_plugin(Iq, gmail.MailBox)
+ register_stanza_plugin(Iq, gmail.NewMail)
+
+ def testCreateQuery(self):
+ """Testing querying Gmail for emails."""
+
+ iq = self.Iq()
+ iq['type'] = 'get'
+ iq['gmail']['search'] = 'is:starred'
+ iq['gmail']['newer-than-time'] = '1140638252542'
+ iq['gmail']['newer-than-tid'] = '11134623426430234'
+
+ self.check(iq, """
+ <iq type="get">
+ <query xmlns="google:mail:notify"
+ newer-than-time="1140638252542"
+ newer-than-tid="11134623426430234"
+ q="is:starred" />
+ </iq>
+ """, use_values=False)
+
+ def testMailBox(self):
+ """Testing reading from Gmail mailbox result"""
+
+ # Use the example from Google's documentation at
+ # http://code.google.com/apis/talk/jep_extensions/gmail.html#notifications
+ xml = ET.fromstring("""
+ <iq type="result">
+ <mailbox xmlns="google:mail:notify"
+ result-time='1118012394209'
+ url='http://mail.google.com/mail'
+ total-matched='95'
+ total-estimate='0'>
+ <mail-thread-info tid='1172320964060972012'
+ participation='1'
+ messages='28'
+ date='1118012394209'
+ url='http://mail.google.com/mail?view=cv'>
+ <senders>
+ <sender name='Me' address='romeo@gmail.com' originator='1' />
+ <sender name='Benvolio' address='benvolio@gmail.com' />
+ <sender name='Mercutio' address='mercutio@gmail.com' unread='1'/>
+ </senders>
+ <labels>act1scene3</labels>
+ <subject>Put thy rapier up.</subject>
+ <snippet>Ay, ay, a scratch, a scratch; marry, 'tis enough.</snippet>
+ </mail-thread-info>
+ </mailbox>
+ </iq>
+ """)
+
+ iq = self.Iq(xml=xml)
+ mailbox = iq['mailbox']
+ self.failUnless(mailbox['result-time'] == '1118012394209', "result-time doesn't match")
+ self.failUnless(mailbox['url'] == 'http://mail.google.com/mail', "url doesn't match")
+ self.failUnless(mailbox['matched'] == '95', "total-matched incorrect")
+ self.failUnless(mailbox['estimate'] == False, "total-estimate incorrect")
+ self.failUnless(len(mailbox['threads']) == 1, "could not extract message threads")
+
+ thread = mailbox['threads'][0]
+ self.failUnless(thread['tid'] == '1172320964060972012', "thread tid doesn't match")
+ self.failUnless(thread['participation'] == '1', "thread participation incorrect")
+ self.failUnless(thread['messages'] == '28', "thread message count incorrect")
+ self.failUnless(thread['date'] == '1118012394209', "thread date doesn't match")
+ self.failUnless(thread['url'] == 'http://mail.google.com/mail?view=cv', "thread url doesn't match")
+ self.failUnless(thread['labels'] == 'act1scene3', "thread labels incorrect")
+ self.failUnless(thread['subject'] == 'Put thy rapier up.', "thread subject doesn't match")
+ self.failUnless(thread['snippet'] == "Ay, ay, a scratch, a scratch; marry, 'tis enough.", "snippet doesn't match")
+ self.failUnless(len(thread['senders']) == 3, "could not extract senders")
+
+ sender1 = thread['senders'][0]
+ self.failUnless(sender1['name'] == 'Me', "sender name doesn't match")
+ self.failUnless(sender1['address'] == 'romeo@gmail.com', "sender address doesn't match")
+ self.failUnless(sender1['originator'] == True, "sender originator incorrect")
+ self.failUnless(sender1['unread'] == False, "sender unread incorrectly True")
+
+ sender2 = thread['senders'][2]
+ self.failUnless(sender2['unread'] == True, "sender unread incorrectly False")
+
+suite = unittest.TestLoader().loadTestsFromTestCase(TestGmail)
diff --git a/tests/test_stanza_iq.py b/tests/test_stanza_iq.py
new file mode 100644
index 00000000..42e4dcde
--- /dev/null
+++ b/tests/test_stanza_iq.py
@@ -0,0 +1,90 @@
+from sleekxmpp.test import *
+from sleekxmpp.xmlstream.stanzabase import ET
+
+
+class TestIqStanzas(SleekTest):
+
+ def tearDown(self):
+ """Shutdown the XML stream after testing."""
+ self.stream_close()
+
+ def testSetup(self):
+ """Test initializing default Iq values."""
+ iq = self.Iq()
+ self.check(iq, """
+ <iq id="0" />
+ """)
+
+ def testPayload(self):
+ """Test setting Iq stanza payload."""
+ iq = self.Iq()
+ iq.setPayload(ET.Element('{test}tester'))
+ self.check(iq, """
+ <iq id="0">
+ <tester xmlns="test" />
+ </iq>
+ """, use_values=False)
+
+
+ def testUnhandled(self):
+ """Test behavior for Iq.unhandled."""
+ self.stream_start()
+ self.recv("""
+ <iq id="test" type="get">
+ <query xmlns="test" />
+ </iq>
+ """)
+
+ iq = self.Iq()
+ iq['id'] = 'test'
+ iq['error']['condition'] = 'feature-not-implemented'
+ iq['error']['text'] = 'No handlers registered for this request.'
+
+ self.send(iq, """
+ <iq id="test" type="error">
+ <error type="cancel">
+ <feature-not-implemented xmlns="urn:ietf:params:xml:ns:xmpp-stanzas" />
+ <text xmlns="urn:ietf:params:xml:ns:xmpp-stanzas">
+ No handlers registered for this request.
+ </text>
+ </error>
+ </iq>
+ """)
+
+ def testQuery(self):
+ """Test modifying query element of Iq stanzas."""
+ iq = self.Iq()
+
+ iq['query'] = 'query_ns'
+ self.check(iq, """
+ <iq id="0">
+ <query xmlns="query_ns" />
+ </iq>
+ """)
+
+ iq['query'] = 'query_ns2'
+ self.check(iq, """
+ <iq id="0">
+ <query xmlns="query_ns2" />
+ </iq>
+ """)
+
+ self.failUnless(iq['query'] == 'query_ns2', "Query namespace doesn't match")
+
+ del iq['query']
+ self.check(iq, """
+ <iq id="0" />
+ """)
+
+ def testReply(self):
+ """Test setting proper result type in Iq replies."""
+ iq = self.Iq()
+ iq['to'] = 'user@localhost'
+ iq['type'] = 'get'
+ iq.reply()
+
+ self.check(iq, """
+ <iq id="0" type="result" />
+ """)
+
+suite = unittest.TestLoader().loadTestsFromTestCase(TestIqStanzas)
diff --git a/tests/test_stanza_message.py b/tests/test_stanza_message.py
new file mode 100644
index 00000000..e55971df
--- /dev/null
+++ b/tests/test_stanza_message.py
@@ -0,0 +1,57 @@
+from sleekxmpp.test import *
+from sleekxmpp.stanza.message import Message
+from sleekxmpp.stanza.htmlim import HTMLIM
+
+
+class TestMessageStanzas(SleekTest):
+
+ def setUp(self):
+ register_stanza_plugin(Message, HTMLIM)
+
+ def testGroupchatReplyRegression(self):
+ "Regression groupchat reply should be to barejid"
+ msg = self.Message()
+ msg['to'] = 'me@myserver.tld'
+ msg['from'] = 'room@someservice.someserver.tld/somenick'
+ msg['type'] = 'groupchat'
+ msg['body'] = "this is a message"
+ msg.reply()
+ self.failUnless(str(msg['to']) == 'room@someservice.someserver.tld')
+
+ def testAttribProperty(self):
+ "Test attrib property returning self"
+ msg = self.Message()
+ msg.attrib.attrib.attrib['to'] = 'usr@server.tld'
+ self.failUnless(str(msg['to']) == 'usr@server.tld')
+
+ def testHTMLPlugin(self):
+ "Test message/html/body stanza"
+ msg = self.Message()
+ msg['to'] = "fritzy@netflint.net/sleekxmpp"
+ msg['body'] = "this is the plaintext message"
+ msg['type'] = 'chat'
+ p = ET.Element('{http://www.w3.org/1999/xhtml}p')
+ p.text = "This is the htmlim message"
+ msg['html']['body'] = p
+ self.check(msg, """
+ <message to="fritzy@netflint.net/sleekxmpp" type="chat">
+ <body>this is the plaintext message</body>
+ <html xmlns="http://jabber.org/protocol/xhtml-im">
+ <body xmlns="http://www.w3.org/1999/xhtml">
+ <p>This is the htmlim message</p>
+ </body>
+ </html>
+ </message>""")
+
+ def testNickPlugin(self):
+ "Test message/nick/nick stanza."
+ msg = self.Message()
+ msg['nick']['nick'] = 'A nickname!'
+ self.check(msg, """
+ <message>
+ <nick xmlns="http://jabber.org/protocol/nick">A nickname!</nick>
+ </message>
+ """)
+
+
+suite = unittest.TestLoader().loadTestsFromTestCase(TestMessageStanzas)
diff --git a/tests/test_stanza_presence.py b/tests/test_stanza_presence.py
new file mode 100644
index 00000000..2ec43b65
--- /dev/null
+++ b/tests/test_stanza_presence.py
@@ -0,0 +1,67 @@
+from sleekxmpp.test import *
+from sleekxmpp.stanza.presence import Presence
+
+
+class TestPresenceStanzas(SleekTest):
+
+ def testPresenceShowRegression(self):
+ """Regression check presence['type'] = 'dnd' show value working"""
+ p = self.Presence()
+ p['type'] = 'dnd'
+ self.check(p, "<presence><show>dnd</show></presence>")
+
+ def testPresenceType(self):
+ """Test manipulating presence['type']"""
+ p = self.Presence()
+ p['type'] = 'available'
+ self.check(p, "<presence />")
+ self.failUnless(p['type'] == 'available',
+ "Incorrect presence['type'] for type 'available': %s" % p['type'])
+
+ for showtype in ['away', 'chat', 'dnd', 'xa']:
+ p['type'] = showtype
+ self.check(p, """
+ <presence><show>%s</show></presence>
+ """ % showtype)
+ self.failUnless(p['type'] == showtype,
+ "Incorrect presence['type'] for type '%s'" % showtype)
+
+ p['type'] = None
+ self.check(p, "<presence />")
+
+ def testPresenceUnsolicitedOffline(self):
+ """
+ Unsolicted offline presence does not spawn changed_status
+ or update the roster.
+ """
+ p = self.Presence()
+ p['type'] = 'unavailable'
+ p['from'] = 'bill@chadmore.com/gmail15af'
+
+ c = sleekxmpp.ClientXMPP('crap@wherever', 'password')
+ happened = []
+
+ def handlechangedpresence(event):
+ happened.append(True)
+
+ c.add_event_handler("changed_status", handlechangedpresence)
+ c._handle_presence(p)
+
+ self.failUnless(happened == [],
+ "changed_status event triggered for extra unavailable presence")
+ roster = c.roster['crap@wherever']
+ self.failUnless(roster['bill@chadmore.com'].resources == {},
+ "Roster updated for superfulous unavailable presence")
+
+ def testNickPlugin(self):
+ """Test presence/nick/nick stanza."""
+ p = self.Presence()
+ p['nick']['nick'] = 'A nickname!'
+ self.check(p, """
+ <presence>
+ <nick xmlns="http://jabber.org/protocol/nick">A nickname!</nick>
+ </presence>
+ """)
+
+
+suite = unittest.TestLoader().loadTestsFromTestCase(TestPresenceStanzas)
diff --git a/tests/test_stanza_roster.py b/tests/test_stanza_roster.py
new file mode 100644
index 00000000..8ec2d32b
--- /dev/null
+++ b/tests/test_stanza_roster.py
@@ -0,0 +1,88 @@
+from sleekxmpp.test import *
+from sleekxmpp.stanza.roster import Roster
+
+
+class TestRosterStanzas(SleekTest):
+
+ def testAddItems(self):
+ """Test adding items to a roster stanza."""
+ iq = self.Iq()
+ iq['roster'].setItems({
+ 'user@example.com': {
+ 'name': 'User',
+ 'subscription': 'both',
+ 'groups': ['Friends', 'Coworkers']},
+ 'otheruser@example.com': {
+ 'name': 'Other User',
+ 'subscription': 'both',
+ 'groups': []}})
+ self.check(iq, """
+ <iq>
+ <query xmlns="jabber:iq:roster">
+ <item jid="user@example.com" name="User" subscription="both">
+ <group>Friends</group>
+ <group>Coworkers</group>
+ </item>
+ <item jid="otheruser@example.com" name="Other User"
+ subscription="both" />
+ </query>
+ </iq>
+ """)
+
+ def testGetItems(self):
+ """Test retrieving items from a roster stanza."""
+ xml_string = """
+ <iq>
+ <query xmlns="jabber:iq:roster">
+ <item jid="user@example.com" name="User" subscription="both">
+ <group>Friends</group>
+ <group>Coworkers</group>
+ </item>
+ <item jid="otheruser@example.com" name="Other User"
+ subscription="both" />
+ </query>
+ </iq>
+ """
+ iq = self.Iq(ET.fromstring(xml_string))
+ expected = {
+ 'user@example.com': {
+ 'name': 'User',
+ 'subscription': 'both',
+ 'ask': '',
+ 'approved': '',
+ 'groups': ['Friends', 'Coworkers']},
+ 'otheruser@example.com': {
+ 'name': 'Other User',
+ 'subscription': 'both',
+ 'ask': '',
+ 'approved': '',
+ 'groups': []}}
+ debug = "Roster items don't match after retrieval."
+ debug += "\nReturned: %s" % str(iq['roster']['items'])
+ debug += "\nExpected: %s" % str(expected)
+ self.failUnless(iq['roster']['items'] == expected, debug)
+
+ def testDelItems(self):
+ """Test clearing items from a roster stanza."""
+ xml_string = """
+ <iq>
+ <query xmlns="jabber:iq:roster">
+ <item jid="user@example.com" name="User" subscription="both">
+ <group>Friends</group>
+ <group>Coworkers</group>
+ </item>
+ <item jid="otheruser@example.com" name="Other User"
+ subscription="both" />
+ </query>
+ </iq>
+ """
+ iq = self.Iq(ET.fromstring(xml_string))
+ del iq['roster']['items']
+ self.check(iq, """
+ <iq>
+ <query xmlns="jabber:iq:roster" />
+ </iq>
+ """)
+
+
+suite = unittest.TestLoader().loadTestsFromTestCase(TestRosterStanzas)
diff --git a/tests/test_stanza_xep_0004.py b/tests/test_stanza_xep_0004.py
new file mode 100644
index 00000000..e183e5e9
--- /dev/null
+++ b/tests/test_stanza_xep_0004.py
@@ -0,0 +1,198 @@
+from sleekxmpp.test import *
+from sleekxmpp.thirdparty import OrderedDict
+
+import sleekxmpp.plugins.xep_0004 as xep_0004
+
+
+class TestDataForms(SleekTest):
+
+ def setUp(self):
+ register_stanza_plugin(Message, xep_0004.Form)
+ register_stanza_plugin(xep_0004.Form, xep_0004.FormField)
+ register_stanza_plugin(xep_0004.FormField, xep_0004.FieldOption)
+
+ def testMultipleInstructions(self):
+ """Testing using multiple instructions elements in a data form."""
+ msg = self.Message()
+ msg['form']['instructions'] = "Instructions\nSecond batch"
+
+ self.check(msg, """
+ <message>
+ <x xmlns="jabber:x:data" type="form">
+ <instructions>Instructions</instructions>
+ <instructions>Second batch</instructions>
+ </x>
+ </message>
+ """)
+
+ def testAddField(self):
+ """Testing adding fields to a data form."""
+
+ msg = self.Message()
+ form = msg['form']
+ form.addField(var='f1',
+ ftype='text-single',
+ label='Text',
+ desc='A text field',
+ required=True,
+ value='Some text!')
+
+ self.check(msg, """
+ <message>
+ <x xmlns="jabber:x:data" type="form">
+ <field var="f1" type="text-single" label="Text">
+ <desc>A text field</desc>
+ <required />
+ <value>Some text!</value>
+ </field>
+ </x>
+ </message>
+ """)
+
+ fields = OrderedDict()
+ fields['f1'] = {'type': 'text-single',
+ 'label': 'Username',
+ 'required': True}
+ fields['f2'] = {'type': 'text-private',
+ 'label': 'Password',
+ 'required': True}
+ fields['f3'] = {'type': 'text-multi',
+ 'label': 'Message',
+ 'value': 'Enter message.\nA long one even.'}
+ fields['f4'] = {'type': 'list-single',
+ 'label': 'Message Type',
+ 'options': [{'label': 'Cool!',
+ 'value': 'cool'},
+ {'label': 'Urgh!',
+ 'value': 'urgh'}]}
+ form['fields'] = fields
+
+
+ self.check(msg, """
+ <message>
+ <x xmlns="jabber:x:data" type="form">
+ <field var="f1" type="text-single" label="Username">
+ <required />
+ </field>
+ <field var="f2" type="text-private" label="Password">
+ <required />
+ </field>
+ <field var="f3" type="text-multi" label="Message">
+ <value>Enter message.</value>
+ <value>A long one even.</value>
+ </field>
+ <field var="f4" type="list-single" label="Message Type">
+ <option label="Cool!">
+ <value>cool</value>
+ </option>
+ <option label="Urgh!">
+ <value>urgh</value>
+ </option>
+ </field>
+ </x>
+ </message>
+ """)
+
+ def testSetValues(self):
+ """Testing setting form values"""
+
+ msg = self.Message()
+ form = msg['form']
+ form.add_field(var='foo', ftype='text-single')
+ form.add_field(var='bar', ftype='list-multi')
+
+ form.setValues({'foo': 'Foo!',
+ 'bar': ['a', 'b']})
+
+ self.check(msg, """
+ <message>
+ <x xmlns="jabber:x:data" type="form">
+ <field var="foo" type="text-single">
+ <value>Foo!</value>
+ </field>
+ <field var="bar" type="list-multi">
+ <value>a</value>
+ <value>b</value>
+ </field>
+ </x>
+ </message>""")
+
+ def testSubmitType(self):
+ """Test that setting type to 'submit' clears extra details"""
+ msg = self.Message()
+ form = msg['form']
+
+ fields = OrderedDict()
+ fields['f1'] = {'type': 'text-single',
+ 'label': 'Username',
+ 'required': True}
+ fields['f2'] = {'type': 'text-private',
+ 'label': 'Password',
+ 'required': True}
+ fields['f3'] = {'type': 'text-multi',
+ 'label': 'Message',
+ 'value': 'Enter message.\nA long one even.'}
+ fields['f4'] = {'type': 'list-single',
+ 'label': 'Message Type',
+ 'options': [{'label': 'Cool!',
+ 'value': 'cool'},
+ {'label': 'Urgh!',
+ 'value': 'urgh'}]}
+ form['fields'] = fields
+
+ form['type'] = 'submit'
+ form['values'] = {'f1': 'username',
+ 'f2': 'hunter2',
+ 'f3': 'A long\nmultiline\nmessage',
+ 'f4': 'cool'}
+
+ self.check(form, """
+ <x xmlns="jabber:x:data" type="submit">
+ <field var="f1">
+ <value>username</value>
+ </field>
+ <field var="f2">
+ <value>hunter2</value>
+ </field>
+ <field var="f3">
+ <value>A long</value>
+ <value>multiline</value>
+ <value>message</value>
+ </field>
+ <field var="f4">
+ <value>cool</value>
+ </field>
+ </x>
+ """, use_values=False)
+
+ def testCancelType(self):
+ """Test that setting type to 'cancel' clears all fields"""
+ msg = self.Message()
+ form = msg['form']
+
+ fields = OrderedDict()
+ fields['f1'] = {'type': 'text-single',
+ 'label': 'Username',
+ 'required': True}
+ fields['f2'] = {'type': 'text-private',
+ 'label': 'Password',
+ 'required': True}
+ fields['f3'] = {'type': 'text-multi',
+ 'label': 'Message',
+ 'value': 'Enter message.\nA long one even.'}
+ fields['f4'] = {'type': 'list-single',
+ 'label': 'Message Type',
+ 'options': [{'label': 'Cool!',
+ 'value': 'cool'},
+ {'label': 'Urgh!',
+ 'value': 'urgh'}]}
+ form['fields'] = fields
+
+ form['type'] = 'cancel'
+
+ self.check(form, """
+ <x xmlns="jabber:x:data" type="cancel" />
+ """)
+
+
+suite = unittest.TestLoader().loadTestsFromTestCase(TestDataForms)
diff --git a/tests/test_stanza_xep_0009.py b/tests/test_stanza_xep_0009.py
new file mode 100644
index 00000000..36800335
--- /dev/null
+++ b/tests/test_stanza_xep_0009.py
@@ -0,0 +1,288 @@
+"""
+ SleekXMPP: The Sleek XMPP Library
+ Copyright (C) 2011 Nathanael C. Fritz, Dann Martens (TOMOTON).
+ This file is part of SleekXMPP.
+
+ See the file LICENSE for copying permission.
+"""
+
+import base64
+
+from sleekxmpp.plugins.xep_0009.stanza.RPC import RPCQuery, MethodCall, \
+ MethodResponse
+from sleekxmpp.plugins.xep_0009.binding import py2xml, xml2py, rpcbase64, \
+ rpctime
+from sleekxmpp.stanza.iq import Iq
+from sleekxmpp.test.sleektest import SleekTest
+from sleekxmpp.xmlstream.stanzabase import register_stanza_plugin
+from sleekxmpp.xmlstream.tostring import tostring
+import unittest
+
+
+
+class TestJabberRPC(SleekTest):
+
+ def setUp(self):
+ register_stanza_plugin(Iq, RPCQuery)
+ register_stanza_plugin(RPCQuery, MethodCall)
+ register_stanza_plugin(RPCQuery, MethodResponse)
+
+ def testMethodCall(self):
+ iq = self.Iq()
+ iq['rpc_query']['method_call']['method_name'] = 'system.exit'
+ iq['rpc_query']['method_call']['params'] = py2xml(*())
+ self.check(iq, """
+ <iq>
+ <query xmlns="jabber:iq:rpc">
+ <methodCall>
+ <methodName>system.exit</methodName>
+ <params />
+ </methodCall>
+ </query>
+ </iq>
+ """, use_values=False)
+
+ def testMethodResponse(self):
+ iq = self.Iq()
+ iq['rpc_query']['method_response']['params'] = py2xml(*())
+ self.check(iq, """
+ <iq>
+ <query xmlns="jabber:iq:rpc">
+ <methodResponse>
+ <params />
+ </methodResponse>
+ </query>
+ </iq>
+ """, use_values=False)
+
+ def testConvertNil(self):
+ params = [None]
+ params_xml = py2xml(*params)
+ expected_xml = self.parse_xml("""
+ <params xmlns="jabber:iq:rpc">
+ <param>
+ <value>
+ <nil />
+ </value>
+ </param>
+ </params>
+ """)
+ self.assertTrue(self.compare(expected_xml, params_xml),
+ "Nil to XML conversion\nExpected: %s\nGot: %s" % (
+ tostring(expected_xml), tostring(params_xml)))
+ self.assertEqual(params, xml2py(expected_xml),
+ "XML to nil conversion")
+
+ def testConvertBoolean(self):
+ params = [True, False]
+ params_xml = py2xml(*params)
+ expected_xml = self.parse_xml("""
+ <params xmlns="jabber:iq:rpc">
+ <param>
+ <value>
+ <boolean>1</boolean>
+ </value>
+ </param>
+ <param>
+ <value>
+ <boolean>0</boolean>
+ </value>
+ </param>
+ </params>
+ """)
+ self.assertTrue(self.compare(expected_xml, params_xml),
+ "Boolean to XML conversion\nExpected: %s\nGot: %s" % (
+ tostring(expected_xml), tostring(params_xml)))
+ self.assertEqual(params, xml2py(expected_xml),
+ "XML to boolean conversion")
+
+ def testConvertString(self):
+ params = ["'This' & \"That\""]
+ params_xml = py2xml(*params)
+ expected_xml = self.parse_xml("""
+ <params xmlns="jabber:iq:rpc">
+ <param>
+ <value>
+ <string>&apos;This&apos; &amp; &quot;That&quot;</string>
+ </value>
+ </param>
+ </params>
+ """)
+ self.assertTrue(self.compare(expected_xml, params_xml),
+ "String to XML conversion\nExpected: %s\nGot: %s" % (
+ tostring(expected_xml), tostring(params_xml)))
+ self.assertEqual(params, xml2py(expected_xml),
+ "XML to string conversion")
+
+ def testConvertInteger(self):
+ params = [32767, -32768]
+ params_xml = py2xml(*params)
+ expected_xml = self.parse_xml("""
+ <params xmlns="jabber:iq:rpc">
+ <param>
+ <value>
+ <i4>32767</i4>
+ </value>
+ </param>
+ <param>
+ <value>
+ <i4>-32768</i4>
+ </value>
+ </param>
+ </params>
+ """)
+ alternate_xml = self.parse_xml("""
+ <params xmlns="jabber:iq:rpc">
+ <param>
+ <value>
+ <int>32767</int>
+ </value>
+ </param>
+ <param>
+ <value>
+ <int>-32768</int>
+ </value>
+ </param>
+ </params>
+ """)
+ self.assertTrue(self.compare(expected_xml, params_xml),
+ "Integer to XML conversion\nExpected: %s\nGot: %s" % (
+ tostring(expected_xml), tostring(params_xml)))
+ self.assertEqual(params, xml2py(expected_xml),
+ "XML to boolean conversion")
+ self.assertEqual(params, xml2py(alternate_xml),
+ "Alternate XML to boolean conversion")
+
+
+ def testConvertDouble(self):
+ params = [3.14159265]
+ params_xml = py2xml(*params)
+ expected_xml = self.parse_xml("""
+ <params xmlns="jabber:iq:rpc">
+ <param>
+ <value>
+ <double>3.14159265</double>
+ </value>
+ </param>
+ </params>
+ """)
+ self.assertTrue(self.compare(expected_xml, params_xml),
+ "Double to XML conversion\nExpected: %s\nGot: %s" % (
+ tostring(expected_xml), tostring(params_xml)))
+ self.assertEqual(params, xml2py(expected_xml),
+ "XML to double conversion")
+
+ def testConvertBase64(self):
+ params = [rpcbase64(base64.b64encode(b"Hello, world!"))]
+ params_xml = py2xml(*params)
+ expected_xml = self.parse_xml("""
+ <params xmlns="jabber:iq:rpc">
+ <param>
+ <value>
+ <base64>SGVsbG8sIHdvcmxkIQ==</base64>
+ </value>
+ </param>
+ </params>
+ """)
+ alternate_xml = self.parse_xml("""
+ <params xmlns="jabber:iq:rpc">
+ <param>
+ <value>
+ <Base64>SGVsbG8sIHdvcmxkIQ==</Base64>
+ </value>
+ </param>
+ </params>
+ """)
+ self.assertTrue(self.compare(expected_xml, params_xml),
+ "Base64 to XML conversion\nExpected: %s\nGot: %s" % (
+ tostring(expected_xml), tostring(params_xml)))
+ self.assertEqual(list(map(lambda x: x.decode(), params)),
+ list(map(lambda x: x.decode(), xml2py(expected_xml))),
+ "XML to base64 conversion")
+ self.assertEqual(list(map(lambda x: x.decode(), params)),
+ list(map(lambda x: x.decode(), xml2py(alternate_xml))),
+ "Alternate XML to base64 conversion")
+
+ def testConvertDateTime(self):
+ params = [rpctime("20111220T01:50:00")]
+ params_xml = py2xml(*params)
+ expected_xml = self.parse_xml("""
+ <params xmlns="jabber:iq:rpc">
+ <param>
+ <value>
+ <dateTime.iso8601>20111220T01:50:00</dateTime.iso8601>
+ </value>
+ </param>
+ </params>
+ """)
+ self.assertTrue(self.compare(expected_xml, params_xml),
+ "DateTime to XML conversion\nExpected: %s\nGot: %s" % (
+ tostring(expected_xml), tostring(params_xml)))
+ self.assertEqual(list(map(lambda x: x.iso8601(), params)),
+ list(map(lambda x: x.iso8601(), xml2py(expected_xml))),
+ None)
+
+ def testConvertArray(self):
+ params = [[1,2,3], ('a', 'b', 'c')]
+ params_xml = py2xml(*params)
+ expected_xml = self.parse_xml("""
+ <params xmlns="jabber:iq:rpc">
+ <param>
+ <value>
+ <array>
+ <data>
+ <value><i4>1</i4></value>
+ <value><i4>2</i4></value>
+ <value><i4>3</i4></value>
+ </data>
+ </array>
+ </value>
+ </param>
+ <param>
+ <value>
+ <array>
+ <data>
+ <value><string>a</string></value>
+ <value><string>b</string></value>
+ <value><string>c</string></value>
+ </data>
+ </array>
+ </value>
+ </param>
+ </params>
+ """)
+ self.assertTrue(self.compare(expected_xml, params_xml),
+ "Array to XML conversion\nExpected: %s\nGot: %s" % (
+ tostring(expected_xml), tostring(params_xml)))
+ self.assertEqual(list(map(list, params)), xml2py(expected_xml),
+ "XML to array conversion")
+
+ def testConvertStruct(self):
+ params = [{"foo": "bar", "baz": False}]
+ params_xml = py2xml(*params)
+ expected_xml = self.parse_xml("""
+ <params xmlns="jabber:iq:rpc">
+ <param>
+ <value>
+ <struct>
+ <member>
+ <name>foo</name>
+ <value><string>bar</string></value>
+ </member>
+ <member>
+ <name>baz</name>
+ <value><boolean>0</boolean></value>
+ </member>
+ </struct>
+ </value>
+ </param>
+ </params>
+ """)
+ self.assertTrue(self.compare(expected_xml, params_xml),
+ "Struct to XML conversion\nExpected: %s\nGot: %s" % (
+ tostring(expected_xml), tostring(params_xml)))
+ self.assertEqual(params, xml2py(expected_xml),
+ "XML to struct conversion")
+
+suite = unittest.TestLoader().loadTestsFromTestCase(TestJabberRPC)
+
diff --git a/tests/test_stanza_xep_0030.py b/tests/test_stanza_xep_0030.py
new file mode 100644
index 00000000..2d64988d
--- /dev/null
+++ b/tests/test_stanza_xep_0030.py
@@ -0,0 +1,516 @@
+from sleekxmpp.test import *
+import sleekxmpp.plugins.xep_0030 as xep_0030
+
+
+class TestDisco(SleekTest):
+
+ """
+ Test creating and manipulating the disco#info and
+ disco#items stanzas from the XEP-0030 plugin.
+ """
+
+ def setUp(self):
+ register_stanza_plugin(Iq, xep_0030.DiscoInfo)
+ register_stanza_plugin(Iq, xep_0030.DiscoItems)
+
+ def testCreateInfoQueryNoNode(self):
+ """Testing disco#info query with no node."""
+ iq = self.Iq()
+ iq['disco_info']['node'] = ''
+
+ self.check(iq, """
+ <iq>
+ <query xmlns="http://jabber.org/protocol/disco#info" />
+ </iq>
+ """)
+
+ def testCreateInfoQueryWithNode(self):
+ """Testing disco#info query with a node."""
+ iq = self.Iq()
+ iq['disco_info']['node'] = 'foo'
+
+ self.check(iq, """
+ <iq>
+ <query xmlns="http://jabber.org/protocol/disco#info"
+ node="foo" />
+ </iq>
+ """)
+
+ def testCreateItemsQueryNoNode(self):
+ """Testing disco#items query with no node."""
+ iq = self.Iq()
+ iq['disco_items']['node'] = ''
+
+ self.check(iq, """
+ <iq>
+ <query xmlns="http://jabber.org/protocol/disco#items" />
+ </iq>
+ """)
+
+ def testCreateItemsQueryWithNode(self):
+ """Testing disco#items query with a node."""
+ iq = self.Iq()
+ iq['disco_items']['node'] = 'foo'
+
+ self.check(iq, """
+ <iq>
+ <query xmlns="http://jabber.org/protocol/disco#items"
+ node="foo" />
+ </iq>
+ """)
+
+ def testIdentities(self):
+ """Testing adding identities to disco#info."""
+ iq = self.Iq()
+ iq['disco_info'].add_identity('conference', 'text',
+ name='Chatroom',
+ lang='en')
+
+ self.check(iq, """
+ <iq>
+ <query xmlns="http://jabber.org/protocol/disco#info">
+ <identity category="conference"
+ type="text"
+ name="Chatroom"
+ xml:lang="en" />
+ </query>
+ </iq>
+ """)
+
+ def testDuplicateIdentities(self):
+ """
+ Test adding multiple copies of the same category
+ and type combination. Only the first identity should
+ be kept.
+ """
+ iq = self.Iq()
+ iq['disco_info'].add_identity('conference', 'text',
+ name='Chatroom')
+ iq['disco_info'].add_identity('conference', 'text',
+ name='MUC')
+ self.check(iq, """
+ <iq>
+ <query xmlns="http://jabber.org/protocol/disco#info">
+ <identity category="conference"
+ type="text"
+ name="Chatroom" />
+ </query>
+ </iq>
+ """)
+
+ def testDuplicateIdentitiesWithLangs(self):
+ """
+ Test adding multiple copies of the same category,
+ type, and language combination. Only the first identity
+ should be kept.
+ """
+ iq = self.Iq()
+ iq['disco_info'].add_identity('conference', 'text',
+ name='Chatroom',
+ lang='en')
+ iq['disco_info'].add_identity('conference', 'text',
+ name='MUC',
+ lang='en')
+ self.check(iq, """
+ <iq>
+ <query xmlns="http://jabber.org/protocol/disco#info">
+ <identity category="conference"
+ type="text"
+ name="Chatroom"
+ xml:lang="en" />
+ </query>
+ </iq>
+ """)
+
+ def testRemoveIdentitiesNoLang(self):
+ """Test removing identities from a disco#info stanza."""
+ iq = self.Iq()
+ iq['disco_info'].add_identity('client', 'pc')
+ iq['disco_info'].add_identity('client', 'bot')
+
+ iq['disco_info'].del_identity('client', 'pc')
+
+ self.check(iq, """
+ <iq>
+ <query xmlns="http://jabber.org/protocol/disco#info">
+ <identity category="client" type="bot" />
+ </query>
+ </iq>
+ """)
+
+ def testRemoveIdentitiesWithLang(self):
+ """Test removing identities from a disco#info stanza."""
+ iq = self.Iq()
+ iq['disco_info'].add_identity('client', 'pc')
+ iq['disco_info'].add_identity('client', 'bot')
+ iq['disco_info'].add_identity('client', 'pc', lang='no')
+
+ iq['disco_info'].del_identity('client', 'pc')
+
+ self.check(iq, """
+ <iq>
+ <query xmlns="http://jabber.org/protocol/disco#info">
+ <identity category="client" type="bot" />
+ <identity category="client"
+ type="pc"
+ xml:lang="no" />
+ </query>
+ </iq>
+ """)
+
+ def testRemoveAllIdentitiesNoLang(self):
+ """Test removing all identities from a disco#info stanza."""
+ iq = self.Iq()
+ iq['disco_info'].add_identity('client', 'bot', name='Bot')
+ iq['disco_info'].add_identity('client', 'bot', lang='no')
+ iq['disco_info'].add_identity('client', 'console')
+
+ del iq['disco_info']['identities']
+
+ self.check(iq, """
+ <iq>
+ <query xmlns="http://jabber.org/protocol/disco#info" />
+ </iq>
+ """)
+
+ def testRemoveAllIdentitiesWithLang(self):
+ """Test removing all identities from a disco#info stanza."""
+ iq = self.Iq()
+ iq['disco_info'].add_identity('client', 'bot', name='Bot')
+ iq['disco_info'].add_identity('client', 'bot', lang='no')
+ iq['disco_info'].add_identity('client', 'console')
+
+ iq['disco_info'].del_identities(lang='no')
+
+ self.check(iq, """
+ <iq>
+ <query xmlns="http://jabber.org/protocol/disco#info">
+ <identity category="client" type="bot" name="Bot" />
+ <identity category="client" type="console" />
+ </query>
+ </iq>
+ """)
+
+ def testAddBatchIdentitiesNoLang(self):
+ """Test adding multiple identities at once to a disco#info stanza."""
+ iq = self.Iq()
+ identities = [('client', 'pc', 'no', 'PC Client'),
+ ('client', 'bot', None, 'Bot'),
+ ('client', 'console', None, None)]
+
+ iq['disco_info']['identities'] = identities
+
+ self.check(iq, """
+ <iq>
+ <query xmlns="http://jabber.org/protocol/disco#info">
+ <identity category="client"
+ type="pc"
+ xml:lang="no"
+ name="PC Client" />
+ <identity category="client" type="bot" name="Bot" />
+ <identity category="client" type="console" />
+ </query>
+ </iq>
+ """)
+
+
+ def testAddBatchIdentitiesWithLang(self):
+ """Test selectively replacing identities based on language."""
+ iq = self.Iq()
+ iq['disco_info'].add_identity('client', 'pc', lang='no')
+ iq['disco_info'].add_identity('client', 'pc', lang='en')
+ iq['disco_info'].add_identity('client', 'pc', lang='fr')
+
+ identities = [('client', 'bot', 'fr', 'Bot'),
+ ('client', 'bot', 'en', 'Bot')]
+
+ iq['disco_info'].set_identities(identities, lang='fr')
+
+ self.check(iq, """
+ <iq>
+ <query xmlns="http://jabber.org/protocol/disco#info">
+ <identity category="client" type="pc" xml:lang="no" />
+ <identity category="client" type="pc" xml:lang="en" />
+ <identity category="client"
+ type="bot"
+ xml:lang="fr"
+ name="Bot" />
+ <identity category="client"
+ type="bot"
+ xml:lang="en"
+ name="Bot" />
+ </query>
+ </iq>
+ """)
+
+ def testGetIdentitiesNoLang(self):
+ """Test getting all identities from a disco#info stanza."""
+ iq = self.Iq()
+ iq['disco_info'].add_identity('client', 'pc')
+ iq['disco_info'].add_identity('client', 'pc', lang='no')
+ iq['disco_info'].add_identity('client', 'pc', lang='en')
+ iq['disco_info'].add_identity('client', 'pc', lang='fr')
+
+ expected = set([('client', 'pc', None, None),
+ ('client', 'pc', 'no', None),
+ ('client', 'pc', 'en', None),
+ ('client', 'pc', 'fr', None)])
+ self.failUnless(iq['disco_info']['identities'] == expected,
+ "Identities do not match:\n%s\n%s" % (
+ expected,
+ iq['disco_info']['identities']))
+
+ def testGetIdentitiesWithLang(self):
+ """
+ Test getting all identities of a given
+ lang from a disco#info stanza.
+ """
+ iq = self.Iq()
+ iq['disco_info'].add_identity('client', 'pc')
+ iq['disco_info'].add_identity('client', 'pc', lang='no')
+ iq['disco_info'].add_identity('client', 'pc', lang='en')
+ iq['disco_info'].add_identity('client', 'pc', lang='fr')
+
+ expected = set([('client', 'pc', 'no', None)])
+ result = iq['disco_info'].get_identities(lang='no')
+ self.failUnless(result == expected,
+ "Identities do not match:\n%s\n%s" % (
+ expected, result))
+
+ def testFeatures(self):
+ """Testing adding features to disco#info."""
+ iq = self.Iq()
+ iq['disco_info'].add_feature('foo')
+ iq['disco_info'].add_feature('bar')
+
+ self.check(iq, """
+ <iq>
+ <query xmlns="http://jabber.org/protocol/disco#info">
+ <feature var="foo" />
+ <feature var="bar" />
+ </query>
+ </iq>
+ """)
+
+ def testFeaturesDuplicate(self):
+ """Test adding duplicate features to disco#info."""
+ iq = self.Iq()
+ iq['disco_info'].add_feature('foo')
+ iq['disco_info'].add_feature('bar')
+ iq['disco_info'].add_feature('foo')
+
+ self.check(iq, """
+ <iq>
+ <query xmlns="http://jabber.org/protocol/disco#info">
+ <feature var="foo" />
+ <feature var="bar" />
+ </query>
+ </iq>
+ """)
+
+ def testRemoveFeature(self):
+ """Test removing a feature from disco#info."""
+ iq = self.Iq()
+ iq['disco_info'].add_feature('foo')
+ iq['disco_info'].add_feature('bar')
+ iq['disco_info'].add_feature('baz')
+
+ iq['disco_info'].del_feature('foo')
+
+ self.check(iq, """
+ <iq>
+ <query xmlns="http://jabber.org/protocol/disco#info">
+ <feature var="bar" />
+ <feature var="baz" />
+ </query>
+ </iq>
+ """)
+
+ def testGetFeatures(self):
+ """Test getting all features from a disco#info stanza."""
+ iq = self.Iq()
+ iq['disco_info'].add_feature('foo')
+ iq['disco_info'].add_feature('bar')
+ iq['disco_info'].add_feature('baz')
+
+ expected = set(['foo', 'bar', 'baz'])
+ self.failUnless(iq['disco_info']['features'] == expected,
+ "Features do not match:\n%s\n%s" % (
+ expected,
+ iq['disco_info']['features']))
+
+ def testRemoveAllFeatures(self):
+ """Test removing all features from a disco#info stanza."""
+ iq = self.Iq()
+ iq['disco_info'].add_feature('foo')
+ iq['disco_info'].add_feature('bar')
+ iq['disco_info'].add_feature('baz')
+
+ del iq['disco_info']['features']
+
+ self.check(iq, """
+ <iq>
+ <query xmlns="http://jabber.org/protocol/disco#info" />
+ </iq>
+ """)
+
+ def testAddBatchFeatures(self):
+ """Test adding multiple features at once to a disco#info stanza."""
+ iq = self.Iq()
+ features = ['foo', 'bar', 'baz']
+
+ iq['disco_info']['features'] = features
+
+ self.check(iq, """
+ <iq>
+ <query xmlns="http://jabber.org/protocol/disco#info">
+ <feature var="foo" />
+ <feature var="bar" />
+ <feature var="baz" />
+ </query>
+ </iq>
+ """)
+
+ def testItems(self):
+ """Testing adding features to disco#info."""
+ iq = self.Iq()
+ iq['disco_items'].add_item('user@localhost')
+ iq['disco_items'].add_item('user@localhost', 'foo')
+ iq['disco_items'].add_item('user@localhost', 'bar', name='Testing')
+
+ self.check(iq, """
+ <iq>
+ <query xmlns="http://jabber.org/protocol/disco#items">
+ <item jid="user@localhost" />
+ <item jid="user@localhost"
+ node="foo" />
+ <item jid="user@localhost"
+ node="bar"
+ name="Testing" />
+ </query>
+ </iq>
+ """)
+
+ def testDuplicateItems(self):
+ """Test adding items with the same JID without any nodes."""
+ iq = self.Iq()
+ iq['disco_items'].add_item('user@localhost', name='First')
+ iq['disco_items'].add_item('user@localhost', name='Second')
+
+ self.check(iq, """
+ <iq>
+ <query xmlns="http://jabber.org/protocol/disco#items">
+ <item jid="user@localhost" name="First" />
+ </query>
+ </iq>
+ """)
+
+
+ def testDuplicateItemsWithNodes(self):
+ """Test adding items with the same JID/node combination."""
+ iq = self.Iq()
+ iq['disco_items'].add_item('user@localhost',
+ node='foo',
+ name='First')
+ iq['disco_items'].add_item('user@localhost',
+ node='foo',
+ name='Second')
+
+ self.check(iq, """
+ <iq>
+ <query xmlns="http://jabber.org/protocol/disco#items">
+ <item jid="user@localhost" node="foo" name="First" />
+ </query>
+ </iq>
+ """)
+
+ def testRemoveItemsNoNode(self):
+ """Test removing items without nodes from a disco#items stanza."""
+ iq = self.Iq()
+ iq['disco_items'].add_item('user@localhost')
+ iq['disco_items'].add_item('user@localhost', node='foo')
+ iq['disco_items'].add_item('test@localhost')
+
+ iq['disco_items'].del_item('user@localhost')
+
+ self.check(iq, """
+ <iq>
+ <query xmlns="http://jabber.org/protocol/disco#items">
+ <item jid="user@localhost" node="foo" />
+ <item jid="test@localhost" />
+ </query>
+ </iq>
+ """)
+
+ def testRemoveItemsWithNode(self):
+ """Test removing items with nodes from a disco#items stanza."""
+ iq = self.Iq()
+ iq['disco_items'].add_item('user@localhost')
+ iq['disco_items'].add_item('user@localhost', node='foo')
+ iq['disco_items'].add_item('test@localhost')
+
+ iq['disco_items'].del_item('user@localhost', node='foo')
+
+ self.check(iq, """
+ <iq>
+ <query xmlns="http://jabber.org/protocol/disco#items">
+ <item jid="user@localhost" />
+ <item jid="test@localhost" />
+ </query>
+ </iq>
+ """)
+
+ def testGetItems(self):
+ """Test retrieving items from disco#items stanza."""
+ iq = self.Iq()
+ iq['disco_items'].add_item('user@localhost')
+ iq['disco_items'].add_item('user@localhost', node='foo')
+ iq['disco_items'].add_item('test@localhost',
+ node='bar',
+ name='Tester')
+
+ expected = set([('user@localhost', None, None),
+ ('user@localhost', 'foo', None),
+ ('test@localhost', 'bar', 'Tester')])
+ self.failUnless(iq['disco_items']['items'] == expected,
+ "Items do not match:\n%s\n%s" % (
+ expected,
+ iq['disco_items']['items']))
+
+ def testRemoveAllItems(self):
+ """Test removing all items from a disco#items stanza."""
+ iq = self.Iq()
+ iq['disco_items'].add_item('user@localhost')
+ iq['disco_items'].add_item('user@localhost', node='foo')
+ iq['disco_items'].add_item('test@localhost',
+ node='bar',
+ name='Tester')
+
+ del iq['disco_items']['items']
+
+ self.check(iq, """
+ <iq>
+ <query xmlns="http://jabber.org/protocol/disco#items" />
+ </iq>
+ """)
+
+ def testAddBatchItems(self):
+ """Test adding multiple items to a disco#items stanza."""
+ iq = self.Iq()
+ items = [('user@localhost', 'foo', 'Test'),
+ ('test@localhost', None, None),
+ ('other@localhost', None, 'Other')]
+
+ iq['disco_items']['items'] = items
+
+ self.check(iq, """
+ <iq>
+ <query xmlns="http://jabber.org/protocol/disco#items">
+ <item jid="user@localhost" node="foo" name="Test" />
+ <item jid="test@localhost" />
+ <item jid="other@localhost" name="Other" />
+ </query>
+ </iq>
+ """)
+
+suite = unittest.TestLoader().loadTestsFromTestCase(TestDisco)
diff --git a/tests/test_stanza_xep_0033.py b/tests/test_stanza_xep_0033.py
new file mode 100644
index 00000000..ec9a5309
--- /dev/null
+++ b/tests/test_stanza_xep_0033.py
@@ -0,0 +1,111 @@
+from sleekxmpp.test import *
+import sleekxmpp.plugins.xep_0033 as xep_0033
+
+
+class TestAddresses(SleekTest):
+
+ def setUp(self):
+ register_stanza_plugin(Message, xep_0033.Addresses)
+
+ def testAddAddress(self):
+ """Testing adding extended stanza address."""
+ msg = self.Message()
+ msg['addresses'].addAddress(atype='to', jid='to@header1.org')
+ self.check(msg, """
+ <message>
+ <addresses xmlns="http://jabber.org/protocol/address">
+ <address jid="to@header1.org" type="to" />
+ </addresses>
+ </message>
+ """)
+
+ msg = self.Message()
+ msg['addresses'].addAddress(atype='replyto',
+ jid='replyto@header1.org',
+ desc='Reply address')
+ self.check(msg, """
+ <message>
+ <addresses xmlns="http://jabber.org/protocol/address">
+ <address jid="replyto@header1.org" type="replyto" desc="Reply address" />
+ </addresses>
+ </message>
+ """)
+
+ def testAddAddresses(self):
+ """Testing adding multiple extended stanza addresses."""
+
+ xmlstring = """
+ <message>
+ <addresses xmlns="http://jabber.org/protocol/address">
+ <address jid="replyto@header1.org" type="replyto" desc="Reply address" />
+ <address jid="cc@header2.org" type="cc" />
+ <address jid="bcc@header2.org" type="bcc" />
+ </addresses>
+ </message>
+ """
+
+ msg = self.Message()
+ msg['addresses'].setAddresses([
+ {'type':'replyto',
+ 'jid':'replyto@header1.org',
+ 'desc':'Reply address'},
+ {'type':'cc',
+ 'jid':'cc@header2.org'},
+ {'type':'bcc',
+ 'jid':'bcc@header2.org'}])
+ self.check(msg, xmlstring)
+
+ msg = self.Message()
+ msg['addresses']['replyto'] = [{'jid':'replyto@header1.org',
+ 'desc':'Reply address'}]
+ msg['addresses']['cc'] = [{'jid':'cc@header2.org'}]
+ msg['addresses']['bcc'] = [{'jid':'bcc@header2.org'}]
+ self.check(msg, xmlstring)
+
+ def testAddURI(self):
+ """Testing adding URI attribute to extended stanza address."""
+
+ msg = self.Message()
+ addr = msg['addresses'].addAddress(atype='to',
+ jid='to@header1.org',
+ node='foo')
+ self.check(msg, """
+ <message>
+ <addresses xmlns="http://jabber.org/protocol/address">
+ <address node="foo" jid="to@header1.org" type="to" />
+ </addresses>
+ </message>
+ """)
+
+ addr['uri'] = 'mailto:to@header2.org'
+ self.check(msg, """
+ <message>
+ <addresses xmlns="http://jabber.org/protocol/address">
+ <address type="to" uri="mailto:to@header2.org" />
+ </addresses>
+ </message>
+ """)
+
+ def testDelivered(self):
+ """Testing delivered attribute of extended stanza addresses."""
+
+ xmlstring = """
+ <message>
+ <addresses xmlns="http://jabber.org/protocol/address">
+ <address %s jid="to@header1.org" type="to" />
+ </addresses>
+ </message>
+ """
+
+ msg = self.Message()
+ addr = msg['addresses'].addAddress(jid='to@header1.org', atype='to')
+ self.check(msg, xmlstring % '')
+
+ addr['delivered'] = True
+ self.check(msg, xmlstring % 'delivered="true"')
+
+ addr['delivered'] = False
+ self.check(msg, xmlstring % '')
+
+
+suite = unittest.TestLoader().loadTestsFromTestCase(TestAddresses)
diff --git a/tests/test_stanza_xep_0050.py b/tests/test_stanza_xep_0050.py
new file mode 100644
index 00000000..ae584de4
--- /dev/null
+++ b/tests/test_stanza_xep_0050.py
@@ -0,0 +1,114 @@
+from sleekxmpp import Iq
+from sleekxmpp.test import *
+from sleekxmpp.plugins.xep_0050 import Command
+
+
+class TestAdHocCommandStanzas(SleekTest):
+
+ def setUp(self):
+ register_stanza_plugin(Iq, Command)
+
+ def testAction(self):
+ """Test using the action attribute."""
+ iq = self.Iq()
+ iq['type'] = 'set'
+ iq['command']['node'] = 'foo'
+
+ iq['command']['action'] = 'execute'
+ self.failUnless(iq['command']['action'] == 'execute')
+
+ iq['command']['action'] = 'complete'
+ self.failUnless(iq['command']['action'] == 'complete')
+
+ iq['command']['action'] = 'cancel'
+ self.failUnless(iq['command']['action'] == 'cancel')
+
+ def testSetActions(self):
+ """Test setting next actions in a command stanza."""
+ iq = self.Iq()
+ iq['type'] = 'result'
+ iq['command']['node'] = 'foo'
+ iq['command']['actions'] = ['prev', 'next']
+
+ self.check(iq, """
+ <iq id="0" type="result">
+ <command xmlns="http://jabber.org/protocol/commands"
+ node="foo">
+ <actions>
+ <prev />
+ <next />
+ </actions>
+ </command>
+ </iq>
+ """)
+
+ def testGetActions(self):
+ """Test retrieving next actions from a command stanza."""
+ iq = self.Iq()
+ iq['command']['node'] = 'foo'
+ iq['command']['actions'] = ['prev', 'next']
+
+ results = iq['command']['actions']
+ expected = ['prev', 'next']
+ self.assertEqual(results, expected,
+ "Incorrect next actions: %s" % results)
+
+ def testDelActions(self):
+ """Test removing next actions from a command stanza."""
+ iq = self.Iq()
+ iq['type'] = 'result'
+ iq['command']['node'] = 'foo'
+ iq['command']['actions'] = ['prev', 'next']
+
+ del iq['command']['actions']
+
+ self.check(iq, """
+ <iq id="0" type="result">
+ <command xmlns="http://jabber.org/protocol/commands"
+ node="foo" />
+ </iq>
+ """)
+
+ def testAddNote(self):
+ """Test adding a command note."""
+ iq = self.Iq()
+ iq['type'] = 'result'
+ iq['command']['node'] = 'foo'
+ iq['command'].add_note('Danger!', ntype='warning')
+
+ self.check(iq, """
+ <iq id="0" type="result">
+ <command xmlns="http://jabber.org/protocol/commands"
+ node="foo">
+ <note type="warning">Danger!</note>
+ </command>
+ </iq>
+ """)
+
+ def testNotes(self):
+ """Test using command notes."""
+ iq = self.Iq()
+ iq['type'] = 'result'
+ iq['command']['node'] = 'foo'
+
+ notes = [('info', 'Interesting...'),
+ ('warning', 'Danger!'),
+ ('error', "I can't let you do that")]
+ iq['command']['notes'] = notes
+
+ self.failUnless(iq['command']['notes'] == notes,
+ "Notes don't match: %s %s" % (notes, iq['command']['notes']))
+
+ self.check(iq, """
+ <iq id="0" type="result">
+ <command xmlns="http://jabber.org/protocol/commands"
+ node="foo">
+ <note type="info">Interesting...</note>
+ <note type="warning">Danger!</note>
+ <note type="error">I can't let you do that</note>
+ </command>
+ </iq>
+ """)
+
+
+suite = unittest.TestLoader().loadTestsFromTestCase(TestAdHocCommandStanzas)
diff --git a/tests/test_stanza_xep_0059.py b/tests/test_stanza_xep_0059.py
new file mode 100644
index 00000000..913436a6
--- /dev/null
+++ b/tests/test_stanza_xep_0059.py
@@ -0,0 +1,106 @@
+from sleekxmpp.test import *
+from sleekxmpp.plugins.xep_0059 import Set
+
+
+class TestSetStanzas(SleekTest):
+
+ def testSetFirstIndex(self):
+ s = Set()
+ s['first'] = 'id'
+ s.set_first_index('10')
+ self.check(s, """
+ <set xmlns="http://jabber.org/protocol/rsm">
+ <first index="10">id</first>
+ </set>
+ """)
+
+ def testGetFirstIndex(self):
+ xml_string = """
+ <set xmlns="http://jabber.org/protocol/rsm">
+ <first index="10">id</first>
+ </set>
+ """
+ s = Set(ET.fromstring(xml_string))
+ expected = '10'
+ self.failUnless(s['first_index'] == expected)
+
+ def testDelFirstIndex(self):
+ xml_string = """
+ <set xmlns="http://jabber.org/protocol/rsm">
+ <first index="10">id</first>
+ </set>
+ """
+ s = Set(ET.fromstring(xml_string))
+ del s['first_index']
+ self.check(s, """
+ <set xmlns="http://jabber.org/protocol/rsm">
+ <first>id</first>
+ </set>
+ """)
+
+ def testSetBefore(self):
+ s = Set()
+ s['before'] = True
+ self.check(s, """
+ <set xmlns="http://jabber.org/protocol/rsm">
+ <before />
+ </set>
+ """)
+
+ def testGetBefore(self):
+ xml_string = """
+ <set xmlns="http://jabber.org/protocol/rsm">
+ <before />
+ </set>
+ """
+ s = Set(ET.fromstring(xml_string))
+ expected = True
+ self.failUnless(s['before'] == expected)
+
+ def testGetBefore(self):
+ xml_string = """
+ <set xmlns="http://jabber.org/protocol/rsm">
+ <before />
+ </set>
+ """
+ s = Set(ET.fromstring(xml_string))
+ del s['before']
+ self.check(s, """
+ <set xmlns="http://jabber.org/protocol/rsm">
+ </set>
+ """)
+
+ def testSetBeforeVal(self):
+ s = Set()
+ s['before'] = 'id'
+ self.check(s, """
+ <set xmlns="http://jabber.org/protocol/rsm">
+ <before>id</before>
+ </set>
+ """)
+
+ def testGetBeforeVal(self):
+ xml_string = """
+ <set xmlns="http://jabber.org/protocol/rsm">
+ <before>id</before>
+ </set>
+ """
+ s = Set(ET.fromstring(xml_string))
+ expected = 'id'
+ self.failUnless(s['before'] == expected)
+
+ def testGetBeforeVal(self):
+ xml_string = """
+ <set xmlns="http://jabber.org/protocol/rsm">
+ <before>id</before>
+ </set>
+ """
+ s = Set(ET.fromstring(xml_string))
+ del s['before']
+ self.check(s, """
+ <set xmlns="http://jabber.org/protocol/rsm">
+ </set>
+ """)
+
+
+suite = unittest.TestLoader().loadTestsFromTestCase(TestSetStanzas)
diff --git a/tests/test_stanza_xep_0060.py b/tests/test_stanza_xep_0060.py
new file mode 100644
index 00000000..16a7cb37
--- /dev/null
+++ b/tests/test_stanza_xep_0060.py
@@ -0,0 +1,575 @@
+from sleekxmpp.test import *
+import sleekxmpp.plugins.xep_0004 as xep_0004
+import sleekxmpp.plugins.xep_0060.stanza as pubsub
+
+
+class TestPubsubStanzas(SleekTest):
+
+ def testAffiliations(self):
+ "Testing iq/pubsub/affiliations/affiliation stanzas"
+ iq = self.Iq()
+ aff1 = pubsub.Affiliation()
+ aff1['node'] = 'testnode'
+ aff1['affiliation'] = 'owner'
+ aff2 = pubsub.Affiliation()
+ aff2['node'] = 'testnode2'
+ aff2['affiliation'] = 'publisher'
+ iq['pubsub']['affiliations'].append(aff1)
+ iq['pubsub']['affiliations'].append(aff2)
+ self.check(iq, """
+ <iq id="0">
+ <pubsub xmlns="http://jabber.org/protocol/pubsub">
+ <affiliations>
+ <affiliation node="testnode" affiliation="owner" />
+ <affiliation node="testnode2" affiliation="publisher" />
+ </affiliations>
+ </pubsub>
+ </iq>""")
+
+ def testSubscriptions(self):
+ "Testing iq/pubsub/subscriptions/subscription stanzas"
+ iq = self.Iq()
+ sub1 = pubsub.Subscription()
+ sub1['node'] = 'testnode'
+ sub1['jid'] = 'steve@myserver.tld/someresource'
+ sub2 = pubsub.Subscription()
+ sub2['node'] = 'testnode2'
+ sub2['jid'] = 'boogers@bork.top/bill'
+ sub2['subscription'] = 'subscribed'
+ iq['pubsub']['subscriptions'].append(sub1)
+ iq['pubsub']['subscriptions'].append(sub2)
+ self.check(iq, """
+ <iq id="0">
+ <pubsub xmlns="http://jabber.org/protocol/pubsub">
+ <subscriptions>
+ <subscription node="testnode" jid="steve@myserver.tld/someresource" />
+ <subscription node="testnode2" jid="boogers@bork.top/bill" subscription="subscribed" />
+ </subscriptions>
+ </pubsub>
+ </iq>""")
+
+ def testOptionalSettings(self):
+ "Testing iq/pubsub/subscription/subscribe-options stanzas"
+ iq = self.Iq()
+ iq['pubsub']['subscription']['suboptions']['required'] = True
+ iq['pubsub']['subscription']['node'] = 'testnode alsdkjfas'
+ iq['pubsub']['subscription']['jid'] = "fritzy@netflint.net/sleekxmpp"
+ iq['pubsub']['subscription']['subscription'] = 'unconfigured'
+ self.check(iq, """
+ <iq id="0">
+ <pubsub xmlns="http://jabber.org/protocol/pubsub">
+ <subscription node="testnode alsdkjfas" jid="fritzy@netflint.net/sleekxmpp" subscription="unconfigured">
+ <subscribe-options>
+ <required />
+ </subscribe-options>
+ </subscription>
+ </pubsub>
+ </iq>""")
+
+ def testItems(self):
+ "Testing iq/pubsub/items stanzas"
+ iq = self.Iq()
+ iq['pubsub']['items']['node'] = 'crap'
+ payload = ET.fromstring("""
+ <thinger xmlns="http://andyet.net/protocol/thinger" x="1" y='2'>
+ <child1 />
+ <child2 normandy='cheese' foo='bar' />
+ </thinger>""")
+ payload2 = ET.fromstring("""
+ <thinger2 xmlns="http://andyet.net/protocol/thinger2" x="12" y='22'>
+ <child12 />
+ <child22 normandy='cheese2' foo='bar2' />
+ </thinger2>""")
+ item = pubsub.Item()
+ item['id'] = 'asdf'
+ item['payload'] = payload
+ item2 = pubsub.Item()
+ item2['id'] = 'asdf2'
+ item2['payload'] = payload2
+ iq['pubsub']['items'].append(item)
+ iq['pubsub']['items'].append(item2)
+ self.check(iq, """
+ <iq id="0">
+ <pubsub xmlns="http://jabber.org/protocol/pubsub">
+ <items node="crap">
+ <item id="asdf">
+ <thinger xmlns="http://andyet.net/protocol/thinger" y="2" x="1">
+ <child1 />
+ <child2 foo="bar" normandy="cheese" />
+ </thinger>
+ </item>
+ <item id="asdf2">
+ <thinger2 xmlns="http://andyet.net/protocol/thinger2" y="22" x="12">
+ <child12 />
+ <child22 foo="bar2" normandy="cheese2" />
+ </thinger2>
+ </item>
+ </items>
+ </pubsub>
+ </iq>""")
+
+ def testCreate(self):
+ "Testing iq/pubsub/create&configure stanzas"
+ iq = self.Iq()
+ iq['pubsub']['create']['node'] = 'mynode'
+ iq['pubsub']['configure']['form'].addField('pubsub#title',
+ ftype='text-single',
+ value='This thing is awesome')
+ self.check(iq, """
+ <iq id="0">
+ <pubsub xmlns="http://jabber.org/protocol/pubsub">
+ <create node="mynode" />
+ <configure>
+ <x xmlns="jabber:x:data" type="form">
+ <field var="pubsub#title" type="text-single">
+ <value>This thing is awesome</value>
+ </field>
+ </x>
+ </configure>
+ </pubsub>
+ </iq>""")
+
+ def testState(self):
+ "Testing iq/psstate stanzas"
+ iq = self.Iq()
+ iq['psstate']['node']= 'mynode'
+ iq['psstate']['item']= 'myitem'
+ pl = ET.Element('{http://andyet.net/protocol/pubsubqueue}claimed')
+ iq['psstate']['payload'] = pl
+ self.check(iq, """
+ <iq id="0">
+ <state xmlns="http://jabber.org/protocol/psstate" node="mynode" item="myitem">
+ <claimed xmlns="http://andyet.net/protocol/pubsubqueue" />
+ </state>
+ </iq>""")
+
+ def testDefault(self):
+ "Testing iq/pubsub_owner/default stanzas"
+ iq = self.Iq()
+ iq['pubsub_owner']['default']
+ iq['pubsub_owner']['default']['node'] = 'mynode'
+ iq['pubsub_owner']['default']['form'].addField('pubsub#title',
+ ftype='text-single',
+ value='This thing is awesome')
+ self.check(iq, """
+ <iq id="0">
+ <pubsub xmlns="http://jabber.org/protocol/pubsub#owner">
+ <default node="mynode">
+ <x xmlns="jabber:x:data" type="form">
+ <field var="pubsub#title" type="text-single">
+ <value>This thing is awesome</value>
+ </field>
+ </x>
+ </default>
+ </pubsub>
+ </iq>""", use_values=False)
+
+ def testSubscribe(self):
+ "testing iq/pubsub/subscribe stanzas"
+ iq = self.Iq()
+ iq['pubsub']['subscribe']['options']
+ iq['pubsub']['subscribe']['node'] = 'cheese'
+ iq['pubsub']['subscribe']['jid'] = 'fritzy@netflint.net/sleekxmpp'
+ iq['pubsub']['subscribe']['options']['node'] = 'cheese'
+ iq['pubsub']['subscribe']['options']['jid'] = 'fritzy@netflint.net/sleekxmpp'
+ form = xep_0004.Form()
+ form.addField('pubsub#title', ftype='text-single', value='this thing is awesome')
+ iq['pubsub']['subscribe']['options']['options'] = form
+ self.check(iq, """
+ <iq id="0">
+ <pubsub xmlns="http://jabber.org/protocol/pubsub">
+ <subscribe node="cheese" jid="fritzy@netflint.net/sleekxmpp">
+ <options node="cheese" jid="fritzy@netflint.net/sleekxmpp">
+ <x xmlns="jabber:x:data" type="submit">
+ <field var="pubsub#title">
+ <value>this thing is awesome</value>
+ </field>
+ </x>
+ </options>
+ </subscribe>
+ </pubsub>
+ </iq>""", use_values=False)
+
+ def testPublish(self):
+ "Testing iq/pubsub/publish stanzas"
+ iq = self.Iq()
+ iq['pubsub']['publish']['node'] = 'thingers'
+ payload = ET.fromstring("""
+ <thinger xmlns="http://andyet.net/protocol/thinger" x="1" y='2'>
+ <child1 />
+ <child2 normandy='cheese' foo='bar' />
+ </thinger>""")
+ payload2 = ET.fromstring("""
+ <thinger2 xmlns="http://andyet.net/protocol/thinger2" x="12" y='22'>
+ <child12 />
+ <child22 normandy='cheese2' foo='bar2' />
+ </thinger2>""")
+ item = pubsub.Item()
+ item['id'] = 'asdf'
+ item['payload'] = payload
+ item2 = pubsub.Item()
+ item2['id'] = 'asdf2'
+ item2['payload'] = payload2
+ iq['pubsub']['publish'].append(item)
+ iq['pubsub']['publish'].append(item2)
+ form = xep_0004.Form()
+ form.addField('pubsub#description', ftype='text-single', value='this thing is awesome')
+ iq['pubsub']['publish_options'] = form
+
+ self.check(iq, """
+ <iq id="0">
+ <pubsub xmlns="http://jabber.org/protocol/pubsub">
+ <publish node="thingers">
+ <item id="asdf">
+ <thinger xmlns="http://andyet.net/protocol/thinger" y="2" x="1">
+ <child1 />
+ <child2 foo="bar" normandy="cheese" />
+ </thinger>
+ </item>
+ <item id="asdf2">
+ <thinger2 xmlns="http://andyet.net/protocol/thinger2" y="22" x="12">
+ <child12 />
+ <child22 foo="bar2" normandy="cheese2" />
+ </thinger2>
+ </item>
+ </publish>
+ <publish-options>
+ <x xmlns="jabber:x:data" type="submit">
+ <field var="pubsub#description">
+ <value>this thing is awesome</value>
+ </field>
+ </x>
+ </publish-options>
+ </pubsub>
+ </iq>""")
+
+ def testDelete(self):
+ "Testing iq/pubsub_owner/delete stanzas"
+ iq = self.Iq()
+ iq['pubsub_owner']['delete']['node'] = 'thingers'
+ self.check(iq, """
+ <iq id="0">
+ <pubsub xmlns="http://jabber.org/protocol/pubsub#owner">
+ <delete node="thingers" />
+ </pubsub>
+ </iq>""")
+
+ def testCreateConfigGet(self):
+ """Testing getting config from full create"""
+ iq = self.Iq()
+ iq['to'] = 'pubsub.asdf'
+ iq['from'] = 'fritzy@asdf/87292ede-524d-4117-9076-d934ed3db8e7'
+ iq['type'] = 'set'
+ iq['id'] = 'E'
+
+ pub = iq['pubsub']
+ pub['create']['node'] = 'testnode2'
+ pub['configure']['form']['type'] = 'submit'
+ pub['configure']['form'].setFields([
+ ('FORM_TYPE', {'type': 'hidden',
+ 'value': 'http://jabber.org/protocol/pubsub#node_config'}),
+ ('pubsub#node_type', {'type': 'list-single',
+ 'label': 'Select the node type',
+ 'value': 'leaf'}),
+ ('pubsub#title', {'type': 'text-single',
+ 'label': 'A friendly name for the node'}),
+ ('pubsub#deliver_notifications', {'type': 'boolean',
+ 'label': 'Deliver event notifications',
+ 'value': True}),
+ ('pubsub#deliver_payloads', {'type': 'boolean',
+ 'label': 'Deliver payloads with event notifications',
+ 'value': True}),
+ ('pubsub#notify_config', {'type': 'boolean',
+ 'label': 'Notify subscribers when the node configuration changes'}),
+ ('pubsub#notify_delete', {'type': 'boolean',
+ 'label': 'Notify subscribers when the node is deleted'}),
+ ('pubsub#notify_retract', {'type': 'boolean',
+ 'label': 'Notify subscribers when items are removed from the node',
+ 'value': True}),
+ ('pubsub#notify_sub', {'type': 'boolean',
+ 'label': 'Notify owners about new subscribers and unsubscribes'}),
+ ('pubsub#persist_items', {'type': 'boolean',
+ 'label': 'Persist items in storage'}),
+ ('pubsub#max_items', {'type': 'text-single',
+ 'label': 'Max # of items to persist',
+ 'value': '10'}),
+ ('pubsub#subscribe', {'type': 'boolean',
+ 'label': 'Whether to allow subscriptions',
+ 'value': True}),
+ ('pubsub#access_model', {'type': 'list-single',
+ 'label': 'Specify the subscriber model',
+ 'value': 'open'}),
+ ('pubsub#publish_model', {'type': 'list-single',
+ 'label': 'Specify the publisher model',
+ 'value': 'publishers'}),
+ ('pubsub#send_last_published_item', {'type': 'list-single',
+ 'label': 'Send last published item',
+ 'value': 'never'}),
+ ('pubsub#presence_based_delivery', {'type': 'boolean',
+ 'label': 'Deliver notification only to available users'}),
+ ])
+
+ self.check(iq, """
+ <iq to="pubsub.asdf" type="set" id="E" from="fritzy@asdf/87292ede-524d-4117-9076-d934ed3db8e7">
+ <pubsub xmlns="http://jabber.org/protocol/pubsub">
+ <create node="testnode2" />
+ <configure>
+ <x xmlns="jabber:x:data" type="submit">
+ <field var="FORM_TYPE">
+ <value>http://jabber.org/protocol/pubsub#node_config</value>
+ </field>
+ <field var="pubsub#node_type">
+ <value>leaf</value>
+ </field>
+ <field var="pubsub#title" />
+ <field var="pubsub#deliver_notifications">
+ <value>1</value>
+ </field>
+ <field var="pubsub#deliver_payloads">
+ <value>1</value>
+ </field>
+ <field var="pubsub#notify_config" />
+ <field var="pubsub#notify_delete" />
+ <field var="pubsub#notify_retract">
+ <value>1</value>
+ </field>
+ <field var="pubsub#notify_sub" />
+ <field var="pubsub#persist_items" />
+ <field var="pubsub#max_items">
+ <value>10</value>
+ </field>
+ <field var="pubsub#subscribe">
+ <value>1</value>
+ </field>
+ <field var="pubsub#access_model">
+ <value>open</value>
+ </field>
+ <field var="pubsub#publish_model">
+ <value>publishers</value>
+ </field>
+ <field var="pubsub#send_last_published_item">
+ <value>never</value>
+ </field>
+ <field var="pubsub#presence_based_delivery" />
+ </x>
+ </configure>
+ </pubsub>
+ </iq>""")
+
+ def testItemEvent(self):
+ """Testing message/pubsub_event/items/item"""
+ msg = self.Message()
+ item = pubsub.EventItem()
+ pl = ET.Element('{http://netflint.net/protocol/test}test', {'failed':'3', 'passed':'24'})
+ item['payload'] = pl
+ item['id'] = 'abc123'
+ msg['pubsub_event']['items'].append(item)
+ msg['pubsub_event']['items']['node'] = 'cheese'
+ msg['type'] = 'normal'
+ self.check(msg, """
+ <message type="normal">
+ <event xmlns="http://jabber.org/protocol/pubsub#event">
+ <items node="cheese">
+ <item id="abc123">
+ <test xmlns="http://netflint.net/protocol/test" failed="3" passed="24" />
+ </item>
+ </items>
+ </event>
+ </message>""")
+
+ def testItemsEvent(self):
+ """Testing multiple message/pubsub_event/items/item"""
+ msg = self.Message()
+ item = pubsub.EventItem()
+ item2 = pubsub.EventItem()
+ pl = ET.Element('{http://netflint.net/protocol/test}test', {'failed':'3', 'passed':'24'})
+ pl2 = ET.Element('{http://netflint.net/protocol/test-other}test', {'total':'27', 'failed':'3'})
+ item2['payload'] = pl2
+ item['payload'] = pl
+ item['id'] = 'abc123'
+ item2['id'] = '123abc'
+ msg['pubsub_event']['items'].append(item)
+ msg['pubsub_event']['items'].append(item2)
+ msg['pubsub_event']['items']['node'] = 'cheese'
+ msg['type'] = 'normal'
+ self.check(msg, """
+ <message type="normal">
+ <event xmlns="http://jabber.org/protocol/pubsub#event">
+ <items node="cheese">
+ <item id="abc123">
+ <test xmlns="http://netflint.net/protocol/test" failed="3" passed="24" />
+ </item>
+ <item id="123abc">
+ <test xmlns="http://netflint.net/protocol/test-other" failed="3" total="27" />
+ </item>
+ </items>
+ </event>
+ </message>""")
+
+ def testItemsEvent(self):
+ """Testing message/pubsub_event/items/item & retract mix"""
+ msg = self.Message()
+ item = pubsub.EventItem()
+ item2 = pubsub.EventItem()
+ pl = ET.Element('{http://netflint.net/protocol/test}test', {'failed':'3', 'passed':'24'})
+ pl2 = ET.Element('{http://netflint.net/protocol/test-other}test', {'total':'27', 'failed':'3'})
+ item2['payload'] = pl2
+ retract = pubsub.EventRetract()
+ retract['id'] = 'aabbcc'
+ item['payload'] = pl
+ item['id'] = 'abc123'
+ item2['id'] = '123abc'
+ msg['pubsub_event']['items'].append(item)
+ msg['pubsub_event']['items'].append(retract)
+ msg['pubsub_event']['items'].append(item2)
+ msg['pubsub_event']['items']['node'] = 'cheese'
+ msg['type'] = 'normal'
+ self.check(msg, """
+ <message type="normal">
+ <event xmlns="http://jabber.org/protocol/pubsub#event">
+ <items node="cheese">
+ <item id="abc123">
+ <test xmlns="http://netflint.net/protocol/test" failed="3" passed="24" />
+ </item><retract id="aabbcc" />
+ <item id="123abc">
+ <test xmlns="http://netflint.net/protocol/test-other" failed="3" total="27" />
+ </item>
+ </items>
+ </event>
+ </message>""")
+
+ def testCollectionAssociate(self):
+ """Testing message/pubsub_event/collection/associate"""
+ msg = self.Message()
+ msg['pubsub_event']['collection']['associate']['node'] = 'cheese'
+ msg['pubsub_event']['collection']['node'] = 'cheeseburger'
+ msg['type'] = 'headline'
+ self.check(msg, """
+ <message type="headline">
+ <event xmlns="http://jabber.org/protocol/pubsub#event">
+ <collection node="cheeseburger">
+ <associate node="cheese" />
+ </collection>
+ </event>
+ </message>""")
+
+ def testCollectionDisassociate(self):
+ """Testing message/pubsub_event/collection/disassociate"""
+ msg = self.Message()
+ msg['pubsub_event']['collection']['disassociate']['node'] = 'cheese'
+ msg['pubsub_event']['collection']['node'] = 'cheeseburger'
+ msg['type'] = 'headline'
+ self.check(msg, """
+ <message type="headline">
+ <event xmlns="http://jabber.org/protocol/pubsub#event">
+ <collection node="cheeseburger">
+ <disassociate node="cheese" />
+ </collection>
+ </event>
+ </message>""")
+
+ def testEventConfiguration(self):
+ """Testing message/pubsub_event/configuration/config"""
+ msg = self.Message()
+ msg['pubsub_event']['configuration']['node'] = 'cheese'
+ msg['pubsub_event']['configuration']['form'].addField('pubsub#title',
+ ftype='text-single',
+ value='This thing is awesome')
+ msg['type'] = 'headline'
+ self.check(msg, """
+ <message type="headline">
+ <event xmlns="http://jabber.org/protocol/pubsub#event">
+ <configuration node="cheese">
+ <x xmlns="jabber:x:data" type="form">
+ <field var="pubsub#title" type="text-single">
+ <value>This thing is awesome</value>
+ </field>
+ </x>
+ </configuration>
+ </event>
+ </message>""")
+
+ def testEventPurge(self):
+ """Testing message/pubsub_event/purge"""
+ msg = self.Message()
+ msg['pubsub_event']['purge']['node'] = 'pickles'
+ msg['type'] = 'headline'
+ self.check(msg, """
+ <message type="headline">
+ <event xmlns="http://jabber.org/protocol/pubsub#event">
+ <purge node="pickles" />
+ </event>
+ </message>""")
+
+ def testEventSubscription(self):
+ """Testing message/pubsub_event/subscription"""
+ msg = self.Message()
+ msg['pubsub_event']['subscription']['node'] = 'pickles'
+ msg['pubsub_event']['subscription']['jid'] = 'fritzy@netflint.net/test'
+ msg['pubsub_event']['subscription']['subid'] = 'aabb1122'
+ msg['pubsub_event']['subscription']['subscription'] = 'subscribed'
+ msg['pubsub_event']['subscription']['expiry'] = 'presence'
+ msg['type'] = 'headline'
+ self.check(msg, """
+ <message type="headline">
+ <event xmlns="http://jabber.org/protocol/pubsub#event">
+ <subscription node="pickles" subid="aabb1122" jid="fritzy@netflint.net/test" subscription="subscribed" expiry="presence" />
+ </event>
+ </message>""")
+
+ def testPubsubError(self):
+ """Test getting a pubsub specific condition from an error stanza"""
+ iq = self.Iq()
+ iq['error']['type'] = 'cancel'
+ iq['error']['code'] = '501'
+ iq['error']['condition'] = 'feature-not-implemented'
+ iq['error']['pubsub']['condition'] = 'subid-required'
+ self.check(iq, """
+ <iq type="error">
+ <error type="cancel" code="501">
+ <feature-not-implemented xmlns="urn:ietf:params:xml:ns:xmpp-stanzas" />
+ <subid-required xmlns="http://jabber.org/protocol/pubsub#errors" />
+ </error>
+ </iq>
+ """, use_values=False)
+
+ del iq['error']['pubsub']['condition']
+ self.check(iq, """
+ <iq type="error">
+ <error type="cancel" code="501">
+ <feature-not-implemented xmlns="urn:ietf:params:xml:ns:xmpp-stanzas" />
+ </error>
+ </iq>
+ """, use_values=False)
+
+ def testPubsubUnsupportedError(self):
+ """Test getting the feature from an unsupported error"""
+ iq = self.Iq()
+ iq['error']['type'] = 'cancel'
+ iq['error']['code'] = '501'
+ iq['error']['condition'] = 'feature-not-implemented'
+ iq['error']['pubsub']['condition'] = 'unsupported'
+ iq['error']['pubsub']['unsupported'] = 'instant-node'
+ self.check(iq, """
+ <iq type="error">
+ <error type="cancel" code="501">
+ <feature-not-implemented xmlns="urn:ietf:params:xml:ns:xmpp-stanzas" />
+ <unsupported xmlns="http://jabber.org/protocol/pubsub#errors" feature="instant-node" />
+ </error>
+ </iq>
+ """, use_values=False)
+
+ self.assertEqual(iq['error']['pubsub']['condition'], 'unsupported')
+ self.assertEqual(iq['error']['pubsub']['unsupported'], 'instant-node')
+
+ del iq['error']['pubsub']['unsupported']
+ self.check(iq, """
+ <iq type="error">
+ <error type="cancel" code="501">
+ <feature-not-implemented xmlns="urn:ietf:params:xml:ns:xmpp-stanzas" />
+ </error>
+ </iq>
+ """, use_values=False)
+
+
+suite = unittest.TestLoader().loadTestsFromTestCase(TestPubsubStanzas)
diff --git a/tests/test_stanza_xep_0085.py b/tests/test_stanza_xep_0085.py
new file mode 100644
index 00000000..b08404e2
--- /dev/null
+++ b/tests/test_stanza_xep_0085.py
@@ -0,0 +1,41 @@
+from sleekxmpp.test import *
+import sleekxmpp.plugins.xep_0085 as xep_0085
+
+class TestChatStates(SleekTest):
+
+ def setUp(self):
+ register_stanza_plugin(Message, xep_0085.ChatState)
+
+ def testCreateChatState(self):
+ """Testing creating chat states."""
+
+ xmlstring = """
+ <message>
+ <%s xmlns="http://jabber.org/protocol/chatstates" />
+ </message>
+ """
+
+ msg = self.Message()
+
+ self.assertEqual(msg['chat_state'], '')
+ self.check(msg, "<message />", use_values=False)
+
+ msg['chat_state'] = 'active'
+ self.check(msg, xmlstring % 'active', use_values=False)
+
+ msg['chat_state'] = 'composing'
+ self.check(msg, xmlstring % 'composing', use_values=False)
+
+ msg['chat_state'] = 'gone'
+ self.check(msg, xmlstring % 'gone', use_values=False)
+
+ msg['chat_state'] = 'inactive'
+ self.check(msg, xmlstring % 'inactive', use_values=False)
+
+ msg['chat_state'] = 'paused'
+ self.check(msg, xmlstring % 'paused', use_values=False)
+
+ del msg['chat_state']
+ self.check(msg, "<message />")
+
+suite = unittest.TestLoader().loadTestsFromTestCase(TestChatStates)
diff --git a/tests/test_stream.py b/tests/test_stream.py
new file mode 100644
index 00000000..deac24a5
--- /dev/null
+++ b/tests/test_stream.py
@@ -0,0 +1,79 @@
+import time
+from sleekxmpp.test import *
+
+
+class TestStreamTester(SleekTest):
+ """
+ Test that we can simulate and test a stanza stream.
+ """
+
+ def tearDown(self):
+ self.stream_close()
+
+ def testClientEcho(self):
+ """Test that we can interact with a ClientXMPP instance."""
+ self.stream_start(mode='client')
+
+ def echo(msg):
+ msg.reply('Thanks for sending: %(body)s' % msg).send()
+
+ self.xmpp.add_event_handler('message', echo)
+
+ self.recv("""
+ <message to="tester@localhost" from="user@localhost">
+ <body>Hi!</body>
+ </message>
+ """)
+
+ self.send("""
+ <message to="user@localhost">
+ <body>Thanks for sending: Hi!</body>
+ </message>
+ """)
+
+ def testComponentEcho(self):
+ """Test that we can interact with a ComponentXMPP instance."""
+ self.stream_start(mode='component')
+
+ def echo(msg):
+ msg.reply('Thanks for sending: %(body)s' % msg).send()
+
+ self.xmpp.add_event_handler('message', echo)
+
+ self.recv("""
+ <message to="tester.localhost" from="user@localhost">
+ <body>Hi!</body>
+ </message>
+ """)
+
+ self.send("""
+ <message to="user@localhost" from="tester.localhost">
+ <body>Thanks for sending: Hi!</body>
+ </message>
+ """)
+
+ def testSendStreamHeader(self):
+ """Test that we can check a sent stream header."""
+ self.stream_start(mode='client', skip=False)
+ self.send_header(sto='localhost')
+
+ def testStreamDisconnect(self):
+ """Test that the test socket can simulate disconnections."""
+ self.stream_start()
+ events = set()
+
+ def stream_error(event):
+ events.add('socket_error')
+
+ self.xmpp.add_event_handler('socket_error', stream_error)
+
+ self.stream_disconnect()
+ self.xmpp.send_raw(' ')
+
+ time.sleep(.1)
+
+ self.failUnless('socket_error' in events,
+ "Stream error event not raised: %s" % events)
+
+
+suite = unittest.TestLoader().loadTestsFromTestCase(TestStreamTester)
diff --git a/tests/test_stream_exceptions.py b/tests/test_stream_exceptions.py
new file mode 100644
index 00000000..c41edbb2
--- /dev/null
+++ b/tests/test_stream_exceptions.py
@@ -0,0 +1,274 @@
+import sys
+import sleekxmpp
+from sleekxmpp.xmlstream.matcher import MatchXPath
+from sleekxmpp.xmlstream.handler import Callback
+from sleekxmpp.exceptions import XMPPError
+from sleekxmpp.test import *
+
+
+class TestStreamExceptions(SleekTest):
+ """
+ Test handling roster updates.
+ """
+
+ def tearDown(self):
+ self.stream_close()
+
+ def testExceptionReply(self):
+ """Test that raising an exception replies with the original stanza."""
+
+ def message(msg):
+ msg.reply()
+ msg['body'] = 'Body changed'
+ raise XMPPError(clear=False)
+
+ self.stream_start()
+ self.xmpp.add_event_handler('message', message)
+
+ self.recv("""
+ <message>
+ <body>This is going to cause an error.</body>
+ </message>
+ """)
+
+ self.send("""
+ <message type="error">
+ <body>This is going to cause an error.</body>
+ <error type="cancel" code="500">
+ <undefined-condition
+ xmlns="urn:ietf:params:xml:ns:xmpp-stanzas" />
+ </error>
+ </message>
+ """)
+
+ def testExceptionContinueWorking(self):
+ """Test that Sleek continues to respond after an XMPPError is raised."""
+
+ def message(msg):
+ msg.reply()
+ msg['body'] = 'Body changed'
+ raise XMPPError(clear=False)
+
+ self.stream_start()
+ self.xmpp.add_event_handler('message', message)
+
+ self.recv("""
+ <message>
+ <body>This is going to cause an error.</body>
+ </message>
+ """)
+
+ self.send("""
+ <message type="error">
+ <body>This is going to cause an error.</body>
+ <error type="cancel" code="500">
+ <undefined-condition
+ xmlns="urn:ietf:params:xml:ns:xmpp-stanzas" />
+ </error>
+ </message>
+ """)
+
+ self.recv("""
+ <message>
+ <body>This is going to cause an error.</body>
+ </message>
+ """)
+
+ self.send("""
+ <message type="error">
+ <body>This is going to cause an error.</body>
+ <error type="cancel" code="500">
+ <undefined-condition
+ xmlns="urn:ietf:params:xml:ns:xmpp-stanzas" />
+ </error>
+ </message>
+ """)
+
+ def testXMPPErrorException(self):
+ """Test raising an XMPPError exception."""
+
+ def message(msg):
+ raise XMPPError(condition='feature-not-implemented',
+ text="We don't do things that way here.",
+ etype='cancel',
+ extension='foo',
+ extension_ns='foo:error',
+ extension_args={'test': 'true'})
+
+ self.stream_start()
+ self.xmpp.add_event_handler('message', message)
+
+ self.recv("""
+ <message>
+ <body>This is going to cause an error.</body>
+ </message>
+ """)
+
+ self.send("""
+ <message type="error">
+ <error type="cancel" code="501">
+ <feature-not-implemented
+ xmlns="urn:ietf:params:xml:ns:xmpp-stanzas" />
+ <text xmlns="urn:ietf:params:xml:ns:xmpp-stanzas">
+ We don&apos;t do things that way here.
+ </text>
+ <foo xmlns="foo:error" test="true" />
+ </error>
+ </message>
+ """, use_values=False)
+
+ def testIqErrorException(self):
+ """Test using error exceptions with Iq stanzas."""
+
+ def handle_iq(iq):
+ raise XMPPError(condition='feature-not-implemented',
+ text="We don't do things that way here.",
+ etype='cancel',
+ clear=False)
+
+ self.stream_start()
+ self.xmpp.register_handler(
+ Callback(
+ 'Test Iq',
+ MatchXPath('{%s}iq/{test}query' % self.xmpp.default_ns),
+ handle_iq))
+
+ self.recv("""
+ <iq type="get" id="0">
+ <query xmlns="test" />
+ </iq>
+ """)
+
+ self.send("""
+ <iq type="error" id="0">
+ <query xmlns="test" />
+ <error type="cancel" code="501">
+ <feature-not-implemented
+ xmlns="urn:ietf:params:xml:ns:xmpp-stanzas" />
+ <text xmlns="urn:ietf:params:xml:ns:xmpp-stanzas">
+ We don&apos;t do things that way here.
+ </text>
+ </error>
+ </iq>
+ """, use_values=False)
+
+ def testThreadedXMPPErrorException(self):
+ """Test raising an XMPPError exception in a threaded handler."""
+
+ def message(msg):
+ raise XMPPError(condition='feature-not-implemented',
+ text="We don't do things that way here.",
+ etype='cancel')
+
+ self.stream_start()
+ self.xmpp.add_event_handler('message', message,
+ threaded=True)
+
+ self.recv("""
+ <message>
+ <body>This is going to cause an error.</body>
+ </message>
+ """)
+
+ self.send("""
+ <message type="error">
+ <error type="cancel" code="501">
+ <feature-not-implemented
+ xmlns="urn:ietf:params:xml:ns:xmpp-stanzas" />
+ <text xmlns="urn:ietf:params:xml:ns:xmpp-stanzas">
+ We don&apos;t do things that way here.
+ </text>
+ </error>
+ </message>
+ """)
+
+ def testUnknownException(self):
+ """Test raising an generic exception in a threaded handler."""
+
+ raised_errors = []
+
+ def message(msg):
+ raise ValueError("Did something wrong")
+
+ def catch_error(*args, **kwargs):
+ raised_errors.append(True)
+
+ self.stream_start()
+ self.xmpp.exception = catch_error
+ self.xmpp.add_event_handler('message', message)
+
+ self.recv("""
+ <message>
+ <body>This is going to cause an error.</body>
+ </message>
+ """)
+
+ self.send("""
+ <message type="error">
+ <error type="cancel" code="500">
+ <undefined-condition
+ xmlns="urn:ietf:params:xml:ns:xmpp-stanzas" />
+ <text xmlns="urn:ietf:params:xml:ns:xmpp-stanzas">
+ SleekXMPP got into trouble.
+ </text>
+ </error>
+ </message>
+ """)
+
+ self.assertEqual(raised_errors, [True], "Exception was not raised: %s" % raised_errors)
+
+ def testUnknownException(self):
+ """Test Sleek continues to respond after an unknown exception."""
+
+ raised_errors = []
+
+ def message(msg):
+ raise ValueError("Did something wrong")
+
+ def catch_error(*args, **kwargs):
+ raised_errors.append(True)
+
+ self.stream_start()
+ self.xmpp.exception = catch_error
+ self.xmpp.add_event_handler('message', message)
+
+ self.recv("""
+ <message>
+ <body>This is going to cause an error.</body>
+ </message>
+ """)
+
+ self.send("""
+ <message type="error">
+ <error type="cancel" code="500">
+ <undefined-condition
+ xmlns="urn:ietf:params:xml:ns:xmpp-stanzas" />
+ <text xmlns="urn:ietf:params:xml:ns:xmpp-stanzas">
+ SleekXMPP got into trouble.
+ </text>
+ </error>
+ </message>
+ """)
+
+ self.recv("""
+ <message>
+ <body>This is going to cause an error.</body>
+ </message>
+ """)
+
+ self.send("""
+ <message type="error">
+ <error type="cancel" code="500">
+ <undefined-condition
+ xmlns="urn:ietf:params:xml:ns:xmpp-stanzas" />
+ <text xmlns="urn:ietf:params:xml:ns:xmpp-stanzas">
+ SleekXMPP got into trouble.
+ </text>
+ </error>
+ </message>
+ """)
+
+ self.assertEqual(raised_errors, [True, True], "Exceptions were not raised: %s" % raised_errors)
+
+
+suite = unittest.TestLoader().loadTestsFromTestCase(TestStreamExceptions)
diff --git a/tests/test_stream_filters.py b/tests/test_stream_filters.py
new file mode 100644
index 00000000..ef4d5dc8
--- /dev/null
+++ b/tests/test_stream_filters.py
@@ -0,0 +1,88 @@
+import time
+
+from sleekxmpp import Message
+from sleekxmpp.test import *
+from sleekxmpp.xmlstream.handler import *
+from sleekxmpp.xmlstream.matcher import *
+
+
+class TestFilters(SleekTest):
+
+ """
+ Test using incoming and outgoing filters.
+ """
+
+ def setUp(self):
+ self.stream_start()
+
+ def tearDown(self):
+ self.stream_close()
+
+ def testIncoming(self):
+
+ data = []
+
+ def in_filter(stanza):
+ if isinstance(stanza, Message):
+ if stanza['body'] == 'testing':
+ stanza['subject'] = stanza['body'] + ' filter'
+ print('>>> %s' % stanza['subject'])
+ return stanza
+
+ def on_message(msg):
+ print('<<< %s' % msg['subject'])
+ data.append(msg['subject'])
+
+ self.xmpp.add_filter('in', in_filter)
+ self.xmpp.add_event_handler('message', on_message)
+
+ self.recv("""
+ <message>
+ <body>no filter</body>
+ </message>
+ """)
+
+ self.recv("""
+ <message>
+ <body>testing</body>
+ </message>
+ """)
+
+ time.sleep(0.5)
+
+ self.assertEqual(data, ['', 'testing filter'],
+ 'Incoming filter did not apply %s' % data)
+
+ def testOutgoing(self):
+
+ def out_filter(stanza):
+ if isinstance(stanza, Message):
+ if stanza['body'] == 'testing':
+ stanza['body'] = 'changed!'
+ return stanza
+
+ self.xmpp.add_filter('out', out_filter)
+
+ m1 = self.Message()
+ m1['body'] = 'testing'
+ m1.send()
+
+ m2 = self.Message()
+ m2['body'] = 'blah'
+ m2.send()
+
+ self.send("""
+ <message>
+ <body>changed!</body>
+ </message>
+ """)
+
+ self.send("""
+ <message>
+ <body>blah</body>
+ </message>
+ """)
+
+
+
+suite = unittest.TestLoader().loadTestsFromTestCase(TestFilters)
diff --git a/tests/test_stream_handlers.py b/tests/test_stream_handlers.py
new file mode 100644
index 00000000..7fd4e648
--- /dev/null
+++ b/tests/test_stream_handlers.py
@@ -0,0 +1,201 @@
+import time
+
+from sleekxmpp import Message
+from sleekxmpp.test import *
+from sleekxmpp.xmlstream.handler import *
+from sleekxmpp.xmlstream.matcher import *
+
+
+class TestHandlers(SleekTest):
+ """
+ Test using handlers and waiters.
+ """
+
+ def setUp(self):
+ self.stream_start()
+
+ def tearDown(self):
+ self.stream_close()
+
+ def testCallback(self):
+ """Test using stream callback handlers."""
+
+ def callback_handler(stanza):
+ self.xmpp.sendRaw("""
+ <message>
+ <body>Success!</body>
+ </message>
+ """)
+
+ callback = Callback('Test Callback',
+ MatchXPath('{test}tester'),
+ callback_handler)
+
+ self.xmpp.registerHandler(callback)
+
+ self.recv("""<tester xmlns="test" />""")
+
+ msg = self.Message()
+ msg['body'] = 'Success!'
+ self.send(msg)
+
+ def testWaiter(self):
+ """Test using stream waiter handler."""
+
+ def waiter_handler(stanza):
+ iq = self.xmpp.Iq()
+ iq['id'] = 'test'
+ iq['type'] = 'set'
+ iq['query'] = 'test'
+ reply = iq.send(block=True)
+ if reply:
+ self.xmpp.sendRaw("""
+ <message>
+ <body>Successful: %s</body>
+ </message>
+ """ % reply['query'])
+
+ self.xmpp.add_event_handler('message', waiter_handler, threaded=True)
+
+ # Send message to trigger waiter_handler
+ self.recv("""
+ <message>
+ <body>Testing</body>
+ </message>
+ """)
+
+ # Check that Iq was sent by waiter_handler
+ iq = self.Iq()
+ iq['id'] = 'test'
+ iq['type'] = 'set'
+ iq['query'] = 'test'
+ self.send(iq)
+
+ # Send the reply Iq
+ self.recv("""
+ <iq id="test" type="result">
+ <query xmlns="test" />
+ </iq>
+ """)
+
+ # Check that waiter_handler received the reply
+ msg = self.Message()
+ msg['body'] = 'Successful: test'
+ self.send(msg)
+
+ def testWaiterTimeout(self):
+ """Test that waiter handler is removed after timeout."""
+
+ def waiter_handler(stanza):
+ iq = self.xmpp.Iq()
+ iq['id'] = 'test2'
+ iq['type'] = 'set'
+ iq['query'] = 'test2'
+ try:
+ reply = iq.send(block=True, timeout=0)
+ except IqTimeout:
+ pass
+
+ self.xmpp.add_event_handler('message', waiter_handler, threaded=True)
+
+ # Start test by triggerig waiter_handler
+ self.recv("""<message><body>Start Test</body></message>""")
+
+ # Check that Iq was sent to trigger start of timeout period
+ iq = self.Iq()
+ iq['id'] = 'test2'
+ iq['type'] = 'set'
+ iq['query'] = 'test2'
+ self.send(iq)
+
+ # Give the event queue time to process.
+ time.sleep(0.1)
+
+ # Check that the waiter is no longer registered
+ waiter_exists = self.xmpp.removeHandler('IqWait_test2')
+
+ self.failUnless(waiter_exists == False,
+ "Waiter handler was not removed.")
+
+ def testIqCallback(self):
+ """Test that iq.send(callback=handle_foo) works."""
+ events = []
+
+ def handle_foo(iq):
+ events.append('foo')
+
+ iq = self.Iq()
+ iq['type'] = 'get'
+ iq['id'] = 'test-foo'
+ iq['to'] = 'user@localhost'
+ iq['query'] = 'foo'
+ iq.send(callback=handle_foo)
+
+ self.send("""
+ <iq type="get" id="test-foo" to="user@localhost">
+ <query xmlns="foo" />
+ </iq>
+ """)
+
+ self.recv("""
+ <iq type="result" id="test-foo"
+ to="test@localhost"
+ from="user@localhost">
+ <query xmlns="foo">
+ <data />
+ </query>
+ </iq>
+ """)
+
+ # Give event queue time to process
+ time.sleep(0.1)
+
+ self.failUnless(events == ['foo'],
+ "Iq callback was not executed: %s" % events)
+
+ def testMultipleHandlersForStanza(self):
+ """
+ Test that multiple handlers for a single stanza work
+ without clobbering each other.
+ """
+
+ def handler_1(msg):
+ msg.reply("Handler 1: %s" % msg['body']).send()
+
+ def handler_2(msg):
+ msg.reply("Handler 2: %s" % msg['body']).send()
+
+ def handler_3(msg):
+ msg.reply("Handler 3: %s" % msg['body']).send()
+
+ self.xmpp.add_event_handler('message', handler_1)
+ self.xmpp.add_event_handler('message', handler_2)
+ self.xmpp.add_event_handler('message', handler_3)
+
+ self.recv("""
+ <message to="tester@localhost" from="user@example.com">
+ <body>Testing</body>
+ </message>
+ """)
+
+
+ # This test is brittle, depending on the fact that handlers
+ # will be checked in the order they are registered.
+ self.send("""
+ <message to="user@example.com">
+ <body>Handler 1: Testing</body>
+ </message>
+ """)
+ self.send("""
+ <message to="user@example.com">
+ <body>Handler 2: Testing</body>
+ </message>
+ """)
+ self.send("""
+ <message to="user@example.com">
+ <body>Handler 3: Testing</body>
+ </message>
+ """)
+
+
+suite = unittest.TestLoader().loadTestsFromTestCase(TestHandlers)
diff --git a/tests/test_stream_presence.py b/tests/test_stream_presence.py
new file mode 100644
index 00000000..63ccb043
--- /dev/null
+++ b/tests/test_stream_presence.py
@@ -0,0 +1,380 @@
+import time
+from sleekxmpp.test import *
+
+
+class TestStreamPresence(SleekTest):
+ """
+ Test handling roster updates.
+ """
+
+ def tearDown(self):
+ self.stream_close()
+
+ def testInitialUnavailablePresences(self):
+ """
+ Test receiving unavailable presences from JIDs that
+ are not online.
+ """
+ events = set()
+
+ def got_offline(presence):
+ # The got_offline event should not be triggered.
+ events.add('got_offline')
+
+ def unavailable(presence):
+ # The presence_unavailable event should be triggered.
+ events.add('unavailable')
+
+ self.stream_start()
+ self.xmpp.add_event_handler('got_offline', got_offline)
+ self.xmpp.add_event_handler('presence_unavailable', unavailable)
+
+ self.recv("""
+ <presence type="unavailable"
+ from="otheruser@localhost"
+ to="tester@localhost"/>
+ """)
+
+ # Give event queue time to process.
+ time.sleep(0.1)
+
+ self.assertEqual(events, set(('unavailable',)),
+ "Got offline incorrectly triggered: %s." % events)
+
+ def testGotOffline(self):
+ """Test that got_offline is triggered properly."""
+ events = []
+
+ def got_offline(presence):
+ events.append('got_offline')
+
+ self.stream_start()
+ self.xmpp.add_event_handler('got_offline', got_offline)
+
+ # Setup roster. Use a 'set' instead of 'result' so we
+ # don't have to handle get_roster() blocking.
+ #
+ # We use the stream to initialize the roster to make
+ # the test independent of the roster implementation.
+ self.recv("""
+ <iq type="set">
+ <query xmlns="jabber:iq:roster">
+ <item jid="otheruser@localhost"
+ name="Other User"
+ subscription="both">
+ <group>Testers</group>
+ </item>
+ </query>
+ </iq>
+ """)
+
+ # Contact comes online.
+ self.recv("""
+ <presence from="otheruser@localhost/foobar"
+ to="tester@localhost" />
+ """)
+
+ # Contact goes offline, should trigger got_offline.
+ self.recv("""
+ <presence from="otheruser@localhost/foobar"
+ to="tester@localhost"
+ type="unavailable" />
+ """)
+
+ # Give event queue time to process.
+ time.sleep(0.1)
+
+ self.assertEqual(events, ['got_offline'],
+ "Got offline incorrectly triggered: %s" % events)
+
+ def testGotOnline(self):
+ """Test that got_online is triggered properly."""
+
+ events = set()
+
+ def presence_available(p):
+ events.add('presence_available')
+
+ def got_online(p):
+ events.add('got_online')
+
+ self.stream_start()
+ self.xmpp.add_event_handler('presence_available', presence_available)
+ self.xmpp.add_event_handler('got_online', got_online)
+
+ self.recv("""
+ <presence from="user@localhost"
+ to="tester@localhost" />
+ """)
+
+ # Give event queue time to process.
+ time.sleep(0.1)
+
+ expected = set(('presence_available', 'got_online'))
+ self.assertEqual(events, expected,
+ "Incorrect events triggered: %s" % events)
+
+ def testAutoAuthorizeAndSubscribe(self):
+ """
+ Test auto authorizing and auto subscribing
+ to subscription requests.
+ """
+
+ events = set()
+
+ def presence_subscribe(p):
+ events.add('presence_subscribe')
+
+ def changed_subscription(p):
+ events.add('changed_subscription')
+
+ self.stream_start(jid='tester@localhost')
+
+ self.xmpp.add_event_handler('changed_subscription',
+ changed_subscription)
+ self.xmpp.add_event_handler('presence_subscribe',
+ presence_subscribe)
+
+ # With these settings we should accept a subscription
+ # and request a subscription in return.
+ self.xmpp.auto_authorize = True
+ self.xmpp.auto_subscribe = True
+
+ self.recv("""
+ <presence from="user@localhost"
+ to="tester@localhost"
+ type="subscribe" />
+ """)
+
+ self.send("""
+ <presence to="user@localhost"
+ type="subscribed" />
+ """)
+
+ self.send("""
+ <presence to="user@localhost" />
+ """)
+
+ self.send("""
+ <presence to="user@localhost"
+ type="subscribe" />
+ """)
+
+ expected = set(('presence_subscribe', 'changed_subscription'))
+ self.assertEqual(events, expected,
+ "Incorrect events triggered: %s" % events)
+
+ def testNoAutoAuthorize(self):
+ """Test auto rejecting subscription requests."""
+
+ events = set()
+
+ def presence_subscribe(p):
+ events.add('presence_subscribe')
+
+ def changed_subscription(p):
+ events.add('changed_subscription')
+
+ self.stream_start(jid='tester@localhost')
+
+ self.xmpp.add_event_handler('changed_subscription',
+ changed_subscription)
+ self.xmpp.add_event_handler('presence_subscribe',
+ presence_subscribe)
+
+ # With this setting we should reject all subscriptions.
+ self.xmpp.roster['tester@localhost'].auto_authorize = False
+
+ self.recv("""
+ <presence from="user@localhost"
+ to="tester@localhost"
+ type="subscribe" />
+ """)
+
+ self.send("""
+ <presence to="user@localhost"
+ type="unsubscribed" />
+ """)
+
+ expected = set(('presence_subscribe', 'changed_subscription'))
+ self.assertEqual(events, expected,
+ "Incorrect events triggered: %s" % events)
+
+ def test_presence_events(self):
+ """Test that presence events are raised."""
+
+ events = []
+
+ self.stream_start()
+
+ ptypes = ['available', 'away', 'dnd', 'xa', 'chat',
+ 'unavailable', 'subscribe', 'subscribed',
+ 'unsubscribe', 'unsubscribed']
+
+ for ptype in ptypes:
+ handler = lambda p: events.append(p['type'])
+ self.xmpp.add_event_handler('presence_%s' % ptype, handler)
+
+ self.recv("""
+ <presence />
+ """)
+ self.recv("""
+ <presence><show>away</show></presence>
+ """)
+ self.recv("""
+ <presence><show>dnd</show></presence>
+ """)
+ self.recv("""
+ <presence><show>xa</show></presence>
+ """)
+ self.recv("""
+ <presence><show>chat</show></presence>
+ """)
+ self.recv("""
+ <presence type="unavailable" />
+ """)
+ self.recv("""
+ <presence type="subscribe" />
+ """)
+ self.recv("""
+ <presence type="subscribed" />
+ """)
+ self.recv("""
+ <presence type="unsubscribe" />
+ """)
+ self.recv("""
+ <presence type="unsubscribed" />
+ """)
+
+ time.sleep(.5)
+
+ self.assertEqual(events, ptypes,
+ "Not all events raised: %s" % events)
+
+ def test_changed_status(self):
+ """Test that the changed_status event is handled properly."""
+ events = []
+ self.stream_start()
+
+ def changed_status(presence):
+ events.append(presence['type'])
+
+ self.xmpp.add_event_handler('changed_status', changed_status)
+
+ self.recv("""
+ <presence from="user@example.com" to="tester@localhost" />
+ """)
+
+ self.recv("""
+ <presence from="user@example.com" to="tester@localhost" />
+ """)
+
+ self.recv("""
+ <presence from="user@example.com" to="tester@localhost">
+ <show>away</show>
+ </presence>
+ """)
+
+ self.recv("""
+ <presence from="user@example.com" to="tester@localhost">
+ <show>away</show>
+ </presence>
+ """)
+
+ self.recv("""
+ <presence from="user@example.com" to="tester@localhost">
+ <show>dnd</show>
+ </presence>
+ """)
+
+ self.recv("""
+ <presence from="user@example.com" to="tester@localhost">
+ <show>dnd</show>
+ </presence>
+ """)
+
+ self.recv("""
+ <presence from="user@example.com" to="tester@localhost">
+ <show>chat</show>
+ </presence>
+ """)
+
+ self.recv("""
+ <presence from="user@example.com" to="tester@localhost">
+ <show>chat</show>
+ </presence>
+ """)
+
+ self.recv("""
+ <presence from="user@example.com" to="tester@localhost">
+ <show>xa</show>
+ </presence>
+ """)
+
+ self.recv("""
+ <presence from="user@example.com" to="tester@localhost">
+ <show>xa</show>
+ </presence>
+ """)
+
+ self.recv("""
+ <presence from="user@example.com"
+ to="tester@localhost"
+ type="unavailable" />
+ """)
+
+ self.recv("""
+ <presence from="user@example.com"
+ to="tester@localhost"
+ type="unavailable" />
+ """)
+
+ self.recv("""
+ <presence from="user@example.com" to="tester@localhost" />
+ """)
+
+ self.recv("""
+ <presence from="user@example.com" to="tester@localhost" />
+ """)
+
+ self.recv("""
+ <presence from="user@example.com" to="tester@localhost" />
+ """)
+
+ # Changed status text, so fire new event
+ self.recv("""
+ <presence from="user@example.com" to="tester@localhost">
+ <status>Testing!</status>
+ </presence>
+ """)
+
+ # No change in show/status values, no event
+ self.recv("""
+ <presence from="user@example.com" to="tester@localhost">
+ <status>Testing!</status>
+ </presence>
+ """)
+
+ self.recv("""
+ <presence from="user@example.com" to="tester@localhost">
+ <show>dnd</show>
+ <status>Testing!</status>
+ </presence>
+ """)
+
+ self.recv("""
+ <presence from="user@example.com" to="tester@localhost">
+ <show>dnd</show>
+ <status>Testing!</status>
+ </presence>
+ """)
+
+ time.sleep(0.3)
+
+ self.assertEqual(events, ['available', 'away', 'dnd', 'chat',
+ 'xa', 'unavailable', 'available',
+ 'available', 'dnd'],
+ "Changed status events incorrect: %s" % events)
+
+
+suite = unittest.TestLoader().loadTestsFromTestCase(TestStreamPresence)
diff --git a/tests/test_stream_roster.py b/tests/test_stream_roster.py
new file mode 100644
index 00000000..eb6d2f4f
--- /dev/null
+++ b/tests/test_stream_roster.py
@@ -0,0 +1,231 @@
+# -*- encoding:utf-8 -*-
+
+from __future__ import unicode_literals
+
+from sleekxmpp.test import *
+import time
+import threading
+
+
+class TestStreamRoster(SleekTest):
+ """
+ Test handling roster updates.
+ """
+
+ def tearDown(self):
+ self.stream_close()
+
+ def testGetRoster(self):
+ """Test handling roster requests."""
+ self.stream_start(mode='client', jid='tester@localhost')
+
+ events = []
+
+ def roster_received(iq):
+ events.append('roster_received')
+
+ self.xmpp.add_event_handler('roster_received', roster_received)
+
+ # Since get_roster blocks, we need to run it in a thread.
+ t = threading.Thread(name='get_roster', target=self.xmpp.get_roster)
+ t.start()
+
+ self.send("""
+ <iq type="get" id="1">
+ <query xmlns="jabber:iq:roster" />
+ </iq>
+ """)
+ self.recv("""
+ <iq to='tester@localhost' type="result" id="1">
+ <query xmlns="jabber:iq:roster">
+ <item jid="user@localhost"
+ name="User"
+ subscription="from"
+ ask="subscribe">
+ <group>Friends</group>
+ <group>Examples</group>
+ </item>
+ </query>
+ </iq>
+ """)
+
+ # Wait for get_roster to return.
+ t.join()
+
+ self.check_roster('tester@localhost', 'user@localhost',
+ name='User',
+ subscription='from',
+ afrom=True,
+ pending_out=True,
+ groups=['Friends', 'Examples'])
+
+ # Give the event queue time to process.
+ time.sleep(.1)
+
+ self.failUnless('roster_received' in events,
+ "Roster received event not triggered: %s" % events)
+
+ def testRosterSet(self):
+ """Test handling pushed roster updates."""
+ self.stream_start(mode='client')
+ events = []
+
+ def roster_update(e):
+ events.append('roster_update')
+
+ self.xmpp.add_event_handler('roster_update', roster_update)
+
+ self.recv("""
+ <iq to='tester@localhost' type="set" id="1">
+ <query xmlns="jabber:iq:roster">
+ <item jid="user@localhost"
+ name="User"
+ subscription="both">
+ <group>Friends</group>
+ <group>Examples</group>
+ </item>
+ </query>
+ </iq>
+ """)
+ self.send("""
+ <iq type="result" id="1">
+ <query xmlns="jabber:iq:roster" />
+ </iq>
+ """)
+
+ self.check_roster('tester@localhost', 'user@localhost',
+ name='User',
+ subscription='both',
+ groups=['Friends', 'Examples'])
+
+ # Give the event queue time to process.
+ time.sleep(.1)
+
+ self.failUnless('roster_update' in events,
+ "Roster updated event not triggered: %s" % events)
+
+ def testRosterTimeout(self):
+ """Test handling a timed out roster request."""
+ self.stream_start()
+
+ def do_test():
+ self.xmpp.get_roster(timeout=0)
+ time.sleep(.1)
+
+ self.assertRaises(IqTimeout, do_test)
+
+ def testRosterCallback(self):
+ """Test handling a roster request callback."""
+ self.stream_start()
+ events = []
+
+ def roster_callback(iq):
+ events.append('roster_callback')
+
+ # Since get_roster blocks, we need to run it in a thread.
+ t = threading.Thread(name='get_roster',
+ target=self.xmpp.get_roster,
+ kwargs={str('block'): False,
+ str('callback'): roster_callback})
+ t.start()
+
+ self.send("""
+ <iq type="get" id="1">
+ <query xmlns="jabber:iq:roster" />
+ </iq>
+ """)
+ self.recv("""
+ <iq type="result" id="1">
+ <query xmlns="jabber:iq:roster">
+ <item jid="user@localhost"
+ name="User"
+ subscription="both">
+ <group>Friends</group>
+ <group>Examples</group>
+ </item>
+ </query>
+ </iq>
+ """)
+
+ # Wait for get_roster to return.
+ t.join()
+
+ # Give the event queue time to process.
+ time.sleep(.1)
+
+ self.failUnless(events == ['roster_callback'],
+ "Roster timeout event not triggered: %s." % events)
+
+ def testRosterUnicode(self):
+ """Test that JIDs with Unicode values are handled properly."""
+ self.stream_start()
+ self.recv("""
+ <iq to="tester@localhost" type="set" id="1">
+ <query xmlns="jabber:iq:roster">
+ <item jid="andré@foo" subscription="both">
+ <group>Unicode</group>
+ </item>
+ </query>
+ </iq>
+ """)
+
+ # Give the event queue time to process.
+ time.sleep(.1)
+
+ self.check_roster('tester@localhost', 'andré@foo',
+ subscription='both',
+ groups=['Unicode'])
+
+ jids = list(self.xmpp.client_roster.keys())
+ self.failUnless(jids == ['andré@foo'],
+ "Too many roster entries found: %s" % jids)
+
+ self.recv("""
+ <presence to="tester@localhost" from="andré@foo/bar">
+ <show>away</show>
+ <status>Testing</status>
+ </presence>
+ """)
+
+ # Give the event queue time to process.
+ time.sleep(.1)
+
+ result = self.xmpp.client_roster['andré@foo'].resources
+ expected = {'bar': {'status':'Testing',
+ 'show':'away',
+ 'priority':0}}
+ self.failUnless(result == expected,
+ "Unexpected roster values: %s" % result)
+
+ def testSendLastPresence(self):
+ """Test that sending the last presence works."""
+ self.stream_start()
+ self.xmpp.send_presence(pshow='dnd')
+ self.xmpp.auto_authorize = True
+ self.xmpp.auto_subscribe = True
+
+ self.send("""
+ <presence>
+ <show>dnd</show>
+ </presence>
+ """)
+
+ self.recv("""
+ <presence from="user@localhost"
+ to="tester@localhost"
+ type="subscribe" />
+ """)
+
+ self.send("""
+ <presence to="user@localhost"
+ type="subscribed" />
+ """)
+
+ self.send("""
+ <presence to="user@localhost">
+ <show>dnd</show>
+ </presence>
+ """)
+
+
+suite = unittest.TestLoader().loadTestsFromTestCase(TestStreamRoster)
diff --git a/tests/test_stream_xep_0030.py b/tests/test_stream_xep_0030.py
new file mode 100644
index 00000000..dd43778a
--- /dev/null
+++ b/tests/test_stream_xep_0030.py
@@ -0,0 +1,576 @@
+import sys
+import time
+import threading
+
+from sleekxmpp.test import *
+
+
+class TestStreamDisco(SleekTest):
+
+ """
+ Test using the XEP-0030 plugin.
+ """
+
+ def tearDown(self):
+ self.stream_close()
+
+ def testInfoEmptyDefaultNode(self):
+ """
+ Info query result from an entity MUST have at least one identity
+ and feature, namely http://jabber.org/protocol/disco#info.
+
+ Since the XEP-0030 plugin is loaded, a disco response should
+ be generated and not an error result.
+ """
+ self.stream_start(mode='client',
+ plugins=['xep_0030'])
+
+ self.recv("""
+ <iq type="get" id="test">
+ <query xmlns="http://jabber.org/protocol/disco#info" />
+ </iq>
+ """)
+
+ self.send("""
+ <iq type="result" id="test">
+ <query xmlns="http://jabber.org/protocol/disco#info">
+ <identity category="client" type="bot" />
+ <feature var="http://jabber.org/protocol/disco#info" />
+ </query>
+ </iq>
+ """)
+
+ def testInfoEmptyDefaultNodeComponent(self):
+ """
+ Test requesting an empty, default node using a Component.
+ """
+ self.stream_start(mode='component',
+ jid='tester.localhost',
+ plugins=['xep_0030'])
+
+ self.recv("""
+ <iq type="get" id="test">
+ <query xmlns="http://jabber.org/protocol/disco#info" />
+ </iq>
+ """)
+
+ self.send("""
+ <iq type="result" id="test">
+ <query xmlns="http://jabber.org/protocol/disco#info">
+ <identity category="component" type="generic" />
+ <feature var="http://jabber.org/protocol/disco#info" />
+ </query>
+ </iq>
+ """)
+
+ def testInfoIncludeNode(self):
+ """
+ Results for info queries directed to a particular node MUST
+ include the node in the query response.
+ """
+ self.stream_start(mode='client',
+ plugins=['xep_0030'])
+
+
+ self.xmpp['xep_0030'].static.add_node(node='testing')
+
+ self.recv("""
+ <iq to="tester@localhost" type="get" id="test">
+ <query xmlns="http://jabber.org/protocol/disco#info"
+ node="testing" />
+ </iq>
+ """)
+
+ self.send("""
+ <iq type="result" id="test">
+ <query xmlns="http://jabber.org/protocol/disco#info"
+ node="testing">
+ </query>
+ </iq>""",
+ method='mask')
+
+ def testItemsIncludeNode(self):
+ """
+ Results for items queries directed to a particular node MUST
+ include the node in the query response.
+ """
+ self.stream_start(mode='client',
+ plugins=['xep_0030'])
+
+
+ self.xmpp['xep_0030'].static.add_node(node='testing')
+
+ self.recv("""
+ <iq to="tester@localhost" type="get" id="test">
+ <query xmlns="http://jabber.org/protocol/disco#items"
+ node="testing" />
+ </iq>
+ """)
+
+ self.send("""
+ <iq type="result" id="test">
+ <query xmlns="http://jabber.org/protocol/disco#items"
+ node="testing">
+ </query>
+ </iq>""",
+ method='mask')
+
+ def testDynamicInfoJID(self):
+ """
+ Test using a dynamic info handler for a particular JID.
+ """
+ self.stream_start(mode='client',
+ plugins=['xep_0030'])
+
+ def dynamic_jid(jid, node, ifrom, iq):
+ result = self.xmpp['xep_0030'].stanza.DiscoInfo()
+ result['node'] = node
+ result.add_identity('client', 'console', name='Dynamic Info')
+ return result
+
+ self.xmpp['xep_0030'].set_node_handler('get_info',
+ jid='tester@localhost',
+ handler=dynamic_jid)
+
+ self.recv("""
+ <iq type="get" id="test" to="tester@localhost">
+ <query xmlns="http://jabber.org/protocol/disco#info"
+ node="testing" />
+ </iq>
+ """)
+
+ self.send("""
+ <iq type="result" id="test">
+ <query xmlns="http://jabber.org/protocol/disco#info"
+ node="testing">
+ <identity category="client"
+ type="console"
+ name="Dynamic Info" />
+ </query>
+ </iq>
+ """)
+
+ def testDynamicInfoGlobal(self):
+ """
+ Test using a dynamic info handler for all requests.
+ """
+ self.stream_start(mode='component',
+ jid='tester.localhost',
+ plugins=['xep_0030'])
+
+ def dynamic_global(jid, node, ifrom, iq):
+ result = self.xmpp['xep_0030'].stanza.DiscoInfo()
+ result['node'] = node
+ result.add_identity('component', 'generic', name='Dynamic Info')
+ return result
+
+ self.xmpp['xep_0030'].set_node_handler('get_info',
+ handler=dynamic_global)
+
+ self.recv("""
+ <iq type="get" id="test"
+ to="user@tester.localhost"
+ from="tester@localhost">
+ <query xmlns="http://jabber.org/protocol/disco#info"
+ node="testing" />
+ </iq>
+ """)
+
+ self.send("""
+ <iq type="result" id="test"
+ to="tester@localhost"
+ from="user@tester.localhost">
+ <query xmlns="http://jabber.org/protocol/disco#info"
+ node="testing">
+ <identity category="component"
+ type="generic"
+ name="Dynamic Info" />
+ </query>
+ </iq>
+ """)
+
+ def testOverrideJIDInfoHandler(self):
+ """Test overriding a JID info handler."""
+ self.stream_start(mode='client',
+ plugins=['xep_0030'])
+
+ def dynamic_jid(jid, node, ifrom, iq):
+ result = self.xmpp['xep_0030'].stanza.DiscoInfo()
+ result['node'] = node
+ result.add_identity('client', 'console', name='Dynamic Info')
+ return result
+
+ self.xmpp['xep_0030'].set_node_handler('get_info',
+ jid='tester@localhost',
+ handler=dynamic_jid)
+
+
+ self.xmpp['xep_0030'].make_static(jid='tester@localhost',
+ node='testing')
+
+ self.xmpp['xep_0030'].add_identity(jid='tester@localhost',
+ node='testing',
+ category='automation',
+ itype='command-list')
+
+ self.recv("""
+ <iq type="get" id="test" to="tester@localhost">
+ <query xmlns="http://jabber.org/protocol/disco#info"
+ node="testing" />
+ </iq>
+ """)
+
+ self.send("""
+ <iq type="result" id="test">
+ <query xmlns="http://jabber.org/protocol/disco#info"
+ node="testing">
+ <identity category="automation"
+ type="command-list" />
+ </query>
+ </iq>
+ """)
+
+ def testOverrideGlobalInfoHandler(self):
+ """Test overriding the global JID info handler."""
+ self.stream_start(mode='component',
+ jid='tester.localhost',
+ plugins=['xep_0030'])
+
+ def dynamic_global(jid, node, ifrom, iq):
+ result = self.xmpp['xep_0030'].stanza.DiscoInfo()
+ result['node'] = node
+ result.add_identity('component', 'generic', name='Dynamic Info')
+ return result
+
+ self.xmpp['xep_0030'].set_node_handler('get_info',
+ handler=dynamic_global)
+
+ self.xmpp['xep_0030'].make_static(jid='user@tester.localhost',
+ node='testing')
+
+ self.xmpp['xep_0030'].add_feature(jid='user@tester.localhost',
+ node='testing',
+ feature='urn:xmpp:ping')
+
+ self.recv("""
+ <iq type="get" id="test"
+ to="user@tester.localhost"
+ from="tester@localhost">
+ <query xmlns="http://jabber.org/protocol/disco#info"
+ node="testing" />
+ </iq>
+ """)
+
+ self.send("""
+ <iq type="result" id="test"
+ to="tester@localhost"
+ from="user@tester.localhost">
+ <query xmlns="http://jabber.org/protocol/disco#info"
+ node="testing">
+ <feature var="urn:xmpp:ping" />
+ </query>
+ </iq>
+ """)
+
+ def testGetInfoRemote(self):
+ """
+ Test sending a disco#info query to another entity
+ and receiving the result.
+ """
+ self.stream_start(mode='client',
+ plugins=['xep_0030'])
+
+ events = set()
+
+ def handle_disco_info(iq):
+ events.add('disco_info')
+
+
+ self.xmpp.add_event_handler('disco_info', handle_disco_info)
+
+ t = threading.Thread(name="get_info",
+ target=self.xmpp['xep_0030'].get_info,
+ args=('user@localhost', 'foo'))
+ t.start()
+
+ self.send("""
+ <iq type="get" to="user@localhost" id="1">
+ <query xmlns="http://jabber.org/protocol/disco#info"
+ node="foo" />
+ </iq>
+ """)
+
+ self.recv("""
+ <iq type="result" to="tester@localhost" id="1">
+ <query xmlns="http://jabber.org/protocol/disco#info"
+ node="foo">
+ <identity category="client" type="bot" />
+ <feature var="urn:xmpp:ping" />
+ </query>
+ </iq>
+ """)
+
+ # Wait for disco#info request to be received.
+ t.join()
+
+ time.sleep(0.1)
+
+ self.assertEqual(events, set(('disco_info',)),
+ "Disco info event was not triggered: %s" % events)
+
+ def testDynamicItemsJID(self):
+ """
+ Test using a dynamic items handler for a particular JID.
+ """
+ self.stream_start(mode='client',
+ plugins=['xep_0030'])
+
+ def dynamic_jid(jid, node, ifrom, iq):
+ result = self.xmpp['xep_0030'].stanza.DiscoItems()
+ result['node'] = node
+ result.add_item('tester@localhost', node='foo', name='JID')
+ return result
+
+ self.xmpp['xep_0030'].set_node_handler('get_items',
+ jid='tester@localhost',
+ handler=dynamic_jid)
+
+ self.recv("""
+ <iq type="get" id="test" to="tester@localhost">
+ <query xmlns="http://jabber.org/protocol/disco#items"
+ node="testing" />
+ </iq>
+ """)
+
+ self.send("""
+ <iq type="result" id="test">
+ <query xmlns="http://jabber.org/protocol/disco#items"
+ node="testing">
+ <item jid="tester@localhost" node="foo" name="JID" />
+ </query>
+ </iq>
+ """)
+
+ def testDynamicItemsGlobal(self):
+ """
+ Test using a dynamic items handler for all requests.
+ """
+ self.stream_start(mode='component',
+ jid='tester.localhost',
+ plugins=['xep_0030'])
+
+ def dynamic_global(jid, node, ifrom, iq):
+ result = self.xmpp['xep_0030'].stanza.DiscoItems()
+ result['node'] = node
+ result.add_item('tester@localhost', node='foo', name='Global')
+ return result
+
+ self.xmpp['xep_0030'].set_node_handler('get_items',
+ handler=dynamic_global)
+
+ self.recv("""
+ <iq type="get" id="test"
+ to="user@tester.localhost"
+ from="tester@localhost">
+ <query xmlns="http://jabber.org/protocol/disco#items"
+ node="testing" />
+ </iq>
+ """)
+
+ self.send("""
+ <iq type="result" id="test"
+ to="tester@localhost"
+ from="user@tester.localhost">
+ <query xmlns="http://jabber.org/protocol/disco#items"
+ node="testing">
+ <item jid="tester@localhost" node="foo" name="Global" />
+ </query>
+ </iq>
+ """)
+
+ def testOverrideJIDItemsHandler(self):
+ """Test overriding a JID items handler."""
+ self.stream_start(mode='client',
+ plugins=['xep_0030'])
+
+ def dynamic_jid(jid, node, ifrom, iq):
+ result = self.xmpp['xep_0030'].stanza.DiscoItems()
+ result['node'] = node
+ result.add_item('tester@localhost', node='foo', name='Global')
+ return result
+
+ self.xmpp['xep_0030'].set_node_handler('get_items',
+ jid='tester@localhost',
+ handler=dynamic_jid)
+
+
+ self.xmpp['xep_0030'].make_static(jid='tester@localhost',
+ node='testing')
+
+ self.xmpp['xep_0030'].add_item(ijid='tester@localhost',
+ node='testing',
+ jid='tester@localhost',
+ subnode='foo',
+ name='Test')
+
+ self.recv("""
+ <iq type="get" id="test" to="tester@localhost">
+ <query xmlns="http://jabber.org/protocol/disco#items"
+ node="testing" />
+ </iq>
+ """)
+
+ self.send("""
+ <iq type="result" id="test">
+ <query xmlns="http://jabber.org/protocol/disco#items"
+ node="testing">
+ <item jid="tester@localhost" node="foo" name="Test" />
+ </query>
+ </iq>
+ """)
+
+ def testOverrideGlobalItemsHandler(self):
+ """Test overriding the global JID items handler."""
+ self.stream_start(mode='component',
+ jid='tester.localhost',
+ plugins=['xep_0030'])
+
+ def dynamic_global(jid, node, ifrom, iq):
+ result = self.xmpp['xep_0030'].stanza.DiscoItems()
+ result['node'] = node
+ result.add_item('tester.localhost', node='foo', name='Global')
+ return result
+
+ self.xmpp['xep_0030'].set_node_handler('get_items',
+ handler=dynamic_global)
+
+ self.xmpp['xep_0030'].make_static(jid='user@tester.localhost',
+ node='testing')
+
+ self.xmpp['xep_0030'].add_item(ijid='user@tester.localhost',
+ node='testing',
+ jid='user@tester.localhost',
+ subnode='foo',
+ name='Test')
+
+ self.recv("""
+ <iq type="get" id="test"
+ to="user@tester.localhost"
+ from="tester@localhost">
+ <query xmlns="http://jabber.org/protocol/disco#items"
+ node="testing" />
+ </iq>
+ """)
+
+ self.send("""
+ <iq type="result" id="test"
+ to="tester@localhost"
+ from="user@tester.localhost">
+ <query xmlns="http://jabber.org/protocol/disco#items"
+ node="testing">
+ <item jid="user@tester.localhost" node="foo" name="Test" />
+ </query>
+ </iq>
+ """)
+
+ def testGetItemsRemote(self):
+ """
+ Test sending a disco#items query to another entity
+ and receiving the result.
+ """
+ self.stream_start(mode='client',
+ plugins=['xep_0030'])
+
+ events = set()
+ results = set()
+
+ def handle_disco_items(iq):
+ events.add('disco_items')
+ results.update(iq['disco_items']['items'])
+
+
+ self.xmpp.add_event_handler('disco_items', handle_disco_items)
+
+ t = threading.Thread(name="get_items",
+ target=self.xmpp['xep_0030'].get_items,
+ args=('user@localhost', 'foo'))
+ t.start()
+
+ self.send("""
+ <iq type="get" to="user@localhost" id="1">
+ <query xmlns="http://jabber.org/protocol/disco#items"
+ node="foo" />
+ </iq>
+ """)
+
+ self.recv("""
+ <iq type="result" to="tester@localhost" id="1">
+ <query xmlns="http://jabber.org/protocol/disco#items"
+ node="foo">
+ <item jid="user@localhost" node="bar" name="Test" />
+ <item jid="user@localhost" node="baz" name="Test 2" />
+ </query>
+ </iq>
+ """)
+
+ # Wait for disco#items request to be received.
+ t.join()
+
+ time.sleep(0.1)
+
+ items = set([('user@localhost', 'bar', 'Test'),
+ ('user@localhost', 'baz', 'Test 2')])
+ self.assertEqual(events, set(('disco_items',)),
+ "Disco items event was not triggered: %s" % events)
+ self.assertEqual(results, items,
+ "Unexpected items: %s" % results)
+
+ def testGetItemsIterator(self):
+ """Test interaction between XEP-0030 and XEP-0059 plugins."""
+
+ raised_exceptions = []
+
+ self.stream_start(mode='client',
+ plugins=['xep_0030', 'xep_0059'])
+
+ results = self.xmpp['xep_0030'].get_items(jid='foo@localhost',
+ node='bar',
+ iterator=True)
+ results.amount = 10
+
+ def run_test():
+ try:
+ results.next()
+ except StopIteration:
+ raised_exceptions.append(True)
+
+ t = threading.Thread(name="get_items_iterator",
+ target=run_test)
+ t.start()
+
+ self.send("""
+ <iq id="2" type="get" to="foo@localhost">
+ <query xmlns="http://jabber.org/protocol/disco#items"
+ node="bar">
+ <set xmlns="http://jabber.org/protocol/rsm">
+ <max>10</max>
+ </set>
+ </query>
+ </iq>
+ """)
+ self.recv("""
+ <iq id="2" type="result" to="tester@localhost">
+ <query xmlns="http://jabber.org/protocol/disco#items">
+ <set xmlns="http://jabber.org/protocol/rsm">
+ </set>
+ </query>
+ </iq>
+ """)
+
+ t.join()
+
+ self.assertEqual(raised_exceptions, [True],
+ "StopIteration was not raised: %s" % raised_exceptions)
+
+
+suite = unittest.TestLoader().loadTestsFromTestCase(TestStreamDisco)
diff --git a/tests/test_stream_xep_0050.py b/tests/test_stream_xep_0050.py
new file mode 100644
index 00000000..1931349d
--- /dev/null
+++ b/tests/test_stream_xep_0050.py
@@ -0,0 +1,726 @@
+import time
+import threading
+
+from sleekxmpp.test import *
+
+
+class TestAdHocCommands(SleekTest):
+
+ def setUp(self):
+ self.stream_start(mode='client',
+ plugins=['xep_0030', 'xep_0004', 'xep_0050'])
+
+ # Real session IDs don't make for nice tests, so use
+ # a dummy value.
+ self.xmpp['xep_0050'].new_session = lambda: '_sessionid_'
+
+ def tearDown(self):
+ self.stream_close()
+
+ def testZeroStepCommand(self):
+ """Test running a command with no steps."""
+
+ def handle_command(iq, session):
+ form = self.xmpp['xep_0004'].makeForm(ftype='result')
+ form.addField(var='foo', ftype='text-single',
+ label='Foo', value='bar')
+
+ session['payload'] = form
+ session['next'] = None
+ session['has_next'] = False
+
+ return session
+
+ self.xmpp['xep_0050'].add_command('tester@localhost', 'foo',
+ 'Do Foo', handle_command)
+
+ self.recv("""
+ <iq id="11" type="set" to="tester@localhost" from="foo@bar">
+ <command xmlns="http://jabber.org/protocol/commands"
+ node="foo"
+ action="execute" />
+ </iq>
+ """)
+
+ self.send("""
+ <iq id="11" type="result" to="foo@bar">
+ <command xmlns="http://jabber.org/protocol/commands"
+ node="foo"
+ status="completed"
+ sessionid="_sessionid_">
+ <x xmlns="jabber:x:data" type="result">
+ <field var="foo" label="Foo" type="text-single">
+ <value>bar</value>
+ </field>
+ </x>
+ </command>
+ </iq>
+ """)
+
+ def testOneStepCommand(self):
+ """Test running a single step command."""
+ results = []
+
+ def handle_command(iq, session):
+
+ def handle_form(form, session):
+ results.append(form['values']['foo'])
+
+ form = self.xmpp['xep_0004'].makeForm('form')
+ form.addField(var='foo', ftype='text-single', label='Foo')
+
+ session['payload'] = form
+ session['next'] = handle_form
+ session['has_next'] = False
+
+ return session
+
+ self.xmpp['xep_0050'].add_command('tester@localhost', 'foo',
+ 'Do Foo', handle_command)
+
+ self.recv("""
+ <iq id="11" type="set" to="tester@localhost" from="foo@bar">
+ <command xmlns="http://jabber.org/protocol/commands"
+ node="foo"
+ action="execute" />
+ </iq>
+ """)
+
+ self.send("""
+ <iq id="11" type="result" to="foo@bar">
+ <command xmlns="http://jabber.org/protocol/commands"
+ node="foo"
+ status="executing"
+ sessionid="_sessionid_">
+ <actions>
+ <complete />
+ </actions>
+ <x xmlns="jabber:x:data" type="form">
+ <field var="foo" label="Foo" type="text-single" />
+ </x>
+ </command>
+ </iq>
+ """)
+
+ self.recv("""
+ <iq id="12" type="set" to="tester@localhost" from="foo@bar">
+ <command xmlns="http://jabber.org/protocol/commands"
+ node="foo"
+ action="complete"
+ sessionid="_sessionid_">
+ <x xmlns="jabber:x:data" type="submit">
+ <field var="foo" label="Foo" type="text-single">
+ <value>blah</value>
+ </field>
+ </x>
+ </command>
+ </iq>
+ """)
+
+ self.send("""
+ <iq id="12" type="result" to="foo@bar">
+ <command xmlns="http://jabber.org/protocol/commands"
+ node="foo"
+ status="completed"
+ sessionid="_sessionid_" />
+ </iq>
+ """)
+
+ self.assertEqual(results, ['blah'],
+ "Command handler was not executed: %s" % results)
+
+ def testTwoStepCommand(self):
+ """Test using a two-stage command."""
+ results = []
+
+ def handle_command(iq, session):
+
+ def handle_step2(form, session):
+ results.append(form['values']['bar'])
+
+ def handle_step1(form, session):
+ results.append(form['values']['foo'])
+
+ form = self.xmpp['xep_0004'].makeForm('form')
+ form.addField(var='bar', ftype='text-single', label='Bar')
+
+ session['payload'] = form
+ session['next'] = handle_step2
+ session['has_next'] = False
+
+ return session
+
+ form = self.xmpp['xep_0004'].makeForm('form')
+ form.addField(var='foo', ftype='text-single', label='Foo')
+
+ session['payload'] = form
+ session['next'] = handle_step1
+ session['has_next'] = True
+
+ return session
+
+ self.xmpp['xep_0050'].add_command('tester@localhost', 'foo',
+ 'Do Foo', handle_command)
+
+ self.recv("""
+ <iq id="11" type="set" to="tester@localhost" from="foo@bar">
+ <command xmlns="http://jabber.org/protocol/commands"
+ node="foo"
+ action="execute" />
+ </iq>
+ """)
+
+ self.send("""
+ <iq id="11" type="result" to="foo@bar">
+ <command xmlns="http://jabber.org/protocol/commands"
+ node="foo"
+ status="executing"
+ sessionid="_sessionid_">
+ <actions>
+ <next />
+ </actions>
+ <x xmlns="jabber:x:data" type="form">
+ <field var="foo" label="Foo" type="text-single" />
+ </x>
+ </command>
+ </iq>
+ """)
+
+ self.recv("""
+ <iq id="12" type="set" to="tester@localhost" from="foo@bar">
+ <command xmlns="http://jabber.org/protocol/commands"
+ node="foo"
+ action="next"
+ sessionid="_sessionid_">
+ <x xmlns="jabber:x:data" type="submit">
+ <field var="foo" label="Foo" type="text-single">
+ <value>blah</value>
+ </field>
+ </x>
+ </command>
+ </iq>
+ """)
+
+ self.send("""
+ <iq id="12" type="result" to="foo@bar">
+ <command xmlns="http://jabber.org/protocol/commands"
+ node="foo"
+ status="executing"
+ sessionid="_sessionid_">
+ <actions>
+ <complete />
+ </actions>
+ <x xmlns="jabber:x:data" type="form">
+ <field var="bar" label="Bar" type="text-single" />
+ </x>
+ </command>
+ </iq>
+ """)
+
+ self.recv("""
+ <iq id="13" type="set" to="tester@localhost" from="foo@bar">
+ <command xmlns="http://jabber.org/protocol/commands"
+ node="foo"
+ action="complete"
+ sessionid="_sessionid_">
+ <x xmlns="jabber:x:data" type="submit">
+ <field var="bar" label="Bar" type="text-single">
+ <value>meh</value>
+ </field>
+ </x>
+ </command>
+ </iq>
+ """)
+ self.send("""
+ <iq id="13" type="result" to="foo@bar">
+ <command xmlns="http://jabber.org/protocol/commands"
+ node="foo"
+ status="completed"
+ sessionid="_sessionid_" />
+ </iq>
+ """)
+
+ self.assertEqual(results, ['blah', 'meh'],
+ "Command handler was not executed: %s" % results)
+
+ def testCancelCommand(self):
+ """Test canceling command."""
+ results = []
+
+ def handle_command(iq, session):
+
+ def handle_form(form, session):
+ results.append(form['values']['foo'])
+
+ def handle_cancel(iq, session):
+ results.append('canceled')
+
+ form = self.xmpp['xep_0004'].makeForm('form')
+ form.addField(var='foo', ftype='text-single', label='Foo')
+
+ session['payload'] = form
+ session['next'] = handle_form
+ session['cancel'] = handle_cancel
+ session['has_next'] = False
+
+ return session
+
+ self.xmpp['xep_0050'].add_command('tester@localhost', 'foo',
+ 'Do Foo', handle_command)
+
+ self.recv("""
+ <iq id="11" type="set" to="tester@localhost" from="foo@bar">
+ <command xmlns="http://jabber.org/protocol/commands"
+ node="foo"
+ action="execute" />
+ </iq>
+ """)
+
+ self.send("""
+ <iq id="11" type="result" to="foo@bar">
+ <command xmlns="http://jabber.org/protocol/commands"
+ node="foo"
+ status="executing"
+ sessionid="_sessionid_">
+ <actions>
+ <complete />
+ </actions>
+ <x xmlns="jabber:x:data" type="form">
+ <field var="foo" label="Foo" type="text-single" />
+ </x>
+ </command>
+ </iq>
+ """)
+
+ self.recv("""
+ <iq id="12" type="set" to="tester@localhost" from="foo@bar">
+ <command xmlns="http://jabber.org/protocol/commands"
+ node="foo"
+ action="cancel"
+ sessionid="_sessionid_">
+ <x xmlns="jabber:x:data" type="submit">
+ <field var="foo" label="Foo" type="text-single">
+ <value>blah</value>
+ </field>
+ </x>
+ </command>
+ </iq>
+ """)
+
+ self.send("""
+ <iq id="12" type="result" to="foo@bar">
+ <command xmlns="http://jabber.org/protocol/commands"
+ node="foo"
+ status="canceled"
+ sessionid="_sessionid_" />
+ </iq>
+ """)
+
+ self.assertEqual(results, ['canceled'],
+ "Cancelation handler not executed: %s" % results)
+
+ def testCommandNote(self):
+ """Test adding notes to commands."""
+
+ def handle_command(iq, session):
+ form = self.xmpp['xep_0004'].makeForm(ftype='result')
+ form.addField(var='foo', ftype='text-single',
+ label='Foo', value='bar')
+
+ session['payload'] = form
+ session['next'] = None
+ session['has_next'] = False
+ session['notes'] = [('info', 'testing notes')]
+
+ return session
+
+ self.xmpp['xep_0050'].add_command('tester@localhost', 'foo',
+ 'Do Foo', handle_command)
+
+ self.recv("""
+ <iq id="11" type="set" to="tester@localhost" from="foo@bar">
+ <command xmlns="http://jabber.org/protocol/commands"
+ node="foo"
+ action="execute" />
+ </iq>
+ """)
+
+ self.send("""
+ <iq id="11" type="result" to="foo@bar">
+ <command xmlns="http://jabber.org/protocol/commands"
+ node="foo"
+ status="completed"
+ sessionid="_sessionid_">
+ <note type="info">testing notes</note>
+ <x xmlns="jabber:x:data" type="result">
+ <field var="foo" label="Foo" type="text-single">
+ <value>bar</value>
+ </field>
+ </x>
+ </command>
+ </iq>
+ """)
+
+
+
+ def testMultiPayloads(self):
+ """Test using commands with multiple payloads."""
+ results = []
+
+ def handle_command(iq, session):
+
+ def handle_form(forms, session):
+ for form in forms:
+ results.append(form['values']['FORM_TYPE'])
+
+ form1 = self.xmpp['xep_0004'].makeForm('form')
+ form1.addField(var='FORM_TYPE', ftype='hidden', value='form_1')
+ form1.addField(var='foo', ftype='text-single', label='Foo')
+
+ form2 = self.xmpp['xep_0004'].makeForm('form')
+ form2.addField(var='FORM_TYPE', ftype='hidden', value='form_2')
+ form2.addField(var='foo', ftype='text-single', label='Foo')
+
+ session['payload'] = [form1, form2]
+ session['next'] = handle_form
+ session['has_next'] = False
+
+ return session
+
+ self.xmpp['xep_0050'].add_command('tester@localhost', 'foo',
+ 'Do Foo', handle_command)
+
+ self.recv("""
+ <iq id="11" type="set" to="tester@localhost" from="foo@bar">
+ <command xmlns="http://jabber.org/protocol/commands"
+ node="foo"
+ action="execute" />
+ </iq>
+ """)
+
+ self.send("""
+ <iq id="11" type="result" to="foo@bar">
+ <command xmlns="http://jabber.org/protocol/commands"
+ node="foo"
+ status="executing"
+ sessionid="_sessionid_">
+ <actions>
+ <complete />
+ </actions>
+ <x xmlns="jabber:x:data" type="form">
+ <field var="FORM_TYPE" type="hidden">
+ <value>form_1</value>
+ </field>
+ <field var="foo" label="Foo" type="text-single" />
+ </x>
+ <x xmlns="jabber:x:data" type="form">
+ <field var="FORM_TYPE" type="hidden">
+ <value>form_2</value>
+ </field>
+ <field var="foo" label="Foo" type="text-single" />
+ </x>
+ </command>
+ </iq>
+ """)
+
+ self.recv("""
+ <iq id="12" type="set" to="tester@localhost" from="foo@bar">
+ <command xmlns="http://jabber.org/protocol/commands"
+ node="foo"
+ action="complete"
+ sessionid="_sessionid_">
+ <x xmlns="jabber:x:data" type="submit">
+ <field var="FORM_TYPE" type="hidden">
+ <value>form_1</value>
+ </field>
+ <field var="foo" type="text-single">
+ <value>bar</value>
+ </field>
+ </x>
+ <x xmlns="jabber:x:data" type="submit">
+ <field var="FORM_TYPE" type="hidden">
+ <value>form_2</value>
+ </field>
+ <field var="foo" type="text-single">
+ <value>bar</value>
+ </field>
+ </x>
+ </command>
+ </iq>
+ """)
+
+ self.send("""
+ <iq id="12" type="result" to="foo@bar">
+ <command xmlns="http://jabber.org/protocol/commands"
+ node="foo"
+ status="completed"
+ sessionid="_sessionid_" />
+ </iq>
+ """)
+
+ self.assertEqual(results, [['form_1'], ['form_2']],
+ "Command handler was not executed: %s" % results)
+
+ def testClientAPI(self):
+ """Test using client-side API for commands."""
+ results = []
+
+ def handle_complete(iq, session):
+ for item in session['custom_data']:
+ results.append(item)
+
+ def handle_step2(iq, session):
+ form = self.xmpp['xep_0004'].makeForm(ftype='submit')
+ form.addField(var='bar', value='123')
+
+ session['custom_data'].append('baz')
+ session['payload'] = form
+ session['next'] = handle_complete
+ self.xmpp['xep_0050'].complete_command(session)
+
+ def handle_step1(iq, session):
+ form = self.xmpp['xep_0004'].makeForm(ftype='submit')
+ form.addField(var='foo', value='42')
+
+ session['custom_data'].append('bar')
+ session['payload'] = form
+ session['next'] = handle_step2
+ self.xmpp['xep_0050'].continue_command(session)
+
+ session = {'custom_data': ['foo'],
+ 'next': handle_step1}
+
+ self.xmpp['xep_0050'].start_command(
+ 'foo@example.com',
+ 'test_client',
+ session)
+
+ self.send("""
+ <iq id="1" to="foo@example.com" type="set">
+ <command xmlns="http://jabber.org/protocol/commands"
+ node="test_client"
+ action="execute" />
+ </iq>
+ """)
+
+ self.recv("""
+ <iq id="1" from="foo@example.com" type="result">
+ <command xmlns="http://jabber.org/protocol/commands"
+ node="test_client"
+ sessionid="_sessionid_"
+ status="executing">
+ <x xmlns="jabber:x:data" type="form">
+ <field var="foo" type="text-single" />
+ </x>
+ </command>
+ </iq>
+ """)
+
+ self.send("""
+ <iq id="2" to="foo@example.com" type="set">
+ <command xmlns="http://jabber.org/protocol/commands"
+ node="test_client"
+ sessionid="_sessionid_"
+ action="next">
+ <x xmlns="jabber:x:data" type="submit">
+ <field var="foo">
+ <value>42</value>
+ </field>
+ </x>
+ </command>
+ </iq>
+ """)
+
+ self.recv("""
+ <iq id="2" from="foo@example.com" type="result">
+ <command xmlns="http://jabber.org/protocol/commands"
+ node="test_client"
+ sessionid="_sessionid_"
+ status="executing">
+ <x xmlns="jabber:x:data" type="form">
+ <field var="bar" type="text-single" />
+ </x>
+ </command>
+ </iq>
+ """)
+
+ self.send("""
+ <iq id="3" to="foo@example.com" type="set">
+ <command xmlns="http://jabber.org/protocol/commands"
+ node="test_client"
+ sessionid="_sessionid_"
+ action="complete">
+ <x xmlns="jabber:x:data" type="submit">
+ <field var="bar">
+ <value>123</value>
+ </field>
+ </x>
+ </command>
+ </iq>
+ """)
+
+ self.recv("""
+ <iq id="3" from="foo@example.com" type="result">
+ <command xmlns="http://jabber.org/protocol/commands"
+ node="test_client"
+ sessionid="_sessionid_"
+ status="completed" />
+ </iq>
+ """)
+
+ # Give the event queue time to process
+ time.sleep(0.3)
+
+ self.failUnless(results == ['foo', 'bar', 'baz'],
+ 'Incomplete command workflow: %s' % results)
+
+ def testClientAPICancel(self):
+ """Test using client-side cancel API for commands."""
+ results = []
+
+ def handle_canceled(iq, session):
+ for item in session['custom_data']:
+ results.append(item)
+
+ def handle_step1(iq, session):
+ session['custom_data'].append('bar')
+ session['next'] = handle_canceled
+ self.xmpp['xep_0050'].cancel_command(session)
+
+ session = {'custom_data': ['foo'],
+ 'next': handle_step1}
+
+ self.xmpp['xep_0050'].start_command(
+ 'foo@example.com',
+ 'test_client',
+ session)
+
+ self.send("""
+ <iq id="1" to="foo@example.com" type="set">
+ <command xmlns="http://jabber.org/protocol/commands"
+ node="test_client"
+ action="execute" />
+ </iq>
+ """)
+
+ self.recv("""
+ <iq id="1" to="foo@example.com" type="result">
+ <command xmlns="http://jabber.org/protocol/commands"
+ node="test_client"
+ sessionid="_sessionid_"
+ status="executing">
+ <x xmlns="jabber:x:data" type="form">
+ <field var="foo" type="text-single" />
+ </x>
+ </command>
+ </iq>
+ """)
+
+ self.send("""
+ <iq id="2" to="foo@example.com" type="set">
+ <command xmlns="http://jabber.org/protocol/commands"
+ node="test_client"
+ sessionid="_sessionid_"
+ action="cancel" />
+ </iq>
+ """)
+
+ self.recv("""
+ <iq id="2" to="foo@example.com" type="result">
+ <command xmlns="http://jabber.org/protocol/commands"
+ node="test_client"
+ sessionid="_sessionid_"
+ status="canceled" />
+ </iq>
+ """)
+
+ # Give the event queue time to process
+ time.sleep(0.3)
+
+ self.failUnless(results == ['foo', 'bar'],
+ 'Incomplete command workflow: %s' % results)
+
+ def testClientAPIError(self):
+ """Test using client-side error API for commands."""
+ results = []
+
+ def handle_error(iq, session):
+ for item in session['custom_data']:
+ results.append(item)
+
+ session = {'custom_data': ['foo'],
+ 'error': handle_error}
+
+ self.xmpp['xep_0050'].start_command(
+ 'foo@example.com',
+ 'test_client',
+ session)
+
+ self.send("""
+ <iq id="1" to="foo@example.com" type="set">
+ <command xmlns="http://jabber.org/protocol/commands"
+ node="test_client"
+ action="execute" />
+ </iq>
+ """)
+
+ self.recv("""
+ <iq id="1" to="foo@example.com" type="error">
+ <command xmlns="http://jabber.org/protocol/commands"
+ node="test_client"
+ action="execute" />
+ <error type='cancel'>
+ <item-not-found xmlns='urn:ietf:params:xml:ns:xmpp-stanzas'/>
+ </error>
+ </iq>
+ """)
+
+ # Give the event queue time to process
+ time.sleep(0.3)
+
+ self.failUnless(results == ['foo'],
+ 'Incomplete command workflow: %s' % results)
+
+ def testClientAPIErrorStrippedResponse(self):
+ """Test errors that don't include the command substanza."""
+ results = []
+
+ def handle_error(iq, session):
+ for item in session['custom_data']:
+ results.append(item)
+
+ session = {'custom_data': ['foo'],
+ 'error': handle_error}
+
+ self.xmpp['xep_0050'].start_command(
+ 'foo@example.com',
+ 'test_client',
+ session)
+
+ self.send("""
+ <iq id="1" to="foo@example.com" type="set">
+ <command xmlns="http://jabber.org/protocol/commands"
+ node="test_client"
+ action="execute" />
+ </iq>
+ """)
+
+ self.recv("""
+ <iq id="1" to="foo@example.com" type="error">
+ <error type='cancel'>
+ <item-not-found xmlns='urn:ietf:params:xml:ns:xmpp-stanzas' />
+ </error>
+ </iq>
+ """)
+
+ # Give the event queue time to process
+ time.sleep(0.3)
+
+ self.failUnless(results == ['foo'],
+ 'Incomplete command workflow: %s' % results)
+
+
+
+
+
+suite = unittest.TestLoader().loadTestsFromTestCase(TestAdHocCommands)
diff --git a/tests/test_stream_xep_0059.py b/tests/test_stream_xep_0059.py
new file mode 100644
index 00000000..3a99842b
--- /dev/null
+++ b/tests/test_stream_xep_0059.py
@@ -0,0 +1,162 @@
+import threading
+
+from sleekxmpp.test import *
+from sleekxmpp.xmlstream import register_stanza_plugin
+from sleekxmpp.plugins.xep_0030 import DiscoItems
+from sleekxmpp.plugins.xep_0059 import ResultIterator, Set
+
+
+class TestStreamSet(SleekTest):
+
+ def setUp(self):
+ register_stanza_plugin(DiscoItems, Set)
+
+ def tearDown(self):
+ self.stream_close()
+
+ def iter(self, rev=False):
+ q = self.xmpp.Iq()
+ q['type'] = 'get'
+ it = ResultIterator(q, 'disco_items', '1', reverse=rev)
+ for i in it:
+ for j in i['disco_items']['items']:
+ self.items.append(j[0])
+
+ def testResultIterator(self):
+ self.items = []
+ self.stream_start(mode='client')
+ t = threading.Thread(target=self.iter)
+ t.start()
+ self.send("""
+ <iq type="get" id="2">
+ <query xmlns="http://jabber.org/protocol/disco#items">
+ <set xmlns="http://jabber.org/protocol/rsm">
+ <max>1</max>
+ </set>
+ </query>
+ </iq>
+ """)
+ self.recv("""
+ <iq type="result" id="2">
+ <query xmlns="http://jabber.org/protocol/disco#items">
+ <item jid="item1" />
+ <set xmlns="http://jabber.org/protocol/rsm">
+ <last>item1</last>
+ </set>
+ </query>
+ </iq>
+ """)
+ self.send("""
+ <iq type="get" id="3">
+ <query xmlns="http://jabber.org/protocol/disco#items">
+ <set xmlns="http://jabber.org/protocol/rsm">
+ <max>1</max>
+ <after>item1</after>
+ </set>
+ </query>
+ </iq>
+ """)
+ self.recv("""
+ <iq type="result" id="3">
+ <query xmlns="http://jabber.org/protocol/disco#items">
+ <item jid="item2" />
+ <set xmlns="http://jabber.org/protocol/rsm">
+ <last>item2</last>
+ </set>
+ </query>
+ </iq>
+ """)
+ self.send("""
+ <iq type="get" id="4">
+ <query xmlns="http://jabber.org/protocol/disco#items">
+ <set xmlns="http://jabber.org/protocol/rsm">
+ <max>1</max>
+ <after>item2</after>
+ </set>
+ </query>
+ </iq>
+ """)
+ self.recv("""
+ <iq type="result" id="4">
+ <query xmlns="http://jabber.org/protocol/disco#items">
+ <item jid="item2" />
+ <set xmlns="http://jabber.org/protocol/rsm">
+ </set>
+ </query>
+ </iq>
+ """)
+ t.join()
+ self.failUnless(self.items == ['item1', 'item2'])
+
+ def testResultIteratorReverse(self):
+ self.items = []
+ self.stream_start(mode='client')
+
+ t = threading.Thread(target=self.iter, args=(True,))
+ t.start()
+
+ self.send("""
+ <iq type="get" id="2">
+ <query xmlns="http://jabber.org/protocol/disco#items">
+ <set xmlns="http://jabber.org/protocol/rsm">
+ <max>1</max>
+ <before />
+ </set>
+ </query>
+ </iq>
+ """)
+ self.recv("""
+ <iq type="result" id="2">
+ <query xmlns="http://jabber.org/protocol/disco#items">
+ <item jid="item2" />
+ <set xmlns="http://jabber.org/protocol/rsm">
+ <first>item2</first>
+ </set>
+ </query>
+ </iq>
+ """)
+ self.send("""
+ <iq type="get" id="3">
+ <query xmlns="http://jabber.org/protocol/disco#items">
+ <set xmlns="http://jabber.org/protocol/rsm">
+ <max>1</max>
+ <before>item2</before>
+ </set>
+ </query>
+ </iq>
+ """)
+ self.recv("""
+ <iq type="result" id="3">
+ <query xmlns="http://jabber.org/protocol/disco#items">
+ <item jid="item1" />
+ <set xmlns="http://jabber.org/protocol/rsm">
+ <first>item1</first>
+ </set>
+ </query>
+ </iq>
+ """)
+ self.send("""
+ <iq type="get" id="4">
+ <query xmlns="http://jabber.org/protocol/disco#items">
+ <set xmlns="http://jabber.org/protocol/rsm">
+ <max>1</max>
+ <before>item1</before>
+ </set>
+ </query>
+ </iq>
+ """)
+ self.recv("""
+ <iq type="result" id="4">
+ <query xmlns="http://jabber.org/protocol/disco#items">
+ <item jid="item1" />
+ <set xmlns="http://jabber.org/protocol/rsm">
+ </set>
+ </query>
+ </iq>
+ """)
+
+ t.join()
+ self.failUnless(self.items == ['item2', 'item1'])
+
+
+suite = unittest.TestLoader().loadTestsFromTestCase(TestStreamSet)
diff --git a/tests/test_stream_xep_0060.py b/tests/test_stream_xep_0060.py
new file mode 100644
index 00000000..e0936660
--- /dev/null
+++ b/tests/test_stream_xep_0060.py
@@ -0,0 +1,794 @@
+import sys
+import time
+import threading
+
+from sleekxmpp.test import *
+from sleekxmpp.stanza.atom import AtomEntry
+from sleekxmpp.xmlstream import register_stanza_plugin
+
+
+class TestStreamPubsub(SleekTest):
+
+ """
+ Test using the XEP-0030 plugin.
+ """
+
+ def setUp(self):
+ self.stream_start()
+
+ def tearDown(self):
+ self.stream_close()
+
+ def testCreateInstantNode(self):
+ """Test creating an instant node"""
+ t = threading.Thread(name='create_node',
+ target=self.xmpp['xep_0060'].create_node,
+ args=('pubsub.example.com', None))
+ t.start()
+
+ self.send("""
+ <iq type="set" id="1" to="pubsub.example.com">
+ <pubsub xmlns="http://jabber.org/protocol/pubsub">
+ <create />
+ </pubsub>
+ </iq>
+ """)
+
+ self.recv("""
+ <iq type="result" id="1"
+ to="tester@localhost" from="pubsub.example.com">
+ <pubsub xmlns="http://jabber.org/protocol/pubsub">
+ <create node="25e3d37dabbab9541f7523321421edc5bfeb2dae" />
+ </pubsub>
+ </iq>
+ """)
+
+ t.join()
+
+ def testCreateNodeNoConfig(self):
+ """Test creating a node without a config"""
+ self.xmpp['xep_0060'].create_node(
+ 'pubsub.example.com',
+ 'princely_musings',
+ block=False)
+ self.send("""
+ <iq type="set" id="1" to="pubsub.example.com">
+ <pubsub xmlns="http://jabber.org/protocol/pubsub">
+ <create node="princely_musings" />
+ </pubsub>
+ </iq>
+ """)
+
+ def testCreateNodeConfig(self):
+ """Test creating a node with a config"""
+ form = self.xmpp['xep_0004'].stanza.Form()
+ form['type'] = 'submit'
+ form.add_field(var='pubsub#access_model', value='whitelist')
+
+ self.xmpp['xep_0060'].create_node(
+ 'pubsub.example.com',
+ 'princely_musings',
+ config=form, block=False)
+
+ self.send("""
+ <iq type="set" id="1" to="pubsub.example.com">
+ <pubsub xmlns="http://jabber.org/protocol/pubsub">
+ <create node="princely_musings" />
+ <configure>
+ <x xmlns="jabber:x:data" type="submit">
+ <field var="pubsub#access_model">
+ <value>whitelist</value>
+ </field>
+ <field var="FORM_TYPE">
+ <value>http://jabber.org/protocol/pubsub#node_config</value>
+ </field>
+ </x>
+ </configure>
+ </pubsub>
+ </iq>
+ """)
+
+ def testDeleteNode(self):
+ """Test deleting a node"""
+ self.xmpp['xep_0060'].delete_node(
+ 'pubsub.example.com',
+ 'some_node',
+ block=False)
+ self.send("""
+ <iq type="set" to="pubsub.example.com" id="1">
+ <pubsub xmlns="http://jabber.org/protocol/pubsub#owner">
+ <delete node="some_node" />
+ </pubsub>
+ </iq>
+ """)
+
+ def testSubscribeCase1(self):
+ """
+ Test subscribing to a node: Case 1:
+ No subscribee, default 'from' JID, bare JID
+ """
+ self.xmpp['xep_0060'].subscribe(
+ 'pubsub.example.com',
+ 'somenode',
+ block=False)
+ self.send("""
+ <iq type="set" id="1" to="pubsub.example.com">
+ <pubsub xmlns="http://jabber.org/protocol/pubsub">
+ <subscribe node="somenode" jid="tester@localhost" />
+ </pubsub>
+ </iq>
+ """)
+
+ def testSubscribeCase2(self):
+ """
+ Test subscribing to a node: Case 2:
+ No subscribee, given 'from' JID, bare JID
+ """
+ self.xmpp['xep_0060'].subscribe(
+ 'pubsub.example.com',
+ 'somenode',
+ ifrom='foo@comp.example.com/bar',
+ block=False)
+ self.send("""
+ <iq type="set" id="1"
+ to="pubsub.example.com" from="foo@comp.example.com/bar">
+ <pubsub xmlns="http://jabber.org/protocol/pubsub">
+ <subscribe node="somenode" jid="foo@comp.example.com" />
+ </pubsub>
+ </iq>
+ """)
+
+ def testSubscribeCase3(self):
+ """
+ Test subscribing to a node: Case 3:
+ No subscribee, given 'from' JID, full JID
+ """
+ self.xmpp['xep_0060'].subscribe(
+ 'pubsub.example.com',
+ 'somenode',
+ ifrom='foo@comp.example.com/bar',
+ bare=False,
+ block=False)
+ self.send("""
+ <iq type="set" id="1"
+ to="pubsub.example.com" from="foo@comp.example.com/bar">
+ <pubsub xmlns="http://jabber.org/protocol/pubsub">
+ <subscribe node="somenode" jid="foo@comp.example.com/bar" />
+ </pubsub>
+ </iq>
+ """)
+
+ def testSubscribeCase4(self):
+ """
+ Test subscribing to a node: Case 4:
+ No subscribee, no 'from' JID, full JID
+ """
+ self.stream_close()
+ self.stream_start(jid='tester@localhost/full')
+
+ self.xmpp['xep_0060'].subscribe(
+ 'pubsub.example.com',
+ 'somenode',
+ bare=False,
+ block=False)
+ self.send("""
+ <iq type="set" id="1"
+ to="pubsub.example.com">
+ <pubsub xmlns="http://jabber.org/protocol/pubsub">
+ <subscribe node="somenode" jid="tester@localhost/full" />
+ </pubsub>
+ </iq>
+ """)
+
+ def testSubscribeCase5(self):
+ """
+ Test subscribing to a node: Case 5:
+ Subscribee given
+ """
+ self.xmpp['xep_0060'].subscribe(
+ 'pubsub.example.com',
+ 'somenode',
+ subscribee='user@example.com/foo',
+ ifrom='foo@comp.example.com/bar',
+ block=False)
+ self.send("""
+ <iq type="set" id="1"
+ to="pubsub.example.com" from="foo@comp.example.com/bar">
+ <pubsub xmlns="http://jabber.org/protocol/pubsub">
+ <subscribe node="somenode" jid="user@example.com/foo" />
+ </pubsub>
+ </iq>
+ """)
+
+ def testSubscribeWithOptions(self):
+ """Test subscribing to a node, with options."""
+ opts = self.xmpp['xep_0004'].make_form()
+ opts.add_field(
+ var='FORM_TYPE',
+ value='http://jabber.org/protocol/pubsub#subscribe_options',
+ ftype='hidden')
+ opts.add_field(
+ var='pubsub#digest',
+ value=False,
+ ftype='boolean')
+ opts['type'] = 'submit'
+
+ self.xmpp['xep_0060'].subscribe(
+ 'pubsub.example.com',
+ 'somenode',
+ options=opts,
+ block=False)
+ self.send("""
+ <iq type="set" id="1" to="pubsub.example.com">
+ <pubsub xmlns="http://jabber.org/protocol/pubsub">
+ <subscribe node="somenode" jid="tester@localhost" />
+ <options>
+ <x xmlns="jabber:x:data" type="submit">
+ <field var="FORM_TYPE">
+ <value>http://jabber.org/protocol/pubsub#subscribe_options</value>
+ </field>
+ <field var="pubsub#digest">
+ <value>0</value>
+ </field>
+ </x>
+ </options>
+ </pubsub>
+ </iq>
+ """)
+
+ def testUnsubscribeCase1(self):
+ """
+ Test unsubscribing from a node: Case 1:
+ No subscribee, default 'from' JID, bare JID
+ """
+ self.xmpp['xep_0060'].unsubscribe(
+ 'pubsub.example.com',
+ 'somenode',
+ block=False)
+ self.send("""
+ <iq type="set" id="1" to="pubsub.example.com">
+ <pubsub xmlns="http://jabber.org/protocol/pubsub">
+ <unsubscribe node="somenode" jid="tester@localhost" />
+ </pubsub>
+ </iq>
+ """)
+
+ def testUnsubscribeCase2(self):
+ """
+ Test unsubscribing from a node: Case 2:
+ No subscribee, given 'from' JID, bare JID
+ """
+ self.xmpp['xep_0060'].unsubscribe(
+ 'pubsub.example.com',
+ 'somenode',
+ ifrom='foo@comp.example.com/bar',
+ block=False)
+ self.send("""
+ <iq type="set" id="1"
+ to="pubsub.example.com" from="foo@comp.example.com/bar">
+ <pubsub xmlns="http://jabber.org/protocol/pubsub">
+ <unsubscribe node="somenode" jid="foo@comp.example.com" />
+ </pubsub>
+ </iq>
+ """)
+
+ def testUnsubscribeCase3(self):
+ """
+ Test unsubscribing from a node: Case 3:
+ No subscribee, given 'from' JID, full JID
+ """
+ self.xmpp['xep_0060'].unsubscribe(
+ 'pubsub.example.com',
+ 'somenode',
+ ifrom='foo@comp.example.com/bar',
+ bare=False,
+ block=False)
+ self.send("""
+ <iq type="set" id="1"
+ to="pubsub.example.com" from="foo@comp.example.com/bar">
+ <pubsub xmlns="http://jabber.org/protocol/pubsub">
+ <unsubscribe node="somenode" jid="foo@comp.example.com/bar" />
+ </pubsub>
+ </iq>
+ """)
+
+ def testUnsubscribeCase4(self):
+ """
+ Test unsubscribing from a node: Case 4:
+ No subscribee, no 'from' JID, full JID
+ """
+ self.stream_close()
+ self.stream_start(jid='tester@localhost/full')
+
+ self.xmpp['xep_0060'].unsubscribe(
+ 'pubsub.example.com',
+ 'somenode',
+ bare=False,
+ block=False)
+ self.send("""
+ <iq type="set" id="1"
+ to="pubsub.example.com">
+ <pubsub xmlns="http://jabber.org/protocol/pubsub">
+ <unsubscribe node="somenode" jid="tester@localhost/full" />
+ </pubsub>
+ </iq>
+ """)
+
+ def testUnsubscribeCase5(self):
+ """
+ Test unsubscribing from a node: Case 5:
+ Subscribee given
+ """
+ self.xmpp['xep_0060'].unsubscribe(
+ 'pubsub.example.com',
+ 'somenode',
+ subscribee='user@example.com/foo',
+ ifrom='foo@comp.example.com/bar',
+ block=False)
+ self.send("""
+ <iq type="set" id="1"
+ to="pubsub.example.com" from="foo@comp.example.com/bar">
+ <pubsub xmlns="http://jabber.org/protocol/pubsub">
+ <unsubscribe node="somenode" jid="user@example.com/foo" />
+ </pubsub>
+ </iq>
+ """)
+
+ def testGetDefaultNodeConfig(self):
+ """Test retrieving the default node config for a pubsub service."""
+ self.xmpp['xep_0060'].get_node_config(
+ 'pubsub.example.com',
+ block=False)
+ self.send("""
+ <iq type="get" id="1" to="pubsub.example.com">
+ <pubsub xmlns="http://jabber.org/protocol/pubsub#owner">
+ <default />
+ </pubsub>
+ </iq>
+ """, use_values=False)
+
+ def testGetNodeConfig(self):
+ """Test getting the config for a given node."""
+ self.xmpp['xep_0060'].get_node_config(
+ 'pubsub.example.com',
+ 'somenode',
+ block=False)
+ self.send("""
+ <iq type="get" id="1" to="pubsub.example.com">
+ <pubsub xmlns="http://jabber.org/protocol/pubsub#owner">
+ <configure node="somenode" />
+ </pubsub>
+ </iq>
+ """, use_values=False)
+
+ def testSetNodeConfig(self):
+ """Test setting the configuration for a node."""
+ form = self.xmpp['xep_0004'].make_form()
+ form.add_field(var='FORM_TYPE', ftype='hidden',
+ value='http://jabber.org/protocol/pubsub#node_config')
+ form.add_field(var='pubsub#title', ftype='text-single',
+ value='This is awesome!')
+ form['type'] = 'submit'
+
+ self.xmpp['xep_0060'].set_node_config(
+ 'pubsub.example.com',
+ 'somenode',
+ form,
+ block=False)
+ self.send("""
+ <iq type="set" id="1" to="pubsub.example.com">
+ <pubsub xmlns="http://jabber.org/protocol/pubsub#owner">
+ <configure node="somenode">
+ <x xmlns="jabber:x:data" type="submit">
+ <field var="FORM_TYPE">
+ <value>http://jabber.org/protocol/pubsub#node_config</value>
+ </field>
+ <field var="pubsub#title">
+ <value>This is awesome!</value>
+ </field>
+ </x>
+ </configure>
+ </pubsub>
+ </iq>
+ """)
+
+ def testPublishNoItems(self):
+ """Test publishing no items (in order to generate events)"""
+ self.xmpp['xep_0060'].publish(
+ 'pubsub.example.com',
+ 'somenode',
+ block=False)
+ self.send("""
+ <iq type="set" id="1" to="pubsub.example.com">
+ <pubsub xmlns="http://jabber.org/protocol/pubsub">
+ <publish node="somenode" />
+ </pubsub>
+ </iq>
+ """)
+
+ def testPublishSingle(self):
+ """Test publishing a single item."""
+ payload = AtomEntry()
+ payload['title'] = 'Test'
+
+ register_stanza_plugin(self.xmpp['xep_0060'].stanza.Item, AtomEntry)
+
+ self.xmpp['xep_0060'].publish(
+ 'pubsub.example.com',
+ 'somenode',
+ id='id42',
+ payload=payload,
+ block=False)
+ self.send("""
+ <iq type="set" id="1" to="pubsub.example.com">
+ <pubsub xmlns="http://jabber.org/protocol/pubsub">
+ <publish node="somenode">
+ <item id="id42">
+ <entry xmlns="http://www.w3.org/2005/Atom">
+ <title>Test</title>
+ </entry>
+ </item>
+ </publish>
+ </pubsub>
+ </iq>
+ """)
+
+ def testPublishSingleOptions(self):
+ """Test publishing a single item, with options."""
+ payload = AtomEntry()
+ payload['title'] = 'Test'
+
+ register_stanza_plugin(self.xmpp['xep_0060'].stanza.Item, AtomEntry)
+
+ options = self.xmpp['xep_0004'].make_form()
+ options.add_field(var='FORM_TYPE', ftype='hidden',
+ value='http://jabber.org/protocol/pubsub#publish-options')
+ options.add_field(var='pubsub#access_model', ftype='text-single',
+ value='presence')
+ options['type'] = 'submit'
+
+ self.xmpp['xep_0060'].publish(
+ 'pubsub.example.com',
+ 'somenode',
+ id='ID42',
+ payload=payload,
+ options=options,
+ block=False)
+ self.send("""
+ <iq type="set" id="1" to="pubsub.example.com">
+ <pubsub xmlns="http://jabber.org/protocol/pubsub">
+ <publish node="somenode">
+ <item id="ID42">
+ <entry xmlns="http://www.w3.org/2005/Atom">
+ <title>Test</title>
+ </entry>
+ </item>
+ </publish>
+ <publish-options>
+ <x xmlns="jabber:x:data" type="submit">
+ <field var="FORM_TYPE">
+ <value>http://jabber.org/protocol/pubsub#publish-options</value>
+ </field>
+ <field var="pubsub#access_model">
+ <value>presence</value>
+ </field>
+ </x>
+ </publish-options>
+ </pubsub>
+ </iq>
+ """, use_values=False)
+
+ def testRetract(self):
+ """Test deleting an item."""
+ self.xmpp['xep_0060'].retract(
+ 'pubsub.example.com',
+ 'somenode',
+ 'ID1',
+ notify=True,
+ block=False)
+ self.send("""
+ <iq type="set" id="1" to="pubsub.example.com">
+ <pubsub xmlns="http://jabber.org/protocol/pubsub">
+ <retract node="somenode" notify="true">
+ <item id="ID1" />
+ </retract>
+ </pubsub>
+ </iq>
+ """)
+
+ def testRetract(self):
+ """Test deleting an item."""
+ self.xmpp['xep_0060'].retract(
+ 'pubsub.example.com',
+ 'somenode',
+ 'ID1',
+ block=False)
+ self.send("""
+ <iq type="set" id="1" to="pubsub.example.com">
+ <pubsub xmlns="http://jabber.org/protocol/pubsub">
+ <retract node="somenode">
+ <item id="ID1" />
+ </retract>
+ </pubsub>
+ </iq>
+ """)
+
+ def testPurge(self):
+ """Test removing all items from a node."""
+ self.xmpp['xep_0060'].purge(
+ 'pubsub.example.com',
+ 'somenode',
+ block=False)
+ self.send("""
+ <iq type="set" id="1" to="pubsub.example.com">
+ <pubsub xmlns="http://jabber.org/protocol/pubsub#owner">
+ <purge node="somenode" />
+ </pubsub>
+ </iq>
+ """)
+
+ def testGetItem(self):
+ """Test retrieving a single item."""
+ self.xmpp['xep_0060'].get_item(
+ 'pubsub.example.com',
+ 'somenode',
+ 'id42',
+ block=False)
+ self.send("""
+ <iq type="get" id="1" to="pubsub.example.com">
+ <pubsub xmlns="http://jabber.org/protocol/pubsub">
+ <items node="somenode">
+ <item id="id42" />
+ </items>
+ </pubsub>
+ </iq>
+ """)
+
+ def testGetLatestItems(self):
+ """Test retrieving the most recent N items."""
+ self.xmpp['xep_0060'].get_items(
+ 'pubsub.example.com',
+ 'somenode',
+ max_items=3,
+ block=False)
+ self.send("""
+ <iq type="get" id="1" to="pubsub.example.com">
+ <pubsub xmlns="http://jabber.org/protocol/pubsub">
+ <items node="somenode" max_items="3" />
+ </pubsub>
+ </iq>
+ """)
+
+ def testGetAllItems(self):
+ """Test retrieving all items."""
+ self.xmpp['xep_0060'].get_items(
+ 'pubsub.example.com',
+ 'somenode',
+ block=False)
+ self.send("""
+ <iq type="get" id="1" to="pubsub.example.com">
+ <pubsub xmlns="http://jabber.org/protocol/pubsub">
+ <items node="somenode" />
+ </pubsub>
+ </iq>
+ """)
+
+ def testGetSpecificItems(self):
+ """Test retrieving a specific set of items."""
+ self.xmpp['xep_0060'].get_items(
+ 'pubsub.example.com',
+ 'somenode',
+ item_ids=['A', 'B', 'C'],
+ block=False)
+ self.send("""
+ <iq type="get" id="1" to="pubsub.example.com">
+ <pubsub xmlns="http://jabber.org/protocol/pubsub">
+ <items node="somenode">
+ <item id="A" />
+ <item id="B" />
+ <item id="C" />
+ </items>
+ </pubsub>
+ </iq>
+ """)
+
+ def testGetSubscriptionGlobalDefaultOptions(self):
+ """Test getting the subscription options for a node/JID."""
+ self.xmpp['xep_0060'].get_subscription_options(
+ 'pubsub.example.com',
+ block=False)
+ self.send("""
+ <iq type="get" id="1" to="pubsub.example.com">
+ <pubsub xmlns="http://jabber.org/protocol/pubsub">
+ <default />
+ </pubsub>
+ </iq>
+ """, use_values=False)
+
+ def testGetSubscriptionNodeDefaultOptions(self):
+ """Test getting the subscription options for a node/JID."""
+ self.xmpp['xep_0060'].get_subscription_options(
+ 'pubsub.example.com',
+ node='somenode',
+ block=False)
+ self.send("""
+ <iq type="get" id="1" to="pubsub.example.com">
+ <pubsub xmlns="http://jabber.org/protocol/pubsub">
+ <default node="somenode" />
+ </pubsub>
+ </iq>
+ """, use_values=False)
+
+ def testGetSubscriptionOptions(self):
+ """Test getting the subscription options for a node/JID."""
+ self.xmpp['xep_0060'].get_subscription_options(
+ 'pubsub.example.com',
+ 'somenode',
+ 'tester@localhost',
+ block=False)
+ self.send("""
+ <iq type="get" id="1" to="pubsub.example.com">
+ <pubsub xmlns="http://jabber.org/protocol/pubsub">
+ <options node="somenode" jid="tester@localhost" />
+ </pubsub>
+ </iq>
+ """, use_values=False)
+
+ def testSetSubscriptionOptions(self):
+ """Test setting the subscription options for a node/JID."""
+ opts = self.xmpp['xep_0004'].make_form()
+ opts.add_field(
+ var='FORM_TYPE',
+ value='http://jabber.org/protocol/pubsub#subscribe_options',
+ ftype='hidden')
+ opts.add_field(
+ var='pubsub#digest',
+ value=False,
+ ftype='boolean')
+ opts['type'] = 'submit'
+
+ self.xmpp['xep_0060'].set_subscription_options(
+ 'pubsub.example.com',
+ 'somenode',
+ 'tester@localhost',
+ opts,
+ block=False)
+ self.send("""
+ <iq type="get" id="1" to="pubsub.example.com">
+ <pubsub xmlns="http://jabber.org/protocol/pubsub">
+ <options node="somenode" jid="tester@localhost">
+ <x xmlns="jabber:x:data" type="submit">
+ <field var="FORM_TYPE">
+ <value>http://jabber.org/protocol/pubsub#subscribe_options</value>
+ </field>
+ <field var="pubsub#digest">
+ <value>0</value>
+ </field>
+ </x>
+ </options>
+ </pubsub>
+ </iq>
+ """)
+
+ def testGetNodeSubscriptions(self):
+ """Test retrieving all subscriptions for a node."""
+ self.xmpp['xep_0060'].get_node_subscriptions(
+ 'pubsub.example.com',
+ 'somenode',
+ block=False)
+ self.send("""
+ <iq type="get" id="1" to="pubsub.example.com">
+ <pubsub xmlns="http://jabber.org/protocol/pubsub#owner">
+ <subscriptions node="somenode" />
+ </pubsub>
+ </iq>
+ """)
+
+ def testGetSubscriptions(self):
+ """Test retrieving a users's subscriptions."""
+ self.xmpp['xep_0060'].get_subscriptions(
+ 'pubsub.example.com',
+ block=False)
+ self.send("""
+ <iq type="get" id="1" to="pubsub.example.com">
+ <pubsub xmlns="http://jabber.org/protocol/pubsub">
+ <subscriptions />
+ </pubsub>
+ </iq>
+ """)
+
+ def testGetSubscriptionsForNode(self):
+ """Test retrieving a users's subscriptions for a given node."""
+ self.xmpp['xep_0060'].get_subscriptions(
+ 'pubsub.example.com',
+ node='somenode',
+ block=False)
+ self.send("""
+ <iq type="get" id="1" to="pubsub.example.com">
+ <pubsub xmlns="http://jabber.org/protocol/pubsub">
+ <subscriptions node="somenode" />
+ </pubsub>
+ </iq>
+ """)
+
+ def testGetAffiliations(self):
+ """Test retrieving a users's affiliations."""
+ self.xmpp['xep_0060'].get_affiliations(
+ 'pubsub.example.com',
+ block=False)
+ self.send("""
+ <iq type="get" id="1" to="pubsub.example.com">
+ <pubsub xmlns="http://jabber.org/protocol/pubsub">
+ <affiliations />
+ </pubsub>
+ </iq>
+ """)
+
+ def testGetAffiliatinssForNode(self):
+ """Test retrieving a users's affiliations for a given node."""
+ self.xmpp['xep_0060'].get_affiliations(
+ 'pubsub.example.com',
+ node='somenode',
+ block=False)
+ self.send("""
+ <iq type="get" id="1" to="pubsub.example.com">
+ <pubsub xmlns="http://jabber.org/protocol/pubsub">
+ <affiliations node="somenode" />
+ </pubsub>
+ </iq>
+ """)
+
+ def testGetNodeAffiliations(self):
+ """Test getting the affiliations for a node."""
+ self.xmpp['xep_0060'].get_node_affiliations(
+ 'pubsub.example.com',
+ 'somenode',
+ block=False)
+ self.send("""
+ <iq type="get" id="1" to="pubsub.example.com">
+ <pubsub xmlns="http://jabber.org/protocol/pubsub#owner">
+ <affiliations node="somenode" />
+ </pubsub>
+ </iq>
+ """)
+
+ def testModifySubscriptions(self):
+ """Test owner modifying node subscriptions."""
+ self.xmpp['xep_0060'].modify_subscriptions(
+ 'pubsub.example.com',
+ 'somenode',
+ subscriptions=[('user@example.com', 'subscribed'),
+ ('foo@example.net', 'none')],
+ block=False)
+ self.send("""
+ <iq type="set" id="1" to="pubsub.example.com">
+ <pubsub xmlns="http://jabber.org/protocol/pubsub#owner">
+ <subscriptions node="somenode">
+ <subscription jid="user@example.com" subscription="subscribed" />
+ <subscription jid="foo@example.net" subscription="none" />
+ </subscriptions>
+ </pubsub>
+ </iq>
+ """)
+
+ def testModifyAffiliations(self):
+ """Test owner modifying node affiliations."""
+ self.xmpp['xep_0060'].modify_affiliations(
+ 'pubsub.example.com',
+ 'somenode',
+ affiliations=[('user@example.com', 'publisher'),
+ ('foo@example.net', 'none')],
+ block=False)
+ self.send("""
+ <iq type="set" id="1" to="pubsub.example.com">
+ <pubsub xmlns="http://jabber.org/protocol/pubsub#owner">
+ <affiliations node="somenode">
+ <affiliation jid="user@example.com" affiliation="publisher" />
+ <affiliation jid="foo@example.net" affiliation="none" />
+ </affiliations>
+ </pubsub>
+ </iq>
+ """)
+
+
+suite = unittest.TestLoader().loadTestsFromTestCase(TestStreamPubsub)
diff --git a/tests/test_stream_xep_0066.py b/tests/test_stream_xep_0066.py
new file mode 100644
index 00000000..e3f2ddfa
--- /dev/null
+++ b/tests/test_stream_xep_0066.py
@@ -0,0 +1,44 @@
+import time
+import threading
+
+from sleekxmpp.test import *
+
+
+class TestOOB(SleekTest):
+
+ def tearDown(self):
+ self.stream_close()
+
+ def testSendOOB(self):
+ """Test sending an OOB transfer request."""
+ self.stream_start(plugins=['xep_0066', 'xep_0030'])
+
+ url = 'http://github.com/fritzy/SleekXMPP/blob/master/README'
+
+ t = threading.Thread(
+ name='send_oob',
+ target=self.xmpp['xep_0066'].send_oob,
+ args=('user@example.com', url),
+ kwargs={'desc': 'SleekXMPP README'})
+
+ t.start()
+
+ self.send("""
+ <iq to="user@example.com" type="set" id="1">
+ <query xmlns="jabber:iq:oob">
+ <url>http://github.com/fritzy/SleekXMPP/blob/master/README</url>
+ <desc>SleekXMPP README</desc>
+ </query>
+ </iq>
+ """)
+
+ self.recv("""
+ <iq id="1" type="result"
+ to="tester@localhost"
+ from="user@example.com" />
+ """)
+
+ t.join()
+
+
+suite = unittest.TestLoader().loadTestsFromTestCase(TestOOB)
diff --git a/tests/test_stream_xep_0085.py b/tests/test_stream_xep_0085.py
new file mode 100644
index 00000000..2a814805
--- /dev/null
+++ b/tests/test_stream_xep_0085.py
@@ -0,0 +1,59 @@
+import threading
+import time
+
+from sleekxmpp.test import *
+
+
+class TestStreamChatStates(SleekTest):
+
+ def tearDown(self):
+ self.stream_close()
+
+ def testChatStates(self):
+ self.stream_start(mode='client', plugins=['xep_0030', 'xep_0085'])
+
+ results = []
+
+ def handle_state(msg):
+ results.append(msg['chat_state'])
+
+ self.xmpp.add_event_handler('chatstate_active', handle_state)
+ self.xmpp.add_event_handler('chatstate_inactive', handle_state)
+ self.xmpp.add_event_handler('chatstate_paused', handle_state)
+ self.xmpp.add_event_handler('chatstate_gone', handle_state)
+ self.xmpp.add_event_handler('chatstate_composing', handle_state)
+
+ self.recv("""
+ <message>
+ <active xmlns="http://jabber.org/protocol/chatstates" />
+ </message>
+ """)
+ self.recv("""
+ <message>
+ <inactive xmlns="http://jabber.org/protocol/chatstates" />
+ </message>
+ """)
+ self.recv("""
+ <message>
+ <paused xmlns="http://jabber.org/protocol/chatstates" />
+ </message>
+ """)
+ self.recv("""
+ <message>
+ <composing xmlns="http://jabber.org/protocol/chatstates" />
+ </message>
+ """)
+ self.recv("""
+ <message>
+ <gone xmlns="http://jabber.org/protocol/chatstates" />
+ </message>
+ """)
+
+ # Give event queue time to process
+ time.sleep(0.3)
+ expected = ['active', 'inactive', 'paused', 'composing', 'gone']
+ self.failUnless(results == expected,
+ "Chat state event not handled: %s" % results)
+
+
+suite = unittest.TestLoader().loadTestsFromTestCase(TestStreamChatStates)
diff --git a/tests/test_stream_xep_0092.py b/tests/test_stream_xep_0092.py
new file mode 100644
index 00000000..4a038558
--- /dev/null
+++ b/tests/test_stream_xep_0092.py
@@ -0,0 +1,69 @@
+import threading
+
+from sleekxmpp.test import *
+
+
+class TestStreamSet(SleekTest):
+
+ def tearDown(self):
+ self.stream_close()
+
+ def testHandleSoftwareVersionRequest(self):
+ self.stream_start(mode='client', plugins=['xep_0030', 'xep_0092'])
+
+ self.xmpp['xep_0092'].name = 'SleekXMPP'
+ self.xmpp['xep_0092'].version = 'dev'
+ self.xmpp['xep_0092'].os = 'Linux'
+
+ self.recv("""
+ <iq type="get" id="1">
+ <query xmlns="jabber:iq:version" />
+ </iq>
+ """)
+
+ self.send("""
+ <iq type="result" id="1">
+ <query xmlns="jabber:iq:version">
+ <name>SleekXMPP</name>
+ <version>dev</version>
+ <os>Linux</os>
+ </query>
+ </iq>
+ """)
+
+ def testMakeSoftwareVersionRequest(self):
+ results = []
+
+ def query():
+ r = self.xmpp['xep_0092'].get_version('foo@bar')
+ results.append(r)
+
+ self.stream_start(mode='client', plugins=['xep_0030', 'xep_0092'])
+
+ t = threading.Thread(target=query)
+ t.start()
+
+ self.send("""
+ <iq type="get" id="1" to="foo@bar">
+ <query xmlns="jabber:iq:version" />
+ </iq>
+ """)
+
+ self.recv("""
+ <iq type="result" id="1" from="foo@bar" to="tester@localhost">
+ <query xmlns="jabber:iq:version">
+ <name>Foo</name>
+ <version>1.0</version>
+ <os>Linux</os>
+ </query>
+ </iq>
+ """)
+
+ t.join()
+
+ expected = [{'name': 'Foo', 'version': '1.0', 'os':'Linux'}]
+ self.assertEqual(results, expected,
+ "Did not receive expected results: %s" % results)
+
+
+suite = unittest.TestLoader().loadTestsFromTestCase(TestStreamSet)
diff --git a/tests/test_stream_xep_0128.py b/tests/test_stream_xep_0128.py
new file mode 100644
index 00000000..42fc9143
--- /dev/null
+++ b/tests/test_stream_xep_0128.py
@@ -0,0 +1,105 @@
+import sys
+import time
+import threading
+
+from sleekxmpp.test import *
+from sleekxmpp.xmlstream import ElementBase
+
+
+class TestStreamExtendedDisco(SleekTest):
+
+ """
+ Test using the XEP-0128 plugin.
+ """
+
+ def tearDown(self):
+ self.stream_close()
+
+ def testUsingExtendedInfo(self):
+ self.stream_start(mode='client',
+ jid='tester@localhost',
+ plugins=['xep_0030',
+ 'xep_0004',
+ 'xep_0128'])
+
+ form = self.xmpp['xep_0004'].makeForm(ftype='result')
+ form.addField(var='FORM_TYPE', ftype='hidden', value='testing')
+
+ info_ns = 'http://jabber.org/protocol/disco#info'
+ self.xmpp['xep_0030'].add_identity(node='test',
+ category='client',
+ itype='bot')
+ self.xmpp['xep_0030'].add_feature(node='test', feature=info_ns)
+ self.xmpp['xep_0128'].set_extended_info(node='test', data=form)
+
+ self.recv("""
+ <iq type="get" id="test" to="tester@localhost">
+ <query xmlns="http://jabber.org/protocol/disco#info"
+ node="test" />
+ </iq>
+ """)
+
+ self.send("""
+ <iq type="result" id="test">
+ <query xmlns="http://jabber.org/protocol/disco#info"
+ node="test">
+ <identity category="client" type="bot" />
+ <feature var="http://jabber.org/protocol/disco#info" />
+ <x xmlns="jabber:x:data" type="result">
+ <field var="FORM_TYPE" type="hidden">
+ <value>testing</value>
+ </field>
+ </x>
+ </query>
+ </iq>
+ """)
+
+ def testUsingMultipleExtendedInfo(self):
+ self.stream_start(mode='client',
+ jid='tester@localhost',
+ plugins=['xep_0030',
+ 'xep_0004',
+ 'xep_0128'])
+
+ form1 = self.xmpp['xep_0004'].makeForm(ftype='result')
+ form1.addField(var='FORM_TYPE', ftype='hidden', value='testing')
+
+ form2 = self.xmpp['xep_0004'].makeForm(ftype='result')
+ form2.addField(var='FORM_TYPE', ftype='hidden', value='testing_2')
+
+ info_ns = 'http://jabber.org/protocol/disco#info'
+ self.xmpp['xep_0030'].add_identity(node='test',
+ category='client',
+ itype='bot')
+ self.xmpp['xep_0030'].add_feature(node='test', feature=info_ns)
+ self.xmpp['xep_0128'].set_extended_info(node='test', data=[form1, form2])
+
+ self.recv("""
+ <iq type="get" id="test" to="tester@localhost">
+ <query xmlns="http://jabber.org/protocol/disco#info"
+ node="test" />
+ </iq>
+ """)
+
+ self.send("""
+ <iq type="result" id="test">
+ <query xmlns="http://jabber.org/protocol/disco#info"
+ node="test">
+ <identity category="client" type="bot" />
+ <feature var="http://jabber.org/protocol/disco#info" />
+ <x xmlns="jabber:x:data" type="result">
+ <field var="FORM_TYPE" type="hidden">
+ <value>testing</value>
+ </field>
+ </x>
+ <x xmlns="jabber:x:data" type="result">
+ <field var="FORM_TYPE" type="hidden">
+ <value>testing_2</value>
+ </field>
+ </x>
+ </query>
+ </iq>
+ """)
+
+
+suite = unittest.TestLoader().loadTestsFromTestCase(TestStreamExtendedDisco)
diff --git a/tests/test_stream_xep_0249.py b/tests/test_stream_xep_0249.py
new file mode 100644
index 00000000..9a25253f
--- /dev/null
+++ b/tests/test_stream_xep_0249.py
@@ -0,0 +1,63 @@
+import sys
+import time
+import threading
+
+from sleekxmpp.test import *
+from sleekxmpp.xmlstream import ElementBase
+
+
+class TestStreamDirectInvite(SleekTest):
+
+ """
+ Test using the XEP-0249 plugin.
+ """
+
+ def tearDown(self):
+ self.stream_close()
+
+ def testReceiveInvite(self):
+ self.stream_start(mode='client',
+ plugins=['xep_0030',
+ 'xep_0249'])
+
+ events = []
+
+ def handle_invite(msg):
+ events.append(True)
+
+ self.xmpp.add_event_handler('groupchat_direct_invite',
+ handle_invite)
+
+ self.recv("""
+ <message>
+ <x xmlns="jabber:x:conference"
+ jid="sleek@conference.jabber.org"
+ password="foo"
+ reason="For testing" />
+ </message>
+ """)
+
+ time.sleep(.5)
+
+ self.failUnless(events == [True],
+ "Event not raised: %s" % events)
+
+ def testSentDirectInvite(self):
+ self.stream_start(mode='client',
+ plugins=['xep_0030',
+ 'xep_0249'])
+
+ self.xmpp['xep_0249'].send_invitation('user@example.com',
+ 'sleek@conference.jabber.org',
+ reason='Need to test Sleek')
+
+ self.send("""
+ <message to="user@example.com">
+ <x xmlns="jabber:x:conference"
+ jid="sleek@conference.jabber.org"
+ reason="Need to test Sleek" />
+ </message>
+ """)
+
+
+suite = unittest.TestLoader().loadTestsFromTestCase(TestStreamDirectInvite)
diff --git a/tests/test_tostring.py b/tests/test_tostring.py
new file mode 100644
index 00000000..e456d28e
--- /dev/null
+++ b/tests/test_tostring.py
@@ -0,0 +1,132 @@
+from sleekxmpp.test import *
+from sleekxmpp.stanza import Message
+from sleekxmpp.xmlstream.stanzabase import ET, ElementBase
+from sleekxmpp.xmlstream.tostring import tostring, xml_escape
+
+
+class TestToString(SleekTest):
+
+ """
+ Test the implementation of sleekxmpp.xmlstream.tostring
+ """
+
+ def tearDown(self):
+ self.stream_close()
+
+ def tryTostring(self, original='', expected=None, message='', **kwargs):
+ """
+ Compare the result of calling tostring against an
+ expected result.
+ """
+ if not expected:
+ expected=original
+ if isinstance(original, str):
+ xml = ET.fromstring(original)
+ else:
+ xml=original
+ result = tostring(xml, **kwargs)
+ self.failUnless(result == expected, "%s: %s" % (message, result))
+
+ def testXMLEscape(self):
+ """Test escaping XML special characters."""
+ original = """<foo bar="baz">'Hi & welcome!'</foo>"""
+ escaped = xml_escape(original)
+ desired = """&lt;foo bar=&quot;baz&quot;&gt;&apos;Hi"""
+ desired += """ &amp; welcome!&apos;&lt;/foo&gt;"""
+
+ self.failUnless(escaped == desired,
+ "XML escaping did not work: %s." % escaped)
+
+ def testEmptyElement(self):
+ """Test converting an empty element to a string."""
+ self.tryTostring(
+ original='<bar xmlns="foo" />',
+ message="Empty element not serialized correctly")
+
+ def testEmptyElementWrapped(self):
+ """Test converting an empty element inside another element."""
+ self.tryTostring(
+ original='<bar xmlns="foo"><baz /></bar>',
+ message="Wrapped empty element not serialized correctly")
+
+ def testEmptyElementWrappedText(self):
+ """
+ Test converting an empty element wrapped with text
+ inside another element.
+ """
+ self.tryTostring(
+ original='<bar xmlns="foo">Some text. <baz /> More text.</bar>',
+ message="Text wrapped empty element serialized incorrectly")
+
+ def testMultipleChildren(self):
+ """Test converting multiple child elements to a Unicode string."""
+ self.tryTostring(
+ original='<bar xmlns="foo"><baz><qux /></baz><quux /></bar>',
+ message="Multiple child elements not serialized correctly")
+
+ def testXMLNS(self):
+ """
+ Test using xmlns tostring parameter, which will prevent adding
+ an xmlns attribute to the serialized element if the element's
+ namespace is the same.
+ """
+ self.tryTostring(
+ original='<bar xmlns="foo" />',
+ expected='<bar />',
+ message="The xmlns parameter was not used properly.",
+ xmlns='foo')
+
+ def testTailContent(self):
+ """
+ Test that elements of the form <a>foo <b>bar</b> baz</a> only
+ include " baz" once.
+ """
+ self.tryTostring(
+ original='<a>foo <b>bar</b> baz</a>',
+ message='Element tail content is incorrect.')
+
+
+ def testStanzaNs(self):
+ """
+ Test using the stanza_ns tostring parameter, which will prevent
+ adding an xmlns attribute to the serialized element if the
+ element's namespace is the same.
+ """
+ self.tryTostring(
+ original='<bar xmlns="foo" />',
+ expected='<bar />',
+ message="The stanza_ns parameter was not used properly.",
+ stanza_ns='foo')
+
+ def testStanzaStr(self):
+ """
+ Test that stanza objects are serialized properly.
+ """
+ self.stream_start()
+
+ utf8_message = '\xe0\xb2\xa0_\xe0\xb2\xa0'
+ if not hasattr(utf8_message, 'decode'):
+ # Python 3
+ utf8_message = bytes(utf8_message, encoding='utf-8')
+ msg = self.Message()
+ msg['body'] = utf8_message.decode('utf-8')
+ expected = '<message><body>\xe0\xb2\xa0_\xe0\xb2\xa0</body></message>'
+ result = msg.__str__()
+ self.failUnless(result == expected,
+ "Stanza Unicode handling is incorrect: %s" % result)
+
+ def testXMLLang(self):
+ """Test that serializing xml:lang works."""
+
+ self.stream_start()
+
+ msg = self.Message()
+ msg._set_attr('{%s}lang' % msg.xml_ns, "no")
+
+ expected = '<message xml:lang="no" />'
+ result = msg.__str__()
+ self.failUnless(expected == result,
+ "Serialization with xml:lang failed: %s" % result)
+
+
+suite = unittest.TestLoader().loadTestsFromTestCase(TestToString)