diff --git a/Tests/scs-compliance-check.py b/Tests/scs-compliance-check.py index d9125af04..48dabae58 100755 --- a/Tests/scs-compliance-check.py +++ b/Tests/scs-compliance-check.py @@ -30,6 +30,7 @@ import datetime import subprocess import copy +from functools import partial from itertools import chain import yaml @@ -73,7 +74,7 @@ def add_search_path(arg0): # os.environ['PATH'] += ":" + MYPATH -def run_check_tool(executable, args, os_cloud, verbose=False, quiet=False): +def run_check_tool(executable, args, env): "Run executable and return exit code" if executable.startswith("http://") or executable.startswith("https://"): print(f"ERROR: remote check_tool {executable} not yet supported", file=sys.stderr) @@ -93,16 +94,10 @@ def run_check_tool(executable, args, os_cloud, verbose=False, quiet=False): exe.extend(shlex.split(args)) # print(f"{exe}") # compl = subprocess.run(exe, capture_output=True, text=True, check=False) - env = {'OS_CLOUD': os_cloud, **os.environ} - compl = subprocess.run( + return subprocess.run( exe, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding='UTF-8', check=False, env=env, ) - if verbose: - print(compl.stdout) - if not quiet: - print(compl.stderr, file=sys.stderr) - return compl def errcode_to_text(err): @@ -184,6 +179,10 @@ def check_keywords(d, ctx, valid=()): print(f"ERROR in spec: {ctx} uses unknown keywords: {','.join(invalid)}", file=sys.stderr) +def suppress(*args, **kwargs): + return + + def main(argv): """Entry point for the checker""" config = Config() @@ -191,6 +190,9 @@ def main(argv): if not config.os_cloud: print("You need to have OS_CLOUD set or pass --os-cloud=CLOUD.", file=sys.stderr) return 1 + check_env = {'OS_CLOUD': config.os_cloud, **os.environ} + printv = suppress if not config.verbose else partial(print, file=sys.stderr) + printnq = suppress if config.quiet else partial(print, file=sys.stderr) with open(config.arg0, "r", encoding="UTF-8") as specfile: specdict = yaml.load(specfile, Loader=yaml.SafeLoader) allaborts = 0 @@ -216,15 +218,9 @@ def main(argv): futuristic = not stb_date or config.checkdate < stb_date outdated = obs_date and obs_date < config.checkdate vd.update({ - "status": "n/a", + "status": outdated and "outdated" or futuristic and "preview" or "valid", "passed": False, }) - if outdated: - vd["status"] = "outdated" - elif futuristic: - vd["status"] = "preview" - else: - vd["status"] = "valid" if outdated and not config.version: continue matches += 1 @@ -232,24 +228,21 @@ def main(argv): print(f"WARNING: Forced version {config.version} outdated", file=sys.stderr) if config.version and futuristic: print(f"INFO: Forced version {config.version} not (yet) stable", file=sys.stderr) - if not config.quiet: - print(f"Testing {specdict['name']} version {vd['version']}") + printnq(f"Testing {specdict['name']} version {vd['version']}") if "standards" not in vd: print(f"WARNING: No standards defined yet for {specdict['name']} version {vd['version']}", file=sys.stderr) errors = 0 aborts = 0 - for standard in vd["standards"]: + for standard in vd.get("standards", ()): check_keywords(standard, 'standard', KEYWORDS_STANDARD) optional = condition_optional(standard) - if not config.quiet: - print("*******************************************************") - print(f"Testing {'optional ' * optional}standard {standard['name']} ...") - print(f"Reference: {standard['url']} ...") + printnq("*******************************************************") + printnq(f"Testing {'optional ' * optional}standard {standard['name']} ...") + printnq(f"Reference: {standard['url']} ...") if "check_tools" not in standard: - print(f"WARNING: No compliance check tool implemented yet for {standard['name']}") - continue - for check in standard["check_tools"]: + printnq(f"WARNING: No check tool specified for {standard['name']}", file=sys.stderr) + for check in standard.get("check_tools", ()): check_keywords(check, 'checktool', KEYWORDS_CHECKTOOL) if check.get("classification", "light") not in config.classes: print(f"skipping check tool '{check['executable']}' because of resource classification") @@ -258,7 +251,9 @@ def main(argv): memo_key = f"{check['executable']} {args}".strip() invokation = memo.get(memo_key) if invokation is None: - compl = run_check_tool(check["executable"], args, config.os_cloud, config.verbose, config.quiet) + compl = run_check_tool(check["executable"], args, check_env) + printv(compl.stdout) + printnq(compl.stderr) invokation = { "rc": compl.returncode, "stdout": compl.stdout.splitlines(), @@ -273,8 +268,7 @@ def main(argv): memo[memo_key] = invokation abort = invokation["critical"] error = invokation["error"] - if not config.quiet: - print(f"... returned {error} errors") + printnq(f"... returned {error} errors, {abort} aborts") check["aborts"] = abort check["errors"] = error if not condition_optional(check, optional): @@ -283,10 +277,9 @@ def main(argv): vd["aborts"] = errors vd["errors"] = errors vd["passed"] = not (aborts + errors) - if not config.quiet: - print("*******************************************************") - print(f"Verdict for os_cloud {config.os_cloud}, {specdict['name']}, " - f"version {vd['version']}: {errcode_to_text(aborts + errors)}") + printnq("*******************************************************") + printnq(f"Verdict for os_cloud {config.os_cloud}, {specdict['name']}, " + f"version {vd['version']}: {errcode_to_text(aborts + errors)}") allaborts += aborts allerrors += errors if not matches: