Skip to content

Commit

Permalink
Include count outputs with histograms / timers (#71)
Browse files Browse the repository at this point in the history
Previously we relied on the wavefront backend to accept all points
and that this would allow us to get percentile information in
addition to counts. That's not true. A previous commit put us back
on a good foot with regard to counts and this puts us in a good
place with regard to counts.

Signed-off-by: Brian L. Troutwine <blt@postmates.com>
  • Loading branch information
blt authored Aug 16, 2016
1 parent 6d9a4ac commit a14d0f8
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 7 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ authors = ["Brian L. Troutwine <blt@postmates.com>"]
build = "build.rs"

[dependencies]
quantiles = "0.1.4"
quantiles = "0.1.5"
hyper = "0.9.6"
lru-cache = "0.0.7"
mime = "0.2.0"
Expand Down
48 changes: 47 additions & 1 deletion src/buckets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ impl Buckets {
}
}

/// Adds a metric to the bucket storage.
/// Resets appropriate aggregates
///
/// # Examples
///
Expand All @@ -46,6 +46,31 @@ impl Buckets {
///
/// let metric = metric::Metric::FromStr("foo:1|c");
/// let mut bucket = Buckets::new();
/// let rname = Atom::from("foo");
///
/// assert_eq!(None, buckets.counters.get_mut(&rname));
///
/// bucket.add(metric);
/// assert_eq!(Some(&mut 1.0), buckets.counters.get_mut(&rname));
/// buckets.reset();
/// assert_eq!(Some(&mut 0.0), buckets.counters.get_mut(&rname));
/// ```
pub fn reset(&mut self) {
for (_, v) in self.counters.iter_mut() {
*v = 0.0;
}
}

/// Adds a metric to the bucket storage.
///
/// # Examples
/// ```
/// use buckets::Buckets;
/// use super::metric;
/// use std::str::FromStr;
///
/// let metric = metric::Metric::FromStr("foo:1|c");
/// let mut bucket = Buckets::new();
/// bucket.add(metric);
/// ```
pub fn add(&mut self, value: &Metric) {
Expand Down Expand Up @@ -137,6 +162,27 @@ mod test {
assert_eq!(0, buckets.gauges().len());
}

#[test]
fn test_add_counter_metric_reset() {
let mut buckets = Buckets::new();
let mname = Atom::from("some.metric");
let metric = Metric::new(mname, 1.0, MetricKind::Counter(1.0));
buckets.add(&metric);

let rmname = Atom::from("some.metric");
assert!(buckets.counters.contains_key(&rmname),
"Should contain the metric key");
assert_eq!(Some(&mut 1.0), buckets.counters.get_mut(&rmname));

// Increment counter
buckets.add(&metric);
assert_eq!(Some(&mut 2.0), buckets.counters.get_mut(&rmname));
assert_eq!(1, buckets.counters().len());

buckets.reset();
assert_eq!(Some(&mut 0.0), buckets.counters.get_mut(&rmname));
}

#[test]
fn test_add_counter_metric_sampled() {
let mut buckets = Buckets::new();
Expand Down
2 changes: 1 addition & 1 deletion src/sinks/librato.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ impl Librato {

for (key, value) in self.aggrs.counters().iter() {
counters.push(LCounter {
name: key.as_ref().to_string(),
name: format!("{}", key),
value: *value,
});
}
Expand Down
12 changes: 10 additions & 2 deletions src/sinks/wavefront.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ impl Wavefront {
self.tags)
.unwrap();
}
let count = value.count();
write!(stats, "{}.count {} {} {}\n", key, count, start, self.tags).unwrap();
}

for (key, value) in self.aggrs.timers().iter() {
Expand Down Expand Up @@ -108,6 +110,8 @@ impl Wavefront {
self.tags)
.unwrap();
}
let count = value.count();
write!(stats, "{}.count {} {} {}\n", key, count, start, self.tags).unwrap();
}

stats
Expand All @@ -122,7 +126,10 @@ impl Sink for Wavefront {
let stats = self.format_stats(None);
debug!("wavefront - {}", stats);
match stream.write(stats.as_bytes()) {
Ok(n) => trace!("wrote {} bytes", n),
Ok(n) => {
trace!("wrote {} bytes", n);
self.aggrs.reset();
}
Err(e) => debug!("WF failed to write: {}", e),
}
}
Expand Down Expand Up @@ -173,7 +180,7 @@ mod test {
let lines: Vec<&str> = result.lines().collect();

println!("{:?}", lines);
assert_eq!(15, lines.len());
assert_eq!(16, lines.len());
assert_eq!(lines[0], "test.counter 1 10101 source=test-src");
assert_eq!(lines[1], "test.gauge 3.211 10101 source=test-src");
assert_eq!(lines[2], "test.timer.min 1.101 10101 source=test-src");
Expand All @@ -189,5 +196,6 @@ mod test {
assert_eq!(lines[12], "test.timer.98 12.101 10101 source=test-src");
assert_eq!(lines[13], "test.timer.99 12.101 10101 source=test-src");
assert_eq!(lines[14], "test.timer.999 12.101 10101 source=test-src");
assert_eq!(lines[15], "test.timer.count 3 10101 source=test-src");
}
}

0 comments on commit a14d0f8

Please sign in to comment.