Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[chore] Extract exporter helper options as separate struct #11990

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 66 additions & 67 deletions exporter/exporterhelper/internal/base_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,22 +35,37 @@ var usePullingBasedExporterQueueBatcher = featuregate.GlobalRegistry().MustRegis
type ObsrepSenderFactory = func(obsrep *ObsReport) RequestSender

// Option apply changes to BaseExporter.
type Option func(*BaseExporter) error
type Option func(*baseOptions) error

type BaseExporter struct {
type baseOptions struct {
component.StartFunc
component.ShutdownFunc

Signal pipeline.Signal
// Message for the user to be added with an export failure message.
ExportFailureMessage string

Marshaler exporterqueue.Marshaler[internal.Request]
Unmarshaler exporterqueue.Unmarshaler[internal.Request]

queueCfg exporterqueue.Config
queueFactory exporterqueue.Factory[internal.Request]
batcherCfg exporterbatcher.Config
retryCfg configretry.BackOffConfig
timeoutCfg TimeoutConfig

ConsumerOptions []consumer.Option
}

type BaseExporter struct {
component.StartFunc
component.ShutdownFunc

Set exporter.Settings
Obsrep *ObsReport

// Message for the user to be added with an export failure message.
ExportFailureMessage string
ConsumerOptions []consumer.Option

// Chain of senders that the exporter helper applies before passing the data to the actual exporter.
// The data is handled by each sender in the respective order starting from the queueSender.
Expand All @@ -59,13 +74,7 @@ type BaseExporter struct {
QueueSender RequestSender
ObsrepSender RequestSender
RetrySender RequestSender
TimeoutSender *TimeoutSender // TimeoutSender is always initialized.

ConsumerOptions []consumer.Option

queueCfg exporterqueue.Config
queueFactory exporterqueue.Factory[internal.Request]
BatcherCfg exporterbatcher.Config
TimeoutSender RequestSender
}

func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSenderFactory, options ...Option) (*BaseExporter, error) {
Expand All @@ -74,59 +83,57 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSe
return nil, err
}

be := &BaseExporter{
Signal: signal,

BatchSender: &BaseRequestSender{},
QueueSender: &BaseRequestSender{},
ObsrepSender: osf(obsReport),
RetrySender: &BaseRequestSender{},
TimeoutSender: &TimeoutSender{cfg: NewDefaultTimeoutConfig()},

Set: set,
Obsrep: obsReport,
}

bo := &baseOptions{}
for _, op := range options {
err = multierr.Append(err, op(be))
err = multierr.Append(err, op(bo))
}
if err != nil {
return nil, err
}

if be.queueCfg.Enabled {
q := be.queueFactory(
be := &BaseExporter{
StartFunc: bo.StartFunc,
ShutdownFunc: bo.ShutdownFunc,
ExportFailureMessage: bo.ExportFailureMessage,
ConsumerOptions: bo.ConsumerOptions,
BatchSender: &BaseRequestSender{},
QueueSender: &BaseRequestSender{},
ObsrepSender: osf(obsReport),
RetrySender: newRetrySender(bo.retryCfg, set),
TimeoutSender: &TimeoutSender{cfg: bo.timeoutCfg},

Set: set,
Obsrep: obsReport,
}

if bo.queueCfg.Enabled {
q := bo.queueFactory(
context.Background(),
exporterqueue.Settings{
Signal: be.Signal,
Signal: signal,
ExporterSettings: be.Set,
},
be.queueCfg)
be.QueueSender = NewQueueSender(q, be.Set, be.queueCfg.NumConsumers, be.ExportFailureMessage, be.Obsrep, be.BatcherCfg)
for _, op := range options {
err = multierr.Append(err, op(be))
}
}

if !usePullingBasedExporterQueueBatcher.IsEnabled() && be.BatcherCfg.Enabled ||
usePullingBasedExporterQueueBatcher.IsEnabled() && be.BatcherCfg.Enabled && !be.queueCfg.Enabled {
bs := NewBatchSender(be.BatcherCfg, be.Set)
be.BatchSender = bs
bo.queueCfg)
be.QueueSender = NewQueueSender(q, be.Set, bo.queueCfg.NumConsumers, bo.ExportFailureMessage, be.Obsrep, bo.batcherCfg)
}

if err != nil {
return nil, err
if !usePullingBasedExporterQueueBatcher.IsEnabled() && bo.batcherCfg.Enabled ||
usePullingBasedExporterQueueBatcher.IsEnabled() && bo.batcherCfg.Enabled && !bo.queueCfg.Enabled {
be.BatchSender = NewBatchSender(bo.batcherCfg, be.Set)
}

be.connectSenders()
be.QueueSender.SetNextSender(be.BatchSender)
be.BatchSender.SetNextSender(be.ObsrepSender)
be.ObsrepSender.SetNextSender(be.RetrySender)
be.RetrySender.SetNextSender(be.TimeoutSender)

if bs, ok := be.BatchSender.(*BatchSender); ok {
// If queue sender is enabled assign to the batch sender the same number of workers.
if qs, ok := be.QueueSender.(*QueueSender); ok {
bs.concurrencyLimit = int64(qs.numConsumers)
}
// Batcher sender mutates the data.
be.ConsumerOptions = append(be.ConsumerOptions, consumer.WithCapabilities(consumer.Capabilities{MutatesData: true}))
bo.ConsumerOptions = append(bo.ConsumerOptions, consumer.WithCapabilities(consumer.Capabilities{MutatesData: true}))
}

return be, nil
Expand All @@ -142,14 +149,6 @@ func (be *BaseExporter) Send(ctx context.Context, req internal.Request) error {
return err
}

// connectSenders connects the senders in the predefined order.
func (be *BaseExporter) connectSenders() {
be.QueueSender.SetNextSender(be.BatchSender)
be.BatchSender.SetNextSender(be.ObsrepSender)
be.ObsrepSender.SetNextSender(be.RetrySender)
be.RetrySender.SetNextSender(be.TimeoutSender)
}

func (be *BaseExporter) Start(ctx context.Context, host component.Host) error {
// First start the wrapped exporter.
if err := be.StartFunc.Start(ctx, host); err != nil {
Expand Down Expand Up @@ -180,7 +179,7 @@ func (be *BaseExporter) Shutdown(ctx context.Context) error {
// WithStart overrides the default Start function for an exporter.
// The default start function does nothing and always returns nil.
func WithStart(start component.StartFunc) Option {
return func(o *BaseExporter) error {
return func(o *baseOptions) error {
o.StartFunc = start
return nil
}
Expand All @@ -189,30 +188,30 @@ func WithStart(start component.StartFunc) Option {
// WithShutdown overrides the default Shutdown function for an exporter.
// The default shutdown function does nothing and always returns nil.
func WithShutdown(shutdown component.ShutdownFunc) Option {
return func(o *BaseExporter) error {
return func(o *baseOptions) error {
o.ShutdownFunc = shutdown
return nil
}
}

// WithTimeout overrides the default TimeoutConfig for an exporter.
// The default TimeoutConfig is 5 seconds.
func WithTimeout(timeoutConfig TimeoutConfig) Option {
return func(o *BaseExporter) error {
o.TimeoutSender.cfg = timeoutConfig
func WithTimeout(cfg TimeoutConfig) Option {
return func(o *baseOptions) error {
o.timeoutCfg = cfg
return nil
}
}

// WithRetry overrides the default configretry.BackOffConfig for an exporter.
// The default configretry.BackOffConfig is to disable retries.
func WithRetry(config configretry.BackOffConfig) Option {
return func(o *BaseExporter) error {
if !config.Enabled {
func WithRetry(cfg configretry.BackOffConfig) Option {
return func(o *baseOptions) error {
if !cfg.Enabled {
o.ExportFailureMessage += " Try enabling retry_on_failure config option to retry on retryable errors."
return nil
}
o.RetrySender = newRetrySender(config, o.Set)
o.retryCfg = cfg
return nil
}
}
Expand All @@ -221,7 +220,7 @@ func WithRetry(config configretry.BackOffConfig) Option {
// The default QueueConfig is to disable queueing.
// This option cannot be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
func WithQueue(config QueueConfig) Option {
return func(o *BaseExporter) error {
return func(o *baseOptions) error {
if o.Marshaler == nil || o.Unmarshaler == nil {
return errors.New("WithQueue option is not available for the new request exporters, use WithRequestQueue instead")
}
Expand All @@ -247,7 +246,7 @@ func WithQueue(config QueueConfig) Option {
// Experimental: This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
func WithRequestQueue(cfg exporterqueue.Config, queueFactory exporterqueue.Factory[internal.Request]) Option {
return func(o *BaseExporter) error {
return func(o *baseOptions) error {
if o.Marshaler != nil || o.Unmarshaler != nil {
return errors.New("WithRequestQueue option must be used with the new request exporters only, use WithQueue instead")
}
Expand All @@ -265,7 +264,7 @@ func WithRequestQueue(cfg exporterqueue.Config, queueFactory exporterqueue.Facto
// The default is non-mutable data.
// TODO: Verify if we can change the default to be mutable as we do for processors.
func WithCapabilities(capabilities consumer.Capabilities) Option {
return func(o *BaseExporter) error {
return func(o *baseOptions) error {
o.ConsumerOptions = append(o.ConsumerOptions, consumer.WithCapabilities(capabilities))
return nil
}
Expand All @@ -277,25 +276,25 @@ func WithCapabilities(capabilities consumer.Capabilities) Option {
// This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
func WithBatcher(cfg exporterbatcher.Config) Option {
return func(o *BaseExporter) error {
o.BatcherCfg = cfg
return func(o *baseOptions) error {
o.batcherCfg = cfg
return nil
}
}

// WithMarshaler is used to set the request marshaler for the new exporter helper.
// It must be provided as the first option when creating a new exporter helper.
func WithMarshaler(marshaler exporterqueue.Marshaler[internal.Request]) Option {
return func(o *BaseExporter) error {
return func(o *baseOptions) error {
o.Marshaler = marshaler
return nil
}
}

// withUnmarshaler is used to set the request unmarshaler for the new exporter helper.
// WithUnmarshaler is used to set the request unmarshaler for the new exporter helper.
// It must be provided as the first option when creating a new exporter helper.
func WithUnmarshaler(unmarshaler exporterqueue.Unmarshaler[internal.Request]) Option {
return func(o *BaseExporter) error {
return func(o *baseOptions) error {
o.Unmarshaler = unmarshaler
return nil
}
Expand Down
4 changes: 1 addition & 3 deletions exporter/exporterhelper/internal/base_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,9 @@ func TestQueueOptionsWithRequestExporter(t *testing.T) {
runTest := func(testName string, enableQueueBatcher bool) {
t.Run(testName, func(t *testing.T) {
defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)()
bs, err := NewBaseExporter(exportertest.NewNopSettings(), defaultSignal, newNoopObsrepSender,
_, err := NewBaseExporter(exportertest.NewNopSettings(), defaultSignal, newNoopObsrepSender,
WithRetry(configretry.NewDefaultBackOffConfig()))
require.NoError(t, err)
require.Nil(t, bs.Marshaler)
require.Nil(t, bs.Unmarshaler)
_, err = NewBaseExporter(exportertest.NewNopSettings(), defaultSignal, newNoopObsrepSender,
WithRetry(configretry.NewDefaultBackOffConfig()), WithQueue(NewDefaultQueueConfig()))
require.Error(t, err)
Expand Down
Loading