Skip to content

Commit

Permalink
Refactoring: various simplifications
Browse files Browse the repository at this point in the history
Signed-off-by: Matthias Büchse <matthias.buechse@cloudandheat.com>
  • Loading branch information
mbuechse committed Nov 15, 2023
1 parent f4e331f commit e4183e7
Showing 1 changed file with 25 additions and 32 deletions.
57 changes: 25 additions & 32 deletions Tests/scs-compliance-check.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import datetime
import subprocess
import copy
from functools import partial
from itertools import chain
import yaml

Expand Down Expand Up @@ -73,7 +74,7 @@ def add_search_path(arg0):
# os.environ['PATH'] += ":" + MYPATH


def run_check_tool(executable, args, os_cloud, verbose=False, quiet=False):
def run_check_tool(executable, args, env):
"Run executable and return exit code"
if executable.startswith("http://") or executable.startswith("https://"):
print(f"ERROR: remote check_tool {executable} not yet supported", file=sys.stderr)
Expand All @@ -93,16 +94,10 @@ def run_check_tool(executable, args, os_cloud, verbose=False, quiet=False):
exe.extend(shlex.split(args))
# print(f"{exe}")
# compl = subprocess.run(exe, capture_output=True, text=True, check=False)
env = {'OS_CLOUD': os_cloud, **os.environ}
compl = subprocess.run(
return subprocess.run(
exe, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
encoding='UTF-8', check=False, env=env,
)
if verbose:
print(compl.stdout)
if not quiet:
print(compl.stderr, file=sys.stderr)
return compl


def errcode_to_text(err):
Expand Down Expand Up @@ -184,13 +179,20 @@ def check_keywords(d, ctx, valid=()):
print(f"ERROR in spec: {ctx} uses unknown keywords: {','.join(invalid)}", file=sys.stderr)


def suppress(*args, **kwargs):
return


def main(argv):
"""Entry point for the checker"""
config = Config()
config.apply_argv(argv)
if not config.os_cloud:
print("You need to have OS_CLOUD set or pass --os-cloud=CLOUD.", file=sys.stderr)
return 1
check_env = {'OS_CLOUD': config.os_cloud, **os.environ}
printv = suppress if not config.verbose else partial(print, file=sys.stderr)
printnq = suppress if config.quiet else partial(print, file=sys.stderr)
with open(config.arg0, "r", encoding="UTF-8") as specfile:
specdict = yaml.load(specfile, Loader=yaml.SafeLoader)
allaborts = 0
Expand All @@ -216,40 +218,31 @@ def main(argv):
futuristic = not stb_date or config.checkdate < stb_date
outdated = obs_date and obs_date < config.checkdate
vd.update({
"status": "n/a",
"status": outdated and "outdated" or futuristic and "preview" or "valid",
"passed": False,
})
if outdated:
vd["status"] = "outdated"
elif futuristic:
vd["status"] = "preview"
else:
vd["status"] = "valid"
if outdated and not config.version:
continue
matches += 1
if config.version and outdated:
print(f"WARNING: Forced version {config.version} outdated", file=sys.stderr)
if config.version and futuristic:
print(f"INFO: Forced version {config.version} not (yet) stable", file=sys.stderr)
if not config.quiet:
print(f"Testing {specdict['name']} version {vd['version']}")
printnq(f"Testing {specdict['name']} version {vd['version']}")
if "standards" not in vd:
print(f"WARNING: No standards defined yet for {specdict['name']} version {vd['version']}",
file=sys.stderr)
errors = 0
aborts = 0
for standard in vd["standards"]:
for standard in vd.get("standards", ()):
check_keywords(standard, 'standard', KEYWORDS_STANDARD)
optional = condition_optional(standard)
if not config.quiet:
print("*******************************************************")
print(f"Testing {'optional ' * optional}standard {standard['name']} ...")
print(f"Reference: {standard['url']} ...")
printnq("*******************************************************")
printnq(f"Testing {'optional ' * optional}standard {standard['name']} ...")
printnq(f"Reference: {standard['url']} ...")
if "check_tools" not in standard:
print(f"WARNING: No compliance check tool implemented yet for {standard['name']}")
continue
for check in standard["check_tools"]:
printnq(f"WARNING: No check tool specified for {standard['name']}", file=sys.stderr)
for check in standard.get("check_tools", ()):
check_keywords(check, 'checktool', KEYWORDS_CHECKTOOL)
if check.get("classification", "light") not in config.classes:
print(f"skipping check tool '{check['executable']}' because of resource classification")
Expand All @@ -258,7 +251,9 @@ def main(argv):
memo_key = f"{check['executable']} {args}".strip()
invokation = memo.get(memo_key)
if invokation is None:
compl = run_check_tool(check["executable"], args, config.os_cloud, config.verbose, config.quiet)
compl = run_check_tool(check["executable"], args, check_env)
printv(compl.stdout)
printnq(compl.stderr)
invokation = {
"rc": compl.returncode,
"stdout": compl.stdout.splitlines(),
Expand All @@ -273,8 +268,7 @@ def main(argv):
memo[memo_key] = invokation
abort = invokation["critical"]
error = invokation["error"]
if not config.quiet:
print(f"... returned {error} errors")
printnq(f"... returned {error} errors, {abort} aborts")
check["aborts"] = abort
check["errors"] = error
if not condition_optional(check, optional):
Expand All @@ -283,10 +277,9 @@ def main(argv):
vd["aborts"] = errors
vd["errors"] = errors
vd["passed"] = not (aborts + errors)
if not config.quiet:
print("*******************************************************")
print(f"Verdict for os_cloud {config.os_cloud}, {specdict['name']}, "
f"version {vd['version']}: {errcode_to_text(aborts + errors)}")
printnq("*******************************************************")
printnq(f"Verdict for os_cloud {config.os_cloud}, {specdict['name']}, "
f"version {vd['version']}: {errcode_to_text(aborts + errors)}")
allaborts += aborts
allerrors += errors
if not matches:
Expand Down

0 comments on commit e4183e7

Please sign in to comment.