1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
|
import socket
import threading
import logging
from sleekxmpp.stanza import Iq
from sleekxmpp.util import Queue
from sleekxmpp.exceptions import XMPPError
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
class IBBytestream(object):
    """Socket-like wrapper around one in-band bytestream session.

    Outgoing data is cut into chunks of at most ``block_size`` items and
    shipped inside ``ibb_data`` stanzas, either as messages or as IQ
    requests whose replies act as acknowledgements.  Incoming chunks are
    validated by sequence number and size, queued on ``recv_queue`` and
    announced through the ``ibb_stream_data`` event.

    NOTE(review): the stanza names suggest this implements XEP-0047
    (In-Band Bytestreams); the sequence handling below follows that
    specification.
    """

    # Sequence numbers are 16-bit unsigned counters: they run 0..65535 and
    # then wrap back to 0, hence a modulus of 65536 (not 65535, which would
    # make the value 65535 unreachable and desynchronise long transfers).
    _SEQ_MODULUS = 65536

    def __init__(self, xmpp, sid, block_size, jid, peer, window_size=1,
                 use_messages=False):
        """Set up the bookkeeping for a stream; nothing is sent here.

        Arguments:
            xmpp         -- Stream object used to build stanzas, allocate
                            ids and raise events.
            sid          -- Session id placed in every stanza of the stream.
            block_size   -- Maximum chunk length accepted and sent.
            jid          -- Local JID; ``xmpp.boundjid`` is used when None.
            peer         -- JID of the remote party.
            window_size  -- Number of IQ chunks that may be awaiting an
                            acknowledgement at once.
            use_messages -- Send chunks as messages instead of IQs.
        """
        self.xmpp = xmpp
        self.sid = sid
        self.block_size = block_size
        self.window_size = window_size
        self.use_messages = use_messages

        if jid is None:
            jid = xmpp.boundjid

        self.self_jid = jid
        self.peer_jid = peer

        # Both counters start at -1 so that the first chunk carries seq 0.
        self.send_seq = -1
        self.recv_seq = -1

        self._send_seq_lock = threading.Lock()
        self._recv_seq_lock = threading.Lock()

        self.stream_started = threading.Event()
        self.stream_in_closed = threading.Event()
        self.stream_out_closed = threading.Event()

        self.recv_queue = Queue()

        # One semaphore slot per chunk that may be in flight.
        self.send_window = threading.BoundedSemaphore(value=self.window_size)
        # Ids of IQ chunks that have been sent but not yet acknowledged.
        self.window_ids = set()
        # Set whenever no IQ chunk is awaiting acknowledgement.
        self.window_empty = threading.Event()
        self.window_empty.set()

    @staticmethod
    def _next_seq(seq):
        """Return the sequence number following ``seq``, wrapping at 16 bits."""
        return (seq + 1) % IBBytestream._SEQ_MODULUS

    def send(self, data):
        """Send at most ``block_size`` items of ``data`` as a single chunk.

        Blocks until a slot in the send window is free.  Returns the number
        of items actually sent, which may be less than ``len(data)``.

        Raises ``socket.error`` if the stream has not started or its
        outgoing side is already closed.
        """
        if not self.stream_started.is_set() or \
           self.stream_out_closed.is_set():
            raise socket.error

        data = data[0:self.block_size]
        self.send_window.acquire()

        # The slot is handed over to _recv_ack only once an IQ has been
        # queued successfully; in every other case it is returned here so
        # that a failure cannot shrink the window permanently.
        release_slot = True
        try:
            with self._send_seq_lock:
                self.send_seq = self._next_seq(self.send_seq)
                seq = self.send_seq

            if self.use_messages:
                msg = self.xmpp.Message()
                msg['to'] = self.peer_jid
                msg['from'] = self.self_jid
                msg['id'] = self.xmpp.new_id()
                msg['ibb_data']['sid'] = self.sid
                msg['ibb_data']['seq'] = seq
                msg['ibb_data']['data'] = data
                msg.send()
            else:
                iq = self.xmpp.Iq()
                iq['type'] = 'set'
                iq['to'] = self.peer_jid
                iq['from'] = self.self_jid
                iq['ibb_data']['sid'] = self.sid
                iq['ibb_data']['seq'] = seq
                iq['ibb_data']['data'] = data
                iq_id = iq['id']
                self.window_empty.clear()
                self.window_ids.add(iq_id)
                try:
                    iq.send(block=False, callback=self._recv_ack)
                except Exception:
                    # Undo the bookkeeping for a chunk that never left.
                    self.window_ids.discard(iq_id)
                    if not self.window_ids:
                        self.window_empty.set()
                    raise
                release_slot = False
        finally:
            if release_slot:
                self.send_window.release()

        return len(data)

    def sendall(self, data):
        """Send all of ``data``, one chunk per call to ``send``.

        Raises ``socket.error`` if ``send`` makes no progress (for example
        when ``block_size`` is 0), instead of looping forever.
        """
        sent_len = 0
        while sent_len < len(data):
            sent_now = self.send(data[sent_len:])
            if not sent_now:
                raise socket.error
            sent_len += sent_now

    def _recv_ack(self, iq):
        """Handle the reply to a data IQ and free its send-window slot.

        Replies whose id is not outstanding (duplicates, or chunks already
        rolled back by ``send``) do not release a slot, so the bounded
        semaphore is never over-released.  An error reply closes the stream.
        """
        iq_id = iq['id']
        was_pending = iq_id in self.window_ids
        if was_pending:
            self.window_ids.remove(iq_id)
        if not self.window_ids:
            self.window_empty.set()
        if was_pending:
            self.send_window.release()
        if iq['type'] == 'error':
            self.close()

    def _recv_data(self, stanza):
        """Validate and queue an incoming data chunk.

        The chunk must carry the next expected sequence number and must not
        exceed ``block_size``; otherwise the stream is closed and an
        ``XMPPError`` ('unexpected-request' or 'not-acceptable') is raised.
        Accepted data is put on ``recv_queue`` and reported through the
        ``ibb_stream_data`` event.  IQ stanzas are acknowledged with a reply.
        """
        with self._recv_seq_lock:
            new_seq = stanza['ibb_data']['seq']
            if new_seq != self._next_seq(self.recv_seq):
                self.close()
                raise XMPPError('unexpected-request')
            self.recv_seq = new_seq

        data = stanza['ibb_data']['data']
        if len(data) > self.block_size:
            self.close()
            raise XMPPError('not-acceptable')

        self.recv_queue.put(data)
        self.xmpp.event('ibb_stream_data', {'stream': self, 'data': data})

        # Only IQ chunks are acknowledged; message chunks get no reply.
        if isinstance(stanza, Iq):
            stanza.reply()
            stanza.send()

    def recv(self, *args, **kwargs):
        """Socket-style alias for a blocking ``read``; arguments are ignored."""
        return self.read(block=True)

    def read(self, block=True, timeout=None, **kwargs):
        """Return the next received chunk from ``recv_queue``.

        Arguments:
            block   -- Wait for data when the queue is empty.
            timeout -- Maximum wait; a non-None value forces ``block=True``.

        Returns None when no chunk could be fetched (empty queue or
        timeout).  Raises ``socket.error`` if the stream has not started
        or its incoming side is closed.
        """
        if not self.stream_started.is_set() or \
           self.stream_in_closed.is_set():
            raise socket.error

        if timeout is not None:
            block = True

        try:
            return self.recv_queue.get(block, timeout)
        except Exception:
            # Deliberately best-effort: an empty queue yields None.  Unlike a
            # bare ``except:``, KeyboardInterrupt/SystemExit still propagate.
            return None

    def close(self):
        """Ask the peer to close the stream.

        The outgoing side is marked closed immediately; the incoming side
        is marked closed when the callback for the close IQ fires.  The
        ``ibb_stream_end`` event is raised right away.
        """
        iq = self.xmpp.Iq()
        iq['type'] = 'set'
        iq['to'] = self.peer_jid
        iq['from'] = self.self_jid
        iq['ibb_close']['sid'] = self.sid
        self.stream_out_closed.set()
        iq.send(block=False,
                callback=lambda x: self.stream_in_closed.set())
        self.xmpp.event('ibb_stream_end', self)

    def _closed(self, iq):
        """Mark both directions closed, reply to ``iq`` and raise
        ``ibb_stream_end``.

        NOTE(review): presumably invoked when the peer sends the close
        request; confirm against the caller.
        """
        self.stream_in_closed.set()
        self.stream_out_closed.set()
        iq.reply()
        iq.send()
        self.xmpp.event('ibb_stream_end', self)

    def makefile(self, *args, **kwargs):
        """Socket-compatibility shim: the stream acts as its own file object."""
        return self

    def connect(self, *args, **kwargs):
        """Socket-compatibility no-op; always returns None."""
        return None

    def shutdown(self, *args, **kwargs):
        """Socket-compatibility no-op; always returns None."""
        return None
|