Skip to content

Commit

Permalink
Bug fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Modularius committed Nov 25, 2024
1 parent e3aa0b5 commit 3461368
Showing 1 changed file with 48 additions and 17 deletions.
65 changes: 48 additions & 17 deletions nexus-writer/src/nexus/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,24 +174,20 @@ impl NexusEngine {

#[tracing::instrument(skip_all, level = "debug")]
pub(crate) fn flush(&mut self, delay: &Duration) {
// Get Indices of all completed run
let removed_indices: Vec<_> = self
.run_cache
.iter()
.enumerate()
.filter_map(|(index, run)| run.has_completed(delay).then_some(index))
.collect();

// Remove all runs found to be completed, and place them in self.run_move_cache
for index in removed_indices {
let run = self
.run_cache
.remove(index)
.expect("Index should be within bounds");
if let Err(e) = run.end_span() {
warn!("Run span drop failed {e}")
// Moves the runs into a new vector, then consumes it,
// directing completed runs to self.run_move_cache
// and incomplete ones back to self.run_cache
let temp: Vec<_> = self.run_cache.drain(..).collect();
//let mut temp = VecDeque::<Run>::new();
for run in temp.into_iter() {
if run.has_completed(delay) {
if let Err(e) = run.end_span() {
warn!("Run span drop failed {e}")
}
self.run_move_cache.push(run);
} else {
self.run_cache.push_back(run);
}
self.run_move_cache.push(run);
}
}

Expand Down Expand Up @@ -409,6 +405,41 @@ mod test {
nexus.flush(&Duration::zero());
assert_eq!(nexus.cache_iter().len(), 0);
}

#[test]
fn two_runs_flushed() {
let mut nexus = NexusEngine::new(
None,
NexusSettings::default(),
NexusConfiguration::new(None),
);
let mut fbb = FlatBufferBuilder::new();

let ts_start: DateTime<Utc> = GpsTime::new(0, 1, 0, 0, 15, 0, 0, 0).try_into().unwrap();
let ts_end: DateTime<Utc> = GpsTime::new(0, 1, 0, 0, 17, 0, 0, 0).try_into().unwrap();

let start = create_start(&mut fbb, "Test1", ts_start.timestamp_millis() as u64).unwrap();
nexus.start_command(start).unwrap();

fbb.reset();
let stop = create_stop(&mut fbb, "Test1", ts_end.timestamp_millis() as u64).unwrap();
nexus.stop_command(stop).unwrap();

assert_eq!(nexus.cache_iter().len(), 1);

fbb.reset();
let start = create_start(&mut fbb, "Test1", ts_start.timestamp_millis() as u64).unwrap();
nexus.start_command(start).unwrap();

fbb.reset();
let stop = create_stop(&mut fbb, "Test1", ts_end.timestamp_millis() as u64).unwrap();
nexus.stop_command(stop).unwrap();

assert_eq!(nexus.cache_iter().len(), 2);

nexus.flush(&Duration::zero());
assert_eq!(nexus.cache_iter().len(), 0);
}
}

#[derive(Default, Debug)]
Expand Down

0 comments on commit 3461368

Please sign in to comment.