summaryrefslogtreecommitdiff
path: root/slixmpp/plugins/xep_0050/adhoc.py
blob: cb179a03ffafeb3c1570929c965331e2d74aab8c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
"""
    Slixmpp: The Slick XMPP Library
    Copyright (C) 2011 Nathanael C. Fritz, Lance J.T. Stout
    This file is part of Slixmpp.

    See the file LICENSE for copying permission.
"""

import logging
import time

from slixmpp import Iq
from slixmpp.exceptions import IqError
from slixmpp.xmlstream.handler import Callback
from slixmpp.xmlstream.matcher import StanzaPath
from slixmpp.xmlstream import register_stanza_plugin, JID
from slixmpp.plugins import BasePlugin
from slixmpp.plugins.xep_0050 import stanza
from slixmpp.plugins.xep_0050 import Command
from slixmpp.plugins.xep_0004 import Form


log = logging.getLogger(__name__)


class XEP_0050(BasePlugin):

    """
    XEP-0050: Ad-Hoc Commands

    XMPP's Adhoc Commands provides a generic workflow mechanism for
    interacting with applications. The result is similar to menu selections
    and multi-step dialogs in normal desktop applications. Clients do not
    need to know in advance what commands are provided by any particular
    application or agent. While adhoc commands provide similar functionality
    to Jabber-RPC, adhoc commands are used primarily for human interaction.

    Also see <http://xmpp.org/extensions/xep-0050.html>

    Configuration Values:
        threaded -- Indicates if command events should be threaded.
                    Defaults to True.

    Events:
        command_execute  -- Received a command with action="execute"
        command_next     -- Received a command with action="next"
        command_complete -- Received a command with action="complete"
        command_cancel   -- Received a command with action="cancel"

    Attributes:
        threaded -- Indicates if command events should be threaded.
                    Defaults to True.
        commands -- A dictionary mapping JID/node pairs to command
                    names and handlers.
        sessions -- A dictionary or equivalent backend mapping
                    session IDs to dictionaries containing data
                    relevant to a command's session.

    Methods:
        plugin_init       -- Overrides BasePlugin.plugin_init
        post_init         -- Overrides BasePlugin.post_init
        new_session       -- Return a new session ID.
        prep_handlers     -- Placeholder. May call with a list of handlers
                             to prepare them for use with the session storage
                             backend, if needed.
        set_backend       -- Replace the default session storage with some
                             external storage mechanism, such as a database.
                             The provided backend wrapper must be able to
                             act using the same syntax as a dictionary.
        add_command       -- Add a command for use by external entitites.
        get_commands      -- Retrieve a list of commands provided by a
                             remote agent.
        send_command      -- Send a command request to a remote agent.
        start_command     -- Command user API: initiate a command session
        continue_command  -- Command user API: proceed to the next step
        cancel_command    -- Command user API: cancel a command
        complete_command  -- Command user API: finish a command
        terminate_command -- Command user API: delete a command's session
    """

    name = 'xep_0050'
    description = 'XEP-0050: Ad-Hoc Commands'
    dependencies = set(['xep_0030', 'xep_0004'])
    stanza = stanza
    default_config = {
        'threaded': True,
        'session_db': None
    }

    def plugin_init(self):
        """Start the XEP-0050 plugin."""
        self.sessions = self.session_db
        if self.sessions is None:
            self.sessions = {}

        self.commands = {}

        self.xmpp.register_handler(
                Callback("Ad-Hoc Execute",
                         StanzaPath('iq@type=set/command'),
                         self._handle_command))

        register_stanza_plugin(Iq, Command)
        register_stanza_plugin(Command, Form)

        self.xmpp.add_event_handler('command_execute',
                                    self._handle_command_start)
        self.xmpp.add_event_handler('command_next',
                                    self._handle_command_next)
        self.xmpp.add_event_handler('command_cancel',
                                    self._handle_command_cancel)
        self.xmpp.add_event_handler('command_complete',
                                    self._handle_command_complete)

    def plugin_end(self):
        self.xmpp.del_event_handler('command_execute',
                                    self._handle_command_start)
        self.xmpp.del_event_handler('command_next',
                                    self._handle_command_next)
        self.xmpp.del_event_handler('command_cancel',
                                    self._handle_command_cancel)
        self.xmpp.del_event_handler('command_complete',
                                    self._handle_command_complete)
        self.xmpp.remove_handler('Ad-Hoc Execute')
        self.xmpp['xep_0030'].del_feature(feature=Command.namespace)
        self.xmpp['xep_0030'].set_items(node=Command.namespace, items=tuple())

    def session_bind(self, jid):
        self.xmpp['xep_0030'].add_feature(Command.namespace)
        self.xmpp['xep_0030'].set_items(node=Command.namespace, items=tuple())

    def set_backend(self, db):
        """
        Replace the default session storage dictionary with
        a generic, external data storage mechanism.

        The replacement backend must be able to interact through
        the same syntax and interfaces as a normal dictionary.

        Arguments:
            db -- The new session storage mechanism.
        """
        self.sessions = db

    def prep_handlers(self, handlers, **kwargs):
        """
        Prepare a list of functions for use by the backend service.

        Intended to be replaced by the backend service as needed.

        Arguments:
            handlers -- A list of function pointers
            **kwargs -- Any additional parameters required by the backend.
        """
        pass

    # =================================================================
    # Server side (command provider) API

    def add_command(self, jid=None, node=None, name='', handler=None):
        """
        Make a new command available to external entities.

        Access control may be implemented in the provided handler.

        Command workflow is done across a sequence of command handlers. The
        first handler is given the initial Iq stanza of the request in order
        to support access control. Subsequent handlers are given only the
        payload items of the command. All handlers will receive the command's
        session data.

        Arguments:
            jid     -- The JID that will expose the command.
            node    -- The node associated with the command.
            name    -- A human readable name for the command.
            handler -- A function that will generate the response to the
                       initial command request, as well as enforcing any
                       access control policies.
        """
        if jid is None:
            jid = self.xmpp.boundjid
        elif not isinstance(jid, JID):
            jid = JID(jid)
        item_jid = jid.full

        self.xmpp['xep_0030'].add_identity(category='automation',
                                           itype='command-list',
                                           name='Ad-Hoc commands',
                                           node=Command.namespace,
                                           jid=jid)
        self.xmpp['xep_0030'].add_item(jid=item_jid,
                                       name=name,
                                       node=Command.namespace,
                                       subnode=node,
                                       ijid=jid)
        self.xmpp['xep_0030'].add_identity(category='automation',
                                           itype='command-node',
                                           name=name,
                                           node=node,
                                           jid=jid)
        self.xmpp['xep_0030'].add_feature(Command.namespace, None, jid)

        self.commands[(item_jid, node)] = (name, handler)

    def new_session(self):
        """Return a new session ID."""
        return str(time.time()) + '-' + self.xmpp.new_id()

    def _handle_command(self, iq):
        """Raise command events based on the command action."""
        self.xmpp.event('command_%s' % iq['command']['action'], iq)

    def _handle_command_start(self, iq):
        """
        Process an initial request to execute a command.

        Arguments:
            iq -- The command execution request.
        """
        sessionid = self.new_session()
        node = iq['command']['node']
        key = (iq['to'].full, node)
        name, handler = self.commands.get(key, ('Not found', None))
        if not handler:
            log.debug('Command not found: %s, %s', key, self.commands)

        payload = []
        for stanza in iq['command']['substanzas']:
            payload.append(stanza)

        if len(payload) == 1:
            payload = payload[0]

        interfaces = set([item.plugin_attrib for item in payload])
        payload_classes = set([item.__class__ for item in payload])

        initial_session = {'id': sessionid,
                           'from': iq['from'],
                           'to': iq['to'],
                           'node': node,
                           'payload': payload,
                           'interfaces': interfaces,
                           'payload_classes': payload_classes,
                           'notes': None,
                           'has_next': False,
                           'allow_complete': False,
                           'allow_prev': False,
                           'past': [],
                           'next': None,
                           'prev': None,
                           'cancel': None}

        session = handler(iq, initial_session)

        self._process_command_response(iq, session)

    def _handle_command_next(self, iq):
        """
        Process a request for the next step in the workflow
        for a command with multiple steps.

        Arguments:
            iq -- The command continuation request.
        """
        sessionid = iq['command']['sessionid']
        session = self.sessions.get(sessionid)

        if session:
            handler = session['next']
            interfaces = session['interfaces']
            results = []
            for stanza in iq['command']['substanzas']:
                if stanza.plugin_attrib in interfaces:
                    results.append(stanza)
            if len(results) == 1:
                results = results[0]

            session = handler(results, session)

            self._process_command_response(iq, session)
        else:
            raise XMPPError('item-not-found')

    def _handle_command_prev(self, iq):
        """
        Process a request for the prev step in the workflow
        for a command with multiple steps.

        Arguments:
            iq -- The command continuation request.
        """
        sessionid = iq['command']['sessionid']
        session = self.sessions.get(sessionid)

        if session:
            handler = session['prev']
            interfaces = session['interfaces']
            results = []
            for stanza in iq['command']['substanzas']:
                if stanza.plugin_attrib in interfaces:
                    results.append(stanza)
            if len(results) == 1:
                results = results[0]

            session = handler(results, session)

            self._process_command_response(iq, session)
        else:
            raise XMPPError('item-not-found')

    def _process_command_response(self, iq, session):
        """
        Generate a command reply stanza based on the
        provided session data.

        Arguments:
            iq      -- The command request stanza.
            session -- A dictionary of relevant session data.
        """
        sessionid = session['id']

        payload = session['payload']
        if payload is None:
            payload = []
        if not isinstance(payload, list):
            payload = [payload]

        interfaces = session.get('interfaces', set())
        payload_classes = session.get('payload_classes', set())

        interfaces.update(set([item.plugin_attrib for item in payload]))
        payload_classes.update(set([item.__class__ for item in payload]))

        session['interfaces'] = interfaces
        session['payload_classes'] = payload_classes

        self.sessions[sessionid] = session

        for item in payload:
            register_stanza_plugin(Command, item.__class__, iterable=True)

        iq.reply()
        iq['command']['node'] = session['node']
        iq['command']['sessionid'] = session['id']

        if session['next'] is None:
            iq['command']['actions'] = []
            iq['command']['status'] = 'completed'
        elif session['has_next']:
            actions = ['next']
            if session['allow_complete']:
                actions.append('complete')
            if session['allow_prev']:
                actions.append('prev')
            iq['command']['actions'] = actions
            iq['command']['status'] = 'executing'
        else:
            iq['command']['actions'] = ['complete']
            iq['command']['status'] = 'executing'

        iq['command']['notes'] = session['notes']

        for item in payload:
            iq['command'].append(item)

        iq.send()

    def _handle_command_cancel(self, iq):
        """
        Process a request to cancel a command's execution.

        Arguments:
            iq -- The command cancellation request.
        """
        node = iq['command']['node']
        sessionid = iq['command']['sessionid']

        session = self.sessions.get(sessionid)

        if session:
            handler = session['cancel']
            if handler:
                handler(iq, session)
            del self.sessions[sessionid]
            iq.reply()
            iq['command']['node'] = node
            iq['command']['sessionid'] = sessionid
            iq['command']['status'] = 'canceled'
            iq['command']['notes'] = session['notes']
            iq.send()
        else:
            raise XMPPError('item-not-found')


    def _handle_command_complete(self, iq):
        """
        Process a request to finish the execution of command
        and terminate the workflow.

        All data related to the command session will be removed.

        Arguments:
            iq -- The command completion request.
        """
        node = iq['command']['node']
        sessionid = iq['command']['sessionid']
        session = self.sessions.get(sessionid)

        if session:
            handler = session['next']
            interfaces = session['interfaces']
            results = []
            for stanza in iq['command']['substanzas']:
                if stanza.plugin_attrib in interfaces:
                    results.append(stanza)
            if len(results) == 1:
                results = results[0]

            if handler:
                handler(results, session)

            del self.sessions[sessionid]

            iq.reply()
            iq['command']['node'] = node
            iq['command']['sessionid'] = sessionid
            iq['command']['actions'] = []
            iq['command']['status'] = 'completed'
            iq['command']['notes'] = session['notes']
            iq.send()
        else:
            raise XMPPError('item-not-found')

    # =================================================================
    # Client side (command user) API

    def get_commands(self, jid, **kwargs):
        """
        Return a list of commands provided by a given JID.

        Arguments:
            jid      -- The JID to query for commands.
            local    -- If true, then the query is for a JID/node
                        combination handled by this Slixmpp instance and
                        no stanzas need to be sent.
                        Otherwise, a disco stanza must be sent to the
                        remove JID to retrieve the items.
            ifrom    -- Specifiy the sender's JID.
            timeout  -- The time in seconds to block while waiting for
                        a reply. If None, then wait indefinitely.
            callback -- Optional callback to execute when a reply is
                        received instead of blocking and waiting for
                        the reply.
            iterator -- If True, return a result set iterator using
                        the XEP-0059 plugin, if the plugin is loaded.
                        Otherwise the parameter is ignored.
        """
        return self.xmpp['xep_0030'].get_items(jid=jid,
                                               node=Command.namespace,
                                               **kwargs)

    def send_command(self, jid, node, ifrom=None, action='execute',
                    payload=None, sessionid=None, flow=False, **kwargs):
        """
        Create and send a command stanza, without using the provided
        workflow management APIs.

        Arguments:
            jid       -- The JID to send the command request or result.
            node      -- The node for the command.
            ifrom     -- Specify the sender's JID.
            action    -- May be one of: execute, cancel, complete,
                         or cancel.
            payload   -- Either a list of payload items, or a single
                         payload item such as a data form.
            sessionid -- The current session's ID value.
            flow      -- If True, process the Iq result using the
                         command workflow methods contained in the
                         session instead of returning the response
                         stanza itself. Defaults to False.
            timeout   -- The length of time (in seconds) to wait for a
                         response before exiting the send call
                         if blocking is used. Defaults to
                         slixmpp.xmlstream.RESPONSE_TIMEOUT
            callback  -- Optional reference to a stream handler
                         function. Will be executed when a reply
                         stanza is received if flow=False.
        """
        iq = self.xmpp.Iq()
        iq['type'] = 'set'
        iq['to'] = jid
        iq['from'] = ifrom
        iq['command']['node'] = node
        iq['command']['action'] = action
        if sessionid is not None:
            iq['command']['sessionid'] = sessionid
        if payload is not None:
            if not isinstance(payload, list):
                payload = [payload]
            for item in payload:
                iq['command'].append(item)
        if not flow:
            return iq.send(**kwargs)
        else:
            iq.send(callback=self._handle_command_result)

    def start_command(self, jid, node, session, ifrom=None):
        """
        Initiate executing a command provided by a remote agent.

        The provided session dictionary should contain:
            next  -- A handler for processing the command result.
            error -- A handler for processing any error stanzas
                     generated by the request.

        Arguments:
            jid     -- The JID to send the command request.
            node    -- The node for the desired command.
            session -- A dictionary of relevant session data.
            ifrom   -- Optionally specify the sender's JID.
        """
        session['jid'] = jid
        session['node'] = node
        session['timestamp'] = time.time()
        if 'payload' not in session:
            session['payload'] = None

        iq = self.xmpp.Iq()
        iq['type'] = 'set'
        iq['to'] = jid
        iq['from'] = ifrom
        session['from'] = ifrom
        iq['command']['node'] = node
        iq['command']['action'] = 'execute'
        if session['payload'] is not None:
            payload = session['payload']
            if not isinstance(payload, list):
                payload = list(payload)
            for stanza in payload:
                iq['command'].append(stanza)
        sessionid = 'client:pending_' + iq['id']
        session['id'] = sessionid
        self.sessions[sessionid] = session
        iq.send(callback=self._handle_command_result)

    def continue_command(self, session, direction='next'):
        """
        Execute the next action of the command.

        Arguments:
            session -- All stored data relevant to the current
                       command session.
        """
        sessionid = 'client:' + session['id']
        self.sessions[sessionid] = session

        self.send_command(session['jid'],
                          session['node'],
                          ifrom=session.get('from', None),
                          action=direction,
                          payload=session.get('payload', None),
                          sessionid=session['id'],
                          flow=True)

    def cancel_command(self, session):
        """
        Cancel the execution of a command.

        Arguments:
            session -- All stored data relevant to the current
                       command session.
        """
        sessionid = 'client:' + session['id']
        self.sessions[sessionid] = session

        self.send_command(session['jid'],
                          session['node'],
                          ifrom=session.get('from', None),
                          action='cancel',
                          payload=session.get('payload', None),
                          sessionid=session['id'],
                          flow=True)

    def complete_command(self, session):
        """
        Finish the execution of a command workflow.

        Arguments:
            session -- All stored data relevant to the current
                       command session.
        """
        sessionid = 'client:' + session['id']
        self.sessions[sessionid] = session

        self.send_command(session['jid'],
                          session['node'],
                          ifrom=session.get('from', None),
                          action='complete',
                          payload=session.get('payload', None),
                          sessionid=session['id'],
                          flow=True)

    def terminate_command(self, session):
        """
        Delete a command's session after a command has completed
        or an error has occured.

        Arguments:
            session -- All stored data relevant to the current
                       command session.
        """
        sessionid = 'client:' + session['id']
        try:
            del self.sessions[sessionid]
        except Exception as e:
            log.error("Error deleting adhoc command session: %s" % e.message)

    def _handle_command_result(self, iq):
        """
        Process the results of a command request.

        Will execute the 'next' handler stored in the session
        data, or the 'error' handler depending on the Iq's type.

        Arguments:
            iq -- The command response.
        """
        sessionid = 'client:' + iq['command']['sessionid']
        pending = False

        if sessionid not in self.sessions:
            pending = True
            pendingid = 'client:pending_' + iq['id']
            if pendingid not in self.sessions:
                return
            sessionid = pendingid

        session = self.sessions[sessionid]
        sessionid = 'client:' + iq['command']['sessionid']
        session['id'] = iq['command']['sessionid']

        self.sessions[sessionid] = session

        if pending:
            del self.sessions[pendingid]

        handler_type = 'next'
        if iq['type'] == 'error':
            handler_type = 'error'
        handler = session.get(handler_type, None)
        if handler:
            handler(iq, session)
        elif iq['type'] == 'error':
            self.terminate_command(session)

        if iq['command']['status'] == 'completed':
            self.terminate_command(session)