summaryrefslogtreecommitdiff
path: root/sleekxmpp/plugins/xep_0198
diff options
context:
space:
mode:
Diffstat (limited to 'sleekxmpp/plugins/xep_0198')
-rw-r--r--sleekxmpp/plugins/xep_0198/stream_management.py15
1 files changed, 10 insertions, 5 deletions
diff --git a/sleekxmpp/plugins/xep_0198/stream_management.py b/sleekxmpp/plugins/xep_0198/stream_management.py
index 6ed1ea26..04aa0001 100644
--- a/sleekxmpp/plugins/xep_0198/stream_management.py
+++ b/sleekxmpp/plugins/xep_0198/stream_management.py
@@ -60,6 +60,8 @@ class XEP_0198(BasePlugin):
#: the server. Setting this to ``1`` will send an ack request after
#: every sent stanza. Defaults to ``5``.
self.window = self.config.get('window', 5)
+ self.window_counter = self.window
+ self.window_counter_lock = threading.Lock()
#: Control whether or not the ability to resume the stream will be
#: requested when enabling stream management. Defaults to ``True``.
@@ -132,12 +134,12 @@ class XEP_0198(BasePlugin):
ack = stanza.Ack(self.xmpp)
with self.handled_lock:
ack['h'] = self.handled
- ack.send()
+ self.xmpp.send_raw(str(ack), now=True)
def request_ack(self, e=None):
"""Request an ack from the server."""
req = stanza.RequestAck(self.xmpp)
- req.send()
+ self.xmpp.send_queue.put(str(req))
def _handle_sm_feature(self, features):
"""
@@ -158,7 +160,7 @@ class XEP_0198(BasePlugin):
self.enabled.set()
enable = stanza.Enable(self.xmpp)
enable['resume'] = self.allow_resume
- enable.send()
+ enable.send(now=True)
self.handled = 0
elif self.sm_id and self.allow_resume:
self.enabled.set()
@@ -261,6 +263,9 @@ class XEP_0198(BasePlugin):
self.seq = (self.seq + 1) % MAX_SEQ
seq = self.seq
self.unacked_queue.append((seq, stanza))
- if len(self.unacked_queue) > self.window:
- self.xmpp.event('need_ack')
+ with self.window_counter_lock:
+ self.window_counter -= 1
+ if self.window_counter == 0:
+ self.window_counter = self.window
+ self.request_ack()
return stanza