summaryrefslogtreecommitdiff
path: root/slixmpp/plugins/xep_0070/confirm.py
blob: ba66c6072dada9d41d8da27fb4f57973284a2b8f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
"""
    Slixmpp: The Slick XMPP Library
    Copyright (C) 2015 Emmanuel Gil Peyrot
    This file is part of Slixmpp.

    See the file LICENSE for copying permission.
"""

import asyncio
import logging

from slixmpp.plugins import BasePlugin, register_plugin
from slixmpp import future_wrapper, Iq, Message
from slixmpp.exceptions import XMPPError, IqError, IqTimeout
from slixmpp.xmlstream import JID, register_stanza_plugin
from slixmpp.xmlstream.handler import Callback
from slixmpp.xmlstream.matcher import StanzaPath
from slixmpp.plugins.xep_0070 import stanza, Confirm


log = logging.getLogger(__name__)


class XEP_0070(BasePlugin):

    """
    XEP-0070 Verifying HTTP Requests via XMPP
    """

    name = 'xep_0070'
    description = 'XEP-0070: Verifying HTTP Requests via XMPP'
    dependencies = {'xep_0030'}
    stanza = stanza

    def plugin_init(self):
        register_stanza_plugin(Iq, Confirm)
        register_stanza_plugin(Message, Confirm)

        self.xmpp.register_handler(
            Callback('Confirm',
                 StanzaPath('iq@type=get/confirm'),
                 self._handle_iq_confirm))

        self.xmpp.register_handler(
            Callback('Confirm',
                 StanzaPath('message/confirm'),
                 self._handle_message_confirm))

        #self.api.register(self._default_get_confirm,
        #        'get_confirm',
        #        default=True)

    def plugin_end(self):
        self.xmpp.remove_handler('Confirm')
        self.xmpp['xep_0030'].del_feature(feature='http://jabber.org/protocol/http-auth')

    def session_bind(self, jid):
        self.xmpp['xep_0030'].add_feature('http://jabber.org/protocol/http-auth')

    def ask_confirm(self, jid, id, url, method, *, ifrom=None, message=None):
        if message is None:
            stanza = self.xmpp.Iq()
            stanza['type'] = 'get'
        else:
            stanza = self.xmpp.Message()
        stanza['from'] = ifrom
        stanza['to'] = jid
        stanza['confirm']['id'] = id
        stanza['confirm']['url'] = url
        stanza['confirm']['method'] = method
        if message is not None:
            stanza['body'] = message.format(id=id, url=url, method=method)
            stanza.send()
        else:
            try:
                yield from stanza.send()
            except IqError:
                return False
            except IqTimeout:
                return False
            else:
                return True

    def _handle_iq_confirm(self, iq):
        emitter = iq['from']
        id = iq['confirm']['id']
        url = iq['confirm']['url']
        method = iq['confirm']['method']
        accept = self.api['get_confirm'](emitter, id, url, method)
        if not accept:
            raise XMPPError(etype='auth', condition='not-authorized')

        iq.reply().send()

    def _handle_message_confirm(self, message):
        emitter = message['from']
        id = message['confirm']['id']
        url = message['confirm']['url']
        method = message['confirm']['method']
        accept = self.api['get_confirm'](emitter, id, url, method)
        if not accept:
            raise XMPPError(etype='auth', condition='not-authorized')

        message.reply().send()

    #def _default_get_confirm(self, jid, id, url, method):
    #    return False