Skip to content

Commit

Permalink
Add an option to skip aggregate gen. for wavefront (#25)
Browse files Browse the repository at this point in the history
In some systems we have no desire to ship aggregates to wavefront
for rollup. The long term ambition is to never ship aggregates,
done now for rough compatability with librato.

Signed-off-by: Brian L. Troutwine <blt@postmates.com>
  • Loading branch information
blt authored Jul 6, 2016
1 parent 6d39c9b commit 83155fb
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 50 deletions.
6 changes: 6 additions & 0 deletions src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub fn factory(console: &bool,
tags: &str,
wavefront_host: &str,
wavefront_port: &u16,
wavefront_skip_aggrs: &bool,
librato_username: &str,
librato_token: &str,
librato_host: &str)
Expand All @@ -31,6 +32,7 @@ pub fn factory(console: &bool,
let wf_tags: String = tags.replace(",", " ");
backends.push(Box::new(wavefront::Wavefront::new(wavefront_host,
*wavefront_port,
*wavefront_skip_aggrs,
wf_tags)));
}
if *librato {
Expand Down Expand Up @@ -69,6 +71,7 @@ mod test {
"source=src",
"127.0.0.1",
&2878,
&false,
"username",
"token",
"http://librato.example.com/");
Expand All @@ -83,6 +86,7 @@ mod test {
"source=src,host=h,service=s",
"127.0.0.1",
&2878,
&false,
"username",
"token",
"http://librato.example.com/");
Expand All @@ -97,6 +101,7 @@ mod test {
"source=src",
"127.0.0.1",
&2878,
&false,
"username",
"token",
"http://librato.example.com/");
Expand All @@ -111,6 +116,7 @@ mod test {
"source=src",
"127.0.0.1",
&2878,
&false,
"username",
"token",
"http://librato.example.com/");
Expand Down
138 changes: 93 additions & 45 deletions src/backends/wavefront.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use backend::Backend;
pub struct Wavefront {
addr: SocketAddrV4,
tags: String,
mk_aggrs: bool,
// The wavefront implementation keeps an aggregate, depricated, and ships
// exact points. The aggregate is 'aggrs' and the exact points is 'points'.
aggrs: Buckets,
Expand All @@ -24,12 +25,13 @@ impl Wavefront {
/// ```
/// let wave = Wavefront::new(host, port, source);
/// ```
pub fn new(host: &str, port: u16, tags: String) -> Wavefront {
pub fn new(host: &str, port: u16, skip_aggrs: bool, tags: String) -> Wavefront {
let ip = Ipv4Addr::from_str(host).unwrap();
let addr = SocketAddrV4::new(ip, port);
Wavefront {
addr: addr,
tags: tags,
mk_aggrs: !skip_aggrs,
aggrs: Buckets::new(),
points: Vec::new(),
}
Expand All @@ -44,51 +46,53 @@ impl Wavefront {
};
let mut stats = String::new();

for (key, value) in self.aggrs.counters().iter() {
write!(stats, "{} {} {} {}\n", key, value, start, self.tags).unwrap();
}
if self.mk_aggrs {
for (key, value) in self.aggrs.counters().iter() {
write!(stats, "{} {} {} {}\n", key, value, start, self.tags).unwrap();
}

for (key, value) in self.aggrs.gauges().iter() {
write!(stats, "{} {} {} {}\n", key, value, start, self.tags).unwrap();
}
for (key, value) in self.aggrs.gauges().iter() {
write!(stats, "{} {} {} {}\n", key, value, start, self.tags).unwrap();
}

for (key, value) in self.aggrs.histograms().iter() {
for tup in &[("min", 0.0),
("max", 1.0),
("50", 0.5),
("90", 0.9),
("99", 0.99),
("999", 0.999)] {
let stat: &str = tup.0;
let quant: f64 = tup.1;
write!(stats,
"{}.{} {} {} {}\n",
key,
stat,
value.query(quant).unwrap().1,
start,
self.tags)
.unwrap();
for (key, value) in self.aggrs.histograms().iter() {
for tup in &[("min", 0.0),
("max", 1.0),
("50", 0.5),
("90", 0.9),
("99", 0.99),
("999", 0.999)] {
let stat: &str = tup.0;
let quant: f64 = tup.1;
write!(stats,
"{}.{} {} {} {}\n",
key,
stat,
value.query(quant).unwrap().1,
start,
self.tags)
.unwrap();
}
}
}

for (key, value) in self.aggrs.timers().iter() {
for tup in &[("min", 0.0),
("max", 1.0),
("50", 0.5),
("90", 0.9),
("99", 0.99),
("999", 0.999)] {
let stat: &str = tup.0;
let quant: f64 = tup.1;
write!(stats,
"{}.{} {} {} {}\n",
key,
stat,
value.query(quant).unwrap().1,
start,
self.tags)
.unwrap();
for (key, value) in self.aggrs.timers().iter() {
for tup in &[("min", 0.0),
("max", 1.0),
("50", 0.5),
("90", 0.9),
("99", 0.99),
("999", 0.999)] {
let stat: &str = tup.0;
let quant: f64 = tup.1;
write!(stats,
"{}.{} {} {} {}\n",
key,
stat,
value.query(quant).unwrap().1,
start,
self.tags)
.unwrap();
}
}
}

Expand Down Expand Up @@ -116,7 +120,9 @@ impl Backend for Wavefront {
}

fn deliver(&mut self, point: Metric) {
self.aggrs.add(&point);
if self.mk_aggrs {
self.aggrs.add(&point);
}
self.points.push(point);
}
}
Expand All @@ -129,8 +135,8 @@ mod test {
use super::*;

#[test]
fn test_format_wavefront_buckets_no_timers() {
let mut wavefront = Wavefront::new("127.0.0.1", 2003, "source=test-src".to_string());
fn test_format_wavefront() {
let mut wavefront = Wavefront::new("127.0.0.1", 2003, false, "source=test-src".to_string());
let dt = UTC.ymd(1990, 6, 12).and_hms_milli(9, 10, 11, 12);
wavefront.deliver(Metric::new_with_source("test.counter",
1.0,
Expand Down Expand Up @@ -176,4 +182,46 @@ mod test {
assert_eq!(lines[11], "test.timer 1.101 645181811 source=test-src");
assert_eq!(lines[12], "test.timer 3.101 645181811 source=test-src");
}

#[test]
fn test_format_wavefront_skip_aggrs() {
let mut wavefront = Wavefront::new("127.0.0.1", 2003, true, "source=test-src".to_string());
let dt = UTC.ymd(1990, 6, 12).and_hms_milli(9, 10, 11, 12);
wavefront.deliver(Metric::new_with_source("test.counter",
1.0,
Some(dt),
MetricKind::Counter(1.0),
None));
wavefront.deliver(Metric::new_with_source("test.gauge",
3.211,
Some(dt),
MetricKind::Gauge,
None));
wavefront.deliver(Metric::new_with_source("test.timer",
12.101,
Some(dt),
MetricKind::Timer,
None));
wavefront.deliver(Metric::new_with_source("test.timer",
1.101,
Some(dt),
MetricKind::Timer,
None));
wavefront.deliver(Metric::new_with_source("test.timer",
3.101,
Some(dt),
MetricKind::Timer,
None));
let result = wavefront.format_stats(Some(10101));
let lines: Vec<&str> = result.lines().collect();

println!("{:?}", lines);
assert_eq!(5, lines.len());
assert_eq!(lines[0], "test.counter 1 645181811 source=test-src");
assert_eq!(lines[1], "test.gauge 3.211 645181811 source=test-src");
assert_eq!(lines[2], "test.timer 12.101 645181811 source=test-src");
assert_eq!(lines[3], "test.timer 1.101 645181811 source=test-src");
assert_eq!(lines[4], "test.timer 3.101 645181811 source=test-src");
}

}
13 changes: 8 additions & 5 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@ Options:
2878].
--wavefront-host=<p> The host wavefront proxy is running on. [default: \
127.0.0.1].
--librato-username=<p> The librato username for authentication. [default: \
statsd].
--librato-host=<p> The librato host to report to. [default: \
https://metrics-api.librato.com/v1/metrics].
--librato-token=<p> The librato token for \
--wavefront-skip-aggrs Send aggregate metrics to wavefront (librato \
compatibility)
--librato-username=<p> The librato username for \
authentication. [default: statsd].
--librato-host=<p> The librato host to report to. \
[default: https://metrics-api.librato.com/v1/metrics].
--librato-token=<p> The librato \
token for authentication. [default: statsd].
--version
";

Expand All @@ -48,6 +50,7 @@ pub struct Args {
pub flag_tags: String,
pub flag_wavefront_port: u16,
pub flag_wavefront_host: String,
pub flag_wavefront_skip_aggrs: bool,
pub flag_librato_username: String,
pub flag_librato_token: String,
pub flag_librato_host: String,
Expand Down
1 change: 1 addition & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ fn main() {
&args.flag_tags,
&args.flag_wavefront_host,
&args.flag_wavefront_port,
&args.flag_wavefront_skip_aggrs,
&args.flag_librato_username,
&args.flag_librato_token,
&args.flag_librato_host);
Expand Down

0 comments on commit 83155fb

Please sign in to comment.