diff options
author | Lance Stout <lancestout@gmail.com> | 2012-03-21 13:00:43 -0700 |
---|---|---|
committer | Lance Stout <lancestout@gmail.com> | 2012-03-21 13:00:43 -0700 |
commit | fa4c52e499b63a79cccc4f947bd25f92af260ba2 (patch) | |
tree | 8eb11c22c7f5588d1c27d64afd4760af7c70fdb4 /sleekxmpp/plugins | |
parent | d5484808a794e13e005db9e5a7f910b5c62bcca2 (diff) | |
download | slixmpp-fa4c52e499b63a79cccc4f947bd25f92af260ba2.tar.gz slixmpp-fa4c52e499b63a79cccc4f947bd25f92af260ba2.tar.bz2 slixmpp-fa4c52e499b63a79cccc4f947bd25f92af260ba2.tar.xz slixmpp-fa4c52e499b63a79cccc4f947bd25f92af260ba2.zip |
Correct handling of acks for XEP-0198 under heavy load.
Diffstat (limited to 'sleekxmpp/plugins')
-rw-r--r-- | sleekxmpp/plugins/xep_0198/stream_management.py | 15 |
1 files changed, 10 insertions, 5 deletions
diff --git a/sleekxmpp/plugins/xep_0198/stream_management.py b/sleekxmpp/plugins/xep_0198/stream_management.py index 6ed1ea26..04aa0001 100644 --- a/sleekxmpp/plugins/xep_0198/stream_management.py +++ b/sleekxmpp/plugins/xep_0198/stream_management.py @@ -60,6 +60,8 @@ class XEP_0198(BasePlugin): #: the server. Setting this to ``1`` will send an ack request after #: every sent stanza. Defaults to ``5``. self.window = self.config.get('window', 5) + self.window_counter = self.window + self.window_counter_lock = threading.Lock() #: Control whether or not the ability to resume the stream will be #: requested when enabling stream management. Defaults to ``True``. @@ -132,12 +134,12 @@ class XEP_0198(BasePlugin): ack = stanza.Ack(self.xmpp) with self.handled_lock: ack['h'] = self.handled - ack.send() + self.xmpp.send_raw(str(ack), now=True) def request_ack(self, e=None): """Request an ack from the server.""" req = stanza.RequestAck(self.xmpp) - req.send() + self.xmpp.send_queue.put(str(req)) def _handle_sm_feature(self, features): """ @@ -158,7 +160,7 @@ class XEP_0198(BasePlugin): self.enabled.set() enable = stanza.Enable(self.xmpp) enable['resume'] = self.allow_resume - enable.send() + enable.send(now=True) self.handled = 0 elif self.sm_id and self.allow_resume: self.enabled.set() @@ -261,6 +263,9 @@ class XEP_0198(BasePlugin): self.seq = (self.seq + 1) % MAX_SEQ seq = self.seq self.unacked_queue.append((seq, stanza)) - if len(self.unacked_queue) > self.window: - self.xmpp.event('need_ack') + with self.window_counter_lock: + self.window_counter -= 1 + if self.window_counter == 0: + self.window_counter = self.window + self.request_ack() return stanza |