Skip to content

Commit

Permalink
[AnalysisQC] Align some depending code (#1594)
Browse files Browse the repository at this point in the history
* update o2dpg_analysis_test_config.py
  * take user config if given
  * fall back to config in analysis output directory
  * fall back to default O2DPG config

* update analysis_test.sh

* copy a config to the analysis output directory to contain the
  MergedAnalyses analysis

Co-authored-by: Benedikt Volkel <benedikt.volkel@cern.ch>
  • Loading branch information
benedikt-voelkel and Benedikt Volkel authored Apr 15, 2024
1 parent 6d50485 commit 281b243
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 27 deletions.
9 changes: 1 addition & 8 deletions MC/analysis_testing/analysis_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,6 @@ else
shift
shift
;;
--add-common-args)
add_common_args=" ${2} ${3} "
shift
shift
shift
;;
*)
echo "ERROR: Unknown argument ${1}"
exit 1
Expand All @@ -66,7 +60,6 @@ fi
# basic checks
[[ "${testanalysis}" == "" ]] && { echo "ERROR: No analysis specified to be run" ; exit 1 ; }
[[ "${aod}" == "" ]] && { echo "ERROR: No AOD found to be analysed" ; exit 1 ; }
[[ "${add_common_args}" != "" ]] && add_common_args="--add-common-args ${add_common_args}"

# check if enabled
enabled=$($O2DPG_ROOT/MC/analysis_testing/o2dpg_analysis_test_config.py check -t ${testanalysis} --status)
Expand All @@ -77,7 +70,7 @@ mkdir Analysis 2>/dev/null
include_disabled=${include_disabled:+--include-disabled}
workflow_path="Analysis/workflow_analysis_test_${testanalysis}.json"
rm ${workflow_path} 2>/dev/null
$O2DPG_ROOT/MC/analysis_testing/o2dpg_analysis_test_workflow.py --is-mc -f ${aod} -o ${workflow_path} --only-analyses ${testanalysis} ${include_disabled} ${add_common_args}
$O2DPG_ROOT/MC/analysis_testing/o2dpg_analysis_test_workflow.py --is-mc --split-analyses -f ${aod} -o ${workflow_path} --only-analyses ${testanalysis} ${include_disabled}
[[ ! -f "${workflow_path}" ]] && { echo "Could not construct workflow for analysis ${testanalysis}" ; exit 1 ; }
$O2DPG_ROOT/MC/bin/o2_dpg_workflow_runner.py -f ${workflow_path} -tt Analysis_${testanalysis}$ --rerun-from Analysis_${testanalysis}$

Expand Down
49 changes: 31 additions & 18 deletions MC/analysis_testing/o2dpg_analysis_test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import sys
import argparse
from os import environ
from os.path import join, exists
from os.path import join, exists, isdir
import json

# make sure O2DPG + O2 is loaded
Expand All @@ -14,14 +14,29 @@
sys.exit(1)


def get_config(path=None):
    """
    Load the list of analysis definitions from an AnalysisQC config.

    The config is resolved in this order:
      1. no path given: use the default O2DPG config,
      2. path is a directory: look for analyses_config.json inside it,
      3. resolved path does not exist: warn and use the default O2DPG config.

    Args:
        path: optional path to a config JSON file or to a directory
            that contains an analyses_config.json

    Returns:
        the value stored under the top-level "analyses" key of the config
        (callers iterate over it and read e.g. ana["name"])
    """
    default_path = join(O2DPG_ROOT, "MC", "config", "analysis_testing", "json", "analyses_config.json")

    if not path:
        path = default_path
    else:
        if isdir(path):
            # assume to look for analyses_config.json in this directory
            path = join(path, "analyses_config.json")

        if not exists(path):
            print(f"WARNING: Cannot locate config for AnalysisQC at custom path {path}. USe default at {default_path}")
            path = default_path

    # Always return the parsed analyses, never the path itself: every caller
    # loops over the returned value and indexes the entries like dictionaries.
    with open(path, "r") as f:
        return json.load(f)["analyses"]


def modify(args):
"""
modify and create a new config
"""

analyses = None
with open (args.config, "r") as f:
analyses = json.load(f)["analyses"]
analyses = get_config(args.config)

for ana in analyses:
if args.disable_tasks and ana["name"] in args.disable_tasks:
Expand Down Expand Up @@ -51,9 +66,7 @@ def print_status(enabled):
return
print("DISABLED")

analyses = None
with open (args.config, "r") as f:
analyses = json.load(f)["analyses"]
analyses = get_config(args.config)

for ana in analyses:
if ana["name"] == args.task:
Expand All @@ -80,9 +93,7 @@ def show_tasks(args):
args.enabled = True
args.disabled = True

analyses = None
with open (args.config, "r") as f:
analyses = json.load(f)["analyses"]
analyses = get_config(args.config)

