Skip to content

Commit

Permalink
feat(event): Add MultiValueEvent interface and MultiObserverEvent imp…
Browse files Browse the repository at this point in the history
…lementation

This PR introduces the first step in refactoring the event handling system to better
support multiple values in a single event, which will help reduce allocations when
processing events. This is part of a larger effort to improve performance and reduce
memory allocations in the statsd exporter.

Changes:
- Add new `MultiValueEvent` interface that supports multiple values per event
- Add `MultiObserverEvent` implementation for handling multiple observations
- Add `ExplodableEvent` interface for backward compatibility
- Add `Values()` method to existing event types
- Add comprehensive tests for new interfaces and implementations

This change is the foundation for future improvements that will:
1. Move explosion logic to a dedicated package
2. Update the line parser to use multi-value events
3. Modify the exporter to handle multi-value events directly
4. Eventually remove the need for event explosion

The changes in this PR are backward compatible and don't affect existing functionality.

Relates to #577

Signed-off-by: Pedro Tanaka <pedro.tanaka@shopify.com>
  • Loading branch information
pedro-stanaka committed Dec 17, 2024
1 parent 32fee3f commit 46cb416
Show file tree
Hide file tree
Showing 2 changed files with 259 additions and 0 deletions.
68 changes: 68 additions & 0 deletions pkg/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func (c *CounterEvent) MetricName() string { return c.CMetricName }
func (c *CounterEvent) Value() float64 { return c.CValue }
func (c *CounterEvent) Labels() map[string]string { return c.CLabels }
func (c *CounterEvent) MetricType() mapper.MetricType { return mapper.MetricTypeCounter }
func (c *CounterEvent) Values() []float64 { return []float64{c.CValue} }

