123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670 |
- import os
- import argparse
- import csv
- import sys
- import datetime
- import shutil
- import subprocess
- from config import BASE_PATH, SOURCE_PATH, DL_PATH, DL_LOG
- import rewise
- from dl import downloadFile
- from sources import InstallerSourceUniqIt, InstallerSourceAllUniqIt
- from installers import InstallerStats, installerStatsCsvIt, acceptableInstallerIt
# Binary size units in bytes (powers of 1024).
KiB = 1024
MiB = KiB ** 2
GiB = KiB ** 3
TiB = KiB ** 4


def formatFileSize(fileSize):
    """
    Format a byte count as a short human readable string.

    The largest unit that fits is used, so a size of exactly one unit is
    printed in that unit (1024 -> "1.00 KB"). Values are divided by powers
    of 1024; the unit labels ("KB", "MB", ...) are kept as-is.

    @param fileSize: File size in bytes to format to string.
    @type fileSize: uint
    @return: The size with two decimals and a unit, or "<n> Bytes" for
             sizes below 1 KiB.
    @rtype: str
    """
    if fileSize >= TiB:
        return f"{fileSize / TiB:.2f} TB"
    if fileSize >= GiB:
        return f"{fileSize / GiB:.2f} GB"
    if fileSize >= MiB:
        return f"{fileSize / MiB:.2f} MB"
    if fileSize >= KiB:
        return f"{fileSize / KiB:.2f} KB"
    return f"{fileSize} Bytes"
def getMd5Sum(file):
    """
    Return the MD5 hex digest of a file as reported by the `md5sum` tool.

    @param file: Path of the file to hash.
    @type file: str
    @return: The first space-separated field of the `md5sum` output, or an
             empty string when the tool exits with a non-zero status or
             cannot be started at all (e.g. it is not installed).
    @rtype: str
    """
    try:
        output = subprocess.check_output(["md5sum", file])
    except (subprocess.CalledProcessError, OSError):
        # CalledProcessError: md5sum ran but failed (unreadable/missing file).
        # OSError: the md5sum executable itself could not be launched.
        return ""
    # Only the leading digest is used, so a file name that is not valid
    # UTF-8 must not make the decode fail.
    return output.decode('utf-8', errors="replace").split(" ")[0]
def getB3Sum(file):
    """
    Return the BLAKE3 hex digest of a file as reported by the `b3sum` tool.

    @param file: Path of the file to hash.
    @type file: str
    @return: The first space-separated field of the `b3sum` output, or an
             empty string when the tool exits with a non-zero status or
             cannot be started at all (e.g. it is not installed).
    @rtype: str
    """
    try:
        output = subprocess.check_output(["b3sum", file])
    except (subprocess.CalledProcessError, OSError):
        # CalledProcessError: b3sum ran but failed (unreadable/missing file).
        # OSError: the b3sum executable itself could not be launched.
        return ""
    # Only the leading digest is used, so a file name that is not valid
    # UTF-8 must not make the decode fail.
    return output.decode('utf-8', errors="replace").split(" ")[0]
# ANSI terminal control introducers: 7-bit ESC and the 8-bit single-byte CSI.
ESC = "\x1B"
CSI = "\x9B"


def clearLine(file=None):
    """
    Move the cursor to the start of the current terminal line and erase it.

    Nothing but the two escape sequences is written (no newline), so the
    next print overwrites the line; this is what the progress displays rely
    on.

    @param file: Stream to write to. None (the default) means the current
                 sys.stdout, looked up at call time so that redirecting
                 stdout after import is honoured.
    @type file: file-like object or None
    """
    # "[0G" = go to line begin, "[0K" = clear from the cursor to line end.
    print(f"{ESC}[0G{ESC}[0K", end="", file=file)
def saveDownloadLog(logs):
    """
    Write the collected download log rows to the DL_LOG csv file.

    Nothing is written (and DL_LOG is left untouched) when there are no
    rows. Otherwise DL_LOG is overwritten with one csv record per row.

    @param logs: Rows as built by addErrorLog.
    @type logs: list
    """
    if logs:
        with open(DL_LOG, "w+", newline="") as logFile:
            logWriter = csv.writer(logFile, dialect="unix",
                                   quoting=csv.QUOTE_STRINGS)
            logWriter.writerows(logs)
