Skip to content

Commit

Permalink
shvfilejournal: Use unified getlog implementation
Browse files Browse the repository at this point in the history
About the removal of the assertion: log_header.until no longer refers to
the last entry, and since the test data is random, it makes no sense to
reimplement the algorithm inside the test. That algorithm already lives
inside getLog and has its own tests.
  • Loading branch information
syyyr committed Jan 3, 2024
1 parent 985c057 commit 1da124b
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 241 deletions.
251 changes: 11 additions & 240 deletions libshvcore/src/utils/shvfilejournal.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include <shv/core/utils/getlog.h>
#include <shv/core/utils/shvjournalcommon.h>
#include <shv/core/utils/shvfilejournal.h>

Expand Down Expand Up @@ -588,8 +589,16 @@ const ShvFileJournal::JournalContext &ShvFileJournal::checkJournalContext(bool f

/// Build a getLog reply by delegating to the unified
/// shv::core::utils::getLog implementation.
///
/// One lazy reader factory is created per journal file; the unified
/// implementation opens files on demand instead of eagerly.
///
/// @param params getLog query parameters (since/until, path pattern, ...)
/// @param ignore_record_count_limit legacy flag; see NOTE below.
/// @return the log as an RpcValue list with a ShvLogHeader metadata block.
chainpack::RpcValue ShvFileJournal::getLog(const ShvGetLogParams &params, bool ignore_record_count_limit)
{
	// NOTE(review): ignore_record_count_limit is no longer forwarded to the
	// unified getLog(); confirm the record-count-limit override is handled
	// (or intentionally dropped) by shv::core::utils::getLog.
	(void)ignore_record_count_limit;
	std::vector<std::function<ShvJournalFileReader()>> readers;
	{
		JournalContext ctx = checkJournalContext();
		// One factory per file; reserve up front to avoid reallocations.
		readers.reserve(ctx.files.size());
		for (const auto file_ms : ctx.files) {
			readers.emplace_back([file_path = ctx.fileMsecToFilePath(file_ms)] {
				return ShvJournalFileReader{file_path};
			});
		}
	}
	return shv::core::utils::getLog(readers, params);
}

chainpack::RpcValue ShvFileJournal::getSnapShotMap()
Expand All @@ -604,244 +613,6 @@ chainpack::RpcValue ShvFileJournal::getSnapShotMap()
return m;
}

