From 4337b1013970d02a6e02e70ec61b0b438a087a76 Mon Sep 17 00:00:00 2001 From: Petros Angelatos Date: Wed, 18 Dec 2024 02:04:15 +0200 Subject: [PATCH] dataflow/operators/core: add `.partition()` for `StreamCore` (#610) Signed-off-by: Petros Angelatos --- timely/src/dataflow/operators/core/mod.rs | 2 + .../src/dataflow/operators/core/partition.rs | 75 +++++++++++++++++++ timely/src/dataflow/operators/partition.rs | 32 +------- 3 files changed, 80 insertions(+), 29 deletions(-) create mode 100644 timely/src/dataflow/operators/core/partition.rs diff --git a/timely/src/dataflow/operators/core/mod.rs b/timely/src/dataflow/operators/core/mod.rs index 5074014bd..3674fc49e 100644 --- a/timely/src/dataflow/operators/core/mod.rs +++ b/timely/src/dataflow/operators/core/mod.rs @@ -11,6 +11,7 @@ pub mod input; pub mod inspect; pub mod map; pub mod ok_err; +pub mod partition; pub mod probe; pub mod rc; pub mod reclock; @@ -27,6 +28,7 @@ pub use input::Input; pub use inspect::{Inspect, InspectCore}; pub use map::Map; pub use ok_err::OkErr; +pub use partition::Partition; pub use probe::Probe; pub use to_stream::{ToStream, ToStreamBuilder}; pub use reclock::Reclock; diff --git a/timely/src/dataflow/operators/core/partition.rs b/timely/src/dataflow/operators/core/partition.rs new file mode 100644 index 000000000..e5c5ce2bd --- /dev/null +++ b/timely/src/dataflow/operators/core/partition.rs @@ -0,0 +1,75 @@ +//! Partition a stream of records into multiple streams. + +use timely_container::{Container, ContainerBuilder, PushInto, SizableContainer}; + +use crate::dataflow::channels::pact::Pipeline; +use crate::dataflow::operators::generic::builder_rc::OperatorBuilder; +use crate::dataflow::{Scope, StreamCore}; +use crate::Data; + +/// Partition a stream of records into multiple streams. +pub trait Partition { + /// Produces `parts` output streams, containing records produced and assigned by `route`. + /// + /// # Examples + /// ``` + /// use timely::dataflow::operators::ToStream; + /// use timely::dataflow::operators::core::{Partition, Inspect}; + /// + /// timely::example(|scope| { + /// let streams = (0..10).to_stream(scope) + /// .partition(3, |x| (x % 3, x)); + /// + /// for (idx, stream) in streams.into_iter().enumerate() { + /// stream + /// .container::>() + /// .inspect(move |x| println!("seen {idx}: {x:?}")); + /// } + /// }); + /// ``` + fn partition(&self, parts: u64, route: F) -> Vec> + where + CB: ContainerBuilder, + CB::Container: SizableContainer + PushInto + Data, + F: FnMut(C::Item<'_>) -> (u64, D2) + 'static; +} + +impl Partition for StreamCore { + fn partition(&self, parts: u64, mut route: F) -> Vec> + where + CB: ContainerBuilder, + CB::Container: SizableContainer + PushInto + Data, + F: FnMut(C::Item<'_>) -> (u64, D2) + 'static, + { + let mut builder = OperatorBuilder::new("Partition".to_owned(), self.scope()); + + let mut input = builder.new_input(self, Pipeline); + let mut outputs = Vec::with_capacity(parts as usize); + let mut streams = Vec::with_capacity(parts as usize); + + for _ in 0..parts { + let (output, stream) = builder.new_output(); + outputs.push(output); + streams.push(stream); + } + + builder.build(move |_| { + move |_frontiers| { + let mut handles = outputs.iter_mut().map(|o| o.activate()).collect::>(); + input.for_each(|time, data| { + let mut sessions = handles + .iter_mut() + .map(|h| h.session(&time)) + .collect::>(); + + for datum in data.drain() { + let (part, datum2) = route(datum); + sessions[part as usize].give(datum2); + } + }); + } + }); + + streams + } +} diff --git a/timely/src/dataflow/operators/partition.rs b/timely/src/dataflow/operators/partition.rs index 39fc427cc..6388efc63 100644 --- a/timely/src/dataflow/operators/partition.rs +++ b/timely/src/dataflow/operators/partition.rs @@ -1,7 +1,7 @@ //! Partition a stream of records into multiple streams. -use crate::dataflow::channels::pact::Pipeline; -use crate::dataflow::operators::generic::builder_rc::OperatorBuilder; +use crate::container::CapacityContainerBuilder; +use crate::dataflow::operators::core::Partition as PartitionCore; use crate::dataflow::{Scope, Stream}; use crate::Data; @@ -27,32 +27,6 @@ pub trait Partition (u64, D2)> { impl(u64, D2)+'static> Partition for Stream { fn partition(&self, parts: u64, route: F) -> Vec> { - let mut builder = OperatorBuilder::new("Partition".to_owned(), self.scope()); - - let mut input = builder.new_input(self, Pipeline); - let mut outputs = Vec::with_capacity(parts as usize); - let mut streams = Vec::with_capacity(parts as usize); - - for _ in 0 .. parts { - let (output, stream) = builder.new_output(); - outputs.push(output); - streams.push(stream); - } - - builder.build(move |_| { - move |_frontiers| { - let mut handles = outputs.iter_mut().map(|o| o.activate()).collect::>(); - input.for_each(|time, data| { - let mut sessions = handles.iter_mut().map(|h| h.session(&time)).collect::>(); - - for datum in data.drain(..) { - let (part, datum2) = route(datum); - sessions[part as usize].give(datum2); - } - }); - } - }); - - streams + PartitionCore::partition::, _, _>(self, parts, route) } }