import time from sleekxmpp import Message from sleekxmpp.test import * from sleekxmpp.xmlstream.handler import * from sleekxmpp.xmlstream.matcher import * class TestHandlers(SleekTest): """ Test using handlers and waiters. """ def setUp(self): self.stream_start() def tearDown(self): self.stream_close() def testCallback(self): """Test using stream callback handlers.""" def callback_handler(stanza): self.xmpp.sendRaw(""" Success! """) callback = Callback('Test Callback', MatchXPath('{test}tester'), callback_handler) self.xmpp.registerHandler(callback) self.recv("""""") msg = self.Message() msg['body'] = 'Success!' self.send(msg) def testWaiter(self): """Test using stream waiter handler.""" def waiter_handler(stanza): iq = self.xmpp.Iq() iq['id'] = 'test' iq['type'] = 'set' iq['query'] = 'test' reply = iq.send(block=True) if reply: self.xmpp.sendRaw(""" Successful: %s """ % reply['query']) self.xmpp.add_event_handler('message', waiter_handler, threaded=True) # Send message to trigger waiter_handler self.recv(""" Testing """) # Check that Iq was sent by waiter_handler iq = self.Iq() iq['id'] = 'test' iq['type'] = 'set' iq['query'] = 'test' self.send(iq) # Send the reply Iq self.recv(""" """) # Check that waiter_handler received the reply msg = self.Message() msg['body'] = 'Successful: test' self.send(msg) def testWaiterTimeout(self): """Test that waiter handler is removed after timeout.""" def waiter_handler(stanza): iq = self.xmpp.Iq() iq['id'] = 'test2' iq['type'] = 'set' iq['query'] = 'test2' try: reply = iq.send(block=True, timeout=0) except IqTimeout: pass self.xmpp.add_event_handler('message', waiter_handler, threaded=True) # Start test by triggerig waiter_handler self.recv("""Start Test""") # Check that Iq was sent to trigger start of timeout period iq = self.Iq() iq['id'] = 'test2' iq['type'] = 'set' iq['query'] = 'test2' self.send(iq) # Give the event queue time to process. time.sleep(0.1) # Check that the waiter is no longer registered waiter_exists = self.xmpp.removeHandler('IqWait_test2') self.failUnless(waiter_exists == False, "Waiter handler was not removed.") def testIqCallback(self): """Test that iq.send(callback=handle_foo) works.""" events = [] def handle_foo(iq): events.append('foo') iq = self.Iq() iq['type'] = 'get' iq['id'] = 'test-foo' iq['to'] = 'user@localhost' iq['query'] = 'foo' iq.send(callback=handle_foo) self.send(""" """) self.recv(""" """) # Give event queue time to process time.sleep(0.1) self.failUnless(events == ['foo'], "Iq callback was not executed: %s" % events) def testMultipleHandlersForStanza(self): """ Test that multiple handlers for a single stanza work without clobbering each other. """ def handler_1(msg): msg.reply("Handler 1: %s" % msg['body']).send() def handler_2(msg): msg.reply("Handler 2: %s" % msg['body']).send() def handler_3(msg): msg.reply("Handler 3: %s" % msg['body']).send() self.xmpp.add_event_handler('message', handler_1) self.xmpp.add_event_handler('message', handler_2) self.xmpp.add_event_handler('message', handler_3) self.recv(""" Testing """) # This test is brittle, depending on the fact that handlers # will be checked in the order they are registered. self.send(""" Handler 1: Testing """) self.send(""" Handler 2: Testing """) self.send(""" Handler 3: Testing """) suite = unittest.TestLoader().loadTestsFromTestCase(TestHandlers)