type GaugeEvent struct {
GMetricName string
Expand All @@ -51,6 +52,7 @@ func (g *GaugeEvent) MetricName() string { return g.GMetricName }
func (g *GaugeEvent) Value() float64 { return g.GValue }
func (g *GaugeEvent) Labels() map[string]string { return g.GLabels }
func (g *GaugeEvent) MetricType() mapper.MetricType { return mapper.MetricTypeGauge }
func (g *GaugeEvent) Values() []float64 { return []float64{g.GValue} }

type ObserverEvent struct {
OMetricName string
Expand All @@ -62,6 +64,7 @@ func (o *ObserverEvent) MetricName() string { return o.OMetricName }
func (o *ObserverEvent) Value() float64 { return o.OValue }
func (o *ObserverEvent) Labels() map[string]string { return o.OLabels }
func (o *ObserverEvent) MetricType() mapper.MetricType { return mapper.MetricTypeObserver }
func (o *ObserverEvent) Values() []float64 { return []float64{o.OValue} }

type Events []Event

Expand Down Expand Up @@ -136,3 +139,68 @@ type UnbufferedEventHandler struct {
func (ueh *UnbufferedEventHandler) Queue(events Events) {
ueh.C <- events
}

// MultiValueEvent is an event that contains multiple values, it is going to replace the existing Event interface.
type MultiValueEvent interface {
MetricName() string
Value() float64
Labels() map[string]string
MetricType() mapper.MetricType
Values() []float64
}

type MultiObserverEvent struct {
OMetricName string
OValues []float64 // DataDog extensions allow multiple values in a single sample
OLabels map[string]string
SampleRate float64
}

type ExplodableEvent interface {
Explode() []Event
}

func (m *MultiObserverEvent) MetricName() string { return m.OMetricName }
func (m *MultiObserverEvent) Value() float64 { return m.OValues[0] }
func (m *MultiObserverEvent) Labels() map[string]string { return m.OLabels }
func (m *MultiObserverEvent) MetricType() mapper.MetricType { return mapper.MetricTypeObserver }
func (m *MultiObserverEvent) Values() []float64 { return m.OValues }

// Explode returns a list of events that are the result of exploding the multi-value event.
// This will be used as a middle-step in the pipeline to convert multi-value events to single-value events.
// And keep the exporter code compatible with previous versions.
func (m *MultiObserverEvent) Explode() []Event {
if len(m.OValues) == 1 && m.SampleRate == 0 {
return []Event{m}
}

events := make([]Event, 0, len(m.OValues))
for _, value := range m.OValues {
labels := make(map[string]string, len(m.OLabels))
for k, v := range m.OLabels {
labels[k] = v
}

events = append(events, &ObserverEvent{
OMetricName: m.OMetricName,
OValue: value,
OLabels: labels,
})
}

if m.SampleRate > 0 && m.SampleRate < 1 {
multiplier := int(1 / m.SampleRate)
multipliedEvents := make([]Event, 0, len(events)*multiplier)
for i := 0; i < multiplier; i++ {
multipliedEvents = append(multipliedEvents, events...)
}
return multipliedEvents
}

return events
}

var (
_ ExplodableEvent = &MultiObserverEvent{}
_ MultiValueEvent = &MultiObserverEvent{}
)
191 changes: 191 additions & 0 deletions pkg/event/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@
package event

import (
"reflect"
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/statsd_exporter/pkg/clock"
"github.com/prometheus/statsd_exporter/pkg/mapper"
)

var eventsFlushed = prometheus.NewCounter(
Expand Down Expand Up @@ -85,3 +87,192 @@ func TestEventIntervalFlush(t *testing.T) {
t.Fatal("Expected 10 events in the event channel, but got", len(events))
}
}

func TestMultiValueEvent(t *testing.T) {
tests := []struct {
name string
event MultiValueEvent
wantValues []float64
wantName string
wantType mapper.MetricType
wantLabels map[string]string
}{
{
name: "MultiObserverEvent with single value",
event: &MultiObserverEvent{
OMetricName: "test_metric",
OValues: []float64{1.0},
OLabels: map[string]string{"label": "value"},
SampleRate: 0,
},
wantValues: []float64{1.0},
wantName: "test_metric",
wantType: mapper.MetricTypeObserver,
wantLabels: map[string]string{"label": "value"},
},
{
name: "MultiObserverEvent with multiple values",
event: &MultiObserverEvent{
OMetricName: "test_metric",
OValues: []float64{1.0, 2.0, 3.0},
OLabels: map[string]string{"label": "value"},
SampleRate: 0.5,
},
wantValues: []float64{1.0, 2.0, 3.0},
wantName: "test_metric",
wantType: mapper.MetricTypeObserver,
wantLabels: map[string]string{"label": "value"},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := tt.event.Values(); !reflect.DeepEqual(got, tt.wantValues) {
t.Errorf("MultiValueEvent.Values() = %v, want %v", got, tt.wantValues)
}
if got := tt.event.MetricName(); got != tt.wantName {
t.Errorf("MultiValueEvent.MetricName() = %v, want %v", got, tt.wantName)
}
if got := tt.event.MetricType(); got != tt.wantType {
t.Errorf("MultiValueEvent.MetricType() = %v, want %v", got, tt.wantType)
}
if got := tt.event.Labels(); !reflect.DeepEqual(got, tt.wantLabels) {
t.Errorf("MultiValueEvent.Labels() = %v, want %v", got, tt.wantLabels)
}
if got := tt.event.Value(); got != tt.wantValues[0] {
t.Errorf("MultiValueEvent.Value() = %v, want %v", got, tt.wantValues[0])
}
})
}
}

func TestMultiObserverEvent_Explode(t *testing.T) {
tests := []struct {
name string
event *MultiObserverEvent
wantEvents []Event
}{
{
name: "single value no sampling",
event: &MultiObserverEvent{
OMetricName: "test_metric",
OValues: []float64{1.0},
OLabels: map[string]string{"label": "value"},
SampleRate: 0,
},
wantEvents: []Event{
&MultiObserverEvent{
OMetricName: "test_metric",
OValues: []float64{1.0},
OLabels: map[string]string{"label": "value"},
SampleRate: 0,
},
},
},
{
name: "multiple values no sampling",
event: &MultiObserverEvent{
OMetricName: "test_metric",
OValues: []float64{1.0, 2.0, 3.0},
OLabels: map[string]string{"label": "value"},
SampleRate: 0,
},
wantEvents: []Event{
&ObserverEvent{
OMetricName: "test_metric",
OValue: 1.0,
OLabels: map[string]string{"label": "value"},
},
&ObserverEvent{
OMetricName: "test_metric",
OValue: 2.0,
OLabels: map[string]string{"label": "value"},
},
&ObserverEvent{
OMetricName: "test_metric",
OValue: 3.0,
OLabels: map[string]string{"label": "value"},
},
},
},
{
name: "multiple values with sampling",
event: &MultiObserverEvent{
OMetricName: "test_metric",
OValues: []float64{1.0, 2.0},
OLabels: map[string]string{"label": "value"},
SampleRate: 0.5,
},
wantEvents: []Event{
&ObserverEvent{
OMetricName: "test_metric",
OValue: 1.0,
OLabels: map[string]string{"label": "value"},
},
&ObserverEvent{
OMetricName: "test_metric",
OValue: 2.0,
OLabels: map[string]string{"label": "value"},
},
&ObserverEvent{
OMetricName: "test_metric",
OValue: 1.0,
OLabels: map[string]string{"label": "value"},
},
&ObserverEvent{
OMetricName: "test_metric",
OValue: 2.0,
OLabels: map[string]string{"label": "value"},
},
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := tt.event.Explode()
if !reflect.DeepEqual(got, tt.wantEvents) {
t.Errorf("MultiObserverEvent.Explode() = %v, want %v", got, tt.wantEvents)
}
})
}
}

func TestEventImplementations(t *testing.T) {
tests := []struct {
name string
event interface{}
}{
{
name: "MultiObserverEvent implements MultiValueEvent",
event: &MultiObserverEvent{},
},
{
name: "MultiObserverEvent implements ExplodableEvent",
event: &MultiObserverEvent{},
},
{
name: "MultiObserverEvent implements Event",
event: &MultiObserverEvent{},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
switch tt.name {
case "MultiObserverEvent implements MultiValueEvent":
if _, ok := tt.event.(MultiValueEvent); !ok {
t.Error("MultiObserverEvent does not implement MultiValueEvent")
}
case "MultiObserverEvent implements ExplodableEvent":
if _, ok := tt.event.(ExplodableEvent); !ok {
t.Error("MultiObserverEvent does not implement ExplodableEvent")
}
case "MultiObserverEvent implements Event":
if _, ok := tt.event.(Event); !ok {
t.Error("MultiObserverEvent does not implement Event")
}
}
})
}
}

0 comments on commit 46cb416

Please sign in to comment.