def addErrorLog(logs, errorType, filepath, message=""):
    """
    Append one timestamped error record to an in-memory log.

    @param logs: Log to append to (modified in place).
    @type logs: list
    @param errorType: Short error category, e.g. "DL" or "LOCAL_SUM".
    @type errorType: str
    @param filepath: Path of the file the error relates to.
    @type filepath: str
    @param message: Optional extra detail.
    @type message: str
    """
    timestamp = datetime.datetime.now()
    record = [timestamp, errorType, filepath, message]
    logs.append(record)
# Arguments of the "download" command. Each entry is (flags, kwargs) as
# consumed by add_parser; flags is None for a positional argument.
DOWNLOAD_ARGS = [
    (
        ["--simulate"],
        dict(dest="simulate", action="store_true",
             help="Don't download files, just check missing and corrupt."),
    ),
    (
        ["--skip-sum"],
        dict(dest="skip_sum", action="store_true",
             help="Skip Blake 3 sum check for already present files."),
    ),
    (
        None,
        dict(dest="IN_SOURCE", type=str, help="The installer source .csv"),
    ),
]
def download(namespace):
    """
    Download every installer of a source file that is not present locally
    and verify the ones that are.

    For each unique source entry:
      - a local file with the wrong size is logged as "LOCAL_SIZE";
      - a local file with the wrong BLAKE3 sum is logged as "LOCAL_SUM"
        (skipped with --skip-sum);
      - a missing file is downloaded unless --simulate is given; a failed
        download is logged as "DL" (any partial file is removed) and a
        downloaded file with the wrong BLAKE3 sum is logged as "DL_SUM".
    Files that already exist locally are only checked, never re-downloaded.
    The log is written through saveDownloadLog and the combined size of all
    sources is printed at the end.

    @param namespace: Parsed arguments providing IN_SOURCE, simulate and
                      skip_sum.
    @type namespace: argparse.Namespace
    """
    sourceFile = namespace.IN_SOURCE
    simulate = namespace.simulate
    skipsum = namespace.skip_sum

    sources = list(InstallerSourceUniqIt(sourceFile))
    total = len(sources)
    totalSize = 0
    toDlSize = 0
    errors = 0
    log = []

    # Get total sizes
    for source in sources:
        if (not os.path.isfile(source.filepath)
                or os.path.getsize(source.filepath) != source.filesize):
            toDlSize += source.filesize
        totalSize += source.filesize

    # Check free disc space. A simulated run downloads nothing, so it must
    # not be refused for lack of space.
    if not simulate and shutil.disk_usage(DL_PATH).free < toDlSize:
        print(f"Not enough free space at {DL_PATH}; at least {formatFileSize(toDlSize)} is needed.")
        return

    # enumerate keeps the progress counter correct on every path, including
    # the failed-download ones.
    for current, source in enumerate(sources, start=1):
        clearLine()
        print(f"[{current:4d} / {total:4d}] [Errors: {errors}] {source.b3} {source.getFilename()}", end="", flush=True)

        localfile = source.filepath

        if os.path.isfile(localfile):
            # Already present: verify only.
            if os.path.getsize(localfile) != source.filesize:
                addErrorLog(log, "LOCAL_SIZE", source.filepath)
                errors += 1
            elif not skipsum and getB3Sum(localfile) != source.b3:
                addErrorLog(log, "LOCAL_SUM", source.filepath)
                errors += 1
            continue

        if simulate:
            continue

        # A truthy return value is treated as the error description.
        dlerror = downloadFile(source.dlUrl, localfile)
        if dlerror:
            # errors should have been printed..
            if os.path.exists(localfile):
                os.remove(localfile)
            addErrorLog(log, "DL", source.filepath, dlerror)
            errors += 1
            continue

        if getB3Sum(localfile) != source.b3:
            addErrorLog(log, "DL_SUM", source.filepath)
            errors += 1

    print()
    saveDownloadLog(log)
    print("Total size:", formatFileSize(totalSize))
