summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormathieui <mathieui@mathieui.net>2013-01-05 18:12:55 +0100
committermathieui <mathieui@mathieui.net>2013-01-05 18:12:55 +0100
commitb04d5c5f53f39457f60d38cb388c3e8f3edbd7de (patch)
treec3d1b8541d0a8adc75935e408673e16a42752a5a
parent4873bab7405f94fe0bb984a495d86bb5d83c78d7 (diff)
downloadpoezio-b04d5c5f53f39457f60d38cb388c3e8f3edbd7de.tar.gz
poezio-b04d5c5f53f39457f60d38cb388c3e8f3edbd7de.tar.bz2
poezio-b04d5c5f53f39457f60d38cb388c3e8f3edbd7de.tar.xz
poezio-b04d5c5f53f39457f60d38cb388c3e8f3edbd7de.zip
Update the gnupg wrapper - Fix #2162
-rw-r--r--plugins/gpg/gnupg.py857
1 files changed, 469 insertions, 388 deletions
diff --git a/plugins/gpg/gnupg.py b/plugins/gpg/gnupg.py
index e9e45711..9109451a 100644
--- a/plugins/gpg/gnupg.py
+++ b/plugins/gpg/gnupg.py
@@ -27,25 +27,22 @@ Vinay Sajip to make use of the subprocess module (Steve's version uses os.fork()
and so does not work on Windows). Renamed to gnupg.py to avoid confusion with
the previous versions.
-Modifications Copyright (C) 2008-2011 Vinay Sajip. All rights reserved.
+Modifications Copyright (C) 2008-2012 Vinay Sajip. All rights reserved.
A unittest harness (test_gnupg.py) has also been added.
"""
import locale
+__version__ = "0.3.1"
__author__ = "Vinay Sajip"
-__date__ = "$25-Jan-2011 11:40:48$"
+__date__ = "$01-Sep-2012 20:02:51$"
try:
from io import StringIO
- from io import TextIOWrapper
- from io import BufferedReader
- from io import BufferedWriter
except ImportError:
from cStringIO import StringIO
- class BufferedReader: pass
- class BufferedWriter: pass
+import codecs
import locale
import logging
import os
@@ -110,34 +107,11 @@ def _write_passphrase(stream, passphrase, encoding):
passphrase = '%s\n' % passphrase
passphrase = passphrase.encode(encoding)
stream.write(passphrase)
- logger.debug("Passphrase written")
+ logger.debug("Wrote passphrase: %r", passphrase)
def _is_sequence(instance):
return isinstance(instance,list) or isinstance(instance,tuple)
-def _wrap_input(inp):
- if isinstance(inp, BufferedWriter):
- oldinp = inp
- inp = TextIOWrapper(inp)
- logger.debug('wrapped input: %r -> %r', oldinp, inp)
- return inp
-
-def _wrap_output(outp):
- if isinstance(outp, BufferedReader):
- oldoutp = outp
- outp = TextIOWrapper(outp)
- logger.debug('wrapped output: %r -> %r', oldoutp, outp)
- return outp
-
-#The following is needed for Python2.7 :-(
-def _make_file(s):
- try:
- rv = StringIO(s)
- except (TypeError, UnicodeError):
- from io import BytesIO
- rv = BytesIO(s)
- return rv
-
def _make_binary_stream(s, encoding):
try:
if _py3k:
@@ -152,51 +126,437 @@ def _make_binary_stream(s, encoding):
rv = StringIO(s)
return rv
+class Verify(object):
+ "Handle status messages for --verify"
+
+ TRUST_UNDEFINED = 0
+ TRUST_NEVER = 1
+ TRUST_MARGINAL = 2
+ TRUST_FULLY = 3
+ TRUST_ULTIMATE = 4
+
+ TRUST_LEVELS = {
+ "TRUST_UNDEFINED" : TRUST_UNDEFINED,
+ "TRUST_NEVER" : TRUST_NEVER,
+ "TRUST_MARGINAL" : TRUST_MARGINAL,
+ "TRUST_FULLY" : TRUST_FULLY,
+ "TRUST_ULTIMATE" : TRUST_ULTIMATE,
+ }
+
+ def __init__(self, gpg):
+ self.gpg = gpg
+ self.valid = False
+ self.fingerprint = self.creation_date = self.timestamp = None
+ self.signature_id = self.key_id = None
+ self.username = None
+ self.status = None
+ self.pubkey_fingerprint = None
+ self.expire_timestamp = None
+ self.sig_timestamp = None
+ self.trust_text = None
+ self.trust_level = None
+
+ def __nonzero__(self):
+ return self.valid
+
+ __bool__ = __nonzero__
+
+ def handle_status(self, key, value):
+ if key in self.TRUST_LEVELS:
+ self.trust_text = key
+ self.trust_level = self.TRUST_LEVELS[key]
+ elif key in ("RSA_OR_IDEA", "NODATA", "IMPORT_RES", "PLAINTEXT",
+ "PLAINTEXT_LENGTH", "POLICY_URL", "DECRYPTION_INFO",
+ "DECRYPTION_OKAY"):
+ pass
+ elif key == "BADSIG":
+ self.valid = False
+ self.status = 'signature bad'
+ self.key_id, self.username = value.split(None, 1)
+ elif key == "GOODSIG":
+ self.valid = True
+ self.status = 'signature good'
+ self.key_id, self.username = value.split(None, 1)
+ elif key == "VALIDSIG":
+ (self.fingerprint,
+ self.creation_date,
+ self.sig_timestamp,
+ self.expire_timestamp) = value.split()[:4]
+ # may be different if signature is made with a subkey
+ self.pubkey_fingerprint = value.split()[-1]
+ self.status = 'signature valid'
+ elif key == "SIG_ID":
+ (self.signature_id,
+ self.creation_date, self.timestamp) = value.split()
+ elif key == "ERRSIG":
+ self.valid = False
+ (self.key_id,
+ algo, hash_algo,
+ cls,
+ self.timestamp) = value.split()[:5]
+ self.status = 'signature error'
+ elif key == "DECRYPTION_FAILED":
+ self.valid = False
+ self.key_id = value
+ self.status = 'decryption failed'
+ elif key == "NO_PUBKEY":
+ self.valid = False
+ self.key_id = value
+ self.status = 'no public key'
+ elif key in ("KEYEXPIRED", "SIGEXPIRED"):
+ # these are useless in verify, since they are spit out for any
+ # pub/subkeys on the key, not just the one doing the signing.
+ # if we want to check for signatures with expired key,
+ # the relevant flag is EXPKEYSIG.
+ pass
+ elif key in ("EXPKEYSIG", "REVKEYSIG"):
+ # signed with expired or revoked key
+ self.valid = False
+ self.key_id = value.split()[0]
+ self.status = (('%s %s') % (key[:3], key[3:])).lower()
+ else:
+ raise ValueError("Unknown status message: %r" % key)
+
+class ImportResult(object):
+ "Handle status messages for --import"
+
+ counts = '''count no_user_id imported imported_rsa unchanged
+ n_uids n_subk n_sigs n_revoc sec_read sec_imported
+ sec_dups not_imported'''.split()
+ def __init__(self, gpg):
+ self.gpg = gpg
+ self.imported = []
+ self.results = []
+ self.fingerprints = []
+ for result in self.counts:
+ setattr(self, result, None)
+
+ def __nonzero__(self):
+ if self.not_imported: return False
+ if not self.fingerprints: return False
+ return True
+
+ __bool__ = __nonzero__
+
+ ok_reason = {
+ '0': 'Not actually changed',
+ '1': 'Entirely new key',
+ '2': 'New user IDs',
+ '4': 'New signatures',
+ '8': 'New subkeys',
+ '16': 'Contains private key',
+ }
+
+ problem_reason = {
+ '0': 'No specific reason given',
+ '1': 'Invalid Certificate',
+ '2': 'Issuer Certificate missing',
+ '3': 'Certificate Chain too long',
+ '4': 'Error storing certificate',
+ }
+
+ def handle_status(self, key, value):
+ if key == "IMPORTED":
+ # this duplicates info we already see in import_ok & import_problem
+ pass
+ elif key == "NODATA":
+ self.results.append({'fingerprint': None,
+ 'problem': '0', 'text': 'No valid data found'})
+ elif key == "IMPORT_OK":
+ reason, fingerprint = value.split()
+ reasons = []
+ for code, text in list(self.ok_reason.items()):
+ if int(reason) | int(code) == int(reason):
+ reasons.append(text)
+ reasontext = '\n'.join(reasons) + "\n"
+ self.results.append({'fingerprint': fingerprint,
+ 'ok': reason, 'text': reasontext})
+ self.fingerprints.append(fingerprint)
+ elif key == "IMPORT_PROBLEM":
+ try:
+ reason, fingerprint = value.split()
+ except:
+ reason = value
+ fingerprint = '<unknown>'
+ self.results.append({'fingerprint': fingerprint,
+ 'problem': reason, 'text': self.problem_reason[reason]})
+ elif key == "IMPORT_RES":
+ import_res = value.split()
+ for i in range(len(self.counts)):
+ setattr(self, self.counts[i], int(import_res[i]))
+ elif key == "KEYEXPIRED":
+ self.results.append({'fingerprint': None,
+ 'problem': '0', 'text': 'Key expired'})
+ elif key == "SIGEXPIRED":
+ self.results.append({'fingerprint': None,
+ 'problem': '0', 'text': 'Signature expired'})
+ else:
+ raise ValueError("Unknown status message: %r" % key)
+
+ def summary(self):
+ l = []
+ l.append('%d imported' % self.imported)
+ if self.not_imported:
+ l.append('%d not imported' % self.not_imported)
+ return ', '.join(l)
+
+class ListKeys(list):
+ ''' Handle status messages for --list-keys.
+
+ Handle pub and uid (relating the latter to the former).
+
+ Don't care about (info from src/DETAILS):
+
+ crt = X.509 certificate
+ crs = X.509 certificate and private key available
+ ssb = secret subkey (secondary key)
+ uat = user attribute (same as user id except for field 10).
+ sig = signature
+ rev = revocation signature
+ pkd = public key data (special field format, see below)
+ grp = reserved for gpgsm
+ rvk = revocation key
+ '''
+ def __init__(self, gpg):
+ self.gpg = gpg
+ self.curkey = None
+ self.fingerprints = []
+ self.uids = []
+
+ def key(self, args):
+ vars = ("""
+ type trust length algo keyid date expires dummy ownertrust uid
+ """).split()
+ self.curkey = {}
+ for i in range(len(vars)):
+ self.curkey[vars[i]] = args[i]
+ self.curkey['uids'] = []
+ if self.curkey['uid']:
+ self.curkey['uids'].append(self.curkey['uid'])
+ del self.curkey['uid']
+ self.curkey['subkeys'] = []
+ self.append(self.curkey)
+
+ pub = sec = key
+
+ def fpr(self, args):
+ self.curkey['fingerprint'] = args[9]
+ self.fingerprints.append(args[9])
+
+ def uid(self, args):
+ self.curkey['uids'].append(args[9])
+ self.uids.append(args[9])
+
+ def sub(self, args):
+ subkey = [args[4], args[11]]
+ self.curkey['subkeys'].append(subkey)
+
+ def handle_status(self, key, value):
+ pass
+
+class Crypt(Verify):
+ "Handle status messages for --encrypt and --decrypt"
+ def __init__(self, gpg):
+ Verify.__init__(self, gpg)
+ self.data = ''
+ self.ok = False
+ self.status = ''
+
+ def __nonzero__(self):
+ if self.ok: return True
+ return False
+
+ __bool__ = __nonzero__
+
+ def __str__(self):
+ return self.data.decode(self.gpg.encoding, self.gpg.decode_errors)
+
+ def handle_status(self, key, value):
+ if key in ("ENC_TO", "USERID_HINT", "GOODMDC", "END_DECRYPTION",
+ "BEGIN_SIGNING", "NO_SECKEY", "ERROR", "NODATA",
+ "CARDCTRL"):
+ # in the case of ERROR, this is because a more specific error
+ # message will have come first
+ pass
+ elif key in ("NEED_PASSPHRASE", "BAD_PASSPHRASE", "GOOD_PASSPHRASE",
+ "MISSING_PASSPHRASE", "DECRYPTION_FAILED",
+ "KEY_NOT_CREATED"):
+ self.status = key.replace("_", " ").lower()
+ elif key == "NEED_PASSPHRASE_SYM":
+ self.status = 'need symmetric passphrase'
+ elif key == "BEGIN_DECRYPTION":
+ self.status = 'decryption incomplete'
+ elif key == "BEGIN_ENCRYPTION":
+ self.status = 'encryption incomplete'
+ elif key == "DECRYPTION_OKAY":
+ self.status = 'decryption ok'
+ self.ok = True
+ elif key == "END_ENCRYPTION":
+ self.status = 'encryption ok'
+ self.ok = True
+ elif key == "INV_RECP":
+ self.status = 'invalid recipient'
+ elif key == "KEYEXPIRED":
+ self.status = 'key expired'
+ elif key == "SIG_CREATED":
+ self.status = 'sig created'
+ elif key == "SIGEXPIRED":
+ self.status = 'sig expired'
+ else:
+ Verify.handle_status(self, key, value)
+
+class GenKey(object):
+ "Handle status messages for --gen-key"
+ def __init__(self, gpg):
+ self.gpg = gpg
+ self.type = None
+ self.fingerprint = None
+
+ def __nonzero__(self):
+ if self.fingerprint: return True
+ return False
+
+ __bool__ = __nonzero__
+
+ def __str__(self):
+ return self.fingerprint or ''
+
+ def handle_status(self, key, value):
+ if key in ("PROGRESS", "GOOD_PASSPHRASE", "NODATA", "KEY_NOT_CREATED"):
+ pass
+ elif key == "KEY_CREATED":
+ (self.type,self.fingerprint) = value.split()
+ else:
+ raise ValueError("Unknown status message: %r" % key)
+
+class DeleteResult(object):
+ "Handle status messages for --delete-key and --delete-secret-key"
+ def __init__(self, gpg):
+ self.gpg = gpg
+ self.status = 'ok'
+
+ def __str__(self):
+ return self.status
+
+ problem_reason = {
+ '1': 'No such key',
+ '2': 'Must delete secret key first',
+ '3': 'Ambigious specification',
+ }
+
+ def handle_status(self, key, value):
+ if key == "DELETE_PROBLEM":
+ self.status = self.problem_reason.get(value,
+ "Unknown error: %r" % value)
+ else:
+ raise ValueError("Unknown status message: %r" % key)
+
+class Sign(object):
+ "Handle status messages for --sign"
+ def __init__(self, gpg):
+ self.gpg = gpg
+ self.type = None
+ self.fingerprint = None
+
+ def __nonzero__(self):
+ return self.fingerprint is not None
+
+ __bool__ = __nonzero__
+
+ def __str__(self):
+ return self.data.decode(self.gpg.encoding, self.gpg.decode_errors)
+
+ def handle_status(self, key, value):
+ if key in ("USERID_HINT", "NEED_PASSPHRASE", "BAD_PASSPHRASE",
+ "GOOD_PASSPHRASE", "BEGIN_SIGNING", "CARDCTRL", "INV_SGNR"):
+ pass
+ elif key == "SIG_CREATED":
+ (self.type,
+ algo, hashalgo, cls,
+ self.timestamp, self.fingerprint
+ ) = value.split()
+ else:
+ raise ValueError("Unknown status message: %r" % key)
+
+
class GPG(object):
+
+ decode_errors = 'strict'
+
+ result_map = {
+ 'crypt': Crypt,
+ 'delete': DeleteResult,
+ 'generate': GenKey,
+ 'import': ImportResult,
+ 'list': ListKeys,
+ 'sign': Sign,
+ 'verify': Verify,
+ }
+
"Encapsulate access to the gpg executable"
- def __init__(self, gpgbinary='gpg', gnupghome=None, verbose=False, use_agent=False):
+ def __init__(self, gpgbinary='gpg', gnupghome=None, verbose=False,
+ use_agent=False, keyring=None, options=None):
"""Initialize a GPG process wrapper. Options are:
gpgbinary -- full pathname for GPG binary.
gnupghome -- full pathname to where we can find the public and
private keyrings. Default is whatever gpg defaults to.
+ keyring -- name of alternative keyring file to use. If specified,
+ the default keyring is not used.
+ options =-- a list of additional options to pass to the GPG binary.
"""
self.gpgbinary = gpgbinary
self.gnupghome = gnupghome
+ self.keyring = keyring
self.verbose = verbose
self.use_agent = use_agent
+ if isinstance(options, str):
+ options = [options]
+ self.options = options
self.encoding = locale.getpreferredencoding()
if self.encoding is None: # This happens on Jython!
self.encoding = sys.stdin.encoding
if gnupghome and not os.path.isdir(self.gnupghome):
os.makedirs(self.gnupghome,0x1C0)
p = self._open_subprocess(["--version"])
- result = Verify() # any result will do for this
+ result = self.result_map['verify'](self) # any result will do for this
self._collect_output(p, result, stdin=p.stdin)
if p.returncode != 0:
raise ValueError("Error invoking gpg: %s: %s" % (p.returncode,
result.stderr))
- def _open_subprocess(self, args, passphrase=False):
- # Internal method: open a pipe to a GPG subprocess and return
- # the file objects for communicating with it.
+ def make_args(self, args, passphrase):
+ """
+ Make a list of command line elements for GPG. The value of ``args``
+ will be appended. The ``passphrase`` argument needs to be True if
+ a passphrase will be sent to GPG, else False.
+ """
cmd = [self.gpgbinary, '--status-fd 2 --no-tty']
if self.gnupghome:
cmd.append('--homedir "%s" ' % self.gnupghome)
+ if self.keyring:
+ cmd.append('--no-default-keyring --keyring "%s" ' % self.keyring)
if passphrase:
cmd.append('--batch --passphrase-fd 0')
if self.use_agent:
cmd.append('--use-agent')
+ if self.options:
+ cmd.extend(self.options)
cmd.extend(args)
- cmd = ' '.join(cmd)
+ return cmd
+
+ def _open_subprocess(self, args, passphrase=False):
+ # Internal method: open a pipe to a GPG subprocess and return
+ # the file objects for communicating with it.
+ cmd = ' '.join(self.make_args(args, passphrase))
if self.verbose:
print(cmd)
logger.debug("%s", cmd)
return Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=PIPE)
def _read_response(self, stream, result):
- # Internal method: reads all the output from GPG, taking notice
+ # Internal method: reads all the stderr output from GPG, taking notice
# only of lines that begin with the magic [GNUPG:] prefix.
#
# Calls methods on the response object for each valid token found,
@@ -204,14 +564,13 @@ class GPG(object):
lines = []
while True:
line = stream.readline()
- if not isinstance(line, str):
- line = line.decode('utf-8')
+ if len(line) == 0:
+ break
lines.append(line)
+ line = line.rstrip()
if self.verbose:
print(line)
- logger.debug("%s", line.rstrip())
- if line == "": break
- line = line.rstrip()
+ logger.debug("%s", line)
if line[0:9] == '[GNUPG:] ':
# Chop off the prefix
line = line[9:]
@@ -246,13 +605,13 @@ class GPG(object):
make sure it's joined before returning. If a stdin stream is given,
close it before returning.
"""
- stderr = _wrap_output(process.stderr)
+ stderr = codecs.getreader(self.encoding)(process.stderr)
rr = threading.Thread(target=self._read_response, args=(stderr, result))
rr.setDaemon(True)
logger.debug('stderr reader: %r', rr)
rr.start()
- stdout = process.stdout # _wrap_output(process.stdout)
+ stdout = process.stdout
dr = threading.Thread(target=self._read_data, args=(stdout, result))
dr.setDaemon(True)
logger.debug('stdout reader: %r', dr)
@@ -276,8 +635,8 @@ class GPG(object):
# Handle a basic data call - pass data to GPG, handle the output
# including status information. Garbage In, Garbage Out :)
p = self._open_subprocess(args, passphrase is not None)
- if not binary and not isinstance(file, BufferedReader):
- stdin = _wrap_input(p.stdin)
+ if not binary:
+ stdin = codecs.getwriter(self.encoding)(p.stdin)
else:
stdin = p.stdin
if passphrase:
@@ -311,11 +670,10 @@ class GPG(object):
elif clearsign:
args.append("--clearsign")
if keyid:
- args.append("--default-key %s" % keyid)
- result = Sign(self.encoding)
+ args.append('--default-key "%s"' % keyid)
+ result = self.result_map['sign'](self)
#We could use _handle_io here except for the fact that if the
#passphrase is bad, gpg bails and you can't write the message.
- #self._handle_io(args, _make_file(message), result, passphrase=passphrase)
p = self._open_subprocess(args, passphrase is not None)
try:
stdin = p.stdin
@@ -351,7 +709,7 @@ class GPG(object):
def verify_file(self, file, data_filename=None):
"Verify the signature on the contents of the file-like object 'file'"
logger.debug('verify_file: %r, %r', file, data_filename)
- result = Verify()
+ result = self.result_map['verify'](self)
args = ['--verify']
if data_filename is None:
self._handle_io(args, file, result, binary=True)
@@ -365,7 +723,7 @@ class GPG(object):
os.write(fd, s)
os.close(fd)
args.append(fn)
- args.append(data_filename)
+ args.append('"%s"' % data_filename)
try:
p = self._open_subprocess(args)
self._collect_output(p, result, stdin=p.stdin)
@@ -422,7 +780,7 @@ class GPG(object):
>>> assert print2 in pubkeys.fingerprints
"""
- result = ImportResult()
+ result = self.result_map['import'](self)
logger.debug('import_keys: %r', key_data[:256])
data = _make_binary_stream(key_data, self.encoding)
self._handle_io(['--import'], data, result, binary=True)
@@ -430,14 +788,35 @@ class GPG(object):
data.close()
return result
+ def recv_keys(self, keyserver, *keyids):
+ """Import a key from a keyserver
+
+ >>> import shutil
+ >>> shutil.rmtree("keys")
+ >>> gpg = GPG(gnupghome="keys")
+ >>> result = gpg.recv_keys('pgp.mit.edu', '3FF0DB166A7476EA')
+ >>> assert result
+
+ """
+ result = self.result_map['import'](self)
+ logger.debug('recv_keys: %r', keyids)
+ data = _make_binary_stream("", self.encoding)
+ #data = ""
+ args = ['--keyserver', keyserver, '--recv-keys']
+ args.extend(keyids)
+ self._handle_io(args, data, result, binary=True)
+ logger.debug('recv_keys result: %r', result.__dict__)
+ data.close()
+ return result
+
def delete_keys(self, fingerprints, secret=False):
which='key'
if secret:
which='secret-key'
if _is_sequence(fingerprints):
fingerprints = ' '.join(fingerprints)
- args = ["--batch --delete-%s %s" % (which, fingerprints)]
- result = DeleteResult()
+ args = ['--batch --delete-%s "%s"' % (which, fingerprints)]
+ result = self.result_map['delete'](self)
p = self._open_subprocess(args)
self._collect_output(p, result, stdin=p.stdin)
return result
@@ -448,16 +827,16 @@ class GPG(object):
if secret:
which='-secret-key'
if _is_sequence(keyids):
- keyids = ' '.join(keyids)
+ keyids = ' '.join(['"%s"' % k for k in keyids])
args = ["--armor --export%s %s" % (which, keyids)]
p = self._open_subprocess(args)
# gpg --export produces no status-fd output; stdout will be
# empty in case of failure
#stdout, stderr = p.communicate()
- result = DeleteResult() # any result will do
+ result = self.result_map['delete'](self) # any result will do
self._collect_output(p, result, stdin=p.stdin)
logger.debug('export_keys result: %r', result.data)
- return result.data.decode(self.encoding, 'replace')
+ return result.data.decode(self.encoding, self.decode_errors)
def list_keys(self, secret=False):
""" list the keys currently in the keyring
@@ -487,9 +866,10 @@ class GPG(object):
# ...nope, unless you care about expired sigs or keys (stevegt)
# Get the response information
- result = ListKeys()
+ result = self.result_map['list'](self)
self._collect_output(p, result, stdin=p.stdin)
- lines = result.data.decode(self.encoding, 'replace').splitlines()
+ lines = result.data.decode(self.encoding,
+ self.decode_errors).splitlines()
valid_keywords = 'pub uid sec fpr sub'.split()
for line in lines:
if self.verbose:
@@ -518,9 +898,9 @@ class GPG(object):
"""
args = ["--gen-key --batch"]
- result = GenKey()
- f = _make_file(input)
- self._handle_io(args, f, result)
+ result = self.result_map['generate'](self)
+ f = _make_binary_stream(input, self.encoding)
+ self._handle_io(args, f, result, binary=True)
f.close()
return result
@@ -576,24 +956,28 @@ class GPG(object):
#
def encrypt_file(self, file, recipients, sign=None,
always_trust=False, passphrase=None,
- armor=True, output=None):
+ armor=True, output=None, symmetric=False):
"Encrypt the message read from the file-like object 'file'"
args = ['--encrypt']
+ if symmetric:
+ args = ['--symmetric']
+ else:
+ args = ['--encrypt']
+ if not _is_sequence(recipients):
+ recipients = (recipients,)
+ for recipient in recipients:
+ args.append('--recipient "%s"' % recipient)
if armor: # create ascii-armored output - set to False for binary output
args.append('--armor')
if output: # write the output to a file with the specified name
if os.path.exists(output):
os.remove(output) # to avoid overwrite confirmation message
- args.append('--output %s' % output)
- if not _is_sequence(recipients):
- recipients = (recipients,)
- for recipient in recipients:
- args.append('--recipient %s' % recipient)
+ args.append('--output "%s"' % output)
if sign:
- args.append("--sign --default-key %s" % sign)
+ args.append('--sign --default-key "%s"' % sign)
if always_trust:
args.append("--always-trust")
- result = Crypt(self.encoding)
+ result = self.result_map['crypt'](self)
self._handle_io(args, file, result, passphrase=passphrase, binary=True)
logger.debug('encrypt result: %r', result.data)
return result
@@ -621,27 +1005,27 @@ class GPG(object):
>>> result = gpg.encrypt("hello again",print1)
>>> message = str(result)
>>> result = gpg.decrypt(message)
- >>> result.status
- 'need passphrase'
+ >>> result.status == 'need passphrase'
+ True
>>> result = gpg.decrypt(message,passphrase='bar')
- >>> result.status
- 'decryption failed'
+ >>> result.status in ('decryption failed', 'bad passphrase')
+ True
>>> assert not result
>>> result = gpg.decrypt(message,passphrase='foo')
- >>> result.status
- 'decryption ok'
+ >>> result.status == 'decryption ok'
+ True
>>> str(result)
'hello again'
>>> result = gpg.encrypt("signed hello",print2,sign=print1)
- >>> result.status
- 'need passphrase'
+ >>> result.status == 'need passphrase'
+ True
>>> result = gpg.encrypt("signed hello",print2,sign=print1,passphrase='foo')
- >>> result.status
- 'encryption ok'
+ >>> result.status == 'encryption ok'
+ True
>>> message = str(result)
>>> result = gpg.decrypt(message)
- >>> result.status
- 'decryption ok'
+ >>> result.status == 'decryption ok'
+ True
>>> assert result.fingerprint == print1
"""
@@ -662,314 +1046,11 @@ class GPG(object):
if output: # write the output to a file with the specified name
if os.path.exists(output):
os.remove(output) # to avoid overwrite confirmation message
- args.append('--output %s' % output)
+ args.append('--output "%s"' % output)
if always_trust:
args.append("--always-trust")
- result = Crypt(self.encoding)
+ result = self.result_map['crypt'](self)
self._handle_io(args, file, result, passphrase, binary=True)
logger.debug('decrypt result: %r', result.data)
return result
-class Verify(object):
- "Handle status messages for --verify"
-
- def __init__(self):
- self.valid = False
- self.fingerprint = self.creation_date = self.timestamp = None
- self.signature_id = self.key_id = None
- self.username = None
-
- def __nonzero__(self):
- return self.valid
-
- __bool__ = __nonzero__
-
- def handle_status(self, key, value):
- if key in ("TRUST_UNDEFINED", "TRUST_NEVER", "TRUST_MARGINAL",
- "TRUST_FULLY", "TRUST_ULTIMATE", "RSA_OR_IDEA"):
- pass
- elif key in ("PLAINTEXT", "PLAINTEXT_LENGTH"):
- pass
- elif key == "IMPORT_RES":
- # If auto-key-retrieve option is enabled, this can happen
- pass
- elif key == "BADSIG":
- self.valid = False
- self.key_id, self.username = value.split(None, 1)
- elif key == "GOODSIG":
- self.valid = True
- self.key_id, self.username = value.split(None, 1)
- elif key == "VALIDSIG":
- (self.fingerprint,
- self.creation_date,
- self.sig_timestamp,
- self.expire_timestamp) = value.split()[:4]
- elif key == "SIG_ID":
- (self.signature_id,
- self.creation_date, self.timestamp) = value.split()
- elif key == "ERRSIG":
- self.valid = False
- (self.key_id,
- algo, hash_algo,
- cls,
- self.timestamp) = value.split()[:5]
- elif key == "NO_PUBKEY":
- self.valid = False
- self.key_id = value
- else:
- raise ValueError("Unknown status message: %r" % key)
-
-class ImportResult(object):
- "Handle status messages for --import"
-
- counts = '''count no_user_id imported imported_rsa unchanged
- n_uids n_subk n_sigs n_revoc sec_read sec_imported
- sec_dups not_imported'''.split()
- def __init__(self):
- self.imported = []
- self.results = []
- self.fingerprints = []
- for result in self.counts:
- setattr(self, result, None)
-
- def __nonzero__(self):
- if self.not_imported: return False
- if not self.fingerprints: return False
- return True
-
- __bool__ = __nonzero__
-
- ok_reason = {
- '0': 'Not actually changed',
- '1': 'Entirely new key',
- '2': 'New user IDs',
- '4': 'New signatures',
- '8': 'New subkeys',
- '16': 'Contains private key',
- }
-
- problem_reason = {
- '0': 'No specific reason given',
- '1': 'Invalid Certificate',
- '2': 'Issuer Certificate missing',
- '3': 'Certificate Chain too long',
- '4': 'Error storing certificate',
- }
-
- def handle_status(self, key, value):
- if key == "IMPORTED":
- # this duplicates info we already see in import_ok & import_problem
- pass
- elif key == "NODATA":
- self.results.append({'fingerprint': None,
- 'problem': '0', 'text': 'No valid data found'})
- elif key == "IMPORT_OK":
- reason, fingerprint = value.split()
- reasons = []
- for code, text in list(self.ok_reason.items()):
- if int(reason) | int(code) == int(reason):
- reasons.append(text)
- reasontext = '\n'.join(reasons) + "\n"
- self.results.append({'fingerprint': fingerprint,
- 'ok': reason, 'text': reasontext})
- self.fingerprints.append(fingerprint)
- elif key == "IMPORT_PROBLEM":
- try:
- reason, fingerprint = value.split()
- except:
- reason = value
- fingerprint = '<unknown>'
- self.results.append({'fingerprint': fingerprint,
- 'problem': reason, 'text': self.problem_reason[reason]})
- elif key == "IMPORT_RES":
- import_res = value.split()
- for i in range(len(self.counts)):
- setattr(self, self.counts[i], int(import_res[i]))
- elif key == "KEYEXPIRED":
- self.results.append({'fingerprint': None,
- 'problem': '0', 'text': 'Key expired'})
- elif key == "SIGEXPIRED":
- self.results.append({'fingerprint': None,
- 'problem': '0', 'text': 'Signature expired'})
- else:
- raise ValueError("Unknown status message: %r" % key)
-
- def summary(self):
- l = []
- l.append('%d imported'%self.imported)
- if self.not_imported:
- l.append('%d not imported'%self.not_imported)
- return ', '.join(l)
-
-class ListKeys(list):
- ''' Handle status messages for --list-keys.
-
- Handle pub and uid (relating the latter to the former).
-
- Don't care about (info from src/DETAILS):
-
- crt = X.509 certificate
- crs = X.509 certificate and private key available
- sub = subkey (secondary key)
- ssb = secret subkey (secondary key)
- uat = user attribute (same as user id except for field 10).
- sig = signature
- rev = revocation signature
- pkd = public key data (special field format, see below)
- grp = reserved for gpgsm
- rvk = revocation key
- '''
- def __init__(self):
- self.curkey = None
- self.fingerprints = []
- self.uids = []
-
- def key(self, args):
- vars = ("""
- type trust length algo keyid date expires dummy ownertrust uid
- """).split()
- self.curkey = {}
- for i in range(len(vars)):
- self.curkey[vars[i]] = args[i]
- self.curkey['uids'] = []
- if self.curkey['uid']:
- self.curkey['uids'].append(self.curkey['uid'])
- del self.curkey['uid']
- self.curkey['subkeys'] = []
- self.append(self.curkey)
-
- pub = sec = key
-
- def fpr(self, args):
- self.curkey['fingerprint'] = args[9]
- self.fingerprints.append(args[9])
-
- def uid(self, args):
- self.curkey['uids'].append(args[9])
- self.uids.append(args[9])
-
- def sub(self, args):
- subkey = [args[4],args[11]]
- self.curkey['subkeys'].append(subkey)
-
- def handle_status(self, key, value):
- pass
-
-class Crypt(Verify):
- "Handle status messages for --encrypt and --decrypt"
- def __init__(self, encoding):
- Verify.__init__(self)
- self.data = ''
- self.ok = False
- self.status = ''
- self.encoding = encoding
-
- def __nonzero__(self):
- if self.ok: return True
- return False
-
- __bool__ = __nonzero__
-
- def __str__(self):
- return self.data.decode(self.encoding, 'replace')
-
- def handle_status(self, key, value):
- if key in ("ENC_TO", "USERID_HINT", "GOODMDC", "END_DECRYPTION",
- "BEGIN_SIGNING", "NO_SECKEY"):
- pass
- elif key in ("NEED_PASSPHRASE", "BAD_PASSPHRASE", "GOOD_PASSPHRASE",
- "MISSING_PASSPHRASE", "DECRYPTION_FAILED"):
- self.status = key.replace("_", " ").lower()
- elif key == "NEED_PASSPHRASE_SYM":
- self.status = 'need symmetric passphrase'
- elif key == "BEGIN_DECRYPTION":
- self.status = 'decryption incomplete'
- elif key == "BEGIN_ENCRYPTION":
- self.status = 'encryption incomplete'
- elif key == "DECRYPTION_OKAY":
- self.status = 'decryption ok'
- self.ok = True
- elif key == "END_ENCRYPTION":
- self.status = 'encryption ok'
- self.ok = True
- elif key == "INV_RECP":
- self.status = 'invalid recipient'
- elif key == "KEYEXPIRED":
- self.status = 'key expired'
- elif key == "SIG_CREATED":
- self.status = 'sig created'
- elif key == "SIGEXPIRED":
- self.status = 'sig expired'
- else:
- Verify.handle_status(self, key, value)
-
-class GenKey(object):
- "Handle status messages for --gen-key"
- def __init__(self):
- self.type = None
- self.fingerprint = None
-
- def __nonzero__(self):
- if self.fingerprint: return True
- return False
-
- __bool__ = __nonzero__
-
- def __str__(self):
- return self.fingerprint or ''
-
- def handle_status(self, key, value):
- if key in ("PROGRESS", "GOOD_PASSPHRASE", "NODATA"):
- pass
- elif key == "KEY_CREATED":
- (self.type,self.fingerprint) = value.split()
- else:
- raise ValueError("Unknown status message: %r" % key)
-
-class DeleteResult(object):
- "Handle status messages for --delete-key and --delete-secret-key"
- def __init__(self):
- self.status = 'ok'
-
- def __str__(self):
- return self.status
-
- problem_reason = {
- '1': 'No such key',
- '2': 'Must delete secret key first',
- '3': 'Ambigious specification',
- }
-
- def handle_status(self, key, value):
- if key == "DELETE_PROBLEM":
- self.status = self.problem_reason.get(value,
- "Unknown error: %r" % value)
- else:
- raise ValueError("Unknown status message: %r" % key)
-
-class Sign(object):
- "Handle status messages for --sign"
- def __init__(self, encoding):
- self.type = None
- self.fingerprint = None
- self.encoding = encoding
-
- def __nonzero__(self):
- return self.fingerprint is not None
-
- __bool__ = __nonzero__
-
- def __str__(self):
- return self.data.decode(self.encoding, 'replace')
-
- def handle_status(self, key, value):
- if key in ("USERID_HINT", "NEED_PASSPHRASE", "BAD_PASSPHRASE",
- "GOOD_PASSPHRASE", "BEGIN_SIGNING"):
- pass
- elif key == "SIG_CREATED":
- (self.type,
- algo, hashalgo, cls,
- self.timestamp, self.fingerprint
- ) = value.split()
- else:
- raise ValueError("Unknown status message: %r" % key)