Skip to content

Commit

Permalink
all histo params are settable; added 2d histo rdh payload per cru
Browse files Browse the repository at this point in the history
  • Loading branch information
Michal Tichák committed Aug 15, 2024
1 parent afce9be commit ef8ac48
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 95 deletions.
29 changes: 8 additions & 21 deletions Modules/Daq/include/Daq/DaqTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,10 @@

// ROOT
#include <TH1.h>
#include <TH2.h>

#include "QualityControl/TaskInterface.h"
#include <Headers/DAQID.h>
#include <map>
#include <set>

class TH1F;

using namespace o2::quality_control::core;

Expand Down Expand Up @@ -55,29 +52,19 @@ class DaqTask final : public o2::quality_control::core::TaskInterface
void printInputPayload(const header::DataHeader* header, const char* payload, size_t payloadSize);
void monitorInputRecord(o2::framework::InputRecord& inputRecord);
void monitorRDHs(o2::framework::InputRecord& inputRecord);

int getIntParam(const std::string paramName);
// ** general information

std::map<o2::header::DAQID::ID, std::string> mSystems;
std::set<o2::header::DAQID::ID> mToBePublished; // keep the list of detectors we saw this cycle and whose plots should be published

// ** objects we publish **

// Message related
// Block = the whole InputRecord, i.e. the thing we receive and analyse in monitorData(...)
// SubBlock = a single input of the InputRecord
std::unique_ptr<TH1F> mInputRecordPayloadSize; // filled w/ the sum of the payload size of all the inputs of an inputrecord
std::unique_ptr<TH1F> mInputSize; // filled w/ the size of the inputs in each InputRecord we encounter
std::unique_ptr<TH1F> mNumberRDHs; // filled w/ the number of RDHs found in each InputRecord we encounter

// Per link information

// Per detector information
std::map<o2::header::DAQID::ID, std::unique_ptr<TH1F>> mSubSystemsTotalSizes; // filled with the sum of RDH memory sizes per InputRecord
std::map<o2::header::DAQID::ID, std::unique_ptr<TH1F>> mSubSystemsRdhSizes; // filled with the RDH memory sizes for each RDH
// todo : for the next one we need to know the number of links per detector.
// std::map<o2::header::DAQID::ID, TH1F*> mSubSystemsRdhHits; // hits per link split by detector
// todo we could add back the graph for the IDs using the TFID
std::unique_ptr<TH1F> mInputRecordPayloadSize; // filled w/ the sum of the payload size of all the inputs of an inputrecord
std::unique_ptr<TH1F> mInputSize; // filled w/ the size of the inputs in each InputRecord we encounter
std::unique_ptr<TH1F> mNumberRDHs; // filled w/ the number of RDHs found in each InputRecord we encounter
std::unique_ptr<TH1F> mSumRDHSizesPerInputRecord; // filled w/ the the sum of RDH memory sizes per InputRecord
std::unique_ptr<TH1F> mSumRDHSizesPerRDH; // filled w/ the RDH memory sizes for each RDH
std::unique_ptr<TH2F> mRDHSizesPerCRUIds;
};

} // namespace o2::quality_control_modules::daq
Expand Down
145 changes: 71 additions & 74 deletions Modules/Daq/src/DaqTask.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <Framework/InputRecord.h>
#include <Framework/InputRecordWalker.h>
#include <Headers/DataHeaderHelpers.h>
#include <stdexcept>

