From dd0093c87ace80c1456b9c092b601966a18be281 Mon Sep 17 00:00:00 2001 From: Ivan Glushkov Date: Tue, 7 Mar 2017 03:19:23 +0400 Subject: [PATCH] Flush interval3 (#213) * Have index in FlushTimer event * Fix test to use TimerFlush with index * ignore tags and vscode * Turn off debugging prints * will not introduce a new global constant here * Remove trailing whitespaces * Apply diff to config * Apply other commits from flush-interval * Last part * Fix problems * compiles * flush interval in all the sinks * removed flush_interval from null sink * Round out the flush-intervals work This commit pushes the flush intervals work that Ivan's been doing over the edge to completion. In particular, we now support: * per-sink configurable flush_intervals, plus a global default * guaranteed receive-once timer ticks The latter is done by including an ever-increasing value into TimerFlushes which allows duplicates -- because of paths in routing -- to be scrubbed out and ignored. An example configuration is added at examples/configs/counting-example.toml to let users fiddle with flush intervals. keep_count.lua has been promoted to help in that effort. This resolves #196. This resolves #74. Signed-off-by: Brian L. Troutwine * Remove the configurable interval from FlushTimer It occurred to me that explaining the difference of the global flush-timer as anything other than a default for sinks was confusing and that I might as well make FlushTimer tick once per second, hard-coded. Turns out that had been done by Ivan already so I just kindly smoothed things out. Signed-off-by: Brian L. 
Troutwine --- .gitignore | 2 + examples/configs/counting-example.toml | 23 ++++++++++ examples/scripts/keep_count.lua | 15 +++++++ src/bin/cernan.rs | 59 +++++++++++++++++-------- src/config.rs | 60 ++++++++++++++++++++++++-- src/filter/programmable_filter.rs | 7 ++- src/metric/event.rs | 2 +- src/metric/telemetry.rs | 2 +- src/sink/console.rs | 18 ++++++-- src/sink/firehose.rs | 7 +++ src/sink/influxdb.rs | 8 ++++ src/sink/mod.rs | 13 +++++- src/sink/native.rs | 20 ++++++++- src/sink/null.rs | 5 +++ src/sink/prometheus.rs | 4 ++ src/sink/wavefront.rs | 8 ++++ src/source/flush.rs | 28 +++++++----- tests/programmable_filter.rs | 13 +++--- 18 files changed, 248 insertions(+), 46 deletions(-) create mode 100644 examples/configs/counting-example.toml create mode 100644 examples/scripts/keep_count.lua diff --git a/.gitignore b/.gitignore index 3ceb7cc0..eea842c5 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,5 @@ target/ src/metrics/statsd.rs src/metrics/graphite.rs *.bk +/tags +/.vscode diff --git a/examples/configs/counting-example.toml b/examples/configs/counting-example.toml new file mode 100644 index 00000000..1b89cc41 --- /dev/null +++ b/examples/configs/counting-example.toml @@ -0,0 +1,23 @@ +data-directory = "data/" +scripts-directory = "examples/scripts/" + +flush-interval = 1 + +[tags] +source = "cernan" + +[sources] + [sources.statsd.primary] + enabled = true + port = 8125 + forwards = ["filters.keep_count"] + +[filters] + [filters.keep_count] + script = "keep_count.lua" + forwards = ["sinks.console"] + +[sinks] + [sinks.console] + bin_width = 1 + flush_interval = 2 diff --git a/examples/scripts/keep_count.lua b/examples/scripts/keep_count.lua new file mode 100644 index 00000000..41221917 --- /dev/null +++ b/examples/scripts/keep_count.lua @@ -0,0 +1,15 @@ +count_per_tick = 0 + +function process_metric(pyld) + count_per_tick = count_per_tick + 1 +end + +function process_log(pyld) + count_per_tick = count_per_tick + 1 +end + +function tick(pyld) + 
payload.push_metric(pyld, "count_per_tick", count_per_tick) + payload.push_log(pyld, string.format("count_per_tick: %s", count_per_tick)) + count_per_tick = 0 +end diff --git a/src/bin/cernan.rs b/src/bin/cernan.rs index 0b6e6981..85a63230 100644 --- a/src/bin/cernan.rs +++ b/src/bin/cernan.rs @@ -11,16 +11,23 @@ use cernan::sink::{FirehoseConfig, Sink}; use cernan::source::Source; use cernan::util; use chrono::UTC; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::process; use std::str; use std::thread; fn populate_forwards(sends: &mut util::Channel, + mut top_level_forwards: Option<&mut HashSet>, forwards: &[String], config_path: &str, available_sends: &HashMap>) { for fwd in forwards { + match top_level_forwards.as_mut() { + Some(tlf) => { + let _ = (*tlf).insert(fwd.clone()); + } + None => {} + } match available_sends.get(fwd) { Some(snd) => { sends.push(snd.clone()); @@ -70,51 +77,47 @@ fn main() { info!("cernan - {}", args.version); let mut joins = Vec::new(); let mut sends: HashMap> = HashMap::new(); + let mut flush_sends = HashSet::new(); // SINKS // - let mut flush_sends = Vec::new(); if let Some(config) = args.console { let (console_send, console_recv) = hopper::channel(&config.config_path, &args.data_directory).unwrap(); - flush_sends.push(console_send.clone()); sends.insert(config.config_path.clone(), console_send); joins.push(thread::spawn(move || { - cernan::sink::Console::new(config).run(console_recv); - })); + cernan::sink::Console::new(config).run(console_recv); + })); } if let Some(config) = args.null { let (null_send, null_recv) = hopper::channel(&config.config_path, &args.data_directory) .unwrap(); - flush_sends.push(null_send.clone()); sends.insert(config.config_path.clone(), null_send); joins.push(thread::spawn(move || { cernan::sink::Null::new(config).run(null_recv); })); } if let Some(config) = args.wavefront { let (wf_send, wf_recv) = hopper::channel(&config.config_path, &args.data_directory) .unwrap(); - 
flush_sends.push(wf_send.clone()); sends.insert(config.config_path.clone(), wf_send); joins.push(thread::spawn(move || { cernan::sink::Wavefront::new(config).run(wf_recv); })); } if let Some(config) = args.prometheus { - let (wf_send, wf_recv) = hopper::channel(&config.config_path, &args.data_directory) - .unwrap(); - flush_sends.push(wf_send.clone()); - sends.insert(config.config_path.clone(), wf_send); - joins.push(thread::spawn(move || { cernan::sink::Prometheus::new(config).run(wf_recv); })); + let (prometheus_send, prometheus_recv) = + hopper::channel(&config.config_path, &args.data_directory).unwrap(); + sends.insert(config.config_path.clone(), prometheus_send); + joins.push(thread::spawn(move || { + cernan::sink::Prometheus::new(config).run(prometheus_recv); + })); } if let Some(config) = args.influxdb { let (flx_send, flx_recv) = hopper::channel(&config.config_path, &args.data_directory) .unwrap(); - flush_sends.push(flx_send.clone()); sends.insert(config.config_path.clone(), flx_send); joins.push(thread::spawn(move || { cernan::sink::InfluxDB::new(config).run(flx_recv); })); } if let Some(config) = args.native_sink_config { let (cernan_send, cernan_recv) = hopper::channel(&config.config_path, &args.data_directory) .unwrap(); - flush_sends.push(cernan_send.clone()); sends.insert(config.config_path.clone(), cernan_send); joins.push(thread::spawn(move || { cernan::sink::Native::new(config).run(cernan_recv); })); } @@ -122,11 +125,11 @@ fn main() { let f: FirehoseConfig = config.clone(); let (firehose_send, firehose_recv) = hopper::channel(&config.config_path, &args.data_directory).unwrap(); - flush_sends.push(firehose_send.clone()); sends.insert(config.config_path.clone(), firehose_send); joins.push(thread::spawn(move || { cernan::sink::Firehose::new(f).run(firehose_recv); })); } + // FILTERS // for config in args.filters.values() { @@ -136,6 +139,7 @@ fn main() { sends.insert(config.config_path.clone(), flt_send); let mut downstream_sends = Vec::new(); 
populate_forwards(&mut downstream_sends, + None, &config.forwards, &config.config_path, &sends); @@ -149,6 +153,7 @@ fn main() { if let Some(config) = args.native_server_config { let mut native_server_send = Vec::new(); populate_forwards(&mut native_server_send, + Some(&mut flush_sends), &config.forwards, &config.config_path, &sends); @@ -160,6 +165,7 @@ fn main() { let c = (*config).clone(); let mut statsd_sends = Vec::new(); populate_forwards(&mut statsd_sends, + Some(&mut flush_sends), &config.forwards, &config.config_path, &sends); @@ -170,6 +176,7 @@ fn main() { let c = (*config).clone(); let mut graphite_sends = Vec::new(); populate_forwards(&mut graphite_sends, + Some(&mut flush_sends), &config.forwards, &config.config_path, &sends); @@ -180,7 +187,11 @@ fn main() { for config in args.files { let mut fp_sends = Vec::new(); - populate_forwards(&mut fp_sends, &config.forwards, &config.config_path, &sends); + populate_forwards(&mut fp_sends, + Some(&mut flush_sends), + &config.forwards, + &config.config_path, + &sends); joins.push(thread::spawn(move || { cernan::source::FileServer::new(fp_sends, config).run(); })); @@ -189,9 +200,21 @@ fn main() { // BACKGROUND // - let flush_interval = args.flush_interval; joins.push(thread::spawn(move || { - cernan::source::FlushTimer::new(flush_sends, flush_interval).run(); + let mut flush_channels = Vec::new(); + for destination in flush_sends.iter() { + match sends.get(destination) { + Some(snd) => { + flush_channels.push(snd.clone()); + } + None => { + error!("Unable to fulfill configured top-level flush to {}", + destination); + process::exit(0); + } + } + } + cernan::source::FlushTimer::new(flush_channels).run(); })); joins.push(thread::spawn(move || { cernan::time::update_time(); })); diff --git a/src/config.rs b/src/config.rs index 7ed9f2d8..21931a40 100644 --- a/src/config.rs +++ b/src/config.rs @@ -79,6 +79,7 @@ pub fn parse_args() -> Args { } // We read from CLI arguments None => { + let flush_interval = 60; let 
wavefront = if args.is_present("wavefront") { let percentiles = vec![("min".to_string(), 0.0), ("max".to_string(), 1.0), @@ -100,6 +101,7 @@ pub fn parse_args() -> Args { config_path: "sinks.wavefront".to_string(), percentiles: percentiles, tags: Default::default(), + flush_interval: flush_interval, }) } else { None @@ -110,7 +112,7 @@ pub fn parse_args() -> Args { None }; let console = if args.is_present("console") { - Some(ConsoleConfig::new("sinks.console".to_string())) + Some(ConsoleConfig::new("sinks.console".to_string(), flush_interval)) } else { None }; @@ -144,14 +146,13 @@ pub fn parse_args() -> Args { graphites: graphites, native_server_config: None, native_sink_config: None, - flush_interval: u64::from_str(args.value_of("flush-interval").unwrap()) - .expect("flush-interval must be an integer"), console: console, null: null, wavefront: wavefront, influxdb: None, prometheus: None, firehosen: Vec::default(), + flush_interval: flush_interval, files: Default::default(), filters: Default::default(), verbose: verb, @@ -183,6 +184,11 @@ pub fn parse_config_file(buffer: String, verbosity: u64) -> Args { None => TagMap::default(), }; + let global_flush_interval = value.lookup("flush-interval") + .unwrap_or(&Value::Integer(60)) + .as_integer() + .unwrap(); + let null = if value.lookup("null").or(value.lookup("sinks.null")).is_some() { Some(NullConfig { config_path: "sinks.null".to_string() }) } else { @@ -197,6 +203,12 @@ pub fn parse_config_file(buffer: String, verbosity: u64) -> Args { .as_integer() .expect("could not parse sinks.console.bin_width"), config_path: "sinks.console".to_string(), + flush_interval: value.lookup("console.flush_interval") + .or(value.lookup("sinks.console.flush_interval")) + .unwrap_or(&Value::Integer(global_flush_interval)) + .as_integer() + .map(|i| i as u64) + .unwrap(), }) } else { None @@ -262,6 +274,12 @@ pub fn parse_config_file(buffer: String, verbosity: u64) -> Args { config_path: "sinks.wavefront".to_string(), percentiles: 
percentiles, tags: tags.clone(), + flush_interval: value.lookup("wavefront.flush_interval") + .or(value.lookup("sinks.wavefront.flush_interval")) + .unwrap_or(&Value::Integer(global_flush_interval)) + .as_integer() + .map(|i| i as u64) + .unwrap(), }) } else { None @@ -288,6 +306,12 @@ pub fn parse_config_file(buffer: String, verbosity: u64) -> Args { .expect("could not parse sinks.influxdb.bin_width"), config_path: "sinks.influxdb".to_string(), tags: tags.clone(), + flush_interval: value.lookup("influxdb.flush_interval") + .or(value.lookup("sinks.influxdb.flush_interval")) + .unwrap_or(&Value::Integer(global_flush_interval)) + .as_integer() + .map(|i| i as u64) + .unwrap(), }) } else { None @@ -331,6 +355,11 @@ pub fn parse_config_file(buffer: String, verbosity: u64) -> Args { .map(|s| s.to_string()) .expect("could not parse sinks.native.host"), config_path: "sinks.native".to_string(), + flush_interval: value.lookup("sinks.native.flush_interval") + .unwrap_or(&Value::Integer(global_flush_interval)) + .as_integer() + .map(|i| i as u64) + .unwrap(), }) } else { None @@ -463,11 +492,17 @@ pub fn parse_config_file(buffer: String, verbosity: u64) -> Args { "us-west-2" | _ => Region::UsWest2, }); let delivery_stream = ds.unwrap().to_string(); + let flush_interval = val.lookup("flush_interval") + .unwrap_or(&Value::Integer(global_flush_interval)) + .as_integer() + .map(|i| i as u64) + .unwrap(); firehosen.push(FirehoseConfig { delivery_stream: delivery_stream.clone(), batch_size: bs, region: r.unwrap(), config_path: format!("sinks.firehose.{}", delivery_stream), + flush_interval: flush_interval, }) } None => continue, @@ -502,11 +537,17 @@ pub fn parse_config_file(buffer: String, verbosity: u64) -> Args { "us-west-1" => Region::UsWest1, "us-west-2" | _ => Region::UsWest2, }); + let flush_interval = tbl.lookup("flush_interval") + .unwrap_or(&Value::Integer(global_flush_interval)) + .as_integer() + .map(|i| i as u64) + .unwrap(); firehosen.push(FirehoseConfig { 
delivery_stream: ds.unwrap().to_string(), batch_size: bs, region: r.unwrap(), config_path: format!("sinks.firehose.{}", key), + flush_interval: flush_interval, }) } None => continue, @@ -797,6 +838,7 @@ scripts-directory = "/foo/bar" [sinks.native] host = "foo.example.com" port = 1972 + flush_interval = 120 "# .to_string(); @@ -806,6 +848,7 @@ scripts-directory = "/foo/bar" let native_sink_config = args.native_sink_config.unwrap(); assert_eq!(native_sink_config.host, String::from("foo.example.com")); assert_eq!(native_sink_config.port, 1972); + assert_eq!(native_sink_config.flush_interval, 120); } #[test] @@ -1078,6 +1121,7 @@ bin_width = 9 port = 3131 host = "example.com" bin_width = 9 + flush_interval = 15 "# .to_string(); @@ -1088,6 +1132,7 @@ bin_width = 9 assert_eq!(wavefront.host, String::from("example.com")); assert_eq!(wavefront.port, 3131); assert_eq!(wavefront.bin_width, 9); + assert_eq!(wavefront.flush_interval, 15); } #[test] @@ -1144,6 +1189,7 @@ bin_width = 9 port = 3131 host = "example.com" bin_width = 9 + flush_interval = 70 "# .to_string(); @@ -1154,6 +1200,7 @@ bin_width = 9 assert_eq!(influxdb.host, String::from("example.com")); assert_eq!(influxdb.port, 3131); assert_eq!(influxdb.bin_width, 9); + assert_eq!(influxdb.flush_interval, 70); } #[test] @@ -1234,7 +1281,9 @@ bin_width = 9 let args = parse_config_file(config, 4); assert!(args.console.is_some()); - assert_eq!(args.console.unwrap().bin_width, 9); + let console = args.console.unwrap(); + assert_eq!(console.bin_width, 9); + assert_eq!(console.flush_interval, 60); // default } #[test] @@ -1320,6 +1369,7 @@ region = "us-east-1" [sinks.firehose.stream_one] delivery_stream = "stream_one" batch_size = 20 + flush_interval = 15 [sinks.firehose.stream_two] delivery_stream = "stream_two" @@ -1335,10 +1385,12 @@ region = "us-east-1" assert_eq!(args.firehosen[0].delivery_stream, "stream_one"); assert_eq!(args.firehosen[0].batch_size, 20); assert_eq!(args.firehosen[0].region, Region::UsWest2); + 
assert_eq!(args.firehosen[0].flush_interval, 15); assert_eq!(args.firehosen[1].delivery_stream, "stream_two"); assert_eq!(args.firehosen[1].batch_size, 800); assert_eq!(args.firehosen[1].region, Region::UsEast1); + assert_eq!(args.firehosen[1].flush_interval, 60); // default } #[test] diff --git a/src/filter/programmable_filter.rs b/src/filter/programmable_filter.rs index 3b84811f..16e08133 100644 --- a/src/filter/programmable_filter.rs +++ b/src/filter/programmable_filter.rs @@ -335,6 +335,7 @@ pub struct ProgrammableFilter { state: lua::State, path: String, global_tags: metric::TagMap, + last_flush_idx: u64, } #[derive(Debug, Clone)] @@ -382,6 +383,7 @@ impl ProgrammableFilter { state: state, path: config.config_path, global_tags: config.tags, + last_flush_idx: 0, } } } @@ -423,7 +425,8 @@ impl filter::Filter for ProgrammableFilter { } Ok(()) } - metric::Event::TimerFlush => { + metric::Event::TimerFlush(flush_idx) if self.last_flush_idx >= flush_idx => Ok(()), + metric::Event::TimerFlush(flush_idx) => { self.state.get_global("tick"); if !self.state.is_fn(-1) { let fail = @@ -450,6 +453,8 @@ impl filter::Filter for ProgrammableFilter { for mt in pyld.metrics { res.push(metric::Event::new_telemetry(*mt)); } + res.push(event); + self.last_flush_idx = flush_idx; Ok(()) } metric::Event::Log(mut l) => { diff --git a/src/metric/event.rs b/src/metric/event.rs index 5343997a..40f97367 100644 --- a/src/metric/event.rs +++ b/src/metric/event.rs @@ -5,7 +5,7 @@ use std::sync; pub enum Event { Telemetry(sync::Arc>), Log(sync::Arc>), - TimerFlush, + TimerFlush(u64), } impl Event { diff --git a/src/metric/telemetry.rs b/src/metric/telemetry.rs index 8e94f2e1..189f043d 100644 --- a/src/metric/telemetry.rs +++ b/src/metric/telemetry.rs @@ -527,7 +527,7 @@ mod tests { fn rand(rng: &mut R) -> Event { let i: usize = rng.gen(); match i % 3 { - 0 => Event::TimerFlush, + 0 => Event::TimerFlush(rng.gen()), _ => Event::Telemetry(Arc::new(Some(rng.gen()))), } } diff --git 
a/src/sink/console.rs b/src/sink/console.rs index 6a77f542..7e8e95d8 100644 --- a/src/sink/console.rs +++ b/src/sink/console.rs @@ -11,6 +11,7 @@ use std::sync; /// print each `flush-interval` to stdout. pub struct Console { aggrs: Buckets, + flush_interval: u64, } impl Console { @@ -21,11 +22,14 @@ impl Console { /// ``` /// use cernan::sink::{Console, ConsoleConfig}; /// let config = ConsoleConfig { config_path: "sinks.console".to_string(), - /// bin_width: 2 }; + /// bin_width: 2, flush_interval: 60 }; /// let c = Console::new(config); /// ``` pub fn new(config: ConsoleConfig) -> Console { - Console { aggrs: Buckets::new(config.bin_width) } + Console { + aggrs: Buckets::new(config.bin_width), + flush_interval: config.flush_interval, + } } } @@ -39,6 +43,7 @@ pub struct ConsoleConfig { /// Sets the bin width for Console's underlying /// [bucket](../buckets/struct.Bucket.html). pub bin_width: i64, + pub flush_interval: u64, } impl ConsoleConfig { @@ -49,13 +54,14 @@ impl ConsoleConfig { /// /// ``` /// use cernan::sink::ConsoleConfig; - /// let config = ConsoleConfig::new("sinks.console".to_string()); + /// let config = ConsoleConfig::new("sinks.console".to_string(), 60); /// assert_eq!(1, config.bin_width); /// ``` - pub fn new(config_path: String) -> ConsoleConfig { + pub fn new(config_path: String, flush_interval: u64) -> ConsoleConfig { ConsoleConfig { config_path: config_path, bin_width: 1, + flush_interval: flush_interval, } } } @@ -73,6 +79,10 @@ impl Sink for Console { // drop the line, intentionally } + fn flush_interval(&self) -> Option { + Some(self.flush_interval) + } + fn flush(&mut self) { println!("Flushing metrics: {}", chrono::UTC::now().to_rfc3339()); diff --git a/src/sink/firehose.rs b/src/sink/firehose.rs index fe2cd20f..45f118df 100644 --- a/src/sink/firehose.rs +++ b/src/sink/firehose.rs @@ -21,6 +21,7 @@ pub struct FirehoseConfig { pub batch_size: usize, pub region: Region, pub config_path: String, + pub flush_interval: u64, } pub struct 
Firehose { @@ -28,6 +29,7 @@ pub struct Firehose { delivery_stream_name: String, region: Region, batch_size: usize, + flush_interval: u64, } impl Firehose { @@ -37,11 +39,16 @@ impl Firehose { delivery_stream_name: config.delivery_stream, region: config.region, batch_size: config.batch_size, + flush_interval: config.flush_interval, } } } impl Sink for Firehose { + fn flush_interval(&self) -> Option { + Some(self.flush_interval) + } + fn flush(&mut self) { let provider = DefaultCredentialsProvider::new().unwrap(); let dispatcher = default_tls_client().unwrap(); diff --git a/src/sink/influxdb.rs b/src/sink/influxdb.rs index 7e022f66..343b4227 100644 --- a/src/sink/influxdb.rs +++ b/src/sink/influxdb.rs @@ -13,6 +13,7 @@ pub struct InfluxDB { aggrs: Buckets, delivery_attempts: u32, stats: String, + flush_interval: u64, } #[derive(Debug)] @@ -22,6 +23,7 @@ pub struct InfluxDBConfig { pub port: u16, pub config_path: String, pub tags: TagMap, + pub flush_interval: u64, } #[inline] @@ -67,6 +69,7 @@ impl InfluxDB { aggrs: Buckets::new(config.bin_width), delivery_attempts: 0, stats: String::with_capacity(8_192), + flush_interval: config.flush_interval, } } @@ -159,6 +162,10 @@ impl InfluxDB { } impl Sink for InfluxDB { + fn flush_interval(&self) -> Option { + Some(self.flush_interval) + } + fn flush(&mut self) { loop { if self.delivery_attempts > 0 { @@ -240,6 +247,7 @@ mod test { port: 1987, config_path: "sinks.influxdb".to_string(), tags: tags.clone(), + flush_interval: 60, }; let mut influxdb = InfluxDB::new(config); let dt_0 = UTC.ymd(1990, 6, 12).and_hms_milli(9, 10, 11, 00).timestamp(); diff --git a/src/sink/mod.rs b/src/sink/mod.rs index 2b35385d..73785868 100644 --- a/src/sink/mod.rs +++ b/src/sink/mod.rs @@ -32,6 +32,7 @@ pub enum Valve { /// A 'sink' is a sink for metrics. 
pub trait Sink { + fn flush_interval(&self) -> Option; fn flush(&mut self) -> (); fn valve_state(&self) -> Valve; fn deliver(&mut self, point: sync::Arc>) -> (); @@ -39,6 +40,7 @@ pub trait Sink { fn run(&mut self, recv: hopper::Receiver) { let mut attempts = 0; let mut recv = recv.into_iter(); + let mut last_flush_idx = 0; loop { time::delay(attempts); match recv.next() { @@ -48,7 +50,16 @@ pub trait Sink { match self.valve_state() { Valve::Open => { match event { - Event::TimerFlush => self.flush(), + Event::TimerFlush(idx) => { + if idx > last_flush_idx { + if let Some(flush_interval) = self.flush_interval() { + if idx % flush_interval == 0 { + self.flush(); + } + } + last_flush_idx = idx; + } + } Event::Telemetry(metric) => { self.deliver(metric); } diff --git a/src/sink/native.rs b/src/sink/native.rs index 4f9415cb..6f346265 100644 --- a/src/sink/native.rs +++ b/src/sink/native.rs @@ -17,6 +17,7 @@ pub struct Native { port: u16, host: String, buffer: Vec, + flush_interval: u64, } #[derive(Debug)] @@ -24,6 +25,7 @@ pub struct NativeConfig { pub port: u16, pub host: String, pub config_path: String, + pub flush_interval: u64, } impl Native { @@ -32,6 +34,7 @@ impl Native { port: config.port, host: config.host, buffer: Vec::new(), + flush_interval: config.flush_interval, } } } @@ -42,6 +45,7 @@ impl Default for Native { port: 1972, host: String::from("127.0.0.1"), buffer: Vec::new(), + flush_interval: 60, } } } @@ -62,6 +66,7 @@ impl Sink for Native { fn run(&mut self, recv: hopper::Receiver) { let mut attempts = 0; let mut recv = recv.into_iter(); + let mut last_flush_idx = 0; loop { time::delay(attempts); if self.buffer.len() > 10_000 { @@ -73,7 +78,16 @@ impl Sink for Native { Some(event) => { attempts = 0; match event { - metric::Event::TimerFlush => self.flush(), + metric::Event::TimerFlush(idx) => { + if idx > last_flush_idx { + if let Some(flush_interval) = self.flush_interval() { + if idx % flush_interval == 0 { + self.flush(); + } + } + last_flush_idx = 
idx; + } + } _ => self.buffer.push(event), } } @@ -81,6 +95,10 @@ impl Sink for Native { } } + fn flush_interval(&self) -> Option { + Some(self.flush_interval) + } + fn flush(&mut self) { let mut points = Vec::with_capacity(1024); let mut lines = Vec::with_capacity(1024); diff --git a/src/sink/null.rs b/src/sink/null.rs index d4559e64..bc191f8e 100644 --- a/src/sink/null.rs +++ b/src/sink/null.rs @@ -34,6 +34,11 @@ impl Sink for Null { // discard point } + fn flush_interval(&self) -> Option { + None + } + + fn flush(&mut self) { // do nothing } diff --git a/src/sink/prometheus.rs b/src/sink/prometheus.rs index 55645f25..a3f0fece 100644 --- a/src/sink/prometheus.rs +++ b/src/sink/prometheus.rs @@ -163,6 +163,10 @@ fn sanitize(metric: metric::Telemetry) -> metric::Telemetry { } impl Sink for Prometheus { + fn flush_interval(&self) -> Option { + None + } + fn flush(&mut self) { // There is no flush for the Prometheus sink. Prometheus prefers to // pull via HTTP / Protobuf. See PrometheusSrv. 
diff --git a/src/sink/wavefront.rs b/src/sink/wavefront.rs index ab270719..a8b6a15d 100644 --- a/src/sink/wavefront.rs +++ b/src/sink/wavefront.rs @@ -16,6 +16,7 @@ pub struct Wavefront { delivery_attempts: u32, percentiles: Vec<(String, f64)>, pub stats: String, + flush_interval: u64, } #[derive(Debug)] @@ -26,6 +27,7 @@ pub struct WavefrontConfig { pub config_path: String, pub percentiles: Vec<(String, f64)>, pub tags: TagMap, + pub flush_interval: u64, } #[inline] @@ -67,6 +69,7 @@ impl Wavefront { delivery_attempts: 0, percentiles: config.percentiles, stats: String::with_capacity(8_192), + flush_interval: config.flush_interval, } } @@ -134,6 +137,10 @@ impl Wavefront { } impl Sink for Wavefront { + fn flush_interval(&self) -> Option { + Some(self.flush_interval) + } + fn flush(&mut self) { loop { if self.delivery_attempts > 0 { @@ -227,6 +234,7 @@ mod test { config_path: "sinks.wavefront".to_string(), tags: tags.clone(), percentiles: percentiles, + flush_interval: 60, }; let mut wavefront = Wavefront::new(config); let dt_0 = UTC.ymd(1990, 6, 12).and_hms_milli(9, 10, 11, 00).timestamp(); diff --git a/src/source/flush.rs b/src/source/flush.rs index 90755589..7921b22c 100644 --- a/src/source/flush.rs +++ b/src/source/flush.rs @@ -7,25 +7,33 @@ use util::send; pub struct FlushTimer { chans: util::Channel, - interval: u64, } impl FlushTimer { - pub fn new(chans: util::Channel, interval: u64) -> FlushTimer { - FlushTimer { - chans: chans, - interval: interval, - } + pub fn new(chans: util::Channel) -> FlushTimer { + FlushTimer { chans: chans } } } impl Source for FlushTimer { fn run(&mut self) { - let duration = Duration::new(self.interval, 0); - debug!("flush-interval: {:?}", duration); + let one_second = Duration::new(1, 0); + // idx will _always_ increase. If it's kept at u64 or greater it will + // overflow long past the collapse of our industrial civilization only + // so long as the intervals we sleep are kept to once a second or so. 
+ // + // Heh, even past the length of the universe, really. + // + // Point being, there's a theoretical overflow problem here but it's not + // going to be hit in practice. + let mut idx: u64 = 0; loop { - sleep(duration); - send("flush", &mut self.chans, metric::Event::TimerFlush); + // We start with TimerFlush(1) as receivers start with + // TimerFlush(0). This will update their last_flush_idx seen at + // system boot. + idx += 1; + sleep(one_second); + send("flush", &mut self.chans, metric::Event::TimerFlush(idx)); } } } diff --git a/tests/programmable_filter.rs b/tests/programmable_filter.rs index 0b069089..f534cd4e 100644 --- a/tests/programmable_filter.rs +++ b/tests/programmable_filter.rs @@ -278,19 +278,21 @@ mod integration { let log2 = metric::Event::new_log(metric::LogLine::new("identity", "more")); let log3 = metric::Event::new_log(metric::LogLine::new("identity", "less")); - let flush = metric::Event::TimerFlush; + let flush1 = metric::Event::TimerFlush(1); + let flush2 = metric::Event::TimerFlush(2); let mut events = Vec::new(); for ev in &[metric0, metric1, metric2, log0, log1] { let _ = cs.process(ev.clone(), &mut events); } events.clear(); - let res = cs.process(flush.clone(), &mut events); + let res = cs.process(flush1, &mut events); assert!(res.is_ok()); assert!(!events.is_empty()); - assert_eq!(events.len(), 2); + assert_eq!(events.len(), 3); println!("EVENTS: {:?}", events); + assert_eq!(events[2], metric::Event::TimerFlush(1)); assert_eq!(events[1], metric::Event::new_telemetry(metric::Telemetry::new("count_per_tick", 5.0))); assert_eq!(events[0], @@ -302,12 +304,13 @@ mod integration { let _ = cs.process(ev.clone(), &mut events); } events.clear(); - let res = cs.process(flush, &mut events); + let res = cs.process(flush2, &mut events); assert!(res.is_ok()); assert!(!events.is_empty()); - assert_eq!(events.len(), 2); + assert_eq!(events.len(), 3); println!("EVENTS: {:?}", events); + assert_eq!(events[2], metric::Event::TimerFlush(2)); 
assert_eq!(events[1], metric::Event::new_telemetry(metric::Telemetry::new("count_per_tick", 2.0))); assert_eq!(events[0],