Skip to content

Commit

Permalink
[QC-910] Moving windows in asynchronous QC (#2014)
Browse files Browse the repository at this point in the history
* [QC-910] Moving windows in asynchronous QC

This commit allows to create objects containing data points for adjacent time windows alongside of having an integrated object for the whole run. The window length is set by the usual "cycleDurationSeconds" parameter and the windows are guaranteed not to overlap. The generated objects can be subject to Checks, just as integrated objects.

Technically, the commit brings a few major changes as well:
- The decision on finishing a cycle is now up to Timekeeper (excluding end of stream), which is still timer-based in sync QC, but data-driven in async QC, i.e. a new cycle is started when a sample does not fall into currently processed window.
- The QC file I/O has been refactored by bringing it into a dedicated class
- The QC file structure was reorganized by having two directories at the top - "int" and "mw". The first contains the integrated plots in the usual structure. The latter contains moving windows in the hierarchy "mw/<DET>/<TASK>/<MW START>". This should allow for easy usage by any user scripts (at the cost of a bit more complex walking algorithms for reading it back in the QC chain)

* documentation

* explain what dataReady() does, rename to isDataReady

* add an explanation for creating outputspecs for moving windows

* improve logging in RootFileStorage, decrease verbosity of some

* tidy up RootFileStorage.h
  • Loading branch information
knopers8 authored Oct 19, 2023
1 parent bc38d7d commit 746f13f
Show file tree
Hide file tree
Showing 27 changed files with 1,157 additions and 186 deletions.
4 changes: 3 additions & 1 deletion Framework/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ add_library(O2QualityControl
src/TimekeeperSynchronous.cxx
src/TimekeeperAsynchronous.cxx
src/WorkflowType.cxx
src/TimekeeperFactory.cxx)
src/TimekeeperFactory.cxx
src/RootFileStorage.cxx)

target_include_directories(
O2QualityControl
Expand Down Expand Up @@ -237,6 +238,7 @@ add_executable(o2-qc-test-core
test/testPostProcessingRunner.cxx
test/testQuality.cxx
test/testQualityObject.cxx
test/testRootFileStorage.cxx
test/testTaskInterface.cxx
test/testTimekeeper.cxx
test/testTriggerHelpers.cxx
Expand Down
15 changes: 11 additions & 4 deletions Framework/batch-test.json.in
Original file line number Diff line number Diff line change
Expand Up @@ -32,26 +32,33 @@
"className": "o2::quality_control_modules::skeleton::SkeletonTask",
"moduleName": "QcSkeleton",
"detectorName": "TST",
"cycleDurationSeconds": "5",
"cycleDurationSeconds": "100",
"maxNumberCycles": "-1",
"dataSource": {
"type": "dataSamplingPolicy",
"name": "tst-raw"
}
},
"movingWindows" : [ "example" ]
}
},
"checks": {
"BatchTestCheck@UNIQUE_ID@": {
"active": "true",
"className": "o2::quality_control_modules::skeleton::SkeletonCheck",
"moduleName": "QcSkeleton",
"policy": "OnAny",
"policy": "OnAll",
"detectorName": "TST",
"dataSource": [{
"type": "Task",
"name": "BatchTestTask@UNIQUE_ID@",
"MOs": ["example"]
}]
},
{
"type": "TaskMovingWindow",
"name": "BatchTestTask@UNIQUE_ID@",
"MOs": ["example"]
}
]
}
}
},
Expand Down
6 changes: 5 additions & 1 deletion Framework/include/QualityControl/MonitorObjectCollection.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,16 @@ class MonitorObjectCollection : public TObjArray, public mergers::MergeInterface
void setDetector(const std::string&);
const std::string& getDetector() const;

void setTaskName(const std::string&);
const std::string& getTaskName() const;

MergeInterface* cloneMovingWindow() const override;

private:
std::string mDetector = "TST";
std::string mTaskName = "Test";

ClassDefOverride(MonitorObjectCollection, 1);
ClassDefOverride(MonitorObjectCollection, 2);
};

} // namespace o2::quality_control::core
Expand Down
13 changes: 12 additions & 1 deletion Framework/include/QualityControl/RootFileSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,16 @@

#include <Framework/Task.h>
#include <string>
#include <vector>
#include <memory>

