From fb5b1e6aa550d68ff64200da31d50a445b4cf806 Mon Sep 17 00:00:00 2001 From: Tyler Helmuth <12352919+TylerHelmuth@users.noreply.github.com> Date: Mon, 29 Jul 2024 02:29:05 -0600 Subject: [PATCH] [service] Remove servicetelemetry.TelemetrySettings (#10728) #### Description Reorganizes service to not require `servicetelemetry.TelemetrySettings` and instead depend directly on `component.TelemetrySettings` Whether or not we move forward with https://github.com/open-telemetry/opentelemetry-collector/pull/10725 I think this is a useful change for service. #### Testing Unit tests --- .../service-remove-servicetelemetry.yaml | 25 +++++++++ component/telemetry.go | 4 -- service/extensions/extensions.go | 46 ++++++++++++---- service/extensions/extensions_test.go | 26 ++++----- service/internal/graph/graph.go | 10 ++-- service/internal/graph/graph_test.go | 16 +++--- .../proctelemetry/process_telemetry.go | 5 +- .../proctelemetry/process_telemetry_test.go | 7 ++- .../nop_telemetry_settings.go | 27 --------- .../nop_telemetry_settings_test.go | 35 ------------ .../internal/servicetelemetry/package_test.go | 14 ----- .../servicetelemetry/telemetry_settings.go | 55 ------------------- .../telemetry_settings_test.go | 40 -------------- service/internal/status/status.go | 6 +- service/service.go | 28 +++++----- 15 files changed, 109 insertions(+), 235 deletions(-) create mode 100644 .chloggen/service-remove-servicetelemetry.yaml delete mode 100644 service/internal/servicetelemetry/nop_telemetry_settings.go delete mode 100644 service/internal/servicetelemetry/nop_telemetry_settings_test.go delete mode 100644 service/internal/servicetelemetry/package_test.go delete mode 100644 service/internal/servicetelemetry/telemetry_settings.go delete mode 100644 service/internal/servicetelemetry/telemetry_settings_test.go diff --git a/.chloggen/service-remove-servicetelemetry.yaml b/.chloggen/service-remove-servicetelemetry.yaml new file mode 100644 index 00000000000..e3d42576551 --- /dev/null +++ b/.chloggen/service-remove-servicetelemetry.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: service/extensions + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Adds `Options` to `extensions.New`. + +# One or more tracking issues or pull requests related to the change +issues: [10728] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: This is only a breaking change if you are depending on `extensions.New`'s signature. Calls to `extensions.New` are not broken. + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/component/telemetry.go b/component/telemetry.go index 6f8f0ec84d3..febb6d6cbcd 100644 --- a/component/telemetry.go +++ b/component/telemetry.go @@ -13,10 +13,6 @@ import ( ) // TelemetrySettings provides components with APIs to report telemetry. -// -// Note: there is a service version of this struct, servicetelemetry.TelemetrySettings, that mirrors -// this struct except ReportStatus. When adding or removing anything from -// this struct consider whether the same should be done for the service version. type TelemetrySettings struct { // Logger that the factory can use during creation and can pass to the created // component to be used later as well. diff --git a/service/extensions/extensions.go b/service/extensions/extensions.go index 1ef73b08f1e..b45221ee268 100644 --- a/service/extensions/extensions.go +++ b/service/extensions/extensions.go @@ -16,7 +16,7 @@ import ( "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/extension" "go.opentelemetry.io/collector/service/internal/components" - "go.opentelemetry.io/collector/service/internal/servicetelemetry" + "go.opentelemetry.io/collector/service/internal/status" "go.opentelemetry.io/collector/service/internal/zpages" ) @@ -24,10 +24,11 @@ const zExtensionName = "zextensionname" // Extensions is a map of extensions created from extension configs. type Extensions struct { - telemetry servicetelemetry.TelemetrySettings + telemetry component.TelemetrySettings extMap map[component.ID]extension.Extension instanceIDs map[component.ID]*component.InstanceID extensionIDs []component.ID // start order (and reverse stop order) + reporter status.Reporter } // Start starts all extensions. @@ -38,12 +39,12 @@ func (bes *Extensions) Start(ctx context.Context, host component.Host) error { extLogger.Info("Extension is starting...") instanceID := bes.instanceIDs[extID] ext := bes.extMap[extID] - bes.telemetry.Status.ReportStatus( + bes.reporter.ReportStatus( instanceID, component.NewStatusEvent(component.StatusStarting), ) if err := ext.Start(ctx, host); err != nil { - bes.telemetry.Status.ReportStatus( + bes.reporter.ReportStatus( instanceID, component.NewPermanentErrorEvent(err), ) @@ -51,7 +52,7 @@ func (bes *Extensions) Start(ctx context.Context, host component.Host) error { extLogger.WithOptions(zap.AddStacktrace(zap.DPanicLevel)).Error("Failed to start extension", zap.Error(err)) return err } - bes.telemetry.Status.ReportOKIfStarting(instanceID) + bes.reporter.ReportOKIfStarting(instanceID) extLogger.Info("Extension started.") } return nil @@ -65,19 +66,19 @@ func (bes *Extensions) Shutdown(ctx context.Context) error { extID := bes.extensionIDs[i] instanceID := bes.instanceIDs[extID] ext := bes.extMap[extID] - bes.telemetry.Status.ReportStatus( + bes.reporter.ReportStatus( instanceID, component.NewStatusEvent(component.StatusStopping), ) if err := ext.Shutdown(ctx); err != nil { - bes.telemetry.Status.ReportStatus( + bes.reporter.ReportStatus( instanceID, component.NewPermanentErrorEvent(err), ) errs = multierr.Append(errs, err) continue } - bes.telemetry.Status.ReportStatus( + bes.reporter.ReportStatus( instanceID, component.NewStatusEvent(component.StatusStopped), ) @@ -166,21 +167,35 @@ func (bes *Extensions) HandleZPages(w http.ResponseWriter, r *http.Request) { // Settings holds configuration for building Extensions. type Settings struct { - Telemetry servicetelemetry.TelemetrySettings + Telemetry component.TelemetrySettings BuildInfo component.BuildInfo // Extensions builder for extensions. Extensions *extension.Builder } +type Option func(*Extensions) + +func WithReporter(reporter status.Reporter) Option { + return func(e *Extensions) { + e.reporter = reporter + } +} + // New creates a new Extensions from Config. -func New(ctx context.Context, set Settings, cfg Config) (*Extensions, error) { +func New(ctx context.Context, set Settings, cfg Config, options ...Option) (*Extensions, error) { exts := &Extensions{ telemetry: set.Telemetry, extMap: make(map[component.ID]extension.Extension), instanceIDs: make(map[component.ID]*component.InstanceID), extensionIDs: make([]component.ID, 0, len(cfg)), + reporter: &nopReporter{}, + } + + for _, opt := range options { + opt(exts) } + for _, extID := range cfg { instanceID := &component.InstanceID{ ID: extID, @@ -188,9 +203,10 @@ func New(ctx context.Context, set Settings, cfg Config) (*Extensions, error) { } extSet := extension.Settings{ ID: extID, - TelemetrySettings: set.Telemetry.ToComponentTelemetrySettings(instanceID), + TelemetrySettings: set.Telemetry, BuildInfo: set.BuildInfo, } + extSet.TelemetrySettings.ReportStatus = status.NewReportStatusFunc(instanceID, exts.reporter.ReportStatus) extSet.TelemetrySettings.Logger = components.ExtensionLogger(set.Telemetry.Logger, extID) ext, err := set.Extensions.Create(ctx, extSet) @@ -213,3 +229,11 @@ func New(ctx context.Context, set Settings, cfg Config) (*Extensions, error) { exts.extensionIDs = order return exts, nil } + +type nopReporter struct{} + +func (r *nopReporter) Ready() {} + +func (r *nopReporter) ReportStatus(*component.InstanceID, *component.StatusEvent) {} + +func (r *nopReporter) ReportOKIfStarting(*component.InstanceID) {} diff --git a/service/extensions/extensions_test.go b/service/extensions/extensions_test.go index d519132d3f3..8b40d8c90f7 100644 --- a/service/extensions/extensions_test.go +++ b/service/extensions/extensions_test.go @@ -16,7 +16,6 @@ import ( "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/extension" "go.opentelemetry.io/collector/extension/extensiontest" - "go.opentelemetry.io/collector/service/internal/servicetelemetry" "go.opentelemetry.io/collector/service/internal/status" ) @@ -83,7 +82,7 @@ func TestBuildExtensions(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { _, err := New(context.Background(), Settings{ - Telemetry: servicetelemetry.NewNopTelemetrySettings(), + Telemetry: componenttest.NewNopTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo(), Extensions: extension.NewBuilder(tt.extensionsConfigs, tt.factories), }, tt.config) @@ -175,7 +174,7 @@ func (tc testOrderCase) testOrdering(t *testing.T) { } exts, err := New(context.Background(), Settings{ - Telemetry: servicetelemetry.NewNopTelemetrySettings(), + Telemetry: componenttest.NewNopTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo(), Extensions: extension.NewBuilder( extCfgs, @@ -284,7 +283,7 @@ func TestNotifyConfig(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { extensions, err := New(context.Background(), Settings{ - Telemetry: servicetelemetry.NewNopTelemetrySettings(), + Telemetry: componenttest.NewNopTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo(), Extensions: extension.NewBuilder(tt.extensionsConfigs, tt.factories), }, tt.serviceExtensions) @@ -420,25 +419,26 @@ func TestStatusReportedOnStartupShutdown(t *testing.T) { factories := map[component.Type]extension.Factory{ statusType: factory, } + + var actualStatuses []*component.StatusEvent + rep := status.NewReporter(func(_ *component.InstanceID, ev *component.StatusEvent) { + actualStatuses = append(actualStatuses, ev) + }, func(err error) { + require.NoError(t, err) + }) + extensions, err := New( context.Background(), Settings{ - Telemetry: servicetelemetry.NewNopTelemetrySettings(), + Telemetry: componenttest.NewNopTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo(), Extensions: extension.NewBuilder(extensionsConfigs, factories), }, []component.ID{compID}, + WithReporter(rep), ) - assert.NoError(t, err) - var actualStatuses []*component.StatusEvent - rep := status.NewReporter(func(_ *component.InstanceID, ev *component.StatusEvent) { - actualStatuses = append(actualStatuses, ev) - }, func(err error) { - require.NoError(t, err) - }) - extensions.telemetry.Status = rep rep.Ready() assert.Equal(t, tc.startErr, extensions.Start(context.Background(), componenttest.NewNopHost())) diff --git a/service/internal/graph/graph.go b/service/internal/graph/graph.go index a25a04167cd..002fa3129f6 100644 --- a/service/internal/graph/graph.go +++ b/service/internal/graph/graph.go @@ -32,14 +32,13 @@ import ( "go.opentelemetry.io/collector/processor" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/service/internal/capabilityconsumer" - "go.opentelemetry.io/collector/service/internal/servicetelemetry" "go.opentelemetry.io/collector/service/internal/status" "go.opentelemetry.io/collector/service/pipelines" ) // Settings holds configuration for building builtPipelines. type Settings struct { - Telemetry servicetelemetry.TelemetrySettings + Telemetry component.TelemetrySettings BuildInfo component.BuildInfo ReceiverBuilder *receiver.Builder @@ -49,6 +48,8 @@ type Settings struct { // PipelineConfigs is a map of component.ID to PipelineConfig. PipelineConfigs pipelines.Config + + ReportStatus status.ServiceStatusFunc } type Graph struct { @@ -61,7 +62,7 @@ type Graph struct { // Keep track of status source per node instanceIDs map[int64]*component.InstanceID - telemetry servicetelemetry.TelemetrySettings + telemetry component.TelemetrySettings } // Build builds a full pipeline graph. @@ -301,7 +302,8 @@ func (g *Graph) buildComponents(ctx context.Context, set Settings) error { // skipped for capabilitiesNodes and fanoutNodes as they are not assigned componentIDs. var telemetrySettings component.TelemetrySettings if instanceID, ok := g.instanceIDs[node.ID()]; ok { - telemetrySettings = set.Telemetry.ToComponentTelemetrySettings(instanceID) + telemetrySettings = set.Telemetry + telemetrySettings.ReportStatus = status.NewReportStatusFunc(instanceID, set.ReportStatus) } switch n := node.(type) { diff --git a/service/internal/graph/graph_test.go b/service/internal/graph/graph_test.go index 6c1ffc0aab7..e20b85b2961 100644 --- a/service/internal/graph/graph_test.go +++ b/service/internal/graph/graph_test.go @@ -28,7 +28,6 @@ import ( "go.opentelemetry.io/collector/processor/processortest" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/receivertest" - "go.opentelemetry.io/collector/service/internal/servicetelemetry" "go.opentelemetry.io/collector/service/internal/status" "go.opentelemetry.io/collector/service/internal/status/statustest" "go.opentelemetry.io/collector/service/internal/testcomponents" @@ -145,7 +144,7 @@ func TestGraphStartStop(t *testing.T) { } pg := &Graph{componentGraph: simple.NewDirectedGraph()} - pg.telemetry = servicetelemetry.NewNopTelemetrySettings() + pg.telemetry = componenttest.NewNopTelemetrySettings() pg.instanceIDs = make(map[int64]*component.InstanceID) for _, edge := range tt.edges { @@ -200,7 +199,7 @@ func TestGraphStartStopCycle(t *testing.T) { func TestGraphStartStopComponentError(t *testing.T) { pg := &Graph{componentGraph: simple.NewDirectedGraph()} - pg.telemetry = servicetelemetry.NewNopTelemetrySettings() + pg.telemetry = componenttest.NewNopTelemetrySettings() r1 := &testNode{ id: component.MustNewIDWithName("r", "1"), startErr: errors.New("foo"), @@ -719,7 +718,7 @@ func TestConnectorPipelinesGraph(t *testing.T) { t.Run(test.name, func(t *testing.T) { // Build the pipeline set := Settings{ - Telemetry: servicetelemetry.NewNopTelemetrySettings(), + Telemetry: componenttest.NewNopTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo(), ReceiverBuilder: receiver.NewBuilder( map[component.ID]component.Config{ @@ -1006,7 +1005,7 @@ func TestConnectorRouter(t *testing.T) { ctx := context.Background() set := Settings{ - Telemetry: servicetelemetry.NewNopTelemetrySettings(), + Telemetry: componenttest.NewNopTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo(), ReceiverBuilder: receiver.NewBuilder( map[component.ID]component.Config{ @@ -2050,7 +2049,7 @@ func TestGraphBuildErrors(t *testing.T) { t.Run(test.name, func(t *testing.T) { set := Settings{ BuildInfo: component.NewDefaultBuildInfo(), - Telemetry: servicetelemetry.NewNopTelemetrySettings(), + Telemetry: componenttest.NewNopTelemetrySettings(), ReceiverBuilder: receiver.NewBuilder( test.receiverCfgs, map[component.Type]receiver.Factory{ @@ -2097,7 +2096,7 @@ func TestGraphFailToStartAndShutdown(t *testing.T) { nopConnectorFactory := connectortest.NewNopFactory() set := Settings{ - Telemetry: servicetelemetry.NewNopTelemetrySettings(), + Telemetry: componenttest.NewNopTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo(), ReceiverBuilder: receiver.NewBuilder( map[component.ID]component.Config{ @@ -2333,7 +2332,7 @@ func TestStatusReportedOnStartupShutdown(t *testing.T) { } { t.Run(tc.name, func(t *testing.T) { pg := &Graph{componentGraph: simple.NewDirectedGraph()} - pg.telemetry = servicetelemetry.NewNopTelemetrySettings() + pg.telemetry = componenttest.NewNopTelemetrySettings() actualStatuses := make(map[*component.InstanceID][]*component.StatusEvent) rep := status.NewReporter(func(id *component.InstanceID, ev *component.StatusEvent) { @@ -2341,7 +2340,6 @@ func TestStatusReportedOnStartupShutdown(t *testing.T) { }, func(error) { }) - pg.telemetry.Status = rep rep.Ready() e0, e1 := tc.edge[0], tc.edge[1] diff --git a/service/internal/proctelemetry/process_telemetry.go b/service/internal/proctelemetry/process_telemetry.go index e7a0cc1454a..0fe58d5d694 100644 --- a/service/internal/proctelemetry/process_telemetry.go +++ b/service/internal/proctelemetry/process_telemetry.go @@ -15,7 +15,6 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/service/internal/metadata" - "go.opentelemetry.io/collector/service/internal/servicetelemetry" ) // processMetrics is a struct that contains views related to process metrics (cpu, mem, etc) @@ -53,7 +52,7 @@ func WithHostProc(hostProc string) RegisterOption { // RegisterProcessMetrics creates a new set of processMetrics (mem, cpu) that can be used to measure // basic information about this process. -func RegisterProcessMetrics(cfg servicetelemetry.TelemetrySettings, opts ...RegisterOption) error { +func RegisterProcessMetrics(cfg component.TelemetrySettings, opts ...RegisterOption) error { set := registerOption{} for _, opt := range opts { opt.apply(&set) @@ -74,7 +73,7 @@ func RegisterProcessMetrics(cfg servicetelemetry.TelemetrySettings, opts ...Regi return err } - _, err = metadata.NewTelemetryBuilder(cfg.ToComponentTelemetrySettings(&component.InstanceID{}), + _, err = metadata.NewTelemetryBuilder(cfg, metadata.WithProcessUptimeCallback(pm.updateProcessUptime), metadata.WithProcessRuntimeHeapAllocBytesCallback(pm.updateAllocMem), metadata.WithProcessRuntimeTotalAllocBytesCallback(pm.updateTotalAllocMem), diff --git a/service/internal/proctelemetry/process_telemetry_test.go b/service/internal/proctelemetry/process_telemetry_test.go index fb750fcf664..5a1e2db24b4 100644 --- a/service/internal/proctelemetry/process_telemetry_test.go +++ b/service/internal/proctelemetry/process_telemetry_test.go @@ -20,12 +20,13 @@ import ( sdkmetric "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/resource" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configtelemetry" - "go.opentelemetry.io/collector/service/internal/servicetelemetry" ) type testTelemetry struct { - servicetelemetry.TelemetrySettings + component.TelemetrySettings promHandler http.Handler meterProvider *sdkmetric.MeterProvider } @@ -41,7 +42,7 @@ var expectedMetrics = []string{ func setupTelemetry(t *testing.T) testTelemetry { settings := testTelemetry{ - TelemetrySettings: servicetelemetry.NewNopTelemetrySettings(), + TelemetrySettings: componenttest.NewNopTelemetrySettings(), } settings.TelemetrySettings.MetricsLevel = configtelemetry.LevelNormal diff --git a/service/internal/servicetelemetry/nop_telemetry_settings.go b/service/internal/servicetelemetry/nop_telemetry_settings.go deleted file mode 100644 index 116554a51f8..00000000000 --- a/service/internal/servicetelemetry/nop_telemetry_settings.go +++ /dev/null @@ -1,27 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package servicetelemetry // import "go.opentelemetry.io/collector/service/internal/servicetelemetry" - -import ( - noopmetric "go.opentelemetry.io/otel/metric/noop" - nooptrace "go.opentelemetry.io/otel/trace/noop" - "go.uber.org/zap" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/config/configtelemetry" - "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/service/internal/status" -) - -// NewNopTelemetrySettings returns a new nop settings for Create* functions. -func NewNopTelemetrySettings() TelemetrySettings { - return TelemetrySettings{ - Logger: zap.NewNop(), - TracerProvider: nooptrace.NewTracerProvider(), - MeterProvider: noopmetric.NewMeterProvider(), - MetricsLevel: configtelemetry.LevelNone, - Resource: pcommon.NewResource(), - Status: status.NewReporter(func(*component.InstanceID, *component.StatusEvent) {}, func(error) {}), - } -} diff --git a/service/internal/servicetelemetry/nop_telemetry_settings_test.go b/service/internal/servicetelemetry/nop_telemetry_settings_test.go deleted file mode 100644 index 8c4a401e0f0..00000000000 --- a/service/internal/servicetelemetry/nop_telemetry_settings_test.go +++ /dev/null @@ -1,35 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package servicetelemetry - -import ( - "testing" - - "github.com/stretchr/testify/require" - noopmetric "go.opentelemetry.io/otel/metric/noop" - nooptrace "go.opentelemetry.io/otel/trace/noop" - "go.uber.org/zap" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/config/configtelemetry" - "go.opentelemetry.io/collector/pdata/pcommon" -) - -func TestNewNopSettings(t *testing.T) { - set := NewNopTelemetrySettings() - set.Status.Ready() - require.NotNil(t, set) - require.IsType(t, TelemetrySettings{}, set) - require.Equal(t, zap.NewNop(), set.Logger) - require.Equal(t, nooptrace.NewTracerProvider(), set.TracerProvider) - require.Equal(t, noopmetric.NewMeterProvider(), set.MeterProvider) - require.Equal(t, configtelemetry.LevelNone, set.MetricsLevel) - require.Equal(t, pcommon.NewResource(), set.Resource) - set.Status.ReportStatus( - &component.InstanceID{}, - component.NewStatusEvent(component.StatusStarting), - ) - set.Status.ReportOKIfStarting(&component.InstanceID{}) - -} diff --git a/service/internal/servicetelemetry/package_test.go b/service/internal/servicetelemetry/package_test.go deleted file mode 100644 index 0c9a1bc67a6..00000000000 --- a/service/internal/servicetelemetry/package_test.go +++ /dev/null @@ -1,14 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package servicetelemetry - -import ( - "testing" - - "go.uber.org/goleak" -) - -func TestMain(m *testing.M) { - goleak.VerifyTestMain(m) -} diff --git a/service/internal/servicetelemetry/telemetry_settings.go b/service/internal/servicetelemetry/telemetry_settings.go deleted file mode 100644 index 55da9bcf0c2..00000000000 --- a/service/internal/servicetelemetry/telemetry_settings.go +++ /dev/null @@ -1,55 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package servicetelemetry // import "go.opentelemetry.io/collector/service/internal/servicetelemetry" - -import ( - "go.opentelemetry.io/otel/metric" - "go.opentelemetry.io/otel/trace" - "go.uber.org/zap" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/config/configtelemetry" - "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/service/internal/status" -) - -// TelemetrySettings mirrors component.TelemetrySettings except for the mechanism for reporting -// status. Service-level status reporting has additional methods which can report status for -// components by their InstanceID whereas the component versions are tied to a specific component. -type TelemetrySettings struct { - // Logger that the factory can use during creation and can pass to the created - // component to be used later as well. - Logger *zap.Logger - - // TracerProvider that the factory can pass to other instrumented third-party libraries. - TracerProvider trace.TracerProvider - - // MeterProvider that the factory can pass to other instrumented third-party libraries. - MeterProvider metric.MeterProvider - - // MetricsLevel controls the level of detail for metrics emitted by the collector. - // Experimental: *NOTE* this field is experimental and may be changed or removed. - MetricsLevel configtelemetry.Level - - // Resource contains the resource attributes for the collector's telemetry. - Resource pcommon.Resource - - // Status contains a Reporter that allows the service to report status on behalf of a - // component. - Status status.Reporter -} - -// ToComponentTelemetrySettings returns a TelemetrySettings for a specific component derived from -// this service level Settings object. -func (s TelemetrySettings) ToComponentTelemetrySettings(id *component.InstanceID) component.TelemetrySettings { - statusFunc := status.NewReportStatusFunc(id, s.Status.ReportStatus) - return component.TelemetrySettings{ - Logger: s.Logger, - TracerProvider: s.TracerProvider, - MeterProvider: s.MeterProvider, - MetricsLevel: s.MetricsLevel, - Resource: s.Resource, - ReportStatus: statusFunc, - } -} diff --git a/service/internal/servicetelemetry/telemetry_settings_test.go b/service/internal/servicetelemetry/telemetry_settings_test.go deleted file mode 100644 index 5aad2c6c2b6..00000000000 --- a/service/internal/servicetelemetry/telemetry_settings_test.go +++ /dev/null @@ -1,40 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package servicetelemetry - -import ( - "testing" - - "github.com/stretchr/testify/require" - noopmetric "go.opentelemetry.io/otel/metric/noop" - nooptrace "go.opentelemetry.io/otel/trace/noop" - "go.uber.org/zap" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/config/configtelemetry" - "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/service/internal/status" -) - -func TestSettings(t *testing.T) { - set := TelemetrySettings{ - Logger: zap.NewNop(), - TracerProvider: nooptrace.NewTracerProvider(), - MeterProvider: noopmetric.NewMeterProvider(), - MetricsLevel: configtelemetry.LevelNone, - Resource: pcommon.NewResource(), - Status: status.NewReporter( - func(*component.InstanceID, *component.StatusEvent) {}, - func(err error) { require.NoError(t, err) }), - } - set.Status.Ready() - set.Status.ReportStatus( - &component.InstanceID{}, - component.NewStatusEvent(component.StatusStarting), - ) - set.Status.ReportOKIfStarting(&component.InstanceID{}) - - compSet := set.ToComponentTelemetrySettings(&component.InstanceID{}) - compSet.ReportStatus(component.NewStatusEvent(component.StatusStarting)) -} diff --git a/service/internal/status/status.go b/service/internal/status/status.go index fecae051d13..9fac2a8d6bf 100644 --- a/service/internal/status/status.go +++ b/service/internal/status/status.go @@ -89,7 +89,7 @@ type NotifyStatusFunc func(*component.InstanceID, *component.StatusEvent) // InvalidTransitionFunc is the receiver of invalid transition errors type InvalidTransitionFunc func(error) -// ServiceStatusFunc is the expected type of ReportStatus for servicetelemetry.Settings +// ServiceStatusFunc is the expected type of ReportStatus type ServiceStatusFunc func(*component.InstanceID, *component.StatusEvent) // ErrStatusNotReady is returned when trying to report status before service start @@ -167,9 +167,7 @@ func (r *reporter) componentFSM(id *component.InstanceID) *fsm { return fsm } -// NewReportStatusFunc returns a function to be used as ReportStatus for -// component.TelemetrySettings, which differs from servicetelemetry.Settings in that -// the component version is tied to specific component instance. +// NewReportStatusFunc returns a function to be used as ReportStatus for component.TelemetrySettings func NewReportStatusFunc( id *component.InstanceID, srvStatus ServiceStatusFunc, diff --git a/service/service.go b/service/service.go index 8b57bd12cad..320a7068bcf 100644 --- a/service/service.go +++ b/service/service.go @@ -32,7 +32,6 @@ import ( "go.opentelemetry.io/collector/service/internal/graph" "go.opentelemetry.io/collector/service/internal/proctelemetry" "go.opentelemetry.io/collector/service/internal/resource" - "go.opentelemetry.io/collector/service/internal/servicetelemetry" "go.opentelemetry.io/collector/service/internal/status" "go.opentelemetry.io/collector/service/telemetry" ) @@ -70,9 +69,11 @@ type Settings struct { // Service represents the implementation of a component.Host. type Service struct { buildInfo component.BuildInfo - telemetrySettings servicetelemetry.TelemetrySettings + telemetrySettings component.TelemetrySettings host *serviceHost collectorConf *confmap.Conf + + reporter status.Reporter } // New creates a new Service, its telemetry, and Components. @@ -131,20 +132,20 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) { } logsAboutMeterProvider(logger, cfg.Telemetry.Metrics, mp, extendedConfig) - srv.telemetrySettings = servicetelemetry.TelemetrySettings{ + srv.telemetrySettings = component.TelemetrySettings{ Logger: logger, MeterProvider: mp, TracerProvider: tracerProvider, MetricsLevel: cfg.Telemetry.Metrics.Level, // Construct telemetry attributes from build info and config's resource attributes. Resource: pcommonRes, - Status: status.NewReporter(srv.host.notifyComponentStatusChange, func(err error) { - if errors.Is(err, status.ErrStatusNotReady) { - logger.Warn("Invalid transition", zap.Error(err)) - } - // ignore other errors as they represent invalid state transitions and are considered benign. - }), } + srv.reporter = status.NewReporter(srv.host.notifyComponentStatusChange, func(err error) { + if errors.Is(err, status.ErrStatusNotReady) { + logger.Warn("Invalid transition", zap.Error(err)) + } + // ignore other errors as they represent invalid state transitions and are considered benign. + }) if err = srv.initGraph(ctx, set, cfg); err != nil { err = multierr.Append(err, srv.shutdownTelemetry(ctx)) @@ -201,7 +202,7 @@ func (srv *Service) Start(ctx context.Context) error { ) // enable status reporting - srv.telemetrySettings.Status.Ready() + srv.reporter.Ready() if err := srv.host.serviceExtensions.Start(ctx, srv.host); err != nil { return fmt.Errorf("failed to start extensions: %w", err) @@ -213,7 +214,7 @@ func (srv *Service) Start(ctx context.Context) error { } } - if err := srv.host.pipelines.StartAll(ctx, srv.host, srv.telemetrySettings.Status); err != nil { + if err := srv.host.pipelines.StartAll(ctx, srv.host, srv.reporter); err != nil { return fmt.Errorf("cannot start pipelines: %w", err) } @@ -264,7 +265,7 @@ func (srv *Service) Shutdown(ctx context.Context) error { errs = multierr.Append(errs, fmt.Errorf("failed to notify that pipeline is not ready: %w", err)) } - if err := srv.host.pipelines.ShutdownAll(ctx, srv.telemetrySettings.Status); err != nil { + if err := srv.host.pipelines.ShutdownAll(ctx, srv.reporter); err != nil { errs = multierr.Append(errs, fmt.Errorf("failed to shutdown pipelines: %w", err)) } @@ -287,7 +288,7 @@ func (srv *Service) initExtensions(ctx context.Context, cfg extensions.Config) e BuildInfo: srv.buildInfo, Extensions: srv.host.extensions, } - if srv.host.serviceExtensions, err = extensions.New(ctx, extensionsSettings, cfg); err != nil { + if srv.host.serviceExtensions, err = extensions.New(ctx, extensionsSettings, cfg, extensions.WithReporter(srv.reporter)); err != nil { return fmt.Errorf("failed to build extensions: %w", err) } return nil @@ -304,6 +305,7 @@ func (srv *Service) initGraph(ctx context.Context, set Settings, cfg Config) err ExporterBuilder: set.Exporters, ConnectorBuilder: set.Connectors, PipelineConfigs: cfg.Pipelines, + ReportStatus: srv.reporter.ReportStatus, }); err != nil { return fmt.Errorf("failed to build pipelines: %w", err) }