diff options
author | Emmanuel Gil Peyrot <linkmauve@linkmauve.fr> | 2015-04-14 19:19:46 +0200 |
---|---|---|
committer | Emmanuel Gil Peyrot <linkmauve@linkmauve.fr> | 2015-04-19 20:48:02 +0200 |
commit | 4415d3be1ab10717e1bd3c3fde68b4c04932adda (patch) | |
tree | dfa38554eecb81a7c7bbd27ce790b96a72e9ecf6 | |
parent | 058c5307877c4e0d8cf8cbef1b83bbd187de1bc4 (diff) | |
download | slixmpp-4415d3be1ab10717e1bd3c3fde68b4c04932adda.tar.gz slixmpp-4415d3be1ab10717e1bd3c3fde68b4c04932adda.tar.bz2 slixmpp-4415d3be1ab10717e1bd3c3fde68b4c04932adda.tar.xz slixmpp-4415d3be1ab10717e1bd3c3fde68b4c04932adda.zip |
XEP-0047: use coroutines for send(), sendall() and the new sendfile().
-rw-r--r-- | slixmpp/plugins/xep_0047/stream.py | 27 | ||||
-rw-r--r-- | tests/test_stream_xep_0047.py | 4 |
2 files changed, 20 insertions, 11 deletions
diff --git a/slixmpp/plugins/xep_0047/stream.py b/slixmpp/plugins/xep_0047/stream.py index eee6bd0e..3be894eb 100644 --- a/slixmpp/plugins/xep_0047/stream.py +++ b/slixmpp/plugins/xep_0047/stream.py @@ -31,7 +31,8 @@ class IBBytestream(object): self.recv_queue = asyncio.Queue() - def send(self, data): + @asyncio.coroutine + def send(self, data, timeout=None): if not self.stream_started or self.stream_out_closed: raise socket.error if len(data) > self.block_size: @@ -55,17 +56,22 @@ class IBBytestream(object): iq['ibb_data']['sid'] = self.sid iq['ibb_data']['seq'] = seq iq['ibb_data']['data'] = data - iq.send(callback=self._recv_ack) + yield from iq.send(timeout=timeout) return len(data) - def sendall(self, data): + @asyncio.coroutine + def sendall(self, data, timeout=None): sent_len = 0 while sent_len < len(data): - sent_len += self.send(data[sent_len:self.block_size]) + sent_len += yield from self.send(data[sent_len:self.block_size], timeout=timeout) - def _recv_ack(self, iq): - if iq['type'] == 'error': - self.close() + @asyncio.coroutine + def sendfile(self, file, timeout=None): + while True: + data = file.read(self.block_size) + if not data: + break + yield from self.send(data, timeout=timeout) def _recv_data(self, stanza): new_seq = stanza['ibb_data']['seq'] @@ -80,7 +86,7 @@ class IBBytestream(object): raise XMPPError('not-acceptable') self.recv_queue.put_nowait(data) - self.xmpp.event('ibb_stream_data', {'stream': self, 'data': data}) + self.xmpp.event('ibb_stream_data', self) if isinstance(stanza, Iq): stanza.reply().send() @@ -93,7 +99,7 @@ class IBBytestream(object): raise socket.error return self.recv_queue.get_nowait() - def close(self): + def close(self, timeout=None): iq = self.xmpp.Iq() iq['type'] = 'set' iq['to'] = self.peer_jid @@ -102,8 +108,9 @@ class IBBytestream(object): self.stream_out_closed = True def _close_stream(_): self.stream_in_closed = True - iq.send(callback=_close_stream) + future = iq.send(timeout=timeout, callback=_close_stream) self.xmpp.event('ibb_stream_end', self) + return future def _closed(self, iq): self.stream_in_closed = True diff --git a/tests/test_stream_xep_0047.py b/tests/test_stream_xep_0047.py index 2cc43823..ecba2445 100644 --- a/tests/test_stream_xep_0047.py +++ b/tests/test_stream_xep_0047.py @@ -1,3 +1,4 @@ +import asyncio import threading import time @@ -78,6 +79,7 @@ class TestInBandByteStreams(SlixTest): self.assertEqual(events, set(['ibb_stream_start', 'callback'])) + @asyncio.coroutine def testSendData(self): """Test sending data over an in-band bytestream.""" @@ -115,7 +117,7 @@ class TestInBandByteStreams(SlixTest): # Test sending data out - stream.send("Testing") + yield from stream.send("Testing") self.send(""" <iq type="set" id="2" |