diff --git a/Framework/CMakeLists.txt b/Framework/CMakeLists.txt index 7d8ba0f264..08a5c7137c 100644 --- a/Framework/CMakeLists.txt +++ b/Framework/CMakeLists.txt @@ -95,7 +95,8 @@ add_library(O2QualityControl src/TimekeeperSynchronous.cxx src/TimekeeperAsynchronous.cxx src/WorkflowType.cxx - src/TimekeeperFactory.cxx) + src/TimekeeperFactory.cxx + src/RootFileStorage.cxx) target_include_directories( O2QualityControl @@ -237,6 +238,7 @@ add_executable(o2-qc-test-core test/testPostProcessingRunner.cxx test/testQuality.cxx test/testQualityObject.cxx + test/testRootFileStorage.cxx test/testTaskInterface.cxx test/testTimekeeper.cxx test/testTriggerHelpers.cxx diff --git a/Framework/batch-test.json.in b/Framework/batch-test.json.in index 4158887d6a..bbe715a0be 100644 --- a/Framework/batch-test.json.in +++ b/Framework/batch-test.json.in @@ -32,12 +32,13 @@ "className": "o2::quality_control_modules::skeleton::SkeletonTask", "moduleName": "QcSkeleton", "detectorName": "TST", - "cycleDurationSeconds": "5", + "cycleDurationSeconds": "100", "maxNumberCycles": "-1", "dataSource": { "type": "dataSamplingPolicy", "name": "tst-raw" - } + }, + "movingWindows" : [ "example" ] } }, "checks": { @@ -45,13 +46,19 @@ "active": "true", "className": "o2::quality_control_modules::skeleton::SkeletonCheck", "moduleName": "QcSkeleton", - "policy": "OnAny", + "policy": "OnAll", "detectorName": "TST", "dataSource": [{ "type": "Task", "name": "BatchTestTask@UNIQUE_ID@", "MOs": ["example"] - }] + }, + { + "type": "TaskMovingWindow", + "name": "BatchTestTask@UNIQUE_ID@", + "MOs": ["example"] + } + ] } } }, diff --git a/Framework/include/QualityControl/MonitorObjectCollection.h b/Framework/include/QualityControl/MonitorObjectCollection.h index 3a250a7a0e..7f1db6e4a4 100644 --- a/Framework/include/QualityControl/MonitorObjectCollection.h +++ b/Framework/include/QualityControl/MonitorObjectCollection.h @@ -37,12 +37,16 @@ class MonitorObjectCollection : public TObjArray, public mergers::MergeInterface void setDetector(const std::string&); const std::string& getDetector() const; + void setTaskName(const std::string&); + const std::string& getTaskName() const; + MergeInterface* cloneMovingWindow() const override; private: std::string mDetector = "TST"; + std::string mTaskName = "Test"; - ClassDefOverride(MonitorObjectCollection, 1); + ClassDefOverride(MonitorObjectCollection, 2); }; } // namespace o2::quality_control::core diff --git a/Framework/include/QualityControl/RootFileSource.h b/Framework/include/QualityControl/RootFileSource.h index 469d174703..7e3f28eb27 100644 --- a/Framework/include/QualityControl/RootFileSource.h +++ b/Framework/include/QualityControl/RootFileSource.h @@ -19,10 +19,16 @@ #include #include +#include +#include namespace o2::quality_control::core { +class RootFileStorage; +class IntegralMocWalker; +class MovingWindowMocWalker; + /// \brief A Data Processor which reads MonitorObjectCollections from a specified file class RootFileSource : public framework::Task { @@ -33,10 +39,15 @@ class RootFileSource : public framework::Task void init(framework::InitContext& ictx) override; void run(framework::ProcessingContext& pctx) override; - static framework::OutputLabel outputBinding(const std::string& detectorCode, const std::string& taskName); + static framework::OutputLabel outputBinding(const std::string& detectorCode, const std::string& taskName, bool movingWindow = false); private: std::string mFilePath; + std::vector mAllowedOutputs; + + std::shared_ptr mRootFileManager = nullptr; + std::shared_ptr mIntegralMocWalker = nullptr; + std::shared_ptr mMovingWindowMocWalker = nullptr; }; } // namespace o2::quality_control::core diff --git a/Framework/include/QualityControl/RootFileStorage.h b/Framework/include/QualityControl/RootFileStorage.h new file mode 100644 index 0000000000..3c95a10487 --- /dev/null +++ b/Framework/include/QualityControl/RootFileStorage.h @@ -0,0 +1,101 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +/// +/// \file RootFileStorage.h +/// \author Piotr Konopka +/// + +#ifndef QUALITYCONTROL_ROOTFILESTORAGE_H +#define QUALITYCONTROL_ROOTFILESTORAGE_H + +#include +#include +#include +#include + +class TFile; +class TDirectory; + +namespace o2::quality_control::core +{ + +class MonitorObjectCollection; + +/// \brief Manager for storage and retrieval of MonitorObjectCollections in TFiles +class RootFileStorage +{ + public: + struct MonitorObjectCollectionNode { + std::string fullPath{}; + std::string name{}; + MonitorObjectCollection* moc = nullptr; + }; + struct DirectoryNode { + std::string fullPath{}; + std::string name{}; + std::map> children = {}; + }; + + enum class ReadMode { + Read, + Update + }; + + explicit RootFileStorage(const std::string& filePath, ReadMode); + ~RootFileStorage(); + + DirectoryNode readStructure(bool loadObjects = false) const; + MonitorObjectCollection* readMonitorObjectCollection(const std::string& path) const; + + void storeIntegralMOC(MonitorObjectCollection* const moc); + void storeMovingWindowMOC(MonitorObjectCollection* const moc); + + private: + DirectoryNode readStructureImpl(TDirectory* currentDir, bool loadObjects) const; + + private: + TFile* mFile = nullptr; +}; + +/// \brief walks over integral MOC paths in the alphabetical order of detectors and task names +class IntegralMocWalker +{ + public: + explicit IntegralMocWalker(const RootFileStorage::DirectoryNode& rootNode); + + bool hasNextPath(); + std::string nextPath(); + + private: + using child_iterator = decltype(std::declval().children.cbegin()); + std::vector mOrder; + std::vector::const_iterator mPathIterator; +}; + +/// \brief walks over moving window MOC paths in the chronological order +class MovingWindowMocWalker +{ + public: + explicit MovingWindowMocWalker(const RootFileStorage::DirectoryNode& rootNode); + + bool hasNextPath() const; + std::string nextPath(); + + private: + using child_iterator = decltype(std::declval().children.begin()); + std::multimap mOrder; + std::multimap::const_iterator mPathIterator; +}; + +} // namespace o2::quality_control::core + +#endif // QUALITYCONTROL_ROOTFILESTORAGE_H diff --git a/Framework/include/QualityControl/TaskRunner.h b/Framework/include/QualityControl/TaskRunner.h index 43e9d5b1df..dabcf9ec77 100644 --- a/Framework/include/QualityControl/TaskRunner.h +++ b/Framework/include/QualityControl/TaskRunner.h @@ -122,7 +122,8 @@ class TaskRunner : public framework::Task /// \brief Callback for CallbackService::Id::Reset (DPL) a.k.a. RESET DEVICE transition (FairMQ) void reset(); - static std::tuple validateInputs(const framework::InputRecord&); + /// \brief Checks if all the expected data inputs are present in the provided InputRecord + static bool isDataReady(const framework::InputRecord& inputs); void refreshConfig(framework::InitContext& iCtx); void initInfologger(framework::InitContext& iCtx); void printTaskConfig() const; diff --git a/Framework/include/QualityControl/Timekeeper.h b/Framework/include/QualityControl/Timekeeper.h index 732fd649f0..2320240a1c 100644 --- a/Framework/include/QualityControl/Timekeeper.h +++ b/Framework/include/QualityControl/Timekeeper.h @@ -50,7 +50,7 @@ class Timekeeper void setEndOfActivity(validity_time_t ecsTimestamp = 0, validity_time_t configTimestamp = 0, validity_time_t currentTimestamp = 0, std::function ccdbTimestampAccessor = nullptr); - /// \brief asdf + /// \brief sets an accessor to get the number of orbits per TF for the currently processed run void setCCDBOrbitsPerTFAccessor(std::function); /// \brief updates the validity based on the provided timestamp (ms since epoch) @@ -58,6 +58,13 @@ class Timekeeper /// \brief updates the validity based on the provided TF ID virtual void updateByTimeFrameID(uint32_t tfID) = 0; + /// \brief decides if the current cycle should be finished and a new one started + /// + /// This method decides if the current cycle should be finished and a new one started. + /// In case that the cycle should be finished, the updateBy* methods should be called + /// after the cycle end has been done. + virtual bool shouldFinishCycle(const o2::framework::TimingInfo& timingInfo) = 0; + /// \brief resets the state of the mCurrent* counters virtual void reset() = 0; diff --git a/Framework/include/QualityControl/TimekeeperAsynchronous.h b/Framework/include/QualityControl/TimekeeperAsynchronous.h index 19757dc1c5..6d0c4a8b29 100644 --- a/Framework/include/QualityControl/TimekeeperAsynchronous.h +++ b/Framework/include/QualityControl/TimekeeperAsynchronous.h @@ -32,11 +32,17 @@ class TimekeeperAsynchronous : public Timekeeper void updateByTimeFrameID(uint32_t tfID) override; void reset() override; + bool shouldFinishCycle(const o2::framework::TimingInfo& timingInfo) override; + protected: validity_time_t activityBoundarySelectionStrategy(validity_time_t ecsTimestamp, validity_time_t configTimestamp, validity_time_t currentTimestamp, std::function ccdbTimestampAccessor) override; + private: + /// \brief computes validity interval of the provided timeframe ID + ValidityInterval computeTimestampFromTimeframeID(uint32_t tfID); + private: validity_time_t mWindowLengthMs = 0; uint64_t mOrbitsPerTF = 0; diff --git a/Framework/include/QualityControl/TimekeeperFactory.h b/Framework/include/QualityControl/TimekeeperFactory.h index e78afd8aca..34f0b3f39f 100644 --- a/Framework/include/QualityControl/TimekeeperFactory.h +++ b/Framework/include/QualityControl/TimekeeperFactory.h @@ -27,7 +27,7 @@ namespace o2::quality_control::core class TimekeeperFactory { public: - static std::unique_ptr create(framework::DeploymentMode); + static std::unique_ptr create(framework::DeploymentMode, validity_time_t windowLengthMs = 0); static bool needsGRPECS(framework::DeploymentMode); }; diff --git a/Framework/include/QualityControl/TimekeeperSynchronous.h b/Framework/include/QualityControl/TimekeeperSynchronous.h index c855f7d9d9..9b95adac9d 100644 --- a/Framework/include/QualityControl/TimekeeperSynchronous.h +++ b/Framework/include/QualityControl/TimekeeperSynchronous.h @@ -32,6 +32,7 @@ class TimekeeperSynchronous : public Timekeeper void updateByTimeFrameID(uint32_t tfID) override; void reset() override; + bool shouldFinishCycle(const o2::framework::TimingInfo& timingInfo) override; protected: validity_time_t activityBoundarySelectionStrategy(validity_time_t ecsTimestamp, validity_time_t configTimestamp, diff --git a/Framework/script/o2-qc-batch-test.sh b/Framework/script/o2-qc-batch-test.sh index 8ec92a1584..199ef80c0a 100755 --- a/Framework/script/o2-qc-batch-test.sh +++ b/Framework/script/o2-qc-batch-test.sh @@ -29,6 +29,7 @@ function delete_data() { rm -f /tmp/batch_test_mergedB${UNIQUE_ID}.root rm -f /tmp/batch_test_mergedC${UNIQUE_ID}.root rm -f /tmp/batch_test_obj${UNIQUE_ID}.root + rm -f /tmp/batch_test_obj_mw${UNIQUE_ID}.root rm -f /tmp/batch_test_check${UNIQUE_ID}.root } @@ -63,7 +64,7 @@ o2-qc-file-merger --input-files /tmp/batch_test_mergedA${UNIQUE_ID}.root /tmp/ba # Run Checks and Aggregators, publish results to QCDB o2-qc --config json:/${JSON_DIR}/batch-test.json --remote-batch /tmp/batch_test_mergedC${UNIQUE_ID}.root --run -# check MonitorObject +# check the integrated MonitorObject # first the return code must be 200 code=$(curl -L ccdb-test.cern.ch:8080/qc/TST/MO/BatchTestTask${UNIQUE_ID}/example/8000000/PeriodName=LHC9000x/PassName=apass500 --write-out %{http_code} --silent --output /tmp/batch_test_obj${UNIQUE_ID}.root) if (( $code != 200 )); then @@ -87,6 +88,30 @@ then exit 5 fi +# check the moving window MonitorObject +# first the return code must be 200 +code=$(curl -L ccdb-test.cern.ch:8080/qc/TST/MO/BatchTestTask${UNIQUE_ID}/mw/example/8000000/PeriodName=LHC9000x/PassName=apass500 --write-out %{http_code} --silent --output /tmp/batch_test_obj_mw${UNIQUE_ID}.root) +if (( $code != 200 )); then + echo "Error, monitor object of the QC Task could not be found." + delete_data + exit 3 +fi +# try to check that we got a valid root object +root -b -l -q -e 'TFile f("/tmp/batch_test_obj_mw${UNIQUE_ID}.root"); f.Print();' +if (( $? != 0 )); then + echo "Error, monitor object of the QC Task is invalid." + delete_data + exit 4 +fi +# try if it is a non empty histogram +entries=`root -b -l -q -e 'TFile f("/tmp/batch_test_obj_mw${UNIQUE_ID}.root"); TH1F *h = (TH1F*)f.Get("ccdb_object"); cout << h->GetEntries() << endl;' | tail -n 1` +if [ $entries -lt 225 ] 2>/dev/null +then + echo "The histogram of the QC Task has less than 225 (75%) of expected samples." + delete_data + exit 5 +fi + # check QualityObject # first the return code must be 200 code=$(curl -L ccdb-test.cern.ch:8080/qc/TST/QO/BatchTestCheck${UNIQUE_ID}/8000000/PeriodName=LHC9000x/PassName=apass500 --write-out %{http_code} --silent --output /tmp/batch_test_check${UNIQUE_ID}.root) diff --git a/Framework/src/InfrastructureGenerator.cxx b/Framework/src/InfrastructureGenerator.cxx index 67ebf3c7c2..a9028ec22a 100644 --- a/Framework/src/InfrastructureGenerator.cxx +++ b/Framework/src/InfrastructureGenerator.cxx @@ -387,6 +387,13 @@ framework::WorkflowSpec InfrastructureGenerator::generateRemoteBatchInfrastructu auto taskConfig = TaskRunnerFactory::extractConfig(infrastructureSpec.common, taskSpec, 0, 1); fileSourceOutputs.push_back(taskConfig.moSpec); fileSourceOutputs.back().binding = RootFileSource::outputBinding(taskSpec.detectorName, taskSpec.taskName); + // We create an OutputSpec for moving windows for this task only if they are expected. + if (!taskConfig.movingWindows.empty()) { + fileSourceOutputs.push_back( + { RootFileSource::outputBinding(taskSpec.detectorName, taskSpec.taskName, true), + TaskRunner::createTaskDataOrigin(taskSpec.detectorName, true), + TaskRunner::createTaskDataDescription(taskSpec.taskName), 0, Lifetime::Sporadic }); + } } } if (!fileSourceOutputs.empty()) { @@ -632,8 +639,10 @@ void InfrastructureGenerator::generateCheckRunners(framework::WorkflowSpec& work if (taskSpec.active) { InputSpec taskOutput{ taskSpec.taskName, TaskRunner::createTaskDataOrigin(taskSpec.detectorName), TaskRunner::createTaskDataDescription(taskSpec.taskName), Lifetime::Sporadic }; tasksOutputMap.insert({ DataSpecUtils::label(taskOutput), taskOutput }); - if (!taskSpec.movingWindows.empty() && taskSpec.location == TaskLocationSpec::Local && - (infrastructureSpec.workflowType == WorkflowType::Remote || infrastructureSpec.workflowType == WorkflowType::FullChain)) { + bool movingWindowsEnabled = !taskSpec.movingWindows.empty(); + bool synchronousRemote = taskSpec.location == TaskLocationSpec::Local && (infrastructureSpec.workflowType == WorkflowType::Remote || infrastructureSpec.workflowType == WorkflowType::FullChain); + bool asynchronousRemote = infrastructureSpec.workflowType == WorkflowType::RemoteBatch; + if (movingWindowsEnabled && (synchronousRemote || asynchronousRemote)) { InputSpec taskMovingWindowOutput{ taskSpec.taskName, TaskRunner::createTaskDataOrigin(taskSpec.detectorName, true), TaskRunner::createTaskDataDescription(taskSpec.taskName), Lifetime::Sporadic }; tasksOutputMap.insert({ DataSpecUtils::label(taskMovingWindowOutput), taskMovingWindowOutput }); } diff --git a/Framework/src/MonitorObjectCollection.cxx b/Framework/src/MonitorObjectCollection.cxx index bc10ec7f3a..89c979fba0 100644 --- a/Framework/src/MonitorObjectCollection.cxx +++ b/Framework/src/MonitorObjectCollection.cxx @@ -110,11 +110,22 @@ const std::string& MonitorObjectCollection::getDetector() const return mDetector; } +void MonitorObjectCollection::setTaskName(const std::string& taskName) +{ + mTaskName = taskName; +} + +const std::string& MonitorObjectCollection::getTaskName() const +{ + return mTaskName; +} + MergeInterface* MonitorObjectCollection::cloneMovingWindow() const { auto mw = new MonitorObjectCollection(); mw->SetOwner(true); mw->setDetector(this->getDetector()); + mw->setTaskName(this->getTaskName()); auto mwName = std::string(this->GetName()) + "/mw"; mw->SetName(mwName.c_str()); @@ -129,6 +140,10 @@ MergeInterface* MonitorObjectCollection::cloneMovingWindow() const if (!mo->getCreateMovingWindow()) { continue; } + if (mo->getValidity().isInvalid()) { + ILOG(Warning) << "MonitorObject '" << mo->getName() << "' validity is invalid, will not create a moving window" << ENDM; + continue; + } auto clonedMO = dynamic_cast(mo->Clone()); clonedMO->setTaskName(clonedMO->getTaskName() + "/mw"); clonedMO->setIsOwner(true); diff --git a/Framework/src/ObjectsManager.cxx b/Framework/src/ObjectsManager.cxx index ac4155bd53..3f592a119b 100644 --- a/Framework/src/ObjectsManager.cxx +++ b/Framework/src/ObjectsManager.cxx @@ -42,6 +42,7 @@ ObjectsManager::ObjectsManager(std::string taskName, std::string taskClass, std: mMonitorObjects->SetOwner(true); mMonitorObjects->SetName(mTaskName.c_str()); mMonitorObjects->setDetector(mDetectorName); + mMonitorObjects->setTaskName(mTaskName); // register with the discovery service if (!noDiscovery && !consulUrl.empty()) { diff --git a/Framework/src/RootFileSink.cxx b/Framework/src/RootFileSink.cxx index 3913d30857..0ae4fef772 100644 --- a/Framework/src/RootFileSink.cxx +++ b/Framework/src/RootFileSink.cxx @@ -17,11 +17,13 @@ #include "QualityControl/RootFileSink.h" #include "QualityControl/QcInfoLogger.h" #include "QualityControl/MonitorObjectCollection.h" +#include "QualityControl/MonitorObject.h" +#include "QualityControl/RootFileStorage.h" #include #include #include #include -#include + #if defined(__linux__) && __has_include() #include #endif @@ -36,43 +38,6 @@ RootFileSink::RootFileSink(std::string filePath) { } -TFile* openSinkFile(const std::string& name) -{ - auto file = new TFile(name.c_str(), "UPDATE"); - if (file->IsZombie()) { - throw std::runtime_error("File '" + name + "' is zombie."); - } - if (!file->IsOpen()) { - throw std::runtime_error("Failed to open the file: " + name); - } - if (!file->IsWritable()) { - throw std::runtime_error("File '" + name + "' is not writable."); - } - ILOG(Info) << "Output file '" << name << "' successfully open." << ENDM; - return file; -} - -void closeSinkFile(TFile* file) -{ - if (file != nullptr) { - if (file->IsOpen()) { - ILOG(Info) << "Closing file '" << file->GetName() << "'." << ENDM; - file->Write(); - file->Close(); - } - delete file; - } -} - -void deleteTDirectory(TDirectory* d) -{ - if (d != nullptr) { - d->Write(); - d->Close(); - delete d; - } -}; - void RootFileSink::customizeInfrastructure(std::vector& policies) { auto matcher = [label = RootFileSink::getLabel()](framework::DeviceSpec const& device) { @@ -88,9 +53,8 @@ void RootFileSink::init(framework::InitContext& ictx) void RootFileSink::run(framework::ProcessingContext& pctx) { - TFile* sinkFile = nullptr; try { - sinkFile = openSinkFile(mFilePath); + RootFileStorage mStorage{ mFilePath, RootFileStorage::ReadMode::Update }; for (const auto& input : InputRecordWalker(pctx.inputs())) { auto moc = DataRefUtils::as(input); if (moc == nullptr) { @@ -99,42 +63,19 @@ void RootFileSink::run(framework::ProcessingContext& pctx) } ILOG(Info, Support) << "Received MonitorObjectCollection '" << moc->GetName() << "'" << ENDM; moc->postDeserialization(); + auto mwMOC = dynamic_cast(moc->cloneMovingWindow()); - auto mocName = moc->GetName(); - if (*mocName == '\0') { - ILOG(Error, Support) << "MonitorObjectCollection does not have a name, skipping." << ENDM; - continue; + if (moc->GetEntries() > 0) { + mStorage.storeIntegralMOC(moc.get()); } - auto detector = moc->getDetector(); - - auto detDir = std::unique_ptr(sinkFile->GetDirectory(detector.c_str()), deleteTDirectory); - if (detDir == nullptr) { - ILOG(Info, Devel) << "Creating a new directory '" << detector << "'." << ENDM; - detDir = std::unique_ptr(sinkFile->mkdir(detector.c_str()), deleteTDirectory); - if (detDir == nullptr) { - ILOG(Error, Support) << "Could not create directory '" << detector << "', skipping." << ENDM; - continue; - } + if (mwMOC->GetEntries() > 0) { + mStorage.storeMovingWindowMOC(mwMOC); } - - ILOG(Info, Support) << "Checking for existing objects in the file." << ENDM; - auto storedMOC = std::unique_ptr(detDir->Get(mocName)); - if (storedMOC != nullptr) { - storedMOC->postDeserialization(); - ILOG(Info, Support) << "Merging object '" << moc->GetName() << "' with the existing one in the file." << ENDM; - moc->merge(storedMOC.get()); - } - - auto nbytes = detDir->WriteObject(moc.get(), moc->GetName(), "Overwrite"); - ILOG(Info, Support) << "Object '" << moc->GetName() << "' has been stored in the file (" << nbytes << " bytes)." << ENDM; } - closeSinkFile(sinkFile); } catch (const std::bad_alloc& ex) { ILOG(Error, Ops) << "Caught a bad_alloc exception, there is probably a huge file or object present, but I will try to survive" << ENDM; ILOG(Error, Support) << "Details: " << ex.what() << ENDM; - closeSinkFile(sinkFile); } catch (...) { - closeSinkFile(sinkFile); throw; } diff --git a/Framework/src/RootFileSource.cxx b/Framework/src/RootFileSource.cxx index 1720ec8054..85cd1a9b27 100644 --- a/Framework/src/RootFileSource.cxx +++ b/Framework/src/RootFileSource.cxx @@ -17,6 +17,7 @@ #include "QualityControl/RootFileSource.h" #include "QualityControl/QcInfoLogger.h" #include "QualityControl/MonitorObjectCollection.h" +#include "QualityControl/RootFileStorage.h" #include #include @@ -32,79 +33,90 @@ RootFileSource::RootFileSource(std::string filePath) { } -void RootFileSource::init(framework::InitContext&) -{ -} - -void RootFileSource::run(framework::ProcessingContext& ctx) +void RootFileSource::init(framework::InitContext& ctx) { auto const& deviceSpec = ctx.services().get(); - std::vector allowedOutputs; + mAllowedOutputs.clear(); + mAllowedOutputs.reserve(deviceSpec.outputs.size()); for (const auto& outputRoute : deviceSpec.outputs) { - allowedOutputs.push_back(outputRoute.matcher.binding); + mAllowedOutputs.push_back(outputRoute.matcher.binding); } - auto* file = new TFile(mFilePath.c_str(), "READ"); - if (file->IsZombie()) { - throw std::runtime_error("File '" + mFilePath + "' is zombie."); - } - if (!file->IsOpen()) { - throw std::runtime_error("Failed to open the file: " + mFilePath); + mRootFileManager = std::make_shared(mFilePath, RootFileStorage::ReadMode::Read); + if (mRootFileManager == nullptr) { + ILOG(Fatal, Ops) << "Could not open file '" << mFilePath << "'" << ENDM; + ctx.services().get().endOfStream(); + ctx.services().get().readyToQuit(false); + return; } ILOG(Info) << "Input file '" << mFilePath << "' successfully open." << ENDM; - TIter nextDetector(file->GetListOfKeys()); - TKey* detectorKey; - while ((detectorKey = (TKey*)nextDetector())) { - ILOG(Debug, Devel) << "Going to directory '" << detectorKey->GetName() << "'" << ENDM; - auto detDir = file->GetDirectory(detectorKey->GetName()); - if (detDir == nullptr) { - ILOG(Error) << "Could not get directory '" << detectorKey->GetName() << "', skipping." << ENDM; - continue; + auto fileStructure = mRootFileManager->readStructure(false); + + mIntegralMocWalker = std::make_shared(fileStructure); + mMovingWindowMocWalker = std::make_shared(fileStructure); +} + +void RootFileSource::run(framework::ProcessingContext& ctx) +{ + if (mIntegralMocWalker->hasNextPath()) { + const auto& path = mIntegralMocWalker->nextPath(); + auto moc = mRootFileManager->readMonitorObjectCollection(path); + auto binding = outputBinding(moc->getDetector(), moc->getTaskName(), false); + + if (std::find_if(mAllowedOutputs.begin(), mAllowedOutputs.end(), + [binding](const auto& other) { return other.value == binding.value; }) == mAllowedOutputs.end()) { + ILOG(Error) << "The MonitorObjectCollection '" << binding.value << "' is not among declared output bindings: "; + for (const auto& output : mAllowedOutputs) { + ILOG(Error) << output.value << " "; + } + ILOG(Error) << ", skipping." << ENDM; + return; } + // snapshot does a shallow copy, so we cannot let it delete elements in MOC when it deletes the MOC + moc->SetOwner(false); + ctx.outputs().snapshot(OutputRef{ binding.value, 0 }, *moc); + moc->postDeserialization(); + ILOG(Info) << "Read and published object '" << path << "'" << ENDM; + delete moc; + + return; + } + + if (mMovingWindowMocWalker->hasNextPath()) { + const auto& path = mMovingWindowMocWalker->nextPath(); + auto moc = mRootFileManager->readMonitorObjectCollection(path); + auto binding = outputBinding(moc->getDetector(), moc->getTaskName(), true); - TIter nextMOC(detDir->GetListOfKeys()); - TKey* mocKey; - while ((mocKey = (TKey*)nextMOC())) { - auto storedTObj = detDir->Get(mocKey->GetName()); - if (storedTObj != nullptr) { - auto storedMOC = dynamic_cast(storedTObj); - if (storedMOC == nullptr) { - ILOG(Error) << "Could not cast the stored object to MonitorObjectCollection, skipping." << ENDM; - delete storedTObj; - continue; - } - - auto binding = outputBinding(detectorKey->GetName(), storedMOC->GetName()); - if (std::find_if(allowedOutputs.begin(), allowedOutputs.end(), - [binding](const auto& other) { return other.value == binding.value; }) == allowedOutputs.end()) { - ILOG(Error) << "The MonitorObjectCollection '" << binding.value << "' is not among declared output bindings: "; - for (const auto& output : allowedOutputs) { - ILOG(Error) << output.value << " "; - } - ILOG(Error) << ", skipping." << ENDM; - continue; - } - - // snapshot does a shallow copy, so we cannot let it delete elements in MOC when it deletes the MOC - storedMOC->SetOwner(false); - ctx.outputs().snapshot(OutputRef{ binding.value, 0 }, *storedMOC); - storedMOC->postDeserialization(); - ILOG(Info) << "Read and published object '" << storedMOC->GetName() << "'" << ENDM; + if (std::find_if(mAllowedOutputs.begin(), mAllowedOutputs.end(), + [binding](const auto& other) { return other.value == binding.value; }) == mAllowedOutputs.end()) { + ILOG(Error) << "The MonitorObjectCollection '" << binding.value << "' is not among declared output bindings: "; + for (const auto& output : mAllowedOutputs) { + ILOG(Error) << output.value << " "; } - delete storedTObj; + ILOG(Error) << ", skipping." << ENDM; + return; } + // snapshot does a shallow copy, so we cannot let it delete elements in MOC when it deletes the MOC + moc->SetOwner(false); + ctx.outputs().snapshot(OutputRef{ binding.value, 0 }, *moc); + moc->postDeserialization(); + ILOG(Info) << "Read and published object '" << path << "'" << ENDM; + delete moc; + + return; } - file->Close(); - delete file; + + mRootFileManager.reset(); ctx.services().get().endOfStream(); ctx.services().get().readyToQuit(QuitRequest::Me); } -framework::OutputLabel RootFileSource::outputBinding(const std::string& detectorCode, const std::string& taskName) +framework::OutputLabel + RootFileSource::outputBinding(const std::string& detectorCode, const std::string& taskName, bool movingWindow) { - return { detectorCode + "-" + taskName }; + return movingWindow ? framework::OutputLabel{ detectorCode + "-MW-" + taskName } : framework::OutputLabel{ detectorCode + "-" + taskName }; } } // namespace o2::quality_control::core diff --git a/Framework/src/RootFileStorage.cxx b/Framework/src/RootFileStorage.cxx new file mode 100644 index 0000000000..ecfbc0a5cd --- /dev/null +++ b/Framework/src/RootFileStorage.cxx @@ -0,0 +1,373 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +/// +/// \file RootFileStorage.cxx +/// \author Piotr Konopka +/// + +#include "QualityControl/RootFileStorage.h" +#include "QualityControl/QcInfoLogger.h" +#include "QualityControl/MonitorObjectCollection.h" +#include "QualityControl/MonitorObject.h" +#include "QualityControl/ValidityInterval.h" + +#include +#include +#include +#include +#include +#include + +namespace o2::quality_control::core +{ + +constexpr auto integralsDirectoryName = "int"; +constexpr auto movingWindowsDirectoryName = "mw"; + +RootFileStorage::RootFileStorage(const std::string& filePath, ReadMode readMode) +{ + switch (readMode) { + case ReadMode::Update: + mFile = new TFile(filePath.c_str(), "UPDATE"); + break; + case ReadMode::Read: + default: + mFile = new TFile(filePath.c_str(), "READ"); + } + if (mFile->IsZombie()) { + throw std::runtime_error("File '" + filePath + "' is zombie."); + } + if (!mFile->IsOpen()) { + throw std::runtime_error("Failed to open the file: " + filePath); + } + if (readMode == ReadMode::Update && !mFile->IsWritable()) { + throw std::runtime_error("File '" + filePath + "' is not writable."); + } + ILOG(Info) << "Output file '" << filePath << "' successfully open." << ENDM; +} + +RootFileStorage::DirectoryNode RootFileStorage::readStructure(bool loadObjects) const +{ + return readStructureImpl(mFile, loadObjects); +} + +RootFileStorage::DirectoryNode RootFileStorage::readStructureImpl(TDirectory* currentDir, bool loadObjects) const +{ + auto fullPath = currentDir->GetPath(); + auto pathToPos = std::strstr(fullPath, ":/"); + if (pathToPos == nullptr) { + ILOG(Error, Support) << "Could not extract path to node in string '" << currentDir->GetPath() << "', skipping" << ENDM; + return {}; + } + DirectoryNode currentNode{ pathToPos + 2, currentDir->GetName() }; + + TIter nextKey(currentDir->GetListOfKeys()); + TKey* key; + while ((key = (TKey*)nextKey())) { + if (!loadObjects && std::strcmp(key->GetClassName(), MonitorObjectCollection::Class_Name()) == 0) { + std::string mocPath = currentNode.fullPath + std::filesystem::path::preferred_separator + key->GetName(); + currentNode.children[key->GetName()] = MonitorObjectCollectionNode{ mocPath, key->GetName() }; + continue; + } + + ILOG(Debug, Devel) << "Getting the value for key '" << key->GetName() << "'" << ENDM; + auto* value = currentDir->Get(key->GetName()); + if (value == nullptr) { + ILOG(Error) << "Could not get the value '" << key->GetName() << "', skipping." << ENDM; + continue; + } + if (auto moc = dynamic_cast(value)) { + moc->postDeserialization(); + std::string mocPath = currentNode.fullPath + std::filesystem::path::preferred_separator + key->GetName(); + currentNode.children[moc->GetName()] = MonitorObjectCollectionNode{ mocPath, key->GetName(), moc }; + ILOG(Debug, Support) << "Read object '" << moc->GetName() << "' in path '" << currentNode.fullPath << "'" << ENDM; + } else if (auto childDir = dynamic_cast(value)) { + currentNode.children[key->GetName()] = readStructureImpl(childDir, loadObjects); + } else { + ILOG(Warning, Support) << "Could not cast the node to MonitorObjectCollection nor TDirectory, skipping." << ENDM; + delete value; + continue; + } + } + + return currentNode; +} + +MonitorObjectCollection* RootFileStorage::readMonitorObjectCollection(const std::string& path) const +{ + auto storedTObj = mFile->Get(path.c_str()); + if (storedTObj == nullptr) { + ILOG(Error, Ops) << "Could not read object '" << path << "'" << ENDM; + return nullptr; + } + auto storedMOC = dynamic_cast(storedTObj); + if (storedMOC == nullptr) { + ILOG(Error, Ops) << "Could not cast the stored object to MonitorObjectCollection" << ENDM; + delete storedTObj; + } + return storedMOC; +} + +RootFileStorage::~RootFileStorage() +{ + if (mFile != nullptr) { + if (mFile->IsOpen()) { + ILOG(Info, Support) << "Closing file '" << mFile->GetName() << "'." << ENDM; + mFile->Write(); + mFile->Close(); + } + delete mFile; + } +} + +void deleteTDirectory(TDirectory* d) +{ + if (d != nullptr) { + d->Write(); + d->Close(); + delete d; + } +} + +std::unique_ptr getOrCreateDirectory(TDirectory* parentDir, const char* dirName) +{ + auto dir = std::unique_ptr(parentDir->GetDirectory(dirName), deleteTDirectory); + if (dir == nullptr) { + ILOG(Debug, Support) << "Creating a new directory '" << dirName << "'." << ENDM; + dir = std::unique_ptr(parentDir->mkdir(dirName), deleteTDirectory); + } + return dir; +} + +validity_time_t earliestValidFrom(const MonitorObjectCollection* moc) +{ + validity_time_t earliest = std::numeric_limits::max(); + for (const auto& obj : *moc) { + if (auto mo = dynamic_cast(obj)) { + earliest = std::min(mo->getValidity().getMin(), earliest); + } + } + return earliest; +} + +bool validObjectValidities(const MonitorObjectCollection* moc) +{ + for (const auto& obj : *moc) { + if (auto mo = dynamic_cast(obj); mo->getValidity().isInvalid()) { + return false; + } + } + return true; +} + +// fixme we should not have to change the name! +void RootFileStorage::storeIntegralMOC(MonitorObjectCollection* const moc) +{ + const auto& mocStorageName = moc->getTaskName(); + if (mocStorageName.empty()) { + ILOG(Error, Support) << "taskName empty, not storing MOC '" << moc->GetName() << "' for detector '" << moc->getDetector() << "'" << ENDM; + return; + } + moc->SetName(mocStorageName.c_str()); + // directory level: int + auto integralDir = getOrCreateDirectory(mFile, integralsDirectoryName); + if (integralDir == nullptr) { + ILOG(Error, Support) << "Could not create the directory '" << integralsDirectoryName << "', skipping." << ENDM; + return; + } + + // directory level: int/DET + auto detector = moc->getDetector(); + auto detDir = getOrCreateDirectory(integralDir.get(), detector.c_str()); + if (detDir == nullptr) { + ILOG(Error, Support) << "Could not create directory '" << detector << "', skipping." << ENDM; + return; + } + + // directory level: int/DET/TASK + ILOG(Debug, Support) << "Checking for existing objects in the file." << ENDM; + int nbytes = 0; + auto storedMOC = std::unique_ptr(detDir->Get(mocStorageName.c_str())); + if (storedMOC != nullptr) { + storedMOC->postDeserialization(); + ILOG(Info, Support) << "Merging objects for task '" << detector << "/" << moc->getTaskName() << "' with the existing ones in the file." << ENDM; + storedMOC->merge(moc); + nbytes = detDir->WriteObject(storedMOC.get(), storedMOC->GetName(), "Overwrite"); + } else { + ILOG(Info, Support) << "Storing objects for task '" << detector << "/" << moc->getTaskName() << "' in the file." << ENDM; + nbytes = detDir->WriteObject(moc, moc->GetName(), "Overwrite"); + } + ILOG(Info, Support) << "Integrated objects '" << moc->GetName() << "' have been stored in the file (" << nbytes << " bytes)." << ENDM; +} + +void RootFileStorage::storeMovingWindowMOC(MonitorObjectCollection* const moc) +{ + if (moc->GetEntries() == 0) { + ILOG(Warning, Support) << "The provided MonitorObjectCollection '" << moc->GetName() << "' is empty, will not store." << ENDM; + return; + } + if (!validObjectValidities(moc)) { + // this should not happen, because we have a protection in MonitorObjectCollection::cloneMovingWindow() against it. + // thus, we should raise some concern if this occurs anyway. + ILOG(Warning, Ops) << "The provided MonitorObjectCollection '" << moc->GetName() << "' contains at least one object with invalid validity!!!" << ENDM; + } + // directory level: mw + auto mwDir = getOrCreateDirectory(mFile, movingWindowsDirectoryName); + if (mwDir == nullptr) { + ILOG(Error, Support) << "Could not create the directory '" << movingWindowsDirectoryName << "', skipping." << ENDM; + return; + } + + // directory level: mw/DET + auto detector = moc->getDetector(); + auto detDir = getOrCreateDirectory(mwDir.get(), detector.c_str()); + if (detDir == nullptr) { + ILOG(Error, Support) << "Could not create directory '" << detector << "', skipping." << ENDM; + return; + } + + // directory level: mw/DET/TASK + auto taskDir = getOrCreateDirectory(detDir.get(), moc->getTaskName().c_str()); + if (taskDir == nullptr) { + ILOG(Error, Support) << "Could not create directory '" << moc->getTaskName() << "', skipping." << ENDM; + return; + } + + // directory level: mw/DET/TASK/ + auto mocStorageName = std::to_string(earliestValidFrom(moc)); + moc->SetName(mocStorageName.c_str()); + ILOG(Info, Support) << "Checking for existing moving windows '" << mocStorageName << "' for task '" << detector << "/" << moc->getTaskName() << "' in the file." << ENDM; + int nbytes = 0; + auto storedMOC = std::unique_ptr(taskDir->Get(mocStorageName.c_str())); + if (storedMOC != nullptr) { + storedMOC->postDeserialization(); + ILOG(Info, Support) << "Merging moving windows '" << moc->GetName() << "' for task '" << moc->getDetector() << "/" << moc->getTaskName() << "' with the existing one in the file." << ENDM; + storedMOC->merge(moc); + nbytes = taskDir->WriteObject(storedMOC.get(), storedMOC->GetName(), "Overwrite"); + } else { + ILOG(Info, Support) << "Storing moving windows '" << moc->GetName() << "' for task '" << moc->getDetector() << "/" << moc->getTaskName() << "' in the file." << ENDM; + nbytes = taskDir->WriteObject(moc, moc->GetName(), "Overwrite"); + } + ILOG(Info, Support) << "Moving windows '" << moc->GetName() << "' for task '" << detector << "/" << moc->getTaskName() << "' has been stored in the file (" << nbytes << " bytes)." << ENDM; +} + +IntegralMocWalker::IntegralMocWalker(const RootFileStorage::DirectoryNode& rootNode) +{ + auto integralDirIt = rootNode.children.find(integralsDirectoryName); + if (integralDirIt == rootNode.children.end()) { + mPathIterator = mOrder.cbegin(); + return; + } + if (!std::holds_alternative(integralDirIt->second)) { + mPathIterator = mOrder.cbegin(); + return; + } + const auto& integralMocNode = std::get(integralDirIt->second); + std::stack> stack{}; + stack.push({ integralMocNode, integralMocNode.children.cbegin() }); + + while (!stack.empty()) { + auto& [currentNode, childIt] = stack.top(); + if (childIt == currentNode.children.end()) { + // move to the next child of the parent node + stack.pop(); + if (!stack.empty()) { + stack.top().second++; + } + } else if (std::holds_alternative(childIt->second)) { + // move to a child of the current node + const auto& childNode = std::get(childIt->second); + stack.push({ childNode, childNode.children.cbegin() }); + } else if (std::holds_alternative(childIt->second)) { + // move to the next child in the currentNode and return a path + const auto& childNode = std::get(childIt->second); + ++childIt; + mOrder.push_back(childNode.fullPath); + } else { + // unrecognized child node, move to the next child in the currentNode + ++childIt; + } + } + mPathIterator = mOrder.cbegin(); +} + +bool IntegralMocWalker::hasNextPath() +{ + return mPathIterator != mOrder.cend(); +} + +std::string IntegralMocWalker::nextPath() +{ + if (hasNextPath()) { + return *mPathIterator++; + } + return {}; +} + +MovingWindowMocWalker::MovingWindowMocWalker(const RootFileStorage::DirectoryNode& rootNode) +{ + auto movingWindowDirIt = rootNode.children.find(movingWindowsDirectoryName); + if (movingWindowDirIt == rootNode.children.end()) { + mPathIterator = mOrder.cbegin(); + return; + } + if (!std::holds_alternative(movingWindowDirIt->second)) { + mPathIterator = mOrder.cbegin(); + return; + } + auto movingWindowMocNode = std::get(movingWindowDirIt->second); + + std::stack> stack{}; + stack.push({ movingWindowMocNode, movingWindowMocNode.children.begin() }); + + // we walk over all the MOCs in the tree and we save them in chronological order + while (!stack.empty()) { + auto& [currentNode, childIt] = stack.top(); + + if (childIt == currentNode.children.end()) { + // move to the next child of the parent node + stack.pop(); + if (!stack.empty()) { + stack.top().second++; + } + } else if (std::holds_alternative(childIt->second)) { + // move to a child of the current node + auto& childNode = std::get(childIt->second); + stack.push({ childNode, childNode.children.begin() }); + } else if (std::holds_alternative(childIt->second)) { + // move to the next child in the currentNode and return a path + auto timestamp = std::stoull(childIt->first); + auto& childNode = std::get(childIt->second); + mOrder.emplace(timestamp, childNode.fullPath); + childIt++; + } else { + // unrecognized child node, move to the next child in the currentNode + childIt++; + } + } + mPathIterator = mOrder.cbegin(); +} + +bool MovingWindowMocWalker::hasNextPath() const +{ + return mPathIterator != mOrder.cend(); +} + +std::string MovingWindowMocWalker::nextPath() +{ + if (hasNextPath()) { + return (mPathIterator++)->second; + } + return {}; +} + +} // namespace o2::quality_control::core \ No newline at end of file diff --git a/Framework/src/TaskRunner.cxx b/Framework/src/TaskRunner.cxx index 35c75ae803..9cd97fa4e8 100644 --- a/Framework/src/TaskRunner.cxx +++ b/Framework/src/TaskRunner.cxx @@ -168,8 +168,12 @@ void TaskRunner::init(InitContext& iCtx) // setup timekeeping mDeploymentMode = DefaultsHelpers::deploymentMode(); - mTimekeeper = TimekeeperFactory::create(mDeploymentMode); - mTimekeeper->setCCDBOrbitsPerTFAccessor([]() { return o2::base::GRPGeomHelper::getNHBFPerTF(); }); + mTimekeeper = TimekeeperFactory::create(mDeploymentMode, mTaskConfig.cycleDurations.back().first * 1000); + mTimekeeper->setCCDBOrbitsPerTFAccessor([]() { + // getNHBFPerTF() returns 128 if it does not know, which can be very misleading. + // instead we use 0, which will trigger another try when processing another timeslice. + return o2::base::GRPGeomHelper::instance().getGRPECS() != nullptr ? o2::base::GRPGeomHelper::getNHBFPerTF() : 0; + }); // setup user's task mTask.reset(TaskFactory::create(mTaskConfig, mObjectsManager)); @@ -208,15 +212,7 @@ void TaskRunner::run(ProcessingContext& pCtx) GRPGeomHelper::instance().checkUpdates(pCtx); } - auto [dataReady, timerReady] = validateInputs(pCtx.inputs()); - - if (dataReady) { - mTimekeeper->updateByTimeFrameID(pCtx.services().get().tfCounter); - mTask->monitorData(pCtx); - updateMonitoringStats(pCtx); - } - - if (timerReady) { + if (mTimekeeper->shouldFinishCycle(pCtx.services().get())) { mTimekeeper->updateByCurrentTimestamp(pCtx.services().get().timeslice / 1000); finishCycle(pCtx.outputs()); if (mTaskConfig.resetAfterCycles > 0 && (mCycleNumber % mTaskConfig.resetAfterCycles == 0)) { @@ -229,6 +225,12 @@ void TaskRunner::run(ProcessingContext& pCtx) mNoMoreCycles = true; } } + + if (isDataReady(pCtx.inputs())) { + mTimekeeper->updateByTimeFrameID(pCtx.services().get().tfCounter); + mTask->monitorData(pCtx); + updateMonitoringStats(pCtx); + } } void TaskRunner::finaliseCCDB(ConcreteDataMatcher& matcher, void* obj) @@ -402,10 +404,9 @@ void TaskRunner::reset() } } -std::tuple TaskRunner::validateInputs(const framework::InputRecord& inputs) +bool TaskRunner::isDataReady(const framework::InputRecord& inputs) { size_t dataInputsPresent = 0; - bool timerReady = false; for (auto& input : inputs) { if (input.header != nullptr) { @@ -413,16 +414,13 @@ std::tuple TaskRunner::validateInputs const auto* dataHeader = get(input.header); assert(dataHeader); - if (!strncmp(dataHeader->dataDescription.str, "TIMER", 5)) { - timerReady = true; - } else { + if (strncmp(dataHeader->dataDescription.str, "TIMER", 5)) { dataInputsPresent++; } } } - bool dataReady = dataInputsPresent == inputs.size() - 1; - return { dataReady, timerReady }; + return dataInputsPresent == inputs.size() - 1; } void TaskRunner::printTaskConfig() const diff --git a/Framework/src/TimekeeperAsynchronous.cxx b/Framework/src/TimekeeperAsynchronous.cxx index 81c92fa237..ef1c66f856 100644 --- a/Framework/src/TimekeeperAsynchronous.cxx +++ b/Framework/src/TimekeeperAsynchronous.cxx @@ -18,6 +18,7 @@ #include "QualityControl/QcInfoLogger.h" #include +#include namespace o2::quality_control::core { @@ -50,27 +51,14 @@ void TimekeeperAsynchronous::updateByTimeFrameID(uint32_t tfid) return; } - if (mOrbitsPerTF == 0) { - if (auto accessor = getCCDBOrbitsPerTFAccessor()) { - mOrbitsPerTF = accessor(); - } else { - ILOG(Error, Ops) << "CCDB OrbitsPerTF accessor is not available" << ENDM; - } - if (mOrbitsPerTF == 0) { - ILOG(Error, Ops) << "nHBFperTF from CCDB GRP is 0, object validity will be incorrect" << ENDM; - } - } - - auto tfDurationMs = constants::lhc::LHCOrbitNS / 1000000 * mOrbitsPerTF; - auto tfStart = static_cast(mActivityDuration.getMin() + tfDurationMs * (tfid - 1)); - auto tfEnd = static_cast(mActivityDuration.getMin() + tfDurationMs * tfid - 1); - mCurrentSampleTimespan.update(tfStart); - mCurrentSampleTimespan.update(tfEnd); + auto tfValidity = computeTimestampFromTimeframeID(tfid); + mCurrentSampleTimespan.update(tfValidity.getMin()); + mCurrentSampleTimespan.update(tfValidity.getMax()); mCurrentTimeframeIdRange.update(tfid); - if (mActivityDuration.isOutside(tfStart)) { - ILOG(Warning, Support) << "Timestamp " << tfStart << " is outside of the assumed run duration (" + if (mActivityDuration.isOutside(tfValidity.getMin())) { + ILOG(Warning, Support) << "Timestamp " << tfValidity.getMin() << " is outside of the assumed run duration (" << mActivityDuration.getMin() << ", " << mActivityDuration.getMax() << ")" << ENDM; return; } @@ -78,7 +66,7 @@ void TimekeeperAsynchronous::updateByTimeFrameID(uint32_t tfid) if (mWindowLengthMs == 0) { mCurrentValidityTimespan = mActivityDuration; } else { - size_t subdivisionIdx = (tfStart - mActivityDuration.getMin()) / mWindowLengthMs; + size_t subdivisionIdx = (tfValidity.getMin() - mActivityDuration.getMin()) / mWindowLengthMs; size_t fullSubdivisions = mActivityDuration.delta() / mWindowLengthMs; if (subdivisionIdx < fullSubdivisions - 1) { mCurrentValidityTimespan.update(mActivityDuration.getMin() + subdivisionIdx * mWindowLengthMs); @@ -128,4 +116,34 @@ validity_time_t return selected; } +bool TimekeeperAsynchronous::shouldFinishCycle(const framework::TimingInfo& timingInfo) +{ + // we should start a new window whenever the new data falls outside of the current one. + // if the window covers the whole run, there is never a reason to finish before we receive an end of stream + return mCurrentValidityTimespan.isValid() && + mWindowLengthMs != 0 && + !timingInfo.isTimer() && + mCurrentValidityTimespan.isOutside(computeTimestampFromTimeframeID(timingInfo.tfCounter).getMin()); +} + +ValidityInterval TimekeeperAsynchronous::computeTimestampFromTimeframeID(uint32_t tfid) +{ + if (mOrbitsPerTF == 0) { + if (auto accessor = getCCDBOrbitsPerTFAccessor()) { + mOrbitsPerTF = accessor(); + ILOG(Info, Support) << "Got nOrbitsPerTF " << mOrbitsPerTF << " for TF " << tfid << ENDM; + } else { + ILOG(Error, Ops) << "CCDB OrbitsPerTF accessor is not available" << ENDM; + } + if (mOrbitsPerTF == 0) { + ILOG(Error, Ops) << "nHBFperTF from CCDB GRP is 0, object validity will be incorrect" << ENDM; + } + } + + auto tfDurationMs = constants::lhc::LHCOrbitNS / 1000000 * mOrbitsPerTF; + auto tfStart = static_cast(mActivityDuration.getMin() + tfDurationMs * (tfid - 1)); + auto tfEnd = static_cast(mActivityDuration.getMin() + tfDurationMs * tfid - 1); + return { tfStart, tfEnd }; +} + } // namespace o2::quality_control::core \ No newline at end of file diff --git a/Framework/src/TimekeeperFactory.cxx b/Framework/src/TimekeeperFactory.cxx index 42b6dceb5c..74bd1d395b 100644 --- a/Framework/src/TimekeeperFactory.cxx +++ b/Framework/src/TimekeeperFactory.cxx @@ -24,12 +24,12 @@ using namespace o2::framework; namespace o2::quality_control::core { -std::unique_ptr TimekeeperFactory::create(framework::DeploymentMode deploymentMode) +std::unique_ptr TimekeeperFactory::create(framework::DeploymentMode deploymentMode, validity_time_t windowLengthMs) { switch (deploymentMode) { case DeploymentMode::Grid: { ILOG(Info, Devel) << "Detected async deployment, object validity will be based on incoming data and available SOR/EOR times" << ENDM; - return std::make_unique(); + return std::make_unique(windowLengthMs); break; } case DeploymentMode::Local: diff --git a/Framework/src/TimekeeperSynchronous.cxx b/Framework/src/TimekeeperSynchronous.cxx index 679b08c911..d1a0c281e7 100644 --- a/Framework/src/TimekeeperSynchronous.cxx +++ b/Framework/src/TimekeeperSynchronous.cxx @@ -18,6 +18,7 @@ #include "QualityControl/QcInfoLogger.h" #include +#include namespace o2::quality_control::core { @@ -98,4 +99,9 @@ validity_time_t return selected; } +bool TimekeeperSynchronous::shouldFinishCycle(const framework::TimingInfo& timingInfo) +{ + return timingInfo.isTimer(); +} + } // namespace o2::quality_control::core \ No newline at end of file diff --git a/Framework/src/runFileMerger.cxx b/Framework/src/runFileMerger.cxx index 21384263ee..4a366887ad 100644 --- a/Framework/src/runFileMerger.cxx +++ b/Framework/src/runFileMerger.cxx @@ -16,7 +16,6 @@ /// \brief This is an executable which reads MonitorObjectCollections from files and creates a file with the merged result. #include "QualityControl/QcInfoLogger.h" -#include "QualityControl/MonitorObject.h" #include "QualityControl/MonitorObjectCollection.h" #include @@ -40,6 +39,7 @@ struct overloaded : Ts... { template overloaded(Ts...) -> overloaded; +// TODO: this structures and Nodes in RootFileStorage could be merged as a refactoring effort struct Node { std::string pathTo{}; std::string name{}; diff --git a/Framework/test/testInfrastructureGenerator.cxx b/Framework/test/testInfrastructureGenerator.cxx index 9b493f2b07..6625a7ff08 100644 --- a/Framework/test/testInfrastructureGenerator.cxx +++ b/Framework/test/testInfrastructureGenerator.cxx @@ -375,14 +375,14 @@ TEST_CASE("qc_infrastructure_remote_batch_test") auto configTree = configInterface->getRecursive(); auto workflow = InfrastructureGenerator::generateRemoteBatchInfrastructure(configTree, "file.root"); - REQUIRE(workflow.size() == 8); + REQUIRE(workflow.size() == 9); auto fileReader = std::find_if( workflow.begin(), workflow.end(), [](const DataProcessorSpec& d) { return d.name == "qc-root-file-source" && d.inputs.size() == 0 && - d.outputs.size() == 4; + d.outputs.size() == 5; }); CHECK(fileReader != workflow.end()); @@ -392,7 +392,7 @@ TEST_CASE("qc_infrastructure_remote_batch_test") return d.name.find("qc-check") != std::string::npos && d.inputs.size() == 1; }); - REQUIRE(checkRunnerCount == 5); + REQUIRE(checkRunnerCount == 6); auto postprocessingTask = std::find_if( workflow.begin(), workflow.end(), diff --git a/Framework/test/testMonitorObjectCollection.cxx b/Framework/test/testMonitorObjectCollection.cxx index c73edb9fa5..8361c916a5 100644 --- a/Framework/test/testMonitorObjectCollection.cxx +++ b/Framework/test/testMonitorObjectCollection.cxx @@ -135,6 +135,7 @@ TEST_CASE("monitor_object_collection_clone_mw") MonitorObject* moTH1I = new MonitorObject(objTH1I, "histo 1d", "class", "DET"); moTH1I->setIsOwner(false); moTH1I->setCreateMovingWindow(true); + moTH1I->setValidity({ 10, 432 }); moc->Add(moTH1I); TH2I* objTH2I = new TH2I("histo 2d", "histo 2d", bins, min, max, bins, min, max); @@ -143,7 +144,6 @@ TEST_CASE("monitor_object_collection_clone_mw") moc->Add(moTH2I); auto mwMergeInterface = moc->cloneMovingWindow(); - delete moc; REQUIRE(mwMergeInterface != nullptr); auto mwMOC = dynamic_cast(mwMergeInterface); @@ -158,7 +158,16 @@ TEST_CASE("monitor_object_collection_clone_mw") REQUIRE(mwTH1I != nullptr); CHECK(mwTH1I->GetBinContent(mwTH1I->FindBin(5)) == 1); + moTH1I->setValidity(gInvalidValidityInterval); + auto mwMergeInterface2 = moc->cloneMovingWindow(); + REQUIRE(mwMergeInterface2 != nullptr); + auto mwMOC2 = dynamic_cast(mwMergeInterface2); + REQUIRE(mwMOC2 != nullptr); + REQUIRE(mwMOC2->GetEntries() == 0); + + delete moc; delete mwMOC; + delete mwMOC2; } } // namespace o2::quality_control::core \ No newline at end of file diff --git a/Framework/test/testRootFileStorage.cxx b/Framework/test/testRootFileStorage.cxx new file mode 100644 index 0000000000..592700ce18 --- /dev/null +++ b/Framework/test/testRootFileStorage.cxx @@ -0,0 +1,405 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +/// +/// \file testRootFileStorage.cxx +/// \author Piotr Konopka +/// + +#include "QualityControl/RootFileStorage.h" +#include "QualityControl/MonitorObjectCollection.h" +#include "QualityControl/MonitorObject.h" +#include "QualityControl/QcInfoLogger.h" + +#include +#include +#include + +using namespace o2::quality_control::core; + +struct TestFileFixture { + TestFileFixture(const std::string& testCase) + { + filePath = "/tmp/qc_test_root_file_storage_" + testCase + "_" + std::to_string(getpid()) + ".root"; + if (!filePath.empty()) { + std::filesystem::remove(filePath); + } + } + + ~TestFileFixture() + { + if (!filePath.empty()) { + std::filesystem::remove(filePath); + } + } + + std::string filePath; +}; + +const size_t bins = 10; +const size_t min = 0; +const size_t max = 10; + +TEST_CASE("int_write_read") +{ + // the fixture will do the cleanup when being destroyed only after any file readers are destroyed earlier + TestFileFixture fixture("int_write_read"); + + MonitorObjectCollection* mocBefore = new MonitorObjectCollection(); + mocBefore->SetOwner(true); + + TH1I* histoBefore = new TH1I("histo 1d", "histo 1d", bins, min, max); + histoBefore->Fill(5); + MonitorObject* moHistoBefore = new MonitorObject(histoBefore, "histo 1d", "class", "DET"); + moHistoBefore->setActivity({ 300000, 1, "LHC32x", "apass2", "qc_async", { 100, 300 } }); + moHistoBefore->setIsOwner(true); + mocBefore->Add(moHistoBefore); + { + RootFileStorage storage(fixture.filePath, RootFileStorage::ReadMode::Update); + REQUIRE(storage.readStructure(false).children.empty()); + + // store and read back, check results + { + REQUIRE_NOTHROW(storage.storeIntegralMOC(mocBefore)); + auto mocAfter = storage.readMonitorObjectCollection("int/TST/Test"); + REQUIRE(mocAfter != nullptr); + + REQUIRE(mocBefore->GetEntries() == mocAfter->GetEntries()); + auto moHistoAfter = dynamic_cast(mocAfter->At(0)); + REQUIRE(moHistoAfter != nullptr); + + CHECK(moHistoAfter->getActivity() == moHistoBefore->getActivity()); + REQUIRE(moHistoAfter->getObject() != nullptr); + auto histoAfter = dynamic_cast(moHistoAfter->getObject()); + CHECK(histoAfter->GetBinContent(histoAfter->FindBin(5)) == 1); + CHECK(histoAfter->GetSum() == 1); + } + // merge mocBefore into the existing file, check results + { + REQUIRE_NOTHROW(storage.storeIntegralMOC(mocBefore)); + auto mocAfter = storage.readMonitorObjectCollection("int/TST/Test"); + REQUIRE(mocAfter != nullptr); + + REQUIRE(mocBefore->GetEntries() == mocAfter->GetEntries()); + auto moHistoAfter = dynamic_cast(mocAfter->At(0)); + REQUIRE(moHistoAfter != nullptr); + + CHECK(moHistoAfter->getActivity() == moHistoBefore->getActivity()); + REQUIRE(moHistoAfter->getObject() != nullptr); + auto histoAfter = dynamic_cast(moHistoAfter->getObject()); + CHECK(histoAfter->GetBinContent(histoAfter->FindBin(5)) == 2); + CHECK(histoAfter->GetSum() == 2); + } + } + + // close and reopen the file, then merge again, check results + { + RootFileStorage storage(fixture.filePath, RootFileStorage::ReadMode::Update); + REQUIRE_NOTHROW(storage.storeIntegralMOC(mocBefore)); + auto mocAfter = storage.readMonitorObjectCollection("int/TST/Test"); + REQUIRE(mocAfter != nullptr); + + REQUIRE(mocBefore->GetEntries() == mocAfter->GetEntries()); + auto moHistoAfter = dynamic_cast(mocAfter->At(0)); + REQUIRE(moHistoAfter != nullptr); + + CHECK(moHistoAfter->getActivity() == moHistoBefore->getActivity()); + REQUIRE(moHistoAfter->getObject() != nullptr); + auto histoAfter = dynamic_cast(moHistoAfter->getObject()); + CHECK(histoAfter->GetBinContent(histoAfter->FindBin(5)) == 3.0); + CHECK(histoAfter->GetSum() == 3.0); + } +} + +TEST_CASE("mw_write_read") +{ + // the fixture will do the cleanup when being destroyed only after any file readers are destroyed earlier + TestFileFixture fixture("mw_write_read"); + + MonitorObjectCollection* mocBefore = new MonitorObjectCollection(); + mocBefore->SetOwner(true); + + TH1I* histoBefore = new TH1I("histo 1d", "histo 1d", bins, min, max); + histoBefore->Fill(5); + MonitorObject* moHistoBefore = new MonitorObject(histoBefore, "histo 1d", "class", "DET"); + moHistoBefore->setActivity({ 300000, 1, "LHC32x", "apass2", "qc_async", { 100, 300 } }); + moHistoBefore->setIsOwner(true); + mocBefore->Add(moHistoBefore); + { + RootFileStorage storage(fixture.filePath, RootFileStorage::ReadMode::Update); + REQUIRE(storage.readStructure(false).children.empty()); + + // store and read back, check results + { + REQUIRE_NOTHROW(storage.storeMovingWindowMOC(mocBefore)); + auto mocAfter = storage.readMonitorObjectCollection("mw/TST/Test/100"); + REQUIRE(mocAfter != nullptr); + + REQUIRE(mocBefore->GetEntries() == mocAfter->GetEntries()); + auto moHistoAfter = dynamic_cast(mocAfter->At(0)); + REQUIRE(moHistoAfter != nullptr); + + CHECK(moHistoAfter->getActivity() == moHistoBefore->getActivity()); + REQUIRE(moHistoAfter->getObject() != nullptr); + auto histoAfter = dynamic_cast(moHistoAfter->getObject()); + CHECK(histoAfter->GetBinContent(histoAfter->FindBin(5)) == 1); + CHECK(histoAfter->GetSum() == 1); + } + // merge mocBefore into the existing file, check results + { + // extend the validity forward, start stays the same. + moHistoBefore->setValidity({ 100, 500 }); + REQUIRE_NOTHROW(storage.storeMovingWindowMOC(mocBefore)); + auto mocAfter = storage.readMonitorObjectCollection("mw/TST/Test/100"); + REQUIRE(mocAfter != nullptr); + + REQUIRE(mocBefore->GetEntries() == mocAfter->GetEntries()); + auto moHistoAfter = dynamic_cast(mocAfter->At(0)); + REQUIRE(moHistoAfter != nullptr); + + CHECK(moHistoAfter->getActivity() == moHistoBefore->getActivity()); + REQUIRE(moHistoAfter->getObject() != nullptr); + auto histoAfter = dynamic_cast(moHistoAfter->getObject()); + CHECK(histoAfter->GetBinContent(histoAfter->FindBin(5)) == 2); + CHECK(histoAfter->GetSum() == 2); + } + } + + // move the validity to the future, a new object should be stored in the file + moHistoBefore->setValidity({ 300, 500 }); + // close and reopen the file, then merge again, check results + { + RootFileStorage storage(fixture.filePath, RootFileStorage::ReadMode::Update); + REQUIRE_NOTHROW(storage.storeMovingWindowMOC(mocBefore)); + auto mocAfter = storage.readMonitorObjectCollection("mw/TST/Test/300"); + REQUIRE(mocAfter != nullptr); + + REQUIRE(mocBefore->GetEntries() == mocAfter->GetEntries()); + auto moHistoAfter = dynamic_cast(mocAfter->At(0)); + REQUIRE(moHistoAfter != nullptr); + + CHECK(moHistoAfter->getActivity() == moHistoBefore->getActivity()); + REQUIRE(moHistoAfter->getObject() != nullptr); + auto histoAfter = dynamic_cast(moHistoAfter->getObject()); + CHECK(histoAfter->GetBinContent(histoAfter->FindBin(5)) == 1.0); + CHECK(histoAfter->GetSum() == 1.0); + } +} + +TEST_CASE("read_structure") +{ + // the fixture will do the cleanup when being destroyed only after any file readers are destroyed earlier + TestFileFixture fixture("read_structure"); + + MonitorObjectCollection* moc = new MonitorObjectCollection(); + moc->SetOwner(true); + moc->setDetector("TST"); + + TH1I* histo1 = new TH1I("histo 1", "histo 1", bins, min, max); + histo1->Fill(5); + MonitorObject* moHisto1 = new MonitorObject(histo1, "histo 1", "class", "DET"); + moHisto1->setActivity({ 300000, 1, "LHC32x", "apass2", "qc_async", { 100, 300 } }); + moHisto1->setIsOwner(true); + moc->Add(moHisto1); + + TH1I* histo2 = new TH1I("histo 2", "histo 2", bins, min, max); + histo1->Fill(5); + MonitorObject* moHisto2 = new MonitorObject(histo2, "histo 2", "class", "DET"); + moHisto2->setActivity({ 300000, 1, "LHC32x", "apass2", "qc_async", { 100, 300 } }); + moHisto2->setIsOwner(true); + moc->Add(moHisto2); + + RootFileStorage storage(fixture.filePath, RootFileStorage::ReadMode::Update); + REQUIRE(storage.readStructure(false).children.empty()); + + storage.storeIntegralMOC(moc); + storage.storeMovingWindowMOC(moc); + dynamic_cast(moc->At(0))->setValidity({ 300, 500 }); + dynamic_cast(moc->At(1))->setValidity({ 300, 500 }); + storage.storeMovingWindowMOC(moc); + + auto structure = storage.readStructure(false); + CHECK(structure.children.size() == 2); + { + REQUIRE(structure.children.find("int") != structure.children.end()); + REQUIRE(std::holds_alternative(structure.children.at("int"))); + auto intDir = std::get(structure.children.at("int")); + CHECK(intDir.name == "int"); + CHECK(intDir.fullPath == "int"); + REQUIRE(intDir.children.size() == 1); + REQUIRE(intDir.children.find("TST") != intDir.children.end()); + REQUIRE(std::holds_alternative(intDir.children.at("TST"))); + { + auto intTstDir = std::get(intDir.children.at("TST")); + CHECK(intTstDir.name == "TST"); + CHECK(intTstDir.fullPath == "int/TST"); + REQUIRE(intTstDir.children.size() == 1); + REQUIRE(intTstDir.children.find("Test") != intTstDir.children.end()); + REQUIRE(std::holds_alternative(intTstDir.children.at("Test"))); + { + auto intTstTestMoc = std::get(intTstDir.children.at("Test")); + CHECK(intTstTestMoc.name == "Test"); + CHECK(intTstTestMoc.fullPath == "int/TST/Test"); + REQUIRE(intTstTestMoc.moc == nullptr); + } + } + } + { + REQUIRE(structure.children.find("mw") != structure.children.end()); + REQUIRE(std::holds_alternative(structure.children.at("mw"))); + auto mwDir = std::get(structure.children.at("mw")); + CHECK(mwDir.name == "mw"); + CHECK(mwDir.fullPath == "mw"); + REQUIRE(mwDir.children.size() == 1); + REQUIRE(mwDir.children.find("TST") != mwDir.children.end()); + REQUIRE(std::holds_alternative(mwDir.children.at("TST"))); + { + auto mwTstDir = std::get(mwDir.children.at("TST")); + CHECK(mwTstDir.name == "TST"); + CHECK(mwTstDir.fullPath == "mw/TST"); + REQUIRE(mwTstDir.children.size() == 1); + REQUIRE(mwTstDir.children.find("Test") != mwTstDir.children.end()); + REQUIRE(std::holds_alternative(mwTstDir.children.at("Test"))); + { + auto mwTstTestDir = std::get(mwTstDir.children.at("Test")); + CHECK(mwTstTestDir.name == "Test"); + CHECK(mwTstTestDir.fullPath == "mw/TST/Test"); + CHECK(mwTstTestDir.children.size() == 2); + REQUIRE(mwTstTestDir.children.find("100") != mwTstTestDir.children.end()); + REQUIRE(std::holds_alternative(mwTstTestDir.children.at("100"))); + { + auto mwTstTestMoc100 = std::get(mwTstTestDir.children.at("100")); + CHECK(mwTstTestMoc100.name == "100"); + CHECK(mwTstTestMoc100.fullPath == "mw/TST/Test/100"); + REQUIRE(mwTstTestMoc100.moc == nullptr); + } + REQUIRE(mwTstTestDir.children.find("300") != mwTstTestDir.children.end()); + REQUIRE(std::holds_alternative(mwTstTestDir.children.at("300"))); + { + auto mwTstTestMoc300 = std::get(mwTstTestDir.children.at("300")); + CHECK(mwTstTestMoc300.name == "300"); + CHECK(mwTstTestMoc300.fullPath == "mw/TST/Test/300"); + REQUIRE(mwTstTestMoc300.moc == nullptr); + } + } + } + } + + // now we read MonitorObjectCollections and delete them + structure = storage.readStructure(true); + auto intTstTest = std::get( + std::get( + std::get( + structure.children.at("int")) + .children.at("TST")) + .children.at("Test")); + CHECK(intTstTest.moc != nullptr); + delete intTstTest.moc; + + auto mwTstTest100 = std::get( + std::get( + std::get( + std::get( + structure.children.at("mw")) + .children.at("TST")) + .children.at("Test")) + .children.at("100")); + CHECK(mwTstTest100.moc != nullptr); + delete mwTstTest100.moc; + + auto mwTstTest300 = std::get( + std::get( + std::get( + std::get( + structure.children.at("mw")) + .children.at("TST")) + .children.at("Test")) + .children.at("300")); + CHECK(mwTstTest300.moc != nullptr); + delete mwTstTest300.moc; +} + +TEST_CASE("walking") +{ + // the fixture will do the cleanup when being destroyed only after any file readers are destroyed earlier + TestFileFixture fixture("walking"); + + MonitorObjectCollection* moc = new MonitorObjectCollection(); + moc->SetOwner(true); + moc->setDetector("TST"); + + TH1I* histo1 = new TH1I("histo 1", "histo 1", bins, min, max); + histo1->Fill(5); + MonitorObject* moHisto1 = new MonitorObject(histo1, "histo 1", "class", "DET"); + moHisto1->setActivity({ 300000, 1, "LHC32x", "apass2", "qc_async", { 100, 300 } }); + moHisto1->setIsOwner(true); + moc->Add(moHisto1); + + TH1I* histo2 = new TH1I("histo 2", "histo 2", bins, min, max); + histo1->Fill(5); + MonitorObject* moHisto2 = new MonitorObject(histo2, "histo 2", "class", "DET"); + moHisto2->setActivity({ 300000, 1, "LHC32x", "apass2", "qc_async", { 100, 300 } }); + moHisto2->setIsOwner(true); + moc->Add(moHisto2); + + RootFileStorage storage(fixture.filePath, RootFileStorage::ReadMode::Update); + auto structure = storage.readStructure(false); + REQUIRE(structure.children.empty()); + + // check if walkers do not crash when the file is empty + { + IntegralMocWalker intWalker(structure); + REQUIRE(!intWalker.hasNextPath()); + REQUIRE(intWalker.nextPath() == ""); + } + { + MovingWindowMocWalker mwWalker(structure); + REQUIRE(!mwWalker.hasNextPath()); + REQUIRE(mwWalker.nextPath() == ""); + } + + // now we put some data in the file and validate walkers in a usual scenario + storage.storeIntegralMOC(moc); + storage.storeMovingWindowMOC(moc); + dynamic_cast(moc->At(0))->setValidity({ 300, 500 }); + dynamic_cast(moc->At(1))->setValidity({ 300, 500 }); + storage.storeMovingWindowMOC(moc); + + structure = storage.readStructure(false); + { + IntegralMocWalker intWalker(structure); + REQUIRE(intWalker.hasNextPath()); + auto path = intWalker.nextPath(); + REQUIRE(path == "int/TST/Test"); + auto* readMoc = storage.readMonitorObjectCollection(path); + CHECK(readMoc != nullptr); + delete readMoc; + REQUIRE(!intWalker.hasNextPath()); + CHECK(intWalker.nextPath().empty()); + } + { + MovingWindowMocWalker mwWalker(structure); + REQUIRE(mwWalker.hasNextPath()); + auto path = mwWalker.nextPath(); + REQUIRE(path == "mw/TST/Test/100"); + auto* readMoc = storage.readMonitorObjectCollection(path); + CHECK(readMoc != nullptr); + delete readMoc; + REQUIRE(mwWalker.hasNextPath()); + path = mwWalker.nextPath(); + REQUIRE(path == "mw/TST/Test/300"); + readMoc = storage.readMonitorObjectCollection(path); + CHECK(readMoc != nullptr); + delete readMoc; + REQUIRE(!mwWalker.hasNextPath()); + CHECK(mwWalker.nextPath().empty()); + } +} \ No newline at end of file diff --git a/Framework/test/testTimekeeper.cxx b/Framework/test/testTimekeeper.cxx index 1b8938c0fa..7d8bfc066a 100644 --- a/Framework/test/testTimekeeper.cxx +++ b/Framework/test/testTimekeeper.cxx @@ -177,6 +177,14 @@ TEST_CASE("timekeeper_synchronous") CHECK(tk->getActivityDuration().getMin() == 2); CHECK(tk->getActivityDuration().getMax() == 5); } + + SECTION("finish_cycle") + { + auto tk = std::make_shared(); + CHECK(tk->shouldFinishCycle(TimingInfo{ 1653500000000000 })); + CHECK(tk->shouldFinishCycle(TimingInfo{ 1653500010000000 })); + CHECK(!tk->shouldFinishCycle(TimingInfo{ 54000 })); + } } TEST_CASE("timekeeper_asynchronous") @@ -235,11 +243,13 @@ TEST_CASE("timekeeper_asynchronous") tk->setCCDBOrbitsPerTFAccessor([]() { return 32; }); tk->setActivityDuration(ValidityInterval{ 1653000000000, 1655000000000 }); + CHECK(!tk->shouldFinishCycle(TimingInfo{ 3 })); // no, because validity is invalid and there is no moving window tk->updateByTimeFrameID(3); tk->updateByTimeFrameID(10); CHECK(tk->getValidity() == ValidityInterval{ 1653000000000, 1655000000000 }); CHECK(tk->getSampleTimespan() == ValidityInterval{ 1653000000005, 1653000000027 }); CHECK(tk->getTimerangeIdRange() == TimeframeIdRange{ 3, 10 }); + CHECK(!tk->shouldFinishCycle(TimingInfo{ 4 })); // no, because validity is invalid and there is no moving window tk->reset(); CHECK(tk->getValidity() == gInvalidValidityInterval); @@ -260,9 +270,12 @@ TEST_CASE("timekeeper_asynchronous") auto tk = std::make_shared(30 * 1000); tk->setActivityDuration(ValidityInterval{ 1653000000000, 1653000095000 }); // 95 seconds: 0-30, 30-60, 60-95 tk->setCCDBOrbitsPerTFAccessor([nOrbitPerTF]() { return nOrbitPerTF; }); + CHECK(!tk->shouldFinishCycle(TimingInfo{ 3, 0, 3 })); // no, because validity is invalid // hitting only the 1st window tk->updateByTimeFrameID(1); + CHECK(!tk->shouldFinishCycle(TimingInfo{ 10, 0, 10 })); // no, because TFID 10 is within the 1st window + CHECK(!tk->shouldFinishCycle(TimingInfo{ 1653500000000000 })); // no, because this is a timer input tk->updateByTimeFrameID(10); CHECK(tk->getValidity() == ValidityInterval{ 1653000000000, 1653000030000 }); CHECK(tk->getSampleTimespan() == ValidityInterval{ 1653000000000, 1653000009999 }); @@ -271,6 +284,7 @@ TEST_CASE("timekeeper_asynchronous") // hitting the 1st and 2nd window tk->reset(); tk->updateByTimeFrameID(1); + CHECK(tk->shouldFinishCycle(TimingInfo{ 55, 0, 55 })); // yes, because TFID 55 is not within the 1st window tk->updateByTimeFrameID(55); CHECK(tk->getValidity() == ValidityInterval{ 1653000000000, 1653000060000 }); CHECK(tk->getSampleTimespan() == ValidityInterval{ 1653000000000, 1653000055001 }); diff --git a/doc/Advanced.md b/doc/Advanced.md index 828dfcbd0f..12f28b8cd3 100644 --- a/doc/Advanced.md +++ b/doc/Advanced.md @@ -412,9 +412,9 @@ In the presented case, the Merger will publish one set of complete MOs per 10 mi ### Moving windows of selected plots only -In setups which use Mergers in the delta mode, - one can obtain objects spawning the last cycle alongside the ones covering the whole run. -These are saved in a subdirectory `mw` and also can be requested by Checks. +The following applies to synchronous setups which use Mergers in the delta mode and all asynchronous setups. +One can obtain objects containing data from one cycle alongside the ones covering the whole run. +These are saved in QCDB in the task subdirectory `mw` and also can be requested by Checks. To specify which objects should get a moving window variant, add a `"movingWindows"` list to the task configuration: ```json "MyTask": { @@ -435,11 +435,16 @@ To request these objects in a Check, use `TaskMovingWindow` data source, as in t }] } ``` -It is possible to request both the integrated and last cycle plots by the same Check. +It is possible to request both the integrated and single cycle plots by the same Check. + +To test it in a small setup, one can run `o2-qc` with `--full-chain` flag, which creates a complete workflow with a Merger for local QC tasks, even though it runs just one instance of them. + +In asynchronous QC, the moving window plots will appear in the intermediate QC file in the directory `mw` and will be uploaded to QCDB to `/mw`. +When testing, please make sure to let DPL know that it has to run in Grid mode, so that QC can compute object validity based on timestamps in the data: +``` +export O2_DPL_DEPLOYMENT_MODE=Grid && o2-qc ... +``` -To test it in a small setup, one can run `o2-qc` with `--full-chain` flag, which creates a complete workflow with a -Merger for local QC tasks, even though it runs just one instance of them. - ## Monitor cycles The QC tasks monitor and process data continuously during a so-called "monitor cycle". At the end of such a cycle they publish the QC objects that will then continue their way in the QC data flow.