diff --git a/src/backend.rs b/src/backend.rs index fd76a27b..a0809b5d 100644 --- a/src/backend.rs +++ b/src/backend.rs @@ -4,38 +4,81 @@ use metric::Metric; use config::Args; use regex::Regex; -use std::rc::Rc; +use std::sync::Arc; + +use std::sync::mpsc::{Receiver, Sender, channel}; +use std::thread; + +use server; /// A 'backend' is a sink for metrics. pub trait Backend { fn flush(&mut self) -> (); - fn deliver(&mut self, point: Rc) -> (); + fn deliver(&mut self, point: Arc) -> (); + fn run(&mut self, recv: Receiver>) { + for event in recv.recv().iter() { + match *(*event) { + server::Event::TimerFlush => self.flush(), + server::Event::Graphite(ref metrics) => { + for metric in metrics { + self.deliver(metric.clone()); + } + } + server::Event::Statsd(ref metrics) => { + for metric in metrics { + self.deliver(metric.clone()); + } + } + } + } + } } /// Creates the collection of backends based on the paraemeters /// -pub fn factory(args: Args) -> Vec> { - let mut backends: Vec> = Vec::with_capacity(3); +pub fn factory(args: Args) -> Vec>> { + // create the channel + // spawn the thread + + let mut backends = Vec::with_capacity(3); if args.console { - backends.push(Box::new(console::Console::new())); + let (send, recv) = channel(); + thread::spawn(move || { + console::Console::new().run(recv); + }); + backends.push(send); } if args.wavefront { + let (send, recv) = channel(); let wf_tags: String = args.tags.replace(",", " "); - backends.push(Box::new(wavefront::Wavefront::new(&args.wavefront_host.unwrap(), - args.wavefront_port.unwrap(), - args.wavefront_skip_aggrs, - wf_tags))); + let cp_args = args.clone(); + thread::spawn(move || { + wavefront::Wavefront::new(&cp_args.wavefront_host.unwrap(), + cp_args.wavefront_port.unwrap(), + cp_args.wavefront_skip_aggrs, + wf_tags) + .run(recv); + }); + backends.push(send); } if args.librato { - let re = Regex::new(r"(?x)(source=(?P.*),+)?").unwrap(); - let metric_source = re.captures(&args.tags).unwrap().name("source").unwrap_or("cernan"); + let (send, recv) = channel(); + let cp_args = args.clone(); + // librato does not support arbitrary tags, only a 'source' tag. We have // to parse the source tag--if it exists--out and ship only that. - backends.push(Box::new(librato::Librato::new(&args.librato_username.unwrap(), - &args.librato_token.unwrap(), - metric_source, - &args.librato_host.unwrap()))); + thread::spawn(move || { + let re = Regex::new(r"(?x)(source=(?P.*),+)?").unwrap(); + let metric_source = + re.captures(&cp_args.tags).unwrap().name("source").unwrap_or("cernan"); + librato::Librato::new(&cp_args.librato_username.unwrap(), + &cp_args.librato_token.unwrap(), + metric_source, + &cp_args.librato_host.unwrap()) + .run(recv); + }); + backends.push(send); } backends } diff --git a/src/backends/console.rs b/src/backends/console.rs index e6915c17..89f736f3 100644 --- a/src/backends/console.rs +++ b/src/backends/console.rs @@ -2,7 +2,7 @@ use backend::Backend; use buckets::Buckets; use metric::Metric; use chrono; -use std::rc::Rc; +use std::sync::Arc; pub struct Console { aggrs: Buckets, @@ -28,7 +28,7 @@ fn fmt_line(key: &str, value: &f64) { impl Backend for Console { - fn deliver(&mut self, point: Rc) { + fn deliver(&mut self, point: Arc) { debug!("console deliver"); self.aggrs.add(&point); } diff --git a/src/backends/librato.rs b/src/backends/librato.rs index 4c4c4094..c58b1c6b 100644 --- a/src/backends/librato.rs +++ b/src/backends/librato.rs @@ -8,7 +8,7 @@ use url; use chrono; use mime::Mime; use metric::Metric; -use std::rc::Rc; +use std::sync::Arc; pub struct Librato { username: String, @@ -123,7 +123,7 @@ impl Librato { } impl Backend for Librato { - fn deliver(&mut self, point: Rc) { + fn deliver(&mut self, point: Arc) { debug!("librato deliver"); self.aggrs.add(&point); } @@ -155,23 +155,23 @@ impl Backend for Librato { mod test { use metric::{Metric, MetricKind}; use backend::Backend; - use std::rc::Rc; + use std::sync::Arc; use string_cache::Atom; use super::*; #[test] fn test_format_librato_buckets_no_timers() { let mut librato = Librato::new("user", "token", "test-src", "http://librato.example.com"); - librato.deliver(Rc::new(Metric::new(Atom::from("test.counter"), - 1.0, - MetricKind::Counter(1.0)))); - librato.deliver(Rc::new(Metric::new(Atom::from("test.gauge"), 3.211, MetricKind::Gauge))); - librato.deliver(Rc::new(Metric::new(Atom::from("src-test.gauge.2"), - 3.211, - MetricKind::Gauge))); - librato.deliver(Rc::new(Metric::new(Atom::from("test.timer"), 12.101, MetricKind::Timer))); - librato.deliver(Rc::new(Metric::new(Atom::from("test.timer"), 1.101, MetricKind::Timer))); - librato.deliver(Rc::new(Metric::new(Atom::from("test.timer"), 3.101, MetricKind::Timer))); + librato.deliver(Arc::new(Metric::new(Atom::from("test.counter"), + 1.0, + MetricKind::Counter(1.0)))); + librato.deliver(Arc::new(Metric::new(Atom::from("test.gauge"), 3.211, MetricKind::Gauge))); + librato.deliver(Arc::new(Metric::new(Atom::from("src-test.gauge.2"), + 3.211, + MetricKind::Gauge))); + librato.deliver(Arc::new(Metric::new(Atom::from("test.timer"), 12.101, MetricKind::Timer))); + librato.deliver(Arc::new(Metric::new(Atom::from("test.timer"), 1.101, MetricKind::Timer))); + librato.deliver(Arc::new(Metric::new(Atom::from("test.timer"), 3.101, MetricKind::Timer))); let result = librato.format_stats(Some(10101)); println!("{:?}", result); diff --git a/src/backends/wavefront.rs b/src/backends/wavefront.rs index 9a89c023..af47049b 100644 --- a/src/backends/wavefront.rs +++ b/src/backends/wavefront.rs @@ -1,21 +1,21 @@ -use std::net::{IpAddr, SocketAddrV4, TcpStream}; +use std::net::{SocketAddr, TcpStream}; use std::fmt::Write; use std::io::Write as IoWrite; use chrono; use metric::Metric; use buckets::Buckets; use backend::Backend; -use std::rc::Rc; +use std::sync::Arc; use dns_lookup; pub struct Wavefront { - addr: SocketAddrV4, // TODO, support IPv6 + addr: SocketAddr, tags: String, mk_aggrs: bool, // The wavefront implementation keeps an aggregate, depricated, and ships // exact points. The aggregate is 'aggrs' and the exact points is 'points'. aggrs: Buckets, - points: Vec>, + points: Vec>, } impl Wavefront { @@ -30,10 +30,7 @@ impl Wavefront { match dns_lookup::lookup_host(host) { Ok(mut lh) => { let ip = lh.next().expect("No IPs associated with host").unwrap(); - let addr = match ip { - IpAddr::V4(ipv4) => SocketAddrV4::new(ipv4, port), - IpAddr::V6(_) => panic!("IPv6 addresses not supported!"), - }; + let addr = SocketAddr::new(ip, port); Wavefront { addr: addr, tags: tags, @@ -138,7 +135,7 @@ impl Backend for Wavefront { } } - fn deliver(&mut self, point: Rc) { + fn deliver(&mut self, point: Arc) { debug!("wavefront deliver"); if self.mk_aggrs { self.aggrs.add(&point); @@ -152,7 +149,7 @@ mod test { use metric::{Metric, MetricKind}; use backend::Backend; use chrono::{UTC, TimeZone}; - use std::rc::Rc; + use std::sync::Arc; use string_cache::Atom; use super::*; @@ -160,26 +157,26 @@ mod test { fn test_format_wavefront() { let mut wavefront = Wavefront::new("localhost", 2003, true, "source=test-src".to_string()); let dt = UTC.ymd(1990, 6, 12).and_hms_milli(9, 10, 11, 12); - wavefront.deliver(Rc::new(Metric::new_with_time(Atom::from("test.counter"), - 1.0, - Some(dt), - MetricKind::Counter(1.0)))); - wavefront.deliver(Rc::new(Metric::new_with_time(Atom::from("test.gauge"), - 3.211, - Some(dt), - MetricKind::Gauge))); - wavefront.deliver(Rc::new(Metric::new_with_time(Atom::from("test.timer"), - 12.101, - Some(dt), - MetricKind::Timer))); - wavefront.deliver(Rc::new(Metric::new_with_time(Atom::from("test.timer"), - 1.101, - Some(dt), - MetricKind::Timer))); - wavefront.deliver(Rc::new(Metric::new_with_time(Atom::from("test.timer"), - 3.101, - Some(dt), - MetricKind::Timer))); + wavefront.deliver(Arc::new(Metric::new_with_time(Atom::from("test.counter"), + 1.0, + Some(dt), + MetricKind::Counter(1.0)))); + wavefront.deliver(Arc::new(Metric::new_with_time(Atom::from("test.gauge"), + 3.211, + Some(dt), + MetricKind::Gauge))); + wavefront.deliver(Arc::new(Metric::new_with_time(Atom::from("test.timer"), + 12.101, + Some(dt), + MetricKind::Timer))); + wavefront.deliver(Arc::new(Metric::new_with_time(Atom::from("test.timer"), + 1.101, + Some(dt), + MetricKind::Timer))); + wavefront.deliver(Arc::new(Metric::new_with_time(Atom::from("test.timer"), + 3.101, + Some(dt), + MetricKind::Timer))); let result = wavefront.format_stats(Some(10101)); let lines: Vec<&str> = result.lines().collect(); @@ -204,26 +201,26 @@ mod test { fn test_format_wavefront_skip_aggrs() { let mut wavefront = Wavefront::new("127.0.0.1", 2003, false, "source=test-src".to_string()); let dt = UTC.ymd(1990, 6, 12).and_hms_milli(9, 10, 11, 12); - wavefront.deliver(Rc::new(Metric::new_with_time(Atom::from("test.counter"), - 1.0, - Some(dt), - MetricKind::Counter(1.0)))); - wavefront.deliver(Rc::new(Metric::new_with_time(Atom::from("test.gauge"), - 3.211, - Some(dt), - MetricKind::Gauge))); - wavefront.deliver(Rc::new(Metric::new_with_time(Atom::from("test.timer"), - 12.101, - Some(dt), - MetricKind::Timer))); - wavefront.deliver(Rc::new(Metric::new_with_time(Atom::from("test.timer"), - 1.101, - Some(dt), - MetricKind::Timer))); - wavefront.deliver(Rc::new(Metric::new_with_time(Atom::from("test.timer"), - 3.101, - Some(dt), - MetricKind::Timer))); + wavefront.deliver(Arc::new(Metric::new_with_time(Atom::from("test.counter"), + 1.0, + Some(dt), + MetricKind::Counter(1.0)))); + wavefront.deliver(Arc::new(Metric::new_with_time(Atom::from("test.gauge"), + 3.211, + Some(dt), + MetricKind::Gauge))); + wavefront.deliver(Arc::new(Metric::new_with_time(Atom::from("test.timer"), + 12.101, + Some(dt), + MetricKind::Timer))); + wavefront.deliver(Arc::new(Metric::new_with_time(Atom::from("test.timer"), + 1.101, + Some(dt), + MetricKind::Timer))); + wavefront.deliver(Arc::new(Metric::new_with_time(Atom::from("test.timer"), + 3.101, + Some(dt), + MetricKind::Timer))); let result = wavefront.format_stats(Some(10101)); let lines: Vec<&str> = result.lines().collect(); diff --git a/src/config.rs b/src/config.rs index 323d2cfc..eecc3872 100644 --- a/src/config.rs +++ b/src/config.rs @@ -105,9 +105,9 @@ pub fn parse_args() -> Args { .use_delimiter(false) .default_value("source=cernan")) .arg(Arg::with_name("verbose") - .short("v") - .multiple(true) - .help("Turn on verbose output.")) + .short("v") + .multiple(true) + .help("Turn on verbose output.")) .get_matches(); let verb = if args.is_present("verbose") { diff --git a/src/main.rs b/src/main.rs index ed6c1212..dedaf571 100644 --- a/src/main.rs +++ b/src/main.rs @@ -15,6 +15,7 @@ extern crate dns_lookup; extern crate log; use std::str; +use std::sync::Arc; use std::sync::mpsc::channel; use std::thread; use chrono::UTC; @@ -108,59 +109,9 @@ fn main() { Err(e) => panic!(format!("Event channel has hung up: {:?}", e)), }; - match result { - server::Event::TimerFlush => { - trace!("TimerFlush"); - // TODO improve this, limit here will be backend stalling and - // holding up all others - for backend in &mut backends { - backend.flush(); - } - } - - server::Event::TcpMessage(buf) => { - str::from_utf8(&buf) - .map(|val| { - debug!("graphite - {}", val); - match metric::Metric::parse_graphite(val) { - Some(metrics) => { - for metric in &metrics { - for backend in &mut backends { - backend.deliver(metric.clone()); - } - } - Ok(metrics.len()) - } - None => { - error!("BAD PACKET: {:?}", val); - Err("could not interpret") - } - } - }) - .ok(); - } - - server::Event::UdpMessage(buf) => { - str::from_utf8(&buf) - .map(|val| { - debug!("statsd - {}", val); - match metric::Metric::parse_statsd(val) { - Some(metrics) => { - for metric in &metrics { - for backend in &mut backends { - backend.deliver(metric.clone()); - } - } - Ok(metrics.len()) - } - None => { - println!("BAD PACKET: {:?}", val); - Err("could not interpret") - } - } - }) - .ok(); - } + let arc_res = Arc::new(result); + for backend in &mut backends { + backend.send(arc_res.clone()).expect("oops, couldn't send!"); } } } diff --git a/src/metric.rs b/src/metric.rs index 561fcaa1..4dcd78dd 100644 --- a/src/metric.rs +++ b/src/metric.rs @@ -1,6 +1,6 @@ use metrics::*; use chrono::{UTC, DateTime}; -use std::rc::Rc; +use std::sync::Arc; use string_cache::Atom; #[derive(PartialEq, Debug)] @@ -54,11 +54,11 @@ impl Metric { /// /// Multiple metrics can be sent in a single UDP packet /// separated by newlines. - pub fn parse_statsd(source: &str) -> Option>> { + pub fn parse_statsd(source: &str) -> Option>> { statsd::parse_MetricPayload(source).ok() } - pub fn parse_graphite(source: &str) -> Option>> { + pub fn parse_graphite(source: &str) -> Option>> { graphite::parse_MetricPayload(source).ok() } } diff --git a/src/metrics/graphite.lalrpop b/src/metrics/graphite.lalrpop index 11a508c9..d79b15e8 100644 --- a/src/metrics/graphite.lalrpop +++ b/src/metrics/graphite.lalrpop @@ -1,6 +1,6 @@ use std::str::FromStr; use metric::{MetricKind, Metric}; -use std::rc::Rc; +use std::sync::Arc; use string_cache::Atom; use chrono::UTC; use chrono::TimeZone; @@ -11,8 +11,8 @@ MetricName: Atom = => Atom::from(s); Num: f64 = => f64::from_str(s).unwrap(); -MetricLine: Rc = { - => Rc::new(Metric::new_with_time(name, val, Some(UTC.timestamp(t as i64, 0)), MetricKind::Raw)) +MetricLine: Arc = { + => Arc::new(Metric::new_with_time(name, val, Some(UTC.timestamp(t as i64, 0)), MetricKind::Raw)) }; -pub MetricPayload: Vec> = { ()+ }; +pub MetricPayload: Vec> = { ()+ }; diff --git a/src/metrics/statsd.lalrpop b/src/metrics/statsd.lalrpop index 54b1c42a..21ea930d 100644 --- a/src/metrics/statsd.lalrpop +++ b/src/metrics/statsd.lalrpop @@ -1,6 +1,6 @@ use std::str::FromStr; use metric::{MetricKind, Metric}; -use std::rc::Rc; +use std::sync::Arc; use string_cache::Atom; grammar; @@ -17,8 +17,8 @@ MetricName: Atom = => Atom::from(s); Num: f64 = => f64::from_str(s).unwrap(); -MetricLine: Rc = { - ":" "|" => Rc::new(Metric::new(name, val, k)) +MetricLine: Arc = { + ":" "|" => Arc::new(Metric::new(name, val, k)) }; -pub MetricPayload: Vec> = { ()+ }; +pub MetricPayload: Vec> = { ()+ }; diff --git a/src/server.rs b/src/server.rs index 1def5b38..f51dac33 100644 --- a/src/server.rs +++ b/src/server.rs @@ -6,10 +6,13 @@ use std::time::Duration; use std::thread; use std::io::prelude::*; use std::io::BufReader; +use std::sync::Arc; +use std::str; +use metric; pub enum Event { - UdpMessage(Vec), - TcpMessage(Vec), + Statsd(Vec>), + Graphite(Vec>), TimerFlush, } @@ -20,12 +23,23 @@ pub fn udp_server_v6(chan: Sender, port: u16) { info!("statsd server started on ::1 {}", port); let mut buf = [0; 8192]; loop { - let (len, _) = match socket.recv_from(&mut buf) { + match socket.recv_from(&mut buf) { Ok(r) => r, Err(_) => panic!("Could not read UDP socket."), }; - let bytes = Vec::from(&buf[..len]); - chan.send(Event::UdpMessage(bytes)).expect("[UDP] Unable to write into chan!"); + str::from_utf8(&buf) + .map(|val| { + trace!("statsd - {}", val); + match metric::Metric::parse_statsd(val) { + Some(metrics) => { + chan.send(Event::Statsd(metrics)).unwrap(); + } + None => { + error!("BAD PACKET: {:?}", val); + } + } + }) + .ok(); } } @@ -35,12 +49,23 @@ pub fn udp_server_v4(chan: Sender, port: u16) { info!("statsd server started on 127.0.0.1:{}", port); let mut buf = [0; 8192]; loop { - let (len, _) = match socket.recv_from(&mut buf) { + match socket.recv_from(&mut buf) { Ok(r) => r, Err(_) => panic!("Could not read UDP socket."), }; - let bytes = Vec::from(&buf[..len]); - chan.send(Event::UdpMessage(bytes)).expect("[UDP] Unable to write into chan!"); + str::from_utf8(&buf) + .map(|val| { + trace!("statsd - {}", val); + match metric::Metric::parse_statsd(val) { + Some(metrics) => { + chan.send(Event::Statsd(metrics)).unwrap(); + } + None => { + error!("BAD PACKET: {:?}", val); + } + } + }) + .ok(); } } @@ -79,8 +104,20 @@ fn handle_client(chan: Sender, stream: TcpStream) { for line in line_reader.lines() { match line { Ok(line) => { - chan.send(Event::TcpMessage(line.into_bytes())) - .expect("[TCP] Unable to write to chan!"); + let buf = line.into_bytes(); + str::from_utf8(&buf) + .map(|val| { + debug!("graphite - {}", val); + match metric::Metric::parse_graphite(val) { + Some(metrics) => { + chan.send(Event::Graphite(metrics)).unwrap(); + } + None => { + error!("BAD PACKET: {:?}", val); + } + } + }) + .ok(); } Err(_) => break, }