diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 698d0dd1e04..86e0a3a3caf 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -130,6 +130,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Fix issue where beats may report incorrect metrics for its own process when running inside a container {pull}39627[39627] - Normalize AWS RDS CPU Utilization values before making the metadata API call. {pull}39664[39664] - Fix behavior of pagetypeinfo metrics {pull}39985[39985] +- [K8s Integration] Enhance HTTP authentication in case of token updates for Apiserver, Controllermanager and Scheduler metricsets {issue}41910[41910] {pull}42016[42016] *Osquerybeat* diff --git a/metricbeat/helper/prometheus/prometheus.go b/metricbeat/helper/prometheus/prometheus.go index f4e06df7e1f..50929c128c3 100644 --- a/metricbeat/helper/prometheus/prometheus.go +++ b/metricbeat/helper/prometheus/prometheus.go @@ -35,6 +35,9 @@ const acceptHeader = `text/plain;version=0.0.4;q=0.5,*/*;q=0.1` // Prometheus helper retrieves prometheus formatted metrics type Prometheus interface { + // GetHttp returns the HTTP Client that handles the connection towards remote endpoint + GetHttp() (*helper.HTTP, error) + // GetFamilies requests metric families from prometheus endpoint and returns them GetFamilies() ([]*MetricFamily, error) @@ -66,6 +69,15 @@ func NewPrometheusClient(base mb.BaseMetricSet) (Prometheus, error) { return &prometheus{http, base.Logger()}, nil } +// GetHttp returns HTTP Client +func (p *prometheus) GetHttp() (*helper.HTTP, error) { + httpClient, ok := p.httpfetcher.(*helper.HTTP) + if !ok { + return nil, fmt.Errorf("httpfetcher is not of type *helper.HTTP") + } + return httpClient, nil +} + // GetFamilies requests metric families from prometheus endpoint and returns them func (p *prometheus) GetFamilies() ([]*MetricFamily, error) { var reader io.Reader diff --git a/metricbeat/module/kubernetes/apiserver/metricset.go b/metricbeat/module/kubernetes/apiserver/metricset.go index 9dd9a81976d..5457093e553 100644 --- a/metricbeat/module/kubernetes/apiserver/metricset.go +++ b/metricbeat/module/kubernetes/apiserver/metricset.go @@ -19,9 +19,14 @@ package apiserver import ( "fmt" + "net/http" + "strings" + "time" + "github.com/elastic/beats/v7/metricbeat/helper" "github.com/elastic/beats/v7/metricbeat/helper/prometheus" "github.com/elastic/beats/v7/metricbeat/mb" + k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes" "github.com/elastic/beats/v7/metricbeat/module/kubernetes/util" "github.com/elastic/elastic-agent-libs/mapstr" ) @@ -29,9 +34,11 @@ import ( // Metricset for apiserver is a prometheus based metricset type Metricset struct { mb.BaseMetricSet + http *helper.HTTP prometheusClient prometheus.Prometheus prometheusMappings *prometheus.MetricsMapping clusterMeta mapstr.M + mod k8smod.Module } var _ mb.ReportingMetricSetV2Error = (*Metricset)(nil) @@ -41,11 +48,23 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { if err != nil { return nil, err } + + mod, ok := base.Module().(k8smod.Module) + if !ok { + return nil, fmt.Errorf("must be child of kubernetes module") + } + + http, err := pc.GetHttp() + if err != nil { + return nil, fmt.Errorf("the http connection is not valid") + } ms := &Metricset{ BaseMetricSet: base, + http: http, prometheusClient: pc, prometheusMappings: mapping, clusterMeta: util.AddClusterECSMeta(base), + mod: mod, } return ms, nil @@ -54,20 +73,36 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // Fetch gathers information from the apiserver and reports events with this information. func (m *Metricset) Fetch(reporter mb.ReporterV2) error { events, err := m.prometheusClient.GetProcessedMetrics(m.prometheusMappings) + errorString := fmt.Sprintf("%s", err) + errorUnauthorisedMsg := fmt.Sprintf("unexpected status code %d", http.StatusUnauthorized) + if err != nil && strings.Contains(errorString, errorUnauthorisedMsg) { + count := 2 // We retry twice to refresh the Authorisation token in case of http.StatusUnauthorize = 401 Error + for count > 0 { + if _, errAuth := m.http.RefreshAuthorizationHeader(); errAuth == nil { + events, err = m.prometheusClient.GetProcessedMetrics(m.prometheusMappings) + } + if err != nil { + time.Sleep(m.mod.Config().Period) + count-- + } else { + break + } + } + } + // We need to check for err again in case error is not 401 or RefreshAuthorizationHeader has failed if err != nil { return fmt.Errorf("error getting metrics: %w", err) - } - - for _, e := range events { - event := mb.TransformMapStrToEvent("kubernetes", e, nil) - if len(m.clusterMeta) != 0 { - event.RootFields.DeepUpdate(m.clusterMeta) - } - isOpen := reporter.Event(event) - if !isOpen { - return nil + } else { + for _, e := range events { + event := mb.TransformMapStrToEvent("kubernetes", e, nil) + if len(m.clusterMeta) != 0 { + event.RootFields.DeepUpdate(m.clusterMeta) + } + isOpen := reporter.Event(event) + if !isOpen { + return nil + } } + return nil } - - return nil } diff --git a/metricbeat/module/kubernetes/controllermanager/controllermanager.go b/metricbeat/module/kubernetes/controllermanager/controllermanager.go index dbfcddc2b6b..6c7b1c8ae52 100644 --- a/metricbeat/module/kubernetes/controllermanager/controllermanager.go +++ b/metricbeat/module/kubernetes/controllermanager/controllermanager.go @@ -19,9 +19,14 @@ package controllermanager import ( "fmt" + "net/http" + "strings" + "time" + "github.com/elastic/beats/v7/metricbeat/helper" "github.com/elastic/beats/v7/metricbeat/helper/prometheus" "github.com/elastic/beats/v7/metricbeat/mb" + k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes" "github.com/elastic/beats/v7/metricbeat/module/kubernetes/util" "github.com/elastic/elastic-agent-libs/mapstr" ) @@ -74,9 +79,11 @@ func init() { // MetricSet implements the mb.PushMetricSet interface, and therefore does not rely on polling. type MetricSet struct { mb.BaseMetricSet + http *helper.HTTP prometheusClient prometheus.Prometheus prometheusMappings *prometheus.MetricsMapping clusterMeta mapstr.M + mod k8smod.Module } // New create a new instance of the MetricSet @@ -87,11 +94,23 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { if err != nil { return nil, err } + + mod, ok := base.Module().(k8smod.Module) + if !ok { + return nil, fmt.Errorf("must be child of kubernetes module") + } + + http, err := pc.GetHttp() + if err != nil { + return nil, fmt.Errorf("the http connection is not valid") + } ms := &MetricSet{ BaseMetricSet: base, + http: http, prometheusClient: pc, prometheusMappings: mapping, clusterMeta: util.AddClusterECSMeta(base), + mod: mod, } return ms, nil } @@ -99,19 +118,37 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // Fetch gathers information from the apiserver and reports events with this information. func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { events, err := m.prometheusClient.GetProcessedMetrics(m.prometheusMappings) + errorString := fmt.Sprintf("%s", err) + errorUnauthorisedMsg := fmt.Sprintf("unexpected status code %d", http.StatusUnauthorized) + if err != nil && strings.Contains(errorString, errorUnauthorisedMsg) { + count := 2 // We retry twice to refresh the Authorisation token in case of http.StatusUnauthorize = 401 Error + for count > 0 { + if _, errAuth := m.http.RefreshAuthorizationHeader(); errAuth == nil { + events, err = m.prometheusClient.GetProcessedMetrics(m.prometheusMappings) + } + if err != nil { + time.Sleep(m.mod.Config().Period) + count-- + } else { + break + } + } + } + // We need to check for err again in case error is not 401 or RefreshAuthorizationHeader has failed if err != nil { return fmt.Errorf("error getting metrics: %w", err) - } - for _, e := range events { - event := mb.TransformMapStrToEvent("kubernetes", e, nil) - if len(m.clusterMeta) != 0 { - event.RootFields.DeepUpdate(m.clusterMeta) - } - isOpen := reporter.Event(event) - if !isOpen { - return nil + } else { + for _, e := range events { + event := mb.TransformMapStrToEvent("kubernetes", e, nil) + if len(m.clusterMeta) != 0 { + event.RootFields.DeepUpdate(m.clusterMeta) + } + isOpen := reporter.Event(event) + if !isOpen { + return nil + } } - } - return nil + return nil + } } diff --git a/metricbeat/module/kubernetes/scheduler/scheduler.go b/metricbeat/module/kubernetes/scheduler/scheduler.go index f512c96b7f2..1b563ad000a 100644 --- a/metricbeat/module/kubernetes/scheduler/scheduler.go +++ b/metricbeat/module/kubernetes/scheduler/scheduler.go @@ -19,9 +19,14 @@ package scheduler import ( "fmt" + "net/http" + "strings" + "time" + "github.com/elastic/beats/v7/metricbeat/helper" "github.com/elastic/beats/v7/metricbeat/helper/prometheus" "github.com/elastic/beats/v7/metricbeat/mb" + k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes" "github.com/elastic/beats/v7/metricbeat/module/kubernetes/util" "github.com/elastic/elastic-agent-libs/mapstr" ) @@ -78,9 +83,11 @@ func init() { // MetricSet implements the mb.PushMetricSet interface, and therefore does not rely on polling. type MetricSet struct { mb.BaseMetricSet + http *helper.HTTP prometheusClient prometheus.Prometheus prometheusMappings *prometheus.MetricsMapping clusterMeta mapstr.M + mod k8smod.Module } // New create a new instance of the MetricSet @@ -91,11 +98,23 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { if err != nil { return nil, err } + + mod, ok := base.Module().(k8smod.Module) + if !ok { + return nil, fmt.Errorf("must be child of kubernetes module") + } + + http, err := pc.GetHttp() + if err != nil { + return nil, fmt.Errorf("the http connection is not valid") + } ms := &MetricSet{ BaseMetricSet: base, + http: http, prometheusClient: pc, prometheusMappings: mapping, clusterMeta: util.AddClusterECSMeta(base), + mod: mod, } return ms, nil } @@ -103,20 +122,37 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // Fetch gathers information from the apiserver and reports events with this information. func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { events, err := m.prometheusClient.GetProcessedMetrics(m.prometheusMappings) + errorString := fmt.Sprintf("%s", err) + errorUnauthorisedMsg := fmt.Sprintf("unexpected status code %d", http.StatusUnauthorized) + if err != nil && strings.Contains(errorString, errorUnauthorisedMsg) { + count := 2 // We retry twice to refresh the Authorisation token in case of http.StatusUnauthorize = 401 Error + for count > 0 { + if _, errAuth := m.http.RefreshAuthorizationHeader(); errAuth == nil { + events, err = m.prometheusClient.GetProcessedMetrics(m.prometheusMappings) + } + if err != nil { + time.Sleep(m.mod.Config().Period) + count-- + } else { + break + } + } + } + // We need to check for err again in case error is not 401 or RefreshAuthorizationHeader has failed if err != nil { return fmt.Errorf("error getting metrics: %w", err) - } - - for _, e := range events { - event := mb.TransformMapStrToEvent("kubernetes", e, nil) - if len(m.clusterMeta) != 0 { - event.RootFields.DeepUpdate(m.clusterMeta) + } else { + for _, e := range events { + event := mb.TransformMapStrToEvent("kubernetes", e, nil) + if len(m.clusterMeta) != 0 { + event.RootFields.DeepUpdate(m.clusterMeta) + } + isOpen := reporter.Event(event) + if !isOpen { + return nil + } } - isOpen := reporter.Event(event) - if !isOpen { - return nil - } - } - return nil + return nil + } }