using namespace std;
using namespace o2::raw;
Expand All @@ -34,47 +35,66 @@ using namespace o2::header;
namespace o2::quality_control_modules::daq
{

// TODO remove this function once the equivalent is available in O2 DAQID
bool isDetIdValid(DAQID::ID id)
int DaqTask::getIntParam(const std::string paramName)
{
return (id < DAQID::MAXDAQ + 1) && (DAQID::DAQtoO2(id) != gDataOriginInvalid);
if (mCustomParameters.count(paramName) > 0) {
return std::stoi(mCustomParameters.at(paramName));
}
throw std::runtime_error("Failed to retrieve parameter " + paramName + " from \"taskParameters\"");
}

void DaqTask::initialize(o2::framework::InitContext& /*ctx*/)
{
ILOG(Debug, Devel) << "initializiation of DaqTask" << ENDM;

// General plots, related mostly to the payload size (InputRecord, Inputs) and the numbers of RDHs and Inputs in an InputRecord.
mInputRecordPayloadSize = std::make_unique<TH1F>("inputRecordSize", "Total payload size per InputRecord;bytes", 128, 0, 2047);
mInputRecordPayloadSize->SetCanExtend(TH1::kXaxis);
getObjectsManager()->startPublishing(mInputRecordPayloadSize.get(), PublicationPolicy::Forever);
mInputSize = std::make_unique<TH1F>("payloadSizeInputs", "Payload size of the inputs;bytes", 128, 0, 2047);
mInputSize->SetCanExtend(TH1::kXaxis);
getObjectsManager()->startPublishing(mInputSize.get(), PublicationPolicy::Forever);
mNumberRDHs = std::make_unique<TH1F>("numberRDHs", "Number of RDHs per InputRecord", 100, 1, 100);
mNumberRDHs->SetCanExtend(TH1::kXaxis);
getObjectsManager()->startPublishing(mNumberRDHs.get(), PublicationPolicy::Forever);

// initialize a map for the subsystems (id, name)
for (int i = DAQID::MINDAQ; i < DAQID::MAXDAQ + 1; i++) {
DataOrigin origin = DAQID::DAQtoO2(i);
if (origin != gDataOriginInvalid) {
mSystems[i] = origin.str;
}
}
mSystems[DAQID::INVALID] = "UNKNOWN"; // to store RDH info for unknown detectors

// subsystems plots: distribution of rdh size, distribution of the sum of rdh in each message.
for (const auto& system : mSystems) {
string name = system.second + "/sumRdhSizesPerInputRecord";
string title = "Sum of RDH sizes per InputRecord for " + system.second + ";bytes";
mSubSystemsTotalSizes[system.first] = std::make_unique<TH1F>(name.c_str(), title.c_str(), 128, 0, 2047);
mSubSystemsTotalSizes[system.first]->SetCanExtend(TH1::kXaxis);

name = system.second + "/RdhSizes";
title = "RDH sizes for " + system.second + ";bytes";
mSubSystemsRdhSizes[system.first] = std::make_unique<TH1F>(name.c_str(), title.c_str(), 128, 0, 2047);
mSubSystemsRdhSizes[system.first]->SetCanExtend(TH1::kXaxis);
try {

// General plots, related mostly to the payload size (InputRecord, Inputs) and the numbers of RDHs and Inputs in an InputRecord.
mInputRecordPayloadSize = std::make_unique<TH1F>("inputRecordSize", "Total payload size per InputRecord;bytes",
getIntParam("inputRecordSizeBins"),
getIntParam("inputRecordSizeMin"),
getIntParam("inputRecordSizeMax"));
getObjectsManager()->startPublishing(mInputRecordPayloadSize.get(), PublicationPolicy::Forever);

mInputSize = std::make_unique<TH1F>("payloadSizeInputs", "Payload size of the inputs;bytes",
getIntParam("payloadSizeInputsBins"),
getIntParam("payloadSizeInputsMin"),
getIntParam("payloadSizeInputsMax"));
getObjectsManager()->startPublishing(mInputSize.get(), PublicationPolicy::Forever);

mNumberRDHs = std::make_unique<TH1F>("numberRDHs", "Number of RDHs per InputRecord",
getIntParam("numberRDHsBins"),
getIntParam("numberRDHsMin"),
getIntParam("numberRDHsMax"));
getObjectsManager()->startPublishing(mNumberRDHs.get(), PublicationPolicy::Forever);

mSumRDHSizesPerInputRecord = std::make_unique<TH1F>("sumRdhSizesPerInputRecord", "Sum of RDH sizes per InputRecord;bytes",
getIntParam("sumRdhSizesPerInputRecordBins"),
getIntParam("sumRdhSizesPerInputRecordMin"),
getIntParam("sumRdhSizesPerInputRecordMax"));
getObjectsManager()->startPublishing(mSumRDHSizesPerInputRecord.get(), PublicationPolicy::Forever);

mSumRDHSizesPerRDH = std::make_unique<TH1F>("RdhSizes", "RDH sizes; bytes",
getIntParam("RdhSizesBins"),
getIntParam("RdhSizesMin"),
getIntParam("RdhSizesMax"));
getObjectsManager()->startPublishing(mSumRDHSizesPerRDH.get(), PublicationPolicy::Forever);

mRDHSizesPerCRUIds = std::make_unique<TH2F>("RdhPayloadSizePerCRUid", "RDH payload size per CRU",
getIntParam("RdhPayloadSizePerCRUidXBins"),
getIntParam("RdhPayloadSizePerCRUidXMin"),
getIntParam("RdhPayloadSizePerCRUidXMax"),
getIntParam("RdhPayloadSizePerCRUidYBins"),
getIntParam("RdhPayloadSizePerCRUidYMin"),
getIntParam("RdhPayloadSizePerCRUidYMax"));
mRDHSizesPerCRUIds->GetXaxis()->SetTitle("bytes");
mRDHSizesPerCRUIds->GetYaxis()->SetTitle("CRU Id");
getObjectsManager()->startPublishing(mRDHSizesPerCRUIds.get(), PublicationPolicy::Forever);

} catch (std::exception& ex) {
ILOG(Fatal, Support) << "Failed to initialize task with message: " << ex.what() << ENDM;
// we want to crash this task because we do not have config
throw ex;
}
}

Expand All @@ -98,31 +118,18 @@ void DaqTask::monitorData(o2::framework::ProcessingContext& ctx)
void DaqTask::endOfCycle()
{
ILOG(Debug, Devel) << "endOfCycle" << ENDM;

// TODO make this optional once we are able to know the run number and the detectors included.
// It might still be necessary in test runs without a proper run number.
for (auto toBeAdded : mToBePublished) {
if (!getObjectsManager()->isBeingPublished(mSubSystemsTotalSizes[toBeAdded]->GetName())) {
getObjectsManager()->startPublishing(mSubSystemsTotalSizes[toBeAdded].get(), PublicationPolicy::ThroughStop);
}
if (!getObjectsManager()->isBeingPublished(mSubSystemsRdhSizes[toBeAdded]->GetName())) {
getObjectsManager()->startPublishing(mSubSystemsRdhSizes[toBeAdded].get(), PublicationPolicy::ThroughStop);
}
}
}

void DaqTask::endOfActivity(const Activity& /*activity*/)
{
ILOG(Debug, Devel) << "endOfActivity" << ENDM;

for (const auto& system : mSystems) {
if (getObjectsManager()->isBeingPublished(mSubSystemsTotalSizes[system.first]->GetName())) {
getObjectsManager()->stopPublishing(mSubSystemsTotalSizes[system.first].get());
}
if (getObjectsManager()->isBeingPublished(mSubSystemsRdhSizes[system.first]->GetName())) {
getObjectsManager()->stopPublishing(mSubSystemsRdhSizes[system.first].get());
}
}
getObjectsManager()->stopPublishing(mInputRecordPayloadSize.get());
getObjectsManager()->stopPublishing(mInputSize.get());
getObjectsManager()->stopPublishing(mNumberRDHs.get());
getObjectsManager()->stopPublishing(mSumRDHSizesPerInputRecord.get());
getObjectsManager()->stopPublishing(mSumRDHSizesPerRDH.get());
getObjectsManager()->stopPublishing(mRDHSizesPerCRUIds.get());
}

void DaqTask::reset()
Expand All @@ -135,11 +142,8 @@ void DaqTask::reset()
mInputRecordPayloadSize->Reset();
mInputSize->Reset();
mNumberRDHs->Reset();

for (const auto& system : mSystems) {
mSubSystemsRdhSizes.at(system.first)->Reset();
mSubSystemsTotalSizes.at(system.first)->Reset();
}
mSumRDHSizesPerRDH->Reset();
mSumRDHSizesPerInputRecord->Reset();
}

void DaqTask::printInputPayload(const header::DataHeader* header, const char* payload, size_t payloadSize)
Expand Down Expand Up @@ -222,9 +226,8 @@ void DaqTask::monitorRDHs(o2::framework::InputRecord& inputRecord)
{
// Use the DPLRawParser to get information about the Pages and RDHs stored in the inputRecord
o2::framework::DPLRawParser parser(inputRecord);
size_t rdhCounter = 0;
size_t totalSize = 0;
DAQID::ID rdhSource = DAQID::INVALID;
size_t rdhCounter = 0;
for (auto it = parser.begin(), end = parser.end(); it != end; ++it) {
// TODO for some reason this does not work
// ILOG(Info, Ops) << "Header: " << ENDM;
Expand Down Expand Up @@ -252,26 +255,20 @@ void DaqTask::monitorRDHs(o2::framework::InputRecord& inputRecord)

// RDH plots
try {
rdhSource = RDHUtils::getVersion(rdh) >= 6 ? RDHUtils::getSourceID(rdh) : DAQID::INVALID; // there is no sourceID before v6
if (!isDetIdValid(rdhSource)) { // if we found it , is it valid ?
rdhSource = DAQID::INVALID;
}
totalSize += RDHUtils::getMemorySize(rdh);
const auto RDHSize = RDHUtils::getMemorySize(rdh) - RDHUtils::getHeaderSize(rdh);
mSumRDHSizesPerRDH->Fill(RDHSize);
totalSize += RDHSize;
rdhCounter++;
mSubSystemsRdhSizes.at(rdhSource)->Fill(RDHUtils::getMemorySize(rdh));

mRDHSizesPerCRUIds->Fill(RDHSize, RDHUtils::getCRUID(rdh));

} catch (std::runtime_error& e) {
ILOG(Error, Devel) << "Catched an exception when accessing the rdh fields: \n"
ILOG(Error, Devel) << "Caught an exception when accessing the rdh fields: \n"
<< e.what() << ENDM;
}
}

mSubSystemsTotalSizes.at(rdhSource)->Fill(totalSize);

// TODO make this optional once we are able to know the run number and the detectors included.
mToBePublished.insert(rdhSource);

// TODO why is the payload size reported by the dataref.header->print() different than the one from the sum
// of the RDH memory size + dataref header size ? a few hundreds bytes difference.
mSumRDHSizesPerInputRecord->Fill(totalSize);
mNumberRDHs->Fill(rdhCounter);
}

Expand Down

0 comments on commit ef8ac48

Please sign in to comment.