Skip to content

Commit

Permalink
All backends now run in separate threads
Browse files Browse the repository at this point in the history
This work is a follow on from the last commit and moves us closer
to the world in which the main thread is a very fast router. There
is little routing going on here as you can see because we have not
yet fully addressed issue #39. We're on a good foot to do it though
that is for sure.

On that day, many Shubs and Zuuls will know what it is to be roasted
in the depths of a Sloar that day, I can tell you!

Signed-off-by: Brian L. Troutwine <blt@postmates.com>
  • Loading branch information
Brian L. Troutwine committed Aug 9, 2016
1 parent 99f081d commit c1e75c1
Show file tree
Hide file tree
Showing 10 changed files with 185 additions and 157 deletions.
73 changes: 58 additions & 15 deletions src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,38 +4,81 @@ use metric::Metric;
use config::Args;

use regex::Regex;
use std::rc::Rc;
use std::sync::Arc;

use std::sync::mpsc::{Receiver, Sender, channel};
use std::thread;

use server;

/// A 'backend' is a sink for metrics.
pub trait Backend {
fn flush(&mut self) -> ();
fn deliver(&mut self, point: Rc<Metric>) -> ();
fn deliver(&mut self, point: Arc<Metric>) -> ();
fn run(&mut self, recv: Receiver<Arc<server::Event>>) {
for event in recv.recv().iter() {
match *(*event) {
server::Event::TimerFlush => self.flush(),
server::Event::Graphite(ref metrics) => {
for metric in metrics {
self.deliver(metric.clone());
}
}
server::Event::Statsd(ref metrics) => {
for metric in metrics {
self.deliver(metric.clone());
}
}
}
}
}
}

