Skip to content

Commit

Permalink
Introduce Telemetry override for 'count' and 'sample_sum' (#354)
Browse files Browse the repository at this point in the history
* Introduce Telemetry override for 'count' and 'sample_sum'

In some rare cases -- like in the prometheus sink -- it's necessary
to fake the count and sample_sum of a telemetry. Say, if you're
windowing a certain troublesome kind of aggregation. Well, that's
what this commit does: it lets you fake those two things.

I expect the tests are broken now. I'll work those tomorrow.

This work via feedback from @dparton.

Signed-off-by: Brian L. Troutwine <blt@postmates.com>

* Catch the tests up to Prometheus sink changes

This commit catches our tests up to the changes made to Telemetry,
in particular with regard to the overriding of sum and count
for some kinds of Telemetry.

Signed-off-by: Brian L. Troutwine <blt@postmates.com>
  • Loading branch information
blt authored Dec 6, 2017
1 parent 071328c commit 86f753c
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 12 deletions.
50 changes: 46 additions & 4 deletions src/metric/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ pub struct SoftTelemetry {
timestamp: Option<i64>,
tags: Option<sync::Arc<TagMap>>,
persist: Option<bool>,
override_count: Option<u64>,
override_sample_sum: Option<f64>,
}

/// A Telemetry is a name, tagmap, timestamp and aggregated, point-in-time
Expand All @@ -74,6 +76,8 @@ pub struct Telemetry {
pub tags: sync::Arc<TagMap>,
/// The time of Telemetry, measured in seconds.
pub timestamp: i64,
override_count: Option<u64>,
override_sample_sum: Option<f64>,
}

impl Add for Value {
Expand Down Expand Up @@ -243,11 +247,25 @@ impl Default for Telemetry {
persist: false,
tags: sync::Arc::new(TagMap::default()),
timestamp: time::now(),
override_sample_sum: None,
override_count: None,
}
}
}

impl SoftTelemetry {
/// Set the override sample sum
pub fn sample_sum(mut self, sum: f64) -> SoftTelemetry {
self.override_sample_sum = Some(sum);
self
}

/// Set the override count
pub fn count(mut self, count: u64) -> SoftTelemetry {
self.override_count = Some(count);
self
}

/// Set the name of the Telemetry
///
/// The likelyhood is that there will be many Telemetry with the same
Expand Down Expand Up @@ -389,6 +407,8 @@ impl SoftTelemetry {
persist: persist,
tags: tags,
timestamp: timestamp,
override_count: self.override_count,
override_sample_sum: self.override_sample_sum,
})
}
AggregationMethod::Histogram => {
Expand All @@ -415,6 +435,8 @@ impl SoftTelemetry {
persist: persist,
tags: tags,
timestamp: timestamp,
override_sample_sum: None,
override_count: None,
})
}
AggregationMethod::Set => {
Expand All @@ -435,6 +457,8 @@ impl SoftTelemetry {
persist: persist,
tags: tags,
timestamp: timestamp,
override_sample_sum: None,
override_count: None,
})
}
AggregationMethod::Sum => {
Expand All @@ -455,6 +479,8 @@ impl SoftTelemetry {
persist: persist,
tags: tags,
timestamp: timestamp,
override_sample_sum: None,
override_count: None,
})
}
}
Expand Down Expand Up @@ -501,6 +527,8 @@ impl Telemetry {
timestamp: None,
tags: None,
persist: None,
override_sample_sum: None,
override_count: None,
}
}

Expand All @@ -522,6 +550,8 @@ impl Telemetry {
timestamp: Some(self.timestamp),
tags: Some(self.tags),
persist: Some(self.persist),
override_count: None,
override_sample_sum: None,
}
}

