Commit b47e7d6
Fix compile errors
ruanhang1993 committed Jan 19, 2024
1 parent 79573fe commit b47e7d6
Showing 4 changed files with 38 additions and 7 deletions.
DynamicKafkaSource.java

@@ -29,6 +29,7 @@
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.connector.base.source.reader.RecordEvaluator;
 import org.apache.flink.connector.kafka.dynamic.metadata.KafkaMetadataService;
 import org.apache.flink.connector.kafka.dynamic.source.enumerator.DynamicKafkaSourceEnumState;
 import org.apache.flink.connector.kafka.dynamic.source.enumerator.DynamicKafkaSourceEnumStateSerializer;
@@ -42,6 +43,8 @@
 import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 
+import javax.annotation.Nullable;
+
 import java.util.Properties;
 
 /**
@@ -87,6 +90,7 @@ public class DynamicKafkaSource<T>
     private final OffsetsInitializer stoppingOffsetsInitializer;
     private final Properties properties;
     private final Boundedness boundedness;
+    @Nullable private final RecordEvaluator<T> eofRecordEvaluator;
 
     DynamicKafkaSource(
             KafkaStreamSubscriber kafkaStreamSubscriber,
@@ -95,14 +99,16 @@ public class DynamicKafkaSource<T>
             OffsetsInitializer startingOffsetsInitializer,
             OffsetsInitializer stoppingOffsetsInitializer,
             Properties properties,
-            Boundedness boundedness) {
+            Boundedness boundedness,
+            @Nullable RecordEvaluator<T> eofRecordEvaluator) {
         this.kafkaStreamSubscriber = kafkaStreamSubscriber;
         this.deserializationSchema = deserializationSchema;
         this.properties = properties;
         this.kafkaMetadataService = kafkaMetadataService;
         this.startingOffsetsInitializer = startingOffsetsInitializer;
         this.stoppingOffsetsInitializer = stoppingOffsetsInitializer;
         this.boundedness = boundedness;
+        this.eofRecordEvaluator = eofRecordEvaluator;
     }
 
     /**
@@ -134,7 +140,8 @@ public Boundedness getBoundedness() {
     @Override
     public SourceReader<T, DynamicKafkaSourceSplit> createReader(
             SourceReaderContext readerContext) {
-        return new DynamicKafkaSourceReader<>(readerContext, deserializationSchema, properties);
+        return new DynamicKafkaSourceReader<>(
+                readerContext, deserializationSchema, properties, eofRecordEvaluator);
     }
 
     /**
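
For context: org.apache.flink.connector.base.source.reader.RecordEvaluator is a functional interface with a single isEndOfStream(T record) callback; when it returns true for a deserialized record, the reader finishes the split that produced that record. A minimal illustrative evaluator (the "STOP" sentinel is an assumption for this sketch, not something defined by this commit):

    import org.apache.flink.connector.base.source.reader.RecordEvaluator;

    /** Illustrative evaluator: treats the literal string "STOP" as the end of a split. */
    public class StopSentinelEvaluator implements RecordEvaluator<String> {
        @Override
        public boolean isEndOfStream(String record) {
            // Returning true tells the reader to stop consuming the split
            // that produced this record.
            return "STOP".equals(record);
        }
    }
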
DynamicKafkaSourceBuilder.java

@@ -20,6 +20,7 @@
 
 import org.apache.flink.annotation.Experimental;
 import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.connector.base.source.reader.RecordEvaluator;
 import org.apache.flink.connector.kafka.dynamic.metadata.KafkaMetadataService;
 import org.apache.flink.connector.kafka.dynamic.source.enumerator.subscriber.KafkaStreamSetSubscriber;
 import org.apache.flink.connector.kafka.dynamic.source.enumerator.subscriber.KafkaStreamSubscriber;
@@ -52,6 +53,7 @@ public class DynamicKafkaSourceBuilder<T> {
     private OffsetsInitializer stoppingOffsetsInitializer;
     private Boundedness boundedness;
     private final Properties props;
+    private RecordEvaluator<T> eofRecordEvaluator;
 
     DynamicKafkaSourceBuilder() {
         this.kafkaStreamSubscriber = null;
@@ -140,6 +142,18 @@ public DynamicKafkaSourceBuilder<T> setDeserializer(
         return this;
     }
 
+    /**
+     * Set the {@link RecordEvaluator}.
+     *
+     * @param eofRecordEvaluator the {@link RecordEvaluator}.
+     * @return the builder.
+     */
+    public DynamicKafkaSourceBuilder<T> setEofRecordEvaluator(
+            RecordEvaluator<T> eofRecordEvaluator) {
+        this.eofRecordEvaluator = eofRecordEvaluator;
+        return this;
+    }
+
     /**
      * Set the starting offsets of the stream. This will be applied to all clusters.
      *
@@ -217,7 +231,8 @@ public DynamicKafkaSource<T> build() {
                 startingOffsetsInitializer,
                 stoppingOffsetsInitializer,
                 props,
-                boundedness);
+                boundedness,
+                eofRecordEvaluator);
     }
 
     // Below are utility methods, code and structure are mostly copied over from KafkaSourceBuilder
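
With the new builder hook, an evaluator can be supplied when the source is constructed. A hedged usage sketch: the stream id, metadata service, and deserializer wiring below are illustrative assumptions, and setEofRecordEvaluator is the only call added by this commit:

    import org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSource;
    import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;

    import org.apache.kafka.common.serialization.StringDeserializer;

    import java.util.Collections;

    // metadataService stands in for an existing KafkaMetadataService instance.
    DynamicKafkaSource<String> source =
            DynamicKafkaSource.<String>builder()
                    .setStreamIds(Collections.singleton("my-stream"))
                    .setKafkaMetadataService(metadataService)
                    .setDeserializer(
                            KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
                    // New in this commit: finish splits once the evaluator returns true.
                    .setEofRecordEvaluator(record -> "STOP".equals(record))
                    .build();
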
DynamicKafkaSourceReader.java

@@ -26,6 +26,7 @@
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordEvaluator;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
 import org.apache.flink.connector.kafka.dynamic.metadata.ClusterMetadata;
@@ -54,6 +55,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -95,11 +98,13 @@ public class DynamicKafkaSourceReader<T> implements SourceReader<T, DynamicKafkaSourceSplit>
     private boolean isActivelyConsumingSplits;
     private boolean isNoMoreSplits;
     private AtomicBoolean restartingReaders;
+    @Nullable private final RecordEvaluator<T> eofRecordEvaluator;
 
     public DynamicKafkaSourceReader(
             SourceReaderContext readerContext,
             KafkaRecordDeserializationSchema<T> deserializationSchema,
-            Properties properties) {
+            Properties properties,
+            @Nullable RecordEvaluator<T> eofRecordEvaluator) {
         this.readerContext = readerContext;
         this.clusterReaderMap = new TreeMap<>();
         this.deserializationSchema = deserializationSchema;
@@ -116,6 +121,7 @@ public DynamicKafkaSourceReader(
         this.isActivelyConsumingSplits = false;
         this.restartingReaders = new AtomicBoolean();
         this.clustersProperties = new HashMap<>();
+        this.eofRecordEvaluator = eofRecordEvaluator;
     }
 
     /**
@@ -448,7 +454,8 @@ public UserCodeClassLoader getUserCodeClassLoader() {
                     }
                 });
 
-        KafkaRecordEmitter<T> recordEmitter = new KafkaRecordEmitter<>(deserializationSchema);
+        KafkaRecordEmitter<T> recordEmitter =
+                new KafkaRecordEmitter<>(deserializationSchema, eofRecordEvaluator);
         return new KafkaSourceReader<>(
                 elementsQueue,
                 new KafkaSourceFetcherManager(
@@ -463,7 +470,8 @@ public UserCodeClassLoader getUserCodeClassLoader() {
                 recordEmitter,
                 toConfiguration(readerSpecificProperties),
                 readerContext,
-                kafkaSourceReaderMetrics);
+                kafkaSourceReaderMetrics,
+                eofRecordEvaluator);
     }
 
     /**
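
Inside the reader, the same evaluator instance is forwarded to each per-cluster KafkaRecordEmitter and KafkaSourceReader, so every sub-reader applies one consistent EOF rule. A simplified sketch of the emit-then-evaluate pattern (EofAwareEmitter and its members are hypothetical, not the connector's actual classes):

    import org.apache.flink.connector.base.source.reader.RecordEvaluator;

    import javax.annotation.Nullable;

    /** Hypothetical sketch of the emit-then-evaluate pattern, not the real emitter. */
    class EofAwareEmitter {
        @Nullable private final RecordEvaluator<String> eofRecordEvaluator;
        private boolean splitFinished;

        EofAwareEmitter(@Nullable RecordEvaluator<String> eofRecordEvaluator) {
            this.eofRecordEvaluator = eofRecordEvaluator;
        }

        void emit(String record) {
            // Emission to the downstream output is omitted in this sketch.
            // A null evaluator keeps the pre-commit behavior: splits never
            // finish based on record content.
            if (eofRecordEvaluator != null && eofRecordEvaluator.isEndOfStream(record)) {
                splitFinished = true; // the real reader stops fetching the affected split
            }
        }
    }
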
DynamicKafkaSourceReader test

@@ -271,7 +271,8 @@ private DynamicKafkaSourceReader<Integer> createReaderWithoutStart(
         return new DynamicKafkaSourceReader<>(
                 context,
                 KafkaRecordDeserializationSchema.valueOnly(IntegerDeserializer.class),
-                properties);
+                properties,
+                null);
     }
 
     private SourceReader<Integer, DynamicKafkaSourceSplit> startReader(
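
The test helper passes null for the new parameter, exercising the @Nullable contract: with no evaluator configured, the reader behaves as it did before this change.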
