From 1debbe6a7277e0cdfdda8d3f5430cac3c4411a77 Mon Sep 17 00:00:00 2001 From: mathieui Date: Mon, 29 Dec 2014 21:34:24 +0100 Subject: Fix #2561, Fix #2525 (broken GPG plugin) Fix issues in presence signatures (no tests SHA-512), typos, update gnupg.py to a recent version, fix the completion, and display the messages we send properly --- plugins/gpg/__init__.py | 40 ++-- plugins/gpg/gnupg.py | 566 +++++++++++++++++++++++++++++++++++------------- 2 files changed, 440 insertions(+), 166 deletions(-) (limited to 'plugins/gpg') diff --git a/plugins/gpg/__init__.py b/plugins/gpg/__init__.py index bc8a96c1..0f441653 100644 --- a/plugins/gpg/__init__.py +++ b/plugins/gpg/__init__.py @@ -117,6 +117,7 @@ log = logging.getLogger(__name__) from plugin import BasePlugin from tabs import ConversationTab +from theming import get_theme NS_SIGNED = "jabber:x:signed" NS_ENCRYPTED = "jabber:x:encrypted" @@ -127,7 +128,6 @@ Hash: %(hash)s %(clear)s -----BEGIN PGP SIGNATURE----- -Version: GnuPG %(data)s -----END PGP SIGNATURE----- @@ -135,7 +135,6 @@ Version: GnuPG ENCRYPTED_MESSAGE = """-----BEGIN PGP MESSAGE----- -Version: GnuPG %(data)s -----END PGP MESSAGE-----""" @@ -156,14 +155,14 @@ class Plugin(BasePlugin): self.keyid = self.config.get('keyid', '') or None self.passphrase = self.config.get('passphrase', '') or None if not self.keyid: - self.core.information('No GPG keyid provided in the configuration', 'Warning') + self.api.information('No GPG keyid provided in the configuration', 'Warning') - self.add_event_handler('send_normal_presence', self.sign_presence) - self.add_event_handler('normal_presence', self.on_normal_presence) - self.add_event_handler('conversation_say_after', self.on_conversation_say) - self.add_event_handler('conversation_msg', self.on_conversation_msg) + self.api.add_event_handler('send_normal_presence', self.sign_presence) + self.api.add_slix_event_handler('presence', self.on_normal_presence) + self.api.add_event_handler('conversation_say_after', self.on_conversation_say) + self.api.add_event_handler('conversation_msg', self.on_conversation_msg) - self.add_tab_command(ConversationTab, 'gpg', self.command_gpg, + self.api.add_tab_command(ConversationTab, 'gpg', self.command_gpg, usage=' [jid] [keyid]', help='Force or disable gpg encryption with the fulljid of the current conversation. The setkey argument lets you associate a keyid with the given bare JID.', short='Manage the GPG status', @@ -197,7 +196,7 @@ class Plugin(BasePlugin): current_presence = self.core.get_status() self.core.command_status('%s %s' % (current_presence.show or 'available', current_presence.message or '',)) - def on_normal_presence(self, presence, resource): + def on_normal_presence(self, presence): """ Check if it’s signed, if it is and we can verify the signature, add 'valid' or 'invalid' into the dict. If it cannot be verified, just add @@ -212,7 +211,7 @@ class Plugin(BasePlugin): return if self.config.has_section('keys') and bare in self.config.options('keys'): self.contacts[full] = 'invalid' - for hash_ in ('SHA1', 'SHA256'): + for hash_ in ('SHA1', 'SHA256', 'SHA512'): to_verify = SIGNED_ATTACHED_MESSAGE % {'clear': presence['status'], 'data': signed.text, 'hash': hash_} @@ -225,7 +224,7 @@ class Plugin(BasePlugin): def on_conversation_say(self, message, tab): """ - Check if the contact has a signed AND veryfied signature. + Check if the contact has a signed AND verified signature. If yes, encrypt the message with her key. """ to = message['to'] @@ -234,12 +233,13 @@ class Plugin(BasePlugin): return signed = to.full in self.contacts.keys() if signed: - veryfied = self.contacts[to.full] in ('valid', 'forced') + verified = self.contacts[to.full] in ('valid', 'forced') else: - veryfied = False - if veryfied: + verified = False + if verified: # remove the xhtm_im body if present, because that # cannot be encrypted. + body = message['body'] del message['html'] encrypted_element = ET.Element('{%s}x' % (NS_ENCRYPTED,)) text = self.gpg.encrypt(message['body'], self.config.get(to.bare, '', section='keys'), always_trust=True) @@ -251,6 +251,13 @@ class Plugin(BasePlugin): encrypted_element.text = self.remove_gpg_headers(xml.sax.saxutils.escape(str(text))) message.append(encrypted_element) message['body'] = 'This message has been encrypted using the GPG key with id: %s' % self.keyid + message.send() + del message['body'] + tab.add_message(body, nickname=self.core.own_nick, + nick_color=get_theme().COLOR_OWN_NICK, + identifier=message['id'], + jid=self.core.xmpp.boundjid, + typ=0) def on_conversation_msg(self, message, tab): """ @@ -278,7 +285,7 @@ class Plugin(BasePlugin): if status in ('valid', 'invalid', 'signed'): return ' GPG Key: %s (%s)' % (status, 'encrypted' if status == 'valid' else 'NOT encrypted',) else: - return ' GPG: Encryption %s' % (status,) + return ' GPG: Encryption %s' % (status,) def command_gpg(self, args): """ @@ -314,7 +321,8 @@ class Plugin(BasePlugin): self.core.refresh_window() def gpg_completion(self, the_input): - return the_input.auto_completion(['force', 'disable', 'setkey'], ' ') + if the_input.get_argument_position() == 1: + return the_input.new_completion(['force', 'disable', 'setkey'], 1, quotify=False) def remove_gpg_headers(self, text): lines = text.splitlines() diff --git a/plugins/gpg/gnupg.py b/plugins/gpg/gnupg.py index 5cb11766..99bd7d25 100644 --- a/plugins/gpg/gnupg.py +++ b/plugins/gpg/gnupg.py @@ -27,15 +27,14 @@ Vinay Sajip to make use of the subprocess module (Steve's version uses os.fork() and so does not work on Windows). Renamed to gnupg.py to avoid confusion with the previous versions. -Modifications Copyright (C) 2008-2012 Vinay Sajip. All rights reserved. +Modifications Copyright (C) 2008-2014 Vinay Sajip. All rights reserved. A unittest harness (test_gnupg.py) has also been added. """ -import locale -__version__ = "0.3.1" +__version__ = "0.3.8.dev0" __author__ = "Vinay Sajip" -__date__ = "$01-Sep-2012 20:02:51$" +__date__ = "$07-Dec-2014 18:46:17$" try: from io import StringIO @@ -46,6 +45,7 @@ import codecs import locale import logging import os +import re import socket from subprocess import Popen from subprocess import PIPE @@ -61,13 +61,61 @@ except ImportError: try: unicode _py3k = False + string_types = basestring + text_type = unicode except NameError: _py3k = True + string_types = str + text_type = str logger = logging.getLogger(__name__) if not logger.handlers: logger.addHandler(NullHandler()) +# We use the test below because it works for Jython as well as CPython +if os.path.__name__ == 'ntpath': + # On Windows, we don't need shell quoting, other than worrying about + # paths with spaces in them. + def shell_quote(s): + return '"%s"' % s +else: + # Section copied from sarge + + # This regex determines which shell input needs quoting + # because it may be unsafe + UNSAFE = re.compile(r'[^\w%+,./:=@-]') + + def shell_quote(s): + """ + Quote text so that it is safe for Posix command shells. + + For example, "*.py" would be converted to "'*.py'". If the text is + considered safe it is returned unquoted. + + :param s: The value to quote + :type s: str (or unicode on 2.x) + :return: A safe version of the input, from the point of view of Posix + command shells + :rtype: The passed-in type + """ + if not isinstance(s, string_types): + raise TypeError('Expected string type, got %s' % type(s)) + if not s: + result = "''" + elif not UNSAFE.search(s): + result = s + else: + result = "'%s'" % s.replace("'", r"'\''") + return result + + # end of sarge code + +# Now that we use shell=False, we shouldn't need to quote arguments. +# Use no_quote instead of shell_quote to remind us of where quoting +# was needed. +def no_quote(s): + return s + def _copy_data(instream, outstream): # Copy one stream to another sent = 0 @@ -77,7 +125,7 @@ def _copy_data(instream, outstream): enc = 'ascii' while True: data = instream.read(1024) - if len(data) == 0: + if not data: break sent += len(data) logger.debug("sending chunk (%d): %r", sent, data[:256]) @@ -107,25 +155,28 @@ def _write_passphrase(stream, passphrase, encoding): passphrase = '%s\n' % passphrase passphrase = passphrase.encode(encoding) stream.write(passphrase) - logger.debug("Wrote passphrase: %r", passphrase) + logger.debug('Wrote passphrase') def _is_sequence(instance): - return isinstance(instance,list) or isinstance(instance,tuple) + return isinstance(instance, (list, tuple, set, frozenset)) -def _make_binary_stream(s, encoding): +def _make_memory_stream(s): try: - if _py3k: - if isinstance(s, str): - s = s.encode(encoding) - else: - if type(s) is not str: - s = s.encode(encoding) from io import BytesIO rv = BytesIO(s) except ImportError: rv = StringIO(s) return rv +def _make_binary_stream(s, encoding): + if _py3k: + if isinstance(s, str): + s = s.encode(encoding) + else: + if type(s) is not str: + s = s.encode(encoding) + return _make_memory_stream(s) + class Verify(object): "Handle status messages for --verify" @@ -149,6 +200,7 @@ class Verify(object): self.fingerprint = self.creation_date = self.timestamp = None self.signature_id = self.key_id = None self.username = None + self.key_status = None self.status = None self.pubkey_fingerprint = None self.expire_timestamp = None @@ -166,13 +218,27 @@ class Verify(object): self.trust_text = key self.trust_level = self.TRUST_LEVELS[key] elif key in ("RSA_OR_IDEA", "NODATA", "IMPORT_RES", "PLAINTEXT", - "PLAINTEXT_LENGTH", "POLICY_URL", "DECRYPTION_INFO", - "DECRYPTION_OKAY"): + "PLAINTEXT_LENGTH", "POLICY_URL", "DECRYPTION_INFO", + "DECRYPTION_OKAY", "INV_SGNR", "FILE_START", "FILE_ERROR", + "FILE_DONE", "PKA_TRUST_GOOD", "PKA_TRUST_BAD", "BADMDC", + "GOODMDC", "NO_SGNR", "NOTATION_NAME", "NOTATION_DATA", + "PROGRESS"): pass elif key == "BADSIG": self.valid = False self.status = 'signature bad' self.key_id, self.username = value.split(None, 1) + elif key == "ERRSIG": + self.valid = False + (self.key_id, + algo, hash_algo, + cls, + self.timestamp) = value.split()[:5] + self.status = 'signature error' + elif key == "EXPSIG": + self.valid = False + self.status = 'signature expired' + self.key_id, self.username = value.split(None, 1) elif key == "GOODSIG": self.valid = True self.status = 'signature good' @@ -188,13 +254,6 @@ class Verify(object): elif key == "SIG_ID": (self.signature_id, self.creation_date, self.timestamp) = value.split() - elif key == "ERRSIG": - self.valid = False - (self.key_id, - algo, hash_algo, - cls, - self.timestamp) = value.split()[:5] - self.status = 'signature error' elif key == "DECRYPTION_FAILED": self.valid = False self.key_id = value @@ -203,17 +262,25 @@ class Verify(object): self.valid = False self.key_id = value self.status = 'no public key' - elif key in ("KEYEXPIRED", "SIGEXPIRED"): + elif key in ("KEYEXPIRED", "SIGEXPIRED", "KEYREVOKED"): # these are useless in verify, since they are spit out for any # pub/subkeys on the key, not just the one doing the signing. # if we want to check for signatures with expired key, - # the relevant flag is EXPKEYSIG. + # the relevant flag is EXPKEYSIG or REVKEYSIG. pass elif key in ("EXPKEYSIG", "REVKEYSIG"): # signed with expired or revoked key self.valid = False self.key_id = value.split()[0] - self.status = (('%s %s') % (key[:3], key[3:])).lower() + if key == "EXPKEYSIG": + self.key_status = 'signing key has expired' + else: + self.key_status = 'signing key was revoked' + self.status = self.key_status + elif key == "UNEXPECTED": + self.valid = False + self.key_id = value + self.status = 'unexpected data' else: raise ValueError("Unknown status message: %r" % key) @@ -282,8 +349,8 @@ class ImportResult(object): 'problem': reason, 'text': self.problem_reason[reason]}) elif key == "IMPORT_RES": import_res = value.split() - for i in range(len(self.counts)): - setattr(self, self.counts[i], int(import_res[i])) + for i, count in enumerate(self.counts): + setattr(self, count, int(import_res[i])) elif key == "KEYEXPIRED": self.results.append({'fingerprint': None, 'problem': '0', 'text': 'Key expired'}) @@ -300,7 +367,63 @@ class ImportResult(object): l.append('%d not imported' % self.not_imported) return ', '.join(l) -class ListKeys(list): +ESCAPE_PATTERN = re.compile(r'\\x([0-9a-f][0-9a-f])', re.I) +BASIC_ESCAPES = { + r'\n': '\n', + r'\r': '\r', + r'\f': '\f', + r'\v': '\v', + r'\b': '\b', + r'\0': '\0', +} + +class SendResult(object): + def __init__(self, gpg): + self.gpg = gpg + + def handle_status(self, key, value): + logger.debug('SendResult: %s: %s', key, value) + +class SearchKeys(list): + ''' Handle status messages for --search-keys. + + Handle pub and uid (relating the latter to the former). + + Don't care about the rest + ''' + + UID_INDEX = 1 + FIELDS = 'type keyid algo length date expires'.split() + + def __init__(self, gpg): + self.gpg = gpg + self.curkey = None + self.fingerprints = [] + self.uids = [] + + def get_fields(self, args): + result = {} + for i, var in enumerate(self.FIELDS): + result[var] = args[i] + result['uids'] = [] + return result + + def pub(self, args): + self.curkey = curkey = self.get_fields(args) + self.append(curkey) + + def uid(self, args): + uid = args[self.UID_INDEX] + uid = ESCAPE_PATTERN.sub(lambda m: chr(int(m.group(1), 16)), uid) + for k, v in BASIC_ESCAPES.items(): + uid = uid.replace(k, v) + self.curkey['uids'].append(uid) + self.uids.append(uid) + + def handle_status(self, key, value): + pass + +class ListKeys(SearchKeys): ''' Handle status messages for --list-keys. Handle pub and uid (relating the latter to the former). @@ -317,25 +440,17 @@ class ListKeys(list): grp = reserved for gpgsm rvk = revocation key ''' - def __init__(self, gpg): - self.gpg = gpg - self.curkey = None - self.fingerprints = [] - self.uids = [] + + UID_INDEX = 9 + FIELDS = 'type trust length algo keyid date expires dummy ownertrust uid'.split() def key(self, args): - vars = (""" - type trust length algo keyid date expires dummy ownertrust uid - """).split() - self.curkey = {} - for i in range(len(vars)): - self.curkey[vars[i]] = args[i] - self.curkey['uids'] = [] - if self.curkey['uid']: - self.curkey['uids'].append(self.curkey['uid']) - del self.curkey['uid'] - self.curkey['subkeys'] = [] - self.append(self.curkey) + self.curkey = curkey = self.get_fields(args) + if curkey['uid']: + curkey['uids'].append(curkey['uid']) + del curkey['uid'] + curkey['subkeys'] = [] + self.append(curkey) pub = sec = key @@ -343,18 +458,34 @@ class ListKeys(list): self.curkey['fingerprint'] = args[9] self.fingerprints.append(args[9]) - def uid(self, args): - self.curkey['uids'].append(args[9]) - self.uids.append(args[9]) - def sub(self, args): subkey = [args[4], args[11]] self.curkey['subkeys'].append(subkey) - def handle_status(self, key, value): - pass -class Crypt(Verify): +class ScanKeys(ListKeys): + ''' Handle status messages for --with-fingerprint.''' + + def sub(self, args): + # --with-fingerprint --with-colons somehow outputs fewer colons, + # use the last value args[-1] instead of args[11] + subkey = [args[4], args[-1]] + self.curkey['subkeys'].append(subkey) + +class TextHandler(object): + def _as_text(self): + return self.data.decode(self.gpg.encoding, self.gpg.decode_errors) + + if _py3k: + __str__ = _as_text + else: + __unicode__ = _as_text + + def __str__(self): + return self.data + + +class Crypt(Verify, TextHandler): "Handle status messages for --encrypt and --decrypt" def __init__(self, gpg): Verify.__init__(self, gpg) @@ -368,19 +499,16 @@ class Crypt(Verify): __bool__ = __nonzero__ - def __str__(self): - return self.data.decode(self.gpg.encoding, self.gpg.decode_errors) - def handle_status(self, key, value): if key in ("ENC_TO", "USERID_HINT", "GOODMDC", "END_DECRYPTION", - "BEGIN_SIGNING", "NO_SECKEY", "ERROR", "NODATA", - "CARDCTRL"): + "BEGIN_SIGNING", "NO_SECKEY", "ERROR", "NODATA", "PROGRESS", + "CARDCTRL", "BADMDC", "SC_OP_FAILURE", "SC_OP_SUCCESS"): # in the case of ERROR, this is because a more specific error # message will have come first pass elif key in ("NEED_PASSPHRASE", "BAD_PASSPHRASE", "GOOD_PASSPHRASE", "MISSING_PASSPHRASE", "DECRYPTION_FAILED", - "KEY_NOT_CREATED"): + "KEY_NOT_CREATED", "NEED_PASSPHRASE_PIN"): self.status = key.replace("_", " ").lower() elif key == "NEED_PASSPHRASE_SYM": self.status = 'need symmetric passphrase' @@ -441,7 +569,7 @@ class DeleteResult(object): problem_reason = { '1': 'No such key', '2': 'Must delete secret key first', - '3': 'Ambigious specification', + '3': 'Ambiguous specification', } def handle_status(self, key, value): @@ -451,11 +579,18 @@ class DeleteResult(object): else: raise ValueError("Unknown status message: %r" % key) -class Sign(object): + def __nonzero__(self): + return self.status == 'ok' + + __bool__ = __nonzero__ + + +class Sign(TextHandler): "Handle status messages for --sign" def __init__(self, gpg): self.gpg = gpg self.type = None + self.hash_algo = None self.fingerprint = None def __nonzero__(self): @@ -463,21 +598,26 @@ class Sign(object): __bool__ = __nonzero__ - def __str__(self): - return self.data.decode(self.gpg.encoding, self.gpg.decode_errors) - def handle_status(self, key, value): if key in ("USERID_HINT", "NEED_PASSPHRASE", "BAD_PASSPHRASE", - "GOOD_PASSPHRASE", "BEGIN_SIGNING", "CARDCTRL", "INV_SGNR"): + "GOOD_PASSPHRASE", "BEGIN_SIGNING", "CARDCTRL", "INV_SGNR", + "NO_SGNR", "MISSING_PASSPHRASE", "NEED_PASSPHRASE_PIN", + "SC_OP_FAILURE", "SC_OP_SUCCESS", "PROGRESS"): pass + elif key in ("KEYEXPIRED", "SIGEXPIRED"): + self.status = 'key expired' + elif key == "KEYREVOKED": + self.status = 'key revoked' elif key == "SIG_CREATED": (self.type, - algo, hashalgo, cls, + algo, self.hash_algo, cls, self.timestamp, self.fingerprint ) = value.split() else: raise ValueError("Unknown status message: %r" % key) +VERSION_RE = re.compile(r'gpg \(GnuPG\) (\d+(\.\d+)*)'.encode('ascii'), re.I) +HEX_DIGITS_RE = re.compile(r'[0-9a-f]+$', re.I) class GPG(object): @@ -488,35 +628,54 @@ class GPG(object): 'delete': DeleteResult, 'generate': GenKey, 'import': ImportResult, + 'send': SendResult, 'list': ListKeys, + 'scan': ScanKeys, + 'search': SearchKeys, 'sign': Sign, 'verify': Verify, } "Encapsulate access to the gpg executable" def __init__(self, gpgbinary='gpg', gnupghome=None, verbose=False, - use_agent=False, keyring=None, options=None): + use_agent=False, keyring=None, options=None, + secret_keyring=None): """Initialize a GPG process wrapper. Options are: gpgbinary -- full pathname for GPG binary. gnupghome -- full pathname to where we can find the public and private keyrings. Default is whatever gpg defaults to. - keyring -- name of alternative keyring file to use. If specified, - the default keyring is not used. + keyring -- name of alternative keyring file to use, or list of such + keyrings. If specified, the default keyring is not used. options =-- a list of additional options to pass to the GPG binary. + secret_keyring -- name of alternative secret keyring file to use, or + list of such keyrings. """ self.gpgbinary = gpgbinary self.gnupghome = gnupghome + if keyring: + # Allow passing a string or another iterable. Make it uniformly + # a list of keyring filenames + if isinstance(keyring, string_types): + keyring = [keyring] self.keyring = keyring + if secret_keyring: + # Allow passing a string or another iterable. Make it uniformly + # a list of keyring filenames + if isinstance(secret_keyring, string_types): + secret_keyring = [secret_keyring] + self.secret_keyring = secret_keyring self.verbose = verbose self.use_agent = use_agent if isinstance(options, str): options = [options] self.options = options - self.encoding = locale.getpreferredencoding() - if self.encoding is None: # This happens on Jython! - self.encoding = sys.stdin.encoding + # Changed in 0.3.7 to use Latin-1 encoding rather than + # locale.getpreferredencoding falling back to sys.stdin.encoding + # falling back to utf-8, because gpg itself uses latin-1 as the default + # encoding. + self.encoding = 'latin-1' if gnupghome and not os.path.isdir(self.gnupghome): os.makedirs(self.gnupghome,0x1C0) p = self._open_subprocess(["--version"]) @@ -525,6 +684,12 @@ class GPG(object): if p.returncode != 0: raise ValueError("Error invoking gpg: %s: %s" % (p.returncode, result.stderr)) + m = VERSION_RE.match(result.data) + if not m: + self.version = None + else: + dot = '.'.encode('ascii') + self.version = tuple([int(s) for s in m.groups()[0].split(dot)]) def make_args(self, args, passphrase): """ @@ -532,13 +697,18 @@ class GPG(object): will be appended. The ``passphrase`` argument needs to be True if a passphrase will be sent to GPG, else False. """ - cmd = [self.gpgbinary, '--status-fd 2 --no-tty'] + cmd = [self.gpgbinary, '--status-fd', '2', '--no-tty'] if self.gnupghome: - cmd.append('--homedir "%s" ' % self.gnupghome) + cmd.extend(['--homedir', no_quote(self.gnupghome)]) if self.keyring: - cmd.append('--no-default-keyring --keyring "%s" ' % self.keyring) + cmd.append('--no-default-keyring') + for fn in self.keyring: + cmd.extend(['--keyring', no_quote(fn)]) + if self.secret_keyring: + for fn in self.secret_keyring: + cmd.extend(['--secret-keyring', no_quote(fn)]) if passphrase: - cmd.append('--batch --passphrase-fd 0') + cmd.extend(['--batch', '--passphrase-fd', '0']) if self.use_agent: cmd.append('--use-agent') if self.options: @@ -549,11 +719,12 @@ class GPG(object): def _open_subprocess(self, args, passphrase=False): # Internal method: open a pipe to a GPG subprocess and return # the file objects for communicating with it. - cmd = ' '.join(self.make_args(args, passphrase)) + cmd = self.make_args(args, passphrase) if self.verbose: - print(cmd) + pcmd = ' '.join(cmd) + print(pcmd) logger.debug("%s", cmd) - return Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=PIPE) + return Popen(cmd, shell=False, stdin=PIPE, stdout=PIPE, stderr=PIPE) def _read_response(self, stream, result): # Internal method: reads all the stderr output from GPG, taking notice @@ -561,31 +732,27 @@ class GPG(object): # # Calls methods on the response object for each valid token found, # with the arg being the remainder of the status line. - try: - lines = [] - while True: - line = stream.readline() - if len(line) == 0: - break - lines.append(line) - line = line.rstrip() - if self.verbose: - print(line) - logger.debug("%s", line) - if line[0:9] == '[GNUPG:] ': - # Chop off the prefix - line = line[9:] - L = line.split(None, 1) - keyword = L[0] - if len(L) > 1: - value = L[1] - else: - value = "" - result.handle_status(keyword, value) - result.stderr = ''.join(lines) - except: - import traceback - logger.error('Error in the GPG plugin:\n%s', traceback.format_exc()) + lines = [] + while True: + line = stream.readline() + if len(line) == 0: + break + lines.append(line) + line = line.rstrip() + if self.verbose: + print(line) + logger.debug("%s", line) + if line[0:9] == '[GNUPG:] ': + # Chop off the prefix + line = line[9:] + L = line.split(None, 1) + keyword = L[0] + if len(L) > 1: + value = L[1] + else: + value = "" + result.handle_status(keyword, value) + result.stderr = ''.join(lines) def _read_data(self, stream, result): # Read the contents of the file from GPG's stdout @@ -634,7 +801,7 @@ class GPG(object): stderr.close() stdout.close() - def _handle_io(self, args, file, result, passphrase=None, binary=False): + def _handle_io(self, args, fileobj, result, passphrase=None, binary=False): "Handle a call to GPG - pass input data, collect output data" # Handle a basic data call - pass data to GPG, handle the output # including status information. Garbage In, Garbage Out :) @@ -645,7 +812,7 @@ class GPG(object): stdin = p.stdin if passphrase: _write_passphrase(stdin, passphrase, self.encoding) - writer = _threaded_copy_data(file, stdin) + writer = _threaded_copy_data(fileobj, stdin) self._collect_output(p, result, writer, stdin) return result @@ -659,8 +826,15 @@ class GPG(object): f.close() return result + def set_output_without_confirmation(self, args, output): + "If writing to a file which exists, avoid a confirmation message." + if os.path.exists(output): + # We need to avoid an overwrite confirmation message + args.extend(['--batch', '--yes']) + args.extend(['--output', output]) + def sign_file(self, file, keyid=None, passphrase=None, clearsign=True, - detach=False, binary=False): + detach=False, binary=False, output=None): """sign file""" logger.debug("sign_file: %s", file) if binary: @@ -674,7 +848,10 @@ class GPG(object): elif clearsign: args.append("--clearsign") if keyid: - args.append('--default-key "%s"' % keyid) + args.extend(['--default-key', no_quote(keyid)]) + if output: # write the output to a file with the specified name + self.set_output_without_confirmation(args, output) + result = self.result_map['sign'](self) #We could use _handle_io here except for the fact that if the #passphrase is bad, gpg bails and you can't write the message. @@ -726,8 +903,8 @@ class GPG(object): logger.debug('Wrote to temp file: %r', s) os.write(fd, s) os.close(fd) - args.append(fn) - args.append('"%s"' % data_filename) + args.append(no_quote(fn)) + args.append(no_quote(data_filename)) try: p = self._open_subprocess(args) self._collect_output(p, result, stdin=p.stdin) @@ -735,6 +912,15 @@ class GPG(object): os.unlink(fn) return result + def verify_data(self, sig_filename, data): + "Verify the signature in sig_filename against data in memory" + logger.debug('verify_data: %r, %r ...', sig_filename, data[:16]) + result = self.result_map['verify'](self) + args = ['--verify', no_quote(sig_filename), '-'] + stream = _make_memory_stream(data) + self._handle_io(args, stream, result, binary=True) + return result + # # KEY MANAGEMENT # @@ -798,7 +984,8 @@ class GPG(object): >>> import shutil >>> shutil.rmtree("keys") >>> gpg = GPG(gnupghome="keys") - >>> result = gpg.recv_keys('pgp.mit.edu', '3FF0DB166A7476EA') + >>> os.chmod('keys', 0x1C0) + >>> result = gpg.recv_keys('keyserver.ubuntu.com', '92905378') >>> assert result """ @@ -806,33 +993,60 @@ class GPG(object): logger.debug('recv_keys: %r', keyids) data = _make_binary_stream("", self.encoding) #data = "" - args = ['--keyserver', keyserver, '--recv-keys'] - args.extend(keyids) + args = ['--keyserver', no_quote(keyserver), '--recv-keys'] + args.extend([no_quote(k) for k in keyids]) self._handle_io(args, data, result, binary=True) logger.debug('recv_keys result: %r', result.__dict__) data.close() return result + def send_keys(self, keyserver, *keyids): + """Send a key to a keyserver. + + Note: it's not practical to test this function without sending + arbitrary data to live keyservers. + """ + result = self.result_map['send'](self) + logger.debug('send_keys: %r', keyids) + data = _make_binary_stream('', self.encoding) + #data = "" + args = ['--keyserver', no_quote(keyserver), '--send-keys'] + args.extend([no_quote(k) for k in keyids]) + self._handle_io(args, data, result, binary=True) + logger.debug('send_keys result: %r', result.__dict__) + data.close() + return result + def delete_keys(self, fingerprints, secret=False): which='key' if secret: which='secret-key' if _is_sequence(fingerprints): - fingerprints = ' '.join(fingerprints) - args = ['--batch --delete-%s "%s"' % (which, fingerprints)] + fingerprints = [no_quote(s) for s in fingerprints] + else: + fingerprints = [no_quote(fingerprints)] + args = ['--batch', '--delete-%s' % which] + args.extend(fingerprints) result = self.result_map['delete'](self) p = self._open_subprocess(args) self._collect_output(p, result, stdin=p.stdin) return result - def export_keys(self, keyids, secret=False): + def export_keys(self, keyids, secret=False, armor=True, minimal=False): "export the indicated keys. 'keyid' is anything gpg accepts" which='' if secret: which='-secret-key' if _is_sequence(keyids): - keyids = ' '.join(['"%s"' % k for k in keyids]) - args = ["--armor --export%s %s" % (which, keyids)] + keyids = [no_quote(k) for k in keyids] + else: + keyids = [no_quote(keyids)] + args = ['--export%s' % which] + if armor: + args.append('--armor') + if minimal: + args.extend(['--export-options','export-minimal']) + args.extend(keyids) p = self._open_subprocess(args) # gpg --export produces no status-fd output; stdout will be # empty in case of failure @@ -842,6 +1056,27 @@ class GPG(object): logger.debug('export_keys result: %r', result.data) return result.data.decode(self.encoding, self.decode_errors) + def _get_list_output(self, p, kind): + # Get the response information + result = self.result_map[kind](self) + self._collect_output(p, result, stdin=p.stdin) + lines = result.data.decode(self.encoding, + self.decode_errors).splitlines() + valid_keywords = 'pub uid sec fpr sub'.split() + for line in lines: + if self.verbose: + print(line) + logger.debug("line: %r", line.rstrip()) + if not line: + break + L = line.strip().split(':') + if not L: + continue + keyword = L[0] + if keyword in valid_keywords: + getattr(result, keyword)(L) + return result + def list_keys(self, secret=False): """ list the keys currently in the keyring @@ -862,25 +1097,58 @@ class GPG(object): which='keys' if secret: which='secret-keys' - args = "--list-%s --fixed-list-mode --fingerprint --with-colons" % (which,) - args = [args] + args = ["--list-%s" % which, "--fixed-list-mode", "--fingerprint", + "--with-colons"] p = self._open_subprocess(args) + return self._get_list_output(p, 'list') - # there might be some status thingumy here I should handle... (amk) - # ...nope, unless you care about expired sigs or keys (stevegt) + def scan_keys(self, filename): + """ + List details of an ascii armored or binary key file + without first importing it to the local keyring. + + The function achieves this by running: + $ gpg --with-fingerprint --with-colons filename + """ + args = ['--with-fingerprint', '--with-colons'] + args.append(no_quote(filename)) + p = self._open_subprocess(args) + return self._get_list_output(p, 'scan') + + def search_keys(self, query, keyserver='pgp.mit.edu'): + """ search keyserver by query (using --search-keys option) + + >>> import shutil + >>> shutil.rmtree('keys') + >>> gpg = GPG(gnupghome='keys') + >>> os.chmod('keys', 0x1C0) + >>> result = gpg.search_keys('') + >>> assert result, 'Failed using default keyserver' + >>> keyserver = 'keyserver.ubuntu.com' + >>> result = gpg.search_keys('', keyserver) + >>> assert result, 'Failed using keyserver.ubuntu.com' + + """ + query = query.strip() + if HEX_DIGITS_RE.match(query): + query = '0x' + query + args = ['--fixed-list-mode', '--fingerprint', '--with-colons', + '--keyserver', no_quote(keyserver), '--search-keys', + no_quote(query)] + p = self._open_subprocess(args) # Get the response information - result = self.result_map['list'](self) + result = self.result_map['search'](self) self._collect_output(p, result, stdin=p.stdin) lines = result.data.decode(self.encoding, self.decode_errors).splitlines() - valid_keywords = 'pub uid sec fpr sub'.split() + valid_keywords = ['pub', 'uid'] for line in lines: if self.verbose: print(line) - logger.debug("line: %r", line.rstrip()) - if not line: - break + logger.debug('line: %r', line.rstrip()) + if not line: # sometimes get blank lines on Windows + continue L = line.strip().split(':') if not L: continue @@ -901,7 +1169,7 @@ class GPG(object): >>> assert not result """ - args = ["--gen-key --batch"] + args = ["--gen-key", "--batch"] result = self.result_map['generate'](self) f = _make_binary_stream(input, self.encoding) self._handle_io(args, f, result, binary=True) @@ -915,11 +1183,11 @@ class GPG(object): parms = {} for key, val in list(kwargs.items()): key = key.replace('_','-').title() - parms[key] = val + if str(val).strip(): # skip empty strings + parms[key] = val parms.setdefault('Key-Type','RSA') - parms.setdefault('Key-Length',1024) + parms.setdefault('Key-Length',2048) parms.setdefault('Name-Real', "Autogenerated Key") - parms.setdefault('Name-Comment', "Generated by gnupg.py") try: logname = os.environ['LOGNAME'] except KeyError: @@ -964,23 +1232,30 @@ class GPG(object): "Encrypt the message read from the file-like object 'file'" args = ['--encrypt'] if symmetric: + # can't be False or None - could be True or a cipher algo value + # such as AES256 args = ['--symmetric'] + if symmetric is not True: + args.extend(['--cipher-algo', no_quote(symmetric)]) + # else use the default, currently CAST5 else: - args = ['--encrypt'] + if not recipients: + raise ValueError('No recipients specified with asymmetric ' + 'encryption') if not _is_sequence(recipients): recipients = (recipients,) for recipient in recipients: - args.append('--recipient "%s"' % recipient) - if armor: # create ascii-armored output - set to False for binary output + args.extend(['--recipient', no_quote(recipient)]) + if armor: # create ascii-armored output - False for binary output args.append('--armor') if output: # write the output to a file with the specified name - if os.path.exists(output): - os.remove(output) # to avoid overwrite confirmation message - args.append('--output "%s"' % output) - if sign: - args.append('--sign --default-key "%s"' % sign) + self.set_output_without_confirmation(args, output) + if sign is True: + args.append('--sign') + elif sign: + args.extend(['--sign', '--default-key', no_quote(sign)]) if always_trust: - args.append("--always-trust") + args.append('--always-trust') result = self.result_map['crypt'](self) self._handle_io(args, file, result, passphrase=passphrase, binary=True) logger.debug('encrypt result: %r', result.data) @@ -1008,9 +1283,6 @@ class GPG(object): 'hello' >>> result = gpg.encrypt("hello again",print1) >>> message = str(result) - >>> result = gpg.decrypt(message) - >>> result.status == 'need passphrase' - True >>> result = gpg.decrypt(message,passphrase='bar') >>> result.status in ('decryption failed', 'bad passphrase') True @@ -1020,9 +1292,6 @@ class GPG(object): True >>> str(result) 'hello again' - >>> result = gpg.encrypt("signed hello",print2,sign=print1) - >>> result.status == 'need passphrase' - True >>> result = gpg.encrypt("signed hello",print2,sign=print1,passphrase='foo') >>> result.status == 'encryption ok' True @@ -1048,13 +1317,10 @@ class GPG(object): output=None): args = ["--decrypt"] if output: # write the output to a file with the specified name - if os.path.exists(output): - os.remove(output) # to avoid overwrite confirmation message - args.append('--output "%s"' % output) + self.set_output_without_confirmation(args, output) if always_trust: args.append("--always-trust") result = self.result_map['crypt'](self) self._handle_io(args, file, result, passphrase, binary=True) logger.debug('decrypt result: %r', result.data) return result - -- cgit v1.2.3