diff options
-rw-r--r-- | plugins/omemo_plugin.py | 184 |
1 files changed, 0 insertions, 184 deletions
diff --git a/plugins/omemo_plugin.py b/plugins/omemo_plugin.py deleted file mode 100644 index 897a1ad6..00000000 --- a/plugins/omemo_plugin.py +++ /dev/null @@ -1,184 +0,0 @@ -#! /usr/bin/env python3 -# -*- coding: utf-8 -*- -# vim:fenc=utf-8 -# -# Copyright © 2018 Maxime “pep” Buquet <pep@bouah.net> -# -# Distributed under terms of the zlib license. -""" - OMEMO Plugin. -""" - -import os -import asyncio -import logging -from typing import List - -from poezio.plugin_e2ee import E2EEPlugin -from poezio.xdg import DATA_HOME -from poezio.tabs import DynamicConversationTab, StaticConversationTab, MucTab - -from omemo.exceptions import MissingBundleException -from slixmpp import JID -from slixmpp.stanza import Message -from slixmpp.exceptions import IqError, IqTimeout -from slixmpp_omemo import PluginCouldNotLoad, MissingOwnKey, NoAvailableSession -from slixmpp_omemo import UndecidedException, UntrustedException, EncryptionPrepareException -import slixmpp_omemo - -log = logging.getLogger(__name__) - - -class Plugin(E2EEPlugin): - """OMEMO (XEP-0384) Plugin""" - - encryption_name = 'omemo' - eme_ns = slixmpp_omemo.OMEMO_BASE_NS - replace_body_with_eme = True - stanza_encryption = False - - encrypted_tags = [ - (slixmpp_omemo.OMEMO_BASE_NS, 'encrypted'), - ] - # TODO: Make "unverified" state depend on a config option that includes it - # either in accepted or rejected states. - trust_states = {'accepted': {'verified', 'unverified'}, 'rejected': {'untrusted'}} - supported_tab_types = (DynamicConversationTab, StaticConversationTab, MucTab) - - def init(self) -> None: - super().init() - - self.info = lambda i: self.api.information(i, 'Info') - - data_dir = os.path.join(DATA_HOME, 'omemo', self.core.xmpp.boundjid.bare) - os.makedirs(data_dir, exist_ok=True) - - try: - self.core.xmpp.register_plugin( - 'xep_0384', { - 'data_dir': data_dir, - }, - module=slixmpp_omemo, - ) # OMEMO - except (PluginCouldNotLoad,): - log.exception('And error occured when loading the omemo plugin.') - - asyncio.ensure_future( - self.core.xmpp['xep_0384'].session_start(self.core.xmpp.boundjid) - ) - - def display_error(self, txt) -> None: - self.api.information(txt, 'Error') - - def get_fingerprints(self, jid: JID) -> List[str]: - devices = self.core.xmpp['xep_0384'].get_trust_for_jid(jid) - - # XXX: What to do with did -> None entries? - # XXX: What to do with the active/inactive devices differenciation? - # For now I'll merge both. We should probably display them separately - # later on. - devices['active'].update(devices['inactive']) - return [ - slixmpp_omemo.fp_from_ik(trust['key']) - for trust in devices['active'].values() - if trust is not None - ] - - def decrypt(self, message: Message, tab, allow_untrusted=False) -> None: - - body = None - try: - mfrom = message['from'] - encrypted = message['omemo_encrypted'] - body = self.core.xmpp['xep_0384'].decrypt_message(encrypted, mfrom, allow_untrusted) - body = body.decode('utf-8') - except (MissingOwnKey,): - # The message is missing our own key, it was not encrypted for - # us, and we can't decrypt it. - self.display_error( - 'I can\'t decrypt this message as it is not encrypted for me.' - ) - except (NoAvailableSession,) as exn: - # We received a message from that contained a session that we - # don't know about (deleted session storage, etc.). We can't - # decrypt the message, and it's going to be lost. - # Here, as we need to initiate a new encrypted session, it is - # best if we send an encrypted message directly. XXX: Is it - # where we talk about self-healing messages? - self.display_error( - 'I can\'t decrypt this message as it uses an encrypted ' - 'session I don\'t know about.', - ) - except (UndecidedException, UntrustedException) as exn: - # We received a message from an untrusted device. We can - # choose to decrypt the message nonetheless, with the - # `allow_untrusted` flag on the `decrypt_message` call, which - # we will do here. This is only possible for decryption, - # encryption will require us to decide if we trust the device - # or not. Clients _should_ indicate that the message was not - # trusted, or in undecided state, if they decide to decrypt it - # anyway. - self.display_error( - "Your device '%s' is not in my trusted devices." % exn.device, - ) - # We resend, setting the `allow_untrusted` parameter to True. - self.decrypt(message, tab, allow_untrusted=True) - except (EncryptionPrepareException,): - # Slixmpp tried its best, but there were errors it couldn't - # resolve. At this point you should have seen other exceptions - # and given a chance to resolve them already. - self.display_error('I was not able to decrypt the message.') - except (Exception,) as exn: - self.display_error('An error occured while attempting decryption.\n%r' % exn) - raise - - if body is not None: - message['body'] = body - - async def encrypt(self, message: Message, _tab) -> None: - mto = message['to'] - body = message['body'] - expect_problems = {} # type: Optional[Dict[JID, List[int]]] - - while True: - try: - # `encrypt_message` excepts the plaintext to be sent, a list of - # bare JIDs to encrypt to, and optionally a dict of problems to - # expect per bare JID. - # - # Note that this function returns an `<encrypted/>` object, - # and not a full Message stanza. This combined with the - # `recipients` parameter that requires for a list of JIDs, - # allows you to encrypt for 1:1 as well as groupchats (MUC). - # - # TODO: Document expect_problems - # TODO: Handle multiple recipients (MUCs) - recipients = [mto] - encrypt = await self.core.xmpp['xep_0384'].encrypt_message(body, recipients, expect_problems) - message.append(encrypt) - return None - except UndecidedException as exn: - # The library prevents us from sending a message to an - # untrusted/undecided barejid, so we need to make a decision here. - # This is where you prompt your user to ask what to do. In - # this bot we will automatically trust undecided recipients. - self.core.xmpp['xep_0384'].trust(exn.bare_jid, exn.device, exn.ik) - # TODO: catch NoEligibleDevicesException - except EncryptionPrepareException as exn: - log.debug('FOO: EncryptionPrepareException: %r', exn.errors) - for error in exn.errors: - if isinstance(error, MissingBundleException): - self.display_error( - 'Could not find keys for device "%d" of recipient "%s". Skipping.' % - (error.device, error.bare_jid), - ) - jid = JID(error.bare_jid) - device_list = expect_problems.setdefault(jid, []) - device_list.append(error.device) - except (IqError, IqTimeout) as exn: - self.display_error( - 'An error occured while fetching information on a recipient.\n%r' % exn, - ) - return None - - return None |