blob: 3a7b949f0fec57c6eb3ccc9a2107ebd384f96c55 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
|
"""
Slixmpp: The Slick XMPP Library
Copyright (C) 2010 Nathanael C. Fritz
This file is part of Slixmpp.
See the file LICENSE for copying permission.
"""
import logging
from asyncio import Future
from typing import Optional
import slixmpp
from slixmpp import JID
from slixmpp.stanza import Iq
from slixmpp.xmlstream import register_stanza_plugin
from slixmpp.xmlstream.handler import Callback
from slixmpp.xmlstream.matcher import StanzaPath
from slixmpp.plugins import BasePlugin
from slixmpp.plugins.xep_0092 import Version, stanza
# Module-level logger, named after this module via ``__name__``.
log = logging.getLogger(__name__)
class XEP_0092(BasePlugin):
    """
    XEP-0092: Software Version

    Replies to incoming software version queries with the configured
    software name, version and operating system, and provides
    ``get_version`` for sending such a query to another entity.

    The ``software_name``, ``version`` and ``os`` attributes read by the
    methods below correspond to the keys of ``default_config``
    (presumably resolved from the plugin configuration by ``BasePlugin``
    -- confirm against the base class).
    """

    name = 'xep_0092'
    description = 'XEP-0092: Software Version'
    dependencies = {'xep_0030'}
    stanza = stanza
    default_config = {
        'software_name': 'Slixmpp',
        'version': slixmpp.__version__,
        'os': '',
    }

    def plugin_init(self):
        """
        Prepare the plugin for use.

        Applies the optional ``'name'`` configuration entry, registers the
        handler for incoming software version queries, and registers the
        ``Version`` stanza plugin on ``Iq``.
        """
        # A 'name' entry in the configuration, when present, is copied
        # over software_name (presumably a legacy alias -- TODO confirm).
        if 'name' in self.config:
            self.software_name = self.config['name']

        version_query = StanzaPath('iq@type=get/software_version')
        handler = Callback('Software Version',
                           version_query,
                           self._handle_version)
        self.xmpp.register_handler(handler)

        register_stanza_plugin(Iq, Version)

    def plugin_end(self):
        """
        Undo the plugin's registrations.

        Removes the 'Software Version' handler and deletes the
        'jabber:iq:version' feature from the xep_0030 plugin.
        """
        self.xmpp.remove_handler('Software Version')
        disco = self.xmpp['xep_0030']
        disco.del_feature(feature='jabber:iq:version')

    def session_bind(self, jid):
        """
        Add the 'jabber:iq:version' feature to the xep_0030 plugin.

        :param jid: Not used by this method.
        """
        disco = self.xmpp.plugin['xep_0030']
        disco.add_feature('jabber:iq:version')

    def _handle_version(self, iq: Iq):
        """
        Answer an incoming software version query.

        When ``software_name`` is set, the reply carries the name, version
        and os values. Otherwise the reply is turned into an error of type
        'cancel' with condition 'service-unavailable'. The reply is sent in
        both cases.

        :param iq: The Iq stanza carrying the software version query.
        """
        reply = iq.reply()
        if not self.software_name:
            # No software name to report: decline the query.
            reply.error()
            reply['error']['type'] = 'cancel'
            reply['error']['condition'] = 'service-unavailable'
        else:
            # Dict insertion order keeps the name, version, os sequence.
            details = {
                'name': self.software_name,
                'version': self.version,
                'os': self.os,
            }
            for field, value in details.items():
                reply['software_version'][field] = value
        reply.send()

    def get_version(self, jid: JID, ifrom: Optional[JID] = None, **iqkwargs) -> Future:
        """
        Ask a remote agent for its software version.

        :param jid: The JID of the entity to query.
        :param ifrom: Optional JID to use as the sender of the query.
        :param iqkwargs: Extra keyword arguments passed through to the
                         Iq's ``send`` call.
        :return: The value returned by the Iq's ``send`` call.
        """
        request = self.xmpp.make_iq_get(ito=jid, ifrom=ifrom)
        request['query'] = Version.namespace
        return request.send(**iqkwargs)
|