Skip to content

Commit

Permalink
Increase existing telemetry, adjust file delays (#237)
Browse files Browse the repository at this point in the history
This commit contains a smattering of adjustments. The main show
is the inclusion of Internal setup code in bin/cernan -- oops, it
was only present on my disk -- and fiddling with the way backoffs
occur in file.rs. We reduce the CPU load of each FileServer thread
by only gradually scaling down to faster polls. This means a group
of slow files will be polled more slowly that a group with one
fast file in it.

Signed-off-by: Brian L. Troutwine <blt@postmates.com>
  • Loading branch information
blt authored Mar 31, 2017
1 parent f2769b8 commit a963db3
Show file tree
Hide file tree
Showing 7 changed files with 23 additions and 6 deletions.
3 changes: 3 additions & 0 deletions examples/configs/basic.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ flush-interval = 5
source = "cernan"

[sources]
[sources.internal]
forwards = ["sinks.console", "sinks.null"]

[sources.statsd.primary]
enabled = true
port = 8125
Expand Down
12 changes: 12 additions & 0 deletions src/bin/cernan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,18 @@ fn main() {
cernan::source::NativeServer::new(native_server_send, config).run();
}))
}

let internal_config = args.internal;
let mut internal_send = Vec::new();
populate_forwards(&mut internal_send,
Some(&mut flush_sends),
&internal_config.forwards,
&internal_config.config_path,
&sends);
joins.push(thread::spawn(move || {
cernan::source::Internal::new(internal_send, internal_config).run();
}));

for config in args.statsds.values() {
let c = (*config).clone();
let mut statsd_sends = Vec::new();
Expand Down
3 changes: 3 additions & 0 deletions src/sink/wavefront.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use buckets::Buckets;
use metric::{AggregationMethod, LogLine, TagMap, Telemetry};
use sink::{Sink, Valve};
use source::report_telemetry;
use std::cmp;
use std::io::Write as IoWrite;
use std::net::TcpStream;
Expand Down Expand Up @@ -143,6 +144,8 @@ impl Sink for Wavefront {

fn flush(&mut self) {
loop {
report_telemetry("cernan.sinks.wavefront.delivery_attempts",
self.delivery_attempts as f64);
if self.delivery_attempts > 0 {
debug!("delivery attempts: {}", self.delivery_attempts);
}
Expand Down
4 changes: 2 additions & 2 deletions src/source/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ impl Source for FileServer {
}
}
let start = Instant::now();
let mut attempts = 0;
let mut attempts: u32 = 0;
loop {
// file poll
if fp_map.is_empty() {
Expand All @@ -220,7 +220,7 @@ impl Source for FileServer {
let mut lines_read = 0;
match file.read_line(&mut buffer) {
Ok(sz) => {
attempts = 0;
attempts = attempts.saturating_sub(1);
if sz > 0 {
lines_read += 1;
buffer.pop();
Expand Down
4 changes: 2 additions & 2 deletions src/source/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ pub fn report_telemetry<S>(name: S, value: f64) -> ()
/// floor.
impl Source for Internal {
fn run(&mut self) {
let mut attempts = 0;
let mut attempts: u32 = 0;
loop {
if let Some(mut telem) = Q.lock().unwrap().pop_front() {
attempts -= 1;
attempts = attempts.saturating_sub(1);
if !self.chans.is_empty() {
telem = telem.overlay_tags_from_map(&self.tags);
util::send("internal",
Expand Down
2 changes: 1 addition & 1 deletion src/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ mod statsd;
pub use self::file::{FileServer, FileServerConfig};
pub use self::flush::FlushTimer;
pub use self::graphite::{Graphite, GraphiteConfig};
pub use self::internal::{Internal, InternalConfig};
pub use self::internal::{Internal, InternalConfig, report_telemetry};
pub use self::native::{NativeServer, NativeServerConfig};
pub use self::statsd::{Statsd, StatsdConfig};

Expand Down
1 change: 0 additions & 1 deletion src/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ pub fn update_time() {
thread::sleep(dur);
let now = UTC::now().timestamp() as usize;
let order = Ordering::Relaxed;
trace!("updated cernan {:?} now, is: {}", order, now);
NOW.store(now, order);
}
}
Expand Down

0 comments on commit a963db3

Please sign in to comment.