Skip to content

Commit

Permalink
Update dogs^3
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry committed Dec 2, 2023
1 parent 44d110a commit d8c855e
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 47 deletions.
10 changes: 5 additions & 5 deletions dogsdogsdogs/src/operators/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,17 @@ pub fn count<G, Tr, R, F, P>(
where
G: Scope,
G::Timestamp: Lattice,
Tr: TraceReader<Val=(), Time=G::Timestamp, Diff=isize>+Clone+'static,
Tr::Key: Ord+Hashable+Default,
Tr: TraceReader<ValOwned=(), Time=G::Timestamp, Diff=isize>+Clone+'static,
Tr::KeyOwned: Hashable + Default,
R: Monoid+Multiply<Output = R>+ExchangeData,
F: Fn(&P)->Tr::Key+Clone+'static,
F: Fn(&P)->Tr::KeyOwned+Clone+'static,
P: ExchangeData,
{
crate::operators::lookup_map(
prefixes,
arrangement,
move |p: &(P,usize,usize), k: &mut Tr::Key| { *k = key_selector(&p.0); },
move |(p,c,i), r, &(), s| {
move |p: &(P,usize,usize), k: &mut Tr::KeyOwned| { *k = key_selector(&p.0); },
move |(p,c,i), r, _, s| {
let s = *s as usize;
if *c < s { ((p.clone(), *c, *i), r.clone()) }
else { ((p.clone(), s, index), r.clone()) }
Expand Down
29 changes: 13 additions & 16 deletions dogsdogsdogs/src/operators/half_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,9 @@
//! of logical compaction, which should not be done in a way that prevents
//! the correct determination of the total order comparison.
use std::borrow::Borrow;
use std::collections::HashMap;
use std::ops::Mul;


use timely::dataflow::Scope;
use timely::dataflow::channels::pact::{Pipeline, Exchange};
use timely::dataflow::operators::Operator;
Expand Down Expand Up @@ -70,8 +68,8 @@ use differential_dataflow::consolidation::{consolidate, consolidate_updates};
/// Notice that the time is hoisted up into data. The expectation is that
/// once out of the "delta flow region", the updates will be `delay`d to the
/// times specified in the payloads.
pub fn half_join<G, K, V, R, Tr, FF, CF, DOut, S>(
stream: &Collection<G, (K, V, G::Timestamp), R>,
pub fn half_join<G, V, R, Tr, FF, CF, DOut, S>(
stream: &Collection<G, (Tr::KeyOwned, V, G::Timestamp), R>,
arrangement: Arranged<G, Tr>,
frontier_func: FF,
comparison: CF,
Expand All @@ -80,20 +78,19 @@ pub fn half_join<G, K, V, R, Tr, FF, CF, DOut, S>(
where
G: Scope,
G::Timestamp: Lattice,
K: Ord + Hashable + ExchangeData + Borrow<Tr::Key>,
Tr::KeyOwned: Ord + Hashable + ExchangeData,
V: ExchangeData,
R: ExchangeData + Monoid,
Tr: TraceReader<Time=G::Timestamp>+Clone+'static,
Tr::Key: Eq,
Tr::Diff: Semigroup,
R: Mul<Tr::Diff>,
<R as Mul<Tr::Diff>>::Output: Semigroup,
FF: Fn(&G::Timestamp) -> G::Timestamp + 'static,
CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static,
DOut: Clone+'static,
S: FnMut(&K, &V, &Tr::Val)->DOut+'static,
S: FnMut(&Tr::KeyOwned, &V, Tr::Val<'_>)->DOut+'static,
{
let output_func = move |k: &K, v1: &V, v2: &Tr::Val, initial: &G::Timestamp, time: &G::Timestamp, diff1: &R, diff2: &Tr::Diff| {
let output_func = move |k: &Tr::KeyOwned, v1: &V, v2: Tr::Val<'_>, initial: &G::Timestamp, time: &G::Timestamp, diff1: &R, diff2: &Tr::Diff| {
let diff = diff1.clone() * diff2.clone();
let dout = (output_func(k, v1, v2), time.clone());
Some((dout, initial.clone(), diff))
Expand Down Expand Up @@ -125,8 +122,8 @@ where
/// yield control, as a function of the elapsed time and the number of matched
/// records. Note this is not the number of *output* records, owing mainly to
/// the number of matched records being easiest to record with low overhead.
pub fn half_join_internal_unsafe<G, K, V, R, Tr, FF, CF, DOut, ROut, Y, I, S>(
stream: &Collection<G, (K, V, G::Timestamp), R>,
pub fn half_join_internal_unsafe<G, V, R, Tr, FF, CF, DOut, ROut, Y, I, S>(
stream: &Collection<G, (Tr::KeyOwned, V, G::Timestamp), R>,
mut arrangement: Arranged<G, Tr>,
frontier_func: FF,
comparison: CF,
Expand All @@ -136,19 +133,18 @@ pub fn half_join_internal_unsafe<G, K, V, R, Tr, FF, CF, DOut, ROut, Y, I, S>(
where
G: Scope,
G::Timestamp: Lattice,
K: Ord + Hashable + ExchangeData + std::borrow::Borrow<Tr::Key>,
Tr::KeyOwned: Ord + Hashable + ExchangeData,
V: ExchangeData,
R: ExchangeData + Monoid,
Tr: TraceReader<Time=G::Timestamp>+Clone+'static,
Tr::Key: Eq,
Tr::Diff: Semigroup,
FF: Fn(&G::Timestamp) -> G::Timestamp + 'static,
CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static,
DOut: Clone+'static,
ROut: Semigroup,
Y: Fn(std::time::Instant, usize) -> bool + 'static,
I: IntoIterator<Item=(DOut, G::Timestamp, ROut)>,
S: FnMut(&K, &V, &Tr::Val, &G::Timestamp, &G::Timestamp, &R, &Tr::Diff)-> I + 'static,
S: FnMut(&Tr::KeyOwned, &V, Tr::Val<'_>, &G::Timestamp, &G::Timestamp, &R, &Tr::Diff)-> I + 'static,
{
// No need to block physical merging for this operator.
arrangement.trace.set_physical_compaction(Antichain::new().borrow());
Expand All @@ -158,7 +154,7 @@ where
let mut stash = HashMap::new();
let mut buffer = Vec::new();

let exchange = Exchange::new(move |update: &((K, V, G::Timestamp),G::Timestamp,R)| (update.0).0.hashed().into());
let exchange = Exchange::new(move |update: &((Tr::KeyOwned, V, G::Timestamp),G::Timestamp,R)| (update.0).0.hashed().into());

// Stash for (time, diff) accumulation.
let mut output_buffer = Vec::new();
Expand Down Expand Up @@ -216,8 +212,9 @@ where
// Use TOTAL ORDER to allow the release of `time`.
yielded = yielded || yield_function(timer, work);
if !yielded && !input2.frontier.frontier().iter().any(|t| comparison(t, initial)) {
cursor.seek_key(&storage, key.borrow());
if cursor.get_key(&storage) == Some(key.borrow()) {
use differential_dataflow::trace::cursor::MyTrait;
cursor.seek_key(&storage, MyTrait::borrow_as(key));
if cursor.get_key(&storage) == Some(MyTrait::borrow_as(key)) {
while let Some(val2) = cursor.get_val(&storage) {
cursor.map_times(&storage, |t, d| {
if comparison(t, initial) {
Expand Down
26 changes: 14 additions & 12 deletions dogsdogsdogs/src/operators/lookup_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,24 @@ pub fn lookup_map<G, D, R, Tr, F, DOut, ROut, S>(
mut arrangement: Arranged<G, Tr>,
key_selector: F,
mut output_func: S,
supplied_key0: Tr::Key,
supplied_key1: Tr::Key,
supplied_key2: Tr::Key,
supplied_key0: Tr::KeyOwned,
supplied_key1: Tr::KeyOwned,
supplied_key2: Tr::KeyOwned,
) -> Collection<G, DOut, ROut>
where
G: Scope,
G::Timestamp: Lattice,
Tr: TraceReader<Time=G::Timestamp>+Clone+'static,
Tr::Key: Ord+Hashable+Sized,
Tr::Val: Clone,
Tr::KeyOwned: Hashable,
// Tr::Key: Ord+Hashable+Sized,
// Tr::Val: Clone,
Tr::Diff: Monoid+ExchangeData,
F: FnMut(&D, &mut Tr::Key)+Clone+'static,
F: FnMut(&D, &mut Tr::KeyOwned)+Clone+'static,
D: ExchangeData,
R: ExchangeData+Monoid,
DOut: Clone+'static,
ROut: Monoid,
S: FnMut(&D, &R, &Tr::Val, &Tr::Diff)->(DOut, ROut)+'static,
S: FnMut(&D, &R, Tr::Val<'_>, &Tr::Diff)->(DOut, ROut)+'static,
{
// No need to block physical merging for this operator.
arrangement.trace.set_physical_compaction(Antichain::new().borrow());
Expand All @@ -51,14 +52,14 @@ where

let mut buffer = Vec::new();

let mut key: Tr::Key = supplied_key0;
let mut key: Tr::KeyOwned = supplied_key0;
let exchange = Exchange::new(move |update: &(D,G::Timestamp,R)| {
logic1(&update.0, &mut key);
key.hashed().into()
});

let mut key1: Tr::Key = supplied_key1;
let mut key2: Tr::Key = supplied_key2;
let mut key1: Tr::KeyOwned = supplied_key1;
let mut key2: Tr::KeyOwned = supplied_key2;

prefixes.inner.binary_frontier(&propose_stream, exchange, Pipeline, "LookupMap", move |_,_| move |input1, input2, output| {

Expand Down Expand Up @@ -96,8 +97,9 @@ where
for &mut (ref prefix, ref time, ref mut diff) in prefixes.iter_mut() {
if !input2.frontier.less_equal(time) {
logic2(prefix, &mut key1);
cursor.seek_key(&storage, &key1);
if cursor.get_key(&storage) == Some(&key1) {
use differential_dataflow::trace::cursor::MyTrait;
cursor.seek_key(&storage, MyTrait::borrow_as(&key1));
if cursor.get_key(&storage) == Some(MyTrait::borrow_as(&key1)) {
while let Some(value) = cursor.get_val(&storage) {
let mut count = Tr::Diff::zero();
cursor.map_times(&storage, |t, d| {
Expand Down
23 changes: 11 additions & 12 deletions dogsdogsdogs/src/operators/propose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use differential_dataflow::difference::{Monoid, Multiply};
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::trace::TraceReader;
use differential_dataflow::trace::cursor::MyTrait;

/// Proposes extensions to a prefix stream.
///
Expand All @@ -18,22 +19,21 @@ pub fn propose<G, Tr, F, P>(
prefixes: &Collection<G, P, Tr::Diff>,
arrangement: Arranged<G, Tr>,
key_selector: F,
) -> Collection<G, (P, Tr::Val), Tr::Diff>
) -> Collection<G, (P, Tr::ValOwned), Tr::Diff>
where
G: Scope,
G::Timestamp: Lattice,
Tr: TraceReader<Time=G::Timestamp>+Clone+'static,
Tr::Key: Ord+Hashable+Default,
Tr::Val: Clone,
Tr::KeyOwned: Hashable + Default,
Tr::Diff: Monoid+Multiply<Output = Tr::Diff>+ExchangeData,
F: Fn(&P)->Tr::Key+Clone+'static,
F: Fn(&P)->Tr::KeyOwned+Clone+'static,
P: ExchangeData,
{
crate::operators::lookup_map(
prefixes,
arrangement,
move |p: &P, k: &mut Tr::Key| { *k = key_selector(p); },
|prefix, diff, value, sum| ((prefix.clone(), value.clone()), diff.clone().multiply(sum)),
move |p: &P, k: &mut Tr::KeyOwned | { *k = key_selector(p); },
|prefix, diff, value, sum| ((prefix.clone(), value.into_owned()), diff.clone().multiply(sum)),
Default::default(),
Default::default(),
Default::default(),
Expand All @@ -49,22 +49,21 @@ pub fn propose_distinct<G, Tr, F, P>(
prefixes: &Collection<G, P, Tr::Diff>,
arrangement: Arranged<G, Tr>,
key_selector: F,
) -> Collection<G, (P, Tr::Val), Tr::Diff>
) -> Collection<G, (P, Tr::ValOwned), Tr::Diff>
where
G: Scope,
G::Timestamp: Lattice,
Tr: TraceReader<Time=G::Timestamp>+Clone+'static,
Tr::Key: Ord+Hashable+Default,
Tr::Val: Clone,
Tr::KeyOwned: Hashable + Default,
Tr::Diff: Monoid+Multiply<Output = Tr::Diff>+ExchangeData,
F: Fn(&P)->Tr::Key+Clone+'static,
F: Fn(&P)->Tr::KeyOwned+Clone+'static,
P: ExchangeData,
{
crate::operators::lookup_map(
prefixes,
arrangement,
move |p: &P, k: &mut Tr::Key| { *k = key_selector(p); },
|prefix, diff, value, _sum| ((prefix.clone(), value.clone()), diff.clone()),
move |p: &P, k: &mut Tr::KeyOwned| { *k = key_selector(p); },
|prefix, diff, value, _sum| ((prefix.clone(), value.into_owned()), diff.clone()),
Default::default(),
Default::default(),
Default::default(),
Expand Down
4 changes: 2 additions & 2 deletions dogsdogsdogs/src/operators/validate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub fn validate<G, K, V, Tr, F, P>(
where
G: Scope,
G::Timestamp: Lattice,
Tr: TraceReader<Key=(K,V), Val=(), Time=G::Timestamp>+Clone+'static,
Tr: TraceReader<KeyOwned=(K,V), ValOwned=(), Time=G::Timestamp>+Clone+'static,
K: Ord+Hash+Clone+Default,
V: ExchangeData+Hash+Default,
Tr::Diff: Monoid+Multiply<Output = Tr::Diff>+ExchangeData,
Expand All @@ -32,7 +32,7 @@ where
extensions,
arrangement,
move |(pre,val),key| { *key = (key_selector(pre), val.clone()); },
|(pre,val),r,&(),_| ((pre.clone(), val.clone()), r.clone()),
|(pre,val),r,_,_| ((pre.clone(), val.clone()), r.clone()),
Default::default(),
Default::default(),
Default::default(),
Expand Down

0 comments on commit d8c855e

Please sign in to comment.