Skip to content

Commit

Permalink
Don't cache DNS -> IP in federation_transmitter / wavefront (#143)
Browse files Browse the repository at this point in the history
This commit ensures that we don't accidentally cache a bad IP based
off a DNS lookup done toward the start of cernan. This commit also
removes dns_lookup in favor of the built-in facility for such. Every
attempt is made to use returns IPs until one works.

Signed-off-by: Brian L. Troutwine <blt@postmates.com>
  • Loading branch information
blt authored Nov 9, 2016
1 parent ffa702e commit 2c88968
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 47 deletions.
10 changes: 0 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ doc = false
bincode = "0.6.0"
chrono = "0.2"
clap = "2.10.0"
dns-lookup = "0.2.1"
fern = "0.3.5"
flate2 = "0.2"
fnv = "1.0.5"
Expand Down
1 change: 0 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ extern crate toml;
extern crate clap;
extern crate chrono;
extern crate fnv;
extern crate dns_lookup;
extern crate notify;
extern crate bincode;
extern crate serde;
Expand Down
36 changes: 26 additions & 10 deletions src/sink/federation_transmitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,18 +78,34 @@ impl Sink for FederationTransmitter {
t.insert(0, pyld_sz_bytes[2]);
t.insert(0, pyld_sz_bytes[3]);

let srv: Vec<_> = (self.host.as_str(), self.port)
.to_socket_addrs()
.expect("Unable to resolve domain")
.collect();
match TcpStream::connect(srv.first().unwrap()) {
Ok(mut stream) => {
let res = stream.write(&t[..]);
if res.is_ok() {
self.buffer.clear();
let addrs = (self.host.as_str(), self.port).to_socket_addrs();
match addrs {
Ok(srv) => {
let ips: Vec<_> = srv.collect();
for ip in ips {
match TcpStream::connect(ip) {
Ok(mut stream) => {
let res = stream.write(&t[..]);
if res.is_ok() {
self.buffer.clear();
return;
}
}
Err(e) => {
info!("Unable to connect to proxy at {} using addr {} with error \
{}",
self.host,
ip,
e)
}
}
}
}
Err(e) => {
info!("Unable to perform DNS lookup on host {} with error {}",
self.host,
e);
}
Err(e) => debug!("Unable to connect: {}", e),
}
}
}
Expand Down
60 changes: 35 additions & 25 deletions src/sink/wavefront.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
use std::net::{SocketAddr, TcpStream};
use std::net::TcpStream;
use std::fmt::Write;
use std::io::Write as IoWrite;
use metric::{Metric, LogLine, TagMap};
use buckets::Buckets;
use sink::Sink;
use dns_lookup;
use std::net::ToSocketAddrs;
use time;

pub struct Wavefront {
addr: SocketAddr,
host: String,
port: u16,
aggrs: Buckets,
}

Expand Down Expand Up @@ -44,16 +45,10 @@ fn fmt_tags(tags: &TagMap) -> String {

impl Wavefront {
pub fn new(config: WavefrontConfig) -> Wavefront {
match dns_lookup::lookup_host(&config.host) {
Ok(mut lh) => {
let ip = lh.next().expect("No IPs associated with host").unwrap();
let addr = SocketAddr::new(ip, config.port);
Wavefront {
addr: addr,
aggrs: Buckets::new(config.bin_width),
}
}
Err(_) => panic!("Could not lookup host"),
Wavefront {
host: config.host,
port: config.port,
aggrs: Buckets::new(config.bin_width),
}
}

Expand Down Expand Up @@ -127,21 +122,36 @@ impl Sink for Wavefront {
debug!("delivery attempts: {}", attempts);
}
time::delay(attempts);
match TcpStream::connect(self.addr) {
Ok(mut stream) => {
let res = stream.write(self.format_stats().as_bytes());
if res.is_ok() {
trace!("flushed to wavefront!");
self.aggrs.reset();
break;
} else {
attempts += 1;
let addrs = (self.host.as_str(), self.port).to_socket_addrs();
match addrs {
Ok(srv) => {
let ips: Vec<_> = srv.collect();
for ip in ips {
match TcpStream::connect(ip) {
Ok(mut stream) => {
let res = stream.write(self.format_stats().as_bytes());
if res.is_ok() {
trace!("flushed to wavefront!");
self.aggrs.reset();
return;
} else {
attempts += 1;
}
}
Err(e) => {
info!("Unable to connect to proxy at {} using addr {} with error \
{}",
self.host,
ip,
e)
}
}
}
}
Err(e) => {
info!("unable to connect to proxy at addr {} with error {}",
self.addr,
e)
info!("Unable to perform DNS lookup on host {} with error {}",
self.host,
e);
}
}
}
Expand Down

0 comments on commit 2c88968

Please sign in to comment.