From 46cb41614e241ef9f7257d19e12b51b702fd87b9 Mon Sep 17 00:00:00 2001 From: Pedro Tanaka Date: Tue, 17 Dec 2024 18:19:19 +0100 Subject: [PATCH 1/3] feat(event): Add MultiValueEvent interface and MultiObserverEvent implementation This PR introduces the first step in refactoring the event handling system to better support multiple values in a single event, which will help reduce allocations when processing events. This is part of a larger effort to improve performance and reduce memory allocations in the statsd exporter. Changes: - Add new `MultiValueEvent` interface that supports multiple values per event - Add `MultiObserverEvent` implementation for handling multiple observations - Add `ExplodableEvent` interface for backward compatibility - Add `Values()` method to existing event types - Add comprehensive tests for new interfaces and implementations This change is the foundation for future improvements that will: 1. Move explosion logic to a dedicated package 2. Update the line parser to use multi-value events 3. Modify the exporter to handle multi-value events directly 4. Eventually remove the need for event explosion The changes in this PR are backward compatible and don't affect existing functionality. Relates to #577 Signed-off-by: Pedro Tanaka --- pkg/event/event.go | 68 ++++++++++++++ pkg/event/event_test.go | 191 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 259 insertions(+) diff --git a/pkg/event/event.go b/pkg/event/event.go index d5e65cef..1765437b 100644 --- a/pkg/event/event.go +++ b/pkg/event/event.go @@ -39,6 +39,7 @@ func (c *CounterEvent) MetricName() string { return c.CMetricName } func (c *CounterEvent) Value() float64 { return c.CValue } func (c *CounterEvent) Labels() map[string]string { return c.CLabels } func (c *CounterEvent) MetricType() mapper.MetricType { return mapper.MetricTypeCounter } +func (c *CounterEvent) Values() []float64 { return []float64{c.CValue} } type GaugeEvent struct { GMetricName string @@ -51,6 +52,7 @@ func (g *GaugeEvent) MetricName() string { return g.GMetricName } func (g *GaugeEvent) Value() float64 { return g.GValue } func (g *GaugeEvent) Labels() map[string]string { return g.GLabels } func (g *GaugeEvent) MetricType() mapper.MetricType { return mapper.MetricTypeGauge } +func (g *GaugeEvent) Values() []float64 { return []float64{g.GValue} } type ObserverEvent struct { OMetricName string @@ -62,6 +64,7 @@ func (o *ObserverEvent) MetricName() string { return o.OMetricName } func (o *ObserverEvent) Value() float64 { return o.OValue } func (o *ObserverEvent) Labels() map[string]string { return o.OLabels } func (o *ObserverEvent) MetricType() mapper.MetricType { return mapper.MetricTypeObserver } +func (o *ObserverEvent) Values() []float64 { return []float64{o.OValue} } type Events []Event @@ -136,3 +139,68 @@ type UnbufferedEventHandler struct { func (ueh *UnbufferedEventHandler) Queue(events Events) { ueh.C <- events } + +// MultiValueEvent is an event that contains multiple values, it is going to replace the existing Event interface. +type MultiValueEvent interface { + MetricName() string + Value() float64 + Labels() map[string]string + MetricType() mapper.MetricType + Values() []float64 +} + +type MultiObserverEvent struct { + OMetricName string + OValues []float64 // DataDog extensions allow multiple values in a single sample + OLabels map[string]string + SampleRate float64 +} + +type ExplodableEvent interface { + Explode() []Event +} + +func (m *MultiObserverEvent) MetricName() string { return m.OMetricName } +func (m *MultiObserverEvent) Value() float64 { return m.OValues[0] } +func (m *MultiObserverEvent) Labels() map[string]string { return m.OLabels } +func (m *MultiObserverEvent) MetricType() mapper.MetricType { return mapper.MetricTypeObserver } +func (m *MultiObserverEvent) Values() []float64 { return m.OValues } + +// Explode returns a list of events that are the result of exploding the multi-value event. +// This will be used as a middle-step in the pipeline to convert multi-value events to single-value events. +// And keep the exporter code compatible with previous versions. +func (m *MultiObserverEvent) Explode() []Event { + if len(m.OValues) == 1 && m.SampleRate == 0 { + return []Event{m} + } + + events := make([]Event, 0, len(m.OValues)) + for _, value := range m.OValues { + labels := make(map[string]string, len(m.OLabels)) + for k, v := range m.OLabels { + labels[k] = v + } + + events = append(events, &ObserverEvent{ + OMetricName: m.OMetricName, + OValue: value, + OLabels: labels, + }) + } + + if m.SampleRate > 0 && m.SampleRate < 1 { + multiplier := int(1 / m.SampleRate) + multipliedEvents := make([]Event, 0, len(events)*multiplier) + for i := 0; i < multiplier; i++ { + multipliedEvents = append(multipliedEvents, events...) + } + return multipliedEvents + } + + return events +} + +var ( + _ ExplodableEvent = &MultiObserverEvent{} + _ MultiValueEvent = &MultiObserverEvent{} +) diff --git a/pkg/event/event_test.go b/pkg/event/event_test.go index 192ce29b..8c97f6c7 100644 --- a/pkg/event/event_test.go +++ b/pkg/event/event_test.go @@ -14,11 +14,13 @@ package event import ( + "reflect" "testing" "time" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/statsd_exporter/pkg/clock" + "github.com/prometheus/statsd_exporter/pkg/mapper" ) var eventsFlushed = prometheus.NewCounter( @@ -85,3 +87,192 @@ func TestEventIntervalFlush(t *testing.T) { t.Fatal("Expected 10 events in the event channel, but got", len(events)) } } + +func TestMultiValueEvent(t *testing.T) { + tests := []struct { + name string + event MultiValueEvent + wantValues []float64 + wantName string + wantType mapper.MetricType + wantLabels map[string]string + }{ + { + name: "MultiObserverEvent with single value", + event: &MultiObserverEvent{ + OMetricName: "test_metric", + OValues: []float64{1.0}, + OLabels: map[string]string{"label": "value"}, + SampleRate: 0, + }, + wantValues: []float64{1.0}, + wantName: "test_metric", + wantType: mapper.MetricTypeObserver, + wantLabels: map[string]string{"label": "value"}, + }, + { + name: "MultiObserverEvent with multiple values", + event: &MultiObserverEvent{ + OMetricName: "test_metric", + OValues: []float64{1.0, 2.0, 3.0}, + OLabels: map[string]string{"label": "value"}, + SampleRate: 0.5, + }, + wantValues: []float64{1.0, 2.0, 3.0}, + wantName: "test_metric", + wantType: mapper.MetricTypeObserver, + wantLabels: map[string]string{"label": "value"}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := tt.event.Values(); !reflect.DeepEqual(got, tt.wantValues) { + t.Errorf("MultiValueEvent.Values() = %v, want %v", got, tt.wantValues) + } + if got := tt.event.MetricName(); got != tt.wantName { + t.Errorf("MultiValueEvent.MetricName() = %v, want %v", got, tt.wantName) + } + if got := tt.event.MetricType(); got != tt.wantType { + t.Errorf("MultiValueEvent.MetricType() = %v, want %v", got, tt.wantType) + } + if got := tt.event.Labels(); !reflect.DeepEqual(got, tt.wantLabels) { + t.Errorf("MultiValueEvent.Labels() = %v, want %v", got, tt.wantLabels) + } + if got := tt.event.Value(); got != tt.wantValues[0] { + t.Errorf("MultiValueEvent.Value() = %v, want %v", got, tt.wantValues[0]) + } + }) + } +} + +func TestMultiObserverEvent_Explode(t *testing.T) { + tests := []struct { + name string + event *MultiObserverEvent + wantEvents []Event + }{ + { + name: "single value no sampling", + event: &MultiObserverEvent{ + OMetricName: "test_metric", + OValues: []float64{1.0}, + OLabels: map[string]string{"label": "value"}, + SampleRate: 0, + }, + wantEvents: []Event{ + &MultiObserverEvent{ + OMetricName: "test_metric", + OValues: []float64{1.0}, + OLabels: map[string]string{"label": "value"}, + SampleRate: 0, + }, + }, + }, + { + name: "multiple values no sampling", + event: &MultiObserverEvent{ + OMetricName: "test_metric", + OValues: []float64{1.0, 2.0, 3.0}, + OLabels: map[string]string{"label": "value"}, + SampleRate: 0, + }, + wantEvents: []Event{ + &ObserverEvent{ + OMetricName: "test_metric", + OValue: 1.0, + OLabels: map[string]string{"label": "value"}, + }, + &ObserverEvent{ + OMetricName: "test_metric", + OValue: 2.0, + OLabels: map[string]string{"label": "value"}, + }, + &ObserverEvent{ + OMetricName: "test_metric", + OValue: 3.0, + OLabels: map[string]string{"label": "value"}, + }, + }, + }, + { + name: "multiple values with sampling", + event: &MultiObserverEvent{ + OMetricName: "test_metric", + OValues: []float64{1.0, 2.0}, + OLabels: map[string]string{"label": "value"}, + SampleRate: 0.5, + }, + wantEvents: []Event{ + &ObserverEvent{ + OMetricName: "test_metric", + OValue: 1.0, + OLabels: map[string]string{"label": "value"}, + }, + &ObserverEvent{ + OMetricName: "test_metric", + OValue: 2.0, + OLabels: map[string]string{"label": "value"}, + }, + &ObserverEvent{ + OMetricName: "test_metric", + OValue: 1.0, + OLabels: map[string]string{"label": "value"}, + }, + &ObserverEvent{ + OMetricName: "test_metric", + OValue: 2.0, + OLabels: map[string]string{"label": "value"}, + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := tt.event.Explode() + if !reflect.DeepEqual(got, tt.wantEvents) { + t.Errorf("MultiObserverEvent.Explode() = %v, want %v", got, tt.wantEvents) + } + }) + } +} + +func TestEventImplementations(t *testing.T) { + tests := []struct { + name string + event interface{} + }{ + { + name: "MultiObserverEvent implements MultiValueEvent", + event: &MultiObserverEvent{}, + }, + { + name: "MultiObserverEvent implements ExplodableEvent", + event: &MultiObserverEvent{}, + }, + { + name: "MultiObserverEvent implements Event", + event: &MultiObserverEvent{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + switch tt.name { + case "MultiObserverEvent implements MultiValueEvent": + if _, ok := tt.event.(MultiValueEvent); !ok { + t.Error("MultiObserverEvent does not implement MultiValueEvent") + } + case "MultiObserverEvent implements ExplodableEvent": + if _, ok := tt.event.(ExplodableEvent); !ok { + t.Error("MultiObserverEvent does not implement ExplodableEvent") + } + case "MultiObserverEvent implements Event": + if _, ok := tt.event.(Event); !ok { + t.Error("MultiObserverEvent does not implement Event") + } + } + }) + } +} From e5c2d405812b9fda461a093c05b0c53c9dbaca3b Mon Sep 17 00:00:00 2001 From: Pedro Tanaka Date: Mon, 23 Dec 2024 11:20:49 +0100 Subject: [PATCH 2/3] Make previous events implement MultiValueEvent and simplify tests; Signed-off-by: Pedro Tanaka --- pkg/event/event.go | 4 +- pkg/event/event_test.go | 114 ++++++++++++++++++++++++++-------------- 2 files changed, 79 insertions(+), 39 deletions(-) diff --git a/pkg/event/event.go b/pkg/event/event.go index 1765437b..95848282 100644 --- a/pkg/event/event.go +++ b/pkg/event/event.go @@ -143,7 +143,6 @@ func (ueh *UnbufferedEventHandler) Queue(events Events) { // MultiValueEvent is an event that contains multiple values, it is going to replace the existing Event interface. type MultiValueEvent interface { MetricName() string - Value() float64 Labels() map[string]string MetricType() mapper.MetricType Values() []float64 @@ -203,4 +202,7 @@ func (m *MultiObserverEvent) Explode() []Event { var ( _ ExplodableEvent = &MultiObserverEvent{} _ MultiValueEvent = &MultiObserverEvent{} + _ MultiValueEvent = &CounterEvent{} + _ MultiValueEvent = &GaugeEvent{} + _ MultiValueEvent = &ObserverEvent{} ) diff --git a/pkg/event/event_test.go b/pkg/event/event_test.go index 8c97f6c7..88040b97 100644 --- a/pkg/event/event_test.go +++ b/pkg/event/event_test.go @@ -123,6 +123,42 @@ func TestMultiValueEvent(t *testing.T) { wantType: mapper.MetricTypeObserver, wantLabels: map[string]string{"label": "value"}, }, + { + name: "CounterEvent implements MultiValueEvent", + event: &CounterEvent{ + CMetricName: "test_counter", + CValue: 42.0, + CLabels: map[string]string{"label": "value"}, + }, + wantValues: []float64{42.0}, + wantName: "test_counter", + wantType: mapper.MetricTypeCounter, + wantLabels: map[string]string{"label": "value"}, + }, + { + name: "GaugeEvent implements MultiValueEvent", + event: &GaugeEvent{ + GMetricName: "test_gauge", + GValue: 123.0, + GLabels: map[string]string{"label": "value"}, + }, + wantValues: []float64{123.0}, + wantName: "test_gauge", + wantType: mapper.MetricTypeGauge, + wantLabels: map[string]string{"label": "value"}, + }, + { + name: "ObserverEvent implements MultiValueEvent", + event: &ObserverEvent{ + OMetricName: "test_observer", + OValue: 99.0, + OLabels: map[string]string{"label": "value"}, + }, + wantValues: []float64{99.0}, + wantName: "test_observer", + wantType: mapper.MetricTypeObserver, + wantLabels: map[string]string{"label": "value"}, + }, } for _, tt := range tests { @@ -139,9 +175,6 @@ func TestMultiValueEvent(t *testing.T) { if got := tt.event.Labels(); !reflect.DeepEqual(got, tt.wantLabels) { t.Errorf("MultiValueEvent.Labels() = %v, want %v", got, tt.wantLabels) } - if got := tt.event.Value(); got != tt.wantValues[0] { - t.Errorf("MultiValueEvent.Value() = %v, want %v", got, tt.wantValues[0]) - } }) } } @@ -239,40 +272,45 @@ func TestMultiObserverEvent_Explode(t *testing.T) { } func TestEventImplementations(t *testing.T) { - tests := []struct { - name string - event interface{} - }{ - { - name: "MultiObserverEvent implements MultiValueEvent", - event: &MultiObserverEvent{}, - }, - { - name: "MultiObserverEvent implements ExplodableEvent", - event: &MultiObserverEvent{}, - }, - { - name: "MultiObserverEvent implements Event", - event: &MultiObserverEvent{}, - }, - } + t.Run("MultiObserverEvent implements MultiValueEvent", func(t *testing.T) { + event := &MultiObserverEvent{} + if _, ok := interface{}(event).(MultiValueEvent); !ok { + t.Error("MultiObserverEvent does not implement MultiValueEvent") + } + }) - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - switch tt.name { - case "MultiObserverEvent implements MultiValueEvent": - if _, ok := tt.event.(MultiValueEvent); !ok { - t.Error("MultiObserverEvent does not implement MultiValueEvent") - } - case "MultiObserverEvent implements ExplodableEvent": - if _, ok := tt.event.(ExplodableEvent); !ok { - t.Error("MultiObserverEvent does not implement ExplodableEvent") - } - case "MultiObserverEvent implements Event": - if _, ok := tt.event.(Event); !ok { - t.Error("MultiObserverEvent does not implement Event") - } - } - }) - } + t.Run("MultiObserverEvent implements ExplodableEvent", func(t *testing.T) { + event := &MultiObserverEvent{} + if _, ok := interface{}(event).(ExplodableEvent); !ok { + t.Error("MultiObserverEvent does not implement ExplodableEvent") + } + }) + + t.Run("MultiObserverEvent implements Event", func(t *testing.T) { + event := &MultiObserverEvent{} + if _, ok := interface{}(event).(Event); !ok { + t.Error("MultiObserverEvent does not implement Event") + } + }) + + t.Run("CounterEvent implements MultiValueEvent", func(t *testing.T) { + event := &CounterEvent{} + if _, ok := interface{}(event).(MultiValueEvent); !ok { + t.Error("CounterEvent does not implement MultiValueEvent") + } + }) + + t.Run("GaugeEvent implements MultiValueEvent", func(t *testing.T) { + event := &GaugeEvent{} + if _, ok := interface{}(event).(MultiValueEvent); !ok { + t.Error("GaugeEvent does not implement MultiValueEvent") + } + }) + + t.Run("ObserverEvent implements MultiValueEvent", func(t *testing.T) { + event := &ObserverEvent{} + if _, ok := interface{}(event).(MultiValueEvent); !ok { + t.Error("ObserverEvent does not implement MultiValueEvent") + } + }) } From a125dac85b48fbdce5a8b07e59f949193bfb5103 Mon Sep 17 00:00:00 2001 From: Pedro Tanaka Date: Mon, 23 Dec 2024 11:24:37 +0100 Subject: [PATCH 3/3] Rewording Explode -> Expand Signed-off-by: Pedro Tanaka --- pkg/event/event.go | 10 ++++----- pkg/event/event_test.go | 50 +++-------------------------------------- 2 files changed, 8 insertions(+), 52 deletions(-) diff --git a/pkg/event/event.go b/pkg/event/event.go index 95848282..4f8a5311 100644 --- a/pkg/event/event.go +++ b/pkg/event/event.go @@ -155,8 +155,8 @@ type MultiObserverEvent struct { SampleRate float64 } -type ExplodableEvent interface { - Explode() []Event +type ExpandableEvent interface { + Expand() []Event } func (m *MultiObserverEvent) MetricName() string { return m.OMetricName } @@ -165,10 +165,10 @@ func (m *MultiObserverEvent) Labels() map[string]string { return m.OLabels } func (m *MultiObserverEvent) MetricType() mapper.MetricType { return mapper.MetricTypeObserver } func (m *MultiObserverEvent) Values() []float64 { return m.OValues } -// Explode returns a list of events that are the result of exploding the multi-value event. +// Expand returns a list of events that are the result of expanding the multi-value event. // This will be used as a middle-step in the pipeline to convert multi-value events to single-value events. // And keep the exporter code compatible with previous versions. -func (m *MultiObserverEvent) Explode() []Event { +func (m *MultiObserverEvent) Expand() []Event { if len(m.OValues) == 1 && m.SampleRate == 0 { return []Event{m} } @@ -200,7 +200,7 @@ func (m *MultiObserverEvent) Explode() []Event { } var ( - _ ExplodableEvent = &MultiObserverEvent{} + _ ExpandableEvent = &MultiObserverEvent{} _ MultiValueEvent = &MultiObserverEvent{} _ MultiValueEvent = &CounterEvent{} _ MultiValueEvent = &GaugeEvent{} diff --git a/pkg/event/event_test.go b/pkg/event/event_test.go index 88040b97..a4e21d50 100644 --- a/pkg/event/event_test.go +++ b/pkg/event/event_test.go @@ -179,7 +179,7 @@ func TestMultiValueEvent(t *testing.T) { } } -func TestMultiObserverEvent_Explode(t *testing.T) { +func TestMultiObserverEvent_Expand(t *testing.T) { tests := []struct { name string event *MultiObserverEvent @@ -263,54 +263,10 @@ func TestMultiObserverEvent_Explode(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got := tt.event.Explode() + got := tt.event.Expand() if !reflect.DeepEqual(got, tt.wantEvents) { - t.Errorf("MultiObserverEvent.Explode() = %v, want %v", got, tt.wantEvents) + t.Errorf("MultiObserverEvent.Expand() = %v, want %v", got, tt.wantEvents) } }) } } - -func TestEventImplementations(t *testing.T) { - t.Run("MultiObserverEvent implements MultiValueEvent", func(t *testing.T) { - event := &MultiObserverEvent{} - if _, ok := interface{}(event).(MultiValueEvent); !ok { - t.Error("MultiObserverEvent does not implement MultiValueEvent") - } - }) - - t.Run("MultiObserverEvent implements ExplodableEvent", func(t *testing.T) { - event := &MultiObserverEvent{} - if _, ok := interface{}(event).(ExplodableEvent); !ok { - t.Error("MultiObserverEvent does not implement ExplodableEvent") - } - }) - - t.Run("MultiObserverEvent implements Event", func(t *testing.T) { - event := &MultiObserverEvent{} - if _, ok := interface{}(event).(Event); !ok { - t.Error("MultiObserverEvent does not implement Event") - } - }) - - t.Run("CounterEvent implements MultiValueEvent", func(t *testing.T) { - event := &CounterEvent{} - if _, ok := interface{}(event).(MultiValueEvent); !ok { - t.Error("CounterEvent does not implement MultiValueEvent") - } - }) - - t.Run("GaugeEvent implements MultiValueEvent", func(t *testing.T) { - event := &GaugeEvent{} - if _, ok := interface{}(event).(MultiValueEvent); !ok { - t.Error("GaugeEvent does not implement MultiValueEvent") - } - }) - - t.Run("ObserverEvent implements MultiValueEvent", func(t *testing.T) { - event := &ObserverEvent{} - if _, ok := interface{}(event).(MultiValueEvent); !ok { - t.Error("ObserverEvent does not implement MultiValueEvent") - } - }) -}