Skip to content

Commit

Permalink
[chore] [exporterhelper] Update persistent queue test names (#8954)
Browse files Browse the repository at this point in the history
Update test names after removing the persistentStorage
  • Loading branch information
dmitryax authored Nov 17, 2023
1 parent fdaa351 commit 639223a
Showing 1 changed file with 30 additions and 30 deletions.
60 changes: 30 additions & 30 deletions exporter/exporterhelper/internal/persistent_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@ func (nh *mockHost) GetExtensions() map[component.ID]component.Component {
return nh.ext
}

// createTestQueue creates and starts a fake queue with the given capacity and number of consumers.
func createTestQueue(t *testing.T, capacity, numConsumers int, callback func(_ context.Context, item ptrace.Traces)) Queue[ptrace.Traces] {
// createAndStartTestPersistentQueue creates and starts a fake queue with the given capacity and number of consumers.
func createAndStartTestPersistentQueue(t *testing.T, capacity, numConsumers int, consumeFunc func(_ context.Context, item ptrace.Traces)) Queue[ptrace.Traces] {
pq := NewPersistentQueue[ptrace.Traces](capacity, component.DataTypeTraces, component.ID{}, marshaler.MarshalTraces,
unmarshaler.UnmarshalTraces, exportertest.NewNopCreateSettings())
host := &mockHost{ext: map[component.ID]component.Component{
{}: NewMockStorageExtension(nil),
}}
consumers := NewQueueConsumers(pq, numConsumers, callback)
consumers := NewQueueConsumers(pq, numConsumers, consumeFunc)
require.NoError(t, consumers.Start(context.Background(), host))
t.Cleanup(func() {
assert.NoError(t, consumers.Shutdown(context.Background()))
Expand All @@ -59,21 +59,21 @@ func createTestClient(t testing.TB, extension storage.Extension) storage.Client
return client
}

func createTestPersistentStorageWithCapacity(client storage.Client, capacity int) *persistentQueue[ptrace.Traces] {
func createTestPersistentQueueWithCapacity(client storage.Client, capacity int) *persistentQueue[ptrace.Traces] {
pq := NewPersistentQueue[ptrace.Traces](capacity, component.DataTypeTraces, component.ID{}, marshaler.MarshalTraces,
unmarshaler.UnmarshalTraces, exportertest.NewNopCreateSettings()).(*persistentQueue[ptrace.Traces])
pq.initClient(context.Background(), client)
return pq
}

func createTestPersistentStorage(client storage.Client) *persistentQueue[ptrace.Traces] {
return createTestPersistentStorageWithCapacity(client, 1000)
func createTestPersistentQueue(client storage.Client) *persistentQueue[ptrace.Traces] {
return createTestPersistentQueueWithCapacity(client, 1000)
}

func TestPersistentQueue_FullCapacity(t *testing.T) {
start := make(chan struct{})
done := make(chan struct{})
pq := createTestQueue(t, 5, 1, func(context.Context, ptrace.Traces) {
pq := createAndStartTestPersistentQueue(t, 5, 1, func(context.Context, ptrace.Traces) {
start <- struct{}{}
<-done
})
Expand All @@ -97,8 +97,8 @@ func TestPersistentQueue_FullCapacity(t *testing.T) {
close(done)
}

func TestPersistentQueueShutdown(t *testing.T) {
pq := createTestQueue(t, 1001, 100, func(context.Context, ptrace.Traces) {})
func TestPersistentQueue_Shutdown(t *testing.T) {
pq := createAndStartTestPersistentQueue(t, 1001, 100, func(context.Context, ptrace.Traces) {})
req := newTraces(1, 10)

for i := 0; i < 1000; i++ {
Expand Down Expand Up @@ -138,7 +138,7 @@ func TestPersistentQueue_ConsumersProducers(t *testing.T) {
req := newTraces(1, 10)

numMessagesConsumed := &atomic.Int32{}
pq := createTestQueue(t, 1000, c.numConsumers, func(context.Context, ptrace.Traces) {
pq := createAndStartTestPersistentQueue(t, 1000, c.numConsumers, func(context.Context, ptrace.Traces) {
numMessagesConsumed.Add(int32(1))
})

Expand Down Expand Up @@ -272,7 +272,7 @@ func TestPersistentQueue_StopAfterBadStart(t *testing.T) {
assert.NoError(t, pq.Shutdown(context.Background()))
}

func TestPersistentStorage_CorruptedData(t *testing.T) {
func TestPersistentQueue_CorruptedData(t *testing.T) {
req := newTraces(5, 10)

cases := []struct {
Expand Down Expand Up @@ -329,7 +329,7 @@ func TestPersistentStorage_CorruptedData(t *testing.T) {
t.Run(c.name, func(t *testing.T) {
ext := NewMockStorageExtension(nil)
client := createTestClient(t, ext)
ps := createTestPersistentStorage(client)
ps := createTestPersistentQueue(client)

ctx := context.Background()

Expand Down Expand Up @@ -365,18 +365,18 @@ func TestPersistentStorage_CorruptedData(t *testing.T) {
}

// Reload
newPs := createTestPersistentStorage(client)
newPs := createTestPersistentQueue(client)
assert.Equal(t, c.desiredQueueSize, newPs.Size())
})
}
}

func TestPersistentStorage_CurrentlyProcessedItems(t *testing.T) {
func TestPersistentQueue_CurrentlyProcessedItems(t *testing.T) {
req := newTraces(5, 10)

ext := NewMockStorageExtension(nil)
client := createTestClient(t, ext)
ps := createTestPersistentStorage(client)
ps := createTestPersistentQueue(client)

for i := 0; i < 5; i++ {
err := ps.Offer(context.Background(), req)
Expand All @@ -402,7 +402,7 @@ func TestPersistentStorage_CurrentlyProcessedItems(t *testing.T) {

// Reload the storage. Since items 0 was not finished, this should be re-enqueued at the end.
// The queue should be essentially {3,4,0,2}.
newPs := createTestPersistentStorage(client)
newPs := createTestPersistentQueue(client)
assert.Equal(t, 4, newPs.Size())
requireCurrentlyDispatchedItemsEqual(t, newPs, []uint64{})

Expand All @@ -429,12 +429,12 @@ func TestPersistentStorage_CurrentlyProcessedItems(t *testing.T) {

// this test attempts to check if all the invariants are kept if the queue is recreated while
// close to full and with some items dispatched
func TestPersistentStorage_StartWithNonDispatched(t *testing.T) {
func TestPersistentQueue_StartWithNonDispatched(t *testing.T) {
req := newTraces(5, 10)

ext := NewMockStorageExtension(nil)
client := createTestClient(t, ext)
ps := createTestPersistentStorageWithCapacity(client, 5)
ps := createTestPersistentQueueWithCapacity(client, 5)

// Put in items up to capacity
for i := 0; i < 5; i++ {
Expand All @@ -451,14 +451,14 @@ func TestPersistentStorage_StartWithNonDispatched(t *testing.T) {
assert.NoError(t, ps.Shutdown(context.Background()))

// Reload
newPs := createTestPersistentStorageWithCapacity(client, 5)
newPs := createTestPersistentQueueWithCapacity(client, 5)
require.Equal(t, 5, newPs.Size())
}

func TestPersistentStorage_PutCloseReadClose(t *testing.T) {
func TestPersistentQueue_PutCloseReadClose(t *testing.T) {
req := newTraces(5, 10)
ext := NewMockStorageExtension(nil)
ps := createTestPersistentStorage(createTestClient(t, ext))
ps := createTestPersistentQueue(createTestClient(t, ext))
assert.Equal(t, 0, ps.Size())

// Put two elements and close the extension
Expand All @@ -469,7 +469,7 @@ func TestPersistentStorage_PutCloseReadClose(t *testing.T) {
_, _ = ps.Poll()
assert.NoError(t, ps.Shutdown(context.Background()))

newPs := createTestPersistentStorage(createTestClient(t, ext))
newPs := createTestPersistentQueue(createTestClient(t, ext))
require.Equal(t, 2, newPs.Size())

// Lets read both of the elements we put
Expand All @@ -484,7 +484,7 @@ func TestPersistentStorage_PutCloseReadClose(t *testing.T) {
assert.NoError(t, newPs.Shutdown(context.Background()))
}

func BenchmarkPersistentStorage_TraceSpans(b *testing.B) {
func BenchmarkPersistentQueue_TraceSpans(b *testing.B) {
cases := []struct {
numTraces int
numSpansPerTrace int
Expand All @@ -507,7 +507,7 @@ func BenchmarkPersistentStorage_TraceSpans(b *testing.B) {
b.Run(fmt.Sprintf("#traces: %d #spansPerTrace: %d", c.numTraces, c.numSpansPerTrace), func(bb *testing.B) {
ext := NewMockStorageExtension(nil)
client := createTestClient(b, ext)
ps := createTestPersistentStorageWithCapacity(client, 10000000)
ps := createTestPersistentQueueWithCapacity(client, 10000000)

req := newTraces(c.numTraces, c.numSpansPerTrace)

Expand Down Expand Up @@ -585,9 +585,9 @@ func TestItemIndexArrayMarshaling(t *testing.T) {
}
}

func TestPersistentStorage_ShutdownWhileConsuming(t *testing.T) {
func TestPersistentQueue_ShutdownWhileConsuming(t *testing.T) {
client := createTestClient(t, NewMockStorageExtension(nil))
ps := createTestPersistentStorage(client)
ps := createTestPersistentQueue(client)

assert.Equal(t, 0, ps.Size())
assert.False(t, client.(*mockStorageClient).isClosed())
Expand All @@ -603,15 +603,15 @@ func TestPersistentStorage_ShutdownWhileConsuming(t *testing.T) {
assert.True(t, client.(*mockStorageClient).isClosed())
}

func TestPersistentStorage_StorageFull(t *testing.T) {
func TestPersistentQueue_StorageFull(t *testing.T) {
req := newTraces(5, 10)
marshaled, err := marshaler.MarshalTraces(req)
require.NoError(t, err)
maxSizeInBytes := len(marshaled) * 5 // arbitrary small number
freeSpaceInBytes := 1

client := newFakeBoundedStorageClient(maxSizeInBytes)
ps := createTestPersistentStorage(client)
ps := createTestPersistentQueue(client)

// Put enough items in to fill the underlying storage
reqCount := 0
Expand Down Expand Up @@ -643,7 +643,7 @@ func TestPersistentStorage_StorageFull(t *testing.T) {
require.NoError(t, ps.Offer(context.Background(), req))
}

func TestPersistentStorage_ItemDispatchingFinish_ErrorHandling(t *testing.T) {
func TestPersistentQueue_ItemDispatchingFinish_ErrorHandling(t *testing.T) {
errDeletingItem := fmt.Errorf("error deleting item")
errUpdatingDispatched := fmt.Errorf("error updating dispatched items")
testCases := []struct {
Expand Down Expand Up @@ -685,7 +685,7 @@ func TestPersistentStorage_ItemDispatchingFinish_ErrorHandling(t *testing.T) {
for _, testCase := range testCases {
t.Run(testCase.description, func(t *testing.T) {
client := newFakeStorageClientWithErrors(testCase.storageErrors)
ps := createTestPersistentStorage(client)
ps := createTestPersistentQueue(client)
client.Reset()

err := ps.itemDispatchingFinish(context.Background(), 0)
Expand Down

0 comments on commit 639223a

Please sign in to comment.