diff --git a/exporter/exporterhelper/internal/base_exporter.go b/exporter/exporterhelper/internal/base_exporter.go index a76a725981f..24abaa9928e 100644 --- a/exporter/exporterhelper/internal/base_exporter.go +++ b/exporter/exporterhelper/internal/base_exporter.go @@ -35,22 +35,37 @@ var usePullingBasedExporterQueueBatcher = featuregate.GlobalRegistry().MustRegis type ObsrepSenderFactory = func(obsrep *ObsReport) RequestSender // Option apply changes to BaseExporter. -type Option func(*BaseExporter) error +type Option func(*baseOptions) error -type BaseExporter struct { +type baseOptions struct { component.StartFunc component.ShutdownFunc - Signal pipeline.Signal + // Message for the user to be added with an export failure message. + ExportFailureMessage string Marshaler exporterqueue.Marshaler[internal.Request] Unmarshaler exporterqueue.Unmarshaler[internal.Request] + queueCfg exporterqueue.Config + queueFactory exporterqueue.Factory[internal.Request] + batcherCfg exporterbatcher.Config + retryCfg configretry.BackOffConfig + timeoutCfg TimeoutConfig + + ConsumerOptions []consumer.Option +} + +type BaseExporter struct { + component.StartFunc + component.ShutdownFunc + Set exporter.Settings Obsrep *ObsReport // Message for the user to be added with an export failure message. ExportFailureMessage string + ConsumerOptions []consumer.Option // Chain of senders that the exporter helper applies before passing the data to the actual exporter. // The data is handled by each sender in the respective order starting from the queueSender. @@ -59,13 +74,7 @@ type BaseExporter struct { QueueSender RequestSender ObsrepSender RequestSender RetrySender RequestSender - TimeoutSender *TimeoutSender // TimeoutSender is always initialized. - - ConsumerOptions []consumer.Option - - queueCfg exporterqueue.Config - queueFactory exporterqueue.Factory[internal.Request] - BatcherCfg exporterbatcher.Config + TimeoutSender RequestSender } func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSenderFactory, options ...Option) (*BaseExporter, error) { @@ -74,51 +83,49 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSe return nil, err } - be := &BaseExporter{ - Signal: signal, - - BatchSender: &BaseRequestSender{}, - QueueSender: &BaseRequestSender{}, - ObsrepSender: osf(obsReport), - RetrySender: &BaseRequestSender{}, - TimeoutSender: &TimeoutSender{cfg: NewDefaultTimeoutConfig()}, - - Set: set, - Obsrep: obsReport, - } - + bo := &baseOptions{} for _, op := range options { - err = multierr.Append(err, op(be)) + err = multierr.Append(err, op(bo)) } if err != nil { return nil, err } - if be.queueCfg.Enabled { - q := be.queueFactory( + be := &BaseExporter{ + StartFunc: bo.StartFunc, + ShutdownFunc: bo.ShutdownFunc, + ExportFailureMessage: bo.ExportFailureMessage, + ConsumerOptions: bo.ConsumerOptions, + BatchSender: &BaseRequestSender{}, + QueueSender: &BaseRequestSender{}, + ObsrepSender: osf(obsReport), + RetrySender: newRetrySender(bo.retryCfg, set), + TimeoutSender: &TimeoutSender{cfg: bo.timeoutCfg}, + + Set: set, + Obsrep: obsReport, + } + + if bo.queueCfg.Enabled { + q := bo.queueFactory( context.Background(), exporterqueue.Settings{ - Signal: be.Signal, + Signal: signal, ExporterSettings: be.Set, }, - be.queueCfg) - be.QueueSender = NewQueueSender(q, be.Set, be.queueCfg.NumConsumers, be.ExportFailureMessage, be.Obsrep, be.BatcherCfg) - for _, op := range options { - err = multierr.Append(err, op(be)) - } - } - - if !usePullingBasedExporterQueueBatcher.IsEnabled() && be.BatcherCfg.Enabled || - usePullingBasedExporterQueueBatcher.IsEnabled() && be.BatcherCfg.Enabled && !be.queueCfg.Enabled { - bs := NewBatchSender(be.BatcherCfg, be.Set) - be.BatchSender = bs + bo.queueCfg) + be.QueueSender = NewQueueSender(q, be.Set, bo.queueCfg.NumConsumers, bo.ExportFailureMessage, be.Obsrep, bo.batcherCfg) } - if err != nil { - return nil, err + if !usePullingBasedExporterQueueBatcher.IsEnabled() && bo.batcherCfg.Enabled || + usePullingBasedExporterQueueBatcher.IsEnabled() && bo.batcherCfg.Enabled && !bo.queueCfg.Enabled { + be.BatchSender = NewBatchSender(bo.batcherCfg, be.Set) } - be.connectSenders() + be.QueueSender.SetNextSender(be.BatchSender) + be.BatchSender.SetNextSender(be.ObsrepSender) + be.ObsrepSender.SetNextSender(be.RetrySender) + be.RetrySender.SetNextSender(be.TimeoutSender) if bs, ok := be.BatchSender.(*BatchSender); ok { // If queue sender is enabled assign to the batch sender the same number of workers. @@ -126,7 +133,7 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSe bs.concurrencyLimit = int64(qs.numConsumers) } // Batcher sender mutates the data. - be.ConsumerOptions = append(be.ConsumerOptions, consumer.WithCapabilities(consumer.Capabilities{MutatesData: true})) + bo.ConsumerOptions = append(bo.ConsumerOptions, consumer.WithCapabilities(consumer.Capabilities{MutatesData: true})) } return be, nil @@ -142,14 +149,6 @@ func (be *BaseExporter) Send(ctx context.Context, req internal.Request) error { return err } -// connectSenders connects the senders in the predefined order. -func (be *BaseExporter) connectSenders() { - be.QueueSender.SetNextSender(be.BatchSender) - be.BatchSender.SetNextSender(be.ObsrepSender) - be.ObsrepSender.SetNextSender(be.RetrySender) - be.RetrySender.SetNextSender(be.TimeoutSender) -} - func (be *BaseExporter) Start(ctx context.Context, host component.Host) error { // First start the wrapped exporter. if err := be.StartFunc.Start(ctx, host); err != nil { @@ -180,7 +179,7 @@ func (be *BaseExporter) Shutdown(ctx context.Context) error { // WithStart overrides the default Start function for an exporter. // The default start function does nothing and always returns nil. func WithStart(start component.StartFunc) Option { - return func(o *BaseExporter) error { + return func(o *baseOptions) error { o.StartFunc = start return nil } @@ -189,7 +188,7 @@ func WithStart(start component.StartFunc) Option { // WithShutdown overrides the default Shutdown function for an exporter. // The default shutdown function does nothing and always returns nil. func WithShutdown(shutdown component.ShutdownFunc) Option { - return func(o *BaseExporter) error { + return func(o *baseOptions) error { o.ShutdownFunc = shutdown return nil } @@ -197,22 +196,22 @@ func WithShutdown(shutdown component.ShutdownFunc) Option { // WithTimeout overrides the default TimeoutConfig for an exporter. // The default TimeoutConfig is 5 seconds. -func WithTimeout(timeoutConfig TimeoutConfig) Option { - return func(o *BaseExporter) error { - o.TimeoutSender.cfg = timeoutConfig +func WithTimeout(cfg TimeoutConfig) Option { + return func(o *baseOptions) error { + o.timeoutCfg = cfg return nil } } // WithRetry overrides the default configretry.BackOffConfig for an exporter. // The default configretry.BackOffConfig is to disable retries. -func WithRetry(config configretry.BackOffConfig) Option { - return func(o *BaseExporter) error { - if !config.Enabled { +func WithRetry(cfg configretry.BackOffConfig) Option { + return func(o *baseOptions) error { + if !cfg.Enabled { o.ExportFailureMessage += " Try enabling retry_on_failure config option to retry on retryable errors." return nil } - o.RetrySender = newRetrySender(config, o.Set) + o.retryCfg = cfg return nil } } @@ -221,7 +220,7 @@ func WithRetry(config configretry.BackOffConfig) Option { // The default QueueConfig is to disable queueing. // This option cannot be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter. func WithQueue(config QueueConfig) Option { - return func(o *BaseExporter) error { + return func(o *baseOptions) error { if o.Marshaler == nil || o.Unmarshaler == nil { return errors.New("WithQueue option is not available for the new request exporters, use WithRequestQueue instead") } @@ -247,7 +246,7 @@ func WithQueue(config QueueConfig) Option { // Experimental: This API is at the early stage of development and may change without backward compatibility // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. func WithRequestQueue(cfg exporterqueue.Config, queueFactory exporterqueue.Factory[internal.Request]) Option { - return func(o *BaseExporter) error { + return func(o *baseOptions) error { if o.Marshaler != nil || o.Unmarshaler != nil { return errors.New("WithRequestQueue option must be used with the new request exporters only, use WithQueue instead") } @@ -265,7 +264,7 @@ func WithRequestQueue(cfg exporterqueue.Config, queueFactory exporterqueue.Facto // The default is non-mutable data. // TODO: Verify if we can change the default to be mutable as we do for processors. func WithCapabilities(capabilities consumer.Capabilities) Option { - return func(o *BaseExporter) error { + return func(o *baseOptions) error { o.ConsumerOptions = append(o.ConsumerOptions, consumer.WithCapabilities(capabilities)) return nil } @@ -277,8 +276,8 @@ func WithCapabilities(capabilities consumer.Capabilities) Option { // This API is at the early stage of development and may change without backward compatibility // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. func WithBatcher(cfg exporterbatcher.Config) Option { - return func(o *BaseExporter) error { - o.BatcherCfg = cfg + return func(o *baseOptions) error { + o.batcherCfg = cfg return nil } } @@ -286,16 +285,16 @@ func WithBatcher(cfg exporterbatcher.Config) Option { // WithMarshaler is used to set the request marshaler for the new exporter helper. // It must be provided as the first option when creating a new exporter helper. func WithMarshaler(marshaler exporterqueue.Marshaler[internal.Request]) Option { - return func(o *BaseExporter) error { + return func(o *baseOptions) error { o.Marshaler = marshaler return nil } } -// withUnmarshaler is used to set the request unmarshaler for the new exporter helper. +// WithUnmarshaler is used to set the request unmarshaler for the new exporter helper. // It must be provided as the first option when creating a new exporter helper. func WithUnmarshaler(unmarshaler exporterqueue.Unmarshaler[internal.Request]) Option { - return func(o *BaseExporter) error { + return func(o *baseOptions) error { o.Unmarshaler = unmarshaler return nil } diff --git a/exporter/exporterhelper/internal/base_exporter_test.go b/exporter/exporterhelper/internal/base_exporter_test.go index cca72cd8272..fec3626985d 100644 --- a/exporter/exporterhelper/internal/base_exporter_test.go +++ b/exporter/exporterhelper/internal/base_exporter_test.go @@ -75,11 +75,9 @@ func TestQueueOptionsWithRequestExporter(t *testing.T) { runTest := func(testName string, enableQueueBatcher bool) { t.Run(testName, func(t *testing.T) { defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)() - bs, err := NewBaseExporter(exportertest.NewNopSettings(), defaultSignal, newNoopObsrepSender, + _, err := NewBaseExporter(exportertest.NewNopSettings(), defaultSignal, newNoopObsrepSender, WithRetry(configretry.NewDefaultBackOffConfig())) require.NoError(t, err) - require.Nil(t, bs.Marshaler) - require.Nil(t, bs.Unmarshaler) _, err = NewBaseExporter(exportertest.NewNopSettings(), defaultSignal, newNoopObsrepSender, WithRetry(configretry.NewDefaultBackOffConfig()), WithQueue(NewDefaultQueueConfig())) require.Error(t, err)