namespace o2::quality_control::core
{

class RootFileStorage;
class IntegralMocWalker;
class MovingWindowMocWalker;

/// \brief A Data Processor which reads MonitorObjectCollections from a specified file
class RootFileSource : public framework::Task
{
Expand All @@ -33,10 +39,15 @@ class RootFileSource : public framework::Task
void init(framework::InitContext& ictx) override;
void run(framework::ProcessingContext& pctx) override;

static framework::OutputLabel outputBinding(const std::string& detectorCode, const std::string& taskName);
static framework::OutputLabel outputBinding(const std::string& detectorCode, const std::string& taskName, bool movingWindow = false);

private:
std::string mFilePath;
std::vector<framework::OutputLabel> mAllowedOutputs;

std::shared_ptr<RootFileStorage> mRootFileManager = nullptr;
std::shared_ptr<IntegralMocWalker> mIntegralMocWalker = nullptr;
std::shared_ptr<MovingWindowMocWalker> mMovingWindowMocWalker = nullptr;
};

} // namespace o2::quality_control::core
Expand Down
101 changes: 101 additions & 0 deletions Framework/include/QualityControl/RootFileStorage.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

///
/// \file RootFileStorage.h
/// \author Piotr Konopka
///

#ifndef QUALITYCONTROL_ROOTFILESTORAGE_H
#define QUALITYCONTROL_ROOTFILESTORAGE_H

#include <string>
#include <map>
#include <variant>
#include <vector>

class TFile;
class TDirectory;

namespace o2::quality_control::core
{

class MonitorObjectCollection;

/// \brief Manager for storage and retrieval of MonitorObjectCollections in TFiles
class RootFileStorage
{
public:
struct MonitorObjectCollectionNode {
std::string fullPath{};
std::string name{};
MonitorObjectCollection* moc = nullptr;
};
struct DirectoryNode {
std::string fullPath{};
std::string name{};
std::map<std::string, std::variant<DirectoryNode, MonitorObjectCollectionNode>> children = {};
};

enum class ReadMode {
Read,
Update
};

explicit RootFileStorage(const std::string& filePath, ReadMode);
~RootFileStorage();

DirectoryNode readStructure(bool loadObjects = false) const;
MonitorObjectCollection* readMonitorObjectCollection(const std::string& path) const;

void storeIntegralMOC(MonitorObjectCollection* const moc);
void storeMovingWindowMOC(MonitorObjectCollection* const moc);

private:
DirectoryNode readStructureImpl(TDirectory* currentDir, bool loadObjects) const;

private:
TFile* mFile = nullptr;
};

/// \brief walks over integral MOC paths in the alphabetical order of detectors and task names
class IntegralMocWalker
{
public:
explicit IntegralMocWalker(const RootFileStorage::DirectoryNode& rootNode);

bool hasNextPath();
std::string nextPath();

private:
using child_iterator = decltype(std::declval<RootFileStorage::DirectoryNode>().children.cbegin());
std::vector<std::string> mOrder;
std::vector<std::string>::const_iterator mPathIterator;
};

/// \brief walks over moving window MOC paths in the chronological order
class MovingWindowMocWalker
{
public:
explicit MovingWindowMocWalker(const RootFileStorage::DirectoryNode& rootNode);

bool hasNextPath() const;
std::string nextPath();

private:
using child_iterator = decltype(std::declval<RootFileStorage::DirectoryNode>().children.begin());
std::multimap<uint64_t, std::string> mOrder;
std::multimap<uint64_t, std::string>::const_iterator mPathIterator;
};

} // namespace o2::quality_control::core

#endif // QUALITYCONTROL_ROOTFILESTORAGE_H
3 changes: 2 additions & 1 deletion Framework/include/QualityControl/TaskRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ class TaskRunner : public framework::Task
/// \brief Callback for CallbackService::Id::Reset (DPL) a.k.a. RESET DEVICE transition (FairMQ)
void reset();

static std::tuple<bool /*data ready*/, bool /*timer ready*/> validateInputs(const framework::InputRecord&);
/// \brief Checks if all the expected data inputs are present in the provided InputRecord
static bool isDataReady(const framework::InputRecord& inputs);
void refreshConfig(framework::InitContext& iCtx);
void initInfologger(framework::InitContext& iCtx);
void printTaskConfig() const;
Expand Down
9 changes: 8 additions & 1 deletion Framework/include/QualityControl/Timekeeper.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,21 @@ class Timekeeper
void setEndOfActivity(validity_time_t ecsTimestamp = 0, validity_time_t configTimestamp = 0, validity_time_t currentTimestamp = 0,
std::function<validity_time_t(void)> ccdbTimestampAccessor = nullptr);

/// \brief asdf
/// \brief sets an accessor to get the number of orbits per TF for the currently processed run
void setCCDBOrbitsPerTFAccessor(std::function<int(void)>);

/// \brief updates the validity based on the provided timestamp (ms since epoch)
virtual void updateByCurrentTimestamp(validity_time_t timestampMs) = 0;
/// \brief updates the validity based on the provided TF ID
virtual void updateByTimeFrameID(uint32_t tfID) = 0;

