diff options
Diffstat (limited to 'tests/test_events.py')
-rw-r--r-- | tests/test_events.py | 90 |
1 files changed, 66 insertions, 24 deletions
diff --git a/tests/test_events.py b/tests/test_events.py index bbc5832e..df36969d 100644 --- a/tests/test_events.py +++ b/tests/test_events.py @@ -1,31 +1,73 @@ import sleekxmpp +import time from . sleektest import * class TestEvents(SleekTest): - - def testEventHappening(self): - "Test handler working" - c = sleekxmpp.ClientXMPP('crap@wherever', 'password') - happened = [] - def handletestevent(event): - happened.append(True) - c.add_event_handler("test_event", handletestevent) - c.event("test_event", {}) - c.event("test_event", {}) - self.failUnless(happened == [True, True], "event did not get triggered twice") - - def testDelEvent(self): - "Test handler working, then deleted and not triggered" - c = sleekxmpp.ClientXMPP('crap@wherever', 'password') - happened = [] - def handletestevent(event): - happened.append(True) - c.add_event_handler("test_event", handletestevent) - c.event("test_event", {}) - c.del_event_handler("test_event", handletestevent) - c.event("test_event", {}) # should not trigger because it was deleted - self.failUnless(happened == [True], "event did not get triggered the correct number of times") - + + def setUp(self): + self.streamStart() + + def tearDown(self): + self.streamClose() + + def testEventHappening(self): + """Test handler working""" + happened = [] + + def handletestevent(event): + happened.append(True) + + self.xmpp.add_event_handler("test_event", handletestevent) + self.xmpp.event("test_event") + self.xmpp.event("test_event") + + # Give the event queue time to process. + time.sleep(0.1) + + msg = "Event was not triggered the correct number of times: %s" + self.failUnless(happened == [True, True], msg) + + def testDelEvent(self): + """Test handler working, then deleted and not triggered""" + happened = [] + + def handletestevent(event): + happened.append(True) + + self.xmpp.add_event_handler("test_event", handletestevent) + self.xmpp.event("test_event", {}) + + self.xmpp.del_event_handler("test_event", handletestevent) + + # Should not trigger because it was deleted + self.xmpp.event("test_event", {}) + + # Give the event queue time to process. + time.sleep(0.1) + + msg = "Event was not triggered the correct number of times: %s" + self.failUnless(happened == [True], msg % happened) + + def testDisposableEvent(self): + """Test disposable handler working, then not being triggered again.""" + happened = [] + + def handletestevent(event): + happened.append(True) + + self.xmpp.add_event_handler("test_event", handletestevent, + disposable=True) + self.xmpp.event("test_event", {}) + + # Should not trigger because it was deleted + self.xmpp.event("test_event", {}) + + # Give the event queue time to process. + time.sleep(0.1) + + msg = "Event was not triggered the correct number of times: %s" + self.failUnless(happened == [True], msg % happened) + suite = unittest.TestLoader().loadTestsFromTestCase(TestEvents) |