[chore] [exporterhelper] Remove redundant persistentQueue struct (#8930)
Remove the redundant `persistentQueue` struct, as suggested in #8923 (comment), as step 1.
dmitryax authored Nov 16, 2023
1 parent 658a6a9 commit 6834548
Showing 3 changed files with 43 additions and 54 deletions.
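
For context, here is a minimal sketch (illustrative names, not the collector's actual code) of the pattern this commit removes: a wrapper struct that only embeds another struct and carries a few extra fields adds indirection without behavior, so the fields can move into the inner type and the wrapper can be deleted.

	package main

	import "fmt"

	// storage plays the role of persistentContiguousStorage: after the
	// change it owns all the state the queue needs.
	type storage struct {
		capacity int
		id       string
	}

	func (s *storage) Start() { fmt.Println("starting", s.id) }

	// Before this commit, a wrapper struct existed only to pair an embedded
	// *storage with fields like id. With the fields folded into storage,
	// the constructor returns the inner type directly.
	func newQueue(capacity int, id string) *storage {
		return &storage{capacity: capacity, id: id}
	}

	func main() {
		newQueue(10, "traces").Start()
	}
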
28 changes: 12 additions & 16 deletions exporter/exporterhelper/internal/persistent_queue.go
@@ -17,31 +17,27 @@ var (
 	errWrongExtensionType = errors.New("requested extension is not a storage extension")
 )
 
-// persistentQueue holds the queue backed by file storage
-type persistentQueue[T any] struct {
-	*persistentContiguousStorage[T]
-	set       exporter.CreateSettings
-	storageID component.ID
-	dataType  component.DataType
-}
 
 // NewPersistentQueue creates a new queue backed by file storage; name and signal must be a unique combination that identifies the queue storage
 func NewPersistentQueue[T any](capacity int, dataType component.DataType, storageID component.ID, marshaler func(req T) ([]byte, error), unmarshaler func([]byte) (T, error), set exporter.CreateSettings) Queue[T] {
-	return &persistentQueue[T]{
-		persistentContiguousStorage: newPersistentContiguousStorage(set.Logger, uint64(capacity), marshaler, unmarshaler),
-		set:                         set,
-		storageID:                   storageID,
-		dataType:                    dataType,
+	return &persistentContiguousStorage[T]{
+		set:         set,
+		storageID:   storageID,
+		dataType:    dataType,
+		unmarshaler: unmarshaler,
+		marshaler:   marshaler,
+		capacity:    uint64(capacity),
+		putChan:     make(chan struct{}, capacity),
+		stopChan:    make(chan struct{}),
 	}
 }
 
-// Start starts the persistentQueue with the given number of consumers.
-func (pq *persistentQueue[T]) Start(ctx context.Context, host component.Host) error {
-	storageClient, err := toStorageClient(ctx, pq.storageID, host, pq.set.ID, pq.dataType)
+func (pcs *persistentContiguousStorage[T]) Start(ctx context.Context, host component.Host) error {
+	storageClient, err := toStorageClient(ctx, pcs.storageID, host, pcs.set.ID, pcs.dataType)
 	if err != nil {
 		return err
 	}
-	pq.persistentContiguousStorage.start(ctx, storageClient)
+	pcs.initClient(ctx, storageClient)
 	return nil
 }

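Note that NewPersistentQueue still returns the Queue[T] interface, now backed directly by *persistentContiguousStorage[T]. A self-contained sketch of that idiom, with illustrative names rather than the real exporterhelper API:

	package main

	import "fmt"

	// Queue mirrors the exporterhelper Queue[T] interface in miniature.
	type Queue[T any] interface {
		Offer(item T) error
	}

	type contiguousStorage[T any] struct{ items []T }

	func (s *contiguousStorage[T]) Offer(item T) error {
		s.items = append(s.items, item)
		return nil
	}

	// Like NewPersistentQueue after this commit: the concrete struct is
	// returned through the interface, so callers never see the concrete type.
	func NewQueue[T any]() Queue[T] { return &contiguousStorage[T]{} }

	// Compile-time assertion that the concrete type satisfies the interface.
	var _ Queue[int] = (*contiguousStorage[int])(nil)

	func main() {
		q := NewQueue[string]()
		fmt.Println(q.Offer("span batch"))
	}
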
60 changes: 26 additions & 34 deletions exporter/exporterhelper/internal/persistent_storage.go
@@ -13,6 +13,8 @@ import (
 
 	"go.uber.org/zap"
 
+	"go.opentelemetry.io/collector/component"
+	"go.opentelemetry.io/collector/exporter"
 	"go.opentelemetry.io/collector/extension/experimental/storage"
 )

@@ -39,7 +41,9 @@ import (
 // index index x
 // xxxx deleted
 type persistentContiguousStorage[T any] struct {
-	logger      *zap.Logger
+	set         exporter.CreateSettings
+	storageID   component.ID
+	dataType    component.DataType
 	client      storage.Client
 	unmarshaler func(data []byte) (T, error)
 	marshaler   func(req T) ([]byte, error)
@@ -70,21 +74,7 @@ var (
 	errInvalidValue = errors.New("invalid value")
 )
 
-// newPersistentContiguousStorage creates a new file-storage extension backed queue;
-// queueName parameter must be a unique value that identifies the queue.
-func newPersistentContiguousStorage[T any](
-	logger *zap.Logger, capacity uint64, marshaler func(req T) ([]byte, error), unmarshaler func([]byte) (T, error)) *persistentContiguousStorage[T] {
-	return &persistentContiguousStorage[T]{
-		logger:      logger,
-		unmarshaler: unmarshaler,
-		marshaler:   marshaler,
-		capacity:    capacity,
-		putChan:     make(chan struct{}, capacity),
-		stopChan:    make(chan struct{}),
-	}
-}
-
-func (pcs *persistentContiguousStorage[T]) start(ctx context.Context, client storage.Client) {
+func (pcs *persistentContiguousStorage[T]) initClient(ctx context.Context, client storage.Client) {
 	pcs.client = client
 	pcs.refClient = 1
 	pcs.initPersistentContiguousStorage(ctx)
@@ -113,9 +103,9 @@ func (pcs *persistentContiguousStorage[T]) initPersistentContiguousStorage(ctx c
 
 	if err != nil {
 		if errors.Is(err, errValueNotSet) {
-			pcs.logger.Info("Initializing new persistent queue")
+			pcs.set.Logger.Info("Initializing new persistent queue")
 		} else {
-			pcs.logger.Error("Failed getting read/write index, starting with new ones", zap.Error(err))
+			pcs.set.Logger.Error("Failed getting read/write index, starting with new ones", zap.Error(err))
 		}
 		pcs.readIndex = 0
 		pcs.writeIndex = 0
@@ -184,7 +174,7 @@ func (pcs *persistentContiguousStorage[T]) Offer(ctx context.Context, req T) err
 // putInternal is the internal version that requires caller to hold the mutex lock.
 func (pcs *persistentContiguousStorage[T]) putInternal(ctx context.Context, req T) error {
 	if pcs.size() >= pcs.capacity {
-		pcs.logger.Warn("Maximum queue capacity reached")
+		pcs.set.Logger.Warn("Maximum queue capacity reached")
 		return ErrQueueIsFull
 	}
 
@@ -235,10 +225,10 @@ func (pcs *persistentContiguousStorage[T]) getNextItem(ctx context.Context) (Que
 	}
 
 	if err != nil {
-		pcs.logger.Debug("Failed to dispatch item", zap.Error(err))
+		pcs.set.Logger.Debug("Failed to dispatch item", zap.Error(err))
 		// We need to make sure that currently dispatched items list is cleaned
 		if err = pcs.itemDispatchingFinish(ctx, index); err != nil {
-			pcs.logger.Error("Error deleting item from queue", zap.Error(err))
+			pcs.set.Logger.Error("Error deleting item from queue", zap.Error(err))
 		}
 
 		return QueueRequest[T]{}, false
@@ -251,10 +241,10 @@ func (pcs *persistentContiguousStorage[T]) getNextItem(ctx context.Context) (Que
 		pcs.mu.Lock()
 		defer pcs.mu.Unlock()
 		if err = pcs.itemDispatchingFinish(ctx, index); err != nil {
-			pcs.logger.Error("Error deleting item from queue", zap.Error(err))
+			pcs.set.Logger.Error("Error deleting item from queue", zap.Error(err))
 		}
 		if err = pcs.unrefClient(ctx); err != nil {
-			pcs.logger.Error("Error closing the storage client", zap.Error(err))
+			pcs.set.Logger.Error("Error closing the storage client", zap.Error(err))
 		}
 	}
 	return req, true
@@ -267,22 +257,23 @@ func (pcs *persistentContiguousStorage[T]) retrieveAndEnqueueNotDispatchedReqs(c
 
 	pcs.mu.Lock()
 	defer pcs.mu.Unlock()
-	pcs.logger.Debug("Checking if there are items left for dispatch by consumers")
+	pcs.set.Logger.Debug("Checking if there are items left for dispatch by consumers")
 	itemKeysBuf, err := pcs.client.Get(ctx, currentlyDispatchedItemsKey)
 	if err == nil {
 		dispatchedItems, err = bytesToItemIndexArray(itemKeysBuf)
 	}
 	if err != nil {
-		pcs.logger.Error("Could not fetch items left for dispatch by consumers", zap.Error(err))
+		pcs.set.Logger.Error("Could not fetch items left for dispatch by consumers", zap.Error(err))
 		return
 	}
 
 	if len(dispatchedItems) == 0 {
-		pcs.logger.Debug("No items left for dispatch by consumers")
+		pcs.set.Logger.Debug("No items left for dispatch by consumers")
 		return
 	}
 
-	pcs.logger.Info("Fetching items left for dispatch by consumers", zap.Int(zapNumberOfItems, len(dispatchedItems)))
+	pcs.set.Logger.Info("Fetching items left for dispatch by consumers", zap.Int(zapNumberOfItems,
+		len(dispatchedItems)))
 	retrieveBatch := make([]storage.Operation, len(dispatchedItems))
 	cleanupBatch := make([]storage.Operation, len(dispatchedItems))
 	for i, it := range dispatchedItems {
@@ -294,24 +285,24 @@ func (pcs *persistentContiguousStorage[T]) retrieveAndEnqueueNotDispatchedReqs(c
 	cleanupErr := pcs.client.Batch(ctx, cleanupBatch...)
 
 	if cleanupErr != nil {
-		pcs.logger.Debug("Failed cleaning items left by consumers", zap.Error(cleanupErr))
+		pcs.set.Logger.Debug("Failed cleaning items left by consumers", zap.Error(cleanupErr))
 	}
 
 	if retrieveErr != nil {
-		pcs.logger.Warn("Failed retrieving items left by consumers", zap.Error(retrieveErr))
+		pcs.set.Logger.Warn("Failed retrieving items left by consumers", zap.Error(retrieveErr))
 		return
 	}
 
 	errCount := 0
 	for _, op := range retrieveBatch {
 		if op.Value == nil {
-			pcs.logger.Warn("Failed retrieving item", zap.String(zapKey, op.Key), zap.Error(errValueNotSet))
+			pcs.set.Logger.Warn("Failed retrieving item", zap.String(zapKey, op.Key), zap.Error(errValueNotSet))
 			continue
 		}
 		req, err := pcs.unmarshaler(op.Value)
 		// If error happened or item is nil, it will be efficiently ignored
 		if err != nil {
-			pcs.logger.Warn("Failed unmarshalling item", zap.String(zapKey, op.Key), zap.Error(err))
+			pcs.set.Logger.Warn("Failed unmarshalling item", zap.String(zapKey, op.Key), zap.Error(err))
 			continue
 		}
 		if pcs.putInternal(ctx, req) != nil {
@@ -320,10 +311,10 @@ func (pcs *persistentContiguousStorage[T]) retrieveAndEnqueueNotDispatchedReqs(c
 	}
 
 	if errCount > 0 {
-		pcs.logger.Error("Errors occurred while moving items for dispatching back to queue",
+		pcs.set.Logger.Error("Errors occurred while moving items for dispatching back to queue",
 			zap.Int(zapNumberOfItems, len(retrieveBatch)), zap.Int(zapErrorCount, errCount))
 	} else {
-		pcs.logger.Info("Moved items for dispatching back to queue",
+		pcs.set.Logger.Info("Moved items for dispatching back to queue",
 			zap.Int(zapNumberOfItems, len(retrieveBatch)))
 	}
 }
Expand All @@ -343,7 +334,8 @@ func (pcs *persistentContiguousStorage[T]) itemDispatchingFinish(ctx context.Con
deleteOp := storage.DeleteOperation(getItemKey(index))
if err := pcs.client.Batch(ctx, setOp, deleteOp); err != nil {
// got an error, try to gracefully handle it
pcs.logger.Warn("Failed updating currently dispatched items, trying to delete the item first", zap.Error(err))
pcs.set.Logger.Warn("Failed updating currently dispatched items, trying to delete the item first",
zap.Error(err))
} else {
// Everything ok, exit
return nil
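The mechanical bulk of this file is the logger field swap: instead of a bare *zap.Logger, the struct now stores exporter.CreateSettings, which bundles the logger with the component ID that toStorageClient needs. A rough sketch of the shape, using a stand-in settings type (the real exporter.CreateSettings carries more than this):

	package main

	import "go.uber.org/zap"

	// createSettings stands in for exporter.CreateSettings; this sketch
	// keeps only the two pieces the diff above touches.
	type createSettings struct {
		ID     string
		Logger *zap.Logger
	}

	type storageSketch struct {
		set createSettings // replaces the old `logger *zap.Logger` field
	}

	func main() {
		s := storageSketch{set: createSettings{ID: "exporter/otlp", Logger: zap.NewNop()}}
		// Call sites change from s.logger.Info(...) to s.set.Logger.Info(...).
		s.set.Logger.Info("Initializing new persistent queue")
	}
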
9 changes: 5 additions & 4 deletions exporter/exporterhelper/internal/persistent_storage_test.go
@@ -13,9 +13,9 @@ import (
 
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
-	"go.uber.org/zap"
 
 	"go.opentelemetry.io/collector/component"
+	"go.opentelemetry.io/collector/exporter/exportertest"
 	"go.opentelemetry.io/collector/extension/experimental/storage"
 	"go.opentelemetry.io/collector/pdata/ptrace"
 )
@@ -31,9 +31,10 @@ func createTestClient(t testing.TB, extension storage.Extension) storage.Client
 	return client
 }
 
-func createTestPersistentStorageWithCapacity(client storage.Client, capacity uint64) *persistentContiguousStorage[ptrace.Traces] {
-	pcs := newPersistentContiguousStorage(zap.NewNop(), capacity, marshaler.MarshalTraces, unmarshaler.UnmarshalTraces)
-	pcs.start(context.Background(), client)
+func createTestPersistentStorageWithCapacity(client storage.Client, capacity int) *persistentContiguousStorage[ptrace.Traces] {
+	pcs := NewPersistentQueue[ptrace.Traces](capacity, component.DataTypeTraces, component.ID{}, marshaler.MarshalTraces,
+		unmarshaler.UnmarshalTraces, exportertest.NewNopCreateSettings()).(*persistentContiguousStorage[ptrace.Traces])
+	pcs.initClient(context.Background(), client)
 	return pcs
 }

Expand Down
