diff --git a/config.go b/config.go index 08cb2cd7..1b4da23c 100644 --- a/config.go +++ b/config.go @@ -16,13 +16,14 @@ package main import ( "fmt" - dto "github.com/prometheus/client_model/go" - "github.com/prometheus/common/expfmt" - "gopkg.in/yaml.v2" "os" "strings" "text/template" + dto "github.com/prometheus/client_model/go" + "github.com/prometheus/common/expfmt" + "gopkg.in/yaml.v2" + "github.com/sirupsen/logrus" ) @@ -45,6 +46,7 @@ var ( kafkaSaslUsername = "" kafkaSaslPassword = "" serializer Serializer + kafkaAcks = "all" ) func init() { @@ -111,6 +113,9 @@ func init() { if value := os.Getenv("KAFKA_SASL_PASSWORD"); value != "" { kafkaSaslPassword = value } + if value := os.Getenv("KAFKA_ACKS"); value != "" { + kafkaAcks = value + } if value := os.Getenv("MATCH"); value != "" { matchList, err := parseMatchList(value) diff --git a/main.go b/main.go index e100e613..3553be6d 100644 --- a/main.go +++ b/main.go @@ -33,6 +33,7 @@ func main() { "batch.num.messages": kafkaBatchNumMessages, "go.batch.producer": true, // Enable batch producer (for increased performance). "go.delivery.reports": false, // per-message delivery reports to the Events() channel + "acks": kafkaAcks, } if kafkaSslClientCertFile != "" && kafkaSslClientKeyFile != "" && kafkaSslCACertFile != "" {