- Implemented proper parsing of ECDH keys - closes #110

- Additional code needed to generate ECDH keys also added
 - Encrypt/decrypt using ECDH also works
 - Added some code to the test harness to start making it work with gpg 2.1
 - Fixed key generation tests to test generating keys using all possible algorithms
 - Updated documentation sources to reflect these changes
This commit is contained in:
Michael Greene
2016-02-15 22:48:01 -08:00
parent 0efb355643
commit f31a802c73
16 changed files with 632 additions and 77 deletions

View File

@@ -20,6 +20,12 @@ New Features
* Keys can be generated with the following algorithms:
- RSA
- DSA
- ECDSA
- ECDH
Other Changes
-------------
* Removed support for Python 3.2, as multiple dependency libraries have already done so
v0.3.0
======

View File

@@ -197,8 +197,8 @@ PGPy is focused on eventually reaching complete OpenPGP implementation, adhering
:text: :rfc:`6637` extends OpenPGP to officially add support for elliptic curve cryptography
:Key Types:
- ECDH, False, Elliptic Curve Diffie-Hellman
- ECDSA, True, Elliptic Curve Digital Signature Algorithm
- ECDH, True, Elliptic Curve Diffie-Hellman
- ECDSA, True, Elliptic Curve Digital Signature Algorithm
:Curves:
- Curve, True, NIST P-256
@@ -206,11 +206,11 @@ PGPy is focused on eventually reaching complete OpenPGP implementation, adhering
- Curve, True, NIST P-521
:ECDH Key Actions:
- Load, False, Load Keys
- Generate, False, Generate Keys
- Generate, False, Generate Subkeys
- KDF, False, Encode KDF data for encryption
- KDF, False, Decode KDF data for decryption
- Load, True, Load Keys
- Generate, True, Generate Keys
- Generate, True, Generate Subkeys
- KDF, True, Encode KDF data for encryption
- KDF, True, Decode KDF data for decryption
:ECDSA Key Actions:
- Load, True, Load Keys

View File

@@ -201,15 +201,16 @@ class PubKeyAlgorithm(IntEnum):
def can_gen(self):
return self in {PubKeyAlgorithm.RSAEncryptOrSign,
PubKeyAlgorithm.DSA,
PubKeyAlgorithm.ECDSA}
PubKeyAlgorithm.ECDSA,
PubKeyAlgorithm.ECDH}
@property
def can_encrypt(self): # pragma: no cover
return self in {PubKeyAlgorithm.RSAEncryptOrSign, PubKeyAlgorithm.ElGamal}
return self in {PubKeyAlgorithm.RSAEncryptOrSign, PubKeyAlgorithm.ElGamal, PubKeyAlgorithm.ECDH}
@property
def can_sign(self):
return self in {PubKeyAlgorithm.RSAEncryptOrSign, PubKeyAlgorithm.DSA}
return self in {PubKeyAlgorithm.RSAEncryptOrSign, PubKeyAlgorithm.DSA, PubKeyAlgorithm.ECDSA}
@property
def deprecated(self):

View File

