Merge pull request #423 from SecurityInnovation/cleanup
get stuff actually working again, and cleanup
This commit is contained in:
6
.github/workflows/tests.yml
vendored
6
.github/workflows/tests.yml
vendored
@@ -6,7 +6,7 @@ jobs:
|
||||
strategy:
|
||||
matrix:
|
||||
os: [ubuntu-18.04, macos-10.15]
|
||||
python-version: [3.5, 3.6, 3.7, 3.8, 3.9, '3.10']
|
||||
python-version: [3.6, 3.7, 3.8, 3.9, '3.10']
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
@@ -59,8 +59,6 @@ jobs:
|
||||
strategy:
|
||||
matrix:
|
||||
include:
|
||||
- python-version: 3.5
|
||||
env: py35-setup
|
||||
- python-version: 3.6
|
||||
env: py36-setup
|
||||
- python-version: 3.7
|
||||
@@ -69,6 +67,8 @@ jobs:
|
||||
env: py38-setup
|
||||
- python-version: 3.9
|
||||
env: py39-setup
|
||||
- python-version: '3.10'
|
||||
env: py310-setup
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
|
||||
@@ -50,9 +50,9 @@ You can also join ``#pgpy`` on Freenode to ask questions or get involved
|
||||
Requirements
|
||||
------------
|
||||
|
||||
- Python >= 3.5
|
||||
- Python >= 3.6
|
||||
|
||||
Tested with: 3.8, 3.7, 3.6, 3.5
|
||||
Tested with: 3.10, 3.9, 3.8, 3.7, 3.6
|
||||
|
||||
- `Cryptography <https://pypi.python.org/pypi/cryptography>`_
|
||||
|
||||
|
||||
@@ -20,15 +20,15 @@ Exceptions
|
||||
|
||||
.. autoexception:: PGPDecryptionError
|
||||
|
||||
:py:class:`PGPOpenSSLCipherNotSupported`
|
||||
:py:class:`PGPOpenSSLCipherNotSupportedError`
|
||||
----------------------------------------
|
||||
|
||||
.. autoexception:: PGPOpenSSLCipherNotSupported
|
||||
.. autoexception:: PGPOpenSSLCipherNotSupportedError
|
||||
|
||||
:py:class:`PGPInsecureCipher`
|
||||
:py:class:`PGPInsecureCipherError`
|
||||
-----------------------------
|
||||
|
||||
.. autoexception:: PGPInsecureCipher
|
||||
.. autoexception:: PGPInsecureCipherError
|
||||
|
||||
:py:class:`WontImplementError`
|
||||
------------------------------
|
||||
|
||||
@@ -49,25 +49,25 @@ if use_legacy_cryptography_decorator():
|
||||
key_size = 256
|
||||
|
||||
|
||||
@utils.register_interface(ec.EllipticCurve)
|
||||
@utils.register_interface(ec.EllipticCurve) # noqa: E303
|
||||
class BrainpoolP384R1(object):
|
||||
name = 'brainpoolP384r1'
|
||||
key_size = 384
|
||||
|
||||
|
||||
@utils.register_interface(ec.EllipticCurve)
|
||||
@utils.register_interface(ec.EllipticCurve) # noqa: E303
|
||||
class BrainpoolP512R1(object):
|
||||
name = 'brainpoolP512r1'
|
||||
key_size = 512
|
||||
|
||||
|
||||
@utils.register_interface(ec.EllipticCurve)
|
||||
@utils.register_interface(ec.EllipticCurve) # noqa: E303
|
||||
class X25519(object):
|
||||
name = 'X25519'
|
||||
key_size = 256
|
||||
|
||||
|
||||
@utils.register_interface(ec.EllipticCurve)
|
||||
@utils.register_interface(ec.EllipticCurve) # noqa: E303
|
||||
class Ed25519(object):
|
||||
name = 'ed25519'
|
||||
key_size = 256
|
||||
@@ -77,22 +77,22 @@ else:
|
||||
key_size = 256
|
||||
|
||||
|
||||
class BrainpoolP384R1(ec.EllipticCurve):
|
||||
class BrainpoolP384R1(ec.EllipticCurve): # noqa: E303
|
||||
name = 'brainpoolP384r1'
|
||||
key_size = 384
|
||||
|
||||
|
||||
class BrainpoolP512R1(ec.EllipticCurve):
|
||||
class BrainpoolP512R1(ec.EllipticCurve): # noqa: E303
|
||||
name = 'brainpoolP512r1'
|
||||
key_size = 512
|
||||
|
||||
|
||||
class X25519(ec.EllipticCurve):
|
||||
class X25519(ec.EllipticCurve): # noqa: E303
|
||||
name = 'X25519'
|
||||
key_size = 256
|
||||
|
||||
|
||||
class Ed25519(ec.EllipticCurve):
|
||||
class Ed25519(ec.EllipticCurve): # noqa: E303
|
||||
name = 'ed25519'
|
||||
key_size = 256
|
||||
|
||||
|
||||
@@ -4,7 +4,6 @@ import bz2
|
||||
import hashlib
|
||||
import imghdr
|
||||
import os
|
||||
import time
|
||||
import zlib
|
||||
import warnings
|
||||
|
||||
@@ -12,7 +11,6 @@ from collections import namedtuple
|
||||
from enum import Enum
|
||||
from enum import IntEnum
|
||||
from enum import IntFlag
|
||||
from enum import EnumMeta
|
||||
|
||||
from pyasn1.type.univ import ObjectIdentifier
|
||||
|
||||
@@ -20,52 +18,39 @@ from cryptography.hazmat.backends import openssl
|
||||
from cryptography.hazmat.primitives.asymmetric import ec
|
||||
from cryptography.hazmat.primitives.ciphers import algorithms
|
||||
|
||||
from .types import FlagEnum
|
||||
from .decorators import classproperty
|
||||
from ._curves import BrainpoolP256R1, BrainpoolP384R1, BrainpoolP512R1, X25519, Ed25519
|
||||
|
||||
__all__ = ['Backend',
|
||||
'EllipticCurveOID',
|
||||
'ECPointFormat',
|
||||
'PacketTag',
|
||||
'SymmetricKeyAlgorithm',
|
||||
'PubKeyAlgorithm',
|
||||
'CompressionAlgorithm',
|
||||
'HashAlgorithm',
|
||||
'RevocationReason',
|
||||
'ImageEncoding',
|
||||
'SignatureType',
|
||||
'KeyServerPreferences',
|
||||
'S2KGNUExtension',
|
||||
'SecurityIssues',
|
||||
'String2KeyType',
|
||||
'TrustLevel',
|
||||
'KeyFlags',
|
||||
'Features',
|
||||
'FlagEnumMeta',
|
||||
'RevocationKeyClass',
|
||||
'NotationDataFlags',
|
||||
'TrustFlags',
|
||||
'check_assymetric_algo_and_its_parameters',
|
||||
'is_hash_considered_secure']
|
||||
__all__ = [
|
||||
'Backend',
|
||||
'EllipticCurveOID',
|
||||
'ECPointFormat',
|
||||
'PacketTag',
|
||||
'SymmetricKeyAlgorithm',
|
||||
'PubKeyAlgorithm',
|
||||
'CompressionAlgorithm',
|
||||
'HashAlgorithm',
|
||||
'RevocationReason',
|
||||
'ImageEncoding',
|
||||
'SignatureType',
|
||||
'KeyServerPreferences',
|
||||
'S2KGNUExtension',
|
||||
'SecurityIssues',
|
||||
'String2KeyType',
|
||||
'TrustLevel',
|
||||
'KeyFlags',
|
||||
'Features',
|
||||
'RevocationKeyClass',
|
||||
'NotationDataFlags',
|
||||
'TrustFlags',
|
||||
]
|
||||
|
||||
|
||||
# this is 50 KiB
|
||||
_hashtunedata = bytearray([10, 11, 12, 13, 14, 15, 16, 17] * 128 * 50)
|
||||
|
||||
|
||||
class FlagEnumMeta(EnumMeta):
|
||||
def __and__(self, other):
|
||||
return { f for f in iter(self) if f.value & other }
|
||||
|
||||
def __rand__(self, other): # pragma: no cover
|
||||
return self & other
|
||||
|
||||
|
||||
namespace = FlagEnumMeta.__prepare__('FlagEnum', (IntEnum,))
|
||||
FlagEnum = FlagEnumMeta('FlagEnum', (IntEnum,), namespace)
|
||||
|
||||
|
||||
|
||||
class Backend(Enum):
|
||||
OpenSSL = openssl.backend
|
||||
|
||||
@@ -301,6 +286,26 @@ class PubKeyAlgorithm(IntEnum):
|
||||
PubKeyAlgorithm.RSASign,
|
||||
PubKeyAlgorithm.FormerlyElGamalEncryptOrSign}
|
||||
|
||||
def validate_params(self, size):
|
||||
min_size = MINIMUM_ASYMMETRIC_KEY_LENGTHS.get(self)
|
||||
if min_size is not None:
|
||||
if isinstance(min_size, set):
|
||||
# ECC
|
||||
curve = size
|
||||
safe_curves = min_size
|
||||
if curve in safe_curves:
|
||||
return SecurityIssues.OK
|
||||
else:
|
||||
return SecurityIssues.InsecureCurve
|
||||
else:
|
||||
# not ECC
|
||||
if size >= min_size:
|
||||
return SecurityIssues.OK
|
||||
else:
|
||||
return SecurityIssues.AsymmetricKeyLengthIsTooShort
|
||||
# min_size is None
|
||||
return SecurityIssues.BrokenAsymmetricFunc
|
||||
|
||||
|
||||
class CompressionAlgorithm(IntEnum):
|
||||
"""Supported compression algorithms."""
|
||||
@@ -361,7 +366,6 @@ class HashAlgorithm(IntEnum):
|
||||
#SHA3_256 = 13
|
||||
#SHA3_384 = 14
|
||||
#SHA3_512 = 15
|
||||
|
||||
|
||||
def __init__(self, *args):
|
||||
super(self.__class__, self).__init__()
|
||||
@@ -383,9 +387,27 @@ class HashAlgorithm(IntEnum):
|
||||
def is_supported(self):
|
||||
return True
|
||||
|
||||
@property
|
||||
def is_second_preimage_resistant(self):
|
||||
return self in {HashAlgorithm.SHA1}
|
||||
|
||||
@property
|
||||
def is_collision_resistant(self):
|
||||
return self in {HashAlgorithm.SHA256, HashAlgorithm.SHA384, HashAlgorithm.SHA512}
|
||||
|
||||
@property
|
||||
def is_considered_secure(self):
|
||||
if self.is_collision_resistant:
|
||||
return SecurityIssues.OK
|
||||
|
||||
warnings.warn('Hash function {hash} is not considered collision resistant'.format(hash=repr(self)))
|
||||
issues = SecurityIssues.HashFunctionNotCollisionResistant
|
||||
|
||||
if not self.is_second_preimage_resistant:
|
||||
issues |= SecurityIssues.HashFunctionNotSecondPreimageResistant
|
||||
|
||||
return issues
|
||||
|
||||
secondPreimageResistantHashes = {HashAlgorithm.SHA1}
|
||||
collisionResistantHashses = {HashAlgorithm.SHA256, HashAlgorithm.SHA384, HashAlgorithm.SHA512}
|
||||
|
||||
class RevocationReason(IntEnum):
|
||||
"""Reasons explaining why a key or certificate was revoked."""
|
||||
@@ -566,67 +588,42 @@ class TrustFlags(FlagEnum):
|
||||
|
||||
class SecurityIssues(IntFlag):
|
||||
OK = 0
|
||||
wrongSig = (1 << 0)
|
||||
expired = (1 << 1)
|
||||
disabled = (1 << 2)
|
||||
revoked = (1 << 3)
|
||||
invalid = (1 << 4)
|
||||
brokenAssymetricFunc = (1 << 5)
|
||||
hashFunctionNotCollisionResistant = (1 << 6)
|
||||
hashFunctionNotSecondPreimageResistant = (1 << 7)
|
||||
assymetricKeyLengthIsTooShort = (1 << 8)
|
||||
insecureCurve = (1 << 9)
|
||||
noSelfSignature = (1 << 10)
|
||||
WrongSig = (1 << 0)
|
||||
Expired = (1 << 1)
|
||||
Disabled = (1 << 2)
|
||||
Revoked = (1 << 3)
|
||||
Invalid = (1 << 4)
|
||||
BrokenAsymmetricFunc = (1 << 5)
|
||||
HashFunctionNotCollisionResistant = (1 << 6)
|
||||
HashFunctionNotSecondPreimageResistant = (1 << 7)
|
||||
AsymmetricKeyLengthIsTooShort = (1 << 8)
|
||||
InsecureCurve = (1 << 9)
|
||||
NoSelfSignature = (1 << 10)
|
||||
|
||||
@property
|
||||
def causes_signature_verify_to_fail(self):
|
||||
return self in {
|
||||
SecurityIssues.WrongSig,
|
||||
SecurityIssues.Expired,
|
||||
SecurityIssues.Disabled,
|
||||
SecurityIssues.Invalid,
|
||||
SecurityIssues.NoSelfSignature,
|
||||
}
|
||||
|
||||
|
||||
# https://safecurves.cr.yp.to/
|
||||
safeCurves = {
|
||||
SAFE_CURVES = {
|
||||
EllipticCurveOID.Curve25519,
|
||||
EllipticCurveOID.Ed25519,
|
||||
}
|
||||
|
||||
minimumAssymetricKeyLegths = {
|
||||
MINIMUM_ASYMMETRIC_KEY_LENGTHS = {
|
||||
PubKeyAlgorithm.RSAEncryptOrSign: 2048,
|
||||
PubKeyAlgorithm.RSASign: 2048,
|
||||
PubKeyAlgorithm.ElGamal: 2048,
|
||||
PubKeyAlgorithm.DSA: 2048,
|
||||
|
||||
PubKeyAlgorithm.ECDSA: safeCurves,
|
||||
PubKeyAlgorithm.EdDSA: safeCurves,
|
||||
PubKeyAlgorithm.ECDH: safeCurves,
|
||||
##
|
||||
PubKeyAlgorithm.ECDSA: SAFE_CURVES,
|
||||
PubKeyAlgorithm.EdDSA: SAFE_CURVES,
|
||||
PubKeyAlgorithm.ECDH: SAFE_CURVES,
|
||||
}
|
||||
|
||||
|
||||
|
||||
def is_hash_considered_secure(hash):
|
||||
if hash in collisionResistantHashses:
|
||||
return SecurityIssues.OK
|
||||
|
||||
warnings.warn("Hash function " + repr(hash) + " is not considered collision resistant")
|
||||
issues = SecurityIssues.hashFunctionNotCollisionResistant
|
||||
|
||||
if hash not in secondPreimageResistantHashes:
|
||||
issues |= hashFunctionNotSecondPreimageResistant
|
||||
|
||||
return issues
|
||||
|
||||
def check_assymetric_algo_and_its_parameters(algo, size):
|
||||
if algo in minimumAssymetricKeyLegths:
|
||||
minLOrSetOfSecureCurves = minimumAssymetricKeyLegths[algo]
|
||||
if isinstance(minLOrSetOfSecureCurves, set): # ECC
|
||||
curve = size
|
||||
safeCurvesForThisAlg = minLOrSetOfSecureCurves
|
||||
if curve in safeCurvesForThisAlg:
|
||||
return SecurityIssues.OK
|
||||
else:
|
||||
warnings.warn("Curve " + repr(curve) + " is not considered secure for " + repr(algo))
|
||||
return SecurityIssues.insecureCurve
|
||||
else:
|
||||
minL = minLOrSetOfSecureCurves
|
||||
if size < minL:
|
||||
warnings.warn("Assymetric algo " + repr(algo) + " needs key at least of " + repr(minL) + " bits effective length to be considered secure")
|
||||
return SecurityIssues.assymetricKeyLengthIsTooShort
|
||||
else:
|
||||
return SecurityIssues.OK
|
||||
else:
|
||||
warnings.warn("Assymetric algo " + repr(algo) + " is not considered secure")
|
||||
return SecurityIssues.brokenAssymetricFunc
|
||||
|
||||
@@ -4,9 +4,9 @@
|
||||
__all__ = ('PGPError',
|
||||
'PGPEncryptionError',
|
||||
'PGPDecryptionError',
|
||||
'PGPIncompatibleECPointFormat',
|
||||
'PGPOpenSSLCipherNotSupported',
|
||||
'PGPInsecureCipher',
|
||||
'PGPIncompatibleECPointFormatError',
|
||||
'PGPOpenSSLCipherNotSupportedError',
|
||||
'PGPInsecureCipherError',
|
||||
'WontImplementError',)
|
||||
|
||||
|
||||
@@ -25,17 +25,17 @@ class PGPDecryptionError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class PGPIncompatibleECPointFormat(Exception):
|
||||
class PGPIncompatibleECPointFormatError(Exception):
|
||||
"""Raised when the point format is incompatible with the elliptic curve"""
|
||||
pass
|
||||
|
||||
|
||||
class PGPOpenSSLCipherNotSupported(Exception):
|
||||
class PGPOpenSSLCipherNotSupportedError(Exception):
|
||||
"""Raised when OpenSSL does not support the requested cipher"""
|
||||
pass
|
||||
|
||||
|
||||
class PGPInsecureCipher(Exception):
|
||||
class PGPInsecureCipherError(Exception):
|
||||
"""Raised when a cipher known to be insecure is attempted to be used to encrypt data"""
|
||||
pass
|
||||
|
||||
|
||||
@@ -63,7 +63,7 @@ from ..decorators import sdproperty
|
||||
|
||||
from ..errors import PGPDecryptionError
|
||||
from ..errors import PGPError
|
||||
from ..errors import PGPIncompatibleECPointFormat
|
||||
from ..errors import PGPIncompatibleECPointFormatError
|
||||
|
||||
from ..symenc import _decrypt
|
||||
from ..symenc import _encrypt
|
||||
@@ -371,8 +371,8 @@ class EdDSASignature(DSASignature):
|
||||
|
||||
def __sig__(self):
|
||||
# TODO: change this length when EdDSA can be used with another curve (Ed448)
|
||||
l = (EllipticCurveOID.Ed25519.key_size + 7) // 8
|
||||
return self.int_to_bytes(self.r, l) + self.int_to_bytes(self.s, l)
|
||||
siglen = (EllipticCurveOID.Ed25519.key_size + 7) // 8
|
||||
return self.int_to_bytes(self.r, siglen) + self.int_to_bytes(self.s, siglen)
|
||||
|
||||
|
||||
class PubKey(MPIs):
|
||||
@@ -500,7 +500,7 @@ class ECPoint:
|
||||
self.x = MPI(MPIs.bytes_to_int(xy[:self.bytelen]))
|
||||
self.y = MPI(MPIs.bytes_to_int(xy[self.bytelen:]))
|
||||
elif self.format == ECPointFormat.Native:
|
||||
self.bytelen = 0 # dummy value for copy
|
||||
self.bytelen = 0 # dummy value for copy
|
||||
self.x = bytes(xy)
|
||||
self.y = None
|
||||
else:
|
||||
@@ -592,7 +592,7 @@ class ECDSAPub(PubKey):
|
||||
|
||||
self.p = ECPoint(packet)
|
||||
if self.p.format != ECPointFormat.Standard:
|
||||
raise PGPIncompatibleECPointFormat("Only Standard format is valid for ECDSA")
|
||||
raise PGPIncompatibleECPointFormatError("Only Standard format is valid for ECDSA")
|
||||
|
||||
|
||||
class EdDSAPub(PubKey):
|
||||
@@ -643,7 +643,7 @@ class EdDSAPub(PubKey):
|
||||
|
||||
self.p = ECPoint(packet)
|
||||
if self.p.format != ECPointFormat.Native:
|
||||
raise PGPIncompatibleECPointFormat("Only Native format is valid for EdDSA")
|
||||
raise PGPIncompatibleECPointFormatError("Only Native format is valid for EdDSA")
|
||||
|
||||
|
||||
class ECDHPub(PubKey):
|
||||
@@ -718,9 +718,9 @@ class ECDHPub(PubKey):
|
||||
self.p = ECPoint(packet)
|
||||
if self.oid == EllipticCurveOID.Curve25519:
|
||||
if self.p.format != ECPointFormat.Native:
|
||||
raise PGPIncompatibleECPointFormat("Only Native format is valid for Curve25519")
|
||||
raise PGPIncompatibleECPointFormatError("Only Native format is valid for Curve25519")
|
||||
elif self.p.format != ECPointFormat.Standard:
|
||||
raise PGPIncompatibleECPointFormat("Only Standard format is valid for this curve")
|
||||
raise PGPIncompatibleECPointFormatError("Only Standard format is valid for this curve")
|
||||
self.kdf.parse(packet)
|
||||
|
||||
|
||||
@@ -1177,14 +1177,14 @@ class PrivKey(PubKey):
|
||||
return _bytes
|
||||
|
||||
def __len__(self):
|
||||
l = super(PrivKey, self).__len__() + len(self.s2k) + len(self.chksum)
|
||||
nbytes = super(PrivKey, self).__len__() + len(self.s2k) + len(self.chksum)
|
||||
if self.s2k:
|
||||
l += len(self.encbytes)
|
||||
nbytes += len(self.encbytes)
|
||||
|
||||
else:
|
||||
l += sum(len(getattr(self, i)) for i in self.__privfields__)
|
||||
nbytes += sum(len(getattr(self, i)) for i in self.__privfields__)
|
||||
|
||||
return l
|
||||
return nbytes
|
||||
|
||||
def __copy__(self):
|
||||
pk = super(PrivKey, self).__copy__()
|
||||
@@ -1582,12 +1582,12 @@ class ECDHPriv(ECDSAPriv, ECDHPub):
|
||||
return _b
|
||||
|
||||
def __len__(self):
|
||||
l = ECDHPub.__len__(self) + len(self.s2k) + len(self.chksum)
|
||||
nbytes = ECDHPub.__len__(self) + len(self.s2k) + len(self.chksum)
|
||||
if self.s2k:
|
||||
l += len(self.encbytes)
|
||||
nbytes += len(self.encbytes)
|
||||
else:
|
||||
l += sum(len(getattr(self, i)) for i in self.__privfields__)
|
||||
return l
|
||||
nbytes += sum(len(getattr(self, i)) for i in self.__privfields__)
|
||||
return nbytes
|
||||
|
||||
def __privkey__(self):
|
||||
if self.oid == EllipticCurveOID.Curve25519:
|
||||
|
||||
@@ -6,7 +6,6 @@ import calendar
|
||||
import copy
|
||||
import hashlib
|
||||
import os
|
||||
import re
|
||||
import warnings
|
||||
|
||||
from datetime import datetime, timezone
|
||||
@@ -359,12 +358,14 @@ class SignatureV4(Signature):
|
||||
def pubalg_int(self, val):
|
||||
self._pubalg = PubKeyAlgorithm(val)
|
||||
|
||||
sigs = {PubKeyAlgorithm.RSAEncryptOrSign: RSASignature,
|
||||
PubKeyAlgorithm.RSAEncrypt: RSASignature,
|
||||
PubKeyAlgorithm.RSASign: RSASignature,
|
||||
PubKeyAlgorithm.DSA: DSASignature,
|
||||
PubKeyAlgorithm.ECDSA: ECDSASignature,
|
||||
PubKeyAlgorithm.EdDSA: EdDSASignature,}
|
||||
sigs = {
|
||||
PubKeyAlgorithm.RSAEncryptOrSign: RSASignature,
|
||||
PubKeyAlgorithm.RSAEncrypt: RSASignature,
|
||||
PubKeyAlgorithm.RSASign: RSASignature,
|
||||
PubKeyAlgorithm.DSA: DSASignature,
|
||||
PubKeyAlgorithm.ECDSA: ECDSASignature,
|
||||
PubKeyAlgorithm.EdDSA: EdDSASignature,
|
||||
}
|
||||
|
||||
self.signature = sigs.get(self.pubalg, OpaqueSignature)()
|
||||
|
||||
@@ -427,7 +428,6 @@ class SignatureV4(Signature):
|
||||
with the length-of-length set to zero.) The unhashed subpacket data
|
||||
of the Signature packet being hashed is not included in the hash, and
|
||||
the unhashed subpacket data length value is set to zero.
|
||||
|
||||
'''
|
||||
_body = bytearray()
|
||||
_body += self.int_to_bytes(self.header.version)
|
||||
@@ -435,7 +435,7 @@ class SignatureV4(Signature):
|
||||
_body += self.int_to_bytes(self.pubalg)
|
||||
_body += self.int_to_bytes(self.halg)
|
||||
_body += self.subpackets.__hashbytearray__()
|
||||
_body += self.int_to_bytes(0, minlen=2) # empty unhashed subpackets
|
||||
_body += self.int_to_bytes(0, minlen=2) # empty unhashed subpackets
|
||||
_body += self.hash2
|
||||
_body += self.signature.__bytearray__()
|
||||
|
||||
@@ -443,7 +443,7 @@ class SignatureV4(Signature):
|
||||
_hdr += b'\x88'
|
||||
_hdr += self.int_to_bytes(len(_body), minlen=4)
|
||||
return _hdr + _body
|
||||
|
||||
|
||||
def __copy__(self):
|
||||
spkt = SignatureV4()
|
||||
spkt.header = copy.copy(self.header)
|
||||
|
||||
@@ -703,9 +703,11 @@ class KeyServerPreferences(ByteFlag):
|
||||
class PreferredKeyServer(URI):
|
||||
__typeid__ = 0x18
|
||||
|
||||
|
||||
class SubkeyBindingSignature(Signature):
|
||||
__typeid__ = 0x18
|
||||
|
||||
|
||||
class PrimaryUserID(SubkeyBindingSignature):
|
||||
__typeid__ = 0x19
|
||||
|
||||
@@ -1037,7 +1039,7 @@ class IntendedRecipient(Signature):
|
||||
self.intended_recipient = packet[:fpr_len]
|
||||
del packet[:fpr_len]
|
||||
|
||||
|
||||
|
||||
class AttestedCertifications(Signature):
|
||||
'''
|
||||
(from RFC4880bis-08)
|
||||
@@ -1118,7 +1120,7 @@ class AttestedCertifications(Signature):
|
||||
def attested_certifications(self):
|
||||
return self._attested_certifications
|
||||
|
||||
@attested_certifications.register(bytearray)
|
||||
@attested_certifications.register(bytearray)
|
||||
@attested_certifications.register(bytes)
|
||||
def attested_certifications_bytearray(self, val):
|
||||
self._attested_certifications = val
|
||||
|
||||
@@ -77,10 +77,11 @@ class SubPacket(Dispatchable):
|
||||
super(SubPacket, self).__init__()
|
||||
self.header = Header()
|
||||
|
||||
# if self.__typeid__ not in [-1, None]:
|
||||
if (self.header.typeid == -1 and
|
||||
(not hasattr(self.__typeid__, '__abstractmethod__')) and
|
||||
(self.__typeid__ not in [-1, None])):
|
||||
if (
|
||||
self.header.typeid == -1
|
||||
and (not hasattr(self.__typeid__, '__abstractmethod__'))
|
||||
and (self.__typeid__ not in {-1, None})
|
||||
):
|
||||
self.header.typeid = self.__typeid__
|
||||
|
||||
def __bytearray__(self):
|
||||
|
||||
119
pgpy/pgp.py
119
pgpy/pgp.py
@@ -35,8 +35,6 @@ from .constants import RevocationReason
|
||||
from .constants import SignatureType
|
||||
from .constants import SymmetricKeyAlgorithm
|
||||
from .constants import SecurityIssues
|
||||
from .constants import check_assymetric_algo_and_its_parameters
|
||||
from .constants import is_hash_considered_secure
|
||||
|
||||
from .decorators import KeyAction
|
||||
|
||||
@@ -49,7 +47,6 @@ from .packet import Packet
|
||||
from .packet import Primary
|
||||
from .packet import Private
|
||||
from .packet import PubKeyV4
|
||||
from .packet import PubSubKeyV4
|
||||
from .packet import PrivKeyV4
|
||||
from .packet import PrivSubKeyV4
|
||||
from .packet import Public
|
||||
@@ -90,6 +87,7 @@ __all__ = ['PGPSignature',
|
||||
|
||||
class PGPSignature(Armorable, ParentRef, PGPObject):
|
||||
_reason_for_revocation = collections.namedtuple('ReasonForRevocation', ['code', 'comment'])
|
||||
|
||||
@property
|
||||
def __sig__(self):
|
||||
return self._signature.signature.__sig__()
|
||||
@@ -172,11 +170,10 @@ class PGPSignature(Armorable, ParentRef, PGPObject):
|
||||
The :py:obj:`~constants.HashAlgorithm` used when computing this signature.
|
||||
"""
|
||||
return self._signature.halg
|
||||
|
||||
|
||||
|
||||
def check_primitives(self):
|
||||
return is_hash_considered_secure(self.hash_algorithm)
|
||||
|
||||
return self.hash_algorithm.is_considered_secure
|
||||
|
||||
def check_soundness(self):
|
||||
return self.check_primitives()
|
||||
|
||||
@@ -287,7 +284,7 @@ class PGPSignature(Armorable, ParentRef, PGPObject):
|
||||
for n in self._signature.subpackets['h_AttestedCertifications']:
|
||||
attestations = bytes(n.attested_certifications)
|
||||
for i in range(0, len(attestations), hlen):
|
||||
ret.add(attestations[i:i+hlen])
|
||||
ret.add(attestations[i:i + hlen])
|
||||
return ret
|
||||
|
||||
@property
|
||||
@@ -329,7 +326,7 @@ class PGPSignature(Armorable, ParentRef, PGPObject):
|
||||
sig = PGPSignature()
|
||||
|
||||
if created is None:
|
||||
created=datetime.now(timezone.utc)
|
||||
created = datetime.now(timezone.utc)
|
||||
sigpkt = SignatureV4()
|
||||
sigpkt.header.tag = 2
|
||||
sigpkt.header.version = 4
|
||||
@@ -603,9 +600,9 @@ class PGPUID(ParentRef):
|
||||
def _splitstring(self):
|
||||
'''returns name, comment email from User ID string'''
|
||||
if not isinstance(self._uid, UserID):
|
||||
return ("", "", "")
|
||||
return "", "", ""
|
||||
if self._uid.uid == "":
|
||||
return ("", "", "")
|
||||
return "", "", ""
|
||||
rfc2822 = re.match(r"""^
|
||||
# name should always match something
|
||||
(?P<name>.+?)
|
||||
@@ -619,9 +616,8 @@ class PGPUID(ParentRef):
|
||||
(\ <(?P<email>.+)>)?
|
||||
$
|
||||
""", self._uid.uid, flags=re.VERBOSE).groupdict()
|
||||
|
||||
return (rfc2822['name'], rfc2822['comment'] or "", rfc2822['email'] or "")
|
||||
|
||||
return (rfc2822['name'], rfc2822['comment'] or "", rfc2822['email'] or "")
|
||||
|
||||
@property
|
||||
def name(self):
|
||||
@@ -636,7 +632,6 @@ class PGPUID(ParentRef):
|
||||
"""
|
||||
return self._splitstring()[1]
|
||||
|
||||
|
||||
@property
|
||||
def email(self):
|
||||
"""
|
||||
@@ -690,7 +685,10 @@ class PGPUID(ParentRef):
|
||||
if self.parent is not None:
|
||||
for sig in reversed(self._signatures):
|
||||
if sig.signer_fingerprint:
|
||||
if sig.signer == self.parent.fingerprint.keyid:
|
||||
if self.parent.fingerprint == sig.signer_fingerprint:
|
||||
return sig
|
||||
elif sig.signer:
|
||||
if self.parent.fingerprint == sig.signer:
|
||||
return sig
|
||||
|
||||
@property
|
||||
@@ -1130,7 +1128,7 @@ class PGPMessage(Armorable, PGPObject):
|
||||
charset = kwargs.pop('encoding', None)
|
||||
|
||||
filename = ''
|
||||
mtime = datetime.now(timezeone.utc)
|
||||
mtime = datetime.now(timezone.utc)
|
||||
|
||||
msg = PGPMessage()
|
||||
|
||||
@@ -1395,18 +1393,15 @@ class PGPKey(Armorable, ParentRef, PGPObject):
|
||||
@property
|
||||
def expires_at(self):
|
||||
"""A :py:obj:`~datetime.datetime` object of when this key is to be considered expired, if any. Otherwise, ``None``"""
|
||||
try:
|
||||
def expirationsIter():
|
||||
for sig in itertools.chain(iter(uid.selfsig for uid in self.userids if uid.selfsig), self.self_signatures):
|
||||
if sig.key_expiration is not None:
|
||||
yield sig.key_expiration
|
||||
expires = min(expirationsIter())
|
||||
expires = None
|
||||
for sig in iter(uid.selfsig for uid in self.userids if uid.selfsig):
|
||||
if sig.key_expiration is not None:
|
||||
expires = sig.key_expiration
|
||||
|
||||
except ValueError:
|
||||
return None
|
||||
if expires is not None:
|
||||
return self.created + expires
|
||||
|
||||
else:
|
||||
return (self.created + expires)
|
||||
return None
|
||||
|
||||
@property
|
||||
def fingerprint(self):
|
||||
@@ -1702,8 +1697,10 @@ class PGPKey(Armorable, ParentRef, PGPObject):
|
||||
self._uids.insort(other)
|
||||
|
||||
else:
|
||||
raise TypeError("unsupported operand type(s) for |: '{:s}' and '{:s}'"
|
||||
"".format(self.__class__.__name__, other.__class__.__name__))
|
||||
raise TypeError(
|
||||
"unsupported operand type(s) for |: '{:s}' and '{:s}'"
|
||||
"".format(self.__class__.__name__, other.__class__.__name__)
|
||||
)
|
||||
|
||||
if isinstance(self._sibling, weakref.ref) and not from_sib:
|
||||
sib = self._sibling()
|
||||
@@ -1913,7 +1910,7 @@ class PGPKey(Armorable, ParentRef, PGPObject):
|
||||
user = next(iter(self.userids))
|
||||
|
||||
# RFC 4880 says that primary keys *must* be capable of certification
|
||||
return {KeyFlags.Certify} | user.selfsig.key_flags
|
||||
return {KeyFlags.Certify} | (user.selfsig.key_flags if user.selfsig else set())
|
||||
|
||||
return next(self.self_signatures).key_flags
|
||||
|
||||
@@ -2098,7 +2095,7 @@ class PGPKey(Armorable, ParentRef, PGPObject):
|
||||
the certificate holder wants to attest to for redistribution with the certificate.
|
||||
Alternatively, any element in the list can be a ``bytes`` or ``bytearray`` object
|
||||
of the appropriate length (the length of this certification's digest).
|
||||
This keyword is only used for signatures of type Attestation.
|
||||
This keyword is only used for signatures of type Attestation.
|
||||
:type attested_certifications: ``list``
|
||||
:keyword keyserver: Specify the URI of the preferred key server of the user.
|
||||
This keyword is ignored for non-self-certifications.
|
||||
@@ -2202,11 +2199,13 @@ class PGPKey(Armorable, ParentRef, PGPObject):
|
||||
h = sig.hash_algorithm.hasher
|
||||
h.update(attestation._signature.canonical_bytes())
|
||||
attestations.add(h.digest())
|
||||
elif isinstance(attestation, (bytes,bytearray)) and len(attestation) == sig.hash_algorithm.digest_size:
|
||||
elif isinstance(attestation, (bytes, bytearray)) and len(attestation) == sig.hash_algorithm.digest_size:
|
||||
attestations.add(attestation)
|
||||
else:
|
||||
warnings.warn("Attested Certification element is neither a PGPSignature certification nor " +
|
||||
"a bytes object of size %d, ignoring"%(sig.hash_algorithm.digest_size))
|
||||
warnings.warn(
|
||||
'Attested Certification element is neither a PGPSignature certification nor '
|
||||
'a bytes object of size {:d}; ignoring'.format(sig.hash_algorithm.digest_size)
|
||||
)
|
||||
sig._signature.subpackets.addnew('AttestedCertifications', hashed=True, attested_certifications=b''.join(sorted(attestations)))
|
||||
|
||||
else:
|
||||
@@ -2361,28 +2360,27 @@ class PGPKey(Armorable, ParentRef, PGPObject):
|
||||
|
||||
def is_considered_insecure(self, self_verifying=False):
|
||||
res = self.check_soundness(self_verifying=self_verifying)
|
||||
|
||||
|
||||
for sk in self.subkeys.values():
|
||||
res |= sk.check_soundness(self_verifying=self_verifying)
|
||||
return res
|
||||
|
||||
def self_verify(self):
|
||||
selfSigs = list(self.self_signatures)
|
||||
self_sigs = list(self.self_signatures)
|
||||
res = SecurityIssues.OK
|
||||
if selfSigs:
|
||||
for s in selfSigs:
|
||||
if self_sigs:
|
||||
for s in self_sigs:
|
||||
if not self.verify(self, s):
|
||||
res |= SecurityIssues.invalid
|
||||
res |= SecurityIssues.Invalid
|
||||
break
|
||||
else:
|
||||
return SecurityIssues.noSelfSignature
|
||||
return SecurityIssues.NoSelfSignature
|
||||
return res
|
||||
|
||||
|
||||
def _do_self_signatures_verification(self):
|
||||
try:
|
||||
self._self_verified = SecurityIssues.OK
|
||||
self._self_verified = self.self_verify()
|
||||
except:
|
||||
except Exception:
|
||||
self._self_verified = None
|
||||
raise
|
||||
|
||||
@@ -2390,24 +2388,24 @@ class PGPKey(Armorable, ParentRef, PGPObject):
|
||||
def self_verified(self):
|
||||
warnings.warn("TODO: Self-sigs verification is not yet working because self-sigs are not parsed!!!")
|
||||
return SecurityIssues.OK
|
||||
|
||||
|
||||
if self._self_verified is None:
|
||||
self._do_self_signatures_verification()
|
||||
|
||||
|
||||
return self._self_verified
|
||||
|
||||
def check_primitives(self):
|
||||
return check_assymetric_algo_and_its_parameters(self.key_algorithm, self.key_size)
|
||||
return self.key_algorithm.validate_params(self.key_size)
|
||||
|
||||
def check_management(self, self_verifying=False):
|
||||
res = self.self_verified
|
||||
if self.is_expired:
|
||||
warnings.warn("Key " + repr(self) + " has expired at " + str(self.expires_at))
|
||||
res |= SecurityIssues.expired
|
||||
|
||||
warnings.warn('Key {} has expired at {:s}'.format(repr(self), self.expires_at))
|
||||
res |= SecurityIssues.Expired
|
||||
|
||||
warnings.warn("TODO: Revocation checks are not yet implemented!!!")
|
||||
warnings.warn("TODO: Flags (s.a. `disabled`) checks are not yet implemented!!!")
|
||||
res |= int(bool(list(self.revocation_signatures))) * SecurityIssues.revoked
|
||||
res |= int(bool(list(self.revocation_signatures))) * SecurityIssues.Revoked
|
||||
return res
|
||||
|
||||
def check_soundness(self, self_verifying=False):
|
||||
@@ -2432,7 +2430,7 @@ class PGPKey(Armorable, ParentRef, PGPObject):
|
||||
raise TypeError("Unexpected signature value: {:s}".format(str(type(signature))))
|
||||
|
||||
def _filter_sigs(sigs):
|
||||
_ids = {self.fingerprint} | set(self.subkeys)
|
||||
_ids = {self.fingerprint.keyid} | set(self.subkeys)
|
||||
for sig in sigs:
|
||||
if sig.signer in _ids:
|
||||
yield sig
|
||||
@@ -2455,12 +2453,11 @@ class PGPKey(Armorable, ParentRef, PGPObject):
|
||||
for ua in subject.userattributes:
|
||||
for sig in _filter_sigs(ua.__sig__):
|
||||
sspairs.append((sig, ua))
|
||||
|
||||
|
||||
# subkey binding signatures
|
||||
for subkey in subject.subkeys.values():
|
||||
for sig in _filter_sigs(subkey.__sig__):
|
||||
sspairs.append((sig, subkey))
|
||||
|
||||
|
||||
elif signature.signer in {self.fingerprint.keyid} | set(self.subkeys):
|
||||
sspairs += [(signature, subject)]
|
||||
@@ -2476,25 +2473,25 @@ class PGPKey(Armorable, ParentRef, PGPObject):
|
||||
|
||||
else:
|
||||
if isinstance(subj, PGPKey):
|
||||
self_verifying = signerFp == subj.fingerprint
|
||||
self_verifying = sig.signer == subj.fingerprint
|
||||
else:
|
||||
self_verifying = False
|
||||
|
||||
|
||||
subkey_issues = self.check_soundness(self_verifying)
|
||||
signature_issues = self.check_primitives()
|
||||
|
||||
|
||||
if self_verifying:
|
||||
signature_issues &= ~SecurityIssues.hashFunctionNotCollisionResistant
|
||||
|
||||
signature_issues &= ~SecurityIssues.HashFunctionNotCollisionResistant
|
||||
|
||||
issues = signature_issues | subkey_issues
|
||||
if issues:
|
||||
if issues and issues.causes_signature_verify_to_fail:
|
||||
sigv.add_sigsubj(sig, self, subj, issues)
|
||||
else:
|
||||
verified = self._key.verify(sig.hashdata(subj), sig.__sig__, getattr(hashes, sig.hash_algorithm.name)())
|
||||
if verified is NotImplemented:
|
||||
raise NotImplementedError(sig.key_algorithm)
|
||||
|
||||
sigv.add_sigsubj(sig, self, subj, SecurityIssues.wrongSig if not verified else SecurityIssues.OK)
|
||||
sigv.add_sigsubj(sig, self, subj, SecurityIssues.WrongSig if not verified else SecurityIssues.OK)
|
||||
|
||||
return sigv
|
||||
|
||||
@@ -2551,7 +2548,6 @@ class PGPKey(Armorable, ParentRef, PGPObject):
|
||||
pkesk = PKESessionKeyV3()
|
||||
pkesk.encrypter = bytearray(binascii.unhexlify(self.fingerprint.keyid.encode('latin-1')))
|
||||
pkesk.pkalg = self.key_algorithm
|
||||
# pkesk.encrypt_sk(self.__key__, cipher_algo, sessionkey)
|
||||
pkesk.encrypt_sk(self._key, cipher_algo, sessionkey)
|
||||
|
||||
if message.is_encrypted: # pragma: no cover
|
||||
@@ -2617,7 +2613,8 @@ class PGPKey(Armorable, ParentRef, PGPObject):
|
||||
# last holds the last non-signature thing processed
|
||||
|
||||
##TODO: see issue #141 and fix this better
|
||||
_getpkt = lambda d: (Packet(d) if d else None) # flake8: noqa
|
||||
def _getpkt(d):
|
||||
return Packet(d) if d else None
|
||||
# some packets are filtered out
|
||||
getpkt = filter(lambda p: p.header.tag != PacketTag.Trust, iter(functools.partial(_getpkt, data), None))
|
||||
|
||||
|
||||
2879
pgpy/pgp1.py
2879
pgpy/pgp1.py
File diff suppressed because it is too large
Load Diff
@@ -9,7 +9,7 @@ from cryptography.hazmat.primitives.ciphers import modes
|
||||
|
||||
from .errors import PGPDecryptionError
|
||||
from .errors import PGPEncryptionError
|
||||
from .errors import PGPInsecureCipher
|
||||
from .errors import PGPInsecureCipherError
|
||||
|
||||
__all__ = ['_encrypt',
|
||||
'_decrypt']
|
||||
@@ -20,7 +20,7 @@ def _encrypt(pt, key, alg, iv=None):
|
||||
iv = b'\x00' * (alg.block_size // 8)
|
||||
|
||||
if alg.is_insecure:
|
||||
raise PGPInsecureCipher("{:s} is not secure. Do not use it for encryption!".format(alg.name))
|
||||
raise PGPInsecureCipherError("{:s} is not secure. Do not use it for encryption!".format(alg.name))
|
||||
|
||||
if not alg.is_supported:
|
||||
raise PGPEncryptionError("Cipher {:s} not supported".format(alg.name))
|
||||
|
||||
109
pgpy/types.py
109
pgpy/types.py
@@ -20,17 +20,18 @@ from enum import IntEnum
|
||||
from .decorators import sdproperty
|
||||
|
||||
from .errors import PGPError
|
||||
from .constants import SecurityIssues
|
||||
|
||||
__all__ = ['Armorable',
|
||||
'ParentRef',
|
||||
'PGPObject',
|
||||
'Field',
|
||||
'Fingerprint',
|
||||
'FlagEnum',
|
||||
'FlagEnumMeta',
|
||||
'Header',
|
||||
'MetaDispatchable',
|
||||
'Dispatchable',
|
||||
'SignatureVerification',
|
||||
'FlagEnum',
|
||||
'Fingerprint',
|
||||
'SorteDeque']
|
||||
|
||||
@@ -315,21 +316,21 @@ class Field(PGPObject):
|
||||
|
||||
class Header(Field):
|
||||
@staticmethod
|
||||
def encode_length(l, nhf=True, llen=1):
|
||||
def _new_length(l):
|
||||
if 192 > l:
|
||||
return Header.int_to_bytes(l)
|
||||
def encode_length(length, nhf=True, llen=1):
|
||||
def _new_length(nl):
|
||||
if 192 > nl:
|
||||
return Header.int_to_bytes(nl)
|
||||
|
||||
elif 8384 > l:
|
||||
elen = ((l & 0xFF00) + (192 << 8)) + ((l & 0xFF) - 192)
|
||||
elif 8384 > nl:
|
||||
elen = ((nl & 0xFF00) + (192 << 8)) + ((nl & 0xFF) - 192)
|
||||
return Header.int_to_bytes(elen, 2)
|
||||
|
||||
return b'\xFF' + Header.int_to_bytes(l, 4)
|
||||
return b'\xFF' + Header.int_to_bytes(nl, 4)
|
||||
|
||||
def _old_length(l, llen):
|
||||
return Header.int_to_bytes(l, llen) if llen > 0 else b''
|
||||
def _old_length(nl, llen):
|
||||
return Header.int_to_bytes(nl, llen) if llen > 0 else b''
|
||||
|
||||
return _new_length(l) if nhf else _old_length(l, llen)
|
||||
return _new_length(length) if nhf else _old_length(length, llen)
|
||||
|
||||
@sdproperty
|
||||
def length(self):
|
||||
@@ -389,12 +390,11 @@ class Header(Field):
|
||||
|
||||
@sdproperty
|
||||
def llen(self):
|
||||
l = self.length
|
||||
lf = self._lenfmt
|
||||
|
||||
if lf == 1:
|
||||
# new-format length
|
||||
if 192 > l:
|
||||
if 192 > self.length:
|
||||
return 1
|
||||
|
||||
elif 8384 > self.length: # >= 192 is implied
|
||||
@@ -475,12 +475,15 @@ class MetaDispatchable(abc.ABCMeta):
|
||||
MetaDispatchable._roots.add(ncls)
|
||||
|
||||
elif issubclass(ncls, tuple(MetaDispatchable._roots)) and ncls.__typeid__ != -1:
|
||||
for rcls in [ root for root in MetaDispatchable._roots if issubclass(ncls, root) ]:
|
||||
for rcls in (root for root in MetaDispatchable._roots if issubclass(ncls, root)):
|
||||
if (rcls, ncls.__typeid__) not in MetaDispatchable._registry:
|
||||
MetaDispatchable._registry[(rcls, ncls.__typeid__)] = ncls
|
||||
|
||||
if (ncls.__ver__ is not None and ncls.__ver__ > 0 and
|
||||
(rcls, ncls.__typeid__, ncls.__ver__) not in MetaDispatchable._registry):
|
||||
if (
|
||||
ncls.__ver__ is not None
|
||||
and ncls.__ver__ > 0
|
||||
and (rcls, ncls.__typeid__, ncls.__ver__) not in MetaDispatchable._registry
|
||||
):
|
||||
MetaDispatchable._registry[(rcls, ncls.__typeid__, ncls.__ver__)] = ncls
|
||||
|
||||
# finally, return the new class object
|
||||
@@ -575,8 +578,12 @@ class SignatureVerification(object):
|
||||
|
||||
``sigsubj.subject`` - the subject that was verified using the signature.
|
||||
"""
|
||||
for s in [ i for i in self._subjects if not i.issues ]:
|
||||
yield s
|
||||
yield from (
|
||||
sigsub
|
||||
for sigsub in self._subjects
|
||||
if not sigsub.issues
|
||||
or (sigsub.issues and not sigsub.issues.causes_signature_verify_to_fail)
|
||||
)
|
||||
|
||||
@property
|
||||
def bad_signatures(self): # pragma: no cover
|
||||
@@ -592,7 +599,11 @@ class SignatureVerification(object):
|
||||
|
||||
``sigsubj.subject`` - the subject that was verified using the signature.
|
||||
"""
|
||||
yield from [ i for i in self._subjects if i.issues ]
|
||||
yield from (
|
||||
sigsub
|
||||
for sigsub in self._subjects
|
||||
if sigsub.issues and sigsub.issues.causes_signature_verify_to_fail
|
||||
)
|
||||
|
||||
def __init__(self):
|
||||
"""
|
||||
@@ -610,7 +621,12 @@ class SignatureVerification(object):
|
||||
return len(self._subjects)
|
||||
|
||||
def __bool__(self):
|
||||
return all(not s.issues for s in self._subjects)
|
||||
from .constants import SecurityIssues
|
||||
return all(
|
||||
sigsub.issues is SecurityIssues.OK
|
||||
or (sigsub.issues and not sigsub.issues.causes_signature_verify_to_fail)
|
||||
for sigsub in self._subjects
|
||||
)
|
||||
|
||||
def __nonzero__(self):
|
||||
return self.__bool__()
|
||||
@@ -623,12 +639,30 @@ class SignatureVerification(object):
|
||||
return self
|
||||
|
||||
def __repr__(self):
|
||||
return "<"+ self.__class__.__name__ + "({" + str(bool(self)) + "})>"
|
||||
return '<{classname}({val})>'.format(
|
||||
classname=self.__class__.__name__,
|
||||
val=bool(self)
|
||||
)
|
||||
|
||||
def add_sigsubj(self, signature, by, subject=None, issues=SecurityIssues(0xFF)):
|
||||
def add_sigsubj(self, signature, by, subject=None, issues=None):
|
||||
if issues is None:
|
||||
from .constants import SecurityIssues
|
||||
issues = SecurityIssues(0xFF)
|
||||
self._subjects.append(self._sigsubj(issues, by, signature, subject))
|
||||
|
||||
|
||||
class FlagEnumMeta(EnumMeta):
|
||||
def __and__(self, other):
|
||||
return { f for f in iter(self) if f.value & other }
|
||||
|
||||
def __rand__(self, other): # pragma: no cover
|
||||
return self & other
|
||||
|
||||
|
||||
namespace = FlagEnumMeta.__prepare__('FlagEnum', (IntEnum,))
|
||||
FlagEnum = FlagEnumMeta('FlagEnum', (IntEnum,), namespace)
|
||||
|
||||
|
||||
class Fingerprint(str):
|
||||
"""
|
||||
A subclass of ``str``. Can be compared using == and != to ``str``, ``unicode``, and other :py:obj:`Fingerprint` instances.
|
||||
@@ -649,6 +683,8 @@ class Fingerprint(str):
|
||||
|
||||
# validate input before continuing: this should be a string of 40 hex digits
|
||||
content = content.upper().replace(' ', '')
|
||||
if not re.match(r'^[0-9A-F]+$', content):
|
||||
raise ValueError('Fingerprint must be a string of 40 hex digits')
|
||||
return str.__new__(cls, content)
|
||||
|
||||
def __eq__(self, other):
|
||||
@@ -659,37 +695,38 @@ class Fingerprint(str):
|
||||
if isinstance(other, (bytes, bytearray)): # pragma: no cover
|
||||
other = other.decode('latin-1')
|
||||
|
||||
other = str(other).replace(' ', '')
|
||||
return any([self.replace(' ', '') == other,
|
||||
other = other.replace(' ', '')
|
||||
return any([str(self) == other,
|
||||
self.keyid == other,
|
||||
self.shortid == other])
|
||||
|
||||
return False # pragma: no cover
|
||||
|
||||
def __ne__(self, other):
|
||||
return str(self) != str(other)
|
||||
return not (self == other)
|
||||
|
||||
def __hash__(self):
|
||||
return hash(str(self))
|
||||
|
||||
def __bytes__(self):
|
||||
return binascii.unhexlify(self.encode("latin-1"))
|
||||
|
||||
|
||||
def __pretty__(self):
|
||||
content = self
|
||||
if not bool(re.match(r'^[A-F0-9]{40}$', content)):
|
||||
raise ValueError("Expected: String of 40 hex digits")
|
||||
|
||||
# store in the format: "AAAA BBBB CCCC DDDD EEEE FFFF 0000 1111 2222 3333"
|
||||
# ^^ note 2 spaces here
|
||||
spaces = [ ' ' if i != 4 else ' ' for i in range(10) ]
|
||||
chunks = [ ''.join(g) for g in itertools.zip_longest(*[iter(content)] * 4) ]
|
||||
content = ''.join(j for i in itertools.zip_longest(chunks, spaces, fillvalue='') for j in i).strip()
|
||||
return content
|
||||
|
||||
def __repr__(self):
|
||||
return self.__class__.__name__+"("+repr(self.__pretty__())+")"
|
||||
halves = [
|
||||
[content[i:i + 4] for i in range(0, 20, 4)],
|
||||
[content[i:i + 4] for i in range(20, 40, 4)]
|
||||
]
|
||||
return ' '.join(' '.join(c for c in half) for half in halves)
|
||||
|
||||
def __repr__(self):
|
||||
return '{classname}({fp})'.format(
|
||||
classname=self.__class__.__name__,
|
||||
fp=self.__pretty__()
|
||||
)
|
||||
|
||||
|
||||
class SorteDeque(collections.deque):
|
||||
|
||||
@@ -1,3 +1,2 @@
|
||||
cryptography>=2.6
|
||||
pyasn1
|
||||
six>=1.9.0
|
||||
|
||||
@@ -27,7 +27,6 @@ classifiers =
|
||||
Programming Language :: Python :: 3.8
|
||||
Programming Language :: Python :: 3.7
|
||||
Programming Language :: Python :: 3.6
|
||||
Programming Language :: Python :: 3.5
|
||||
Programming Language :: Python :: Implementation :: CPython
|
||||
Topic :: Security
|
||||
Topic :: Security :: Cryptography
|
||||
@@ -44,7 +43,7 @@ packages =
|
||||
install_requires =
|
||||
cryptography>=2.6,<38
|
||||
pyasn1
|
||||
python_requires = >=3.5
|
||||
python_requires = >=3.6
|
||||
|
||||
# doc_requires =
|
||||
# sphinx
|
||||
|
||||
@@ -14,6 +14,7 @@ from pgpy.pgp import PGPMessage
|
||||
from pgpy.pgp import PGPSignature
|
||||
from pgpy.types import Armorable
|
||||
|
||||
|
||||
blocks = sorted(glob.glob('tests/testdata/blocks/*.asc'))
|
||||
block_attrs = {
|
||||
'tests/testdata/blocks/message.ascii.asc':
|
||||
@@ -130,9 +131,9 @@ block_attrs = {
|
||||
('type', 'encrypted')],
|
||||
|
||||
'tests/testdata/blocks/revochiio.asc':
|
||||
[('created', datetime(2014, 9, 11, 22, 55, 53)),
|
||||
[('created', datetime(2014, 9, 11, 22, 55, 53, tzinfo=timezone.utc)),
|
||||
('fingerprint', "AE15 9FF3 4C1A 2426 B7F8 0F1A 560C F308 EF60 CFA3"),
|
||||
('expires_at', datetime(2018, 9, 12, 1, 0, 59)),
|
||||
('expires_at', datetime(2018, 9, 12, 1, 0, 59, tzinfo=timezone.utc)),
|
||||
('is_expired', True),
|
||||
('is_primary', True),
|
||||
('is_protected', False),
|
||||
@@ -144,13 +145,13 @@ block_attrs = {
|
||||
('signers', {'560CF308EF60CFA3'}),],
|
||||
|
||||
'tests/testdata/blocks/expyro.asc':
|
||||
[('created', datetime(1970, 1, 1)),
|
||||
('expires_at', datetime(1970, 1, 2)),
|
||||
[('created', datetime(1970, 1, 1, tzinfo=timezone.utc)),
|
||||
('expires_at', datetime(1970, 1, 2, tzinfo=timezone.utc)),
|
||||
('fingerprint', '24EB C1B0 29B1 FCF8 29A5 C150 1A48 291A FB91 A533'),
|
||||
('is_expired', True),],
|
||||
|
||||
'tests/testdata/blocks/rsapubkey.asc':
|
||||
[('created', datetime(2014, 7, 23, 21, 19, 24)),
|
||||
[('created', datetime(2014, 7, 23, 21, 19, 24, tzinfo=timezone.utc)),
|
||||
('expires_at', None),
|
||||
('fingerprint', "F429 4BC8 094A 7E05 85C8 5E86 3747 3B37 58C4 4F36"),
|
||||
('is_expired', False),
|
||||
@@ -164,7 +165,7 @@ block_attrs = {
|
||||
('signers', set()),],
|
||||
|
||||
'tests/testdata/blocks/rsaseckey.asc':
|
||||
[('created', datetime(2014, 7, 23, 21, 19, 24)),
|
||||
[('created', datetime(2014, 7, 23, 21, 19, 24, tzinfo=timezone.utc)),
|
||||
('expires_at', None),
|
||||
('fingerprint', "F429 4BC8 094A 7E05 85C8 5E86 3747 3B37 58C4 4F36"),
|
||||
('is_expired', False),
|
||||
@@ -208,7 +209,7 @@ block_attrs = {
|
||||
('type', SignatureType.BinaryDocument)],
|
||||
|
||||
'tests/testdata/blocks/eccpubkey.asc':
|
||||
[('created', datetime(2010, 9, 17, 20, 33, 49)),
|
||||
[('created', datetime(2010, 9, 17, 20, 33, 49, tzinfo=timezone.utc)),
|
||||
('expires_at', None),
|
||||
('fingerprint', "502D 1A53 65D1 C0CA A699 4539 0BA5 2DF0 BAA5 9D9C"),
|
||||
('is_expired', False),
|
||||
@@ -222,7 +223,7 @@ block_attrs = {
|
||||
('signers', set()),],
|
||||
|
||||
'tests/testdata/blocks/eccseckey.asc':
|
||||
[('created', datetime(2010, 9, 17, 20, 33, 49)),
|
||||
[('created', datetime(2010, 9, 17, 20, 33, 49, tzinfo=timezone.utc)),
|
||||
('expires_at', None),
|
||||
('fingerprint', "502D 1A53 65D1 C0CA A699 4539 0BA5 2DF0 BAA5 9D9C"),
|
||||
('is_expired', False),
|
||||
@@ -236,7 +237,7 @@ block_attrs = {
|
||||
('signers', set()),],
|
||||
|
||||
'tests/testdata/blocks/dsaseckey.asc':
|
||||
[('created', datetime(2017, 2, 21, 19, 21, 41)),
|
||||
[('created', datetime(2017, 2, 21, 19, 21, 41, tzinfo=timezone.utc)),
|
||||
('expires_at', None),
|
||||
('fingerprint', "2B5B BB14 3BA0 B290 DCEE 6668 B798 AE89 9087 7201"),
|
||||
('is_expired', False),
|
||||
@@ -247,7 +248,7 @@ block_attrs = {
|
||||
('key_algorithm', PubKeyAlgorithm.DSA),],
|
||||
|
||||
'tests/testdata/blocks/dsapubkey.asc':
|
||||
[('created', datetime(2017, 2, 21, 19, 21, 41)),
|
||||
[('created', datetime(2017, 2, 21, 19, 21, 41, tzinfo=timezone.utc)),
|
||||
('expires_at', None),
|
||||
('fingerprint', "2B5B BB14 3BA0 B290 DCEE 6668 B798 AE89 9087 7201"),
|
||||
('is_expired', False),
|
||||
@@ -258,7 +259,7 @@ block_attrs = {
|
||||
('key_algorithm', PubKeyAlgorithm.DSA),],
|
||||
|
||||
'tests/testdata/blocks/openpgp.js.pubkey.asc':
|
||||
[('created', datetime(2016, 6, 2, 21, 57, 13)),
|
||||
[('created', datetime(2016, 6, 2, 21, 57, 13, tzinfo=timezone.utc)),
|
||||
('expires_at', None),
|
||||
('fingerprint', "C7C3 8ECE E94A 4AD3 2DDB 064E 14AB 44C7 4D1B DAB8"),
|
||||
('is_expired', False),
|
||||
@@ -272,7 +273,7 @@ block_attrs = {
|
||||
('signers', set()), ],
|
||||
|
||||
'tests/testdata/blocks/openpgp.js.seckey.asc':
|
||||
[('created', datetime(2016, 6, 2, 21, 57, 13)),
|
||||
[('created', datetime(2016, 6, 2, 21, 57, 13, tzinfo=timezone.utc)),
|
||||
('expires_at', None),
|
||||
('fingerprint', "C7C3 8ECE E94A 4AD3 2DDB 064E 14AB 44C7 4D1B DAB8"),
|
||||
('is_expired', False),
|
||||
@@ -286,11 +287,11 @@ block_attrs = {
|
||||
('signers', set()), ],
|
||||
|
||||
'tests/testdata/blocks/signature.expired.asc':
|
||||
[('created', datetime(2014, 9, 28, 20, 54, 42)),
|
||||
[('created', datetime(2014, 9, 28, 20, 54, 42, tzinfo=timezone.utc)),
|
||||
('is_expired', True),],
|
||||
|
||||
'tests/testdata/blocks/signature.non-exportable.asc':
|
||||
[('created', datetime(2017, 2, 21, 20, 43, 34)),
|
||||
[('created', datetime(2017, 2, 21, 20, 43, 34, tzinfo=timezone.utc)),
|
||||
('exportable', False),]
|
||||
}
|
||||
|
||||
|
||||
@@ -222,15 +222,15 @@ def userphoto():
|
||||
|
||||
|
||||
# TODO: add more keyspecs
|
||||
pkeyspecs = ((PubKeyAlgorithm.RSAEncryptOrSign, 1024),
|
||||
(PubKeyAlgorithm.DSA, 1024),
|
||||
pkeyspecs = ((PubKeyAlgorithm.RSAEncryptOrSign, 2048),
|
||||
(PubKeyAlgorithm.DSA, 2048),
|
||||
(PubKeyAlgorithm.ECDSA, EllipticCurveOID.NIST_P256),
|
||||
(PubKeyAlgorithm.EdDSA, EllipticCurveOID.Ed25519),)
|
||||
|
||||
|
||||
skeyspecs = ((PubKeyAlgorithm.RSAEncryptOrSign, 1024),
|
||||
(PubKeyAlgorithm.DSA, 1024),
|
||||
(PubKeyAlgorithm.ElGamal, 1024),
|
||||
skeyspecs = ((PubKeyAlgorithm.RSAEncryptOrSign, 2048),
|
||||
(PubKeyAlgorithm.DSA, 2048),
|
||||
(PubKeyAlgorithm.ElGamal, 2048),
|
||||
(PubKeyAlgorithm.ECDSA, EllipticCurveOID.SECP256K1),
|
||||
(PubKeyAlgorithm.ECDH, EllipticCurveOID.Brainpool_P256),
|
||||
(PubKeyAlgorithm.EdDSA, EllipticCurveOID.Ed25519),
|
||||
|
||||
@@ -17,13 +17,12 @@ from pgpy.constants import PubKeyAlgorithm
|
||||
from pgpy.constants import SymmetricKeyAlgorithm
|
||||
from pgpy.packet import Packet
|
||||
from pgpy.types import Armorable
|
||||
from pgpy.types import PGPObject
|
||||
from pgpy.types import Fingerprint
|
||||
from pgpy.types import SignatureVerification
|
||||
from pgpy.errors import PGPError
|
||||
from pgpy.errors import PGPDecryptionError
|
||||
from pgpy.errors import PGPEncryptionError
|
||||
from pgpy.errors import PGPInsecureCipher
|
||||
from pgpy.errors import PGPInsecureCipherError
|
||||
|
||||
|
||||
def _read(f, mode='r'):
|
||||
@@ -230,13 +229,9 @@ class TestPGPKey(object):
|
||||
rsa_pub.subkeys['EEE097A017B979CA'].encrypt(PGPMessage.new('asdf'),
|
||||
cipher=SymmetricKeyAlgorithm.CAST5)
|
||||
|
||||
w = recwarn.pop(UserWarning)
|
||||
assert str(w.message) == "Selected symmetric algorithm not in key preferences"
|
||||
assert w.filename == __file__
|
||||
|
||||
w = recwarn.pop(UserWarning)
|
||||
assert str(w.message) == "Selected compression algorithm not in key preferences"
|
||||
assert w.filename == __file__
|
||||
relevant_warning_messages = [ str(w.message) for w in recwarn if w.category is UserWarning ]
|
||||
assert "Selected symmetric algorithm not in key preferences" in relevant_warning_messages
|
||||
assert "Selected compression algorithm not in key preferences" in relevant_warning_messages
|
||||
|
||||
def test_sign_bad_prefs(self, rsa_sec, recwarn):
|
||||
rsa_sec.subkeys['2A834D8E5918E886'].sign(PGPMessage.new('asdf'), hash=HashAlgorithm.MD5)
|
||||
@@ -380,7 +375,7 @@ class TestPGPMessage(object):
|
||||
|
||||
def test_encrypt_insecure_cipher(self):
|
||||
msg = PGPMessage.new('asdf')
|
||||
with pytest.raises(PGPInsecureCipher):
|
||||
with pytest.raises(PGPInsecureCipherError):
|
||||
msg.encrypt('QwertyUiop', cipher=SymmetricKeyAlgorithm.IDEA)
|
||||
|
||||
def test_encrypt_sessionkey_wrongtype(self):
|
||||
|
||||
6
tox.ini
6
tox.ini
@@ -1,5 +1,5 @@
|
||||
[tox]
|
||||
envlist = py{35,36,37,38,39,310}{,-setup}, pypy3, pep8
|
||||
envlist = py{36,37,38,39,310}{,-setup}, pypy3, pep8
|
||||
skipsdist = True
|
||||
|
||||
[pytest]
|
||||
@@ -11,7 +11,7 @@ markers =
|
||||
|
||||
[flake8]
|
||||
exclude = .git,.idea,__pycache__,.tox,tests/*,docs/*,test_load_asc_bench.py
|
||||
ignore = E201,E202,E221,E251,E265,F403,F821,N805
|
||||
ignore = E201,E202,E221,E251,E265,F403,F821,N805,W503
|
||||
max-line-length = 160
|
||||
|
||||
[testenv]
|
||||
@@ -30,7 +30,7 @@ install_command = pip install {opts} --no-cache-dir {packages}
|
||||
commands =
|
||||
py.test --cov pgpy --cov-report term-missing tests/
|
||||
|
||||
[testenv:py{35,36,37,38,39}-setup]
|
||||
[testenv:py{36,37,38,39,310}-setup]
|
||||
recreate = True
|
||||
allowlist_externals =
|
||||
/usr/bin/rm
|
||||
|
||||
Reference in New Issue
Block a user