summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/sleektest.py13
-rw-r--r--tests/test_elementbase.py43
-rw-r--r--tests/test_handlers.py112
-rw-r--r--tests/test_stanzabase.py79
4 files changed, 241 insertions, 6 deletions
diff --git a/tests/sleektest.py b/tests/sleektest.py
index 801253d3..66535bcb 100644
--- a/tests/sleektest.py
+++ b/tests/sleektest.py
@@ -504,7 +504,18 @@ class SleekTest(unittest.TestCase):
if xml.attrib != other.attrib:
return False
- # Step 3: Recursively check children
+ # Step 3: Check text
+ if xml.text is None:
+ xml.text = ""
+ if other.text is None:
+ other.text = ""
+ xml.text = xml.text.strip()
+ other.text = other.text.strip()
+
+ if xml.text != other.text:
+ return False
+
+ # Step 4: Recursively check children
for child in xml:
child2s = other.findall("%s" % child.tag)
if child2s is None:
diff --git a/tests/test_elementbase.py b/tests/test_elementbase.py
index 6b0c076b..dfd37b53 100644
--- a/tests/test_elementbase.py
+++ b/tests/test_elementbase.py
@@ -3,6 +3,21 @@ from sleekxmpp.xmlstream.stanzabase import ElementBase
class TestElementBase(SleekTest):
+ def testFixNs(self):
+ """Test fixing namespaces in an XPath expression."""
+
+ e = ElementBase()
+ ns = "http://jabber.org/protocol/disco#items"
+ result = e._fix_ns("{%s}foo/bar/{abc}baz/{%s}more" % (ns, ns))
+
+ expected = "/".join(["{%s}foo" % ns,
+ "{%s}bar" % ns,
+ "{abc}baz",
+ "{%s}more" % ns])
+ self.failUnless(expected == result,
+ "Incorrect namespace fixing result: %s" % str(result))
+
+
def testExtendedName(self):
"""Test element names of the form tag1/tag2/tag3."""
@@ -332,7 +347,7 @@ class TestElementBase(SleekTest):
</wrapper>
</foo>
""")
- stanza._setSubText('bar', text='', keep=True)
+ stanza._setSubText('wrapper/bar', text='', keep=True)
self.checkStanza(TestStanza, stanza, """
<foo xmlns="foo">
<wrapper>
@@ -343,7 +358,7 @@ class TestElementBase(SleekTest):
""", use_values=False)
stanza['bar'] = 'a'
- stanza._setSubText('bar', text='')
+ stanza._setSubText('wrapper/bar', text='')
self.checkStanza(TestStanza, stanza, """
<foo xmlns="foo">
<wrapper>
@@ -439,12 +454,19 @@ class TestElementBase(SleekTest):
class TestStanza(ElementBase):
name = "foo"
namespace = "foo"
- interfaces = set(('bar','baz'))
+ interfaces = set(('bar','baz', 'qux'))
+ sub_interfaces = set(('qux',))
subitem = (TestSubStanza,)
+ def setQux(self, value):
+ self._setSubText('qux', text=value)
+
+ def getQux(self):
+ return self._getSubText('qux')
+
class TestStanzaPlugin(ElementBase):
name = "plugin"
- namespace = "bar"
+ namespace = "http://test/slash/bar"
interfaces = set(('attrib',))
registerStanzaPlugin(TestStanza, TestStanzaPlugin)
@@ -464,11 +486,22 @@ class TestElementBase(SleekTest):
self.failUnless(stanza.match("foo@bar=a@baz=b"),
"Stanza did not match its own name with multiple attributes.")
+ stanza['qux'] = 'c'
+ self.failUnless(stanza.match("foo/qux"),
+ "Stanza did not match with subelements.")
+
+ stanza['qux'] = ''
+ self.failUnless(stanza.match("foo/qux") == False,
+ "Stanza matched missing subinterface element.")
+
+ self.failUnless(stanza.match("foo/bar") == False,
+ "Stanza matched nonexistent element.")
+
stanza['plugin']['attrib'] = 'c'
self.failUnless(stanza.match("foo/plugin@attrib=c"),
"Stanza did not match with plugin and attribute.")
- self.failUnless(stanza.match("foo/{bar}plugin"),
+ self.failUnless(stanza.match("foo/{http://test/slash/bar}plugin"),
"Stanza did not match with namespaced plugin.")
substanza = TestSubStanza()
diff --git a/tests/test_handlers.py b/tests/test_handlers.py
new file mode 100644
index 00000000..c6262c61
--- /dev/null
+++ b/tests/test_handlers.py
@@ -0,0 +1,112 @@
+from . sleektest import *
+import sleekxmpp
+from sleekxmpp.xmlstream.handler import *
+from sleekxmpp.xmlstream.matcher import *
+
+class TestHandlers(SleekTest):
+ """
+ Test that we can simulate and test a stanza stream.
+ """
+
+ def setUp(self):
+ self.streamStart()
+
+ def tearDown(self):
+ self.streamClose()
+
+ def testCallback(self):
+ """Test using stream callback handlers."""
+
+ def callback_handler(stanza):
+ self.xmpp.sendRaw("""
+ <message>
+ <body>Success!</body>
+ </message>
+ """)
+
+ callback = Callback('Test Callback',
+ MatchXPath('{test}tester'),
+ callback_handler)
+
+ self.xmpp.registerHandler(callback)
+
+ self.streamRecv("""<tester xmlns="test" />""")
+
+ msg = self.Message()
+ msg['body'] = 'Success!'
+ self.streamSendMessage(msg)
+
+ def testWaiter(self):
+ """Test using stream waiter handler."""
+
+ def waiter_handler(stanza):
+ iq = self.xmpp.Iq()
+ iq['id'] = 'test'
+ iq['type'] = 'set'
+ iq['query'] = 'test'
+ reply = iq.send(block=True)
+ if reply:
+ self.xmpp.sendRaw("""
+ <message>
+ <body>Successful: %s</body>
+ </message>
+ """ % reply['query'])
+
+ self.xmpp.add_event_handler('message', waiter_handler, threaded=True)
+
+ # Send message to trigger waiter_handler
+ self.streamRecv("""
+ <message>
+ <body>Testing</body>
+ </message>
+ """)
+
+ # Check that Iq was sent by waiter_handler
+ iq = self.Iq()
+ iq['id'] = 'test'
+ iq['type'] = 'set'
+ iq['query'] = 'test'
+ self.streamSendIq(iq)
+
+ # Send the reply Iq
+ self.streamRecv("""
+ <iq id="test" type="result">
+ <query xmlns="test" />
+ </iq>
+ """)
+
+ # Check that waiter_handler received the reply
+ msg = self.Message()
+ msg['body'] = 'Successful: test'
+ self.streamSendMessage(msg)
+
+ def testWaiterTimeout(self):
+ """Test that waiter handler is removed after timeout."""
+
+ def waiter_handler(stanza):
+ iq = self.xmpp.Iq()
+ iq['id'] = 'test2'
+ iq['type'] = 'set'
+ iq['query'] = 'test2'
+ reply = iq.send(block=True, timeout=0)
+
+ self.xmpp.add_event_handler('message', waiter_handler, threaded=True)
+
+ # Start test by triggerig waiter_handler
+ self.streamRecv("""<message><body>Start Test</body></message>""")
+
+ # Check that Iq was sent to trigger start of timeout period
+ iq = self.Iq()
+ iq['id'] = 'test2'
+ iq['type'] = 'set'
+ iq['query'] = 'test2'
+ self.streamSendIq(iq)
+
+ # Check that the waiter is no longer registered
+ waiter_exists = self.xmpp.removeHandler('IqWait_test2')
+
+ self.failUnless(waiter_exists == False,
+ "Waiter handler was not removed.")
+
+
+suite = unittest.TestLoader().loadTestsFromTestCase(TestHandlers)
diff --git a/tests/test_stanzabase.py b/tests/test_stanzabase.py
new file mode 100644
index 00000000..682068d9
--- /dev/null
+++ b/tests/test_stanzabase.py
@@ -0,0 +1,79 @@
+from . sleektest import *
+import sleekxmpp
+from sleekxmpp.xmlstream.stanzabase import ET, StanzaBase
+
+class TestStanzaBase(SleekTest):
+
+ def testTo(self):
+ """Test the 'to' interface of StanzaBase."""
+ stanza = StanzaBase()
+ stanza['to'] = 'user@example.com'
+ self.failUnless(str(stanza['to']) == 'user@example.com',
+ "Setting and retrieving stanza 'to' attribute did not work.")
+
+ def testFrom(self):
+ """Test the 'from' interface of StanzaBase."""
+ stanza = StanzaBase()
+ stanza['from'] = 'user@example.com'
+ self.failUnless(str(stanza['from']) == 'user@example.com',
+ "Setting and retrieving stanza 'from' attribute did not work.")
+
+ def testPayload(self):
+ """Test the 'payload' interface of StanzaBase."""
+ stanza = StanzaBase()
+ self.failUnless(stanza['payload'] == [],
+ "Empty stanza does not have an empty payload.")
+
+ stanza['payload'] = ET.Element("{foo}foo")
+ self.failUnless(len(stanza['payload']) == 1,
+ "Stanza contents and payload do not match.")
+
+ stanza['payload'] = ET.Element('{bar}bar')
+ self.failUnless(len(stanza['payload']) == 2,
+ "Stanza payload was not appended.")
+
+ del stanza['payload']
+ self.failUnless(stanza['payload'] == [],
+ "Stanza payload not cleared after deletion.")
+
+ stanza['payload'] = [ET.Element('{foo}foo'),
+ ET.Element('{bar}bar')]
+ self.failUnless(len(stanza['payload']) == 2,
+ "Adding multiple elements to stanza's payload did not work.")
+
+ def testClear(self):
+ """Test clearing a stanza."""
+ stanza = StanzaBase()
+ stanza['to'] = 'user@example.com'
+ stanza['payload'] = ET.Element("{foo}foo")
+ stanza.clear()
+
+ self.failUnless(stanza['payload'] == [],
+ "Stanza payload was not cleared after calling .clear()")
+ self.failUnless(str(stanza['to']) == "user@example.com",
+ "Stanza attributes were not preserved after calling .clear()")
+
+ def testReply(self):
+ """Test creating a reply stanza."""
+ stanza = StanzaBase()
+ stanza['to'] = "recipient@example.com"
+ stanza['from'] = "sender@example.com"
+ stanza['payload'] = ET.Element("{foo}foo")
+
+ stanza.reply()
+
+ self.failUnless(str(stanza['to'] == "sender@example.com"),
+ "Stanza reply did not change 'to' attribute.")
+ self.failUnless(stanza['payload'] == [],
+ "Stanza reply did not empty stanza payload.")
+
+ def testError(self):
+ """Test marking a stanza as an error."""
+ stanza = StanzaBase()
+ stanza['type'] = 'get'
+ stanza.error()
+ self.failUnless(stanza['type'] == 'error',
+ "Stanza type is not 'error' after calling error()")
+
+
+suite = unittest.TestLoader().loadTestsFromTestCase(TestStanzaBase)