summaryrefslogtreecommitdiff
path: root/slixmpp/plugins/xep_0198/stream_management.py
blob: 2b68faec2a429f87320a9cbe1748173fa0a56a91 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
"""
    Slixmpp: The Slick XMPP Library
    Copyright (C) 2012 Nathanael C. Fritz, Lance J.T. Stout
    This file is part of Slixmpp.

    See the file LICENSE for copying permission.
"""

import asyncio
import logging
import collections

from slixmpp.stanza import Message, Presence, Iq, StreamFeatures
from slixmpp.xmlstream import register_stanza_plugin
from slixmpp.xmlstream.handler import Callback, Waiter
from slixmpp.xmlstream.matcher import MatchXPath, MatchMany
from slixmpp.plugins.base import BasePlugin
from slixmpp.plugins.xep_0198 import stanza


log = logging.getLogger(__name__)


MAX_SEQ = 2 ** 32


class XEP_0198(BasePlugin):

    """
    XEP-0198: Stream Management
    """

    name = 'xep_0198'
    description = 'XEP-0198: Stream Management'
    dependencies = set()
    stanza = stanza
    default_config = {
        #: The last ack number received from the server.
        'last_ack': 0,

        #: The number of stanzas to wait between sending ack requests to
        #: the server. Setting this to ``1`` will send an ack request after
        #: every sent stanza. Defaults to ``5``.
        'window': 5,

        #: The stream management ID for the stream. Knowing this value is
        #: required in order to do stream resumption.
        'sm_id': None,

        #: A counter of handled incoming stanzas, mod 2^32.
        'handled': 0,

        #: A counter of unacked outgoing stanzas, mod 2^32.
        'seq': 0,

        #: Control whether or not the ability to resume the stream will be
        #: requested when enabling stream management. Defaults to ``True``.
        'allow_resume': True,

        'order': 10100,
        'resume_order': 9000
    }

    def plugin_init(self):
        """Start the XEP-0198 plugin."""

        # Only enable stream management for non-components,
        # since components do not yet perform feature negotiation.
        if self.xmpp.is_component:
            return

        self.window_counter = self.window

        self.enabled_in = False
        self.enabled_out = False
        self.unacked_queue = collections.deque()

        register_stanza_plugin(StreamFeatures, stanza.StreamManagement)
        self.xmpp.register_stanza(stanza.Enable)
        self.xmpp.register_stanza(stanza.Enabled)
        self.xmpp.register_stanza(stanza.Resume)
        self.xmpp.register_stanza(stanza.Resumed)
        self.xmpp.register_stanza(stanza.Ack)
        self.xmpp.register_stanza(stanza.RequestAck)

        # Only end the session when a </stream> element is sent,
        # not just because the connection has died.
        self.xmpp.end_session_on_disconnect = False

        # Register the feature twice because it may be ordered two
        # different ways: enabling after binding and resumption
        # before binding.
        self.xmpp.register_feature('sm',
                self._handle_sm_feature,
                restart=True,
                order=self.order)
        self.xmpp.register_feature('sm',
                self._handle_sm_feature,
                restart=True,
                order=self.resume_order)

        self.xmpp.register_handler(
                Callback('Stream Management Enabled',
                    MatchXPath(stanza.Enabled.tag_name()),
                    self._handle_enabled,
                    instream=True))

        self.xmpp.register_handler(
                Callback('Stream Management Resumed',
                    MatchXPath(stanza.Resumed.tag_name()),
                    self._handle_resumed,
                    instream=True))

        self.xmpp.register_handler(
                Callback('Stream Management Failed',
                    MatchXPath(stanza.Failed.tag_name()),
                    self._handle_failed,
                    instream=True))

        self.xmpp.register_handler(
                Callback('Stream Management Ack',
                    MatchXPath(stanza.Ack.tag_name()),
                    self._handle_ack,
                    instream=True))

        self.xmpp.register_handler(
                Callback('Stream Management Request Ack',
                    MatchXPath(stanza.RequestAck.tag_name()),
                    self._handle_request_ack,
                    instream=True))

        self.xmpp.add_filter('in', self._handle_incoming)
        self.xmpp.add_filter('out_sync', self._handle_outgoing)

        self.xmpp.add_event_handler('disconnected', self.disconnected)
        self.xmpp.add_event_handler('session_end', self.session_end)

    def plugin_end(self):
        if self.xmpp.is_component:
            return

        self.xmpp.unregister_feature('sm', self.order)
        self.xmpp.unregister_feature('sm', self.resume_order)
        self.xmpp.del_event_handler('disconnected', self.disconnected)
        self.xmpp.del_event_handler('session_end', self.session_end)
        self.xmpp.del_filter('in', self._handle_incoming)
        self.xmpp.del_filter('out_sync', self._handle_outgoing)
        self.xmpp.remove_handler('Stream Management Enabled')
        self.xmpp.remove_handler('Stream Management Resumed')
        self.xmpp.remove_handler('Stream Management Failed')
        self.xmpp.remove_handler('Stream Management Ack')
        self.xmpp.remove_handler('Stream Management Request Ack')
        self.xmpp.remove_stanza(stanza.Enable)
        self.xmpp.remove_stanza(stanza.Enabled)
        self.xmpp.remove_stanza(stanza.Resume)
        self.xmpp.remove_stanza(stanza.Resumed)
        self.xmpp.remove_stanza(stanza.Ack)
        self.xmpp.remove_stanza(stanza.RequestAck)

    def disconnected(self, event):
        """Reset enabled state until we can resume/reenable."""
        log.debug("disconnected, disabling SM")
        self.xmpp.event('sm_disabled', event)
        self.enabled_in = False
        self.enabled_out = False

    def session_end(self, event):
        """Reset stream management state."""
        log.debug("session_end, disabling SM")
        self.xmpp.event('sm_disabled', event)
        self.enabled_in = False
        self.enabled_out = False
        self.unacked_queue.clear()
        self.sm_id = None
        self.handled = 0
        self.seq = 0
        self.last_ack = 0

    def send_ack(self):
        """Send the current ack count to the server."""
        ack = stanza.Ack(self.xmpp)
        ack['h'] = self.handled
        self.xmpp.send_raw(str(ack))

    def request_ack(self, e=None):
        """Request an ack from the server."""
        log.debug("requesting ack")
        req = stanza.RequestAck(self.xmpp)
        self.xmpp.send_raw(str(req))

    async def _handle_sm_feature(self, features):
        """
        Enable or resume stream management.

        If no SM-ID is stored, and resource binding has taken place,
        stream management will be enabled.

        If an SM-ID is known, and the server allows resumption, the
        previous stream will be resumed.
        """
        if 'stream_management' in self.xmpp.features:
            # We've already negotiated stream management,
            # so no need to do it again.
            return False
        if not self.sm_id:
            if 'bind' in self.xmpp.features:
                enable = stanza.Enable(self.xmpp)
                enable['resume'] = self.allow_resume
                enable.send()
                log.debug("enabling SM")

                waiter = Waiter('enabled_or_failed',
                        MatchMany([
                            MatchXPath(stanza.Enabled.tag_name()),
                            MatchXPath(stanza.Failed.tag_name())]))
                self.xmpp.register_handler(waiter)
                result = await waiter.wait()
        elif self.sm_id and self.allow_resume and 'bind' not in self.xmpp.features:
            resume = stanza.Resume(self.xmpp)
            resume['h'] = self.handled
            resume['previd'] = self.sm_id
            resume.send()
            log.debug("resuming SM")

            # Wait for a response before allowing stream feature processing
            # to continue. The actual result processing will be done in the
            # _handle_resumed() or _handle_failed() methods.
            waiter = Waiter('resumed_or_failed',
                    MatchMany([
                        MatchXPath(stanza.Resumed.tag_name()),
                        MatchXPath(stanza.Failed.tag_name())]))
            self.xmpp.register_handler(waiter)
            result = await waiter.wait()
            if result is not None and result.name == 'resumed':
                return True
        return False

    def _handle_enabled(self, stanza):
        """Save the SM-ID, if provided.

        Raises an :term:`sm_enabled` event.
        """
        self.xmpp.features.add('stream_management')
        if stanza['id']:
            self.sm_id = stanza['id']
        self.enabled_in = True
        self.handled = 0
        self.xmpp.event('sm_enabled', stanza)

    def _handle_resumed(self, stanza):
        """Finish resuming a stream by resending unacked stanzas.

        Raises a :term:`session_resumed` event.
        """
        self.xmpp.features.add('stream_management')
        self.enabled_in = True
        self._handle_ack(stanza)
        for id, stanza in self.unacked_queue:
            self.xmpp.send(stanza, use_filters=False)
        self.xmpp.event('session_resumed', stanza)

    def _handle_failed(self, stanza):
        """
        Disable and reset any features used since stream management was
        requested (tracked stanzas may have been sent during the interval
        between the enable request and the enabled response).

        Raises an :term:`sm_failed` event.
        """
        self.enabled_in = False
        self.enabled_out = False
        self.unacked_queue.clear()
        self.xmpp.event('sm_failed', stanza)

    def _handle_ack(self, ack):
        """Process a server ack by freeing acked stanzas from the queue.

        Raises a :term:`stanza_acked` event for each acked stanza.
        """
        if ack['h'] == self.last_ack:
            return

        num_acked = (ack['h'] - self.last_ack) % MAX_SEQ
        num_unacked = len(self.unacked_queue)
        log.debug("Ack: %s, Last Ack: %s, " + \
                  "Unacked: %s, Num Acked: %s, " + \
                  "Remaining: %s",
            ack['h'],
            self.last_ack,
            num_unacked,
            num_acked,
            num_unacked - num_acked)
        if num_acked > len(self.unacked_queue) or num_acked < 0:
            log.error('Inconsistent sequence numbers from the server,'
                      ' ignoring and replacing ours with them.')
            num_acked = len(self.unacked_queue)
        for x in range(num_acked):
            seq, stanza = self.unacked_queue.popleft()
            self.xmpp.event('stanza_acked', stanza)
        self.last_ack = ack['h']

    def _handle_request_ack(self, req):
        """Handle an ack request by sending an ack."""
        self.send_ack()

    def _handle_incoming(self, stanza):
        """Increment the handled counter for each inbound stanza."""
        if not self.enabled_in:
            return stanza

        if isinstance(stanza, (Message, Presence, Iq)):
            # Sequence numbers are mod 2^32
            self.handled = (self.handled + 1) % MAX_SEQ
        return stanza

    def _handle_outgoing(self, stanza):
        """Store outgoing stanzas in a queue to be acked."""
        from slixmpp.plugins.xep_0198 import stanza as st
        if isinstance(stanza, (st.Enable, st.Resume)):
            self.enabled_out = True
            self.unacked_queue.clear()
            log.debug("enabling outgoing SM: %s" % stanza)

        if not self.enabled_out:
            return stanza

        if isinstance(stanza, (Message, Presence, Iq)):
            seq = None
            # Sequence numbers are mod 2^32
            self.seq = (self.seq + 1) % MAX_SEQ
            seq = self.seq
            self.unacked_queue.append((seq, stanza))
            self.window_counter -= 1
            if self.window_counter == 0:
                self.window_counter = self.window
                self.request_ack()
        return stanza