/// Creates the collection of backends based on the paraemeters
///
pub fn factory(args: Args) -> Vec<Box<Backend>> {
let mut backends: Vec<Box<Backend>> = Vec::with_capacity(3);
pub fn factory(args: Args) -> Vec<Sender<Arc<server::Event>>> {
// create the channel
// spawn the thread

let mut backends = Vec::with_capacity(3);

if args.console {
backends.push(Box::new(console::Console::new()));
let (send, recv) = channel();
thread::spawn(move || {
console::Console::new().run(recv);
});
backends.push(send);
}
if args.wavefront {
let (send, recv) = channel();
let wf_tags: String = args.tags.replace(",", " ");
backends.push(Box::new(wavefront::Wavefront::new(&args.wavefront_host.unwrap(),
args.wavefront_port.unwrap(),
args.wavefront_skip_aggrs,
wf_tags)));
let cp_args = args.clone();
thread::spawn(move || {
wavefront::Wavefront::new(&cp_args.wavefront_host.unwrap(),
cp_args.wavefront_port.unwrap(),
cp_args.wavefront_skip_aggrs,
wf_tags)
.run(recv);
});
backends.push(send);
}
if args.librato {
let re = Regex::new(r"(?x)(source=(?P<source>.*),+)?").unwrap();
let metric_source = re.captures(&args.tags).unwrap().name("source").unwrap_or("cernan");
let (send, recv) = channel();
let cp_args = args.clone();

// librato does not support arbitrary tags, only a 'source' tag. We have
// to parse the source tag--if it exists--out and ship only that.
backends.push(Box::new(librato::Librato::new(&args.librato_username.unwrap(),
&args.librato_token.unwrap(),
metric_source,
&args.librato_host.unwrap())));
thread::spawn(move || {
let re = Regex::new(r"(?x)(source=(?P<source>.*),+)?").unwrap();
let metric_source =
re.captures(&cp_args.tags).unwrap().name("source").unwrap_or("cernan");
librato::Librato::new(&cp_args.librato_username.unwrap(),
&cp_args.librato_token.unwrap(),
metric_source,
&cp_args.librato_host.unwrap())
.run(recv);
});
backends.push(send);
}
backends
}
4 changes: 2 additions & 2 deletions src/backends/console.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use backend::Backend;
use buckets::Buckets;
use metric::Metric;
use chrono;
use std::rc::Rc;
use std::sync::Arc;

pub struct Console {
aggrs: Buckets,
Expand All @@ -28,7 +28,7 @@ fn fmt_line(key: &str, value: &f64) {


impl Backend for Console {
fn deliver(&mut self, point: Rc<Metric>) {
fn deliver(&mut self, point: Arc<Metric>) {
debug!("console deliver");
self.aggrs.add(&point);
}
Expand Down
26 changes: 13 additions & 13 deletions src/backends/librato.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use url;
use chrono;
use mime::Mime;
use metric::Metric;
use std::rc::Rc;
use std::sync::Arc;

pub struct Librato {
username: String,
Expand Down Expand Up @@ -123,7 +123,7 @@ impl Librato {
}

impl Backend for Librato {
fn deliver(&mut self, point: Rc<Metric>) {
fn deliver(&mut self, point: Arc<Metric>) {
debug!("librato deliver");
self.aggrs.add(&point);
}
Expand Down Expand Up @@ -155,23 +155,23 @@ impl Backend for Librato {
mod test {
use metric::{Metric, MetricKind};
use backend::Backend;
use std::rc::Rc;
use std::sync::Arc;
use string_cache::Atom;
use super::*;

#[test]
fn test_format_librato_buckets_no_timers() {
let mut librato = Librato::new("user", "token", "test-src", "http://librato.example.com");
librato.deliver(Rc::new(Metric::new(Atom::from("test.counter"),
1.0,
MetricKind::Counter(1.0))));
librato.deliver(Rc::new(Metric::new(Atom::from("test.gauge"), 3.211, MetricKind::Gauge)));
librato.deliver(Rc::new(Metric::new(Atom::from("src-test.gauge.2"),
3.211,
MetricKind::Gauge)));
librato.deliver(Rc::new(Metric::new(Atom::from("test.timer"), 12.101, MetricKind::Timer)));
librato.deliver(Rc::new(Metric::new(Atom::from("test.timer"), 1.101, MetricKind::Timer)));
librato.deliver(Rc::new(Metric::new(Atom::from("test.timer"), 3.101, MetricKind::Timer)));
librato.deliver(Arc::new(Metric::new(Atom::from("test.counter"),
1.0,
MetricKind::Counter(1.0))));
librato.deliver(Arc::new(Metric::new(Atom::from("test.gauge"), 3.211, MetricKind::Gauge)));
librato.deliver(Arc::new(Metric::new(Atom::from("src-test.gauge.2"),
3.211,
MetricKind::Gauge)));
librato.deliver(Arc::new(Metric::new(Atom::from("test.timer"), 12.101, MetricKind::Timer)));
librato.deliver(Arc::new(Metric::new(Atom::from("test.timer"), 1.101, MetricKind::Timer)));
librato.deliver(Arc::new(Metric::new(Atom::from("test.timer"), 3.101, MetricKind::Timer)));
let result = librato.format_stats(Some(10101));

println!("{:?}", result);
Expand Down
97 changes: 47 additions & 50 deletions src/backends/wavefront.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
use std::net::{IpAddr, SocketAddrV4, TcpStream};
use std::net::{SocketAddr, TcpStream};
use std::fmt::Write;
use std::io::Write as IoWrite;
use chrono;
use metric::Metric;
use buckets::Buckets;
use backend::Backend;
use std::rc::Rc;
use std::sync::Arc;
use dns_lookup;

pub struct Wavefront {
addr: SocketAddrV4, // TODO, support IPv6
addr: SocketAddr,
tags: String,
mk_aggrs: bool,
// The wavefront implementation keeps an aggregate, depricated, and ships
// exact points. The aggregate is 'aggrs' and the exact points is 'points'.
aggrs: Buckets,
points: Vec<Rc<Metric>>,
points: Vec<Arc<Metric>>,
}

impl Wavefront {
Expand All @@ -30,10 +30,7 @@ impl Wavefront {
match dns_lookup::lookup_host(host) {
Ok(mut lh) => {
let ip = lh.next().expect("No IPs associated with host").unwrap();
let addr = match ip {
IpAddr::V4(ipv4) => SocketAddrV4::new(ipv4, port),
IpAddr::V6(_) => panic!("IPv6 addresses not supported!"),
};
let addr = SocketAddr::new(ip, port);
Wavefront {
addr: addr,
tags: tags,
Expand Down Expand Up @@ -138,7 +135,7 @@ impl Backend for Wavefront {
}
}

fn deliver(&mut self, point: Rc<Metric>) {
fn deliver(&mut self, point: Arc<Metric>) {
debug!("wavefront deliver");
if self.mk_aggrs {
self.aggrs.add(&point);
Expand All @@ -152,34 +149,34 @@ mod test {
use metric::{Metric, MetricKind};
use backend::Backend;
use chrono::{UTC, TimeZone};
use std::rc::Rc;
use std::sync::Arc;
use string_cache::Atom;
use super::*;

#[test]
fn test_format_wavefront() {
let mut wavefront = Wavefront::new("localhost", 2003, true, "source=test-src".to_string());
let dt = UTC.ymd(1990, 6, 12).and_hms_milli(9, 10, 11, 12);
wavefront.deliver(Rc::new(Metric::new_with_time(Atom::from("test.counter"),
1.0,
Some(dt),
MetricKind::Counter(1.0))));
wavefront.deliver(Rc::new(Metric::new_with_time(Atom::from("test.gauge"),
3.211,
Some(dt),
MetricKind::Gauge)));
wavefront.deliver(Rc::new(Metric::new_with_time(Atom::from("test.timer"),
12.101,
Some(dt),
MetricKind::Timer)));
wavefront.deliver(Rc::new(Metric::new_with_time(Atom::from("test.timer"),
1.101,
Some(dt),
MetricKind::Timer)));
wavefront.deliver(Rc::new(Metric::new_with_time(Atom::from("test.timer"),
3.101,
Some(dt),
MetricKind::Timer)));
wavefront.deliver(Arc::new(Metric::new_with_time(Atom::from("test.counter"),
1.0,
Some(dt),
MetricKind::Counter(1.0))));
wavefront.deliver(Arc::new(Metric::new_with_time(Atom::from("test.gauge"),
3.211,
Some(dt),
MetricKind::Gauge)));
wavefront.deliver(Arc::new(Metric::new_with_time(Atom::from("test.timer"),
12.101,
Some(dt),
MetricKind::Timer)));
wavefront.deliver(Arc::new(Metric::new_with_time(Atom::from("test.timer"),
1.101,
Some(dt),
MetricKind::Timer)));
wavefront.deliver(Arc::new(Metric::new_with_time(Atom::from("test.timer"),
3.101,
Some(dt),
MetricKind::Timer)));
let result = wavefront.format_stats(Some(10101));
let lines: Vec<&str> = result.lines().collect();

Expand All @@ -204,26 +201,26 @@ mod test {
fn test_format_wavefront_skip_aggrs() {
let mut wavefront = Wavefront::new("127.0.0.1", 2003, false, "source=test-src".to_string());
let dt = UTC.ymd(1990, 6, 12).and_hms_milli(9, 10, 11, 12);
wavefront.deliver(Rc::new(Metric::new_with_time(Atom::from("test.counter"),
1.0,
Some(dt),
MetricKind::Counter(1.0))));
wavefront.deliver(Rc::new(Metric::new_with_time(Atom::from("test.gauge"),
3.211,
Some(dt),
MetricKind::Gauge)));
wavefront.deliver(Rc::new(Metric::new_with_time(Atom::from("test.timer"),
12.101,
Some(dt),
MetricKind::Timer)));
wavefront.deliver(Rc::new(Metric::new_with_time(Atom::from("test.timer"),
1.101,
Some(dt),
MetricKind::Timer)));
wavefront.deliver(Rc::new(Metric::new_with_time(Atom::from("test.timer"),
3.101,
Some(dt),
MetricKind::Timer)));
wavefront.deliver(Arc::new(Metric::new_with_time(Atom::from("test.counter"),
1.0,
Some(dt),
MetricKind::Counter(1.0))));
wavefront.deliver(Arc::new(Metric::new_with_time(Atom::from("test.gauge"),
3.211,
Some(dt),
MetricKind::Gauge)));
wavefront.deliver(Arc::new(Metric::new_with_time(Atom::from("test.timer"),
12.101,
Some(dt),
MetricKind::Timer)));
wavefront.deliver(Arc::new(Metric::new_with_time(Atom::from("test.timer"),
1.101,
Some(dt),
MetricKind::Timer)));
wavefront.deliver(Arc::new(Metric::new_with_time(Atom::from("test.timer"),
3.101,
Some(dt),
MetricKind::Timer)));
let result = wavefront.format_stats(Some(10101));
let lines: Vec<&str> = result.lines().collect();

Expand Down
6 changes: 3 additions & 3 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,9 @@ pub fn parse_args() -> Args {
.use_delimiter(false)
.default_value("source=cernan"))
.arg(Arg::with_name("verbose")
.short("v")
.multiple(true)
.help("Turn on verbose output."))
.short("v")
.multiple(true)
.help("Turn on verbose output."))
.get_matches();

let verb = if args.is_present("verbose") {
Expand Down
Loading

0 comments on commit c1e75c1

Please sign in to comment.