diff options
Diffstat (limited to 'slixmpp')
-rw-r--r-- | slixmpp/plugins/xep_0363/http_upload.py | 8 | ||||
-rw-r--r-- | slixmpp/plugins/xep_0454/__init__.py | 176 |
2 files changed, 181 insertions, 3 deletions
diff --git a/slixmpp/plugins/xep_0363/http_upload.py b/slixmpp/plugins/xep_0363/http_upload.py index bae3ee7d..4a185fb9 100644 --- a/slixmpp/plugins/xep_0363/http_upload.py +++ b/slixmpp/plugins/xep_0363/http_upload.py @@ -14,6 +14,8 @@ from typing import ( IO, ) +from pathlib import Path + from slixmpp import JID, __version__ from slixmpp.stanza import Iq from slixmpp.plugins import BasePlugin @@ -113,7 +115,7 @@ class XEP_0363(BasePlugin): if feature == Request.namespace: return info - def request_slot(self, jid: JID, filename: str, size: int, + def request_slot(self, jid: JID, filename: Path, size: int, content_type: Optional[str] = None, *, ifrom: Optional[JID] = None, **iqkwargs) -> Future: """Request an HTTP upload slot from a service. @@ -125,12 +127,12 @@ class XEP_0363(BasePlugin): """ iq = self.xmpp.make_iq_get(ito=jid, ifrom=ifrom) request = iq['http_upload_request'] - request['filename'] = filename + request['filename'] = str(filename) request['size'] = str(size) request['content-type'] = content_type or self.default_content_type return iq.send(**iqkwargs) - async def upload_file(self, filename: str, size: Optional[int] = None, + async def upload_file(self, filename: Path, size: Optional[int] = None, content_type: Optional[str] = None, *, input_file: Optional[IO[bytes]]=None, domain: Optional[JID] = None, diff --git a/slixmpp/plugins/xep_0454/__init__.py b/slixmpp/plugins/xep_0454/__init__.py new file mode 100644 index 00000000..d537432d --- /dev/null +++ b/slixmpp/plugins/xep_0454/__init__.py @@ -0,0 +1,176 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# vim:fenc=utf-8 et ts=4 sts=4 sw=4 +# +# Copyright © 2022 Maxime “pep” Buquet <pep@bouah.net> +# +# See the LICENSE file for copying permissions. + +""" + XEP-0454: OMEMO Media Sharing +""" + +from typing import IO, Optional, Tuple + +from os import urandom +from pathlib import Path +from io import BytesIO, SEEK_END + +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes + +from slixmpp.plugins import BasePlugin +from slixmpp.plugins.base import register_plugin + + +class InvalidURL(Exception): + """Raised for URLs that either aren't HTTPS or already contain a fragment.""" + + +EXTENSIONS_MAP = { + 'jpeg': 'jpg', + 'text': 'txt', +} + +class XEP_0454(BasePlugin): + """ + XEP-0454: OMEMO Media Sharing + """ + + name = 'xep_0454' + description = 'XEP-0454: OMEMO Media Sharing' + dependencies = {'xep_0363'} + + @staticmethod + def encrypt(input_file: Optional[IO[bytes]] = None, filename: Optional[Path] = None) -> Tuple[bytes, str]: + """ + Encrypts file as specified in XEP-0454 for use in file sharing + + :param input_file: Binary file stream on the file. + :param filename: Path to the file to upload. + + One of input_file or filename must be specified. If both are + passed, input_file will be used and filename ignored. + """ + if input_file is None and filename is None: + raise ValueError('Specify either filename or input_file parameter') + + aes_gcm_iv = urandom(12) + aes_gcm_key = urandom(32) + + aes_gcm = Cipher( + algorithms.AES(aes_gcm_key), + modes.GCM(aes_gcm_iv), + ).encryptor() + + if input_file is None: + input_file = open(filename, 'rb') + + payload = b'' + while True: + buf = input_file.read(4096) + if not buf: + break + payload += aes_gcm.update(buf) + + payload += aes_gcm.finalize() + aes_gcm.tag + fragment = aes_gcm_iv.hex() + aes_gcm_key.hex() + return (payload, fragment) + + @staticmethod + def decrypt(input_file: IO[bytes], fragment: str) -> bytes: + """ + Decrypts file-like. + + :param input_file: Binary file stream on the file, containing the + tag (16 bytes) at the end. + :param fragment: 88 hex chars string composed of iv (24 chars) + + key (64 chars). + """ + + assert len(fragment) == 88 + aes_gcm_iv = bytes.fromhex(fragment[:24]) + aes_gcm_key = bytes.fromhex(fragment[24:]) + + # Find 16 bytes tag + input_file.seek(-16, SEEK_END) + tag = input_file.read() + + aes_gcm = Cipher( + algorithms.AES(aes_gcm_key), + modes.GCM(aes_gcm_iv, tag), + ).decryptor() + + size = input_file.seek(0, SEEK_END) + input_file.seek(0) + + count = size - 16 + plain = b'' + while count >= 0: + buf = input_file.read(4096) + count -= len(buf) + if count <= 0: + buf += input_file.read() + buf = buf[:-16] + plain += aes_gcm.update(buf) + plain += aes_gcm.finalize() + + return plain + + @staticmethod + def format_url(url: str, fragment: str) -> str: + """Helper to format a HTTPS URL to an AESGCM URI""" + if not url.startswith('https://') or url.find('#') != -1: + raise InvalidURL + return 'aesgcm://' + url[len('https://'):] + '#' + fragment + + @staticmethod + def map_extensions(ext: str) -> str: + """ + Apply conversions to extensions to reduce the number of + variations, (e.g., JPEG -> jpg). + """ + return EXTENSIONS_MAP.get(ext, ext).lower() + + async def upload_file( + self, + filename: Path, + _size: Optional[int] = None, + content_type: Optional[str] = None, + **kwargs, + ) -> str: + """ + Wrapper to xep_0363 (HTTP Upload)'s upload_file method. + + :param input_file: Binary file stream on the file. + :param filename: Path to the file to upload. + + Same as `XEP_0454.encrypt`, one of input_file or filename must be + specified. If both are passed, input_file will be used and + filename ignored. + + Other arguments passed in are passed to the actual + `XEP_0363.upload_file` call. + """ + input_file = kwargs.get('input_file') + payload, fragment = self.encrypt(input_file, filename) + + # Prepare kwargs for upload_file call + new_filename = urandom(12).hex() # Random filename to hide user-provided path + if filename.suffix: + new_filename += self.map_extensions(filename.suffix) + kwargs['filename'] = new_filename + + input_enc = BytesIO(payload) + kwargs['input_file'] = input_enc + + # Size must also be overriden if provided + size = input_enc.seek(0, SEEK_END) + input_enc.seek(0) + kwargs['size'] = size + + kwargs['content_type'] = content_type + + url = await self.xmpp['xep_0363'].upload_file(**kwargs) + return self.format_url(url, fragment) + +register_plugin(XEP_0454) |