Clippy + rustfmt changes
This commit just fiddles with the layout of the project via rustfmt
and makes the suggestions that clippy requested a reality.

Signed-off-by: Brian L. Troutwine <blt@postmates.com>
Brian L. Troutwine committed Nov 27, 2017
1 parent 6bf406b commit e2cc2f0
Showing 12 changed files with 89 additions and 90 deletions.
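Most of the substantive (non-formatting) changes below are one clippy suggestion applied across the project: constructors that only read from their configuration now borrow it instead of taking ownership, which is the shape of clippy's needless_pass_by_value lint. A minimal sketch of the before/after, using a hypothetical FooConfig type rather than anything from this repository:

    pub struct FooConfig {
        pub tolerance: i64,
    }

    pub struct Foo {
        tolerance: i64,
    }

    impl Foo {
        // Before (sketch): `pub fn new(config: FooConfig) -> Foo` consumed
        // the config even though only a Copy field is read out of it.
        //
        // After: borrowing leaves ownership with the caller; the Copy field
        // is read through the reference unchanged.
        pub fn new(config: &FooConfig) -> Foo {
            Foo {
                tolerance: config.tolerance,
            }
        }
    }

Call sites change in step, e.g. Foo::new(&config) rather than Foo::new(config), which is exactly the pattern in src/bin/cernan.rs below.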
12 changes: 6 additions & 6 deletions src/bin/cernan.rs
@@ -287,15 +287,15 @@ fn main() {
if let Some(config) = mem::replace(&mut args.null, None) {
let recv = receivers.remove(&config.config_path).unwrap();
joins.push(thread::spawn(move || {
-cernan::sink::Null::new(config).run(recv);
+cernan::sink::Null::new(&config).run(recv);
}));
}
if let Some(config) = mem::replace(&mut args.console, None) {
let recv = receivers
.remove(&config.config_path.clone().unwrap())
.unwrap();
joins.push(thread::spawn(move || {
-cernan::sink::Console::new(config).run(recv);
+cernan::sink::Console::new(&config).run(recv);
}));
}
if let Some(config) = mem::replace(&mut args.wavefront, None) {
@@ -319,15 +319,15 @@ fn main() {
.remove(&config.config_path.clone().unwrap())
.unwrap();
joins.push(thread::spawn(move || {
-cernan::sink::Prometheus::new(config).run(recv);
+cernan::sink::Prometheus::new(&config).run(recv);
}));
}
if let Some(config) = mem::replace(&mut args.influxdb, None) {
let recv = receivers
.remove(&config.config_path.clone().unwrap())
.unwrap();
joins.push(thread::spawn(move || {
-cernan::sink::InfluxDB::new(config).run(recv);
+cernan::sink::InfluxDB::new(&config).run(recv);
}));
}
if let Some(config) = mem::replace(&mut args.native_sink_config, None) {
@@ -402,7 +402,7 @@ fn main() {
&senders,
);
joins.push(thread::spawn(move || {
-cernan::filter::DelayFilter::new(c).run(recv, downstream_sends);
+cernan::filter::DelayFilter::new(&c).run(recv, downstream_sends);
}));
}
});
@@ -425,7 +425,7 @@ fn main() {
&senders,
);
joins.push(thread::spawn(move || {
-cernan::filter::FlushBoundaryFilter::new(c)
+cernan::filter::FlushBoundaryFilter::new(&c)
.run(recv, downstream_sends);
}));
}
2 changes: 1 addition & 1 deletion src/filter/delay_filter.rs
@@ -45,7 +45,7 @@ pub struct DelayFilterConfig {

impl DelayFilter {
/// Create a new DelayFilter
-pub fn new(config: DelayFilterConfig) -> DelayFilter {
+pub fn new(config: &DelayFilterConfig) -> DelayFilter {
DelayFilter {
tolerance: config.tolerance,
}
2 changes: 1 addition & 1 deletion src/filter/flush_boundary_filter.rs
@@ -45,7 +45,7 @@ impl Hold {

impl FlushBoundaryFilter {
/// Create a new FlushBoundaryFilter
-pub fn new(config: FlushBoundaryFilterConfig) -> FlushBoundaryFilter {
+pub fn new(config: &FlushBoundaryFilterConfig) -> FlushBoundaryFilter {
FlushBoundaryFilter {
tolerance: config.tolerance,
holds: Vec::new(),
2 changes: 1 addition & 1 deletion src/sink/console.rs
@@ -27,7 +27,7 @@ impl Console {
/// bin_width: 2, flush_interval: 60 };
/// let c = Console::new(config);
/// ```
-pub fn new(config: ConsoleConfig) -> Console {
+pub fn new(config: &ConsoleConfig) -> Console {
Console {
aggrs: Buckets::new(config.bin_width),
buffer: Vec::new(),
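One detail the console.rs hunk leaves untouched: the doc example a few lines above the changed signature still reads "let c = Console::new(config);". With new now borrowing a ConsoleConfig, that call would presumably need to become:

    let c = Console::new(&config);

(assuming the doc example is compiled as a doc test at all).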
127 changes: 59 additions & 68 deletions src/sink/elasticsearch.rs
@@ -47,7 +47,7 @@ lazy_static! {
pub static ref ELASTIC_ERROR_API_INDEX_ALREADY_EXISTS: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
/// Total number of api errors due to unknown reasons
pub static ref ELASTIC_ERROR_API_UNKNOWN: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
-/// Total number of client errors, no specific reasons
+/// Total number of client errors, no specific reasons
pub static ref ELASTIC_ERROR_CLIENT: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
}

@@ -172,7 +172,8 @@ impl Sink for Elasticsearch {
debug!("BODY: {:?}", buffer);
let bulk_resp: Result<BulkResponse> = client
.request(BulkRequest::new(buffer))
-.send().unwrap()
+.send()
+.unwrap()
.into_response::<BulkResponse>();

match bulk_resp {
@@ -181,11 +182,11 @@
ELASTIC_RECORDS_DELIVERY.fetch_add(1, Ordering::Relaxed);
for item in bulk.iter() {
match item {
-Ok(ref _item) => {
+Ok(_item) => {
ELASTIC_RECORDS_TOTAL_DELIVERED
.fetch_add(1, Ordering::Relaxed);
-},
-Err(ref item) => {
+}
+Err(item) => {
ELASTIC_RECORDS_TOTAL_FAILED
.fetch_add(1, Ordering::Relaxed);
if let Some(cause) = item.cause() {
@@ -214,78 +215,68 @@ impl Sink for Elasticsearch {
}
}
}
-Err(err) => {
-match err {
-error::Error::Api(ref api_error) => {
-use elastic::error::ApiError;
-match *api_error {
-ApiError::IndexNotFound { ref index } => {
-ELASTIC_ERROR_API_INDEX_NOT_FOUND
-.fetch_add(1, Ordering::Relaxed);
-error!(
-"Unable to write, API Error (Index Not Found): {}",
-index
-);
-}
-ApiError::Parsing { ref reason, .. } => {
-ELASTIC_ERROR_API_PARSING
-.fetch_add(1, Ordering::Relaxed);
-error!(
-"Unable to write, API Error (Parsing): {}",
-reason
-);
-}
-ApiError::MapperParsing { ref reason, .. } => {
-ELASTIC_ERROR_API_MAPPER_PARSING
-.fetch_add(1, Ordering::Relaxed);
-error!(
-"Unable to write, API Error (Mapper Parsing): {}",
-reason
-);
-}
-ApiError::ActionRequestValidation {
-ref reason, ..
-} => {
-ELASTIC_ERROR_API_ACTION_REQUEST_VALIDATION
-.fetch_add(1, Ordering::Relaxed);
-error!(
+Err(err) => match err {
+error::Error::Api(ref api_error) => {
+use elastic::error::ApiError;
+match *api_error {
+ApiError::IndexNotFound { ref index } => {
+ELASTIC_ERROR_API_INDEX_NOT_FOUND
+.fetch_add(1, Ordering::Relaxed);
+error!(
+"Unable to write, API Error (Index Not Found): {}",
+index
+);
+}
+ApiError::Parsing { ref reason, .. } => {
+ELASTIC_ERROR_API_PARSING.fetch_add(1, Ordering::Relaxed);
+error!("Unable to write, API Error (Parsing): {}", reason);
+}
+ApiError::MapperParsing { ref reason, .. } => {
+ELASTIC_ERROR_API_MAPPER_PARSING
+.fetch_add(1, Ordering::Relaxed);
+error!(
+"Unable to write, API Error (Mapper Parsing): {}",
+reason
+);
+}
+ApiError::ActionRequestValidation { ref reason, .. } => {
+ELASTIC_ERROR_API_ACTION_REQUEST_VALIDATION
+.fetch_add(1, Ordering::Relaxed);
+error!(
"Unable to write, API Error (Action Request Validation): {}",
reason
);
}
-ApiError::DocumentMissing {
-ref index, ..
-} => {
-ELASTIC_ERROR_API_DOCUMENT_MISSING
-.fetch_add(1, Ordering::Relaxed);
-error!(
-"Unable to write, API Error (Document Missing): {}",
-index
-);
-}
-ApiError::IndexAlreadyExists {
-ref index, ..
-} => {
-ELASTIC_ERROR_API_INDEX_ALREADY_EXISTS
-.fetch_add(1, Ordering::Relaxed);
-error!(
+}
+ApiError::DocumentMissing { ref index, .. } => {
+ELASTIC_ERROR_API_DOCUMENT_MISSING
+.fetch_add(1, Ordering::Relaxed);
+error!(
+"Unable to write, API Error (Document Missing): {}",
+index
+);
+}
+ApiError::IndexAlreadyExists { ref index, .. } => {
+ELASTIC_ERROR_API_INDEX_ALREADY_EXISTS
+.fetch_add(1, Ordering::Relaxed);
+error!(
+"Unable to write, API Error (Index Already Exists): {}",
+index
+);
+}
-_ => {
-ELASTIC_ERROR_API_UNKNOWN
-.fetch_add(1, Ordering::Relaxed);
-error!("Unable to write, API Error (Unknown)");
-}
-}
-},
-error::Error::Client(ref client_error) => {
-ELASTIC_ERROR_CLIENT.fetch_add(1, Ordering::Relaxed);
-error!("Unable to write, client error: {}", client_error.description());
+_ => {
+ELASTIC_ERROR_API_UNKNOWN.fetch_add(1, Ordering::Relaxed);
+error!("Unable to write, API Error (Unknown)");
+}
+}
+}
+}
+error::Error::Client(ref client_error) => {
+ELASTIC_ERROR_CLIENT.fetch_add(1, Ordering::Relaxed);
+error!(
+"Unable to write, client error: {}",
+client_error.description()
+);
+}
+},
}
}
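The large rewrite above is mostly rustfmt re-indentation, plus one structural simplification: a match arm whose entire body was a nested match loses the redundant braces and becomes "Err(err) => match err { ... },". In miniature, with a made-up function that is not from this repository:

    use std::io;

    fn describe(result: Result<u64, io::Error>) {
        match result {
            Ok(n) => println!("read {} bytes", n),
            // The arm's body is the inner match expression itself; no
            // surrounding { } block is needed, and the arm ends with a comma.
            Err(err) => match err.kind() {
                io::ErrorKind::NotFound => println!("not found"),
                _ => println!("other error: {}", err),
            },
        }
    }

The Ok(ref _item) / Err(ref item) patterns earlier in this file likewise drop a ref binding that clippy flagged as unnecessary.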

3 changes: 2 additions & 1 deletion src/sink/firehose.rs
@@ -83,7 +83,8 @@ impl Sink for Firehose {
fn flush(&mut self) {
let provider = DefaultCredentialsProvider::new().unwrap();
let dispatcher = default_tls_client().unwrap();
-let client = KinesisFirehoseClient::new(dispatcher, provider, self.region.clone());
+let client =
+KinesisFirehoseClient::new(dispatcher, provider, self.region.clone());

if self.buffer.is_empty() {
return;
2 changes: 1 addition & 1 deletion src/sink/influxdb.rs
@@ -110,7 +110,7 @@ where

impl InfluxDB {
/// Create a new `InfluxDB` given an InfluxDBConfig
-pub fn new(config: InfluxDBConfig) -> InfluxDB {
+pub fn new(config: &InfluxDBConfig) -> InfluxDB {
let scheme = if config.secure { "https" } else { "http" };
let uri = Url::parse(&format!(
"{}://{}:{}/write?db={}",
2 changes: 1 addition & 1 deletion src/sink/null.rs
@@ -10,7 +10,7 @@ pub struct Null {}

impl Null {
/// Create a new Null sink
-pub fn new(_config: NullConfig) -> Null {
+pub fn new(_config: &NullConfig) -> Null {
Null {}
}
}
19 changes: 13 additions & 6 deletions src/sink/prometheus.rs
@@ -106,7 +106,7 @@ impl Accumulator {
pub fn kind(&self) -> AggregationMethod {
match *self {
Accumulator::Perpetual(ref t) => t.kind(),
-Accumulator::Windowed { cap: _, samples: _ } => {
+Accumulator::Windowed { .. } => {
AggregationMethod::Summarize
}
}
@@ -116,7 +116,10 @@
pub fn total_samples(&self) -> usize {
match *self {
Accumulator::Perpetual(_) => 1,
-Accumulator::Windowed { cap: _, samples: ref s } => s.len(),
+Accumulator::Windowed {
+cap: _,
+samples: ref s,
+} => s.len(),
}
}

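Both Accumulator hunks above show clippy's preference for .. in struct patterns over binding every ignored field to _ (likely its unneeded_field_pattern lint). A minimal sketch with a hypothetical Acc type, not the real Accumulator:

    enum Acc {
        Perpetual(u64),
        Windowed { cap: usize, samples: Vec<u64> },
    }

    fn is_windowed(a: &Acc) -> bool {
        match *a {
            Acc::Perpetual(_) => false,
            // `..` ignores all remaining fields; clippy prefers this to
            // spelling each one out as `cap: _, samples: _`.
            Acc::Windowed { .. } => true,
        }
    }

Where one field is still used, it stays named and .. absorbs the rest, as in the Iter hunk just below (ref samples followed by ..).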
@@ -304,8 +307,8 @@ impl<'a> Iterator for Iter<'a> {
return Some(t.clone());
}
Accumulator::Windowed {
-cap: _,
ref samples,
+..
} => {
self.idx += 1;
match samples.len() {
@@ -373,7 +376,7 @@ impl Prometheus {
/// Create a new prometheus sink
///
/// Please see documentation on `PrometheusConfig` for more details.
-pub fn new(config: PrometheusConfig) -> Prometheus {
+pub fn new(config: &PrometheusConfig) -> Prometheus {
let aggrs = sync::Arc::new(sync::Mutex::new(
PrometheusAggr::new(config.capacity_in_seconds),
));
@@ -785,7 +788,10 @@ mod test {
#[test]
fn test_accumlator_window_boundary_obeyed() {
fn inner(cap: usize, telems: Vec<metric::Telemetry>) -> TestResult {
-let mut windowed = Accumulator::Windowed { cap: cap, samples: Vec::new() };
+let mut windowed = Accumulator::Windowed {
+cap: cap,
+samples: Vec::new(),
+};

for t in telems.into_iter() {
if t.kind() != AggregationMethod::Summarize {
Expand All @@ -798,7 +804,8 @@ mod test {

TestResult::passed()
}
-QuickCheck::new().quickcheck(inner as fn(usize, Vec<metric::Telemetry>) -> TestResult);
+QuickCheck::new()
+.quickcheck(inner as fn(usize, Vec<metric::Telemetry>) -> TestResult);
}

#[test]
2 changes: 1 addition & 1 deletion src/source/file/file_server.rs
@@ -110,7 +110,7 @@ impl Source for FileServer {
{
if let Ok(path) = entry {
let entry = fp_map.entry(path.clone());
-if let Ok(fw) = FileWatcher::new(path) {
+if let Ok(fw) = FileWatcher::new(&path) {
entry.or_insert(fw);
};
}
2 changes: 1 addition & 1 deletion src/source/file/file_watcher.rs
@@ -26,7 +26,7 @@ impl FileWatcher {
/// The input path will be used by `FileWatcher` to prime its state
/// machine. A `FileWatcher` tracks _only one_ file. This function returns
/// None if the path does not exist or is not readable by cernan.
-pub fn new(path: PathBuf) -> io::Result<FileWatcher> {
+pub fn new(path: &PathBuf) -> io::Result<FileWatcher> {
match fs::File::open(&path) {
Ok(f) => {
let mut rdr = io::BufReader::new(f);
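A side note on this signature: clippy's ptr_arg lint ordinarily nudges &PathBuf one step further, to &Path, which a borrowed PathBuf dereferences to. A minimal sketch of that variant, using a hypothetical helper rather than FileWatcher itself:

    use std::fs;
    use std::io;
    use std::path::Path;

    // Hypothetical helper, not from this repository. Taking &Path accepts
    // a borrowed PathBuf (via deref coercion) as well as plain Path slices.
    fn open_reader(path: &Path) -> io::Result<io::BufReader<fs::File>> {
        let f = fs::File::open(path)?;
        Ok(io::BufReader::new(f))
    }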
4 changes: 2 additions & 2 deletions src/source/internal.rs
@@ -241,8 +241,8 @@ impl Source for Internal {
self.chans
);
atom_non_zero_telem!(
"cernan.sinks.elasticsearch.error.client",
sink::elasticsearch::ELASTIC_ERROR_CLIENT,
"cernan.sinks.elasticsearch.error.client",
sink::elasticsearch::ELASTIC_ERROR_CLIENT,
self.tags,
self.chans
);
