Skip to content

Commit

Permalink
Bump to 0.8.6
Browse files Browse the repository at this point in the history
This release incorporates improvements to the postmates/quantiles
library that speed up the processing of Summaries in Prometheus
sink, resolving (or greatly reducing) exposition failures in that
sink. An InfluxDB formatting bug has also been repaired, which is
fancy.

  * 5028ed9 :: consistency: move initialization out of block (#358)
  * 6197310 :: Fix InfluxDB format bug (#363)
  * bbf98a5 :: Target quantiles 0.7 (#365)

This release contains the first contributions of @doubleyou and
@ibotty! :D

Signed-off-by: Brian L. Troutwine <blt@postmates.com>
  • Loading branch information
Brian L. Troutwine committed Dec 12, 2017
1 parent bbf98a5 commit 56ec77b
Show file tree
Hide file tree
Showing 18 changed files with 99 additions and 108 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ license = "MIT"
name = "cernan"
readme = "README.md"
repository = "https://github.com/postmates/cernan"
version = "0.8.6-pre"
version = "0.8.6"

[[bin]]
name = "cernan"
Expand Down
7 changes: 2 additions & 5 deletions src/bin/cernan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ fn populate_forwards(
None => {
error!(
"Unable to fulfill configured forward: {} => {}",
config_path,
fwd
config_path, fwd
);
process::exit(0);
}
Expand Down Expand Up @@ -272,16 +271,14 @@ fn main() {
if config_topology.get(forward).is_none() {
error!(
"Unable to fulfill configured forward: {} => {}",
key,
forward,
key, forward,
);
process::exit(1);
}
}
}
}


// SINKS
//
if let Some(config) = mem::replace(&mut args.null, None) {
Expand Down
6 changes: 3 additions & 3 deletions src/buckets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,9 @@ impl Buckets {
/// bucket.add(metric);
/// ```
pub fn add(&mut self, value: Telemetry) {
let hsh = match self.keys.binary_search_by(
|probe| probe.partial_cmp(&value.name_tag_hash()).unwrap(),
) {
let hsh = match self.keys.binary_search_by(|probe| {
probe.partial_cmp(&value.name_tag_hash()).unwrap()
}) {
Ok(hsh_idx) => self.values.index_mut(hsh_idx),
Err(hsh_idx) => {
self.keys.insert(hsh_idx, value.name_tag_hash());
Expand Down
53 changes: 22 additions & 31 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,9 +229,7 @@ pub fn parse_config_file(buffer: &str, verbosity: u64) -> Args {

args.flush_interval = value
.get("flush-interval")
.map(|fi| {
fi.as_integer().expect("could not parse flush-interval") as u64
})
.map(|fi| fi.as_integer().expect("could not parse flush-interval") as u64)
.unwrap_or(args.flush_interval);

let global_tags: TagMap = match value.get("tags") {
Expand Down Expand Up @@ -375,10 +373,8 @@ pub fn parse_config_file(buffer: &str, verbosity: u64) -> Args {
if let Some(sinks) = value.get("sinks") {
let sinks = sinks.as_table().expect("sinks must be in table format");

args.null = sinks.get("null").map(|_| {
NullConfig {
config_path: "sinks.null".to_string(),
}
args.null = sinks.get("null").map(|_| NullConfig {
config_path: "sinks.null".to_string(),
});

args.console = sinks.get("console").map(|snk| {
Expand Down Expand Up @@ -409,25 +405,21 @@ pub fn parse_config_file(buffer: &str, verbosity: u64) -> Args {

res.pad_control = snk.get("padding")
.and_then(|t| t.as_table())
.map(|tbl| {
PadControl {
set: tbl.get("set").map_or(false, |v| {
v.as_bool()
.expect("could not parse padding.set as boolean")
}),
sum: tbl.get("sum").map_or(false, |v| {
v.as_bool()
.expect("could not parse padding.sum as boolean")
}),
summarize: tbl.get("summarize").map_or(false, |v| {
v.as_bool()
.expect("could not parse padding.summarize as boolean")
}),
histogram: tbl.get("histogram").map_or(false, |v| {
v.as_bool()
.expect("could not parse padding.histogram as boolean")
}),
}
.map(|tbl| PadControl {
set: tbl.get("set").map_or(false, |v| {
v.as_bool().expect("could not parse padding.set as boolean")
}),
sum: tbl.get("sum").map_or(false, |v| {
v.as_bool().expect("could not parse padding.sum as boolean")
}),
summarize: tbl.get("summarize").map_or(false, |v| {
v.as_bool()
.expect("could not parse padding.summarize as boolean")
}),
histogram: tbl.get("histogram").map_or(false, |v| {
v.as_bool()
.expect("could not parse padding.histogram as boolean")
}),
})
.unwrap_or(res.pad_control);

Expand Down Expand Up @@ -499,9 +491,7 @@ pub fn parse_config_file(buffer: &str, verbosity: u64) -> Args {
.unwrap_or(res.port);

res.secure = snk.get("secure")
.map(|p| {
p.as_bool().expect("could not parse sinks.influxdb.secure")
})
.map(|p| p.as_bool().expect("could not parse sinks.influxdb.secure"))
.unwrap_or(res.secure);

res.host = snk.get("host")
Expand Down Expand Up @@ -676,8 +666,9 @@ pub fn parse_config_file(buffer: &str, verbosity: u64) -> Args {
let mut res = FirehoseConfig::default();
res.config_path = Some(format!("sinks.firehose.{}", name));

let ds = tbl.get("delivery_stream")
.map(|x| x.as_str().expect("delivery_stream must be a string"));
let ds = tbl.get("delivery_stream").map(|x| {
x.as_str().expect("delivery_stream must be a string")
});
if !ds.is_some() {
continue;
}
Expand Down
14 changes: 8 additions & 6 deletions src/filter/programmable_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -526,9 +526,10 @@ impl filter::Filter for ProgrammableFilter {
.unwrap();
let fail =
metric::Event::Telemetry(sync::Arc::new(Some(filter_telem)));
return Err(
filter::FilterError::NoSuchFunction("process_metric", fail),
);
return Err(filter::FilterError::NoSuchFunction(
"process_metric",
fail,
));
}

let mut pyld = Payload::from_metric(
Expand Down Expand Up @@ -610,9 +611,10 @@ impl filter::Filter for ProgrammableFilter {
.harden()
.unwrap(),
);
return Err(
filter::FilterError::NoSuchFunction("process_log", fail),
);
return Err(filter::FilterError::NoSuchFunction(
"process_log",
fail,
));
}

let mut pyld = Payload::from_log(
Expand Down
49 changes: 27 additions & 22 deletions src/metric/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub enum Value {
/// DO NOT USE - PUBLIC FOR TESTING ONLY
Histogram(Histogram<f64>),
/// DO NOT USE - PUBLIC FOR TESTING ONLY
Quantiles{ ckms: CKMS<f64>, sum: f64 },
Quantiles { ckms: CKMS<f64>, sum: f64 },
}

pub struct SoftTelemetry {
Expand Down Expand Up @@ -102,10 +102,10 @@ impl Add for Value {
y.insert(x);
Value::Histogram(y)
}
Value::Quantiles{ mut ckms, mut sum } => {
Value::Quantiles { mut ckms, mut sum } => {
ckms.insert(x);
sum += x;
Value::Quantiles{ ckms, sum }
Value::Quantiles { ckms, sum }
}
},
Value::Histogram(mut x) => match rhs {
Expand All @@ -115,9 +115,9 @@ impl Add for Value {
x += y;
Value::Histogram(x)
}
Value::Quantiles{ ckms, sum } => Value::Quantiles{ ckms, sum },
Value::Quantiles { ckms, sum } => Value::Quantiles { ckms, sum },
},
Value::Quantiles{ mut ckms, mut sum } => match rhs {
Value::Quantiles { mut ckms, mut sum } => match rhs {
Value::Set(y) => Value::Set(y),
Value::Sum(y) => Value::Sum(sum + y),
Value::Histogram(mut y) => {
Expand All @@ -126,10 +126,13 @@ impl Add for Value {
}
Value::Histogram(y)
}
Value::Quantiles{ ckms: rhs, sum: rhs_sum } => {
Value::Quantiles {
ckms: rhs,
sum: rhs_sum,
} => {
ckms += rhs;
sum += rhs_sum;
Value::Quantiles{ ckms, sum }
Value::Quantiles { ckms, sum }
}
},
}
Expand All @@ -143,7 +146,7 @@ impl Value {
(&Value::Set(_), &Value::Set(_)) => true,
(&Value::Sum(_), &Value::Sum(_)) => true,
(&Value::Histogram(_), &Value::Histogram(_)) => true,
(&Value::Quantiles{ .. }, &Value::Quantiles{ .. }) => true,
(&Value::Quantiles { .. }, &Value::Quantiles { .. }) => true,
_ => false,
}
}
Expand Down Expand Up @@ -180,7 +183,7 @@ impl Value {
match *self {
Value::Set(_) | Value::Sum(_) => 1,
Value::Histogram(ref histo) => histo.count(),
Value::Quantiles{ ref ckms, .. } => ckms.count(),
Value::Quantiles { ref ckms, .. } => ckms.count(),
}
}

Expand All @@ -203,7 +206,7 @@ impl Value {
Value::Set(_) => AggregationMethod::Set,
Value::Sum(_) => AggregationMethod::Sum,
Value::Histogram(_) => AggregationMethod::Histogram,
Value::Quantiles{ .. } => AggregationMethod::Summarize,
Value::Quantiles { .. } => AggregationMethod::Summarize,
}
}

Expand Down Expand Up @@ -256,12 +259,12 @@ impl Default for Telemetry {
}

impl SoftTelemetry {
/// Set the override sample sum
/// Set the override sample sum
pub fn sample_sum(mut self, sum: f64) -> SoftTelemetry {
self.override_sample_sum = Some(sum);
self
}

/// Set the override count
pub fn count(mut self, count: u64) -> SoftTelemetry {
self.override_count = Some(count);
Expand Down Expand Up @@ -681,7 +684,10 @@ impl Telemetry {
Some(Value::Histogram(ref mut histo)) => {
histo.insert(value);
}
Some(Value::Quantiles{ ref mut ckms, ref mut sum }) => {
Some(Value::Quantiles {
ref mut ckms,
ref mut sum,
}) => {
ckms.insert(value);
*sum += value;
}
Expand Down Expand Up @@ -731,7 +737,7 @@ impl Telemetry {
match self.value {
Some(Value::Set(_)) | Some(Value::Sum(_)) | None => None,
Some(Value::Histogram(ref histo)) => histo.sum(),
Some(Value::Quantiles{ sum, .. }) => {
Some(Value::Quantiles { sum, .. }) => {
if self.override_sample_sum.is_some() {
self.override_sample_sum
} else {
Expand All @@ -745,12 +751,10 @@ impl Telemetry {
pub fn count(&self) -> usize {
if let Some(cnt) = self.override_count {
cnt as usize
} else if let Some(ref v) = self.value {
v.count()
} else {
if let Some(ref v) = self.value {
v.count()
} else {
0
}
0
}
}

Expand All @@ -770,7 +774,7 @@ impl Telemetry {
match self.value {
Some(Value::Set(x)) => Some(x),
Some(Value::Sum(x)) => Some(x),
Some(Value::Quantiles{ ref ckms, .. }) => ckms.query(1.0).map(|x| x.1),
Some(Value::Quantiles { ref ckms, .. }) => ckms.query(1.0).map(|x| x.1),
Some(Value::Histogram(ref histo)) => histo.sum(),
None => unreachable!(),
}
Expand All @@ -785,7 +789,7 @@ impl Telemetry {
pub fn samples(&self) -> Vec<f64> {
match self.value {
Some(Value::Set(x)) | Some(Value::Sum(x)) => vec![x],
Some(Value::Quantiles{ ref ckms, .. }) => ckms.clone().into_vec(),
Some(Value::Quantiles { ref ckms, .. }) => ckms.clone().into_vec(),
Some(Value::Histogram(ref histo)) => histo
.clone()
.into_vec()
Expand Down Expand Up @@ -1000,7 +1004,8 @@ mod tests {
AggregationMethod::Histogram => mb.kind(AggregationMethod::Histogram)
.bounds(vec![1.0, 10.0, 100.0, 1000.0]),
};
mb = if persist { mb.persist(true) } else { mb };
mb =
if persist { mb.persist(true) } else { mb };
mb.harden().unwrap()
}
}
Expand Down
7 changes: 3 additions & 4 deletions src/protocols/statsd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ pub fn parse_statsd(
Ok(f) => f,
Err(_) => return false,
};
let mut metric = sync::Arc::make_mut(
&mut sync::Arc::clone(metric),
).take()
let mut metric = sync::Arc::make_mut(&mut sync::Arc::clone(
metric,
)).take()
.unwrap()
.thaw();
metric = metric.name(name);
Expand Down Expand Up @@ -457,7 +457,6 @@ mod tests {
assert!(!parse_statsd("", &mut res, &metric, &config));
}


#[test]
fn test_metric_parse_invalid_no_value() {
let metric = sync::Arc::new(Some(Telemetry::default()));
Expand Down
Loading

0 comments on commit 56ec77b

Please sign in to comment.