import asyncio import threading import time import unittest from slixmpp.test import SlixTest class TestInBandByteStreams(SlixTest): def setUp(self): self.stream_start(plugins=['xep_0047', 'xep_0030']) def tearDown(self): self.stream_close() def testOpenStream(self): """Test requesting a stream, successfully""" events = [] def on_stream_start(stream): events.append('ibb_stream_start') self.xmpp.add_event_handler('ibb_stream_start', on_stream_start) self.xmpp.wrap(self.xmpp['xep_0047'].open_stream('tester@localhost/receiver', sid='testing')) self.wait_() self.send(""" """) self.recv(""" """) self.assertEqual(events, ['ibb_stream_start']) def testAysncOpenStream(self): """Test requesting a stream, aysnc""" events = set() def on_stream_start(stream): events.add('ibb_stream_start') def stream_callback(iq): events.add('callback') self.xmpp.add_event_handler('ibb_stream_start', on_stream_start) self.xmpp.wrap(self.xmpp['xep_0047'].open_stream('tester@localhost/receiver', sid='testing', callback=stream_callback)) self.wait_() self.send(""" """) self.recv(""" """) self.assertEqual(events, {'ibb_stream_start', 'callback'}) def testSendData(self): """Test sending data over an in-band bytestream.""" streams = [] data = [] def on_stream_start(stream): streams.append(stream) def on_stream_data(d): data.append(d.read()) self.xmpp.add_event_handler('ibb_stream_start', on_stream_start) self.xmpp.add_event_handler('ibb_stream_data', on_stream_data) self.xmpp.wrap(self.xmpp['xep_0047'].open_stream('tester@localhost/receiver', sid='testing')) self.wait_() self.send(""" """) self.recv(""" """) stream = streams[0] # Test sending data out self.xmpp.wrap(stream.send("Testing")) self.wait_() self.send(""" VGVzdGluZw== """) self.recv(""" """) # Test receiving data self.recv(""" aXQgd29ya3Mh """) self.send(""" """) self.assertEqual(data, [b'it works!']) suite = unittest.TestLoader().loadTestsFromTestCase(TestInBandByteStreams)