Expand Down Expand Up @@ -637,6 +667,8 @@ impl Telemetry {
/// The inserted value will be subject to the Telemetry's aggregation
/// method.
pub fn insert(mut self, value: f64) -> Telemetry {
assert!(self.override_sample_sum.is_none());
assert!(self.override_count.is_none());
match self.value {
Some(Value::Set(_)) => {
self.value = Some(Value::Set(value));
Expand Down Expand Up @@ -696,16 +728,26 @@ impl Telemetry {
match self.value {
Some(Value::Set(_)) | Some(Value::Sum(_)) | None => None,
Some(Value::Histogram(ref histo)) => histo.sum(),
Some(Value::Quantiles(ref ckms)) => ckms.sum(),
Some(Value::Quantiles(ref ckms)) => {
if self.override_sample_sum.is_some() {
self.override_sample_sum
} else {
ckms.sum()
}
}
}
}

/// Return the total count of Telemetry aggregated into this Telemetry.
pub fn count(&self) -> usize {
if let Some(ref v) = self.value {
v.count()
if let Some(cnt) = self.override_count {
cnt as usize
} else {
0
if let Some(ref v) = self.value {
v.count()
} else {
0
}
}
}

Expand Down
47 changes: 39 additions & 8 deletions src/sink/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ enum Accumulator {
Perpetual(metric::Telemetry),
Windowed {
cap: usize,
sum: f64,
count: u64,
samples: Vec<metric::Telemetry>,
},
}
Expand All @@ -110,13 +112,15 @@ impl Accumulator {
}

#[cfg(test)]
pub fn total_samples(&self) -> usize {
pub fn total_stored_samples(&self) -> usize {
match *self {
Accumulator::Perpetual(_) => 1,
Accumulator::Windowed {
cap: _,
samples: ref s,
} => s.len(),
ref samples,
sum: _,
count: _,
} => samples.len(),
}
}

Expand All @@ -125,9 +129,28 @@ impl Accumulator {
Accumulator::Perpetual(ref mut t) => *t += telem,
Accumulator::Windowed {
cap,
ref mut sum,
ref mut count,
ref mut samples,
} => {
use std::u64;
assert_eq!(telem.kind(), AggregationMethod::Summarize);
// u64::wrapping_add makes a new u64. We need this to be
// in-place. Oops!
if (u64::MAX - *count) <= 1 {
*count = 1;
} else {
*count += 1;
}
let val = telem.query(1.0).unwrap();
// There's no wrapping_add for f64. Since it's rude to crash
// cernan because we've been summing for too long we knock
// together our own wrap.
if (f64::MAX - val) <= *sum {
*sum = val - (f64::MAX - *sum);
} else {
*sum += val;
}
match samples
.binary_search_by(|probe| probe.timestamp.cmp(&telem.timestamp))
{
Expand Down Expand Up @@ -199,7 +222,7 @@ impl PrometheusAggr {
let accum = self.values.index(hsh_idx).clone();
match accum {
Accumulator::Perpetual(t) => Some(t),
Accumulator::Windowed { cap: _, samples } => {
Accumulator::Windowed { cap: _, samples, sum: _, count: _ } => {
let mut start = samples[0].clone();
for t in &samples[1..] {
start += t.clone();
Expand Down Expand Up @@ -257,9 +280,12 @@ impl PrometheusAggr {
metric::AggregationMethod::Summarize => {
let mut samples =
Vec::with_capacity(self.capacity_in_seconds);
let sum = telem.query(1.0).unwrap();
samples.push(telem);
Accumulator::Windowed {
cap: self.capacity_in_seconds,
count: 1,
sum: sum,
samples: samples,
}
}
Expand Down Expand Up @@ -304,19 +330,19 @@ impl<'a> Iterator for Iter<'a> {
self.idx += 1;
return Some(t.clone());
}
Accumulator::Windowed { ref samples, .. } => {
Accumulator::Windowed { ref samples, count, sum, .. } => {
self.idx += 1;
match samples.len() {
0 => unreachable!(),
1 => {
return Some(samples[0].clone());
return Some(samples[0].clone().thaw().sample_sum(sum).count(count).harden().unwrap());
}
_ => {
let mut start = samples[0].clone();
for t in &samples[1..] {
start += t.clone();
}
return Some(start);
return Some(start.thaw().sample_sum(sum).count(count).harden().unwrap());
}
}
}
Expand Down Expand Up @@ -769,10 +795,13 @@ mod test {
metric::AggregationMethod::Summarize => {
let mut samples =
Vec::with_capacity(capacity_in_seconds);
let val = telem.query(1.0).unwrap();
samples.push(telem);
Accumulator::Windowed {
cap: capacity_in_seconds,
samples: samples,
count: 1,
sum: val,
}
}
};
Expand All @@ -794,6 +823,8 @@ mod test {
let mut windowed = Accumulator::Windowed {
cap: cap,
samples: Vec::new(),
sum: 0.0,
count: 0,
};

for t in telems.into_iter() {
Expand All @@ -803,7 +834,7 @@ mod test {
windowed.insert(t);
}

assert!(windowed.total_samples() <= cap);
assert!(windowed.total_stored_samples() <= cap);

TestResult::passed()
}
Expand Down

0 comments on commit 86f753c

Please sign in to comment.