summaryrefslogtreecommitdiff
path: root/slixmpp/plugins/xep_0050/adhoc.py
blob: 072ec5aaf0aa174eb610fe89ee2da5ea53753d82 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628

# Slixmpp: The Slick XMPP Library
# Copyright (C) 2011 Nathanael C. Fritz, Lance J.T. Stout
# This file is part of Slixmpp.
# See the file LICENSE for copying permission.
import asyncio
import inspect
import logging
import time

from slixmpp import Iq
from slixmpp.exceptions import XMPPError
from slixmpp.xmlstream.handler import Callback
from slixmpp.xmlstream.matcher import StanzaPath
from slixmpp.xmlstream import register_stanza_plugin, JID
from slixmpp.plugins import BasePlugin
from slixmpp.plugins.xep_0050 import stanza
from slixmpp.plugins.xep_0050 import Command
from slixmpp.plugins.xep_0004 import Form


log = logging.getLogger(__name__)


class XEP_0050(BasePlugin):

    """
    XEP-0050: Ad-Hoc Commands

    XMPP's Adhoc Commands provides a generic workflow mechanism for
    interacting with applications. The result is similar to menu selections
    and multi-step dialogs in normal desktop applications. Clients do not
    need to know in advance what commands are provided by any particular
    application or agent. While adhoc commands provide similar functionality
    to Jabber-RPC, adhoc commands are used primarily for human interaction.

    Also see <http://xmpp.org/extensions/xep-0050.html>

    Events:
        command          -- Received any command request (Iq type="set")
        command_execute  -- Received a command with action="execute"
        command_next     -- Received a command with action="next"
        command_prev     -- Received a command with action="prev"
        command_complete -- Received a command with action="complete"
        command_cancel   -- Received a command with action="cancel"

    Attributes:
        commands -- A dictionary mapping JID/node pairs to command
                    names and handlers.
        sessions -- A dictionary or equivalent backend mapping
                    session IDs to dictionaries containing data
                    relevant to a command's session.

    """

    name = 'xep_0050'
    description = 'XEP-0050: Ad-Hoc Commands'
    dependencies = {'xep_0030', 'xep_0004'}
    stanza = stanza
    default_config = {
        'session_db': None
    }

    def plugin_init(self):
        """
        Initialise plugin state and hook the plugin into the stream.

        Picks the session store (the configured ``session_db``, or a new
        dictionary when none was configured), creates an empty command
        registry, registers the stanza handler for ``set`` Iq stanzas
        carrying a command element, registers the stanza plugins and
        subscribes to the ``command`` event.
        """
        # Use an in-memory dictionary unless a backend was configured.
        backend = self.session_db
        self.sessions = {} if backend is None else backend
        self.commands = {}

        execute_callback = Callback("Ad-Hoc Execute",
                                    StanzaPath('iq@type=set/command'),
                                    self._handle_command)
        self.xmpp.register_handler(execute_callback)

        register_stanza_plugin(Iq, Command)
        register_stanza_plugin(Command, Form, iterable=True)

        self.xmpp.add_event_handler('command', self._handle_command_all)

    def plugin_end(self):
        """
        Detach the plugin from the stream.

        Removes the ``command`` event handler and the "Ad-Hoc Execute"
        stanza handler, then withdraws the commands feature from service
        discovery and resets the item list of the commands node.
        """
        xmpp = self.xmpp
        xmpp.del_event_handler('command', self._handle_command_all)
        xmpp.remove_handler('Ad-Hoc Execute')

        disco = xmpp['xep_0030']
        command_ns = Command.namespace
        disco.del_feature(feature=command_ns)
        disco.set_items(node=command_ns, items=())

    def session_bind(self, jid):
        """
        Announce ad-hoc command support through service discovery.

        Adds the commands feature and sets the item list of the commands
        node to an empty collection.

        :param jid: The bound JID; not used by this method.
        """
        disco = self.xmpp['xep_0030']
        command_ns = Command.namespace
        disco.add_feature(command_ns)
        disco.set_items(node=command_ns, items=())

    def set_backend(self, db):
        """
        Replace the default session storage dictionary with
        a generic, external data storage mechanism.

        The replacement backend must be able to interact through
        the same syntax and interfaces as a normal dictionary.

        :param db: The new session storage mechanism.
        """
        self.sessions = db

    def prep_handlers(self, handlers, **kwargs):
        """
        Prepare a list of functions for use by the backend service.

        Intended to be replaced by the backend service as needed.

        :param handlers: A list of function pointers
        :param kwargs: Any additional parameters required by the backend.
        """
        pass

    # =================================================================
    # Server side (command provider) API

    def add_command(self, jid=None, node=None, name='', handler=None):
        """
        Publish a command so that other entities can discover and run it.

        The command is announced through service discovery (a command
        list identity, an item below the commands node, a command node
        identity and the commands feature) and its handler is stored in
        ``self.commands`` under the key ``(full JID, node)``.

        The first handler of a workflow receives the initial Iq stanza of
        the request, which makes it the place to implement access
        control. Later handlers receive only the payload items of the
        command. Every handler also receives the command's session data.

        :param jid: The JID that will expose the command. Defaults to the
                    bound JID of the connection.
        :param node: The node associated with the command.
        :param name: A human readable name for the command.
        :param handler: A function that will generate the response to the
                        initial command request, as well as enforcing any
                        access control policies.
        """
        # Normalise the owner of the command to a JID object.
        if jid is None:
            jid = self.xmpp.boundjid
        else:
            if not isinstance(jid, JID):
                jid = JID(jid)
        full_jid = jid.full

        disco = self.xmpp['xep_0030']
        command_ns = Command.namespace

        disco.add_identity(category='automation',
                           itype='command-list',
                           name='Ad-Hoc commands',
                           node=command_ns,
                           jid=jid)
        disco.add_item(jid=full_jid,
                       name=name,
                       node=command_ns,
                       subnode=node,
                       ijid=jid)
        disco.add_identity(category='automation',
                           itype='command-node',
                           name=name,
                           node=node,
                           jid=jid)
        disco.add_feature(command_ns, None, jid)

        self.commands[(full_jid, node)] = (name, handler)

    def new_session(self):
        """
        Build a fresh session ID.

        :return: The current timestamp and a new stream ID, joined
                 by a dash.
        """
        parts = (str(time.time()), self.xmpp.new_id())
        return '-'.join(parts)

    def _handle_command(self, iq):
        """
        Turn an incoming command stanza into events.

        Fires the generic ``command`` event first, followed by the
        action specific ``command_<action>`` event.

        :param iq: The received command request.
        """
        fire = self.xmpp.event
        fire('command', iq)
        # The action is read only after the generic event was dispatched.
        action_event = 'command_%s' % iq['command']['action']
        fire(action_event, iq)

    async def _handle_command_all(self, iq: Iq) -> None:
        """
        Route a command request to the handler matching its action.

        A request whose session ID is not found in the session store
        starts a new command, whatever its action is. Otherwise the
        action selects the handler; ``execute`` is treated like ``next``
        and unknown actions are ignored.

        :param iq: The received command request.
        """
        command = iq['command']
        action = command['action']
        sessionid = command['sessionid']

        if self.sessions.get(sessionid) is None:
            return await self._handle_command_start(iq)

        dispatch = {
            'next': self._handle_command_next,
            'execute': self._handle_command_next,
            'prev': self._handle_command_prev,
            'complete': self._handle_command_complete,
            'cancel': self._handle_command_cancel,
        }
        step = dispatch.get(action)
        if step is None:
            return None
        return await step(iq)

    async def _handle_command_start(self, iq):
        """
        Process an initial request to execute a command.

        Looks up the handler registered for the addressed JID and node,
        builds the initial session dictionary and passes it, together
        with the request, to the handler. The session returned by the
        handler is then used to build the reply.

        :param iq: The command execution request.
        :raises XMPPError: ``item-not-found`` if no handler is registered
                           for the requested JID/node pair.
        """
        sessionid = self.new_session()
        node = iq['command']['node']
        key = (iq['to'].full, node)
        _name, handler = self.commands.get(key, ('Not found', None))
        if not handler:
            log.debug('Command not found: %s, %s', key, self.commands)
            raise XMPPError('item-not-found')

        items = list(iq['command']['substanzas'])

        # Derive the interface names and classes from the list of payload
        # items *before* a lone item is unwrapped below: an unwrapped
        # stanza is not a list of payload items, so iterating over it
        # would not describe the item itself.
        interfaces = {item.plugin_attrib for item in items}
        payload_classes = {item.__class__ for item in items}

        # Handlers receive a lone payload item directly, not in a list.
        payload = items[0] if len(items) == 1 else items

        initial_session = {'id': sessionid,
                           'from': iq['from'],
                           'to': iq['to'],
                           'node': node,
                           'payload': payload,
                           'interfaces': interfaces,
                           'payload_classes': payload_classes,
                           'notes': None,
                           'has_next': False,
                           'allow_complete': False,
                           'allow_prev': False,
                           'past': [],
                           'next': None,
                           'prev': None,
                           'cancel': None}

        session = await _await_if_needed(handler, iq, initial_session)

        self._process_command_response(iq, session)

    async def _handle_command_next(self, iq):
        """
        Process a request for the next step in the workflow
        for a command with multiple steps.

        The payload items of the request whose interface is known to the
        session are passed to the session's ``next`` handler, and the
        session returned by the handler is used to build the reply.

        :param iq: The command continuation request.
        :raises XMPPError: ``item-not-found`` if no session is stored for
                           the request's session ID, ``bad-request`` if
                           the session has no ``next`` handler.
        """
        sessionid = iq['command']['sessionid']
        session = self.sessions.get(sessionid)
        if not session:
            raise XMPPError('item-not-found')

        handler = session['next']
        if handler is None:
            # The request is remote input: answer with a proper stanza
            # error instead of failing with a TypeError when calling None.
            raise XMPPError('bad-request',
                            text='No further step is available '
                                 'for this command session.')

        interfaces = session['interfaces']
        results = [item for item in iq['command']['substanzas']
                   if item.plugin_attrib in interfaces]
        # Handlers receive a lone payload item directly, not in a list.
        if len(results) == 1:
            results = results[0]

        session = await _await_if_needed(handler, results, session)

        self._process_command_response(iq, session)

    async def _handle_command_prev(self, iq):
        """
        Process a request for the prev step in the workflow
        for a command with multiple steps.

        The payload items of the request whose interface is known to the
        session are passed to the session's ``prev`` handler, and the
        session returned by the handler is used to build the reply.

        :param iq: The command continuation request.
        :raises XMPPError: ``item-not-found`` if no session is stored for
                           the request's session ID, ``bad-request`` if
                           the session has no ``prev`` handler.
        """
        sessionid = iq['command']['sessionid']
        session = self.sessions.get(sessionid)
        if not session:
            raise XMPPError('item-not-found')

        handler = session['prev']
        if handler is None:
            # Sessions start with 'prev' set to None and any client may
            # send action="prev": answer with a proper stanza error
            # instead of failing with a TypeError when calling None.
            raise XMPPError('bad-request',
                            text='No previous step is available '
                                 'for this command session.')

        interfaces = session['interfaces']
        results = [item for item in iq['command']['substanzas']
                   if item.plugin_attrib in interfaces]
        # Handlers receive a lone payload item directly, not in a list.
        if len(results) == 1:
            results = results[0]

        session = await _await_if_needed(handler, results, session)

        self._process_command_response(iq, session)

    def _process_command_response(self, iq, session):
        """
        Generate a command reply stanza based on the
        provided session data.

        The session's payload items are recorded in the session's
        ``interfaces`` and ``payload_classes`` sets and registered as
        iterable stanza plugins of the command element. A session that
        still has a ``next`` handler is stored for the follow-up request.
        A session without one is reported as ``completed`` and removed
        from the session store, so finished workflows do not pile up
        there.

        :param iq: The command request stanza.
        :param session: A dictionary of relevant session data.
        """
        sessionid = session['id']

        # Normalise the payload to a list of items.
        payload = session['payload']
        if payload is None:
            payload = []
        elif not isinstance(payload, list):
            payload = [payload]

        interfaces = session.get('interfaces', set())
        payload_classes = session.get('payload_classes', set())

        interfaces.update(item.plugin_attrib for item in payload)
        payload_classes.update(item.__class__ for item in payload)

        session['interfaces'] = interfaces
        session['payload_classes'] = payload_classes

        completed = session['next'] is None
        if completed:
            # The workflow is over; keeping the session would leak it.
            if sessionid in self.sessions:
                del self.sessions[sessionid]
        else:
            self.sessions[sessionid] = session

        for item in payload:
            register_stanza_plugin(Command, item.__class__, iterable=True)

        iq = iq.reply()
        iq['command']['node'] = session['node']
        iq['command']['sessionid'] = session['id']

        if completed:
            iq['command']['actions'] = []
            iq['command']['status'] = 'completed'
        elif session['has_next']:
            actions = ['next']
            if session['allow_complete']:
                actions.append('complete')
            if session['allow_prev']:
                actions.append('prev')
            iq['command']['actions'] = actions
            iq['command']['status'] = 'executing'
        else:
            # A last step remains: the client may only complete.
            iq['command']['actions'] = ['complete']
            iq['command']['status'] = 'executing'

        iq['command']['notes'] = session['notes']

        for item in payload:
            iq['command'].append(item)

        iq.send()

    async def _handle_command_cancel(self, iq):
        """
        Abort a running command session on the requester's behalf.

        The session's ``cancel`` handler, if one is set, is run with the
        request and the session. The session is then removed from the
        session store and a reply with status ``canceled`` is sent.

        :param iq: The command cancellation request.
        :raises XMPPError: ``item-not-found`` when no session is stored
                           under the request's session ID.
        """
        request = iq['command']
        node = request['node']
        sessionid = request['sessionid']

        session = self.sessions.get(sessionid)
        if not session:
            raise XMPPError('item-not-found')

        on_cancel = session['cancel']
        if on_cancel:
            await _await_if_needed(on_cancel, iq, session)
        del self.sessions[sessionid]

        reply = iq.reply()
        answer = reply['command']
        answer['node'] = node
        answer['sessionid'] = sessionid
        answer['status'] = 'canceled'
        answer['notes'] = session['notes']
        reply.send()


    async def _handle_command_complete(self, iq):
        """
        Finish a command workflow at the requester's wish.

        The payload items of the request whose interface is known to the
        session are passed to the session's ``next`` handler, if one is
        set. The session is then removed from the session store and a
        reply with status ``completed``, carrying the session's payload
        and notes, is sent.

        :param iq: The command completion request.
        :raises XMPPError: ``item-not-found`` when no session is stored
                           under the request's session ID.
        """
        request = iq['command']
        node = request['node']
        sessionid = request['sessionid']

        session = self.sessions.get(sessionid)
        if not session:
            raise XMPPError('item-not-found')

        final_handler = session['next']
        known_interfaces = session['interfaces']
        results = [item for item in iq['command']['substanzas']
                   if item.plugin_attrib in known_interfaces]
        # A lone payload item is handed over directly, not in a list.
        if len(results) == 1:
            results = results[0]

        if final_handler:
            await _await_if_needed(final_handler, results, session)

        del self.sessions[sessionid]

        # Normalise the payload to a list of items.
        payload = session['payload']
        if payload is None:
            payload = []
        elif not isinstance(payload, list):
            payload = [payload]

        for item in payload:
            register_stanza_plugin(Command, item.__class__, iterable=True)

        reply = iq.reply()
        answer = reply['command']
        answer['node'] = node
        answer['sessionid'] = sessionid
        answer['actions'] = []
        answer['status'] = 'completed'
        answer['notes'] = session['notes']

        for item in payload:
            answer.append(item)

        reply.send()

    # =================================================================
    # Client side (command user) API

    def get_commands(self, jid, **kwargs):
        """
        Query the commands offered by a JID.

        This is a service discovery items query on the commands node.
        All keyword arguments are forwarded unchanged to the XEP-0030
        plugin's ``get_items``, for example:

        :param jid: The JID to query for commands.
        :param local: If true, then the query is for a JID/node
                      combination handled by this Slixmpp instance and
                      no stanzas need to be sent.
                      Otherwise, a disco stanza must be sent to the
                      remote JID to retrieve the items.
        :param iterator: If True, return a result set iterator using
                         the XEP-0059 plugin, if the plugin is loaded.
                         Otherwise the parameter is ignored.
        :return: Whatever the XEP-0030 plugin's ``get_items`` returns.
        """
        disco = self.xmpp['xep_0030']
        return disco.get_items(jid=jid, node=Command.namespace, **kwargs)

    def send_command(self, jid, node, ifrom=None, action='execute',
                     payload=None, sessionid=None, flow=False, **kwargs):
        """
        Build a command stanza by hand and send it, bypassing the
        workflow management APIs.

        :param jid: The JID to send the command request or result.
        :param node: The node for the command.
        :param ifrom: Specify the sender's JID.
        :param action: May be one of: execute, next, prev, complete,
                       or cancel.
        :param payload: Either a list of payload items, or a single
                        payload item such as a data form.
        :param sessionid: The current session's ID value.
        :param flow: If True, process the Iq result using the
                     command workflow methods contained in the
                     session instead of returning the response
                     stanza itself. Defaults to False.
        :param kwargs: Extra keyword arguments handed to the stanza's
                       ``send`` method; only used when ``flow`` is False.
        :return: The result of the stanza's ``send`` method when ``flow``
                 is False, otherwise None.
        """
        iq = self.xmpp.Iq()
        iq['type'] = 'set'
        iq['to'] = jid
        iq['from'] = ifrom

        command = iq['command']
        command['node'] = node
        command['action'] = action
        if sessionid is not None:
            command['sessionid'] = sessionid

        if payload is not None:
            # Accept a single payload item as well as a list of them.
            items = payload if isinstance(payload, list) else [payload]
            for item in items:
                command.append(item)

        if flow:
            iq.send(callback=self._handle_command_result)
            return None
        return iq.send(**kwargs)

    def start_command(self, jid, node, session, ifrom=None):
        """
        Initiate executing a command provided by a remote agent.

        The provided session dictionary should contain:

        :param next: A handler for processing the command result.
        :param error: A handler for processing any error stanzas
                      generated by the request.

        It may also contain a ``payload``: either a list (or tuple) of
        payload items, or a single payload item such as a data form.

        The session is updated in place with the keys ``jid``, ``node``,
        ``timestamp``, ``from`` and ``id``, and is stored under a
        temporary "pending" ID until the response is received.

        :param jid: The JID to send the command request.
        :param node: The node for the desired command.
        :param session: A dictionary of relevant session data.
        :param ifrom: Specify the sender's JID.
        """
        session['jid'] = jid
        session['node'] = node
        session['timestamp'] = time.time()
        session.setdefault('payload', None)

        iq = self.xmpp.Iq()
        iq['type'] = 'set'
        iq['to'] = jid
        iq['from'] = ifrom
        session['from'] = ifrom
        iq['command']['node'] = node
        iq['command']['action'] = 'execute'

        payload = session['payload']
        if payload is not None:
            # A single payload item must be wrapped, not iterated over:
            # list(item) would take the item apart instead of sending it.
            if not isinstance(payload, (list, tuple)):
                payload = [payload]
            for item in payload:
                iq['command'].append(item)

        # The remote session ID is not known yet; key the session by the
        # ID of the request until the response arrives.
        sessionid = 'client:pending_' + iq['id']
        session['id'] = sessionid
        self.sessions[sessionid] = session
        iq.send(callback=self._handle_command_result)

    def continue_command(self, session, direction='next'):
        """
        Send the next request of a running command workflow.

        The session is stored under its client side key before the
        request is sent, and the response is processed by the workflow
        handlers contained in the session.

        :param session: All stored data relevant to the current
                        command session.
        :param direction: The action to request. Defaults to 'next'.
        """
        remote_id = session['id']
        self.sessions['client:' + remote_id] = session

        self.send_command(
            session['jid'],
            session['node'],
            ifrom=session.get('from'),
            action=direction,
            payload=session.get('payload'),
            sessionid=remote_id,
            flow=True,
        )

    def cancel_command(self, session):
        """
        Ask the remote agent to cancel a running command.

        The session is stored under its client side key before the
        request is sent, and the response is processed by the workflow
        handlers contained in the session.

        :param session: All stored data relevant to the current
                        command session.
        """
        remote_id = session['id']
        self.sessions['client:' + remote_id] = session

        self.send_command(
            session['jid'],
            session['node'],
            ifrom=session.get('from'),
            action='cancel',
            payload=session.get('payload'),
            sessionid=remote_id,
            flow=True,
        )

    def complete_command(self, session):
        """
        Ask the remote agent to complete a running command workflow.

        The session is stored under its client side key before the
        request is sent, and the response is processed by the workflow
        handlers contained in the session.

        :param session: All stored data relevant to the current
                        command session.
        """
        remote_id = session['id']
        self.sessions['client:' + remote_id] = session

        self.send_command(
            session['jid'],
            session['node'],
            ifrom=session.get('from'),
            action='complete',
            payload=session.get('payload'),
            sessionid=remote_id,
            flow=True,
        )

    def terminate_command(self, session):
        """
        Delete a command's session after a command has completed
        or an error has occurred.

        :param session: All stored data relevant to the current
                        command session.
        """
        sessionid = 'client:' + session['id']
        try:
            del self.sessions[sessionid]
        except Exception as e:
            log.error("Error deleting adhoc command session: %s" % e.message)

    def _handle_command_result(self, iq):
        """
        Handle the response to a command request sent by this client.

        The matching session is looked up by the response's session ID
        or, failing that, by the "pending" key derived from the stanza
        ID; a response matching neither is ignored. The session is
        (re)stored under the session ID given in the response. The
        session's ``error`` handler is then called for an error
        response and its ``next`` handler otherwise. The session is
        terminated on an error response without handler and whenever
        the command status is ``completed``.

        :param iq: The command response.
        """
        remote_id = iq['command']['sessionid']
        active_key = 'client:' + remote_id
        pending_key = None

        if active_key in self.sessions:
            lookup_key = active_key
        else:
            # First response of a workflow: the session is still filed
            # under the ID of the request stanza.
            pending_key = 'client:pending_' + iq['id']
            if pending_key not in self.sessions:
                return
            lookup_key = pending_key

        session = self.sessions[lookup_key]
        session['id'] = remote_id
        self.sessions[active_key] = session

        if pending_key is not None:
            del self.sessions[pending_key]

        is_error = iq['type'] == 'error'
        handler = session.get('error' if is_error else 'next', None)
        if handler:
            handler(iq, session)
        elif is_error:
            self.terminate_command(session)

        if iq['command']['status'] == 'completed':
            self.terminate_command(session)


async def _await_if_needed(handler, *args):
    """
    Call ``handler`` with ``args`` and return its final result.

    Coroutine functions are awaited. Any other callable is called
    directly; if what it returns is itself awaitable (for example a
    callable object with an ``async def __call__``, or a plain function
    returning a coroutine), that awaitable is awaited as well, so the
    caller always gets the resolved result.

    :param handler: The sync or async callable to run.
    :param args: Positional arguments passed to the handler.
    :return: The handler's (awaited) return value.
    """
    if inspect.iscoroutinefunction(handler):
        log.debug("%s is async", handler)
        return await handler(*args)

    log.debug("%s is sync", handler)
    result = handler(*args)
    if inspect.isawaitable(result):
        # Not a coroutine function, yet it produced an awaitable.
        result = await result
    return result