for ana in analyses:
if (args.enabled and ana["enabled"]) or (args.disabled and not ana["enabled"]):
Expand All @@ -92,9 +103,12 @@ def show_tasks(args):


def validate_output(args):
analyses = None
with open (args.config, "r") as f:
analyses = json.load(f)["analyses"]

if not args.config:
# if no config was given explicitly, look for one in the directory where the analyses to check are located
args.config = args.directory

analyses = get_config(args.config)

# global return code
ret = 0
Expand All @@ -105,11 +119,10 @@ def validate_output(args):
analysis_name = ana["name"]

if args.tasks:
if analysis_name in args.tasks:
# tasks were specified explicitly, make sure to take them into account at all costs
include_disabled = True
else:
if analysis_name not in args.tasks:
continue
# tasks were specified explicitly, make sure to take them into account at all costs
include_disabled = True

if not ana["enabled"] and not include_disabled:
# continue if disabled and not including those
Expand Down Expand Up @@ -160,7 +173,7 @@ def main():
sub_parsers = parser.add_subparsers(dest="command")

config_parser = argparse.ArgumentParser(add_help=False)
config_parser.add_argument("-c", "--config", help="input configuration to modify", default=join(O2DPG_ROOT, "MC", "config", "analysis_testing", "json", "analyses_config.json"))
config_parser.add_argument("-c", "--config", help="input configuration to modify")

# modify config
modify_parser = sub_parsers.add_parser("modify", parents=[config_parser])
Expand Down
1 change: 1 addition & 0 deletions MC/analysis_testing/o2dpg_analysis_test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
ANALYSIS_VALID_DATA = "data"
ANALYSIS_COLLISION_SYSTEM_PP = "pp"
ANALYSIS_COLLISION_SYSTEM_PBPB = "pbpb"
ANALYSIS_MERGED_ANALYSIS_NAME = "MergedAnalyses"


def adjust_configuration_line(line, data_or_mc, collision_system):
Expand Down
21 changes: 20 additions & 1 deletion MC/analysis_testing/o2dpg_analysis_test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,10 @@ def add_analysis_tasks(workflow, input_aod="./AO2D.root", output_dir="./Analysis
merged_analysis_pipe = additional_workflows.copy()
# cpu and mem of merged analyses
merged_analysis_cpu_mem = [0, 0]
# expected output of merged analysis
merged_analysis_expected_output = []
# analyses config to write
analyses_config = []

for ana in load_analyses(analyses_only, include_disabled_analyses=include_disabled_analyses):
if is_mc and not ana.get("valid_mc", False):
Expand All @@ -233,26 +237,41 @@ def add_analysis_tasks(workflow, input_aod="./AO2D.root", output_dir="./Analysis
analysis_pipes.append(ana['tasks'])
analysis_names.append(ana['name'])
analysis_cpu_mem.append((1, 2000))
analyses_config.append(ana)
continue

merged_analysis_pipe.extend(ana['tasks'])
# underestimate what a single analysis would take in the merged case.
# Putting everything into one big pipe does not mean that the resources scale the same!
merged_analysis_cpu_mem[0] += 0.5
merged_analysis_cpu_mem[1] += 700
merged_analysis_expected_output.extend(ana['expected_output'])

if not split_analyses:
# add the merged analysis
analysis_pipes.append(merged_analysis_pipe)
analysis_names.append('MergedAnalyses')
analysis_names.append(ANALYSIS_MERGED_ANALYSIS_NAME)
# take at least the resources estimated for a single analysis
analysis_cpu_mem.append((max(1, merged_analysis_cpu_mem[0]), max(2000, merged_analysis_cpu_mem[1])))
merged_analysis_expected_output = list(set(merged_analysis_expected_output))
# config of the merged analysis. Since it doesn't exist in the previous config, but we would like to have it defined, do it here
analyses_config.append({'name': ANALYSIS_MERGED_ANALYSIS_NAME,
'valid_mc': is_mc,
'valid_data': not is_mc,
'enabled': True,
'tasks': merged_analysis_pipe,
'expected_output': merged_analysis_expected_output})


# now we need to create the output directory where we want the final configurations to go
output_dir_config = join(output_dir, 'config')
if not exists(output_dir_config):
makedirs(output_dir_config)

# write the config of the analyses collected above to the output directory
with open(join(output_dir, 'analyses_config.json'), 'w') as f:
json.dump({'analyses': analyses_config}, f, indent=2)

configuration = adjust_and_get_configuration_path(data_or_mc, collision_system, output_dir_config)

for analysis_name, analysis_pipe, analysis_res in zip(analysis_names, analysis_pipes, analysis_cpu_mem):
Expand Down

0 comments on commit 281b243

Please sign in to comment.