diff options
-rw-r--r-- | .gitlab-ci.yml | 6 | ||||
-rw-r--r-- | doap.xml | 8 | ||||
-rwxr-xr-x | examples/http_upload.py | 65 | ||||
-rwxr-xr-x | setup.py | 8 | ||||
-rw-r--r-- | slixmpp/plugins/xep_0363/http_upload.py | 8 | ||||
-rw-r--r-- | slixmpp/plugins/xep_0454/__init__.py | 176 | ||||
-rw-r--r-- | tests/test_xep_0454.py | 41 |
7 files changed, 300 insertions, 12 deletions
diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 69bd7b7a..ebcc24eb 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -20,7 +20,7 @@ test: script: - apt update - apt install -y python3 python3-pip cython3 gpg - - pip3 install emoji aiohttp + - pip3 install emoji aiohttp cryptography - ./run_tests.py test-3.10: @@ -31,7 +31,7 @@ test-3.10: script: - apt update - apt install -y python3 python3-pip cython3 gpg - - pip3 install emoji aiohttp + - pip3 install emoji aiohttp cryptography - ./run_tests.py test-3.11: @@ -43,7 +43,7 @@ test-3.11: script: - apt update - apt install -y python3 python3-pip cython3 gpg - - pip3 install emoji aiohttp + - pip3 install emoji aiohttp cryptography - ./run_tests.py test_integration: @@ -892,6 +892,14 @@ <xmpp:since>1.6.0</xmpp:since> </xmpp:SupportedXep> </implements> + <implements> + <xmpp:SupportedXep> + <xmpp:xep rdf:resource="https://xmpp.org/extensions/xep-0454.html"/> + <xmpp:status>no thumbnail support</xmpp:status> + <xmpp:version>0.1.0</xmpp:version> + <xmpp:since>1.8.1</xmpp:since> + </xmpp:SupportedXep> + </implements> <release> <Version> diff --git a/examples/http_upload.py b/examples/http_upload.py index a926fd47..b62c736e 100755 --- a/examples/http_upload.py +++ b/examples/http_upload.py @@ -5,11 +5,16 @@ # This file is part of Slixmpp. # See the file LICENSE for copying permission. +from typing import Optional + +import sys import logging +from pathlib import Path from getpass import getpass from argparse import ArgumentParser import slixmpp +from slixmpp import JID from slixmpp.exceptions import IqTimeout log = logging.getLogger(__name__) @@ -21,20 +26,40 @@ class HttpUpload(slixmpp.ClientXMPP): A basic client asking an entity if they confirm the access to an HTTP URL. """ - def __init__(self, jid, password, recipient, filename, domain=None): + def __init__( + self, + jid: JID, + password: str, + recipient: JID, + filename: Path, + domain: Optional[JID] = None, + encrypted: bool = False, + ): slixmpp.ClientXMPP.__init__(self, jid, password) self.recipient = recipient self.filename = filename self.domain = domain + self.encrypted = encrypted self.add_event_handler("session_start", self.start) async def start(self, event): log.info('Uploading file %s...', self.filename) try: - url = await self['xep_0363'].upload_file( - self.filename, domain=self.domain, timeout=10 + upload_file = self['xep_0363'].upload_file + if self.encrypted and not self['xep_0454']: + print( + 'The xep_0454 module isn\'t available. ' + 'Ensure you have \'cryptography\' ' + 'from extras_require installed.', + file=sys.stderr, + ) + return + elif self.encrypted: + upload_file = self['xep_0454'].upload_file + url = await upload_file( + self.filename, domain=self.domain, timeout=10, ) except IqTimeout: raise TimeoutError('Could not send message in time') @@ -79,6 +104,10 @@ if __name__ == '__main__': parser.add_argument("--domain", help="Domain to use for HTTP File Upload (leave out for your own server’s)") + parser.add_argument("-e", "--encrypt", dest="encrypted", + help="Whether to encrypt", action="store_true", + default=False) + args = parser.parse_args() # Setup logging. @@ -86,15 +115,41 @@ if __name__ == '__main__': format='%(levelname)-8s %(message)s') if args.jid is None: - args.jid = input("Username: ") + args.jid = JID(input("Username: ")) if args.password is None: args.password = getpass("Password: ") - xmpp = HttpUpload(args.jid, args.password, args.recipient, args.file, args.domain) + domain = args.domain + if domain is not None: + domain = JID(domain) + + if args.encrypted: + print( + 'You are using the --encrypt flag. ' + 'Be aware that the transport being used is NOT end-to-end ' + 'encrypted. The server will be able to decrypt the file.', + file=sys.stderr, + ) + + xmpp = HttpUpload( + jid=args.jid, + password=args.password, + recipient=JID(args.recipient), + filename=Path(args.file), + domain=domain, + encrypted=args.encrypted, + ) xmpp.register_plugin('xep_0066') xmpp.register_plugin('xep_0071') xmpp.register_plugin('xep_0128') xmpp.register_plugin('xep_0363') + try: + xmpp.register_plugin('xep_0454') + except slixmpp.plugins.base.PluginNotFound: + log.error( + 'Could not load xep_0454. ' + 'Ensure you have \'cryptography\' from extras_require installed.' + ) # Connect to the XMPP server and start processing XMPP stanzas. xmpp.connect() @@ -86,10 +86,16 @@ setup( package_data={'slixmpp': ['py.typed']}, packages=packages, ext_modules=ext_modules, - install_requires=['aiodns>=1.0', 'pyasn1', 'pyasn1_modules', 'typing_extensions; python_version < "3.8.0"'], + install_requires=[ + 'aiodns>=1.0', + 'pyasn1', + 'pyasn1_modules', + 'typing_extensions; python_version < "3.8.0"', + ], extras_require={ 'XEP-0363': ['aiohttp'], 'XEP-0444 compliance': ['emoji'], + 'XEP-0454': ['cryptography'], 'Safer XML parsing': ['defusedxml'], }, classifiers=CLASSIFIERS, diff --git a/slixmpp/plugins/xep_0363/http_upload.py b/slixmpp/plugins/xep_0363/http_upload.py index bae3ee7d..4a185fb9 100644 --- a/slixmpp/plugins/xep_0363/http_upload.py +++ b/slixmpp/plugins/xep_0363/http_upload.py @@ -14,6 +14,8 @@ from typing import ( IO, ) +from pathlib import Path + from slixmpp import JID, __version__ from slixmpp.stanza import Iq from slixmpp.plugins import BasePlugin @@ -113,7 +115,7 @@ class XEP_0363(BasePlugin): if feature == Request.namespace: return info - def request_slot(self, jid: JID, filename: str, size: int, + def request_slot(self, jid: JID, filename: Path, size: int, content_type: Optional[str] = None, *, ifrom: Optional[JID] = None, **iqkwargs) -> Future: """Request an HTTP upload slot from a service. @@ -125,12 +127,12 @@ class XEP_0363(BasePlugin): """ iq = self.xmpp.make_iq_get(ito=jid, ifrom=ifrom) request = iq['http_upload_request'] - request['filename'] = filename + request['filename'] = str(filename) request['size'] = str(size) request['content-type'] = content_type or self.default_content_type return iq.send(**iqkwargs) - async def upload_file(self, filename: str, size: Optional[int] = None, + async def upload_file(self, filename: Path, size: Optional[int] = None, content_type: Optional[str] = None, *, input_file: Optional[IO[bytes]]=None, domain: Optional[JID] = None, diff --git a/slixmpp/plugins/xep_0454/__init__.py b/slixmpp/plugins/xep_0454/__init__.py new file mode 100644 index 00000000..d537432d --- /dev/null +++ b/slixmpp/plugins/xep_0454/__init__.py @@ -0,0 +1,176 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# vim:fenc=utf-8 et ts=4 sts=4 sw=4 +# +# Copyright © 2022 Maxime “pep” Buquet <pep@bouah.net> +# +# See the LICENSE file for copying permissions. + +""" + XEP-0454: OMEMO Media Sharing +""" + +from typing import IO, Optional, Tuple + +from os import urandom +from pathlib import Path +from io import BytesIO, SEEK_END + +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes + +from slixmpp.plugins import BasePlugin +from slixmpp.plugins.base import register_plugin + + +class InvalidURL(Exception): + """Raised for URLs that either aren't HTTPS or already contain a fragment.""" + + +EXTENSIONS_MAP = { + 'jpeg': 'jpg', + 'text': 'txt', +} + +class XEP_0454(BasePlugin): + """ + XEP-0454: OMEMO Media Sharing + """ + + name = 'xep_0454' + description = 'XEP-0454: OMEMO Media Sharing' + dependencies = {'xep_0363'} + + @staticmethod + def encrypt(input_file: Optional[IO[bytes]] = None, filename: Optional[Path] = None) -> Tuple[bytes, str]: + """ + Encrypts file as specified in XEP-0454 for use in file sharing + + :param input_file: Binary file stream on the file. + :param filename: Path to the file to upload. + + One of input_file or filename must be specified. If both are + passed, input_file will be used and filename ignored. + """ + if input_file is None and filename is None: + raise ValueError('Specify either filename or input_file parameter') + + aes_gcm_iv = urandom(12) + aes_gcm_key = urandom(32) + + aes_gcm = Cipher( + algorithms.AES(aes_gcm_key), + modes.GCM(aes_gcm_iv), + ).encryptor() + + if input_file is None: + input_file = open(filename, 'rb') + + payload = b'' + while True: + buf = input_file.read(4096) + if not buf: + break + payload += aes_gcm.update(buf) + + payload += aes_gcm.finalize() + aes_gcm.tag + fragment = aes_gcm_iv.hex() + aes_gcm_key.hex() + return (payload, fragment) + + @staticmethod + def decrypt(input_file: IO[bytes], fragment: str) -> bytes: + """ + Decrypts file-like. + + :param input_file: Binary file stream on the file, containing the + tag (16 bytes) at the end. + :param fragment: 88 hex chars string composed of iv (24 chars) + + key (64 chars). + """ + + assert len(fragment) == 88 + aes_gcm_iv = bytes.fromhex(fragment[:24]) + aes_gcm_key = bytes.fromhex(fragment[24:]) + + # Find 16 bytes tag + input_file.seek(-16, SEEK_END) + tag = input_file.read() + + aes_gcm = Cipher( + algorithms.AES(aes_gcm_key), + modes.GCM(aes_gcm_iv, tag), + ).decryptor() + + size = input_file.seek(0, SEEK_END) + input_file.seek(0) + + count = size - 16 + plain = b'' + while count >= 0: + buf = input_file.read(4096) + count -= len(buf) + if count <= 0: + buf += input_file.read() + buf = buf[:-16] + plain += aes_gcm.update(buf) + plain += aes_gcm.finalize() + + return plain + + @staticmethod + def format_url(url: str, fragment: str) -> str: + """Helper to format a HTTPS URL to an AESGCM URI""" + if not url.startswith('https://') or url.find('#') != -1: + raise InvalidURL + return 'aesgcm://' + url[len('https://'):] + '#' + fragment + + @staticmethod + def map_extensions(ext: str) -> str: + """ + Apply conversions to extensions to reduce the number of + variations, (e.g., JPEG -> jpg). + """ + return EXTENSIONS_MAP.get(ext, ext).lower() + + async def upload_file( + self, + filename: Path, + _size: Optional[int] = None, + content_type: Optional[str] = None, + **kwargs, + ) -> str: + """ + Wrapper to xep_0363 (HTTP Upload)'s upload_file method. + + :param input_file: Binary file stream on the file. + :param filename: Path to the file to upload. + + Same as `XEP_0454.encrypt`, one of input_file or filename must be + specified. If both are passed, input_file will be used and + filename ignored. + + Other arguments passed in are passed to the actual + `XEP_0363.upload_file` call. + """ + input_file = kwargs.get('input_file') + payload, fragment = self.encrypt(input_file, filename) + + # Prepare kwargs for upload_file call + new_filename = urandom(12).hex() # Random filename to hide user-provided path + if filename.suffix: + new_filename += self.map_extensions(filename.suffix) + kwargs['filename'] = new_filename + + input_enc = BytesIO(payload) + kwargs['input_file'] = input_enc + + # Size must also be overriden if provided + size = input_enc.seek(0, SEEK_END) + input_enc.seek(0) + kwargs['size'] = size + + kwargs['content_type'] = content_type + + url = await self.xmpp['xep_0363'].upload_file(**kwargs) + return self.format_url(url, fragment) + +register_plugin(XEP_0454) diff --git a/tests/test_xep_0454.py b/tests/test_xep_0454.py new file mode 100644 index 00000000..8d43c7e3 --- /dev/null +++ b/tests/test_xep_0454.py @@ -0,0 +1,41 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# vim:fenc=utf-8 et ts=4 sts=4 sw=4 +# +# Copyright © 2022 Maxime “pep” Buquet <pep@bouah.net> +# +# Distributed under terms of the GPLv3+ license. + +""" + Tests for XEP-0454 (OMEMO Media Sharing) plugin. +""" + +import unittest +from io import BytesIO +from slixmpp.test import SlixTest +from slixmpp.plugins.xep_0454 import XEP_0454 + + +class TestMediaSharing(SlixTest): + + def testEncryptDecryptSmall(self): + plain = b'qwertyuiop' + ciphertext, fragment = XEP_0454.encrypt(input_file=BytesIO(plain)) + result = XEP_0454.decrypt(BytesIO(ciphertext), fragment) + + self.assertEqual(plain, result) + + def testEncryptDecrypt(self): + plain = b'a' * 4096 + b'qwertyuiop' + ciphertext, fragment = XEP_0454.encrypt(input_file=BytesIO(plain)) + result = XEP_0454.decrypt(BytesIO(ciphertext), fragment) + + self.assertEqual(plain, result) + + def testFormatURL(self): + url = 'https://foo.bar' + fragment = 'a' * 88 + result = XEP_0454.format_url(url, fragment) + self.assertEqual('aesgcm://foo.bar#' + 'a' * 88, result) + +suite = unittest.TestLoader().loadTestsFromTestCase(TestMediaSharing) |