From fee54faaba3d6e50f5f2968519fba173caf6a9b7 Mon Sep 17 00:00:00 2001
From: Arvid Heise
Date: Fri, 27 Sep 2024 17:14:01 +0200
Subject: [PATCH] [FLINK-36177] Introduce KafkaPartitioner to replace
 FlinkKafkaPartitioner

Relocate FlinkKafkaPartitioner to KafkaSink package and turn it into a
functional interface.
---
 .../kafka/sink/KafkaPartitioner.java          | 50 +++++++++++++++++++
 ...KafkaRecordSerializationSchemaBuilder.java | 20 ++++++--
 .../partitioner/FlinkKafkaPartitioner.java    | 33 ++----------
 ...DynamicKafkaRecordSerializationSchema.java |  6 +--
 .../table/KafkaConnectorOptionsUtil.java      | 16 +++---
 .../kafka/table/KafkaDynamicSink.java         |  6 +--
 .../kafka/table/KafkaDynamicTableFactory.java |  4 +-
 7 files changed, 87 insertions(+), 48 deletions(-)
 create mode 100644 flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaPartitioner.java

diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaPartitioner.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaPartitioner.java
new file mode 100644
index 000000000..19a2d4731
--- /dev/null
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaPartitioner.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+
+/**
+ * A {@code KafkaPartitioner} wraps logic on how to partition records across partitions of multiple
+ * Kafka topics.
+ */
+@PublicEvolving
+public interface KafkaPartitioner<T> extends Serializable {
+    /**
+     * Initializer for the partitioner. This is called once on each parallel sink instance of the
+     * Flink Kafka producer. This method should be overridden if necessary.
+     *
+     * @param parallelInstanceId 0-indexed id of the parallel sink instance in Flink
+     * @param parallelInstances the total number of parallel instances
+     */
+    default void open(int parallelInstanceId, int parallelInstances) {}
+
+    /**
+     * Determine the id of the partition that the record should be written to.
+     *
+     * @param record the record value
+     * @param key serialized key of the record
+     * @param value serialized value of the record
+     * @param targetTopic target topic for the record
+     * @param partitions found partitions for the target topic
+     * @return the id of the target partition
+     */
+    int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
+}
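The relocated interface has a single abstract method, so it can now be implemented as a
lambda or as a compact class. A minimal sketch of a custom implementation (the String
record type and the key-hash strategy are illustrative assumptions, not part of this patch):

    import java.util.Arrays;

    import org.apache.flink.connector.kafka.sink.KafkaPartitioner;

    /** Illustrative partitioner that routes records by a hash of the serialized key. */
    public class KeyHashPartitioner implements KafkaPartitioner<String> {

        @Override
        public int partition(
                String record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
            // Keyless records all land on the first discovered partition.
            if (key == null) {
                return partitions[0];
            }
            // Mask the sign bit; Math.abs(Integer.MIN_VALUE) would stay negative.
            return partitions[(Arrays.hashCode(key) & Integer.MAX_VALUE) % partitions.length];
        }
    }

Stateful implementations can still override the default open(int, int) to learn their
subtask index and the sink parallelism.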
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
index 92eb625b2..e9fc413b2 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
@@ -82,7 +82,7 @@ public class KafkaRecordSerializationSchemaBuilder<IN> {
     @Nullable private Function<? super IN, String> topicSelector;
     @Nullable private SerializationSchema<? super IN> valueSerializationSchema;
-    @Nullable private FlinkKafkaPartitioner<? super IN> partitioner;
+    @Nullable private KafkaPartitioner<? super IN> partitioner;
     @Nullable private SerializationSchema<? super IN> keySerializationSchema;
     @Nullable private HeaderProvider<? super IN> headerProvider;
 
@@ -91,6 +91,7 @@ public class KafkaRecordSerializationSchemaBuilder<IN> {
      *
      * @param partitioner
      * @return {@code this}
+     * @deprecated use {@link #setPartitioner(KafkaPartitioner)}
      */
     public <T extends IN> KafkaRecordSerializationSchemaBuilder<T> setPartitioner(
             FlinkKafkaPartitioner<? super T> partitioner) {
@@ -99,6 +100,19 @@ public <T extends IN> KafkaRecordSerializationSchemaBuilder<T> setPartitioner(
         return self;
     }
 
+    /**
+     * Sets a custom partitioner determining the target partition of the target topic.
+     *
+     * @param partitioner the partitioner to select the target partition
+     * @return {@code this}
+     */
+    public <T extends IN> KafkaRecordSerializationSchemaBuilder<T> setPartitioner(
+            KafkaPartitioner<? super T> partitioner) {
+        KafkaRecordSerializationSchemaBuilder<T> self = self();
+        self.partitioner = checkNotNull(partitioner);
+        return self;
+    }
+
     /**
      * Sets a fixed topic which is used as destination for all records.
      *
@@ -295,7 +309,7 @@ private static class KafkaRecordSerializationSchemaWrapper<IN>
             implements KafkaRecordSerializationSchema<IN> {
         private final SerializationSchema<? super IN> valueSerializationSchema;
         private final Function<? super IN, String> topicSelector;
-        private final FlinkKafkaPartitioner<? super IN> partitioner;
+        private final KafkaPartitioner<? super IN> partitioner;
         private final SerializationSchema<? super IN> keySerializationSchema;
         private final HeaderProvider<? super IN> headerProvider;
 
@@ -303,7 +317,7 @@ private static class KafkaRecordSerializationSchemaWrapper<IN>
                 Function<? super IN, String> topicSelector,
                 SerializationSchema<? super IN> valueSerializationSchema,
                 @Nullable SerializationSchema<? super IN> keySerializationSchema,
-                @Nullable FlinkKafkaPartitioner<? super IN> partitioner,
+                @Nullable KafkaPartitioner<? super IN> partitioner,
                 @Nullable HeaderProvider<? super IN> headerProvider) {
             this.topicSelector = checkNotNull(topicSelector);
             this.valueSerializationSchema = checkNotNull(valueSerializationSchema);
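On the caller side the new overload can take a lambda, and because a lambda cannot target
the abstract class FlinkKafkaPartitioner, overload resolution unambiguously picks the
KafkaPartitioner variant. A sketch against the updated builder (the bootstrap address,
topic name, and pick-first-partition logic are invented for illustration):

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.sink.KafkaPartitioner;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;

    // A trivial partitioner as a lambda, possible now that the type is a functional interface.
    KafkaPartitioner<String> firstPartition =
            (record, key, value, topic, partitions) -> partitions[0];

    KafkaSink<String> sink =
            KafkaSink.<String>builder()
                    .setBootstrapServers("localhost:9092")
                    .setRecordSerializer(
                            KafkaRecordSerializationSchema.builder()
                                    .setTopic("example-topic")
                                    .setValueSerializationSchema(new SimpleStringSchema())
                                    // Resolves to the new setPartitioner(KafkaPartitioner) overload.
                                    .setPartitioner(firstPartition)
                                    .build())
                    .build();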
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java
index 7318ed697..9568349a3 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java
@@ -18,43 +18,18 @@
 package org.apache.flink.streaming.connectors.kafka.partitioner;
 
 import org.apache.flink.annotation.PublicEvolving;
-
-import java.io.Serializable;
+import org.apache.flink.connector.kafka.sink.KafkaPartitioner;
 
 /**
  * A {@link FlinkKafkaPartitioner} wraps logic on how to partition records across partitions of
  * multiple Kafka topics.
  *
- * @deprecated Will be turned into internal class when {@link
- *     org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer} is removed.
+ * @deprecated Use {@link KafkaPartitioner} instead for {@link
+ *     org.apache.flink.connector.kafka.sink.KafkaSink}.
  */
 @PublicEvolving
 @Deprecated
-public abstract class FlinkKafkaPartitioner<T> implements Serializable {
+public abstract class FlinkKafkaPartitioner<T> implements KafkaPartitioner<T> {
 
     private static final long serialVersionUID = -9086719227828020494L;
-
-    /**
-     * Initializer for the partitioner. This is called once on each parallel sink instance of the
-     * Flink Kafka producer. This method should be overridden if necessary.
-     *
-     * @param parallelInstanceId 0-indexed id of the parallel sink instance in Flink
-     * @param parallelInstances the total number of parallel instances
-     */
-    public void open(int parallelInstanceId, int parallelInstances) {
-        // overwrite this method if needed.
-    }
-
-    /**
-     * Determine the id of the partition that the record should be written to.
-     *
-     * @param record the record value
-     * @param key serialized key of the record
-     * @param value serialized value of the record
-     * @param targetTopic target topic for the record
-     * @param partitions found partitions for the target topic
-     * @return the id of the target partition
-     */
-    public abstract int partition(
-            T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
 }
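Since FlinkKafkaPartitioner now inherits the KafkaPartitioner contract, existing
subclasses keep working unchanged, and migrating off the deprecated class is a one-line
signature change. A hypothetical user partitioner before and after (the Order type and
its regionId() accessor are invented, assuming a non-negative region id):

    // Before: extends the now-deprecated abstract class.
    public class RegionPartitioner extends FlinkKafkaPartitioner<Order> {
        @Override
        public int partition(
                Order record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
            return partitions[record.regionId() % partitions.length];
        }
    }

    // After: implements the new functional interface; open(int, int) has a no-op default.
    public class RegionPartitioner implements KafkaPartitioner<Order> {
        @Override
        public int partition(
                Order record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
            return partitions[record.regionId() % partitions.length];
        }
    }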
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java
index f3a7acb3b..229b08b5f 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java
@@ -19,9 +19,9 @@
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.connector.kafka.sink.KafkaPartitioner;
 import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
 import org.apache.flink.connector.kafka.sink.KafkaSink;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.types.RowKind;
@@ -46,7 +46,7 @@ class DynamicKafkaRecordSerializationSchema implements KafkaRecordSerializationS
     private final Set<String> topics;
     private final Pattern topicPattern;
-    private final FlinkKafkaPartitioner<RowData> partitioner;
+    private final KafkaPartitioner<RowData> partitioner;
     @Nullable private final SerializationSchema<RowData> keySerialization;
     private final SerializationSchema<RowData> valueSerialization;
     private final RowData.FieldGetter[] keyFieldGetters;
@@ -59,7 +59,7 @@ class DynamicKafkaRecordSerializationSchema implements KafkaRecordSerializationS
     DynamicKafkaRecordSerializationSchema(
             @Nullable List<String> topics,
             @Nullable Pattern topicPattern,
-            @Nullable FlinkKafkaPartitioner<RowData> partitioner,
+            @Nullable KafkaPartitioner<RowData> partitioner,
             @Nullable SerializationSchema<RowData> keySerialization,
             SerializationSchema<RowData> valueSerialization,
             RowData.FieldGetter[] keyFieldGetters,
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java
index f752276a3..5960a709a 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java
@@ -24,11 +24,11 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.kafka.sink.KafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.config.BoundedMode;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanBoundedMode;
 import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanStartupMode;
 import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ValueFieldsStrategy;
@@ -386,7 +386,7 @@ public static Properties getKafkaProperties(Map<String, String> tableOptions) {
      * The partitioner can be either "fixed", "round-robin" or a customized partitioner full class
      * name.
      */
-    public static Optional<FlinkKafkaPartitioner<RowData>> getFlinkKafkaPartitioner(
+    public static Optional<KafkaPartitioner<RowData>> getFlinkKafkaPartitioner(
             ReadableConfig tableOptions, ClassLoader classLoader) {
         return tableOptions
                 .getOptional(SINK_PARTITIONER)
@@ -465,19 +465,19 @@ private static boolean hasKafkaClientProperties(Map<String, String> tableOptions
     }
 
     /** Returns a class value with the given class name. */
-    private static FlinkKafkaPartitioner<RowData> initializePartitioner(
+    private static KafkaPartitioner<RowData> initializePartitioner(
             String name, ClassLoader classLoader) {
         try {
             Class<?> clazz = Class.forName(name, true, classLoader);
-            if (!FlinkKafkaPartitioner.class.isAssignableFrom(clazz)) {
+            if (!KafkaPartitioner.class.isAssignableFrom(clazz)) {
                 throw new ValidationException(
                         String.format(
-                                "Sink partitioner class '%s' should extend from the required class %s",
-                                name, FlinkKafkaPartitioner.class.getName()));
+                                "Sink partitioner class '%s' should implement the required class %s",
+                                name, KafkaPartitioner.class.getName()));
             }
             @SuppressWarnings("unchecked")
-            final FlinkKafkaPartitioner<RowData> kafkaPartitioner =
-                    InstantiationUtil.instantiate(name, FlinkKafkaPartitioner.class, classLoader);
+            final KafkaPartitioner<RowData> kafkaPartitioner =
+                    InstantiationUtil.instantiate(name, KafkaPartitioner.class, classLoader);
 
             return kafkaPartitioner;
         } catch (ClassNotFoundException | FlinkException e) {
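For the table connector, initializePartitioner now validates the class named in the
'sink.partitioner' option against KafkaPartitioner, so a custom table partitioner no
longer needs to extend the deprecated base class. A sketch of such a class (the name
FirstFieldPartitioner is invented; InstantiationUtil requires a public no-argument
constructor):

    import org.apache.flink.connector.kafka.sink.KafkaPartitioner;
    import org.apache.flink.table.data.RowData;

    /** Hypothetical partitioner referenced by its fully qualified name in 'sink.partitioner'. */
    public class FirstFieldPartitioner implements KafkaPartitioner<RowData> {

        @Override
        public int partition(
                RowData record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
            // Route on the first column, assuming an INT NOT NULL first field;
            // mask the sign bit so negative values still index a valid partition.
            return partitions[(record.getInt(0) & Integer.MAX_VALUE) % partitions.length];
        }
    }

The built-in "fixed" and "round-robin" values of the option are unaffected by this change.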
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
index 8ab0f10c6..2bb52c94d 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
@@ -24,11 +24,11 @@
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.kafka.sink.KafkaPartitioner;
 import org.apache.flink.connector.kafka.sink.KafkaSink;
 import org.apache.flink.connector.kafka.sink.KafkaSinkBuilder;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.Projection;
@@ -125,7 +125,7 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada
     protected final Properties properties;
 
     /** Partitioner to select Kafka partition for each item. */
-    protected final @Nullable FlinkKafkaPartitioner<RowData> partitioner;
+    protected final @Nullable KafkaPartitioner<RowData> partitioner;
 
     /**
      * Flag to determine sink mode. In upsert mode sink transforms the delete/update-before message
@@ -150,7 +150,7 @@ public KafkaDynamicSink(
             @Nullable List<String> topics,
             @Nullable Pattern topicPattern,
             Properties properties,
-            @Nullable FlinkKafkaPartitioner<RowData> partitioner,
+            @Nullable KafkaPartitioner<RowData> partitioner,
             DeliveryGuarantee deliveryGuarantee,
             boolean upsertMode,
             SinkBufferFlushMode flushMode,
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
index 7c23923b5..8124691a5 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
@@ -26,11 +26,11 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.kafka.sink.KafkaPartitioner;
 import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
 import org.apache.flink.streaming.connectors.kafka.config.BoundedMode;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.BoundedOptions;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -427,7 +427,7 @@ protected KafkaDynamicSink createKafkaTableSink(
             @Nullable List<String> topics,
             @Nullable Pattern topicPattern,
             Properties properties,
-            FlinkKafkaPartitioner<RowData> partitioner,
+            KafkaPartitioner<RowData> partitioner,
             DeliveryGuarantee deliveryGuarantee,
             Integer parallelism,
             @Nullable String transactionalIdPrefix) {