WIP: [Rhythm] Block-builder consumption loop #4480

Draft · wants to merge 4 commits into base: main-rhythm
Changes from all commits
170 changes: 167 additions & 3 deletions modules/blockbuilder/blockbuilder.go
@@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"strconv"
"sync"
"time"

@@ -27,14 +28,15 @@ import (
const (
blockBuilderServiceName = "block-builder"
ConsumerGroup = "block-builder"
pollTimeout = 2 * time.Second
)

var (
metricPartitionLag = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "tempo",
Subsystem: "block_builder",
Name: "partition_lag",
Help: "Lag of a partition.",
Name: "partition_lag_s",
Review comment (Member):

Suggested change:
- Name: "partition_lag_s",
+ Name: "partition_lag_seconds",

https://prometheus.io/docs/practices/naming/#metric-names

Review comment (Member):

IMO the number of pending records is also relevant. I think we should keep the original metric and add one for time.
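
A possible follow-up, sketched outside this PR: keep the existing record-count gauge and add a time-based gauge whose name carries the full _seconds unit suffix. The metric names, help strings, and the Records suffix on the first variable are illustrative assumptions, not what the PR settles on.

package blockbuilder

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// Sketch only: one gauge for lag in records, one for lag in seconds.
var (
	metricPartitionLagRecords = promauto.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "tempo",
		Subsystem: "block_builder",
		Name:      "partition_lag",
		Help:      "Lag of a partition in records.",
	}, []string{"partition"})

	metricPartitionLagSeconds = promauto.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "tempo",
		Subsystem: "block_builder",
		Name:      "partition_lag_seconds",
		Help:      "Lag of a partition in seconds.",
	}, []string{"partition"})
)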

Help: "Lag of a partition in seconds.",
}, []string{"partition"})
metricConsumeCycleDuration = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: "tempo",
@@ -67,6 +69,7 @@ type BlockBuilder struct {
fallbackOffsetMillis int64

kafkaClient *kgo.Client
kadm *kadm.Client
decoder *ingest.Decoder
partitionRing ring.PartitionRingReader

@@ -142,10 +145,12 @@ func (b *BlockBuilder) starting(ctx context.Context) (err error) {
return fmt.Errorf("failed to ping kafka: %w", err)
}

b.kadm = kadm.NewClient(b.kafkaClient)

return nil
}

func (b *BlockBuilder) running(ctx context.Context) error {
func (b *BlockBuilder) runningOld(ctx context.Context) error {
// Initial polling and delay
cycleEndTime := cycleEndAtStartup(time.Now(), b.cfg.ConsumeCycleDuration)
waitTime := 2 * time.Second
@@ -167,6 +172,165 @@ func (b *BlockBuilder) running(ctx context.Context) error {
}
}

func (b *BlockBuilder) running(ctx context.Context) error {
// Initial delay
waitTime := 0 * time.Second
for {
select {
case <-time.After(waitTime):
err := b.consume(ctx)
if err != nil {
b.logger.Log("msg", "consumeCycle failed", "err", err)
Review comment (Contributor):

Suggested change:
- b.logger.Log("msg", "consumeCycle failed", "err", err)
+ level.Error(b.logger).Log("msg", "consumeCycle failed", "err", err)
}

// Real delay on subsequent
waitTime = b.cfg.ConsumeCycleDuration
case <-ctx.Done():
return nil
}
}
}

func (b *BlockBuilder) consume(ctx context.Context) error {
var (
end = time.Now()
partitions = b.getAssignedActivePartitions()
)

level.Info(b.logger).Log("msg", "starting consume cycle", "cycle_end", end, "active_partitions", partitions)
defer func(t time.Time) { metricConsumeCycleDuration.Observe(time.Since(t).Seconds()) }(time.Now())

for _, partition := range partitions {
// Consume partition while data remains.
// TODO - round-robin one consumption per partition instead to equalize catch-up time.
for {
more, err := b.consumePartition2(ctx, partition, end)
if err != nil {
return err
}

if !more {
break
}
}
}

return nil
}

func (b *BlockBuilder) consumePartition2(ctx context.Context, partition int32, end time.Time) (more bool, err error) {
defer func(t time.Time) {
metricProcessPartitionSectionDuration.WithLabelValues(strconv.Itoa(int(partition))).Observe(time.Since(t).Seconds())
}(time.Now())

var (
dur = b.cfg.ConsumeCycleDuration
topic = b.cfg.IngestStorageConfig.Kafka.Topic
group = b.cfg.IngestStorageConfig.Kafka.ConsumerGroup
startOffset kgo.Offset
writer *writer
lastRec *kgo.Record
begin time.Time
)

commits, err := b.kadm.FetchOffsetsForTopics(ctx, group, topic)
if err != nil {
return false, err
}

lastCommit, ok := commits.Lookup(topic, partition)
Review comment (Contributor):

Suggested change:
- lastCommit, ok := commits.Lookup(topic, partition)
+ lastCommit, exists := commits.Lookup(topic, partition)
+ if exists && lastCommit.At >= 0 {
+ 	startOffset = startOffset.At(lastCommit.At)
+ } else {
+ 	startOffset = kgo.NewOffset().AtStart()
+ }

https://pkg.go.dev/github.com/twmb/franz-go/pkg/kadm@v1.14.0#OffsetResponses.Lookup

if ok && lastCommit.At >= 0 {
startOffset = startOffset.At(lastCommit.At)
} else {
startOffset = kgo.NewOffset().AtStart()
}

// We always rewind the partition's offset to the commit offset by reassigning the partition to the client (this triggers partition assignment).
// This is so the cycle starts exactly at the commit offset, and not at what was (potentially over-) consumed previously.
// In the end, we remove the partition from the client (refer to the defer below) to guarantee the client always consumes
// from one partition at a time. I.e. when this partition is consumed, we start consuming the next one.
b.kafkaClient.AddConsumePartitions(map[string]map[int32]kgo.Offset{
topic: {
partition: startOffset,
},
})
defer b.kafkaClient.RemoveConsumePartitions(map[string][]int32{topic: {partition}})

outer:
for {
fetches := func() kgo.Fetches {
ctx2, cancel := context.WithTimeout(ctx, pollTimeout)
defer cancel()
return b.kafkaClient.PollFetches(ctx2)
}()
err = fetches.Err()
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
// No more data
more = false
break
}
metricFetchErrors.WithLabelValues(strconv.Itoa(int(partition))).Inc()
return false, err
}

if fetches.Empty() {
more = false
break
}

for iter := fetches.RecordIter(); !iter.Done(); {
rec := iter.Next()

// Initialize if needed
if writer == nil {

// Determine begin and end time range, which is -/+ cycle duration.
// But don't exceed the given overall end time.
begin = rec.Timestamp.Add(-dur)
if rec.Timestamp.Add(dur).Before(end) {
end = rec.Timestamp.Add(dur)
}
Review comment (Member), on lines +288 to +293:

This feels like a strange side-effect of the writer being nil. Cycle initialisation could be consolidated in one place instead.
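
One way that consolidation could look, sketched as a reply rather than a change in this PR. The helper name and signature (initSection) are assumptions; the body simply gathers the begin/end computation, the lag metric update, and the writer construction that consumePartition2 currently performs lazily when writer == nil.

// Hypothetical helper: derive the section bounds and build the writer from
// the first record, before the record-processing loop starts.
func (b *BlockBuilder) initSection(first *kgo.Record, cycleEnd time.Time, partition int32) (begin, end time.Time, w *writer) {
	dur := b.cfg.ConsumeCycleDuration

	// The section covers the first record's timestamp -/+ the cycle duration,
	// clamped so it never exceeds the overall cycle end.
	begin = first.Timestamp.Add(-dur)
	end = cycleEnd
	if first.Timestamp.Add(dur).Before(cycleEnd) {
		end = first.Timestamp.Add(dur)
	}

	metricPartitionLag.WithLabelValues(strconv.Itoa(int(partition))).Set(time.Since(first.Timestamp).Seconds())

	w = newPartitionSectionWriter(b.logger, int64(partition), first.Offset, b.cfg.BlockConfig, b.overrides, b.wal, b.enc)
	return begin, end, w
}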


metricPartitionLag.WithLabelValues(strconv.Itoa(int(partition))).Set(time.Since(rec.Timestamp).Seconds())

writer = newPartitionSectionWriter(b.logger, int64(partition), rec.Offset, b.cfg.BlockConfig, b.overrides, b.wal, b.enc)
}

if rec.Timestamp.Before(begin) || rec.Timestamp.After(end) {
Review comment (@mapno, Member, Jan 3, 2025):

Unsure if we should break for records being too old. The record timestamp is set by the producer (the distributor in this case); it's not based on the trace's time.

If a record was so old as to be outside of the cycle's start time, I think we'd want to put it in the current block. Otherwise it could impact consumption too much, creating blocks that are too small.
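
A sketch of that alternative, under the assumption that the rest of the loop stays as in this PR: only a record newer than the section end terminates the section, while records older than begin are still written into the current block.

// Suggested behaviour (sketch): don't break for records that are too old;
// only stop the section once a record falls past the section end.
if rec.Timestamp.After(end) {
	break outer
}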

break outer
}

err := b.pushTraces(rec.Key, rec.Value, writer)
if err != nil {
Review comment (Contributor):

What will happen if something is wrong with the WAL? I guess it will enter a loop.

return false, err
}

lastRec = rec
}
}

if lastRec == nil {
// Received no data
return false, nil
}

err = writer.flush(ctx, b.writer)
if err != nil {
return false, err
}

resp, err := b.kadm.CommitOffsets(ctx, group, kadm.OffsetsFromRecords(*lastRec))
if err != nil {
return false, err
}
if err := resp.Error(); err != nil {
return false, err
}

return more, nil
}

func (b *BlockBuilder) stopping(err error) error {
if b.kafkaClient != nil {
b.kafkaClient.Close()
15 changes: 3 additions & 12 deletions modules/blockbuilder/blockbuilder_test.go
@@ -38,14 +38,10 @@ func TestBlockbuilder_lookbackOnNoCommit(t *testing.T) {
ctx, cancel := context.WithCancelCause(context.Background())
t.Cleanup(func() { cancel(errors.New("test done")) })

k, address := testkafka.CreateClusterWithoutCustomConsumerGroupsSupport(t, 1, "test-topic")
k, _ := testkafka.CreateCluster(t, 1, "test-topic")
t.Cleanup(k.Close)

kafkaCommits := atomic.NewInt32(0)
k.ControlKey(kmsg.OffsetCommit.Int16(), func(kmsg.Request) (kmsg.Response, error, bool) {
kafkaCommits.Add(1)
return nil, nil, false
})
address := k.ListenAddrs()[0]

store := newStore(ctx, t)
cfg := blockbuilderConfig(t, address)
@@ -59,14 +55,9 @@ func TestBlockbuilder_lookbackOnNoCommit(t *testing.T) {
client := newKafkaClient(t, cfg.IngestStorageConfig.Kafka)
sendReq(t, ctx, client)

// Wait for record to be consumed and committed.
require.Eventually(t, func() bool {
return kafkaCommits.Load() > 0
}, time.Minute, time.Second)

// Wait for the block to be flushed.
require.Eventually(t, func() bool {
return len(store.BlockMetas(util.FakeTenantID)) == 1
return len(store.BlockMetas(util.FakeTenantID)) == 1 && store.BlockMetas(util.FakeTenantID)[0].TotalObjects == 1
}, time.Minute, time.Second)
}

2 changes: 1 addition & 1 deletion pkg/ingest/testkafka/cluster.go
@@ -109,7 +109,7 @@ func addSupportForConsumerGroups(t testing.TB, cluster *kfake.Cluster, topicName
// This mimics the real Kafka behaviour.
var partitionsResp []kmsg.OffsetFetchResponseGroupTopicPartition
if partitionID == allPartitions {
for i := int32(1); i < numPartitions+1; i++ {
for i := int32(0); i < numPartitions; i++ {
if committedOffsets[consumerGroup][i] >= 0 {
partitionsResp = append(partitionsResp, kmsg.OffsetFetchResponseGroupTopicPartition{
Partition: i,