From bc8af3cc6181248744057f1fb29295508d059d3e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maxime=20=E2=80=9Cpep=E2=80=9D=20Buquet?= Date: Thu, 17 Mar 2022 23:06:45 +0100 Subject: xep_0454: new plugin. OMEMO Media Sharing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Maxime “pep” Buquet --- slixmpp/plugins/xep_0454/__init__.py | 76 ++++++++++++++++++++++++++++++++++++ 1 file changed, 76 insertions(+) create mode 100644 slixmpp/plugins/xep_0454/__init__.py diff --git a/slixmpp/plugins/xep_0454/__init__.py b/slixmpp/plugins/xep_0454/__init__.py new file mode 100644 index 00000000..a8af6190 --- /dev/null +++ b/slixmpp/plugins/xep_0454/__init__.py @@ -0,0 +1,76 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# vim:fenc=utf-8 et ts=4 sts=4 sw=4 +# +# Copyright © 2022 Maxime “pep” Buquet +# +# See the LICENSE file for copying permissions. + +""" + XEP-0454: OMEMO Media Sharing +""" + +from typing import IO, Optional, Tuple + +from os import urandom +from pathlib import Path + +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes + +from slixmpp.plugins import BasePlugin +from slixmpp.plugins.base import register_plugin + + +class InvalidURL(Exception): + """Raised for URLs that either aren't HTTPS or already contain a fragment.""" + + +class XEP_0454(BasePlugin): + """ + XEP-0454: OMEMO Media Sharing + """ + + name = 'xep_0454' + description = 'XEP-0454: OMEMO Media Sharing' + + @classmethod + def encrypt(cls, input_file: Optional[IO[bytes]] = None, filename: Optional[Path] = None) -> Tuple[bytes, str]: + """ + Encrypts file as specified in XEP-0454 for use in file sharing + + :param input_file: Binary file stream on the file. + :param filename: Path to the file to upload. + + One of input_file or filename must be specified. If both are + passed, input_file will be used and filename ignored. + """ + if input_file is None and filename is None: + raise ValueError('Specify either filename or input_file parameter') + + aes_gcm_iv = urandom(12) + aes_gcm_key = urandom(32) + + aes_gcm = Cipher( + algorithms.AES(aes_gcm_key), + modes.GCM(aes_gcm_iv), + ).encryptor() + + # TODO: use streaming API from CipherContext + if input_file: + plain = input_file.read() + else: + with filename.open(mode='rb') as file: + plain = file.read() + + payload = aes_gcm.update(plain) + aes_gcm.tag + aes_gcm.finalize() + fragment = aes_gcm_iv.hex() + aes_gcm_key.hex() + return (payload, fragment) + + @classmethod + def format_url(cls, url: str, fragment: str) -> str: + """Helper to format a HTTPS URL to an AESGCM URI""" + if not url.startswith('https://') or url.find('#') != -1: + raise InvalidURL + url = 'aesgcm://' + url.removeprefix('https://') + '#' + fragment + +register_plugin(XEP_0454) -- cgit v1.2.3