from sleekxmpp.test import *
import time
import threading


class TestStreamRoster(SleekTest):
    """
    Test handling roster updates.

    Covers two flows against a test stream started in client mode as
    'tester@localhost': an explicit roster request issued through
    ``self.xmpp.get_roster`` and a roster update received without a
    prior request.
    """

    def tearDown(self):
        """Close the test stream after each test."""
        self.stream_close()

    def testGetRoster(self):
        """Test handling roster requests."""
        self.stream_start(mode='client', jid='tester@localhost')

        # Since get_roster blocks, we need to run it in a thread.
        roster_thread = threading.Thread(name='get_roster',
                                         target=self.xmpp.get_roster)
        roster_thread.start()

        # NOTE(review): the two stanza literals below contain no XML markup
        # in this copy of the file; they look like they lost their tags
        # somewhere along the way -- confirm against the upstream test.
        # Presumably send() checks the outgoing request and recv() feeds
        # the server's reply into the stream; verify against SleekTest.
        self.send(""" """)
        self.recv(""" Friends Examples """)

        # Wait for get_roster to return.
        roster_thread.join()

        # The roster entry for user@localhost is expected to carry the
        # name, subscription state and groups listed here.
        self.check_roster('tester@localhost', 'user@localhost',
                          name='User',
                          subscription='from',
                          afrom=True,
                          pending_out=True,
                          groups=['Friends', 'Examples'])

    def testRosterSet(self):
        """Test handling pushed roster updates."""
        self.stream_start(mode='client', jid='tester@localhost')

        # NOTE(review): as in testGetRoster, these stanza literals appear
        # to be missing their XML markup -- confirm against upstream.
        # Here the stream receives the update first and then sends a
        # response, the reverse order of the request/reply flow.
        self.recv(""" Friends Examples """)
        self.send(""" """)

        self.check_roster('tester@localhost', 'user@localhost',
                          name='User',
                          subscription='both',
                          groups=['Friends', 'Examples'])


# Module-level suite so a test runner can collect this case directly.
suite = unittest.TestLoader().loadTestsFromTestCase(TestStreamRoster)