/// Assemble a getLog reply directly from the journal files listed in
/// @p journal_context.
///
/// The reply is a list of records [timestamp, path, value, shortTime,
/// domain, valueFlags, userId] carrying a ShvLogHeader as metadata.
/// Entries before params.since are folded into a snapshot that is written
/// once, just before the first in-range entry (or at the end when the log
/// body is empty).
///
/// @param journal_context sorted list of journal files plus device info.
/// @param params getLog query parameters (since/until, path pattern, ...)
/// @param ignore_record_count_limit when true, the configured record count
///        limit is replaced by the numeric maximum of its type.
/// @return the log as an RpcValue list with header metadata attached.
chainpack::RpcValue ShvFileJournal::getLog(const ShvFileJournal::JournalContext &journal_context, const ShvGetLogParams &params, bool ignore_record_count_limit)
{
logIShvJournal() << "========================= getLog ==================";
logIShvJournal() << "params:" << params.toRpcValue().toCpon();

// Snapshot accumulator: keyvals collects the latest entry per path seen
// before params.since; snapshotWritten flips once the snapshot has been
// emitted into the log (pre-set to true when no snapshot was requested).
struct {
ShvSnapshot snapshot;
bool snapshotWritten;
int64_t snapshotMsec = 0;
} snapshot_ctx;
snapshot_ctx.snapshotWritten = !params.withSnapshot;

RpcValue::List log;
bool since_last = params.isSinceLast();

RpcValue::Map path_cache;
// "since last" means: snapshot only, so start the since bound at +inf.
const auto params_since_msec = params.since.isDateTime()
? params.since.toDateTime().msecsSinceEpoch()
: (since_last ? std::numeric_limits<int64_t>::max() : 0);
const auto params_until_msec = params.until.isDateTime()? params.until.toDateTime().msecsSinceEpoch(): 0;
int64_t journal_start_msec = 0;
int64_t first_record_msec = 0;
int64_t last_record_msec = 0;
int rec_cnt_limit = (ignore_record_count_limit)? std::numeric_limits<decltype (params.recordCountLimit)>::max(): std::min(params.recordCountLimit, ShvJournalCommon::DEFAULT_GET_LOG_RECORD_COUNT_LIMIT);
bool rec_cnt_limit_hit = false;

/// this ensures that there is only one copy of each path in memory
int max_path_id = 0;
// Returns the path itself, or (when withPathsDict is set) a small integer
// id allocated on first sight of the path and cached in path_cache.
auto make_path_shared = [&path_cache, &max_path_id, &params](const std::string &path) -> RpcValue {
if(!params.withPathsDict)
return path;
RpcValue ret;
if(auto it = path_cache.find(path); it == path_cache.end()) {
ret = ++max_path_id;
logDShvJournal() << "Adding record to path cache:" << path << "-->" << ret.toCpon();
path_cache[path] = ret;
}
else {
ret = it->second;
}
return ret;
};
// Appends one entry as a 7-column record; returns false (and latches
// rec_cnt_limit_hit) once the record count limit is reached.
auto append_log_entry = [make_path_shared, rec_cnt_limit, &rec_cnt_limit_hit, &first_record_msec, &last_record_msec, &log](const ShvJournalEntry &e) {
if(static_cast<int>(log.size()) >= rec_cnt_limit) {
rec_cnt_limit_hit = true;
return false;
}
if(first_record_msec == 0)
first_record_msec = e.epochMsec;
last_record_msec = e.epochMsec;
RpcValue::List rec;
rec.push_back(e.dateTime());
rec.push_back(make_path_shared(e.path));
rec.push_back(e.value);
rec.push_back(e.shortTime == ShvJournalEntry::NO_SHORT_TIME? RpcValue(nullptr): RpcValue(e.shortTime));
rec.push_back((e.domain.empty() || e.domain == ShvJournalEntry::DOMAIN_VAL_CHANGE)? RpcValue(nullptr): e.domain);
rec.push_back(e.valueFlags);
rec.push_back(e.userId.empty()? RpcValue(nullptr): RpcValue(e.userId));
log.push_back(std::move(rec));
return true;
};
// Emits the accumulated snapshot into the log, stamping every snapshot
// entry with a single timestamp; returns false when the record limit is
// hit mid-snapshot.
auto write_snapshot = [append_log_entry, since_last, params_since_msec, &snapshot_ctx]() {
logMShvJournal() << "\t writing snapshot, record count:" << snapshot_ctx.snapshot.keyvals.size();
snapshot_ctx.snapshotWritten = true;
if(!snapshot_ctx.snapshot.keyvals.empty()) {
snapshot_ctx.snapshotMsec = params_since_msec;
if(since_last) {
// "since last": stamp the snapshot with the newest entry time.
snapshot_ctx.snapshotMsec = 0;
for(const auto &kv : snapshot_ctx.snapshot.keyvals)
snapshot_ctx.snapshotMsec = std::max(snapshot_ctx.snapshotMsec, kv.second.epochMsec);
}
for(const auto &kv : snapshot_ctx.snapshot.keyvals) {
ShvJournalEntry e = kv.second;
e.epochMsec = snapshot_ctx.snapshotMsec;
e.setSnapshotValue(true);
// erase EVENT flag in the snapshot values,
// they can trigger events during reply otherwise
e.setSpontaneous(false);
logDShvJournal() << "\t SNAPSHOT entry:" << e.toRpcValueMap().toCpon();
if(!append_log_entry(e))
return false;
}
}
return true;
};
if(!journal_context.files.empty()) {
auto first_file_it = journal_context.files.begin();
journal_start_msec = *first_file_it;
if(params_since_msec > 0) {
// Locate the first file that can contain entries >= since; fall
// back one file so that entries preceding since feed the snapshot.
logMShvJournal() << "since:" << params.since.toCpon() << "msec:" << params_since_msec;
first_file_it = std::lower_bound(journal_context.files.begin(), journal_context.files.end(), params_since_msec);
if(first_file_it == journal_context.files.end()) {
/// take last file
--first_file_it;
logMShvJournal() << "\t" << "not found, taking last file:" << *first_file_it << journal_context.fileMsecToFileName(*first_file_it);
}
else if(*first_file_it == params_since_msec) {
/// take exactly this file
logMShvJournal() << "\t" << "found exactly:" << *first_file_it << journal_context.fileMsecToFileName(*first_file_it);
}
else if(first_file_it == journal_context.files.begin()) {
/// take first file
logMShvJournal() << "\t" << "begin, taking first file:" << *first_file_it << journal_context.fileMsecToFileName(*first_file_it);
}
else {
/// take previous file
logMShvJournal() << "\t" << "lower bound found:" << *first_file_it << journal_context.fileMsecToFileName(*first_file_it) << "taking previous file";
--first_file_it;
}
}

PatternMatcher pattern_matcher(params);
auto path_match = [&params, &pattern_matcher](const ShvJournalEntry &e) {
if(!params.pathPattern.empty()) {
if(!pattern_matcher.match(e.path, e.domain))
return false;
}
return true;
};
for(auto file_it = first_file_it; file_it != journal_context.files.end(); file_it++) {
std::string fn = journal_context.fileMsecToFilePath(*file_it);
logMShvJournal() << "-------- opening file:" << fn;
try {
ShvJournalFileReader rd(fn);
while(rd.next()) {
const ShvJournalEntry &entry = rd.entry();
if(rd.inSnapshot()) {
logDShvJournal() << "\t snapshot:" << entry.path;
if(auto it = snapshot_ctx.snapshot.keyvals.find(entry.path); it != snapshot_ctx.snapshot.keyvals.end()) {
if(it->second.value == entry.value) {
// throw away current file snapshot values present already in global snapshot
continue;
}
}
}
if(!path_match(entry))
continue;
bool before_since = params_since_msec > 0 && entry.epochMsec < params_since_msec;
bool after_until = params_until_msec > 0 && entry.epochMsec >= params_until_msec;
if(before_since) {
logDShvJournal() << "\t SNAPSHOT entry:" << entry.toRpcValueMap().toCpon();
addToSnapshot(snapshot_ctx.snapshot, entry);
}
else if(after_until) {
// Files are time-ordered, so nothing later can be in range.
goto log_finish;
}
else {
// First in-range entry: flush the snapshot before the log body.
if(!snapshot_ctx.snapshotWritten) {
if(!write_snapshot())
goto log_finish;
}
// NOTE(review): the block below references 'e', which is not in
// scope here ('entry' is) -- it looks stale and would not compile
// if SKIP_DUP_LOG_ENTRIES were ever defined; verify before enabling.
#ifdef SKIP_DUP_LOG_ENTRIES
{
// skip CHNG duplicates, values that are the same as last log entry for the same path
auto it = snapshot_ctx.snapshot.find(e.path);
if (it != snapshot_ctx.snapshot.cend()) {
if(it->second.value == e.value && e.domain == chainpack::Rpc::SIG_VAL_CHANGED) {
logDShvJournal() << "\t Skipping DUP LOG entry:" << e.toRpcValueMap().toCpon();
snapshot_ctx.snapshot.erase(it);
continue;
}
}
}
#endif
logDShvJournal() << "\t LOG entry:" << entry.toRpcValueMap().toCpon();
// keep updating snapshot to enable snapshot erasure on log merge
addToSnapshot(snapshot_ctx.snapshot, entry);
if(!append_log_entry(entry))
goto log_finish;
}
}
}
catch (const shv::core::Exception &e) {
shvError() << "Cannot read shv journal file:" << fn;
}
}
}
log_finish:
// snapshot should be written already
// this is only case, when log is empty and
// only snapshot shall be returned
if(!snapshot_ctx.snapshotWritten) {
write_snapshot();
}

// Derive the effective since/until bounds reported in the header.
int64_t log_since_msec = params_since_msec;
if (since_last) {
log_since_msec = snapshot_ctx.snapshotMsec;
}
else if(log_since_msec < journal_start_msec) {
log_since_msec = journal_start_msec;
}
int64_t log_until_msec = params_until_msec;
if(params_until_msec == 0 || rec_cnt_limit_hit) {
// Truncated or open-ended log: until is the last record emitted.
log_until_msec = last_record_msec;
}
RpcValue ret = log;
ShvLogHeader log_header;
{
log_header.setDeviceId(journal_context.deviceId);
log_header.setDeviceType(journal_context.deviceType);
log_header.setDateTime(RpcValue::DateTime::now());
log_header.setLogParams(params);
log_header.setSince((log_since_msec > 0)? RpcValue(RpcValue::DateTime::fromMSecsSinceEpoch(log_since_msec)): RpcValue(nullptr));
log_header.setUntil((log_until_msec > 0)? RpcValue(RpcValue::DateTime::fromMSecsSinceEpoch(log_until_msec)): RpcValue(nullptr));
log_header.setRecordCount(static_cast<int>(log.size()));
log_header.setRecordCountLimit(rec_cnt_limit);
log_header.setRecordCountLimitHit(rec_cnt_limit_hit);
log_header.setWithSnapShot(params.withSnapshot);
log_header.setWithPathsDict(params.withPathsDict);

using Column = ShvLogHeader::Column;
RpcValue::List fields;
fields.push_back(RpcValue::Map{{ShvJournalCommon::KEY_NAME, Column::name(Column::Enum::Timestamp)}});
fields.push_back(RpcValue::Map{{ShvJournalCommon::KEY_NAME, Column::name(Column::Enum::Path)}});
fields.push_back(RpcValue::Map{{ShvJournalCommon::KEY_NAME, Column::name(Column::Enum::Value)}});
fields.push_back(RpcValue::Map{{ShvJournalCommon::KEY_NAME, Column::name(Column::Enum::ShortTime)}});
fields.push_back(RpcValue::Map{{ShvJournalCommon::KEY_NAME, Column::name(Column::Enum::Domain)}});
fields.push_back(RpcValue::Map{{ShvJournalCommon::KEY_NAME, Column::name(Column::Enum::ValueFlags)}});
fields.push_back(RpcValue::Map{{ShvJournalCommon::KEY_NAME, Column::name(Column::Enum::UserId)}});
log_header.setFields(std::move(fields));
}
if(params.withPathsDict) {
// Invert path_cache (path -> id) into the header's id -> path dict.
logMShvJournal() << "Generating paths dict size:" << path_cache.size();
RpcValue::IMap path_dict;
for(const auto &kv : path_cache) {
path_dict[kv.second.toInt()] = kv.first;
}
log_header.setPathDict(std::move(path_dict));
}
if(params.withTypeInfo) {
log_header.setTypeInfo(journal_context.typeInfo);
}
ret.setMetaData(log_header.toMetaData());
logIShvJournal() << "result record cnt:" << log.size();
return ret;
}

const char *ShvFileJournal::TxtColumn::name(ShvFileJournal::TxtColumn::Enum e)
{
switch (e) {
Expand Down
1 change: 0 additions & 1 deletion libshvcore/tests/test_shvlog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,6 @@ DOCTEST_TEST_CASE("ShvLog")
cnt++;
}
REQUIRE(rd.logHeader().since().toDateTime().msecsSinceEpoch() == first_entry.epochMsec);
REQUIRE(rd.logHeader().until().toDateTime().msecsSinceEpoch() == last_entry.epochMsec);
REQUIRE(cnt == rd.logHeader().recordCount());
REQUIRE(cnt == log1.asList().size());
shvInfo() << cnt << "records read";
Expand Down

0 comments on commit 1da124b

Please sign in to comment.