Skip to content

Commit

Permalink
Fix global labels values race condition (#186)
Browse files Browse the repository at this point in the history
Fix a race condition for global labels that use a slice of values instead of a single value.
  • Loading branch information
carsonip authored Aug 2, 2024
1 parent 23aec97 commit dbb653e
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 51 deletions.
14 changes: 10 additions & 4 deletions aggregators/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,8 @@ func (gl *globalLabels) ToProto() *aggregationpb.GlobalLabels {
}
pb.Labels[i].Key = k
pb.Labels[i].Value = v.Value
pb.Labels[i].Values = v.Values
pb.Labels[i].Values = slices.Grow(pb.Labels[i].Values, len(v.Values))[:len(v.Values)]
copy(pb.Labels[i].Values, v.Values)
i++
}
sort.Slice(pb.Labels, func(i, j int) bool {
Expand All @@ -345,7 +346,8 @@ func (gl *globalLabels) ToProto() *aggregationpb.GlobalLabels {
}
pb.NumericLabels[i].Key = k
pb.NumericLabels[i].Value = v.Value
pb.NumericLabels[i].Values = v.Values
pb.NumericLabels[i].Values = slices.Grow(pb.NumericLabels[i].Values, len(v.Values))[:len(v.Values)]
copy(pb.NumericLabels[i].Values, v.Values)
i++
}
sort.Slice(pb.NumericLabels, func(i, j int) bool {
Expand All @@ -359,11 +361,15 @@ func (gl *globalLabels) ToProto() *aggregationpb.GlobalLabels {
func (gl *globalLabels) FromProto(pb *aggregationpb.GlobalLabels) {
gl.Labels = make(modelpb.Labels, len(pb.Labels))
for _, l := range pb.Labels {
gl.Labels[l.Key] = &modelpb.LabelValue{Value: l.Value, Values: l.Values, Global: true}
gl.Labels[l.Key] = &modelpb.LabelValue{Value: l.Value, Global: true}
gl.Labels[l.Key].Values = slices.Grow(gl.Labels[l.Key].Values, len(l.Values))[:len(l.Values)]
copy(gl.Labels[l.Key].Values, l.Values)
}
gl.NumericLabels = make(modelpb.NumericLabels, len(pb.NumericLabels))
for _, l := range pb.NumericLabels {
gl.NumericLabels[l.Key] = &modelpb.NumericLabelValue{Value: l.Value, Values: l.Values, Global: true}
gl.NumericLabels[l.Key] = &modelpb.NumericLabelValue{Value: l.Value, Global: true}
gl.NumericLabels[l.Key].Values = slices.Grow(gl.NumericLabels[l.Key].Values, len(l.Values))[:len(l.Values)]
copy(gl.NumericLabels[l.Key].Values, l.Values)
}
}

Expand Down
83 changes: 38 additions & 45 deletions aggregators/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"errors"
"fmt"
"math"
"slices"
"sort"
"sync"
"time"
Expand Down Expand Up @@ -984,75 +985,67 @@ func overflowSpanMetricsToAPMEvent(
}

func marshalEventGlobalLabels(e *modelpb.APMEvent) ([]byte, error) {
if len(e.Labels) == 0 && len(e.NumericLabels) == 0 {
var labelsCnt, numericLabelsCnt int
for _, v := range e.Labels {
if !v.Global {
continue
}
labelsCnt++
}
for _, v := range e.NumericLabels {
if !v.Global {
continue
}
numericLabelsCnt++
}

if labelsCnt == 0 && numericLabelsCnt == 0 {
return nil, nil
}

var pb *aggregationpb.GlobalLabels
pb := aggregationpb.GlobalLabelsFromVTPool()
defer pb.ReturnToVTPool()
pb.Labels = slices.Grow(pb.Labels, labelsCnt)[:labelsCnt]
pb.NumericLabels = slices.Grow(pb.NumericLabels, numericLabelsCnt)[:numericLabelsCnt]

var i int
// Keys must be sorted to ensure wire formats are deterministically generated and strings are directly comparable
// i.e. Protobuf formats are equal if and only if the structs are equal
for k, v := range e.Labels {
if !v.Global {
continue
}

if pb == nil {
pb = aggregationpb.GlobalLabelsFromVTPool()
defer pb.ReturnToVTPool()
}

i := len(pb.Labels)
if i == cap(pb.Labels) {
pb.Labels = append(pb.Labels, &aggregationpb.Label{})
} else {
pb.Labels = pb.Labels[:i+1]
if pb.Labels[i] == nil {
pb.Labels[i] = &aggregationpb.Label{}
}
if pb.Labels[i] == nil {
pb.Labels[i] = &aggregationpb.Label{}
}
pb.Labels[i].Key = k
pb.Labels[i].Value = v.Value
pb.Labels[i].Values = v.Values
}
if pb != nil {
sort.Slice(pb.Labels, func(i, j int) bool {
return pb.Labels[i].Key < pb.Labels[j].Key
})
pb.Labels[i].Values = slices.Grow(pb.Labels[i].Values, len(v.Values))[:len(v.Values)]
copy(pb.Labels[i].Values, v.Values)
i++
}
sort.Slice(pb.Labels, func(i, j int) bool {
return pb.Labels[i].Key < pb.Labels[j].Key
})

i = 0
for k, v := range e.NumericLabels {
if !v.Global {
continue
}

if pb == nil {
pb = aggregationpb.GlobalLabelsFromVTPool()
defer pb.ReturnToVTPool()
}

i := len(pb.NumericLabels)
if i == cap(pb.NumericLabels) {
pb.NumericLabels = append(pb.NumericLabels, &aggregationpb.NumericLabel{})
} else {
pb.NumericLabels = pb.NumericLabels[:i+1]
if pb.NumericLabels[i] == nil {
pb.NumericLabels[i] = &aggregationpb.NumericLabel{}
}
if pb.NumericLabels[i] == nil {
pb.NumericLabels[i] = &aggregationpb.NumericLabel{}
}
pb.NumericLabels[i].Key = k
pb.NumericLabels[i].Value = v.Value
pb.NumericLabels[i].Values = v.Values
}
if pb != nil {
sort.Slice(pb.NumericLabels, func(i, j int) bool {
return pb.NumericLabels[i].Key < pb.NumericLabels[j].Key
})
pb.NumericLabels[i].Values = slices.Grow(pb.NumericLabels[i].Values, len(v.Values))[:len(v.Values)]
copy(pb.NumericLabels[i].Values, v.Values)
i++
}
sort.Slice(pb.NumericLabels, func(i, j int) bool {
return pb.NumericLabels[i].Key < pb.NumericLabels[j].Key
})

if pb == nil {
return nil, nil
}
return pb.MarshalVT()
}

Expand Down
31 changes: 29 additions & 2 deletions aggregators/converter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package aggregators
import (
"fmt"
"net/netip"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -779,8 +780,8 @@ func getTestGlobalLabelsStr(t *testing.T, s string) string {
return gls
}

func TestMarshalEventGlobalLabels(t *testing.T) {
e := &modelpb.APMEvent{
func globalLabelsEvent() *modelpb.APMEvent {
return &modelpb.APMEvent{
Labels: modelpb.Labels{
"tag1": &modelpb.LabelValue{
Value: "1",
Expand Down Expand Up @@ -826,6 +827,10 @@ func TestMarshalEventGlobalLabels(t *testing.T) {
},
},
}
}

func TestMarshalEventGlobalLabels(t *testing.T) {
e := globalLabelsEvent()
b, err := marshalEventGlobalLabels(e)
require.NoError(t, err)
gl := globalLabels{}
Expand Down Expand Up @@ -856,3 +861,25 @@ func TestMarshalEventGlobalLabels(t *testing.T) {
},
}, gl.NumericLabels)
}

func TestMarshalEventGlobalLabelsRace(t *testing.T) {
const N = 1000
wg := sync.WaitGroup{}
for i := 0; i < N; i++ {
wg.Add(1)
go func() {
e := globalLabelsEvent()
b, err := marshalEventGlobalLabels(e)
require.NoError(t, err)
gl := globalLabels{}
err = gl.UnmarshalBinary(b)
require.NoError(t, err)
b, err = gl.MarshalBinary()
require.NoError(t, err)
err = gl.UnmarshalBinary(b)
require.NoError(t, err)
wg.Done()
}()
}
wg.Wait()
}

0 comments on commit dbb653e

Please sign in to comment.