# Arguments of the "init" command; (flags, kwargs) pairs for add_parser,
# flags is None for a positional argument.
INIT_ARGS = [
    (None, dict(dest="IN_SOURCE", type=str, help="The installer source .csv")),
    (None, dict(dest="OUT_RESULT", type=str, help="The output result .csv")),
]
def init(namespace):
    """
    Run the full test on every unique installer of a source file and write
    the results to a new result csv.

    The result file is created (overwritten) first and starts with the
    InstallerStats column header. Installers whose devStatus is not "OK"
    get comments generated and are counted as errors in the progress line.

    @param namespace: Parsed arguments providing IN_SOURCE and OUT_RESULT.
    @type namespace: argparse.Namespace
    """
    sourceFile = namespace.IN_SOURCE
    resultFile = namespace.OUT_RESULT

    with open(resultFile, "w", newline="") as out:
        resultWriter = csv.writer(out, dialect="unix")

        # Write the header
        resultWriter.writerow(InstallerStats.columns())

        entries = list(InstallerSourceUniqIt(sourceFile))
        total = len(entries)
        errors = 0

        for current, entry in enumerate(entries, start=1):
            installer = InstallerStats(entry)

            clearLine()
            print(f"[{current:4d} / {total:4d}] [Errors: {errors:4d}] {installer.filename}", end="", flush=True)

            installer.fullTest()

            # analyze errors
            failed = installer.devStatus != "OK"
            if failed:
                installer.generateComments()
                errors += 1

            resultWriter.writerow(installer.asRow())

    print()
# Arguments of the "update" command; (flags, kwargs) pairs for add_parser,
# flags is None for a positional argument.
UPDATE_ARGS = [
    (None, dict(dest="IN_SOURCE", type=str, help="The installer source .csv")),
    (None, dict(dest="OUT_RESULT", type=str, help="The output result .csv")),
    (
        ["--only-new"],
        dict(dest="only_new", action="store_true",
             help="Only run tests on new found installers"),
    ),
]
def update(namespace):
    """
    Merge new installers from a source file into an existing result file
    and re-run tests, rewriting the result file in place.

    Installers whose md5 is not yet in the result file are added. The merged
    list is sorted and written back with the InstallerStats column header.
    Without --only-new the dev verify/list/raw tests are re-run for every
    installer; new installers additionally get the full test and generated
    comments. With --only-new only the new installers are tested, existing
    rows are written back unchanged.

    @param namespace: Parsed arguments providing IN_SOURCE, OUT_RESULT and
                      only_new.
    @type namespace: argparse.Namespace
    """
    sourceFile = namespace.IN_SOURCE
    resultFile = namespace.OUT_RESULT
    only_new = namespace.only_new

    installers = list(installerStatsCsvIt(resultFile))
    # md5 sums already present in the result file. A set keeps the
    # membership tests below O(1) instead of scanning a list every time.
    knownMd5 = {installer.md5 for installer in installers}

    for source in InstallerSourceUniqIt(sourceFile):
        if source.md5 not in knownMd5:
            installers.append(InstallerStats(source))

    installers.sort()

    with open(resultFile, "w", newline="") as fp:
        writer = csv.writer(fp, dialect="unix")

        # Write the header
        writer.writerow(InstallerStats.columns())

        total = len(installers)
        for current, installer in enumerate(installers, start=1):
            isNew = installer.md5 not in knownMd5
            if only_new:
                if isNew:
                    print(f"[{current:4d} / {total:4d}] NEW: {installer.filepath}")
                    installer.fullTest()
                    installer.generateComments()
            else:
                clearLine()
                print(f"[{current:4d} / {total:4d}] {installer.filepath}", end="", flush=True)
                installer.testDevVerify()
                installer.testDevList()
                installer.testDevRaw()
                if isNew:
                    installer.fullTest()
                    installer.generateComments()
            writer.writerow(installer.asRow())

    print()
