Skip to content

Commit

Permalink
[chore] [exporterhelper] Update test validating failed re-enqueue (#8960
Browse files Browse the repository at this point in the history
)

Update the test validating failed re-enqueue to ensure that the request
is sent through the queue sender, not the retry sender. It'll unblock
removing the dependency on the retry sender for the re-enqueue as done
in #8942
  • Loading branch information
dmitryax authored Nov 21, 2023
1 parent 8226a60 commit 73fa163
Showing 1 changed file with 46 additions and 10 deletions.
56 changes: 46 additions & 10 deletions exporter/exporterhelper/queue_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest/observer"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
Expand Down Expand Up @@ -257,26 +259,44 @@ func TestQueuedRetry_RequeuingEnabled(t *testing.T) {
// if requeueing is enabled, but the queue is full, we get an error
func TestQueuedRetry_RequeuingEnabledQueueFull(t *testing.T) {
qCfg := NewDefaultQueueSettings()
qCfg.NumConsumers = 0
qCfg.QueueSize = 0
qCfg.NumConsumers = 1
qCfg.QueueSize = 1
rCfg := NewDefaultRetrySettings()
rCfg.MaxElapsedTime = time.Nanosecond // we don't want to retry at all, but requeue instead
be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg))

set := exportertest.NewNopCreateSettings()
logger, observedLogs := observer.New(zap.ErrorLevel)
set.Logger = zap.New(logger)
be, err := newBaseExporter(set, "", false, nil, nil, newNoopObsrepSender, WithRetry(rCfg), WithQueue(qCfg))
require.NoError(t, err)

be.queueSender.(*queueSender).requeuingEnabled = true
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() {
assert.NoError(t, be.Shutdown(context.Background()))
})

traceErr := consumererror.NewTraces(errors.New("some error"), testdata.GenerateTraces(1))
mockR := newMockRequest(1, traceErr)

ocs := be.obsrepSender.(*observabilityConsumerSender)
ocs.run(func() {
require.Error(t, be.retrySender.send(context.Background(), mockR), "sending_queue is full")
// send a request that will fail after waitReq1 is unblocked
waitReq1 := make(chan struct{})
req1 := newMockExportRequest(func(ctx context.Context) error {
waitReq1 <- struct{}{}
return errors.New("some error")
})
mockR.checkNumRequests(t, 1)
require.NoError(t, be.queueSender.send(context.Background(), req1))

// send another request to fill the queue
req2 := newMockRequest(1, nil)
require.NoError(t, be.queueSender.send(context.Background(), req2))

<-waitReq1

// req1 cannot be put back to the queue and should be dropped, check the log message
assert.Eventually(t, func() bool {
return observedLogs.FilterMessageSnippet("Queue did not accept requeuing request. Dropping data.").Len() == 1
}, time.Second, 1*time.Millisecond)

// req2 should be sent out after that
req2.checkNumRequests(t, 1)
}

func TestQueueRetryWithDisabledQueue(t *testing.T) {
Expand Down Expand Up @@ -388,3 +408,19 @@ type mockHost struct {
func (nh *mockHost) GetExtensions() map[component.ID]component.Component {
return nh.ext
}

type mockExportRequest struct {
exportFunc func(context.Context) error
}

func newMockExportRequest(exportFunc func(context.Context) error) *mockExportRequest {
return &mockExportRequest{exportFunc: exportFunc}
}

func (m *mockExportRequest) ItemsCount() int {
return 1
}

func (m *mockExportRequest) Export(ctx context.Context) error {
return m.exportFunc(ctx)
}

0 comments on commit 73fa163

Please sign in to comment.