From 2f70d371315f14734da47607c5a640b4777b4b91 Mon Sep 17 00:00:00 2001 From: Barthelemy Date: Wed, 8 Nov 2023 08:31:05 +0100 Subject: [PATCH] [QC-858] Single CheckRunner per workflow --- .../include/QualityControl/CheckRunner.h | 16 +-- .../QualityControl/CheckRunnerFactory.h | 12 +- Framework/src/CheckRunner.cxx | 28 +---- Framework/src/CheckRunnerFactory.cxx | 28 ++--- Framework/src/InfrastructureGenerator.cxx | 109 +----------------- 5 files changed, 21 insertions(+), 172 deletions(-) diff --git a/Framework/include/QualityControl/CheckRunner.h b/Framework/include/QualityControl/CheckRunner.h index 2fd03eac5c..bb8ac3570b 100644 --- a/Framework/include/QualityControl/CheckRunner.h +++ b/Framework/include/QualityControl/CheckRunner.h @@ -79,7 +79,6 @@ namespace o2::quality_control::checker class CheckRunner : public framework::Task { public: - /// Constructor /** * \brief CheckRunner constructor * @@ -88,20 +87,9 @@ class CheckRunner : public framework::Task * Group check assumes that the input of the checks is the same! * * @param checkRunnerConfig configuration of CheckRunner - * @param checkConfigs configuration of all Checks that should run in this data processor - */ - CheckRunner(CheckRunnerConfig, const std::vector& checkConfigs); - - /** - * \brief CheckRunner constructor - * - * Create a sink for the Input. It is expected to receive Monitor Object to store. - * It will not run any checks on a given input. - * - * @param checkRunnerConfig configuration of CheckRunner * @param input Monitor Object input spec. */ - CheckRunner(CheckRunnerConfig, o2::framework::InputSpec input); + CheckRunner(CheckRunnerConfig checkRunnerConfig, const std::vector& checkConfigs, o2::framework::Inputs inputs); /// Destructor ~CheckRunner() override; @@ -118,7 +106,6 @@ class CheckRunner : public framework::Task framework::Inputs getInputs() { return mInputs; }; framework::Outputs getOutputs() { return mOutputs; }; - void setTaskStoreSet(std::unordered_set storeSet) { mInputStoreSet = storeSet; } std::string getDeviceName() { return mDeviceName; }; static framework::DataProcessorLabel getCheckRunnerLabel() { return { "qc-check" }; } @@ -222,7 +209,6 @@ class CheckRunner : public framework::Task std::shared_ptr mActivity; // shareable with the Checks CheckRunnerConfig mConfig; std::shared_ptr mDatabase; - std::unordered_set mInputStoreSet; std::vector> mMonitorObjectStoreVector; UpdatePolicyManager updatePolicyManager; bool mReceivedEOS = false; diff --git a/Framework/include/QualityControl/CheckRunnerFactory.h b/Framework/include/QualityControl/CheckRunnerFactory.h index ff3fba1859..c35a4b8557 100644 --- a/Framework/include/QualityControl/CheckRunnerFactory.h +++ b/Framework/include/QualityControl/CheckRunnerFactory.h @@ -43,17 +43,7 @@ class CheckRunnerFactory CheckRunnerFactory() = default; virtual ~CheckRunnerFactory() = default; - static framework::DataProcessorSpec create(CheckRunnerConfig checkRunnerConfig, const std::vector& checkConfigs, std::vector storeVector = {}); - - /* - * \brief Create a CheckRunner sink DPL device. - * - * The purpose of this device is to receive and store the MO from task. - * - * @param input InputSpec with the content to store - * @param configurationSource - */ - static framework::DataProcessorSpec createSinkDevice(const CheckRunnerConfig& checkRunnerConfig, const o2::framework::InputSpec& input); + static framework::DataProcessorSpec create(CheckRunnerConfig checkRunnerConfig, const std::vector& checkConfigs); static CheckRunnerConfig extractConfig(const core::CommonSpec&); diff --git a/Framework/src/CheckRunner.cxx b/Framework/src/CheckRunner.cxx index 73327f8635..357a7c3aed 100644 --- a/Framework/src/CheckRunner.cxx +++ b/Framework/src/CheckRunner.cxx @@ -130,29 +130,11 @@ o2::framework::Outputs CheckRunner::collectOutputs(const std::vector& checkConfigs) - : mDetectorName(getDetectorName(checkConfigs)), - mDeviceName(createCheckRunnerName(checkConfigs)), +CheckRunner::CheckRunner(CheckRunnerConfig checkRunnerConfig, const std::vector& checkConfigs, o2::framework::Inputs inputs) + : mDeviceName(createSinkCheckRunnerName(inputs[0])), mConfig(std::move(checkRunnerConfig)), - /* All checks have the same Input */ - mInputs(checkConfigs.front().inputSpecs), - mOutputs(CheckRunner::collectOutputs(checkConfigs)), - mTotalNumberObjectsReceived(0), - mTotalNumberCheckExecuted(0), - mTotalNumberQOStored(0), - mTotalNumberMOStored(0), - mTotalQOSent(0) -{ - for (auto& checkConfig : checkConfigs) { - mChecks.emplace(checkConfig.name, checkConfig); - } -} - -CheckRunner::CheckRunner(CheckRunnerConfig checkRunnerConfig, InputSpec input) - : mDeviceName(createSinkCheckRunnerName(input)), - mConfig(std::move(checkRunnerConfig)), - mInputs{ input }, - mOutputs{}, + mInputs{ inputs }, + mOutputs{CheckRunner::collectOutputs(checkConfigs)}, mTotalNumberObjectsReceived(0), mTotalNumberCheckExecuted(0), mTotalNumberQOStored(0), @@ -300,7 +282,7 @@ void CheckRunner::prepareCacheData(framework::InputRecord& inputRecord) // for each item of the array, check whether it is a MonitorObject. If not, create one and encapsulate. // Then, store the MonitorObject in the various maps and vectors we will use later. - bool store = mInputStoreSet.count(DataSpecUtils::label(input)) > 0; // Check if this CheckRunner stores this input + bool store = true; for (const auto tObject : *array) { std::shared_ptr mo{ dynamic_cast(tObject) }; diff --git a/Framework/src/CheckRunnerFactory.cxx b/Framework/src/CheckRunnerFactory.cxx index 5b0a1e1597..dcf71e377e 100644 --- a/Framework/src/CheckRunnerFactory.cxx +++ b/Framework/src/CheckRunnerFactory.cxx @@ -32,11 +32,17 @@ namespace o2::quality_control::checker using namespace o2::framework; -DataProcessorSpec CheckRunnerFactory::create(CheckRunnerConfig checkRunnerConfig, const std::vector& checkConfigs, std::vector storeVector) +DataProcessorSpec CheckRunnerFactory::create(CheckRunnerConfig checkRunnerConfig, const std::vector& checkConfigs) { auto options = checkRunnerConfig.options; - CheckRunner qcCheckRunner{ std::move(checkRunnerConfig), checkConfigs }; - qcCheckRunner.setTaskStoreSet({ storeVector.begin(), storeVector.end() }); + + // concatenate all inputs + o2::framework::Inputs allInputs; + for(auto config : checkConfigs) { + allInputs.insert(allInputs.end(), config.inputSpecs.begin(), config.inputSpecs.end()); + } + + CheckRunner qcCheckRunner{ std::move(checkRunnerConfig), checkConfigs, allInputs }; DataProcessorSpec newCheckRunner{ qcCheckRunner.getDeviceName(), qcCheckRunner.getInputs(), @@ -49,22 +55,6 @@ DataProcessorSpec CheckRunnerFactory::create(CheckRunnerConfig checkRunnerConfig return newCheckRunner; } -DataProcessorSpec CheckRunnerFactory::createSinkDevice(const CheckRunnerConfig& checkRunnerConfig, const o2::framework::InputSpec& input) -{ - CheckRunner qcCheckRunner{ checkRunnerConfig, input }; - qcCheckRunner.setTaskStoreSet({ DataSpecUtils::label(input) }); - - DataProcessorSpec newCheckRunner{ qcCheckRunner.getDeviceName(), - qcCheckRunner.getInputs(), - Outputs{ qcCheckRunner.getOutputs() }, - adaptFromTask(std::move(qcCheckRunner)), - checkRunnerConfig.options, - {}, - { o2::framework::ecs::qcReconfigurable } }; - - return newCheckRunner; -} - void CheckRunnerFactory::customizeInfrastructure(std::vector& policies) { auto matcher = [label = CheckRunner::getCheckRunnerLabel()](framework::DeviceSpec const& device) { diff --git a/Framework/src/InfrastructureGenerator.cxx b/Framework/src/InfrastructureGenerator.cxx index a9028ec22a..1bc5361b2e 100644 --- a/Framework/src/InfrastructureGenerator.cxx +++ b/Framework/src/InfrastructureGenerator.cxx @@ -626,116 +626,17 @@ void InfrastructureGenerator::generateMergers(framework::WorkflowSpec& workflow, void InfrastructureGenerator::generateCheckRunners(framework::WorkflowSpec& workflow, const InfrastructureSpec& infrastructureSpec) { - // todo have a look if this complex procedure can be simplified. - // todo also make well defined and scoped functions to make it more readable and clearer. - typedef std::vector InputNames; typedef std::vector CheckConfigs; - std::map tasksOutputMap; // all active tasks' output, as inputs, keyed by their label - std::map checksMap; // all the Checks defined in the config mapped keyed by their sorted inputNames - std::map storeVectorMap; - // todo: avoid code repetition - for (const auto& taskSpec : infrastructureSpec.tasks) { - if (taskSpec.active) { - InputSpec taskOutput{ taskSpec.taskName, TaskRunner::createTaskDataOrigin(taskSpec.detectorName), TaskRunner::createTaskDataDescription(taskSpec.taskName), Lifetime::Sporadic }; - tasksOutputMap.insert({ DataSpecUtils::label(taskOutput), taskOutput }); - bool movingWindowsEnabled = !taskSpec.movingWindows.empty(); - bool synchronousRemote = taskSpec.location == TaskLocationSpec::Local && (infrastructureSpec.workflowType == WorkflowType::Remote || infrastructureSpec.workflowType == WorkflowType::FullChain); - bool asynchronousRemote = infrastructureSpec.workflowType == WorkflowType::RemoteBatch; - if (movingWindowsEnabled && (synchronousRemote || asynchronousRemote)) { - InputSpec taskMovingWindowOutput{ taskSpec.taskName, TaskRunner::createTaskDataOrigin(taskSpec.detectorName, true), TaskRunner::createTaskDataDescription(taskSpec.taskName), Lifetime::Sporadic }; - tasksOutputMap.insert({ DataSpecUtils::label(taskMovingWindowOutput), taskMovingWindowOutput }); - } - } - } - - for (const auto& ppTaskSpec : infrastructureSpec.postProcessingTasks) { - if (ppTaskSpec.active) { - InputSpec ppTaskOutput{ ppTaskSpec.taskName, - PostProcessingDevice::createPostProcessingDataOrigin(ppTaskSpec.detectorName), - PostProcessingDevice::createPostProcessingDataDescription(ppTaskSpec.taskName), - Lifetime::Sporadic }; - tasksOutputMap.insert({ DataSpecUtils::label(ppTaskOutput), ppTaskOutput }); - } - } - - for (const auto& externalTaskSpec : infrastructureSpec.externalTasks) { - if (externalTaskSpec.active) { - auto query = externalTaskSpec.query; - Inputs inputs = DataDescriptorQueryBuilder::parse(query.c_str()); - for (const auto& taskOutput : inputs) { - tasksOutputMap.insert({ DataSpecUtils::label(taskOutput), taskOutput }); - } - } - } - - // Instantiate Checks based on the configuration and build a map of checks (keyed by their inputs names) + CheckConfigs checkConfigs; // all the checkConfigs for (const auto& checkSpec : infrastructureSpec.checks) { - ILOG(Debug, Devel) << ">> Check name : " << checkSpec.checkName << ENDM; - if (checkSpec.active) { - auto checkConfig = Check::extractConfig(infrastructureSpec.common, checkSpec); - InputNames inputNames; - - for (const auto& inputSpec : checkConfig.inputSpecs) { - inputNames.push_back(DataSpecUtils::label(inputSpec)); - } - // Create a grouping key - sorted vector of stringified InputSpecs //todo: consider std::set, which is sorted - std::sort(inputNames.begin(), inputNames.end()); - // Group checks - checksMap[inputNames].push_back(checkConfig); - } + auto checkConfig = Check::extractConfig(infrastructureSpec.common, checkSpec); + checkConfigs.push_back(checkConfig); } - // For every Task output, find a Check to store the MOs in the database. - // If none is found we create a sink device. - for (auto& [label, inputSpec] : tasksOutputMap) { // for each task output - (void)inputSpec; - bool isStored = false; - // Look for this task as input in the Checks' inputs, if we found it then we are done - for (auto& [inputNames, checks] : checksMap) { // for each set of inputs - (void)checks; - - if (std::find(inputNames.begin(), inputNames.end(), label) != inputNames.end() && inputNames.size() == 1) { - storeVectorMap[inputNames].push_back(label); - break; - } - } - if (!isStored) { // fixme: statement is always true - // If there is no Check for a given input, create a candidate for a sink device - InputNames singleEntry{ label }; - // Init empty Check vector to appear in the next step - checksMap[singleEntry]; - storeVectorMap[singleEntry].push_back(label); - } - } - - // Create CheckRunners: 1 per set of inputs - std::vector checkRunnerOutputs; auto checkRunnerConfig = CheckRunnerFactory::extractConfig(infrastructureSpec.common); - for (auto& [inputNames, checkConfigs] : checksMap) { - // Logging - ILOG(Debug, Devel) << ">> Inputs (" << inputNames.size() << "): "; - for (const auto& name : inputNames) - ILOG(Debug, Devel) << name << " "; - ILOG(Debug, Devel) << " ; Checks (" << checkConfigs.size() << "): "; - for (const auto& checkConfig : checkConfigs) - ILOG(Debug, Devel) << checkConfig.name << " "; - ILOG(Debug, Devel) << " ; Stores (" << storeVectorMap[inputNames].size() << "): "; - for (const auto& input : storeVectorMap[inputNames]) - ILOG(Debug, Devel) << input << " "; - ILOG(Debug, Devel) << ENDM; - - DataProcessorSpec spec = checkConfigs.empty() - ? CheckRunnerFactory::createSinkDevice(checkRunnerConfig, tasksOutputMap.find(inputNames[0])->second) - : CheckRunnerFactory::create(checkRunnerConfig, checkConfigs, storeVectorMap[inputNames]); - workflow.emplace_back(spec); - checkRunnerOutputs.insert(checkRunnerOutputs.end(), spec.outputs.begin(), spec.outputs.end()); - } - - ILOG(Debug, Devel) << ">> Outputs (" << checkRunnerOutputs.size() << "): "; - for (const auto& output : checkRunnerOutputs) - ILOG(Debug, Devel) << DataSpecUtils::describe(output) << " "; - ILOG(Debug, Devel) << ENDM; + const DataProcessorSpec spec = CheckRunnerFactory::create(checkRunnerConfig, checkConfigs); + workflow.emplace_back(spec); } void InfrastructureGenerator::throwIfAggNamesClashCheckNames(const InfrastructureSpec& infrastructureSpec)