# Arguments of the "create-diff" command; (flags, kwargs) pairs for
# add_parser, flags is None for a positional argument.
CREATE_DIFF_ARGS = [
    (None, dict(dest="IN_RESULT", type=str, help="The input result .csv")),
    (None, dict(dest="OUT_RESULT", type=str, help="The output result .csv")),
]
def create_diff(namespace):
    """
    Re-run the dev verify/list/raw tests for every installer of an existing
    result file and write the fresh results to another result file.

    The input is read completely before the output is opened. Installers
    whose devStatus is not "OK" after the re-run get comments generated and
    are counted as errors in the progress line.

    @param namespace: Parsed arguments providing IN_RESULT and OUT_RESULT.
    @type namespace: argparse.Namespace
    """
    oldResultFile = namespace.IN_RESULT
    resultFile = namespace.OUT_RESULT

    previous = list(installerStatsCsvIt(oldResultFile))

    with open(resultFile, "w", newline="") as out:
        resultWriter = csv.writer(out, dialect="unix")

        # Write the header
        resultWriter.writerow(InstallerStats.columns())

        total = len(previous)
        errors = 0

        for current, installer in enumerate(previous, start=1):
            clearLine()
            print(f"[{current:4d} / {total:4d}] [Err {errors:4d}] {installer.filepath}", end="", flush=True)

            for runTest in (installer.testDevVerify,
                            installer.testDevList,
                            installer.testDevRaw):
                runTest()

            if installer.devStatus != "OK":
                installer.generateComments()
                errors += 1

            resultWriter.writerow(installer.asRow())

    print()
# Arguments of the "filter-invalid" command; (flags, kwargs) pairs for
# add_parser, flags is None for a positional argument.
FILTER_INVALID_ARGS = [
    (None, dict(dest="IN_RESULT", type=str, help="The input result .csv")),
    (None, dict(dest="OUT_RESULT", type=str, help="The output result .csv")),
]
def filter_invalid(namespace):
    """
    Copy the installers that acceptableInstallerIt yields for a result file
    into a new result file (with the InstallerStats column header).

    The input is read completely before the output is opened, so an
    unreadable input does not leave a truncated output behind and the same
    path may be given for input and output.

    @param namespace: Parsed arguments providing IN_RESULT and OUT_RESULT.
    @type namespace: argparse.Namespace
    """
    resultFile = namespace.IN_RESULT
    newResultFile = namespace.OUT_RESULT

    # Materialize first: opening the output with "w" truncates it at once.
    accepted = list(acceptableInstallerIt(resultFile))

    with open(newResultFile, "w", newline="") as fp:
        writer = csv.writer(fp, dialect="unix")

        # Write the header
        writer.writerow(InstallerStats.columns())

        for installer in accepted:
            writer.writerow(installer.asRow())
# Arguments of the "print-diff" command; (flags, kwargs) pairs for
# add_parser, flags is None for a positional argument.
PRINT_DIFF_ARGS = [
    (None, dict(dest="IN_RESULT1", type=str, help="Old result to compare")),
    (None, dict(dest="IN_RESULT2", type=str, help="New result to compare")),
]
def print_diff(namespace):
    """
    Compare two result files and print which installers got fixed or broke.

    Installers are matched by md5; when the old result contains an md5 more
    than once, its first row is the one compared against. An installer is
    "broke" when its devStatus was "OK" in the old result and differs in the
    new one, and "fixed" when it was not "OK" before and is "OK" now. For
    each listed installer the b3 sum and filename of the old row are
    printed.

    @param namespace: Parsed arguments providing IN_RESULT1 (old) and
                      IN_RESULT2 (new).
    @type namespace: argparse.Namespace
    """
    resultFileOld = namespace.IN_RESULT1
    resultFileNew = namespace.IN_RESULT2

    # Index the old results by md5 so every new row is matched with a single
    # lookup; setdefault keeps the first row per md5.
    oldByMd5 = {}
    for oldInst in installerStatsCsvIt(resultFileOld):
        oldByMd5.setdefault(oldInst.md5, oldInst)

    verifyFixed = []
    verifyBroke = []

    for newInst in installerStatsCsvIt(resultFileNew):
        oldInst = oldByMd5.get(newInst.md5)
        if oldInst is None or newInst.devStatus == oldInst.devStatus:
            continue
        if oldInst.devStatus == "OK":
            verifyBroke.append((oldInst.b3, oldInst.filename))
        elif newInst.devStatus == "OK":
            verifyFixed.append((oldInst.b3, oldInst.filename))

    print(f"Fixed ({len(verifyFixed)})")
    print("--------------------------------")
    for b3, filename in verifyFixed:
        print(b3, filename)

    print()
    print(f"Broke ({len(verifyBroke)})")
    print("--------------------------------")
    for b3, filename in verifyBroke:
        print(b3, filename)
