From d56d44225316652f8a818771c661c6f2463eb23f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maxime=20=E2=80=9Cpep=E2=80=9D=20Buquet?= Date: Thu, 6 Jun 2019 13:21:29 +0200 Subject: First shot of an E2EE plugin interface MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Features: - Decryption by default once the plugin is loaded if messages contains the right EME magic - Encryption of messages only if encryption is enabled for the current tab Missing pieces: - No special treatment done on the order of the treatment for messages yet - No special handling of non-Partial(/Full) Stanza Encryption yet (i.e., removing stuff other than ) Signed-off-by: Maxime “pep” Buquet --- poezio/plugin_e2ee.py | 155 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 155 insertions(+) create mode 100644 poezio/plugin_e2ee.py (limited to 'poezio/plugin_e2ee.py') diff --git a/poezio/plugin_e2ee.py b/poezio/plugin_e2ee.py new file mode 100644 index 00000000..38ada42f --- /dev/null +++ b/poezio/plugin_e2ee.py @@ -0,0 +1,155 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# vim:fenc=utf-8 et ts=4 sts=4 sw=4 +# +# Copyright © 2019 Maxime “pep” Buquet +# +# Distributed under terms of the zlib license. See COPYING file. + +""" + Interface for E2EE (End-to-end Encryption) plugins. +""" + +from typing import Optional, Union + +from slixmpp import InvalidJID, JID, Message +from poezio.tabs import ConversationTab, DynamicConversationTab, PrivateTab, MucTab +from poezio.plugin import BasePlugin + +import logging +log = logging.getLogger(__name__) + + +ChatTabs = Union[ + MucTab, + DynamicConversationTab, + PrivateTab, +] + + +class E2EEPlugin(BasePlugin): + """Interface for E2EE plugins""" + + # At least one of encryption_name and encryption_short_name must be set + encryption_name = None # type: Optional[str] + encryption_short_name = None # type: Optional[str] + + # Required. + eme_ns = None # type: Optional[str] + + def init(self): + if self.encryption_name is None and self.encryption_short_name is None: + raise NotImplementedError + + if self.eme_ns is None: + raise NotImplementedError + + if self.encryption_name is None: + self.encryption_name = self.encryption_short_name + if self.encryption_short_name is None: + self.encryption_short_name = self.encryption_name + + self.api.add_event_handler('muc_msg', self._decrypt) + self.api.add_event_handler('muc_say', self._encrypt) + self.api.add_event_handler('conversation_msg', self._decrypt) + self.api.add_event_handler('conversation_say', self._encrypt) + self.api.add_event_handler('private_msg', self._decrypt) + self.api.add_event_handler('private_say', self._encrypt) + + self._enabled_tabs = set() + + for tab_t in (DynamicConversationTab, PrivateTab, MucTab): + self.api.add_tab_command( + tab_t, + self.encryption_short_name, + self._toggle_tab, + usage='', + short='Toggle {} encryption for tab.'.format(self.encryption_name), + help='Toggle automatic {} encryption for tab.'.format(self.encryption_name), + ) + + ConversationTab.add_information_element( + self.encryption_short_name, + self._display_encryption_status, + ) + MucTab.add_information_element( + self.encryption_short_name, + self._display_encryption_status, + ) + PrivateTab.add_information_element( + self.encryption_short_name, + self._display_encryption_status, + ) + + def cleanup(self): + ConversationTab.remove_information_element(self.encryption_short_name) + MucTab.remove_information_element(self.encryption_short_name) + PrivateTab.remove_information_element(self.encryption_short_name) + + def _display_encryption_status(self, jid_s: str) -> str: + """ + Return information to display in the infobar if encryption is + enabled for the JID. + """ + + try: + jid = JID(jid_s) + except InvalidJID: + return "" + + if jid in self._enabled_tabs: + return self.encryption_short_name + return "" + + def _toggle_tab(self, _input: str) -> None: + jid = self.api.current_tab().jid # type: JID + + if jid in self._enabled_tabs: + self._enabled_tabs.remove(jid) + self.api.information( + '{} encryption disabled for {}'.format(self.encryption_name, jid), + 'Info', + ) + else: + self._enabled_tabs.add(jid) + self.api.information( + '{} encryption enabled for {}'.format(self.encryption_name, jid), + 'Info', + ) + + def _decrypt(self, message: Message, tab: ChatTabs) -> None: + if message['eme'] is None: + return None + + if message['eme']['namespace'] != self.eme_ns: + return None + + log.debug('Received %s message: %r', self.encryption_name, message['body']) + + self.decrypt(message, tab) + + log.debug('Decrypted %s message: %r', self.encryption_name, message['body']) + return None + + def _encrypt(self, message: Message, tab: ChatTabs): + jid = tab.jid + if jid not in self._enabled_tabs: + return None + + log.debug('Sending %s message: %r', self.encryption_name, message['body']) + + message['eme']['namespace'] = self.eme_ns + message['eme']['name'] = self.encryption_name + + self.encrypt(message, tab) + + log.debug('Decrypted %s message: %r', self.encryption_name, message['body']) + return None + + def decrypt(self, _message: Message, tab: ChatTabs): + """Decryption method""" + raise NotImplementedError + + def encrypt(self, _message: Message, tab: ChatTabs): + """Encryption method""" + raise NotImplementedError -- cgit v1.2.3