Skip to content

Commit

Permalink
All backends now run in separate threads (#47)
Browse files Browse the repository at this point in the history
This work is a follow on from the last commit and moves us closer
to the world in which the main thread is a very fast router. There
is little routing going on here as you can see because we have not
yet fully addressed issue #39. We're on a good foot to do it though
that is for sure.

On that day, many Shubs and Zuuls will know what it is to be roasted
in the depths of a Sloar that day, I can tell you!

Signed-off-by: Brian L. Troutwine <blt@postmates.com>
  • Loading branch information
blt authored Jul 20, 2016
1 parent caeded6 commit 8043ac4
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 103 deletions.
69 changes: 56 additions & 13 deletions src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,36 +6,79 @@ use config::Args;
use regex::Regex;
use std::sync::Arc;

use std::sync::mpsc::{Receiver, Sender, channel};
use std::thread;

use server;

/// A 'backend' is a sink for metrics.
pub trait Backend {
fn flush(&mut self) -> ();
fn deliver(&mut self, point: Arc<Metric>) -> ();
fn run(&mut self, recv: Receiver<Arc<server::Event>>) {
for event in recv.recv().iter() {
match *(*event) {
server::Event::TimerFlush => self.flush(),
server::Event::Graphite(ref metrics) => {
for metric in metrics {
self.deliver(metric.clone());
}
}
server::Event::Statsd(ref metrics) => {
for metric in metrics {
self.deliver(metric.clone());
}
}
}
}
}
}

/// Creates the collection of backends based on the paraemeters
///
pub fn factory(args: Args) -> Vec<Box<Backend>> {
let mut backends: Vec<Box<Backend>> = Vec::with_capacity(3);
pub fn factory(args: Args) -> Vec<Sender<Arc<server::Event>>> {
// create the channel
// spawn the thread

let mut backends = Vec::with_capacity(3);

if args.console {
backends.push(Box::new(console::Console::new()));
let (send, recv) = channel();
thread::spawn(move || {
console::Console::new().run(recv);
});
backends.push(send);
}
if args.wavefront {
let (send, recv) = channel();
let wf_tags: String = args.tags.replace(",", " ");
backends.push(Box::new(wavefront::Wavefront::new(&args.wavefront_host.unwrap(),
args.wavefront_port.unwrap(),
args.wavefront_skip_aggrs,
wf_tags)));
let cp_args = args.clone();
thread::spawn(move || {
wavefront::Wavefront::new(&cp_args.wavefront_host.unwrap(),
cp_args.wavefront_port.unwrap(),
cp_args.wavefront_skip_aggrs,
wf_tags)
.run(recv);
});
backends.push(send);
}
if args.librato {
let re = Regex::new(r"(?x)(source=(?P<source>.*),+)?").unwrap();
let metric_source = re.captures(&args.tags).unwrap().name("source").unwrap_or("cernan");
let (send, recv) = channel();
let cp_args = args.clone();

// librato does not support arbitrary tags, only a 'source' tag. We have
// to parse the source tag--if it exists--out and ship only that.
backends.push(Box::new(librato::Librato::new(&args.librato_username.unwrap(),
&args.librato_token.unwrap(),
metric_source,
&args.librato_host.unwrap())));
thread::spawn(move || {
let re = Regex::new(r"(?x)(source=(?P<source>.*),+)?").unwrap();
let metric_source =
re.captures(&cp_args.tags).unwrap().name("source").unwrap_or("cernan");
librato::Librato::new(&cp_args.librato_username.unwrap(),
&cp_args.librato_token.unwrap(),
metric_source,
&cp_args.librato_host.unwrap())
.run(recv);
});
backends.push(send);
}
backends
}
8 changes: 4 additions & 4 deletions src/backends/librato.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,12 +160,12 @@ mod test {
fn test_format_librato_buckets_no_timers() {
let mut librato = Librato::new("user", "token", "test-src", "http://librato.example.com");
librato.deliver(Arc::new(Metric::new(Atom::from("test.counter"),
1.0,
MetricKind::Counter(1.0))));
1.0,
MetricKind::Counter(1.0))));
librato.deliver(Arc::new(Metric::new(Atom::from("test.gauge"), 3.211, MetricKind::Gauge)));
librato.deliver(Arc::new(Metric::new(Atom::from("src-test.gauge.2"),
3.211,
MetricKind::Gauge)));
3.211,
MetricKind::Gauge)));
librato.deliver(Arc::new(Metric::new(Atom::from("test.timer"), 12.101, MetricKind::Timer)));
librato.deliver(Arc::new(Metric::new(Atom::from("test.timer"), 1.101, MetricKind::Timer)));
librato.deliver(Arc::new(Metric::new(Atom::from("test.timer"), 3.101, MetricKind::Timer)));
Expand Down
60 changes: 30 additions & 30 deletions src/backends/wavefront.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,25 +147,25 @@ mod test {
let mut wavefront = Wavefront::new("127.0.0.1", 2003, true, "source=test-src".to_string());
let dt = UTC.ymd(1990, 6, 12).and_hms_milli(9, 10, 11, 12);
wavefront.deliver(Arc::new(Metric::new_with_time(Atom::from("test.counter"),
1.0,
Some(dt),
MetricKind::Counter(1.0))));
1.0,
Some(dt),
MetricKind::Counter(1.0))));
wavefront.deliver(Arc::new(Metric::new_with_time(Atom::from("test.gauge"),
3.211,
Some(dt),
MetricKind::Gauge)));
3.211,
Some(dt),
MetricKind::Gauge)));
wavefront.deliver(Arc::new(Metric::new_with_time(Atom::from("test.timer"),
12.101,
Some(dt),
MetricKind::Timer)));
12.101,
Some(dt),
MetricKind::Timer)));
wavefront.deliver(Arc::new(Metric::new_with_time(Atom::from("test.timer"),
1.101,
Some(dt),
MetricKind::Timer)));
1.101,
Some(dt),
MetricKind::Timer)));
wavefront.deliver(Arc::new(Metric::new_with_time(Atom::from("test.timer"),
3.101,
Some(dt),
MetricKind::Timer)));
3.101,
Some(dt),
MetricKind::Timer)));
let result = wavefront.format_stats(Some(10101));
let lines: Vec<&str> = result.lines().collect();

