Skip to content

Commit

Permalink
Ensure that flush timer and file sources run (#138)
Browse files Browse the repository at this point in the history
I accidentally left off the runs for the file and flush timer.
The end result was that no metrics were able to be flushed from
cernan. Also, files would have been broken.

Signed-off-by: Brian L. Troutwine <blt@postmates.com>
  • Loading branch information
blt authored Oct 31, 2016
1 parent 55d96b4 commit 481e543
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 5 deletions.
2 changes: 2 additions & 0 deletions example-config.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
data-directory = "data/"

flush-interval = 10

# cernan listens on two telemetry protocols, statsd and graphite. The following
# two configuration items set the ports cernan will listen on.
[statsd]
Expand Down
4 changes: 2 additions & 2 deletions src/bin/cernan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ fn main() {
let fp_sends = sends.clone();
let ftags = args.tags.clone();
joins.push(thread::spawn(move || {
cernan::source::FileServer::new(fp_sends, lf, ftags);
cernan::source::FileServer::new(fp_sends, lf, ftags).run();
}));
}
}
Expand All @@ -148,7 +148,7 @@ fn main() {
let flush_interval = args.flush_interval;
let flush_interval_sends = sends.clone();
joins.push(thread::spawn(move || {
cernan::source::FlushTimer::new(flush_interval_sends, flush_interval);
cernan::source::FlushTimer::new(flush_interval_sends, flush_interval).run();
}));

joins.push(thread::spawn(move || {
Expand Down
6 changes: 3 additions & 3 deletions src/sink/wavefront.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,22 +107,22 @@ impl Sink for Wavefront {
let mut attempts = 0;
loop {
if attempts > 0 {
debug!("[wavefront] delivery attempts: {}", attempts);
debug!("delivery attempts: {}", attempts);
}
time::delay(attempts);
match TcpStream::connect(self.addr) {
Ok(mut stream) => {
let res = stream.write(self.format_stats().as_bytes());
if res.is_ok() {
trace!("[wavefront] flushed to wavefront!");
trace!("flushed to wavefront!");
self.aggrs.reset();
break;
} else {
attempts += 1;
}
}
Err(e) => {
info!("[wavefront] unable to connect to proxy at addr {} with error {}",
info!("unable to connect to proxy at addr {} with error {}",
self.addr,
e)
}
Expand Down
1 change: 1 addition & 0 deletions src/source/flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ impl FlushTimer {
impl Source for FlushTimer {
fn run(&mut self) {
let duration = Duration::new(self.interval, 0);
debug!("flush-interval: {:?}", duration);
loop {
sleep(duration);
send("flush", &mut self.chans, &metric::Event::TimerFlush);
Expand Down

0 comments on commit 481e543

Please sign in to comment.