genericised test_load_paul_pubring intto test_load_asc_bench, although this script requires python 3.4

[skip ci]
This commit is contained in:
Michael Greene
2014-08-12 13:57:32 -07:00
parent edebb959f5
commit 6ce5ee95d1

View File

@@ -1,10 +1,10 @@
#!/usr/bin/env python
import asyncio
import bisect
import collections
import itertools
import os
import sys
import time
from progressbar import ProgressBar, AnimatedMarker, Timer, Bar, Percentage, Widget
@@ -12,7 +12,36 @@ import pgpy
from pgpy.packet import Packet
from pgpy.types import Exportable
pubring = '/Users/magreene/paul_pubring.asc'
ascfiles = [ os.path.abspath(os.path.expanduser(f)) for f in sys.argv[1:] if os.path.exists(os.path.abspath(os.path.expanduser(f))) ]
if len(ascfiles) == 0:
sys.stderr.write("Please specify one or more ASCII-armored files to load\n")
sys.exit(-1)
for a in [ os.path.abspath(os.path.expanduser(a)) for a in sys.argv[1:] if a not in ascfiles ]:
sys.stderr.write("Error: {} does not exist\n".write())
class Mebibyte(int):
iec = {1: 'B',
1024: 'KiB',
1024**2: 'MiB',
1024**3: 'GiB',
1024**4: 'TiB',
1024**5: 'PiB',
1024**6: 'EiB',
1024**7: 'ZiB',
1024**8: 'YiB'}
iecl = [1, 1024, 1024**2, 1024**3, 1024**4, 1024**5, 1024**6, 1024**7, 1024**8]
# custom format class for human readable IEC byte formatting
def __format__(self, spec):
# automatically format based on size
iiec = max(0, min(bisect.bisect_right(self.iecl, int(self)), len(self.iecl)))
ieck = self.iecl[iiec - 1]
return '{:,.2f} {:s}'.format(int(self) / ieck, self.iec[ieck])
@asyncio.coroutine
def _dospinner(pbar):
@@ -24,12 +53,9 @@ def _dospinner(pbar):
print("")
break
pbar1 = ProgressBar(widgets=["Reading {} ({:,} bytes): ".format(pubring, os.path.getsize(pubring)), AnimatedMarker()])
pbar2 = ProgressBar(widgets=["Unarmoring data: ", AnimatedMarker()])
@asyncio.coroutine
def _load_pubring(future):
with open(pubring, 'r') as ppr:
def _load_pubring(ascfile, future):
with open(ascfile, 'r') as ppr:
a = yield from asyncio.get_event_loop().run_in_executor(None, ppr.read)
future.set_result(a)
@@ -38,38 +64,56 @@ def _unarmor(a, future):
b = yield from asyncio.get_event_loop().run_in_executor(None, pgpy.types.Exportable.ascii_unarmor, a)
future.set_result(b)
_b = bytearray()
loop = asyncio.get_event_loop()
for ascfile in ascfiles:
ascfile = os.path.abspath(ascfile)
if not os.path.isfile(ascfile):
sys.stderr.write('Error: {} does not exist'.format(ascfile))
continue
a = asyncio.Future()
b = asyncio.Future()
load_bar = ProgressBar(widgets=["Reading {} ({}): ".format(ascfile, Mebibyte(os.path.getsize(ascfile))), AnimatedMarker()])
unarmor_bar = ProgressBar(widgets=["Unarmoring data: ", AnimatedMarker()])
prog = asyncio.Task(_dospinner(pbar1))
asyncio.Task(_load_pubring(a))
loop.run_until_complete(a)
_a = a.result()
prog.cancel()
prog = asyncio.Task(_dospinner(pbar2))
asyncio.Task(_unarmor(_a, b))
loop.run_until_complete(b)
_b = b.result()['body']
prog.cancel()
a = asyncio.Future()
b = asyncio.Future()
lbp = asyncio.Task(_dospinner(load_bar))
asyncio.Task(_load_pubring(ascfile, a))
loop.run_until_complete(a)
_a = a.result()
lbp.cancel()
uap = asyncio.Task(_dospinner(unarmor_bar))
asyncio.Task(_unarmor(_a, b))
loop.run_until_complete(b)
_b += b.result()['body']
uap.cancel()
loop.stop()
print("")
print("\n")
packets = []
_mv = len(_b)
class BetterCounter(Widget):
def __init__(self, pktlist, format='{:,}'):
self.pktlist = pktlist
def __init__(self, pktlist, iec=False, format='{:,}'):
self.list = pktlist
self.iec = iec
self.format = format
def update(self, pbar):
return self.format.format(len(self.pktlist))
if self.iec:
return self.format.format(Mebibyte(len(self.list)))
return self.format.format(len(self.list))
pb3w = [BetterCounter(packets, '{:,} pkts'), '|', BetterCounter(_b, '{:,}B. rem'), '|', Timer("%s"), '|', Percentage(), Bar()]
pb3w = [BetterCounter(packets, False, '{:,} pkts'), '|', BetterCounter(_b, True, '{:,} rem.'), '|', Timer("%s"), '|', Percentage(), Bar()]
pbar3 = ProgressBar(maxval=_mv, widgets=pb3w).start()
while len(_b) > 0: