Skip to content

Commit

Permalink
dataflow/operators/core: add .partition() for StreamCore (#610)
Browse files Browse the repository at this point in the history
Signed-off-by: Petros Angelatos <petrosagg@gmail.com>
  • Loading branch information
petrosagg authored Dec 18, 2024
1 parent 47e0722 commit 4337b10
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 29 deletions.
2 changes: 2 additions & 0 deletions timely/src/dataflow/operators/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub mod input;
pub mod inspect;
pub mod map;
pub mod ok_err;
pub mod partition;
pub mod probe;
pub mod rc;
pub mod reclock;
Expand All @@ -27,6 +28,7 @@ pub use input::Input;
pub use inspect::{Inspect, InspectCore};
pub use map::Map;
pub use ok_err::OkErr;
pub use partition::Partition;
pub use probe::Probe;
pub use to_stream::{ToStream, ToStreamBuilder};
pub use reclock::Reclock;
Expand Down
75 changes: 75 additions & 0 deletions timely/src/dataflow/operators/core/partition.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
//! Partition a stream of records into multiple streams.
use timely_container::{Container, ContainerBuilder, PushInto, SizableContainer};

use crate::dataflow::channels::pact::Pipeline;
use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
use crate::dataflow::{Scope, StreamCore};
use crate::Data;

/// Partition a stream of records into multiple streams.
pub trait Partition<G: Scope, C: Container> {
/// Produces `parts` output streams, containing records produced and assigned by `route`.
///
/// # Examples
/// ```
/// use timely::dataflow::operators::ToStream;
/// use timely::dataflow::operators::core::{Partition, Inspect};
///
/// timely::example(|scope| {
/// let streams = (0..10).to_stream(scope)
/// .partition(3, |x| (x % 3, x));
///
/// for (idx, stream) in streams.into_iter().enumerate() {
/// stream
/// .container::<Vec<_>>()
/// .inspect(move |x| println!("seen {idx}: {x:?}"));
/// }
/// });
/// ```
fn partition<CB, D2, F>(&self, parts: u64, route: F) -> Vec<StreamCore<G, CB::Container>>
where
CB: ContainerBuilder,
CB::Container: SizableContainer + PushInto<D2> + Data,
F: FnMut(C::Item<'_>) -> (u64, D2) + 'static;
}

impl<G: Scope, C: Container + Data> Partition<G, C> for StreamCore<G, C> {
fn partition<CB, D2, F>(&self, parts: u64, mut route: F) -> Vec<StreamCore<G, CB::Container>>
where
CB: ContainerBuilder,
CB::Container: SizableContainer + PushInto<D2> + Data,
F: FnMut(C::Item<'_>) -> (u64, D2) + 'static,
{
let mut builder = OperatorBuilder::new("Partition".to_owned(), self.scope());

let mut input = builder.new_input(self, Pipeline);
let mut outputs = Vec::with_capacity(parts as usize);
let mut streams = Vec::with_capacity(parts as usize);

for _ in 0..parts {
let (output, stream) = builder.new_output();
outputs.push(output);
streams.push(stream);
}

builder.build(move |_| {
move |_frontiers| {
let mut handles = outputs.iter_mut().map(|o| o.activate()).collect::<Vec<_>>();
input.for_each(|time, data| {
let mut sessions = handles
.iter_mut()
.map(|h| h.session(&time))
.collect::<Vec<_>>();

for datum in data.drain() {
let (part, datum2) = route(datum);
sessions[part as usize].give(datum2);
}
});
}
});

streams
}
}
32 changes: 3 additions & 29 deletions timely/src/dataflow/operators/partition.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Partition a stream of records into multiple streams.
use crate::dataflow::channels::pact::Pipeline;
use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
use crate::container::CapacityContainerBuilder;
use crate::dataflow::operators::core::Partition as PartitionCore;
use crate::dataflow::{Scope, Stream};
use crate::Data;

Expand All @@ -27,32 +27,6 @@ pub trait Partition<G: Scope, D: Data, D2: Data, F: Fn(D) -> (u64, D2)> {

impl<G: Scope, D: Data, D2: Data, F: Fn(D)->(u64, D2)+'static> Partition<G, D, D2, F> for Stream<G, D> {
fn partition(&self, parts: u64, route: F) -> Vec<Stream<G, D2>> {
let mut builder = OperatorBuilder::new("Partition".to_owned(), self.scope());

let mut input = builder.new_input(self, Pipeline);
let mut outputs = Vec::with_capacity(parts as usize);
let mut streams = Vec::with_capacity(parts as usize);

for _ in 0 .. parts {
let (output, stream) = builder.new_output();
outputs.push(output);
streams.push(stream);
}

builder.build(move |_| {
move |_frontiers| {
let mut handles = outputs.iter_mut().map(|o| o.activate()).collect::<Vec<_>>();
input.for_each(|time, data| {
let mut sessions = handles.iter_mut().map(|h| h.session(&time)).collect::<Vec<_>>();

for datum in data.drain(..) {
let (part, datum2) = route(datum);
sessions[part as usize].give(datum2);
}
});
}
});

streams
PartitionCore::partition::<CapacityContainerBuilder<_>, _, _>(self, parts, route)
}
}

0 comments on commit 4337b10

Please sign in to comment.