summaryrefslogtreecommitdiff
path: root/slixmpp/plugins/xep_0454/__init__.py
blob: d537432d44940de867067817bee75f2ddfe0e162 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# vim:fenc=utf-8 et ts=4 sts=4 sw=4
#
# Copyright © 2022 Maxime “pep” Buquet <pep@bouah.net>
#
# See the LICENSE file for copying permissions.

"""
    XEP-0454: OMEMO Media Sharing
"""

from typing import IO, Optional, Tuple

from os import urandom
from pathlib import Path
from io import BytesIO, SEEK_END

from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes

from slixmpp.plugins import BasePlugin
from slixmpp.plugins.base import register_plugin


class InvalidURL(Exception):
    """
        Signals a URL that cannot be converted to an AESGCM URI: it is
        either not an HTTPS URL or it already carries a fragment.
    """


# Canonical replacements for file extensions that have several spellings.
EXTENSIONS_MAP = dict(
    jpeg='jpg',
    text='txt',
)

class XEP_0454(BasePlugin):
    """
        XEP-0454: OMEMO Media Sharing

        Encrypts files with AES-GCM (256-bit key, 96-bit IV) before handing
        them to the xep_0363 plugin for upload, and converts the resulting
        HTTPS URL to an ``aesgcm://`` URI whose fragment carries IV and key.
    """

    name = 'xep_0454'
    description = 'XEP-0454: OMEMO Media Sharing'
    dependencies = {'xep_0363'}

    @staticmethod
    def encrypt(input_file: Optional[IO[bytes]] = None, filename: Optional[Path] = None) -> Tuple[bytes, str]:
        """
            Encrypts file as specified in XEP-0454 for use in file sharing

            :param input_file: Binary file stream on the file.
            :param filename: Path to the file to upload.

            One of input_file or filename must be specified. If both are
            passed, input_file will be used and filename ignored.

            :returns: A ``(payload, fragment)`` tuple. ``payload`` is the
                      ciphertext followed by the GCM tag; ``fragment`` is
                      the hex-encoded IV followed by the hex-encoded key.
            :raises ValueError: If neither input_file nor filename is given.
        """
        if input_file is None and filename is None:
            raise ValueError('Specify either filename or input_file parameter')

        aes_gcm_iv = urandom(12)
        aes_gcm_key = urandom(32)

        aes_gcm = Cipher(
            algorithms.AES(aes_gcm_key),
            modes.GCM(aes_gcm_iv),
        ).encryptor()

        # Only a stream opened here is ours to close; a caller-provided
        # stream is left open for the caller to manage.
        opened_here = input_file is None
        if opened_here:
            input_file = open(filename, 'rb')

        # Collect chunks and join once instead of growing a bytes object.
        chunks = []
        try:
            while True:
                buf = input_file.read(4096)
                if not buf:
                    break
                chunks.append(aes_gcm.update(buf))
        finally:
            if opened_here:
                input_file.close()

        # The tag is only available once the encryptor has been finalized.
        chunks.append(aes_gcm.finalize())
        chunks.append(aes_gcm.tag)

        fragment = aes_gcm_iv.hex() + aes_gcm_key.hex()
        return (b''.join(chunks), fragment)

    @staticmethod
    def decrypt(input_file: IO[bytes], fragment: str) -> bytes:
        """
            Decrypts file-like.

            :param input_file: Seekable binary file stream on the file,
                               containing the tag (16 bytes) at the end.
            :param fragment: 88 hex chars string composed of iv (24 chars)
                             + key (64 chars).

            :returns: The decrypted content.
            :raises ValueError: If fragment is not 88 characters long or is
                                not valid hexadecimal.
        """
        # Explicit check rather than ``assert``, which is stripped by -O.
        if len(fragment) != 88:
            raise ValueError(
                'fragment must be 88 hex characters: iv (24) + key (64)'
            )
        aes_gcm_iv = bytes.fromhex(fragment[:24])
        aes_gcm_key = bytes.fromhex(fragment[24:])

        # Find 16 bytes tag
        input_file.seek(-16, SEEK_END)
        tag = input_file.read()

        aes_gcm = Cipher(
            algorithms.AES(aes_gcm_key),
            modes.GCM(aes_gcm_iv, tag),
        ).decryptor()

        size = input_file.seek(0, SEEK_END)
        input_file.seek(0)

        # Feed exactly the bytes preceding the tag. Bounding each read by
        # what is left guarantees termination, including when the
        # ciphertext length is an exact multiple of the chunk size.
        remaining = size - 16
        chunks = []
        while remaining > 0:
            buf = input_file.read(min(4096, remaining))
            if not buf:
                # Stream ended earlier than its reported size.
                break
            remaining -= len(buf)
            chunks.append(aes_gcm.update(buf))
        chunks.append(aes_gcm.finalize())

        return b''.join(chunks)

    @staticmethod
    def format_url(url: str, fragment: str) -> str:
        """
            Helper to format a HTTPS URL to an AESGCM URI

            :param url: HTTPS URL without a fragment.
            :param fragment: Fragment to append after ``#``.

            :raises InvalidURL: If url does not start with ``https://`` or
                                already contains a ``#``.
        """
        if not url.startswith('https://') or '#' in url:
            raise InvalidURL
        return 'aesgcm://' + url[len('https://'):] + '#' + fragment

    @staticmethod
    def map_extensions(ext: str) -> str:
        """
            Apply conversions to extensions to reduce the number of
            variations, (e.g., JPEG -> jpg).

            :param ext: Extension, with or without a leading dot (as in
                        ``Path.suffix``). A leading dot is preserved.

            :returns: The lower-cased, canonical extension.
        """
        dot = '.' if ext.startswith('.') else ''
        # Lower-case before the lookup so that 'JPEG' and 'jpeg' both match.
        bare = ext[len(dot):].lower()
        return dot + EXTENSIONS_MAP.get(bare, bare)

    async def upload_file(
        self,
        filename: Optional[Path] = None,
        _size: Optional[int] = None,
        content_type: Optional[str] = None,
        **kwargs,
    ) -> str:
        """
            Wrapper to xep_0363 (HTTP Upload)'s upload_file method.

            :param input_file: Binary file stream on the file.
            :param filename: Path to the file to upload.
            :param _size: Ignored; the size passed on is always that of the
                          encrypted payload.
            :param content_type: Passed on as ``content_type``.

            Same as `XEP_0454.encrypt`, one of input_file or filename must be
            specified. If both are passed, input_file will be used and
            filename ignored.

            Other arguments passed in are passed to the actual
            `XEP_0363.upload_file` call.

            :returns: The ``aesgcm://`` URI for the uploaded file.
        """
        input_file = kwargs.get('input_file')
        payload, fragment = self.encrypt(input_file, filename)

        # Prepare kwargs for upload_file call
        new_filename = urandom(12).hex()  # Random filename to hide user-provided path
        if filename is not None:
            # Path() also accepts a plain string path.
            suffix = Path(filename).suffix
            if suffix:
                new_filename += self.map_extensions(suffix)
        kwargs['filename'] = new_filename

        kwargs['input_file'] = BytesIO(payload)

        # Size must also be overridden if provided
        kwargs['size'] = len(payload)

        kwargs['content_type'] = content_type

        url = await self.xmpp['xep_0363'].upload_file(**kwargs)
        return self.format_url(url, fragment)

# Register the plugin class with slixmpp through register_plugin
# (imported from slixmpp.plugins.base).
register_plugin(XEP_0454)