summaryrefslogtreecommitdiff
path: root/poezio/plugin_e2ee.py
diff options
context:
space:
mode:
Diffstat (limited to 'poezio/plugin_e2ee.py')
-rw-r--r--poezio/plugin_e2ee.py287
1 files changed, 216 insertions, 71 deletions
diff --git a/poezio/plugin_e2ee.py b/poezio/plugin_e2ee.py
index 9d1d4903..49f7b067 100644
--- a/poezio/plugin_e2ee.py
+++ b/poezio/plugin_e2ee.py
@@ -4,29 +4,42 @@
#
# Copyright © 2019 Maxime “pep” Buquet <pep@bouah.net>
#
-# Distributed under terms of the zlib license. See COPYING file.
+# Distributed under terms of the GPL-3.0+ license. See COPYING file.
"""
Interface for E2EE (End-to-end Encryption) plugins.
"""
-from typing import Callable, Dict, List, Optional, Union, Tuple, Set
+from typing import (
+ Callable,
+ Dict,
+ List,
+ Optional,
+ Union,
+ Tuple,
+ Set,
+ Type,
+)
from slixmpp import InvalidJID, JID, Message
from slixmpp.xmlstream import StanzaBase
+from slixmpp.xmlstream.handler import CoroutineCallback
+from slixmpp.xmlstream.matcher import MatchXPath
from poezio.tabs import (
ChatTab,
ConversationTab,
DynamicConversationTab,
MucTab,
PrivateTab,
+ RosterInfoTab,
StaticConversationTab,
)
from poezio.plugin import BasePlugin
-from poezio.theming import get_theme, dump_tuple
+from poezio.theming import Theme, get_theme, dump_tuple
from poezio.config import config
from poezio.decorators import command_args_parser
+import asyncio
from asyncio import iscoroutinefunction
import logging
@@ -97,30 +110,32 @@ class E2EEPlugin(BasePlugin):
#: Encryption name, used in command descriptions, and logs. At least one
#: of `encryption_name` and `encryption_short_name` must be set.
- encryption_name = None # type: Optional[str]
+ encryption_name: Optional[str] = None
#: Encryption short name, used as command name, and also to display
#: encryption status in a tab. At least one of `encryption_name` and
#: `encryption_short_name` must be set.
- encryption_short_name = None # type: Optional[str]
+ encryption_short_name: Optional[str] = None
#: Required. https://xmpp.org/extensions/xep-0380.html.
- eme_ns = None # type: Optional[str]
+ eme_ns: Optional[str] = None
#: Used to figure out what messages to attempt decryption for. Also used
#: in combination with `tag_whitelist` to avoid removing encrypted tags
- #: before sending.
- encrypted_tags = None # type: Optional[List[Tuple[str, str]]]
+ #: before sending. If multiple tags are present, a handler will be
+ #: registered for each invididual tag/ns pair under <message/>, as opposed
+ #: to a single handler for all tags combined.
+ encrypted_tags: Optional[List[Tuple[str, str]]] = None
# Static map, to be able to limit to one encryption mechanism per tab at a
# time
- _enabled_tabs = {} # type: Dict[JID, Callable]
+ _enabled_tabs: Dict[JID, Callable] = {}
# Tabs that support this encryption mechanism
- supported_tab_types = tuple() # type: Tuple[ChatTabs]
+ supported_tab_types: Tuple[Type[ChatTab], ...] = tuple()
# States for each remote entity
- trust_states = {'accepted': set(), 'rejected': set()} # type: Dict[str, Set[str]]
+ trust_states: Dict[str, Set[str]] = {'accepted': set(), 'rejected': set()}
def init(self):
self._all_trust_states = self.trust_states['accepted'].union(
@@ -137,11 +152,24 @@ class E2EEPlugin(BasePlugin):
if self.encryption_short_name is None:
self.encryption_short_name = self.encryption_name
+ if not self.supported_tab_types:
+ raise NotImplementedError
+
# Ensure decryption is done before everything, so that other handlers
# don't have to know about the encryption mechanism.
- self.api.add_event_handler('muc_msg', self._decrypt, priority=0)
- self.api.add_event_handler('conversation_msg', self._decrypt, priority=0)
- self.api.add_event_handler('private_msg', self._decrypt, priority=0)
+ self.api.add_event_handler('muc_msg', self._decrypt_wrapper, priority=0)
+ self.api.add_event_handler('conversation_msg', self._decrypt_wrapper, priority=0)
+ self.api.add_event_handler('private_msg', self._decrypt_wrapper, priority=0)
+
+ # Register a handler for each invididual tag/ns pair in encrypted_tags
+ # as well. as _msg handlers only include messages with a <body/>.
+ if self.encrypted_tags is not None:
+ default_ns = self.core.xmpp.default_ns
+ for i, (namespace, tag) in enumerate(self.encrypted_tags):
+ self.core.xmpp.register_handler(CoroutineCallback(f'EncryptedTag{i}',
+ MatchXPath(f'{{{default_ns}}}message/{{{namespace}}}{tag}'),
+ self._decrypt_encryptedtag,
+ ))
# Ensure encryption is done after everything, so that whatever can be
# encrypted is encrypted, and no plain element slips in.
@@ -182,8 +210,8 @@ class E2EEPlugin(BasePlugin):
self.encryption_short_name + '_fingerprint',
self._command_show_fingerprints,
usage='[jid]',
- short='Show %s fingerprint(s) for a JID.' % self.encryption_short_name,
- help='Show %s fingerprint(s) for a JID.' % self.encryption_short_name,
+ short=f'Show {self.encryption_short_name} fingerprint(s) for a JID.',
+ help=f'Show {self.encryption_short_name} fingerprint(s) for a JID.',
)
ConversationTab.add_information_element(
@@ -204,9 +232,10 @@ class E2EEPlugin(BasePlugin):
def __load_encrypted_states(self) -> None:
"""Load previously stored encryption states for jids."""
for section in config.sections():
- value = config.get('encryption', section=section)
+ value = config.getstr('encryption', section=section)
if value and value == self.encryption_short_name:
- self._enabled_tabs[section] = self.encrypt
+ section_jid = JID(section)
+ self._enabled_tabs[section_jid] = self.encrypt
def cleanup(self):
ConversationTab.remove_information_element(self.encryption_short_name)
@@ -224,69 +253,92 @@ class E2EEPlugin(BasePlugin):
except InvalidJID:
return ""
- if self._encryption_enabled(jid):
+ if self._encryption_enabled(jid) and self.encryption_short_name:
return " " + self.encryption_short_name
return ""
def _toggle_tab(self, _input: str) -> None:
- jid = self.api.current_tab().jid # type: JID
+ tab = self.api.current_tab()
+ jid: JID = tab.jid
if self._encryption_enabled(jid):
del self._enabled_tabs[jid]
+ tab.e2e_encryption = None
config.remove_and_save('encryption', section=jid)
self.api.information(
- '{} encryption disabled for {}'.format(self.encryption_name, jid),
+ f'{self.encryption_name} encryption disabled for {jid}',
'Info',
)
- else:
+ elif self.encryption_short_name:
self._enabled_tabs[jid] = self.encrypt
+ tab.e2e_encryption = self.encryption_name
config.set_and_save('encryption', self.encryption_short_name, section=jid)
self.api.information(
- '{} encryption enabled for {}'.format(self.encryption_name, jid),
+ f'{self.encryption_name} encryption enabled for {jid}',
'Info',
)
- def _show_fingerprints(self, jid: JID) -> None:
+ @staticmethod
+ def format_fingerprint(fingerprint: str, own: bool, theme: Theme) -> str:
+ return fingerprint
+
+ async def _show_fingerprints(self, jid: JID) -> None:
"""Display encryption fingerprints for a JID."""
- fprs = self.get_fingerprints(jid)
+ theme = get_theme()
+ fprs = await self.get_fingerprints(jid)
if len(fprs) == 1:
+ fp, own = fprs[0]
+ fingerprint = self.format_fingerprint(fp, own, theme)
self.api.information(
- 'Fingerprint for %s: %s' % (jid, fprs[0]),
+ f'Fingerprint for {jid}:\n{fingerprint}',
'Info',
)
elif fprs:
+ fmt_fprs = map(lambda fp: self.format_fingerprint(fp[0], fp[1], theme), fprs)
self.api.information(
- 'Fingerprints for %s:\n\t%s' % (jid, '\n\t'.join(fprs)),
+ 'Fingerprints for %s:\n%s' % (jid, '\n\n'.join(fmt_fprs)),
'Info',
)
else:
self.api.information(
- 'No fingerprints to display',
+ f'{jid}: No fingerprints to display',
'Info',
)
@command_args_parser.quoted(0, 1)
def _command_show_fingerprints(self, args: List[str]) -> None:
- if not args and isinstance(self.api.current_tab(), self.supported_tab_types):
- jid = self.api.current_tab().jid
+ tab = self.api.current_tab()
+ if not args and isinstance(tab, self.supported_tab_types):
+ jid = tab.jid
+ if isinstance(tab, MucTab):
+ jid = self.core.xmpp.boundjid.bare
+ elif not args and isinstance(tab, RosterInfoTab):
+ # Allow running the command without arguments in roster tab
+ jid = self.core.xmpp.boundjid.bare
elif args:
jid = args[0]
else:
+ shortname = self.encryption_short_name
self.api.information(
- '%s_fingerprint: Couldn\'t deduce JID from context' % (
- self.encryption_short_name),
+ f'{shortname}_fingerprint: Couldn\'t deduce JID from context',
'Error',
)
return None
- self._show_fingerprints(JID(jid))
+ asyncio.create_task(self._show_fingerprints(JID(jid)))
@command_args_parser.quoted(2)
def __command_set_state_global(self, args, state='') -> None:
+ if not args:
+ self.api.information(
+ 'No fingerprint provided to the command..',
+ 'Error',
+ )
+ return
jid, fpr = args
if state not in self._all_trust_states:
+ shortname = self.encryption_short_name
self.api.information(
- 'Unknown state for plugin %s: %s' % (
- self.encryption_short_name, state),
+ f'Unknown state for plugin {shortname}: {state}',
'Error'
)
return
@@ -309,9 +361,9 @@ class E2EEPlugin(BasePlugin):
return
fpr = args[0]
if state not in self._all_trust_states:
+ shortname = self.encryption_short_name
self.api.information(
- 'Unknown state for plugin %s: %s' % (
- self.encryption_short_name, state),
+ f'Unknown state for plugin {shortname}: {state}',
'Error',
)
return
@@ -330,6 +382,28 @@ class E2EEPlugin(BasePlugin):
except NothingToEncrypt:
return stanza
except Exception as exc:
+ jid = stanza['from']
+ tab = self.core.tabs.by_name_and_class(jid, ChatTab)
+ msg = ' \n\x19%s}Could not decrypt message: %s' % (
+ dump_tuple(get_theme().COLOR_CHAR_NACK),
+ exc,
+ )
+ # XXX: check before commit. Do we not nack in MUCs?
+ if tab and not isinstance(tab, MucTab):
+ tab.nack_message(msg, stanza['id'], stanza['to'])
+ # TODO: display exceptions to the user properly
+ log.error('Exception in encrypt:', exc_info=True)
+ return None
+ return result
+
+ async def _decrypt_wrapper(self, stanza: Message, tab: Optional[ChatTabs]) -> None:
+ """
+ Wrapper around _decrypt() to handle errors and display the message after encryption.
+ """
+ try:
+ # pylint: disable=unexpected-keyword-arg
+ await self._decrypt(stanza, tab, passthrough=True)
+ except Exception as exc:
jid = stanza['to']
tab = self.core.tabs.by_name_and_class(jid, ChatTab)
msg = ' \n\x19%s}Could not send message: %s' % (
@@ -337,27 +411,57 @@ class E2EEPlugin(BasePlugin):
exc,
)
# XXX: check before commit. Do we not nack in MUCs?
- if not isinstance(tab, MucTab):
+ if tab and not isinstance(tab, MucTab):
tab.nack_message(msg, stanza['id'], stanza['from'])
# TODO: display exceptions to the user properly
- log.error('Exception in encrypt:', exc_info=True)
+ log.error('Exception in decrypt:', exc_info=True)
return None
- return result
+ return None
- def _decrypt(self, message: Message, tab: ChatTabs) -> None:
+ async def _decrypt_encryptedtag(self, stanza: Message) -> None:
+ """
+ Handler to decrypt encrypted_tags elements that are matched separately
+ from other messages because the default 'message' handler that we use
+ only matches messages containing a <body/>.
+ """
+ # If the message contains a body, it will already be handled by the
+ # other handler. If not, pass it to the handler.
+ if stanza.xml.find(f'{{{self.core.xmpp.default_ns}}}body') is not None:
+ return None
+
+ mfrom = stanza['from']
+
+ # Find what tab this message corresponds to.
+ if stanza['type'] == 'groupchat': # MUC
+ tab = self.core.tabs.by_name_and_class(
+ name=mfrom.bare, cls=MucTab,
+ )
+ elif self.core.handler.is_known_muc_pm(stanza, mfrom): # MUC-PM
+ tab = self.core.tabs.by_name_and_class(
+ name=mfrom.full, cls=PrivateTab,
+ )
+ else: # 1:1
+ tab = self.core.get_conversation_by_jid(
+ jid=JID(mfrom.bare),
+ create=False,
+ fallback_barejid=True,
+ )
+ log.debug('Found tab %r for encrypted message', tab)
+ await self._decrypt_wrapper(stanza, tab)
+
+ async def _decrypt(self, message: Message, tab: Optional[ChatTabs], passthrough: bool = True) -> None:
- has_eme = False
- if message.xml.find('{%s}%s' % (EME_NS, EME_TAG)) is not None and \
+ has_eme: bool = False
+ if message.xml.find(f'{{{EME_NS}}}{EME_TAG}') is not None and \
message['eme']['namespace'] == self.eme_ns:
has_eme = True
- has_encrypted_tag = False
+ has_encrypted_tag: bool = False
if not has_eme and self.encrypted_tags is not None:
+ tmp: bool = True
for (namespace, tag) in self.encrypted_tags:
- if message.xml.find('{%s}%s' % (namespace, tag)) is not None:
- # TODO: count all encrypted tags.
- has_encrypted_tag = True
- break
+ tmp = tmp and message.xml.find(f'{{{namespace}}}{tag}') is not None
+ has_encrypted_tag = tmp
if not has_eme and not has_encrypted_tag:
return None
@@ -368,39 +472,77 @@ class E2EEPlugin(BasePlugin):
# comes from a semi-anonymous MUC for example. Some plugins might be
# fine with this so let them handle it.
jid = message['from']
- muctab = tab
- if isinstance(muctab, PrivateTab):
+ muctab: Optional[MucTab] = None
+ if isinstance(tab, PrivateTab):
muctab = tab.parent_muc
jid = None
- if isinstance(muctab, MucTab):
+ if muctab is not None or isinstance(tab, MucTab):
+ if muctab is None:
+ muctab = tab # type: ignore
nick = message['from'].resource
- for user in muctab.users:
- if user.nick == nick:
- jid = user.jid or None
- break
+ user = muctab.get_user_by_name(nick) # type: ignore
+ if user is not None:
+ jid = user.jid or None
- self.decrypt(message, jid, tab)
+ # Call the enabled encrypt method
+ func = self.decrypt
+ if iscoroutinefunction(func):
+ # pylint: disable=unexpected-keyword-arg
+ await func(message, jid, tab, passthrough=True) # type: ignore
+ else:
+ # pylint: disable=unexpected-keyword-arg
+ func(message, jid, tab) # type: ignore
log.debug('Decrypted %s message: %r', self.encryption_name, message['body'])
return None
- async def _encrypt(self, stanza: StanzaBase) -> Optional[StanzaBase]:
+ async def _encrypt(self, stanza: StanzaBase, passthrough: bool = True) -> Optional[StanzaBase]:
+ # TODO: Let through messages that contain elements that don't need to
+ # be encrypted even in an encrypted context, such as MUC mediated
+ # invites, etc.
+ # What to do when they're mixed with other elements? It probably
+ # depends on the element. Maybe they can be mixed with
+ # `self.tag_whitelist` that are already assumed to be sent as plain by
+ # the E2EE plugin.
+ # They might not be accompanied by a <body/> most of the time, nor by
+ # an encrypted tag.
+
if not isinstance(stanza, Message) or stanza['type'] not in ('normal', 'chat', 'groupchat'):
raise NothingToEncrypt()
message = stanza
+
+ # Is this message already encrypted? Do we need to do all these
+ # checks? Such as an OMEMO heartbeat.
+ has_encrypted_tag: bool = False
+ if self.encrypted_tags is not None:
+ tmp: bool = True
+ for (namespace, tag) in self.encrypted_tags:
+ tmp = tmp and message.xml.find(f'{{{namespace}}}{tag}') is not None
+ has_encrypted_tag = tmp
+
+ if has_encrypted_tag:
+ log.debug('Message already contains encrypted tags.')
+ raise NothingToEncrypt()
+
# Find who to encrypt to. If in a groupchat this can be multiple JIDs.
# It is possible that we are not able to find a jid (e.g., semi-anon
# MUCs). Let the plugin decide what to do with this information.
- jids = [message['to']] # type: Optional[List[JID]]
+ jids: Optional[List[JID]] = [message['to']]
tab = self.core.tabs.by_jid(message['to'])
- if tab is None: # When does that ever happen?
- log.debug('Attempting to encrypt a message to \'%s\' '
- 'that is not attached to a Tab. ?! Aborting '
- 'encryption.', message['to'])
- return None
+ if tab is None and message['to'].resource:
+ # Redo the search with the bare JID
+ tab = self.core.tabs.by_jid(message['to'].bare)
+
+ if tab is None: # Possible message sent directly by the e2ee lib?
+ log.debug(
+ 'A message we do not have a tab for '
+ 'is being sent to \'%s\'. \n%r.',
+ message['to'],
+ message,
+ )
parent = None
if isinstance(tab, PrivateTab):
@@ -433,19 +575,22 @@ class E2EEPlugin(BasePlugin):
if user.jid.bare:
jids.append(user.jid)
- if not self._encryption_enabled(tab.jid):
+ if tab and not self._encryption_enabled(tab.jid):
raise NothingToEncrypt()
log.debug('Sending %s message', self.encryption_name)
has_body = message.xml.find('{%s}%s' % (JCLIENT_NS, 'body')) is not None
+ if not self._encryption_enabled(tab.jid):
+ raise NothingToEncrypt()
+
# Drop all messages that don't contain a body if the plugin doesn't do
# Stanza Encryption
if not self.stanza_encryption and not has_body:
log.debug(
'%s plugin: Dropping message as it contains no body, and '
- 'not doesn\'t do stanza encryption',
+ 'doesn\'t do stanza encryption',
self.encryption_name,
)
return None
@@ -479,7 +624,7 @@ class E2EEPlugin(BasePlugin):
if self.encrypted_tags is not None:
whitelist += self.encrypted_tags
- tag_whitelist = {'{%s}%s' % tag for tag in whitelist}
+ tag_whitelist = {f'{{{ns}}}{tag}' for (ns, tag) in whitelist}
for elem in message.xml[:]:
if elem.tag not in tag_whitelist:
@@ -490,15 +635,15 @@ class E2EEPlugin(BasePlugin):
def store_trust(self, jid: JID, state: str, fingerprint: str) -> None:
"""Store trust for a fingerprint and a jid."""
- option_name = '%s:%s' % (self.encryption_short_name, fingerprint)
+ option_name = f'{self.encryption_short_name}:{fingerprint}'
config.silent_set(option=option_name, value=state, section=jid)
def fetch_trust(self, jid: JID, fingerprint: str) -> str:
"""Fetch trust of a fingerprint and a jid."""
- option_name = '%s:%s' % (self.encryption_short_name, fingerprint)
- return config.get(option=option_name, section=jid)
+ option_name = f'{self.encryption_short_name}:{fingerprint}'
+ return config.getstr(option=option_name, section=jid)
- async def decrypt(self, message: Message, jid: Optional[JID], tab: ChatTab):
+ async def decrypt(self, message: Message, jid: Optional[JID], tab: Optional[ChatTab]):
"""Decryption method
This is a method the plugin must implement. It is expected that this
@@ -530,7 +675,7 @@ class E2EEPlugin(BasePlugin):
raise NotImplementedError
- def get_fingerprints(self, jid: JID) -> List[str]:
+ async def get_fingerprints(self, jid: JID) -> List[Tuple[str, bool]]:
"""Show fingerprint(s) for this encryption method and JID.
To overload in plugins.