diff --git a/dogsdogsdogs/src/operators/count.rs b/dogsdogsdogs/src/operators/count.rs index df26ddee7..1cc193d66 100644 --- a/dogsdogsdogs/src/operators/count.rs +++ b/dogsdogsdogs/src/operators/count.rs @@ -21,17 +21,17 @@ pub fn count( where G: Scope, G::Timestamp: Lattice, - Tr: TraceReader+Clone+'static, - Tr::Key: Ord+Hashable+Default, + Tr: TraceReader+Clone+'static, + Tr::KeyOwned: Hashable + Default, R: Monoid+Multiply+ExchangeData, - F: Fn(&P)->Tr::Key+Clone+'static, + F: Fn(&P)->Tr::KeyOwned+Clone+'static, P: ExchangeData, { crate::operators::lookup_map( prefixes, arrangement, - move |p: &(P,usize,usize), k: &mut Tr::Key| { *k = key_selector(&p.0); }, - move |(p,c,i), r, &(), s| { + move |p: &(P,usize,usize), k: &mut Tr::KeyOwned| { *k = key_selector(&p.0); }, + move |(p,c,i), r, _, s| { let s = *s as usize; if *c < s { ((p.clone(), *c, *i), r.clone()) } else { ((p.clone(), s, index), r.clone()) } diff --git a/dogsdogsdogs/src/operators/half_join.rs b/dogsdogsdogs/src/operators/half_join.rs index 63cc74ccf..0f5b90509 100644 --- a/dogsdogsdogs/src/operators/half_join.rs +++ b/dogsdogsdogs/src/operators/half_join.rs @@ -31,11 +31,9 @@ //! of logical compaction, which should not be done in a way that prevents //! the correct determination of the total order comparison. -use std::borrow::Borrow; use std::collections::HashMap; use std::ops::Mul; - use timely::dataflow::Scope; use timely::dataflow::channels::pact::{Pipeline, Exchange}; use timely::dataflow::operators::Operator; @@ -70,8 +68,8 @@ use differential_dataflow::consolidation::{consolidate, consolidate_updates}; /// Notice that the time is hoisted up into data. The expectation is that /// once out of the "delta flow region", the updates will be `delay`d to the /// times specified in the payloads. -pub fn half_join( - stream: &Collection, +pub fn half_join( + stream: &Collection, arrangement: Arranged, frontier_func: FF, comparison: CF, @@ -80,20 +78,19 @@ pub fn half_join( where G: Scope, G::Timestamp: Lattice, - K: Ord + Hashable + ExchangeData + Borrow, + Tr::KeyOwned: Ord + Hashable + ExchangeData, V: ExchangeData, R: ExchangeData + Monoid, Tr: TraceReader+Clone+'static, - Tr::Key: Eq, Tr::Diff: Semigroup, R: Mul, >::Output: Semigroup, FF: Fn(&G::Timestamp) -> G::Timestamp + 'static, CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static, DOut: Clone+'static, - S: FnMut(&K, &V, &Tr::Val)->DOut+'static, + S: FnMut(&Tr::KeyOwned, &V, Tr::Val<'_>)->DOut+'static, { - let output_func = move |k: &K, v1: &V, v2: &Tr::Val, initial: &G::Timestamp, time: &G::Timestamp, diff1: &R, diff2: &Tr::Diff| { + let output_func = move |k: &Tr::KeyOwned, v1: &V, v2: Tr::Val<'_>, initial: &G::Timestamp, time: &G::Timestamp, diff1: &R, diff2: &Tr::Diff| { let diff = diff1.clone() * diff2.clone(); let dout = (output_func(k, v1, v2), time.clone()); Some((dout, initial.clone(), diff)) @@ -125,8 +122,8 @@ where /// yield control, as a function of the elapsed time and the number of matched /// records. Note this is not the number of *output* records, owing mainly to /// the number of matched records being easiest to record with low overhead. -pub fn half_join_internal_unsafe( - stream: &Collection, +pub fn half_join_internal_unsafe( + stream: &Collection, mut arrangement: Arranged, frontier_func: FF, comparison: CF, @@ -136,11 +133,10 @@ pub fn half_join_internal_unsafe( where G: Scope, G::Timestamp: Lattice, - K: Ord + Hashable + ExchangeData + std::borrow::Borrow, + Tr::KeyOwned: Ord + Hashable + ExchangeData, V: ExchangeData, R: ExchangeData + Monoid, Tr: TraceReader+Clone+'static, - Tr::Key: Eq, Tr::Diff: Semigroup, FF: Fn(&G::Timestamp) -> G::Timestamp + 'static, CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static, @@ -148,7 +144,7 @@ where ROut: Semigroup, Y: Fn(std::time::Instant, usize) -> bool + 'static, I: IntoIterator, - S: FnMut(&K, &V, &Tr::Val, &G::Timestamp, &G::Timestamp, &R, &Tr::Diff)-> I + 'static, + S: FnMut(&Tr::KeyOwned, &V, Tr::Val<'_>, &G::Timestamp, &G::Timestamp, &R, &Tr::Diff)-> I + 'static, { // No need to block physical merging for this operator. arrangement.trace.set_physical_compaction(Antichain::new().borrow()); @@ -158,7 +154,7 @@ where let mut stash = HashMap::new(); let mut buffer = Vec::new(); - let exchange = Exchange::new(move |update: &((K, V, G::Timestamp),G::Timestamp,R)| (update.0).0.hashed().into()); + let exchange = Exchange::new(move |update: &((Tr::KeyOwned, V, G::Timestamp),G::Timestamp,R)| (update.0).0.hashed().into()); // Stash for (time, diff) accumulation. let mut output_buffer = Vec::new(); @@ -216,8 +212,9 @@ where // Use TOTAL ORDER to allow the release of `time`. yielded = yielded || yield_function(timer, work); if !yielded && !input2.frontier.frontier().iter().any(|t| comparison(t, initial)) { - cursor.seek_key(&storage, key.borrow()); - if cursor.get_key(&storage) == Some(key.borrow()) { + use differential_dataflow::trace::cursor::MyTrait; + cursor.seek_key(&storage, MyTrait::borrow_as(key)); + if cursor.get_key(&storage) == Some(MyTrait::borrow_as(key)) { while let Some(val2) = cursor.get_val(&storage) { cursor.map_times(&storage, |t, d| { if comparison(t, initial) { diff --git a/dogsdogsdogs/src/operators/lookup_map.rs b/dogsdogsdogs/src/operators/lookup_map.rs index 9228d2d4d..89ad2fd39 100644 --- a/dogsdogsdogs/src/operators/lookup_map.rs +++ b/dogsdogsdogs/src/operators/lookup_map.rs @@ -22,23 +22,24 @@ pub fn lookup_map( mut arrangement: Arranged, key_selector: F, mut output_func: S, - supplied_key0: Tr::Key, - supplied_key1: Tr::Key, - supplied_key2: Tr::Key, + supplied_key0: Tr::KeyOwned, + supplied_key1: Tr::KeyOwned, + supplied_key2: Tr::KeyOwned, ) -> Collection where G: Scope, G::Timestamp: Lattice, Tr: TraceReader+Clone+'static, - Tr::Key: Ord+Hashable+Sized, - Tr::Val: Clone, + Tr::KeyOwned: Hashable, + // Tr::Key: Ord+Hashable+Sized, + // Tr::Val: Clone, Tr::Diff: Monoid+ExchangeData, - F: FnMut(&D, &mut Tr::Key)+Clone+'static, + F: FnMut(&D, &mut Tr::KeyOwned)+Clone+'static, D: ExchangeData, R: ExchangeData+Monoid, DOut: Clone+'static, ROut: Monoid, - S: FnMut(&D, &R, &Tr::Val, &Tr::Diff)->(DOut, ROut)+'static, + S: FnMut(&D, &R, Tr::Val<'_>, &Tr::Diff)->(DOut, ROut)+'static, { // No need to block physical merging for this operator. arrangement.trace.set_physical_compaction(Antichain::new().borrow()); @@ -51,14 +52,14 @@ where let mut buffer = Vec::new(); - let mut key: Tr::Key = supplied_key0; + let mut key: Tr::KeyOwned = supplied_key0; let exchange = Exchange::new(move |update: &(D,G::Timestamp,R)| { logic1(&update.0, &mut key); key.hashed().into() }); - let mut key1: Tr::Key = supplied_key1; - let mut key2: Tr::Key = supplied_key2; + let mut key1: Tr::KeyOwned = supplied_key1; + let mut key2: Tr::KeyOwned = supplied_key2; prefixes.inner.binary_frontier(&propose_stream, exchange, Pipeline, "LookupMap", move |_,_| move |input1, input2, output| { @@ -96,8 +97,9 @@ where for &mut (ref prefix, ref time, ref mut diff) in prefixes.iter_mut() { if !input2.frontier.less_equal(time) { logic2(prefix, &mut key1); - cursor.seek_key(&storage, &key1); - if cursor.get_key(&storage) == Some(&key1) { + use differential_dataflow::trace::cursor::MyTrait; + cursor.seek_key(&storage, MyTrait::borrow_as(&key1)); + if cursor.get_key(&storage) == Some(MyTrait::borrow_as(&key1)) { while let Some(value) = cursor.get_val(&storage) { let mut count = Tr::Diff::zero(); cursor.map_times(&storage, |t, d| { diff --git a/dogsdogsdogs/src/operators/propose.rs b/dogsdogsdogs/src/operators/propose.rs index 6d6ed95f3..1434963f1 100644 --- a/dogsdogsdogs/src/operators/propose.rs +++ b/dogsdogsdogs/src/operators/propose.rs @@ -5,6 +5,7 @@ use differential_dataflow::difference::{Monoid, Multiply}; use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::arrange::Arranged; use differential_dataflow::trace::TraceReader; +use differential_dataflow::trace::cursor::MyTrait; /// Proposes extensions to a prefix stream. /// @@ -18,22 +19,21 @@ pub fn propose( prefixes: &Collection, arrangement: Arranged, key_selector: F, -) -> Collection +) -> Collection where G: Scope, G::Timestamp: Lattice, Tr: TraceReader+Clone+'static, - Tr::Key: Ord+Hashable+Default, - Tr::Val: Clone, + Tr::KeyOwned: Hashable + Default, Tr::Diff: Monoid+Multiply+ExchangeData, - F: Fn(&P)->Tr::Key+Clone+'static, + F: Fn(&P)->Tr::KeyOwned+Clone+'static, P: ExchangeData, { crate::operators::lookup_map( prefixes, arrangement, - move |p: &P, k: &mut Tr::Key| { *k = key_selector(p); }, - |prefix, diff, value, sum| ((prefix.clone(), value.clone()), diff.clone().multiply(sum)), + move |p: &P, k: &mut Tr::KeyOwned | { *k = key_selector(p); }, + |prefix, diff, value, sum| ((prefix.clone(), value.into_owned()), diff.clone().multiply(sum)), Default::default(), Default::default(), Default::default(), @@ -49,22 +49,21 @@ pub fn propose_distinct( prefixes: &Collection, arrangement: Arranged, key_selector: F, -) -> Collection +) -> Collection where G: Scope, G::Timestamp: Lattice, Tr: TraceReader+Clone+'static, - Tr::Key: Ord+Hashable+Default, - Tr::Val: Clone, + Tr::KeyOwned: Hashable + Default, Tr::Diff: Monoid+Multiply+ExchangeData, - F: Fn(&P)->Tr::Key+Clone+'static, + F: Fn(&P)->Tr::KeyOwned+Clone+'static, P: ExchangeData, { crate::operators::lookup_map( prefixes, arrangement, - move |p: &P, k: &mut Tr::Key| { *k = key_selector(p); }, - |prefix, diff, value, _sum| ((prefix.clone(), value.clone()), diff.clone()), + move |p: &P, k: &mut Tr::KeyOwned| { *k = key_selector(p); }, + |prefix, diff, value, _sum| ((prefix.clone(), value.into_owned()), diff.clone()), Default::default(), Default::default(), Default::default(), diff --git a/dogsdogsdogs/src/operators/validate.rs b/dogsdogsdogs/src/operators/validate.rs index b41ea8687..2d1e8f371 100644 --- a/dogsdogsdogs/src/operators/validate.rs +++ b/dogsdogsdogs/src/operators/validate.rs @@ -21,7 +21,7 @@ pub fn validate( where G: Scope, G::Timestamp: Lattice, - Tr: TraceReader+Clone+'static, + Tr: TraceReader+Clone+'static, K: Ord+Hash+Clone+Default, V: ExchangeData+Hash+Default, Tr::Diff: Monoid+Multiply+ExchangeData, @@ -32,7 +32,7 @@ where extensions, arrangement, move |(pre,val),key| { *key = (key_selector(pre), val.clone()); }, - |(pre,val),r,&(),_| ((pre.clone(), val.clone()), r.clone()), + |(pre,val),r,_,_| ((pre.clone(), val.clone()), r.clone()), Default::default(), Default::default(), Default::default(),