/// \brief decides if the current cycle should be finished and a new one started
///
/// This method decides if the current cycle should be finished and a new one started.
/// In case that the cycle should be finished, the updateBy* methods should be called
/// after the cycle end has been done.
virtual bool shouldFinishCycle(const o2::framework::TimingInfo& timingInfo) = 0;

/// \brief resets the state of the mCurrent* counters
virtual void reset() = 0;

Expand Down
6 changes: 6 additions & 0 deletions Framework/include/QualityControl/TimekeeperAsynchronous.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,17 @@ class TimekeeperAsynchronous : public Timekeeper
void updateByTimeFrameID(uint32_t tfID) override;
void reset() override;

bool shouldFinishCycle(const o2::framework::TimingInfo& timingInfo) override;

protected:
validity_time_t activityBoundarySelectionStrategy(validity_time_t ecsTimestamp, validity_time_t configTimestamp,
validity_time_t currentTimestamp,
std::function<validity_time_t(void)> ccdbTimestampAccessor) override;

private:
/// \brief computes validity interval of the provided timeframe ID
ValidityInterval computeTimestampFromTimeframeID(uint32_t tfID);

private:
validity_time_t mWindowLengthMs = 0;
uint64_t mOrbitsPerTF = 0;
Expand Down
2 changes: 1 addition & 1 deletion Framework/include/QualityControl/TimekeeperFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ namespace o2::quality_control::core
class TimekeeperFactory
{
public:
static std::unique_ptr<Timekeeper> create(framework::DeploymentMode);
static std::unique_ptr<Timekeeper> create(framework::DeploymentMode, validity_time_t windowLengthMs = 0);
static bool needsGRPECS(framework::DeploymentMode);
};

Expand Down
1 change: 1 addition & 0 deletions Framework/include/QualityControl/TimekeeperSynchronous.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class TimekeeperSynchronous : public Timekeeper
void updateByTimeFrameID(uint32_t tfID) override;

void reset() override;
bool shouldFinishCycle(const o2::framework::TimingInfo& timingInfo) override;