# Arguments of the "print-stats" command; (flags, kwargs) pairs for
# add_parser, flags is None for a positional argument.
PRINT_STATS_ARGS = [
    (None, dict(dest="IN_RESULT", type=str, help="Result file")),
]
def print_stats(namespace):
    """
    Print statistics for a result file.

    Output, in order: a table with one row per PE build timestamp (ok/error
    counts for the dev verify, list and raw tests and for v0.2), a totals
    row, counts for installers flagged isPk, how often each last comment and
    each last verify error line occurs, and the installers that work on
    v0.2.0 but not on dev.

    @param namespace: Parsed arguments providing IN_RESULT.
    @type namespace: argparse.Namespace
    """
    resultFile = namespace.IN_RESULT

    # NOTE(review): the keys are compared against local-time strings, so the
    # lookup only matches in the timezone these were recorded in - confirm.
    BUILDDATE_VERSION_MAP = {
        "1998-11-09 21:17:09": "InstallMaster 7",
        "2000-04-25 16:37:12": "InstallMaster 8",
        "2001-10-25 21:47:11": "Installation System 9"
    }

    # Counter names, in the order their columns are printed.
    statKeys = ("ok", "error", "listOk", "listError",
                "rawOk", "rawError", "r02Ok", "r02Error")

    BuildStats = {}
    totals = dict.fromkeys(statKeys, 0)

    pkOk = 0
    pkError = 0
    pkRawOk = 0
    pkRawError = 0

    # Find installers that work on v0.2.0 but not on dev
    workingOn02NotOnDev = []

    errorWoComment = 0
    CommentStats = {}
    VerifyErrorStats = {}

    for installer in installerStatsCsvIt(resultFile):
        devOk = installer.devStatus == "OK"
        listOk = installer.devListStatus == "OK"
        rawOk = installer.devRawStatus == "OK"
        r02Ok = installer.rew02Status == "OK"

        stats = BuildStats.setdefault(installer.peBuild,
                                      dict.fromkeys(statKeys, 0))
        for key in ("ok" if devOk else "error",
                    "listOk" if listOk else "listError",
                    "rawOk" if rawOk else "rawError",
                    "r02Ok" if r02Ok else "r02Error"):
            stats[key] += 1
            totals[key] += 1

        if installer.isPk:
            if devOk:
                pkOk += 1
            else:
                pkError += 1
            if rawOk:
                pkRawOk += 1
            else:
                pkRawError += 1

        if devOk:
            continue

        # Everything below concerns installers that fail on dev.
        if not installer.comment:
            errorWoComment += 1

        # Only the last comment of a "---" separated list is counted.
        comment = installer.comment.split("\n---\n")[-1]
        if comment:
            CommentStats[comment] = CommentStats.get(comment, 0) + 1

        error = installer.devError.rstrip().split("\n")[-1]
        # strip unique stuff
        if "ERROR: Failed seek to file offset" in error:
            error = "ERROR: Failed seek to file offset"
        elif "ERROR: parseWiseScript unknown OP" in error:
            error = error[:37]
        error = f"{installer.devStatus.ljust(18)} {error}"
        VerifyErrorStats[error] = VerifyErrorStats.get(error, 0) + 1

        if r02Ok:
            workingOn02NotOnDev.append(installer.filepath)

    # Print stuff
    print("| DATE | OK | ERR | LOK | LERR | ROK | RERR | v02 | v02E | VERSION")
    print("| :------------------ | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | :------")
    for build in sorted(BuildStats):
        stats = BuildStats[build]
        timestamp = int(build)
        d = datetime.datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')
        # Timestamp 0 is shown as "NE". Testing the number instead of the
        # formatted local time keeps this independent of the timezone.
        row = "| NE " if timestamp == 0 else f"| {d}"
        row += "".join(f" | {stats[key]:4d}" for key in statKeys)
        row += " | "
        row += BUILDDATE_VERSION_MAP.get(d, "Unknown")
        print(row)

    print("| ------------------- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | -------")
    totalRow = f"| Total: {totals['ok'] + totals['error']:12d}"
    totalRow += "".join(f" | {totals[key]:4d}" for key in statKeys)
    totalRow += " | "
    print(totalRow)

    print()
    print("PK Total :", pkOk + pkError)
    print("PK OK :", pkOk)
    print("PK ERROR :", pkError)
    print("PK Raw OK :", pkRawOk)
    print("PK Raw ERROR:", pkRawError)

    print()
    print("Comments:")
    print(f"{errorWoComment:3d} Without comment")
    for comment, count in sorted(CommentStats.items(), key=lambda x: x[1], reverse=True):
        print(f"{count:3d} {comment}")

    print()
    print("Verify errors:")
    for error, count in sorted(VerifyErrorStats.items(), key=lambda x: x[1], reverse=True):
        print(f"{count:3d} {error}")

    if workingOn02NotOnDev:
        print()
        print("These installers work on v0.2.0 but not on dev:")
        for filepath in workingOn02NotOnDev:
            print(" -", filepath)
