Skip to content

Commit

Permalink
Correct Overload Issues (#300)
Browse files Browse the repository at this point in the history
* Investigate cernan freeze-up issue

Latest cernan has an issue with freezing up and allocating memory
while it's frozen. I don't know where or why, but it seems to freeze the
whole topology when it happens. That is... weird.

This behaviour can be reproduced with the following config:

```
data-directory = "data/"
scripts-directory = "examples/scripts/"

flush-interval = 5

[tags]
source = "cernan"

[sources]
  [sources.internal]
  # forwards = ["filters.delay.two_seconds"]
  forwards = ["sinks.console"]

  [sources.graphite.primary]
  port = 2004
  forwards = ["filters.delay.two_seconds"]

[filters]
  [filters.delay.two_seconds]
  tolerance = 2
  forwards = ["sinks.console"]

[sinks]
  [sinks.console]
```

and the emission script at https://gist.github.com/blt/045160a243741d2390a03762900d4329

Signed-off-by: Brian L. Troutwine <blt@postmates.com>

* Use atomic usizes in place of report_telemetry

The report_telemetry function created a new Telemetry for every
point recorded. This was very expensive. We now require that
modules which want to do static named telemetry expose atomic
usizes which internal.rs will pull and emit.

This significantly reduces cernan's CPU and memory load in high
self-telemetry situations.

Signed-off-by: Brian L. Troutwine <blt@postmates.com>

* Add further self-telemetry

This commit adds self-telemetry to the delay_filter and more to
the graphite source. cernan.graphite.packet and cernan.graphite.telemetry.received
should be roughly the same when the payloads have a single line
in them, but this is not the case.

Worth investigation.

Signed-off-by: Brian L. Troutwine <blt@postmates.com>

* Adjust constraints on telem read/write

Signed-off-by: Brian L. Troutwine <blt@postmates.com>

* Simplify Buckets iteration, correct missing metrics

This commit corrects the issue with cernan that started off the whole
chain of investigation. I'm not 100% sure what the issue was, other
than that the indexing present in the old implementation of iteration
for buckets::Iter was too complicated and gave wrong results. While
simplifying the implementation to understand it better I also fixed
what ailed cernan.

Welp.

Signed-off-by: Brian L. Troutwine <blt@postmates.com>

* peg hopper to 0.3.1

Signed-off-by: Tom Santero <tom.santero@postmates.com>

* Repair basic.toml

This configuration file had some pretty drastic changes checked
into it. The majority of these changes were not intended.

Signed-off-by: Brian L. Troutwine <blt@postmates.com>
  • Loading branch information
blt authored and tsantero committed Aug 31, 2017
1 parent 9831b87 commit b1b1a5a
Show file tree
Hide file tree
Showing 20 changed files with 601 additions and 250 deletions.
104 changes: 61 additions & 43 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ doc = false
byteorder = "1.0"
chrono = "0.4"
clap = "2.20.5"
coco = "0.2.0"
fern = "0.4"
elastic = "0.13"
elastic_types = "0.19"
glob = "0.2.11"
hopper = "0.3"
hopper = "0.3.1"
hyper = "0.10"
hyper-native-tls = "0.2"
lazy_static = "0.2.1"
Expand Down
17 changes: 12 additions & 5 deletions examples/configs/basic.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,17 @@ source = "cernan"
[sources.statsd.primary]
enabled = true
port = 8125
forwards = ["sinks.console", "sinks.null", "sinks.influxdb", "sinks.prometheus"]
forwards = ["sinks.console", "filters.delay.two_seconds"]

[sources.native.primary]
ip = "127.0.0.1"
port = 1972
forwards = ["sinks.console", "sinks.null", "sinks.influxdb", "sinks.prometheus"]
forwards = ["filters.delay.two_seconds"]

[sources.graphite.primary]
enabled = true
port = 2004
forwards = ["filters.collectd_scrub"]
forwards = ["filters.programmable.collectd_scrub"]

[sources.files]
[sources.files.example_log]
Expand All @@ -35,8 +35,16 @@ source = "cernan"
forwards = ["sinks.firehose.stream_two"]

[filters]
[filters.collectd_scrub]
[filters.programmable.collectd_scrub]
script = "collectd_scrub.lua"
forwards = ["filters.delay.two_seconds"]

[filters.delay.two_seconds]
tolerance = 2
forwards = ["filters.flush_boundary.two_seconds"]

[filters.flush_boundary.two_seconds]
tolerance = 2
forwards = ["sinks.console", "sinks.null", "sinks.influxdb", "sinks.prometheus"]

[sinks]
Expand Down Expand Up @@ -64,4 +72,3 @@ source = "cernan"
delivery_stream = "stream_two"
batch_size = 800
region = "us-east-1"

56 changes: 41 additions & 15 deletions src/buckets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,29 +43,21 @@ impl Default for Buckets {
pub struct Iter<'a> {
buckets: &'a Buckets,
key_index: usize,
value_index: Option<usize>,
value_index: usize,
}

impl<'a> Iterator for Iter<'a> {
type Item = &'a Telemetry;

fn next(&mut self) -> Option<&'a Telemetry> {
while self.key_index < self.buckets.keys.len() {
if let Some(value_index) = self.value_index {
if value_index < self.buckets.values[self.key_index].len() {
let v = &self.buckets.values[self.key_index][value_index];
self.value_index = Some(value_index + 1);
return Some(v);
} else {
self.value_index = None;
self.key_index += 1;
}
} else if !self.buckets.values[self.key_index].is_empty() {
let v = &self.buckets.values[self.key_index][0];
self.value_index = Some(1);
if self.value_index < self.buckets.values[self.key_index].len() {
let v = &self.buckets.values[self.key_index][self.value_index];
self.value_index += 1;
return Some(v);
} else {
return None;
self.value_index = 0;
self.key_index += 1;
}
}
None
Expand Down Expand Up @@ -212,7 +204,7 @@ impl Buckets {
Iter {
buckets: self,
key_index: 0,
value_index: None,
value_index: 0,
}
}
}
Expand Down Expand Up @@ -829,6 +821,40 @@ mod test {
QuickCheck::new().quickcheck(inner as fn(Vec<Telemetry>) -> TestResult);
}

#[test]
fn same_names_in_and_out() {
fn inner(ms: Vec<Telemetry>, loops: usize) -> TestResult {
if loops == 0 {
return TestResult::discard()
}

let mut bucket = Buckets::new(1);

let mut expected_names: HashSet<String> = HashSet::new();
for m in ms.clone() {
expected_names.insert(m.name);
}
if expected_names.len() == 1 {
return TestResult::discard()
}

for _ in 0..loops {
for m in ms.clone() {
bucket.add(m);
}
let mut names: HashSet<String> = HashSet::new();
for telem in bucket.iter() {
names.insert(telem.name.clone());
}
bucket.reset();
assert_eq!(expected_names, names);
}

TestResult::passed()
}
QuickCheck::new().quickcheck(inner as fn(Vec<Telemetry>, usize) -> TestResult);
}

#[test]
fn test_reset_clears_space() {
fn qos_ret(ms: Vec<Telemetry>) -> TestResult {
Expand Down
51 changes: 36 additions & 15 deletions src/filter/delay_filter.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,27 @@
//! Filter streams to within a bounded interval of current time.
//!
//! This filter is intended to remove items from the stream which are too old,
//! as defined by the current time and the configured `tolerance`. That is, if
//! for some time `T`, `(T - time::now()).abs() > tolerance` the item associated
//! with `T` will be rejected.
use filter;
use metric;
use source::report_telemetry;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use time;
use util;

lazy_static! {
/// Total number of telemetry rejected for age
pub static ref DELAY_TELEM_REJECT: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
/// Total number of telemetry accepted for age
pub static ref DELAY_TELEM_ACCEPT: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
/// Total number of logline rejected for age
pub static ref DELAY_LOG_REJECT: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
/// Total number of logline accepted for age
pub static ref DELAY_LOG_ACCEPT: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
}

/// Filter streams to within a bounded interval of current time.
///
Expand All @@ -10,7 +30,6 @@ use time;
/// for some time `T`, `(T - time::now()).abs() > tolerance` the item associated
/// with `T` will be rejected.
pub struct DelayFilter {
config_path: String,
tolerance: i64,
}

Expand All @@ -29,39 +48,41 @@ impl DelayFilter {
/// Create a new DelayFilter
pub fn new(config: DelayFilterConfig) -> DelayFilter {
DelayFilter {
config_path: config
.config_path
.expect("must supply config_path for delay filter"),
tolerance: config.tolerance,
}
}
}

impl filter::Filter for DelayFilter {
fn valve_state(&self) -> util::Valve {
util::Valve::Open
}

fn process(
&mut self,
event: metric::Event,
res: &mut Vec<metric::Event>,
) -> Result<(), filter::FilterError> {
match event {
metric::Event::Telemetry(m) => {
report_telemetry(format!("{}.telemetry", self.config_path), 1.0);
if let Some(ref telem) = *m {
let telem = telem.clone();
if (telem.timestamp - time::now()).abs() < self.tolerance {
res.push(metric::Event::new_telemetry(telem));
}
metric::Event::Telemetry(m) => if let Some(ref telem) = *m {
let telem = telem.clone();
if (telem.timestamp - time::now()).abs() < self.tolerance {
DELAY_TELEM_ACCEPT.fetch_add(1, Ordering::Relaxed);
res.push(metric::Event::new_telemetry(telem));
} else {
DELAY_TELEM_REJECT.fetch_add(1, Ordering::Relaxed);
}
}
},
metric::Event::Log(l) => if let Some(ref log) = *l {
report_telemetry(format!("{}.log", self.config_path), 1.0);
let log = log.clone();
if (log.time - time::now()).abs() < self.tolerance {
DELAY_LOG_ACCEPT.fetch_add(1, Ordering::Relaxed);
res.push(metric::Event::new_log(log));
} else {
DELAY_LOG_REJECT.fetch_add(1, Ordering::Relaxed);
}
},
metric::Event::TimerFlush(f) => {
report_telemetry(format!("{}.flush", self.config_path), 1.0);
res.push(metric::Event::TimerFlush(f));
}
}
Expand Down
14 changes: 14 additions & 0 deletions src/filter/flush_boundary_filter.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use filter;
use metric;
use std::mem;
use util;

/// Buffer events for a set period of flushes
///
Expand Down Expand Up @@ -51,9 +52,22 @@ impl FlushBoundaryFilter {
holds: Vec::new(),
}
}

/// Count the number of stored events in the filter
pub fn count(&self) -> usize {
self.holds.iter().fold(0, |acc, hld| acc + hld.events.len())
}
}

impl filter::Filter for FlushBoundaryFilter {
fn valve_state(&self) -> util::Valve {
if self.count() > 10_000 {
util::Valve::Closed
} else {
util::Valve::Open
}
}

fn process(
&mut self,
event: metric::Event,
Expand Down
31 changes: 21 additions & 10 deletions src/filter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use time;
use util;

mod programmable_filter;
mod delay_filter;
pub mod delay_filter;
mod flush_boundary_filter;

pub use self::delay_filter::{DelayFilter, DelayFilterConfig};
Expand Down Expand Up @@ -54,6 +54,10 @@ pub trait Filter {
res: &mut Vec<metric::Event>,
) -> Result<(), FilterError>;

/// Lookup the `Filter` valve state. See `Valve` documentation for more
/// information.
fn valve_state(&self) -> util::Valve;

/// Run the Filter
///
/// It is not expected that most Filters will re-implement this. If this is
Expand All @@ -73,22 +77,29 @@ pub trait Filter {
None => attempts += 1,
Some(event) => {
attempts = 0;
match self.process(event, &mut events) {
Ok(()) => {
match self.valve_state() {
util::Valve::Open => match self.process(event, &mut events) {
Ok(()) => {
for ev in events.drain(..) {
util::send(&mut chans, ev)
}
}
Err(fe) => {
error!(
"Failed to run filter with error: {:?}",
name_in_fe(&fe)
);
let event = event_in_fe(fe);
util::send(&mut chans, event);
Err(fe) => {
error!(
"Failed to run filter with error: {:?}",
name_in_fe(&fe)
);
let event = event_in_fe(fe);
util::send(&mut chans, event);
}
},
util::Valve::Closed => {
attempts += 1;
continue;
}
}
}

}
}
}
Expand Down
6 changes: 5 additions & 1 deletion src/filter/programmable_filter.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use filter;
use libc::c_int;

use lua;
use lua::{Function, State, ThreadStatus};
use lua::ffi::lua_State;
use metric;
use std::path::PathBuf;
use std::sync;
use util;

struct Payload<'a> {
metrics: Vec<Box<metric::Telemetry>>,
Expand Down Expand Up @@ -508,6 +508,10 @@ impl ProgrammableFilter {
}

impl filter::Filter for ProgrammableFilter {
fn valve_state(&self) -> util::Valve {
util::Valve::Open
}

fn process(
&mut self,
event: metric::Event,
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ extern crate rusoto_core;
extern crate rusoto_firehose;
extern crate seahash;
extern crate serde;
extern crate coco;
#[macro_use]
extern crate serde_json;
extern crate toml;
Expand Down
4 changes: 3 additions & 1 deletion src/metric/tagmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ where
if self.inner.len() != other.inner.len() {
false
} else {
for (&(ref k, ref v), &(ref o_k, ref o_v)) in self.inner.iter().zip(other.inner.iter()) {
for (&(ref k, ref v), &(ref o_k, ref o_v)) in
self.inner.iter().zip(other.inner.iter())
{
if (k != o_k) || (v != o_v) {
return false;
}
Expand Down
Loading

0 comments on commit b1b1a5a

Please sign in to comment.