summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmmanuel Gil Peyrot <linkmauve@linkmauve.fr>2015-04-14 19:19:46 +0200
committerEmmanuel Gil Peyrot <linkmauve@linkmauve.fr>2015-04-19 20:48:02 +0200
commit4415d3be1ab10717e1bd3c3fde68b4c04932adda (patch)
treedfa38554eecb81a7c7bbd27ce790b96a72e9ecf6
parent058c5307877c4e0d8cf8cbef1b83bbd187de1bc4 (diff)
downloadslixmpp-4415d3be1ab10717e1bd3c3fde68b4c04932adda.tar.gz
slixmpp-4415d3be1ab10717e1bd3c3fde68b4c04932adda.tar.bz2
slixmpp-4415d3be1ab10717e1bd3c3fde68b4c04932adda.tar.xz
slixmpp-4415d3be1ab10717e1bd3c3fde68b4c04932adda.zip
XEP-0047: use coroutines for send(), sendall() and the new sendfile().
-rw-r--r--slixmpp/plugins/xep_0047/stream.py27
-rw-r--r--tests/test_stream_xep_0047.py4
2 files changed, 20 insertions, 11 deletions
diff --git a/slixmpp/plugins/xep_0047/stream.py b/slixmpp/plugins/xep_0047/stream.py
index eee6bd0e..3be894eb 100644
--- a/slixmpp/plugins/xep_0047/stream.py
+++ b/slixmpp/plugins/xep_0047/stream.py
@@ -31,7 +31,8 @@ class IBBytestream(object):
self.recv_queue = asyncio.Queue()
- def send(self, data):
+ @asyncio.coroutine
+ def send(self, data, timeout=None):
if not self.stream_started or self.stream_out_closed:
raise socket.error
if len(data) > self.block_size:
@@ -55,17 +56,22 @@ class IBBytestream(object):
iq['ibb_data']['sid'] = self.sid
iq['ibb_data']['seq'] = seq
iq['ibb_data']['data'] = data
- iq.send(callback=self._recv_ack)
+ yield from iq.send(timeout=timeout)
return len(data)
- def sendall(self, data):
+ @asyncio.coroutine
+ def sendall(self, data, timeout=None):
sent_len = 0
while sent_len < len(data):
- sent_len += self.send(data[sent_len:self.block_size])
+ sent_len += yield from self.send(data[sent_len:self.block_size], timeout=timeout)
- def _recv_ack(self, iq):
- if iq['type'] == 'error':
- self.close()
+ @asyncio.coroutine
+ def sendfile(self, file, timeout=None):
+ while True:
+ data = file.read(self.block_size)
+ if not data:
+ break
+ yield from self.send(data, timeout=timeout)
def _recv_data(self, stanza):
new_seq = stanza['ibb_data']['seq']
@@ -80,7 +86,7 @@ class IBBytestream(object):
raise XMPPError('not-acceptable')
self.recv_queue.put_nowait(data)
- self.xmpp.event('ibb_stream_data', {'stream': self, 'data': data})
+ self.xmpp.event('ibb_stream_data', self)
if isinstance(stanza, Iq):
stanza.reply().send()
@@ -93,7 +99,7 @@ class IBBytestream(object):
raise socket.error
return self.recv_queue.get_nowait()
- def close(self):
+ def close(self, timeout=None):
iq = self.xmpp.Iq()
iq['type'] = 'set'
iq['to'] = self.peer_jid
@@ -102,8 +108,9 @@ class IBBytestream(object):
self.stream_out_closed = True
def _close_stream(_):
self.stream_in_closed = True
- iq.send(callback=_close_stream)
+ future = iq.send(timeout=timeout, callback=_close_stream)
self.xmpp.event('ibb_stream_end', self)
+ return future
def _closed(self, iq):
self.stream_in_closed = True
diff --git a/tests/test_stream_xep_0047.py b/tests/test_stream_xep_0047.py
index 2cc43823..ecba2445 100644
--- a/tests/test_stream_xep_0047.py
+++ b/tests/test_stream_xep_0047.py
@@ -1,3 +1,4 @@
+import asyncio
import threading
import time
@@ -78,6 +79,7 @@ class TestInBandByteStreams(SlixTest):
self.assertEqual(events, set(['ibb_stream_start', 'callback']))
+ @asyncio.coroutine
def testSendData(self):
"""Test sending data over an in-band bytestream."""
@@ -115,7 +117,7 @@ class TestInBandByteStreams(SlixTest):
# Test sending data out
- stream.send("Testing")
+ yield from stream.send("Testing")
self.send("""
<iq type="set" id="2"