# Arguments of the "update-comments" command; (flags, kwargs) pairs for
# add_parser, flags is None for a positional argument.
UPDATE_COMMENTS_ARGS = [
    (None, dict(dest="IN_OUT_RESULT", type=str, help="Result file")),
]
def update_comments(namespace):
    """
    Regenerate the comments of every failing installer in a result file and
    rewrite that file in place.

    The file is read completely before it is reopened for writing. Rows
    whose devStatus is "OK" are written back as they were read.

    @param namespace: Parsed arguments providing IN_OUT_RESULT.
    @type namespace: argparse.Namespace
    """
    resultFile = namespace.IN_OUT_RESULT

    rows = list(installerStatsCsvIt(resultFile))

    with open(resultFile, "w", newline="") as out:
        resultWriter = csv.writer(out, dialect="unix")

        # Write the header
        resultWriter.writerow(InstallerStats.columns())

        for entry in rows:
            if not entry.devStatus == "OK":
                entry.clearComments()
                entry.gatherInfo()
                entry.generateComments()
            resultWriter.writerow(entry.asRow())
# Arguments of the "result-to-source" command; (flags, kwargs) pairs for
# add_parser, flags is None for a positional argument.
RESULT_TO_SOURCE_ARGS = [
    (None, dict(dest="IN_RESULT", type=str, help="Result file")),
    (None, dict(dest="OUT_SOURCE", type=str, help="New source file")),
]
def result_to_source(namespace):
    """
    Write a source csv (md5, filepath, download url per row, no header)
    from the installers of a result file.

    @param namespace: Parsed arguments providing IN_RESULT and OUT_SOURCE.
    @type namespace: argparse.Namespace
    """
    resultFile = namespace.IN_RESULT
    sourceFile = namespace.OUT_SOURCE

    with open(sourceFile, "w", newline="") as out:
        sourceWriter = csv.writer(out, dialect="unix")
        sourceWriter.writerows(
            [entry.md5, entry.filepath, entry.dlUrl]
            for entry in installerStatsCsvIt(resultFile)
        )
