From 12faf7dc7bc1ffa1b6af2d6e4606ceb10688dc0a Mon Sep 17 00:00:00 2001 From: Arvid Heise Date: Thu, 26 Sep 2024 14:05:42 +0200 Subject: [PATCH 1/2] [FLINK-36177] Deprecate KafkaShuffle and more This commit deprecates all classes that are slated for removal in the kafka-4.0 release compatible with Flink 2.0. I also deprecated internal classes to make later removal easier. Some public classes will cease to be public API but are still internally used. --- .../deserializer/KafkaDeserializationSchemaWrapper.java | 2 ++ .../deserializer/KafkaRecordDeserializationSchema.java | 2 ++ .../connectors/kafka/FlinkKafkaConsumerBase.java | 1 + .../streaming/connectors/kafka/FlinkKafkaErrorCode.java | 8 +++++++- .../streaming/connectors/kafka/FlinkKafkaException.java | 8 +++++++- .../streaming/connectors/kafka/KafkaContextAware.java | 3 +++ .../connectors/kafka/KafkaDeserializationSchema.java | 2 ++ .../connectors/kafka/KafkaSerializationSchema.java | 2 ++ .../connectors/kafka/config/OffsetCommitMode.java | 1 + .../connectors/kafka/config/OffsetCommitModes.java | 1 + .../connectors/kafka/internals/AbstractFetcher.java | 1 + .../kafka/internals/AbstractPartitionDiscoverer.java | 1 + .../connectors/kafka/internals/ClosableBlockingQueue.java | 1 + .../connectors/kafka/internals/ExceptionProxy.java | 1 + .../kafka/internals/FlinkKafkaInternalProducer.java | 1 + .../streaming/connectors/kafka/internals/Handover.java | 1 + .../connectors/kafka/internals/KafkaCommitCallback.java | 1 + .../connectors/kafka/internals/KafkaConsumerThread.java | 1 + .../internals/KafkaDeserializationSchemaWrapper.java | 1 + .../connectors/kafka/internals/KafkaFetcher.java | 1 + .../kafka/internals/KafkaPartitionDiscoverer.java | 1 + .../kafka/internals/KafkaSerializationSchemaWrapper.java | 1 + .../connectors/kafka/internals/KafkaShuffleFetcher.java | 1 + .../connectors/kafka/internals/KafkaTopicPartition.java | 5 +++++ .../kafka/internals/KafkaTopicPartitionAssigner.java | 1 + .../kafka/internals/KafkaTopicPartitionLeader.java | 1 + .../kafka/internals/KafkaTopicPartitionState.java | 1 + .../KafkaTopicPartitionStateWithWatermarkGenerator.java | 1 + .../connectors/kafka/internals/KafkaTopicsDescriptor.java | 1 + .../kafka/internals/KeyedSerializationSchemaWrapper.java | 1 + .../internals/SourceContextWatermarkOutputAdapter.java | 1 + .../kafka/internals/TransactionalIdsGenerator.java | 1 + .../internals/metrics/KafkaConsumerMetricConstants.java | 1 + .../kafka/internals/metrics/KafkaMetricWrapper.java | 1 + .../kafka/partitioner/FlinkFixedPartitioner.java | 4 ++++ .../kafka/partitioner/FlinkKafkaPartitioner.java | 4 ++++ .../connectors/kafka/shuffle/FlinkKafkaShuffle.java | 5 +++++ .../kafka/shuffle/FlinkKafkaShuffleConsumer.java | 1 + .../kafka/shuffle/FlinkKafkaShuffleProducer.java | 1 + .../connectors/kafka/shuffle/StreamKafkaShuffleSink.java | 1 + .../kafka/table/DynamicKafkaDeserializationSchema.java | 2 ++ .../table/DynamicKafkaRecordSerializationSchema.java | 2 ++ .../serialization/JSONKeyValueDeserializationSchema.java | 1 + .../TypeInformationKeyValueSerializationSchema.java | 1 + 44 files changed, 78 insertions(+), 2 deletions(-) diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaDeserializationSchemaWrapper.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaDeserializationSchemaWrapper.java index 94197e347..1cc7dde79 100644 --- 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaDeserializationSchemaWrapper.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaDeserializationSchemaWrapper.java @@ -33,7 +33,9 @@ * ConsumerRecord ConsumerRecords}. * * @param the type of the deserialized records. + * @deprecated Will be removed with {@link KafkaDeserializationSchema}. */ +@Deprecated class KafkaDeserializationSchemaWrapper implements KafkaRecordDeserializationSchema { private static final long serialVersionUID = 1L; private final KafkaDeserializationSchema kafkaDeserializationSchema; diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.java index 6ad6607c9..91d1f3439 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.java @@ -71,7 +71,9 @@ default void open(DeserializationSchema.InitializationContext context) throws Ex * @param the return type of the deserialized record. * @return A {@link KafkaRecordDeserializationSchema} that uses the given {@link * KafkaDeserializationSchema} to deserialize the {@link ConsumerRecord ConsumerRecords}. + * @deprecated Will be removed with {@link KafkaDeserializationSchema}. */ + @Deprecated static KafkaRecordDeserializationSchema of( KafkaDeserializationSchema kafkaDeserializationSchema) { return new KafkaDeserializationSchemaWrapper<>(kafkaDeserializationSchema); diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java index f291b05bc..7a85b434e 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java @@ -90,6 +90,7 @@ * @param The type of records produced by this data source */ @Internal +@Deprecated public abstract class FlinkKafkaConsumerBase extends RichParallelSourceFunction implements CheckpointListener, ResultTypeQueryable, CheckpointedFunction { diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaErrorCode.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaErrorCode.java index c11f1b176..3c1ae27e3 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaErrorCode.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaErrorCode.java @@ -19,8 +19,14 @@ import org.apache.flink.annotation.PublicEvolving; -/** Error codes used in {@link FlinkKafkaException}. */ +/** + * Error codes used in {@link FlinkKafkaException}. + * + * @deprecated Will be removed with {@link FlinkKafkaProducer} and {@link + * org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffle}.
+ */ @PublicEvolving +@Deprecated public enum FlinkKafkaErrorCode { PRODUCERS_POOL_EMPTY, EXTERNAL_ERROR diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaException.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaException.java index 77d023130..65b654c64 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaException.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaException.java @@ -20,8 +20,14 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.util.FlinkException; -/** Exception used by {@link FlinkKafkaProducer} and {@link FlinkKafkaConsumer}. */ +/** + * Exception used by {@link FlinkKafkaProducer} and {@link FlinkKafkaConsumer}. + * + * @deprecated Will be removed with {@link FlinkKafkaProducer} and {@link + * org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffle}. + */ @PublicEvolving +@Deprecated public class FlinkKafkaException extends FlinkException { private static final long serialVersionUID = 920269130311214200L; diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaContextAware.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaContextAware.java index ad977cd95..d40139595 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaContextAware.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaContextAware.java @@ -26,8 +26,11 @@ * *
<p>
You only need to override the methods for the information that you need. However, {@link * #getTargetTopic(Object)} is required because it is used to determine the available partitions. + * + * @deprecated Will be turned into internal API when {@link FlinkKafkaProducer} is removed. */ @PublicEvolving +@Deprecated public interface KafkaContextAware { /** diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaDeserializationSchema.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaDeserializationSchema.java index b54b9835f..8f15b921b 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaDeserializationSchema.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaDeserializationSchema.java @@ -31,8 +31,10 @@ * (Java/Scala objects) that are processed by Flink. * * @param The type created by the keyed deserialization schema. + * @deprecated Will be turned into internal API when {@link FlinkKafkaConsumer} is removed. */ @PublicEvolving +@Deprecated public interface KafkaDeserializationSchema extends Serializable, ResultTypeQueryable { /** diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializationSchema.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializationSchema.java index 89e2b9209..7ed987fce 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializationSchema.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializationSchema.java @@ -35,8 +35,10 @@ * which the Kafka Producer is running. * * @param the type of values being serialized + * @deprecated Will be turned into internal API when {@link FlinkKafkaProducer} is removed. */ @PublicEvolving +@Deprecated public interface KafkaSerializationSchema extends Serializable { /** diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/config/OffsetCommitMode.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/config/OffsetCommitMode.java index 32b9d4cdc..6ad4f8337 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/config/OffsetCommitMode.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/config/OffsetCommitMode.java @@ -26,6 +26,7 @@ *
<p>
The exact value of this is determined at runtime in the consumer subtasks. */ @Internal +@Deprecated public enum OffsetCommitMode { /** Completely disable offset committing. */ diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/config/OffsetCommitModes.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/config/OffsetCommitModes.java index 1394af72f..32ac2f5f0 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/config/OffsetCommitModes.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/config/OffsetCommitModes.java @@ -21,6 +21,7 @@ /** Utilities for {@link OffsetCommitMode}. */ @Internal +@Deprecated public class OffsetCommitModes { /** diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java index 41b5ad24c..074363021 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java @@ -61,6 +61,7 @@ * @param The type of topic/partition identifier used by Kafka in the specific version. */ @Internal +@Deprecated public abstract class AbstractFetcher { private static final int NO_TIMESTAMPS_WATERMARKS = 0; diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java index 18322bf68..c8dc18360 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java @@ -41,6 +41,7 @@ * allows the discoverer to be interrupted during a {@link #discoverPartitions()} call. */ @Internal +@Deprecated public abstract class AbstractPartitionDiscoverer { /** Describes whether we are discovering partitions for fixed topics or a topic pattern. */ diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java index 22800611d..3b1751d40 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java @@ -49,6 +49,7 @@ * @param The type of elements in the queue. */ @Internal +@Deprecated public class ClosableBlockingQueue { /** The lock used to make queue accesses and open checks atomic. 
*/ diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java index 204a01b4c..a9f9c9cae 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java @@ -65,6 +65,7 @@ * } */ @Internal +@Deprecated public class ExceptionProxy { /** The thread that should be interrupted when an exception occurs. */ diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/FlinkKafkaInternalProducer.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/FlinkKafkaInternalProducer.java index 12dad9fba..6e618cbe0 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/FlinkKafkaInternalProducer.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/FlinkKafkaInternalProducer.java @@ -58,6 +58,7 @@ /** Internal flink kafka producer. */ @PublicEvolving +@Deprecated public class FlinkKafkaInternalProducer implements Producer { private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaInternalProducer.class); diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Handover.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Handover.java index 7fc50e625..64132b0b9 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Handover.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Handover.java @@ -47,6 +47,7 @@ */ @ThreadSafe @Internal +@Deprecated public final class Handover implements Closeable { private final Object lock = new Object(); diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaCommitCallback.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaCommitCallback.java index d7666772b..f1180b8b5 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaCommitCallback.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaCommitCallback.java @@ -25,6 +25,7 @@ * commit request completes, which should normally be triggered from checkpoint complete event. */ @Internal +@Deprecated public interface KafkaCommitCallback { /** diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaConsumerThread.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaConsumerThread.java index f7f40b80b..5b6fb4d43 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaConsumerThread.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaConsumerThread.java @@ -61,6 +61,7 @@ * an indirection to the KafkaConsumer calls that change signature. */ @Internal +@Deprecated public class KafkaConsumerThread extends Thread { /** Logger for this consumer. 
*/ diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaDeserializationSchemaWrapper.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaDeserializationSchemaWrapper.java index d53e4ff4d..b754b4d09 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaDeserializationSchemaWrapper.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaDeserializationSchemaWrapper.java @@ -32,6 +32,7 @@ * @param The type created by the deserialization schema. */ @Internal +@Deprecated public class KafkaDeserializationSchemaWrapper implements KafkaDeserializationSchema { private static final long serialVersionUID = 2651665280744549932L; diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaFetcher.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaFetcher.java index 9c4d8387c..428e6c7ce 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaFetcher.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaFetcher.java @@ -51,6 +51,7 @@ * @param The type of elements produced by the fetcher. */ @Internal +@Deprecated public class KafkaFetcher extends AbstractFetcher { private static final Logger LOG = LoggerFactory.getLogger(KafkaFetcher.class); diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaPartitionDiscoverer.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaPartitionDiscoverer.java index ec788991d..ef7162bde 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaPartitionDiscoverer.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaPartitionDiscoverer.java @@ -34,6 +34,7 @@ * brokers via the Kafka high-level consumer API. */ @Internal +@Deprecated public class KafkaPartitionDiscoverer extends AbstractPartitionDiscoverer { private final Properties kafkaProperties; diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaSerializationSchemaWrapper.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaSerializationSchemaWrapper.java index 73b1d42a6..147fad9b6 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaSerializationSchemaWrapper.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaSerializationSchemaWrapper.java @@ -35,6 +35,7 @@ * KafkaSerializationSchema}. 
*/ @Internal +@Deprecated public class KafkaSerializationSchemaWrapper implements KafkaSerializationSchema, KafkaContextAware { diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaShuffleFetcher.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaShuffleFetcher.java index fe7ee7f7b..c61db83f0 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaShuffleFetcher.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaShuffleFetcher.java @@ -50,6 +50,7 @@ /** Fetch data from Kafka for Kafka Shuffle. */ @Internal +@Deprecated public class KafkaShuffleFetcher extends KafkaFetcher { /** The handler to check and generate watermarks from fetched records. * */ private final WatermarkHandler watermarkHandler; diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java index f262a222c..0e91042f6 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java @@ -32,8 +32,13 @@ * *
<p>
Note: This class must not change in its structure, because it would change the serialization * format and make previous savepoints unreadable. + * + * @deprecated Will be turned into internal class when {@link + * org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer} is removed. Replace with + * {@link org.apache.kafka.common.TopicPartition}. */ @PublicEvolving +@Deprecated public final class KafkaTopicPartition implements Serializable { /** diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionAssigner.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionAssigner.java index be61e8ad7..83c7483ff 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionAssigner.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionAssigner.java @@ -21,6 +21,7 @@ /** Utility for assigning Kafka partitions to consumer subtasks. */ @Internal +@Deprecated public class KafkaTopicPartitionAssigner { /** diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java index a2ef12882..031400d6e 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java @@ -27,6 +27,7 @@ * Serializable Topic Partition info with leader Node information. This class is used at runtime. */ @Internal +@Deprecated public class KafkaTopicPartitionLeader implements Serializable { private static final long serialVersionUID = 9145855900303748582L; diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java index c09df342c..ee669e7e1 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java @@ -29,6 +29,7 @@ * @param The type of the Kafka partition descriptor, which varies across Kafka versions. */ @Internal +@Deprecated public class KafkaTopicPartitionState { // ------------------------------------------------------------------------ diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithWatermarkGenerator.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithWatermarkGenerator.java index 6c843409d..f9c815fcc 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithWatermarkGenerator.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithWatermarkGenerator.java @@ -34,6 +34,7 @@ * @param The type of the Kafka partition descriptor, which varies across Kafka versions. 
*/ @Internal +@Deprecated public final class KafkaTopicPartitionStateWithWatermarkGenerator extends KafkaTopicPartitionState { diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicsDescriptor.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicsDescriptor.java index 8261a2b31..4bb37b1c2 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicsDescriptor.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicsDescriptor.java @@ -33,6 +33,7 @@ * list of topics, or a topic pattern. */ @Internal +@Deprecated public class KafkaTopicsDescriptor implements Serializable { private static final long serialVersionUID = -3807227764764900975L; diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KeyedSerializationSchemaWrapper.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KeyedSerializationSchemaWrapper.java index c95cd9c40..ae4e922d5 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KeyedSerializationSchemaWrapper.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KeyedSerializationSchemaWrapper.java @@ -27,6 +27,7 @@ * @param The type to serialize */ @Internal +@Deprecated public class KeyedSerializationSchemaWrapper implements KeyedSerializationSchema { private static final long serialVersionUID = 1351665280744549933L; diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SourceContextWatermarkOutputAdapter.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SourceContextWatermarkOutputAdapter.java index 8e5674313..68c4db12a 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SourceContextWatermarkOutputAdapter.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SourceContextWatermarkOutputAdapter.java @@ -25,6 +25,7 @@ * A {@link org.apache.flink.api.common.eventtime.WatermarkOutput} that forwards calls to a {@link * org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext}. */ +@Deprecated public class SourceContextWatermarkOutputAdapter implements WatermarkOutput { private final SourceContext sourceContext; diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/TransactionalIdsGenerator.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/TransactionalIdsGenerator.java index e21355e8f..cd6270acc 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/TransactionalIdsGenerator.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/TransactionalIdsGenerator.java @@ -40,6 +40,7 @@ * subtask. 
*/ @Internal +@Deprecated public class TransactionalIdsGenerator { private final String prefix; private final int subtaskIndex; diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaConsumerMetricConstants.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaConsumerMetricConstants.java index 30d469714..731089028 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaConsumerMetricConstants.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaConsumerMetricConstants.java @@ -26,6 +26,7 @@ * metrics. */ @Internal +@Deprecated public class KafkaConsumerMetricConstants { public static final String KAFKA_CONSUMER_METRICS_GROUP = "KafkaConsumer"; diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java index 1ab41ce9c..2893bad33 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java @@ -23,6 +23,7 @@ /** Gauge for getting the current value of a Kafka metric. */ @Internal +@Deprecated public class KafkaMetricWrapper implements Gauge { private final org.apache.kafka.common.Metric kafkaMetric; diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java index 16b57f621..e70baea3b 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java @@ -54,8 +54,12 @@ *
<p>
Not all Kafka partitions contain data To avoid such an unbalanced partitioning, use a * round-robin kafka partitioner (note that this will cause a lot of network connections between all * the Flink instances and all the Kafka brokers). + * + * @deprecated Will be turned into internal class when {@link + * org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer} is removed. */ @PublicEvolving +@Deprecated public class FlinkFixedPartitioner extends FlinkKafkaPartitioner { private static final long serialVersionUID = -3785320239953858777L; diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java index 2fb89e205..7318ed697 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java @@ -24,8 +24,12 @@ /** * A {@link FlinkKafkaPartitioner} wraps logic on how to partition records across partitions of * multiple Kafka topics. + * + * @deprecated Will be turned into internal class when {@link + * org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer} is removed. */ @PublicEvolving +@Deprecated public abstract class FlinkKafkaPartitioner implements Serializable { private static final long serialVersionUID = -9086719227828020494L; diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java index ae9af29f0..bb7c76a67 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java @@ -98,8 +98,13 @@ * | * | ----------> KafkaShuffleConsumerReuse -> ... * + * + * @deprecated This experimental feature never graduated to a stable feature and will be removed in + * future releases. In case of interest to port it to the Source/Sink API, please reach out to + * the Flink community. */ @Experimental +@Deprecated public class FlinkKafkaShuffle { static final String PRODUCER_PARALLELISM = "producer parallelism"; static final String PARTITION_NUMBER = "partition number"; diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffleConsumer.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffleConsumer.java index 886343bea..b96e9c0f5 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffleConsumer.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffleConsumer.java @@ -39,6 +39,7 @@ /** Flink Kafka Shuffle Consumer Function. 
*/ @Internal +@Deprecated public class FlinkKafkaShuffleConsumer extends FlinkKafkaConsumer { private final TypeSerializer typeSerializer; private final int producerParallelism; diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffleProducer.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffleProducer.java index e05e8f9a3..46754f270 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffleProducer.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffleProducer.java @@ -44,6 +44,7 @@ * handling elements and watermarks */ @Internal +@Deprecated public class FlinkKafkaShuffleProducer extends FlinkKafkaProducer { private final KafkaSerializer kafkaSerializer; private final KeySelector keySelector; diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/StreamKafkaShuffleSink.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/StreamKafkaShuffleSink.java index 8bd77840f..e24e15650 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/StreamKafkaShuffleSink.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/StreamKafkaShuffleSink.java @@ -29,6 +29,7 @@ * this way to avoid public interface change. */ @Internal +@Deprecated class StreamKafkaShuffleSink extends StreamSink { public StreamKafkaShuffleSink(FlinkKafkaShuffleProducer flinkKafkaShuffleProducer) { diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java index 91798281d..ef5eca95b 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.connectors.kafka.table; +import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema; @@ -38,6 +39,7 @@ import java.util.List; /** A specific {@link KafkaSerializationSchema} for {@link KafkaDynamicSource}. 
*/ +@Internal class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema { private static final long serialVersionUID = 1L; diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java index 71ca41474..f3a7acb3b 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java @@ -17,6 +17,7 @@ package org.apache.flink.streaming.connectors.kafka.table; +import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; import org.apache.flink.connector.kafka.sink.KafkaSink; @@ -40,6 +41,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; /** SerializationSchema used by {@link KafkaDynamicSink} to configure a {@link KafkaSink}. */ +@Internal class DynamicKafkaRecordSerializationSchema implements KafkaRecordSerializationSchema { private final Set topics; diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java index 970bad1c3..cffdc8ea4 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java @@ -42,6 +42,7 @@ * (String) and "partition" (int). */ @PublicEvolving +@Deprecated public class JSONKeyValueDeserializationSchema implements KafkaDeserializationSchema { private static final long serialVersionUID = 1509391548173891955L; diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java index 1c3eaa620..05e0eaea1 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java @@ -41,6 +41,7 @@ * @param The value type to be serialized. */ @PublicEvolving +@Deprecated public class TypeInformationKeyValueSerializationSchema implements KafkaDeserializationSchema>, KeyedSerializationSchema> { From fee54faaba3d6e50f5f2968519fba173caf6a9b7 Mon Sep 17 00:00:00 2001 From: Arvid Heise Date: Fri, 27 Sep 2024 17:14:01 +0200 Subject: [PATCH 2/2] [FLINK-36177] Introduce KafkaPartitioner to replace FlinkKafkaPartitioner Relocate FlinkKafkaPartitioner to KafkaSink package and turn it into a functional interface. 
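Note (not part of the patch): a minimal usage sketch of the relocated interface. Because KafkaPartitioner has a single abstract partition method, a sink can receive it as a lambda through the new KafkaRecordSerializationSchemaBuilder#setPartitioner(KafkaPartitioner) overload added below. The topic name, bootstrap servers, and hashing logic are placeholders, and the exact builder generics may differ from this sketch.

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaPartitioner;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

public class KafkaPartitionerUsageSketch {

    public static KafkaSink<String> buildSink() {
        // KafkaPartitioner is a functional interface, so a lambda replaces a
        // FlinkKafkaPartitioner subclass. Math.floorMod avoids negative indexes.
        KafkaPartitioner<String> partitioner =
                (record, key, value, targetTopic, partitions) ->
                        partitions[Math.floorMod(record.hashCode(), partitions.length)];

        return KafkaSink.<String>builder()
                .setBootstrapServers("localhost:9092") // placeholder
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.<String>builder()
                                .setTopic("output-topic") // placeholder
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .setPartitioner(partitioner)
                                .build())
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();
    }
}
```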
--- .../kafka/sink/KafkaPartitioner.java | 50 +++++++++++++++++++ ...KafkaRecordSerializationSchemaBuilder.java | 20 ++++++-- .../partitioner/FlinkKafkaPartitioner.java | 33 ++---------- ...DynamicKafkaRecordSerializationSchema.java | 6 +-- .../table/KafkaConnectorOptionsUtil.java | 16 +++--- .../kafka/table/KafkaDynamicSink.java | 6 +-- .../kafka/table/KafkaDynamicTableFactory.java | 4 +- 7 files changed, 87 insertions(+), 48 deletions(-) create mode 100644 flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaPartitioner.java diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaPartitioner.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaPartitioner.java new file mode 100644 index 000000000..19a2d4731 --- /dev/null +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaPartitioner.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kafka.sink; + +import org.apache.flink.annotation.PublicEvolving; + +import java.io.Serializable; + +/** + * A {@code KafkaPartitioner} wraps logic on how to partition records across partitions of multiple + * Kafka topics. + */ +@PublicEvolving +public interface KafkaPartitioner extends Serializable { + /** + * Initializer for the partitioner. This is called once on each parallel sink instance of the + * Flink Kafka producer. This method should be overridden if necessary. + * + * @param parallelInstanceId 0-indexed id of the parallel sink instance in Flink + * @param parallelInstances the total number of parallel instances + */ + default void open(int parallelInstanceId, int parallelInstances) {} + + /** + * Determine the id of the partition that the record should be written to. 
+ * + * @param record the record value + * @param key serialized key of the record + * @param value serialized value of the record + * @param targetTopic target topic for the record + * @param partitions found partitions for the target topic + * @return the id of the target partition + */ + int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions); +} diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java index 92eb625b2..e9fc413b2 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java @@ -82,7 +82,7 @@ public class KafkaRecordSerializationSchemaBuilder { @Nullable private Function topicSelector; @Nullable private SerializationSchema valueSerializationSchema; - @Nullable private FlinkKafkaPartitioner partitioner; + @Nullable private KafkaPartitioner partitioner; @Nullable private SerializationSchema keySerializationSchema; @Nullable private HeaderProvider headerProvider; @@ -91,6 +91,7 @@ public class KafkaRecordSerializationSchemaBuilder { * * @param partitioner * @return {@code this} + * @deprecated use {@link #setPartitioner(KafkaPartitioner)} */ public KafkaRecordSerializationSchemaBuilder setPartitioner( FlinkKafkaPartitioner partitioner) { @@ -99,6 +100,19 @@ public KafkaRecordSerializationSchemaBuilder setPartitioner( return self; } + /** + * Sets a custom partitioner determining the target partition of the target topic. + * + * @param partitioner + * @return {@code this} + */ + public KafkaRecordSerializationSchemaBuilder setPartitioner( + KafkaPartitioner partitioner) { + KafkaRecordSerializationSchemaBuilder self = self(); + self.partitioner = checkNotNull(partitioner); + return self; + } + /** * Sets a fixed topic which used as destination for all records. 
* @@ -295,7 +309,7 @@ private static class KafkaRecordSerializationSchemaWrapper implements KafkaRecordSerializationSchema { private final SerializationSchema valueSerializationSchema; private final Function topicSelector; - private final FlinkKafkaPartitioner partitioner; + private final KafkaPartitioner partitioner; private final SerializationSchema keySerializationSchema; private final HeaderProvider headerProvider; @@ -303,7 +317,7 @@ private static class KafkaRecordSerializationSchemaWrapper Function topicSelector, SerializationSchema valueSerializationSchema, @Nullable SerializationSchema keySerializationSchema, - @Nullable FlinkKafkaPartitioner partitioner, + @Nullable KafkaPartitioner partitioner, @Nullable HeaderProvider headerProvider) { this.topicSelector = checkNotNull(topicSelector); this.valueSerializationSchema = checkNotNull(valueSerializationSchema); diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java index 7318ed697..9568349a3 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java @@ -18,43 +18,18 @@ package org.apache.flink.streaming.connectors.kafka.partitioner; import org.apache.flink.annotation.PublicEvolving; - -import java.io.Serializable; +import org.apache.flink.connector.kafka.sink.KafkaPartitioner; /** * A {@link FlinkKafkaPartitioner} wraps logic on how to partition records across partitions of * multiple Kafka topics. * - * @deprecated Will be turned into internal class when {@link - * org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer} is removed. + * @deprecated Use {@link KafkaPartitioner} instead for {@link + * org.apache.flink.connector.kafka.sink.KafkaSink}. */ @PublicEvolving @Deprecated -public abstract class FlinkKafkaPartitioner implements Serializable { +public abstract class FlinkKafkaPartitioner implements KafkaPartitioner { private static final long serialVersionUID = -9086719227828020494L; - - /** - * Initializer for the partitioner. This is called once on each parallel sink instance of the - * Flink Kafka producer. This method should be overridden if necessary. - * - * @param parallelInstanceId 0-indexed id of the parallel sink instance in Flink - * @param parallelInstances the total number of parallel instances - */ - public void open(int parallelInstanceId, int parallelInstances) { - // overwrite this method if needed. - } - - /** - * Determine the id of the partition that the record should be written to. 
- * - * @param record the record value - * @param key serialized key of the record - * @param value serialized value of the record - * @param targetTopic target topic for the record - * @param partitions found partitions for the target topic - * @return the id of the target partition - */ - public abstract int partition( - T record, byte[] key, byte[] value, String targetTopic, int[] partitions); } diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java index f3a7acb3b..229b08b5f 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java @@ -19,9 +19,9 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.connector.kafka.sink.KafkaPartitioner; import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; import org.apache.flink.connector.kafka.sink.KafkaSink; -import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; import org.apache.flink.types.RowKind; @@ -46,7 +46,7 @@ class DynamicKafkaRecordSerializationSchema implements KafkaRecordSerializationS private final Set topics; private final Pattern topicPattern; - private final FlinkKafkaPartitioner partitioner; + private final KafkaPartitioner partitioner; @Nullable private final SerializationSchema keySerialization; private final SerializationSchema valueSerialization; private final RowData.FieldGetter[] keyFieldGetters; @@ -59,7 +59,7 @@ class DynamicKafkaRecordSerializationSchema implements KafkaRecordSerializationS DynamicKafkaRecordSerializationSchema( @Nullable List topics, @Nullable Pattern topicPattern, - @Nullable FlinkKafkaPartitioner partitioner, + @Nullable KafkaPartitioner partitioner, @Nullable SerializationSchema keySerialization, SerializationSchema valueSerialization, RowData.FieldGetter[] keyFieldGetters, diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java index f752276a3..5960a709a 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java @@ -24,11 +24,11 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.kafka.sink.KafkaPartitioner; import org.apache.flink.streaming.connectors.kafka.config.BoundedMode; import org.apache.flink.streaming.connectors.kafka.config.StartupMode; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner; -import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanBoundedMode; import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanStartupMode; import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ValueFieldsStrategy; @@ -386,7 +386,7 @@ public static Properties getKafkaProperties(Map tableOptions) { * The partitioner can be either "fixed", "round-robin" or a customized partitioner full class * name. */ - public static Optional> getFlinkKafkaPartitioner( + public static Optional> getFlinkKafkaPartitioner( ReadableConfig tableOptions, ClassLoader classLoader) { return tableOptions .getOptional(SINK_PARTITIONER) @@ -465,19 +465,19 @@ private static boolean hasKafkaClientProperties(Map tableOptions } /** Returns a class value with the given class name. */ - private static FlinkKafkaPartitioner initializePartitioner( + private static KafkaPartitioner initializePartitioner( String name, ClassLoader classLoader) { try { Class clazz = Class.forName(name, true, classLoader); - if (!FlinkKafkaPartitioner.class.isAssignableFrom(clazz)) { + if (!KafkaPartitioner.class.isAssignableFrom(clazz)) { throw new ValidationException( String.format( - "Sink partitioner class '%s' should extend from the required class %s", - name, FlinkKafkaPartitioner.class.getName())); + "Sink partitioner class '%s' should implement the required class %s", + name, KafkaPartitioner.class.getName())); } @SuppressWarnings("unchecked") - final FlinkKafkaPartitioner kafkaPartitioner = - InstantiationUtil.instantiate(name, FlinkKafkaPartitioner.class, classLoader); + final KafkaPartitioner kafkaPartitioner = + InstantiationUtil.instantiate(name, KafkaPartitioner.class, classLoader); return kafkaPartitioner; } catch (ClassNotFoundException | FlinkException e) { diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java index 8ab0f10c6..2bb52c94d 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java @@ -24,11 +24,11 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.kafka.sink.KafkaPartitioner; import org.apache.flink.connector.kafka.sink.KafkaSink; import org.apache.flink.connector.kafka.sink.KafkaSinkBuilder; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; -import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.Projection; @@ -125,7 +125,7 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada protected final Properties properties; /** Partitioner to select Kafka partition for each item. */ - protected final @Nullable FlinkKafkaPartitioner partitioner; + protected final @Nullable KafkaPartitioner partitioner; /** * Flag to determine sink mode. 
In upsert mode sink transforms the delete/update-before message @@ -150,7 +150,7 @@ public KafkaDynamicSink( @Nullable List topics, @Nullable Pattern topicPattern, Properties properties, - @Nullable FlinkKafkaPartitioner partitioner, + @Nullable KafkaPartitioner partitioner, DeliveryGuarantee deliveryGuarantee, boolean upsertMode, SinkBufferFlushMode flushMode, diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java index 7c23923b5..8124691a5 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java @@ -26,11 +26,11 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.kafka.sink.KafkaPartitioner; import org.apache.flink.connector.kafka.source.KafkaSourceOptions; import org.apache.flink.streaming.connectors.kafka.config.BoundedMode; import org.apache.flink.streaming.connectors.kafka.config.StartupMode; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; -import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.BoundedOptions; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.ObjectIdentifier; @@ -427,7 +427,7 @@ protected KafkaDynamicSink createKafkaTableSink( @Nullable List topics, @Nullable Pattern topicPattern, Properties properties, - FlinkKafkaPartitioner partitioner, + KafkaPartitioner partitioner, DeliveryGuarantee deliveryGuarantee, Integer parallelism, @Nullable String transactionalIdPrefix) {
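Note (not part of the patch): a migration sketch for existing custom partitioners. A class that used to extend the now-deprecated FlinkKafkaPartitioner can implement KafkaPartitioner directly; overriding open is optional because the interface provides a no-op default. Such a class can still be referenced by its fully qualified name through the SINK_PARTITIONER table option, which initializePartitioner above now validates against KafkaPartitioner. The class name and routing logic below are illustrative only.

```java
import org.apache.flink.connector.kafka.sink.KafkaPartitioner;

/** Illustrative sketch: implements KafkaPartitioner instead of extending FlinkKafkaPartitioner. */
public class KeyPrefixPartitioner implements KafkaPartitioner<String> {

    private static final long serialVersionUID = 1L;

    @Override
    public void open(int parallelInstanceId, int parallelInstances) {
        // Optional override: the interface ships a no-op default implementation.
    }

    @Override
    public int partition(
            String record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // Keep all records sharing the same key prefix on one Kafka partition.
        int idx = record.indexOf(':');
        String prefix = idx >= 0 ? record.substring(0, idx) : record;
        return partitions[Math.floorMod(prefix.hashCode(), partitions.length)];
    }
}
```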