big PEP8 pass
This commit is contained in:
@@ -20,15 +20,15 @@ Exceptions
|
||||
|
||||
.. autoexception:: PGPDecryptionError
|
||||
|
||||
:py:class:`PGPOpenSSLCipherNotSupported`
|
||||
:py:class:`PGPOpenSSLCipherNotSupportedError`
|
||||
----------------------------------------
|
||||
|
||||
.. autoexception:: PGPOpenSSLCipherNotSupported
|
||||
.. autoexception:: PGPOpenSSLCipherNotSupportedError
|
||||
|
||||
:py:class:`PGPInsecureCipher`
|
||||
:py:class:`PGPInsecureCipherError`
|
||||
-----------------------------
|
||||
|
||||
.. autoexception:: PGPInsecureCipher
|
||||
.. autoexception:: PGPInsecureCipherError
|
||||
|
||||
:py:class:`WontImplementError`
|
||||
------------------------------
|
||||
|
||||
@@ -49,25 +49,25 @@ if use_legacy_cryptography_decorator():
|
||||
key_size = 256
|
||||
|
||||
|
||||
@utils.register_interface(ec.EllipticCurve)
|
||||
@utils.register_interface(ec.EllipticCurve) # noqa: E303
|
||||
class BrainpoolP384R1(object):
|
||||
name = 'brainpoolP384r1'
|
||||
key_size = 384
|
||||
|
||||
|
||||
@utils.register_interface(ec.EllipticCurve)
|
||||
@utils.register_interface(ec.EllipticCurve) # noqa: E303
|
||||
class BrainpoolP512R1(object):
|
||||
name = 'brainpoolP512r1'
|
||||
key_size = 512
|
||||
|
||||
|
||||
@utils.register_interface(ec.EllipticCurve)
|
||||
@utils.register_interface(ec.EllipticCurve) # noqa: E303
|
||||
class X25519(object):
|
||||
name = 'X25519'
|
||||
key_size = 256
|
||||
|
||||
|
||||
@utils.register_interface(ec.EllipticCurve)
|
||||
@utils.register_interface(ec.EllipticCurve) # noqa: E303
|
||||
class Ed25519(object):
|
||||
name = 'ed25519'
|
||||
key_size = 256
|
||||
@@ -77,22 +77,22 @@ else:
|
||||
key_size = 256
|
||||
|
||||
|
||||
class BrainpoolP384R1(ec.EllipticCurve):
|
||||
class BrainpoolP384R1(ec.EllipticCurve): # noqa: E303
|
||||
name = 'brainpoolP384r1'
|
||||
key_size = 384
|
||||
|
||||
|
||||
class BrainpoolP512R1(ec.EllipticCurve):
|
||||
class BrainpoolP512R1(ec.EllipticCurve): # noqa: E303
|
||||
name = 'brainpoolP512r1'
|
||||
key_size = 512
|
||||
|
||||
|
||||
class X25519(ec.EllipticCurve):
|
||||
class X25519(ec.EllipticCurve): # noqa: E303
|
||||
name = 'X25519'
|
||||
key_size = 256
|
||||
|
||||
|
||||
class Ed25519(ec.EllipticCurve):
|
||||
class Ed25519(ec.EllipticCurve): # noqa: E303
|
||||
name = 'ed25519'
|
||||
key_size = 256
|
||||
|
||||
|
||||
@@ -4,7 +4,6 @@ import bz2
|
||||
import hashlib
|
||||
import imghdr
|
||||
import os
|
||||
import time
|
||||
import zlib
|
||||
import warnings
|
||||
|
||||
@@ -23,27 +22,29 @@ from .types import FlagEnum
|
||||
from .decorators import classproperty
|
||||
from ._curves import BrainpoolP256R1, BrainpoolP384R1, BrainpoolP512R1, X25519, Ed25519
|
||||
|
||||
__all__ = ['Backend',
|
||||
'EllipticCurveOID',
|
||||
'ECPointFormat',
|
||||
'PacketTag',
|
||||
'SymmetricKeyAlgorithm',
|
||||
'PubKeyAlgorithm',
|
||||
'CompressionAlgorithm',
|
||||
'HashAlgorithm',
|
||||
'RevocationReason',
|
||||
'ImageEncoding',
|
||||
'SignatureType',
|
||||
'KeyServerPreferences',
|
||||
'S2KGNUExtension',
|
||||
'SecurityIssues',
|
||||
'String2KeyType',
|
||||
'TrustLevel',
|
||||
'KeyFlags',
|
||||
'Features',
|
||||
'RevocationKeyClass',
|
||||
'NotationDataFlags',
|
||||
'TrustFlags',]
|
||||
__all__ = [
|
||||
'Backend',
|
||||
'EllipticCurveOID',
|
||||
'ECPointFormat',
|
||||
'PacketTag',
|
||||
'SymmetricKeyAlgorithm',
|
||||
'PubKeyAlgorithm',
|
||||
'CompressionAlgorithm',
|
||||
'HashAlgorithm',
|
||||
'RevocationReason',
|
||||
'ImageEncoding',
|
||||
'SignatureType',
|
||||
'KeyServerPreferences',
|
||||
'S2KGNUExtension',
|
||||
'SecurityIssues',
|
||||
'String2KeyType',
|
||||
'TrustLevel',
|
||||
'KeyFlags',
|
||||
'Features',
|
||||
'RevocationKeyClass',
|
||||
'NotationDataFlags',
|
||||
'TrustFlags',
|
||||
]
|
||||
|
||||
|
||||
# this is 50 KiB
|
||||
@@ -621,7 +622,7 @@ MINIMUM_ASYMMETRIC_KEY_LENGTHS = {
|
||||
PubKeyAlgorithm.RSASign: 2048,
|
||||
PubKeyAlgorithm.ElGamal: 2048,
|
||||
PubKeyAlgorithm.DSA: 2048,
|
||||
|
||||
##
|
||||
PubKeyAlgorithm.ECDSA: SAFE_CURVES,
|
||||
PubKeyAlgorithm.EdDSA: SAFE_CURVES,
|
||||
PubKeyAlgorithm.ECDH: SAFE_CURVES,
|
||||
|
||||
@@ -4,9 +4,9 @@
|
||||
__all__ = ('PGPError',
|
||||
'PGPEncryptionError',
|
||||
'PGPDecryptionError',
|
||||
'PGPIncompatibleECPointFormat',
|
||||
'PGPOpenSSLCipherNotSupported',
|
||||
'PGPInsecureCipher',
|
||||
'PGPIncompatibleECPointFormatError',
|
||||
'PGPOpenSSLCipherNotSupportedError',
|
||||
'PGPInsecureCipherError',
|
||||
'WontImplementError',)
|
||||
|
||||
|
||||
@@ -25,17 +25,17 @@ class PGPDecryptionError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class PGPIncompatibleECPointFormat(Exception):
|
||||
class PGPIncompatibleECPointFormatError(Exception):
|
||||
"""Raised when the point format is incompatible with the elliptic curve"""
|
||||
pass
|
||||
|
||||
|
||||
class PGPOpenSSLCipherNotSupported(Exception):
|
||||
class PGPOpenSSLCipherNotSupportedError(Exception):
|
||||
"""Raised when OpenSSL does not support the requested cipher"""
|
||||
pass
|
||||
|
||||
|
||||
class PGPInsecureCipher(Exception):
|
||||
class PGPInsecureCipherError(Exception):
|
||||
"""Raised when a cipher known to be insecure is attempted to be used to encrypt data"""
|
||||
pass
|
||||
|
||||
|
||||
@@ -63,7 +63,7 @@ from ..decorators import sdproperty
|
||||
|
||||
from ..errors import PGPDecryptionError
|
||||
from ..errors import PGPError
|
||||
from ..errors import PGPIncompatibleECPointFormat
|
||||
from ..errors import PGPIncompatibleECPointFormatError
|
||||
|
||||
from ..symenc import _decrypt
|
||||
from ..symenc import _encrypt
|
||||
@@ -371,8 +371,8 @@ class EdDSASignature(DSASignature):
|
||||
|
||||
def __sig__(self):
|
||||
# TODO: change this length when EdDSA can be used with another curve (Ed448)
|
||||
l = (EllipticCurveOID.Ed25519.key_size + 7) // 8
|
||||
return self.int_to_bytes(self.r, l) + self.int_to_bytes(self.s, l)
|
||||
siglen = (EllipticCurveOID.Ed25519.key_size + 7) // 8
|
||||
return self.int_to_bytes(self.r, l) + self.int_to_bytes(self.s, siglen)
|
||||
|
||||
|
||||
class PubKey(MPIs):
|
||||
@@ -500,7 +500,7 @@ class ECPoint:
|
||||
self.x = MPI(MPIs.bytes_to_int(xy[:self.bytelen]))
|
||||
self.y = MPI(MPIs.bytes_to_int(xy[self.bytelen:]))
|
||||
elif self.format == ECPointFormat.Native:
|
||||
self.bytelen = 0 # dummy value for copy
|
||||
self.bytelen = 0 # dummy value for copy
|
||||
self.x = bytes(xy)
|
||||
self.y = None
|
||||
else:
|
||||
@@ -592,7 +592,7 @@ class ECDSAPub(PubKey):
|
||||
|
||||
self.p = ECPoint(packet)
|
||||
if self.p.format != ECPointFormat.Standard:
|
||||
raise PGPIncompatibleECPointFormat("Only Standard format is valid for ECDSA")
|
||||
raise PGPIncompatibleECPointFormatError("Only Standard format is valid for ECDSA")
|
||||
|
||||
|
||||
class EdDSAPub(PubKey):
|
||||
@@ -643,7 +643,7 @@ class EdDSAPub(PubKey):
|
||||
|
||||
self.p = ECPoint(packet)
|
||||
if self.p.format != ECPointFormat.Native:
|
||||
raise PGPIncompatibleECPointFormat("Only Native format is valid for EdDSA")
|
||||
raise PGPIncompatibleECPointFormatError("Only Native format is valid for EdDSA")
|
||||
|
||||
|
||||
class ECDHPub(PubKey):
|
||||
@@ -718,9 +718,9 @@ class ECDHPub(PubKey):
|
||||
self.p = ECPoint(packet)
|
||||
if self.oid == EllipticCurveOID.Curve25519:
|
||||
if self.p.format != ECPointFormat.Native:
|
||||
raise PGPIncompatibleECPointFormat("Only Native format is valid for Curve25519")
|
||||
raise PGPIncompatibleECPointFormatError("Only Native format is valid for Curve25519")
|
||||
elif self.p.format != ECPointFormat.Standard:
|
||||
raise PGPIncompatibleECPointFormat("Only Standard format is valid for this curve")
|
||||
raise PGPIncompatibleECPointFormatError("Only Standard format is valid for this curve")
|
||||
self.kdf.parse(packet)
|
||||
|
||||
|
||||
@@ -1177,14 +1177,14 @@ class PrivKey(PubKey):
|
||||
return _bytes
|
||||
|
||||
def __len__(self):
|
||||
l = super(PrivKey, self).__len__() + len(self.s2k) + len(self.chksum)
|
||||
nbytes = super(PrivKey, self).__len__() + len(self.s2k) + len(self.chksum)
|
||||
if self.s2k:
|
||||
l += len(self.encbytes)
|
||||
nbytes += len(self.encbytes)
|
||||
|
||||
else:
|
||||
l += sum(len(getattr(self, i)) for i in self.__privfields__)
|
||||
nbytes += sum(len(getattr(self, i)) for i in self.__privfields__)
|
||||
|
||||
return l
|
||||
return nbytes
|
||||
|
||||
def __copy__(self):
|
||||
pk = super(PrivKey, self).__copy__()
|
||||
@@ -1582,12 +1582,12 @@ class ECDHPriv(ECDSAPriv, ECDHPub):
|
||||
return _b
|
||||
|
||||
def __len__(self):
|
||||
l = ECDHPub.__len__(self) + len(self.s2k) + len(self.chksum)
|
||||
nbytes = ECDHPub.__len__(self) + len(self.s2k) + len(self.chksum)
|
||||
if self.s2k:
|
||||
l += len(self.encbytes)
|
||||
nbytes += len(self.encbytes)
|
||||
else:
|
||||
l += sum(len(getattr(self, i)) for i in self.__privfields__)
|
||||
return l
|
||||
nbytes += sum(len(getattr(self, i)) for i in self.__privfields__)
|
||||
return nbytes
|
||||
|
||||
def __privkey__(self):
|
||||
if self.oid == EllipticCurveOID.Curve25519:
|
||||
|
||||
@@ -6,7 +6,6 @@ import calendar
|
||||
import copy
|
||||
import hashlib
|
||||
import os
|
||||
import re
|
||||
import warnings
|
||||
|
||||
from datetime import datetime, timezone
|
||||
@@ -359,12 +358,14 @@ class SignatureV4(Signature):
|
||||
def pubalg_int(self, val):
|
||||
self._pubalg = PubKeyAlgorithm(val)
|
||||
|
||||
sigs = {PubKeyAlgorithm.RSAEncryptOrSign: RSASignature,
|
||||
PubKeyAlgorithm.RSAEncrypt: RSASignature,
|
||||
PubKeyAlgorithm.RSASign: RSASignature,
|
||||
PubKeyAlgorithm.DSA: DSASignature,
|
||||
PubKeyAlgorithm.ECDSA: ECDSASignature,
|
||||
PubKeyAlgorithm.EdDSA: EdDSASignature,}
|
||||
sigs = {
|
||||
PubKeyAlgorithm.RSAEncryptOrSign: RSASignature,
|
||||
PubKeyAlgorithm.RSAEncrypt: RSASignature,
|
||||
PubKeyAlgorithm.RSASign: RSASignature,
|
||||
PubKeyAlgorithm.DSA: DSASignature,
|
||||
PubKeyAlgorithm.ECDSA: ECDSASignature,
|
||||
PubKeyAlgorithm.EdDSA: EdDSASignature,
|
||||
}
|
||||
|
||||
self.signature = sigs.get(self.pubalg, OpaqueSignature)()
|
||||
|
||||
@@ -427,7 +428,6 @@ class SignatureV4(Signature):
|
||||
with the length-of-length set to zero.) The unhashed subpacket data
|
||||
of the Signature packet being hashed is not included in the hash, and
|
||||
the unhashed subpacket data length value is set to zero.
|
||||
|
||||
'''
|
||||
_body = bytearray()
|
||||
_body += self.int_to_bytes(self.header.version)
|
||||
@@ -435,7 +435,7 @@ class SignatureV4(Signature):
|
||||
_body += self.int_to_bytes(self.pubalg)
|
||||
_body += self.int_to_bytes(self.halg)
|
||||
_body += self.subpackets.__hashbytearray__()
|
||||
_body += self.int_to_bytes(0, minlen=2) # empty unhashed subpackets
|
||||
_body += self.int_to_bytes(0, minlen=2) # empty unhashed subpackets
|
||||
_body += self.hash2
|
||||
_body += self.signature.__bytearray__()
|
||||
|
||||
@@ -443,7 +443,7 @@ class SignatureV4(Signature):
|
||||
_hdr += b'\x88'
|
||||
_hdr += self.int_to_bytes(len(_body), minlen=4)
|
||||
return _hdr + _body
|
||||
|
||||
|
||||
def __copy__(self):
|
||||
spkt = SignatureV4()
|
||||
spkt.header = copy.copy(self.header)
|
||||
|
||||
@@ -703,9 +703,11 @@ class KeyServerPreferences(ByteFlag):
|
||||
class PreferredKeyServer(URI):
|
||||
__typeid__ = 0x18
|
||||
|
||||
|
||||
class SubkeyBindingSignature(Signature):
|
||||
__typeid__ = 0x18
|
||||
|
||||
|
||||
class PrimaryUserID(SubkeyBindingSignature):
|
||||
__typeid__ = 0x19
|
||||
|
||||
@@ -1037,7 +1039,7 @@ class IntendedRecipient(Signature):
|
||||
self.intended_recipient = packet[:fpr_len]
|
||||
del packet[:fpr_len]
|
||||
|
||||
|
||||
|
||||
class AttestedCertifications(Signature):
|
||||
'''
|
||||
(from RFC4880bis-08)
|
||||
@@ -1118,7 +1120,7 @@ class AttestedCertifications(Signature):
|
||||
def attested_certifications(self):
|
||||
return self._attested_certifications
|
||||
|
||||
@attested_certifications.register(bytearray)
|
||||
@attested_certifications.register(bytearray)
|
||||
@attested_certifications.register(bytes)
|
||||
def attested_certifications_bytearray(self, val):
|
||||
self._attested_certifications = val
|
||||
|
||||
@@ -77,10 +77,11 @@ class SubPacket(Dispatchable):
|
||||
super(SubPacket, self).__init__()
|
||||
self.header = Header()
|
||||
|
||||
# if self.__typeid__ not in [-1, None]:
|
||||
if (self.header.typeid == -1 and
|
||||
(not hasattr(self.__typeid__, '__abstractmethod__')) and
|
||||
(self.__typeid__ not in [-1, None])):
|
||||
if (
|
||||
self.header.typeid == -1
|
||||
and (not hasattr(self.__typeid__, '__abstractmethod__'))
|
||||
and (self.__typeid__ not in {-1, None})
|
||||
):
|
||||
self.header.typeid = self.__typeid__
|
||||
|
||||
def __bytearray__(self):
|
||||
|
||||
54
pgpy/pgp.py
54
pgpy/pgp.py
@@ -47,7 +47,6 @@ from .packet import Packet
|
||||
from .packet import Primary
|
||||
from .packet import Private
|
||||
from .packet import PubKeyV4
|
||||
from .packet import PubSubKeyV4
|
||||
from .packet import PrivKeyV4
|
||||
from .packet import PrivSubKeyV4
|
||||
from .packet import Public
|
||||
@@ -88,6 +87,7 @@ __all__ = ['PGPSignature',
|
||||
|
||||
class PGPSignature(Armorable, ParentRef, PGPObject):
|
||||
_reason_for_revocation = collections.namedtuple('ReasonForRevocation', ['code', 'comment'])
|
||||
|
||||
@property
|
||||
def __sig__(self):
|
||||
return self._signature.signature.__sig__()
|
||||
@@ -173,7 +173,7 @@ class PGPSignature(Armorable, ParentRef, PGPObject):
|
||||
|
||||
def check_primitives(self):
|
||||
return self.hash_algorithm.is_considered_secure
|
||||
|
||||
|
||||
def check_soundness(self):
|
||||
return self.check_primitives()
|
||||
|
||||
@@ -284,7 +284,7 @@ class PGPSignature(Armorable, ParentRef, PGPObject):
|
||||
for n in self._signature.subpackets['h_AttestedCertifications']:
|
||||
attestations = bytes(n.attested_certifications)
|
||||
for i in range(0, len(attestations), hlen):
|
||||
ret.add(attestations[i:i+hlen])
|
||||
ret.add(attestations[i:i + hlen])
|
||||
return ret
|
||||
|
||||
@property
|
||||
@@ -326,7 +326,7 @@ class PGPSignature(Armorable, ParentRef, PGPObject):
|
||||
sig = PGPSignature()
|
||||
|
||||
if created is None:
|
||||
created=datetime.now(timezone.utc)
|
||||
created = datetime.now(timezone.utc)
|
||||
sigpkt = SignatureV4()
|
||||
sigpkt.header.tag = 2
|
||||
sigpkt.header.version = 4
|
||||
@@ -600,9 +600,9 @@ class PGPUID(ParentRef):
|
||||
def _splitstring(self):
|
||||
'''returns name, comment email from User ID string'''
|
||||
if not isinstance(self._uid, UserID):
|
||||
return ("", "", "")
|
||||
return "", "", ""
|
||||
if self._uid.uid == "":
|
||||
return ("", "", "")
|
||||
return "", "", ""
|
||||
rfc2822 = re.match(r"""^
|
||||
# name should always match something
|
||||
(?P<name>.+?)
|
||||
@@ -616,9 +616,8 @@ class PGPUID(ParentRef):
|
||||
(\ <(?P<email>.+)>)?
|
||||
$
|
||||
""", self._uid.uid, flags=re.VERBOSE).groupdict()
|
||||
|
||||
return (rfc2822['name'], rfc2822['comment'] or "", rfc2822['email'] or "")
|
||||
|
||||
return (rfc2822['name'], rfc2822['comment'] or "", rfc2822['email'] or "")
|
||||
|
||||
@property
|
||||
def name(self):
|
||||
@@ -633,7 +632,6 @@ class PGPUID(ParentRef):
|
||||
"""
|
||||
return self._splitstring()[1]
|
||||
|
||||
|
||||
@property
|
||||
def email(self):
|
||||
"""
|
||||
@@ -1699,8 +1697,10 @@ class PGPKey(Armorable, ParentRef, PGPObject):
|
||||
self._uids.insort(other)
|
||||
|
||||
else:
|
||||
raise TypeError("unsupported operand type(s) for |: '{:s}' and '{:s}'"
|
||||
"".format(self.__class__.__name__, other.__class__.__name__))
|
||||
raise TypeError(
|
||||
"unsupported operand type(s) for |: '{:s}' and '{:s}'"
|
||||
"".format(self.__class__.__name__, other.__class__.__name__)
|
||||
)
|
||||
|
||||
if isinstance(self._sibling, weakref.ref) and not from_sib:
|
||||
sib = self._sibling()
|
||||
@@ -2095,7 +2095,7 @@ class PGPKey(Armorable, ParentRef, PGPObject):
|
||||
the certificate holder wants to attest to for redistribution with the certificate.
|
||||
Alternatively, any element in the list can be a ``bytes`` or ``bytearray`` object
|
||||
of the appropriate length (the length of this certification's digest).
|
||||
This keyword is only used for signatures of type Attestation.
|
||||
This keyword is only used for signatures of type Attestation.
|
||||
:type attested_certifications: ``list``
|
||||
:keyword keyserver: Specify the URI of the preferred key server of the user.
|
||||
This keyword is ignored for non-self-certifications.
|
||||
@@ -2360,28 +2360,27 @@ class PGPKey(Armorable, ParentRef, PGPObject):
|
||||
|
||||
def is_considered_insecure(self, self_verifying=False):
|
||||
res = self.check_soundness(self_verifying=self_verifying)
|
||||
|
||||
|
||||
for sk in self.subkeys.values():
|
||||
res |= sk.check_soundness(self_verifying=self_verifying)
|
||||
return res
|
||||
|
||||
def self_verify(self):
|
||||
selfSigs = list(self.self_signatures)
|
||||
self_sigs = list(self.self_signatures)
|
||||
res = SecurityIssues.OK
|
||||
if selfSigs:
|
||||
for s in selfSigs:
|
||||
if self_sigs:
|
||||
for s in self_sigs:
|
||||
if not self.verify(self, s):
|
||||
res |= SecurityIssues.Invalid
|
||||
break
|
||||
else:
|
||||
return SecurityIssues.NoSelfSignature
|
||||
return res
|
||||
|
||||
|
||||
def _do_self_signatures_verification(self):
|
||||
try:
|
||||
self._self_verified = SecurityIssues.OK
|
||||
self._self_verified = self.self_verify()
|
||||
except:
|
||||
except Exception:
|
||||
self._self_verified = None
|
||||
raise
|
||||
|
||||
@@ -2389,10 +2388,10 @@ class PGPKey(Armorable, ParentRef, PGPObject):
|
||||
def self_verified(self):
|
||||
warnings.warn("TODO: Self-sigs verification is not yet working because self-sigs are not parsed!!!")
|
||||
return SecurityIssues.OK
|
||||
|
||||
|
||||
if self._self_verified is None:
|
||||
self._do_self_signatures_verification()
|
||||
|
||||
|
||||
return self._self_verified
|
||||
|
||||
def check_primitives(self):
|
||||
@@ -2403,7 +2402,7 @@ class PGPKey(Armorable, ParentRef, PGPObject):
|
||||
if self.is_expired:
|
||||
warnings.warn('Key {} has expired at {:s}'.format(repr(self), self.expires_at))
|
||||
res |= SecurityIssues.Expired
|
||||
|
||||
|
||||
warnings.warn("TODO: Revocation checks are not yet implemented!!!")
|
||||
warnings.warn("TODO: Flags (s.a. `disabled`) checks are not yet implemented!!!")
|
||||
res |= int(bool(list(self.revocation_signatures))) * SecurityIssues.Revoked
|
||||
@@ -2454,7 +2453,7 @@ class PGPKey(Armorable, ParentRef, PGPObject):
|
||||
for ua in subject.userattributes:
|
||||
for sig in _filter_sigs(ua.__sig__):
|
||||
sspairs.append((sig, ua))
|
||||
|
||||
|
||||
# subkey binding signatures
|
||||
for subkey in subject.subkeys.values():
|
||||
for sig in _filter_sigs(subkey.__sig__):
|
||||
@@ -2477,13 +2476,13 @@ class PGPKey(Armorable, ParentRef, PGPObject):
|
||||
self_verifying = sig.signer == subj.fingerprint
|
||||
else:
|
||||
self_verifying = False
|
||||
|
||||
|
||||
subkey_issues = self.check_soundness(self_verifying)
|
||||
signature_issues = self.check_primitives()
|
||||
|
||||
|
||||
if self_verifying:
|
||||
signature_issues &= ~SecurityIssues.HashFunctionNotCollisionResistant
|
||||
|
||||
|
||||
issues = signature_issues | subkey_issues
|
||||
if issues and issues.causes_signature_verify_to_fail:
|
||||
sigv.add_sigsubj(sig, self, subj, issues)
|
||||
@@ -2614,7 +2613,8 @@ class PGPKey(Armorable, ParentRef, PGPObject):
|
||||
# last holds the last non-signature thing processed
|
||||
|
||||
##TODO: see issue #141 and fix this better
|
||||
_getpkt = lambda d: (Packet(d) if d else None) # flake8: noqa
|
||||
def _getpkt(d):
|
||||
return Packet(d) if d else None
|
||||
# some packets are filtered out
|
||||
getpkt = filter(lambda p: p.header.tag != PacketTag.Trust, iter(functools.partial(_getpkt, data), None))
|
||||
|
||||
|
||||
@@ -9,7 +9,7 @@ from cryptography.hazmat.primitives.ciphers import modes
|
||||
|
||||
from .errors import PGPDecryptionError
|
||||
from .errors import PGPEncryptionError
|
||||
from .errors import PGPInsecureCipher
|
||||
from .errors import PGPInsecureCipherError
|
||||
|
||||
__all__ = ['_encrypt',
|
||||
'_decrypt']
|
||||
@@ -20,7 +20,7 @@ def _encrypt(pt, key, alg, iv=None):
|
||||
iv = b'\x00' * (alg.block_size // 8)
|
||||
|
||||
if alg.is_insecure:
|
||||
raise PGPInsecureCipher("{:s} is not secure. Do not use it for encryption!".format(alg.name))
|
||||
raise PGPInsecureCipherError("{:s} is not secure. Do not use it for encryption!".format(alg.name))
|
||||
|
||||
if not alg.is_supported:
|
||||
raise PGPEncryptionError("Cipher {:s} not supported".format(alg.name))
|
||||
|
||||
@@ -316,21 +316,21 @@ class Field(PGPObject):
|
||||
|
||||
class Header(Field):
|
||||
@staticmethod
|
||||
def encode_length(l, nhf=True, llen=1):
|
||||
def _new_length(l):
|
||||
if 192 > l:
|
||||
return Header.int_to_bytes(l)
|
||||
def encode_length(length, nhf=True, llen=1):
|
||||
def _new_length(nl):
|
||||
if 192 > nl:
|
||||
return Header.int_to_bytes(nl)
|
||||
|
||||
elif 8384 > l:
|
||||
elen = ((l & 0xFF00) + (192 << 8)) + ((l & 0xFF) - 192)
|
||||
elif 8384 > nl:
|
||||
elen = ((nl & 0xFF00) + (192 << 8)) + ((nl & 0xFF) - 192)
|
||||
return Header.int_to_bytes(elen, 2)
|
||||
|
||||
return b'\xFF' + Header.int_to_bytes(l, 4)
|
||||
return b'\xFF' + Header.int_to_bytes(nl, 4)
|
||||
|
||||
def _old_length(l, llen):
|
||||
return Header.int_to_bytes(l, llen) if llen > 0 else b''
|
||||
def _old_length(nl, llen):
|
||||
return Header.int_to_bytes(nl, llen) if llen > 0 else b''
|
||||
|
||||
return _new_length(l) if nhf else _old_length(l, llen)
|
||||
return _new_length(length) if nhf else _old_length(length, llen)
|
||||
|
||||
@sdproperty
|
||||
def length(self):
|
||||
@@ -390,12 +390,11 @@ class Header(Field):
|
||||
|
||||
@sdproperty
|
||||
def llen(self):
|
||||
l = self.length
|
||||
lf = self._lenfmt
|
||||
|
||||
if lf == 1:
|
||||
# new-format length
|
||||
if 192 > l:
|
||||
if 192 > self.length:
|
||||
return 1
|
||||
|
||||
elif 8384 > self.length: # >= 192 is implied
|
||||
@@ -476,12 +475,15 @@ class MetaDispatchable(abc.ABCMeta):
|
||||
MetaDispatchable._roots.add(ncls)
|
||||
|
||||
elif issubclass(ncls, tuple(MetaDispatchable._roots)) and ncls.__typeid__ != -1:
|
||||
for rcls in [ root for root in MetaDispatchable._roots if issubclass(ncls, root) ]:
|
||||
for rcls in (root for root in MetaDispatchable._roots if issubclass(ncls, root)):
|
||||
if (rcls, ncls.__typeid__) not in MetaDispatchable._registry:
|
||||
MetaDispatchable._registry[(rcls, ncls.__typeid__)] = ncls
|
||||
|
||||
if (ncls.__ver__ is not None and ncls.__ver__ > 0 and
|
||||
(rcls, ncls.__typeid__, ncls.__ver__) not in MetaDispatchable._registry):
|
||||
if (
|
||||
ncls.__ver__ is not None
|
||||
and ncls.__ver__ > 0
|
||||
and (rcls, ncls.__typeid__, ncls.__ver__) not in MetaDispatchable._registry
|
||||
):
|
||||
MetaDispatchable._registry[(rcls, ncls.__typeid__, ncls.__ver__)] = ncls
|
||||
|
||||
# finally, return the new class object
|
||||
@@ -637,7 +639,10 @@ class SignatureVerification(object):
|
||||
return self
|
||||
|
||||
def __repr__(self):
|
||||
return "<"+ self.__class__.__name__ + "({" + str(bool(self)) + "})>"
|
||||
return '<{classname}({val})>'.format(
|
||||
classname=self.__class__.__name__,
|
||||
val=bool(self)
|
||||
)
|
||||
|
||||
def add_sigsubj(self, signature, by, subject=None, issues=None):
|
||||
if issues is None:
|
||||
@@ -705,20 +710,23 @@ class Fingerprint(str):
|
||||
|
||||
def __bytes__(self):
|
||||
return binascii.unhexlify(self.encode("latin-1"))
|
||||
|
||||
|
||||
def __pretty__(self):
|
||||
content = self
|
||||
if not bool(re.match(r'^[A-F0-9]{40}$', content)):
|
||||
raise ValueError("Expected: String of 40 hex digits")
|
||||
|
||||
halves = [
|
||||
[content[i:i+4] for i in range(0, 20, 4)],
|
||||
[content[i:i+4] for i in range(20, 40, 4)]
|
||||
[content[i:i + 4] for i in range(0, 20, 4)],
|
||||
[content[i:i + 4] for i in range(20, 40, 4)]
|
||||
]
|
||||
return ' '.join(' '.join(c for c in half) for half in halves)
|
||||
|
||||
|
||||
def __repr__(self):
|
||||
return self.__class__.__name__+"("+repr(self.__pretty__())+")"
|
||||
return '{classname}({fp})'.format(
|
||||
classname=self.__class__.__name__,
|
||||
fp=self.__pretty__()
|
||||
)
|
||||
|
||||
|
||||
class SorteDeque(collections.deque):
|
||||
|
||||
@@ -17,13 +17,12 @@ from pgpy.constants import PubKeyAlgorithm
|
||||
from pgpy.constants import SymmetricKeyAlgorithm
|
||||
from pgpy.packet import Packet
|
||||
from pgpy.types import Armorable
|
||||
from pgpy.types import PGPObject
|
||||
from pgpy.types import Fingerprint
|
||||
from pgpy.types import SignatureVerification
|
||||
from pgpy.errors import PGPError
|
||||
from pgpy.errors import PGPDecryptionError
|
||||
from pgpy.errors import PGPEncryptionError
|
||||
from pgpy.errors import PGPInsecureCipher
|
||||
from pgpy.errors import PGPInsecureCipherError
|
||||
|
||||
|
||||
def _read(f, mode='r'):
|
||||
@@ -376,7 +375,7 @@ class TestPGPMessage(object):
|
||||
|
||||
def test_encrypt_insecure_cipher(self):
|
||||
msg = PGPMessage.new('asdf')
|
||||
with pytest.raises(PGPInsecureCipher):
|
||||
with pytest.raises(PGPInsecureCipherError):
|
||||
msg.encrypt('QwertyUiop', cipher=SymmetricKeyAlgorithm.IDEA)
|
||||
|
||||
def test_encrypt_sessionkey_wrongtype(self):
|
||||
|
||||
Reference in New Issue
Block a user