Expand All @@ -191,25 +191,25 @@ mod test {
let mut wavefront = Wavefront::new("127.0.0.1", 2003, false, "source=test-src".to_string());
let dt = UTC.ymd(1990, 6, 12).and_hms_milli(9, 10, 11, 12);
wavefront.deliver(Arc::new(Metric::new_with_time(Atom::from("test.counter"),
1.0,
Some(dt),
MetricKind::Counter(1.0))));
1.0,
Some(dt),
MetricKind::Counter(1.0))));
wavefront.deliver(Arc::new(Metric::new_with_time(Atom::from("test.gauge"),
3.211,
Some(dt),
MetricKind::Gauge)));
3.211,
Some(dt),
MetricKind::Gauge)));
wavefront.deliver(Arc::new(Metric::new_with_time(Atom::from("test.timer"),
12.101,
Some(dt),
MetricKind::Timer)));
12.101,
Some(dt),
MetricKind::Timer)));
wavefront.deliver(Arc::new(Metric::new_with_time(Atom::from("test.timer"),
1.101,
Some(dt),
MetricKind::Timer)));
1.101,
Some(dt),
MetricKind::Timer)));
wavefront.deliver(Arc::new(Metric::new_with_time(Atom::from("test.timer"),
3.101,
Some(dt),
MetricKind::Timer)));
3.101,
Some(dt),
MetricKind::Timer)));
let result = wavefront.format_stats(Some(10101));
let lines: Vec<&str> = result.lines().collect();

Expand Down
72 changes: 44 additions & 28 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,15 @@ pub fn parse_args() -> Args {
.author("Brian L. Troutwine <blt@postmates.com>")
.about("telemetry aggregation and shipping, last up the ladder")
.arg(Arg::with_name("config-file")
.long("config")
.short("C")
.value_name("config")
.help("The config file to feed in.")
.takes_value(true))
.long("config")
.short("C")
.value_name("config")
.help("The config file to feed in.")
.takes_value(true))
.arg(Arg::with_name("verbose")
.short("v")
.multiple(true)
.help("Turn on verbose output."))
.short("v")
.multiple(true)
.help("Turn on verbose output."))
.get_matches();

