Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Kubernetes Integration] Fix for apiserver token expiration #42016

Merged
merged 26 commits into from
Jan 7, 2025
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
f08556f
initial fix for apiserver
gizas Dec 12, 2024
74dc838
adding fix for controller and schedule
gizas Dec 13, 2024
f3b622f
adding fix for controller and schedule
gizas Dec 13, 2024
0875f70
adding fix for controller and schedule
gizas Dec 13, 2024
c7d2066
adding changelog
gizas Dec 13, 2024
777d2d2
Merge branch 'main' of github.com:elastic/beats into k8sapibearer
gizas Dec 13, 2024
bfe1908
adding changelog
gizas Dec 13, 2024
b75cf69
adding changelog
gizas Dec 13, 2024
e49061b
fixing spelling lint
gizas Dec 13, 2024
7eb0658
Merge branch 'main' into k8sapibearer
gizas Dec 13, 2024
2624305
removing continue
gizas Dec 16, 2024
34fa34b
Merge branch 'k8sapibearer' of github.com:elastic/beats into k8sapibe…
gizas Dec 16, 2024
5ec1c2e
removing fmt in err
gizas Dec 16, 2024
ab6e5fd
reverting fmt for err
gizas Dec 16, 2024
dd36379
Merge branch 'main' into k8sapibearer
gizas Dec 16, 2024
be66ac6
adding mock test for apiserver
gizas Dec 16, 2024
2b80136
Merge branch 'k8sapibearer' of github.com:elastic/beats into k8sapibe…
gizas Dec 16, 2024
f51c984
removing mock test for apiserver
gizas Dec 16, 2024
f879abd
removing mock test for apiserver
gizas Dec 16, 2024
0c8897b
Merge branch 'main' into k8sapibearer
gizas Dec 17, 2024
44a0fae
update errorString
gizas Dec 18, 2024
12e23c3
Merge branch 'k8sapibearer' of github.com:elastic/beats into k8sapibe…
gizas Dec 18, 2024
2b5d2f9
Merge branch 'main' of github.com:elastic/beats into k8sapibearer
gizas Dec 18, 2024
f966a0f
update errorString
gizas Dec 18, 2024
04c35e8
Merge branch 'main' into k8sapibearer
gizas Dec 18, 2024
6e971f0
Merge branch 'main' into k8sapibearer
gizas Dec 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Do not report non-existant 0 values for RSS metrics in docker/memory {pull}41449[41449]
- Log Cisco Meraki `getDevicePerformanceScores` errors without stopping metrics collection. {pull}41622[41622]
- Don't skip first bucket value in GCP metrics metricset for distribution type metrics {pull}41822[41822]
- [K8s Integration] Enhance HTTP authentication in case of token updates for Apiserver, Controllermanager and Scheduler metricsets {issue}41910[41910] {pull}42016[42016]


*Osquerybeat*
Expand Down
12 changes: 12 additions & 0 deletions metricbeat/helper/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ const acceptHeader = `text/plain;version=0.0.4;q=0.5,*/*;q=0.1`

