Skip to content

Commit

Permalink
Close hdf5 file after resuming or after errors occur (#286)
Browse files Browse the repository at this point in the history
## Summary of changes

Fixes two possible source of memory leakage:
 - Explicitly closes the hdf5 file after resuming a partial run
- Explicitly closes the hdf5 file after an error occurs whilst reading
or creating the datasets.

## Instruction for review/testing

- General code review.
- Advice on limiting unnecessary memory usage.
  • Loading branch information
Modularius authored Dec 11, 2024
1 parent 0f6fd91 commit 0705b8f
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 54 deletions.
151 changes: 97 additions & 54 deletions nexus-writer/src/nexus/hdf5_file/run_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,15 @@ use supermusr_streaming_types::{
ecs_se00_data_generated::se00_SampleEnvironmentData,
};
use tracing::debug;

#[derive(Debug)]
pub(crate) struct RunFile {
file: File,
contents: RunFileContents,
}

#[derive(Debug)]
struct RunFileContents {
idf_version: Dataset,
definition: Dataset,
program_name: Dataset,
Expand Down Expand Up @@ -48,26 +53,20 @@ pub(crate) struct RunFile {
lists: EventRun,
}

impl RunFile {
impl RunFileContents {
#[tracing::instrument(skip_all, err(level = "warn"))]
pub(crate) fn new_runfile(
path: &Path,
run_name: &str,
pub(crate) fn populate_new_runfile(
file: &File,
nexus_settings: &NexusSettings,
) -> anyhow::Result<Self> {
create_dir_all(path)?;
let filename = RunParameters::get_hdf5_filename(path, run_name);
debug!("File save begin. File: {0}.", filename.display());

let file = File::create(filename)?;
set_group_nx_class(&file, NX::ROOT)?;
set_group_nx_class(file, NX::ROOT)?;

add_attribute_to(&file, "HDF5_version", "1.14.3")?; // Can this be taken directly from the nix package;
add_attribute_to(&file, "NeXus_version", "")?; // Where does this come from?
add_attribute_to(&file, "file_name", &file.filename())?; // This should be absolutized at some point
add_attribute_to(&file, "file_time", Utc::now().to_string().as_str())?; // This should be formatted, the nanoseconds are overkill.
add_attribute_to(file, "HDF5_version", "1.14.3")?; // Can this be taken directly from the nix package;
add_attribute_to(file, "NeXus_version", "")?; // Where does this come from?
add_attribute_to(file, "file_name", &file.filename())?; // This should be absolutized at some point
add_attribute_to(file, "file_time", Utc::now().to_string().as_str())?; // This should be formatted, the nanoseconds are overkill.

let entry = add_new_group_to(&file, "raw_data_1", NX::ENTRY)?;
let entry = add_new_group_to(file, "raw_data_1", NX::ENTRY)?;

let idf_version = entry.new_dataset::<u32>().create("IDF_version")?;
let definition = entry.new_dataset::<VarLenUnicode>().create("definition")?;
Expand Down Expand Up @@ -112,7 +111,6 @@ impl RunFile {
let lists = EventRun::new_event_runfile(&entry, nexus_settings)?;

Ok(Self {
file,
idf_version,
start_time,
end_time,
Expand All @@ -134,13 +132,7 @@ impl RunFile {
})
}

#[tracing::instrument(skip_all, err(level = "warn"))]
pub(crate) fn open_runfile(local_path: &Path, run_name: &str) -> anyhow::Result<Self> {
let filename = RunParameters::get_hdf5_filename(local_path, run_name);
debug!("File open begin. File: {0}.", filename.display());

let file = File::open_rw(filename)?;

fn populate_open_runfile(file: &File) -> anyhow::Result<Self> {
let entry = file.group("raw_data_1")?;

let idf_version = entry.dataset("IDF_version")?;
Expand Down Expand Up @@ -176,7 +168,6 @@ impl RunFile {
let lists = EventRun::open_event_runfile(&entry)?;

Ok(Self {
file,
idf_version,
start_time,
end_time,
Expand All @@ -197,53 +188,100 @@ impl RunFile {
experiment_identifier,
})
}
}

impl RunFile {
#[tracing::instrument(skip_all, err(level = "warn"))]
pub(crate) fn new_runfile(
path: &Path,
run_name: &str,
nexus_settings: &NexusSettings,
) -> anyhow::Result<Self> {
create_dir_all(path)?;
let filename = RunParameters::get_hdf5_filename(path, run_name);
debug!("File save begin. File: {0}.", filename.display());

let file = File::create(filename)?;
match RunFileContents::populate_new_runfile(&file, nexus_settings) {
Ok(contents) => Ok(Self { file, contents }),
Err(e) => {
file.close()?;
Err(e)
}
}
}

#[tracing::instrument(skip_all, err(level = "warn"))]
pub(crate) fn open_runfile(local_path: &Path, run_name: &str) -> anyhow::Result<Self> {
let filename = RunParameters::get_hdf5_filename(local_path, run_name);
debug!("File open begin. File: {0}.", filename.display());

let file = File::open_rw(filename)?;
match RunFileContents::populate_open_runfile(&file) {
Ok(contents) => Ok(Self { file, contents }),
Err(e) => {
file.close()?;
Err(e)
}
}
}

#[tracing::instrument(skip_all, level = "trace", err(level = "warn"))]
pub(crate) fn init(
&mut self,
parameters: &RunParameters,
nexus_configuration: &NexusConfiguration,
) -> anyhow::Result<()> {
self.idf_version.write_scalar(&2)?;
self.run_number.write_scalar(&parameters.run_number)?;
self.contents.idf_version.write_scalar(&2)?;
self.contents
.run_number
.write_scalar(&parameters.run_number)?;

set_string_to(&self.definition, "muonTD")?;
set_string_to(&self.experiment_identifier, "")?;
set_string_to(&self.contents.definition, "muonTD")?;
set_string_to(&self.contents.experiment_identifier, "")?;

set_string_to(&self.program_name, "SuperMuSR Data Pipeline Nexus Writer")?;
add_attribute_to(&self.program_name, "version", "1.0")?;
set_string_to(
&self.contents.program_name,
"SuperMuSR Data Pipeline Nexus Writer",
)?;
add_attribute_to(&self.contents.program_name, "version", "1.0")?;
add_attribute_to(
&self.program_name,
&self.contents.program_name,
"configuration",
&nexus_configuration.configuration,
)?;

let start_time = parameters.collect_from.format(DATETIME_FORMAT).to_string();

set_string_to(&self.start_time, &start_time)?;
set_string_to(&self.end_time, "")?;
set_string_to(&self.contents.start_time, &start_time)?;
set_string_to(&self.contents.end_time, "")?;

set_string_to(&self.name, &parameters.run_name)?;
set_string_to(&self.title, "")?;
set_string_to(&self.contents.name, &parameters.run_name)?;
set_string_to(&self.contents.title, "")?;

set_string_to(&self.instrument_name, &parameters.instrument_name)?;
set_string_to(&self.contents.instrument_name, &parameters.instrument_name)?;

self.period_number.write_scalar(&parameters.num_periods)?;
set_slice_to(&self.period_type, &vec![1; parameters.num_periods as usize])?;
self.contents
.period_number
.write_scalar(&parameters.num_periods)?;
set_slice_to(
&self.contents.period_type,
&vec![1; parameters.num_periods as usize],
)?;

set_string_to(&self.source_name, "MuSR")?;
set_string_to(&self.source_type, "")?;
set_string_to(&self.source_probe, "")?;
set_string_to(&self.contents.source_name, "MuSR")?;
set_string_to(&self.contents.source_type, "")?;
set_string_to(&self.contents.source_probe, "")?;

self.lists.init(&parameters.collect_from)?;
self.contents.lists.init(&parameters.collect_from)?;
Ok(())
}

#[tracing::instrument(skip_all, level = "trace", err(level = "warn"))]
pub(crate) fn set_end_time(&mut self, end_time: &DateTime<Utc>) -> anyhow::Result<()> {
let end_time = end_time.format(DATETIME_FORMAT).to_string();

set_string_to(&self.end_time, &end_time)?;
set_string_to(&self.contents.end_time, &end_time)?;
Ok(())
}

Expand All @@ -253,12 +291,14 @@ impl RunFile {
logdata: &f144_LogData,
nexus_settings: &NexusSettings,
) -> anyhow::Result<()> {
self.logs.push_logdata_to_runlog(logdata, nexus_settings)
self.contents
.logs
.push_logdata_to_runlog(logdata, nexus_settings)
}

#[tracing::instrument(skip_all, level = "trace", err(level = "warn"))]
pub(crate) fn push_alarm_to_runfile(&mut self, alarm: Alarm) -> anyhow::Result<()> {
self.selogs.push_alarm_to_selog(alarm)
self.contents.selogs.push_alarm_to_selog(alarm)
}

#[tracing::instrument(skip_all, level = "trace", err(level = "warn"))]
Expand All @@ -267,7 +307,8 @@ impl RunFile {
selogdata: se00_SampleEnvironmentData,
nexus_settings: &NexusSettings,
) -> anyhow::Result<()> {
self.selogs
self.contents
.selogs
.push_selogdata_to_selog(&selogdata, nexus_settings)
}

Expand All @@ -276,7 +317,7 @@ impl RunFile {
&mut self,
message: &FrameAssembledEventListMessage,
) -> anyhow::Result<()> {
self.lists.push_message_to_event_runfile(message)
self.contents.lists.push_message_to_event_runfile(message)
}

fn try_read_scalar<T: H5Type>(dataset: &Dataset) -> anyhow::Result<T> {
Expand All @@ -294,12 +335,13 @@ impl RunFile {
#[tracing::instrument(skip_all, level = "trace", err(level = "warn"))]
pub(crate) fn extract_run_parameters(&self) -> anyhow::Result<RunParameters> {
let collect_from: DateTime<Utc> =
Self::try_read_scalar::<VarLenUnicode>(&self.start_time)?.parse()?;
let run_name = Self::try_read_scalar::<VarLenUnicode>(&self.name)?.into();
let run_number = Self::try_read_scalar::<u32>(&self.run_number)?;
let num_periods = Self::try_read_scalar::<u32>(&self.period_number)?;
let instrument_name = Self::try_read_scalar::<VarLenUnicode>(&self.instrument_name)?.into();
let run_stop_parameters = Self::try_read_scalar::<VarLenUnicode>(&self.end_time)?
Self::try_read_scalar::<VarLenUnicode>(&self.contents.start_time)?.parse()?;
let run_name = Self::try_read_scalar::<VarLenUnicode>(&self.contents.name)?.into();
let run_number = Self::try_read_scalar::<u32>(&self.contents.run_number)?;
let num_periods = Self::try_read_scalar::<u32>(&self.contents.period_number)?;
let instrument_name =
Self::try_read_scalar::<VarLenUnicode>(&self.contents.instrument_name)?.into();
let run_stop_parameters = Self::try_read_scalar::<VarLenUnicode>(&self.contents.end_time)?
.parse()
.map(|collect_until| RunStopParameters {
collect_until,
Expand All @@ -322,7 +364,8 @@ impl RunFile {
stop_time: i32,
nexus_settings: &NexusSettings,
) -> anyhow::Result<()> {
self.logs
self.contents
.logs
.set_aborted_run_warning(stop_time, nexus_settings)?;
Ok(())
}
Expand Down
2 changes: 2 additions & 0 deletions nexus-writer/src/nexus/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ impl Run {
pub(crate) fn resume_partial_run(local_path: &Path, filename: &str) -> anyhow::Result<Self> {
let run = RunFile::open_runfile(local_path, filename)?;
let parameters = run.extract_run_parameters()?;
run.close()?;

Ok(Self {
span: Default::default(),
parameters,
Expand Down

0 comments on commit 0705b8f

Please sign in to comment.