Skip to content

Commit

Permalink
Resume Run Functionality (#273)
Browse files Browse the repository at this point in the history
## Summary of changes

Added behaviour for the Nexus Writer to resume a run if it was
interrupted by the component going down.
- ~~A file with extension ".partial_run" is added to the local path when
a run_start is received. This file is removed when the nexus file is
transferred to the archive (signalling a successful run).~~
- When the nexus writer starts, any ~~".partial_run"~~ ".nxs" files in
the local path are read and runs re-created (as ~~.partial_run~~ .nxs
files indicate that a previous instance of nexus writer had failed).

## Instructions for review/testing

General code review.

Tool was tested with simulated data.
  • Loading branch information
Modularius authored Dec 9, 2024
1 parent 35abd02 commit 10b362d
Show file tree
Hide file tree
Showing 8 changed files with 151 additions and 118 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ chrono = { version = "0.4.38", features = ["serde"] }
clap = { version = "4.5", features = ["derive", "env"] }
crossterm = "0.26.1"
flatbuffers = "22.12.6"
glob = "0.3.1"
hdf5 = "0.8.1"
itertools = "0.13.0"
lazy_static = "1.5.0"
Expand Down
1 change: 1 addition & 0 deletions nexus-writer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ edition.workspace = true
anyhow.workspace = true
chrono.workspace = true
clap.workspace = true
glob.workspace = true
hdf5.workspace = true
metrics.workspace = true
metrics-exporter-prometheus.workspace = true
Expand Down
3 changes: 2 additions & 1 deletion nexus-writer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ async fn main() -> anyhow::Result<()> {
nexus_settings,
nexus_configuration,
);
nexus_engine.resume_partial_runs()?;

let mut nexus_write_interval =
tokio::time::interval(time::Duration::from_millis(args.cache_poll_interval_ms));
Expand Down Expand Up @@ -389,7 +390,7 @@ fn process_run_stop_message(nexus_engine: &mut NexusEngine, payload: &[u8]) {
}
Err(e) => {
let _guard = warn_span!(
"RunStop Error.",
"RunStop Error",
run_name = data.run_name(),
stop_time = data.stop_time(),
)
Expand Down
62 changes: 47 additions & 15 deletions nexus-writer/src/nexus/engine.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use super::{Run, RunParameters};
use chrono::{DateTime, Duration, Utc};
use glob::glob;
#[cfg(test)]
use std::collections::vec_deque;
use std::{
collections::VecDeque,
ffi::OsStr,
path::{Path, PathBuf},
};
use supermusr_common::spanned::SpannedAggregator;
Expand All @@ -13,10 +15,10 @@ use supermusr_streaming_types::{
ecs_f144_logdata_generated::f144_LogData, ecs_pl72_run_start_generated::RunStart,
ecs_se00_data_generated::se00_SampleEnvironmentData,
};
use tracing::warn;
use tracing::{info_span, warn};

pub(crate) struct NexusEngine {
filename: Option<PathBuf>,
local_path: Option<PathBuf>,
run_cache: VecDeque<Run>,
run_number: u32,
nexus_settings: NexusSettings,
Expand All @@ -27,12 +29,12 @@ pub(crate) struct NexusEngine {
impl NexusEngine {
#[tracing::instrument(skip_all)]
pub(crate) fn new(
filename: Option<&Path>,
local_path: Option<&Path>,
nexus_settings: NexusSettings,
nexus_configuration: NexusConfiguration,
) -> Self {
Self {
filename: filename.map(ToOwned::to_owned),
local_path: local_path.map(ToOwned::to_owned),
run_cache: Default::default(),
run_number: 0,
nexus_settings,
Expand All @@ -41,6 +43,36 @@ impl NexusEngine {
}
}

pub(crate) fn resume_partial_runs(&mut self) -> anyhow::Result<()> {
if let Some(local_path) = &self.local_path {
let local_path_str = local_path.as_os_str().to_str().ok_or_else(|| {
anyhow::anyhow!("Cannot convert local path to string: {0:?}", local_path)
})?;

for filename in glob(&format!("{local_path_str}/*.nxs"))? {
let filename = filename?;
let filename_str =
filename
.file_stem()
.and_then(OsStr::to_str)
.ok_or_else(|| {
anyhow::anyhow!("Cannot convert filename to string: {0:?}", filename)
})?;
let mut run = info_span!(
"Partial Run Found",
path = local_path_str,
file_name = filename_str
)
.in_scope(|| Run::resume_partial_run(local_path, filename_str))?;
if let Err(e) = run.span_init() {
warn!("Run span initiation failed {e}")
}
self.run_cache.push_back(run);
}
}
Ok(())
}

#[cfg(test)]
fn cache_iter(&self) -> vec_deque::Iter<'_, Run> {
self.run_cache.iter()
Expand All @@ -61,7 +93,7 @@ impl NexusEngine {
.iter_mut()
.find(|run| run.is_message_timestamp_valid(&timestamp))
{
run.push_selogdata(self.filename.as_deref(), data, &self.nexus_settings)?;
run.push_selogdata(self.local_path.as_deref(), data, &self.nexus_settings)?;
Ok(Some(run))
} else {
warn!("No run found for selogdata message with timestamp: {timestamp}");
Expand All @@ -77,7 +109,7 @@ impl NexusEngine {
.iter_mut()
.find(|run| run.is_message_timestamp_valid(&timestamp))
{
run.push_logdata_to_run(self.filename.as_deref(), data, &self.nexus_settings)?;
run.push_logdata_to_run(self.local_path.as_deref(), data, &self.nexus_settings)?;
Ok(Some(run))
} else {
warn!("No run found for logdata message with timestamp: {timestamp}");
Expand All @@ -93,7 +125,7 @@ impl NexusEngine {
.iter_mut()
.find(|run| run.is_message_timestamp_valid(&timestamp))
{
run.push_alarm_to_run(self.filename.as_deref(), data)?;
run.push_alarm_to_run(self.local_path.as_deref(), data)?;
Ok(Some(run))
} else {
warn!("No run found for alarm message with timestamp: {timestamp}");
Expand All @@ -110,7 +142,7 @@ impl NexusEngine {
}

let mut run = Run::new_run(
self.filename.as_deref(),
self.local_path.as_deref(),
RunParameters::new(data, self.run_number)?,
&self.nexus_settings,
&self.nexus_configuration,
Expand All @@ -134,7 +166,7 @@ impl NexusEngine {
.back_mut()
.expect("run_cache::back_mut should exist")
.abort_run(
self.filename.as_deref(),
self.local_path.as_deref(),
data.start_time(),
&self.nexus_settings,
)?;
Expand All @@ -144,7 +176,7 @@ impl NexusEngine {
#[tracing::instrument(skip_all)]
pub(crate) fn stop_command(&mut self, data: RunStop<'_>) -> anyhow::Result<&Run> {
if let Some(last_run) = self.run_cache.back_mut() {
last_run.set_stop_if_valid(self.filename.as_deref(), data)?;
last_run.set_stop_if_valid(self.local_path.as_deref(), data)?;

Ok(last_run)
} else {
Expand Down Expand Up @@ -178,7 +210,7 @@ impl NexusEngine {
.iter_mut()
.find(|run| run.is_message_timestamp_valid(&timestamp))
{
run.push_message(self.filename.as_deref(), message)?;
run.push_message(self.local_path.as_deref(), message)?;
Some(run)
} else {
warn!("No run found for message with timestamp: {timestamp}");
Expand Down Expand Up @@ -211,12 +243,12 @@ impl NexusEngine {
/// After which the runs are dropped.
#[tracing::instrument(skip_all, level = "debug")]
pub(crate) async fn flush_move_cache(&mut self) {
if let Some((file_name, archive_name)) = Option::zip(
self.filename.as_ref(),
self.nexus_settings.archive_path.as_ref(),
if let Some((local_path, archive_path)) = Option::zip(
self.local_path.as_deref(),
self.nexus_settings.archive_path.as_deref(),
) {
for run in self.run_move_cache.iter() {
match run.move_to_archive(file_name, archive_name) {
match run.move_to_archive(local_path, archive_path) {
Ok(move_to_archive) => move_to_archive.await,
Err(e) => warn!("Error Moving to Archive {e}"),
}
Expand Down
108 changes: 49 additions & 59 deletions nexus-writer/src/nexus/hdf5_file/run_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ use super::{
};
use crate::nexus::{
hdf5_file::run_file_components::{RunLog, SeLog},
nexus_class as NX, NexusConfiguration, NexusSettings, RunParameters, DATETIME_FORMAT,
nexus_class as NX,
run_parameters::RunStopParameters,
NexusConfiguration, NexusSettings, RunParameters, DATETIME_FORMAT,
};
use chrono::{DateTime, Duration, Utc};
use hdf5::{types::VarLenUnicode, Dataset, File};
use chrono::{DateTime, Utc};
use hdf5::{types::VarLenUnicode, Dataset, File, H5Type};
use std::{fs::create_dir_all, path::Path};
use supermusr_streaming_types::{
aev2_frame_assembled_event_v2_generated::FrameAssembledEventListMessage,
Expand Down Expand Up @@ -49,17 +51,12 @@ pub(crate) struct RunFile {
impl RunFile {
#[tracing::instrument(skip_all, err(level = "warn"))]
pub(crate) fn new_runfile(
filename: &Path,
path: &Path,
run_name: &str,
nexus_settings: &NexusSettings,
) -> anyhow::Result<Self> {
create_dir_all(filename)?;
let filename = {
let mut filename = filename.to_owned();
filename.push(run_name);
filename.set_extension("nxs");
filename
};
create_dir_all(path)?;
let filename = RunParameters::get_hdf5_filename(path, run_name);
debug!("File save begin. File: {0}.", filename.display());

let file = File::create(filename)?;
Expand Down Expand Up @@ -138,13 +135,8 @@ impl RunFile {
}

#[tracing::instrument(skip_all, err(level = "warn"))]
pub(crate) fn open_runfile(filename: &Path, run_name: &str) -> anyhow::Result<Self> {
let filename = {
let mut filename = filename.to_owned();
filename.push(run_name);
filename.set_extension("nxs");
filename
};
pub(crate) fn open_runfile(local_path: &Path, run_name: &str) -> anyhow::Result<Self> {
let filename = RunParameters::get_hdf5_filename(local_path, run_name);
debug!("File open begin. File: {0}.", filename.display());

let file = File::open_rw(filename)?;
Expand Down Expand Up @@ -229,6 +221,7 @@ impl RunFile {
let start_time = parameters.collect_from.format(DATETIME_FORMAT).to_string();

set_string_to(&self.start_time, &start_time)?;
set_string_to(&self.end_time, "")?;

set_string_to(&self.name, &parameters.run_name)?;
set_string_to(&self.title, "")?;
Expand All @@ -254,43 +247,6 @@ impl RunFile {
Ok(())
}

/// Writes the run's end time to the file.
///
/// When `parameters.run_stop_parameters` is present its `collect_until` value is used.
/// Otherwise the end time is derived from `message`: the metadata timestamp plus the
/// last entry of the `time` vector divided by 1,000,000 (rounded up) and taken as
/// milliseconds. An empty `time` vector contributes zero.
///
/// NOTE(review): the division by 1,000,000 suggests `time` is in nanoseconds — confirm.
///
/// # Errors
///
/// Fails if the message has no `time` vector or no metadata timestamp, if the timestamp
/// cannot be converted, if the duration is invalid or cannot be added to the timestamp,
/// or if `set_end_time` fails.
#[tracing::instrument(skip_all, level = "trace", err(level = "warn"))]
pub(crate) fn ensure_end_time_is_set(
    &mut self,
    parameters: &RunParameters,
    message: &FrameAssembledEventListMessage,
) -> anyhow::Result<()> {
    // Errors are built lazily (`ok_or_else`) so the success path, which runs once per
    // message, does not allocate or format error values it never uses.
    let end_time = match &parameters.run_stop_parameters {
        Some(run_stop_parameters) => run_stop_parameters.collect_until,
        None => {
            let time = message
                .time()
                .ok_or_else(|| anyhow::anyhow!("Event time missing."))?;

            let ms = if time.is_empty() {
                0
            } else {
                time.get(time.len() - 1).div_ceil(1_000_000).into()
            };

            let duration = Duration::try_milliseconds(ms)
                .ok_or_else(|| anyhow::anyhow!("Invalid duration {ms}ms."))?;

            let timestamp: DateTime<Utc> = (*message
                .metadata()
                .timestamp()
                .ok_or_else(|| anyhow::anyhow!("Message timestamp missing."))?)
            .try_into()?;

            timestamp
                .checked_add_signed(duration)
                .ok_or_else(|| anyhow::anyhow!("Unable to add {duration} to {timestamp}"))?
        }
    };
    self.set_end_time(&end_time)
}

#[tracing::instrument(skip_all, level = "trace", err(level = "warn"))]
pub(crate) fn push_logdata_to_runfile(
&mut self,
Expand Down Expand Up @@ -318,12 +274,46 @@ impl RunFile {
#[tracing::instrument(skip_all, level = "trace", err(level = "warn"))]
pub(crate) fn push_message_to_runfile(
&mut self,
parameters: &RunParameters,
message: &FrameAssembledEventListMessage,
) -> anyhow::Result<()> {
self.lists.push_message_to_event_runfile(message)?;
self.ensure_end_time_is_set(parameters, message)?;
Ok(())
self.lists.push_message_to_event_runfile(message)
}

/// Reads a single value of type `T` from `dataset`.
///
/// # Errors
///
/// Fails with "`<name>` is not allocated" when the dataset's storage size is zero, with
/// "`<name>` is not a scalar" when the dataset is not scalar, or with the underlying
/// error when the read itself fails.
fn try_read_scalar<T: H5Type>(dataset: &Dataset) -> anyhow::Result<T> {
    // Storage is checked before the scalar test, so an unallocated dataset is always
    // reported as such.
    if dataset.storage_size() == 0 {
        anyhow::bail!("{} is not allocated", dataset.name());
    }
    if !dataset.is_scalar() {
        anyhow::bail!("{} is not a scalar", dataset.name());
    }
    let value = dataset.read_scalar::<T>()?;
    Ok(value)
}

/// Rebuilds a [`RunParameters`] value from the scalars stored in this file.
///
/// The start time, run name, run number, period number and instrument name are all
/// required. The end time is optional in effect: if the stored string does not parse
/// as a timestamp, `run_stop_parameters` is `None`; otherwise it is populated with
/// `last_modified` set to the current time.
///
/// # Errors
///
/// Fails if any dataset cannot be read as a scalar or if the start time does not parse.
#[tracing::instrument(skip_all, level = "trace", err(level = "warn"))]
pub(crate) fn extract_run_parameters(&self) -> anyhow::Result<RunParameters> {
    let start: VarLenUnicode = Self::try_read_scalar(&self.start_time)?;
    let collect_from: DateTime<Utc> = start.parse()?;

    let name: VarLenUnicode = Self::try_read_scalar(&self.name)?;
    let number: u32 = Self::try_read_scalar(&self.run_number)?;
    let periods: u32 = Self::try_read_scalar(&self.period_number)?;
    let instrument: VarLenUnicode = Self::try_read_scalar(&self.instrument_name)?;

    // An unparseable end time is not an error: it yields no stop parameters.
    let end: VarLenUnicode = Self::try_read_scalar(&self.end_time)?;
    let run_stop_parameters = end.parse().ok().map(|collect_until| RunStopParameters {
        collect_until,
        last_modified: Utc::now(),
    });

    Ok(RunParameters {
        collect_from,
        run_stop_parameters,
        num_periods: periods,
        run_name: name.into(),
        run_number: number,
        instrument_name: instrument.into(),
    })
}

#[tracing::instrument(skip_all, level = "trace", err(level = "warn"))]
Expand Down
Loading

0 comments on commit 10b362d

Please sign in to comment.