Skip to content

Commit

Permalink
[QC-858] Single CheckRunner per workflow
Browse files Browse the repository at this point in the history
  • Loading branch information
Barthelemy committed Nov 8, 2023
1 parent 85f30bc commit 2f70d37
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 172 deletions.
16 changes: 1 addition & 15 deletions Framework/include/QualityControl/CheckRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ namespace o2::quality_control::checker
class CheckRunner : public framework::Task
{
public:
/// Constructor
/**
* \brief CheckRunner constructor
*
Expand All @@ -88,20 +87,9 @@ class CheckRunner : public framework::Task
* Group check assumes that the input of the checks is the same!
*
* @param checkRunnerConfig configuration of CheckRunner
* @param checkConfigs configuration of all Checks that should run in this data processor
*/
CheckRunner(CheckRunnerConfig, const std::vector<CheckConfig>& checkConfigs);

/**
* \brief CheckRunner constructor
*
* Create a sink for the Input. It is expected to receive Monitor Object to store.
* It will not run any checks on a given input.
*
* @param checkRunnerConfig configuration of CheckRunner
* @param input Monitor Object input spec.
*/
CheckRunner(CheckRunnerConfig, o2::framework::InputSpec input);
CheckRunner(CheckRunnerConfig checkRunnerConfig, const std::vector<CheckConfig>& checkConfigs, o2::framework::Inputs inputs);

/// Destructor
~CheckRunner() override;
Expand All @@ -118,7 +106,6 @@ class CheckRunner : public framework::Task
framework::Inputs getInputs() { return mInputs; };
framework::Outputs getOutputs() { return mOutputs; };

void setTaskStoreSet(std::unordered_set<std::string> storeSet) { mInputStoreSet = storeSet; }
std::string getDeviceName() { return mDeviceName; };

static framework::DataProcessorLabel getCheckRunnerLabel() { return { "qc-check" }; }
Expand Down Expand Up @@ -222,7 +209,6 @@ class CheckRunner : public framework::Task
std::shared_ptr<Activity> mActivity; // shareable with the Checks
CheckRunnerConfig mConfig;
std::shared_ptr<o2::quality_control::repository::DatabaseInterface> mDatabase;
std::unordered_set<std::string> mInputStoreSet;
std::vector<std::shared_ptr<MonitorObject>> mMonitorObjectStoreVector;
UpdatePolicyManager updatePolicyManager;
bool mReceivedEOS = false;
Expand Down
12 changes: 1 addition & 11 deletions Framework/include/QualityControl/CheckRunnerFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,7 @@ class CheckRunnerFactory
CheckRunnerFactory() = default;
virtual ~CheckRunnerFactory() = default;

static framework::DataProcessorSpec create(CheckRunnerConfig checkRunnerConfig, const std::vector<CheckConfig>& checkConfigs, std::vector<std::string> storeVector = {});

/*
* \brief Create a CheckRunner sink DPL device.
*
* The purpose of this device is to receive and store the MO from task.
*
* @param input InputSpec with the content to store
* @param configurationSource
*/
static framework::DataProcessorSpec createSinkDevice(const CheckRunnerConfig& checkRunnerConfig, const o2::framework::InputSpec& input);
static framework::DataProcessorSpec create(CheckRunnerConfig checkRunnerConfig, const std::vector<CheckConfig>& checkConfigs);

static CheckRunnerConfig extractConfig(const core::CommonSpec&);

Expand Down
28 changes: 5 additions & 23 deletions Framework/src/CheckRunner.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -130,29 +130,11 @@ o2::framework::Outputs CheckRunner::collectOutputs(const std::vector<CheckConfig
return outputs;
}

CheckRunner::CheckRunner(CheckRunnerConfig checkRunnerConfig, const std::vector<CheckConfig>& checkConfigs)
: mDetectorName(getDetectorName(checkConfigs)),
mDeviceName(createCheckRunnerName(checkConfigs)),
CheckRunner::CheckRunner(CheckRunnerConfig checkRunnerConfig, const std::vector<CheckConfig>& checkConfigs, o2::framework::Inputs inputs)
: mDeviceName(createSinkCheckRunnerName(inputs[0])),
mConfig(std::move(checkRunnerConfig)),
/* All checks have the same Input */
mInputs(checkConfigs.front().inputSpecs),
mOutputs(CheckRunner::collectOutputs(checkConfigs)),
mTotalNumberObjectsReceived(0),
mTotalNumberCheckExecuted(0),
mTotalNumberQOStored(0),
mTotalNumberMOStored(0),
mTotalQOSent(0)
{
for (auto& checkConfig : checkConfigs) {
mChecks.emplace(checkConfig.name, checkConfig);
}
}

CheckRunner::CheckRunner(CheckRunnerConfig checkRunnerConfig, InputSpec input)
: mDeviceName(createSinkCheckRunnerName(input)),
mConfig(std::move(checkRunnerConfig)),
mInputs{ input },
mOutputs{},
mInputs{ inputs },
mOutputs{CheckRunner::collectOutputs(checkConfigs)},
mTotalNumberObjectsReceived(0),
mTotalNumberCheckExecuted(0),
mTotalNumberQOStored(0),
Expand Down Expand Up @@ -300,7 +282,7 @@ void CheckRunner::prepareCacheData(framework::InputRecord& inputRecord)

// for each item of the array, check whether it is a MonitorObject. If not, create one and encapsulate.
// Then, store the MonitorObject in the various maps and vectors we will use later.
bool store = mInputStoreSet.count(DataSpecUtils::label(input)) > 0; // Check if this CheckRunner stores this input
bool store = true;
for (const auto tObject : *array) {
std::shared_ptr<MonitorObject> mo{ dynamic_cast<MonitorObject*>(tObject) };

Expand Down
28 changes: 9 additions & 19 deletions Framework/src/CheckRunnerFactory.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,17 @@ namespace o2::quality_control::checker

using namespace o2::framework;

DataProcessorSpec CheckRunnerFactory::create(CheckRunnerConfig checkRunnerConfig, const std::vector<CheckConfig>& checkConfigs, std::vector<std::string> storeVector)
DataProcessorSpec CheckRunnerFactory::create(CheckRunnerConfig checkRunnerConfig, const std::vector<CheckConfig>& checkConfigs)
{
auto options = checkRunnerConfig.options;
CheckRunner qcCheckRunner{ std::move(checkRunnerConfig), checkConfigs };
qcCheckRunner.setTaskStoreSet({ storeVector.begin(), storeVector.end() });

// concatenate all inputs
o2::framework::Inputs allInputs;
for(auto config : checkConfigs) {
allInputs.insert(allInputs.end(), config.inputSpecs.begin(), config.inputSpecs.end());
}

CheckRunner qcCheckRunner{ std::move(checkRunnerConfig), checkConfigs, allInputs };

DataProcessorSpec newCheckRunner{ qcCheckRunner.getDeviceName(),
qcCheckRunner.getInputs(),
Expand All @@ -49,22 +55,6 @@ DataProcessorSpec CheckRunnerFactory::create(CheckRunnerConfig checkRunnerConfig
return newCheckRunner;
}

DataProcessorSpec CheckRunnerFactory::createSinkDevice(const CheckRunnerConfig& checkRunnerConfig, const o2::framework::InputSpec& input)
{
CheckRunner qcCheckRunner{ checkRunnerConfig, input };
qcCheckRunner.setTaskStoreSet({ DataSpecUtils::label(input) });

DataProcessorSpec newCheckRunner{ qcCheckRunner.getDeviceName(),
qcCheckRunner.getInputs(),
Outputs{ qcCheckRunner.getOutputs() },
adaptFromTask<CheckRunner>(std::move(qcCheckRunner)),
checkRunnerConfig.options,
{},
{ o2::framework::ecs::qcReconfigurable } };

return newCheckRunner;
}

void CheckRunnerFactory::customizeInfrastructure(std::vector<framework::CompletionPolicy>& policies)
{
auto matcher = [label = CheckRunner::getCheckRunnerLabel()](framework::DeviceSpec const& device) {
Expand Down
109 changes: 5 additions & 104 deletions Framework/src/InfrastructureGenerator.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -626,116 +626,17 @@ void InfrastructureGenerator::generateMergers(framework::WorkflowSpec& workflow,

void InfrastructureGenerator::generateCheckRunners(framework::WorkflowSpec& workflow, const InfrastructureSpec& infrastructureSpec)
{
// todo have a look if this complex procedure can be simplified.
// todo also make well defined and scoped functions to make it more readable and clearer.
typedef std::vector<std::string> InputNames;
typedef std::vector<CheckConfig> CheckConfigs;
std::map<std::string, o2::framework::InputSpec> tasksOutputMap; // all active tasks' output, as inputs, keyed by their label
std::map<InputNames, CheckConfigs> checksMap; // all the Checks defined in the config mapped keyed by their sorted inputNames
std::map<InputNames, InputNames> storeVectorMap;

// todo: avoid code repetition
for (const auto& taskSpec : infrastructureSpec.tasks) {
if (taskSpec.active) {
InputSpec taskOutput{ taskSpec.taskName, TaskRunner::createTaskDataOrigin(taskSpec.detectorName), TaskRunner::createTaskDataDescription(taskSpec.taskName), Lifetime::Sporadic };
tasksOutputMap.insert({ DataSpecUtils::label(taskOutput), taskOutput });
bool movingWindowsEnabled = !taskSpec.movingWindows.empty();
bool synchronousRemote = taskSpec.location == TaskLocationSpec::Local && (infrastructureSpec.workflowType == WorkflowType::Remote || infrastructureSpec.workflowType == WorkflowType::FullChain);
bool asynchronousRemote = infrastructureSpec.workflowType == WorkflowType::RemoteBatch;
if (movingWindowsEnabled && (synchronousRemote || asynchronousRemote)) {
InputSpec taskMovingWindowOutput{ taskSpec.taskName, TaskRunner::createTaskDataOrigin(taskSpec.detectorName, true), TaskRunner::createTaskDataDescription(taskSpec.taskName), Lifetime::Sporadic };
tasksOutputMap.insert({ DataSpecUtils::label(taskMovingWindowOutput), taskMovingWindowOutput });
}
}
}

for (const auto& ppTaskSpec : infrastructureSpec.postProcessingTasks) {
if (ppTaskSpec.active) {
InputSpec ppTaskOutput{ ppTaskSpec.taskName,
PostProcessingDevice::createPostProcessingDataOrigin(ppTaskSpec.detectorName),
PostProcessingDevice::createPostProcessingDataDescription(ppTaskSpec.taskName),
Lifetime::Sporadic };
tasksOutputMap.insert({ DataSpecUtils::label(ppTaskOutput), ppTaskOutput });
}
}

for (const auto& externalTaskSpec : infrastructureSpec.externalTasks) {
if (externalTaskSpec.active) {
auto query = externalTaskSpec.query;
Inputs inputs = DataDescriptorQueryBuilder::parse(query.c_str());
for (const auto& taskOutput : inputs) {
tasksOutputMap.insert({ DataSpecUtils::label(taskOutput), taskOutput });
}
}
}

// Instantiate Checks based on the configuration and build a map of checks (keyed by their inputs names)
CheckConfigs checkConfigs; // all the checkConfigs
for (const auto& checkSpec : infrastructureSpec.checks) {
ILOG(Debug, Devel) << ">> Check name : " << checkSpec.checkName << ENDM;
if (checkSpec.active) {
auto checkConfig = Check::extractConfig(infrastructureSpec.common, checkSpec);
InputNames inputNames;

for (const auto& inputSpec : checkConfig.inputSpecs) {
inputNames.push_back(DataSpecUtils::label(inputSpec));
}
// Create a grouping key - sorted vector of stringified InputSpecs //todo: consider std::set, which is sorted
std::sort(inputNames.begin(), inputNames.end());
// Group checks
checksMap[inputNames].push_back(checkConfig);
}
auto checkConfig = Check::extractConfig(infrastructureSpec.common, checkSpec);
checkConfigs.push_back(checkConfig);
}

// For every Task output, find a Check to store the MOs in the database.
// If none is found we create a sink device.
for (auto& [label, inputSpec] : tasksOutputMap) { // for each task output
(void)inputSpec;
bool isStored = false;
// Look for this task as input in the Checks' inputs, if we found it then we are done
for (auto& [inputNames, checks] : checksMap) { // for each set of inputs
(void)checks;

if (std::find(inputNames.begin(), inputNames.end(), label) != inputNames.end() && inputNames.size() == 1) {
storeVectorMap[inputNames].push_back(label);
break;
}
}
if (!isStored) { // fixme: statement is always true
// If there is no Check for a given input, create a candidate for a sink device
InputNames singleEntry{ label };
// Init empty Check vector to appear in the next step
checksMap[singleEntry];
storeVectorMap[singleEntry].push_back(label);
}
}

// Create CheckRunners: 1 per set of inputs
std::vector<framework::OutputSpec> checkRunnerOutputs;
auto checkRunnerConfig = CheckRunnerFactory::extractConfig(infrastructureSpec.common);
for (auto& [inputNames, checkConfigs] : checksMap) {
// Logging
ILOG(Debug, Devel) << ">> Inputs (" << inputNames.size() << "): ";
for (const auto& name : inputNames)
ILOG(Debug, Devel) << name << " ";
ILOG(Debug, Devel) << " ; Checks (" << checkConfigs.size() << "): ";
for (const auto& checkConfig : checkConfigs)
ILOG(Debug, Devel) << checkConfig.name << " ";
ILOG(Debug, Devel) << " ; Stores (" << storeVectorMap[inputNames].size() << "): ";
for (const auto& input : storeVectorMap[inputNames])
ILOG(Debug, Devel) << input << " ";
ILOG(Debug, Devel) << ENDM;

DataProcessorSpec spec = checkConfigs.empty()
? CheckRunnerFactory::createSinkDevice(checkRunnerConfig, tasksOutputMap.find(inputNames[0])->second)
: CheckRunnerFactory::create(checkRunnerConfig, checkConfigs, storeVectorMap[inputNames]);
workflow.emplace_back(spec);
checkRunnerOutputs.insert(checkRunnerOutputs.end(), spec.outputs.begin(), spec.outputs.end());
}

ILOG(Debug, Devel) << ">> Outputs (" << checkRunnerOutputs.size() << "): ";
for (const auto& output : checkRunnerOutputs)
ILOG(Debug, Devel) << DataSpecUtils::describe(output) << " ";
ILOG(Debug, Devel) << ENDM;
const DataProcessorSpec spec = CheckRunnerFactory::create(checkRunnerConfig, checkConfigs);
workflow.emplace_back(spec);
}

void InfrastructureGenerator::throwIfAggNamesClashCheckNames(const InfrastructureSpec& infrastructureSpec)
Expand Down

0 comments on commit 2f70d37

Please sign in to comment.