summaryrefslogtreecommitdiff
path: root/plugins/gpg/gnupg.py
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/gpg/gnupg.py')
-rw-r--r--plugins/gpg/gnupg.py566
1 files changed, 416 insertions, 150 deletions
diff --git a/plugins/gpg/gnupg.py b/plugins/gpg/gnupg.py
index 5cb11766..99bd7d25 100644
--- a/plugins/gpg/gnupg.py
+++ b/plugins/gpg/gnupg.py
@@ -27,15 +27,14 @@ Vinay Sajip to make use of the subprocess module (Steve's version uses os.fork()
and so does not work on Windows). Renamed to gnupg.py to avoid confusion with
the previous versions.
-Modifications Copyright (C) 2008-2012 Vinay Sajip. All rights reserved.
+Modifications Copyright (C) 2008-2014 Vinay Sajip. All rights reserved.
A unittest harness (test_gnupg.py) has also been added.
"""
-import locale
-__version__ = "0.3.1"
+__version__ = "0.3.8.dev0"
__author__ = "Vinay Sajip"
-__date__ = "$01-Sep-2012 20:02:51$"
+__date__ = "$07-Dec-2014 18:46:17$"
try:
from io import StringIO
@@ -46,6 +45,7 @@ import codecs
import locale
import logging
import os
+import re
import socket
from subprocess import Popen
from subprocess import PIPE
@@ -61,13 +61,61 @@ except ImportError:
try:
unicode
_py3k = False
+ string_types = basestring
+ text_type = unicode
except NameError:
_py3k = True
+ string_types = str
+ text_type = str
logger = logging.getLogger(__name__)
if not logger.handlers:
logger.addHandler(NullHandler())
+# We use the test below because it works for Jython as well as CPython
+if os.path.__name__ == 'ntpath':
+ # On Windows, we don't need shell quoting, other than worrying about
+ # paths with spaces in them.
+ def shell_quote(s):
+ return '"%s"' % s
+else:
+ # Section copied from sarge
+
+ # This regex determines which shell input needs quoting
+ # because it may be unsafe
+ UNSAFE = re.compile(r'[^\w%+,./:=@-]')
+
+ def shell_quote(s):
+ """
+ Quote text so that it is safe for Posix command shells.
+
+ For example, "*.py" would be converted to "'*.py'". If the text is
+ considered safe it is returned unquoted.
+
+ :param s: The value to quote
+ :type s: str (or unicode on 2.x)
+ :return: A safe version of the input, from the point of view of Posix
+ command shells
+ :rtype: The passed-in type
+ """
+ if not isinstance(s, string_types):
+ raise TypeError('Expected string type, got %s' % type(s))
+ if not s:
+ result = "''"
+ elif not UNSAFE.search(s):
+ result = s
+ else:
+ result = "'%s'" % s.replace("'", r"'\''")
+ return result
+
+ # end of sarge code
+
+# Now that we use shell=False, we shouldn't need to quote arguments.
+# Use no_quote instead of shell_quote to remind us of where quoting
+# was needed.
+def no_quote(s):
+ return s
+
def _copy_data(instream, outstream):
# Copy one stream to another
sent = 0
@@ -77,7 +125,7 @@ def _copy_data(instream, outstream):
enc = 'ascii'
while True:
data = instream.read(1024)
- if len(data) == 0:
+ if not data:
break
sent += len(data)
logger.debug("sending chunk (%d): %r", sent, data[:256])
@@ -107,25 +155,28 @@ def _write_passphrase(stream, passphrase, encoding):
passphrase = '%s\n' % passphrase
passphrase = passphrase.encode(encoding)
stream.write(passphrase)
- logger.debug("Wrote passphrase: %r", passphrase)
+ logger.debug('Wrote passphrase')
def _is_sequence(instance):
- return isinstance(instance,list) or isinstance(instance,tuple)
+ return isinstance(instance, (list, tuple, set, frozenset))
-def _make_binary_stream(s, encoding):
+def _make_memory_stream(s):
try:
- if _py3k:
- if isinstance(s, str):
- s = s.encode(encoding)
- else:
- if type(s) is not str:
- s = s.encode(encoding)
from io import BytesIO
rv = BytesIO(s)
except ImportError:
rv = StringIO(s)
return rv
+def _make_binary_stream(s, encoding):
+ if _py3k:
+ if isinstance(s, str):
+ s = s.encode(encoding)
+ else:
+ if type(s) is not str:
+ s = s.encode(encoding)
+ return _make_memory_stream(s)
+
class Verify(object):
"Handle status messages for --verify"
@@ -149,6 +200,7 @@ class Verify(object):
self.fingerprint = self.creation_date = self.timestamp = None
self.signature_id = self.key_id = None
self.username = None
+ self.key_status = None
self.status = None
self.pubkey_fingerprint = None
self.expire_timestamp = None
@@ -166,13 +218,27 @@ class Verify(object):
self.trust_text = key
self.trust_level = self.TRUST_LEVELS[key]
elif key in ("RSA_OR_IDEA", "NODATA", "IMPORT_RES", "PLAINTEXT",
- "PLAINTEXT_LENGTH", "POLICY_URL", "DECRYPTION_INFO",
- "DECRYPTION_OKAY"):
+ "PLAINTEXT_LENGTH", "POLICY_URL", "DECRYPTION_INFO",
+ "DECRYPTION_OKAY", "INV_SGNR", "FILE_START", "FILE_ERROR",
+ "FILE_DONE", "PKA_TRUST_GOOD", "PKA_TRUST_BAD", "BADMDC",
+ "GOODMDC", "NO_SGNR", "NOTATION_NAME", "NOTATION_DATA",
+ "PROGRESS"):
pass
elif key == "BADSIG":
self.valid = False
self.status = 'signature bad'
self.key_id, self.username = value.split(None, 1)
+ elif key == "ERRSIG":
+ self.valid = False
+ (self.key_id,
+ algo, hash_algo,
+ cls,
+ self.timestamp) = value.split()[:5]
+ self.status = 'signature error'
+ elif key == "EXPSIG":
+ self.valid = False
+ self.status = 'signature expired'
+ self.key_id, self.username = value.split(None, 1)
elif key == "GOODSIG":
self.valid = True
self.status = 'signature good'
@@ -188,13 +254,6 @@ class Verify(object):
elif key == "SIG_ID":
(self.signature_id,
self.creation_date, self.timestamp) = value.split()
- elif key == "ERRSIG":
- self.valid = False
- (self.key_id,
- algo, hash_algo,
- cls,
- self.timestamp) = value.split()[:5]
- self.status = 'signature error'
elif key == "DECRYPTION_FAILED":
self.valid = False
self.key_id = value
@@ -203,17 +262,25 @@ class Verify(object):
self.valid = False
self.key_id = value
self.status = 'no public key'
- elif key in ("KEYEXPIRED", "SIGEXPIRED"):
+ elif key in ("KEYEXPIRED", "SIGEXPIRED", "KEYREVOKED"):
# these are useless in verify, since they are spit out for any
# pub/subkeys on the key, not just the one doing the signing.
# if we want to check for signatures with expired key,
- # the relevant flag is EXPKEYSIG.
+ # the relevant flag is EXPKEYSIG or REVKEYSIG.
pass
elif key in ("EXPKEYSIG", "REVKEYSIG"):
# signed with expired or revoked key
self.valid = False
self.key_id = value.split()[0]
- self.status = (('%s %s') % (key[:3], key[3:])).lower()
+ if key == "EXPKEYSIG":
+ self.key_status = 'signing key has expired'
+ else:
+ self.key_status = 'signing key was revoked'
+ self.status = self.key_status
+ elif key == "UNEXPECTED":
+ self.valid = False
+ self.key_id = value
+ self.status = 'unexpected data'
else:
raise ValueError("Unknown status message: %r" % key)
@@ -282,8 +349,8 @@ class ImportResult(object):
'problem': reason, 'text': self.problem_reason[reason]})
elif key == "IMPORT_RES":
import_res = value.split()
- for i in range(len(self.counts)):
- setattr(self, self.counts[i], int(import_res[i]))
+ for i, count in enumerate(self.counts):
+ setattr(self, count, int(import_res[i]))
elif key == "KEYEXPIRED":
self.results.append({'fingerprint': None,
'problem': '0', 'text': 'Key expired'})
@@ -300,7 +367,63 @@ class ImportResult(object):
l.append('%d not imported' % self.not_imported)
return ', '.join(l)
-class ListKeys(list):
+ESCAPE_PATTERN = re.compile(r'\\x([0-9a-f][0-9a-f])', re.I)
+BASIC_ESCAPES = {
+ r'\n': '\n',
+ r'\r': '\r',
+ r'\f': '\f',
+ r'\v': '\v',
+ r'\b': '\b',
+ r'\0': '\0',
+}
+
+class SendResult(object):
+ def __init__(self, gpg):
+ self.gpg = gpg
+
+ def handle_status(self, key, value):
+ logger.debug('SendResult: %s: %s', key, value)
+
+class SearchKeys(list):
+ ''' Handle status messages for --search-keys.
+
+ Handle pub and uid (relating the latter to the former).
+
+ Don't care about the rest
+ '''
+
+ UID_INDEX = 1
+ FIELDS = 'type keyid algo length date expires'.split()
+
+ def __init__(self, gpg):
+ self.gpg = gpg
+ self.curkey = None
+ self.fingerprints = []
+ self.uids = []
+
+ def get_fields(self, args):
+ result = {}
+ for i, var in enumerate(self.FIELDS):
+ result[var] = args[i]
+ result['uids'] = []
+ return result
+
+ def pub(self, args):
+ self.curkey = curkey = self.get_fields(args)
+ self.append(curkey)
+
+ def uid(self, args):
+ uid = args[self.UID_INDEX]
+ uid = ESCAPE_PATTERN.sub(lambda m: chr(int(m.group(1), 16)), uid)
+ for k, v in BASIC_ESCAPES.items():
+ uid = uid.replace(k, v)
+ self.curkey['uids'].append(uid)
+ self.uids.append(uid)
+
+ def handle_status(self, key, value):
+ pass
+
+class ListKeys(SearchKeys):
''' Handle status messages for --list-keys.
Handle pub and uid (relating the latter to the former).
@@ -317,25 +440,17 @@ class ListKeys(list):
grp = reserved for gpgsm
rvk = revocation key
'''
- def __init__(self, gpg):
- self.gpg = gpg
- self.curkey = None
- self.fingerprints = []
- self.uids = []
+
+ UID_INDEX = 9
+ FIELDS = 'type trust length algo keyid date expires dummy ownertrust uid'.split()
def key(self, args):
- vars = ("""
- type trust length algo keyid date expires dummy ownertrust uid
- """).split()
- self.curkey = {}
- for i in range(len(vars)):
- self.curkey[vars[i]] = args[i]
- self.curkey['uids'] = []
- if self.curkey['uid']:
- self.curkey['uids'].append(self.curkey['uid'])
- del self.curkey['uid']
- self.curkey['subkeys'] = []
- self.append(self.curkey)
+ self.curkey = curkey = self.get_fields(args)
+ if curkey['uid']:
+ curkey['uids'].append(curkey['uid'])
+ del curkey['uid']
+ curkey['subkeys'] = []
+ self.append(curkey)
pub = sec = key
@@ -343,18 +458,34 @@ class ListKeys(list):
self.curkey['fingerprint'] = args[9]
self.fingerprints.append(args[9])
- def uid(self, args):
- self.curkey['uids'].append(args[9])
- self.uids.append(args[9])
-
def sub(self, args):
subkey = [args[4], args[11]]
self.curkey['subkeys'].append(subkey)
- def handle_status(self, key, value):
- pass
-class Crypt(Verify):
+class ScanKeys(ListKeys):
+ ''' Handle status messages for --with-fingerprint.'''
+
+ def sub(self, args):
+ # --with-fingerprint --with-colons somehow outputs fewer colons,
+ # use the last value args[-1] instead of args[11]
+ subkey = [args[4], args[-1]]
+ self.curkey['subkeys'].append(subkey)
+
+class TextHandler(object):
+ def _as_text(self):
+ return self.data.decode(self.gpg.encoding, self.gpg.decode_errors)
+
+ if _py3k:
+ __str__ = _as_text
+ else:
+ __unicode__ = _as_text
+
+ def __str__(self):
+ return self.data
+
+
+class Crypt(Verify, TextHandler):
"Handle status messages for --encrypt and --decrypt"
def __init__(self, gpg):
Verify.__init__(self, gpg)
@@ -368,19 +499,16 @@ class Crypt(Verify):
__bool__ = __nonzero__
- def __str__(self):
- return self.data.decode(self.gpg.encoding, self.gpg.decode_errors)
-
def handle_status(self, key, value):
if key in ("ENC_TO", "USERID_HINT", "GOODMDC", "END_DECRYPTION",
- "BEGIN_SIGNING", "NO_SECKEY", "ERROR", "NODATA",
- "CARDCTRL"):
+ "BEGIN_SIGNING", "NO_SECKEY", "ERROR", "NODATA", "PROGRESS",
+ "CARDCTRL", "BADMDC", "SC_OP_FAILURE", "SC_OP_SUCCESS"):
# in the case of ERROR, this is because a more specific error
# message will have come first
pass
elif key in ("NEED_PASSPHRASE", "BAD_PASSPHRASE", "GOOD_PASSPHRASE",
"MISSING_PASSPHRASE", "DECRYPTION_FAILED",
- "KEY_NOT_CREATED"):
+ "KEY_NOT_CREATED", "NEED_PASSPHRASE_PIN"):
self.status = key.replace("_", " ").lower()
elif key == "NEED_PASSPHRASE_SYM":
self.status = 'need symmetric passphrase'
@@ -441,7 +569,7 @@ class DeleteResult(object):
problem_reason = {
'1': 'No such key',
'2': 'Must delete secret key first',
- '3': 'Ambigious specification',
+ '3': 'Ambiguous specification',
}
def handle_status(self, key, value):
@@ -451,11 +579,18 @@ class DeleteResult(object):
else:
raise ValueError("Unknown status message: %r" % key)
-class Sign(object):
+ def __nonzero__(self):
+ return self.status == 'ok'
+
+ __bool__ = __nonzero__
+
+
+class Sign(TextHandler):
"Handle status messages for --sign"
def __init__(self, gpg):
self.gpg = gpg
self.type = None
+ self.hash_algo = None
self.fingerprint = None
def __nonzero__(self):
@@ -463,21 +598,26 @@ class Sign(object):
__bool__ = __nonzero__
- def __str__(self):
- return self.data.decode(self.gpg.encoding, self.gpg.decode_errors)
-
def handle_status(self, key, value):
if key in ("USERID_HINT", "NEED_PASSPHRASE", "BAD_PASSPHRASE",
- "GOOD_PASSPHRASE", "BEGIN_SIGNING", "CARDCTRL", "INV_SGNR"):
+ "GOOD_PASSPHRASE", "BEGIN_SIGNING", "CARDCTRL", "INV_SGNR",
+ "NO_SGNR", "MISSING_PASSPHRASE", "NEED_PASSPHRASE_PIN",
+ "SC_OP_FAILURE", "SC_OP_SUCCESS", "PROGRESS"):
pass
+ elif key in ("KEYEXPIRED", "SIGEXPIRED"):
+ self.status = 'key expired'
+ elif key == "KEYREVOKED":
+ self.status = 'key revoked'
elif key == "SIG_CREATED":
(self.type,
- algo, hashalgo, cls,
+ algo, self.hash_algo, cls,
self.timestamp, self.fingerprint
) = value.split()
else:
raise ValueError("Unknown status message: %r" % key)
+VERSION_RE = re.compile(r'gpg \(GnuPG\) (\d+(\.\d+)*)'.encode('ascii'), re.I)
+HEX_DIGITS_RE = re.compile(r'[0-9a-f]+$', re.I)
class GPG(object):
@@ -488,35 +628,54 @@ class GPG(object):
'delete': DeleteResult,
'generate': GenKey,
'import': ImportResult,
+ 'send': SendResult,
'list': ListKeys,
+ 'scan': ScanKeys,
+ 'search': SearchKeys,
'sign': Sign,
'verify': Verify,
}
"Encapsulate access to the gpg executable"
def __init__(self, gpgbinary='gpg', gnupghome=None, verbose=False,
- use_agent=False, keyring=None, options=None):
+ use_agent=False, keyring=None, options=None,
+ secret_keyring=None):
"""Initialize a GPG process wrapper. Options are:
gpgbinary -- full pathname for GPG binary.
gnupghome -- full pathname to where we can find the public and
private keyrings. Default is whatever gpg defaults to.
- keyring -- name of alternative keyring file to use. If specified,
- the default keyring is not used.
+ keyring -- name of alternative keyring file to use, or list of such
+ keyrings. If specified, the default keyring is not used.
options =-- a list of additional options to pass to the GPG binary.
+ secret_keyring -- name of alternative secret keyring file to use, or
+ list of such keyrings.
"""
self.gpgbinary = gpgbinary
self.gnupghome = gnupghome
+ if keyring:
+ # Allow passing a string or another iterable. Make it uniformly
+ # a list of keyring filenames
+ if isinstance(keyring, string_types):
+ keyring = [keyring]
self.keyring = keyring
+ if secret_keyring:
+ # Allow passing a string or another iterable. Make it uniformly
+ # a list of keyring filenames
+ if isinstance(secret_keyring, string_types):
+ secret_keyring = [secret_keyring]
+ self.secret_keyring = secret_keyring
self.verbose = verbose
self.use_agent = use_agent
if isinstance(options, str):
options = [options]
self.options = options
- self.encoding = locale.getpreferredencoding()
- if self.encoding is None: # This happens on Jython!
- self.encoding = sys.stdin.encoding
+ # Changed in 0.3.7 to use Latin-1 encoding rather than
+ # locale.getpreferredencoding falling back to sys.stdin.encoding
+ # falling back to utf-8, because gpg itself uses latin-1 as the default
+ # encoding.
+ self.encoding = 'latin-1'
if gnupghome and not os.path.isdir(self.gnupghome):
os.makedirs(self.gnupghome,0x1C0)
p = self._open_subprocess(["--version"])
@@ -525,6 +684,12 @@ class GPG(object):
if p.returncode != 0:
raise ValueError("Error invoking gpg: %s: %s" % (p.returncode,
result.stderr))
+ m = VERSION_RE.match(result.data)
+ if not m:
+ self.version = None
+ else:
+ dot = '.'.encode('ascii')
+ self.version = tuple([int(s) for s in m.groups()[0].split(dot)])
def make_args(self, args, passphrase):
"""
@@ -532,13 +697,18 @@ class GPG(object):
will be appended. The ``passphrase`` argument needs to be True if
a passphrase will be sent to GPG, else False.
"""
- cmd = [self.gpgbinary, '--status-fd 2 --no-tty']
+ cmd = [self.gpgbinary, '--status-fd', '2', '--no-tty']
if self.gnupghome:
- cmd.append('--homedir "%s" ' % self.gnupghome)
+ cmd.extend(['--homedir', no_quote(self.gnupghome)])
if self.keyring:
- cmd.append('--no-default-keyring --keyring "%s" ' % self.keyring)
+ cmd.append('--no-default-keyring')
+ for fn in self.keyring:
+ cmd.extend(['--keyring', no_quote(fn)])
+ if self.secret_keyring:
+ for fn in self.secret_keyring:
+ cmd.extend(['--secret-keyring', no_quote(fn)])
if passphrase:
- cmd.append('--batch --passphrase-fd 0')
+ cmd.extend(['--batch', '--passphrase-fd', '0'])
if self.use_agent:
cmd.append('--use-agent')
if self.options:
@@ -549,11 +719,12 @@ class GPG(object):
def _open_subprocess(self, args, passphrase=False):
# Internal method: open a pipe to a GPG subprocess and return
# the file objects for communicating with it.
- cmd = ' '.join(self.make_args(args, passphrase))
+ cmd = self.make_args(args, passphrase)
if self.verbose:
- print(cmd)
+ pcmd = ' '.join(cmd)
+ print(pcmd)
logger.debug("%s", cmd)
- return Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=PIPE)
+ return Popen(cmd, shell=False, stdin=PIPE, stdout=PIPE, stderr=PIPE)
def _read_response(self, stream, result):
# Internal method: reads all the stderr output from GPG, taking notice
@@ -561,31 +732,27 @@ class GPG(object):
#
# Calls methods on the response object for each valid token found,
# with the arg being the remainder of the status line.
- try:
- lines = []
- while True:
- line = stream.readline()
- if len(line) == 0:
- break
- lines.append(line)
- line = line.rstrip()
- if self.verbose:
- print(line)
- logger.debug("%s", line)
- if line[0:9] == '[GNUPG:] ':
- # Chop off the prefix
- line = line[9:]
- L = line.split(None, 1)
- keyword = L[0]
- if len(L) > 1:
- value = L[1]
- else:
- value = ""
- result.handle_status(keyword, value)
- result.stderr = ''.join(lines)
- except:
- import traceback
- logger.error('Error in the GPG plugin:\n%s', traceback.format_exc())
+ lines = []
+ while True:
+ line = stream.readline()
+ if len(line) == 0:
+ break
+ lines.append(line)
+ line = line.rstrip()
+ if self.verbose:
+ print(line)
+ logger.debug("%s", line)
+ if line[0:9] == '[GNUPG:] ':
+ # Chop off the prefix
+ line = line[9:]
+ L = line.split(None, 1)
+ keyword = L[0]
+ if len(L) > 1:
+ value = L[1]
+ else:
+ value = ""
+ result.handle_status(keyword, value)
+ result.stderr = ''.join(lines)
def _read_data(self, stream, result):
# Read the contents of the file from GPG's stdout
@@ -634,7 +801,7 @@ class GPG(object):
stderr.close()
stdout.close()
- def _handle_io(self, args, file, result, passphrase=None, binary=False):
+ def _handle_io(self, args, fileobj, result, passphrase=None, binary=False):
"Handle a call to GPG - pass input data, collect output data"
# Handle a basic data call - pass data to GPG, handle the output
# including status information. Garbage In, Garbage Out :)
@@ -645,7 +812,7 @@ class GPG(object):
stdin = p.stdin
if passphrase:
_write_passphrase(stdin, passphrase, self.encoding)
- writer = _threaded_copy_data(file, stdin)
+ writer = _threaded_copy_data(fileobj, stdin)
self._collect_output(p, result, writer, stdin)
return result
@@ -659,8 +826,15 @@ class GPG(object):
f.close()
return result
+ def set_output_without_confirmation(self, args, output):
+ "If writing to a file which exists, avoid a confirmation message."
+ if os.path.exists(output):
+ # We need to avoid an overwrite confirmation message
+ args.extend(['--batch', '--yes'])
+ args.extend(['--output', output])
+
def sign_file(self, file, keyid=None, passphrase=None, clearsign=True,
- detach=False, binary=False):
+ detach=False, binary=False, output=None):
"""sign file"""
logger.debug("sign_file: %s", file)
if binary:
@@ -674,7 +848,10 @@ class GPG(object):
elif clearsign:
args.append("--clearsign")
if keyid:
- args.append('--default-key "%s"' % keyid)
+ args.extend(['--default-key', no_quote(keyid)])
+ if output: # write the output to a file with the specified name
+ self.set_output_without_confirmation(args, output)
+
result = self.result_map['sign'](self)
#We could use _handle_io here except for the fact that if the
#passphrase is bad, gpg bails and you can't write the message.
@@ -726,8 +903,8 @@ class GPG(object):
logger.debug('Wrote to temp file: %r', s)
os.write(fd, s)
os.close(fd)
- args.append(fn)
- args.append('"%s"' % data_filename)
+ args.append(no_quote(fn))
+ args.append(no_quote(data_filename))
try:
p = self._open_subprocess(args)
self._collect_output(p, result, stdin=p.stdin)
@@ -735,6 +912,15 @@ class GPG(object):
os.unlink(fn)
return result
+ def verify_data(self, sig_filename, data):
+ "Verify the signature in sig_filename against data in memory"
+ logger.debug('verify_data: %r, %r ...', sig_filename, data[:16])
+ result = self.result_map['verify'](self)
+ args = ['--verify', no_quote(sig_filename), '-']
+ stream = _make_memory_stream(data)
+ self._handle_io(args, stream, result, binary=True)
+ return result
+
#
# KEY MANAGEMENT
#
@@ -798,7 +984,8 @@ class GPG(object):
>>> import shutil
>>> shutil.rmtree("keys")
>>> gpg = GPG(gnupghome="keys")
- >>> result = gpg.recv_keys('pgp.mit.edu', '3FF0DB166A7476EA')
+ >>> os.chmod('keys', 0x1C0)
+ >>> result = gpg.recv_keys('keyserver.ubuntu.com', '92905378')
>>> assert result
"""
@@ -806,33 +993,60 @@ class GPG(object):
logger.debug('recv_keys: %r', keyids)
data = _make_binary_stream("", self.encoding)
#data = ""
- args = ['--keyserver', keyserver, '--recv-keys']
- args.extend(keyids)
+ args = ['--keyserver', no_quote(keyserver), '--recv-keys']
+ args.extend([no_quote(k) for k in keyids])
self._handle_io(args, data, result, binary=True)
logger.debug('recv_keys result: %r', result.__dict__)
data.close()
return result
+ def send_keys(self, keyserver, *keyids):
+ """Send a key to a keyserver.
+
+ Note: it's not practical to test this function without sending
+ arbitrary data to live keyservers.
+ """
+ result = self.result_map['send'](self)
+ logger.debug('send_keys: %r', keyids)
+ data = _make_binary_stream('', self.encoding)
+ #data = ""
+ args = ['--keyserver', no_quote(keyserver), '--send-keys']
+ args.extend([no_quote(k) for k in keyids])
+ self._handle_io(args, data, result, binary=True)
+ logger.debug('send_keys result: %r', result.__dict__)
+ data.close()
+ return result
+
def delete_keys(self, fingerprints, secret=False):
which='key'
if secret:
which='secret-key'
if _is_sequence(fingerprints):
- fingerprints = ' '.join(fingerprints)
- args = ['--batch --delete-%s "%s"' % (which, fingerprints)]
+ fingerprints = [no_quote(s) for s in fingerprints]
+ else:
+ fingerprints = [no_quote(fingerprints)]
+ args = ['--batch', '--delete-%s' % which]
+ args.extend(fingerprints)
result = self.result_map['delete'](self)
p = self._open_subprocess(args)
self._collect_output(p, result, stdin=p.stdin)
return result
- def export_keys(self, keyids, secret=False):
+ def export_keys(self, keyids, secret=False, armor=True, minimal=False):
"export the indicated keys. 'keyid' is anything gpg accepts"
which=''
if secret:
which='-secret-key'
if _is_sequence(keyids):
- keyids = ' '.join(['"%s"' % k for k in keyids])
- args = ["--armor --export%s %s" % (which, keyids)]
+ keyids = [no_quote(k) for k in keyids]
+ else:
+ keyids = [no_quote(keyids)]
+ args = ['--export%s' % which]
+ if armor:
+ args.append('--armor')
+ if minimal:
+ args.extend(['--export-options','export-minimal'])
+ args.extend(keyids)
p = self._open_subprocess(args)
# gpg --export produces no status-fd output; stdout will be
# empty in case of failure
@@ -842,6 +1056,27 @@ class GPG(object):
logger.debug('export_keys result: %r', result.data)
return result.data.decode(self.encoding, self.decode_errors)
+ def _get_list_output(self, p, kind):
+ # Get the response information
+ result = self.result_map[kind](self)
+ self._collect_output(p, result, stdin=p.stdin)
+ lines = result.data.decode(self.encoding,
+ self.decode_errors).splitlines()
+ valid_keywords = 'pub uid sec fpr sub'.split()
+ for line in lines:
+ if self.verbose:
+ print(line)
+ logger.debug("line: %r", line.rstrip())
+ if not line:
+ break
+ L = line.strip().split(':')
+ if not L:
+ continue
+ keyword = L[0]
+ if keyword in valid_keywords:
+ getattr(result, keyword)(L)
+ return result
+
def list_keys(self, secret=False):
""" list the keys currently in the keyring
@@ -862,25 +1097,58 @@ class GPG(object):
which='keys'
if secret:
which='secret-keys'
- args = "--list-%s --fixed-list-mode --fingerprint --with-colons" % (which,)
- args = [args]
+ args = ["--list-%s" % which, "--fixed-list-mode", "--fingerprint",
+ "--with-colons"]
p = self._open_subprocess(args)
+ return self._get_list_output(p, 'list')
- # there might be some status thingumy here I should handle... (amk)
- # ...nope, unless you care about expired sigs or keys (stevegt)
+ def scan_keys(self, filename):
+ """
+ List details of an ascii armored or binary key file
+ without first importing it to the local keyring.
+
+ The function achieves this by running:
+ $ gpg --with-fingerprint --with-colons filename
+ """
+ args = ['--with-fingerprint', '--with-colons']
+ args.append(no_quote(filename))
+ p = self._open_subprocess(args)
+ return self._get_list_output(p, 'scan')
+
+ def search_keys(self, query, keyserver='pgp.mit.edu'):
+ """ search keyserver by query (using --search-keys option)
+
+ >>> import shutil
+ >>> shutil.rmtree('keys')
+ >>> gpg = GPG(gnupghome='keys')
+ >>> os.chmod('keys', 0x1C0)
+ >>> result = gpg.search_keys('<vinay_sajip@hotmail.com>')
+ >>> assert result, 'Failed using default keyserver'
+ >>> keyserver = 'keyserver.ubuntu.com'
+ >>> result = gpg.search_keys('<vinay_sajip@hotmail.com>', keyserver)
+ >>> assert result, 'Failed using keyserver.ubuntu.com'
+
+ """
+ query = query.strip()
+ if HEX_DIGITS_RE.match(query):
+ query = '0x' + query
+ args = ['--fixed-list-mode', '--fingerprint', '--with-colons',
+ '--keyserver', no_quote(keyserver), '--search-keys',
+ no_quote(query)]
+ p = self._open_subprocess(args)
# Get the response information
- result = self.result_map['list'](self)
+ result = self.result_map['search'](self)
self._collect_output(p, result, stdin=p.stdin)
lines = result.data.decode(self.encoding,
self.decode_errors).splitlines()
- valid_keywords = 'pub uid sec fpr sub'.split()
+ valid_keywords = ['pub', 'uid']
for line in lines:
if self.verbose:
print(line)
- logger.debug("line: %r", line.rstrip())
- if not line:
- break
+ logger.debug('line: %r', line.rstrip())
+ if not line: # sometimes get blank lines on Windows
+ continue
L = line.strip().split(':')
if not L:
continue
@@ -901,7 +1169,7 @@ class GPG(object):
>>> assert not result
"""
- args = ["--gen-key --batch"]
+ args = ["--gen-key", "--batch"]
result = self.result_map['generate'](self)
f = _make_binary_stream(input, self.encoding)
self._handle_io(args, f, result, binary=True)
@@ -915,11 +1183,11 @@ class GPG(object):
parms = {}
for key, val in list(kwargs.items()):
key = key.replace('_','-').title()
- parms[key] = val
+ if str(val).strip(): # skip empty strings
+ parms[key] = val
parms.setdefault('Key-Type','RSA')
- parms.setdefault('Key-Length',1024)
+ parms.setdefault('Key-Length',2048)
parms.setdefault('Name-Real', "Autogenerated Key")
- parms.setdefault('Name-Comment', "Generated by gnupg.py")
try:
logname = os.environ['LOGNAME']
except KeyError:
@@ -964,23 +1232,30 @@ class GPG(object):
"Encrypt the message read from the file-like object 'file'"
args = ['--encrypt']
if symmetric:
+ # can't be False or None - could be True or a cipher algo value
+ # such as AES256
args = ['--symmetric']
+ if symmetric is not True:
+ args.extend(['--cipher-algo', no_quote(symmetric)])
+ # else use the default, currently CAST5
else:
- args = ['--encrypt']
+ if not recipients:
+ raise ValueError('No recipients specified with asymmetric '
+ 'encryption')
if not _is_sequence(recipients):
recipients = (recipients,)
for recipient in recipients:
- args.append('--recipient "%s"' % recipient)
- if armor: # create ascii-armored output - set to False for binary output
+ args.extend(['--recipient', no_quote(recipient)])
+ if armor: # create ascii-armored output - False for binary output
args.append('--armor')
if output: # write the output to a file with the specified name
- if os.path.exists(output):
- os.remove(output) # to avoid overwrite confirmation message
- args.append('--output "%s"' % output)
- if sign:
- args.append('--sign --default-key "%s"' % sign)
+ self.set_output_without_confirmation(args, output)
+ if sign is True:
+ args.append('--sign')
+ elif sign:
+ args.extend(['--sign', '--default-key', no_quote(sign)])
if always_trust:
- args.append("--always-trust")
+ args.append('--always-trust')
result = self.result_map['crypt'](self)
self._handle_io(args, file, result, passphrase=passphrase, binary=True)
logger.debug('encrypt result: %r', result.data)
@@ -1008,9 +1283,6 @@ class GPG(object):
'hello'
>>> result = gpg.encrypt("hello again",print1)
>>> message = str(result)
- >>> result = gpg.decrypt(message)
- >>> result.status == 'need passphrase'
- True
>>> result = gpg.decrypt(message,passphrase='bar')
>>> result.status in ('decryption failed', 'bad passphrase')
True
@@ -1020,9 +1292,6 @@ class GPG(object):
True
>>> str(result)
'hello again'
- >>> result = gpg.encrypt("signed hello",print2,sign=print1)
- >>> result.status == 'need passphrase'
- True
>>> result = gpg.encrypt("signed hello",print2,sign=print1,passphrase='foo')
>>> result.status == 'encryption ok'
True
@@ -1048,13 +1317,10 @@ class GPG(object):
output=None):
args = ["--decrypt"]
if output: # write the output to a file with the specified name
- if os.path.exists(output):
- os.remove(output) # to avoid overwrite confirmation message
- args.append('--output "%s"' % output)
+ self.set_output_without_confirmation(args, output)
if always_trust:
args.append("--always-trust")
result = self.result_map['crypt'](self)
self._handle_io(args, file, result, passphrase, binary=True)
logger.debug('decrypt result: %r', result.data)
return result
-