Skip to content

Commit

Permalink
all histo params are settable; added 2d histo rdh payload per cru
Browse files Browse the repository at this point in the history
  • Loading branch information
Michal Tichák committed Aug 16, 2024
1 parent afce9be commit 869e3c1
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 92 deletions.
5 changes: 4 additions & 1 deletion Modules/Daq/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ target_include_directories(
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/src
)

target_link_libraries(O2QcDaq PUBLIC O2QualityControl PRIVATE ROOT::Gpad O2::DetectorsRaw)
target_link_libraries(O2QcDaq PUBLIC O2QualityControl PRIVATE
ROOT::Gpad
O2::DetectorsRaw
O2QcCommon)

add_root_dictionary(O2QcDaq
HEADERS include/Daq/DaqTask.h
Expand Down
28 changes: 8 additions & 20 deletions Modules/Daq/include/Daq/DaqTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,10 @@

// ROOT
#include <TH1.h>
#include <TH2.h>

#include "QualityControl/TaskInterface.h"
#include <Headers/DAQID.h>
#include <map>
#include <set>

class TH1F;

using namespace o2::quality_control::core;

Expand Down Expand Up @@ -55,29 +52,20 @@ class DaqTask final : public o2::quality_control::core::TaskInterface
void printInputPayload(const header::DataHeader* header, const char* payload, size_t payloadSize);
void monitorInputRecord(o2::framework::InputRecord& inputRecord);
void monitorRDHs(o2::framework::InputRecord& inputRecord);
int getIntParam(const std::string paramName, int defaultValue = 0);

// ** general information

std::map<o2::header::DAQID::ID, std::string> mSystems;
std::set<o2::header::DAQID::ID> mToBePublished; // keep the list of detectors we saw this cycle and whose plots should be published

// ** objects we publish **

// Message related
// Block = the whole InputRecord, i.e. the thing we receive and analyse in monitorData(...)
// SubBlock = a single input of the InputRecord
std::unique_ptr<TH1F> mInputRecordPayloadSize; // filled w/ the sum of the payload size of all the inputs of an inputrecord
std::unique_ptr<TH1F> mInputSize; // filled w/ the size of the inputs in each InputRecord we encounter
std::unique_ptr<TH1F> mNumberRDHs; // filled w/ the number of RDHs found in each InputRecord we encounter

// Per link information

// Per detector information
std::map<o2::header::DAQID::ID, std::unique_ptr<TH1F>> mSubSystemsTotalSizes; // filled with the sum of RDH memory sizes per InputRecord
std::map<o2::header::DAQID::ID, std::unique_ptr<TH1F>> mSubSystemsRdhSizes; // filled with the RDH memory sizes for each RDH
// todo : for the next one we need to know the number of links per detector.
// std::map<o2::header::DAQID::ID, TH1F*> mSubSystemsRdhHits; // hits per link split by detector
// todo we could add back the graph for the IDs using the TFID
std::unique_ptr<TH1F> mInputRecordPayloadSize; // filled w/ the sum of the payload size of all the inputs of an inputrecord
std::unique_ptr<TH1F> mInputSize; // filled w/ the size of the inputs in each InputRecord we encounter
std::unique_ptr<TH1F> mNumberRDHs; // filled w/ the number of RDHs found in each InputRecord we encounter
std::unique_ptr<TH1F> mSumRDHSizesPerInputRecord; // filled w/ the the sum of RDH memory sizes per InputRecord
std::unique_ptr<TH1F> mSumRDHSizesPerRDH; // filled w/ the RDH memory sizes for each RDH
std::unique_ptr<TH2F> mRDHSizesPerCRUIds; // filled w/ the RDH payload size per CRUId
};

} // namespace o2::quality_control_modules::daq
Expand Down
128 changes: 57 additions & 71 deletions Modules/Daq/src/DaqTask.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
#include <Framework/InputRecord.h>
#include <Framework/InputRecordWalker.h>
#include <Headers/DataHeaderHelpers.h>
#include <stdexcept>
#include "Common/Utils.h"

