Backpressure and better file watch (#146)
* Cut in an explicit notion of backpressure in the sinks

Previously the sinks would pull values off their channels as fast as
possible. This was a silly goof: it's entirely likely that the far side
will not be ready to accept what we've buffered, and the buffer will
grow without bound. This commit caps that buffer growth.
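
A minimal sketch of the idea: a sink's `deliver` reports whether the caller
may keep pulling values off the channel. Only the `Valve` name and the
`Open` variant appear in the diffs below; the `Closed` variant,
`BufferedSink`, and the capacity check here are illustrative assumptions,
not cernan's actual code.

```rust
// Backpressure sketch: the sink refuses a point once its buffer is full.
enum Valve<T> {
    Open,
    Closed(T), // hand the rejected value back so nothing is lost
}

struct BufferedSink {
    buffer: Vec<u64>,
    max: usize,
}

impl BufferedSink {
    fn deliver(&mut self, point: u64) -> Valve<u64> {
        if self.buffer.len() >= self.max {
            Valve::Closed(point) // far side is not draining; stop buffering
        } else {
            self.buffer.push(point);
            Valve::Open
        }
    }
}

fn main() {
    let mut sink = BufferedSink { buffer: Vec::new(), max: 2 };
    for point in 0..4u64 {
        match sink.deliver(point) {
            Valve::Open => println!("accepted {}", point),
            Valve::Closed(p) => println!("deferred {}; caller retries later", p),
        }
    }
}
```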

Signed-off-by: Brian L. Troutwine <blt@postmates.com>

* Roll our own file-watchers

It does not seem that notify is tested at high load, nor, I suspect, is
inotify really appropriate when used in this fashion. What we care about
is reading large blocks of text off disk with minimal latency, not
necessarily knowing when every little thing happens.

The good news is the codebase is now more direct and has the same
behaviour cross-platform. All we require is that the filesystem
support the 'created' metadata for files.
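
A rough sketch of the approach, not cernan's file-watcher: poll the file on
a short interval and read whatever new bytes have appeared, rather than
waiting on inotify events. The path and interval are arbitrary.

```rust
use std::fs::File;
use std::io::{BufRead, BufReader, Seek, SeekFrom};
use std::thread::sleep;
use std::time::Duration;

fn follow(path: &str) -> std::io::Result<()> {
    let mut offset: u64 = 0;
    loop {
        if let Ok(f) = File::open(path) {
            let mut rdr = BufReader::new(f);
            rdr.seek(SeekFrom::Start(offset))?;
            let mut line = String::new();
            // read everything that has appeared since our last visit, in bulk
            while rdr.read_line(&mut line)? > 0 {
                offset += line.len() as u64;
                print!("ingested: {}", line);
                line.clear();
            }
        }
        // no inotify: just come back shortly and read again; a real watcher
        // would also detect rotation/truncation (see the st_dev/st_ino commit)
        sleep(Duration::from_millis(250));
    }
}

fn main() {
    let _ = follow("/tmp/example.log"); // hypothetical path
}
```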

Signed-off-by: Brian L. Troutwine <blt@postmates.com>

* Properly handle errors

This commit ensures that we correctly handle absent files and other
such unfortunate circumstances.

Signed-off-by: Brian L. Troutwine <blt@postmates.com>

* Use st_dev, st_ino to id a file

This ought to be the most portable method for determining if the
file that we have open is in fact the file that we want to have
open. All other methods used so far are not portable enough.
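
A small sketch of the identity check, not cernan's code: on Unix a
(device, inode) pair names the underlying file regardless of path. It uses
`std::os::unix::fs::MetadataExt`, whose `dev()` and `ino()` methods expose
`st_dev` and `st_ino`; the paths are hypothetical.

```rust
use std::fs;
use std::os::unix::fs::MetadataExt;

fn same_file(a: &str, b: &str) -> std::io::Result<bool> {
    let ma = fs::metadata(a)?;
    let mb = fs::metadata(b)?;
    // dev() and ino() come from the underlying stat call
    Ok(ma.dev() == mb.dev() && ma.ino() == mb.ino())
}

fn main() {
    match same_file("/var/log/syslog", "/var/log/syslog") {
        Ok(same) => println!("same file: {}", same),
        Err(e) => println!("could not stat: {}", e),
    }
}
```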

Signed-off-by: Brian L. Troutwine <blt@postmates.com>

* Make the firehose configurable

This commit makes the knobs present in the firehose sink available
through configuration.

Signed-off-by: Brian L. Troutwine <blt@postmates.com>

* Update documentation

Signed-off-by: Brian L. Troutwine <blt@postmates.com>
blt authored Nov 19, 2016
1 parent 9383e88 commit 660773e
Showing 14 changed files with 380 additions and 276 deletions.
162 changes: 7 additions & 155 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -15,9 +15,9 @@ clap = "2.10.0"
fern = "0.3.5"
flate2 = "0.2"
fnv = "1.0.5"
glob = "0.2.11"
lazy_static = "0.2.1"
log = "0.3.6"
notify = "2.6.3"
quantiles = "0.2.0"
rand = "0.3"
regex = "0.1"
59 changes: 57 additions & 2 deletions README.md
@@ -199,8 +199,38 @@ port = 8125
port = 2003
```

The `federation_receiver` interface is opt-in. It is discussed in the section
[Federation](#federation).
In addition to network ports, cernan is able to ingest log files. This is
configured differently from the network sources above, as there may be many
different file ingesters.

```
[[file]]
path = "/var/log/upstart/very_important.log"
```

Will follow a single file.

```
[[file]]
path = "/var/log/**/*.log"
```

Will follow all files that match the above pattern.

```
[[file]]
path = "/var/log/**/*.log"
[[file]]
path = "/tmp/temporary.log"
```

Will follow all the files that match `/var/log/**/*.log` as well as
`/tmp/temporary.log`. Cernan will pick up new files that match the given
patterns, though it may take up to a minute for cernan to start ingesting them.
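
For illustration, a rough sketch (not cernan's actual implementation) of how
periodic pattern expansion picks up new files, using the `glob` dependency
that appears in Cargo.toml above; the pattern, interval, and printout are
illustrative only.

```rust
extern crate glob;

use std::collections::HashSet;
use std::path::PathBuf;
use std::thread::sleep;
use std::time::Duration;

fn main() {
    let mut known: HashSet<PathBuf> = HashSet::new();
    loop {
        for entry in glob::glob("/var/log/**/*.log").expect("bad pattern") {
            if let Ok(path) = entry {
                // first time we've seen this path: start following it
                if known.insert(path.clone()) {
                    println!("new file to follow: {:?}", path);
                }
            }
        }
        // re-scan roughly once a minute, matching the delay noted above
        sleep(Duration::from_secs(60));
    }
}
```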

The `federation_receiver` interface is opt-in. It is discussed in the
section [Federation](#federation).

## Changing how frequently metrics are output

@@ -261,6 +291,7 @@ enabled with their default:
[console]
[wavefront]
[null]
[[firehose]]
```
For sinks which support it cernan can report metadata about the
@@ -329,6 +360,30 @@ port = 1972
host = "127.0.0.1"
```
### firehose
The `firehose` sink accepts logging information and emits it
into
[Amazon Kinesis Firehose](https://aws.amazon.com/kinesis/firehose/). Kinesis
Firehose can be configured to emit into multiple targets.
You may configure multiple firehoses. In the following, two firehoses are
configured:
```
[[firehose]]
delivery_stream = "stream_one"
batch_size = 20

[[firehose]]
delivery_stream = "stream_two"
batch_size = 800
region = "us-east-1"
```
By default, the region is `us-west-2`. In the above, `stream_one` should
exist in `us-west-2` and `stream_two` in `us-east-1`.
## Federation
Sometimes it is desirable to forward some or all of the points inbound to
1 change: 0 additions & 1 deletion example-config.toml
@@ -27,4 +27,3 @@ bin_width = 1
port = 2878
host = "127.0.0.1"
bin_width = 1

29 changes: 10 additions & 19 deletions src/bin/cernan.rs
@@ -9,8 +9,7 @@ use std::thread;
use chrono::UTC;

use cernan::source::Source;
use cernan::sink::Sink;
use cernan::filter::Filter;
use cernan::sink::{Sink, FirehoseConfig};

fn main() {
let args = cernan::config::parse_args();
@@ -79,12 +78,13 @@ fn main() {
}));
}

for ds in &args.firehose_delivery_streams {
let fh_name = ds.clone();
let (firehose_send, firehose_recv) = cernan::mpsc::channel(&fh_name, &args.data_directory);
for fh in &args.firehosen {
let f: FirehoseConfig = fh.clone();
let ds: String = f.delivery_stream.clone();
let (firehose_send, firehose_recv) = cernan::mpsc::channel(&ds, &args.data_directory);
sends.push(firehose_send);
joins.push(thread::spawn(move || {
cernan::sink::Firehose::new(&fh_name).run(firehose_recv);
cernan::sink::Firehose::new(f).run(firehose_recv);
}));
}

@@ -98,28 +98,19 @@ fn main() {
}))
}

let (statsd_id_send, statsd_id_recv) = cernan::mpsc::channel("statsd_id", &args.data_directory);
let statsd_sends = sends.clone();
if let Some(config) = args.statsd_config {
joins.push(thread::spawn(move || {
cernan::source::Statsd::new(vec![statsd_id_send], config).run();
cernan::source::Statsd::new(statsd_sends, config).run();
}));
}
let statsd_id_sends = sends.clone();
joins.push(thread::spawn(move || {
cernan::filter::Id::new().run(statsd_id_recv, statsd_id_sends)
}));

let (graphite_scrub_send, graphite_scrub_recv) = cernan::mpsc::channel("graphite_scrub",
&args.data_directory);
let graphite_sends = sends.clone();
if let Some(config) = args.graphite_config {
joins.push(thread::spawn(move || {
cernan::source::Graphite::new(vec![graphite_scrub_send], config).run();
cernan::source::Graphite::new(graphite_sends, config).run();
}));
}
let graphite_scrub_sends = sends.clone();
joins.push(thread::spawn(move || {
cernan::filter::CollectdScrub::new().run(graphite_scrub_recv, graphite_scrub_sends)
}));

for config in args.files {
let fp_sends = sends.clone();
88 changes: 77 additions & 11 deletions src/config.rs
@@ -11,11 +11,13 @@ use std::io::Read;
use std::str::FromStr;
use metric::TagMap;
use std::path::{Path, PathBuf};
use rusoto::Region;

const VERSION: Option<&'static str> = option_env!("CARGO_PKG_VERSION");

use super::source::{FederationReceiverConfig, GraphiteConfig, StatsdConfig, FileServerConfig};
use super::sink::{WavefrontConfig, ConsoleConfig, FederationTransmitterConfig, NullConfig};
use super::sink::{WavefrontConfig, ConsoleConfig, FederationTransmitterConfig, NullConfig,
FirehoseConfig};

#[derive(Debug)]
pub struct Args {
@@ -26,9 +28,9 @@ pub struct Args {
pub flush_interval: u64,
pub console: Option<ConsoleConfig>,
pub null: Option<NullConfig>,
pub firehose_delivery_streams: Vec<String>,
pub wavefront: Option<WavefrontConfig>,
pub fed_transmitter: Option<FederationTransmitterConfig>,
pub firehosen: Vec<FirehoseConfig>,
pub files: Vec<FileServerConfig>,
pub verbose: u64,
pub version: String,
@@ -119,7 +121,7 @@ pub fn parse_args() -> Args {
console: console,
null: null,
wavefront: wavefront,
firehose_delivery_streams: Vec::default(),
firehosen: Vec::default(),
fed_transmitter: None,
files: Default::default(),
verbose: verb,
@@ -219,12 +221,41 @@ pub fn parse_config_file(buffer: String, verbosity: u64) -> Args {
}
}

let mut fh_delivery_streams: Vec<String> = Vec::new();
let mut firehosen: Vec<FirehoseConfig> = Vec::new();
if let Some(array) = value.lookup("firehose") {
if let Some(tbl) = array.as_slice() {
for val in tbl {
match val.lookup("delivery_stream") {
Some(ds) => fh_delivery_streams.push(ds.as_str().unwrap().to_owned()),
match val.lookup("delivery_stream").map(|x| x.as_str()) {
Some(ds) => {
let bs = val.lookup("batch_size")
.unwrap_or(&Value::Integer(450))
.as_integer()
.map(|i| i as usize)
.unwrap();
let r = val.lookup("region")
.unwrap_or(&Value::String("us-west-2".into()))
.as_str()
.map(|s| match s {
"ap-northeast-1" => Region::ApNortheast1,
"ap-northeast-2" => Region::ApNortheast2,
"ap-south-1" => Region::ApSouth1,
"ap-southeast-1" => Region::ApSoutheast1,
"ap-southeast-2" => Region::ApSoutheast2,
"cn-north-1" => Region::CnNorth1,
"eu-central-1" => Region::EuCentral1,
"eu-west-1" => Region::EuWest1,
"sa-east-1" => Region::SaEast1,
"us-east-1" => Region::UsEast1,
"us-west-1" => Region::UsWest1,
"us-west-2" => Region::UsWest2,
_ => Region::UsWest2,
});
firehosen.push(FirehoseConfig {
delivery_stream: ds.unwrap().to_string(),
batch_size: bs,
region: r.unwrap(),
})
}
None => continue,
}
}
@@ -315,7 +346,7 @@ pub fn parse_config_file(buffer: String, verbosity: u64) -> Args {
console: console,
null: null,
wavefront: wavefront,
firehose_delivery_streams: fh_delivery_streams,
firehosen: firehosen,
fed_transmitter: fedtrn,
files: files,
verbose: verbosity,
@@ -328,6 +359,7 @@ mod test {
use super::*;
use metric::TagMap;
use std::path::PathBuf;
use rusoto::Region;

#[test]
fn config_file_default() {
@@ -343,7 +375,7 @@ fn config_file_default() {
assert_eq!(args.flush_interval, 60);
assert!(args.console.is_none());
assert!(args.null.is_none());
assert_eq!(true, args.firehose_delivery_streams.is_empty());
assert_eq!(true, args.firehosen.is_empty());
assert!(args.wavefront.is_none());
assert!(args.fed_transmitter.is_none());
assert_eq!(args.verbose, 4);
@@ -566,8 +598,42 @@ delivery_stream = "stream_two"

let args = parse_config_file(config, 4);

assert_eq!(args.firehose_delivery_streams,
vec!["stream_one", "stream_two"]);
assert_eq!(args.firehosen.len(), 2);

assert_eq!(args.firehosen[0].delivery_stream, "stream_one");
assert_eq!(args.firehosen[0].batch_size, 450);
assert_eq!(args.firehosen[0].region, Region::UsWest2);

assert_eq!(args.firehosen[1].delivery_stream, "stream_two");
assert_eq!(args.firehosen[1].batch_size, 450);
assert_eq!(args.firehosen[1].region, Region::UsWest2);
}

#[test]
fn config_file_firehose_complicated() {
let config = r#"
[[firehose]]
delivery_stream = "stream_one"
batch_size = 20
[[firehose]]
delivery_stream = "stream_two"
batch_size = 800
region = "us-east-1"
"#
.to_string();

let args = parse_config_file(config, 4);

assert_eq!(args.firehosen.len(), 2);

assert_eq!(args.firehosen[0].delivery_stream, "stream_one");
assert_eq!(args.firehosen[0].batch_size, 20);
assert_eq!(args.firehosen[0].region, Region::UsWest2);

assert_eq!(args.firehosen[1].delivery_stream, "stream_two");
assert_eq!(args.firehosen[1].batch_size, 800);
assert_eq!(args.firehosen[1].region, Region::UsEast1);
}

#[test]
@@ -667,7 +733,7 @@ mission = "from_gad"
assert_eq!(args.flush_interval, 128);
assert!(args.console.is_some());
assert!(args.null.is_some());
assert_eq!(true, args.firehose_delivery_streams.is_empty());
assert!(args.firehosen.is_empty());
assert!(args.wavefront.is_some());
let wavefront = args.wavefront.unwrap();
assert_eq!(wavefront.host, String::from("example.com"));
2 changes: 1 addition & 1 deletion src/lib.rs
@@ -2,7 +2,7 @@ extern crate toml;
extern crate clap;
extern crate chrono;
extern crate fnv;
extern crate notify;
extern crate glob;
extern crate bincode;
extern crate serde;
#[macro_use]
10 changes: 6 additions & 4 deletions src/sink/console.rs
@@ -1,4 +1,4 @@
use sink::Sink;
use sink::{Sink, Valve};
use buckets::Buckets;
use metric::{Metric, LogLine};
use chrono;
@@ -37,12 +37,14 @@ fn fmt_line(key: &str, value: f64) {


impl Sink for Console {
fn deliver(&mut self, point: Metric) {
fn deliver(&mut self, point: Metric) -> Valve<Metric> {
self.aggrs.add(point);
Valve::Open
}

fn deliver_lines(&mut self, _: Vec<LogLine>) {
// nothing, intentionally
fn deliver_lines(&mut self, _: Vec<LogLine>) -> Valve<Vec<LogLine>> {
// drop the line, intentionally
Valve::Open
}

fn flush(&mut self) {
12 changes: 9 additions & 3 deletions src/sink/federation_transmitter.rs
@@ -2,7 +2,7 @@ use bincode::SizeLimit;
use bincode::serde::serialize_into;
use metric;
use mpsc;
use sink::Sink;
use sink::{Sink,Valve};
use std::io::Write;
use std::net::{TcpStream, ToSocketAddrs};
use time;
@@ -43,18 +43,24 @@ impl Default for FederationTransmitter {
}

impl Sink for FederationTransmitter {
fn deliver(&mut self, _: metric::Metric) {
fn deliver(&mut self, _: metric::Metric) -> Valve<metric::Metric> {
// intentionally nothing
Valve::Open
}

fn deliver_lines(&mut self, _: Vec<metric::LogLine>) {
fn deliver_lines(&mut self, _: Vec<metric::LogLine>) -> Valve<Vec<metric::LogLine>> {
// intentionally nothing
Valve::Open
}

fn run(&mut self, mut recv: mpsc::Receiver<metric::Event>) {
let mut attempts = 0;
loop {
time::delay(attempts);
if self.buffer.len() > 10_000 {
attempts += 1;
continue
}
match recv.next() {
None => attempts += 1,
Some(event) => {
