summaryrefslogtreecommitdiff
path: root/slixmpp/plugins/xep_0353/jingle_message.py
blob: 5d69ff7e1ec4e4da778b2532c8b119d09509ad53 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98

# slixmpp: The Slick XMPP Library
# Copyright (C) 2020 Emmanuel Gil Peyrot
# This file is part of slixmpp.
# See the file LICENSE for copying permission.
import logging

from typing import Iterable, Tuple, Optional

from slixmpp import JID, Message
from slixmpp.plugins import BasePlugin
from slixmpp.xmlstream import register_stanza_plugin
from slixmpp.xmlstream.handler import Callback
from slixmpp.xmlstream.matcher import StanzaPath
from slixmpp.plugins.xep_0353 import stanza, Propose, Retract, Accept, Proceed, Reject

log = logging.getLogger(__name__)

class XEP_0353(BasePlugin):
    """Plugin for XEP-0353: Jingle Message Initiation.

    Registers the five Jingle-message payloads (propose, retract, accept,
    proceed, reject) as plugins of ``Message``, re-emits every incoming one
    as a dedicated event, and offers one helper per payload to send it.

    Events raised (each carries the received ``Message``):

    * ``jingle_message_propose``
    * ``jingle_message_retract``
    * ``jingle_message_accept``
    * ``jingle_message_proceed``
    * ``jingle_message_reject``
    """

    name = 'xep_0353'
    description = 'XEP-0353: Jingle Message Initiation'
    stanza = stanza

    def plugin_init(self) -> None:
        """Register the stanza plugins and one stream handler per payload."""
        register_stanza_plugin(Message, Propose)
        register_stanza_plugin(Message, Retract)
        register_stanza_plugin(Message, Accept)
        register_stanza_plugin(Message, Proceed)
        register_stanza_plugin(Message, Reject)

        self.xmpp.register_handler(
            Callback('Indicating Intent to Start a Session',
                StanzaPath('message/jingle_propose'),
                self._handle_propose))
        self.xmpp.register_handler(
            Callback('Disavowing Intent to Start a Session',
                StanzaPath('message/jingle_retract'),
                self._handle_retract))
        self.xmpp.register_handler(
            Callback('Accepting Intent to Start a Session',
                StanzaPath('message/jingle_accept'),
                self._handle_accept))
        # Must dispatch to _handle_proceed: routing this to _handle_accept
        # would report a <proceed/> as 'jingle_message_accept' and the
        # 'jingle_message_proceed' event would never fire.
        self.xmpp.register_handler(
            Callback('Proceed',
                StanzaPath('message/jingle_proceed'),
                self._handle_proceed))
        self.xmpp.register_handler(
            Callback('Rejecting Intent to Start a Session',
                StanzaPath('message/jingle_reject'),
                self._handle_reject))

    def session_bind(self, jid) -> None:
        """Advertise the Jingle Message namespace through the xep_0030 plugin.

        ``jid`` is not used here.
        """
        # NOTE(review): xep_0030 is looked up here and in plugin_end, but this
        # class does not declare it in ``dependencies`` -- confirm that it is
        # guaranteed to be loaded before this plugin is used.
        self.xmpp.plugin['xep_0030'].add_feature(stanza.JingleMessage.namespace)

    def plugin_end(self) -> None:
        """Stop advertising the Jingle Message namespace."""
        self.xmpp.plugin['xep_0030'].del_feature(feature=stanza.JingleMessage.namespace)

    def _handle_propose(self, message: Message) -> None:
        """Emit ``jingle_message_propose`` for a received propose payload."""
        self.xmpp.event('jingle_message_propose', message)

    def _handle_retract(self, message: Message) -> None:
        """Emit ``jingle_message_retract`` for a received retract payload."""
        self.xmpp.event('jingle_message_retract', message)

    def _handle_accept(self, message: Message) -> None:
        """Emit ``jingle_message_accept`` for a received accept payload."""
        self.xmpp.event('jingle_message_accept', message)

    def _handle_proceed(self, message: Message) -> None:
        """Emit ``jingle_message_proceed`` for a received proceed payload."""
        self.xmpp.event('jingle_message_proceed', message)

    def _handle_reject(self, message: Message) -> None:
        """Emit ``jingle_message_reject`` for a received reject payload."""
        self.xmpp.event('jingle_message_reject', message)

    def propose(self, mto: JID, sid: str, descriptions: Iterable[Tuple[str, str]], *, mfrom: Optional[JID] = None) -> None:
        """Send a propose message for session ``sid`` to ``mto``.

        :param mto: Recipient of the message.
        :param sid: Session identifier, stored in the payload's ``id``.
        :param descriptions: Pairs of strings assigned to the payload's
            ``descriptions`` interface (presumably (namespace, media) --
            confirm against ``stanza.Propose``).
        :param mfrom: Optional sender address passed to ``make_message``.
        """
        msg = self.xmpp.make_message(mto, mfrom=mfrom)
        msg['jingle_propose']['id'] = sid
        msg['jingle_propose']['descriptions'] = descriptions
        msg.send()

    def retract(self, mto: JID, sid: str, *, mfrom: Optional[JID] = None) -> None:
        """Send a retract message for session ``sid`` to ``mto``.

        :param mto: Recipient of the message.
        :param sid: Session identifier, stored in the payload's ``id``.
        :param mfrom: Optional sender address passed to ``make_message``.
        """
        msg = self.xmpp.make_message(mto, mfrom=mfrom)
        msg['jingle_retract']['id'] = sid
        msg.send()

    def accept(self, mto: JID, sid: str, *, mfrom: Optional[JID] = None) -> None:
        """Send an accept message for session ``sid`` to ``mto``.

        :param mto: Recipient of the message.
        :param sid: Session identifier, stored in the payload's ``id``.
        :param mfrom: Optional sender address passed to ``make_message``.
        """
        msg = self.xmpp.make_message(mto, mfrom=mfrom)
        msg['jingle_accept']['id'] = sid
        msg.send()

    def proceed(self, mto: JID, sid: str, *, mfrom: Optional[JID] = None) -> None:
        """Send a proceed message for session ``sid`` to ``mto``.

        :param mto: Recipient of the message.
        :param sid: Session identifier, stored in the payload's ``id``.
        :param mfrom: Optional sender address passed to ``make_message``.
        """
        msg = self.xmpp.make_message(mto, mfrom=mfrom)
        msg['jingle_proceed']['id'] = sid
        msg.send()

    def reject(self, mto: JID, sid: str, *, mfrom: Optional[JID] = None) -> None:
        """Send a reject message for session ``sid`` to ``mto``.

        :param mto: Recipient of the message.
        :param sid: Session identifier, stored in the payload's ``id``.
        :param mfrom: Optional sender address passed to ``make_message``.
        """
        msg = self.xmpp.make_message(mto, mfrom=mfrom)
        msg['jingle_reject']['id'] = sid
        msg.send()