diff --git a/modules/blockbuilder/blockbuilder.go b/modules/blockbuilder/blockbuilder.go
index 29ca0895c3c..ca2b079d5dc 100644
--- a/modules/blockbuilder/blockbuilder.go
+++ b/modules/blockbuilder/blockbuilder.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"strconv"
 	"sync"
 	"time"
 
@@ -27,14 +28,15 @@ import (
 const (
 	blockBuilderServiceName = "block-builder"
 	ConsumerGroup           = "block-builder"
+	pollTimeout             = 2 * time.Second
 )
 
 var (
 	metricPartitionLag = promauto.NewGaugeVec(prometheus.GaugeOpts{
 		Namespace: "tempo",
 		Subsystem: "block_builder",
-		Name:      "partition_lag",
-		Help:      "Lag of a partition.",
+		Name:      "partition_lag_s",
+		Help:      "Lag of a partition in seconds.",
 	}, []string{"partition"})
 	metricConsumeCycleDuration = promauto.NewHistogram(prometheus.HistogramOpts{
 		Namespace: "tempo",
@@ -67,6 +69,7 @@ type BlockBuilder struct {
 	fallbackOffsetMillis int64
 
 	kafkaClient   *kgo.Client
+	kadm          *kadm.Client
 	decoder       *ingest.Decoder
 	partitionRing ring.PartitionRingReader
 
@@ -142,10 +145,12 @@ func (b *BlockBuilder) starting(ctx context.Context) (err error) {
 		return fmt.Errorf("failed to ping kafka: %w", err)
 	}
 
+	b.kadm = kadm.NewClient(b.kafkaClient)
+
 	return nil
}
 
-func (b *BlockBuilder) running(ctx context.Context) error {
+func (b *BlockBuilder) runningOld(ctx context.Context) error {
 	// Initial polling and delay
 	cycleEndTime := cycleEndAtStartup(time.Now(), b.cfg.ConsumeCycleDuration)
 	waitTime := 2 * time.Second
@@ -167,6 +172,165 @@ func (b *BlockBuilder) running(ctx context.Context) error {
 	}
 }
 
+func (b *BlockBuilder) running(ctx context.Context) error {
+	// No initial delay: run the first consume cycle immediately
+	waitTime := 0 * time.Second
+	for {
+		select {
+		case <-time.After(waitTime):
+			err := b.consume(ctx)
+			if err != nil {
+				b.logger.Log("msg", "consumeCycle failed", "err", err)
+			}
+
+			// Use the real delay on subsequent cycles
+			waitTime = b.cfg.ConsumeCycleDuration
+		case <-ctx.Done():
+			return nil
+		}
+	}
+}
+
+func (b *BlockBuilder) consume(ctx context.Context) error {
+	var (
+		end        = time.Now()
+		partitions = b.getAssignedActivePartitions()
+	)
+
+	level.Info(b.logger).Log("msg", "starting consume cycle", "cycle_end", end, "active_partitions", partitions)
+	defer func(t time.Time) { metricConsumeCycleDuration.Observe(time.Since(t).Seconds()) }(time.Now())
+
+	for _, partition := range partitions {
+		// Consume the partition until no data remains.
+		// TODO - round-robin one consumption per partition instead to equalize catch-up time.
+		for {
+			more, err := b.consumePartition2(ctx, partition, end)
+			if err != nil {
+				return err
+			}
+
+			if !more {
+				break
+			}
+		}
+	}
+
+	return nil
+}
+
+func (b *BlockBuilder) consumePartition2(ctx context.Context, partition int32, end time.Time) (more bool, err error) {
+	defer func(t time.Time) {
+		metricProcessPartitionSectionDuration.WithLabelValues(strconv.Itoa(int(partition))).Observe(time.Since(t).Seconds())
+	}(time.Now())
+
+	var (
+		dur         = b.cfg.ConsumeCycleDuration
+		topic       = b.cfg.IngestStorageConfig.Kafka.Topic
+		group       = b.cfg.IngestStorageConfig.Kafka.ConsumerGroup
+		startOffset kgo.Offset
+		writer      *writer
+		lastRec     *kgo.Record
+		begin       time.Time
+	)
+
+	commits, err := b.kadm.FetchOffsetsForTopics(ctx, group, topic)
+	if err != nil {
+		return false, err
+	}
+
+	lastCommit, ok := commits.Lookup(topic, partition)
+	if ok && lastCommit.At >= 0 {
+		startOffset = startOffset.At(lastCommit.At)
+	} else {
+		startOffset = kgo.NewOffset().AtStart()
+	}
+
+	// We always rewind the partition's offset to the commit offset by reassigning the partition to the client (this triggers partition assignment).
+	// This is so the cycle starts exactly at the commit offset, and not at whatever was (potentially over-) consumed previously.
+	// At the end, we remove the partition from the client (see the defer below) to guarantee the client always consumes
+	// from one partition at a time: once this partition is consumed, we start consuming the next one.
+	b.kafkaClient.AddConsumePartitions(map[string]map[int32]kgo.Offset{
+		topic: {
+			partition: startOffset,
+		},
+	})
+	defer b.kafkaClient.RemoveConsumePartitions(map[string][]int32{topic: {partition}})
+
+outer:
+	for {
+		fetches := func() kgo.Fetches {
+			ctx2, cancel := context.WithTimeout(ctx, pollTimeout)
+			defer cancel()
+			return b.kafkaClient.PollFetches(ctx2)
+		}()
+		err = fetches.Err()
+		if err != nil {
+			if errors.Is(err, context.DeadlineExceeded) {
+				// No more data
+				more = false
+				break
+			}
+			metricFetchErrors.WithLabelValues(strconv.Itoa(int(partition))).Inc()
+			return false, err
+		}
+
+		if fetches.Empty() {
+			more = false
+			break
+		}
+
+		for iter := fetches.RecordIter(); !iter.Done(); {
+			rec := iter.Next()
+
+			// Initialize the writer and time range on the first record
+			if writer == nil {
+
+				// Determine the begin and end of the time range: the first record's timestamp -/+ the cycle duration,
+				// but don't exceed the given overall end time.
+				begin = rec.Timestamp.Add(-dur)
+				if rec.Timestamp.Add(dur).Before(end) {
+					end = rec.Timestamp.Add(dur)
+				}
+
+				metricPartitionLag.WithLabelValues(strconv.Itoa(int(partition))).Set(time.Since(rec.Timestamp).Seconds())
+
+				writer = newPartitionSectionWriter(b.logger, int64(partition), rec.Offset, b.cfg.BlockConfig, b.overrides, b.wal, b.enc)
+			}
+
+			if rec.Timestamp.Before(begin) || rec.Timestamp.After(end) {
+				break outer
+			}
+
+			err := b.pushTraces(rec.Key, rec.Value, writer)
+			if err != nil {
+				return false, err
+			}
+
+			lastRec = rec
+		}
+	}
+
+	if lastRec == nil {
+		// Received no data
+		return false, nil
+	}
+
+	err = writer.flush(ctx, b.writer)
+	if err != nil {
+		return false, err
+	}
+
+	resp, err := b.kadm.CommitOffsets(ctx, group, kadm.OffsetsFromRecords(*lastRec))
+	if err != nil {
+		return false, err
+	}
+	if err := resp.Error(); err != nil {
+		return false, err
+	}
+
+	return more, nil
+}
+
 func (b *BlockBuilder) stopping(err error) error {
 	if b.kafkaClient != nil {
 		b.kafkaClient.Close()
diff --git a/modules/blockbuilder/blockbuilder_test.go b/modules/blockbuilder/blockbuilder_test.go
index bef82771aaf..0076f2c97be 100644
--- a/modules/blockbuilder/blockbuilder_test.go
+++ b/modules/blockbuilder/blockbuilder_test.go
@@ -38,14 +38,10 @@ func TestBlockbuilder_lookbackOnNoCommit(t *testing.T) {
 	ctx, cancel := context.WithCancelCause(context.Background())
 	t.Cleanup(func() { cancel(errors.New("test done")) })
 
-	k, address := testkafka.CreateClusterWithoutCustomConsumerGroupsSupport(t, 1, "test-topic")
+	k, _ := testkafka.CreateCluster(t, 1, "test-topic")
 	t.Cleanup(k.Close)
 
-	kafkaCommits := atomic.NewInt32(0)
-	k.ControlKey(kmsg.OffsetCommit.Int16(), func(kmsg.Request) (kmsg.Response, error, bool) {
-		kafkaCommits.Add(1)
-		return nil, nil, false
-	})
+	address := k.ListenAddrs()[0]
 
 	store := newStore(ctx, t)
 	cfg := blockbuilderConfig(t, address)
@@ -59,14 +55,9 @@ func TestBlockbuilder_lookbackOnNoCommit(t *testing.T) {
 	client := newKafkaClient(t, cfg.IngestStorageConfig.Kafka)
 	sendReq(t, ctx, client)
 
-	// Wait for record to be consumed and committed.
-	require.Eventually(t, func() bool {
-		return kafkaCommits.Load() > 0
-	}, time.Minute, time.Second)
-
 	// Wait for the block to be flushed.
 	require.Eventually(t, func() bool {
-		return len(store.BlockMetas(util.FakeTenantID)) == 1
+		return len(store.BlockMetas(util.FakeTenantID)) == 1 && store.BlockMetas(util.FakeTenantID)[0].TotalObjects == 1
 	}, time.Minute, time.Second)
 }
 
diff --git a/pkg/ingest/testkafka/cluster.go b/pkg/ingest/testkafka/cluster.go
index 08002d87bee..ef01d684b84 100644
--- a/pkg/ingest/testkafka/cluster.go
+++ b/pkg/ingest/testkafka/cluster.go
@@ -109,7 +109,7 @@ func addSupportForConsumerGroups(t testing.TB, cluster *kfake.Cluster, topicName
 			// This mimics the real Kafka behaviour.
 			var partitionsResp []kmsg.OffsetFetchResponseGroupTopicPartition
 			if partitionID == allPartitions {
-				for i := int32(1); i < numPartitions+1; i++ {
+				for i := int32(0); i < numPartitions; i++ {
 					if committedOffsets[consumerGroup][i] >= 0 {
 						partitionsResp = append(partitionsResp, kmsg.OffsetFetchResponseGroupTopicPartition{
 							Partition: i,