bugfixen
- added missed call to _compute.chksum in ECDSAPriv - replaced all gpg wrapper fixtures in the unit test suite with gpg package - moved test suite gnupghome location to tests/gnupghome
This commit is contained in:
@@ -4,10 +4,16 @@
|
||||
Changelog
|
||||
*********
|
||||
|
||||
v0.5.0
|
||||
v0.4.3
|
||||
======
|
||||
|
||||
Released: not yet
|
||||
Released: August 15, 2017
|
||||
|
||||
Bugs Fixed
|
||||
----------
|
||||
|
||||
* Private key checksum calculations were not getting stored for ECDSA keys; this has been fixed.
|
||||
* The test suite gpg wrappers have been replaced with use of the `gpg <https://pypi.python.org/pypi/gpg/1.8.0>`_ package. (#171)
|
||||
|
||||
v0.4.2
|
||||
======
|
||||
|
||||
@@ -15,4 +15,4 @@ __all__ = ['__author__',
|
||||
__author__ = "Michael Greene"
|
||||
__copyright__ = "Copyright (c) 2014 Michael Greene"
|
||||
__license__ = "BSD"
|
||||
__version__ = str(LooseVersion("0.5.0"))
|
||||
__version__ = str(LooseVersion("0.4.3"))
|
||||
|
||||
@@ -1303,6 +1303,7 @@ class ECDSAPriv(PrivKey, ECDSAPub):
|
||||
self.x = MPI(pubn.x)
|
||||
self.y = MPI(pubn.y)
|
||||
self.s = MPI(pk.private_numbers().private_value)
|
||||
self._compute_chksum()
|
||||
|
||||
def parse(self, packet):
|
||||
super(ECDSAPriv, self).parse(packet)
|
||||
|
||||
@@ -3,4 +3,5 @@ pytest
|
||||
pytest-cov
|
||||
pytest-ordering
|
||||
flake8
|
||||
pep8-naming
|
||||
pep8-naming
|
||||
gpg==1.8.0
|
||||
@@ -1,14 +1,10 @@
|
||||
"""PGPy conftest"""
|
||||
import pytest
|
||||
|
||||
import contextlib
|
||||
import glob
|
||||
import gpg
|
||||
import os
|
||||
import re
|
||||
import select
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
|
||||
from distutils.version import LooseVersion
|
||||
|
||||
@@ -16,7 +12,8 @@ from cryptography.hazmat.backends import openssl
|
||||
|
||||
openssl_ver = LooseVersion(openssl.backend.openssl_version_text().split(' ')[1])
|
||||
gpg_ver = LooseVersion('0')
|
||||
pgpdump_ver = LooseVersion('0')
|
||||
python_gpg_ver = LooseVersion(gpg.version.versionstr)
|
||||
gnupghome = os.path.join(os.path.dirname(__file__), 'gnupghome')
|
||||
|
||||
|
||||
# ensure external commands we need to run exist
|
||||
@@ -33,214 +30,6 @@ if os.path.join(os.getcwd(), 'tests') not in sys.path:
|
||||
sys.path.insert(1, os.path.join(os.getcwd(), 'tests'))
|
||||
|
||||
|
||||
def _which(cmd):
|
||||
for d in iter(p for p in os.getenv('PATH').split(':') if os.path.isdir(p)):
|
||||
if cmd in os.listdir(d) and os.access(os.path.realpath(os.path.join(d, cmd)), os.X_OK):
|
||||
return os.path.join(d, cmd)
|
||||
|
||||
|
||||
# run a subprocess command, wait for it to complete, and then return decoded output
|
||||
def _run(bin, *binargs, **pkw):
|
||||
_default_pkw = {'stdout': subprocess.PIPE,
|
||||
'stderr': subprocess.PIPE}
|
||||
|
||||
popen_kwargs = _default_pkw.copy()
|
||||
popen_kwargs.update(pkw)
|
||||
|
||||
cmd = subprocess.Popen([bin] + list(binargs), **popen_kwargs)
|
||||
cmd.wait()
|
||||
cmdo, cmde = cmd.communicate()
|
||||
|
||||
cmdo = cmdo.decode('latin-1') if cmdo is not None else ""
|
||||
cmde = cmde.decode('latin-1') if cmde is not None else ""
|
||||
|
||||
return cmdo, cmde
|
||||
|
||||
|
||||
_gpg_bin = _which('gpg2')
|
||||
_gpg_args = ('--options', './pgpy.gpg.conf', '--expert', '--status-fd')
|
||||
_gpg_env = {}
|
||||
_gpg_env['GNUPGHOME'] = os.path.abspath(os.path.abspath('tests/testdata'))
|
||||
_gpg_kwargs = dict()
|
||||
_gpg_kwargs['cwd'] = 'tests/testdata'
|
||||
_gpg_kwargs['env'] = _gpg_env
|
||||
_gpg_kwargs['stdout'] = subprocess.PIPE
|
||||
_gpg_kwargs['stderr'] = subprocess.STDOUT
|
||||
_gpg_kwargs['close_fds'] = False
|
||||
|
||||
|
||||
# GPG boilerplate function
|
||||
def _gpg(*gpg_args, **popen_kwargs):
|
||||
# gpgfd is our "read" end of the pipe
|
||||
# _gpgfd is gpg's "write" end
|
||||
gpgfd, _gpgfd = os.pipe()
|
||||
|
||||
# on python >= 3.4, we need to set _gpgfd as inheritable
|
||||
# older versions do not have this function
|
||||
if sys.version_info >= (3, 4):
|
||||
os.set_inheritable(_gpgfd, True)
|
||||
|
||||
args = (_gpg_bin,) + _gpg_args + (str(_gpgfd),) + gpg_args
|
||||
kwargs = _gpg_kwargs.copy()
|
||||
kwargs.update(popen_kwargs)
|
||||
|
||||
try:
|
||||
# use this as the buffer for collecting status-fd output
|
||||
c = bytearray()
|
||||
|
||||
cmd = subprocess.Popen(args, **kwargs)
|
||||
while cmd.poll() is None:
|
||||
while gpgfd in select.select([gpgfd,], [], [], 0)[0]:
|
||||
c += os.read(gpgfd, 1)
|
||||
|
||||
else:
|
||||
# sleep for a bit
|
||||
time.sleep(0.010)
|
||||
|
||||
# finish reading if needed
|
||||
while gpgfd in select.select([gpgfd,], [], [], 0)[0]:
|
||||
c += os.read(gpgfd, 1)
|
||||
|
||||
# collect stdout and stderr
|
||||
o, e = cmd.communicate()
|
||||
|
||||
finally:
|
||||
# close the pipes we used for this
|
||||
os.close(gpgfd)
|
||||
os.close(_gpgfd)
|
||||
|
||||
return c.decode('latin-1'), (o or b'').decode('latin-1'), (e or b'').decode('latin-1')
|
||||
|
||||
|
||||
# fixtures
|
||||
@pytest.fixture()
|
||||
def gpg_import():
|
||||
@contextlib.contextmanager
|
||||
def _gpg_import(*keypaths):
|
||||
# if GPG version is 2.1 or newer, we need to add a setup/teardown step in creating the keybox folder
|
||||
if gpg_ver >= '2.1':
|
||||
if not os.path.exists('tests/testdata/private-keys-v1.d'):
|
||||
os.mkdir('tests/testdata/private-keys-v1.d')
|
||||
time.sleep(0.5)
|
||||
|
||||
gpgc, gpgo, gpge = _gpg('--import', *list(keypaths))
|
||||
|
||||
try:
|
||||
yield gpgo
|
||||
|
||||
finally:
|
||||
[os.remove(f) for f in glob.glob('tests/testdata/testkeys.*')]
|
||||
if gpg_ver >= '2.1':
|
||||
[os.remove(f) for f in glob.glob('tests/testdata/private-keys-v1.d/*')]
|
||||
|
||||
time.sleep(0.5)
|
||||
|
||||
return _gpg_import
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def gpg_check_sigs():
|
||||
def _gpg_check_sigs(*keyids):
|
||||
gpgc, gpgo, gpge = _gpg('--check-sigs', *keyids)
|
||||
return 'sig-' not in gpgo
|
||||
|
||||
return _gpg_check_sigs
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def gpg_verify():
|
||||
sfd_verify = re.compile(r'^\[GNUPG:\] (?:GOOD|EXP)SIG (?P<keyid>[0-9A-F]+) .*'
|
||||
r'^\[GNUPG:\] VALIDSIG (?:[0-9A-F]{,24})\1', flags=re.MULTILINE | re.DOTALL)
|
||||
|
||||
def _gpg_verify(gpg_subjpath, gpg_sigpath=None, keyid=None):
|
||||
rargs = [gpg_sigpath, gpg_subjpath] if gpg_sigpath is not None else [gpg_subjpath,]
|
||||
|
||||
gpgc, gpgo, gpge = _gpg('--verify', *rargs)
|
||||
|
||||
sigs = [ sv.group('keyid') for sv in sfd_verify.finditer(gpgc) ]
|
||||
|
||||
if keyid is not None:
|
||||
return keyid in sigs
|
||||
|
||||
return sigs
|
||||
|
||||
return _gpg_verify
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def gpg_decrypt():
|
||||
sfd_decrypt = re.compile(r'^\[GNUPG:\] BEGIN_DECRYPTION\n'
|
||||
r'^\[GNUPG:\] DECRYPTION_INFO \d+ \d+\n'
|
||||
r'^\[GNUPG:\] PLAINTEXT (?:62|74|75) (?P<tstamp>\d+) (?P<fname>.*)\n'
|
||||
r'^\[GNUPG:\] PLAINTEXT_LENGTH \d+\n'
|
||||
r'\[GNUPG:\] DECRYPTION_OKAY\n'
|
||||
r'(?:^\[GNUPG:\] GOODMDC\n)?'
|
||||
r'^\[GNUPG:\] END_DECRYPTION', flags=re.MULTILINE)
|
||||
|
||||
def _gpg_decrypt(encmsgpath, passphrase=None, keyid=None):
|
||||
a = []
|
||||
|
||||
if passphrase is not None:
|
||||
# create a pipe to send the passphrase to GnuPG through
|
||||
pfdr, pfdw = os.pipe()
|
||||
|
||||
# write the passphrase to the pipe buffer right away
|
||||
os.write(pfdw, passphrase.encode())
|
||||
os.write(pfdw, b'\n')
|
||||
|
||||
# on python >= 3.4, we need to set pfdr as inheritable
|
||||
# older versions do not have this function
|
||||
if sys.version_info >= (3, 4):
|
||||
os.set_inheritable(pfdr, True)
|
||||
|
||||
a.extend(['--batch', '--passphrase-fd', str(pfdr)])
|
||||
|
||||
elif keyid is not None:
|
||||
a.extend(['--recipient', keyid])
|
||||
|
||||
a.extend(['--decrypt', encmsgpath])
|
||||
|
||||
gpgc, gpgo, gpge = _gpg(*a, stderr=subprocess.PIPE)
|
||||
|
||||
status = sfd_decrypt.match(gpgc)
|
||||
return gpgo
|
||||
|
||||
return _gpg_decrypt
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def gpg_print():
|
||||
sfd_text = re.compile(r'^\[GNUPG:\] PLAINTEXT (?:62|74|75) (?P<tstamp>\d+) (?P<fname>.*)\n'
|
||||
r'^\[GNUPG:\] PLAINTEXT_LENGTH (?P<len>\d+)\n', re.MULTILINE)
|
||||
|
||||
gpg_text = re.compile(r'(?:- gpg control packet\n)?(?P<text>.*)', re.MULTILINE | re.DOTALL)
|
||||
|
||||
def _gpg_print(infile):
|
||||
gpgc, gpgo, gpge = _gpg('-o-', infile, stderr=subprocess.PIPE)
|
||||
status = sfd_text.match(gpgc)
|
||||
tlen = len(gpgo) if status is None else int(status.group('len'))
|
||||
|
||||
return gpg_text.match(gpgo).group('text')[:tlen]
|
||||
|
||||
return _gpg_print
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def gpg_keyid_file():
|
||||
def _gpg_keyid_file(infile):
|
||||
c, o, e = _gpg('--list-packets', infile)
|
||||
return re.findall(r'^\s+keyid: ([0-9A-F]+)', o, flags=re.MULTILINE)
|
||||
return _gpg_keyid_file
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def pgpdump():
|
||||
def _pgpdump(infile):
|
||||
return _run(_which('pgpdump'), '-agimplu', infile)[0]
|
||||
|
||||
return _pgpdump
|
||||
|
||||
|
||||
# pytest hooks
|
||||
|
||||
# pytest_configure
|
||||
@@ -248,22 +37,26 @@ def pgpdump():
|
||||
def pytest_configure(config):
|
||||
print("== PGPy Test Suite ==")
|
||||
|
||||
# ensure commands we need exist
|
||||
for cmd in ['gpg2', 'pgpdump']:
|
||||
if _which(cmd) is None:
|
||||
print("Error: Missing Command: " + cmd)
|
||||
exit(-1)
|
||||
# clear out gnupghome
|
||||
clear_globs = [os.path.join(gnupghome, 'private-keys-v1.d', '*.key'),
|
||||
os.path.join(gnupghome, '*.kbx*'),
|
||||
os.path.join(gnupghome, '*.gpg*'),
|
||||
os.path.join(gnupghome, '.*'),
|
||||
os.path.join(gnupghome, 'random_seed')]
|
||||
for fpath in iter(f for cg in clear_globs for f in glob.glob(cg)):
|
||||
os.unlink(fpath)
|
||||
|
||||
# get the GnuPG version
|
||||
gpg_ver.parse(_run(_which('gpg2'), '--version')[0].splitlines()[0].split(' ')[-1])
|
||||
gpg_ver.parse(gpg.core.check_version())
|
||||
|
||||
# get the pgpdump version
|
||||
v, _ = _run(_which('pgpdump'), '-v', stderr=subprocess.STDOUT)
|
||||
pgpdump_ver.parse(v.split(' ')[2].strip(','))
|
||||
# check that there are no keys loaded, now
|
||||
with gpg.Context(offline=True) as c:
|
||||
c.set_engine_info(gpg.constants.PROTOCOL_OpenPGP, home_dir=gnupghome)
|
||||
assert len(list(c.keylist())) == 0
|
||||
assert len(list(c.keylist(secret=True))) == 0
|
||||
|
||||
# display the working directory and the OpenSSL/GPG/pgpdump versions
|
||||
print("Working Directory: " + os.getcwd())
|
||||
print("Using OpenSSL " + str(openssl_ver))
|
||||
print("Using GnuPG " + str(gpg_ver))
|
||||
print("Using pgpdump " + str(pgpdump_ver))
|
||||
print("")
|
||||
|
||||
1
tests/gnupghome/gpg-agent.conf
Normal file
1
tests/gnupghome/gpg-agent.conf
Normal file
@@ -0,0 +1 @@
|
||||
allow-loopback-pinentry
|
||||
@@ -2,11 +2,6 @@
|
||||
# always expert
|
||||
expert
|
||||
trust-model always
|
||||
# keyring stuff
|
||||
no-default-keyring
|
||||
keyring testkeys.pub.gpg
|
||||
secret-keyring testkeys.sec.gpg
|
||||
trustdb-name testkeys.trust.gpg
|
||||
|
||||
# don't try to auto-locate keys except in the local keyring(s)
|
||||
no-auto-key-locate
|
||||
@@ -90,60 +90,48 @@ class TestPGPUID(object):
|
||||
|
||||
|
||||
_keyfiles = sorted(glob.glob('tests/testdata/blocks/*key*.asc'))
|
||||
_fingerprints = {'dsapubkey.asc': '2B5BBB143BA0B290DCEE6668B798AE8990877201',
|
||||
'dsaseckey.asc': '2B5BBB143BA0B290DCEE6668B798AE8990877201',
|
||||
'eccpubkey.asc': '502D1A5365D1C0CAA69945390BA52DF0BAA59D9C',
|
||||
'eccseckey.asc': '502D1A5365D1C0CAA69945390BA52DF0BAA59D9C',
|
||||
'openpgp.js.pubkey.asc': 'C7C38ECEE94A4AD32DDB064E14AB44C74D1BDAB8',
|
||||
'openpgp.js.seckey.asc': 'C7C38ECEE94A4AD32DDB064E14AB44C74D1BDAB8',
|
||||
'rsapubkey.asc': 'F4294BC8094A7E0585C85E8637473B3758C44F36',
|
||||
'rsaseckey.asc': 'F4294BC8094A7E0585C85E8637473B3758C44F36',}
|
||||
|
||||
|
||||
class TestPGPKey(object):
|
||||
@pytest.mark.parametrize('kf', _keyfiles, ids=[os.path.basename(f) for f in _keyfiles])
|
||||
def test_load_from_file(self, kf, gpg_keyid_file):
|
||||
def test_load_from_file(self, kf):
|
||||
key, _ = PGPKey.from_file(kf)
|
||||
|
||||
# TODO: maybe store the fingerprint instead of relying on a particular version of GnuPG...?
|
||||
if 'ecc' in kf and gpg_ver < '2.1':
|
||||
assert key.fingerprint
|
||||
|
||||
else:
|
||||
assert key.fingerprint.keyid in gpg_keyid_file(kf.replace('tests/testdata/', ''))
|
||||
assert key.fingerprint == _fingerprints[os.path.basename(kf)]
|
||||
|
||||
@pytest.mark.parametrize('kf', _keyfiles, ids=[os.path.basename(f) for f in _keyfiles])
|
||||
def test_load_from_str(self, kf, gpg_keyid_file):
|
||||
def test_load_from_str(self, kf):
|
||||
with open(kf, 'r') as tkf:
|
||||
key, _ = PGPKey.from_blob(tkf.read())
|
||||
|
||||
# TODO: maybe store the fingerprint instead of relying on a particular version of GnuPG...?
|
||||
if 'ecc' in kf and gpg_ver < '2.1':
|
||||
assert key.fingerprint
|
||||
|
||||
else:
|
||||
assert key.fingerprint.keyid in gpg_keyid_file(kf.replace('tests/testdata/', ''))
|
||||
assert key.fingerprint == _fingerprints[os.path.basename(kf)]
|
||||
|
||||
@pytest.mark.regression(issue=140)
|
||||
@pytest.mark.parametrize('kf', _keyfiles, ids=[os.path.basename(f) for f in _keyfiles])
|
||||
def test_load_from_bytes(self, kf, gpg_keyid_file):
|
||||
def test_load_from_bytes(self, kf):
|
||||
with open(kf, 'rb') as tkf:
|
||||
key, _ = PGPKey.from_blob(tkf.read())
|
||||
|
||||
# TODO: maybe store the fingerprint instead of relying on a particular version of GnuPG...?
|
||||
if 'ecc' in kf and gpg_ver < '2.1':
|
||||
assert key.fingerprint
|
||||
|
||||
else:
|
||||
assert key.fingerprint.keyid in gpg_keyid_file(kf.replace('tests/testdata/', ''))
|
||||
assert key.fingerprint == _fingerprints[os.path.basename(kf)]
|
||||
|
||||
@pytest.mark.regression(issue=140)
|
||||
@pytest.mark.parametrize('kf', _keyfiles, ids=[os.path.basename(f) for f in _keyfiles])
|
||||
def test_load_from_bytearray(self, kf, gpg_keyid_file):
|
||||
def test_load_from_bytearray(self, kf):
|
||||
tkb = bytearray(os.stat(kf).st_size)
|
||||
with open(kf, 'rb') as tkf:
|
||||
tkf.readinto(tkb)
|
||||
|
||||
key, _ = PGPKey.from_blob(tkb)
|
||||
|
||||
# TODO: maybe store the fingerprint instead of relying on a particular version of GnuPG...?
|
||||
if 'ecc' in kf and gpg_ver < '2.1':
|
||||
assert key.fingerprint
|
||||
|
||||
else:
|
||||
assert key.fingerprint.keyid in gpg_keyid_file(kf.replace('tests/testdata/', ''))
|
||||
assert key.fingerprint == _fingerprints[os.path.basename(kf)]
|
||||
|
||||
@pytest.mark.parametrize('kf', sorted(filter(lambda f: not f.endswith('enc.asc'), glob.glob('tests/testdata/keys/*.asc'))))
|
||||
def test_save(self, kf):
|
||||
|
||||
@@ -2,10 +2,11 @@
|
||||
""" test doing things with keys/signatures/etc
|
||||
"""
|
||||
import pytest
|
||||
from conftest import gpg_ver
|
||||
from conftest import gpg_ver, gnupghome
|
||||
|
||||
import copy
|
||||
import glob
|
||||
import gpg
|
||||
import itertools
|
||||
import os
|
||||
import six
|
||||
@@ -45,9 +46,25 @@ def EncodedNamedTemporaryFile(mode, **kw):
|
||||
|
||||
|
||||
class TestPGPMessage(object):
|
||||
@staticmethod
|
||||
def gpg_message(msg):
|
||||
with gpg.Context(offline=True) as c:
|
||||
c.set_engine_info(gpg.constants.PROTOCOL_OpenPGP, home_dir=gnupghome)
|
||||
msg, _ = c.verify(gpg.Data(string=str(msg)))
|
||||
return msg
|
||||
|
||||
@staticmethod
|
||||
def gpg_decrypt(msg, passphrase):
|
||||
with gpg.Context(offline=True) as c:
|
||||
c.set_engine_info(gpg.constants.PROTOCOL_OpenPGP, home_dir=gnupghome)
|
||||
msg, decres, _ = c.decrypt(gpg.Data(string=str(msg)), passphrase=passphrase)
|
||||
|
||||
assert decres
|
||||
return msg
|
||||
|
||||
@pytest.mark.parametrize('comp_alg,sensitive',
|
||||
itertools.product(CompressionAlgorithm, [False, True]))
|
||||
def test_new(self, comp_alg, sensitive, gpg_print):
|
||||
def test_new(self, comp_alg, sensitive):
|
||||
mtxt = u"This is a new message!"
|
||||
msg = PGPMessage.new(mtxt, compression=comp_alg, sensitive=sensitive)
|
||||
|
||||
@@ -58,14 +75,12 @@ class TestPGPMessage(object):
|
||||
assert msg.message == mtxt
|
||||
assert msg._compression == comp_alg
|
||||
|
||||
with tempfile.NamedTemporaryFile('w+') as mf:
|
||||
mf.write(str(msg))
|
||||
mf.flush()
|
||||
assert gpg_print(mf.name) == mtxt
|
||||
# see if GPG can parse our message
|
||||
assert self.gpg_message(msg).decode('utf-8') == mtxt
|
||||
|
||||
@pytest.mark.parametrize('comp_alg,sensitive,path',
|
||||
itertools.product(CompressionAlgorithm, [False, True], sorted(glob.glob('tests/testdata/files/literal*'))))
|
||||
def test_new_from_file(self, comp_alg, sensitive, path, gpg_print):
|
||||
def test_new_from_file(self, comp_alg, sensitive, path):
|
||||
msg = PGPMessage.new(path, file=True, compression=comp_alg, sensitive=sensitive)
|
||||
|
||||
assert isinstance(msg, PGPMessage)
|
||||
@@ -74,16 +89,14 @@ class TestPGPMessage(object):
|
||||
assert msg.is_sensitive is sensitive
|
||||
|
||||
with open(path, 'rb') as tf:
|
||||
mtxt = tf.read().decode('latin-1')
|
||||
mtxt = tf.read()
|
||||
|
||||
with tempfile.NamedTemporaryFile('w+') as mf:
|
||||
mf.write(str(msg))
|
||||
mf.flush()
|
||||
assert gpg_print(mf.name) == mtxt
|
||||
# see if GPG can parse our message
|
||||
assert self.gpg_message(msg) == mtxt
|
||||
|
||||
@pytest.mark.regression(issue=154)
|
||||
# @pytest.mark.parametrize('cleartext', [False, True])
|
||||
def test_new_non_unicode(self, gpg_print):
|
||||
def test_new_non_unicode(self):
|
||||
# this message text comes from http://www.columbia.edu/~fdc/utf8/
|
||||
text = u'色は匂へど 散りぬるを\n' \
|
||||
u'我が世誰ぞ 常ならむ\n' \
|
||||
@@ -94,13 +107,11 @@ class TestPGPMessage(object):
|
||||
assert msg.type == 'literal'
|
||||
assert msg.message == text.encode('jisx0213')
|
||||
|
||||
with tempfile.NamedTemporaryFile('w+') as mf:
|
||||
mf.write(str(msg))
|
||||
mf.flush()
|
||||
assert gpg_print(mf.name).encode('latin-1').decode('jisx0213').strip() == text
|
||||
# see if GPG can parse our message
|
||||
assert self.gpg_message(msg).decode('jisx0213') == text
|
||||
|
||||
@pytest.mark.regression(issue=154)
|
||||
def test_new_non_unicode_cleartext(self, gpg_print):
|
||||
def test_new_non_unicode_cleartext(self):
|
||||
# this message text comes from http://www.columbia.edu/~fdc/utf8/
|
||||
text = u'色は匂へど 散りぬるを\n' \
|
||||
u'我が世誰ぞ 常ならむ\n' \
|
||||
@@ -111,11 +122,6 @@ class TestPGPMessage(object):
|
||||
assert msg.type == 'cleartext'
|
||||
assert msg.message == text
|
||||
|
||||
with EncodedNamedTemporaryFile('w+', encoding='utf-8') as mf:
|
||||
mf.write(six.text_type(msg).encode('utf-8') if six.PY2 else six.text_type(msg))
|
||||
mf.flush()
|
||||
assert gpg_print(mf.name).encode('latin-1').decode('utf-8').strip() == text
|
||||
|
||||
def test_add_marker(self):
|
||||
msg = PGPMessage.new(u"This is a new message")
|
||||
marker = Packet(bytearray(b'\xa8\x03\x50\x47\x50'))
|
||||
@@ -133,7 +139,7 @@ class TestPGPMessage(object):
|
||||
assert decmsg.message == b"This is stored, literally\\!\n\n"
|
||||
|
||||
@pytest.mark.parametrize('comp_alg', CompressionAlgorithm)
|
||||
def test_encrypt_passphrase(self, comp_alg, gpg_decrypt):
|
||||
def test_encrypt_passphrase(self, comp_alg):
|
||||
mtxt = "This message is to be encrypted"
|
||||
msg = PGPMessage.new(mtxt, compression=comp_alg)
|
||||
assert not msg.is_encrypted
|
||||
@@ -153,11 +159,8 @@ class TestPGPMessage(object):
|
||||
assert decmsg.message == mtxt
|
||||
assert decmsg._compression == msg._compression
|
||||
|
||||
# decrypt with GPG
|
||||
with tempfile.NamedTemporaryFile('w+') as mf:
|
||||
mf.write(str(encmsg))
|
||||
mf.flush()
|
||||
assert gpg_decrypt(mf.name, "QwertyUiop") == mtxt
|
||||
# decrypt with GPG via python-gnupg
|
||||
assert self.gpg_decrypt(encmsg, 'QwertyUiop').decode('utf-8') == decmsg.message
|
||||
|
||||
def test_encrypt_passphrase_2(self):
|
||||
mtxt = "This message is to be encrypted"
|
||||
@@ -211,18 +214,11 @@ class TestPGPKey_Management(object):
|
||||
keys = {}
|
||||
|
||||
def gpg_verify_key(self, key):
|
||||
from conftest import gpg_import as gpgi
|
||||
gpg_import = gpgi()
|
||||
with gpg.Context(offline=True) as c:
|
||||
c.set_engine_info(gpg.constants.PROTOCOL_OpenPGP, home_dir=gnupghome)
|
||||
data, vres = c.verify(gpg.Data(string=str(key)))
|
||||
|
||||
if gpg_ver < '2.1' and key.key_algorithm in {PubKeyAlgorithm.ECDSA, PubKeyAlgorithm.ECDH}:
|
||||
# GPG prior to 2.1.x does not support EC* keys
|
||||
return
|
||||
|
||||
with tempfile.NamedTemporaryFile('w+') as kf:
|
||||
kf.write(str(key))
|
||||
kf.flush()
|
||||
with gpg_import(kf.name) as kio:
|
||||
assert 'invalid self-signature' not in kio
|
||||
assert vres
|
||||
|
||||
@pytest.mark.run('first')
|
||||
@pytest.mark.parametrize('alg,size', pkeyspecs)
|
||||
@@ -581,25 +577,44 @@ class TestPGPKey_Actions(object):
|
||||
sigs = {}
|
||||
msgs = {}
|
||||
|
||||
def gpg_verify(self, subject, sig, pubkey):
|
||||
def gpg_verify(self, subject, sig=None, pubkey=None):
|
||||
# verify with GnuPG
|
||||
from conftest import gpg_import as gpgi
|
||||
from conftest import gpg_verify as gpgv
|
||||
gpg_import = gpgi()
|
||||
gpg_verify = gpgv()
|
||||
with gpg.Context(armor=True, offline=True) as c:
|
||||
c.set_engine_info(gpg.constants.PROTOCOL_OpenPGP, home_dir=gnupghome)
|
||||
|
||||
with tempfile.NamedTemporaryFile('w+') as sigf, \
|
||||
tempfile.NamedTemporaryFile('w+') as subjf, \
|
||||
tempfile.NamedTemporaryFile('w+') as keyf:
|
||||
sigf.write(str(sig))
|
||||
subjf.write(str(subject))
|
||||
keyf.write(str(pubkey))
|
||||
sigf.flush()
|
||||
subjf.flush()
|
||||
keyf.flush()
|
||||
# do we need to import the key?
|
||||
if pubkey:
|
||||
try:
|
||||
c.get_key(pubkey.fingerprint)
|
||||
|
||||
with gpg_import(keyf.name):
|
||||
assert gpg_verify(subjf.name, sigf.name, keyid=sig.signer)
|
||||
except gpg.errors.KeyNotFound:
|
||||
key_data = gpg.Data(string=str(pubkey))
|
||||
gpg.core.gpgme.gpgme_op_import(c.wrapped, key_data)
|
||||
|
||||
vargs = [gpg.Data(string=str(subject))]
|
||||
if sig is not None:
|
||||
vargs += [gpg.Data(string=str(sig))]
|
||||
|
||||
_, vres = c.verify(*vargs)
|
||||
|
||||
assert vres
|
||||
|
||||
def gpg_decrypt(self, message, privkey):
|
||||
# decrypt with GnuPG
|
||||
with gpg.Context(armor=True, offline=True) as c:
|
||||
c.set_engine_info(gpg.constants.PROTOCOL_OpenPGP, home_dir=gnupghome)
|
||||
|
||||
# do we need to import the key?
|
||||
try:
|
||||
c.get_key(privkey.fingerprint, True)
|
||||
|
||||
except gpg.errors.KeyNotFound:
|
||||
key_data = gpg.Data(string=str(privkey))
|
||||
gpg.core.gpgme.gpgme_op_import(c.wrapped, key_data)
|
||||
|
||||
pt, _, _ = c.decrypt(gpg.Data(string=str(message)), verify=False)
|
||||
|
||||
return pt
|
||||
|
||||
# test non-management PGPKey actions using existing keys, i.e.:
|
||||
# - signing/verifying
|
||||
@@ -641,7 +656,7 @@ class TestPGPKey_Actions(object):
|
||||
assert sv
|
||||
assert sig in sv
|
||||
|
||||
def test_sign_message(self, targette_sec, targette_pub, message, gpg_import, gpg_verify):
|
||||
def test_sign_message(self, targette_sec, targette_pub, message):
|
||||
# test signing a message
|
||||
sig = targette_sec.sign(message)
|
||||
|
||||
@@ -652,13 +667,7 @@ class TestPGPKey_Actions(object):
|
||||
message |= sig
|
||||
|
||||
# verify with GnuPG
|
||||
with tempfile.NamedTemporaryFile('w+') as mf, tempfile.NamedTemporaryFile('w+') as pubf:
|
||||
mf.write(str(message))
|
||||
pubf.write(str(targette_pub))
|
||||
mf.flush()
|
||||
pubf.flush()
|
||||
with gpg_import(pubf.name):
|
||||
assert gpg_verify(mf.name, keyid=sig.signer)
|
||||
self.gpg_verify(message, pubkey=targette_pub)
|
||||
|
||||
@pytest.mark.run(after='test_sign_message')
|
||||
def test_verify_message(self, targette_pub, message):
|
||||
@@ -667,7 +676,7 @@ class TestPGPKey_Actions(object):
|
||||
assert sv
|
||||
assert len(sv) > 0
|
||||
|
||||
def test_sign_ctmessage(self, targette_sec, targette_pub, ctmessage, gpg_import, gpg_verify):
|
||||
def test_sign_ctmessage(self, targette_sec, targette_pub, ctmessage):
|
||||
# test signing a cleartext message
|
||||
expire_at = datetime.utcnow() + timedelta(days=1)
|
||||
|
||||
@@ -680,13 +689,7 @@ class TestPGPKey_Actions(object):
|
||||
ctmessage |= sig
|
||||
|
||||
# verify with GnuPG
|
||||
with tempfile.NamedTemporaryFile('w+') as ctmf, tempfile.NamedTemporaryFile('w+') as pubf:
|
||||
ctmf.write(str(ctmessage))
|
||||
pubf.write(str(targette_pub))
|
||||
ctmf.flush()
|
||||
pubf.flush()
|
||||
with gpg_import(pubf.name):
|
||||
assert gpg_verify(ctmf.name, keyid=sig.signer)
|
||||
self.gpg_verify(ctmessage, pubkey=targette_pub)
|
||||
|
||||
@pytest.mark.run(after='test_sign_ctmessage')
|
||||
def test_verify_ctmessage(self, targette_pub, ctmessage):
|
||||
@@ -822,18 +825,9 @@ class TestPGPKey_Actions(object):
|
||||
assert sv
|
||||
assert len(list(sv.good_signatures)) > 0
|
||||
|
||||
def test_gpg_import_abe(self, abe, gpg_import, gpg_check_sigs):
|
||||
def test_gpg_import_abe(self, abe):
|
||||
# verify all of the things we did to Abe's key with GnuPG in one fell swoop
|
||||
with tempfile.NamedTemporaryFile('w+') as abef:
|
||||
abef.write(str(abe))
|
||||
abef.flush()
|
||||
|
||||
# import all of the public keys first
|
||||
with gpg_import(*(os.path.realpath(f) for f in sorted(glob.glob('tests/testdata/keys/*.pub.asc')))):
|
||||
# import Abe's key
|
||||
with gpg_import(abef.name) as kio:
|
||||
assert 'invalid self-signature' not in kio
|
||||
assert gpg_check_sigs(abe.fingerprint.keyid)
|
||||
self.gpg_verify(abe)
|
||||
|
||||
@pytest.mark.parametrize('pub,cipher',
|
||||
itertools.product(pubkeys, sorted(SymmetricKeyAlgorithm)),
|
||||
@@ -854,7 +848,7 @@ class TestPGPKey_Actions(object):
|
||||
@pytest.mark.run(after='test_encrypt_message')
|
||||
@pytest.mark.parametrize('sf,cipher',
|
||||
itertools.product(sorted(glob.glob('tests/testdata/keys/*.sec.asc')), sorted(SymmetricKeyAlgorithm)))
|
||||
def test_decrypt_message(self, sf, cipher, gpg_import, gpg_print):
|
||||
def test_decrypt_message(self, sf, cipher):
|
||||
# test decrypting a message
|
||||
sec, _ = PGPKey.from_file(sf)
|
||||
if (sec.fingerprint, cipher) not in self.msgs:
|
||||
@@ -870,12 +864,7 @@ class TestPGPKey_Actions(object):
|
||||
# GnuPG prior to 2.1.x does not support EC* keys, so skip this step
|
||||
return
|
||||
|
||||
with tempfile.NamedTemporaryFile('w+') as emsgf:
|
||||
emsgf.write(str(emsg))
|
||||
emsgf.flush()
|
||||
|
||||
with gpg_import(os.path.realpath(sf)) as kf:
|
||||
assert gpg_print(emsgf.name) == dmsg.message
|
||||
assert self.gpg_decrypt(emsg, sec).decode('utf-8') == dmsg.message
|
||||
|
||||
@pytest.mark.run(after='test_encrypt_message')
|
||||
@pytest.mark.parametrize('sf,cipher',
|
||||
|
||||
@@ -1,16 +1,18 @@
|
||||
""" I've got 99 problems but regression testing ain't one
|
||||
"""
|
||||
from conftest import gpg_ver, gnupghome
|
||||
|
||||
import gpg
|
||||
import os
|
||||
import pytest
|
||||
import glob
|
||||
import tempfile
|
||||
import warnings
|
||||
from pgpy import PGPKey
|
||||
from pgpy.types import Armorable
|
||||
|
||||
|
||||
@pytest.mark.regression(issue=56)
|
||||
def test_reg_bug_56(gpg_import, gpg_verify):
|
||||
def test_reg_bug_56():
|
||||
# some imports only used by this regression test
|
||||
import hashlib
|
||||
from datetime import datetime
|
||||
@@ -147,19 +149,15 @@ def test_reg_bug_56(gpg_import, gpg_verify):
|
||||
assert pk.verify(sigsubject, sig)
|
||||
|
||||
# with GnuPG
|
||||
with tempfile.NamedTemporaryFile('w+') as subjf, \
|
||||
tempfile.NamedTemporaryFile('w+') as sigf, \
|
||||
tempfile.NamedTemporaryFile('w+') as pubf:
|
||||
subjf.write(sigsubject.decode('latin-1'))
|
||||
sigf.write(str(sig))
|
||||
pubf.write(str(pk))
|
||||
with gpg.Context(armor=True, offline=True) as c:
|
||||
c.set_engine_info(gpg.constants.PROTOCOL_OpenPGP, home_dir=gnupghome)
|
||||
|
||||
subjf.flush()
|
||||
sigf.flush()
|
||||
pubf.flush()
|
||||
# import the key
|
||||
key_data = gpg.Data(string=pub)
|
||||
gpg.core.gpgme.gpgme_op_import(c.wrapped, key_data)
|
||||
|
||||
with gpg_import(pubf.name):
|
||||
assert gpg_verify(subjf.name, sigf.name)
|
||||
_, vres = c.verify(gpg.Data(string=sigsubject.decode('latin-1')), gpg.Data(string=str(sig)))
|
||||
assert vres
|
||||
|
||||
|
||||
@pytest.mark.regression(issue=157)
|
||||
|
||||
1
tox.ini
1
tox.ini
@@ -17,6 +17,7 @@ passenv = HOME ARCHFLAGS LDFLAGS CFLAGS INCLUDE LIB LD_LIBRARY_PATH PATH
|
||||
deps =
|
||||
cryptography>=1.1
|
||||
enum34
|
||||
gpg==1.8.0
|
||||
pyasn1
|
||||
six>=1.9.0
|
||||
singledispatch
|
||||
|
||||
Reference in New Issue
Block a user