From 746f13f4e8edb4cde1d0b127c3985c2d32b55be0 Mon Sep 17 00:00:00 2001 From: Piotr Konopka Date: Thu, 19 Oct 2023 12:02:44 +0200 Subject: [PATCH] [QC-910] Moving windows in asynchronous QC (#2014) * [QC-910] Moving windows in asynchronous QC This commit allows to create objects containing data points for adjacent time windows alongside of having an integrated object for the whole run. The window length is set by the usual "cycleDurationSeconds" parameter and the windows are guaranteed not to overlap. The generated objects can be subject to Checks, just as integrated objects. Technically, the commit brings a few major changes as well: - The decision on finishing a cycle is now up to Timekeeper (excluding end of stream), which is still timer-based in sync QC, but data-driven in async QC, i.e. a new cycle is started when a sample does not fall into currently processed window. - The QC file I/O has been refactored by bringing it into a dedicated class - The QC file structure was reorganized by having two directories at the top - "int" and "mw". The first contains the integrated plots in the usual structure. The latter contains moving windows in the hierarchy "mw///". This should allow for easy usage by any user scripts (at the cost of a bit more complex walking algorithms for reading it back in the QC chain) * documentation * explain what dataReady() does, rename to isDataReady * add an explanation for creating outputspecs for moving windows * improve logging in RootFileStorage, decrease verbosity of some * tidy up RootFileStorage.h --- Framework/CMakeLists.txt | 4 +- Framework/batch-test.json.in | 15 +- .../QualityControl/MonitorObjectCollection.h | 6 +- .../include/QualityControl/RootFileSource.h | 13 +- .../include/QualityControl/RootFileStorage.h | 101 +++++ Framework/include/QualityControl/TaskRunner.h | 3 +- Framework/include/QualityControl/Timekeeper.h | 9 +- .../QualityControl/TimekeeperAsynchronous.h | 6 + .../QualityControl/TimekeeperFactory.h | 2 +- .../QualityControl/TimekeeperSynchronous.h | 1 + Framework/script/o2-qc-batch-test.sh | 27 +- Framework/src/InfrastructureGenerator.cxx | 13 +- Framework/src/MonitorObjectCollection.cxx | 15 + Framework/src/ObjectsManager.cxx | 1 + Framework/src/RootFileSink.cxx | 77 +--- Framework/src/RootFileSource.cxx | 120 +++--- Framework/src/RootFileStorage.cxx | 373 ++++++++++++++++ Framework/src/TaskRunner.cxx | 34 +- Framework/src/TimekeeperAsynchronous.cxx | 56 ++- Framework/src/TimekeeperFactory.cxx | 4 +- Framework/src/TimekeeperSynchronous.cxx | 6 + Framework/src/runFileMerger.cxx | 2 +- .../test/testInfrastructureGenerator.cxx | 6 +- .../test/testMonitorObjectCollection.cxx | 11 +- Framework/test/testRootFileStorage.cxx | 405 ++++++++++++++++++ Framework/test/testTimekeeper.cxx | 14 + doc/Advanced.md | 19 +- 27 files changed, 1157 insertions(+), 186 deletions(-) create mode 100644 Framework/include/QualityControl/RootFileStorage.h create mode 100644 Framework/src/RootFileStorage.cxx create mode 100644 Framework/test/testRootFileStorage.cxx diff --git a/Framework/CMakeLists.txt b/Framework/CMakeLists.txt index 7d8ba0f264..08a5c7137c 100644 --- a/Framework/CMakeLists.txt +++ b/Framework/CMakeLists.txt @@ -95,7 +95,8 @@ add_library(O2QualityControl src/TimekeeperSynchronous.cxx src/TimekeeperAsynchronous.cxx src/WorkflowType.cxx - src/TimekeeperFactory.cxx) + src/TimekeeperFactory.cxx + src/RootFileStorage.cxx) target_include_directories( O2QualityControl @@ -237,6 +238,7 @@ add_executable(o2-qc-test-core test/testPostProcessingRunner.cxx test/testQuality.cxx test/testQualityObject.cxx + test/testRootFileStorage.cxx test/testTaskInterface.cxx test/testTimekeeper.cxx test/testTriggerHelpers.cxx diff --git a/Framework/batch-test.json.in b/Framework/batch-test.json.in index 4158887d6a..bbe715a0be 100644 --- a/Framework/batch-test.json.in +++ b/Framework/batch-test.json.in @@ -32,12 +32,13 @@ "className": "o2::quality_control_modules::skeleton::SkeletonTask", "moduleName": "QcSkeleton", "detectorName": "TST", - "cycleDurationSeconds": "5", + "cycleDurationSeconds": "100", "maxNumberCycles": "-1", "dataSource": { "type": "dataSamplingPolicy", "name": "tst-raw" - } + }, + "movingWindows" : [ "example" ] } }, "checks": { @@ -45,13 +46,19 @@ "active": "true", "className": "o2::quality_control_modules::skeleton::SkeletonCheck", "moduleName": "QcSkeleton", - "policy": "OnAny", + "policy": "OnAll", "detectorName": "TST", "dataSource": [{ "type": "Task", "name": "BatchTestTask@UNIQUE_ID@", "MOs": ["example"] - }] + }, + { + "type": "TaskMovingWindow", + "name": "BatchTestTask@UNIQUE_ID@", + "MOs": ["example"] + } + ] } } }, diff --git a/Framework/include/QualityControl/MonitorObjectCollection.h b/Framework/include/QualityControl/MonitorObjectCollection.h index 3a250a7a0e..7f1db6e4a4 100644 --- a/Framework/include/QualityControl/MonitorObjectCollection.h +++ b/Framework/include/QualityControl/MonitorObjectCollection.h @@ -37,12 +37,16 @@ class MonitorObjectCollection : public TObjArray, public mergers::MergeInterface void setDetector(const std::string&); const std::string& getDetector() const; + void setTaskName(const std::string&); + const std::string& getTaskName() const; + MergeInterface* cloneMovingWindow() const override; private: std::string mDetector = "TST"; + std::string mTaskName = "Test"; - ClassDefOverride(MonitorObjectCollection, 1); + ClassDefOverride(MonitorObjectCollection, 2); }; } // namespace o2::quality_control::core diff --git a/Framework/include/QualityControl/RootFileSource.h b/Framework/include/QualityControl/RootFileSource.h index 469d174703..7e3f28eb27 100644 --- a/Framework/include/QualityControl/RootFileSource.h +++ b/Framework/include/QualityControl/RootFileSource.h @@ -19,10 +19,16 @@ #include #include +#include +#include namespace o2::quality_control::core { +class RootFileStorage; +class IntegralMocWalker; +class MovingWindowMocWalker; + /// \brief A Data Processor which reads MonitorObjectCollections from a specified file class RootFileSource : public framework::Task { @@ -33,10 +39,15 @@ class RootFileSource : public framework::Task void init(framework::InitContext& ictx) override; void run(framework::ProcessingContext& pctx) override; - static framework::OutputLabel outputBinding(const std::string& detectorCode, const std::string& taskName); + static framework::OutputLabel outputBinding(const std::string& detectorCode, const std::string& taskName, bool movingWindow = false); private: std::string mFilePath; + std::vector mAllowedOutputs; + + std::shared_ptr mRootFileManager = nullptr; + std::shared_ptr mIntegralMocWalker = nullptr; + std::shared_ptr mMovingWindowMocWalker = nullptr; }; } // namespace o2::quality_control::core diff --git a/Framework/include/QualityControl/RootFileStorage.h b/Framework/include/QualityControl/RootFileStorage.h new file mode 100644 index 0000000000..3c95a10487 --- /dev/null +++ b/Framework/include/QualityControl/RootFileStorage.h @@ -0,0 +1,101 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +/// +/// \file RootFileStorage.h +/// \author Piotr Konopka +/// + +#ifndef QUALITYCONTROL_ROOTFILESTORAGE_H +#define QUALITYCONTROL_ROOTFILESTORAGE_H + +#include +#include +#include +#include + +class TFile; +class TDirectory; + +namespace o2::quality_control::core +{ + +class MonitorObjectCollection; + +/// \brief Manager for storage and retrieval of MonitorObjectCollections in TFiles +class RootFileStorage +{ + public: + struct MonitorObjectCollectionNode { + std::string fullPath{}; + std::string name{}; + MonitorObjectCollection* moc = nullptr; + }; + struct DirectoryNode { + std::string fullPath{}; + std::string name{}; + std::map> children = {}; + }; + + enum class ReadMode { + Read, + Update + }; + + explicit RootFileStorage(const std::string& filePath, ReadMode); + ~RootFileStorage(); + + DirectoryNode readStructure(bool loadObjects = false) const; + MonitorObjectCollection* readMonitorObjectCollection(const std::string& path) const; + + void storeIntegralMOC(MonitorObjectCollection* const moc); + void storeMovingWindowMOC(MonitorObjectCollection* const moc); + + private: + DirectoryNode readStructureImpl(TDirectory* currentDir, bool loadObjects) const; + + private: + TFile* mFile = nullptr; +}; + +/// \brief walks over integral MOC paths in the alphabetical order of detectors and task names +class IntegralMocWalker +{ + public: + explicit IntegralMocWalker(const RootFileStorage::DirectoryNode& rootNode); + + bool hasNextPath(); + std::string nextPath(); + + private: + using child_iterator = decltype(std::declval().children.cbegin()); + std::vector mOrder; + std::vector::const_iterator mPathIterator; +}; + +/// \brief walks over moving window MOC paths in the chronological order +class MovingWindowMocWalker +{ + public: + explicit MovingWindowMocWalker(const RootFileStorage::DirectoryNode& rootNode); + + bool hasNextPath() const; + std::string nextPath(); + + private: + using child_iterator = decltype(std::declval().children.begin()); + std::multimap mOrder; + std::multimap::const_iterator mPathIterator; +}; + +} // namespace o2::quality_control::core + +#endif // QUALITYCONTROL_ROOTFILESTORAGE_H diff --git a/Framework/include/QualityControl/TaskRunner.h b/Framework/include/QualityControl/TaskRunner.h index 43e9d5b1df..dabcf9ec77 100644 --- a/Framework/include/QualityControl/TaskRunner.h +++ b/Framework/include/QualityControl/TaskRunner.h @@ -122,7 +122,8 @@ class TaskRunner : public framework::Task /// \brief Callback for CallbackService::Id::Reset (DPL) a.k.a. RESET DEVICE transition (FairMQ) void reset(); - static std::tuple validateInputs(const framework::InputRecord&); + /// \brief Checks if all the expected data inputs are present in the provided InputRecord + static bool isDataReady(const framework::InputRecord& inputs); void refreshConfig(framework::InitContext& iCtx); void initInfologger(framework::InitContext& iCtx); void printTaskConfig() const; diff --git a/Framework/include/QualityControl/Timekeeper.h b/Framework/include/QualityControl/Timekeeper.h index 732fd649f0..2320240a1c 100644 --- a/Framework/include/QualityControl/Timekeeper.h +++ b/Framework/include/QualityControl/Timekeeper.h @@ -50,7 +50,7 @@ class Timekeeper void setEndOfActivity(validity_time_t ecsTimestamp = 0, validity_time_t configTimestamp = 0, validity_time_t currentTimestamp = 0, std::function ccdbTimestampAccessor = nullptr); - /// \brief asdf + /// \brief sets an accessor to get the number of orbits per TF for the currently processed run void setCCDBOrbitsPerTFAccessor(std::function); /// \brief updates the validity based on the provided timestamp (ms since epoch) @@ -58,6 +58,13 @@ class Timekeeper /// \brief updates the validity based on the provided TF ID virtual void updateByTimeFrameID(uint32_t tfID) = 0; + /// \brief decides if the current cycle should be finished and a new one started + /// + /// This method decides if the current cycle should be finished and a new one started. + /// In case that the cycle should be finished, the updateBy* methods should be called + /// after the cycle end has been done. + virtual bool shouldFinishCycle(const o2::framework::TimingInfo& timingInfo) = 0; + /// \brief resets the state of the mCurrent* counters virtual void reset() = 0; diff --git a/Framework/include/QualityControl/TimekeeperAsynchronous.h b/Framework/include/QualityControl/TimekeeperAsynchronous.h index 19757dc1c5..6d0c4a8b29 100644 --- a/Framework/include/QualityControl/TimekeeperAsynchronous.h +++ b/Framework/include/QualityControl/TimekeeperAsynchronous.h @@ -32,11 +32,17 @@ class TimekeeperAsynchronous : public Timekeeper void updateByTimeFrameID(uint32_t tfID) override; void reset() override; + bool shouldFinishCycle(const o2::framework::TimingInfo& timingInfo) override; + protected: validity_time_t activityBoundarySelectionStrategy(validity_time_t ecsTimestamp, validity_time_t configTimestamp, validity_time_t currentTimestamp, std::function ccdbTimestampAccessor) override; + private: + /// \brief computes validity interval of the provided timeframe ID + ValidityInterval computeTimestampFromTimeframeID(uint32_t tfID); + private: validity_time_t mWindowLengthMs = 0; uint64_t mOrbitsPerTF = 0; diff --git a/Framework/include/QualityControl/TimekeeperFactory.h b/Framework/include/QualityControl/TimekeeperFactory.h index e78afd8aca..34f0b3f39f 100644 --- a/Framework/include/QualityControl/TimekeeperFactory.h +++ b/Framework/include/QualityControl/TimekeeperFactory.h @@ -27,7 +27,7 @@ namespace o2::quality_control::core class TimekeeperFactory { public: - static std::unique_ptr create(framework::DeploymentMode); + static std::unique_ptr create(framework::DeploymentMode, validity_time_t windowLengthMs = 0); static bool needsGRPECS(framework::DeploymentMode); }; diff --git a/Framework/include/QualityControl/TimekeeperSynchronous.h b/Framework/include/QualityControl/TimekeeperSynchronous.h index c855f7d9d9..9b95adac9d 100644 --- a/Framework/include/QualityControl/TimekeeperSynchronous.h +++ b/Framework/include/QualityControl/TimekeeperSynchronous.h @@ -32,6 +32,7 @@ class TimekeeperSynchronous : public Timekeeper void updateByTimeFrameID(uint32_t tfID) override; void reset() override; + bool shouldFinishCycle(const o2::framework::TimingInfo& timingInfo) override; protected: validity_time_t activityBoundarySelectionStrategy(validity_time_t ecsTimestamp, validity_time_t configTimestamp, diff --git a/Framework/script/o2-qc-batch-test.sh b/Framework/script/o2-qc-batch-test.sh index 8ec92a1584..199ef80c0a 100755 --- a/Framework/script/o2-qc-batch-test.sh +++ b/Framework/script/o2-qc-batch-test.sh @@ -29,6 +29,7 @@ function delete_data() { rm -f /tmp/batch_test_mergedB${UNIQUE_ID}.root rm -f /tmp/batch_test_mergedC${UNIQUE_ID}.root rm -f /tmp/batch_test_obj${UNIQUE_ID}.root + rm -f /tmp/batch_test_obj_mw${UNIQUE_ID}.root rm -f /tmp/batch_test_check${UNIQUE_ID}.root } @@ -63,7 +64,7 @@ o2-qc-file-merger --input-files /tmp/batch_test_mergedA${UNIQUE_ID}.root /tmp/ba # Run Checks and Aggregators, publish results to QCDB o2-qc --config json:/${JSON_DIR}/batch-test.json --remote-batch /tmp/batch_test_mergedC${UNIQUE_ID}.root --run -# check MonitorObject +# check the integrated MonitorObject # first the return code must be 200 code=$(curl -L ccdb-test.cern.ch:8080/qc/TST/MO/BatchTestTask${UNIQUE_ID}/example/8000000/PeriodName=LHC9000x/PassName=apass500 --write-out %{http_code} --silent --output /tmp/batch_test_obj${UNIQUE_ID}.root) if (( $code != 200 )); then @@ -87,6 +88,30 @@ then exit 5 fi +# check the moving window MonitorObject +# first the return code must be 200 +code=$(curl -L ccdb-test.cern.ch:8080/qc/TST/MO/BatchTestTask${UNIQUE_ID}/mw/example/8000000/PeriodName=LHC9000x/PassName=apass500 --write-out %{http_code} --silent --output /tmp/batch_test_obj_mw${UNIQUE_ID}.root) +if (( $code != 200 )); then + echo "Error, monitor object of the QC Task could not be found." + delete_data + exit 3 +fi +# try to check that we got a valid root object +root -b -l -q -e 'TFile f("/tmp/batch_test_obj_mw${UNIQUE_ID}.root"); f.Print();' +if (( $? != 0 )); then + echo "Error, monitor object of the QC Task is invalid." + delete_data + exit 4 +fi +# try if it is a non empty histogram +entries=`root -b -l -q -e 'TFile f("/tmp/batch_test_obj_mw${UNIQUE_ID}.root"); TH1F *h = (TH1F*)f.Get("ccdb_object"); cout << h->GetEntries() << endl;' | tail -n 1` +if [ $entries -lt 225 ] 2>/dev/null +then + echo "The histogram of the QC Task has less than 225 (75%) of expected samples." + delete_data + exit 5 +fi + # check QualityObject # first the return code must be 200 code=$(curl -L ccdb-test.cern.ch:8080/qc/TST/QO/BatchTestCheck${UNIQUE_ID}/8000000/PeriodName=LHC9000x/PassName=apass500 --write-out %{http_code} --silent --output /tmp/batch_test_check${UNIQUE_ID}.root) diff --git a/Framework/src/InfrastructureGenerator.cxx b/Framework/src/InfrastructureGenerator.cxx index 67ebf3c7c2..a9028ec22a 100644 --- a/Framework/src/InfrastructureGenerator.cxx +++ b/Framework/src/InfrastructureGenerator.cxx @@ -387,6 +387,13 @@ framework::WorkflowSpec InfrastructureGenerator::generateRemoteBatchInfrastructu auto taskConfig = TaskRunnerFactory::extractConfig(infrastructureSpec.common, taskSpec, 0, 1); fileSourceOutputs.push_back(taskConfig.moSpec); fileSourceOutputs.back().binding = RootFileSource::outputBinding(taskSpec.detectorName, taskSpec.taskName); + // We create an OutputSpec for moving windows for this task only if they are expected. + if (!taskConfig.movingWindows.empty()) { + fileSourceOutputs.push_back( + { RootFileSource::outputBinding(taskSpec.detectorName, taskSpec.taskName, true), + TaskRunner::createTaskDataOrigin(taskSpec.detectorName, true), + TaskRunner::createTaskDataDescription(taskSpec.taskName), 0, Lifetime::Sporadic }); + } } } if (!fileSourceOutputs.empty()) { @@ -632,8 +639,10 @@ void InfrastructureGenerator::generateCheckRunners(framework::WorkflowSpec& work if (taskSpec.active) { InputSpec taskOutput{ taskSpec.taskName, TaskRunner::createTaskDataOrigin(taskSpec.detectorName), TaskRunner::createTaskDataDescription(taskSpec.taskName), Lifetime::Sporadic }; tasksOutputMap.insert({ DataSpecUtils::label(taskOutput), taskOutput }); - if (!taskSpec.movingWindows.empty() && taskSpec.location == TaskLocationSpec::Local && - (infrastructureSpec.workflowType == WorkflowType::Remote || infrastructureSpec.workflowType == WorkflowType::FullChain)) { + bool movingWindowsEnabled = !taskSpec.movingWindows.empty(); + bool synchronousRemote = taskSpec.location == TaskLocationSpec::Local && (infrastructureSpec.workflowType == WorkflowType::Remote || infrastructureSpec.workflowType == WorkflowType::FullChain); + bool asynchronousRemote = infrastructureSpec.workflowType == WorkflowType::RemoteBatch; + if (movingWindowsEnabled && (synchronousRemote || asynchronousRemote)) { InputSpec taskMovingWindowOutput{ taskSpec.taskName, TaskRunner::createTaskDataOrigin(taskSpec.detectorName, true), TaskRunner::createTaskDataDescription(taskSpec.taskName), Lifetime::Sporadic }; tasksOutputMap.insert({ DataSpecUtils::label(taskMovingWindowOutput), taskMovingWindowOutput }); } diff --git a/Framework/src/MonitorObjectCollection.cxx b/Framework/src/MonitorObjectCollection.cxx index bc10ec7f3a..89c979fba0 100644 --- a/Framework/src/MonitorObjectCollection.cxx +++ b/Framework/src/MonitorObjectCollection.cxx @@ -110,11 +110,22 @@ const std::string& MonitorObjectCollection::getDetector() const return mDetector; } +void MonitorObjectCollection::setTaskName(const std::string& taskName) +{ + mTaskName = taskName; +} + +const std::string& MonitorObjectCollection::getTaskName() const +{ + return mTaskName; +} + MergeInterface* MonitorObjectCollection::cloneMovingWindow() const { auto mw = new MonitorObjectCollection(); mw->SetOwner(true); mw->setDetector(this->getDetector()); + mw->setTaskName(this->getTaskName()); auto mwName = std::string(this->GetName()) + "/mw"; mw->SetName(mwName.c_str()); @@ -129,6 +140,10 @@ MergeInterface* MonitorObjectCollection::cloneMovingWindow() const if (!mo->getCreateMovingWindow()) { continue; } + if (mo->getValidity().isInvalid()) { + ILOG(Warning) << "MonitorObject '" << mo->getName() << "' validity is invalid, will not create a moving window" << ENDM; + continue; + } auto clonedMO = dynamic_cast(mo->Clone()); clonedMO->setTaskName(clonedMO->getTaskName() + "/mw"); clonedMO->setIsOwner(true); diff --git a/Framework/src/ObjectsManager.cxx b/Framework/src/ObjectsManager.cxx index ac4155bd53..3f592a119b 100644 --- a/Framework/src/ObjectsManager.cxx +++ b/Framework/src/ObjectsManager.cxx @@ -42,6 +42,7 @@ ObjectsManager::ObjectsManager(std::string taskName, std::string taskClass, std: mMonitorObjects->SetOwner(true); mMonitorObjects->SetName(mTaskName.c_str()); mMonitorObjects->setDetector(mDetectorName); + mMonitorObjects->setTaskName(mTaskName); // register with the discovery service if (!noDiscovery && !consulUrl.empty()) { diff --git a/Framework/src/RootFileSink.cxx b/Framework/src/RootFileSink.cxx index 3913d30857..0ae4fef772 100644 --- a/Framework/src/RootFileSink.cxx +++ b/Framework/src/RootFileSink.cxx @@ -17,11 +17,13 @@ #include "QualityControl/RootFileSink.h" #include "QualityControl/QcInfoLogger.h" #include "QualityControl/MonitorObjectCollection.h" +#include "QualityControl/MonitorObject.h" +#include "QualityControl/RootFileStorage.h" #include #include #include #include -#include + #if defined(__linux__) && __has_include() #include #endif @@ -36,43 +38,6 @@ RootFileSink::RootFileSink(std::string filePath) { } -TFile* openSinkFile(const std::string& name) -{ - auto file = new TFile(name.c_str(), "UPDATE"); - if (file->IsZombie()) { - throw std::runtime_error("File '" + name + "' is zombie."); - } - if (!file->IsOpen()) { - throw std::runtime_error("Failed to open the file: " + name); - } - if (!file->IsWritable()) { - throw std::runtime_error("File '" + name + "' is not writable."); - } - ILOG(Info) << "Output file '" << name << "' successfully open." << ENDM; - return file; -} - -void closeSinkFile(TFile* file) -{ - if (file != nullptr) { - if (file->IsOpen()) { - ILOG(Info) << "Closing file '" << file->GetName() << "'." << ENDM; - file->Write(); - file->Close(); - } - delete file; - } -} - -void deleteTDirectory(TDirectory* d) -{ - if (d != nullptr) { - d->Write(); - d->Close(); - delete d; - } -}; - void RootFileSink::customizeInfrastructure(std::vector& policies) { auto matcher = [label = RootFileSink::getLabel()](framework::DeviceSpec const& device) { @@ -88,9 +53,8 @@ void RootFileSink::init(framework::InitContext& ictx) void RootFileSink::run(framework::ProcessingContext& pctx) { - TFile* sinkFile = nullptr; try { - sinkFile = openSinkFile(mFilePath); + RootFileStorage mStorage{ mFilePath, RootFileStorage::ReadMode::Update }; for (const auto& input : InputRecordWalker(pctx.inputs())) { auto moc = DataRefUtils::as(input); if (moc == nullptr) { @@ -99,42 +63,19 @@ void RootFileSink::run(framework::ProcessingContext& pctx) } ILOG(Info, Support) << "Received MonitorObjectCollection '" << moc->GetName() << "'" << ENDM; moc->postDeserialization(); + auto mwMOC = dynamic_cast(moc->cloneMovingWindow()); - auto mocName = moc->GetName(); - if (*mocName == '\0') { - ILOG(Error, Support) << "MonitorObjectCollection does not have a name, skipping." << ENDM; - continue; + if (moc->GetEntries() > 0) { + mStorage.storeIntegralMOC(moc.get()); } - auto detector = moc->getDetector(); - - auto detDir = std::unique_ptr(sinkFile->GetDirectory(detector.c_str()), deleteTDirectory); - if (detDir == nullptr) { - ILOG(Info, Devel) << "Creating a new directory '" << detector << "'." << ENDM; - detDir = std::unique_ptr(sinkFile->mkdir(detector.c_str()), deleteTDirectory); - if (detDir == nullptr) { - ILOG(Error, Support) << "Could not create directory '" << detector << "', skipping." << ENDM; - continue; - } + if (mwMOC->GetEntries() > 0) { + mStorage.storeMovingWindowMOC(mwMOC); } - - ILOG(Info, Support) << "Checking for existing objects in the file." << ENDM; - auto storedMOC = std::unique_ptr(detDir->Get(mocName)); - if (storedMOC != nullptr) { - storedMOC->postDeserialization(); - ILOG(Info, Support) << "Merging object '" << moc->GetName() << "' with the existing one in the file." << ENDM; - moc->merge(storedMOC.get()); - } - - auto nbytes = detDir->WriteObject(moc.get(), moc->GetName(), "Overwrite"); - ILOG(Info, Support) << "Object '" << moc->GetName() << "' has been stored in the file (" << nbytes << " bytes)." << ENDM; } - closeSinkFile(sinkFile); } catch (const std::bad_alloc& ex) { ILOG(Error, Ops) << "Caught a bad_alloc exception, there is probably a huge file or object present, but I will try to survive" << ENDM; ILOG(Error, Support) << "Details: " << ex.what() << ENDM; - closeSinkFile(sinkFile); } catch (...) { - closeSinkFile(sinkFile); throw; } diff --git a/Framework/src/RootFileSource.cxx b/Framework/src/RootFileSource.cxx index 1720ec8054..85cd1a9b27 100644 --- a/Framework/src/RootFileSource.cxx +++ b/Framework/src/RootFileSource.cxx @@ -17,6 +17,7 @@ #include "QualityControl/RootFileSource.h" #include "QualityControl/QcInfoLogger.h" #include "QualityControl/MonitorObjectCollection.h" +#include "QualityControl/RootFileStorage.h" #include #include @@ -32,79 +33,90 @@ RootFileSource::RootFileSource(std::string filePath) { } -void RootFileSource::init(framework::InitContext&) -{ -} - -void RootFileSource::run(framework::ProcessingContext& ctx) +void RootFileSource::init(framework::InitContext& ctx) { auto const& deviceSpec = ctx.services().get(); - std::vector allowedOutputs; + mAllowedOutputs.clear(); + mAllowedOutputs.reserve(deviceSpec.outputs.size()); for (const auto& outputRoute : deviceSpec.outputs) { - allowedOutputs.push_back(outputRoute.matcher.binding); + mAllowedOutputs.push_back(outputRoute.matcher.binding); } - auto* file = new TFile(mFilePath.c_str(), "READ"); - if (file->IsZombie()) { - throw std::runtime_error("File '" + mFilePath + "' is zombie."); - } - if (!file->IsOpen()) { - throw std::runtime_error("Failed to open the file: " + mFilePath); + mRootFileManager = std::make_shared(mFilePath, RootFileStorage::ReadMode::Read); + if (mRootFileManager == nullptr) { + ILOG(Fatal, Ops) << "Could not open file '" << mFilePath << "'" << ENDM; + ctx.services().get().endOfStream(); + ctx.services().get().readyToQuit(false); + return; } ILOG(Info) << "Input file '" << mFilePath << "' successfully open." << ENDM; - TIter nextDetector(file->GetListOfKeys()); - TKey* detectorKey; - while ((detectorKey = (TKey*)nextDetector())) { - ILOG(Debug, Devel) << "Going to directory '" << detectorKey->GetName() << "'" << ENDM; - auto detDir = file->GetDirectory(detectorKey->GetName()); - if (detDir == nullptr) { - ILOG(Error) << "Could not get directory '" << detectorKey->GetName() << "', skipping." << ENDM; - continue; + auto fileStructure = mRootFileManager->readStructure(false); + + mIntegralMocWalker = std::make_shared(fileStructure); + mMovingWindowMocWalker = std::make_shared(fileStructure); +} + +void RootFileSource::run(framework::ProcessingContext& ctx) +{ + if (mIntegralMocWalker->hasNextPath()) { + const auto& path = mIntegralMocWalker->nextPath(); + auto moc = mRootFileManager->readMonitorObjectCollection(path); + auto binding = outputBinding(moc->getDetector(), moc->getTaskName(), false); + + if (std::find_if(mAllowedOutputs.begin(), mAllowedOutputs.end(), + [binding](const auto& other) { return other.value == binding.value; }) == mAllowedOutputs.end()) { + ILOG(Error) << "The MonitorObjectCollection '" << binding.value << "' is not among declared output bindings: "; + for (const auto& output : mAllowedOutputs) { + ILOG(Error) << output.value << " "; + } + ILOG(Error) << ", skipping." << ENDM; + return; } + // snapshot does a shallow copy, so we cannot let it delete elements in MOC when it deletes the MOC + moc->SetOwner(false); + ctx.outputs().snapshot(OutputRef{ binding.value, 0 }, *moc); + moc->postDeserialization(); + ILOG(Info) << "Read and published object '" << path << "'" << ENDM; + delete moc; + + return; + } + + if (mMovingWindowMocWalker->hasNextPath()) { + const auto& path = mMovingWindowMocWalker->nextPath(); + auto moc = mRootFileManager->readMonitorObjectCollection(path); + auto binding = outputBinding(moc->getDetector(), moc->getTaskName(), true); - TIter nextMOC(detDir->GetListOfKeys()); - TKey* mocKey; - while ((mocKey = (TKey*)nextMOC())) { - auto storedTObj = detDir->Get(mocKey->GetName()); - if (storedTObj != nullptr) { - auto storedMOC = dynamic_cast(storedTObj); - if (storedMOC == nullptr) { - ILOG(Error) << "Could not cast the stored object to MonitorObjectCollection, skipping." << ENDM; - delete storedTObj; - continue; - } - - auto binding = outputBinding(detectorKey->GetName(), storedMOC->GetName()); - if (std::find_if(allowedOutputs.begin(), allowedOutputs.end(), - [binding](const auto& other) { return other.value == binding.value; }) == allowedOutputs.end()) { - ILOG(Error) << "The MonitorObjectCollection '" << binding.value << "' is not among declared output bindings: "; - for (const auto& output : allowedOutputs) { - ILOG(Error) << output.value << " "; - } - ILOG(Error) << ", skipping." << ENDM; - continue; - } - - // snapshot does a shallow copy, so we cannot let it delete elements in MOC when it deletes the MOC - storedMOC->SetOwner(false); - ctx.outputs().snapshot(OutputRef{ binding.value, 0 }, *storedMOC); - storedMOC->postDeserialization(); - ILOG(Info) << "Read and published object '" << storedMOC->GetName() << "'" << ENDM; + if (std::find_if(mAllowedOutputs.begin(), mAllowedOutputs.end(), + [binding](const auto& other) { return other.value == binding.value; }) == mAllowedOutputs.end()) { + ILOG(Error) << "The MonitorObjectCollection '" << binding.value << "' is not among declared output bindings: "; + for (const auto& output : mAllowedOutputs) { + ILOG(Error) << output.value << " "; } - delete storedTObj; + ILOG(Error) << ", skipping." << ENDM; + return; } + // snapshot does a shallow copy, so we cannot let it delete elements in MOC when it deletes the MOC + moc->SetOwner(false); + ctx.outputs().snapshot(OutputRef{ binding.value, 0 }, *moc); + moc->postDeserialization(); + ILOG(Info) << "Read and published object '" << path << "'" << ENDM; + delete moc; + + return; } - file->Close(); - delete file; + + mRootFileManager.reset(); ctx.services().get().endOfStream(); ctx.services().get().readyToQuit(QuitRequest::Me); } -framework::OutputLabel RootFileSource::outputBinding(const std::string& detectorCode, const std::string& taskName) +framework::OutputLabel + RootFileSource::outputBinding(const std::string& detectorCode, const std::string& taskName, bool movingWindow) { - return { detectorCode + "-" + taskName }; + return movingWindow ? framework::OutputLabel{ detectorCode + "-MW-" + taskName } : framework::OutputLabel{ detectorCode + "-" + taskName }; } } // namespace o2::quality_control::core diff --git a/Framework/src/RootFileStorage.cxx b/Framework/src/RootFileStorage.cxx new file mode 100644 index 0000000000..ecfbc0a5cd --- /dev/null +++ b/Framework/src/RootFileStorage.cxx @@ -0,0 +1,373 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +/// +/// \file RootFileStorage.cxx +/// \author Piotr Konopka +/// + +#include "QualityControl/RootFileStorage.h" +#include "QualityControl/QcInfoLogger.h" +#include "QualityControl/MonitorObjectCollection.h" +#include "QualityControl/MonitorObject.h" +#include "QualityControl/ValidityInterval.h" + +#include +#include +#include +#include +#include +#include + +namespace o2::quality_control::core +{ + +constexpr auto integralsDirectoryName = "int"; +constexpr auto movingWindowsDirectoryName = "mw"; + +RootFileStorage::RootFileStorage(const std::string& filePath, ReadMode readMode) +{ + switch (readMode) { + case ReadMode::Update: + mFile = new TFile(filePath.c_str(), "UPDATE"); + break; + case ReadMode::Read: + default: + mFile = new TFile(filePath.c_str(), "READ"); + } + if (mFile->IsZombie()) { + throw std::runtime_error("File '" + filePath + "' is zombie."); + } + if (!mFile->IsOpen()) { + throw std::runtime_error("Failed to open the file: " + filePath); + } + if (readMode == ReadMode::Update && !mFile->IsWritable()) { + throw std::runtime_error("File '" + filePath + "' is not writable."); + } + ILOG(Info) << "Output file '" << filePath << "' successfully open." << ENDM; +} + +RootFileStorage::DirectoryNode RootFileStorage::readStructure(bool loadObjects) const +{ + return readStructureImpl(mFile, loadObjects); +} + +RootFileStorage::DirectoryNode RootFileStorage::readStructureImpl(TDirectory* currentDir, bool loadObjects) const +{ + auto fullPath = currentDir->GetPath(); + auto pathToPos = std::strstr(fullPath, ":/"); + if (pathToPos == nullptr) { + ILOG(Error, Support) << "Could not extract path to node in string '" << currentDir->GetPath() << "', skipping" << ENDM; + return {}; + } + DirectoryNode currentNode{ pathToPos + 2, currentDir->GetName() }; + + TIter nextKey(currentDir->GetListOfKeys()); + TKey* key; + while ((key = (TKey*)nextKey())) { + if (!loadObjects && std::strcmp(key->GetClassName(), MonitorObjectCollection::Class_Name()) == 0) { + std::string mocPath = currentNode.fullPath + std::filesystem::path::preferred_separator + key->GetName(); + currentNode.children[key->GetName()] = MonitorObjectCollectionNode{ mocPath, key->GetName() }; + continue; + } + + ILOG(Debug, Devel) << "Getting the value for key '" << key->GetName() << "'" << ENDM; + auto* value = currentDir->Get(key->GetName()); + if (value == nullptr) { + ILOG(Error) << "Could not get the value '" << key->GetName() << "', skipping." << ENDM; + continue; + } + if (auto moc = dynamic_cast(value)) { + moc->postDeserialization(); + std::string mocPath = currentNode.fullPath + std::filesystem::path::preferred_separator + key->GetName(); + currentNode.children[moc->GetName()] = MonitorObjectCollectionNode{ mocPath, key->GetName(), moc }; + ILOG(Debug, Support) << "Read object '" << moc->GetName() << "' in path '" << currentNode.fullPath << "'" << ENDM; + } else if (auto childDir = dynamic_cast(value)) { + currentNode.children[key->GetName()] = readStructureImpl(childDir, loadObjects); + } else { + ILOG(Warning, Support) << "Could not cast the node to MonitorObjectCollection nor TDirectory, skipping." << ENDM; + delete value; + continue; + } + } + + return currentNode; +} + +MonitorObjectCollection* RootFileStorage::readMonitorObjectCollection(const std::string& path) const +{ + auto storedTObj = mFile->Get(path.c_str()); + if (storedTObj == nullptr) { + ILOG(Error, Ops) << "Could not read object '" << path << "'" << ENDM; + return nullptr; + } + auto storedMOC = dynamic_cast(storedTObj); + if (storedMOC == nullptr) { + ILOG(Error, Ops) << "Could not cast the stored object to MonitorObjectCollection" << ENDM; + delete storedTObj; + } + return storedMOC; +} + +RootFileStorage::~RootFileStorage() +{ + if (mFile != nullptr) { + if (mFile->IsOpen()) { + ILOG(Info, Support) << "Closing file '" << mFile->GetName() << "'." << ENDM; + mFile->Write(); + mFile->Close(); + } + delete mFile; + } +} + +void deleteTDirectory(TDirectory* d) +{ + if (d != nullptr) { + d->Write(); + d->Close(); + delete d; + } +} + +std::unique_ptr getOrCreateDirectory(TDirectory* parentDir, const char* dirName) +{ + auto dir = std::unique_ptr(parentDir->GetDirectory(dirName), deleteTDirectory); + if (dir == nullptr) { + ILOG(Debug, Support) << "Creating a new directory '" << dirName << "'." << ENDM; + dir = std::unique_ptr(parentDir->mkdir(dirName), deleteTDirectory); + } + return dir; +} + +validity_time_t earliestValidFrom(const MonitorObjectCollection* moc) +{ + validity_time_t earliest = std::numeric_limits::max(); + for (const auto& obj : *moc) { + if (auto mo = dynamic_cast(obj)) { + earliest = std::min(mo->getValidity().getMin(), earliest); + } + } + return earliest; +} + +bool validObjectValidities(const MonitorObjectCollection* moc) +{ + for (const auto& obj : *moc) { + if (auto mo = dynamic_cast(obj); mo->getValidity().isInvalid()) { + return false; + } + } + return true; +} + +// fixme we should not have to change the name! +void RootFileStorage::storeIntegralMOC(MonitorObjectCollection* const moc) +{ + const auto& mocStorageName = moc->getTaskName(); + if (mocStorageName.empty()) { + ILOG(Error, Support) << "taskName empty, not storing MOC '" << moc->GetName() << "' for detector '" << moc->getDetector() << "'" << ENDM; + return; + } + moc->SetName(mocStorageName.c_str()); + // directory level: int + auto integralDir = getOrCreateDirectory(mFile, integralsDirectoryName); + if (integralDir == nullptr) { + ILOG(Error, Support) << "Could not create the directory '" << integralsDirectoryName << "', skipping." << ENDM; + return; + } + + // directory level: int/DET + auto detector = moc->getDetector(); + auto detDir = getOrCreateDirectory(integralDir.get(), detector.c_str()); + if (detDir == nullptr) { + ILOG(Error, Support) << "Could not create directory '" << detector << "', skipping." << ENDM; + return; + } + + // directory level: int/DET/TASK + ILOG(Debug, Support) << "Checking for existing objects in the file." << ENDM; + int nbytes = 0; + auto storedMOC = std::unique_ptr(detDir->Get(mocStorageName.c_str())); + if (storedMOC != nullptr) { + storedMOC->postDeserialization(); + ILOG(Info, Support) << "Merging objects for task '" << detector << "/" << moc->getTaskName() << "' with the existing ones in the file." << ENDM; + storedMOC->merge(moc); + nbytes = detDir->WriteObject(storedMOC.get(), storedMOC->GetName(), "Overwrite"); + } else { + ILOG(Info, Support) << "Storing objects for task '" << detector << "/" << moc->getTaskName() << "' in the file." << ENDM; + nbytes = detDir->WriteObject(moc, moc->GetName(), "Overwrite"); + } + ILOG(Info, Support) << "Integrated objects '" << moc->GetName() << "' have been stored in the file (" << nbytes << " bytes)." << ENDM; +} + +void RootFileStorage::storeMovingWindowMOC(MonitorObjectCollection* const moc) +{ + if (moc->GetEntries() == 0) { + ILOG(Warning, Support) << "The provided MonitorObjectCollection '" << moc->GetName() << "' is empty, will not store." << ENDM; + return; + } + if (!validObjectValidities(moc)) { + // this should not happen, because we have a protection in MonitorObjectCollection::cloneMovingWindow() against it. + // thus, we should raise some concern if this occurs anyway. + ILOG(Warning, Ops) << "The provided MonitorObjectCollection '" << moc->GetName() << "' contains at least one object with invalid validity!!!" << ENDM; + } + // directory level: mw + auto mwDir = getOrCreateDirectory(mFile, movingWindowsDirectoryName); + if (mwDir == nullptr) { + ILOG(Error, Support) << "Could not create the directory '" << movingWindowsDirectoryName << "', skipping." << ENDM; + return; + } + + // directory level: mw/DET + auto detector = moc->getDetector(); + auto detDir = getOrCreateDirectory(mwDir.get(), detector.c_str()); + if (detDir == nullptr) { + ILOG(Error, Support) << "Could not create directory '" << detector << "', skipping." << ENDM; + return; + } + + // directory level: mw/DET/TASK + auto taskDir = getOrCreateDirectory(detDir.get(), moc->getTaskName().c_str()); + if (taskDir == nullptr) { + ILOG(Error, Support) << "Could not create directory '" << moc->getTaskName() << "', skipping." << ENDM; + return; + } + + // directory level: mw/DET/TASK/ + auto mocStorageName = std::to_string(earliestValidFrom(moc)); + moc->SetName(mocStorageName.c_str()); + ILOG(Info, Support) << "Checking for existing moving windows '" << mocStorageName << "' for task '" << detector << "/" << moc->getTaskName() << "' in the file." << ENDM; + int nbytes = 0; + auto storedMOC = std::unique_ptr(taskDir->Get(mocStorageName.c_str())); + if (storedMOC != nullptr) { + storedMOC->postDeserialization(); + ILOG(Info, Support) << "Merging moving windows '" << moc->GetName() << "' for task '" << moc->getDetector() << "/" << moc->getTaskName() << "' with the existing one in the file." << ENDM; + storedMOC->merge(moc); + nbytes = taskDir->WriteObject(storedMOC.get(), storedMOC->GetName(), "Overwrite"); + } else { + ILOG(Info, Support) << "Storing moving windows '" << moc->GetName() << "' for task '" << moc->getDetector() << "/" << moc->getTaskName() << "' in the file." << ENDM; + nbytes = taskDir->WriteObject(moc, moc->GetName(), "Overwrite"); + } + ILOG(Info, Support) << "Moving windows '" << moc->GetName() << "' for task '" << detector << "/" << moc->getTaskName() << "' has been stored in the file (" << nbytes << " bytes)." << ENDM; +} + +IntegralMocWalker::IntegralMocWalker(const RootFileStorage::DirectoryNode& rootNode) +{ + auto integralDirIt = rootNode.children.find(integralsDirectoryName); + if (integralDirIt == rootNode.children.end()) { + mPathIterator = mOrder.cbegin(); + return; + } + if (!std::holds_alternative(integralDirIt->second)) { + mPathIterator = mOrder.cbegin(); + return; + } + const auto& integralMocNode = std::get(integralDirIt->second); + std::stack> stack{}; + stack.push({ integralMocNode, integralMocNode.children.cbegin() }); + + while (!stack.empty()) { + auto& [currentNode, childIt] = stack.top(); + if (childIt == currentNode.children.end()) { + // move to the next child of the parent node + stack.pop(); + if (!stack.empty()) { + stack.top().second++; + } + } else if (std::holds_alternative(childIt->second)) { + // move to a child of the current node + const auto& childNode = std::get(childIt->second); + stack.push({ childNode, childNode.children.cbegin() }); + } else if (std::holds_alternative(childIt->second)) { + // move to the next child in the currentNode and return a path + const auto& childNode = std::get(childIt->second); + ++childIt; + mOrder.push_back(childNode.fullPath); + } else { + // unrecognized child node, move to the next child in the currentNode + ++childIt; + } + } + mPathIterator = mOrder.cbegin(); +} + +bool IntegralMocWalker::hasNextPath() +{ + return mPathIterator != mOrder.cend(); +} + +std::string IntegralMocWalker::nextPath() +{ + if (hasNextPath()) { + return *mPathIterator++; + } + return {}; +} + +MovingWindowMocWalker::MovingWindowMocWalker(const RootFileStorage::DirectoryNode& rootNode) +{ + auto movingWindowDirIt = rootNode.children.find(movingWindowsDirectoryName); + if (movingWindowDirIt == rootNode.children.end()) { + mPathIterator = mOrder.cbegin(); + return; + } + if (!std::holds_alternative(movingWindowDirIt->second)) { + mPathIterator = mOrder.cbegin(); + return; + } + auto movingWindowMocNode = std::get(movingWindowDirIt->second); + + std::stack> stack{}; + stack.push({ movingWindowMocNode, movingWindowMocNode.children.begin() }); + + // we walk over all the MOCs in the tree and we save them in chronological order + while (!stack.empty()) { + auto& [currentNode, childIt] = stack.top(); + + if (childIt == currentNode.children.end()) { + // move to the next child of the parent node + stack.pop(); + if (!stack.empty()) { + stack.top().second++; + } + } else if (std::holds_alternative(childIt->second)) { + // move to a child of the current node + auto& childNode = std::get(childIt->second); + stack.push({ childNode, childNode.children.begin() }); + } else if (std::holds_alternative(childIt->second)) { + // move to the next child in the currentNode and return a path + auto timestamp = std::stoull(childIt->first); + auto& childNode = std::get(childIt->second); + mOrder.emplace(timestamp, childNode.fullPath); + childIt++; + } else { + // unrecognized child node, move to the next child in the currentNode + childIt++; + } + } + mPathIterator = mOrder.cbegin(); +} + +bool MovingWindowMocWalker::hasNextPath() const +{ + return mPathIterator != mOrder.cend(); +} + +std::string MovingWindowMocWalker::nextPath() +{ + if (hasNextPath()) { + return (mPathIterator++)->second; + } + return {}; +} + +} // namespace o2::quality_control::core \ No newline at end of file diff --git a/Framework/src/TaskRunner.cxx b/Framework/src/TaskRunner.cxx index 35c75ae803..9cd97fa4e8 100644 --- a/Framework/src/TaskRunner.cxx +++ b/Framework/src/TaskRunner.cxx @@ -168,8 +168,12 @@ void TaskRunner::init(InitContext& iCtx) // setup timekeeping mDeploymentMode = DefaultsHelpers::deploymentMode(); - mTimekeeper = TimekeeperFactory::create(mDeploymentMode); - mTimekeeper->setCCDBOrbitsPerTFAccessor([]() { return o2::base::GRPGeomHelper::getNHBFPerTF(); }); + mTimekeeper = TimekeeperFactory::create(mDeploymentMode, mTaskConfig.cycleDurations.back().first * 1000); + mTimekeeper->setCCDBOrbitsPerTFAccessor([]() { + // getNHBFPerTF() returns 128 if it does not know, which can be very misleading. + // instead we use 0, which will trigger another try when processing another timeslice. + return o2::base::GRPGeomHelper::instance().getGRPECS() != nullptr ? o2::base::GRPGeomHelper::getNHBFPerTF() : 0; + }); // setup user's task mTask.reset(TaskFactory::create(mTaskConfig, mObjectsManager)); @@ -208,15 +212,7 @@ void TaskRunner::run(ProcessingContext& pCtx) GRPGeomHelper::instance().checkUpdates(pCtx); } - auto [dataReady, timerReady] = validateInputs(pCtx.inputs()); - - if (dataReady) { - mTimekeeper->updateByTimeFrameID(pCtx.services().get().tfCounter); - mTask->monitorData(pCtx); - updateMonitoringStats(pCtx); - } - - if (timerReady) { + if (mTimekeeper->shouldFinishCycle(pCtx.services().get())) { mTimekeeper->updateByCurrentTimestamp(pCtx.services().get().timeslice / 1000); finishCycle(pCtx.outputs()); if (mTaskConfig.resetAfterCycles > 0 && (mCycleNumber % mTaskConfig.resetAfterCycles == 0)) { @@ -229,6 +225,12 @@ void TaskRunner::run(ProcessingContext& pCtx) mNoMoreCycles = true; } } + + if (isDataReady(pCtx.inputs())) { + mTimekeeper->updateByTimeFrameID(pCtx.services().get().tfCounter); + mTask->monitorData(pCtx); + updateMonitoringStats(pCtx); + } } void TaskRunner::finaliseCCDB(ConcreteDataMatcher& matcher, void* obj) @@ -402,10 +404,9 @@ void TaskRunner::reset() } } -std::tuple TaskRunner::validateInputs(const framework::InputRecord& inputs) +bool TaskRunner::isDataReady(const framework::InputRecord& inputs) { size_t dataInputsPresent = 0; - bool timerReady = false; for (auto& input : inputs) { if (input.header != nullptr) { @@ -413,16 +414,13 @@ std::tuple TaskRunner::validateInputs const auto* dataHeader = get(input.header); assert(dataHeader); - if (!strncmp(dataHeader->dataDescription.str, "TIMER", 5)) { - timerReady = true; - } else { + if (strncmp(dataHeader->dataDescription.str, "TIMER", 5)) { dataInputsPresent++; } } } - bool dataReady = dataInputsPresent == inputs.size() - 1; - return { dataReady, timerReady }; + return dataInputsPresent == inputs.size() - 1; } void TaskRunner::printTaskConfig() const diff --git a/Framework/src/TimekeeperAsynchronous.cxx b/Framework/src/TimekeeperAsynchronous.cxx index 81c92fa237..ef1c66f856 100644 --- a/Framework/src/TimekeeperAsynchronous.cxx +++ b/Framework/src/TimekeeperAsynchronous.cxx @@ -18,6 +18,7 @@ #include "QualityControl/QcInfoLogger.h" #include +#include namespace o2::quality_control::core { @@ -50,27 +51,14 @@ void TimekeeperAsynchronous::updateByTimeFrameID(uint32_t tfid) return; } - if (mOrbitsPerTF == 0) { - if (auto accessor = getCCDBOrbitsPerTFAccessor()) { - mOrbitsPerTF = accessor(); - } else { - ILOG(Error, Ops) << "CCDB OrbitsPerTF accessor is not available" << ENDM; - } - if (mOrbitsPerTF == 0) { - ILOG(Error, Ops) << "nHBFperTF from CCDB GRP is 0, object validity will be incorrect" << ENDM; - } - } - - auto tfDurationMs = constants::lhc::LHCOrbitNS / 1000000 * mOrbitsPerTF; - auto tfStart = static_cast(mActivityDuration.getMin() + tfDurationMs * (tfid - 1)); - auto tfEnd = static_cast(mActivityDuration.getMin() + tfDurationMs * tfid - 1); - mCurrentSampleTimespan.update(tfStart); - mCurrentSampleTimespan.update(tfEnd); + auto tfValidity = computeTimestampFromTimeframeID(tfid); + mCurrentSampleTimespan.update(tfValidity.getMin()); + mCurrentSampleTimespan.update(tfValidity.getMax()); mCurrentTimeframeIdRange.update(tfid); - if (mActivityDuration.isOutside(tfStart)) { - ILOG(Warning, Support) << "Timestamp " << tfStart << " is outside of the assumed run duration (" + if (mActivityDuration.isOutside(tfValidity.getMin())) { + ILOG(Warning, Support) << "Timestamp " << tfValidity.getMin() << " is outside of the assumed run duration (" << mActivityDuration.getMin() << ", " << mActivityDuration.getMax() << ")" << ENDM; return; } @@ -78,7 +66,7 @@ void TimekeeperAsynchronous::updateByTimeFrameID(uint32_t tfid) if (mWindowLengthMs == 0) { mCurrentValidityTimespan = mActivityDuration; } else { - size_t subdivisionIdx = (tfStart - mActivityDuration.getMin()) / mWindowLengthMs; + size_t subdivisionIdx = (tfValidity.getMin() - mActivityDuration.getMin()) / mWindowLengthMs; size_t fullSubdivisions = mActivityDuration.delta() / mWindowLengthMs; if (subdivisionIdx < fullSubdivisions - 1) { mCurrentValidityTimespan.update(mActivityDuration.getMin() + subdivisionIdx * mWindowLengthMs); @@ -128,4 +116,34 @@ validity_time_t return selected; } +bool TimekeeperAsynchronous::shouldFinishCycle(const framework::TimingInfo& timingInfo) +{ + // we should start a new window whenever the new data falls outside of the current one. + // if the window covers the whole run, there is never a reason to finish before we receive an end of stream + return mCurrentValidityTimespan.isValid() && + mWindowLengthMs != 0 && + !timingInfo.isTimer() && + mCurrentValidityTimespan.isOutside(computeTimestampFromTimeframeID(timingInfo.tfCounter).getMin()); +} + +ValidityInterval TimekeeperAsynchronous::computeTimestampFromTimeframeID(uint32_t tfid) +{ + if (mOrbitsPerTF == 0) { + if (auto accessor = getCCDBOrbitsPerTFAccessor()) { + mOrbitsPerTF = accessor(); + ILOG(Info, Support) << "Got nOrbitsPerTF " << mOrbitsPerTF << " for TF " << tfid << ENDM; + } else { + ILOG(Error, Ops) << "CCDB OrbitsPerTF accessor is not available" << ENDM; + } + if (mOrbitsPerTF == 0) { + ILOG(Error, Ops) << "nHBFperTF from CCDB GRP is 0, object validity will be incorrect" << ENDM; + } + } + + auto tfDurationMs = constants::lhc::LHCOrbitNS / 1000000 * mOrbitsPerTF; + auto tfStart = static_cast(mActivityDuration.getMin() + tfDurationMs * (tfid - 1)); + auto tfEnd = static_cast(mActivityDuration.getMin() + tfDurationMs * tfid - 1); + return { tfStart, tfEnd }; +} + } // namespace o2::quality_control::core \ No newline at end of file diff --git a/Framework/src/TimekeeperFactory.cxx b/Framework/src/TimekeeperFactory.cxx index 42b6dceb5c..74bd1d395b 100644 --- a/Framework/src/TimekeeperFactory.cxx +++ b/Framework/src/TimekeeperFactory.cxx @@ -24,12 +24,12 @@ using namespace o2::framework; namespace o2::quality_control::core { -std::unique_ptr TimekeeperFactory::create(framework::DeploymentMode deploymentMode) +std::unique_ptr TimekeeperFactory::create(framework::DeploymentMode deploymentMode, validity_time_t windowLengthMs) { switch (deploymentMode) { case DeploymentMode::Grid: { ILOG(Info, Devel) << "Detected async deployment, object validity will be based on incoming data and available SOR/EOR times" << ENDM; - return std::make_unique(); + return std::make_unique(windowLengthMs); break; } case DeploymentMode::Local: diff --git a/Framework/src/TimekeeperSynchronous.cxx b/Framework/src/TimekeeperSynchronous.cxx index 679b08c911..d1a0c281e7 100644 --- a/Framework/src/TimekeeperSynchronous.cxx +++ b/Framework/src/TimekeeperSynchronous.cxx @@ -18,6 +18,7 @@ #include "QualityControl/QcInfoLogger.h" #include +#include namespace o2::quality_control::core { @@ -98,4 +99,9 @@ validity_time_t return selected; } +bool TimekeeperSynchronous::shouldFinishCycle(const framework::TimingInfo& timingInfo) +{ + return timingInfo.isTimer(); +} + } // namespace o2::quality_control::core \ No newline at end of file diff --git a/Framework/src/runFileMerger.cxx b/Framework/src/runFileMerger.cxx index 21384263ee..4a366887ad 100644 --- a/Framework/src/runFileMerger.cxx +++ b/Framework/src/runFileMerger.cxx @@ -16,7 +16,6 @@ /// \brief This is an executable which reads MonitorObjectCollections from files and creates a file with the merged result. #include "QualityControl/QcInfoLogger.h" -#include "QualityControl/MonitorObject.h" #include "QualityControl/MonitorObjectCollection.h" #include @@ -40,6 +39,7 @@ struct overloaded : Ts... { template overloaded(Ts...) -> overloaded; +// TODO: this structures and Nodes in RootFileStorage could be merged as a refactoring effort struct Node { std::string pathTo{}; std::string name{}; diff --git a/Framework/test/testInfrastructureGenerator.cxx b/Framework/test/testInfrastructureGenerator.cxx index 9b493f2b07..6625a7ff08 100644 --- a/Framework/test/testInfrastructureGenerator.cxx +++ b/Framework/test/testInfrastructureGenerator.cxx @@ -375,14 +375,14 @@ TEST_CASE("qc_infrastructure_remote_batch_test") auto configTree = configInterface->getRecursive(); auto workflow = InfrastructureGenerator::generateRemoteBatchInfrastructure(configTree, "file.root"); - REQUIRE(workflow.size() == 8); + REQUIRE(workflow.size() == 9); auto fileReader = std::find_if( workflow.begin(), workflow.end(), [](const DataProcessorSpec& d) { return d.name == "qc-root-file-source" && d.inputs.size() == 0 && - d.outputs.size() == 4; + d.outputs.size() == 5; }); CHECK(fileReader != workflow.end()); @@ -392,7 +392,7 @@ TEST_CASE("qc_infrastructure_remote_batch_test") return d.name.find("qc-check") != std::string::npos && d.inputs.size() == 1; }); - REQUIRE(checkRunnerCount == 5); + REQUIRE(checkRunnerCount == 6); auto postprocessingTask = std::find_if( workflow.begin(), workflow.end(), diff --git a/Framework/test/testMonitorObjectCollection.cxx b/Framework/test/testMonitorObjectCollection.cxx index c73edb9fa5..8361c916a5 100644 --- a/Framework/test/testMonitorObjectCollection.cxx +++ b/Framework/test/testMonitorObjectCollection.cxx @@ -135,6 +135,7 @@ TEST_CASE("monitor_object_collection_clone_mw") MonitorObject* moTH1I = new MonitorObject(objTH1I, "histo 1d", "class", "DET"); moTH1I->setIsOwner(false); moTH1I->setCreateMovingWindow(true); + moTH1I->setValidity({ 10, 432 }); moc->Add(moTH1I); TH2I* objTH2I = new TH2I("histo 2d", "histo 2d", bins, min, max, bins, min, max); @@ -143,7 +144,6 @@ TEST_CASE("monitor_object_collection_clone_mw") moc->Add(moTH2I); auto mwMergeInterface = moc->cloneMovingWindow(); - delete moc; REQUIRE(mwMergeInterface != nullptr); auto mwMOC = dynamic_cast(mwMergeInterface); @@ -158,7 +158,16 @@ TEST_CASE("monitor_object_collection_clone_mw") REQUIRE(mwTH1I != nullptr); CHECK(mwTH1I->GetBinContent(mwTH1I->FindBin(5)) == 1); + moTH1I->setValidity(gInvalidValidityInterval); + auto mwMergeInterface2 = moc->cloneMovingWindow(); + REQUIRE(mwMergeInterface2 != nullptr); + auto mwMOC2 = dynamic_cast(mwMergeInterface2); + REQUIRE(mwMOC2 != nullptr); + REQUIRE(mwMOC2->GetEntries() == 0); + + delete moc; delete mwMOC; + delete mwMOC2; } } // namespace o2::quality_control::core \ No newline at end of file diff --git a/Framework/test/testRootFileStorage.cxx b/Framework/test/testRootFileStorage.cxx new file mode 100644 index 0000000000..592700ce18 --- /dev/null +++ b/Framework/test/testRootFileStorage.cxx @@ -0,0 +1,405 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +/// +/// \file testRootFileStorage.cxx +/// \author Piotr Konopka +/// + +#include "QualityControl/RootFileStorage.h" +#include "QualityControl/MonitorObjectCollection.h" +#include "QualityControl/MonitorObject.h" +#include "QualityControl/QcInfoLogger.h" + +#include +#include +#include + +using namespace o2::quality_control::core; + +struct TestFileFixture { + TestFileFixture(const std::string& testCase) + { + filePath = "/tmp/qc_test_root_file_storage_" + testCase + "_" + std::to_string(getpid()) + ".root"; + if (!filePath.empty()) { + std::filesystem::remove(filePath); + } + } + + ~TestFileFixture() + { + if (!filePath.empty()) { + std::filesystem::remove(filePath); + } + } + + std::string filePath; +}; + +const size_t bins = 10; +const size_t min = 0; +const size_t max = 10; + +TEST_CASE("int_write_read") +{ + // the fixture will do the cleanup when being destroyed only after any file readers are destroyed earlier + TestFileFixture fixture("int_write_read"); + + MonitorObjectCollection* mocBefore = new MonitorObjectCollection(); + mocBefore->SetOwner(true); + + TH1I* histoBefore = new TH1I("histo 1d", "histo 1d", bins, min, max); + histoBefore->Fill(5); + MonitorObject* moHistoBefore = new MonitorObject(histoBefore, "histo 1d", "class", "DET"); + moHistoBefore->setActivity({ 300000, 1, "LHC32x", "apass2", "qc_async", { 100, 300 } }); + moHistoBefore->setIsOwner(true); + mocBefore->Add(moHistoBefore); + { + RootFileStorage storage(fixture.filePath, RootFileStorage::ReadMode::Update); + REQUIRE(storage.readStructure(false).children.empty()); + + // store and read back, check results + { + REQUIRE_NOTHROW(storage.storeIntegralMOC(mocBefore)); + auto mocAfter = storage.readMonitorObjectCollection("int/TST/Test"); + REQUIRE(mocAfter != nullptr); + + REQUIRE(mocBefore->GetEntries() == mocAfter->GetEntries()); + auto moHistoAfter = dynamic_cast(mocAfter->At(0)); + REQUIRE(moHistoAfter != nullptr); + + CHECK(moHistoAfter->getActivity() == moHistoBefore->getActivity()); + REQUIRE(moHistoAfter->getObject() != nullptr); + auto histoAfter = dynamic_cast(moHistoAfter->getObject()); + CHECK(histoAfter->GetBinContent(histoAfter->FindBin(5)) == 1); + CHECK(histoAfter->GetSum() == 1); + } + // merge mocBefore into the existing file, check results + { + REQUIRE_NOTHROW(storage.storeIntegralMOC(mocBefore)); + auto mocAfter = storage.readMonitorObjectCollection("int/TST/Test"); + REQUIRE(mocAfter != nullptr); + + REQUIRE(mocBefore->GetEntries() == mocAfter->GetEntries()); + auto moHistoAfter = dynamic_cast(mocAfter->At(0)); + REQUIRE(moHistoAfter != nullptr); + + CHECK(moHistoAfter->getActivity() == moHistoBefore->getActivity()); + REQUIRE(moHistoAfter->getObject() != nullptr); + auto histoAfter = dynamic_cast(moHistoAfter->getObject()); + CHECK(histoAfter->GetBinContent(histoAfter->FindBin(5)) == 2); + CHECK(histoAfter->GetSum() == 2); + } + } + + // close and reopen the file, then merge again, check results + { + RootFileStorage storage(fixture.filePath, RootFileStorage::ReadMode::Update); + REQUIRE_NOTHROW(storage.storeIntegralMOC(mocBefore)); + auto mocAfter = storage.readMonitorObjectCollection("int/TST/Test"); + REQUIRE(mocAfter != nullptr); + + REQUIRE(mocBefore->GetEntries() == mocAfter->GetEntries()); + auto moHistoAfter = dynamic_cast(mocAfter->At(0)); + REQUIRE(moHistoAfter != nullptr); + + CHECK(moHistoAfter->getActivity() == moHistoBefore->getActivity()); + REQUIRE(moHistoAfter->getObject() != nullptr); + auto histoAfter = dynamic_cast(moHistoAfter->getObject()); + CHECK(histoAfter->GetBinContent(histoAfter->FindBin(5)) == 3.0); + CHECK(histoAfter->GetSum() == 3.0); + } +} + +TEST_CASE("mw_write_read") +{ + // the fixture will do the cleanup when being destroyed only after any file readers are destroyed earlier + TestFileFixture fixture("mw_write_read"); + + MonitorObjectCollection* mocBefore = new MonitorObjectCollection(); + mocBefore->SetOwner(true); + + TH1I* histoBefore = new TH1I("histo 1d", "histo 1d", bins, min, max); + histoBefore->Fill(5); + MonitorObject* moHistoBefore = new MonitorObject(histoBefore, "histo 1d", "class", "DET"); + moHistoBefore->setActivity({ 300000, 1, "LHC32x", "apass2", "qc_async", { 100, 300 } }); + moHistoBefore->setIsOwner(true); + mocBefore->Add(moHistoBefore); + { + RootFileStorage storage(fixture.filePath, RootFileStorage::ReadMode::Update); + REQUIRE(storage.readStructure(false).children.empty()); + + // store and read back, check results + { + REQUIRE_NOTHROW(storage.storeMovingWindowMOC(mocBefore)); + auto mocAfter = storage.readMonitorObjectCollection("mw/TST/Test/100"); + REQUIRE(mocAfter != nullptr); + + REQUIRE(mocBefore->GetEntries() == mocAfter->GetEntries()); + auto moHistoAfter = dynamic_cast(mocAfter->At(0)); + REQUIRE(moHistoAfter != nullptr); + + CHECK(moHistoAfter->getActivity() == moHistoBefore->getActivity()); + REQUIRE(moHistoAfter->getObject() != nullptr); + auto histoAfter = dynamic_cast(moHistoAfter->getObject()); + CHECK(histoAfter->GetBinContent(histoAfter->FindBin(5)) == 1); + CHECK(histoAfter->GetSum() == 1); + } + // merge mocBefore into the existing file, check results + { + // extend the validity forward, start stays the same. + moHistoBefore->setValidity({ 100, 500 }); + REQUIRE_NOTHROW(storage.storeMovingWindowMOC(mocBefore)); + auto mocAfter = storage.readMonitorObjectCollection("mw/TST/Test/100"); + REQUIRE(mocAfter != nullptr); + + REQUIRE(mocBefore->GetEntries() == mocAfter->GetEntries()); + auto moHistoAfter = dynamic_cast(mocAfter->At(0)); + REQUIRE(moHistoAfter != nullptr); + + CHECK(moHistoAfter->getActivity() == moHistoBefore->getActivity()); + REQUIRE(moHistoAfter->getObject() != nullptr); + auto histoAfter = dynamic_cast(moHistoAfter->getObject()); + CHECK(histoAfter->GetBinContent(histoAfter->FindBin(5)) == 2); + CHECK(histoAfter->GetSum() == 2); + } + } + + // move the validity to the future, a new object should be stored in the file + moHistoBefore->setValidity({ 300, 500 }); + // close and reopen the file, then merge again, check results + { + RootFileStorage storage(fixture.filePath, RootFileStorage::ReadMode::Update); + REQUIRE_NOTHROW(storage.storeMovingWindowMOC(mocBefore)); + auto mocAfter = storage.readMonitorObjectCollection("mw/TST/Test/300"); + REQUIRE(mocAfter != nullptr); + + REQUIRE(mocBefore->GetEntries() == mocAfter->GetEntries()); + auto moHistoAfter = dynamic_cast(mocAfter->At(0)); + REQUIRE(moHistoAfter != nullptr); + + CHECK(moHistoAfter->getActivity() == moHistoBefore->getActivity()); + REQUIRE(moHistoAfter->getObject() != nullptr); + auto histoAfter = dynamic_cast(moHistoAfter->getObject()); + CHECK(histoAfter->GetBinContent(histoAfter->FindBin(5)) == 1.0); + CHECK(histoAfter->GetSum() == 1.0); + } +} + +TEST_CASE("read_structure") +{ + // the fixture will do the cleanup when being destroyed only after any file readers are destroyed earlier + TestFileFixture fixture("read_structure"); + + MonitorObjectCollection* moc = new MonitorObjectCollection(); + moc->SetOwner(true); + moc->setDetector("TST"); + + TH1I* histo1 = new TH1I("histo 1", "histo 1", bins, min, max); + histo1->Fill(5); + MonitorObject* moHisto1 = new MonitorObject(histo1, "histo 1", "class", "DET"); + moHisto1->setActivity({ 300000, 1, "LHC32x", "apass2", "qc_async", { 100, 300 } }); + moHisto1->setIsOwner(true); + moc->Add(moHisto1); + + TH1I* histo2 = new TH1I("histo 2", "histo 2", bins, min, max); + histo1->Fill(5); + MonitorObject* moHisto2 = new MonitorObject(histo2, "histo 2", "class", "DET"); + moHisto2->setActivity({ 300000, 1, "LHC32x", "apass2", "qc_async", { 100, 300 } }); + moHisto2->setIsOwner(true); + moc->Add(moHisto2); + + RootFileStorage storage(fixture.filePath, RootFileStorage::ReadMode::Update); + REQUIRE(storage.readStructure(false).children.empty()); + + storage.storeIntegralMOC(moc); + storage.storeMovingWindowMOC(moc); + dynamic_cast(moc->At(0))->setValidity({ 300, 500 }); + dynamic_cast(moc->At(1))->setValidity({ 300, 500 }); + storage.storeMovingWindowMOC(moc); + + auto structure = storage.readStructure(false); + CHECK(structure.children.size() == 2); + { + REQUIRE(structure.children.find("int") != structure.children.end()); + REQUIRE(std::holds_alternative(structure.children.at("int"))); + auto intDir = std::get(structure.children.at("int")); + CHECK(intDir.name == "int"); + CHECK(intDir.fullPath == "int"); + REQUIRE(intDir.children.size() == 1); + REQUIRE(intDir.children.find("TST") != intDir.children.end()); + REQUIRE(std::holds_alternative(intDir.children.at("TST"))); + { + auto intTstDir = std::get(intDir.children.at("TST")); + CHECK(intTstDir.name == "TST"); + CHECK(intTstDir.fullPath == "int/TST"); + REQUIRE(intTstDir.children.size() == 1); + REQUIRE(intTstDir.children.find("Test") != intTstDir.children.end()); + REQUIRE(std::holds_alternative(intTstDir.children.at("Test"))); + { + auto intTstTestMoc = std::get(intTstDir.children.at("Test")); + CHECK(intTstTestMoc.name == "Test"); + CHECK(intTstTestMoc.fullPath == "int/TST/Test"); + REQUIRE(intTstTestMoc.moc == nullptr); + } + } + } + { + REQUIRE(structure.children.find("mw") != structure.children.end()); + REQUIRE(std::holds_alternative(structure.children.at("mw"))); + auto mwDir = std::get(structure.children.at("mw")); + CHECK(mwDir.name == "mw"); + CHECK(mwDir.fullPath == "mw"); + REQUIRE(mwDir.children.size() == 1); + REQUIRE(mwDir.children.find("TST") != mwDir.children.end()); + REQUIRE(std::holds_alternative(mwDir.children.at("TST"))); + { + auto mwTstDir = std::get(mwDir.children.at("TST")); + CHECK(mwTstDir.name == "TST"); + CHECK(mwTstDir.fullPath == "mw/TST"); + REQUIRE(mwTstDir.children.size() == 1); + REQUIRE(mwTstDir.children.find("Test") != mwTstDir.children.end()); + REQUIRE(std::holds_alternative(mwTstDir.children.at("Test"))); + { + auto mwTstTestDir = std::get(mwTstDir.children.at("Test")); + CHECK(mwTstTestDir.name == "Test"); + CHECK(mwTstTestDir.fullPath == "mw/TST/Test"); + CHECK(mwTstTestDir.children.size() == 2); + REQUIRE(mwTstTestDir.children.find("100") != mwTstTestDir.children.end()); + REQUIRE(std::holds_alternative(mwTstTestDir.children.at("100"))); + { + auto mwTstTestMoc100 = std::get(mwTstTestDir.children.at("100")); + CHECK(mwTstTestMoc100.name == "100"); + CHECK(mwTstTestMoc100.fullPath == "mw/TST/Test/100"); + REQUIRE(mwTstTestMoc100.moc == nullptr); + } + REQUIRE(mwTstTestDir.children.find("300") != mwTstTestDir.children.end()); + REQUIRE(std::holds_alternative(mwTstTestDir.children.at("300"))); + { + auto mwTstTestMoc300 = std::get(mwTstTestDir.children.at("300")); + CHECK(mwTstTestMoc300.name == "300"); + CHECK(mwTstTestMoc300.fullPath == "mw/TST/Test/300"); + REQUIRE(mwTstTestMoc300.moc == nullptr); + } + } + } + } + + // now we read MonitorObjectCollections and delete them + structure = storage.readStructure(true); + auto intTstTest = std::get( + std::get( + std::get( + structure.children.at("int")) + .children.at("TST")) + .children.at("Test")); + CHECK(intTstTest.moc != nullptr); + delete intTstTest.moc; + + auto mwTstTest100 = std::get( + std::get( + std::get( + std::get( + structure.children.at("mw")) + .children.at("TST")) + .children.at("Test")) + .children.at("100")); + CHECK(mwTstTest100.moc != nullptr); + delete mwTstTest100.moc; + + auto mwTstTest300 = std::get( + std::get( + std::get( + std::get( + structure.children.at("mw")) + .children.at("TST")) + .children.at("Test")) + .children.at("300")); + CHECK(mwTstTest300.moc != nullptr); + delete mwTstTest300.moc; +} + +TEST_CASE("walking") +{ + // the fixture will do the cleanup when being destroyed only after any file readers are destroyed earlier + TestFileFixture fixture("walking"); + + MonitorObjectCollection* moc = new MonitorObjectCollection(); + moc->SetOwner(true); + moc->setDetector("TST"); + + TH1I* histo1 = new TH1I("histo 1", "histo 1", bins, min, max); + histo1->Fill(5); + MonitorObject* moHisto1 = new MonitorObject(histo1, "histo 1", "class", "DET"); + moHisto1->setActivity({ 300000, 1, "LHC32x", "apass2", "qc_async", { 100, 300 } }); + moHisto1->setIsOwner(true); + moc->Add(moHisto1); + + TH1I* histo2 = new TH1I("histo 2", "histo 2", bins, min, max); + histo1->Fill(5); + MonitorObject* moHisto2 = new MonitorObject(histo2, "histo 2", "class", "DET"); + moHisto2->setActivity({ 300000, 1, "LHC32x", "apass2", "qc_async", { 100, 300 } }); + moHisto2->setIsOwner(true); + moc->Add(moHisto2); + + RootFileStorage storage(fixture.filePath, RootFileStorage::ReadMode::Update); + auto structure = storage.readStructure(false); + REQUIRE(structure.children.empty()); + + // check if walkers do not crash when the file is empty + { + IntegralMocWalker intWalker(structure); + REQUIRE(!intWalker.hasNextPath()); + REQUIRE(intWalker.nextPath() == ""); + } + { + MovingWindowMocWalker mwWalker(structure); + REQUIRE(!mwWalker.hasNextPath()); + REQUIRE(mwWalker.nextPath() == ""); + } + + // now we put some data in the file and validate walkers in a usual scenario + storage.storeIntegralMOC(moc); + storage.storeMovingWindowMOC(moc); + dynamic_cast(moc->At(0))->setValidity({ 300, 500 }); + dynamic_cast(moc->At(1))->setValidity({ 300, 500 }); + storage.storeMovingWindowMOC(moc); + + structure = storage.readStructure(false); + { + IntegralMocWalker intWalker(structure); + REQUIRE(intWalker.hasNextPath()); + auto path = intWalker.nextPath(); + REQUIRE(path == "int/TST/Test"); + auto* readMoc = storage.readMonitorObjectCollection(path); + CHECK(readMoc != nullptr); + delete readMoc; + REQUIRE(!intWalker.hasNextPath()); + CHECK(intWalker.nextPath().empty()); + } + { + MovingWindowMocWalker mwWalker(structure); + REQUIRE(mwWalker.hasNextPath()); + auto path = mwWalker.nextPath(); + REQUIRE(path == "mw/TST/Test/100"); + auto* readMoc = storage.readMonitorObjectCollection(path); + CHECK(readMoc != nullptr); + delete readMoc; + REQUIRE(mwWalker.hasNextPath()); + path = mwWalker.nextPath(); + REQUIRE(path == "mw/TST/Test/300"); + readMoc = storage.readMonitorObjectCollection(path); + CHECK(readMoc != nullptr); + delete readMoc; + REQUIRE(!mwWalker.hasNextPath()); + CHECK(mwWalker.nextPath().empty()); + } +} \ No newline at end of file diff --git a/Framework/test/testTimekeeper.cxx b/Framework/test/testTimekeeper.cxx index 1b8938c0fa..7d8bfc066a 100644 --- a/Framework/test/testTimekeeper.cxx +++ b/Framework/test/testTimekeeper.cxx @@ -177,6 +177,14 @@ TEST_CASE("timekeeper_synchronous") CHECK(tk->getActivityDuration().getMin() == 2); CHECK(tk->getActivityDuration().getMax() == 5); } + + SECTION("finish_cycle") + { + auto tk = std::make_shared(); + CHECK(tk->shouldFinishCycle(TimingInfo{ 1653500000000000 })); + CHECK(tk->shouldFinishCycle(TimingInfo{ 1653500010000000 })); + CHECK(!tk->shouldFinishCycle(TimingInfo{ 54000 })); + } } TEST_CASE("timekeeper_asynchronous") @@ -235,11 +243,13 @@ TEST_CASE("timekeeper_asynchronous") tk->setCCDBOrbitsPerTFAccessor([]() { return 32; }); tk->setActivityDuration(ValidityInterval{ 1653000000000, 1655000000000 }); + CHECK(!tk->shouldFinishCycle(TimingInfo{ 3 })); // no, because validity is invalid and there is no moving window tk->updateByTimeFrameID(3); tk->updateByTimeFrameID(10); CHECK(tk->getValidity() == ValidityInterval{ 1653000000000, 1655000000000 }); CHECK(tk->getSampleTimespan() == ValidityInterval{ 1653000000005, 1653000000027 }); CHECK(tk->getTimerangeIdRange() == TimeframeIdRange{ 3, 10 }); + CHECK(!tk->shouldFinishCycle(TimingInfo{ 4 })); // no, because validity is invalid and there is no moving window tk->reset(); CHECK(tk->getValidity() == gInvalidValidityInterval); @@ -260,9 +270,12 @@ TEST_CASE("timekeeper_asynchronous") auto tk = std::make_shared(30 * 1000); tk->setActivityDuration(ValidityInterval{ 1653000000000, 1653000095000 }); // 95 seconds: 0-30, 30-60, 60-95 tk->setCCDBOrbitsPerTFAccessor([nOrbitPerTF]() { return nOrbitPerTF; }); + CHECK(!tk->shouldFinishCycle(TimingInfo{ 3, 0, 3 })); // no, because validity is invalid // hitting only the 1st window tk->updateByTimeFrameID(1); + CHECK(!tk->shouldFinishCycle(TimingInfo{ 10, 0, 10 })); // no, because TFID 10 is within the 1st window + CHECK(!tk->shouldFinishCycle(TimingInfo{ 1653500000000000 })); // no, because this is a timer input tk->updateByTimeFrameID(10); CHECK(tk->getValidity() == ValidityInterval{ 1653000000000, 1653000030000 }); CHECK(tk->getSampleTimespan() == ValidityInterval{ 1653000000000, 1653000009999 }); @@ -271,6 +284,7 @@ TEST_CASE("timekeeper_asynchronous") // hitting the 1st and 2nd window tk->reset(); tk->updateByTimeFrameID(1); + CHECK(tk->shouldFinishCycle(TimingInfo{ 55, 0, 55 })); // yes, because TFID 55 is not within the 1st window tk->updateByTimeFrameID(55); CHECK(tk->getValidity() == ValidityInterval{ 1653000000000, 1653000060000 }); CHECK(tk->getSampleTimespan() == ValidityInterval{ 1653000000000, 1653000055001 }); diff --git a/doc/Advanced.md b/doc/Advanced.md index 828dfcbd0f..12f28b8cd3 100644 --- a/doc/Advanced.md +++ b/doc/Advanced.md @@ -412,9 +412,9 @@ In the presented case, the Merger will publish one set of complete MOs per 10 mi ### Moving windows of selected plots only -In setups which use Mergers in the delta mode, - one can obtain objects spawning the last cycle alongside the ones covering the whole run. -These are saved in a subdirectory `mw` and also can be requested by Checks. +The following applies to synchronous setups which use Mergers in the delta mode and all asynchronous setups. +One can obtain objects containing data from one cycle alongside the ones covering the whole run. +These are saved in QCDB in the task subdirectory `mw` and also can be requested by Checks. To specify which objects should get a moving window variant, add a `"movingWindows"` list to the task configuration: ```json "MyTask": { @@ -435,11 +435,16 @@ To request these objects in a Check, use `TaskMovingWindow` data source, as in t }] } ``` -It is possible to request both the integrated and last cycle plots by the same Check. +It is possible to request both the integrated and single cycle plots by the same Check. + +To test it in a small setup, one can run `o2-qc` with `--full-chain` flag, which creates a complete workflow with a Merger for local QC tasks, even though it runs just one instance of them. + +In asynchronous QC, the moving window plots will appear in the intermediate QC file in the directory `mw` and will be uploaded to QCDB to `/mw`. +When testing, please make sure to let DPL know that it has to run in Grid mode, so that QC can compute object validity based on timestamps in the data: +``` +export O2_DPL_DEPLOYMENT_MODE=Grid && o2-qc ... +``` -To test it in a small setup, one can run `o2-qc` with `--full-chain` flag, which creates a complete workflow with a -Merger for local QC tasks, even though it runs just one instance of them. - ## Monitor cycles The QC tasks monitor and process data continuously during a so-called "monitor cycle". At the end of such a cycle they publish the QC objects that will then continue their way in the QC data flow.