TBS: drop and recreate badger db after exceeding storage limit for TTL time #15106

Merged

Changes from 10 commits (34 commits total)
1e4d5b3
TBS: drop and recreate badger db after exceeding storage limit for TT…
carsonip Jan 3, 2025
0b9bcc1
Hardcode file mode 0700
carsonip Jan 3, 2025
25957a6
Ensure backupDir does not exist
carsonip Jan 3, 2025
092b587
Make dropAndRecreate private
carsonip Jan 3, 2025
8f6ac0e
Fix typo
carsonip Jan 3, 2025
c8cc7a4
Return error in dropAndRecreate
carsonip Jan 3, 2025
962905f
Remove duplicate reset
carsonip Jan 3, 2025
82dc615
Test DropAndRecreate
carsonip Jan 3, 2025
266f735
Log a warning on first exceeded
carsonip Jan 3, 2025
f1e11bd
Add missing sm.Close
carsonip Jan 3, 2025
a2aa762
Add FIXME
carsonip Jan 6, 2025
817e69f
Fix unprotected db access
carsonip Jan 6, 2025
e9a4957
Fix mutex release
carsonip Jan 6, 2025
6efb732
Explain why loops are run in processor not StorageManager
carsonip Jan 6, 2025
dd8dc17
Expose Run only
carsonip Jan 6, 2025
4da8534
Explain not flushing
carsonip Jan 6, 2025
e2d52ec
Add missing RLock for gc
carsonip Jan 6, 2025
1c39e98
Lazy init to avoid deadlock
carsonip Jan 6, 2025
0195098
Fix whitebox test
carsonip Jan 6, 2025
8d503d1
Use ReadSubscriberPosition()
carsonip Jan 6, 2025
7e62985
Refactor, add missing config.Storage reload, write after dropAndRecreate
carsonip Jan 6, 2025
786ad51
Report size in log
carsonip Jan 6, 2025
33abec0
Add buffer
carsonip Jan 6, 2025
0b3e742
Run dropLoop on startup and every 1m
carsonip Jan 6, 2025
6113a71
Merge branch 'main' into tbs-recover-from-storage-limit-exceeded-rm-db
carsonip Jan 6, 2025
f6340f2
Delete badger files in-place
carsonip Jan 7, 2025
ae08526
Gracefully handle dropAndRecreate errors
carsonip Jan 7, 2025
5da092a
Return error right away as dropping db may take time
carsonip Jan 7, 2025
bf4bfba
Consider storageLimitThreshold
carsonip Jan 7, 2025
fc4ed8b
Format float
carsonip Jan 7, 2025
bcfc1db
Merge branch 'main' into tbs-recover-from-storage-limit-exceeded-rm-db
carsonip Jan 7, 2025
3c00d97
Add link to issue
carsonip Jan 7, 2025
71e3be2
Merge branch 'main' into tbs-recover-from-storage-limit-exceeded-rm-db
carsonip Jan 7, 2025
9d15913
Log deleted files
carsonip Jan 7, 2025
124 changes: 124 additions & 0 deletions x-pack/apm-server/sampling/eventstorage/storage_manager.go
@@ -5,6 +5,8 @@
package eventstorage

import (
"errors"
"fmt"
"os"
"path/filepath"
"sync"
@@ -34,9 +36,14 @@ type StorageManager struct {
storage *Storage
rw *ShardedReadWriter

// mu guards db, storage, and rw swaps.
mu sync.RWMutex
// subscriberPosMu protects the subscriber file from concurrent RW.
subscriberPosMu sync.Mutex

// dropLoopCh acts as a mutex to ensure that there is only 1 active RunDropLoop per StorageManager,
// as it is possible that 2 separate RunDropLoop are created by 2 TBS processors during a hot reload.
dropLoopCh chan struct{}
// gcLoopCh acts as a mutex to ensure only 1 gc loop is running per StorageManager.
// as it is possible that 2 separate RunGCLoop are created by 2 TBS processors during a hot reload.
gcLoopCh chan struct{}
@@ -46,6 +53,7 @@ type StorageManager struct {
func NewStorageManager(storageDir string) (*StorageManager, error) {
sm := &StorageManager{
storageDir: storageDir,
dropLoopCh: make(chan struct{}, 1),
gcLoopCh: make(chan struct{}, 1),
logger: logp.NewLogger(logs.Sampling),
}
@@ -107,16 +115,120 @@ func (s *StorageManager) runValueLogGC(discardRatio float64) error {
return s.db.RunValueLogGC(discardRatio)
}

// RunDropLoop runs a loop that detects if storage limit has been exceeded for at least ttl.
// If so, it drops and recreates the underlying badger DB.
// The loop stops when it receives from stopping.
func (s *StorageManager) RunDropLoop(stopping <-chan struct{}, ttl time.Duration, storageLimitInBytes uint64) error {
select {
case <-stopping:
return nil
case s.dropLoopCh <- struct{}{}:
}
defer func() {
<-s.dropLoopCh
}()

if storageLimitInBytes == 0 {
<-stopping
return nil
}

timer := time.NewTicker(min(time.Minute, ttl)) // Eval db size every minute as badger reports them with 1m lag, but use min to facilitate testing

Review thread:

Member: Getting the DB size should be a relatively cheap operation, so we could consider lowering the ticker interval to check more frequently. This should prevent a configuration with a sub-minute TTL from swapping the DB after just two limit hits in a row.

carsonip (Member, Author): Not sure I understand here.

> This should prevent a configuration with a sub-minute TTL from swapping the DB after just two limit hits in a row.

If TTL < 1m, the DB will be swapped after 2*TTL (I actually consider this pathological, as badger's size reporting comes with a 1m delay). If TTL >= 1m, the DB will be swapped after at most TTL+1m.

Do you think we should fix TTL < 1m? It will increase test run time to 1m, but it will fix frequent DB swaps for sub-minute TTLs (which I don't think have much value in reality). I'm inclined to fix it.

Member: Oh I see, I didn't realize the size is only recalculated every 1m. Then I think the current implementation is fine. I was mostly afraid of the edge case where TTL is set to a low value like 5s and the DB is running close to the disk limit: even if compaction is making progress, we might perform a DB swap every other minute if we are unlucky and check twice in a row while the DB is full.

carsonip (Member, Author): I'm actually afraid of the situation where TTL is way lower than a minute, e.g. 5s, causing the storage limit check to run against the same 1m-cached reported DB size. I added the min() so that the test doesn't take a minute to run, but this edge case worries me a bit.

carsonip (Member, Author): Fixed in 0b3e742, but the TestDropLoop tests will take 1m to run.

(A simplified timing sketch of this detection logic follows the diff below.)


defer timer.Stop()
var firstExceeded time.Time
for {
select {
case <-stopping:
return nil
case <-timer.C:
lsm, vlog := s.Size()
if uint64(lsm+vlog) >= storageLimitInBytes { //FIXME: Add a bit of buffer? Is s.storage.pendingSize reliable enough?
now := time.Now()
if firstExceeded.IsZero() {
firstExceeded = now
s.logger.Warnf("badger db size has exceeded storage limit; db will be dropped and recreated if problem persists for `sampling.tail.ttl` (%s)", ttl.String())
}
if now.Sub(firstExceeded) >= ttl {
s.logger.Warnf("badger db size has exceeded storage limit for over `sampling.tail.ttl` (%s), please consider increasing `sampling.tail.storage_limit`; dropping and recreating badger db to recover", ttl.String())
err := s.dropAndRecreate()
if err != nil {
return fmt.Errorf("error dropping and recreating badger db to recover storage space: %w", err)
}
s.logger.Info("badger db dropped and recreated")
}
} else {
firstExceeded = time.Time{}
}
}
}
}

func (s *StorageManager) Close() error {
s.mu.RLock()
defer s.mu.RUnlock()
s.rw.Close()
return s.db.Close()
}

// Size returns the db size
//
// Caller should either be main Run loop or should be holding RLock already
func (s *StorageManager) Size() (lsm, vlog int64) {
return s.db.Size()
}

func getBackupPath(path string) string {
return filepath.Join(filepath.Dir(path), filepath.Base(path)+".old")
}

// dropAndRecreate deletes the underlying badger DB at a file system level, and replaces it with a new badger DB.
func (s *StorageManager) dropAndRecreate() error {
backupPath := getBackupPath(s.storageDir)
if err := os.RemoveAll(backupPath); err != nil && !errors.Is(err, os.ErrNotExist) {
return fmt.Errorf("error removing existing backup dir: %w", err)
}

s.mu.Lock()
s.rw.Close()
err := s.db.Close()
if err != nil {
return fmt.Errorf("error closing badger db: %w", err)
}

s.subscriberPosMu.Lock()
err = os.Rename(s.storageDir, backupPath)
if err != nil {
return fmt.Errorf("error backing up existing badger db: %w", err)
}

// Since subscriber position file lives in the same tail sampling directory as badger DB,
// Create tail sampling dir, move back subscriber position file, as it is not a part of the DB.

Review thread:

Contributor: Could we change the subscriber position directory to be a separate directory from the badger data? All the orchestration required to move this file would then be unnecessary.

carsonip (Member, Author): We'll still need to maintain backwards compatibility. E.g. even if we ship a migration in 8.16.3, we'll need to keep backwards-compatibility code at apm-server startup time that reads from a pre-8.16.3 directory structure and moves it to the 8.16.3 directory structure. Then the dropAndRecreate implementation could be much cleaner. I was originally thinking about doing the move in 9.0.

Contributor:

> even if we ship a migration in 8.16.3, we'll need to keep backwards-compatibility code at apm-server startup time that reads from a pre-8.16.3 directory structure and moves it to the 8.16.3 directory structure

Hmm, is it critical for us to keep the subscriber position data across restarts? Will it result in starting from zero and rereading all sampled traces?

carsonip (Member, Author):

> Will it result in starting from zero and rereading all sampled traces?

Not 100% sure. Need to dig deeper.

> is it critical for us to keep the subscriber position data across restarts?

Either a performance hit from rereading all sampling decisions, or data loss if it does not reread sampling decisions made during the apm-server restart.

carsonip (Member, Author): Will this be a blocker? I think the scope of this PR is fairly large already; not sure if we should expand it further.

Contributor: Definitely not a blocker, though I don't personally like either of the renames (this one plus the one we do earlier to the directory). But feel free to go ahead with the merge if you have approving reviews, as I might not be able to get to this today.

carsonip (Member, Author): The reason I'm raising this is that, in an ideal world, instead of the subscriber position living in a subdirectory, I think we should move badger into a subdirectory. And the risk factor may be different.

carsonip (Member, Author): (Personally I hate all the fs interactions. There are too many ways to fail - think about owner, file mode, file systems.)

Contributor:

> (Personally I hate all the fs interactions. There are too many ways to fail - think about owner, file mode, file systems.)

Exactly, too many pieces for comfort for me.

// Use mode 0700 as hardcoded in badger: https://github.com/dgraph-io/badger/blob/c5b434a643bbea0c0075d9b7336b496403d0f399/db.go#L1778
err = os.Mkdir(s.storageDir, 0700)
if err != nil {
return fmt.Errorf("error creating storage directory: %w", err)
}
err = os.Rename(filepath.Join(backupPath, subscriberPositionFile), filepath.Join(s.storageDir, subscriberPositionFile))
if err != nil && !errors.Is(err, os.ErrNotExist) {
return fmt.Errorf("error copying subscriber position file: %w", err)
}

err = s.reset()
if err != nil {
return fmt.Errorf("error creating new badger db: %w", err)
}
s.subscriberPosMu.Unlock()
s.mu.Unlock()

err = os.RemoveAll(backupPath)
if err != nil {
return fmt.Errorf("error removing old badger db: %w", err)
}

return nil
}

func (s *StorageManager) ReadSubscriberPosition() ([]byte, error) {
s.subscriberPosMu.Lock()
defer s.subscriberPosMu.Unlock()
@@ -142,26 +254,38 @@ type ManagedReadWriter struct {
}

func (s *ManagedReadWriter) ReadTraceEvents(traceID string, out *modelpb.Batch) error {
s.sm.mu.RLock()
defer s.sm.mu.RUnlock()
return s.sm.rw.ReadTraceEvents(traceID, out)
}

func (s *ManagedReadWriter) WriteTraceEvent(traceID, id string, event *modelpb.APMEvent, opts WriterOpts) error {
s.sm.mu.RLock()
defer s.sm.mu.RUnlock()
return s.sm.rw.WriteTraceEvent(traceID, id, event, opts)
}

func (s *ManagedReadWriter) WriteTraceSampled(traceID string, sampled bool, opts WriterOpts) error {
s.sm.mu.RLock()
defer s.sm.mu.RUnlock()
return s.sm.rw.WriteTraceSampled(traceID, sampled, opts)
}

func (s *ManagedReadWriter) IsTraceSampled(traceID string) (bool, error) {
s.sm.mu.RLock()
defer s.sm.mu.RUnlock()
return s.sm.rw.IsTraceSampled(traceID)
}

func (s *ManagedReadWriter) DeleteTraceEvent(traceID, id string) error {
s.sm.mu.RLock()
defer s.sm.mu.RUnlock()
return s.sm.rw.DeleteTraceEvent(traceID, id)
}

func (s *ManagedReadWriter) Flush() error {
s.sm.mu.RLock()
defer s.sm.mu.RUnlock()
return s.sm.rw.Flush()
}

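The review thread above reasons about when the drop actually fires relative to `sampling.tail.ttl` and the one-minute ticker. Below is a simplified, standalone sketch of that detection timeline, not code from the PR: `simulateDropLoop`, its `sizeFn` stub, the simulated clock, and the concrete sizes are assumptions for illustration, while the real loop reads badger's reported size, which lags by roughly a minute.

```go
package main

import (
	"fmt"
	"time"
)

// simulateDropLoop mirrors the detection logic in RunDropLoop: the DB is only
// dropped once its reported size has stayed at or above the limit for a full
// ttl. sizeFn and the fixed tick interval are stand-ins for illustration.
func simulateDropLoop(ttl, tick time.Duration, sizeFn func() uint64, limit uint64, checks int) {
	var firstExceeded time.Time
	for i := 0; i < checks; i++ {
		now := time.Unix(0, 0).Add(time.Duration(i) * tick) // simulated clock
		if sizeFn() >= limit {
			if firstExceeded.IsZero() {
				firstExceeded = now // start the ttl countdown on the first breach
			}
			if now.Sub(firstExceeded) >= ttl {
				fmt.Printf("tick %d: drop and recreate\n", i)
				// Reset the countdown after a drop; in the real code the
				// post-drop size falls below the limit, which resets it.
				firstExceeded = time.Time{}
				continue
			}
			fmt.Printf("tick %d: over limit, waiting for ttl\n", i)
		} else {
			firstExceeded = time.Time{} // any dip below the limit resets the countdown
		}
	}
}

func main() {
	// 30m TTL, 1m tick, size permanently stuck at 4 GiB with a 3 GiB limit.
	simulateDropLoop(30*time.Minute, time.Minute, func() uint64 { return 4 << 30 }, 3<<30, 35)
}
```

With a 30-minute TTL and a 1-minute tick, the sketch drops the DB on the 31st consecutive over-limit check, matching the "at most TTL+1m" bound discussed in the thread above.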
77 changes: 77 additions & 0 deletions x-pack/apm-server/sampling/eventstorage/storage_manager_test.go
@@ -0,0 +1,77 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package eventstorage

import (
"fmt"
"os"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestDropAndRecreate_backupPathExists(t *testing.T) {
tempDir := t.TempDir()
sm, err := NewStorageManager(tempDir)
require.NoError(t, err)
defer sm.Close()

backupPath := getBackupPath(tempDir)
err = os.Mkdir(backupPath, 0700)
require.NoError(t, err)

err = sm.dropAndRecreate()
assert.NoError(t, err)
}

func TestDropAndRecreate_directoryRecreated(t *testing.T) {
tempDir := t.TempDir()
sm, err := NewStorageManager(tempDir)
require.NoError(t, err)
defer sm.Close()

stat, err := os.Stat(tempDir)
assert.NoError(t, err)
assert.True(t, stat.IsDir())

err = sm.dropAndRecreate()
assert.NoError(t, err)

newStat, err := os.Stat(tempDir)
assert.NoError(t, err)
assert.True(t, newStat.IsDir())
assert.Greater(t, newStat.ModTime(), stat.ModTime())

backupPath := getBackupPath(tempDir)
_, err = os.Stat(backupPath)
assert.ErrorIs(t, err, os.ErrNotExist)
}

func TestDropAndRecreate_subscriberPositionFile(t *testing.T) {
for _, exists := range []bool{true, false} {
t.Run(fmt.Sprintf("exists=%t", exists), func(t *testing.T) {
tempDir := t.TempDir()
sm, err := NewStorageManager(tempDir)
require.NoError(t, err)
defer sm.Close()

if exists {
err := sm.WriteSubscriberPosition([]byte("{}"))
require.NoError(t, err)
}

err = sm.dropAndRecreate()
assert.NoError(t, err)

data, err := sm.ReadSubscriberPosition()
if exists {
assert.Equal(t, data, []byte("{}"))
} else {
assert.ErrorIs(t, err, os.ErrNotExist)
}
})
}
}
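
As a side note, getBackupPath simply appends ".old" to the final path element, so the backup directory lives next to the live one. A tiny test-style sketch of that behavior (not part of the PR; the Unix-style path is hypothetical, and the snippet assumes the same package and testify imports as the file above):

```go
func TestGetBackupPath(t *testing.T) {
	// getBackupPath keeps the backup alongside the live directory,
	// e.g. /var/lib/apm-server/tail_sampling -> /var/lib/apm-server/tail_sampling.old
	assert.Equal(t, "/var/lib/apm-server/tail_sampling.old",
		getBackupPath("/var/lib/apm-server/tail_sampling"))
}
```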
3 changes: 3 additions & 0 deletions x-pack/apm-server/sampling/processor.go
@@ -382,6 +382,9 @@ func (p *Processor) Run() error {
g.Go(func() error {
return p.config.DB.RunGCLoop(p.stopping, p.config.StorageGCInterval)
})
g.Go(func() error {
return p.config.DB.RunDropLoop(p.stopping, p.config.TTL, p.config.StorageLimit)
})
g.Go(func() error {
// Subscribe to remotely sampled trace IDs. This is cancelled immediately when
// Stop is called. But it is possible that both old and new subscriber goroutines
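
The processor wires both maintenance loops into its errgroup so they stop together when the processor stops. A minimal sketch of that wiring pattern, assuming the StorageManager API as shown in this diff (later commits in the PR, e.g. "Expose Run only" and "Run dropLoop on startup and every 1m", refactor it); the import path, storage directory, TTL, GC interval, and storage limit below are illustrative assumptions, not values from the PR.

```go
package main

import (
	"log"
	"time"

	"golang.org/x/sync/errgroup"

	"github.com/elastic/apm-server/x-pack/apm-server/sampling/eventstorage" // assumed import path
)

func main() {
	// Hypothetical storage directory; the processor gets this from its config.
	sm, err := eventstorage.NewStorageManager("/tmp/tail_sampling")
	if err != nil {
		log.Fatal(err)
	}
	defer sm.Close()

	stopping := make(chan struct{})
	var g errgroup.Group
	// Same pattern as Processor.Run: each loop returns once stopping is closed.
	g.Go(func() error { return sm.RunGCLoop(stopping, 5*time.Minute) })
	g.Go(func() error { return sm.RunDropLoop(stopping, 30*time.Minute, 3<<30) })

	time.AfterFunc(10*time.Second, func() { close(stopping) }) // simulate shutdown
	if err := g.Wait(); err != nil {
		log.Fatal(err)
	}
}
```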