1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
|
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
# Copyright © 2018 Maxime “pep” Buquet <pep@bouah.net>
#
# Distributed under terms of the zlib license.
"""
OMEMO Plugin.
"""
import os
import asyncio
import textwrap
from poezio.plugin import BasePlugin
from poezio.tabs import ConversationTab
from poezio.xdg import CACHE_HOME
from slixmpp.plugins.xep_0384.plugin import MissingOwnKey
import logging
log = logging.getLogger(__name__)
class Plugin(BasePlugin):
def init(self):
self.info = lambda i: self.api.information(i, 'Info')
self.xmpp = self.core.xmpp
cache_dir = os.path.join(CACHE_HOME, 'omemo')
os.makedirs(cache_dir, exist_ok=True)
self.xmpp.register_plugin(
'xep_0384', {
'cache_dir': cache_dir,
})
self.api.add_command(
'omemo',
self.command_status,
help='Display contextual information status',
)
self.api.add_tab_command(
ConversationTab,
'omemo_enable',
self.command_enable,
help='Enable OMEMO encryption',
)
self.api.add_tab_command(
ConversationTab,
'omemo_disable',
self.command_disable,
help='Disable OMEMO encryption',
)
self.api.add_tab_command(
ConversationTab,
'omemo_toggle',
self.command_toggle,
help='Toggle OMEMO encryption state',
)
self.api.add_command(
'omemo_clear_devices',
self.command_clear_devices,
help='Clear all other OMEMO devices',
)
self.api.add_event_handler(
'conversation_say_after',
self.on_conversation_say_after,
)
self.api.add_event_handler(
'conversation_msg',
self.on_conversation_msg,
)
def command_status(self, _args):
"""Display contextual information depending on currenttab."""
tab = self.api.current_tab()
self.info('OMEMO!')
self.info("My device id: %d" % self.xmpp['xep_0384'].my_device_id())
def command_enable(self, _args):
pass
def command_disable(self, args):
pass
def command_toggle(self, _args):
pass
def command_clear_devices(self, _args):
asyncio.ensure_future(self.xmpp['xep_0384'].clear_device_list())
info = """
Device list has been reset.
Your other devices will reannounce themselves next time they get
online, but they might not be able to read encrypted messages in the
meantime.
"""
self.info(textwrap.dedent(info).strip())
def on_conversation_say_after(self, message, tab):
"""
Outbound messages
"""
# Check encryption status globally and to the contact, if enabled, add
# ['omemo_encrypt'] attribute to message and send. Maybe delete
# ['body'] and tab.add_message ourselves to specify typ=0 so messages
# are not logged.
fromjid = message['from']
self.xmpp['xep_0384'].encrypt_message(message)
self.info('Foo1')
def on_conversation_msg(self, message, _tab):
"""
Inbound messages
"""
# Check if encrypted, and if so replace message['body'] with
# plaintext.
self.info('Foo2')
if self.xmpp['xep_0384'].is_encrypted(message):
try:
body = self.xmpp['xep_0384'].decrypt_message(message)
except (MissingOwnKey,):
log.debug("The following message is missing our key;"
"Couldn't decrypt: %r", message)
return None
message['body'] = body.decode("utf8")
|