Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[QC-1112] Develop a task to monitor the TF/payload size #2388

Merged
merged 4 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion Modules/Daq/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ target_include_directories(
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/src
)

target_link_libraries(O2QcDaq PUBLIC O2QualityControl PRIVATE ROOT::Gpad O2::DetectorsRaw)
target_link_libraries(O2QcDaq PUBLIC O2QualityControl PRIVATE
ROOT::Gpad
O2::DetectorsRaw
O2QcCommon)

add_root_dictionary(O2QcDaq
HEADERS include/Daq/DaqTask.h
Expand Down
47 changes: 31 additions & 16 deletions Modules/Daq/daq.json → Modules/Daq/etc/daq.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,35 @@
"active": "true",
"className": "o2::quality_control_modules::daq::DaqTask",
"moduleName": "QcDaq",
"detectorName": "DAQ"
"detectorName": "ITS",
"cycleDurationSeconds": "10",
"dataSource": {
"type": "direct",
"query": "raw:ITS/RAWDATA"
},
"taskParameters": {
"TFSizeBins":"100",
"TFSizeMin":"0",
"TFSizeMax":"2047",
"payloadSizeInputsBins":"100",
"payloadSizeInputsMin":"0",
"payloadSizeInputsMax":"2047",
"numberRDHsBins":"100",
"numberRDHsMin":"0",
"numberRDHsMax":"2047",
"sumRdhSizesInTFBins":"100",
"sumRdhSizesInTFMin":"0",
"sumRdhSizesInTFMax":"2047",
"RdhSizesBins":"100",
"RdhSizesMin":"0",
"RdhSizesMax":"2047",
"RdhPayloadSizeBins":"100",
"RdhPayloadSizeMin":"0",
"RdhPayloadSizeMax":"2047",
"CRUidBins":"100",
"CRUidMin":"0",
"CRUidMax":"2047"
}
}
},
"checks": {
Expand All @@ -36,7 +64,7 @@
"className": "o2::quality_control_modules::common::NonEmpty",
"moduleName": "QcCommon",
"policy": "OnAny",
"detectorName": "DAQ",
"detectorName": "ITS",
"dataSource": [{
"type": "Task",
"name": "DaqTask",
Expand All @@ -48,26 +76,13 @@
"className": "o2::quality_control_modules::skeleton::SkeletonCheck",
"moduleName": "QcSkeleton",
"policy": "OnAny",
"detectorName": "DAQ",
"dataSource": [{
"type": "Task",
"name": "DaqTask",
"MOs": ["IDs"]
}]
},
"CheckIncreasingIDs": {
"active": "true",
"className": "o2::quality_control_modules::daq::EverIncreasingGraph",
"moduleName": "QcDaq",
"policy": "OnAny",
"detectorName": "DAQ",
"detectorName": "ITS",
"dataSource": [{
"type": "Task",
"name": "DaqTask",
"MOs": ["IDs"]
}]
}

}
}
}
36 changes: 12 additions & 24 deletions Modules/Daq/include/Daq/DaqTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
#ifndef QC_MODULE_DAQ_DAQTASK_H
#define QC_MODULE_DAQ_DAQTASK_H

// ROOT
#include <TH1.h>
#include <TH2.h>

#include "QualityControl/TaskInterface.h"
#include <Headers/DAQID.h>
#include <map>
#include <set>

class TH1F;

using namespace o2::quality_control::core;

Expand All @@ -37,9 +37,7 @@ class DaqTask final : public o2::quality_control::core::TaskInterface
{
public:
/// \brief Constructor
DaqTask();
/// Destructor
~DaqTask() override;
DaqTask() = default;

// Definition of the methods for the template method pattern
void initialize(o2::framework::InitContext& ctx) override;
Expand All @@ -54,30 +52,20 @@ class DaqTask final : public o2::quality_control::core::TaskInterface
void printInputPayload(const header::DataHeader* header, const char* payload, size_t payloadSize);
void monitorInputRecord(o2::framework::InputRecord& inputRecord);
void monitorRDHs(o2::framework::InputRecord& inputRecord);
int getIntParam(const std::string paramName, int defaultValue = 0);

// ** general information

std::map<o2::header::DAQID::ID, std::string> mSystems;
std::set<o2::header::DAQID::ID> mToBePublished; // keep the list of detectors we saw this cycle and whose plots should be published

// ** objects we publish **

// Message related
// Block = the whole InputRecord, i.e. the thing we receive and analyse in monitorData(...)
// SubBlock = a single input of the InputRecord
TH1F* mInputRecordPayloadSize = nullptr; // filled w/ the sum of the payload size of all the inputs of an inputrecord
TH1F* mNumberInputs = nullptr; // filled w/ the number of inputs in each InputRecord we encounter
TH1F* mInputSize = nullptr; // filled w/ the size of the inputs in each InputRecord we encounter
TH1F* mNumberRDHs = nullptr; // filled w/ the number of RDHs found in each InputRecord we encounter

// Per link information

// Per detector information
std::map<o2::header::DAQID::ID, TH1F*> mSubSystemsTotalSizes; // filled with the sum of RDH memory sizes per InputRecord
std::map<o2::header::DAQID::ID, TH1F*> mSubSystemsRdhSizes; // filled with the RDH memory sizes for each RDH
// todo : for the next one we need to know the number of links per detector.
// std::map<o2::header::DAQID::ID, TH1F*> mSubSystemsRdhHits; // hits per link split by detector
// todo we could add back the graph for the IDs using the TFID
std::unique_ptr<TH1F> mTFRecordPayloadSize; // filled w/ the sum of the payload size of all the inputs of an inputrecord
std::unique_ptr<TH1F> mInputSize; // filled w/ the size of the inputs in each InputRecord we encounter
std::unique_ptr<TH1F> mNumberRDHs; // filled w/ the number of RDHs found in each InputRecord we encounter
std::unique_ptr<TH1F> mSumRDHSizesInTF; // filled w/ the sum of RDH memory sizes per InputRecord
std::unique_ptr<TH1F> mSumRDHSizesInRDH; // filled w/ the RDH memory sizes for each RDH
std::unique_ptr<TH2F> mRDHSizesPerCRUIds; // filled w/ the RDH payload size per CRUId
};

} // namespace o2::quality_control_modules::daq
Expand Down
160 changes: 64 additions & 96 deletions Modules/Daq/src/DaqTask.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@
// QC
#include "QualityControl/QcInfoLogger.h"
#include "QualityControl/stringUtils.h"
// ROOT
#include <TH1.h>
// O2
#include <DPLUtils/DPLRawParser.h>
#include <DetectorsRaw/RDHUtils.h>
#include <Framework/InputRecord.h>
#include <Framework/InputRecordWalker.h>
#include <Headers/DataHeaderHelpers.h>
#include <stdexcept>
#include "Common/Utils.h"

using namespace std;
using namespace o2::raw;
Expand All @@ -36,63 +36,55 @@ using namespace o2::header;
namespace o2::quality_control_modules::daq
{

DaqTask::DaqTask()
: TaskInterface()
int DaqTask::getIntParam(const std::string paramName, int defaultValue)
{
}

DaqTask::~DaqTask()
{
delete mInputRecordPayloadSize;
delete mNumberInputs;
delete mInputSize;
delete mNumberRDHs;
}

// TODO remove this function once the equivalent is available in O2 DAQID
bool isDetIdValid(DAQID::ID id)
{
return (id < DAQID::MAXDAQ + 1) && (DAQID::DAQtoO2(id) != gDataOriginInvalid);
return common::getFromConfig<int>(mCustomParameters, paramName, defaultValue);
}

void DaqTask::initialize(o2::framework::InitContext& /*ctx*/)
{
ILOG(Debug, Devel) << "initializiation of DaqTask" << ENDM;

// General plots, related mostly to the payload size (InputRecord, Inputs) and the numbers of RDHs and Inputs in an InputRecord.
mInputRecordPayloadSize = new TH1F("inputRecordSize", "Total payload size per InputRecord;bytes", 128, 0, 2047);
mInputRecordPayloadSize->SetCanExtend(TH1::kXaxis);
getObjectsManager()->startPublishing(mInputRecordPayloadSize, PublicationPolicy::Forever);
mNumberInputs = new TH1F("numberInputs", "Number of inputs per InputRecords", 100, 1, 100);
getObjectsManager()->startPublishing(mNumberInputs, PublicationPolicy::Forever);
mInputSize = new TH1F("payloadSizeInputs", "Payload size of the inputs;bytes", 128, 0, 2047);
mInputSize->SetCanExtend(TH1::kXaxis);
getObjectsManager()->startPublishing(mInputSize, PublicationPolicy::Forever);
mNumberRDHs = new TH1F("numberRDHs", "Number of RDHs per InputRecord", 100, 1, 100);
mNumberRDHs->SetCanExtend(TH1::kXaxis);
getObjectsManager()->startPublishing(mNumberRDHs, PublicationPolicy::Forever);

// initialize a map for the subsystems (id, name)
for (int i = DAQID::MINDAQ; i < DAQID::MAXDAQ + 1; i++) {
DataOrigin origin = DAQID::DAQtoO2(i);
if (origin != gDataOriginInvalid) {
mSystems[i] = origin.str;
}
}
mSystems[DAQID::INVALID] = "UNKNOWN"; // to store RDH info for unknown detectors

// subsystems plots: distribution of rdh size, distribution of the sum of rdh in each message.
for (const auto& system : mSystems) {
string name = system.second + "/sumRdhSizesPerInputRecord";
string title = "Sum of RDH sizes per InputRecord for " + system.second + ";bytes";
mSubSystemsTotalSizes[system.first] = new TH1F(name.c_str(), title.c_str(), 128, 0, 2047);
mSubSystemsTotalSizes[system.first]->SetCanExtend(TH1::kXaxis);

name = system.second + "/RdhSizes";
title = "RDH sizes for " + system.second + ";bytes";
mSubSystemsRdhSizes[system.first] = new TH1F(name.c_str(), title.c_str(), 128, 0, 2047);
mSubSystemsRdhSizes[system.first]->SetCanExtend(TH1::kXaxis);
}
mTFRecordPayloadSize = std::make_unique<TH1F>("TFSize", "Total payload size in TF;bytes",
getIntParam("TFSizeBins", 128),
getIntParam("TFSizeMin", 0),
getIntParam("TFSizeMax", 2047));
getObjectsManager()->startPublishing(mTFRecordPayloadSize.get(), PublicationPolicy::Forever);

mInputSize = std::make_unique<TH1F>("payloadSizeInputs", "Payload size of the inputs;bytes",
getIntParam("payloadSizeInputsBins", 128),
getIntParam("payloadSizeInputsMin", 0),
getIntParam("payloadSizeInputsMax", 2047));
getObjectsManager()->startPublishing(mInputSize.get(), PublicationPolicy::Forever);

mNumberRDHs = std::make_unique<TH1F>("numberRdhs", "Number of RDHs in TF;RDH count",
getIntParam("numberRDHsBins", 100),
getIntParam("numberRDHsMin", 1),
getIntParam("numberRDHsMax", 100));
getObjectsManager()->startPublishing(mNumberRDHs.get(), PublicationPolicy::Forever);

mSumRDHSizesInTF = std::make_unique<TH1F>("sumRdhSizesInTF", "Sum of RDH sizes in TF;bytes",
getIntParam("sumRdhSizesInTFBins", 128),
getIntParam("sumRdhSizesInTFMin", 0),
getIntParam("sumRdhSizesInTFMax", 2047));
getObjectsManager()->startPublishing(mSumRDHSizesInTF.get(), PublicationPolicy::Forever);

mSumRDHSizesInRDH = std::make_unique<TH1F>("RdhSizes", "RDH sizes;bytes",
getIntParam("RdhSizesBins", 128),
getIntParam("RdhSizesMin", 0),
getIntParam("RdhSizesMax", 2047));
getObjectsManager()->startPublishing(mSumRDHSizesInRDH.get(), PublicationPolicy::Forever);

mRDHSizesPerCRUIds = std::make_unique<TH2F>("RdhPayloadSizePerCRUid", "RDH payload size per CRU",
getIntParam("CRUidBins", (1 << 12) - 1), // CRU id is defined as 12 bits (see O2 RAWDataHeader.h cruID)
getIntParam("CRUidMin", 0),
getIntParam("CRUidMax", 500),
getIntParam("RdhPayloadSizeBins", 128),
getIntParam("RdhPayloadSizeMin", 0),
getIntParam("RdhPayloadSizeMax", 2047));
mRDHSizesPerCRUIds->GetXaxis()->SetTitle("CRU Id");
mRDHSizesPerCRUIds->GetYaxis()->SetTitle("bytes");
getObjectsManager()->startPublishing(mRDHSizesPerCRUIds.get(), PublicationPolicy::Forever);
}

void DaqTask::startOfActivity(const Activity& activity)
Expand All @@ -115,31 +107,18 @@ void DaqTask::monitorData(o2::framework::ProcessingContext& ctx)
// Logs "endOfCycle" and then, for every detector id stored in mToBePublished, starts publishing
// that detector's two per-subsystem histograms (mSubSystemsTotalSizes and mSubSystemsRdhSizes)
// with the ThroughStop policy, unless a histogram with the same name is already being published.
// NOTE(review): this hunk is rendered without +/- markers; the loop below looks like the removed
// side of the diff (the new side would only log) - confirm against the merged file.
void DaqTask::endOfCycle()
{
ILOG(Debug, Devel) << "endOfCycle" << ENDM;

// TODO make this optional once we are able to know the run number and the detectors included.
// It might still be necessary in test runs without a proper run number.
for (auto toBeAdded : mToBePublished) {
// The name lookup guards against registering the same histogram twice.
if (!getObjectsManager()->isBeingPublished(mSubSystemsTotalSizes[toBeAdded]->GetName())) {
getObjectsManager()->startPublishing(mSubSystemsTotalSizes[toBeAdded], PublicationPolicy::ThroughStop);
}
if (!getObjectsManager()->isBeingPublished(mSubSystemsRdhSizes[toBeAdded]->GetName())) {
getObjectsManager()->startPublishing(mSubSystemsRdhSizes[toBeAdded], PublicationPolicy::ThroughStop);
}
}
}

void DaqTask::endOfActivity(const Activity& /*activity*/)
{
ILOG(Debug, Devel) << "endOfActivity" << ENDM;

for (const auto& system : mSystems) {
if (getObjectsManager()->isBeingPublished(mSubSystemsTotalSizes[system.first]->GetName())) {
getObjectsManager()->stopPublishing(mSubSystemsTotalSizes[system.first]);
}
if (getObjectsManager()->isBeingPublished(mSubSystemsRdhSizes[system.first]->GetName())) {
getObjectsManager()->stopPublishing(mSubSystemsRdhSizes[system.first]);
}
}
getObjectsManager()->stopPublishing(mTFRecordPayloadSize.get());
getObjectsManager()->stopPublishing(mInputSize.get());
getObjectsManager()->stopPublishing(mNumberRDHs.get());
getObjectsManager()->stopPublishing(mSumRDHSizesInRDH.get());
getObjectsManager()->stopPublishing(mSumRDHSizesInRDH.get());
getObjectsManager()->stopPublishing(mRDHSizesPerCRUIds.get());
}

void DaqTask::reset()
justonedev1 marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -149,15 +128,12 @@ void DaqTask::reset()
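// TODO if the number of plots grows we should probably have a container with pointers/references to all of them.
// Then we can just iterate over them.
// NOTE(review): mSumRDHSizesInRDH is reset twice below while mSumRDHSizesInTF is never reset;
// this looks like a copy-paste slip - confirm and reset mSumRDHSizesInTF instead.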

mInputRecordPayloadSize->Reset();
mNumberInputs->Reset();
mTFRecordPayloadSize->Reset();
mInputSize->Reset();
mNumberRDHs->Reset();

for (const auto& system : mSystems) {
mSubSystemsRdhSizes.at(system.first)->Reset();
mSubSystemsTotalSizes.at(system.first)->Reset();
}
mSumRDHSizesInRDH->Reset();
mSumRDHSizesInRDH->Reset();
mRDHSizesPerCRUIds->Reset();
}

void DaqTask::printInputPayload(const header::DataHeader* header, const char* payload, size_t payloadSize)
Expand Down Expand Up @@ -218,8 +194,7 @@ void DaqTask::monitorInputRecord(InputRecord& inputRecord)
ILOG(Warning, Support) << "Received an input with an empty header" << ENDM;
}
}
mInputRecordPayloadSize->Fill(totalPayloadSize);
mNumberInputs->Fill(inputRecord.countValidInputs());
mTFRecordPayloadSize->Fill(totalPayloadSize);
}

template <class T>
Expand All @@ -241,9 +216,8 @@ void DaqTask::monitorRDHs(o2::framework::InputRecord& inputRecord)
{
// Use the DPLRawParser to get information about the Pages and RDHs stored in the inputRecord
o2::framework::DPLRawParser parser(inputRecord);
size_t rdhCounter = 0;
size_t totalSize = 0;
DAQID::ID rdhSource = DAQID::INVALID;
size_t rdhCounter = 0;
for (auto it = parser.begin(), end = parser.end(); it != end; ++it) {
// TODO for some reason this does not work
// ILOG(Info, Ops) << "Header: " << ENDM;
Expand Down Expand Up @@ -271,26 +245,20 @@ void DaqTask::monitorRDHs(o2::framework::InputRecord& inputRecord)

// RDH plots
try {
rdhSource = RDHUtils::getVersion(rdh) >= 6 ? RDHUtils::getSourceID(rdh) : DAQID::INVALID; // there is no sourceID before v6
if (!isDetIdValid(rdhSource)) { // if we found it , is it valid ?
rdhSource = DAQID::INVALID;
}
totalSize += RDHUtils::getMemorySize(rdh);
const auto rdhSize = RDHUtils::getMemorySize(rdh) - RDHUtils::getHeaderSize(rdh);
mSumRDHSizesInRDH->Fill(rdhSize);
totalSize += rdhSize;
rdhCounter++;
mSubSystemsRdhSizes.at(rdhSource)->Fill(RDHUtils::getMemorySize(rdh));

mRDHSizesPerCRUIds->Fill(RDHUtils::getCRUID(rdh), rdhSize);

} catch (std::runtime_error& e) {
ILOG(Error, Devel) << "Catched an exception when accessing the rdh fields: \n"
ILOG(Error, Devel) << "Caught an exception when accessing the rdh fields: \n"
<< e.what() << ENDM;
}
}

mSubSystemsTotalSizes.at(rdhSource)->Fill(totalSize);

// TODO make this optional once we are able to know the run number and the detectors included.
mToBePublished.insert(rdhSource);

// TODO why is the payload size reported by the dataref.header->print() different than the one from the sum
// of the RDH memory size + dataref header size ? a few hundreds bytes difference.
mSumRDHSizesInTF->Fill(totalSize);
mNumberRDHs->Fill(rdhCounter);
}

Expand Down
Loading