from sleekxmpp.test import * import sleekxmpp.plugins.xep_0033 as xep_0033 class TestLiveStream(SleekTest): """ Test that we can test a live stanza stream. """ def tearDown(self): self.stream_close() def testClientConnection(self): """Test that we can interact with a live ClientXMPP instance.""" self.stream_start(mode='client', socket='live', skip=False, jid='user@localhost/test', password='user') # Use sid=None to ignore any id sent by the server since # we can't know it in advance. self.stream_recv_header(sfrom='localhost', sid=None) self.stream_send_header(sto='localhost') self.stream_recv_feature(""" DIGEST-MD5 PLAIN """) self.stream_send_feature(""" """) self.stream_recv_feature(""" """) self.stream_send_header(sto='localhost') self.stream_recv_header(sfrom='localhost', sid=None) self.stream_recv_feature(""" DIGEST-MD5 PLAIN """) self.stream_send_feature(""" AHVzZXIAdXNlcg== """) self.stream_recv_feature(""" """) self.stream_send_header(sto='localhost') self.stream_recv_header(sfrom='localhost', sid=None) self.stream_recv_feature(""" """) # Should really use stream_send_iq, but our Iq stanza objects # can't handle bind element payloads yet. self.stream_send_feature(""" test """) self.stream_recv_feature(""" user@localhost/test """) self.stream_close() suite = unittest.TestLoader().loadTestsFromTestCase(TestLiveStream) if __name__ == '__main__': tests = unittest.TestSuite([suite]) result = unittest.TextTestRunner(verbosity=2).run(tests) test_ns = 'http://andyet.net/protocol/tests' print("" % ( test_ns, 'ran="%s"' % result.testsRun, 'errors="%s"' % len(result.errors), 'fails="%s"' % len(result.failures), 'success="%s"' % result.wasSuccessful()))