diff options
Diffstat (limited to 'tests')
-rw-r--r-- | tests/sleektest.py | 13 | ||||
-rw-r--r-- | tests/test_elementbase.py | 43 | ||||
-rw-r--r-- | tests/test_handlers.py | 112 | ||||
-rw-r--r-- | tests/test_stanzabase.py | 79 |
4 files changed, 241 insertions, 6 deletions
diff --git a/tests/sleektest.py b/tests/sleektest.py index 801253d3..66535bcb 100644 --- a/tests/sleektest.py +++ b/tests/sleektest.py @@ -504,7 +504,18 @@ class SleekTest(unittest.TestCase): if xml.attrib != other.attrib: return False - # Step 3: Recursively check children + # Step 3: Check text + if xml.text is None: + xml.text = "" + if other.text is None: + other.text = "" + xml.text = xml.text.strip() + other.text = other.text.strip() + + if xml.text != other.text: + return False + + # Step 4: Recursively check children for child in xml: child2s = other.findall("%s" % child.tag) if child2s is None: diff --git a/tests/test_elementbase.py b/tests/test_elementbase.py index 6b0c076b..dfd37b53 100644 --- a/tests/test_elementbase.py +++ b/tests/test_elementbase.py @@ -3,6 +3,21 @@ from sleekxmpp.xmlstream.stanzabase import ElementBase class TestElementBase(SleekTest): + def testFixNs(self): + """Test fixing namespaces in an XPath expression.""" + + e = ElementBase() + ns = "http://jabber.org/protocol/disco#items" + result = e._fix_ns("{%s}foo/bar/{abc}baz/{%s}more" % (ns, ns)) + + expected = "/".join(["{%s}foo" % ns, + "{%s}bar" % ns, + "{abc}baz", + "{%s}more" % ns]) + self.failUnless(expected == result, + "Incorrect namespace fixing result: %s" % str(result)) + + def testExtendedName(self): """Test element names of the form tag1/tag2/tag3.""" @@ -332,7 +347,7 @@ class TestElementBase(SleekTest): </wrapper> </foo> """) - stanza._setSubText('bar', text='', keep=True) + stanza._setSubText('wrapper/bar', text='', keep=True) self.checkStanza(TestStanza, stanza, """ <foo xmlns="foo"> <wrapper> @@ -343,7 +358,7 @@ class TestElementBase(SleekTest): """, use_values=False) stanza['bar'] = 'a' - stanza._setSubText('bar', text='') + stanza._setSubText('wrapper/bar', text='') self.checkStanza(TestStanza, stanza, """ <foo xmlns="foo"> <wrapper> @@ -439,12 +454,19 @@ class TestElementBase(SleekTest): class TestStanza(ElementBase): name = "foo" namespace = "foo" - interfaces = set(('bar','baz')) + interfaces = set(('bar','baz', 'qux')) + sub_interfaces = set(('qux',)) subitem = (TestSubStanza,) + def setQux(self, value): + self._setSubText('qux', text=value) + + def getQux(self): + return self._getSubText('qux') + class TestStanzaPlugin(ElementBase): name = "plugin" - namespace = "bar" + namespace = "http://test/slash/bar" interfaces = set(('attrib',)) registerStanzaPlugin(TestStanza, TestStanzaPlugin) @@ -464,11 +486,22 @@ class TestElementBase(SleekTest): self.failUnless(stanza.match("foo@bar=a@baz=b"), "Stanza did not match its own name with multiple attributes.") + stanza['qux'] = 'c' + self.failUnless(stanza.match("foo/qux"), + "Stanza did not match with subelements.") + + stanza['qux'] = '' + self.failUnless(stanza.match("foo/qux") == False, + "Stanza matched missing subinterface element.") + + self.failUnless(stanza.match("foo/bar") == False, + "Stanza matched nonexistent element.") + stanza['plugin']['attrib'] = 'c' self.failUnless(stanza.match("foo/plugin@attrib=c"), "Stanza did not match with plugin and attribute.") - self.failUnless(stanza.match("foo/{bar}plugin"), + self.failUnless(stanza.match("foo/{http://test/slash/bar}plugin"), "Stanza did not match with namespaced plugin.") substanza = TestSubStanza() diff --git a/tests/test_handlers.py b/tests/test_handlers.py new file mode 100644 index 00000000..c6262c61 --- /dev/null +++ b/tests/test_handlers.py @@ -0,0 +1,112 @@ +from . sleektest import * +import sleekxmpp +from sleekxmpp.xmlstream.handler import * +from sleekxmpp.xmlstream.matcher import * + +class TestHandlers(SleekTest): + """ + Test that we can simulate and test a stanza stream. + """ + + def setUp(self): + self.streamStart() + + def tearDown(self): + self.streamClose() + + def testCallback(self): + """Test using stream callback handlers.""" + + def callback_handler(stanza): + self.xmpp.sendRaw(""" + <message> + <body>Success!</body> + </message> + """) + + callback = Callback('Test Callback', + MatchXPath('{test}tester'), + callback_handler) + + self.xmpp.registerHandler(callback) + + self.streamRecv("""<tester xmlns="test" />""") + + msg = self.Message() + msg['body'] = 'Success!' + self.streamSendMessage(msg) + + def testWaiter(self): + """Test using stream waiter handler.""" + + def waiter_handler(stanza): + iq = self.xmpp.Iq() + iq['id'] = 'test' + iq['type'] = 'set' + iq['query'] = 'test' + reply = iq.send(block=True) + if reply: + self.xmpp.sendRaw(""" + <message> + <body>Successful: %s</body> + </message> + """ % reply['query']) + + self.xmpp.add_event_handler('message', waiter_handler, threaded=True) + + # Send message to trigger waiter_handler + self.streamRecv(""" + <message> + <body>Testing</body> + </message> + """) + + # Check that Iq was sent by waiter_handler + iq = self.Iq() + iq['id'] = 'test' + iq['type'] = 'set' + iq['query'] = 'test' + self.streamSendIq(iq) + + # Send the reply Iq + self.streamRecv(""" + <iq id="test" type="result"> + <query xmlns="test" /> + </iq> + """) + + # Check that waiter_handler received the reply + msg = self.Message() + msg['body'] = 'Successful: test' + self.streamSendMessage(msg) + + def testWaiterTimeout(self): + """Test that waiter handler is removed after timeout.""" + + def waiter_handler(stanza): + iq = self.xmpp.Iq() + iq['id'] = 'test2' + iq['type'] = 'set' + iq['query'] = 'test2' + reply = iq.send(block=True, timeout=0) + + self.xmpp.add_event_handler('message', waiter_handler, threaded=True) + + # Start test by triggerig waiter_handler + self.streamRecv("""<message><body>Start Test</body></message>""") + + # Check that Iq was sent to trigger start of timeout period + iq = self.Iq() + iq['id'] = 'test2' + iq['type'] = 'set' + iq['query'] = 'test2' + self.streamSendIq(iq) + + # Check that the waiter is no longer registered + waiter_exists = self.xmpp.removeHandler('IqWait_test2') + + self.failUnless(waiter_exists == False, + "Waiter handler was not removed.") + + +suite = unittest.TestLoader().loadTestsFromTestCase(TestHandlers) diff --git a/tests/test_stanzabase.py b/tests/test_stanzabase.py new file mode 100644 index 00000000..682068d9 --- /dev/null +++ b/tests/test_stanzabase.py @@ -0,0 +1,79 @@ +from . sleektest import * +import sleekxmpp +from sleekxmpp.xmlstream.stanzabase import ET, StanzaBase + +class TestStanzaBase(SleekTest): + + def testTo(self): + """Test the 'to' interface of StanzaBase.""" + stanza = StanzaBase() + stanza['to'] = 'user@example.com' + self.failUnless(str(stanza['to']) == 'user@example.com', + "Setting and retrieving stanza 'to' attribute did not work.") + + def testFrom(self): + """Test the 'from' interface of StanzaBase.""" + stanza = StanzaBase() + stanza['from'] = 'user@example.com' + self.failUnless(str(stanza['from']) == 'user@example.com', + "Setting and retrieving stanza 'from' attribute did not work.") + + def testPayload(self): + """Test the 'payload' interface of StanzaBase.""" + stanza = StanzaBase() + self.failUnless(stanza['payload'] == [], + "Empty stanza does not have an empty payload.") + + stanza['payload'] = ET.Element("{foo}foo") + self.failUnless(len(stanza['payload']) == 1, + "Stanza contents and payload do not match.") + + stanza['payload'] = ET.Element('{bar}bar') + self.failUnless(len(stanza['payload']) == 2, + "Stanza payload was not appended.") + + del stanza['payload'] + self.failUnless(stanza['payload'] == [], + "Stanza payload not cleared after deletion.") + + stanza['payload'] = [ET.Element('{foo}foo'), + ET.Element('{bar}bar')] + self.failUnless(len(stanza['payload']) == 2, + "Adding multiple elements to stanza's payload did not work.") + + def testClear(self): + """Test clearing a stanza.""" + stanza = StanzaBase() + stanza['to'] = 'user@example.com' + stanza['payload'] = ET.Element("{foo}foo") + stanza.clear() + + self.failUnless(stanza['payload'] == [], + "Stanza payload was not cleared after calling .clear()") + self.failUnless(str(stanza['to']) == "user@example.com", + "Stanza attributes were not preserved after calling .clear()") + + def testReply(self): + """Test creating a reply stanza.""" + stanza = StanzaBase() + stanza['to'] = "recipient@example.com" + stanza['from'] = "sender@example.com" + stanza['payload'] = ET.Element("{foo}foo") + + stanza.reply() + + self.failUnless(str(stanza['to'] == "sender@example.com"), + "Stanza reply did not change 'to' attribute.") + self.failUnless(stanza['payload'] == [], + "Stanza reply did not empty stanza payload.") + + def testError(self): + """Test marking a stanza as an error.""" + stanza = StanzaBase() + stanza['type'] = 'get' + stanza.error() + self.failUnless(stanza['type'] == 'error', + "Stanza type is not 'error' after calling error()") + + +suite = unittest.TestLoader().loadTestsFromTestCase(TestStanzaBase) |