1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
|
"""
Slixmpp: The Slick XMPP Library
Copyright (C) 2015 Emmanuel Gil Peyrot
This file is part of Slixmpp.
See the file LICENSE for copying permission.
"""
import asyncio
import logging
from slixmpp.plugins import BasePlugin, register_plugin
from slixmpp import future_wrapper, Iq, Message
from slixmpp.exceptions import XMPPError, IqError, IqTimeout
from slixmpp.xmlstream import JID, register_stanza_plugin
from slixmpp.xmlstream.handler import Callback
from slixmpp.xmlstream.matcher import StanzaPath
from slixmpp.plugins.xep_0070 import stanza, Confirm
log = logging.getLogger(__name__)
class XEP_0070(BasePlugin):
"""
XEP-0070 Verifying HTTP Requests via XMPP
"""
name = 'xep_0070'
description = 'XEP-0070: Verifying HTTP Requests via XMPP'
dependencies = {'xep_0030'}
stanza = stanza
def plugin_init(self):
register_stanza_plugin(Iq, Confirm)
register_stanza_plugin(Message, Confirm)
self.xmpp.register_handler(
Callback('Confirm',
StanzaPath('iq@type=get/confirm'),
self._handle_iq_confirm))
self.xmpp.register_handler(
Callback('Confirm',
StanzaPath('message/confirm'),
self._handle_message_confirm))
#self.api.register(self._default_get_confirm,
# 'get_confirm',
# default=True)
def plugin_end(self):
self.xmpp.remove_handler('Confirm')
self.xmpp['xep_0030'].del_feature(feature='http://jabber.org/protocol/http-auth')
def session_bind(self, jid):
self.xmpp['xep_0030'].add_feature('http://jabber.org/protocol/http-auth')
def ask_confirm(self, jid, id, url, method, *, ifrom=None, message=None):
if message is None:
stanza = self.xmpp.Iq()
stanza['type'] = 'get'
else:
stanza = self.xmpp.Message()
stanza['from'] = ifrom
stanza['to'] = jid
stanza['confirm']['id'] = id
stanza['confirm']['url'] = url
stanza['confirm']['method'] = method
if message is not None:
stanza['body'] = message.format(id=id, url=url, method=method)
stanza.send()
else:
try:
yield from stanza.send()
except IqError:
return False
except IqTimeout:
return False
else:
return True
def _handle_iq_confirm(self, iq):
emitter = iq['from']
id = iq['confirm']['id']
url = iq['confirm']['url']
method = iq['confirm']['method']
accept = self.api['get_confirm'](emitter, id, url, method)
if not accept:
raise XMPPError(etype='auth', condition='not-authorized')
iq.reply().send()
def _handle_message_confirm(self, message):
emitter = message['from']
id = message['confirm']['id']
url = message['confirm']['url']
method = message['confirm']['method']
accept = self.api['get_confirm'](emitter, id, url, method)
if not accept:
raise XMPPError(etype='auth', condition='not-authorized')
message.reply().send()
#def _default_get_confirm(self, jid, id, url, method):
# return False
|