#! /usr/bin/env python3 # -*- coding: utf-8 -*- # vim:fenc=utf-8 # # Copyright © 2018 Maxime “pep” Buquet # # Distributed under terms of the zlib license. """ OMEMO Plugin. """ import os import asyncio import logging from typing import List from poezio.plugin_e2ee import E2EEPlugin from poezio.xdg import DATA_HOME from poezio.tabs import DynamicConversationTab, StaticConversationTab, MucTab from omemo.exceptions import MissingBundleException from slixmpp import JID from slixmpp.stanza import Message from slixmpp.exceptions import IqError, IqTimeout from slixmpp_omemo import PluginCouldNotLoad, MissingOwnKey, NoAvailableSession from slixmpp_omemo import UndecidedException, UntrustedException, EncryptionPrepareException import slixmpp_omemo log = logging.getLogger(__name__) class Plugin(E2EEPlugin): """OMEMO (XEP-0384) Plugin""" encryption_name = 'omemo' eme_ns = slixmpp_omemo.OMEMO_BASE_NS replace_body_with_eme = True stanza_encryption = False encrypted_tags = [ (slixmpp_omemo.OMEMO_BASE_NS, 'encrypted'), ] # TODO: Make "unverified" state depend on a config option that includes it # either in accepted or rejected states. trust_states = {'accepted': {'verified', 'unverified'}, 'rejected': {'untrusted'}} supported_tab_types = (DynamicConversationTab, StaticConversationTab, MucTab) def init(self) -> None: super().init() self.info = lambda i: self.api.information(i, 'Info') data_dir = os.path.join(DATA_HOME, 'omemo', self.core.xmpp.boundjid.bare) os.makedirs(data_dir, exist_ok=True) try: self.core.xmpp.register_plugin( 'xep_0384', { 'data_dir': data_dir, }, module=slixmpp_omemo, ) # OMEMO except (PluginCouldNotLoad,): log.exception('And error occured when loading the omemo plugin.') asyncio.ensure_future( self.core.xmpp['xep_0384'].session_start(self.core.xmpp.boundjid) ) def display_error(self, txt) -> None: self.api.information(txt, 'Error') def get_fingerprints(self, jid: JID) -> List[str]: devices = self.core.xmpp['xep_0384'].get_trust_for_jid(jid) # XXX: What to do with did -> None entries? # XXX: What to do with the active/inactive devices differenciation? # For now I'll merge both. We should probably display them separately # later on. devices['active'].update(devices['inactive']) return [ slixmpp_omemo.fp_from_ik(trust['key']) for trust in devices['active'].values() if trust is not None ] def decrypt(self, message: Message, tab, allow_untrusted=False) -> None: body = None try: mfrom = message['from'] encrypted = message['omemo_encrypted'] body = self.core.xmpp['xep_0384'].decrypt_message(encrypted, mfrom, allow_untrusted) body = body.decode('utf-8') except (MissingOwnKey,): # The message is missing our own key, it was not encrypted for # us, and we can't decrypt it. self.display_error( 'I can\'t decrypt this message as it is not encrypted for me.' ) except (NoAvailableSession,) as exn: # We received a message from that contained a session that we # don't know about (deleted session storage, etc.). We can't # decrypt the message, and it's going to be lost. # Here, as we need to initiate a new encrypted session, it is # best if we send an encrypted message directly. XXX: Is it # where we talk about self-healing messages? self.display_error( 'I can\'t decrypt this message as it uses an encrypted ' 'session I don\'t know about.', ) except (UndecidedException, UntrustedException) as exn: # We received a message from an untrusted device. We can # choose to decrypt the message nonetheless, with the # `allow_untrusted` flag on the `decrypt_message` call, which # we will do here. This is only possible for decryption, # encryption will require us to decide if we trust the device # or not. Clients _should_ indicate that the message was not # trusted, or in undecided state, if they decide to decrypt it # anyway. self.display_error( "Your device '%s' is not in my trusted devices." % exn.device, ) # We resend, setting the `allow_untrusted` parameter to True. self.decrypt(message, tab, allow_untrusted=True) except (EncryptionPrepareException,): # Slixmpp tried its best, but there were errors it couldn't # resolve. At this point you should have seen other exceptions # and given a chance to resolve them already. self.display_error('I was not able to decrypt the message.') except (Exception,) as exn: self.display_error('An error occured while attempting decryption.\n%r' % exn) raise if body is not None: message['body'] = body async def encrypt(self, message: Message, _tab) -> None: mto = message['to'] body = message['body'] expect_problems = {} # type: Optional[Dict[JID, List[int]]] while True: try: # `encrypt_message` excepts the plaintext to be sent, a list of # bare JIDs to encrypt to, and optionally a dict of problems to # expect per bare JID. # # Note that this function returns an `` object, # and not a full Message stanza. This combined with the # `recipients` parameter that requires for a list of JIDs, # allows you to encrypt for 1:1 as well as groupchats (MUC). # # TODO: Document expect_problems # TODO: Handle multiple recipients (MUCs) recipients = [mto] encrypt = await self.core.xmpp['xep_0384'].encrypt_message(body, recipients, expect_problems) message.append(encrypt) return None except UndecidedException as exn: # The library prevents us from sending a message to an # untrusted/undecided barejid, so we need to make a decision here. # This is where you prompt your user to ask what to do. In # this bot we will automatically trust undecided recipients. self.core.xmpp['xep_0384'].trust(exn.bare_jid, exn.device, exn.ik) # TODO: catch NoEligibleDevicesException except EncryptionPrepareException as exn: log.debug('FOO: EncryptionPrepareException: %r', exn.errors) for error in exn.errors: if isinstance(error, MissingBundleException): self.display_error( 'Could not find keys for device "%d" of recipient "%s". Skipping.' % (error.device, error.bare_jid), ) jid = JID(error.bare_jid) device_list = expect_problems.setdefault(jid, []) device_list.append(error.device) except (IqError, IqTimeout) as exn: self.display_error( 'An error occured while fetching information on a recipient.\n%r' % exn, ) return None return None