Skip to content

Commit

Permalink
Push message parsing into source threads (#46)
Browse files Browse the repository at this point in the history
Cernan needs to evolve along the lines of the main thread
simply being a very fast router. This becomes especially
pressing when routing from arbitrary source to arbitrary
sink will be available to the end-user. (See #39.)

This commit does not remove the burden of flushing each
backend from the main thread but it does remove the
burden of parsing _and_ flushing. Rc sync has been
changed to Arc to allow a vector of metrics to be
sendable.

Signed-off-by: Brian L. Troutwine <blt@postmates.com>
  • Loading branch information
blt authored Jul 19, 2016
1 parent 36fc9af commit caeded6
Show file tree
Hide file tree
Showing 9 changed files with 94 additions and 95 deletions.
4 changes: 2 additions & 2 deletions src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ use metric::Metric;
use config::Args;

use regex::Regex;
use std::rc::Rc;
use std::sync::Arc;

/// A 'backend' is a sink for metrics.
pub trait Backend {
fn flush(&mut self) -> ();
fn deliver(&mut self, point: Rc<Metric>) -> ();
fn deliver(&mut self, point: Arc<Metric>) -> ();
}

/// Creates the collection of backends based on the paraemeters
Expand Down
4 changes: 2 additions & 2 deletions src/backends/console.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use backend::Backend;
use buckets::Buckets;
use metric::Metric;
use chrono;
use std::rc::Rc;
use std::sync::Arc;

pub struct Console {
aggrs: Buckets,
Expand All @@ -28,7 +28,7 @@ fn fmt_line(key: &str, value: &f64) {


impl Backend for Console {
fn deliver(&mut self, point: Rc<Metric>) {
fn deliver(&mut self, point: Arc<Metric>) {
debug!("console deliver");
self.aggrs.add(&point);
}
Expand Down
18 changes: 9 additions & 9 deletions src/backends/librato.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use url;
use chrono;
use mime::Mime;
use metric::Metric;
use std::rc::Rc;
use std::sync::Arc;

pub struct Librato {
username: String,
Expand Down Expand Up @@ -123,7 +123,7 @@ impl Librato {
}

impl Backend for Librato {
fn deliver(&mut self, point: Rc<Metric>) {
fn deliver(&mut self, point: Arc<Metric>) {
debug!("librato deliver");
self.aggrs.add(&point);
}
Expand Down Expand Up @@ -152,23 +152,23 @@ impl Backend for Librato {
mod test {
use metric::{Metric, MetricKind};
use backend::Backend;
use std::rc::Rc;
use std::sync::Arc;
use string_cache::Atom;
use super::*;

#[test]
fn test_format_librato_buckets_no_timers() {
let mut librato = Librato::new("user", "token", "test-src", "http://librato.example.com");
librato.deliver(Rc::new(Metric::new(Atom::from("test.counter"),
librato.deliver(Arc::new(Metric::new(Atom::from("test.counter"),
1.0,
MetricKind::Counter(1.0))));
librato.deliver(Rc::new(Metric::new(Atom::from("test.gauge"), 3.211, MetricKind::Gauge)));
librato.deliver(Rc::new(Metric::new(Atom::from("src-test.gauge.2"),
librato.deliver(Arc::new(Metric::new(Atom::from("test.gauge"), 3.211, MetricKind::Gauge)));
librato.deliver(Arc::new(Metric::new(Atom::from("src-test.gauge.2"),
3.211,
MetricKind::Gauge)));
librato.deliver(Rc::new(Metric::new(Atom::from("test.timer"), 12.101, MetricKind::Timer)));
librato.deliver(Rc::new(Metric::new(Atom::from("test.timer"), 1.101, MetricKind::Timer)));
librato.deliver(Rc::new(Metric::new(Atom::from("test.timer"), 3.101, MetricKind::Timer)));
librato.deliver(Arc::new(Metric::new(Atom::from("test.timer"), 12.101, MetricKind::Timer)));
librato.deliver(Arc::new(Metric::new(Atom::from("test.timer"), 1.101, MetricKind::Timer)));
librato.deliver(Arc::new(Metric::new(Atom::from("test.timer"), 3.101, MetricKind::Timer)));
let result = librato.format_stats(Some(10101));

println!("{:?}", result);
Expand Down
28 changes: 14 additions & 14 deletions src/backends/wavefront.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use chrono;
use metric::Metric;
use buckets::Buckets;
use backend::Backend;
use std::rc::Rc;
use std::sync::Arc;

pub struct Wavefront {
addr: SocketAddrV4,
Expand All @@ -15,7 +15,7 @@ pub struct Wavefront {
// The wavefront implementation keeps an aggregate, depricated, and ships
// exact points. The aggregate is 'aggrs' and the exact points is 'points'.
aggrs: Buckets,
points: Vec<Rc<Metric>>,
points: Vec<Arc<Metric>>,
}

impl Wavefront {
Expand Down Expand Up @@ -124,7 +124,7 @@ impl Backend for Wavefront {
}
}

fn deliver(&mut self, point: Rc<Metric>) {
fn deliver(&mut self, point: Arc<Metric>) {
debug!("wavefront deliver");
if self.mk_aggrs {
self.aggrs.add(&point);
Expand All @@ -138,31 +138,31 @@ mod test {
use metric::{Metric, MetricKind};
use backend::Backend;
use chrono::{UTC, TimeZone};
use std::rc::Rc;
use std::sync::Arc;
use string_cache::Atom;
use super::*;

#[test]
fn test_format_wavefront() {
let mut wavefront = Wavefront::new("127.0.0.1", 2003, true, "source=test-src".to_string());
let dt = UTC.ymd(1990, 6, 12).and_hms_milli(9, 10, 11, 12);
wavefront.deliver(Rc::new(Metric::new_with_time(Atom::from("test.counter"),
wavefront.deliver(Arc::new(Metric::new_with_time(Atom::from("test.counter"),
1.0,
Some(dt),
MetricKind::Counter(1.0))));
wavefront.deliver(Rc::new(Metric::new_with_time(Atom::from("test.gauge"),
wavefront.deliver(Arc::new(Metric::new_with_time(Atom::from("test.gauge"),
3.211,
Some(dt),
MetricKind::Gauge)));
wavefront.deliver(Rc::new(Metric::new_with_time(Atom::from("test.timer"),
wavefront.deliver(Arc::new(Metric::new_with_time(Atom::from("test.timer"),
12.101,
Some(dt),
MetricKind::Timer)));
wavefront.deliver(Rc::new(Metric::new_with_time(Atom::from("test.timer"),
wavefront.deliver(Arc::new(Metric::new_with_time(Atom::from("test.timer"),
1.101,
Some(dt),
MetricKind::Timer)));
wavefront.deliver(Rc::new(Metric::new_with_time(Atom::from("test.timer"),
wavefront.deliver(Arc::new(Metric::new_with_time(Atom::from("test.timer"),
3.101,
Some(dt),
MetricKind::Timer)));
Expand Down Expand Up @@ -190,23 +190,23 @@ mod test {
fn test_format_wavefront_skip_aggrs() {
let mut wavefront = Wavefront::new("127.0.0.1", 2003, false, "source=test-src".to_string());
let dt = UTC.ymd(1990, 6, 12).and_hms_milli(9, 10, 11, 12);
wavefront.deliver(Rc::new(Metric::new_with_time(Atom::from("test.counter"),
wavefront.deliver(Arc::new(Metric::new_with_time(Atom::from("test.counter"),
1.0,
Some(dt),
MetricKind::Counter(1.0))));
wavefront.deliver(Rc::new(Metric::new_with_time(Atom::from("test.gauge"),
wavefront.deliver(Arc::new(Metric::new_with_time(Atom::from("test.gauge"),
3.211,
Some(dt),
MetricKind::Gauge)));
wavefront.deliver(Rc::new(Metric::new_with_time(Atom::from("test.timer"),
wavefront.deliver(Arc::new(Metric::new_with_time(Atom::from("test.timer"),
12.101,
Some(dt),
MetricKind::Timer)));
wavefront.deliver(Rc::new(Metric::new_with_time(Atom::from("test.timer"),
wavefront.deliver(Arc::new(Metric::new_with_time(Atom::from("test.timer"),
1.101,
Some(dt),
MetricKind::Timer)));
wavefront.deliver(Rc::new(Metric::new_with_time(Atom::from("test.timer"),
wavefront.deliver(Arc::new(Metric::new_with_time(Atom::from("test.timer"),
3.101,
Some(dt),
MetricKind::Timer)));
Expand Down
54 changes: 13 additions & 41 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ extern crate fern;
#[macro_use]
extern crate log;

use std::str;
use std::sync::mpsc::channel;
use std::thread;
use chrono::UTC;
Expand Down Expand Up @@ -109,49 +108,22 @@ fn main() {
}
}

server::Event::TcpMessage(buf) => {
str::from_utf8(&buf)
.map(|val| {
debug!("graphite - {}", val);
match metric::Metric::parse_graphite(val) {
Some(metrics) => {
for metric in &metrics {
for backend in &mut backends {
backend.deliver(metric.clone());
}
}
Ok(metrics.len())
}
None => {
error!("BAD PACKET: {:?}", val);
Err("could not interpret")
}
}
})
.ok();
server::Event::Graphite(metrics) => {
for metric in &metrics {
for backend in &mut backends {
backend.deliver(metric.clone());
}
}
}

server::Event::UdpMessage(buf) => {
str::from_utf8(&buf)
.map(|val| {
debug!("statsd - {}", val);
match metric::Metric::parse_statsd(val) {
Some(metrics) => {
for metric in &metrics {
for backend in &mut backends {
backend.deliver(metric.clone());
}
}
Ok(metrics.len())
}
None => {
println!("BAD PACKET: {:?}", val);
Err("could not interpret")
}
}
})
.ok();
server::Event::Statsd(metrics) => {
for metric in &metrics {
for backend in &mut backends {
backend.deliver(metric.clone());
}
}
}

}
}
}
6 changes: 3 additions & 3 deletions src/metric.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use metrics::*;
use chrono::{UTC, DateTime};
use std::rc::Rc;
use std::sync::Arc;
use string_cache::Atom;

#[derive(PartialEq, Debug)]
Expand Down Expand Up @@ -54,11 +54,11 @@ impl Metric {
///
/// Multiple metrics can be sent in a single UDP packet
/// separated by newlines.
pub fn parse_statsd(source: &str) -> Option<Vec<Rc<Metric>>> {
pub fn parse_statsd(source: &str) -> Option<Vec<Arc<Metric>>> {
statsd::parse_MetricPayload(source).ok()
}

pub fn parse_graphite(source: &str) -> Option<Vec<Rc<Metric>>> {
pub fn parse_graphite(source: &str) -> Option<Vec<Arc<Metric>>> {
graphite::parse_MetricPayload(source).ok()
}
}
Expand Down
8 changes: 4 additions & 4 deletions src/metrics/graphite.lalrpop
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::str::FromStr;
use metric::{MetricKind, Metric};
use std::rc::Rc;
use std::sync::Arc;
use string_cache::Atom;
use chrono::UTC;
use chrono::TimeZone;
Expand All @@ -11,8 +11,8 @@ MetricName: Atom = <s:r"[A-Za-z][_A-Z0-9a-z+/=\.-]*"> => Atom::from(s);

Num: f64 = <s:r"[-+]?[0-9]+\.?[0-9]*"> => f64::from_str(s).unwrap();

MetricLine: Rc<Metric> = {
<name:MetricName> <val:Num> <t:Num> => Rc::new(Metric::new_with_time(name, val, Some(UTC.timestamp(t as i64, 0)), MetricKind::Raw))
MetricLine: Arc<Metric> = {
<name:MetricName> <val:Num> <t:Num> => Arc::new(Metric::new_with_time(name, val, Some(UTC.timestamp(t as i64, 0)), MetricKind::Raw))
};

pub MetricPayload: Vec<Rc<Metric>> = { (<MetricLine>)+ };
pub MetricPayload: Vec<Arc<Metric>> = { (<MetricLine>)+ };
8 changes: 4 additions & 4 deletions src/metrics/statsd.lalrpop
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::str::FromStr;
use metric::{MetricKind, Metric};
use std::rc::Rc;
use std::sync::Arc;
use string_cache::Atom;

grammar;
Expand All @@ -17,8 +17,8 @@ MetricName: Atom = <s:r"[A-Za-z][_A-Z0-9a-z+/=\.-]*"> => Atom::from(s);

Num: f64 = <s:r"[-+]?[0-9]+\.?[0-9]*"> => f64::from_str(s).unwrap();

MetricLine: Rc<Metric> = {
<name:MetricName> ":" <val:Num> "|" <k:Kind> => Rc::new(Metric::new(name, val, k))
MetricLine: Arc<Metric> = {
<name:MetricName> ":" <val:Num> "|" <k:Kind> => Arc::new(Metric::new(name, val, k))
};

pub MetricPayload: Vec<Rc<Metric>> = { (<MetricLine>)+ };
pub MetricPayload: Vec<Arc<Metric>> = { (<MetricLine>)+ };
Loading

0 comments on commit caeded6

Please sign in to comment.