From 1b2d21acf8b88c14cd9395f6e69c5802e65786f0 Mon Sep 17 00:00:00 2001 From: Naireen Hussain Date: Tue, 17 Sep 2024 01:50:22 -0700 Subject: [PATCH] Fix Kafka with Redistribute and commits enabled (#32344) --- .../org/apache/beam/sdk/io/kafka/KafkaIO.java | 30 +++-- ...IOReadImplementationCompatibilityTest.java | 8 +- .../apache/beam/sdk/io/kafka/KafkaIOTest.java | 118 ++++++++++++++++-- 3 files changed, 136 insertions(+), 20 deletions(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index 1fd3e3e044e..0f28edf19dd 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -890,7 +890,6 @@ static void setupExternalBuilder( builder.setRedistributeNumKeys(0); builder.setAllowDuplicates(false); } - System.out.println("xxx builder service" + builder.toString()); } private static Coder resolveCoder(Class> deserializer) { @@ -1697,11 +1696,12 @@ public PCollection> expand(PBegin input) { } if (kafkaRead.isRedistributed()) { - // fail here instead. - checkArgument( - kafkaRead.isCommitOffsetsInFinalizeEnabled(), - "commitOffsetsInFinalize() can't be enabled with isRedistributed"); + if (kafkaRead.isCommitOffsetsInFinalizeEnabled() && kafkaRead.isAllowDuplicates()) { + LOG.warn( + "Offsets committed due to usage of commitOffsetsInFinalize() and may not capture all work processed due to use of withRedistribute() with duplicates enabled"); + } PCollection> output = input.getPipeline().apply(transform); + if (kafkaRead.getRedistributeNumKeys() == 0) { return output.apply( "Insert Redistribute", @@ -1797,7 +1797,7 @@ public PCollection> expand(PBegin input) { return pcol.apply( "Insert Redistribute with Shards", Redistribute.>arbitrarily() - .withAllowDuplicates(true) + .withAllowDuplicates(kafkaRead.isAllowDuplicates()) .withNumBuckets((int) kafkaRead.getRedistributeNumKeys())); } } @@ -2654,6 +2654,12 @@ public PCollection> expand(PCollection if (getRedistributeNumKeys() == 0) { LOG.warn("This will create a key per record, which is sub-optimal for most use cases."); } + if ((isCommitOffsetEnabled() || configuredKafkaCommit()) && isAllowDuplicates()) { + LOG.warn( + "Either auto_commit is set, or commitOffsetEnabled is enabled (or both), but since " + + "withRestribute() is enabled with allow duplicates, the runner may have additional work processed that " + + "is ahead of the current checkpoint"); + } } if (getConsumerConfig().get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG) == null) { @@ -2687,8 +2693,7 @@ public PCollection> expand(PCollection .getSchemaCoder(KafkaSourceDescriptor.class), recordCoder)); - boolean applyCommitOffsets = - isCommitOffsetEnabled() && !configuredKafkaCommit() && !isRedistribute(); + boolean applyCommitOffsets = isCommitOffsetEnabled() && !configuredKafkaCommit(); if (!applyCommitOffsets) { return outputWithDescriptor .apply(MapElements.into(new TypeDescriptor>() {}).via(KV::getValue)) @@ -2710,6 +2715,15 @@ public PCollection> expand(PCollection if (Comparators.lexicographical(Comparator.naturalOrder()) .compare(requestedVersion, targetVersion) < 0) { + // Redistribute is not allowed with commits prior to 2.59.0, since there is a Reshuffle + // prior to the redistribute. The reshuffle will occur before commits are offsetted and + // before outputting KafkaRecords. Adding a redistribute then afterwards doesn't provide + // additional performance benefit. + checkArgument( + !isRedistribute(), + "Can not enable isRedistribute() while committing offsets prior to " + + String.join(".", targetVersion)); + return expand259Commits( outputWithDescriptor, recordCoder, input.getPipeline().getSchemaRegistry()); } diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibilityTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibilityTest.java index 74f1e83fd86..29c920bf9a6 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibilityTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibilityTest.java @@ -108,7 +108,13 @@ private PipelineResult testReadTransformCreationWithImplementationBoundPropertie Function, KafkaIO.Read> kafkaReadDecorator) { p.apply( kafkaReadDecorator.apply( - mkKafkaReadTransform(1000, null, new ValueAsTimestampFn(), false, 0))); + mkKafkaReadTransform( + 1000, + null, + new ValueAsTimestampFn(), + false, /*redistribute*/ + false, /*allowDuplicates*/ + 0))); return p.run(); } diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java index 1fe1147a739..25ff6dad124 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java @@ -88,6 +88,7 @@ import org.apache.beam.sdk.metrics.SinkMetrics; import org.apache.beam.sdk.metrics.SourceMetrics; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.testing.ExpectedLogs; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; @@ -381,7 +382,13 @@ public Consumer apply(Map config) { static KafkaIO.Read mkKafkaReadTransform( int numElements, @Nullable SerializableFunction, Instant> timestampFn) { - return mkKafkaReadTransform(numElements, numElements, timestampFn, false, 0); + return mkKafkaReadTransform( + numElements, + numElements, + timestampFn, + false, /*redistribute*/ + false, /*allowDuplicates*/ + 0); } /** @@ -393,6 +400,7 @@ static KafkaIO.Read mkKafkaReadTransform( @Nullable Integer maxNumRecords, @Nullable SerializableFunction, Instant> timestampFn, @Nullable Boolean redistribute, + @Nullable Boolean withAllowDuplicates, @Nullable Integer numKeys) { KafkaIO.Read reader = @@ -408,13 +416,21 @@ static KafkaIO.Read mkKafkaReadTransform( reader = reader.withMaxNumRecords(maxNumRecords); } + if (withAllowDuplicates == null) { + withAllowDuplicates = false; + } + if (timestampFn != null) { reader = reader.withTimestampFn(timestampFn); } if (redistribute) { if (numKeys != null) { - reader = reader.withRedistribute().withRedistributeNumKeys(numKeys); + reader = + reader + .withRedistribute() + .withAllowDuplicates(withAllowDuplicates) + .withRedistributeNumKeys(numKeys); } reader = reader.withRedistribute(); } @@ -628,17 +644,47 @@ public void testRiskyConfigurationWarnsProperly() { } @Test - public void testCommitOffsetsInFinalizeAndRedistributeErrors() { - thrown.expect(Exception.class); - thrown.expectMessage("commitOffsetsInFinalize() can't be enabled with isRedistributed"); + public void warningsWithAllowDuplicatesEnabledAndCommitOffsets() { + int numElements = 1000; + PCollection input = + p.apply( + mkKafkaReadTransform( + numElements, + numElements, + new ValueAsTimestampFn(), + true, /*redistribute*/ + true, /*allowDuplicates*/ + 0) + .commitOffsetsInFinalize() + .withConsumerConfigUpdates( + ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, "group_id")) + .withoutMetadata()) + .apply(Values.create()); + + addCountingAsserts(input, numElements); + p.run(); + + kafkaIOExpectedLogs.verifyWarn( + "Offsets committed due to usage of commitOffsetsInFinalize() and may not capture all work processed due to use of withRedistribute() with duplicates enabled"); + } + + @Test + public void noWarningsWithNoAllowDuplicatesAndCommitOffsets() { int numElements = 1000; PCollection input = p.apply( - mkKafkaReadTransform(numElements, numElements, new ValueAsTimestampFn(), true, 0) + mkKafkaReadTransform( + numElements, + numElements, + new ValueAsTimestampFn(), + true, /*redistribute*/ + false, /*allowDuplicates*/ + 0) + .commitOffsetsInFinalize() .withConsumerConfigUpdates( - ImmutableMap.of(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)) + ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, "group_id")) .withoutMetadata()) .apply(Values.create()); @@ -648,13 +694,25 @@ public void testCommitOffsetsInFinalizeAndRedistributeErrors() { @Test public void testNumKeysIgnoredWithRedistributeNotEnabled() { + thrown.expect(Exception.class); + thrown.expectMessage( + "withRedistributeNumKeys is ignored if withRedistribute() is not enabled on the transform"); + int numElements = 1000; PCollection input = p.apply( - mkKafkaReadTransform(numElements, numElements, new ValueAsTimestampFn(), false, 0) + mkKafkaReadTransform( + numElements, + numElements, + new ValueAsTimestampFn(), + false, /*redistribute*/ + false, /*allowDuplicates*/ + 0) + .withRedistributeNumKeys(100) + .commitOffsetsInFinalize() .withConsumerConfigUpdates( - ImmutableMap.of(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)) + ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, "group_id")) .withoutMetadata()) .apply(Values.create()); @@ -663,6 +721,32 @@ public void testNumKeysIgnoredWithRedistributeNotEnabled() { p.run(); } + @Test + public void testDisableRedistributeKafkaOffsetLegacy() { + thrown.expect(Exception.class); + thrown.expectMessage( + "Can not enable isRedistribute() while committing offsets prior to 2.60.0"); + p.getOptions().as(StreamingOptions.class).setUpdateCompatibilityVersion("2.59.0"); + + p.apply( + Create.of( + KafkaSourceDescriptor.of( + new TopicPartition("topic", 1), + null, + null, + null, + null, + ImmutableList.of("8.8.8.8:9092")))) + .apply( + KafkaIO.readSourceDescriptors() + .withKeyDeserializer(LongDeserializer.class) + .withValueDeserializer(LongDeserializer.class) + .withRedistribute() + .withProcessingTime() + .commitOffsets()); + p.run(); + } + @Test public void testUnreachableKafkaBrokers() { // Expect an exception when the Kafka brokers are not reachable on the workers. @@ -1982,7 +2066,13 @@ public void testUnboundedSourceStartReadTime() { PCollection input = p.apply( - mkKafkaReadTransform(numElements, maxNumRecords, new ValueAsTimestampFn(), false, 0) + mkKafkaReadTransform( + numElements, + maxNumRecords, + new ValueAsTimestampFn(), + false, /*redistribute*/ + false, /*allowDuplicates*/ + 0) .withStartReadTime(new Instant(startTime)) .withoutMetadata()) .apply(Values.create()); @@ -2006,7 +2096,13 @@ public void testUnboundedSourceStartReadTimeException() { int startTime = numElements / 20; p.apply( - mkKafkaReadTransform(numElements, numElements, new ValueAsTimestampFn(), false, 0) + mkKafkaReadTransform( + numElements, + numElements, + new ValueAsTimestampFn(), + false, /*redistribute*/ + false, /*allowDuplicates*/ + 0) .withStartReadTime(new Instant(startTime)) .withoutMetadata()) .apply(Values.create());