-
Notifications
You must be signed in to change notification settings - Fork 527
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
WIP: [Rhythm] Block-builder consumption loop #4480
base: main-rhythm
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -4,6 +4,7 @@ import ( | |||||||||||||||
"context" | ||||||||||||||||
"errors" | ||||||||||||||||
"fmt" | ||||||||||||||||
"strconv" | ||||||||||||||||
"sync" | ||||||||||||||||
"time" | ||||||||||||||||
|
||||||||||||||||
|
@@ -27,14 +28,15 @@ import ( | |||||||||||||||
const ( | ||||||||||||||||
blockBuilderServiceName = "block-builder" | ||||||||||||||||
ConsumerGroup = "block-builder" | ||||||||||||||||
pollTimeout = 2 * time.Second | ||||||||||||||||
) | ||||||||||||||||
|
||||||||||||||||
var ( | ||||||||||||||||
metricPartitionLag = promauto.NewGaugeVec(prometheus.GaugeOpts{ | ||||||||||||||||
Namespace: "tempo", | ||||||||||||||||
Subsystem: "block_builder", | ||||||||||||||||
Name: "partition_lag", | ||||||||||||||||
Help: "Lag of a partition.", | ||||||||||||||||
Name: "partition_lag_s", | ||||||||||||||||
Help: "Lag of a partition in seconds.", | ||||||||||||||||
}, []string{"partition"}) | ||||||||||||||||
metricConsumeCycleDuration = promauto.NewHistogram(prometheus.HistogramOpts{ | ||||||||||||||||
Namespace: "tempo", | ||||||||||||||||
|
@@ -67,6 +69,7 @@ type BlockBuilder struct { | |||||||||||||||
fallbackOffsetMillis int64 | ||||||||||||||||
|
||||||||||||||||
kafkaClient *kgo.Client | ||||||||||||||||
kadm *kadm.Client | ||||||||||||||||
decoder *ingest.Decoder | ||||||||||||||||
partitionRing ring.PartitionRingReader | ||||||||||||||||
|
||||||||||||||||
|
@@ -142,10 +145,12 @@ func (b *BlockBuilder) starting(ctx context.Context) (err error) { | |||||||||||||||
return fmt.Errorf("failed to ping kafka: %w", err) | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
b.kadm = kadm.NewClient(b.kafkaClient) | ||||||||||||||||
|
||||||||||||||||
return nil | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
func (b *BlockBuilder) running(ctx context.Context) error { | ||||||||||||||||
func (b *BlockBuilder) runningOld(ctx context.Context) error { | ||||||||||||||||
// Initial polling and delay | ||||||||||||||||
cycleEndTime := cycleEndAtStartup(time.Now(), b.cfg.ConsumeCycleDuration) | ||||||||||||||||
waitTime := 2 * time.Second | ||||||||||||||||
|
@@ -167,6 +172,165 @@ func (b *BlockBuilder) running(ctx context.Context) error { | |||||||||||||||
} | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
func (b *BlockBuilder) running(ctx context.Context) error { | ||||||||||||||||
// Initial delay | ||||||||||||||||
waitTime := 0 * time.Second | ||||||||||||||||
for { | ||||||||||||||||
select { | ||||||||||||||||
case <-time.After(waitTime): | ||||||||||||||||
err := b.consume(ctx) | ||||||||||||||||
if err != nil { | ||||||||||||||||
b.logger.Log("msg", "consumeCycle failed", "err", err) | ||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
// Real delay on subsequent | ||||||||||||||||
waitTime = b.cfg.ConsumeCycleDuration | ||||||||||||||||
case <-ctx.Done(): | ||||||||||||||||
return nil | ||||||||||||||||
} | ||||||||||||||||
} | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
func (b *BlockBuilder) consume(ctx context.Context) error { | ||||||||||||||||
var ( | ||||||||||||||||
end = time.Now() | ||||||||||||||||
partitions = b.getAssignedActivePartitions() | ||||||||||||||||
) | ||||||||||||||||
|
||||||||||||||||
level.Info(b.logger).Log("msg", "starting consume cycle", "cycle_end", end, "active_partitions", partitions) | ||||||||||||||||
defer func(t time.Time) { metricConsumeCycleDuration.Observe(time.Since(t).Seconds()) }(time.Now()) | ||||||||||||||||
|
||||||||||||||||
for _, partition := range partitions { | ||||||||||||||||
// Consume partition while data remains. | ||||||||||||||||
// TODO - round-robin one consumption per partition instead to equalize catch-up time. | ||||||||||||||||
for { | ||||||||||||||||
more, err := b.consumePartition2(ctx, partition, end) | ||||||||||||||||
if err != nil { | ||||||||||||||||
return err | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
if !more { | ||||||||||||||||
break | ||||||||||||||||
} | ||||||||||||||||
} | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
return nil | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
func (b *BlockBuilder) consumePartition2(ctx context.Context, partition int32, end time.Time) (more bool, err error) { | ||||||||||||||||
defer func(t time.Time) { | ||||||||||||||||
metricProcessPartitionSectionDuration.WithLabelValues(strconv.Itoa(int(partition))).Observe(time.Since(t).Seconds()) | ||||||||||||||||
}(time.Now()) | ||||||||||||||||
|
||||||||||||||||
var ( | ||||||||||||||||
dur = b.cfg.ConsumeCycleDuration | ||||||||||||||||
topic = b.cfg.IngestStorageConfig.Kafka.Topic | ||||||||||||||||
group = b.cfg.IngestStorageConfig.Kafka.ConsumerGroup | ||||||||||||||||
startOffset kgo.Offset | ||||||||||||||||
writer *writer | ||||||||||||||||
lastRec *kgo.Record | ||||||||||||||||
begin time.Time | ||||||||||||||||
) | ||||||||||||||||
|
||||||||||||||||
commits, err := b.kadm.FetchOffsetsForTopics(ctx, group, topic) | ||||||||||||||||
if err != nil { | ||||||||||||||||
return false, err | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
lastCommit, ok := commits.Lookup(topic, partition) | ||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
https://pkg.go.dev/github.com/twmb/franz-go/pkg/kadm@v1.14.0#OffsetResponses.Lookup |
||||||||||||||||
if ok && lastCommit.At >= 0 { | ||||||||||||||||
startOffset = startOffset.At(lastCommit.At) | ||||||||||||||||
} else { | ||||||||||||||||
startOffset = kgo.NewOffset().AtStart() | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
// We always rewind the partition's offset to the commit offset by reassigning the partition to the client (this triggers partition assignment). | ||||||||||||||||
// This is so the cycle started exactly at the commit offset, and not at what was (potentially over-) consumed previously. | ||||||||||||||||
// In the end, we remove the partition from the client (refer to the defer below) to guarantee the client always consumes | ||||||||||||||||
// from one partition at a time. I.e. when this partition is consumed, we start consuming the next one. | ||||||||||||||||
b.kafkaClient.AddConsumePartitions(map[string]map[int32]kgo.Offset{ | ||||||||||||||||
topic: { | ||||||||||||||||
partition: startOffset, | ||||||||||||||||
}, | ||||||||||||||||
}) | ||||||||||||||||
defer b.kafkaClient.RemoveConsumePartitions(map[string][]int32{topic: {partition}}) | ||||||||||||||||
|
||||||||||||||||
outer: | ||||||||||||||||
for { | ||||||||||||||||
fetches := func() kgo.Fetches { | ||||||||||||||||
ctx2, cancel := context.WithTimeout(ctx, pollTimeout) | ||||||||||||||||
defer cancel() | ||||||||||||||||
return b.kafkaClient.PollFetches(ctx2) | ||||||||||||||||
}() | ||||||||||||||||
err = fetches.Err() | ||||||||||||||||
if err != nil { | ||||||||||||||||
if errors.Is(err, context.DeadlineExceeded) { | ||||||||||||||||
// No more data | ||||||||||||||||
more = false | ||||||||||||||||
break | ||||||||||||||||
} | ||||||||||||||||
metricFetchErrors.WithLabelValues(strconv.Itoa(int(partition))).Inc() | ||||||||||||||||
return false, err | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
if fetches.Empty() { | ||||||||||||||||
more = false | ||||||||||||||||
break | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
for iter := fetches.RecordIter(); !iter.Done(); { | ||||||||||||||||
rec := iter.Next() | ||||||||||||||||
|
||||||||||||||||
// Initialize if needed | ||||||||||||||||
if writer == nil { | ||||||||||||||||
|
||||||||||||||||
// Determine begin and end time range, which is -/+ cycle duration. | ||||||||||||||||
// But don't exceed the given overall end time. | ||||||||||||||||
begin = rec.Timestamp.Add(-dur) | ||||||||||||||||
if rec.Timestamp.Add(dur).Before(end) { | ||||||||||||||||
end = rec.Timestamp.Add(dur) | ||||||||||||||||
} | ||||||||||||||||
Comment on lines
+288
to
+293
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This feels like a strange side-effect of the |
||||||||||||||||
|
||||||||||||||||
metricPartitionLag.WithLabelValues(strconv.Itoa(int(partition))).Set(time.Since(rec.Timestamp).Seconds()) | ||||||||||||||||
|
||||||||||||||||
writer = newPartitionSectionWriter(b.logger, int64(partition), rec.Offset, b.cfg.BlockConfig, b.overrides, b.wal, b.enc) | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
if rec.Timestamp.Before(begin) || rec.Timestamp.After(end) { | ||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unsure if we should break for records being too old. The record ts is set by the produced—the distributor in this case. It's not based on the trace's time. If a record was so old as to be outside of the cycle's start time, I think we'd want to put it in the current block. Otherwise it could impact consumption too much, creating too small blocks. |
||||||||||||||||
break outer | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
err := b.pushTraces(rec.Key, rec.Value, writer) | ||||||||||||||||
if err != nil { | ||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What will happen if something is wrong with the WAL? I guess it will enter in a loop |
||||||||||||||||
return false, err | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
lastRec = rec | ||||||||||||||||
} | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
if lastRec == nil { | ||||||||||||||||
// Received no data | ||||||||||||||||
return false, nil | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
err = writer.flush(ctx, b.writer) | ||||||||||||||||
if err != nil { | ||||||||||||||||
return false, err | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
resp, err := b.kadm.CommitOffsets(ctx, group, kadm.OffsetsFromRecords(*lastRec)) | ||||||||||||||||
if err != nil { | ||||||||||||||||
return false, err | ||||||||||||||||
} | ||||||||||||||||
if err := resp.Error(); err != nil { | ||||||||||||||||
return false, err | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
return more, nil | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
func (b *BlockBuilder) stopping(err error) error { | ||||||||||||||||
if b.kafkaClient != nil { | ||||||||||||||||
b.kafkaClient.Close() | ||||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
https://prometheus.io/docs/practices/naming/#metric-names
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMO the number of pending records is also relevant. I think we should keep the original metric and add one for time.