protected:
validity_time_t activityBoundarySelectionStrategy(validity_time_t ecsTimestamp, validity_time_t configTimestamp,
Expand Down
27 changes: 26 additions & 1 deletion Framework/script/o2-qc-batch-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ function delete_data() {
rm -f /tmp/batch_test_mergedB${UNIQUE_ID}.root
rm -f /tmp/batch_test_mergedC${UNIQUE_ID}.root
rm -f /tmp/batch_test_obj${UNIQUE_ID}.root
rm -f /tmp/batch_test_obj_mw${UNIQUE_ID}.root
rm -f /tmp/batch_test_check${UNIQUE_ID}.root
}

Expand Down Expand Up @@ -63,7 +64,7 @@ o2-qc-file-merger --input-files /tmp/batch_test_mergedA${UNIQUE_ID}.root /tmp/ba
# Run Checks and Aggregators, publish results to QCDB
o2-qc --config json:/${JSON_DIR}/batch-test.json --remote-batch /tmp/batch_test_mergedC${UNIQUE_ID}.root --run

# check MonitorObject
# check the integrated MonitorObject
# first the return code must be 200
code=$(curl -L ccdb-test.cern.ch:8080/qc/TST/MO/BatchTestTask${UNIQUE_ID}/example/8000000/PeriodName=LHC9000x/PassName=apass500 --write-out %{http_code} --silent --output /tmp/batch_test_obj${UNIQUE_ID}.root)
if (( $code != 200 )); then
Expand All @@ -87,6 +88,30 @@ then
exit 5
fi

# check the moving window MonitorObject
# first the return code must be 200
code=$(curl -L ccdb-test.cern.ch:8080/qc/TST/MO/BatchTestTask${UNIQUE_ID}/mw/example/8000000/PeriodName=LHC9000x/PassName=apass500 --write-out %{http_code} --silent --output /tmp/batch_test_obj_mw${UNIQUE_ID}.root)
if (( $code != 200 )); then
echo "Error, monitor object of the QC Task could not be found."
delete_data
exit 3
fi
# try to check that we got a valid root object
root -b -l -q -e 'TFile f("/tmp/batch_test_obj_mw${UNIQUE_ID}.root"); f.Print();'
if (( $? != 0 )); then
echo "Error, monitor object of the QC Task is invalid."
delete_data
exit 4
fi
# try if it is a non empty histogram
entries=`root -b -l -q -e 'TFile f("/tmp/batch_test_obj_mw${UNIQUE_ID}.root"); TH1F *h = (TH1F*)f.Get("ccdb_object"); cout << h->GetEntries() << endl;' | tail -n 1`
if [ $entries -lt 225 ] 2>/dev/null
then
echo "The histogram of the QC Task has less than 225 (75%) of expected samples."
delete_data
exit 5
fi

# check QualityObject
# first the return code must be 200
code=$(curl -L ccdb-test.cern.ch:8080/qc/TST/QO/BatchTestCheck${UNIQUE_ID}/8000000/PeriodName=LHC9000x/PassName=apass500 --write-out %{http_code} --silent --output /tmp/batch_test_check${UNIQUE_ID}.root)
Expand Down
13 changes: 11 additions & 2 deletions Framework/src/InfrastructureGenerator.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,13 @@ framework::WorkflowSpec InfrastructureGenerator::generateRemoteBatchInfrastructu
auto taskConfig = TaskRunnerFactory::extractConfig(infrastructureSpec.common, taskSpec, 0, 1);
fileSourceOutputs.push_back(taskConfig.moSpec);
fileSourceOutputs.back().binding = RootFileSource::outputBinding(taskSpec.detectorName, taskSpec.taskName);
// We create an OutputSpec for moving windows for this task only if they are expected.
if (!taskConfig.movingWindows.empty()) {
fileSourceOutputs.push_back(
{ RootFileSource::outputBinding(taskSpec.detectorName, taskSpec.taskName, true),
TaskRunner::createTaskDataOrigin(taskSpec.detectorName, true),
TaskRunner::createTaskDataDescription(taskSpec.taskName), 0, Lifetime::Sporadic });
}
}
}
if (!fileSourceOutputs.empty()) {
Expand Down Expand Up @@ -632,8 +639,10 @@ void InfrastructureGenerator::generateCheckRunners(framework::WorkflowSpec& work
if (taskSpec.active) {
InputSpec taskOutput{ taskSpec.taskName, TaskRunner::createTaskDataOrigin(taskSpec.detectorName), TaskRunner::createTaskDataDescription(taskSpec.taskName), Lifetime::Sporadic };
tasksOutputMap.insert({ DataSpecUtils::label(taskOutput), taskOutput });
if (!taskSpec.movingWindows.empty() && taskSpec.location == TaskLocationSpec::Local &&
(infrastructureSpec.workflowType == WorkflowType::Remote || infrastructureSpec.workflowType == WorkflowType::FullChain)) {
bool movingWindowsEnabled = !taskSpec.movingWindows.empty();
bool synchronousRemote = taskSpec.location == TaskLocationSpec::Local && (infrastructureSpec.workflowType == WorkflowType::Remote || infrastructureSpec.workflowType == WorkflowType::FullChain);
bool asynchronousRemote = infrastructureSpec.workflowType == WorkflowType::RemoteBatch;
if (movingWindowsEnabled && (synchronousRemote || asynchronousRemote)) {
InputSpec taskMovingWindowOutput{ taskSpec.taskName, TaskRunner::createTaskDataOrigin(taskSpec.detectorName, true), TaskRunner::createTaskDataDescription(taskSpec.taskName), Lifetime::Sporadic };
tasksOutputMap.insert({ DataSpecUtils::label(taskMovingWindowOutput), taskMovingWindowOutput });
}
Expand Down
15 changes: 15 additions & 0 deletions Framework/src/MonitorObjectCollection.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,22 @@ const std::string& MonitorObjectCollection::getDetector() const
return mDetector;
}

void MonitorObjectCollection::setTaskName(const std::string& taskName)
{
mTaskName = taskName;
}

const std::string& MonitorObjectCollection::getTaskName() const
{
return mTaskName;
}

MergeInterface* MonitorObjectCollection::cloneMovingWindow() const
{
auto mw = new MonitorObjectCollection();
mw->SetOwner(true);
mw->setDetector(this->getDetector());
mw->setTaskName(this->getTaskName());
auto mwName = std::string(this->GetName()) + "/mw";
mw->SetName(mwName.c_str());

Expand All @@ -129,6 +140,10 @@ MergeInterface* MonitorObjectCollection::cloneMovingWindow() const
if (!mo->getCreateMovingWindow()) {
continue;
}
if (mo->getValidity().isInvalid()) {
ILOG(Warning) << "MonitorObject '" << mo->getName() << "' validity is invalid, will not create a moving window" << ENDM;
continue;
}
auto clonedMO = dynamic_cast<MonitorObject*>(mo->Clone());
clonedMO->setTaskName(clonedMO->getTaskName() + "/mw");
clonedMO->setIsOwner(true);
Expand Down
1 change: 1 addition & 0 deletions Framework/src/ObjectsManager.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ ObjectsManager::ObjectsManager(std::string taskName, std::string taskClass, std:
mMonitorObjects->SetOwner(true);
mMonitorObjects->SetName(mTaskName.c_str());
mMonitorObjects->setDetector(mDetectorName);
mMonitorObjects->setTaskName(mTaskName);

// register with the discovery service
if (!noDiscovery && !consulUrl.empty()) {
Expand Down
Loading

0 comments on commit 746f13f

Please sign in to comment.