@@ -3,6 +3,7 @@
from __future__ import absolute_import, division
import abc
import binascii
import collections
import hashlib
import itertools
@@ -18,11 +19,20 @@ from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.asymmetric import dsa
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.kdf.concatkdf import ConcatKDFHash
from cryptography.hazmat.primitives.keywrap import aes_key_wrap
from cryptography.hazmat.primitives.keywrap import aes_key_unwrap
from cryptography.hazmat.primitives.padding import PKCS7
from .subpackets import Signature as SignatureSP
from .subpackets import UserAttribute
from .subpackets import signature
@@ -59,6 +69,7 @@ __all__ = ['SubPackets',
'DSAPub',
'ElGPub',
'ECDSAPub',
'ECDHPub',
'String2Key',
'PrivKey',
'OpaquePrivKey',
@@ -66,6 +77,7 @@ __all__ = ['SubPackets',
'DSAPriv',
'ElGPriv',
'ECDSAPriv',
'ECDHPriv',
'CipherText',
'RSACipherText',
'ElGCipherText', ]
@@ -305,23 +317,6 @@ class DSASignature(Signature):
class ECDSASignature(DSASignature):
# def __init__(self):
# super(ECDSASignature, self).__init__()
# self.r = MPI(0)
# self.s = MPI(0)
# def __iter__(self):
# yield self.r
# yield self.s
# def __sig__(self):
# # return the signature data into an ASN.1 sequence of integers in DER format
# seq = Sequence()
# for i in self:
# seq.setComponentByPosition(len(seq), Integer(i))
#
# return encoder.encode(seq)
def from_signer(self, sig):
seq, _ = decoder.decode(sig)
self.r = MPI(seq[0])
@@ -496,7 +491,6 @@ class ECDSAPub(PubKey):
def parse(self, packet):
oidlen = packet[0]
print(oidlen)
del packet[0]
_oid = bytearray(b'\x06')
_oid.append(oidlen)
@@ -521,6 +515,153 @@ class ECDSAPub(PubKey):
self.y = MPI(self.bytes_to_int(y))
class ECDHPub(PubKey):
"""
8. EC DH Algorithm (ECDH)
The method is a combination of an ECC Diffie-Hellman method to
establish a shared secret, a key derivation method to process the
shared secret into a derived key, and a key wrapping method that uses
the derived key to protect a session key used to encrypt a message.
The One-Pass Diffie-Hellman method C(1, 1, ECC CDH) [NIST-SP800-56A]
MUST be implemented with the following restrictions: the ECC CDH
primitive employed by this method is modified to always assume the
cofactor as 1, the KDF specified in Section 7 is used, and the KDF
parameters specified below are used.
The KDF parameters are encoded as a concatenation of the following 5
variable-length and fixed-length fields, compatible with the
definition of the OtherInfo bitstring [NIST-SP800-56A]:
o a variable-length field containing a curve OID, formatted as
follows:
- a one-octet size of the following field
- the octets representing a curve OID, defined in Section 11
o a one-octet public key algorithm ID defined in Section 5
o a variable-length field containing KDF parameters, identical to
the corresponding field in the ECDH public key, formatted as
follows:
- a one-octet size of the following fields; values 0 and 0xff
are reserved for future extensions
- a one-octet value 01, reserved for future extensions
- a one-octet hash function ID used with the KDF
- a one-octet algorithm ID for the symmetric algorithm used to
wrap the symmetric key for message encryption; see Section 8
for details
o 20 octets representing the UTF-8 encoding of the string
"Anonymous Sender ", which is the octet sequence
41 6E 6F 6E 79 6D 6F 75 73 20 53 65 6E 64 65 72 20 20 20 20
o 20 octets representing a recipient encryption subkey or a master
key fingerprint, identifying the key material that is needed for
the decryption
"""
def __init__(self):
super(ECDHPub, self).__init__()
self.oid = None
self.x = MPI(0)
self.y = MPI(0)
self.kdf = ECKDF()
def __iter__(self):
yield self.x
yield self.y
def __len__(self):
return sum([len(i) - 2 for i in self] +
[3,
len(self.kdf),
len(encoder.encode(self.oid.value)) - 1])
def __pubkey__(self):
return ec.EllipticCurvePublicNumbers(self.x, self.y, self.oid.curve()).public_key(default_backend())
# test
# def publen(self):
# return len(self)
def __bytearray__(self):
_b = bytearray()
_b += encoder.encode(self.oid.value)[1:]
# 0x04 || x || y
# where x and y are the same length
_xy = b'\x04' + self.x.to_mpibytes()[2:] + self.y.to_mpibytes()[2:]
_b += MPI(self.bytes_to_int(_xy, 'big')).to_mpibytes()
_b += self.kdf.__bytearray__()
return _b
# ECDH does not do signatures, so this should cancel inheriting the verify method
verify = False
def parse(self, packet):
"""
Algorithm-Specific Fields for ECDH keys:
o a variable-length field containing a curve OID, formatted
as follows:
- a one-octet size of the following field; values 0 and
0xFF are reserved for future extensions
- the octets representing a curve OID, defined in
Section 11
- MPI of an EC point representing a public key
o a variable-length field containing KDF parameters,
formatted as follows:
- a one-octet size of the following fields; values 0 and
0xff are reserved for future extensions
- a one-octet value 01, reserved for future extensions
- a one-octet hash function ID used with a KDF
- a one-octet algorithm ID for the symmetric algorithm
used to wrap the symmetric key used for the message
encryption; see Section 8 for details
"""
oidlen = packet[0]
del packet[0]
_oid = bytearray(b'\x06')
_oid.append(oidlen)
_oid += bytearray(packet[:oidlen])
# try:
oid, _ = decoder.decode(bytes(_oid))
# except:
# raise PGPError("Bad OID octet stream: b'{:s}'".format(''.join(['\\x{:02X}'.format(c) for c in _oid])))
self.oid = EllipticCurveOID(oid)
del packet[:oidlen]
# flen = (self.oid.bit_length // 8)
xy = bytearray(MPI(packet).to_mpibytes()[2:])
# xy = bytearray(MPI(packet).to_bytes(flen, 'big'))
# the first byte is just \x04
del xy[:1]
# now xy needs to be separated into x, y
xylen = len(xy)
x, y = xy[:xylen // 2], xy[xylen // 2:]
self.x = MPI(self.bytes_to_int(x))
self.y = MPI(self.bytes_to_int(y))
self.kdf.parse(packet)
class String2Key(Field):
"""
3.7. String-to-Key (S2K) Specifiers
@@ -773,6 +914,76 @@ class String2Key(Field):
return b''.join(hc.digest() for hc in h)[:(keylen // 8)]
class ECKDF(Field):
"""
o a variable-length field containing KDF parameters,
formatted as follows:
- a one-octet size of the following fields; values 0 and
0xff are reserved for future extensions
- a one-octet value 01, reserved for future extensions
- a one-octet hash function ID used with a KDF
- a one-octet algorithm ID for the symmetric algorithm
used to wrap the symmetric key used for the message
encryption; see Section 8 for details
"""
@sdproperty
def halg(self):
return self._halg
@halg.register(int)
@halg.register(HashAlgorithm)
def halg_int(self, val):
self._halg = HashAlgorithm(val)
@sdproperty
def encalg(self):
return self._encalg
@encalg.register(int)
@encalg.register(SymmetricKeyAlgorithm)
def encalg_int(self, val):
self._encalg = SymmetricKeyAlgorithm(val)
def __init__(self):
super(ECKDF, self).__init__()
self.halg = 0
self.encalg = 0
def __bytearray__(self):
_bytes = bytearray()
_bytes.append(len(self) - 1)
_bytes.append(0x01)
_bytes.append(self.halg)
_bytes.append(self.encalg)
return _bytes
def __len__(self):
return 4
def parse(self, packet):
# packet[0] should always be 3
# packet[1] should always be 1
# TODO: this assert is likely not necessary, but we should raise some kind of exception
# if parsing fails due to these fields being incorrect
assert packet[:2] == b'\x03\x01'
del packet[:2]
self.halg = packet[0]
del packet[0]
self.encalg = packet[0]
del packet[0]
def derive_key(self, s, zlen, data):
# wrapper around the Concatenation KDF method provided by cryptography
ckdf = ConcatKDFHash(algorithm=getattr(hashes, self.halg.name)(), length=zlen, otherinfo=bytes(data), backend=default_backend())
return ckdf.derive(s)
class PrivKey(PubKey):
def __init__(self):
super(PrivKey, self).__init__()
@@ -1160,11 +1371,11 @@ class ECDSAPriv(PrivKey, ECDSAPub):
self.oid = EllipticCurveOID(oid)
pk = ec.generate_private_key(self.oid.value, default_backend())
pk = ec.generate_private_key(self.oid.curve(), default_backend())
pubn = pk.public_key().public_numbers()
self.x = pubn.x
self.y = pubn.y
self.s = pk.private_numbers().private_value
self.x = MPI(pubn.x)
self.y = MPI(pubn.y)
self.s = MPI(pk.private_numbers().private_value)
def publen(self):
return sum([len(i) - 2 for i in ECDSAPub.__iter__(self)] + [3, len(encoder.encode(self.oid.value)) - 1])
@@ -1185,17 +1396,13 @@ class ECDSAPriv(PrivKey, ECDSAPub):
self.encbytes = packet
def decrypt_keyblob(self, passphrase):
raise NotImplementedError("ECDSA private key unlocking is not implemented yet")
kb = super(ECDSAPriv, self).decrypt_keyblob(passphrase)
del passphrase
self.s = MPI(kb)
def clear(self):
del self.oid
del self.x
del self.y
del self.s
self.oid = None
self.x = MPI(0)
self.y = MPI(0)
self.s = MPI(0)
def sign(self, sigdata, hash_alg):
@@ -1204,19 +1411,91 @@ class ECDSAPriv(PrivKey, ECDSAPub):
return signer.finalize()
class ECDHPriv(ECDSAPriv, ECDHPub):
def __bytearray__(self):
_b = ECDHPub.__bytearray__(self)
_b += self.s2k.__bytearray__()
if not self.s2k:
_b += self.s.to_mpibytes()
if self.s2k.usage == 0:
_b += self.chksum
else:
_b += self.encbytes
return _b
def _generate(self, oid):
super(ECDHPriv, self)._generate(oid)
# set the KDF hash and symmetric encryption algorithms
_halgs = {EllipticCurveOID.NIST_P256: HashAlgorithm.SHA256,
EllipticCurveOID.NIST_P384: HashAlgorithm.SHA384,
EllipticCurveOID.NIST_P521: HashAlgorithm.SHA512}
_kalgs = {EllipticCurveOID.NIST_P256: SymmetricKeyAlgorithm.AES128,
EllipticCurveOID.NIST_P384: SymmetricKeyAlgorithm.AES192,
EllipticCurveOID.NIST_P521: SymmetricKeyAlgorithm.AES256}
self.kdf.halg = _halgs[self.oid]
self.kdf.encalg = _kalgs[self.oid]
def publen(self):
return sum([len(i) - 2 for i in ECDHPub.__iter__(self)] +
[3,
len(encoder.encode(self.oid.value)) - 1,
len(self.kdf)])
def parse(self, packet):
ECDHPub.parse(self, packet)
self.s2k.parse(packet)
if not self.s2k:
self.s = MPI(packet)
if self.s2k.usage == 0:
self.chksum = packet[:2]
del packet[:2]
else:
##TODO: this needs to be bounded to the length of the encrypted key material
self.encbytes = packet
#
# # as in ECDHPub, ECDH cannot sign, so we set this property so the sign method
# # isn't inherited from ECDSAPriv
# sign = False
class CipherText(MPIs):
@classmethod
@abc.abstractmethod
def encrypt(cls, encfn, *args):
"""create and populate a concrete CipherText class instance"""
@abc.abstractmethod
def decrypt(self, decfn, *args):
"""decrypt the ciphertext contained in this CipherText instance"""
def __bytearray__(self):
_bytes = bytearray()
for i in self:
_bytes += i.to_mpibytes()
return _bytes
@abc.abstractmethod
def from_encrypter(self, ct):
"""create and parse a concrete CipherText class instance"""
class RSACipherText(CipherText):
@classmethod
def encrypt(cls, encfn, *args):
ct = cls()
ct.me_mod_n = MPI(cls.bytes_to_int(encfn(*args)))
return ct
def decrypt(self, decfn, *args):
return decfn(*args)
def __init__(self):
super(RSACipherText, self).__init__()
self.me_mod_n = MPI(0)
@@ -1227,11 +1506,15 @@ class RSACipherText(CipherText):
def parse(self, packet):
self.me_mod_n = MPI(packet)
def from_encrypter(self, ct):
self.me_mod_n = MPI(self.bytes_to_int(ct))
class ElGCipherText(CipherText):
@classmethod
def encrypt(cls, encfn, *args):
raise NotImplementedError()
def decrypt(self, decfn, *args):
raise NotImplementedError()
def __init__(self):
super(ElGCipherText, self).__init__()
self.gk_mod_p = MPI(0)
@@ -1245,5 +1528,129 @@ class ElGCipherText(CipherText):
self.gk_mod_p = MPI(packet)
self.myk_mod_p = MPI(packet)
def from_encrypter(self, ct):
raise NotImplementedError()
class ECDHCipherText(CipherText):
@classmethod
def encrypt(cls, pk, *args):
"""
For convenience, the synopsis of the encoding method is given below;
however, this section, [NIST-SP800-56A], and [RFC3394] are the
normative sources of the definition.
Obtain the authenticated recipient public key R
Generate an ephemeral key pair {v, V=vG}
Compute the shared point S = vR;
m = symm_alg_ID || session key || checksum || pkcs5_padding;
curve_OID_len = (byte)len(curve_OID);
Param = curve_OID_len || curve_OID || public_key_alg_ID || 03
|| 01 || KDF_hash_ID || KEK_alg_ID for AESKeyWrap || "Anonymous
Sender " || recipient_fingerprint;
Z_len = the key size for the KEK_alg_ID used with AESKeyWrap
Compute Z = KDF( S, Z_len, Param );
Compute C = AESKeyWrap( Z, m ) as per [RFC3394]
VB = convert point V to the octet string
Output (MPI(VB) || len(C) || C).
The decryption is the inverse of the method given. Note that the
recipient obtains the shared secret by calculating
"""
# *args should be:
# - m
# -
_m, = args
# m may need to be PKCS5-padded
padder = PKCS7(64).padder()
m = padder.update(_m) + padder.finalize()
km = pk.keymaterial
ct = cls()
# generate ephemeral key pair, then store it in ct
v = ec.generate_private_key(km.oid.curve(), default_backend())
ct.vX = MPI(v.public_key().public_numbers().x)
ct.vY = MPI(v.public_key().public_numbers().y)
# compute the shared point S
s = v.exchange(ec.ECDH(), km.__pubkey__())
# additional data for the KDF
data = bytearray()
data += encoder.encode(km.oid.value)[1:]
data.append(PubKeyAlgorithm.ECDH)
data += b'\x03\x01'
data.append(km.kdf.halg)
data.append(km.kdf.encalg)
data += b'Anonymous Sender '
data += binascii.unhexlify(pk.fingerprint.replace(' ', ''))
# TODO: how do we get 16 here?
z = km.kdf.derive_key(s, 16, data)
# compute C
ct.c = aes_key_wrap(z, m, default_backend())
return ct
def decrypt(self, pk, *args):
km = pk.keymaterial
# assemble the public component of ephemeral key v
v = ec.EllipticCurvePublicNumbers(self.vX, self.vY, km.oid.curve()).public_key(default_backend())
# compute s using the inverse of how it was derived during encryption
s = km.__privkey__().exchange(ec.ECDH(), v)
# additional data for the KDF
data = bytearray()
# - DER encoded OID, sans the first octet
data += encoder.encode(km.oid.value)[1:]
data.append(PubKeyAlgorithm.ECDH)
data += b'\x03\x01'
data.append(km.kdf.halg)
data.append(km.kdf.encalg)
data += b'Anonymous Sender '
data += binascii.unhexlify(pk.fingerprint.replace(' ', ''))
# z = pk.kdf.derive_key(s, self.vY, data)
# TODO: derive 16 ... how?
z = km.kdf.derive_key(s, 16, data)
_m = aes_key_unwrap(z, self.c, default_backend())
padder = PKCS7(64).unpadder()
return padder.update(_m) + padder.finalize()
def __init__(self):
super(ECDHCipherText, self).__init__()
self.vX = MPI(0)
self.vY = MPI(0)
self.c = bytearray(0)
def __iter__(self):
yield self.v
def __bytearray__(self):
_bytes = bytearray()
_xy = b'\x04' + self.vX.to_mpibytes()[2:] + self.vY.to_mpibytes()[2:]
_bytes += MPI(self.bytes_to_int(_xy, 'big')).to_mpibytes()
_bytes.append(len(self.c))
_bytes += self.c
return _bytes
def parse(self, packet):
# self.v = MPI(packet)
xy = bytearray(MPI(packet).to_mpibytes()[2:])
del xy[:1]
xylen = len(xy)
x, y = xy[:xylen // 2], xy[xylen // 2:]
self.vX = MPI(self.bytes_to_int(x))
self.vY = MPI(self.bytes_to_int(y))
clen = packet[0]
del packet[0]
self.c += packet[:clen]
del packet[:clen]

View File

@@ -14,21 +14,13 @@ import six
from cryptography.hazmat.primitives import constant_time
from cryptography.hazmat.primitives.asymmetric import padding
from .fields import DSAPriv
from .fields import DSAPub
from .fields import DSASignature
from .fields import ECDSAPub
from .fields import ECDSAPriv
from .fields import ECDSASignature
from .fields import ElGCipherText
from .fields import ElGPriv
from .fields import ElGPub
from .fields import DSAPriv, DSAPub, DSASignature
from .fields import ECDSAPub, ECDSAPriv, ECDSASignature
from .fields import ECDHPub, ECDHPriv, ECDHCipherText
from .fields import ElGCipherText, ElGPriv, ElGPub
from .fields import OpaquePubKey
from .fields import OpaquePrivKey
from .fields import RSACipherText
from .fields import RSAPriv
from .fields import RSAPub
from .fields import RSASignature
from .fields import RSACipherText, RSAPriv, RSAPub, RSASignature
from .fields import String2Key
from .fields import SubPackets
from .fields import UserAttributeSubPackets
@@ -93,6 +85,10 @@ class PKESessionKey(VersionedPacket):
def decrypt_sk(self, pk):
raise NotImplementedError()
@abc.abstractmethod
def encrypt_sk(self, pk, symalg, symkey):
raise NotImplementedError()
class PKESessionKeyV3(PKESessionKey):
"""
@@ -179,7 +175,8 @@ class PKESessionKeyV3(PKESessionKey):
_c = {PubKeyAlgorithm.RSAEncryptOrSign: RSACipherText,
PubKeyAlgorithm.RSAEncrypt: RSACipherText,
PubKeyAlgorithm.ElGamal: ElGCipherText,
PubKeyAlgorithm.FormerlyElGamalEncryptOrSign: ElGCipherText}
PubKeyAlgorithm.FormerlyElGamalEncryptOrSign: ElGCipherText,
PubKeyAlgorithm.ECDH: ECDHCipherText}
ct = _c.get(self._pkalg, None)
self.ct = ct() if ct is not None else ct
@@ -202,14 +199,19 @@ class PKESessionKeyV3(PKESessionKey):
if self.pkalg == PubKeyAlgorithm.RSAEncryptOrSign:
# pad up ct with null bytes if necessary
ct = self.ct.me_mod_n.to_mpibytes()[2:]
ct = b'\x00' * ((pk.key_size // 8) - len(ct)) + ct
ct = b'\x00' * ((pk.keymaterial.__privkey__().key_size // 8) - len(ct)) + ct
decrypter = pk.keymaterial.__privkey__().decrypt
decargs = (ct, padding.PKCS1v15(),)
elif self.pkalg == PubKeyAlgorithm.ECDH:
decrypter = pk
decargs = ()
else:
raise NotImplementedError(self.pkalg)
m = bytearray(pk.decrypt(*decargs))
m = bytearray(self.ct.decrypt(decrypter, *decargs))
"""
The value "m" in the above formulas is derived from the session key
@@ -243,12 +245,17 @@ class PKESessionKeyV3(PKESessionKey):
m += self.int_to_bytes(sum(bytearray(symkey)) % 65536, 2)
if self.pkalg == PubKeyAlgorithm.RSAEncryptOrSign:
encrypter = pk.keymaterial.__pubkey__().encrypt
encargs = (bytes(m), padding.PKCS1v15(),)
elif self.pkalg == PubKeyAlgorithm.ECDH:
encrypter = pk
encargs = (bytes(m),)
else:
raise NotImplementedError(self.pkalg)
self.ct.from_encrypter(pk.encrypt(*encargs))
self.ct = self.ct.encrypt(encrypter, *encargs)
self.update_hlen()
def parse(self, packet):
@@ -721,6 +728,7 @@ class PubKeyV4(PubKey):
(True, PubKeyAlgorithm.ElGamal): ElGPub,
(True, PubKeyAlgorithm.FormerlyElGamalEncryptOrSign): ElGPub,
(True, PubKeyAlgorithm.ECDSA): ECDSAPub,
(True, PubKeyAlgorithm.ECDH): ECDHPub,
# False means private
(False, PubKeyAlgorithm.RSAEncryptOrSign): RSAPriv,
(False, PubKeyAlgorithm.RSAEncrypt): RSAPriv,
@@ -729,6 +737,7 @@ class PubKeyV4(PubKey):
(False, PubKeyAlgorithm.ElGamal): ElGPriv,
(False, PubKeyAlgorithm.FormerlyElGamalEncryptOrSign): ElGPriv,
(False, PubKeyAlgorithm.ECDSA): ECDSAPriv,
(False, PubKeyAlgorithm.ECDH): ECDHPriv,
}
k = (self.public, self.pkalg)

View File

@@ -1942,7 +1942,8 @@ class PGPKey(PGPObject, Armorable):
pkesk = PKESessionKeyV3()
pkesk.encrypter = bytearray(binascii.unhexlify(self.fingerprint.keyid.encode('latin-1')))
pkesk.pkalg = self.key_algorithm
pkesk.encrypt_sk(self.__key__.__pubkey__(), cipher_algo, sessionkey)
# pkesk.encrypt_sk(self.__key__, cipher_algo, sessionkey)
pkesk.encrypt_sk(self._key, cipher_algo, sessionkey)
if message.is_encrypted: # pragma: no cover
_m = message
@@ -1981,7 +1982,7 @@ class PGPKey(PGPObject, Armorable):
raise PGPError("Cannot decrypt the provided message with this key")
pkesk = next(pk for pk in message._sessionkeys if pk.pkalg == self.key_algorithm and pk.encrypter == self.fingerprint.keyid)
alg, key = pkesk.decrypt_sk(self.__key__.__privkey__())
alg, key = pkesk.decrypt_sk(self._key)
# now that we have the symmetric cipher used and the key, we can decrypt the actual message
decmsg = PGPMessage()

View File

@@ -8,6 +8,7 @@ import re
import six
import subprocess
import sys
import time
from distutils.version import LooseVersion
@@ -125,11 +126,22 @@ def gpg_import():
gpg_kwargs = _gpg_kwargs.copy()
gpgo, _ = _run(_gpg_bin, *gpg_args, **gpg_kwargs)
# if GPG version is 2.1 or newer, we need to add a setup/teardown step in creating the keybox folder
if gpg_ver >= '2.1':
if not os.path.exists('tests/testdata/private-keys-v1.d'):
os.mkdir('tests/testdata/private-keys-v1.d')
time.sleep(5)
try:
yield gpgo
finally:
[os.remove(f) for f in glob.glob('tests/testdata/testkeys.*')]
if gpg_ver >= '2.1':
[os.remove(f) for f in glob.glob('tests/testdata/private-keys-v1.d/*')]
# os.rmdir('tests/testdata/private-keys-v1.d')
time.sleep(0.5)
return _gpg_import

View File

@@ -36,7 +36,7 @@ def binload(f):
return bytearray(ff.read())
skip_files = {'tests/testdata/packets/{:s}'.format(pkt) for pkt in ['07.v4.ecdh.privsubkey', '11.literal.partial', '14.v4.ecdh.pubsubkey']}
skip_files = {'tests/testdata/packets/{:s}'.format(pkt) for pkt in ['11.literal.partial']}
class TestPacket(object):

View File

@@ -112,6 +112,15 @@ class TestBlocks(object):
('issuers', {'EEE097A017B979CA'}),
('signers', set()),
('type', 'encrypted')],
'tests/testdata/blocks/message.ecc.encrypted.asc':
[('encrypters', {'77CEB7A34089AB73'}),
('is_compressed', False),
('is_encrypted', True),
('is_signed', False),
('issuers', {'77CEB7A34089AB73'}),
('signers', set()),
('type', 'encrypted')],
'tests/testdata/blocks/revochiio.asc':
[('created', datetime(2014, 9, 11, 22, 55, 53)),
('fingerprint', "AE15 9FF3 4C1A 2426 B7F8 0F1A 560C F308 EF60 CFA3"),
@@ -185,6 +194,32 @@ class TestBlocks(object):
('revocation_key', None),
('signer', 'FCAE54F74BA27CF7'),
('type', SignatureType.BinaryDocument)],
'tests/testdata/blocks/eccpubkey.asc':
[('created', datetime(2010, 9, 17, 20, 33, 49)),
('expires_at', None),
('fingerprint', "502D 1A53 65D1 C0CA A699 4539 0BA5 2DF0 BAA5 9D9C"),
('is_expired', False),
('is_primary', True),
('is_protected', False),
('is_public', True),
('is_unlocked', True),
('key_algorithm', PubKeyAlgorithm.ECDSA),
('magic', "PUBLIC KEY BLOCK"),
('parent', None),
('signers', set()),],
'tests/testdata/blocks/eccseckey.asc':
[('created', datetime(2010, 9, 17, 20, 33, 49)),
('expires_at', None),
('fingerprint', "502D 1A53 65D1 C0CA A699 4539 0BA5 2DF0 BAA5 9D9C"),
('is_expired', False),
('is_primary', True),
('is_protected', True),
('is_public', False),
('is_unlocked', False),
('key_algorithm', PubKeyAlgorithm.ECDSA),
('magic', "PRIVATE KEY BLOCK"),
('parent', None),
('signers', set()),],
'tests/testdata/blocks/signature.expired.asc':
[('created', datetime(2014, 9, 28, 20, 54, 42)),
('is_expired', True),],

View File

@@ -17,6 +17,7 @@ from pgpy import PGPSignature
from pgpy import PGPUID
from pgpy.constants import CompressionAlgorithm
from pgpy.constants import EllipticCurveOID
from pgpy.constants import Features
from pgpy.constants import HashAlgorithm
from pgpy.constants import ImageEncoding
@@ -34,6 +35,8 @@ from pgpy.errors import PGPError
from pgpy.packet.packets import PrivKeyV4
from pgpy.packet.packets import PrivSubKeyV4
from conftest import gpg_ver
def _read(f, mode='r'):
with open(f, mode) as ff:
@@ -205,6 +208,17 @@ def _compare_keys(keyA, keyB):
key_algs = [ PubKeyAlgorithm.RSAEncryptOrSign, PubKeyAlgorithm.DSA, PubKeyAlgorithm.ECDSA ]
subkey_alg = {
PubKeyAlgorithm.RSAEncryptOrSign: PubKeyAlgorithm.RSAEncryptOrSign,
PubKeyAlgorithm.DSA: PubKeyAlgorithm.DSA, # TODO: when it becomes possible to generate ElGamal keys, switch this to generate it as the subkey
PubKeyAlgorithm.ECDSA: PubKeyAlgorithm.ECDH,
}
key_alg_size = {
PubKeyAlgorithm.RSAEncryptOrSign: 1024,
PubKeyAlgorithm.DSA: 1024,
PubKeyAlgorithm.ECDSA: EllipticCurveOID.NIST_P256,
PubKeyAlgorithm.ECDH: EllipticCurveOID.NIST_P256,
}
class TestPGPKey(object):
@@ -392,7 +406,7 @@ class TestPGPKey(object):
assert gpg_verify('./message.asc')
def test_encrypt_message(self, pub, message, sessionkey):
if pub.key_algorithm != PubKeyAlgorithm.RSAEncryptOrSign:
if pub.key_algorithm not in {PubKeyAlgorithm.RSAEncryptOrSign, PubKeyAlgorithm.ECDSA}:
pytest.skip('Asymmetric encryption only implemented for RSA currently')
return
@@ -404,7 +418,7 @@ class TestPGPKey(object):
self.encmessage.append(enc)
def test_decrypt_encmessage(self, sec, message):
if sec.key_algorithm != PubKeyAlgorithm.RSAEncryptOrSign:
if sec.key_algorithm not in {PubKeyAlgorithm.RSAEncryptOrSign, PubKeyAlgorithm.ECDSA}:
pytest.skip('Asymmetric encryption only implemented for RSA currently')
return
@@ -417,8 +431,15 @@ class TestPGPKey(object):
def test_gpg_decrypt_encmessage(self, write_clean, gpg_import, gpg_decrypt):
emsg = self.encmessage.pop(0)
with write_clean('tests/testdata/aemsg.asc', 'w', str(emsg)), gpg_import('./sectest.asc'):
assert gpg_decrypt('./aemsg.asc', keyid='EEE097A017B979CA')
with write_clean('tests/testdata/aemsg.asc', 'w', str(emsg)):
# decrypt using RSA
with gpg_import('./sectest.asc'):
assert gpg_decrypt('./aemsg.asc', keyid='EEE097A017B979CA')
# decrypt using ECDH
if gpg_ver >= '2.1':
with gpg_import('./keys/ecc.1.sec.asc'):
assert gpg_decrypt('./aemsg.asc', keyid='D01055FBCADD268E')
def test_sign_timestamp(self, sec):
with self.assert_warnings():
@@ -569,7 +590,7 @@ class TestPGPKey(object):
def test_new_key(self, key_alg):
# create a key and a user id and add the UID to the key
uid = PGPUID.new('Hugo Gernsback', 'Science Fiction Plus', 'hugo.gernsback@space.local')
key = PGPKey.new(PubKeyAlgorithm.RSAEncryptOrSign, 1024)
key = PGPKey.new(key_alg, key_alg_size[key_alg])
key.add_uid(uid, hashes=[HashAlgorithm.SHA224])
# self-verify the key
@@ -579,7 +600,7 @@ class TestPGPKey(object):
def test_new_subkey(self, key_alg):
key = self.gen_keys[key_alg]
subkey = PGPKey.new(PubKeyAlgorithm.RSAEncryptOrSign, 1024)
subkey = PGPKey.new(subkey_alg[key_alg], key_alg_size[subkey_alg[key_alg]])
assert subkey._key
assert not isinstance(subkey._key, PrivSubKeyV4)
@@ -597,8 +618,8 @@ class TestPGPKey(object):
assert subkey in sv
def test_gpg_verify_new_key(self, key_alg, write_clean, gpg_import, gpg_check_sigs):
# if key_alg in {PubKeyAlgorithm.ECDSA, PubKeyAlgorithm.ECDH}:
# pytest.skip("GnuPG version in use cannot import/verify ")
if gpg_ver < '2.1' and key_alg in {PubKeyAlgorithm.ECDSA, PubKeyAlgorithm.ECDH}:
pytest.skip("GnuPG version in use cannot import/verify ")
# with GnuPG
key = self.gen_keys[key_alg]

View File

@@ -66,6 +66,7 @@ class TestPGPKey(object):
PubKeyAlgorithm.RSAEncryptOrSign: 256,
PubKeyAlgorithm.DSA: 512,
PubKeyAlgorithm.ECDSA: 1,
PubKeyAlgorithm.ECDH: 1,
}
def test_unlock_pubkey(self, rsa_pub, recwarn):

15
tests/testdata/blocks/eccpubkey.asc vendored Normal file
View File

@@ -0,0 +1,15 @@
-----BEGIN PGP PUBLIC KEY BLOCK-----
Version: GnuPG v2.1.0-ecc (GNU/Linux)
mFIETJPQrRMIKoZIzj0DAQcCAwQLx6e669XwjHTHe3HuROe7C1oYMXuZbaU5PjOs
xSkyxtL2D00e/jWgufuNN4ftS+6XygEtB7j1g1vnCTVF1TLmtCRlY19kc2FfZGhf
MjU2IDxvcGVucGdwQGJyYWluaHViLm9yZz6IegQTEwgAIgUCTJPQrQIbAwYLCQgH
AwIGFQgCCQoLBBYCAwECHgECF4AACgkQC6Ut8LqlnZzmXQEAiKgiSzPSpUOJcX9d
JtLJ5As98Alit2oFwzhxG7mSVmQA/RP67yOeoUtdsK6bwmRA95cwf9lBIusNjehx
XDfpHj+/uFYETJPQrRIIKoZIzj0DAQcCAwR/cMCoGEzcrqXbILqP7Rfke977dE1X
XsRJEwrzftreZYrn7jXSDoiXkRyfVkvjPZqUvB5cknsaoH/3UNLRHClxAwEIB4hh
BBgTCAAJBQJMk9CtAhsMAAoJEAulLfC6pZ2c1yYBAOSUmaQ8rkgihnepbnpK7tNz
3QEocsLEtsTCDUBGNYGyAQDclifYqsUChXlWKaw3md+yHJPcWZXzHt37c4q/MhIm
oQ==
=hMzp
-----END PGP PUBLIC KEY BLOCK-----

18
tests/testdata/blocks/eccseckey.asc vendored Normal file
View File

@@ -0,0 +1,18 @@
-----BEGIN PGP PRIVATE KEY BLOCK-----
Version: GnuPG v2.1.0-ecc (GNU/Linux)
lJ0ETJPQrRMIKoZIzj0DAQcCAwQLx6e669XwjHTHe3HuROe7C1oYMXuZbaU5PjOs
xSkyxtL2D00e/jWgufuNN4ftS+6XygEtB7j1g1vnCTVF1TLm/gMDAmHomSLb9NbE
oyWUoqgKTbZzbFR/SWmiCcuiQEhREcTyvyU1hAglj7FsBJoQ6/pbeAEQZ3bVzlNM
8F0nF8KPLPuEADF1+4CntCRlY19kc2FfZGhfMjU2IDxvcGVucGdwQGJyYWluaHVi
Lm9yZz6IegQTEwgAIgUCTJPQrQIbAwYLCQgHAwIGFQgCCQoLBBYCAwECHgECF4AA
CgkQC6Ut8LqlnZzmXQEAiKgiSzPSpUOJcX9dJtLJ5As98Alit2oFwzhxG7mSVmQA
/RP67yOeoUtdsK6bwmRA95cwf9lBIusNjehxXDfpHj+/nKEETJPQrRIIKoZIzj0D
AQcCAwR/cMCoGEzcrqXbILqP7Rfke977dE1XXsRJEwrzftreZYrn7jXSDoiXkRyf
VkvjPZqUvB5cknsaoH/3UNLRHClxAwEIB/4DAwJh6Jki2/TWxKO7gHKWIcOcxYZp
CRWjlUghbKb6Q83p8GLPjKRN0USl/U1tObWdksqMXhUO0ePLWUnrbwoWYfYXg9Er
ADTgCYhhBBgTCAAJBQJMk9CtAhsMAAoJEAulLfC6pZ2c1yYA/3eJRirPQZmBno+Z
P/HOBSFWmFt4cUBGUx3oqiUd5loOAP480pb+vXx9ipljJWCJDSl/boRSuqB4hePP
qt9Rd5gNdQ==
=O8Dg
-----END PGP PRIVATE KEY BLOCK-----

View File

@@ -0,0 +1,10 @@
-----BEGIN PGP MESSAGE-----
Version: GnuPG v2.1.0-ecc (GNU/Linux)
hH4Dd863o0CJq3MSAgMEHdIYZQx+rV1cjy7qitIOEICFFzp4cjsRX4r+rDdMcQUs
h7VZmbP1c9C0s9sgCKwubWfkcYUl2ZOju4gy+s4MYTBb4/j8JjnJ9Bqn6LWutTXJ
zwsdP13VIJLnhiNqISdR3/6xWQ0ICRYzwb95nUZ1c1DSVgFpjPgUvi4pgYbTpcDB
jzILKWBfBDT/jck169XE8vgtbcqVQYZ7lZpaY9CzEbC+4dXZmV1gm5MafpTyFWgH
VnyrZB4gad9Lp9e0RKHHcOOE7s/NeLuu
=odUZ
-----END PGP MESSAGE-----

View File

@@ -0,0 +1,10 @@
-----BEGIN PGP MESSAGE-----
Version: GnuPG v2.1.0-ecc (GNU/Linux)
hH4Dd863o0CJq3MSAgMEHdIYZQx+rV1cjy7qitIOEICFFzp4cjsRX4r+rDdMcQUs
h7VZmbP1c9C0s9sgCKwubWfkcYUl2ZOju4gy+s4MYTBb4/j8JjnJ9Bqn6LWutTXJ
zwsdP13VIJLnhiNqISdR3/6xWQ0ICRYzwb95nUZ1c1DSVgFpjPgUvi4pgYbTpcDB
jzILKWBfBDT/jck169XE8vgtbcqVQYZ7lZpaY9CzEbC+4dXZmV1gm5MafpTyFWgH
VnyrZB4gad9Lp9e0RKHHcOOE7s/NeLuu
=odUZ
-----END PGP MESSAGE-----

View File

@@ -0,0 +1,9 @@
-----BEGIN PGP MESSAGE-----
Version: GnuPG v2.1.0-ecc (GNU/Linux)
owGbwMvMwCHMvVT3w66lc+cwrlFK4k5N1k3KT6nUK6ko8Zl8MSEkI7NYAYjy81IV
cjLzUrk64lgYhDkY2FiZQNIMXJwCMO31rxgZ+tW/zesUPxWzdKWrtLGW/LkP5rXL
V/Yvnr/EKjBbQuvZSYa/klsum6XFmTze+maVgclT6Rc6hzqqxNy6o6qdTTmLJuvp
AQA=
=GDv4
-----END PGP MESSAGE----