diff options
Diffstat (limited to 'poezio/plugin_e2ee.py')
-rw-r--r-- | poezio/plugin_e2ee.py | 287 |
1 files changed, 216 insertions, 71 deletions
diff --git a/poezio/plugin_e2ee.py b/poezio/plugin_e2ee.py index 9d1d4903..49f7b067 100644 --- a/poezio/plugin_e2ee.py +++ b/poezio/plugin_e2ee.py @@ -4,29 +4,42 @@ # # Copyright © 2019 Maxime “pep” Buquet <pep@bouah.net> # -# Distributed under terms of the zlib license. See COPYING file. +# Distributed under terms of the GPL-3.0+ license. See COPYING file. """ Interface for E2EE (End-to-end Encryption) plugins. """ -from typing import Callable, Dict, List, Optional, Union, Tuple, Set +from typing import ( + Callable, + Dict, + List, + Optional, + Union, + Tuple, + Set, + Type, +) from slixmpp import InvalidJID, JID, Message from slixmpp.xmlstream import StanzaBase +from slixmpp.xmlstream.handler import CoroutineCallback +from slixmpp.xmlstream.matcher import MatchXPath from poezio.tabs import ( ChatTab, ConversationTab, DynamicConversationTab, MucTab, PrivateTab, + RosterInfoTab, StaticConversationTab, ) from poezio.plugin import BasePlugin -from poezio.theming import get_theme, dump_tuple +from poezio.theming import Theme, get_theme, dump_tuple from poezio.config import config from poezio.decorators import command_args_parser +import asyncio from asyncio import iscoroutinefunction import logging @@ -97,30 +110,32 @@ class E2EEPlugin(BasePlugin): #: Encryption name, used in command descriptions, and logs. At least one #: of `encryption_name` and `encryption_short_name` must be set. - encryption_name = None # type: Optional[str] + encryption_name: Optional[str] = None #: Encryption short name, used as command name, and also to display #: encryption status in a tab. At least one of `encryption_name` and #: `encryption_short_name` must be set. - encryption_short_name = None # type: Optional[str] + encryption_short_name: Optional[str] = None #: Required. https://xmpp.org/extensions/xep-0380.html. - eme_ns = None # type: Optional[str] + eme_ns: Optional[str] = None #: Used to figure out what messages to attempt decryption for. Also used #: in combination with `tag_whitelist` to avoid removing encrypted tags - #: before sending. - encrypted_tags = None # type: Optional[List[Tuple[str, str]]] + #: before sending. If multiple tags are present, a handler will be + #: registered for each invididual tag/ns pair under <message/>, as opposed + #: to a single handler for all tags combined. + encrypted_tags: Optional[List[Tuple[str, str]]] = None # Static map, to be able to limit to one encryption mechanism per tab at a # time - _enabled_tabs = {} # type: Dict[JID, Callable] + _enabled_tabs: Dict[JID, Callable] = {} # Tabs that support this encryption mechanism - supported_tab_types = tuple() # type: Tuple[ChatTabs] + supported_tab_types: Tuple[Type[ChatTab], ...] = tuple() # States for each remote entity - trust_states = {'accepted': set(), 'rejected': set()} # type: Dict[str, Set[str]] + trust_states: Dict[str, Set[str]] = {'accepted': set(), 'rejected': set()} def init(self): self._all_trust_states = self.trust_states['accepted'].union( @@ -137,11 +152,24 @@ class E2EEPlugin(BasePlugin): if self.encryption_short_name is None: self.encryption_short_name = self.encryption_name + if not self.supported_tab_types: + raise NotImplementedError + # Ensure decryption is done before everything, so that other handlers # don't have to know about the encryption mechanism. - self.api.add_event_handler('muc_msg', self._decrypt, priority=0) - self.api.add_event_handler('conversation_msg', self._decrypt, priority=0) - self.api.add_event_handler('private_msg', self._decrypt, priority=0) + self.api.add_event_handler('muc_msg', self._decrypt_wrapper, priority=0) + self.api.add_event_handler('conversation_msg', self._decrypt_wrapper, priority=0) + self.api.add_event_handler('private_msg', self._decrypt_wrapper, priority=0) + + # Register a handler for each invididual tag/ns pair in encrypted_tags + # as well. as _msg handlers only include messages with a <body/>. + if self.encrypted_tags is not None: + default_ns = self.core.xmpp.default_ns + for i, (namespace, tag) in enumerate(self.encrypted_tags): + self.core.xmpp.register_handler(CoroutineCallback(f'EncryptedTag{i}', + MatchXPath(f'{{{default_ns}}}message/{{{namespace}}}{tag}'), + self._decrypt_encryptedtag, + )) # Ensure encryption is done after everything, so that whatever can be # encrypted is encrypted, and no plain element slips in. @@ -182,8 +210,8 @@ class E2EEPlugin(BasePlugin): self.encryption_short_name + '_fingerprint', self._command_show_fingerprints, usage='[jid]', - short='Show %s fingerprint(s) for a JID.' % self.encryption_short_name, - help='Show %s fingerprint(s) for a JID.' % self.encryption_short_name, + short=f'Show {self.encryption_short_name} fingerprint(s) for a JID.', + help=f'Show {self.encryption_short_name} fingerprint(s) for a JID.', ) ConversationTab.add_information_element( @@ -204,9 +232,10 @@ class E2EEPlugin(BasePlugin): def __load_encrypted_states(self) -> None: """Load previously stored encryption states for jids.""" for section in config.sections(): - value = config.get('encryption', section=section) + value = config.getstr('encryption', section=section) if value and value == self.encryption_short_name: - self._enabled_tabs[section] = self.encrypt + section_jid = JID(section) + self._enabled_tabs[section_jid] = self.encrypt def cleanup(self): ConversationTab.remove_information_element(self.encryption_short_name) @@ -224,69 +253,92 @@ class E2EEPlugin(BasePlugin): except InvalidJID: return "" - if self._encryption_enabled(jid): + if self._encryption_enabled(jid) and self.encryption_short_name: return " " + self.encryption_short_name return "" def _toggle_tab(self, _input: str) -> None: - jid = self.api.current_tab().jid # type: JID + tab = self.api.current_tab() + jid: JID = tab.jid if self._encryption_enabled(jid): del self._enabled_tabs[jid] + tab.e2e_encryption = None config.remove_and_save('encryption', section=jid) self.api.information( - '{} encryption disabled for {}'.format(self.encryption_name, jid), + f'{self.encryption_name} encryption disabled for {jid}', 'Info', ) - else: + elif self.encryption_short_name: self._enabled_tabs[jid] = self.encrypt + tab.e2e_encryption = self.encryption_name config.set_and_save('encryption', self.encryption_short_name, section=jid) self.api.information( - '{} encryption enabled for {}'.format(self.encryption_name, jid), + f'{self.encryption_name} encryption enabled for {jid}', 'Info', ) - def _show_fingerprints(self, jid: JID) -> None: + @staticmethod + def format_fingerprint(fingerprint: str, own: bool, theme: Theme) -> str: + return fingerprint + + async def _show_fingerprints(self, jid: JID) -> None: """Display encryption fingerprints for a JID.""" - fprs = self.get_fingerprints(jid) + theme = get_theme() + fprs = await self.get_fingerprints(jid) if len(fprs) == 1: + fp, own = fprs[0] + fingerprint = self.format_fingerprint(fp, own, theme) self.api.information( - 'Fingerprint for %s: %s' % (jid, fprs[0]), + f'Fingerprint for {jid}:\n{fingerprint}', 'Info', ) elif fprs: + fmt_fprs = map(lambda fp: self.format_fingerprint(fp[0], fp[1], theme), fprs) self.api.information( - 'Fingerprints for %s:\n\t%s' % (jid, '\n\t'.join(fprs)), + 'Fingerprints for %s:\n%s' % (jid, '\n\n'.join(fmt_fprs)), 'Info', ) else: self.api.information( - 'No fingerprints to display', + f'{jid}: No fingerprints to display', 'Info', ) @command_args_parser.quoted(0, 1) def _command_show_fingerprints(self, args: List[str]) -> None: - if not args and isinstance(self.api.current_tab(), self.supported_tab_types): - jid = self.api.current_tab().jid + tab = self.api.current_tab() + if not args and isinstance(tab, self.supported_tab_types): + jid = tab.jid + if isinstance(tab, MucTab): + jid = self.core.xmpp.boundjid.bare + elif not args and isinstance(tab, RosterInfoTab): + # Allow running the command without arguments in roster tab + jid = self.core.xmpp.boundjid.bare elif args: jid = args[0] else: + shortname = self.encryption_short_name self.api.information( - '%s_fingerprint: Couldn\'t deduce JID from context' % ( - self.encryption_short_name), + f'{shortname}_fingerprint: Couldn\'t deduce JID from context', 'Error', ) return None - self._show_fingerprints(JID(jid)) + asyncio.create_task(self._show_fingerprints(JID(jid))) @command_args_parser.quoted(2) def __command_set_state_global(self, args, state='') -> None: + if not args: + self.api.information( + 'No fingerprint provided to the command..', + 'Error', + ) + return jid, fpr = args if state not in self._all_trust_states: + shortname = self.encryption_short_name self.api.information( - 'Unknown state for plugin %s: %s' % ( - self.encryption_short_name, state), + f'Unknown state for plugin {shortname}: {state}', 'Error' ) return @@ -309,9 +361,9 @@ class E2EEPlugin(BasePlugin): return fpr = args[0] if state not in self._all_trust_states: + shortname = self.encryption_short_name self.api.information( - 'Unknown state for plugin %s: %s' % ( - self.encryption_short_name, state), + f'Unknown state for plugin {shortname}: {state}', 'Error', ) return @@ -330,6 +382,28 @@ class E2EEPlugin(BasePlugin): except NothingToEncrypt: return stanza except Exception as exc: + jid = stanza['from'] + tab = self.core.tabs.by_name_and_class(jid, ChatTab) + msg = ' \n\x19%s}Could not decrypt message: %s' % ( + dump_tuple(get_theme().COLOR_CHAR_NACK), + exc, + ) + # XXX: check before commit. Do we not nack in MUCs? + if tab and not isinstance(tab, MucTab): + tab.nack_message(msg, stanza['id'], stanza['to']) + # TODO: display exceptions to the user properly + log.error('Exception in encrypt:', exc_info=True) + return None + return result + + async def _decrypt_wrapper(self, stanza: Message, tab: Optional[ChatTabs]) -> None: + """ + Wrapper around _decrypt() to handle errors and display the message after encryption. + """ + try: + # pylint: disable=unexpected-keyword-arg + await self._decrypt(stanza, tab, passthrough=True) + except Exception as exc: jid = stanza['to'] tab = self.core.tabs.by_name_and_class(jid, ChatTab) msg = ' \n\x19%s}Could not send message: %s' % ( @@ -337,27 +411,57 @@ class E2EEPlugin(BasePlugin): exc, ) # XXX: check before commit. Do we not nack in MUCs? - if not isinstance(tab, MucTab): + if tab and not isinstance(tab, MucTab): tab.nack_message(msg, stanza['id'], stanza['from']) # TODO: display exceptions to the user properly - log.error('Exception in encrypt:', exc_info=True) + log.error('Exception in decrypt:', exc_info=True) return None - return result + return None - def _decrypt(self, message: Message, tab: ChatTabs) -> None: + async def _decrypt_encryptedtag(self, stanza: Message) -> None: + """ + Handler to decrypt encrypted_tags elements that are matched separately + from other messages because the default 'message' handler that we use + only matches messages containing a <body/>. + """ + # If the message contains a body, it will already be handled by the + # other handler. If not, pass it to the handler. + if stanza.xml.find(f'{{{self.core.xmpp.default_ns}}}body') is not None: + return None + + mfrom = stanza['from'] + + # Find what tab this message corresponds to. + if stanza['type'] == 'groupchat': # MUC + tab = self.core.tabs.by_name_and_class( + name=mfrom.bare, cls=MucTab, + ) + elif self.core.handler.is_known_muc_pm(stanza, mfrom): # MUC-PM + tab = self.core.tabs.by_name_and_class( + name=mfrom.full, cls=PrivateTab, + ) + else: # 1:1 + tab = self.core.get_conversation_by_jid( + jid=JID(mfrom.bare), + create=False, + fallback_barejid=True, + ) + log.debug('Found tab %r for encrypted message', tab) + await self._decrypt_wrapper(stanza, tab) + + async def _decrypt(self, message: Message, tab: Optional[ChatTabs], passthrough: bool = True) -> None: - has_eme = False - if message.xml.find('{%s}%s' % (EME_NS, EME_TAG)) is not None and \ + has_eme: bool = False + if message.xml.find(f'{{{EME_NS}}}{EME_TAG}') is not None and \ message['eme']['namespace'] == self.eme_ns: has_eme = True - has_encrypted_tag = False + has_encrypted_tag: bool = False if not has_eme and self.encrypted_tags is not None: + tmp: bool = True for (namespace, tag) in self.encrypted_tags: - if message.xml.find('{%s}%s' % (namespace, tag)) is not None: - # TODO: count all encrypted tags. - has_encrypted_tag = True - break + tmp = tmp and message.xml.find(f'{{{namespace}}}{tag}') is not None + has_encrypted_tag = tmp if not has_eme and not has_encrypted_tag: return None @@ -368,39 +472,77 @@ class E2EEPlugin(BasePlugin): # comes from a semi-anonymous MUC for example. Some plugins might be # fine with this so let them handle it. jid = message['from'] - muctab = tab - if isinstance(muctab, PrivateTab): + muctab: Optional[MucTab] = None + if isinstance(tab, PrivateTab): muctab = tab.parent_muc jid = None - if isinstance(muctab, MucTab): + if muctab is not None or isinstance(tab, MucTab): + if muctab is None: + muctab = tab # type: ignore nick = message['from'].resource - for user in muctab.users: - if user.nick == nick: - jid = user.jid or None - break + user = muctab.get_user_by_name(nick) # type: ignore + if user is not None: + jid = user.jid or None - self.decrypt(message, jid, tab) + # Call the enabled encrypt method + func = self.decrypt + if iscoroutinefunction(func): + # pylint: disable=unexpected-keyword-arg + await func(message, jid, tab, passthrough=True) # type: ignore + else: + # pylint: disable=unexpected-keyword-arg + func(message, jid, tab) # type: ignore log.debug('Decrypted %s message: %r', self.encryption_name, message['body']) return None - async def _encrypt(self, stanza: StanzaBase) -> Optional[StanzaBase]: + async def _encrypt(self, stanza: StanzaBase, passthrough: bool = True) -> Optional[StanzaBase]: + # TODO: Let through messages that contain elements that don't need to + # be encrypted even in an encrypted context, such as MUC mediated + # invites, etc. + # What to do when they're mixed with other elements? It probably + # depends on the element. Maybe they can be mixed with + # `self.tag_whitelist` that are already assumed to be sent as plain by + # the E2EE plugin. + # They might not be accompanied by a <body/> most of the time, nor by + # an encrypted tag. + if not isinstance(stanza, Message) or stanza['type'] not in ('normal', 'chat', 'groupchat'): raise NothingToEncrypt() message = stanza + + # Is this message already encrypted? Do we need to do all these + # checks? Such as an OMEMO heartbeat. + has_encrypted_tag: bool = False + if self.encrypted_tags is not None: + tmp: bool = True + for (namespace, tag) in self.encrypted_tags: + tmp = tmp and message.xml.find(f'{{{namespace}}}{tag}') is not None + has_encrypted_tag = tmp + + if has_encrypted_tag: + log.debug('Message already contains encrypted tags.') + raise NothingToEncrypt() + # Find who to encrypt to. If in a groupchat this can be multiple JIDs. # It is possible that we are not able to find a jid (e.g., semi-anon # MUCs). Let the plugin decide what to do with this information. - jids = [message['to']] # type: Optional[List[JID]] + jids: Optional[List[JID]] = [message['to']] tab = self.core.tabs.by_jid(message['to']) - if tab is None: # When does that ever happen? - log.debug('Attempting to encrypt a message to \'%s\' ' - 'that is not attached to a Tab. ?! Aborting ' - 'encryption.', message['to']) - return None + if tab is None and message['to'].resource: + # Redo the search with the bare JID + tab = self.core.tabs.by_jid(message['to'].bare) + + if tab is None: # Possible message sent directly by the e2ee lib? + log.debug( + 'A message we do not have a tab for ' + 'is being sent to \'%s\'. \n%r.', + message['to'], + message, + ) parent = None if isinstance(tab, PrivateTab): @@ -433,19 +575,22 @@ class E2EEPlugin(BasePlugin): if user.jid.bare: jids.append(user.jid) - if not self._encryption_enabled(tab.jid): + if tab and not self._encryption_enabled(tab.jid): raise NothingToEncrypt() log.debug('Sending %s message', self.encryption_name) has_body = message.xml.find('{%s}%s' % (JCLIENT_NS, 'body')) is not None + if not self._encryption_enabled(tab.jid): + raise NothingToEncrypt() + # Drop all messages that don't contain a body if the plugin doesn't do # Stanza Encryption if not self.stanza_encryption and not has_body: log.debug( '%s plugin: Dropping message as it contains no body, and ' - 'not doesn\'t do stanza encryption', + 'doesn\'t do stanza encryption', self.encryption_name, ) return None @@ -479,7 +624,7 @@ class E2EEPlugin(BasePlugin): if self.encrypted_tags is not None: whitelist += self.encrypted_tags - tag_whitelist = {'{%s}%s' % tag for tag in whitelist} + tag_whitelist = {f'{{{ns}}}{tag}' for (ns, tag) in whitelist} for elem in message.xml[:]: if elem.tag not in tag_whitelist: @@ -490,15 +635,15 @@ class E2EEPlugin(BasePlugin): def store_trust(self, jid: JID, state: str, fingerprint: str) -> None: """Store trust for a fingerprint and a jid.""" - option_name = '%s:%s' % (self.encryption_short_name, fingerprint) + option_name = f'{self.encryption_short_name}:{fingerprint}' config.silent_set(option=option_name, value=state, section=jid) def fetch_trust(self, jid: JID, fingerprint: str) -> str: """Fetch trust of a fingerprint and a jid.""" - option_name = '%s:%s' % (self.encryption_short_name, fingerprint) - return config.get(option=option_name, section=jid) + option_name = f'{self.encryption_short_name}:{fingerprint}' + return config.getstr(option=option_name, section=jid) - async def decrypt(self, message: Message, jid: Optional[JID], tab: ChatTab): + async def decrypt(self, message: Message, jid: Optional[JID], tab: Optional[ChatTab]): """Decryption method This is a method the plugin must implement. It is expected that this @@ -530,7 +675,7 @@ class E2EEPlugin(BasePlugin): raise NotImplementedError - def get_fingerprints(self, jid: JID) -> List[str]: + async def get_fingerprints(self, jid: JID) -> List[Tuple[str, bool]]: """Show fingerprint(s) for this encryption method and JID. To overload in plugins. |