diff --git a/.chloggen/add-is-read-only.yaml b/.chloggen/add-is-read-only.yaml new file mode 100644 index 00000000000..c4af586b265 --- /dev/null +++ b/.chloggen/add-is-read-only.yaml @@ -0,0 +1,18 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: pdata + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add IsReadOnly() method to p[metrics|logs|traces].[Metrics|Logs|Spans] pdata structs allowing to check if the struct is read-only. + +# One or more tracking issues or pull requests related to the change +issues: [6794] + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/.chloggen/enable-mutation-assertions.yaml b/.chloggen/enable-mutation-assertions.yaml new file mode 100644 index 00000000000..eec8f892160 --- /dev/null +++ b/.chloggen/enable-mutation-assertions.yaml @@ -0,0 +1,26 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: fanoutconsumer + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Enable runtime assertions to catch incorrect pdata mutations in the components claiming as non-mutating pdata. + +# One or more tracking issues or pull requests related to the change +issues: [6794] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + This change enables the runtime assertions to catch unintentional pdata mutations in components that are claimed + as non-mutating pdata. Without these assertions, runtime errors may still occur, but thrown by unrelated components, + making it very difficult to troubleshoot. + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/internal/fanoutconsumer/logs.go b/internal/fanoutconsumer/logs.go index 9b07bbc6377..e0f9f88df4c 100644 --- a/internal/fanoutconsumer/logs.go +++ b/internal/fanoutconsumer/logs.go @@ -20,36 +20,22 @@ import ( // NewLogs wraps multiple log consumers in a single one. // It fanouts the incoming data to all the consumers, and does smart routing: // - Clones only to the consumer that needs to mutate the data. -// - If all consumers needs to mutate the data one will get the original data. +// - If all consumers needs to mutate the data one will get the original mutable data. func NewLogs(lcs []consumer.Logs) consumer.Logs { - if len(lcs) == 1 { - // Don't wrap if no need to do it. - return lcs[0] - } - var pass []consumer.Logs - var clone []consumer.Logs - for i := 0; i < len(lcs)-1; i++ { - if !lcs[i].Capabilities().MutatesData { - pass = append(pass, lcs[i]) + lc := &logsConsumer{} + for i := 0; i < len(lcs); i++ { + if lcs[i].Capabilities().MutatesData { + lc.mutable = append(lc.mutable, lcs[i]) } else { - clone = append(clone, lcs[i]) + lc.readonly = append(lc.readonly, lcs[i]) } } - // Give the original data to the last consumer if no other read-only consumer, - // otherwise put it in the right bucket. Never share the same data between - // a mutating and a non-mutating consumer since the non-mutating consumer may process - // data async and the mutating consumer may change the data before that. - if len(pass) == 0 || !lcs[len(lcs)-1].Capabilities().MutatesData { - pass = append(pass, lcs[len(lcs)-1]) - } else { - clone = append(clone, lcs[len(lcs)-1]) - } - return &logsConsumer{pass: pass, clone: clone} + return lc } type logsConsumer struct { - pass []consumer.Logs - clone []consumer.Logs + mutable []consumer.Logs + readonly []consumer.Logs } func (lsc *logsConsumer) Capabilities() consumer.Capabilities { @@ -59,20 +45,40 @@ func (lsc *logsConsumer) Capabilities() consumer.Capabilities { // ConsumeLogs exports the plog.Logs to all consumers wrapped by the current one. func (lsc *logsConsumer) ConsumeLogs(ctx context.Context, ld plog.Logs) error { var errs error - // Initially pass to clone exporter to avoid the case where the optimization of sending - // the incoming data to a mutating consumer is used that may change the incoming data before - // cloning. - for _, lc := range lsc.clone { - clonedLogs := plog.NewLogs() - ld.CopyTo(clonedLogs) - errs = multierr.Append(errs, lc.ConsumeLogs(ctx, clonedLogs)) + + if len(lsc.mutable) > 0 { + // Clone the data before sending to all mutating consumers except the last one. + for i := 0; i < len(lsc.mutable)-1; i++ { + errs = multierr.Append(errs, lsc.mutable[i].ConsumeLogs(ctx, cloneLogs(ld))) + } + // Send data as is to the last mutating consumer only if there are no other non-mutating consumers and the + // data is mutable. Never share the same data between a mutating and a non-mutating consumer since the + // non-mutating consumer may process data async and the mutating consumer may change the data before that. + lastConsumer := lsc.mutable[len(lsc.mutable)-1] + if len(lsc.readonly) == 0 && !ld.IsReadOnly() { + errs = multierr.Append(errs, lastConsumer.ConsumeLogs(ctx, ld)) + } else { + errs = multierr.Append(errs, lastConsumer.ConsumeLogs(ctx, cloneLogs(ld))) + } } - for _, lc := range lsc.pass { + + // Mark the data as read-only if it will be sent to more than one read-only consumer. + if len(lsc.readonly) > 1 && !ld.IsReadOnly() { + ld.MarkReadOnly() + } + for _, lc := range lsc.readonly { errs = multierr.Append(errs, lc.ConsumeLogs(ctx, ld)) } + return errs } +func cloneLogs(ld plog.Logs) plog.Logs { + clonedLogs := plog.NewLogs() + ld.CopyTo(clonedLogs) + return clonedLogs +} + var _ connector.LogsRouter = (*logsRouter)(nil) type logsRouter struct { diff --git a/internal/fanoutconsumer/logs_test.go b/internal/fanoutconsumer/logs_test.go index 1db046b854d..5ae01b6b423 100644 --- a/internal/fanoutconsumer/logs_test.go +++ b/internal/fanoutconsumer/logs_test.go @@ -20,12 +20,6 @@ import ( "go.opentelemetry.io/collector/pdata/plog" ) -func TestLogsNotMultiplexing(t *testing.T) { - nop := consumertest.NewNop() - lfc := NewLogs([]consumer.Logs{nop}) - assert.Same(t, nop, lfc) -} - func TestLogsMultiplexingNonMutating(t *testing.T) { p1 := new(consumertest.LogsSink) p2 := new(consumertest.LogsSink) @@ -57,6 +51,9 @@ func TestLogsMultiplexingNonMutating(t *testing.T) { assert.True(t, ld == p3.AllLogs()[1]) assert.EqualValues(t, ld, p3.AllLogs()[0]) assert.EqualValues(t, ld, p3.AllLogs()[1]) + + // The data should be marked as read only. + assert.True(t, ld.IsReadOnly()) } func TestLogsMultiplexingMutating(t *testing.T) { @@ -91,6 +88,46 @@ func TestLogsMultiplexingMutating(t *testing.T) { assert.True(t, ld == p3.AllLogs()[1]) assert.EqualValues(t, ld, p3.AllLogs()[0]) assert.EqualValues(t, ld, p3.AllLogs()[1]) + + // The data should not be marked as read only. + assert.False(t, ld.IsReadOnly()) +} + +func TestReadOnlyLogsMultiplexingMutating(t *testing.T) { + p1 := &mutatingLogsSink{LogsSink: new(consumertest.LogsSink)} + p2 := &mutatingLogsSink{LogsSink: new(consumertest.LogsSink)} + p3 := &mutatingLogsSink{LogsSink: new(consumertest.LogsSink)} + + lfc := NewLogs([]consumer.Logs{p1, p2, p3}) + assert.False(t, lfc.Capabilities().MutatesData) + ldOrig := testdata.GenerateLogs(1) + ld := testdata.GenerateLogs(1) + ld.MarkReadOnly() + + for i := 0; i < 2; i++ { + err := lfc.ConsumeLogs(context.Background(), ld) + if err != nil { + t.Errorf("Wanted nil got error") + return + } + } + + // All consumers should receive the cloned data. + + assert.True(t, ld != p1.AllLogs()[0]) + assert.True(t, ld != p1.AllLogs()[1]) + assert.EqualValues(t, ldOrig, p1.AllLogs()[0]) + assert.EqualValues(t, ldOrig, p1.AllLogs()[1]) + + assert.True(t, ld != p2.AllLogs()[0]) + assert.True(t, ld != p2.AllLogs()[1]) + assert.EqualValues(t, ldOrig, p2.AllLogs()[0]) + assert.EqualValues(t, ldOrig, p2.AllLogs()[1]) + + assert.True(t, ld != p3.AllLogs()[0]) + assert.True(t, ld != p3.AllLogs()[1]) + assert.EqualValues(t, ldOrig, p3.AllLogs()[0]) + assert.EqualValues(t, ldOrig, p3.AllLogs()[1]) } func TestLogsMultiplexingMixLastMutating(t *testing.T) { @@ -126,6 +163,9 @@ func TestLogsMultiplexingMixLastMutating(t *testing.T) { assert.True(t, ld != p3.AllLogs()[1]) assert.EqualValues(t, ld, p3.AllLogs()[0]) assert.EqualValues(t, ld, p3.AllLogs()[1]) + + // The data should not be marked as read only. + assert.False(t, ld.IsReadOnly()) } func TestLogsMultiplexingMixLastNonMutating(t *testing.T) { @@ -160,6 +200,9 @@ func TestLogsMultiplexingMixLastNonMutating(t *testing.T) { assert.True(t, ld == p3.AllLogs()[1]) assert.EqualValues(t, ld, p3.AllLogs()[0]) assert.EqualValues(t, ld, p3.AllLogs()[1]) + + // The data should not be marked as read only. + assert.False(t, ld.IsReadOnly()) } func TestLogsWhenErrors(t *testing.T) { diff --git a/internal/fanoutconsumer/metrics.go b/internal/fanoutconsumer/metrics.go index f1c1280e4c3..13ea0efe0cf 100644 --- a/internal/fanoutconsumer/metrics.go +++ b/internal/fanoutconsumer/metrics.go @@ -18,36 +18,22 @@ import ( // NewMetrics wraps multiple metrics consumers in a single one. // It fanouts the incoming data to all the consumers, and does smart routing: // - Clones only to the consumer that needs to mutate the data. -// - If all consumers needs to mutate the data one will get the original data. +// - If all consumers needs to mutate the data one will get the original mutable data. func NewMetrics(mcs []consumer.Metrics) consumer.Metrics { - if len(mcs) == 1 { - // Don't wrap if no need to do it. - return mcs[0] - } - var pass []consumer.Metrics - var clone []consumer.Metrics - for i := 0; i < len(mcs)-1; i++ { - if !mcs[i].Capabilities().MutatesData { - pass = append(pass, mcs[i]) + mc := &metricsConsumer{} + for i := 0; i < len(mcs); i++ { + if mcs[i].Capabilities().MutatesData { + mc.mutable = append(mc.mutable, mcs[i]) } else { - clone = append(clone, mcs[i]) + mc.readonly = append(mc.readonly, mcs[i]) } } - // Give the original data to the last consumer if no other read-only consumer, - // otherwise put it in the right bucket. Never share the same data between - // a mutating and a non-mutating consumer since the non-mutating consumer may process - // data async and the mutating consumer may change the data before that. - if len(pass) == 0 || !mcs[len(mcs)-1].Capabilities().MutatesData { - pass = append(pass, mcs[len(mcs)-1]) - } else { - clone = append(clone, mcs[len(mcs)-1]) - } - return &metricsConsumer{pass: pass, clone: clone} + return mc } type metricsConsumer struct { - pass []consumer.Metrics - clone []consumer.Metrics + mutable []consumer.Metrics + readonly []consumer.Metrics } func (msc *metricsConsumer) Capabilities() consumer.Capabilities { @@ -57,20 +43,40 @@ func (msc *metricsConsumer) Capabilities() consumer.Capabilities { // ConsumeMetrics exports the pmetric.Metrics to all consumers wrapped by the current one. func (msc *metricsConsumer) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { var errs error - // Initially pass to clone exporter to avoid the case where the optimization of sending - // the incoming data to a mutating consumer is used that may change the incoming data before - // cloning. - for _, mc := range msc.clone { - clonedMetrics := pmetric.NewMetrics() - md.CopyTo(clonedMetrics) - errs = multierr.Append(errs, mc.ConsumeMetrics(ctx, clonedMetrics)) + + if len(msc.mutable) > 0 { + // Clone the data before sending to all mutating consumers except the last one. + for i := 0; i < len(msc.mutable)-1; i++ { + errs = multierr.Append(errs, msc.mutable[i].ConsumeMetrics(ctx, cloneMetrics(md))) + } + // Send data as is to the last mutating consumer only if there are no other non-mutating consumers and the + // data is mutable. Never share the same data between a mutating and a non-mutating consumer since the + // non-mutating consumer may process data async and the mutating consumer may change the data before that. + lastConsumer := msc.mutable[len(msc.mutable)-1] + if len(msc.readonly) == 0 && !md.IsReadOnly() { + errs = multierr.Append(errs, lastConsumer.ConsumeMetrics(ctx, md)) + } else { + errs = multierr.Append(errs, lastConsumer.ConsumeMetrics(ctx, cloneMetrics(md))) + } } - for _, mc := range msc.pass { + + // Mark the data as read-only if it will be sent to more than one read-only consumer. + if len(msc.readonly) > 1 && !md.IsReadOnly() { + md.MarkReadOnly() + } + for _, mc := range msc.readonly { errs = multierr.Append(errs, mc.ConsumeMetrics(ctx, md)) } + return errs } +func cloneMetrics(md pmetric.Metrics) pmetric.Metrics { + clonedMetrics := pmetric.NewMetrics() + md.CopyTo(clonedMetrics) + return clonedMetrics +} + var _ connector.MetricsRouter = (*metricsRouter)(nil) type metricsRouter struct { diff --git a/internal/fanoutconsumer/metrics_test.go b/internal/fanoutconsumer/metrics_test.go index 8cdfeeb51fa..bbf86990944 100644 --- a/internal/fanoutconsumer/metrics_test.go +++ b/internal/fanoutconsumer/metrics_test.go @@ -20,12 +20,6 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric" ) -func TestMetricsNotMultiplexing(t *testing.T) { - nop := consumertest.NewNop() - mfc := NewMetrics([]consumer.Metrics{nop}) - assert.Same(t, nop, mfc) -} - func TestMetricsMultiplexingNonMutating(t *testing.T) { p1 := new(consumertest.MetricsSink) p2 := new(consumertest.MetricsSink) @@ -57,6 +51,9 @@ func TestMetricsMultiplexingNonMutating(t *testing.T) { assert.True(t, md == p3.AllMetrics()[1]) assert.EqualValues(t, md, p3.AllMetrics()[0]) assert.EqualValues(t, md, p3.AllMetrics()[1]) + + // The data should be marked as read only. + assert.True(t, md.IsReadOnly()) } func TestMetricsMultiplexingMutating(t *testing.T) { @@ -91,6 +88,46 @@ func TestMetricsMultiplexingMutating(t *testing.T) { assert.True(t, md == p3.AllMetrics()[1]) assert.EqualValues(t, md, p3.AllMetrics()[0]) assert.EqualValues(t, md, p3.AllMetrics()[1]) + + // The data should not be marked as read only. + assert.False(t, md.IsReadOnly()) +} + +func TestReadOnlyMetricsMultiplexingMixFirstMutating(t *testing.T) { + p1 := &mutatingMetricsSink{MetricsSink: new(consumertest.MetricsSink)} + p2 := &mutatingMetricsSink{MetricsSink: new(consumertest.MetricsSink)} + p3 := &mutatingMetricsSink{MetricsSink: new(consumertest.MetricsSink)} + + mfc := NewMetrics([]consumer.Metrics{p1, p2, p3}) + assert.False(t, mfc.Capabilities().MutatesData) + mdOrig := testdata.GenerateMetrics(1) + md := testdata.GenerateMetrics(1) + md.MarkReadOnly() + + for i := 0; i < 2; i++ { + err := mfc.ConsumeMetrics(context.Background(), md) + if err != nil { + t.Errorf("Wanted nil got error") + return + } + } + + // All consumers should receive the cloned data. + + assert.True(t, md != p1.AllMetrics()[0]) + assert.True(t, md != p1.AllMetrics()[1]) + assert.EqualValues(t, mdOrig, p1.AllMetrics()[0]) + assert.EqualValues(t, mdOrig, p1.AllMetrics()[1]) + + assert.True(t, md != p2.AllMetrics()[0]) + assert.True(t, md != p2.AllMetrics()[1]) + assert.EqualValues(t, mdOrig, p2.AllMetrics()[0]) + assert.EqualValues(t, mdOrig, p2.AllMetrics()[1]) + + assert.True(t, md != p3.AllMetrics()[0]) + assert.True(t, md != p3.AllMetrics()[1]) + assert.EqualValues(t, mdOrig, p3.AllMetrics()[0]) + assert.EqualValues(t, mdOrig, p3.AllMetrics()[1]) } func TestMetricsMultiplexingMixLastMutating(t *testing.T) { @@ -126,6 +163,9 @@ func TestMetricsMultiplexingMixLastMutating(t *testing.T) { assert.True(t, md != p3.AllMetrics()[1]) assert.EqualValues(t, md, p3.AllMetrics()[0]) assert.EqualValues(t, md, p3.AllMetrics()[1]) + + // The data should not be marked as read only. + assert.False(t, md.IsReadOnly()) } func TestMetricsMultiplexingMixLastNonMutating(t *testing.T) { @@ -160,6 +200,9 @@ func TestMetricsMultiplexingMixLastNonMutating(t *testing.T) { assert.True(t, md == p3.AllMetrics()[1]) assert.EqualValues(t, md, p3.AllMetrics()[0]) assert.EqualValues(t, md, p3.AllMetrics()[1]) + + // The data should not be marked as read only. + assert.False(t, md.IsReadOnly()) } func TestMetricsWhenErrors(t *testing.T) { diff --git a/internal/fanoutconsumer/traces.go b/internal/fanoutconsumer/traces.go index 89ecb166ec5..bb6c30ae84e 100644 --- a/internal/fanoutconsumer/traces.go +++ b/internal/fanoutconsumer/traces.go @@ -18,36 +18,22 @@ import ( // NewTraces wraps multiple trace consumers in a single one. // It fanouts the incoming data to all the consumers, and does smart routing: // - Clones only to the consumer that needs to mutate the data. -// - If all consumers needs to mutate the data one will get the original data. +// - If all consumers needs to mutate the data one will get the original mutable data. func NewTraces(tcs []consumer.Traces) consumer.Traces { - if len(tcs) == 1 { - // Don't wrap if no need to do it. - return tcs[0] - } - var pass []consumer.Traces - var clone []consumer.Traces - for i := 0; i < len(tcs)-1; i++ { - if !tcs[i].Capabilities().MutatesData { - pass = append(pass, tcs[i]) + tc := &tracesConsumer{} + for i := 0; i < len(tcs); i++ { + if tcs[i].Capabilities().MutatesData { + tc.mutable = append(tc.mutable, tcs[i]) } else { - clone = append(clone, tcs[i]) + tc.readonly = append(tc.readonly, tcs[i]) } } - // Give the original data to the last consumer if no other read-only consumer, - // otherwise put it in the right bucket. Never share the same data between - // a mutating and a non-mutating consumer since the non-mutating consumer may process - // data async and the mutating consumer may change the data before that. - if len(pass) == 0 || !tcs[len(tcs)-1].Capabilities().MutatesData { - pass = append(pass, tcs[len(tcs)-1]) - } else { - clone = append(clone, tcs[len(tcs)-1]) - } - return &tracesConsumer{pass: pass, clone: clone} + return tc } type tracesConsumer struct { - pass []consumer.Traces - clone []consumer.Traces + mutable []consumer.Traces + readonly []consumer.Traces } func (tsc *tracesConsumer) Capabilities() consumer.Capabilities { @@ -57,20 +43,40 @@ func (tsc *tracesConsumer) Capabilities() consumer.Capabilities { // ConsumeTraces exports the ptrace.Traces to all consumers wrapped by the current one. func (tsc *tracesConsumer) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { var errs error - // Initially pass to clone exporter to avoid the case where the optimization of sending - // the incoming data to a mutating consumer is used that may change the incoming data before - // cloning. - for _, tc := range tsc.clone { - clonedTraces := ptrace.NewTraces() - td.CopyTo(clonedTraces) - errs = multierr.Append(errs, tc.ConsumeTraces(ctx, clonedTraces)) + + if len(tsc.mutable) > 0 { + // Clone the data before sending to all mutating consumers except the last one. + for i := 0; i < len(tsc.mutable)-1; i++ { + errs = multierr.Append(errs, tsc.mutable[i].ConsumeTraces(ctx, cloneTraces(td))) + } + // Send data as is to the last mutating consumer only if there are no other non-mutating consumers and the + // data is mutable. Never share the same data between a mutating and a non-mutating consumer since the + // non-mutating consumer may process data async and the mutating consumer may change the data before that. + lastConsumer := tsc.mutable[len(tsc.mutable)-1] + if len(tsc.readonly) == 0 && !td.IsReadOnly() { + errs = multierr.Append(errs, lastConsumer.ConsumeTraces(ctx, td)) + } else { + errs = multierr.Append(errs, lastConsumer.ConsumeTraces(ctx, cloneTraces(td))) + } } - for _, tc := range tsc.pass { + + // Mark the data as read-only if it will be sent to more than one read-only consumer. + if len(tsc.readonly) > 1 && !td.IsReadOnly() { + td.MarkReadOnly() + } + for _, tc := range tsc.readonly { errs = multierr.Append(errs, tc.ConsumeTraces(ctx, td)) } + return errs } +func cloneTraces(td ptrace.Traces) ptrace.Traces { + clonedTraces := ptrace.NewTraces() + td.CopyTo(clonedTraces) + return clonedTraces +} + var _ connector.TracesRouter = (*tracesRouter)(nil) type tracesRouter struct { diff --git a/internal/fanoutconsumer/traces_test.go b/internal/fanoutconsumer/traces_test.go index d2147b36167..ceda83ecebb 100644 --- a/internal/fanoutconsumer/traces_test.go +++ b/internal/fanoutconsumer/traces_test.go @@ -20,12 +20,6 @@ import ( "go.opentelemetry.io/collector/pdata/ptrace" ) -func TestTracesNotMultiplexing(t *testing.T) { - nop := consumertest.NewNop() - tfc := NewTraces([]consumer.Traces{nop}) - assert.Same(t, nop, tfc) -} - func TestTracesMultiplexingNonMutating(t *testing.T) { p1 := new(consumertest.TracesSink) p2 := new(consumertest.TracesSink) @@ -57,6 +51,9 @@ func TestTracesMultiplexingNonMutating(t *testing.T) { assert.True(t, td == p3.AllTraces()[1]) assert.EqualValues(t, td, p3.AllTraces()[0]) assert.EqualValues(t, td, p3.AllTraces()[1]) + + // The data should be marked as read only. + assert.True(t, td.IsReadOnly()) } func TestTracesMultiplexingMutating(t *testing.T) { @@ -91,6 +88,47 @@ func TestTracesMultiplexingMutating(t *testing.T) { assert.True(t, td == p3.AllTraces()[1]) assert.EqualValues(t, td, p3.AllTraces()[0]) assert.EqualValues(t, td, p3.AllTraces()[1]) + + // The data should not be marked as read only. + assert.False(t, td.IsReadOnly()) +} + +func TestReadOnlyTracesMultiplexingMutating(t *testing.T) { + p1 := &mutatingTracesSink{TracesSink: new(consumertest.TracesSink)} + p2 := &mutatingTracesSink{TracesSink: new(consumertest.TracesSink)} + p3 := &mutatingTracesSink{TracesSink: new(consumertest.TracesSink)} + + tfc := NewTraces([]consumer.Traces{p1, p2, p3}) + assert.False(t, tfc.Capabilities().MutatesData) + + tdOrig := testdata.GenerateTraces(1) + td := testdata.GenerateTraces(1) + td.MarkReadOnly() + + for i := 0; i < 2; i++ { + err := tfc.ConsumeTraces(context.Background(), td) + if err != nil { + t.Errorf("Wanted nil got error") + return + } + } + + // All consumers should receive the cloned data. + + assert.True(t, td != p1.AllTraces()[0]) + assert.True(t, td != p1.AllTraces()[1]) + assert.EqualValues(t, tdOrig, p1.AllTraces()[0]) + assert.EqualValues(t, tdOrig, p1.AllTraces()[1]) + + assert.True(t, td != p2.AllTraces()[0]) + assert.True(t, td != p2.AllTraces()[1]) + assert.EqualValues(t, tdOrig, p2.AllTraces()[0]) + assert.EqualValues(t, tdOrig, p2.AllTraces()[1]) + + assert.True(t, td != p3.AllTraces()[0]) + assert.True(t, td != p3.AllTraces()[1]) + assert.EqualValues(t, tdOrig, p3.AllTraces()[0]) + assert.EqualValues(t, tdOrig, p3.AllTraces()[1]) } func TestTracesMultiplexingMixLastMutating(t *testing.T) { @@ -126,6 +164,9 @@ func TestTracesMultiplexingMixLastMutating(t *testing.T) { assert.True(t, td != p3.AllTraces()[1]) assert.EqualValues(t, td, p3.AllTraces()[0]) assert.EqualValues(t, td, p3.AllTraces()[1]) + + // The data should not be marked as read only. + assert.False(t, td.IsReadOnly()) } func TestTracesMultiplexingMixLastNonMutating(t *testing.T) { @@ -160,6 +201,9 @@ func TestTracesMultiplexingMixLastNonMutating(t *testing.T) { assert.True(t, td == p3.AllTraces()[1]) assert.EqualValues(t, td, p3.AllTraces()[0]) assert.EqualValues(t, td, p3.AllTraces()[1]) + + // The data should not be marked as read only. + assert.False(t, td.IsReadOnly()) } func TestTracesWhenErrors(t *testing.T) { diff --git a/pdata/plog/logs.go b/pdata/plog/logs.go index a6187fbc0f6..490526090f8 100644 --- a/pdata/plog/logs.go +++ b/pdata/plog/logs.go @@ -21,11 +21,20 @@ func (ms Logs) getOrig() *otlpcollectorlog.ExportLogsServiceRequest { return internal.GetOrigLogs(internal.Logs(ms)) } +func (ms Logs) getState() *internal.State { + return internal.GetLogsState(internal.Logs(ms)) +} + // NewLogs creates a new Logs struct. func NewLogs() Logs { return newLogs(&otlpcollectorlog.ExportLogsServiceRequest{}) } +// IsReadOnly returns true if this Logs instance is read-only. +func (ms Logs) IsReadOnly() bool { + return *ms.getState() == internal.StateReadOnly +} + // CopyTo copies the Logs instance overriding the destination. func (ms Logs) CopyTo(dest Logs) { ms.ResourceLogs().CopyTo(dest.ResourceLogs()) diff --git a/pdata/plog/logs_test.go b/pdata/plog/logs_test.go index 6eeefcf509c..9d3feef4d98 100644 --- a/pdata/plog/logs_test.go +++ b/pdata/plog/logs_test.go @@ -117,9 +117,11 @@ func TestLogsCopyTo(t *testing.T) { func TestReadOnlyLogsInvalidUsage(t *testing.T) { logs := NewLogs() + assert.False(t, logs.IsReadOnly()) res := logs.ResourceLogs().AppendEmpty().Resource() res.Attributes().PutStr("k1", "v1") logs.MarkReadOnly() + assert.True(t, logs.IsReadOnly()) assert.Panics(t, func() { res.Attributes().PutStr("k2", "v2") }) } diff --git a/pdata/pmetric/metrics.go b/pdata/pmetric/metrics.go index 5a8c0f29974..91195ca4dfa 100644 --- a/pdata/pmetric/metrics.go +++ b/pdata/pmetric/metrics.go @@ -21,11 +21,20 @@ func (ms Metrics) getOrig() *otlpcollectormetrics.ExportMetricsServiceRequest { return internal.GetOrigMetrics(internal.Metrics(ms)) } +func (ms Metrics) getState() *internal.State { + return internal.GetMetricsState(internal.Metrics(ms)) +} + // NewMetrics creates a new Metrics struct. func NewMetrics() Metrics { return newMetrics(&otlpcollectormetrics.ExportMetricsServiceRequest{}) } +// IsReadOnly returns true if this Metrics instance is read-only. +func (ms Metrics) IsReadOnly() bool { + return *ms.getState() == internal.StateReadOnly +} + // CopyTo copies the Metrics instance overriding the destination. func (ms Metrics) CopyTo(dest Metrics) { ms.ResourceMetrics().CopyTo(dest.ResourceMetrics()) diff --git a/pdata/pmetric/metrics_test.go b/pdata/pmetric/metrics_test.go index 9b2b5c2b70d..3c059643a84 100644 --- a/pdata/pmetric/metrics_test.go +++ b/pdata/pmetric/metrics_test.go @@ -638,9 +638,11 @@ func TestMetricsCopyTo(t *testing.T) { func TestReadOnlyMetricsInvalidUsage(t *testing.T) { metrics := NewMetrics() + assert.False(t, metrics.IsReadOnly()) res := metrics.ResourceMetrics().AppendEmpty().Resource() res.Attributes().PutStr("k1", "v1") metrics.MarkReadOnly() + assert.True(t, metrics.IsReadOnly()) assert.Panics(t, func() { res.Attributes().PutStr("k2", "v2") }) } diff --git a/pdata/ptrace/traces.go b/pdata/ptrace/traces.go index 0d8294098a6..a4b71e17853 100644 --- a/pdata/ptrace/traces.go +++ b/pdata/ptrace/traces.go @@ -21,11 +21,20 @@ func (ms Traces) getOrig() *otlpcollectortrace.ExportTraceServiceRequest { return internal.GetOrigTraces(internal.Traces(ms)) } +func (ms Traces) getState() *internal.State { + return internal.GetTracesState(internal.Traces(ms)) +} + // NewTraces creates a new Traces struct. func NewTraces() Traces { return newTraces(&otlpcollectortrace.ExportTraceServiceRequest{}) } +// IsReadOnly returns true if this Traces instance is read-only. +func (ms Traces) IsReadOnly() bool { + return *ms.getState() == internal.StateReadOnly +} + // CopyTo copies the Traces instance overriding the destination. func (ms Traces) CopyTo(dest Traces) { ms.ResourceSpans().CopyTo(dest.ResourceSpans()) diff --git a/pdata/ptrace/traces_test.go b/pdata/ptrace/traces_test.go index a09e4844d15..8884349485f 100644 --- a/pdata/ptrace/traces_test.go +++ b/pdata/ptrace/traces_test.go @@ -118,9 +118,11 @@ func TestTracesCopyTo(t *testing.T) { func TestReadOnlyTracesInvalidUsage(t *testing.T) { traces := NewTraces() + assert.False(t, traces.IsReadOnly()) res := traces.ResourceSpans().AppendEmpty().Resource() res.Attributes().PutStr("k1", "v1") traces.MarkReadOnly() + assert.True(t, traces.IsReadOnly()) assert.Panics(t, func() { res.Attributes().PutStr("k2", "v2") }) } diff --git a/service/internal/graph/graph_test.go b/service/internal/graph/graph_test.go index 4e6bc0256eb..168d3832558 100644 --- a/service/internal/graph/graph_test.go +++ b/service/internal/graph/graph_test.go @@ -861,22 +861,34 @@ func TestConnectorPipelinesGraph(t *testing.T) { for _, e := range allExporters[component.DataTypeTraces] { tracesExporter := e.(*testcomponents.ExampleExporter) assert.Equal(t, test.expectedPerExporter, len(tracesExporter.Traces)) + expected := testdata.GenerateTraces(1) + if len(allExporters[component.DataTypeTraces]) > 1 { + expected.MarkReadOnly() // multiple read-only exporters should get read-only pdata + } for i := 0; i < test.expectedPerExporter; i++ { - assert.EqualValues(t, testdata.GenerateTraces(1), tracesExporter.Traces[0]) + assert.EqualValues(t, expected, tracesExporter.Traces[0]) } } for _, e := range allExporters[component.DataTypeMetrics] { metricsExporter := e.(*testcomponents.ExampleExporter) assert.Equal(t, test.expectedPerExporter, len(metricsExporter.Metrics)) + expected := testdata.GenerateMetrics(1) + if len(allExporters[component.DataTypeMetrics]) > 1 { + expected.MarkReadOnly() // multiple read-only exporters should get read-only pdata + } for i := 0; i < test.expectedPerExporter; i++ { - assert.EqualValues(t, testdata.GenerateMetrics(1), metricsExporter.Metrics[0]) + assert.EqualValues(t, expected, metricsExporter.Metrics[0]) } } for _, e := range allExporters[component.DataTypeLogs] { logsExporter := e.(*testcomponents.ExampleExporter) assert.Equal(t, test.expectedPerExporter, len(logsExporter.Logs)) + expected := testdata.GenerateLogs(1) + if len(allExporters[component.DataTypeLogs]) > 1 { + expected.MarkReadOnly() // multiple read-only exporters should get read-only pdata + } for i := 0; i < test.expectedPerExporter; i++ { - assert.EqualValues(t, testdata.GenerateLogs(1), logsExporter.Logs[0]) + assert.EqualValues(t, expected, logsExporter.Logs[0]) } } })