From d56d44225316652f8a818771c661c6f2463eb23f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Maxime=20=E2=80=9Cpep=E2=80=9D=20Buquet?=
Date: Thu, 6 Jun 2019 13:21:29 +0200
Subject: First shot of an E2EE plugin interface
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Features:
- Decryption by default once the plugin is loaded if messages contains
the right EME magic
- Encryption of messages only if encryption is enabled for the current
tab
Missing pieces:
- No special treatment done on the order of the treatment for messages
yet
- No special handling of non-Partial(/Full) Stanza Encryption yet (i.e.,
removing stuff other than )
Signed-off-by: Maxime “pep” Buquet
---
plugins/b64.py | 50 ++++++++++++++++
poezio/plugin_e2ee.py | 155 ++++++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 205 insertions(+)
create mode 100644 plugins/b64.py
create mode 100644 poezio/plugin_e2ee.py
diff --git a/plugins/b64.py b/plugins/b64.py
new file mode 100644
index 00000000..57b2664f
--- /dev/null
+++ b/plugins/b64.py
@@ -0,0 +1,50 @@
+#! /usr/bin/env python3
+# -*- coding: utf-8 -*-
+# vim:fenc=utf-8
+#
+# Copyright © 2019 Maxime “pep” Buquet
+#
+# Distributed under terms of the zlib license.
+
+"""
+Usage
+-----
+
+Base64 encryption plugin.
+
+This plugin also respects security guidelines listed in XEP-0419.
+
+.. glossary::
+ /b64
+ **Usage:** ``/b64``
+
+ This command enables encryption of outgoing messages for the current
+ tab.
+"""
+
+from base64 import b64decode, b64encode
+from poezio.plugin_e2ee import E2EEPlugin
+from slixmpp import Message
+
+
+class Plugin(E2EEPlugin):
+ """Base64 Plugin"""
+
+ encryption_name = 'base64'
+ encryption_short_name = 'b64'
+ eme_ns = 'urn:xmpps:base64:0'
+
+ def decrypt(self, message: Message, _tab) -> None:
+ """
+ Decrypt base64
+ """
+ body = message['body']
+ message['body'] = b64decode(body.encode()).decode()
+
+ def encrypt(self, message: Message, _tab) -> None:
+ """
+ Encrypt to base64
+ """
+ # TODO: Stop using for this. Put the encoded payload in another element.
+ body = message['body']
+ message['body'] = b64encode(body.encode()).decode()
diff --git a/poezio/plugin_e2ee.py b/poezio/plugin_e2ee.py
new file mode 100644
index 00000000..38ada42f
--- /dev/null
+++ b/poezio/plugin_e2ee.py
@@ -0,0 +1,155 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+# vim:fenc=utf-8 et ts=4 sts=4 sw=4
+#
+# Copyright © 2019 Maxime “pep” Buquet
+#
+# Distributed under terms of the zlib license. See COPYING file.
+
+"""
+ Interface for E2EE (End-to-end Encryption) plugins.
+"""
+
+from typing import Optional, Union
+
+from slixmpp import InvalidJID, JID, Message
+from poezio.tabs import ConversationTab, DynamicConversationTab, PrivateTab, MucTab
+from poezio.plugin import BasePlugin
+
+import logging
+log = logging.getLogger(__name__)
+
+
+ChatTabs = Union[
+ MucTab,
+ DynamicConversationTab,
+ PrivateTab,
+]
+
+
+class E2EEPlugin(BasePlugin):
+ """Interface for E2EE plugins"""
+
+ # At least one of encryption_name and encryption_short_name must be set
+ encryption_name = None # type: Optional[str]
+ encryption_short_name = None # type: Optional[str]
+
+ # Required.
+ eme_ns = None # type: Optional[str]
+
+ def init(self):
+ if self.encryption_name is None and self.encryption_short_name is None:
+ raise NotImplementedError
+
+ if self.eme_ns is None:
+ raise NotImplementedError
+
+ if self.encryption_name is None:
+ self.encryption_name = self.encryption_short_name
+ if self.encryption_short_name is None:
+ self.encryption_short_name = self.encryption_name
+
+ self.api.add_event_handler('muc_msg', self._decrypt)
+ self.api.add_event_handler('muc_say', self._encrypt)
+ self.api.add_event_handler('conversation_msg', self._decrypt)
+ self.api.add_event_handler('conversation_say', self._encrypt)
+ self.api.add_event_handler('private_msg', self._decrypt)
+ self.api.add_event_handler('private_say', self._encrypt)
+
+ self._enabled_tabs = set()
+
+ for tab_t in (DynamicConversationTab, PrivateTab, MucTab):
+ self.api.add_tab_command(
+ tab_t,
+ self.encryption_short_name,
+ self._toggle_tab,
+ usage='',
+ short='Toggle {} encryption for tab.'.format(self.encryption_name),
+ help='Toggle automatic {} encryption for tab.'.format(self.encryption_name),
+ )
+
+ ConversationTab.add_information_element(
+ self.encryption_short_name,
+ self._display_encryption_status,
+ )
+ MucTab.add_information_element(
+ self.encryption_short_name,
+ self._display_encryption_status,
+ )
+ PrivateTab.add_information_element(
+ self.encryption_short_name,
+ self._display_encryption_status,
+ )
+
+ def cleanup(self):
+ ConversationTab.remove_information_element(self.encryption_short_name)
+ MucTab.remove_information_element(self.encryption_short_name)
+ PrivateTab.remove_information_element(self.encryption_short_name)
+
+ def _display_encryption_status(self, jid_s: str) -> str:
+ """
+ Return information to display in the infobar if encryption is
+ enabled for the JID.
+ """
+
+ try:
+ jid = JID(jid_s)
+ except InvalidJID:
+ return ""
+
+ if jid in self._enabled_tabs:
+ return self.encryption_short_name
+ return ""
+
+ def _toggle_tab(self, _input: str) -> None:
+ jid = self.api.current_tab().jid # type: JID
+
+ if jid in self._enabled_tabs:
+ self._enabled_tabs.remove(jid)
+ self.api.information(
+ '{} encryption disabled for {}'.format(self.encryption_name, jid),
+ 'Info',
+ )
+ else:
+ self._enabled_tabs.add(jid)
+ self.api.information(
+ '{} encryption enabled for {}'.format(self.encryption_name, jid),
+ 'Info',
+ )
+
+ def _decrypt(self, message: Message, tab: ChatTabs) -> None:
+ if message['eme'] is None:
+ return None
+
+ if message['eme']['namespace'] != self.eme_ns:
+ return None
+
+ log.debug('Received %s message: %r', self.encryption_name, message['body'])
+
+ self.decrypt(message, tab)
+
+ log.debug('Decrypted %s message: %r', self.encryption_name, message['body'])
+ return None
+
+ def _encrypt(self, message: Message, tab: ChatTabs):
+ jid = tab.jid
+ if jid not in self._enabled_tabs:
+ return None
+
+ log.debug('Sending %s message: %r', self.encryption_name, message['body'])
+
+ message['eme']['namespace'] = self.eme_ns
+ message['eme']['name'] = self.encryption_name
+
+ self.encrypt(message, tab)
+
+ log.debug('Decrypted %s message: %r', self.encryption_name, message['body'])
+ return None
+
+ def decrypt(self, _message: Message, tab: ChatTabs):
+ """Decryption method"""
+ raise NotImplementedError
+
+ def encrypt(self, _message: Message, tab: ChatTabs):
+ """Encryption method"""
+ raise NotImplementedError
--
cgit v1.2.3