// Prometheus helper retrieves prometheus formatted metrics
type Prometheus interface {
// GetHttp returns the HTTP Client that handles the connection towards remote endpoint
GetHttp() (*helper.HTTP, error)

// GetFamilies requests metric families from prometheus endpoint and returns them
GetFamilies() ([]*MetricFamily, error)

Expand Down Expand Up @@ -66,6 +69,15 @@ func NewPrometheusClient(base mb.BaseMetricSet) (Prometheus, error) {
return &prometheus{http, base.Logger()}, nil
}

// GetHttp returns HTTP Client
func (p *prometheus) GetHttp() (*helper.HTTP, error) {
httpClient, ok := p.httpfetcher.(*helper.HTTP)
if !ok {
return nil, fmt.Errorf("httpfetcher is not of type *helper.HTTP")
}
return httpClient, nil
}

// GetFamilies requests metric families from prometheus endpoint and returns them
func (p *prometheus) GetFamilies() ([]*MetricFamily, error) {
var reader io.Reader
Expand Down
61 changes: 49 additions & 12 deletions metricbeat/module/kubernetes/apiserver/metricset.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,26 @@ package apiserver

import (
"fmt"
"net/http"
"strings"
"time"

"github.com/elastic/beats/v7/metricbeat/helper"
"github.com/elastic/beats/v7/metricbeat/helper/prometheus"
"github.com/elastic/beats/v7/metricbeat/mb"
k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes"
"github.com/elastic/beats/v7/metricbeat/module/kubernetes/util"
"github.com/elastic/elastic-agent-libs/mapstr"
)

// Metricset for apiserver is a prometheus based metricset
type Metricset struct {
mb.BaseMetricSet
http *helper.HTTP
prometheusClient prometheus.Prometheus
prometheusMappings *prometheus.MetricsMapping
clusterMeta mapstr.M
mod k8smod.Module
}

var _ mb.ReportingMetricSetV2Error = (*Metricset)(nil)
Expand All @@ -41,11 +48,23 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
if err != nil {
return nil, err
}

mod, ok := base.Module().(k8smod.Module)
if !ok {
return nil, fmt.Errorf("must be child of kubernetes module")
}

http, err := pc.GetHttp()
if err != nil {
return nil, fmt.Errorf("the http connection is not valid")
}
ms := &Metricset{
BaseMetricSet: base,
http: http,
prometheusClient: pc,
prometheusMappings: mapping,
clusterMeta: util.AddClusterECSMeta(base),
mod: mod,
}

return ms, nil
Expand All @@ -54,20 +73,38 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// Fetch gathers information from the apiserver and reports events with this information.
func (m *Metricset) Fetch(reporter mb.ReporterV2) error {
events, err := m.prometheusClient.GetProcessedMetrics(m.prometheusMappings)
if err != nil {
return fmt.Errorf("error getting metrics: %w", err)
}

for _, e := range events {
event := mb.TransformMapStrToEvent("kubernetes", e, nil)
if len(m.clusterMeta) != 0 {
event.RootFields.DeepUpdate(m.clusterMeta)
error_string := fmt.Sprintf("%s", err)
gizas marked this conversation as resolved.
Show resolved Hide resolved
errorUnauthorisedMsg := fmt.Sprintf("unexpected status code %d", http.StatusUnauthorized)
if err != nil && strings.Contains(error_string, errorUnauthorisedMsg) {
count := 2 // We retry twice to refresh the Authorisation token in case of http.StatusUnauthorize = 401 Error
for count > 0 {
if _, errAuth := m.http.RefreshAuthorizationHeader(); errAuth == nil {
events, err = m.prometheusClient.GetProcessedMetrics(m.prometheusMappings)
}
if err != nil {
time.Sleep(m.mod.Config().Period)
count--
continue
gizas marked this conversation as resolved.
Show resolved Hide resolved
} else {
break
}
}
isOpen := reporter.Event(event)
if !isOpen {
return nil
}
// We need to check for err again in case error is not 401 or RefreshAuthorizationHeader has failed
if err != nil {
return fmt.Errorf("error getting metrics: %w", err)
} else {
gizas marked this conversation as resolved.
Show resolved Hide resolved
for _, e := range events {
event := mb.TransformMapStrToEvent("kubernetes", e, nil)
if len(m.clusterMeta) != 0 {
event.RootFields.DeepUpdate(m.clusterMeta)
}
isOpen := reporter.Event(event)
if !isOpen {
return nil
}
}
return nil
}

return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,14 @@ package controllermanager

import (
"fmt"
"net/http"
"strings"
"time"

"github.com/elastic/beats/v7/metricbeat/helper"
"github.com/elastic/beats/v7/metricbeat/helper/prometheus"
"github.com/elastic/beats/v7/metricbeat/mb"
k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes"
"github.com/elastic/beats/v7/metricbeat/module/kubernetes/util"
"github.com/elastic/elastic-agent-libs/mapstr"
)
Expand Down Expand Up @@ -74,9 +79,11 @@ func init() {
// MetricSet implements the mb.PushMetricSet interface, and therefore does not rely on polling.
type MetricSet struct {
mb.BaseMetricSet
http *helper.HTTP
prometheusClient prometheus.Prometheus
prometheusMappings *prometheus.MetricsMapping
clusterMeta mapstr.M
mod k8smod.Module
}

// New create a new instance of the MetricSet
Expand All @@ -87,31 +94,62 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
if err != nil {
return nil, err
}

mod, ok := base.Module().(k8smod.Module)
if !ok {
return nil, fmt.Errorf("must be child of kubernetes module")
}

http, err := pc.GetHttp()
if err != nil {
return nil, fmt.Errorf("the http connection is not valid")
}
ms := &MetricSet{
BaseMetricSet: base,
http: http,
prometheusClient: pc,
prometheusMappings: mapping,
clusterMeta: util.AddClusterECSMeta(base),
mod: mod,
}
return ms, nil
}

// Fetch gathers information from the apiserver and reports events with this information.
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
events, err := m.prometheusClient.GetProcessedMetrics(m.prometheusMappings)
error_string := fmt.Sprintf("%s", err)
errorUnauthorisedMsg := fmt.Sprintf("unexpected status code %d", http.StatusUnauthorized)
if err != nil && strings.Contains(error_string, errorUnauthorisedMsg) {
count := 2 // We retry twice to refresh the Authorisation token in case of http.StatusUnauthorize = 401 Error
for count > 0 {
if _, errAuth := m.http.RefreshAuthorizationHeader(); errAuth == nil {
events, err = m.prometheusClient.GetProcessedMetrics(m.prometheusMappings)
}
if err != nil {
time.Sleep(m.mod.Config().Period)
count--
continue
} else {
break
}
}
}
// We need to check for err again in case error is not 401 or RefreshAuthorizationHeader has failed
if err != nil {
return fmt.Errorf("error getting metrics: %w", err)
}
for _, e := range events {
event := mb.TransformMapStrToEvent("kubernetes", e, nil)
if len(m.clusterMeta) != 0 {
event.RootFields.DeepUpdate(m.clusterMeta)
}
isOpen := reporter.Event(event)
if !isOpen {
return nil
} else {
for _, e := range events {
event := mb.TransformMapStrToEvent("kubernetes", e, nil)
if len(m.clusterMeta) != 0 {
event.RootFields.DeepUpdate(m.clusterMeta)
}
isOpen := reporter.Event(event)
if !isOpen {
return nil
}
}
}

return nil
return nil
}
}
61 changes: 49 additions & 12 deletions metricbeat/module/kubernetes/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,14 @@ package scheduler

import (
"fmt"
"net/http"
"strings"
"time"

"github.com/elastic/beats/v7/metricbeat/helper"
"github.com/elastic/beats/v7/metricbeat/helper/prometheus"
"github.com/elastic/beats/v7/metricbeat/mb"
k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes"
"github.com/elastic/beats/v7/metricbeat/module/kubernetes/util"
"github.com/elastic/elastic-agent-libs/mapstr"
)
Expand Down Expand Up @@ -78,9 +83,11 @@ func init() {
// MetricSet implements the mb.PushMetricSet interface, and therefore does not rely on polling.
type MetricSet struct {
mb.BaseMetricSet
http *helper.HTTP
prometheusClient prometheus.Prometheus
prometheusMappings *prometheus.MetricsMapping
clusterMeta mapstr.M
mod k8smod.Module
}

// New create a new instance of the MetricSet
Expand All @@ -91,32 +98,62 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
if err != nil {
return nil, err
}

mod, ok := base.Module().(k8smod.Module)
if !ok {
return nil, fmt.Errorf("must be child of kubernetes module")
}

http, err := pc.GetHttp()
if err != nil {
return nil, fmt.Errorf("the http connection is not valid")
}
ms := &MetricSet{
BaseMetricSet: base,
http: http,
prometheusClient: pc,
prometheusMappings: mapping,
clusterMeta: util.AddClusterECSMeta(base),
mod: mod,
}
return ms, nil
}

// Fetch gathers information from the apiserver and reports events with this information.
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
events, err := m.prometheusClient.GetProcessedMetrics(m.prometheusMappings)
error_string := fmt.Sprintf("%s", err)
errorUnauthorisedMsg := fmt.Sprintf("unexpected status code %d", http.StatusUnauthorized)
if err != nil && strings.Contains(error_string, errorUnauthorisedMsg) {
count := 2 // We retry twice to refresh the Authorisation token in case of http.StatusUnauthorize = 401 Error
for count > 0 {
if _, errAuth := m.http.RefreshAuthorizationHeader(); errAuth == nil {
events, err = m.prometheusClient.GetProcessedMetrics(m.prometheusMappings)
}
if err != nil {
time.Sleep(m.mod.Config().Period)
count--
continue
} else {
break
}
}
}
// We need to check for err again in case error is not 401 or RefreshAuthorizationHeader has failed
if err != nil {
return fmt.Errorf("error getting metrics: %w", err)
}

for _, e := range events {
event := mb.TransformMapStrToEvent("kubernetes", e, nil)
if len(m.clusterMeta) != 0 {
event.RootFields.DeepUpdate(m.clusterMeta)
} else {
for _, e := range events {
event := mb.TransformMapStrToEvent("kubernetes", e, nil)
if len(m.clusterMeta) != 0 {
event.RootFields.DeepUpdate(m.clusterMeta)
}
isOpen := reporter.Event(event)
if !isOpen {
return nil
}
}
isOpen := reporter.Event(event)
if !isOpen {
return nil
}
}

return nil
return nil
}
}
Loading