Skip to content

Commit

Permalink
implementation of manual tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Michal Tichák committed Jul 4, 2024
1 parent f79dc4a commit 5dadafa
Show file tree
Hide file tree
Showing 7 changed files with 512 additions and 79 deletions.
4 changes: 2 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ include(AddQCWorkflow)
# ---- Project ----

project(QualityControl
VERSION 1.149.0
VERSION 1.151.0
DESCRIPTION "O2 Data Quality Control Framework"
LANGUAGES C CXX)

Expand Down Expand Up @@ -103,7 +103,6 @@ find_package(FairMQ 1.4.41 REQUIRED)
find_package(FairLogger REQUIRED)
find_package(Occ REQUIRED)
find_package(ROOT 6.06.02 COMPONENTS RHTTP Gui REQUIRED)
find_package(RdKafka REQUIRED)

configure_file(getTestDataDirectory.cxx.in getTestDataDirectory.cxx)

Expand All @@ -112,3 +111,4 @@ configure_file(getTestDataDirectory.cxx.in getTestDataDirectory.cxx)
add_subdirectory(Framework)
add_subdirectory(Modules)
add_subdirectory(doc)

2 changes: 1 addition & 1 deletion Framework/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ add_executable(o2-qc-test-core
test/testVersion.cxx
test/testMonitorObjectCollection.cxx
test/testTrendingTask.cxx
test/testKafkaPoller.cxx
test/testKafkaTests.cxx
)
set_property(TARGET o2-qc-test-core
PROPERTY RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/tests)
Expand Down
2 changes: 1 addition & 1 deletion Framework/include/QualityControl/KafkaPoller.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class KafkaPoller

explicit KafkaPoller(const std::string& brokers, const std::string& groupId);

void subscribe(const std::string& topic);
void subscribe(const std::string& topic, size_t numberOfRetries = 5);
auto poll() -> KafkaRecords;

private:
Expand Down
42 changes: 22 additions & 20 deletions Framework/src/KafkaPoller.cxx
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "QualityControl/KafkaPoller.h"

#include "QualityControl/QcInfoLogger.h"
#include "kafka/KafkaException.h"
#include "kafka/Properties.h"
#include "proto/events.pb.h"
#include <chrono>
Expand All @@ -19,7 +20,8 @@ bool isRunNumberSet(int runNumber)

bool isEnvironmentIdSet(const std::string_view environmentID)
{
return !environmentID.empty();
// qc is default value for Activity object so we need to check against it as well
return !environmentID.empty() && environmentID != "qc";
}

struct Ev_RunEventPartial {
Expand Down Expand Up @@ -53,7 +55,7 @@ auto recordToEvent(const kafka::Value& kafkaRecord) -> std::optional<events::Eve
events::Event event;

if (!event.ParseFromArray(kafkaRecord.data(), kafkaRecord.size())) {
ILOG(Error, Ops) << "Received wrong or inconsistent data in SOR parser" << ENDM;
ILOG(Error, Ops) << "Received wrong or inconsistent data while parser Event from kafka proto" << ENDM;
return std::nullopt;
}

Expand Down Expand Up @@ -93,14 +95,6 @@ bool start_of_run::check(const events::Event& event, const std::string& environm
return false;
}

// std::cout << "SOR!!!\n";
// std::cout << "transition: " << runEvent.transition() << "\n";
// std::cout << "state: " << runEvent.state() << "\n";
// std::cout << "error: " << runEvent.error() << "\n";
// std::cout << "transitionStatus: " << runEvent.transitionstatus() << "\n";
// std::cout << "envid: " << runEvent.environmentid() << "\n";
// std::cout << "runnumber: " << runEvent.runnumber() << "\n";

return runEvent == Ev_RunEventPartial{ "CONFIGURED", events::OpStatus::STARTED, environmentID, runNumber };
}

Expand All @@ -117,13 +111,6 @@ bool end_of_run::check(const events::Event& event, const std::string& environmen
}

const auto& runEvent = event.runevent();
// std::cout << "EOR!!!\n";
// std::cout << "transition: " << runEvent.transition() << "\n";
// std::cout << "state: " << runEvent.state() << "\n";
// std::cout << "error: " << runEvent.error() << "\n";
// std::cout << "transitionStatus: " << runEvent.transitionstatus() << "\n";
// std::cout << "envid: " << runEvent.environmentid() << "\n";
// std::cout << "runnumber: " << runEvent.runnumber() << "\n";

if (runEvent.transition() != "STOP_ACTIVITY" && runEvent.transition() != "TEARDOWN") {
return false;
Expand All @@ -138,17 +125,32 @@ kafka::Properties createProperties(const std::string& brokers, const std::string
{
return { { { "bootstrap.servers", { brokers } },
{ "group.id", { groupId } },
{ "enable.auto.commit", { "true" } } } };
{ "enable.auto.commit", { "true" } },
{ "auto.offset.reset", { "latest" } } } };
}

KafkaPoller::KafkaPoller(const std::string& brokers, const std::string& groupId)
: mConsumer(createProperties(brokers, groupId))
{
}

void KafkaPoller::subscribe(const std::string& topic)
void KafkaPoller::subscribe(const std::string& topic, size_t numberOfRetries)
{
mConsumer.subscribe({ topic });
for (size_t retryNumber = 0; retryNumber != numberOfRetries; ++retryNumber) {
try {
mConsumer.subscribe({ topic });
return;
} catch (const kafka::KafkaException& ex) {
// it sometimes happen that subscibe timeouts but another retry succeeds
if (ex.error().value() != RD_KAFKA_RESP_ERR__TIMED_OUT) {
throw;
} else {
ILOG(Warning, Ops) << "Failed to subscribe to kafka due to timeout " << retryNumber + 1 << "/" << numberOfRetries + 1 << " times, retrying...";
}
}
}

throw std::runtime_error(std::string{ "Kafka Poller failed to subscribe after " }.append(std::to_string(numberOfRetries)).append(" retries"));
}

auto KafkaPoller::poll() -> KafkaRecords
Expand Down
4 changes: 2 additions & 2 deletions Framework/src/Triggers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ TriggerFcn StartOfRun(const std::string& kafkaBrokers, const std::string& topic,
return [poller, copiedActivity]() mutable -> Trigger {
for (const auto& record : poller->poll()) {
if (auto event = proto::recordToEvent(record.value())) {
if (proto::end_of_run::check(*event, copiedActivity.mProvenance, copiedActivity.mId)) {
if (proto::start_of_run::check(*event, copiedActivity.mProvenance, copiedActivity.mId)) {
auto newActivityForTrigger = copiedActivity;
proto::end_of_run::fillActivity(*event, newActivityForTrigger);
proto::start_of_run::fillActivity(*event, newActivityForTrigger);
return { TriggerType::StartOfRun, false, newActivityForTrigger, static_cast<uint64_t>(event->timestamp()), "sor" };
}
}
Expand Down
53 changes: 0 additions & 53 deletions Framework/test/testKafkaPoller.cxx

This file was deleted.

Loading

0 comments on commit 5dadafa

Please sign in to comment.