summaryrefslogtreecommitdiff
path: root/plugins/omemo_plugin.py
blob: daca9e8de2c724c57e1625601c0cd161b54ae3c7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
# Copyright © 2018 Maxime “pep” Buquet <pep@bouah.net>
#
# Distributed under terms of the zlib license.
"""
    OMEMO Plugin.
"""

import os
import asyncio
import logging
from typing import Callable, List, Optional, Set, Tuple, Union

from poezio.plugin import BasePlugin
from poezio.tabs import DynamicConversationTab, StaticConversationTab, ConversationTab, MucTab
from poezio.xdg import DATA_HOME

from slixmpp import JID
from slixmpp.plugins.xep_0384.plugin import MissingOwnKey

log = logging.getLogger(__name__)


class Plugin(BasePlugin):
    """OMEMO (XEP-0384) Plugin"""
    _enabled_jids = set()  # type: Set[JID]

    def init(self):
        self.info = lambda i: self.api.information(i, 'Info')
        self.xmpp = self.core.xmpp

        data_dir = os.path.join(DATA_HOME, 'omemo')
        os.makedirs(data_dir, exist_ok=True)

        self.xmpp.register_plugin(
            'xep_0384', {
                'data_dir': data_dir,
            })

        self.api.add_command(
            'omemo',
            self.command_status,
            help='Display contextual information',
        )

        ConversationTab.add_information_element('omemo', self.display_encryption_status)
        MucTab.add_information_element('omemo', self.display_encryption_status)

        self.api.add_command(
            'omemo_enable',
            self.command_enable,
            help='Enable OMEMO encryption',
        )

        self.api.add_command(
            'omemo_disable',
            self.command_disable,
            help='Disable OMEMO encryption',
        )

        self.api.add_command(
            'encrypted_message',
            self.send_message,
            help='Send OMEMO encrypted message',
        )

        self.api.add_event_handler(
            'conversation_say_after',
            self.on_conversation_say_after,
        )

        self.api.add_event_handler(
            'conversation_msg',
            self.on_conversation_msg,
        )

    def cleanup(self) -> None:
        ConversationTab.remove_information_element('omemo')
        MucTab.remove_information_element('omemo')

    def display_encryption_status(self, jid: JID) -> str:
        """
            Return information to display in the infobar if OMEMO is enabled
            for the JID.
        """

        if jid in self._enabled_jids:
            return " OMEMO"
        return ""

    def command_status(self, _args):
        """Display contextual information depending on currenttab."""
        tab = self.api.current_tab()
        self.info('OMEMO!')
        self.info("My device id: %d" % self.xmpp['xep_0384'].my_device_id())

    def _jid_from_context(self, jid: Optional[Union[str, JID]]) -> Tuple[Optional[JID], bool]:
        """
            Get bare JID from context if not specified

            Return a tuple with the JID and a bool specifying that the JID
            corresponds to the current tab.
        """

        tab = self.api.current_tab()

        tab_jid = None
        chat_tabs = (DynamicConversationTab, StaticConversationTab, ConversationTab, MucTab)
        if isinstance(tab, chat_tabs):
            tab_jid = JID(tab.name).bare

        # If current tab has a JID, use it if none is specified
        if not jid and tab_jid is not None:
            jid = tab_jid

        # We might not have found a JID at this stage. No JID provided and not
        # in a tab with a JID (InfoTab etc.).
        # If we do, make we
        if jid:
            # XXX: Ugly. We don't know if 'jid' is a str or a JID. And we want
            # to return a bareJID. We could change the JID API to allow us to
            # clear the resource one way or another.
            jid = JID(JID(jid).bare)
        else:
            jid = None

        return (jid, tab_jid is not None and tab_jid == jid)

    def command_enable(self, jid: Optional[str]) -> None:
        """
            Enable JID to use OMEMO with.

            Use current tab JID is none is specified. Refresh the tab if JID
            corresponds to the one being added.
        """

        jid, current_tab = self._jid_from_context(jid)
        if jid is None:
            return None

        if jid not in self._enabled_jids:
            self.info('OMEMO enabled for %s' % jid)
        self._enabled_jids.add(jid)

        # Refresh tab if JID matches
        if current_tab:
            self.api.current_tab().refresh()

        return None

    def command_disable(self, jid: Optional[str]) -> None:
        """
            Enable JID to use OMEMO with.

            Use current tab JID is none is specified. Refresh the tab if JID
            corresponds to the one being added.
        """

        jid, current_tab = self._jid_from_context(jid)
        if jid is None:
            return None

        if jid in self._enabled_jids:
            self.info('OMEMO disabled for %s' % jid)
        self._enabled_jids.remove(jid)

        # Refresh tab if JID matches
        if current_tab:
            self.api.current_tab().refresh()

        return None

    def send_message(self, _args):
        asyncio.ensure_future(
            self._send_message(
                "Hello Encrypted World!",
                [JID('some@jid')],
                mto=JID('some@jid'),
                mtype='chat',
            ),
        )

    async def _send_message(
        self,
        payload: str,
        recipients: List[JID],
        mto: Optional[JID] = None,
        mtype: Optional[str] = 'chat',
    ) -> None:
        encrypted = await self.xmpp['xep_0384'].encrypt_message(payload, recipients)
        msg = self.core.xmpp.make_message(mto, mtype=mtype)
        msg['body'] = 'This message is encrypted with Legacy OMEMO (eu.siacs.conversations.axolotl)'
        msg['eme']['namespace'] = 'eu.siacs.conversations.axolotl'
        msg.append(encrypted)
        log.debug('BAR: message: %r', msg)
        msg.send()

    def on_conversation_say_after(self, message, tab):
        """
        Outbound messages
        """

        # Check encryption status globally and to the contact, if enabled, add
        # ['omemo_encrypt'] attribute to message and send. Maybe delete
        # ['body'] and tab.add_message ourselves to specify typ=0 so messages
        # are not logged.

        fromjid = message['from']
        self.xmpp['xep_0384'].encrypt_message(message)

    def on_conversation_msg(self, message, _tab):
        """
        Inbound messages
        """

        # Check if encrypted, and if so replace message['body'] with
        # plaintext.

        self.info('Foo2')
        if self.xmpp['xep_0384'].is_encrypted(message):
            try:
                body = self.xmpp['xep_0384'].decrypt_message(message)
            except (MissingOwnKey,):
                log.debug("The following message is missing our key;"
                          "Couldn't decrypt: %r", message)
                return None
            message['body'] = body.decode("utf8")
            return None