let verb = if args.is_present("verbose") {
Expand All @@ -69,41 +69,56 @@ pub fn parse_args() -> Args {
Some(tbl) => {
let mut tags = String::new();
let ttbl = tbl.as_table().unwrap();
for (k,v) in (*ttbl).iter() {
for (k, v) in (*ttbl).iter() {
write!(tags, "{}={},", k, v.as_str().unwrap()).unwrap();
}
tags.pop();
tags
},
None => "".to_string()
}
None => "".to_string(),
};

let mk_wavefront = value.lookup("backends.wavefront").is_some();
let mk_console = value.lookup("backends.console").is_some();
let mk_librato = value.lookup("backends.librato").is_some();

let (wport, whost, wskpaggr) = if mk_wavefront {
(
// wavefront port
value.lookup("backends.wavefront.port").unwrap_or(&Value::Integer(2878)).as_integer().map(|i| i as u16),
// wavefront host
value.lookup("backends.wavefront.host").unwrap_or(&Value::String("127.0.0.1".to_string())).as_str().map(|s| s.to_string()),
// wavefront skip aggrs
value.lookup("backends.wavefront.skip-aggrs").unwrap_or(&Value::Boolean(false)).as_bool().unwrap_or(false),
)
(// wavefront port
value.lookup("backends.wavefront.port")
.unwrap_or(&Value::Integer(2878))
.as_integer()
.map(|i| i as u16),
// wavefront host
value.lookup("backends.wavefront.host")
.unwrap_or(&Value::String("127.0.0.1".to_string()))
.as_str()
.map(|s| s.to_string()),
// wavefront skip aggrs
value.lookup("backends.wavefront.skip-aggrs")
.unwrap_or(&Value::Boolean(false))
.as_bool()
.unwrap_or(false))
} else {
(None, None, false)
};

let (luser, lhost, ltoken) = if mk_librato {
(
// librato username
value.lookup("backends.librato.username").unwrap_or(&Value::String("statsd".to_string())).as_str().map(|s| s.to_string()),
// librato token
value.lookup("backends.librato.token").unwrap_or(&Value::String("statsd".to_string())).as_str().map(|s| s.to_string()),
// librato host
value.lookup("backends.librato.host").unwrap_or(&Value::String("https://metrics-api.librato.com/v1/metrics".to_string())).as_str().map(|s| s.to_string()),
)
(// librato username
value.lookup("backends.librato.username")
.unwrap_or(&Value::String("statsd".to_string()))
.as_str()
.map(|s| s.to_string()),
// librato token
value.lookup("backends.librato.token")
.unwrap_or(&Value::String("statsd".to_string()))
.as_str()
.map(|s| s.to_string()),
// librato host
value.lookup("backends.librato.host")
.unwrap_or(&Value::String("https://metrics-api.librato.com/v1/metrics"
.to_string()))
.as_str()
.map(|s| s.to_string()))
} else {
(None, None, None)
};
Expand All @@ -120,7 +135,8 @@ pub fn parse_args() -> Args {
flush_interval: value.lookup("flush-interval")
.unwrap_or(&Value::Integer(10))
.as_integer()
.expect("flush-interval must be integer") as u64,
.expect("flush-interval must be \
integer") as u64,
console: mk_console,
wavefront: mk_wavefront,
librato: mk_librato,
Expand Down
32 changes: 6 additions & 26 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ extern crate fern;
#[macro_use]
extern crate log;

use std::sync::Arc;
use std::sync::mpsc::channel;
use std::thread;
use chrono::UTC;
Expand Down Expand Up @@ -98,32 +99,11 @@ fn main() {
Err(e) => panic!(format!("Event channel has hung up: {:?}", e)),
};

match result {
server::Event::TimerFlush => {
trace!("TimerFlush");
// TODO improve this, limit here will be backend stalling and
// holding up all others
for backend in &mut backends {
backend.flush();
}
}

server::Event::Graphite(metrics) => {
for metric in &metrics {
for backend in &mut backends {
backend.deliver(metric.clone());
}
}
}

server::Event::Statsd(metrics) => {
for metric in &metrics {
for backend in &mut backends {
backend.deliver(metric.clone());
}
}
}

let arc_res = Arc::new(result);
for backend in &mut backends {
// warning: this should not be cloned, is a dup of an existing vec
// which we'll dump soon
backend.send(arc_res.clone()).expect("oops, couldn't send!");
}
}
}
6 changes: 4 additions & 2 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ pub fn udp_server(chan: Sender<Event>, port: u16) {
error!("BAD PACKET: {:?}", val);
}
}
}).ok();
})
.ok();
}
}

Expand Down Expand Up @@ -85,7 +86,8 @@ fn handle_client(chan: Sender<Event>, stream: TcpStream) {
error!("BAD PACKET: {:?}", val);
}
}
}).ok();
})
.ok();
}
Err(_) => break,
}
Expand Down

0 comments on commit 8043ac4

Please sign in to comment.