- fixed improper munging of non-unicode inputs - #154
- added additional unicode tests - #153 - fixed mixin/base class ordering of PGPObject subclasses
This commit is contained in:
@@ -1134,8 +1134,8 @@ class LiteralData(Packet):
|
||||
if self.format == 't':
|
||||
return self._contents.decode('latin-1')
|
||||
|
||||
if self.format == 'u': # pragma: no cover
|
||||
return six.u(self._contents.decode('latin-1'))
|
||||
if self.format == 'u':
|
||||
return self._contents.decode('utf-8')
|
||||
|
||||
return self._contents
|
||||
|
||||
|
||||
67
pgpy/pgp.py
67
pgpy/pgp.py
@@ -83,7 +83,7 @@ __all__ = ['PGPSignature',
|
||||
'PGPKeyring']
|
||||
|
||||
|
||||
class PGPSignature(PGPObject, Armorable, ParentRef):
|
||||
class PGPSignature(Armorable, ParentRef, PGPObject):
|
||||
@property
|
||||
def __sig__(self):
|
||||
return self._signature.signature.__sig__()
|
||||
@@ -666,7 +666,7 @@ class PGPUID(ParentRef):
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class PGPMessage(PGPObject, Armorable):
|
||||
class PGPMessage(Armorable, PGPObject):
|
||||
@staticmethod
|
||||
def dash_unescape(text):
|
||||
return re.subn(r'^- -', '-', text, flags=re.MULTILINE)[0]
|
||||
@@ -722,12 +722,15 @@ class PGPMessage(PGPObject, Armorable):
|
||||
@property
|
||||
def message(self):
|
||||
"""The message contents"""
|
||||
if self.type in ['cleartext', 'encrypted']:
|
||||
return self._message
|
||||
if self.type == 'cleartext':
|
||||
return self.bytes_to_text(self._message)
|
||||
|
||||
if self.type == 'literal':
|
||||
return self._message.contents
|
||||
|
||||
if self.type == 'encrypted':
|
||||
return self._message
|
||||
|
||||
@property
|
||||
def signatures(self):
|
||||
"""A ``set`` containing all key ids (if any) which have signed this message."""
|
||||
@@ -741,7 +744,7 @@ class PGPMessage(PGPObject, Armorable):
|
||||
@property
|
||||
def type(self):
|
||||
##TODO: it might be better to use an Enum for the output of this
|
||||
if isinstance(self._message, six.string_types):
|
||||
if isinstance(self._message, (six.string_types, six.binary_type, bytearray)):
|
||||
return 'cleartext'
|
||||
|
||||
if isinstance(self._message, LiteralData):
|
||||
@@ -790,8 +793,8 @@ class PGPMessage(PGPObject, Armorable):
|
||||
return "-----BEGIN PGP SIGNED MESSAGE-----\n" \
|
||||
"Hash: {hashes:s}\n\n" \
|
||||
"{cleartext:s}\n" \
|
||||
"{signature:s}".format(hashes=','.join(s.hash_algorithm.name for s in self.signatures),
|
||||
cleartext=self.dash_escape(self._message),
|
||||
"{signature:s}".format(hashes=','.join(set(s.hash_algorithm.name for s in self.signatures)),
|
||||
cleartext=self.dash_escape(self.bytes_to_text(self._message)),
|
||||
signature=super(PGPMessage, self).__str__())
|
||||
|
||||
return super(PGPMessage, self).__str__()
|
||||
@@ -831,7 +834,12 @@ class PGPMessage(PGPObject, Armorable):
|
||||
self |= pkt
|
||||
return self
|
||||
|
||||
if isinstance(other, (six.string_types, LiteralData, SKEData, IntegrityProtectedSKEData)):
|
||||
if isinstance(other, (six.string_types, six.binary_type, bytearray)):
|
||||
if self._message is None:
|
||||
self._message = self.text_to_bytes(other)
|
||||
return self
|
||||
|
||||
if isinstance(other, (LiteralData, SKEData, IntegrityProtectedSKEData)):
|
||||
if self._message is None:
|
||||
self._message = other
|
||||
return self
|
||||
@@ -899,19 +907,33 @@ class PGPMessage(PGPObject, Armorable):
|
||||
:keyword sensitive: if True, the filename will be set to '_CONSOLE' to signal other OpenPGP clients to treat
|
||||
this message as being 'for your eyes only'. Ignored if cleartext is True.
|
||||
:type sensitive: ``bool``
|
||||
:keyword format: Set the message format identifier. Ignored if cleartext is True.
|
||||
:type format: ``str``
|
||||
:keyword compression: Set the compression algorithm for the new message.
|
||||
Defaults to :py:obj:`CompressionAlgorithm.ZIP`. Ignored if cleartext is True.
|
||||
:keyword encoding: Set the Charset header for the message.
|
||||
:type encoding: ``str`` representing a valid codec in codecs
|
||||
"""
|
||||
# TODO: have 'codecs' above (in :type encoding:) link to python documentation page on codecs
|
||||
cleartext = kwargs.pop('cleartext', False)
|
||||
format = kwargs.pop('format', None)
|
||||
sensitive = kwargs.pop('sensitive', False)
|
||||
compression = kwargs.pop('compression', CompressionAlgorithm.ZIP)
|
||||
file = kwargs.pop('file', False)
|
||||
charset = kwargs.pop('encoding', None)
|
||||
|
||||
filename = ''
|
||||
mtime = datetime.utcnow()
|
||||
|
||||
msg = PGPMessage()
|
||||
|
||||
if charset:
|
||||
msg.charset = charset
|
||||
|
||||
# if format in 'tu' and isinstance(message, (six.binary_type, bytearray)):
|
||||
# # if message format is text or unicode and we got binary data, we'll need to transcode it to UTF-8
|
||||
# message =
|
||||
|
||||
if file and os.path.isfile(message):
|
||||
filename = message
|
||||
message = bytearray(os.path.getsize(filename))
|
||||
@@ -920,20 +942,37 @@ class PGPMessage(PGPObject, Armorable):
|
||||
with open(filename, 'rb') as mf:
|
||||
mf.readinto(message)
|
||||
|
||||
# if format is None, we can try to detect it
|
||||
if format is None:
|
||||
if isinstance(message, six.text_type):
|
||||
# message is definitely UTF-8 already
|
||||
format = 'u'
|
||||
|
||||
elif cls.is_ascii(message):
|
||||
# message is probably text
|
||||
format = 't'
|
||||
|
||||
else:
|
||||
# message is probably binary
|
||||
format = 'b'
|
||||
|
||||
# if message is a binary type and we're building a textual message, we need to transcode the bytes to UTF-8
|
||||
if isinstance(message, (six.binary_type, bytearray)) and (cleartext or format in 'tu'):
|
||||
message = message.decode(charset or 'utf-8')
|
||||
|
||||
if cleartext:
|
||||
# cleartext message
|
||||
msg |= message
|
||||
|
||||
else:
|
||||
# load literal data
|
||||
lit = LiteralData()
|
||||
lit._contents = bytearray(cls.text_to_bytes(message))
|
||||
lit._contents = bytearray(msg.text_to_bytes(message))
|
||||
lit.filename = '_CONSOLE' if sensitive else os.path.basename(filename)
|
||||
lit.mtime = mtime
|
||||
lit.format = 'b'
|
||||
lit.format = format
|
||||
|
||||
if cls.is_ascii(message):
|
||||
lit.format = 't'
|
||||
# if cls.is_ascii(message):
|
||||
# lit.format = 't'
|
||||
|
||||
lit.update_hlen()
|
||||
|
||||
@@ -1046,7 +1085,7 @@ class PGPMessage(PGPObject, Armorable):
|
||||
self |= Packet(data)
|
||||
|
||||
|
||||
class PGPKey(PGPObject, Armorable, ParentRef):
|
||||
class PGPKey(Armorable, ParentRef, PGPObject):
|
||||
"""
|
||||
11.1. Transferable Public Keys
|
||||
|
||||
|
||||
@@ -6,6 +6,7 @@ import abc
|
||||
import base64
|
||||
import binascii
|
||||
import bisect
|
||||
import codecs
|
||||
import collections
|
||||
import operator
|
||||
import os
|
||||
@@ -53,6 +54,14 @@ class Armorable(six.with_metaclass(abc.ABCMeta)):
|
||||
'={crc}\n' \
|
||||
'-----END PGP {block_type}-----\n'
|
||||
|
||||
@property
|
||||
def charset(self):
|
||||
return self.ascii_headers.get('Charset', 'utf-8')
|
||||
|
||||
@charset.setter
|
||||
def charset(self, encoding):
|
||||
self.ascii_headers['Charset'] = codecs.lookup(encoding).name
|
||||
|
||||
@staticmethod
|
||||
def is_ascii(text):
|
||||
if isinstance(text, six.string_types):
|
||||
@@ -191,10 +200,6 @@ class Armorable(six.with_metaclass(abc.ABCMeta)):
|
||||
self.ascii_headers = collections.OrderedDict()
|
||||
self.ascii_headers['Version'] = 'PGPy v' + __version__ # Default value
|
||||
|
||||
@abc.abstractmethod
|
||||
def __bytes__(self):
|
||||
"""This method is too abstract to understand"""
|
||||
|
||||
def __str__(self):
|
||||
payload = base64.b64encode(self.__bytes__()).decode('latin-1')
|
||||
payload = '\n'.join(payload[i:(i + 64)] for i in range(0, len(payload), 64))
|
||||
@@ -274,19 +279,23 @@ class PGPObject(six.with_metaclass(abc.ABCMeta, object)):
|
||||
|
||||
@staticmethod
|
||||
def text_to_bytes(text):
|
||||
bin = bytearray()
|
||||
|
||||
if text is None or isinstance(text, bytearray):
|
||||
if text is None:
|
||||
return text
|
||||
|
||||
for c in iter(ord(c) for c in text):
|
||||
if c < 256:
|
||||
bin.append(c)
|
||||
# if we got bytes, just return it
|
||||
if isinstance(text, (bytearray, six.binary_type)):
|
||||
return text
|
||||
|
||||
else:
|
||||
bin += PGPObject.int_to_bytes(c)
|
||||
# if we were given a unicode string, or if we translated the string into utf-8,
|
||||
# we know that Python already has it in utf-8 encoding, so we can now just encode it to bytes
|
||||
return text.encode('utf-8')
|
||||
|
||||
return bytes(bin)
|
||||
@staticmethod
|
||||
def bytes_to_text(text):
|
||||
if text is None or isinstance(text, six.text_type):
|
||||
return text
|
||||
|
||||
return text.decode('utf-8')
|
||||
|
||||
@abc.abstractmethod
|
||||
def parse(self, packet):
|
||||
|
||||
82
tests/test_01_types.py
Normal file
82
tests/test_01_types.py
Normal file
@@ -0,0 +1,82 @@
|
||||
# coding=utf-8
|
||||
""" test types
|
||||
"""
|
||||
import pytest
|
||||
|
||||
import glob
|
||||
|
||||
from pgpy.types import Armorable, PGPObject
|
||||
|
||||
|
||||
# read txt files in tests/testdata/text/*.txt and yield ids and strings
|
||||
# TODO: figure out how to set ids
|
||||
# @pytest.yield_fixture
|
||||
def get_text():
|
||||
for tf in sorted(glob.glob('tests/testdata/text/*.txt')):
|
||||
with open(tf, 'r') as f:
|
||||
for line in f:
|
||||
# skip comments and blank lines
|
||||
if line.startswith('#') or line == "":
|
||||
continue
|
||||
yield line.split(': ')
|
||||
|
||||
text = {
|
||||
# some basic utf-8 test strings - these should all pass
|
||||
'english': u'The quick brown fox jumped over the lazy dog',
|
||||
# this hiragana pangram comes from http://www.columbia.edu/~fdc/utf8/
|
||||
'hiragana': u'いろはにほへど ちりぬるを\n'
|
||||
u'わがよたれぞ つねならむ\n'
|
||||
u'うゐのおくやま けふこえて\n'
|
||||
u'あさきゆめみじ ゑひもせず',
|
||||
|
||||
'poo': u'Hello, \U0001F4A9!',
|
||||
}
|
||||
|
||||
# some alternate encodings to try
|
||||
# these should fail
|
||||
encoded_text = {
|
||||
# try some alternate encodings as well
|
||||
# 'crunch the granite of science'
|
||||
'cyrillic': u'грызть гранит науки'.encode('iso8859_5'),
|
||||
# 'My hovercraft is full of eels'
|
||||
'cp865': u'Mit luftpudefartøj er fyldt med ål'.encode('cp865'),
|
||||
}
|
||||
|
||||
|
||||
# test harness for pgpy.types.PGPObject, since it defines a couple of abstract methods
|
||||
class FakePGPObject(PGPObject):
|
||||
@classmethod
|
||||
def new(cls, text):
|
||||
obj = FakePGPObject()
|
||||
obj.data = cls.text_to_bytes(text)
|
||||
return obj
|
||||
|
||||
def __init__(self):
|
||||
self.data = bytearray()
|
||||
|
||||
def __bytearray__(self):
|
||||
return bytearray(b'_fake_') + self.data
|
||||
|
||||
def parse(self, packet):
|
||||
self.data = packet
|
||||
|
||||
|
||||
class TestPGPObject(object):
|
||||
params = {
|
||||
'text': [ v for _, v in sorted(text.items()) ],
|
||||
'encoded_text': [ v for _, v in sorted(encoded_text.items()) ],
|
||||
}
|
||||
ids = {
|
||||
'test_text_to_bytes': [ k for k, _ in sorted(text.items()) ],
|
||||
'test_text_to_bytes_encodings': [ k for k, _ in sorted(encoded_text.items()) ],
|
||||
}
|
||||
def test_text_to_bytes(self, text):
|
||||
pgpo = FakePGPObject.new(text)
|
||||
|
||||
assert pgpo.__bytearray__() == bytearray(b'_fake_') + bytearray(text, 'utf-8')
|
||||
|
||||
def test_text_to_bytes_encodings(self, encoded_text):
|
||||
pgpo = FakePGPObject.new(encoded_text)
|
||||
# this should fail
|
||||
with pytest.raises(UnicodeDecodeError):
|
||||
pgpo.data.decode('utf-8')
|
||||
@@ -1,3 +1,4 @@
|
||||
# coding=utf-8
|
||||
""" test doing things with keys/signatures/etc
|
||||
"""
|
||||
import pytest
|
||||
@@ -72,12 +73,13 @@ class TestPGPMessage(object):
|
||||
('message', bytearray(range(256)))],
|
||||
}
|
||||
def test_new(self, comp_alg, write_clean, gpg_print):
|
||||
msg = PGPMessage.new("This is a new message!")
|
||||
msg = PGPMessage.new(u"This is a new message!", compression=comp_alg)
|
||||
|
||||
assert msg.type == 'literal'
|
||||
assert msg.message == "This is a new message!"
|
||||
assert msg._message.format == 't'
|
||||
assert msg.message == u"This is a new message!"
|
||||
assert msg._message.format == 'u'
|
||||
assert msg._message.filename == ''
|
||||
assert msg.is_compressed is bool(comp_alg != CompressionAlgorithm.Uncompressed)
|
||||
|
||||
with write_clean('tests/testdata/cmsg.asc', 'w', str(msg)):
|
||||
assert gpg_print('cmsg.asc') == "This is a new message!"
|
||||
@@ -93,6 +95,29 @@ class TestPGPMessage(object):
|
||||
with write_clean('tests/testdata/csmsg.asc', 'w', str(msg)):
|
||||
assert gpg_print('csmsg.asc') == "This is a sensitive message!"
|
||||
|
||||
def test_new_non_unicode(self, write_clean, gpg_print):
|
||||
# this message text comes from http://www.columbia.edu/~fdc/utf8/
|
||||
text = u'色は匂へど 散りぬるを\n' \
|
||||
u'我が世誰ぞ 常ならむ\n' \
|
||||
u'有為の奥山 今日越えて\n' \
|
||||
u'浅き夢見じ 酔ひもせず\n'
|
||||
msg = PGPMessage.new(text.encode('jisx0213'), encoding='jisx0213')
|
||||
|
||||
assert msg.type == 'literal'
|
||||
assert msg.message == text.encode('jisx0213')
|
||||
|
||||
def test_new_non_unicode_cleartext(self, write_clean, gpg_print):
|
||||
# this message text comes from http://www.columbia.edu/~fdc/utf8/
|
||||
text = u'色は匂へど 散りぬるを\n' \
|
||||
u'我が世誰ぞ 常ならむ\n' \
|
||||
u'有為の奥山 今日越えて\n' \
|
||||
u'浅き夢見じ 酔ひもせず\n'
|
||||
|
||||
msg = PGPMessage.new(text.encode('jisx0213'), cleartext=True, encoding='jisx0213')
|
||||
|
||||
assert msg.type == 'cleartext'
|
||||
assert msg.message == text
|
||||
|
||||
def test_new_from_file(self, file, write_clean, gpg_print):
|
||||
msg = PGPMessage.new(file, file=True)
|
||||
|
||||
@@ -106,7 +131,10 @@ class TestPGPMessage(object):
|
||||
assert val == expected
|
||||
|
||||
with write_clean('tests/testdata/cmsg.asc', 'w', str(msg)):
|
||||
assert gpg_print('cmsg.asc') == msg.message
|
||||
out = gpg_print('cmsg.asc')
|
||||
if msg._message.format == 'b':
|
||||
out = out.encode('latin-1')
|
||||
assert out == msg.message
|
||||
|
||||
def test_decrypt_passphrase_message(self, enc_msg):
|
||||
decmsg = enc_msg.decrypt("QwertyUiop")
|
||||
|
||||
@@ -6,4 +6,4 @@ cjLzUrk64lgYhDkY2FiZQNIMXJwCMO31rxgZ+tW/zesUPxWzdKWrtLGW/LkP5rXL
|
||||
V/Yvnr/EKjBbQuvZSYa/klsum6XFmTze+maVgclT6Rc6hzqqxNy6o6qdTTmLJuvp
|
||||
AQA=
|
||||
=GDv4
|
||||
-----END PGP MESSAGE----
|
||||
-----END PGP MESSAGE-----
|
||||
|
||||
Reference in New Issue
Block a user