using namespace std;
using namespace o2::raw;
Expand All @@ -34,48 +36,55 @@ using namespace o2::header;
namespace o2::quality_control_modules::daq
{

// TODO remove this function once the equivalent is available in O2 DAQID
bool isDetIdValid(DAQID::ID id)
int DaqTask::getIntParam(const std::string paramName, int defaultValue)
{
return (id < DAQID::MAXDAQ + 1) && (DAQID::DAQtoO2(id) != gDataOriginInvalid);
return common::getFromConfig<int>(mCustomParameters, paramName, defaultValue);
}

void DaqTask::initialize(o2::framework::InitContext& /*ctx*/)
{
ILOG(Debug, Devel) << "initializiation of DaqTask" << ENDM;

// General plots, related mostly to the payload size (InputRecord, Inputs) and the numbers of RDHs and Inputs in an InputRecord.
mInputRecordPayloadSize = std::make_unique<TH1F>("inputRecordSize", "Total payload size per InputRecord;bytes", 128, 0, 2047);
mInputRecordPayloadSize->SetCanExtend(TH1::kXaxis);
mInputRecordPayloadSize = std::make_unique<TH1F>("inputRecordSize", "Total payload size per InputRecord;bytes",
getIntParam("inputRecordSizeBins", 128),
getIntParam("inputRecordSizeMin", 0),
getIntParam("inputRecordSizeMax", 2047));
getObjectsManager()->startPublishing(mInputRecordPayloadSize.get(), PublicationPolicy::Forever);
mInputSize = std::make_unique<TH1F>("payloadSizeInputs", "Payload size of the inputs;bytes", 128, 0, 2047);
mInputSize->SetCanExtend(TH1::kXaxis);

mInputSize = std::make_unique<TH1F>("payloadSizeInputs", "Payload size of the inputs;bytes",
getIntParam("payloadSizeInputsBins", 128),
getIntParam("payloadSizeInputsMin", 0),
getIntParam("payloadSizeInputsMax", 2047));
getObjectsManager()->startPublishing(mInputSize.get(), PublicationPolicy::Forever);
mNumberRDHs = std::make_unique<TH1F>("numberRDHs", "Number of RDHs per InputRecord", 100, 1, 100);
mNumberRDHs->SetCanExtend(TH1::kXaxis);

mNumberRDHs = std::make_unique<TH1F>("numberRdhs", "Number of RDHs per InputRecord",
getIntParam("numberRDHsBins", 100),
getIntParam("numberRDHsMin", 1),
getIntParam("numberRDHsMax", 100));
getObjectsManager()->startPublishing(mNumberRDHs.get(), PublicationPolicy::Forever);

// initialize a map for the subsystems (id, name)
for (int i = DAQID::MINDAQ; i < DAQID::MAXDAQ + 1; i++) {
DataOrigin origin = DAQID::DAQtoO2(i);
if (origin != gDataOriginInvalid) {
mSystems[i] = origin.str;
}
}
mSystems[DAQID::INVALID] = "UNKNOWN"; // to store RDH info for unknown detectors

// subsystems plots: distribution of rdh size, distribution of the sum of rdh in each message.
for (const auto& system : mSystems) {
string name = system.second + "/sumRdhSizesPerInputRecord";
string title = "Sum of RDH sizes per InputRecord for " + system.second + ";bytes";
mSubSystemsTotalSizes[system.first] = std::make_unique<TH1F>(name.c_str(), title.c_str(), 128, 0, 2047);
mSubSystemsTotalSizes[system.first]->SetCanExtend(TH1::kXaxis);

name = system.second + "/RdhSizes";
title = "RDH sizes for " + system.second + ";bytes";
mSubSystemsRdhSizes[system.first] = std::make_unique<TH1F>(name.c_str(), title.c_str(), 128, 0, 2047);
mSubSystemsRdhSizes[system.first]->SetCanExtend(TH1::kXaxis);
}
mSumRDHSizesPerInputRecord = std::make_unique<TH1F>("sumRdhSizesPerInputRecord", "Sum of RDH sizes per InputRecord;bytes",
getIntParam("sumRdhSizesPerInputRecordBins", 128),
getIntParam("sumRdhSizesPerInputRecordMin", 0),
getIntParam("sumRdhSizesPerInputRecordMax", 2047));
getObjectsManager()->startPublishing(mSumRDHSizesPerInputRecord.get(), PublicationPolicy::Forever);

mSumRDHSizesPerRDH = std::make_unique<TH1F>("RdhSizes", "RDH sizes; bytes",
getIntParam("RdhSizesBins", 128),
getIntParam("RdhSizesMin", 0),
getIntParam("RdhSizesMax", 2047));
getObjectsManager()->startPublishing(mSumRDHSizesPerRDH.get(), PublicationPolicy::Forever);

mRDHSizesPerCRUIds = std::make_unique<TH2F>("RdhPayloadSizePerCRUid", "RDH payload size per CRU",
getIntParam("RdhPayloadSizeBins", 128),
getIntParam("RdhPayloadSizeMin", 0),
getIntParam("RdhPayloadSizeMax", 2047),
getIntParam("CRUidBins", 500),
getIntParam("CRUidMin", 0),
getIntParam("CRUidMax", 500));
mRDHSizesPerCRUIds->GetXaxis()->SetTitle("bytes");
mRDHSizesPerCRUIds->GetYaxis()->SetTitle("CRU Id");
getObjectsManager()->startPublishing(mRDHSizesPerCRUIds.get(), PublicationPolicy::Forever);
}

void DaqTask::startOfActivity(const Activity& activity)
Expand All @@ -98,31 +107,18 @@ void DaqTask::monitorData(o2::framework::ProcessingContext& ctx)
void DaqTask::endOfCycle()
{
ILOG(Debug, Devel) << "endOfCycle" << ENDM;

// TODO make this optional once we are able to know the run number and the detectors included.
// It might still be necessary in test runs without a proper run number.
for (auto toBeAdded : mToBePublished) {
if (!getObjectsManager()->isBeingPublished(mSubSystemsTotalSizes[toBeAdded]->GetName())) {
getObjectsManager()->startPublishing(mSubSystemsTotalSizes[toBeAdded].get(), PublicationPolicy::ThroughStop);
}
if (!getObjectsManager()->isBeingPublished(mSubSystemsRdhSizes[toBeAdded]->GetName())) {
getObjectsManager()->startPublishing(mSubSystemsRdhSizes[toBeAdded].get(), PublicationPolicy::ThroughStop);
}
}
}

void DaqTask::endOfActivity(const Activity& /*activity*/)
{
ILOG(Debug, Devel) << "endOfActivity" << ENDM;

for (const auto& system : mSystems) {
if (getObjectsManager()->isBeingPublished(mSubSystemsTotalSizes[system.first]->GetName())) {
getObjectsManager()->stopPublishing(mSubSystemsTotalSizes[system.first].get());
}
if (getObjectsManager()->isBeingPublished(mSubSystemsRdhSizes[system.first]->GetName())) {
getObjectsManager()->stopPublishing(mSubSystemsRdhSizes[system.first].get());
}
}
getObjectsManager()->stopPublishing(mInputRecordPayloadSize.get());
getObjectsManager()->stopPublishing(mInputSize.get());
getObjectsManager()->stopPublishing(mNumberRDHs.get());
getObjectsManager()->stopPublishing(mSumRDHSizesPerInputRecord.get());
getObjectsManager()->stopPublishing(mSumRDHSizesPerRDH.get());
getObjectsManager()->stopPublishing(mRDHSizesPerCRUIds.get());
}

void DaqTask::reset()
Expand All @@ -135,11 +131,8 @@ void DaqTask::reset()
mInputRecordPayloadSize->Reset();
mInputSize->Reset();
mNumberRDHs->Reset();

for (const auto& system : mSystems) {
mSubSystemsRdhSizes.at(system.first)->Reset();
mSubSystemsTotalSizes.at(system.first)->Reset();
}
mSumRDHSizesPerRDH->Reset();
mSumRDHSizesPerInputRecord->Reset();
}

