Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(event): Add MultiValueEvent interface and MultiObserverEvent implementation #602

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions pkg/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func (c *CounterEvent) MetricName() string { return c.CMetricName }
func (c *CounterEvent) Value() float64 { return c.CValue }
func (c *CounterEvent) Labels() map[string]string { return c.CLabels }
func (c *CounterEvent) MetricType() mapper.MetricType { return mapper.MetricTypeCounter }
func (c *CounterEvent) Values() []float64 { return []float64{c.CValue} }

type GaugeEvent struct {
GMetricName string
Expand All @@ -51,6 +52,7 @@ func (g *GaugeEvent) MetricName() string { return g.GMetricName }
func (g *GaugeEvent) Value() float64 { return g.GValue }
func (g *GaugeEvent) Labels() map[string]string { return g.GLabels }
func (g *GaugeEvent) MetricType() mapper.MetricType { return mapper.MetricTypeGauge }
func (g *GaugeEvent) Values() []float64 { return []float64{g.GValue} }

type ObserverEvent struct {
OMetricName string
Expand All @@ -62,6 +64,7 @@ func (o *ObserverEvent) MetricName() string { return o.OMetricName }
func (o *ObserverEvent) Value() float64 { return o.OValue }
func (o *ObserverEvent) Labels() map[string]string { return o.OLabels }
func (o *ObserverEvent) MetricType() mapper.MetricType { return mapper.MetricTypeObserver }
func (o *ObserverEvent) Values() []float64 { return []float64{o.OValue} }

type Events []Event

Expand Down Expand Up @@ -136,3 +139,70 @@ type UnbufferedEventHandler struct {
func (ueh *UnbufferedEventHandler) Queue(events Events) {
ueh.C <- events
}

// MultiValueEvent is an event that contains multiple values, it is going to replace the existing Event interface.
type MultiValueEvent interface {
MetricName() string
Labels() map[string]string
MetricType() mapper.MetricType
Values() []float64
}

type MultiObserverEvent struct {
OMetricName string
OValues []float64 // DataDog extensions allow multiple values in a single sample
OLabels map[string]string
SampleRate float64
}

type ExpandableEvent interface {
Expand() []Event
}

func (m *MultiObserverEvent) MetricName() string { return m.OMetricName }
func (m *MultiObserverEvent) Value() float64 { return m.OValues[0] }
func (m *MultiObserverEvent) Labels() map[string]string { return m.OLabels }
func (m *MultiObserverEvent) MetricType() mapper.MetricType { return mapper.MetricTypeObserver }
func (m *MultiObserverEvent) Values() []float64 { return m.OValues }

// Expand returns a list of events that are the result of expanding the multi-value event.
// This will be used as a middle-step in the pipeline to convert multi-value events to single-value events.
// And keep the exporter code compatible with previous versions.
func (m *MultiObserverEvent) Expand() []Event {
if len(m.OValues) == 1 && m.SampleRate == 0 {
return []Event{m}
}

events := make([]Event, 0, len(m.OValues))
for _, value := range m.OValues {
labels := make(map[string]string, len(m.OLabels))
for k, v := range m.OLabels {
labels[k] = v
}

events = append(events, &ObserverEvent{
OMetricName: m.OMetricName,
OValue: value,
OLabels: labels,
})
}

if m.SampleRate > 0 && m.SampleRate < 1 {
multiplier := int(1 / m.SampleRate)
multipliedEvents := make([]Event, 0, len(events)*multiplier)
for i := 0; i < multiplier; i++ {
multipliedEvents = append(multipliedEvents, events...)
}
return multipliedEvents
}

return events
}

var (
_ ExpandableEvent = &MultiObserverEvent{}
_ MultiValueEvent = &MultiObserverEvent{}
_ MultiValueEvent = &CounterEvent{}
_ MultiValueEvent = &GaugeEvent{}
_ MultiValueEvent = &ObserverEvent{}
)
185 changes: 185 additions & 0 deletions pkg/event/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@
package event

import (
"reflect"
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/statsd_exporter/pkg/clock"
"github.com/prometheus/statsd_exporter/pkg/mapper"
)

var eventsFlushed = prometheus.NewCounter(
Expand Down Expand Up @@ -85,3 +87,186 @@ func TestEventIntervalFlush(t *testing.T) {
t.Fatal("Expected 10 events in the event channel, but got", len(events))
}
}

func TestMultiValueEvent(t *testing.T) {
tests := []struct {
name string
event MultiValueEvent
wantValues []float64
wantName string
wantType mapper.MetricType
wantLabels map[string]string
}{
{
name: "MultiObserverEvent with single value",
event: &MultiObserverEvent{
OMetricName: "test_metric",
OValues: []float64{1.0},
OLabels: map[string]string{"label": "value"},
SampleRate: 0,
},
wantValues: []float64{1.0},
wantName: "test_metric",
wantType: mapper.MetricTypeObserver,
wantLabels: map[string]string{"label": "value"},
},
{
name: "MultiObserverEvent with multiple values",
event: &MultiObserverEvent{
OMetricName: "test_metric",
OValues: []float64{1.0, 2.0, 3.0},
OLabels: map[string]string{"label": "value"},
SampleRate: 0.5,
},
wantValues: []float64{1.0, 2.0, 3.0},
wantName: "test_metric",
wantType: mapper.MetricTypeObserver,
wantLabels: map[string]string{"label": "value"},
},
{
name: "CounterEvent implements MultiValueEvent",
event: &CounterEvent{
CMetricName: "test_counter",
CValue: 42.0,
CLabels: map[string]string{"label": "value"},
},
wantValues: []float64{42.0},
wantName: "test_counter",
wantType: mapper.MetricTypeCounter,
wantLabels: map[string]string{"label": "value"},
},
{
name: "GaugeEvent implements MultiValueEvent",
event: &GaugeEvent{
GMetricName: "test_gauge",
GValue: 123.0,
GLabels: map[string]string{"label": "value"},
},
wantValues: []float64{123.0},
wantName: "test_gauge",
wantType: mapper.MetricTypeGauge,
wantLabels: map[string]string{"label": "value"},
},
{
name: "ObserverEvent implements MultiValueEvent",
event: &ObserverEvent{
OMetricName: "test_observer",
OValue: 99.0,
OLabels: map[string]string{"label": "value"},
},
wantValues: []float64{99.0},
wantName: "test_observer",
wantType: mapper.MetricTypeObserver,
wantLabels: map[string]string{"label": "value"},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := tt.event.Values(); !reflect.DeepEqual(got, tt.wantValues) {
t.Errorf("MultiValueEvent.Values() = %v, want %v", got, tt.wantValues)
}
if got := tt.event.MetricName(); got != tt.wantName {
t.Errorf("MultiValueEvent.MetricName() = %v, want %v", got, tt.wantName)
}
if got := tt.event.MetricType(); got != tt.wantType {
t.Errorf("MultiValueEvent.MetricType() = %v, want %v", got, tt.wantType)
}
if got := tt.event.Labels(); !reflect.DeepEqual(got, tt.wantLabels) {
t.Errorf("MultiValueEvent.Labels() = %v, want %v", got, tt.wantLabels)
}
})
}
}

func TestMultiObserverEvent_Expand(t *testing.T) {
tests := []struct {
name string
event *MultiObserverEvent
wantEvents []Event
}{
{
name: "single value no sampling",
event: &MultiObserverEvent{
OMetricName: "test_metric",
OValues: []float64{1.0},
OLabels: map[string]string{"label": "value"},
SampleRate: 0,
},
wantEvents: []Event{
&MultiObserverEvent{
OMetricName: "test_metric",
OValues: []float64{1.0},
OLabels: map[string]string{"label": "value"},
SampleRate: 0,
},
},
},
{
name: "multiple values no sampling",
event: &MultiObserverEvent{
OMetricName: "test_metric",
OValues: []float64{1.0, 2.0, 3.0},
OLabels: map[string]string{"label": "value"},
SampleRate: 0,
},
wantEvents: []Event{
&ObserverEvent{
OMetricName: "test_metric",
OValue: 1.0,
OLabels: map[string]string{"label": "value"},
},
&ObserverEvent{
OMetricName: "test_metric",
OValue: 2.0,
OLabels: map[string]string{"label": "value"},
},
&ObserverEvent{
OMetricName: "test_metric",
OValue: 3.0,
OLabels: map[string]string{"label": "value"},
},
},
},
{
name: "multiple values with sampling",
event: &MultiObserverEvent{
OMetricName: "test_metric",
OValues: []float64{1.0, 2.0},
OLabels: map[string]string{"label": "value"},
SampleRate: 0.5,
},
wantEvents: []Event{
&ObserverEvent{
OMetricName: "test_metric",
OValue: 1.0,
OLabels: map[string]string{"label": "value"},
},
&ObserverEvent{
OMetricName: "test_metric",
OValue: 2.0,
OLabels: map[string]string{"label": "value"},
},
&ObserverEvent{
OMetricName: "test_metric",
OValue: 1.0,
OLabels: map[string]string{"label": "value"},
},
&ObserverEvent{
OMetricName: "test_metric",
OValue: 2.0,
OLabels: map[string]string{"label": "value"},
},
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := tt.event.Expand()
if !reflect.DeepEqual(got, tt.wantEvents) {
t.Errorf("MultiObserverEvent.Expand() = %v, want %v", got, tt.wantEvents)
}
})
}
}
Loading