diff options
author | mathieui <mathieui@mathieui.net> | 2013-01-05 18:12:55 +0100 |
---|---|---|
committer | mathieui <mathieui@mathieui.net> | 2013-01-05 18:12:55 +0100 |
commit | b04d5c5f53f39457f60d38cb388c3e8f3edbd7de (patch) | |
tree | c3d1b8541d0a8adc75935e408673e16a42752a5a /plugins/gpg | |
parent | 4873bab7405f94fe0bb984a495d86bb5d83c78d7 (diff) | |
download | poezio-b04d5c5f53f39457f60d38cb388c3e8f3edbd7de.tar.gz poezio-b04d5c5f53f39457f60d38cb388c3e8f3edbd7de.tar.bz2 poezio-b04d5c5f53f39457f60d38cb388c3e8f3edbd7de.tar.xz poezio-b04d5c5f53f39457f60d38cb388c3e8f3edbd7de.zip |
Update the gnupg wrapper - Fix #2162
Diffstat (limited to 'plugins/gpg')
-rw-r--r-- | plugins/gpg/gnupg.py | 857 |
1 files changed, 469 insertions, 388 deletions
diff --git a/plugins/gpg/gnupg.py b/plugins/gpg/gnupg.py index e9e45711..9109451a 100644 --- a/plugins/gpg/gnupg.py +++ b/plugins/gpg/gnupg.py @@ -27,25 +27,22 @@ Vinay Sajip to make use of the subprocess module (Steve's version uses os.fork() and so does not work on Windows). Renamed to gnupg.py to avoid confusion with the previous versions. -Modifications Copyright (C) 2008-2011 Vinay Sajip. All rights reserved. +Modifications Copyright (C) 2008-2012 Vinay Sajip. All rights reserved. A unittest harness (test_gnupg.py) has also been added. """ import locale +__version__ = "0.3.1" __author__ = "Vinay Sajip" -__date__ = "$25-Jan-2011 11:40:48$" +__date__ = "$01-Sep-2012 20:02:51$" try: from io import StringIO - from io import TextIOWrapper - from io import BufferedReader - from io import BufferedWriter except ImportError: from cStringIO import StringIO - class BufferedReader: pass - class BufferedWriter: pass +import codecs import locale import logging import os @@ -110,34 +107,11 @@ def _write_passphrase(stream, passphrase, encoding): passphrase = '%s\n' % passphrase passphrase = passphrase.encode(encoding) stream.write(passphrase) - logger.debug("Passphrase written") + logger.debug("Wrote passphrase: %r", passphrase) def _is_sequence(instance): return isinstance(instance,list) or isinstance(instance,tuple) -def _wrap_input(inp): - if isinstance(inp, BufferedWriter): - oldinp = inp - inp = TextIOWrapper(inp) - logger.debug('wrapped input: %r -> %r', oldinp, inp) - return inp - -def _wrap_output(outp): - if isinstance(outp, BufferedReader): - oldoutp = outp - outp = TextIOWrapper(outp) - logger.debug('wrapped output: %r -> %r', oldoutp, outp) - return outp - -#The following is needed for Python2.7 :-( -def _make_file(s): - try: - rv = StringIO(s) - except (TypeError, UnicodeError): - from io import BytesIO - rv = BytesIO(s) - return rv - def _make_binary_stream(s, encoding): try: if _py3k: @@ -152,51 +126,437 @@ def _make_binary_stream(s, encoding): rv = StringIO(s) return rv +class Verify(object): + "Handle status messages for --verify" + + TRUST_UNDEFINED = 0 + TRUST_NEVER = 1 + TRUST_MARGINAL = 2 + TRUST_FULLY = 3 + TRUST_ULTIMATE = 4 + + TRUST_LEVELS = { + "TRUST_UNDEFINED" : TRUST_UNDEFINED, + "TRUST_NEVER" : TRUST_NEVER, + "TRUST_MARGINAL" : TRUST_MARGINAL, + "TRUST_FULLY" : TRUST_FULLY, + "TRUST_ULTIMATE" : TRUST_ULTIMATE, + } + + def __init__(self, gpg): + self.gpg = gpg + self.valid = False + self.fingerprint = self.creation_date = self.timestamp = None + self.signature_id = self.key_id = None + self.username = None + self.status = None + self.pubkey_fingerprint = None + self.expire_timestamp = None + self.sig_timestamp = None + self.trust_text = None + self.trust_level = None + + def __nonzero__(self): + return self.valid + + __bool__ = __nonzero__ + + def handle_status(self, key, value): + if key in self.TRUST_LEVELS: + self.trust_text = key + self.trust_level = self.TRUST_LEVELS[key] + elif key in ("RSA_OR_IDEA", "NODATA", "IMPORT_RES", "PLAINTEXT", + "PLAINTEXT_LENGTH", "POLICY_URL", "DECRYPTION_INFO", + "DECRYPTION_OKAY"): + pass + elif key == "BADSIG": + self.valid = False + self.status = 'signature bad' + self.key_id, self.username = value.split(None, 1) + elif key == "GOODSIG": + self.valid = True + self.status = 'signature good' + self.key_id, self.username = value.split(None, 1) + elif key == "VALIDSIG": + (self.fingerprint, + self.creation_date, + self.sig_timestamp, + self.expire_timestamp) = value.split()[:4] + # may be different if signature is made with a subkey + self.pubkey_fingerprint = value.split()[-1] + self.status = 'signature valid' + elif key == "SIG_ID": + (self.signature_id, + self.creation_date, self.timestamp) = value.split() + elif key == "ERRSIG": + self.valid = False + (self.key_id, + algo, hash_algo, + cls, + self.timestamp) = value.split()[:5] + self.status = 'signature error' + elif key == "DECRYPTION_FAILED": + self.valid = False + self.key_id = value + self.status = 'decryption failed' + elif key == "NO_PUBKEY": + self.valid = False + self.key_id = value + self.status = 'no public key' + elif key in ("KEYEXPIRED", "SIGEXPIRED"): + # these are useless in verify, since they are spit out for any + # pub/subkeys on the key, not just the one doing the signing. + # if we want to check for signatures with expired key, + # the relevant flag is EXPKEYSIG. + pass + elif key in ("EXPKEYSIG", "REVKEYSIG"): + # signed with expired or revoked key + self.valid = False + self.key_id = value.split()[0] + self.status = (('%s %s') % (key[:3], key[3:])).lower() + else: + raise ValueError("Unknown status message: %r" % key) + +class ImportResult(object): + "Handle status messages for --import" + + counts = '''count no_user_id imported imported_rsa unchanged + n_uids n_subk n_sigs n_revoc sec_read sec_imported + sec_dups not_imported'''.split() + def __init__(self, gpg): + self.gpg = gpg + self.imported = [] + self.results = [] + self.fingerprints = [] + for result in self.counts: + setattr(self, result, None) + + def __nonzero__(self): + if self.not_imported: return False + if not self.fingerprints: return False + return True + + __bool__ = __nonzero__ + + ok_reason = { + '0': 'Not actually changed', + '1': 'Entirely new key', + '2': 'New user IDs', + '4': 'New signatures', + '8': 'New subkeys', + '16': 'Contains private key', + } + + problem_reason = { + '0': 'No specific reason given', + '1': 'Invalid Certificate', + '2': 'Issuer Certificate missing', + '3': 'Certificate Chain too long', + '4': 'Error storing certificate', + } + + def handle_status(self, key, value): + if key == "IMPORTED": + # this duplicates info we already see in import_ok & import_problem + pass + elif key == "NODATA": + self.results.append({'fingerprint': None, + 'problem': '0', 'text': 'No valid data found'}) + elif key == "IMPORT_OK": + reason, fingerprint = value.split() + reasons = [] + for code, text in list(self.ok_reason.items()): + if int(reason) | int(code) == int(reason): + reasons.append(text) + reasontext = '\n'.join(reasons) + "\n" + self.results.append({'fingerprint': fingerprint, + 'ok': reason, 'text': reasontext}) + self.fingerprints.append(fingerprint) + elif key == "IMPORT_PROBLEM": + try: + reason, fingerprint = value.split() + except: + reason = value + fingerprint = '<unknown>' + self.results.append({'fingerprint': fingerprint, + 'problem': reason, 'text': self.problem_reason[reason]}) + elif key == "IMPORT_RES": + import_res = value.split() + for i in range(len(self.counts)): + setattr(self, self.counts[i], int(import_res[i])) + elif key == "KEYEXPIRED": + self.results.append({'fingerprint': None, + 'problem': '0', 'text': 'Key expired'}) + elif key == "SIGEXPIRED": + self.results.append({'fingerprint': None, + 'problem': '0', 'text': 'Signature expired'}) + else: + raise ValueError("Unknown status message: %r" % key) + + def summary(self): + l = [] + l.append('%d imported' % self.imported) + if self.not_imported: + l.append('%d not imported' % self.not_imported) + return ', '.join(l) + +class ListKeys(list): + ''' Handle status messages for --list-keys. + + Handle pub and uid (relating the latter to the former). + + Don't care about (info from src/DETAILS): + + crt = X.509 certificate + crs = X.509 certificate and private key available + ssb = secret subkey (secondary key) + uat = user attribute (same as user id except for field 10). + sig = signature + rev = revocation signature + pkd = public key data (special field format, see below) + grp = reserved for gpgsm + rvk = revocation key + ''' + def __init__(self, gpg): + self.gpg = gpg + self.curkey = None + self.fingerprints = [] + self.uids = [] + + def key(self, args): + vars = (""" + type trust length algo keyid date expires dummy ownertrust uid + """).split() + self.curkey = {} + for i in range(len(vars)): + self.curkey[vars[i]] = args[i] + self.curkey['uids'] = [] + if self.curkey['uid']: + self.curkey['uids'].append(self.curkey['uid']) + del self.curkey['uid'] + self.curkey['subkeys'] = [] + self.append(self.curkey) + + pub = sec = key + + def fpr(self, args): + self.curkey['fingerprint'] = args[9] + self.fingerprints.append(args[9]) + + def uid(self, args): + self.curkey['uids'].append(args[9]) + self.uids.append(args[9]) + + def sub(self, args): + subkey = [args[4], args[11]] + self.curkey['subkeys'].append(subkey) + + def handle_status(self, key, value): + pass + +class Crypt(Verify): + "Handle status messages for --encrypt and --decrypt" + def __init__(self, gpg): + Verify.__init__(self, gpg) + self.data = '' + self.ok = False + self.status = '' + + def __nonzero__(self): + if self.ok: return True + return False + + __bool__ = __nonzero__ + + def __str__(self): + return self.data.decode(self.gpg.encoding, self.gpg.decode_errors) + + def handle_status(self, key, value): + if key in ("ENC_TO", "USERID_HINT", "GOODMDC", "END_DECRYPTION", + "BEGIN_SIGNING", "NO_SECKEY", "ERROR", "NODATA", + "CARDCTRL"): + # in the case of ERROR, this is because a more specific error + # message will have come first + pass + elif key in ("NEED_PASSPHRASE", "BAD_PASSPHRASE", "GOOD_PASSPHRASE", + "MISSING_PASSPHRASE", "DECRYPTION_FAILED", + "KEY_NOT_CREATED"): + self.status = key.replace("_", " ").lower() + elif key == "NEED_PASSPHRASE_SYM": + self.status = 'need symmetric passphrase' + elif key == "BEGIN_DECRYPTION": + self.status = 'decryption incomplete' + elif key == "BEGIN_ENCRYPTION": + self.status = 'encryption incomplete' + elif key == "DECRYPTION_OKAY": + self.status = 'decryption ok' + self.ok = True + elif key == "END_ENCRYPTION": + self.status = 'encryption ok' + self.ok = True + elif key == "INV_RECP": + self.status = 'invalid recipient' + elif key == "KEYEXPIRED": + self.status = 'key expired' + elif key == "SIG_CREATED": + self.status = 'sig created' + elif key == "SIGEXPIRED": + self.status = 'sig expired' + else: + Verify.handle_status(self, key, value) + +class GenKey(object): + "Handle status messages for --gen-key" + def __init__(self, gpg): + self.gpg = gpg + self.type = None + self.fingerprint = None + + def __nonzero__(self): + if self.fingerprint: return True + return False + + __bool__ = __nonzero__ + + def __str__(self): + return self.fingerprint or '' + + def handle_status(self, key, value): + if key in ("PROGRESS", "GOOD_PASSPHRASE", "NODATA", "KEY_NOT_CREATED"): + pass + elif key == "KEY_CREATED": + (self.type,self.fingerprint) = value.split() + else: + raise ValueError("Unknown status message: %r" % key) + +class DeleteResult(object): + "Handle status messages for --delete-key and --delete-secret-key" + def __init__(self, gpg): + self.gpg = gpg + self.status = 'ok' + + def __str__(self): + return self.status + + problem_reason = { + '1': 'No such key', + '2': 'Must delete secret key first', + '3': 'Ambigious specification', + } + + def handle_status(self, key, value): + if key == "DELETE_PROBLEM": + self.status = self.problem_reason.get(value, + "Unknown error: %r" % value) + else: + raise ValueError("Unknown status message: %r" % key) + +class Sign(object): + "Handle status messages for --sign" + def __init__(self, gpg): + self.gpg = gpg + self.type = None + self.fingerprint = None + + def __nonzero__(self): + return self.fingerprint is not None + + __bool__ = __nonzero__ + + def __str__(self): + return self.data.decode(self.gpg.encoding, self.gpg.decode_errors) + + def handle_status(self, key, value): + if key in ("USERID_HINT", "NEED_PASSPHRASE", "BAD_PASSPHRASE", + "GOOD_PASSPHRASE", "BEGIN_SIGNING", "CARDCTRL", "INV_SGNR"): + pass + elif key == "SIG_CREATED": + (self.type, + algo, hashalgo, cls, + self.timestamp, self.fingerprint + ) = value.split() + else: + raise ValueError("Unknown status message: %r" % key) + + class GPG(object): + + decode_errors = 'strict' + + result_map = { + 'crypt': Crypt, + 'delete': DeleteResult, + 'generate': GenKey, + 'import': ImportResult, + 'list': ListKeys, + 'sign': Sign, + 'verify': Verify, + } + "Encapsulate access to the gpg executable" - def __init__(self, gpgbinary='gpg', gnupghome=None, verbose=False, use_agent=False): + def __init__(self, gpgbinary='gpg', gnupghome=None, verbose=False, + use_agent=False, keyring=None, options=None): """Initialize a GPG process wrapper. Options are: gpgbinary -- full pathname for GPG binary. gnupghome -- full pathname to where we can find the public and private keyrings. Default is whatever gpg defaults to. + keyring -- name of alternative keyring file to use. If specified, + the default keyring is not used. + options =-- a list of additional options to pass to the GPG binary. """ self.gpgbinary = gpgbinary self.gnupghome = gnupghome + self.keyring = keyring self.verbose = verbose self.use_agent = use_agent + if isinstance(options, str): + options = [options] + self.options = options self.encoding = locale.getpreferredencoding() if self.encoding is None: # This happens on Jython! self.encoding = sys.stdin.encoding if gnupghome and not os.path.isdir(self.gnupghome): os.makedirs(self.gnupghome,0x1C0) p = self._open_subprocess(["--version"]) - result = Verify() # any result will do for this + result = self.result_map['verify'](self) # any result will do for this self._collect_output(p, result, stdin=p.stdin) if p.returncode != 0: raise ValueError("Error invoking gpg: %s: %s" % (p.returncode, result.stderr)) - def _open_subprocess(self, args, passphrase=False): - # Internal method: open a pipe to a GPG subprocess and return - # the file objects for communicating with it. + def make_args(self, args, passphrase): + """ + Make a list of command line elements for GPG. The value of ``args`` + will be appended. The ``passphrase`` argument needs to be True if + a passphrase will be sent to GPG, else False. + """ cmd = [self.gpgbinary, '--status-fd 2 --no-tty'] if self.gnupghome: cmd.append('--homedir "%s" ' % self.gnupghome) + if self.keyring: + cmd.append('--no-default-keyring --keyring "%s" ' % self.keyring) if passphrase: cmd.append('--batch --passphrase-fd 0') if self.use_agent: cmd.append('--use-agent') + if self.options: + cmd.extend(self.options) cmd.extend(args) - cmd = ' '.join(cmd) + return cmd + + def _open_subprocess(self, args, passphrase=False): + # Internal method: open a pipe to a GPG subprocess and return + # the file objects for communicating with it. + cmd = ' '.join(self.make_args(args, passphrase)) if self.verbose: print(cmd) logger.debug("%s", cmd) return Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=PIPE) def _read_response(self, stream, result): - # Internal method: reads all the output from GPG, taking notice + # Internal method: reads all the stderr output from GPG, taking notice # only of lines that begin with the magic [GNUPG:] prefix. # # Calls methods on the response object for each valid token found, @@ -204,14 +564,13 @@ class GPG(object): lines = [] while True: line = stream.readline() - if not isinstance(line, str): - line = line.decode('utf-8') + if len(line) == 0: + break lines.append(line) + line = line.rstrip() if self.verbose: print(line) - logger.debug("%s", line.rstrip()) - if line == "": break - line = line.rstrip() + logger.debug("%s", line) if line[0:9] == '[GNUPG:] ': # Chop off the prefix line = line[9:] @@ -246,13 +605,13 @@ class GPG(object): make sure it's joined before returning. If a stdin stream is given, close it before returning. """ - stderr = _wrap_output(process.stderr) + stderr = codecs.getreader(self.encoding)(process.stderr) rr = threading.Thread(target=self._read_response, args=(stderr, result)) rr.setDaemon(True) logger.debug('stderr reader: %r', rr) rr.start() - stdout = process.stdout # _wrap_output(process.stdout) + stdout = process.stdout dr = threading.Thread(target=self._read_data, args=(stdout, result)) dr.setDaemon(True) logger.debug('stdout reader: %r', dr) @@ -276,8 +635,8 @@ class GPG(object): # Handle a basic data call - pass data to GPG, handle the output # including status information. Garbage In, Garbage Out :) p = self._open_subprocess(args, passphrase is not None) - if not binary and not isinstance(file, BufferedReader): - stdin = _wrap_input(p.stdin) + if not binary: + stdin = codecs.getwriter(self.encoding)(p.stdin) else: stdin = p.stdin if passphrase: @@ -311,11 +670,10 @@ class GPG(object): elif clearsign: args.append("--clearsign") if keyid: - args.append("--default-key %s" % keyid) - result = Sign(self.encoding) + args.append('--default-key "%s"' % keyid) + result = self.result_map['sign'](self) #We could use _handle_io here except for the fact that if the #passphrase is bad, gpg bails and you can't write the message. - #self._handle_io(args, _make_file(message), result, passphrase=passphrase) p = self._open_subprocess(args, passphrase is not None) try: stdin = p.stdin @@ -351,7 +709,7 @@ class GPG(object): def verify_file(self, file, data_filename=None): "Verify the signature on the contents of the file-like object 'file'" logger.debug('verify_file: %r, %r', file, data_filename) - result = Verify() + result = self.result_map['verify'](self) args = ['--verify'] if data_filename is None: self._handle_io(args, file, result, binary=True) @@ -365,7 +723,7 @@ class GPG(object): os.write(fd, s) os.close(fd) args.append(fn) - args.append(data_filename) + args.append('"%s"' % data_filename) try: p = self._open_subprocess(args) self._collect_output(p, result, stdin=p.stdin) @@ -422,7 +780,7 @@ class GPG(object): >>> assert print2 in pubkeys.fingerprints """ - result = ImportResult() + result = self.result_map['import'](self) logger.debug('import_keys: %r', key_data[:256]) data = _make_binary_stream(key_data, self.encoding) self._handle_io(['--import'], data, result, binary=True) @@ -430,14 +788,35 @@ class GPG(object): data.close() return result + def recv_keys(self, keyserver, *keyids): + """Import a key from a keyserver + + >>> import shutil + >>> shutil.rmtree("keys") + >>> gpg = GPG(gnupghome="keys") + >>> result = gpg.recv_keys('pgp.mit.edu', '3FF0DB166A7476EA') + >>> assert result + + """ + result = self.result_map['import'](self) + logger.debug('recv_keys: %r', keyids) + data = _make_binary_stream("", self.encoding) + #data = "" + args = ['--keyserver', keyserver, '--recv-keys'] + args.extend(keyids) + self._handle_io(args, data, result, binary=True) + logger.debug('recv_keys result: %r', result.__dict__) + data.close() + return result + def delete_keys(self, fingerprints, secret=False): which='key' if secret: which='secret-key' if _is_sequence(fingerprints): fingerprints = ' '.join(fingerprints) - args = ["--batch --delete-%s %s" % (which, fingerprints)] - result = DeleteResult() + args = ['--batch --delete-%s "%s"' % (which, fingerprints)] + result = self.result_map['delete'](self) p = self._open_subprocess(args) self._collect_output(p, result, stdin=p.stdin) return result @@ -448,16 +827,16 @@ class GPG(object): if secret: which='-secret-key' if _is_sequence(keyids): - keyids = ' '.join(keyids) + keyids = ' '.join(['"%s"' % k for k in keyids]) args = ["--armor --export%s %s" % (which, keyids)] p = self._open_subprocess(args) # gpg --export produces no status-fd output; stdout will be # empty in case of failure #stdout, stderr = p.communicate() - result = DeleteResult() # any result will do + result = self.result_map['delete'](self) # any result will do self._collect_output(p, result, stdin=p.stdin) logger.debug('export_keys result: %r', result.data) - return result.data.decode(self.encoding, 'replace') + return result.data.decode(self.encoding, self.decode_errors) def list_keys(self, secret=False): """ list the keys currently in the keyring @@ -487,9 +866,10 @@ class GPG(object): # ...nope, unless you care about expired sigs or keys (stevegt) # Get the response information - result = ListKeys() + result = self.result_map['list'](self) self._collect_output(p, result, stdin=p.stdin) - lines = result.data.decode(self.encoding, 'replace').splitlines() + lines = result.data.decode(self.encoding, + self.decode_errors).splitlines() valid_keywords = 'pub uid sec fpr sub'.split() for line in lines: if self.verbose: @@ -518,9 +898,9 @@ class GPG(object): """ args = ["--gen-key --batch"] - result = GenKey() - f = _make_file(input) - self._handle_io(args, f, result) + result = self.result_map['generate'](self) + f = _make_binary_stream(input, self.encoding) + self._handle_io(args, f, result, binary=True) f.close() return result @@ -576,24 +956,28 @@ class GPG(object): # def encrypt_file(self, file, recipients, sign=None, always_trust=False, passphrase=None, - armor=True, output=None): + armor=True, output=None, symmetric=False): "Encrypt the message read from the file-like object 'file'" args = ['--encrypt'] + if symmetric: + args = ['--symmetric'] + else: + args = ['--encrypt'] + if not _is_sequence(recipients): + recipients = (recipients,) + for recipient in recipients: + args.append('--recipient "%s"' % recipient) if armor: # create ascii-armored output - set to False for binary output args.append('--armor') if output: # write the output to a file with the specified name if os.path.exists(output): os.remove(output) # to avoid overwrite confirmation message - args.append('--output %s' % output) - if not _is_sequence(recipients): - recipients = (recipients,) - for recipient in recipients: - args.append('--recipient %s' % recipient) + args.append('--output "%s"' % output) if sign: - args.append("--sign --default-key %s" % sign) + args.append('--sign --default-key "%s"' % sign) if always_trust: args.append("--always-trust") - result = Crypt(self.encoding) + result = self.result_map['crypt'](self) self._handle_io(args, file, result, passphrase=passphrase, binary=True) logger.debug('encrypt result: %r', result.data) return result @@ -621,27 +1005,27 @@ class GPG(object): >>> result = gpg.encrypt("hello again",print1) >>> message = str(result) >>> result = gpg.decrypt(message) - >>> result.status - 'need passphrase' + >>> result.status == 'need passphrase' + True >>> result = gpg.decrypt(message,passphrase='bar') - >>> result.status - 'decryption failed' + >>> result.status in ('decryption failed', 'bad passphrase') + True >>> assert not result >>> result = gpg.decrypt(message,passphrase='foo') - >>> result.status - 'decryption ok' + >>> result.status == 'decryption ok' + True >>> str(result) 'hello again' >>> result = gpg.encrypt("signed hello",print2,sign=print1) - >>> result.status - 'need passphrase' + >>> result.status == 'need passphrase' + True >>> result = gpg.encrypt("signed hello",print2,sign=print1,passphrase='foo') - >>> result.status - 'encryption ok' + >>> result.status == 'encryption ok' + True >>> message = str(result) >>> result = gpg.decrypt(message) - >>> result.status - 'decryption ok' + >>> result.status == 'decryption ok' + True >>> assert result.fingerprint == print1 """ @@ -662,314 +1046,11 @@ class GPG(object): if output: # write the output to a file with the specified name if os.path.exists(output): os.remove(output) # to avoid overwrite confirmation message - args.append('--output %s' % output) + args.append('--output "%s"' % output) if always_trust: args.append("--always-trust") - result = Crypt(self.encoding) + result = self.result_map['crypt'](self) self._handle_io(args, file, result, passphrase, binary=True) logger.debug('decrypt result: %r', result.data) return result -class Verify(object): - "Handle status messages for --verify" - - def __init__(self): - self.valid = False - self.fingerprint = self.creation_date = self.timestamp = None - self.signature_id = self.key_id = None - self.username = None - - def __nonzero__(self): - return self.valid - - __bool__ = __nonzero__ - - def handle_status(self, key, value): - if key in ("TRUST_UNDEFINED", "TRUST_NEVER", "TRUST_MARGINAL", - "TRUST_FULLY", "TRUST_ULTIMATE", "RSA_OR_IDEA"): - pass - elif key in ("PLAINTEXT", "PLAINTEXT_LENGTH"): - pass - elif key == "IMPORT_RES": - # If auto-key-retrieve option is enabled, this can happen - pass - elif key == "BADSIG": - self.valid = False - self.key_id, self.username = value.split(None, 1) - elif key == "GOODSIG": - self.valid = True - self.key_id, self.username = value.split(None, 1) - elif key == "VALIDSIG": - (self.fingerprint, - self.creation_date, - self.sig_timestamp, - self.expire_timestamp) = value.split()[:4] - elif key == "SIG_ID": - (self.signature_id, - self.creation_date, self.timestamp) = value.split() - elif key == "ERRSIG": - self.valid = False - (self.key_id, - algo, hash_algo, - cls, - self.timestamp) = value.split()[:5] - elif key == "NO_PUBKEY": - self.valid = False - self.key_id = value - else: - raise ValueError("Unknown status message: %r" % key) - -class ImportResult(object): - "Handle status messages for --import" - - counts = '''count no_user_id imported imported_rsa unchanged - n_uids n_subk n_sigs n_revoc sec_read sec_imported - sec_dups not_imported'''.split() - def __init__(self): - self.imported = [] - self.results = [] - self.fingerprints = [] - for result in self.counts: - setattr(self, result, None) - - def __nonzero__(self): - if self.not_imported: return False - if not self.fingerprints: return False - return True - - __bool__ = __nonzero__ - - ok_reason = { - '0': 'Not actually changed', - '1': 'Entirely new key', - '2': 'New user IDs', - '4': 'New signatures', - '8': 'New subkeys', - '16': 'Contains private key', - } - - problem_reason = { - '0': 'No specific reason given', - '1': 'Invalid Certificate', - '2': 'Issuer Certificate missing', - '3': 'Certificate Chain too long', - '4': 'Error storing certificate', - } - - def handle_status(self, key, value): - if key == "IMPORTED": - # this duplicates info we already see in import_ok & import_problem - pass - elif key == "NODATA": - self.results.append({'fingerprint': None, - 'problem': '0', 'text': 'No valid data found'}) - elif key == "IMPORT_OK": - reason, fingerprint = value.split() - reasons = [] - for code, text in list(self.ok_reason.items()): - if int(reason) | int(code) == int(reason): - reasons.append(text) - reasontext = '\n'.join(reasons) + "\n" - self.results.append({'fingerprint': fingerprint, - 'ok': reason, 'text': reasontext}) - self.fingerprints.append(fingerprint) - elif key == "IMPORT_PROBLEM": - try: - reason, fingerprint = value.split() - except: - reason = value - fingerprint = '<unknown>' - self.results.append({'fingerprint': fingerprint, - 'problem': reason, 'text': self.problem_reason[reason]}) - elif key == "IMPORT_RES": - import_res = value.split() - for i in range(len(self.counts)): - setattr(self, self.counts[i], int(import_res[i])) - elif key == "KEYEXPIRED": - self.results.append({'fingerprint': None, - 'problem': '0', 'text': 'Key expired'}) - elif key == "SIGEXPIRED": - self.results.append({'fingerprint': None, - 'problem': '0', 'text': 'Signature expired'}) - else: - raise ValueError("Unknown status message: %r" % key) - - def summary(self): - l = [] - l.append('%d imported'%self.imported) - if self.not_imported: - l.append('%d not imported'%self.not_imported) - return ', '.join(l) - -class ListKeys(list): - ''' Handle status messages for --list-keys. - - Handle pub and uid (relating the latter to the former). - - Don't care about (info from src/DETAILS): - - crt = X.509 certificate - crs = X.509 certificate and private key available - sub = subkey (secondary key) - ssb = secret subkey (secondary key) - uat = user attribute (same as user id except for field 10). - sig = signature - rev = revocation signature - pkd = public key data (special field format, see below) - grp = reserved for gpgsm - rvk = revocation key - ''' - def __init__(self): - self.curkey = None - self.fingerprints = [] - self.uids = [] - - def key(self, args): - vars = (""" - type trust length algo keyid date expires dummy ownertrust uid - """).split() - self.curkey = {} - for i in range(len(vars)): - self.curkey[vars[i]] = args[i] - self.curkey['uids'] = [] - if self.curkey['uid']: - self.curkey['uids'].append(self.curkey['uid']) - del self.curkey['uid'] - self.curkey['subkeys'] = [] - self.append(self.curkey) - - pub = sec = key - - def fpr(self, args): - self.curkey['fingerprint'] = args[9] - self.fingerprints.append(args[9]) - - def uid(self, args): - self.curkey['uids'].append(args[9]) - self.uids.append(args[9]) - - def sub(self, args): - subkey = [args[4],args[11]] - self.curkey['subkeys'].append(subkey) - - def handle_status(self, key, value): - pass - -class Crypt(Verify): - "Handle status messages for --encrypt and --decrypt" - def __init__(self, encoding): - Verify.__init__(self) - self.data = '' - self.ok = False - self.status = '' - self.encoding = encoding - - def __nonzero__(self): - if self.ok: return True - return False - - __bool__ = __nonzero__ - - def __str__(self): - return self.data.decode(self.encoding, 'replace') - - def handle_status(self, key, value): - if key in ("ENC_TO", "USERID_HINT", "GOODMDC", "END_DECRYPTION", - "BEGIN_SIGNING", "NO_SECKEY"): - pass - elif key in ("NEED_PASSPHRASE", "BAD_PASSPHRASE", "GOOD_PASSPHRASE", - "MISSING_PASSPHRASE", "DECRYPTION_FAILED"): - self.status = key.replace("_", " ").lower() - elif key == "NEED_PASSPHRASE_SYM": - self.status = 'need symmetric passphrase' - elif key == "BEGIN_DECRYPTION": - self.status = 'decryption incomplete' - elif key == "BEGIN_ENCRYPTION": - self.status = 'encryption incomplete' - elif key == "DECRYPTION_OKAY": - self.status = 'decryption ok' - self.ok = True - elif key == "END_ENCRYPTION": - self.status = 'encryption ok' - self.ok = True - elif key == "INV_RECP": - self.status = 'invalid recipient' - elif key == "KEYEXPIRED": - self.status = 'key expired' - elif key == "SIG_CREATED": - self.status = 'sig created' - elif key == "SIGEXPIRED": - self.status = 'sig expired' - else: - Verify.handle_status(self, key, value) - -class GenKey(object): - "Handle status messages for --gen-key" - def __init__(self): - self.type = None - self.fingerprint = None - - def __nonzero__(self): - if self.fingerprint: return True - return False - - __bool__ = __nonzero__ - - def __str__(self): - return self.fingerprint or '' - - def handle_status(self, key, value): - if key in ("PROGRESS", "GOOD_PASSPHRASE", "NODATA"): - pass - elif key == "KEY_CREATED": - (self.type,self.fingerprint) = value.split() - else: - raise ValueError("Unknown status message: %r" % key) - -class DeleteResult(object): - "Handle status messages for --delete-key and --delete-secret-key" - def __init__(self): - self.status = 'ok' - - def __str__(self): - return self.status - - problem_reason = { - '1': 'No such key', - '2': 'Must delete secret key first', - '3': 'Ambigious specification', - } - - def handle_status(self, key, value): - if key == "DELETE_PROBLEM": - self.status = self.problem_reason.get(value, - "Unknown error: %r" % value) - else: - raise ValueError("Unknown status message: %r" % key) - -class Sign(object): - "Handle status messages for --sign" - def __init__(self, encoding): - self.type = None - self.fingerprint = None - self.encoding = encoding - - def __nonzero__(self): - return self.fingerprint is not None - - __bool__ = __nonzero__ - - def __str__(self): - return self.data.decode(self.encoding, 'replace') - - def handle_status(self, key, value): - if key in ("USERID_HINT", "NEED_PASSPHRASE", "BAD_PASSPHRASE", - "GOOD_PASSPHRASE", "BEGIN_SIGNING"): - pass - elif key == "SIG_CREATED": - (self.type, - algo, hashalgo, cls, - self.timestamp, self.fingerprint - ) = value.split() - else: - raise ValueError("Unknown status message: %r" % key) |