Skip to content

Commit

Permalink
[FLINK-36177] Introduce KafkaPartitioner to replace FlinkKafkaPartiti…
Browse files Browse the repository at this point in the history
…oner

Relocate FlinkKafkaPartitioner to KafkaSink package and turn it into a functional interface.
  • Loading branch information
AHeise committed Sep 27, 2024
1 parent 12faf7d commit fee54fa
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 48 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connector.kafka.sink;

import org.apache.flink.annotation.PublicEvolving;

import java.io.Serializable;

/**
* A {@code KafkaPartitioner} wraps logic on how to partition records across partitions of multiple
* Kafka topics.
*/
@PublicEvolving
public interface KafkaPartitioner<T> extends Serializable {
/**
* Initializer for the partitioner. This is called once on each parallel sink instance of the
* Flink Kafka producer. This method should be overridden if necessary.
*
* @param parallelInstanceId 0-indexed id of the parallel sink instance in Flink
* @param parallelInstances the total number of parallel instances
*/
default void open(int parallelInstanceId, int parallelInstances) {}

/**
* Determine the id of the partition that the record should be written to.
*
* @param record the record value
* @param key serialized key of the record
* @param value serialized value of the record
* @param targetTopic target topic for the record
* @param partitions found partitions for the target topic
* @return the id of the target partition
*/
int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public class KafkaRecordSerializationSchemaBuilder<IN> {

@Nullable private Function<? super IN, String> topicSelector;
@Nullable private SerializationSchema<? super IN> valueSerializationSchema;
@Nullable private FlinkKafkaPartitioner<? super IN> partitioner;
@Nullable private KafkaPartitioner<? super IN> partitioner;
@Nullable private SerializationSchema<? super IN> keySerializationSchema;
@Nullable private HeaderProvider<? super IN> headerProvider;

Expand All @@ -91,6 +91,7 @@ public class KafkaRecordSerializationSchemaBuilder<IN> {
*
* @param partitioner
* @return {@code this}
* @deprecated use {@link #setPartitioner(KafkaPartitioner)}
*/
public <T extends IN> KafkaRecordSerializationSchemaBuilder<T> setPartitioner(
FlinkKafkaPartitioner<? super T> partitioner) {
Expand All @@ -99,6 +100,19 @@ public <T extends IN> KafkaRecordSerializationSchemaBuilder<T> setPartitioner(
return self;
}

/**
* Sets a custom partitioner determining the target partition of the target topic.
*
* @param partitioner
* @return {@code this}
*/
public <T extends IN> KafkaRecordSerializationSchemaBuilder<T> setPartitioner(
KafkaPartitioner<? super T> partitioner) {
KafkaRecordSerializationSchemaBuilder<T> self = self();
self.partitioner = checkNotNull(partitioner);
return self;
}

/**
* Sets a fixed topic which used as destination for all records.
*
Expand Down Expand Up @@ -295,15 +309,15 @@ private static class KafkaRecordSerializationSchemaWrapper<IN>
implements KafkaRecordSerializationSchema<IN> {
private final SerializationSchema<? super IN> valueSerializationSchema;
private final Function<? super IN, String> topicSelector;
private final FlinkKafkaPartitioner<? super IN> partitioner;
private final KafkaPartitioner<? super IN> partitioner;
private final SerializationSchema<? super IN> keySerializationSchema;
private final HeaderProvider<? super IN> headerProvider;

KafkaRecordSerializationSchemaWrapper(
Function<? super IN, String> topicSelector,
SerializationSchema<? super IN> valueSerializationSchema,
@Nullable SerializationSchema<? super IN> keySerializationSchema,
@Nullable FlinkKafkaPartitioner<? super IN> partitioner,
@Nullable KafkaPartitioner<? super IN> partitioner,
@Nullable HeaderProvider<? super IN> headerProvider) {
this.topicSelector = checkNotNull(topicSelector);
this.valueSerializationSchema = checkNotNull(valueSerializationSchema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,43 +18,18 @@
package org.apache.flink.streaming.connectors.kafka.partitioner;

import org.apache.flink.annotation.PublicEvolving;

import java.io.Serializable;
import org.apache.flink.connector.kafka.sink.KafkaPartitioner;

/**
* A {@link FlinkKafkaPartitioner} wraps logic on how to partition records across partitions of
* multiple Kafka topics.
*
* @deprecated Will be turned into internal class when {@link
* org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer} is removed.
* @deprecated Use {@link KafkaPartitioner} instead for {@link
* org.apache.flink.connector.kafka.sink.KafkaSink}.
*/
@PublicEvolving
@Deprecated
public abstract class FlinkKafkaPartitioner<T> implements Serializable {
public abstract class FlinkKafkaPartitioner<T> implements KafkaPartitioner<T> {

private static final long serialVersionUID = -9086719227828020494L;

/**
* Initializer for the partitioner. This is called once on each parallel sink instance of the
* Flink Kafka producer. This method should be overridden if necessary.
*
* @param parallelInstanceId 0-indexed id of the parallel sink instance in Flink
* @param parallelInstances the total number of parallel instances
*/
public void open(int parallelInstanceId, int parallelInstances) {
// overwrite this method if needed.
}

/**
* Determine the id of the partition that the record should be written to.
*
* @param record the record value
* @param key serialized key of the record
* @param value serialized value of the record
* @param targetTopic target topic for the record
* @param partitions found partitions for the target topic
* @return the id of the target partition
*/
public abstract int partition(
T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaPartitioner;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;
Expand All @@ -46,7 +46,7 @@ class DynamicKafkaRecordSerializationSchema implements KafkaRecordSerializationS

private final Set<String> topics;
private final Pattern topicPattern;
private final FlinkKafkaPartitioner<RowData> partitioner;
private final KafkaPartitioner<RowData> partitioner;
@Nullable private final SerializationSchema<RowData> keySerialization;
private final SerializationSchema<RowData> valueSerialization;
private final RowData.FieldGetter[] keyFieldGetters;
Expand All @@ -59,7 +59,7 @@ class DynamicKafkaRecordSerializationSchema implements KafkaRecordSerializationS
DynamicKafkaRecordSerializationSchema(
@Nullable List<String> topics,
@Nullable Pattern topicPattern,
@Nullable FlinkKafkaPartitioner<RowData> partitioner,
@Nullable KafkaPartitioner<RowData> partitioner,
@Nullable SerializationSchema<RowData> keySerialization,
SerializationSchema<RowData> valueSerialization,
RowData.FieldGetter[] keyFieldGetters,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.config.BoundedMode;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanBoundedMode;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanStartupMode;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ValueFieldsStrategy;
Expand Down Expand Up @@ -386,7 +386,7 @@ public static Properties getKafkaProperties(Map<String, String> tableOptions) {
* The partitioner can be either "fixed", "round-robin" or a customized partitioner full class
* name.
*/
public static Optional<FlinkKafkaPartitioner<RowData>> getFlinkKafkaPartitioner(
public static Optional<KafkaPartitioner<RowData>> getFlinkKafkaPartitioner(
ReadableConfig tableOptions, ClassLoader classLoader) {
return tableOptions
.getOptional(SINK_PARTITIONER)
Expand Down Expand Up @@ -465,19 +465,19 @@ private static boolean hasKafkaClientProperties(Map<String, String> tableOptions
}

/** Returns a class value with the given class name. */
private static <T> FlinkKafkaPartitioner<T> initializePartitioner(
private static <T> KafkaPartitioner<T> initializePartitioner(
String name, ClassLoader classLoader) {
try {
Class<?> clazz = Class.forName(name, true, classLoader);
if (!FlinkKafkaPartitioner.class.isAssignableFrom(clazz)) {
if (!KafkaPartitioner.class.isAssignableFrom(clazz)) {
throw new ValidationException(
String.format(
"Sink partitioner class '%s' should extend from the required class %s",
name, FlinkKafkaPartitioner.class.getName()));
"Sink partitioner class '%s' should implement the required class %s",
name, KafkaPartitioner.class.getName()));
}
@SuppressWarnings("unchecked")
final FlinkKafkaPartitioner<T> kafkaPartitioner =
InstantiationUtil.instantiate(name, FlinkKafkaPartitioner.class, classLoader);
final KafkaPartitioner<T> kafkaPartitioner =
InstantiationUtil.instantiate(name, KafkaPartitioner.class, classLoader);

return kafkaPartitioner;
} catch (ClassNotFoundException | FlinkException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaPartitioner;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.sink.KafkaSinkBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.Projection;
Expand Down Expand Up @@ -125,7 +125,7 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada
protected final Properties properties;

/** Partitioner to select Kafka partition for each item. */
protected final @Nullable FlinkKafkaPartitioner<RowData> partitioner;
protected final @Nullable KafkaPartitioner<RowData> partitioner;

/**
* Flag to determine sink mode. In upsert mode sink transforms the delete/update-before message
Expand All @@ -150,7 +150,7 @@ public KafkaDynamicSink(
@Nullable List<String> topics,
@Nullable Pattern topicPattern,
Properties properties,
@Nullable FlinkKafkaPartitioner<RowData> partitioner,
@Nullable KafkaPartitioner<RowData> partitioner,
DeliveryGuarantee deliveryGuarantee,
boolean upsertMode,
SinkBufferFlushMode flushMode,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaPartitioner;
import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
import org.apache.flink.streaming.connectors.kafka.config.BoundedMode;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.BoundedOptions;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.ObjectIdentifier;
Expand Down Expand Up @@ -427,7 +427,7 @@ protected KafkaDynamicSink createKafkaTableSink(
@Nullable List<String> topics,
@Nullable Pattern topicPattern,
Properties properties,
FlinkKafkaPartitioner<RowData> partitioner,
KafkaPartitioner<RowData> partitioner,
DeliveryGuarantee deliveryGuarantee,
Integer parallelism,
@Nullable String transactionalIdPrefix) {
Expand Down

0 comments on commit fee54fa

Please sign in to comment.