TBS: flatten badger when compaction is not running #15081

85 changes: 82 additions & 3 deletions x-pack/apm-server/sampling/processor.go
@@ -43,6 +43,8 @@ const (
var (
// gcCh works like a global mutex to protect gc from running concurrently when 2 TBS processors are active during a hot reload
gcCh = make(chan struct{}, 1)
// flattenCh works like a global mutex to protect flatten from running concurrently when 2 TBS processors are active during a hot reload
flattenCh = make(chan struct{}, 1)
)

// Processor is a tail-sampling event processor.
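The gcCh and flattenCh variables above use a one-slot buffered channel as a cross-processor mutex: sending an empty struct acquires the lock, receiving releases it, and a select against the stopping channel lets a goroutine give up instead of blocking forever during shutdown. A minimal, self-contained sketch of that pattern, with hypothetical names that are not part of this change:

    package main

    import "fmt"

    // lockCh has capacity 1, so at most one goroutine can hold the "lock" at a time.
    var lockCh = make(chan struct{}, 1)

    func withGlobalLock(stop <-chan struct{}, fn func()) {
        select {
        case <-stop:
            return // abandon the wait when shutting down instead of blocking forever
        case lockCh <- struct{}{}: // acquire: the send succeeds only while the channel is empty
        }
        defer func() { <-lockCh }() // release: drain the token so the next holder can acquire
        fn()
    }

    func main() {
        stop := make(chan struct{})
        withGlobalLock(stop, func() { fmt.Println("only one processor runs this section at a time") })
    }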
@@ -390,6 +392,37 @@ func (p *Processor) Run() error {
}
}
})
g.Go(func() error {
select {
case <-p.stopping:
return nil
case flattenCh <- struct{}{}:
}
defer func() {
<-flattenCh
}()
ticker := time.NewTicker(p.config.TTL / 2)
defer ticker.Stop()
for {
select {
case <-p.stopping:
return nil
case <-ticker.C:
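// isStuck reports that at least one write was attempted since the last reset and none of them completed without hitting the storage limit.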
if p.eventStore.storageLimitTracker.isStuck() {
p.logger.Warn("all writes are failing due to storage limit, trying to flatten db to release storage space")
p.eventStore.storageLimitTracker.flattenMu.Lock()
err := p.config.DB.Flatten(3)
if err != nil {
p.logger.With(logp.Error(err)).Warn("failed to flatten db")
} else {
p.logger.Warn("flatten successful")
}
p.eventStore.storageLimitTracker.flattenMu.Unlock()
}
p.eventStore.storageLimitTracker.reset()
}
}
})
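For reference, badger exposes Flatten(workers int), which forces compactions of the LSM tree so its tables end up on a single level, and the goroutine above only calls it while holding the write side of flattenMu so that no trace writes race with it. A hedged, standalone sketch of that call shape; the badger v2 import path and the temporary directory are assumptions, not taken from this change:

    package main

    import (
        "log"
        "sync"

        badger "github.com/dgraph-io/badger/v2"
    )

    var flattenMu sync.RWMutex // writers hold RLock; Flatten holds the exclusive Lock

    // flattenDB blocks new writes for the duration of the flatten and logs the outcome.
    func flattenDB(db *badger.DB) {
        flattenMu.Lock()
        defer flattenMu.Unlock()
        if err := db.Flatten(3); err != nil { // 3 compaction workers, matching the change above
            log.Printf("failed to flatten db: %v", err)
            return
        }
        log.Println("flatten successful")
    }

    func main() {
        db, err := badger.Open(badger.DefaultOptions("/tmp/badger-flatten-example"))
        if err != nil {
            log.Fatal(err)
        }
        defer db.Close()
        flattenDB(db)
    }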
g.Go(func() error {
// Protect this goroutine from running concurrently when 2 TBS processors are active
// as badger GC is not concurrent safe.
@@ -411,6 +444,7 @@ func (p *Processor) Run() error {
case <-p.stopping:
return nil
case <-ticker.C:
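// Take the read side of flattenMu so value log GC never overlaps with a Flatten, which holds the write lock.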
p.eventStore.storageLimitTracker.flattenMu.RLock()
const discardRatio = 0.5
var err error
for err == nil {
@@ -419,8 +453,10 @@
err = p.config.DB.RunValueLogGC(discardRatio)
}
if err != nil && err != badger.ErrNoRewrite {
p.eventStore.storageLimitTracker.flattenMu.RUnlock()
return err
}
p.eventStore.storageLimitTracker.flattenMu.RUnlock()
}
}
})
@@ -623,10 +659,36 @@ const (
storageLimitThreshold = 0.90 // Allow 90% of the quota to be used.
)

var (
ErrFlattenInProgress = errors.New("flatten in progress, rejecting writes to badger")
)

type storageLimitTracker struct {
anyWrite, anyWriteSuccess atomic.Bool
flattenMu sync.RWMutex
}

func (s *storageLimitTracker) reset() {
s.anyWrite.Store(false)
s.anyWriteSuccess.Store(false)
}

func (s *storageLimitTracker) recordError(isErr bool) {
s.anyWrite.Store(true)
if !isErr {
s.anyWriteSuccess.Store(true)
}
}

func (s *storageLimitTracker) isStuck() bool {
return s.anyWrite.Load() && !s.anyWriteSuccess.Load()
}
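Taken together, the tracker above gives the flatten goroutine a per-interval view of write health. A short usage sketch of the intended lifecycle over one TTL/2 window, written as a hypothetical helper that assumes it lives next to the type defined above; the write outcomes are invented for illustration:

    func exampleStorageLimitTracker() {
        var tracker storageLimitTracker

        // Every write records whether it failed with the storage-limit error.
        tracker.recordError(true) // a write hit the storage limit
        tracker.recordError(true) // another write hit the storage limit

        // On the next tick: writes happened and none succeeded, so the db is treated as stuck.
        if tracker.isStuck() {
            // a flatten would be triggered here
        }

        // Each tick ends by clearing the flags so the next window starts fresh.
        tracker.reset()
    }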

// wrappedRW wraps configurable write options for global ShardedReadWriter
type wrappedRW struct {
-	rw         *eventstorage.ShardedReadWriter
-	writerOpts eventstorage.WriterOpts
+	rw                  *eventstorage.ShardedReadWriter
+	writerOpts          eventstorage.WriterOpts
+	storageLimitTracker storageLimitTracker
}

// Stored entries expire after ttl.
@@ -654,11 +716,23 @@ func (s *wrappedRW) ReadTraceEvents(traceID string, out *modelpb.Batch) error {

// WriteTraceEvent calls ShardedReadWriter.WriteTraceEvent using the configured WriterOpts
func (s *wrappedRW) WriteTraceEvent(traceID, id string, event *modelpb.APMEvent) error {
-	return s.rw.WriteTraceEvent(traceID, id, event, s.writerOpts)
+	ok := s.storageLimitTracker.flattenMu.TryRLock()
+	if !ok {
+		return ErrFlattenInProgress
+	}
+	defer s.storageLimitTracker.flattenMu.RUnlock()
+	err := s.rw.WriteTraceEvent(traceID, id, event, s.writerOpts)
+	s.storageLimitTracker.recordError(errors.Is(err, eventstorage.ErrLimitReached))
+	return err
}

// WriteTraceSampled calls ShardedReadWriter.WriteTraceSampled using the configured WriterOpts
func (s *wrappedRW) WriteTraceSampled(traceID string, sampled bool) error {
ok := s.storageLimitTracker.flattenMu.TryRLock()
if !ok {
return ErrFlattenInProgress
}
defer s.storageLimitTracker.flattenMu.RUnlock()
return s.rw.WriteTraceSampled(traceID, sampled, s.writerOpts)
}
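The same TryRLock gate appears in WriteTraceEvent, WriteTraceSampled, and DeleteTraceEvent: writers fail fast with ErrFlattenInProgress instead of queueing behind the flatten's write lock. A self-contained sketch of the pattern; it requires Go 1.18+ for TryRLock, and the gate type and function names are hypothetical:

    package main

    import (
        "errors"
        "fmt"
        "sync"
    )

    var errFlattenInProgress = errors.New("flatten in progress, rejecting writes")

    type writeGate struct{ mu sync.RWMutex }

    // write rejects the call immediately if a flatten currently holds the write lock.
    func (g *writeGate) write(do func() error) error {
        if !g.mu.TryRLock() {
            return errFlattenInProgress
        }
        defer g.mu.RUnlock()
        return do()
    }

    func main() {
        var g writeGate
        fmt.Println(g.write(func() error { return nil })) // <nil>: no flatten running

        g.mu.Lock() // simulate a flatten holding the exclusive lock
        fmt.Println(g.write(func() error { return nil })) // flatten in progress, rejecting writes
        g.mu.Unlock()
    }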

@@ -669,6 +743,11 @@ func (s *wrappedRW) IsTraceSampled(traceID string) (bool, error) {

// DeleteTraceEvent calls ShardedReadWriter.DeleteTraceEvent
func (s *wrappedRW) DeleteTraceEvent(traceID, id string) error {
ok := s.storageLimitTracker.flattenMu.TryRLock()
if !ok {
return ErrFlattenInProgress
}
defer s.storageLimitTracker.flattenMu.RUnlock()
return s.rw.DeleteTraceEvent(traceID, id)
}
