-
Notifications
You must be signed in to change notification settings - Fork 527
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
WIP: [Rhythm] Block-builder consumption loop #4480
base: main-rhythm
Are you sure you want to change the base?
Conversation
…to work using real consumergroup functionality
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good!
case <-time.After(waitTime): | ||
err := b.consume(ctx) | ||
if err != nil { | ||
b.logger.Log("msg", "consumeCycle failed", "err", err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
b.logger.Log("msg", "consumeCycle failed", "err", err) | |
level.Error(b.logger).Log("msg", "consumeCycle failed", "err", err) |
return false, err | ||
} | ||
|
||
lastCommit, ok := commits.Lookup(topic, partition) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lastCommit, ok := commits.Lookup(topic, partition) | |
lastCommit, exists := commits.Lookup(topic, partition) | |
if exists && lastCommit.At >= 0 { | |
startOffset = startOffset.At(lastCommit.At) | |
} else { | |
startOffset = kgo.NewOffset().AtStart() | |
} |
https://pkg.go.dev/github.com/twmb/franz-go/pkg/kadm@v1.14.0#OffsetResponses.Lookup
} | ||
|
||
err := b.pushTraces(rec.Key, rec.Value, writer) | ||
if err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What will happen if something is wrong with the WAL? I guess it will enter in a loop
What this PR does:
This is an alternate block-builder consumption loop that I think has benefits and would like to get feedback on.
The curent loop can be thought of as top-down. It calculates the total time range that the block builder is lagging, splits it into smaller sections (i.e. 5 minutes), consume/flush/commit each section.
This new loop is: while more data, start at last commit and consume/flush/commit another chunk of data (i.e. 5 minutes).
The benefits are:
The drawbacks are:
TODO
TestBlockbuilder_lookbackOnNoCommit
to show an example of how it can be fixed. Specifically, it needs to run with consumer group support, and remove:because this means the commits are not actually committed.
Which issue(s) this PR fixes:
Fixes #
Checklist
CHANGELOG.md
updated - the order of entries should be[CHANGE]
,[FEATURE]
,[ENHANCEMENT]
,[BUGFIX]