summaryrefslogtreecommitdiff
path: root/plugins/otr.py
blob: f3a1d7a28f3f489b2ac5c21480a839234a1d0be4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
import pyotr
from sleekxmpp.xmlstream.stanzabase import JID

import logging
log = logging.getLogger(__name__)

from plugin import BasePlugin

import tabs
from tabs import ConversationTab

class Plugin(BasePlugin):
    """
    Plugin wiring the pyotr module into conversation tabs.

    It filters outgoing and incoming conversation messages through one
    pyotr.OTR object per tab, and provides an 'otr' tab command to start
    or end a session and to display the fingerprints.
    """

    def init(self):
        """
        Register the event handlers, the 'otr' tab command and the 'otr'
        information element on ConversationTab.
        """
        # a dict of {conversation tab: pyotr.OTR object}, filled lazily
        # by get_otr()
        self.contacts = {}
        self.add_event_handler('conversation_say_after', self.on_conversation_say)
        self.add_event_handler('conversation_msg', self.on_conversation_msg)

        self.add_tab_command(ConversationTab, 'otr', self.command_otr,
                usage='<start|end|fpr>',
                help='Start or stop OTR for the current conversation.',
                short='Manage OTR status',
                completion=self.otr_completion)
        ConversationTab.add_information_element('otr', self.display_encryption_status)

    def cleanup(self):
        """
        Undo what init() registered on ConversationTab: the 'otr'
        information element and the 'otr' tab command.
        """
        ConversationTab.remove_information_element('otr')
        self.del_tab_command(ConversationTab, 'otr')

    def otr_special(self, tab, typ):
        """
        Build a callback that displays a (bytes) message in the given tab,
        prefixed with typ. Used as the on_error and on_warn callbacks of
        the OTR object created in get_otr().
        """
        def helper(msg):
            tab.add_message('%s: %s' % (typ, msg.decode()))
        return helper

    def otr_on_state_change(self, tab):
        """
        Build a callback that announces an OTR state change in the given
        tab. Used as the on_state_change callback of the OTR object
        created in get_otr().
        """
        def helper(old, new):
            old = self.otr_state(old)
            new = self.otr_state(new)
            tab.add_message('OTR state has changed from %s to %s' % (old, new))
        return helper

    def get_otr(self, tab):
        """
        Return the OTR object associated with the tab, creating it (and
        remembering it in self.contacts) on first use.
        """
        if tab not in self.contacts:
            self.contacts[tab] = pyotr.OTR(
                    on_error=self.otr_special(tab, 'Error'),
                    on_warn=self.otr_special(tab, 'Warn'),
                    on_state_change=self.otr_on_state_change(tab))
        return self.contacts[tab]

    def on_conversation_say(self, message, tab):
        """
        Feed the outgoing message through the OTR filter.

        The body is replaced by the output of transform_msg(), or removed
        from the message when transform_msg() returns None.
        """
        if not message['body']:
            # there’s nothing to encrypt if this is a chatstate, for example
            return
        otr_state = self.get_otr(tab)
        # Not sure what to do with xhtml bodies, and I don't like them anyway ;)
        del message['xhtml_im']
        say = otr_state.transform_msg(message['body'].encode())
        if say is not None:
            message['body'] = say.decode()
        else:
            del message['body']

    def on_conversation_msg(self, message, tab):
        """
        Feed the incoming message through the OTR filter.

        handle_msg() returns a (display, reply) pair: the body is replaced
        by display (or removed when display is None), and reply, when not
        None, is sent back to the contact through otr_say().
        """
        if not message['body']:
            # there’s nothing to decrypt if this is a chatstate, for example
            return
        otr_state = self.get_otr(tab)
        # Not sure what to do with xhtml bodies, and I don't like them anyway ;)
        del message['xhtml_im']
        display, reply = otr_state.handle_msg(message['body'].encode())
        #self.core.information('D: {!r}, R: {!r}'.format(display, reply))
        if display is not None:
            message['body'] = display.decode()
        else:
            del message['body']
        if reply is not None:
            self.otr_say(tab, reply.decode())

    @staticmethod
    def otr_state(state):
        """
        Translate a pyotr MSG_STATE_* constant into a human-readable name.
        Returns None if the state is not one of the three known constants.
        """
        if state == pyotr.MSG_STATE_PLAINTEXT:
            return 'plaintext'
        elif state == pyotr.MSG_STATE_ENCRYPTED:
            return 'encrypted'
        elif state == pyotr.MSG_STATE_FINISHED:
            return 'finished'
        return None

    @staticmethod
    def _format_fpr(fpr):
        """
        Format a fingerprint (bytes) as 40 uppercase hexadecimal digits.

        Falsy values (no fingerprint available) are returned unchanged so
        that the caller displays them as-is.
        """
        if not fpr:
            return fpr
        # Zero-pad on the left: going through hex() would drop the leading
        # zeros, and padding on the right would shift the digits.
        return '%040X' % int.from_bytes(fpr, 'big')

    def display_encryption_status(self, jid):
        """
        Returns the status of encryption for the associated jid. This is to be used
        in the ConversationTab’s InfoWin.

        Returns an empty string when no OTR object exists yet for the tab.
        """
        tab = self.core.get_tab_by_name(jid, tabs.ConversationTab)
        if tab not in self.contacts:
            return ''
        state = self.otr_state(self.contacts[tab].state)
        return ' OTR: %s' % (state,)

    def otr_say(self, tab, line):
        """
        Send line as the body of a 'chat' message addressed to the name
        of the tab.
        """
        msg = self.core.xmpp.make_message(tab.get_name())
        msg['type'] = 'chat'
        msg['body'] = line
        msg.send()

    def command_otr(self, args):
        """
        A command to start or end OTR encryption, or to display the
        fingerprints ('fpr') of the current conversation.

        The command help is shown when no subcommand, or an unknown one,
        is given.
        """
        args = args.split()
        if not args:
            return self.core.command_help("otr")
        tab = self.core.current_tab()
        command = args[0]
        if command == 'start':
            otr_state = self.get_otr(tab)
            self.otr_say(tab, otr_state.start().decode())
        elif command == 'end':
            otr_state = self.get_otr(tab)
            msg = otr_state.end()
            if msg is not None:
                self.otr_say(tab, msg.decode())
        elif command == 'fpr':
            otr_state = self.get_otr(tab)
            our = self._format_fpr(otr_state.our_fpr)
            their = self._format_fpr(otr_state.their_fpr)
            tab.add_message('Your: %s Their: %s' % (our, their))
        else:
            # unknown subcommand: behave like the empty-argument case
            return self.core.command_help("otr")
        self.core.refresh_window()

    def otr_completion(self, the_input):
        """
        Completion for the 'otr' command: propose its subcommands.
        """
        return the_input.auto_completion(['start', 'fpr', 'end'], '', quotify=False)