Bump to 0.7.9
This release has two major changes: improvements to the elasticsearch
sink, thanks to the careful shepherding of @benley, and self-telemetry
updates to that same sink. The latter is driven by Postmates' desire to
deprecate AWS Firehose.

  * 916841a :: elasticsearch improvements, self-telemetry
  * 9bbd951 :: prometheus sink obeys all aggregations

Signed-off-by: Brian L. Troutwine <blt@postmates.com>
Brian L. Troutwine committed Sep 28, 2017
1 parent 916841a · commit fa9ba9a
Showing 5 changed files with 77 additions and 48 deletions.
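A note on the pattern at work in src/sink/elasticsearch.rs below: the self-telemetry rests on process-global atomic counters that the sink bumps as bulk flushes succeed or fail. A minimal, standalone sketch of that pattern, assuming the lazy_static crate; the counter name mirrors the diff, while hypothetical_flush_report is invented here for illustration:

    #[macro_use]
    extern crate lazy_static;

    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    lazy_static! {
        /// Total records delivered over the life of the process.
        pub static ref ELASTIC_RECORDS_TOTAL_DELIVERED: Arc<AtomicUsize> =
            Arc::new(AtomicUsize::new(0));
    }

    // Invented call site: bump the counter after a successful bulk flush.
    fn hypothetical_flush_report(delivered: usize) {
        // Relaxed ordering suffices; these are independent, monotonic counters.
        ELASTIC_RECORDS_TOTAL_DELIVERED.fetch_add(delivered, Ordering::Relaxed);
    }

    fn main() {
        hypothetical_flush_report(3);
        assert_eq!(ELASTIC_RECORDS_TOTAL_DELIVERED.load(Ordering::Relaxed), 3);
    }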
2 changes: 1 addition & 1 deletion Cargo.lock


2 changes: 1 addition & 1 deletion Cargo.toml
@@ -7,7 +7,7 @@ license = "MIT"
name = "cernan"
readme = "README.md"
repository = "https://github.com/postmates/cernan"
version = "0.7.9-pre"
version = "0.7.9"

[[bin]]
name = "cernan"
4 changes: 2 additions & 2 deletions src/config.rs
@@ -520,8 +520,8 @@ pub fn parse_config_file(buffer: &str, verbosity: u64) -> Args {
res.retain_limit = snk.get("retain_limit")
.map(|p| {
p.as_integer()
.expect("could not parse sinks.prometheus.retain_limit") as
usize
.expect("could not parse sinks.prometheus.retain_limit")
as usize
})
.unwrap_or(res.retain_limit);

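For context on the hunk above, a sketch of how that lookup behaves against a toml::Value, using the same get/as_integer calls from the toml crate; the inline table and fallback value here are made up:

    extern crate toml;

    fn main() {
        // Stand-ins for cernan's parsed [sinks.prometheus] table and the
        // pre-existing res.retain_limit default.
        let snk: toml::Value = "retain_limit = 42".parse().unwrap();
        let default_retain_limit: usize = 10;

        let retain_limit = snk.get("retain_limit")
            .map(|p| {
                p.as_integer()
                    .expect("could not parse sinks.prometheus.retain_limit")
                    as usize
            })
            .unwrap_or(default_retain_limit);

        assert_eq!(retain_limit, 42);
    }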
105 changes: 67 additions & 38 deletions src/sink/elasticsearch.rs
@@ -21,7 +21,7 @@ lazy_static! {
pub static ref ELASTIC_RECORDS_TOTAL_DELIVERED: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
/// Total records that failed to be delivered due to error
pub static ref ELASTIC_RECORDS_TOTAL_FAILED: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
/// Unknown error occurred during attempted flush
pub static ref ELASTIC_ERROR_UNKNOWN: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
/// Total number of index bulk action errors
pub static ref ELASTIC_BULK_ACTION_INDEX_ERR: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
@@ -32,21 +32,21 @@ lazy_static! {
/// Total number of delete bulk action errors
pub static ref ELASTIC_BULK_ACTION_DELETE_ERR: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));

/// Total number of api errors due to index not found
pub static ref ELASTIC_ERROR_API_INDEX_NOT_FOUND: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
/// Total number of api errors due to parsing
pub static ref ELASTIC_ERROR_API_PARSING: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
/// Total number of api errors due to mapper parsing
pub static ref ELASTIC_ERROR_API_MAPPER_PARSING: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
/// Total number of api errors due to action request validation
pub static ref ELASTIC_ERROR_API_ACTION_REQUEST_VALIDATION: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
/// Total number of api errors due to unknown reasons
pub static ref ELASTIC_ERROR_API_UNKNOWN: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
/// Total number of api errors due to parse response json errors
pub static ref ELASTIC_ERROR_RESPONSE_JSON: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
/// Total number of api errors due to parse response io errors
pub static ref ELASTIC_ERROR_RESPONSE_IO: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
/// Total number of api errors due to json parsing
pub static ref ELASTIC_ERROR_JSON: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
/// Total number of api errors due to reqwest client
pub static ref ELASTIC_ERROR_REQUEST_FAILURE: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
@@ -166,16 +166,15 @@ impl Sink for Elasticsearch {
let mut buffer = String::with_capacity(4048);
self.bulk_body(&mut buffer);
debug!("BODY: {:?}", buffer);
-let bulk_resp: Result<BulkResponse> = client
-    .request(BulkRequest::new(buffer))
-    .send()
-    .and_then(into_response);
+let bulk_resp: Result<BulkResponse> =
+    client.request(BulkRequest::new(buffer)).send().and_then(into_response);

match bulk_resp {
Ok(bulk) => {
self.buffer.clear();
ELASTIC_RECORDS_DELIVERY.fetch_add(1, Ordering::Relaxed);
-ELASTIC_RECORDS_TOTAL_DELIVERED.fetch_add(bulk.items.ok.len(), Ordering::Relaxed);
+ELASTIC_RECORDS_TOTAL_DELIVERED
+    .fetch_add(bulk.items.ok.len(), Ordering::Relaxed);
ELASTIC_RECORDS_TOTAL_FAILED
.fetch_add(bulk.items.err.len(), Ordering::Relaxed);
if !bulk.items.err.is_empty() {
@@ -213,51 +212,81 @@ impl Sink for Elasticsearch {
use elastic::client::responses::parse::ParseResponseError;
match *response_err {
ParseResponseError::Json(ref e) => {
-ELASTIC_ERROR_RESPONSE_JSON.fetch_add(1, Ordering::Relaxed);
-error!("Unable to write, Parse Response Error (JSON): {}", e.description());
-},
+ELASTIC_ERROR_RESPONSE_JSON
+    .fetch_add(1, Ordering::Relaxed);
+error!(
+    "Unable to write, Parse Response Error (JSON): {}",
+    e.description()
+);
+}
ParseResponseError::Io(ref e) => {
-ELASTIC_ERROR_RESPONSE_IO.fetch_add(1, Ordering::Relaxed);
-error!("Unable to write, Parse Response Error (IO): {}", e.description());
-},
+ELASTIC_ERROR_RESPONSE_IO
+    .fetch_add(1, Ordering::Relaxed);
+error!(
+    "Unable to write, Parse Response Error (IO): {}",
+    e.description()
+);
+}
}
-},
+}
ErrorKind::Json(ref e) => {
ELASTIC_ERROR_JSON.fetch_add(1, Ordering::Relaxed);
error!("Unable to write, JSON: {}", e.description());
-},
+}
ErrorKind::Client(ref e) => {
ELASTIC_ERROR_REQUEST_FAILURE.fetch_add(1, Ordering::Relaxed);
error!("Unable to write, Reqwest Client Error: {}", e.description());
},
error!(
"Unable to write, Reqwest Client Error: {}",
e.description()
);
}
ErrorKind::Msg(ref msg) => {
error!("Unable to write, msg: {}", msg);
-},
+}
ErrorKind::Api(ref err) => {
use elastic::error::ApiError;
match *err {
ApiError::IndexNotFound { ref index } => {
-ELASTIC_ERROR_API_INDEX_NOT_FOUND.fetch_add(1, Ordering::Relaxed);
-error!("Unable to write, API Error (Index Not Found): {}", index);
-},
+ELASTIC_ERROR_API_INDEX_NOT_FOUND
+    .fetch_add(1, Ordering::Relaxed);
+error!(
+    "Unable to write, API Error (Index Not Found): {}",
+    index
+);
+}
ApiError::Parsing { ref reason, .. } => {
-ELASTIC_ERROR_API_PARSING.fetch_add(1, Ordering::Relaxed);
-error!("Unable to write, API Error (Parsing): {}", reason);
-},
+ELASTIC_ERROR_API_PARSING
+    .fetch_add(1, Ordering::Relaxed);
+error!(
+    "Unable to write, API Error (Parsing): {}",
+    reason
+);
+}
ApiError::MapperParsing { ref reason, .. } => {
-ELASTIC_ERROR_API_MAPPER_PARSING.fetch_add(1, Ordering::Relaxed);
-error!("Unable to write, API Error (Mapper Parsing): {}", reason);
-},
-ApiError::ActionRequestValidation { ref reason, .. } => {
-    ELASTIC_ERROR_API_ACTION_REQUEST_VALIDATION.fetch_add(1, Ordering::Relaxed);
-    error!("Unable to write, API Error (Action Request Validation): {}", reason);
-},
+ELASTIC_ERROR_API_MAPPER_PARSING
+    .fetch_add(1, Ordering::Relaxed);
+error!(
+    "Unable to write, API Error (Mapper Parsing): {}",
+    reason
+);
+}
+ApiError::ActionRequestValidation {
+    ref reason, ..
+} => {
+    ELASTIC_ERROR_API_ACTION_REQUEST_VALIDATION
+        .fetch_add(1, Ordering::Relaxed);
+    error!(
+        "Unable to write, API Error (Action Request Validation): {}",
+        reason
+    );
+}
ApiError::Other { .. } => {
-ELASTIC_ERROR_API_UNKNOWN.fetch_add(1, Ordering::Relaxed);
+ELASTIC_ERROR_API_UNKNOWN
+    .fetch_add(1, Ordering::Relaxed);
error!("Unable to write, API Error (Unknown)");
-},
+}
}
-},
+}
}
}
}
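One more hedged sketch tied to the counters above: self-telemetry means some other part of the process periodically reads and reports them. Assuming the statics from the diff are in scope, it could look roughly like this; the report function and metric names are invented, not cernan's real plumbing:

    use std::sync::atomic::Ordering;

    // Invented reporting hook; cernan's actual telemetry path differs.
    fn report(name: &str, value: usize) {
        println!("{} {}", name, value);
    }

    fn emit_elastic_telemetry() {
        report(
            "cernan.elasticsearch.records.total_delivered",
            ELASTIC_RECORDS_TOTAL_DELIVERED.load(Ordering::Relaxed),
        );
        report(
            "cernan.elasticsearch.records.total_failed",
            ELASTIC_RECORDS_TOTAL_FAILED.load(Ordering::Relaxed),
        );
    }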
12 changes: 6 additions & 6 deletions src/sink/prometheus.rs
@@ -137,9 +137,9 @@ struct PrometheusAggr {
windowed: HashMap<u64, Vec<metric::Telemetry>, BuildHasherDefault<SeaHasher>>,
}

-/// When we store AggregationMethod::Summarize into windows we have to be
+/// When we store `AggregationMethod::Summarize` into windows we have to be
/// careful to keep the count and summation of past bins we've dropped
-/// off. That's the purpose of WindowedRetainer.
+/// off. That's the purpose of `WindowedRetainer`.
#[derive(Clone, Debug)]
struct WindowedRetainer {
historic_count: usize,
@@ -250,7 +250,7 @@ impl PrometheusAggr {
for t in ts_vec.drain(0..overage) {
let retainer = self.retainers
.entry(id)
-.or_insert_with(|| WindowedRetainer::default());
+.or_insert_with(WindowedRetainer::default);
retainer.historic_sum += t.samples_sum().unwrap_or(0.0);
retainer.historic_count += t.count();
}
@@ -339,7 +339,7 @@ impl Prometheus {
pub fn new(config: PrometheusConfig) -> Prometheus {
let aggrs =
sync::Arc::new(sync::Mutex::new(PrometheusAggr::new(config.retain_limit)));
-let srv_aggrs = aggrs.clone();
+let srv_aggrs = sync::Arc::clone(&aggrs);
let listener = Server::http((config.host.as_str(), config.port))
.unwrap()
.handle_threads(SenderHandler { aggr: srv_aggrs }, 1)
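The srv_aggrs change above (and the tags change below) trades method-call clone() for an explicit sync::Arc::clone(&...). Behavior is identical; the explicit form, which clippy encourages, signals that only the reference count is bumped and no underlying data is copied. A minimal illustration with made-up contents:

    use std::sync;

    fn main() {
        let aggrs = sync::Arc::new(sync::Mutex::new(vec![1u64, 2, 3]));
        // A pointer copy: the Mutex and Vec inside are not duplicated.
        let srv_aggrs = sync::Arc::clone(&aggrs);
        assert_eq!(sync::Arc::strong_count(&aggrs), 2);
        drop(srv_aggrs);
    }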
@@ -614,8 +614,8 @@ fn write_text(
buf.push_str(&value.name);
buf.push_str(" summary\n");
}
-let sum_tags = value.tags.clone();
-let count_tags = value.tags.clone();
+let sum_tags = sync::Arc::clone(&value.tags);
+let count_tags = sync::Arc::clone(&value.tags);
for q in &[0.0, 1.0, 0.25, 0.5, 0.75, 0.90, 0.95, 0.99, 0.999] {
buf.push_str(&value.name);
buf.push_str("{quantile=\"");
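To make the retainer bookkeeping concrete: when a Summarize bin ages out of the window, its count and sum are folded into the retainer so the totals the sink reports never move backwards. A toy version with invented numbers, not cernan's actual types:

    #[derive(Clone, Debug, Default)]
    struct WindowedRetainer {
        historic_count: usize,
        historic_sum: f64,
    }

    fn main() {
        let mut retainer = WindowedRetainer::default();
        // A bin of 10 samples summing to 42.5 drops off the window.
        retainer.historic_count += 10;
        retainer.historic_sum += 42.5;
        // Reported summary = live window contents + these historic totals.
        assert_eq!(retainer.historic_count, 10);
        assert!((retainer.historic_sum - 42.5).abs() < std::f64::EPSILON);
    }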
