Flush interval3 (#213)
* Have index in FlushTimer event

* Fix test to use TimerFlush with index

* ignore tags and vscode

* Turn off debugging prints

* will not introduce a new global constant here

* Remove trailing whitespaces

* Apply diff to config

* Apply other commits from flush-interval

* Last part

* Fix problems

* compiles

* flush interval in all the sinks

* removed flush_interval from null sink

* Round out the flush-intervals work

This commit pushes the flush intervals work that Ivan's been doing
over the edge to completion. In particular, we now support:

  * per-sink configurable flush_intervals, plus a global default
  * guaranteed receive-once timer ticks

The latter is done by including an ever-increasing value in
TimerFlushes, which allows duplicates -- caused by multiple routing
paths -- to be scrubbed out and ignored.
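
As an illustration of that scrubbing, here is a minimal Rust sketch of a
sink that tracks the highest flush index it has acted on and ignores any
TimerFlush at or below it. The type and field names (Event, CountingSink)
are hypothetical, not cernan's actual sink API.

// Sketch only: dedup of flush ticks by their ever-increasing index.
enum Event {
    TimerFlush(u64), // index assigned by the flush timer
    // other metric / log events elided
}

struct CountingSink {
    last_flush_idx: u64, // highest index already handled
    buffered: u64,       // stand-in for whatever the sink aggregates
}

impl CountingSink {
    fn handle(&mut self, event: Event) {
        match event {
            Event::TimerFlush(idx) => {
                // The same tick may arrive along several routing paths;
                // anything at or below the last seen index is a duplicate.
                if idx <= self.last_flush_idx {
                    return;
                }
                self.last_flush_idx = idx;
                self.flush();
            }
        }
    }

    fn flush(&mut self) {
        println!("flushing {} buffered items", self.buffered);
        self.buffered = 0;
    }
}

fn main() {
    let mut sink = CountingSink { last_flush_idx: 0, buffered: 3 };
    sink.handle(Event::TimerFlush(1)); // flushes
    sink.handle(Event::TimerFlush(1)); // duplicate via a second route: ignored
    sink.handle(Event::TimerFlush(2)); // flushes again
}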

An example configuration is added at examples/configs/counting-example.toml
to let users fiddle with flush intervals, and keep_count.lua has been
promoted to examples/scripts/ to help in that effort.

This resolves #196.

This resolves #74.

Signed-off-by: Brian L. Troutwine <blt@postmates.com>

* Remove the configurable interval from FlushTimer

It occurred to me that explaining the global flush-timer as anything
other than a default for sinks was confusing, and that I might as well
make FlushTimer tick once per second, hard-coded. It turns out Ivan had
already done that, so I just kindly smoothed things out.

Signed-off-by: Brian L. Troutwine <blt@postmates.com>
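
For illustration, a minimal sketch of such a hard-coded once-per-second
timer follows, using std::sync::mpsc senders in place of cernan's hopper
channels; the Event type and function names here are assumptions, not the
actual source.

use std::sync::mpsc::{channel, Sender};
use std::thread;
use std::time::Duration;

enum Event {
    TimerFlush(u64),
}

// Ticks once per second, forever, broadcasting an ever-increasing index
// to every downstream channel it was handed at startup.
fn run_flush_timer(channels: Vec<Sender<Event>>) {
    let mut idx: u64 = 0;
    loop {
        thread::sleep(Duration::from_secs(1)); // interval fixed at 1s
        idx += 1;
        for chan in &channels {
            let _ = chan.send(Event::TimerFlush(idx));
        }
    }
}

fn main() {
    let (tx, rx) = channel();
    thread::spawn(move || run_flush_timer(vec![tx]));
    // Show the first two ticks arriving with increasing indices.
    for _ in 0..2 {
        if let Ok(Event::TimerFlush(idx)) = rx.recv() {
            println!("tick {}", idx);
        }
    }
}

Under this scheme a per-sink flush_interval can be read as acting on every
Nth such tick, while the timer itself stays fixed at one second.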
gliush authored and blt committed Mar 6, 2017
1 parent 9148b5e commit dd0093c
Showing 18 changed files with 248 additions and 46 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -3,3 +3,5 @@ target/
src/metrics/statsd.rs
src/metrics/graphite.rs
*.bk
/tags
/.vscode
23 changes: 23 additions & 0 deletions examples/configs/counting-example.toml
@@ -0,0 +1,23 @@
data-directory = "data/"
scripts-directory = "examples/scripts/"

flush-interval = 1

[tags]
source = "cernan"

[sources]
[sources.statsd.primary]
enabled = true
port = 8125
forwards = ["filters.keep_count"]

[filters]
[filters.keep_count]
script = "keep_count.lua"
forwards = ["sinks.console"]

[sinks]
[sinks.console]
bin_width = 1
flush_interval = 2
15 changes: 15 additions & 0 deletions examples/scripts/keep_count.lua
@@ -0,0 +1,15 @@
count_per_tick = 0

function process_metric(pyld)
count_per_tick = count_per_tick + 1
end

function process_log(pyld)
count_per_tick = count_per_tick + 1
end

function tick(pyld)
payload.push_metric(pyld, "count_per_tick", count_per_tick)
payload.push_log(pyld, string.format("count_per_tick: %s", count_per_tick))
count_per_tick = 0
end
59 changes: 41 additions & 18 deletions src/bin/cernan.rs
@@ -11,16 +11,23 @@ use cernan::sink::{FirehoseConfig, Sink};
use cernan::source::Source;
use cernan::util;
use chrono::UTC;
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::process;
use std::str;
use std::thread;

fn populate_forwards(sends: &mut util::Channel,
mut top_level_forwards: Option<&mut HashSet<String>>,
forwards: &[String],
config_path: &str,
available_sends: &HashMap<String, hopper::Sender<metric::Event>>) {
for fwd in forwards {
match top_level_forwards.as_mut() {
Some(tlf) => {
let _ = (*tlf).insert(fwd.clone());
}
None => {}
}
match available_sends.get(fwd) {
Some(snd) => {
sends.push(snd.clone());
@@ -70,63 +77,59 @@ fn main() {
info!("cernan - {}", args.version);
let mut joins = Vec::new();
let mut sends: HashMap<String, hopper::Sender<metric::Event>> = HashMap::new();
let mut flush_sends = HashSet::new();

// SINKS
//
let mut flush_sends = Vec::new();
if let Some(config) = args.console {
let (console_send, console_recv) =
hopper::channel(&config.config_path, &args.data_directory).unwrap();
flush_sends.push(console_send.clone());
sends.insert(config.config_path.clone(), console_send);
joins.push(thread::spawn(move || {
cernan::sink::Console::new(config).run(console_recv);
}));
cernan::sink::Console::new(config).run(console_recv);
}));
}
if let Some(config) = args.null {
let (null_send, null_recv) = hopper::channel(&config.config_path, &args.data_directory)
.unwrap();
flush_sends.push(null_send.clone());
sends.insert(config.config_path.clone(), null_send);
joins.push(thread::spawn(move || { cernan::sink::Null::new(config).run(null_recv); }));
}
if let Some(config) = args.wavefront {
let (wf_send, wf_recv) = hopper::channel(&config.config_path, &args.data_directory)
.unwrap();
flush_sends.push(wf_send.clone());
sends.insert(config.config_path.clone(), wf_send);
joins.push(thread::spawn(move || { cernan::sink::Wavefront::new(config).run(wf_recv); }));
}
if let Some(config) = args.prometheus {
let (wf_send, wf_recv) = hopper::channel(&config.config_path, &args.data_directory)
.unwrap();
flush_sends.push(wf_send.clone());
sends.insert(config.config_path.clone(), wf_send);
joins.push(thread::spawn(move || { cernan::sink::Prometheus::new(config).run(wf_recv); }));
let (prometheus_send, prometheus_recv) =
hopper::channel(&config.config_path, &args.data_directory).unwrap();
sends.insert(config.config_path.clone(), prometheus_send);
joins.push(thread::spawn(move || {
cernan::sink::Prometheus::new(config).run(prometheus_recv);
}));
}
if let Some(config) = args.influxdb {
let (flx_send, flx_recv) = hopper::channel(&config.config_path, &args.data_directory)
.unwrap();
flush_sends.push(flx_send.clone());
sends.insert(config.config_path.clone(), flx_send);
joins.push(thread::spawn(move || { cernan::sink::InfluxDB::new(config).run(flx_recv); }));
}
if let Some(config) = args.native_sink_config {
let (cernan_send, cernan_recv) = hopper::channel(&config.config_path, &args.data_directory)
.unwrap();
flush_sends.push(cernan_send.clone());
sends.insert(config.config_path.clone(), cernan_send);
joins.push(thread::spawn(move || { cernan::sink::Native::new(config).run(cernan_recv); }));
}
for config in &args.firehosen {
let f: FirehoseConfig = config.clone();
let (firehose_send, firehose_recv) =
hopper::channel(&config.config_path, &args.data_directory).unwrap();
flush_sends.push(firehose_send.clone());
sends.insert(config.config_path.clone(), firehose_send);
joins.push(thread::spawn(move || { cernan::sink::Firehose::new(f).run(firehose_recv); }));
}


// FILTERS
//
for config in args.filters.values() {
@@ -136,6 +139,7 @@ fn main() {
sends.insert(config.config_path.clone(), flt_send);
let mut downstream_sends = Vec::new();
populate_forwards(&mut downstream_sends,
None,
&config.forwards,
&config.config_path,
&sends);
@@ -149,6 +153,7 @@ fn main() {
if let Some(config) = args.native_server_config {
let mut native_server_send = Vec::new();
populate_forwards(&mut native_server_send,
Some(&mut flush_sends),
&config.forwards,
&config.config_path,
&sends);
@@ -160,6 +165,7 @@ fn main() {
let c = (*config).clone();
let mut statsd_sends = Vec::new();
populate_forwards(&mut statsd_sends,
Some(&mut flush_sends),
&config.forwards,
&config.config_path,
&sends);
@@ -170,6 +176,7 @@ fn main() {
let c = (*config).clone();
let mut graphite_sends = Vec::new();
populate_forwards(&mut graphite_sends,
Some(&mut flush_sends),
&config.forwards,
&config.config_path,
&sends);
@@ -180,7 +187,11 @@ fn main() {

for config in args.files {
let mut fp_sends = Vec::new();
populate_forwards(&mut fp_sends, &config.forwards, &config.config_path, &sends);
populate_forwards(&mut fp_sends,
Some(&mut flush_sends),
&config.forwards,
&config.config_path,
&sends);
joins.push(thread::spawn(move || {
cernan::source::FileServer::new(fp_sends, config).run();
}));
@@ -189,9 +200,21 @@ fn main() {
// BACKGROUND
//

let flush_interval = args.flush_interval;
joins.push(thread::spawn(move || {
cernan::source::FlushTimer::new(flush_sends, flush_interval).run();
let mut flush_channels = Vec::new();
for destination in flush_sends.iter() {
match sends.get(destination) {
Some(snd) => {
flush_channels.push(snd.clone());
}
None => {
error!("Unable to fulfill configured top-level flush to {}",
destination);
process::exit(0);
}
}
}
cernan::source::FlushTimer::new(flush_channels).run();
}));

joins.push(thread::spawn(move || { cernan::time::update_time(); }));