void DaqTask::printInputPayload(const header::DataHeader* header, const char* payload, size_t payloadSize)
Expand Down Expand Up @@ -222,9 +215,8 @@ void DaqTask::monitorRDHs(o2::framework::InputRecord& inputRecord)
{
// Use the DPLRawParser to get information about the Pages and RDHs stored in the inputRecord
o2::framework::DPLRawParser parser(inputRecord);
size_t rdhCounter = 0;
size_t totalSize = 0;
DAQID::ID rdhSource = DAQID::INVALID;
size_t rdhCounter = 0;
for (auto it = parser.begin(), end = parser.end(); it != end; ++it) {
// TODO for some reason this does not work
// ILOG(Info, Ops) << "Header: " << ENDM;
Expand Down Expand Up @@ -252,26 +244,20 @@ void DaqTask::monitorRDHs(o2::framework::InputRecord& inputRecord)

// RDH plots
try {
rdhSource = RDHUtils::getVersion(rdh) >= 6 ? RDHUtils::getSourceID(rdh) : DAQID::INVALID; // there is no sourceID before v6
if (!isDetIdValid(rdhSource)) { // if we found it , is it valid ?
rdhSource = DAQID::INVALID;
}
totalSize += RDHUtils::getMemorySize(rdh);
const auto RDHSize = RDHUtils::getMemorySize(rdh) - RDHUtils::getHeaderSize(rdh);
mSumRDHSizesPerRDH->Fill(RDHSize);
totalSize += RDHSize;
rdhCounter++;
mSubSystemsRdhSizes.at(rdhSource)->Fill(RDHUtils::getMemorySize(rdh));

mRDHSizesPerCRUIds->Fill(RDHSize, RDHUtils::getCRUID(rdh));

} catch (std::runtime_error& e) {
ILOG(Error, Devel) << "Catched an exception when accessing the rdh fields: \n"
ILOG(Error, Devel) << "Caught an exception when accessing the rdh fields: \n"
<< e.what() << ENDM;
}
}

mSubSystemsTotalSizes.at(rdhSource)->Fill(totalSize);

// TODO make this optional once we are able to know the run number and the detectors included.
mToBePublished.insert(rdhSource);

// TODO why is the payload size reported by the dataref.header->print() different than the one from the sum
// of the RDH memory size + dataref header size ? a few hundreds bytes difference.
mSumRDHSizesPerInputRecord->Fill(totalSize);
mNumberRDHs->Fill(rdhCounter);
}

Expand Down

0 comments on commit 869e3c1

Please sign in to comment.