"""Stream-level tests exercising the ``xep_0092`` plugin."""

import threading

from sleekxmpp.test import *


class TestStreamSet(SleekTest):
    """Software-version request tests run against a client-mode test stream.

    Both tests start the stream with the ``xep_0030`` and ``xep_0092``
    plugins loaded and then drive it through ``self.recv`` / ``self.send``.

    NOTE(review): ``recv`` and ``send`` come from ``SleekTest``; presumably
    ``recv`` feeds a stanza into the stream and ``send`` checks the stanza
    the stream emitted -- confirm against ``sleekxmpp.test``.

    NOTE(review): the stanza literals in this copy of the file contain no
    XML markup (only whitespace or bare text). They look as if the tags were
    stripped at some point -- verify against the upstream test before
    relying on them.
    """

    def tearDown(self):
        """Close the stream that the test opened."""
        self.stream_close()

    def testHandleSoftwareVersionRequest(self):
        """Set name/version/os on the plugin, then run a recv/send exchange."""
        self.stream_start(
            mode='client',
            plugins=['xep_0030', 'xep_0092'],
        )

        # Configure the values the plugin should report about this client.
        version_plugin = self.xmpp['xep_0092']
        version_plugin.name = 'SleekXMPP'
        version_plugin.version = 'dev'
        version_plugin.os = 'Linux'

        self.recv(""" """)

        self.send(""" SleekXMPP dev Linux """)

    def testMakeSoftwareVersionRequest(self):
        """Call ``get_version('foo@bar')`` in a thread and check its result."""
        self.stream_start(
            mode='client',
            plugins=['xep_0030', 'xep_0092'],
        )

        replies = []

        def ask_for_version():
            # Runs on the worker thread so that the main thread stays free
            # to perform the send/recv exchange below while the call is
            # outstanding.
            replies.append(self.xmpp['xep_0092'].get_version('foo@bar'))

        worker = threading.Thread(target=ask_for_version)
        worker.start()

        self.send(""" """)

        self.recv(""" Foo 1.0 Linux """)

        # Wait for the worker to finish before inspecting what it collected.
        worker.join()

        wanted = [{'name': 'Foo', 'version': '1.0', 'os':'Linux'}]
        self.assertEqual(
            replies,
            wanted,
            "Did not receive expected results: %s" % replies,
        )


# ``unittest`` is not imported by name in this module; it presumably arrives
# through the star import from ``sleekxmpp.test`` -- TODO confirm.
suite = unittest.TestLoader().loadTestsFromTestCase(TestStreamSet)