From 683454888e3d8eb418fde86b2e29e87664dbba5a Mon Sep 17 00:00:00 2001 From: Dmitrii Anoshin Date: Wed, 15 Nov 2023 16:27:35 -0800 Subject: [PATCH] [chore] [exporterhelper] Remove redundant persistentQueue struct (#8930) Just remove the redundant `persistentQueue` struct as suggested https://github.com/open-telemetry/opentelemetry-collector/pull/8923#issuecomment-1812943071 as step 1 --- .../internal/persistent_queue.go | 28 ++++----- .../internal/persistent_storage.go | 60 ++++++++----------- .../internal/persistent_storage_test.go | 9 +-- 3 files changed, 43 insertions(+), 54 deletions(-) diff --git a/exporter/exporterhelper/internal/persistent_queue.go b/exporter/exporterhelper/internal/persistent_queue.go index ab775943fe9..2e43748b563 100644 --- a/exporter/exporterhelper/internal/persistent_queue.go +++ b/exporter/exporterhelper/internal/persistent_queue.go @@ -17,31 +17,27 @@ var ( errWrongExtensionType = errors.New("requested extension is not a storage extension") ) -// persistentQueue holds the queue backed by file storage -type persistentQueue[T any] struct { - *persistentContiguousStorage[T] - set exporter.CreateSettings - storageID component.ID - dataType component.DataType -} - // NewPersistentQueue creates a new queue backed by file storage; name and signal must be a unique combination that identifies the queue storage func NewPersistentQueue[T any](capacity int, dataType component.DataType, storageID component.ID, marshaler func(req T) ([]byte, error), unmarshaler func([]byte) (T, error), set exporter.CreateSettings) Queue[T] { - return &persistentQueue[T]{ - persistentContiguousStorage: newPersistentContiguousStorage(set.Logger, uint64(capacity), marshaler, unmarshaler), - set: set, - storageID: storageID, - dataType: dataType, + return &persistentContiguousStorage[T]{ + set: set, + storageID: storageID, + dataType: dataType, + unmarshaler: unmarshaler, + marshaler: marshaler, + capacity: uint64(capacity), + putChan: make(chan struct{}, capacity), + stopChan: make(chan struct{}), } } // Start starts the persistentQueue with the given number of consumers. -func (pq *persistentQueue[T]) Start(ctx context.Context, host component.Host) error { - storageClient, err := toStorageClient(ctx, pq.storageID, host, pq.set.ID, pq.dataType) +func (pcs *persistentContiguousStorage[T]) Start(ctx context.Context, host component.Host) error { + storageClient, err := toStorageClient(ctx, pcs.storageID, host, pcs.set.ID, pcs.dataType) if err != nil { return err } - pq.persistentContiguousStorage.start(ctx, storageClient) + pcs.initClient(ctx, storageClient) return nil } diff --git a/exporter/exporterhelper/internal/persistent_storage.go b/exporter/exporterhelper/internal/persistent_storage.go index 0be09d841f7..39fb34af63b 100644 --- a/exporter/exporterhelper/internal/persistent_storage.go +++ b/exporter/exporterhelper/internal/persistent_storage.go @@ -13,6 +13,8 @@ import ( "go.uber.org/zap" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/extension/experimental/storage" ) @@ -39,7 +41,9 @@ import ( // index index x // xxxx deleted type persistentContiguousStorage[T any] struct { - logger *zap.Logger + set exporter.CreateSettings + storageID component.ID + dataType component.DataType client storage.Client unmarshaler func(data []byte) (T, error) marshaler func(req T) ([]byte, error) @@ -70,21 +74,7 @@ var ( errInvalidValue = errors.New("invalid value") ) -// newPersistentContiguousStorage creates a new file-storage extension backed queue; -// queueName parameter must be a unique value that identifies the queue. -func newPersistentContiguousStorage[T any]( - logger *zap.Logger, capacity uint64, marshaler func(req T) ([]byte, error), unmarshaler func([]byte) (T, error)) *persistentContiguousStorage[T] { - return &persistentContiguousStorage[T]{ - logger: logger, - unmarshaler: unmarshaler, - marshaler: marshaler, - capacity: capacity, - putChan: make(chan struct{}, capacity), - stopChan: make(chan struct{}), - } -} - -func (pcs *persistentContiguousStorage[T]) start(ctx context.Context, client storage.Client) { +func (pcs *persistentContiguousStorage[T]) initClient(ctx context.Context, client storage.Client) { pcs.client = client pcs.refClient = 1 pcs.initPersistentContiguousStorage(ctx) @@ -113,9 +103,9 @@ func (pcs *persistentContiguousStorage[T]) initPersistentContiguousStorage(ctx c if err != nil { if errors.Is(err, errValueNotSet) { - pcs.logger.Info("Initializing new persistent queue") + pcs.set.Logger.Info("Initializing new persistent queue") } else { - pcs.logger.Error("Failed getting read/write index, starting with new ones", zap.Error(err)) + pcs.set.Logger.Error("Failed getting read/write index, starting with new ones", zap.Error(err)) } pcs.readIndex = 0 pcs.writeIndex = 0 @@ -184,7 +174,7 @@ func (pcs *persistentContiguousStorage[T]) Offer(ctx context.Context, req T) err // putInternal is the internal version that requires caller to hold the mutex lock. func (pcs *persistentContiguousStorage[T]) putInternal(ctx context.Context, req T) error { if pcs.size() >= pcs.capacity { - pcs.logger.Warn("Maximum queue capacity reached") + pcs.set.Logger.Warn("Maximum queue capacity reached") return ErrQueueIsFull } @@ -235,10 +225,10 @@ func (pcs *persistentContiguousStorage[T]) getNextItem(ctx context.Context) (Que } if err != nil { - pcs.logger.Debug("Failed to dispatch item", zap.Error(err)) + pcs.set.Logger.Debug("Failed to dispatch item", zap.Error(err)) // We need to make sure that currently dispatched items list is cleaned if err = pcs.itemDispatchingFinish(ctx, index); err != nil { - pcs.logger.Error("Error deleting item from queue", zap.Error(err)) + pcs.set.Logger.Error("Error deleting item from queue", zap.Error(err)) } return QueueRequest[T]{}, false @@ -251,10 +241,10 @@ func (pcs *persistentContiguousStorage[T]) getNextItem(ctx context.Context) (Que pcs.mu.Lock() defer pcs.mu.Unlock() if err = pcs.itemDispatchingFinish(ctx, index); err != nil { - pcs.logger.Error("Error deleting item from queue", zap.Error(err)) + pcs.set.Logger.Error("Error deleting item from queue", zap.Error(err)) } if err = pcs.unrefClient(ctx); err != nil { - pcs.logger.Error("Error closing the storage client", zap.Error(err)) + pcs.set.Logger.Error("Error closing the storage client", zap.Error(err)) } } return req, true @@ -267,22 +257,23 @@ func (pcs *persistentContiguousStorage[T]) retrieveAndEnqueueNotDispatchedReqs(c pcs.mu.Lock() defer pcs.mu.Unlock() - pcs.logger.Debug("Checking if there are items left for dispatch by consumers") + pcs.set.Logger.Debug("Checking if there are items left for dispatch by consumers") itemKeysBuf, err := pcs.client.Get(ctx, currentlyDispatchedItemsKey) if err == nil { dispatchedItems, err = bytesToItemIndexArray(itemKeysBuf) } if err != nil { - pcs.logger.Error("Could not fetch items left for dispatch by consumers", zap.Error(err)) + pcs.set.Logger.Error("Could not fetch items left for dispatch by consumers", zap.Error(err)) return } if len(dispatchedItems) == 0 { - pcs.logger.Debug("No items left for dispatch by consumers") + pcs.set.Logger.Debug("No items left for dispatch by consumers") return } - pcs.logger.Info("Fetching items left for dispatch by consumers", zap.Int(zapNumberOfItems, len(dispatchedItems))) + pcs.set.Logger.Info("Fetching items left for dispatch by consumers", zap.Int(zapNumberOfItems, + len(dispatchedItems))) retrieveBatch := make([]storage.Operation, len(dispatchedItems)) cleanupBatch := make([]storage.Operation, len(dispatchedItems)) for i, it := range dispatchedItems { @@ -294,24 +285,24 @@ func (pcs *persistentContiguousStorage[T]) retrieveAndEnqueueNotDispatchedReqs(c cleanupErr := pcs.client.Batch(ctx, cleanupBatch...) if cleanupErr != nil { - pcs.logger.Debug("Failed cleaning items left by consumers", zap.Error(cleanupErr)) + pcs.set.Logger.Debug("Failed cleaning items left by consumers", zap.Error(cleanupErr)) } if retrieveErr != nil { - pcs.logger.Warn("Failed retrieving items left by consumers", zap.Error(retrieveErr)) + pcs.set.Logger.Warn("Failed retrieving items left by consumers", zap.Error(retrieveErr)) return } errCount := 0 for _, op := range retrieveBatch { if op.Value == nil { - pcs.logger.Warn("Failed retrieving item", zap.String(zapKey, op.Key), zap.Error(errValueNotSet)) + pcs.set.Logger.Warn("Failed retrieving item", zap.String(zapKey, op.Key), zap.Error(errValueNotSet)) continue } req, err := pcs.unmarshaler(op.Value) // If error happened or item is nil, it will be efficiently ignored if err != nil { - pcs.logger.Warn("Failed unmarshalling item", zap.String(zapKey, op.Key), zap.Error(err)) + pcs.set.Logger.Warn("Failed unmarshalling item", zap.String(zapKey, op.Key), zap.Error(err)) continue } if pcs.putInternal(ctx, req) != nil { @@ -320,10 +311,10 @@ func (pcs *persistentContiguousStorage[T]) retrieveAndEnqueueNotDispatchedReqs(c } if errCount > 0 { - pcs.logger.Error("Errors occurred while moving items for dispatching back to queue", + pcs.set.Logger.Error("Errors occurred while moving items for dispatching back to queue", zap.Int(zapNumberOfItems, len(retrieveBatch)), zap.Int(zapErrorCount, errCount)) } else { - pcs.logger.Info("Moved items for dispatching back to queue", + pcs.set.Logger.Info("Moved items for dispatching back to queue", zap.Int(zapNumberOfItems, len(retrieveBatch))) } } @@ -343,7 +334,8 @@ func (pcs *persistentContiguousStorage[T]) itemDispatchingFinish(ctx context.Con deleteOp := storage.DeleteOperation(getItemKey(index)) if err := pcs.client.Batch(ctx, setOp, deleteOp); err != nil { // got an error, try to gracefully handle it - pcs.logger.Warn("Failed updating currently dispatched items, trying to delete the item first", zap.Error(err)) + pcs.set.Logger.Warn("Failed updating currently dispatched items, trying to delete the item first", + zap.Error(err)) } else { // Everything ok, exit return nil diff --git a/exporter/exporterhelper/internal/persistent_storage_test.go b/exporter/exporterhelper/internal/persistent_storage_test.go index 3a4f33a6aee..631010a66bc 100644 --- a/exporter/exporterhelper/internal/persistent_storage_test.go +++ b/exporter/exporterhelper/internal/persistent_storage_test.go @@ -13,9 +13,9 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.uber.org/zap" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/extension/experimental/storage" "go.opentelemetry.io/collector/pdata/ptrace" ) @@ -31,9 +31,10 @@ func createTestClient(t testing.TB, extension storage.Extension) storage.Client return client } -func createTestPersistentStorageWithCapacity(client storage.Client, capacity uint64) *persistentContiguousStorage[ptrace.Traces] { - pcs := newPersistentContiguousStorage(zap.NewNop(), capacity, marshaler.MarshalTraces, unmarshaler.UnmarshalTraces) - pcs.start(context.Background(), client) +func createTestPersistentStorageWithCapacity(client storage.Client, capacity int) *persistentContiguousStorage[ptrace.Traces] { + pcs := NewPersistentQueue[ptrace.Traces](capacity, component.DataTypeTraces, component.ID{}, marshaler.MarshalTraces, + unmarshaler.UnmarshalTraces, exportertest.NewNopCreateSettings()).(*persistentContiguousStorage[ptrace.Traces]) + pcs.initClient(context.Background(), client) return pcs }