1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
|
"""
slixmpp: The Slick XMPP Library
Copyright (C) 2020 Emmanuel Gil Peyrot
This file is part of slixmpp.
See the file LICENSE for copying permission.
"""
import logging
from typing import Iterable, Tuple, Optional
from slixmpp import JID, Message
from slixmpp.plugins import BasePlugin
from slixmpp.xmlstream import register_stanza_plugin
from slixmpp.xmlstream.handler import Callback
from slixmpp.xmlstream.matcher import StanzaPath
from slixmpp.plugins.xep_0353 import stanza, Propose, Retract, Accept, Proceed, Reject
log = logging.getLogger(__name__)
class XEP_0353(BasePlugin):
name = 'xep_0353'
description = 'XEP-0353: Jingle Message Initiation'
stanza = stanza
def plugin_init(self):
register_stanza_plugin(Message, Propose)
register_stanza_plugin(Message, Retract)
register_stanza_plugin(Message, Accept)
register_stanza_plugin(Message, Proceed)
register_stanza_plugin(Message, Reject)
self.xmpp.register_handler(
Callback('Indicating Intent to Start a Session',
StanzaPath('message/jingle_propose'),
self._handle_propose))
self.xmpp.register_handler(
Callback('Disavowing Intent to Start a Session',
StanzaPath('message/jingle_retract'),
self._handle_retract))
self.xmpp.register_handler(
Callback('Accepting Intent to Start a Session',
StanzaPath('message/jingle_accept'),
self._handle_accept))
self.xmpp.register_handler(
Callback('Proceed',
StanzaPath('message/jingle_proceed'),
self._handle_accept))
self.xmpp.register_handler(
Callback('Rejecting Intent to Start a Session',
StanzaPath('message/jingle_reject'),
self._handle_reject))
def session_bind(self, jid):
self.xmpp.plugin['xep_0030'].add_feature(stanza.JingleMessage.namespace)
def plugin_end(self):
self.xmpp.plugin['xep_0030'].del_feature(feature=stanza.JingleMessage.namespace)
def _handle_propose(self, message):
self.xmpp.event('jingle_message_propose', message)
def _handle_retract(self, message):
self.xmpp.event('jingle_message_retract', message)
def _handle_accept(self, message):
self.xmpp.event('jingle_message_accept', message)
def _handle_proceed(self, message):
self.xmpp.event('jingle_message_proceed', message)
def _handle_reject(self, message):
self.xmpp.event('jingle_message_reject', message)
def propose(self, mto: JID, sid: str, descriptions: Iterable[Tuple[str, str]], *, mfrom: Optional[JID] = None):
msg = self.xmpp.make_message(mto, mfrom=mfrom)
msg['jingle_propose']['id'] = sid
msg['jingle_propose']['descriptions'] = descriptions
msg.send()
def retract(self, mto: JID, sid: str, *, mfrom: Optional[JID] = None):
msg = self.xmpp.make_message(mto, mfrom=mfrom)
msg['jingle_retract']['id'] = sid
msg.send()
def accept(self, mto: JID, sid: str, *, mfrom: Optional[JID] = None):
msg = self.xmpp.make_message(mto, mfrom=mfrom)
msg['jingle_accept']['id'] = sid
msg.send()
def proceed(self, mto: JID, sid: str, *, mfrom: Optional[JID] = None):
msg = self.xmpp.make_message(mto, mfrom=mfrom)
msg['jingle_proceed']['id'] = sid
msg.send()
def reject(self, mto: JID, sid: str, *, mfrom: Optional[JID] = None):
msg = self.xmpp.make_message(mto, mfrom=mfrom)
msg['jingle_reject']['id'] = sid
msg.send()
|