Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Close hdf5 file after resuming or after errors occur #286

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 97 additions & 54 deletions nexus-writer/src/nexus/hdf5_file/run_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,15 @@ use supermusr_streaming_types::{
ecs_se00_data_generated::se00_SampleEnvironmentData,
};
use tracing::debug;

#[derive(Debug)]
pub(crate) struct RunFile {
file: File,
contents: RunFileContents,
}

#[derive(Debug)]
struct RunFileContents {
idf_version: Dataset,
definition: Dataset,
program_name: Dataset,
Expand Down Expand Up @@ -48,26 +53,20 @@ pub(crate) struct RunFile {
lists: EventRun,
}

impl RunFile {
impl RunFileContents {
#[tracing::instrument(skip_all, err(level = "warn"))]
pub(crate) fn new_runfile(
path: &Path,
run_name: &str,
pub(crate) fn populate_new_runfile(
file: &File,
nexus_settings: &NexusSettings,
) -> anyhow::Result<Self> {
create_dir_all(path)?;
let filename = RunParameters::get_hdf5_filename(path, run_name);
debug!("File save begin. File: {0}.", filename.display());

let file = File::create(filename)?;
set_group_nx_class(&file, NX::ROOT)?;
set_group_nx_class(file, NX::ROOT)?;

add_attribute_to(&file, "HDF5_version", "1.14.3")?; // Can this be taken directly from the nix package;
add_attribute_to(&file, "NeXus_version", "")?; // Where does this come from?
add_attribute_to(&file, "file_name", &file.filename())?; // This should be absolutized at some point
add_attribute_to(&file, "file_time", Utc::now().to_string().as_str())?; // This should be formatted, the nanoseconds are overkill.
add_attribute_to(file, "HDF5_version", "1.14.3")?; // Can this be taken directly from the nix package;
add_attribute_to(file, "NeXus_version", "")?; // Where does this come from?
add_attribute_to(file, "file_name", &file.filename())?; // This should be absolutized at some point
add_attribute_to(file, "file_time", Utc::now().to_string().as_str())?; // This should be formatted, the nanoseconds are overkill.

let entry = add_new_group_to(&file, "raw_data_1", NX::ENTRY)?;
let entry = add_new_group_to(file, "raw_data_1", NX::ENTRY)?;

let idf_version = entry.new_dataset::<u32>().create("IDF_version")?;
let definition = entry.new_dataset::<VarLenUnicode>().create("definition")?;
Expand Down Expand Up @@ -112,7 +111,6 @@ impl RunFile {
let lists = EventRun::new_event_runfile(&entry, nexus_settings)?;

Ok(Self {
file,
idf_version,
start_time,
end_time,
Expand All @@ -134,13 +132,7 @@ impl RunFile {
})
}

#[tracing::instrument(skip_all, err(level = "warn"))]
pub(crate) fn open_runfile(local_path: &Path, run_name: &str) -> anyhow::Result<Self> {
let filename = RunParameters::get_hdf5_filename(local_path, run_name);
debug!("File open begin. File: {0}.", filename.display());

let file = File::open_rw(filename)?;

fn populate_open_runfile(file: &File) -> anyhow::Result<Self> {
let entry = file.group("raw_data_1")?;

let idf_version = entry.dataset("IDF_version")?;
Expand Down Expand Up @@ -176,7 +168,6 @@ impl RunFile {
let lists = EventRun::open_event_runfile(&entry)?;

Ok(Self {
file,
idf_version,
start_time,
end_time,
Expand All @@ -197,53 +188,100 @@ impl RunFile {
experiment_identifier,
})
}
}

impl RunFile {
#[tracing::instrument(skip_all, err(level = "warn"))]
pub(crate) fn new_runfile(
path: &Path,
run_name: &str,
nexus_settings: &NexusSettings,
) -> anyhow::Result<Self> {
create_dir_all(path)?;
let filename = RunParameters::get_hdf5_filename(path, run_name);
debug!("File save begin. File: {0}.", filename.display());

let file = File::create(filename)?;
match RunFileContents::populate_new_runfile(&file, nexus_settings) {
Ok(contents) => Ok(Self { file, contents }),
Err(e) => {
file.close()?;
Err(e)
}
}
}

#[tracing::instrument(skip_all, err(level = "warn"))]
pub(crate) fn open_runfile(local_path: &Path, run_name: &str) -> anyhow::Result<Self> {
let filename = RunParameters::get_hdf5_filename(local_path, run_name);
debug!("File open begin. File: {0}.", filename.display());

let file = File::open_rw(filename)?;
match RunFileContents::populate_open_runfile(&file) {
Ok(contents) => Ok(Self { file, contents }),
Err(e) => {
file.close()?;
Err(e)
}
}
}

#[tracing::instrument(skip_all, level = "trace", err(level = "warn"))]
pub(crate) fn init(
&mut self,
parameters: &RunParameters,
nexus_configuration: &NexusConfiguration,
) -> anyhow::Result<()> {
self.idf_version.write_scalar(&2)?;
self.run_number.write_scalar(&parameters.run_number)?;
self.contents.idf_version.write_scalar(&2)?;
self.contents
.run_number
.write_scalar(&parameters.run_number)?;

set_string_to(&self.definition, "muonTD")?;
set_string_to(&self.experiment_identifier, "")?;
set_string_to(&self.contents.definition, "muonTD")?;
set_string_to(&self.contents.experiment_identifier, "")?;

set_string_to(&self.program_name, "SuperMuSR Data Pipeline Nexus Writer")?;
add_attribute_to(&self.program_name, "version", "1.0")?;
set_string_to(
&self.contents.program_name,
"SuperMuSR Data Pipeline Nexus Writer",
)?;
add_attribute_to(&self.contents.program_name, "version", "1.0")?;
add_attribute_to(
&self.program_name,
&self.contents.program_name,
"configuration",
&nexus_configuration.configuration,
)?;

let start_time = parameters.collect_from.format(DATETIME_FORMAT).to_string();

set_string_to(&self.start_time, &start_time)?;
set_string_to(&self.end_time, "")?;
set_string_to(&self.contents.start_time, &start_time)?;
set_string_to(&self.contents.end_time, "")?;

set_string_to(&self.name, &parameters.run_name)?;
set_string_to(&self.title, "")?;
set_string_to(&self.contents.name, &parameters.run_name)?;
set_string_to(&self.contents.title, "")?;

set_string_to(&self.instrument_name, &parameters.instrument_name)?;
set_string_to(&self.contents.instrument_name, &parameters.instrument_name)?;

self.period_number.write_scalar(&parameters.num_periods)?;
set_slice_to(&self.period_type, &vec![1; parameters.num_periods as usize])?;
self.contents
.period_number
.write_scalar(&parameters.num_periods)?;
set_slice_to(
&self.contents.period_type,
&vec![1; parameters.num_periods as usize],
)?;

set_string_to(&self.source_name, "MuSR")?;
set_string_to(&self.source_type, "")?;
set_string_to(&self.source_probe, "")?;
set_string_to(&self.contents.source_name, "MuSR")?;
set_string_to(&self.contents.source_type, "")?;
set_string_to(&self.contents.source_probe, "")?;

self.lists.init(&parameters.collect_from)?;
self.contents.lists.init(&parameters.collect_from)?;
Ok(())
}

#[tracing::instrument(skip_all, level = "trace", err(level = "warn"))]
pub(crate) fn set_end_time(&mut self, end_time: &DateTime<Utc>) -> anyhow::Result<()> {
let end_time = end_time.format(DATETIME_FORMAT).to_string();

set_string_to(&self.end_time, &end_time)?;
set_string_to(&self.contents.end_time, &end_time)?;
Ok(())
}

Expand All @@ -253,12 +291,14 @@ impl RunFile {
logdata: &f144_LogData,
nexus_settings: &NexusSettings,
) -> anyhow::Result<()> {
self.logs.push_logdata_to_runlog(logdata, nexus_settings)
self.contents
.logs
.push_logdata_to_runlog(logdata, nexus_settings)
}

#[tracing::instrument(skip_all, level = "trace", err(level = "warn"))]
pub(crate) fn push_alarm_to_runfile(&mut self, alarm: Alarm) -> anyhow::Result<()> {
self.selogs.push_alarm_to_selog(alarm)
self.contents.selogs.push_alarm_to_selog(alarm)
}

#[tracing::instrument(skip_all, level = "trace", err(level = "warn"))]
Expand All @@ -267,7 +307,8 @@ impl RunFile {
selogdata: se00_SampleEnvironmentData,
nexus_settings: &NexusSettings,
) -> anyhow::Result<()> {
self.selogs
self.contents
.selogs
.push_selogdata_to_selog(&selogdata, nexus_settings)
}

Expand All @@ -276,7 +317,7 @@ impl RunFile {
&mut self,
message: &FrameAssembledEventListMessage,
) -> anyhow::Result<()> {
self.lists.push_message_to_event_runfile(message)
self.contents.lists.push_message_to_event_runfile(message)
}

fn try_read_scalar<T: H5Type>(dataset: &Dataset) -> anyhow::Result<T> {
Expand All @@ -294,12 +335,13 @@ impl RunFile {
#[tracing::instrument(skip_all, level = "trace", err(level = "warn"))]
pub(crate) fn extract_run_parameters(&self) -> anyhow::Result<RunParameters> {
let collect_from: DateTime<Utc> =
Self::try_read_scalar::<VarLenUnicode>(&self.start_time)?.parse()?;
let run_name = Self::try_read_scalar::<VarLenUnicode>(&self.name)?.into();
let run_number = Self::try_read_scalar::<u32>(&self.run_number)?;
let num_periods = Self::try_read_scalar::<u32>(&self.period_number)?;
let instrument_name = Self::try_read_scalar::<VarLenUnicode>(&self.instrument_name)?.into();
let run_stop_parameters = Self::try_read_scalar::<VarLenUnicode>(&self.end_time)?
Self::try_read_scalar::<VarLenUnicode>(&self.contents.start_time)?.parse()?;
let run_name = Self::try_read_scalar::<VarLenUnicode>(&self.contents.name)?.into();
let run_number = Self::try_read_scalar::<u32>(&self.contents.run_number)?;
let num_periods = Self::try_read_scalar::<u32>(&self.contents.period_number)?;
let instrument_name =
Self::try_read_scalar::<VarLenUnicode>(&self.contents.instrument_name)?.into();
let run_stop_parameters = Self::try_read_scalar::<VarLenUnicode>(&self.contents.end_time)?
.parse()
.map(|collect_until| RunStopParameters {
collect_until,
Expand All @@ -322,7 +364,8 @@ impl RunFile {
stop_time: i32,
nexus_settings: &NexusSettings,
) -> anyhow::Result<()> {
self.logs
self.contents
.logs
.set_aborted_run_warning(stop_time, nexus_settings)?;
Ok(())
}
Expand Down
2 changes: 2 additions & 0 deletions nexus-writer/src/nexus/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ impl Run {
pub(crate) fn resume_partial_run(local_path: &Path, filename: &str) -> anyhow::Result<Self> {
let run = RunFile::open_runfile(local_path, filename)?;
let parameters = run.extract_run_parameters()?;
run.close()?;

Ok(Self {
span: Default::default(),
parameters,
Expand Down
Loading