Skip to content

Commit

Permalink
[8.16](backport #42016) [Kubernetes Integration] Fix for apiserver to…
Browse files Browse the repository at this point in the history
…ken expiration (#42230)

* [Kubernetes Integration] Fix for apiserver token expiration (#42016)

* initial fix for apiserver

* adding fix for controller and schedule

(cherry picked from commit 7e25c4d)

* correcting changelog.next.asciidoc file

---------

Co-authored-by: Andrew Gizas <andreas.gkizas@elastic.co>
Co-authored-by: Denis <denis.rechkunov@elastic.co>
  • Loading branch information
3 people authored Jan 7, 2025
1 parent c3e99f4 commit 5cb6e21
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 35 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Fix issue where beats may report incorrect metrics for its own process when running inside a container {pull}39627[39627]
- Normalize AWS RDS CPU Utilization values before making the metadata API call. {pull}39664[39664]
- Fix behavior of pagetypeinfo metrics {pull}39985[39985]
- [K8s Integration] Enhance HTTP authentication in case of token updates for Apiserver, Controllermanager and Scheduler metricsets {issue}41910[41910] {pull}42016[42016]

*Osquerybeat*

Expand Down
12 changes: 12 additions & 0 deletions metricbeat/helper/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ const acceptHeader = `text/plain;version=0.0.4;q=0.5,*/*;q=0.1`

// Prometheus helper retrieves prometheus formatted metrics
type Prometheus interface {
// GetHttp returns the HTTP Client that handles the connection towards remote endpoint
GetHttp() (*helper.HTTP, error)

// GetFamilies requests metric families from prometheus endpoint and returns them
GetFamilies() ([]*MetricFamily, error)

Expand Down Expand Up @@ -66,6 +69,15 @@ func NewPrometheusClient(base mb.BaseMetricSet) (Prometheus, error) {
return &prometheus{http, base.Logger()}, nil
}

// GetHttp returns HTTP Client
func (p *prometheus) GetHttp() (*helper.HTTP, error) {
httpClient, ok := p.httpfetcher.(*helper.HTTP)
if !ok {
return nil, fmt.Errorf("httpfetcher is not of type *helper.HTTP")
}
return httpClient, nil
}

// GetFamilies requests metric families from prometheus endpoint and returns them
func (p *prometheus) GetFamilies() ([]*MetricFamily, error) {
var reader io.Reader
Expand Down
59 changes: 47 additions & 12 deletions metricbeat/module/kubernetes/apiserver/metricset.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,26 @@ package apiserver

import (
"fmt"
"net/http"
"strings"
"time"

"github.com/elastic/beats/v7/metricbeat/helper"
"github.com/elastic/beats/v7/metricbeat/helper/prometheus"
"github.com/elastic/beats/v7/metricbeat/mb"
k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes"
"github.com/elastic/beats/v7/metricbeat/module/kubernetes/util"
"github.com/elastic/elastic-agent-libs/mapstr"
)

// Metricset for apiserver is a prometheus based metricset
type Metricset struct {
mb.BaseMetricSet
http *helper.HTTP
prometheusClient prometheus.Prometheus
prometheusMappings *prometheus.MetricsMapping
clusterMeta mapstr.M
mod k8smod.Module
}

var _ mb.ReportingMetricSetV2Error = (*Metricset)(nil)
Expand All @@ -41,11 +48,23 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
if err != nil {
return nil, err
}

mod, ok := base.Module().(k8smod.Module)
if !ok {
return nil, fmt.Errorf("must be child of kubernetes module")
}

http, err := pc.GetHttp()
if err != nil {
return nil, fmt.Errorf("the http connection is not valid")
}
ms := &Metricset{
BaseMetricSet: base,
http: http,
prometheusClient: pc,
prometheusMappings: mapping,
clusterMeta: util.AddClusterECSMeta(base),
mod: mod,
}

return ms, nil
Expand All @@ -54,20 +73,36 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// Fetch gathers information from the apiserver and reports events with this information.
func (m *Metricset) Fetch(reporter mb.ReporterV2) error {
events, err := m.prometheusClient.GetProcessedMetrics(m.prometheusMappings)
errorString := fmt.Sprintf("%s", err)
errorUnauthorisedMsg := fmt.Sprintf("unexpected status code %d", http.StatusUnauthorized)
if err != nil && strings.Contains(errorString, errorUnauthorisedMsg) {
count := 2 // We retry twice to refresh the Authorisation token in case of http.StatusUnauthorize = 401 Error
for count > 0 {
if _, errAuth := m.http.RefreshAuthorizationHeader(); errAuth == nil {
events, err = m.prometheusClient.GetProcessedMetrics(m.prometheusMappings)
}
if err != nil {
time.Sleep(m.mod.Config().Period)
count--
} else {
break
}
}
}
// We need to check for err again in case error is not 401 or RefreshAuthorizationHeader has failed
if err != nil {
return fmt.Errorf("error getting metrics: %w", err)
}

for _, e := range events {
event := mb.TransformMapStrToEvent("kubernetes", e, nil)
if len(m.clusterMeta) != 0 {
event.RootFields.DeepUpdate(m.clusterMeta)
}
isOpen := reporter.Event(event)
if !isOpen {
return nil
} else {
for _, e := range events {
event := mb.TransformMapStrToEvent("kubernetes", e, nil)
if len(m.clusterMeta) != 0 {
event.RootFields.DeepUpdate(m.clusterMeta)
}
isOpen := reporter.Event(event)
if !isOpen {
return nil
}
}
return nil
}

return nil
}
59 changes: 48 additions & 11 deletions metricbeat/module/kubernetes/controllermanager/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,14 @@ package controllermanager

import (
"fmt"
"net/http"
"strings"
"time"

"github.com/elastic/beats/v7/metricbeat/helper"
"github.com/elastic/beats/v7/metricbeat/helper/prometheus"
"github.com/elastic/beats/v7/metricbeat/mb"
k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes"
"github.com/elastic/beats/v7/metricbeat/module/kubernetes/util"
"github.com/elastic/elastic-agent-libs/mapstr"
)
Expand Down Expand Up @@ -74,9 +79,11 @@ func init() {
// MetricSet implements the mb.PushMetricSet interface, and therefore does not rely on polling.
type MetricSet struct {
mb.BaseMetricSet
http *helper.HTTP
prometheusClient prometheus.Prometheus
prometheusMappings *prometheus.MetricsMapping
clusterMeta mapstr.M
mod k8smod.Module
}

// New create a new instance of the MetricSet
Expand All @@ -87,31 +94,61 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
if err != nil {
return nil, err
}

mod, ok := base.Module().(k8smod.Module)
if !ok {
return nil, fmt.Errorf("must be child of kubernetes module")
}

http, err := pc.GetHttp()
if err != nil {
return nil, fmt.Errorf("the http connection is not valid")
}
ms := &MetricSet{
BaseMetricSet: base,
http: http,
prometheusClient: pc,
prometheusMappings: mapping,
clusterMeta: util.AddClusterECSMeta(base),
mod: mod,
}
return ms, nil
}

// Fetch gathers information from the apiserver and reports events with this information.
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
events, err := m.prometheusClient.GetProcessedMetrics(m.prometheusMappings)
errorString := fmt.Sprintf("%s", err)
errorUnauthorisedMsg := fmt.Sprintf("unexpected status code %d", http.StatusUnauthorized)
if err != nil && strings.Contains(errorString, errorUnauthorisedMsg) {
count := 2 // We retry twice to refresh the Authorisation token in case of http.StatusUnauthorize = 401 Error
for count > 0 {
if _, errAuth := m.http.RefreshAuthorizationHeader(); errAuth == nil {
events, err = m.prometheusClient.GetProcessedMetrics(m.prometheusMappings)
}
if err != nil {
time.Sleep(m.mod.Config().Period)
count--
} else {
break
}
}
}
// We need to check for err again in case error is not 401 or RefreshAuthorizationHeader has failed
if err != nil {
return fmt.Errorf("error getting metrics: %w", err)
}
for _, e := range events {
event := mb.TransformMapStrToEvent("kubernetes", e, nil)
if len(m.clusterMeta) != 0 {
event.RootFields.DeepUpdate(m.clusterMeta)
}
isOpen := reporter.Event(event)
if !isOpen {
return nil
} else {
for _, e := range events {
event := mb.TransformMapStrToEvent("kubernetes", e, nil)
if len(m.clusterMeta) != 0 {
event.RootFields.DeepUpdate(m.clusterMeta)
}
isOpen := reporter.Event(event)
if !isOpen {
return nil
}
}
}

return nil
return nil
}
}
60 changes: 48 additions & 12 deletions metricbeat/module/kubernetes/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,14 @@ package scheduler

import (
"fmt"
"net/http"
"strings"
"time"

"github.com/elastic/beats/v7/metricbeat/helper"
"github.com/elastic/beats/v7/metricbeat/helper/prometheus"
"github.com/elastic/beats/v7/metricbeat/mb"
k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes"
"github.com/elastic/beats/v7/metricbeat/module/kubernetes/util"
"github.com/elastic/elastic-agent-libs/mapstr"
)
Expand Down Expand Up @@ -78,9 +83,11 @@ func init() {
// MetricSet implements the mb.PushMetricSet interface, and therefore does not rely on polling.
type MetricSet struct {
mb.BaseMetricSet
http *helper.HTTP
prometheusClient prometheus.Prometheus
prometheusMappings *prometheus.MetricsMapping
clusterMeta mapstr.M
mod k8smod.Module
}

// New create a new instance of the MetricSet
Expand All @@ -91,32 +98,61 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
if err != nil {
return nil, err
}

mod, ok := base.Module().(k8smod.Module)
if !ok {
return nil, fmt.Errorf("must be child of kubernetes module")
}

http, err := pc.GetHttp()
if err != nil {
return nil, fmt.Errorf("the http connection is not valid")
}
ms := &MetricSet{
BaseMetricSet: base,
http: http,
prometheusClient: pc,
prometheusMappings: mapping,
clusterMeta: util.AddClusterECSMeta(base),
mod: mod,
}
return ms, nil
}

// Fetch gathers information from the apiserver and reports events with this information.
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
events, err := m.prometheusClient.GetProcessedMetrics(m.prometheusMappings)
errorString := fmt.Sprintf("%s", err)
errorUnauthorisedMsg := fmt.Sprintf("unexpected status code %d", http.StatusUnauthorized)
if err != nil && strings.Contains(errorString, errorUnauthorisedMsg) {
count := 2 // We retry twice to refresh the Authorisation token in case of http.StatusUnauthorize = 401 Error
for count > 0 {
if _, errAuth := m.http.RefreshAuthorizationHeader(); errAuth == nil {
events, err = m.prometheusClient.GetProcessedMetrics(m.prometheusMappings)
}
if err != nil {
time.Sleep(m.mod.Config().Period)
count--
} else {
break
}
}
}
// We need to check for err again in case error is not 401 or RefreshAuthorizationHeader has failed
if err != nil {
return fmt.Errorf("error getting metrics: %w", err)
}

for _, e := range events {
event := mb.TransformMapStrToEvent("kubernetes", e, nil)
if len(m.clusterMeta) != 0 {
event.RootFields.DeepUpdate(m.clusterMeta)
} else {
for _, e := range events {
event := mb.TransformMapStrToEvent("kubernetes", e, nil)
if len(m.clusterMeta) != 0 {
event.RootFields.DeepUpdate(m.clusterMeta)
}
isOpen := reporter.Event(event)
if !isOpen {
return nil
}
}
isOpen := reporter.Event(event)
if !isOpen {
return nil
}
}

return nil
return nil
}
}

0 comments on commit 5cb6e21

Please sign in to comment.