def add_parser(subparsers, operation, args, func, help=""):
    """
    Register one sub-command and its arguments.

    @param subparsers: Object returned by ArgumentParser.add_subparsers().
    @param operation: Name of the sub-command.
    @type operation: str
    @param args: Iterable of (flags, kwargs) pairs; flags is a list of
                 option strings, or None for a positional argument that is
                 described by kwargs alone.
    @param func: Handler stored as the `func` default of the sub-command.
    @param help: Help text shown for the sub-command.
    @type help: str
    @return: The parser created for the sub-command.
    """
    command = subparsers.add_parser(operation, help=help)
    for flags, options in args:
        # None means "no option strings": the argument is positional.
        command.add_argument(*(() if flags is None else flags), **options)
    command.set_defaults(func=func)
    return command
def indent(s, spaces):
    """
    Indent the lines of a text, dropping its first two lines and its last.

    @param s: Text to indent.
    @type s: str
    @param spaces: Number of spaces to put in front of every kept line.
    @type spaces: int
    @return: The kept lines, each prefixed and terminated with a newline.
    @rtype: str
    """
    pad = " " * spaces
    kept = s.split("\n")[2:-1]  # TODO [2:]
    return "".join(f"{pad}{line}\n" for line in kept)
def print_full_help(parser, subparsers):
    """
    Print the help of the main parser followed by the help of every
    sub-command.

    The main help goes to stderr; the separator and the sub-command help
    are printed to stdout.

    @param parser: The main argument parser.
    @type parser: argparse.ArgumentParser
    @param subparsers: The sub-command parsers (as returned by add_parser)
                       to print the help for.
    @type subparsers: iterable of argparse.ArgumentParser
    """
    parser.print_help(sys.stderr)
    print()
    print("---------\n")
    # Iterate the parsers that were passed in rather than a module global.
    for p in subparsers:
        print(f"{p.prog.split(' ')[-1]}:")
        print(indent(p.format_help(), 2))
if __name__ == "__main__":
    # Command line entry point: every sub-command stores its handler as
    # `func`, which is called with the parsed arguments.
    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers(title="COMMANDS",
        description=("Note: use -h after a command to get more help on that "
                     "command"))

    parsers = [
        add_parser(subparsers, "download", DOWNLOAD_ARGS, download,
                   help=("Download installers from the given SOURCE file "
                         "when needed")),
        add_parser(subparsers, "init", INIT_ARGS, init,
                   help=("Run initial test, the output "
                         "will be written to the given RESULT file. The "
                         "initial test does include testing against REWise "
                         "v0.2")),
        add_parser(subparsers, "update", UPDATE_ARGS, update,
                   help=("Add new installers from given SOURCE and re-run "
                         "tests for all installers, excluding the REWise "
                         "v0.2 tests. To only add new and run tests for "
                         "new installers give the '--only-new' argument.")),
        add_parser(subparsers, "create-diff", CREATE_DIFF_ARGS, create_diff,
                   help=("Re-run tests of first given RESULT, output to "
                         "second given RESULT file.")),
        add_parser(subparsers, "filter-invalid", FILTER_INVALID_ARGS, filter_invalid,
                   help=("Filter out results that appear to be invalid "
                         "files / not Wise installers.")),
        add_parser(subparsers, "update-comments", UPDATE_COMMENTS_ARGS, update_comments,
                   help="Update comments for given RESULT file."),
        add_parser(subparsers, "print-diff", PRINT_DIFF_ARGS, print_diff,
                   help=("Compare two RESULT files and print the installers "
                         "that it fixed/broke.")),
        add_parser(subparsers, "print-stats", PRINT_STATS_ARGS, print_stats,
                   help="Print some stats for the given RESULT file."),
        add_parser(subparsers, "result-to-source", RESULT_TO_SOURCE_ARGS,
                   result_to_source,
                   help="Convert RESULT file into SOURCE file.")
    ]

    args = parser.parse_args()

    # No sub-command given: `func` was never set, show the usage and fail.
    if "func" not in args:
        parser.print_help(sys.stderr)
        #print_full_help(parser, parsers)
        sys.exit(1)

    args.func(args)
|