diff --git a/.github/workflows/push_pr.yml b/.github/workflows/push_pr.yml index 951579406..fd1028711 100644 --- a/.github/workflows/push_pr.yml +++ b/.github/workflows/push_pr.yml @@ -28,10 +28,10 @@ jobs: compile_and_test: strategy: matrix: - flink: [ 1.18.1 ] - jdk: [ '8, 11, 17' ] + flink: [ 1.19.1 ] + jdk: [ '8, 11, 17, 21' ] include: - - flink: 1.19.0 + - flink: 1.20.0 jdk: '8, 11, 17, 21' uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils with: @@ -40,7 +40,7 @@ jobs: python_test: strategy: matrix: - flink: [ 1.18.1, 1.19.0 ] + flink: [ 1.19.1, 1.20.0 ] uses: apache/flink-connector-shared-utils/.github/workflows/python_ci.yml@ci_utils with: flink_version: ${{ matrix.flink }} \ No newline at end of file diff --git a/.github/workflows/weekly.yml b/.github/workflows/weekly.yml index 1caecd5c9..da4426dfd 100644 --- a/.github/workflows/weekly.yml +++ b/.github/workflows/weekly.yml @@ -30,34 +30,21 @@ jobs: strategy: matrix: flink_branches: [{ - flink: 1.18-SNAPSHOT, - branch: main - }, { flink: 1.19-SNAPSHOT, - jdk: '8, 11, 17, 21', branch: main }, { flink: 1.20-SNAPSHOT, - jdk: '8, 11, 17, 21', branch: main }, { - flink: 1.18.1, + flink: 1.19.1, branch: v3.2 }, { - flink: 1.19.0, - branch: v3.2, - jdk: '8, 11, 17, 21', - }, { - flink: 1.18.1, - branch: v3.1 - }, { - flink: 1.19.0, - branch: v3.1, - jdk: '8, 11, 17, 21', + flink: 1.20.0, + branch: v3.2 }] uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils with: flink_version: ${{ matrix.flink_branches.flink }} connector_branch: ${{ matrix.flink_branches.branch }} - jdk_version: ${{ matrix.flink_branches.jdk || '8, 11, 17' }} + jdk_version: ${{ matrix.flink_branches.jdk || '8, 11, 17, 21' }} run_dependency_convergence: false \ No newline at end of file diff --git a/flink-connector-kafka/pom.xml b/flink-connector-kafka/pom.xml index 4a10bdc9a..85d437413 100644 --- a/flink-connector-kafka/pom.xml +++ b/flink-connector-kafka/pom.xml @@ -39,7 +39,8 @@ under the License. FlinkKafkaProducerBaseTest --> --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED + KafkaProducerExactlyOnceITCase --> --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED + @@ -81,10 +82,10 @@ under the License. ${kafka.version} - - com.google.guava - guava - + + com.google.guava + guava + @@ -211,6 +212,7 @@ under the License. ${flink.version} test + org.apache.flink flink-table-planner_${scala.binary.version} diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java index 296545cad..47bce8bd9 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java @@ -92,7 +92,7 @@ public class FlinkKafkaConsumerBaseMigrationTest { @Parameterized.Parameters(name = "Migration Savepoint: {0}") public static Collection parameters() { - return FlinkVersion.rangeOf(FlinkVersion.v1_8, FlinkVersion.v1_16); + return FlinkVersion.rangeOf(FlinkVersion.v1_8, FlinkVersion.current()); } public FlinkKafkaConsumerBaseMigrationTest(FlinkVersion testMigrateVersion) { diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerMigrationTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerMigrationTest.java index 8a413f423..10732c949 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerMigrationTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerMigrationTest.java @@ -42,7 +42,7 @@ public class FlinkKafkaProducerMigrationTest extends KafkaMigrationTestBase { @Parameterized.Parameters(name = "Migration Savepoint: {0}") public static Collection parameters() { - return FlinkVersion.rangeOf(FlinkVersion.v1_8, FlinkVersion.v1_16); + return FlinkVersion.rangeOf(FlinkVersion.v1_12, FlinkVersion.current()); } public FlinkKafkaProducerMigrationTest(FlinkVersion testMigrateVersion) { diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializerUpgradeTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializerUpgradeTest.java index c9e82be5d..90ce2e5eb 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializerUpgradeTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializerUpgradeTest.java @@ -20,9 +20,9 @@ import org.apache.flink.FlinkVersion; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerMatchers; import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; import org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer; +import org.apache.flink.streaming.connectors.kafka.testutils.TypeSerializerMatchers; import org.apache.flink.streaming.connectors.kafka.testutils.TypeSerializerUpgradeTestBase; import org.hamcrest.Matcher; diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java index e8bc9e373..632b74ac9 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java @@ -29,7 +29,6 @@ import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.config.ExecutionConfigOptions; -import org.apache.flink.table.api.config.OptimizerConfigOptions; import org.apache.kafka.clients.producer.ProducerConfig; import org.junit.Before; @@ -65,7 +64,6 @@ public void testKafkaDebeziumChangelogSource() throws Exception { tableConf.set( ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1)); tableConf.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5000L); - tableConf.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE"); // ---------- Write the Debezium json into Kafka ------------------- List lines = readLines("debezium-data-schema-exclude.txt"); @@ -194,7 +192,6 @@ public void testKafkaCanalChangelogSource() throws Exception { tableConf.set( ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1)); tableConf.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5000L); - tableConf.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE"); // ---------- Write the Canal json into Kafka ------------------- List lines = readLines("canal-data.txt"); @@ -335,7 +332,6 @@ public void testKafkaMaxwellChangelogSource() throws Exception { tableConf.set( ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1)); tableConf.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5000L); - tableConf.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE"); // ---------- Write the Maxwell json into Kafka ------------------- List lines = readLines("maxwell-data.txt"); diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThreadContextClassLoader.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThreadContextClassLoader.java new file mode 100644 index 000000000..cdaec31c1 --- /dev/null +++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThreadContextClassLoader.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.testutils; + +import java.io.Closeable; + +/** + * Utility class to temporarily use a different classloader as the thread context classloader. + * + *

Temporarily copied from flink-core to avoid dependency on flink-core. + */ +public class ThreadContextClassLoader implements Closeable { + + private final ClassLoader originalThreadContextClassLoader; + + public ThreadContextClassLoader(ClassLoader newThreadContextClassLoader) { + this.originalThreadContextClassLoader = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(newThreadContextClassLoader); + } + + @Override + public void close() { + Thread.currentThread().setContextClassLoader(originalThreadContextClassLoader); + } +} diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/TypeSerializerMatchers.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/TypeSerializerMatchers.java new file mode 100644 index 000000000..08eef897b --- /dev/null +++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/TypeSerializerMatchers.java @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.testutils; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; + +import org.hamcrest.CoreMatchers; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeDiagnosingMatcher; +import org.hamcrest.TypeSafeMatcher; + +import java.util.function.Predicate; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A Collection of useful {@link Matcher}s for {@link TypeSerializer} and {@link + * TypeSerializerSchemaCompatibility}. + * + *

Note copied from Flink 1.18. Remove when we drop 1.18 support. + */ +public final class TypeSerializerMatchers { + + private TypeSerializerMatchers() {} + + // ------------------------------------------------------------------------------------------------------------- + // Matcher Factories + // ------------------------------------------------------------------------------------------------------------- + + /** + * Matches {@code compatibleAsIs} {@link TypeSerializerSchemaCompatibility}. + * + * @param element type + * @return a {@code Matcher} that matches {@code compatibleAsIs} {@link + * TypeSerializerSchemaCompatibility}. + */ + public static Matcher> isCompatibleAsIs() { + return propertyMatcher( + TypeSerializerSchemaCompatibility::isCompatibleAsIs, + "type serializer schema that is a compatible as is"); + } + + /** + * Matches {@code isIncompatible} {@link TypeSerializerSchemaCompatibility}. + * + * @param element type + * @return a {@code Matcher} that matches {@code isIncompatible} {@link + * TypeSerializerSchemaCompatibility}. + */ + public static Matcher> isIncompatible() { + return propertyMatcher( + TypeSerializerSchemaCompatibility::isIncompatible, + "type serializer schema that is incompatible"); + } + + /** + * Matches {@code isCompatibleAfterMigration} {@link TypeSerializerSchemaCompatibility}. + * + * @param element type + * @return a {@code Matcher} that matches {@code isCompatibleAfterMigration} {@link + * TypeSerializerSchemaCompatibility}. + */ + public static Matcher> isCompatibleAfterMigration() { + return propertyMatcher( + TypeSerializerSchemaCompatibility::isCompatibleAfterMigration, + "type serializer schema that is compatible after migration"); + } + + /** + * Matches {@code isCompatibleWithReconfiguredSerializer} {@link + * TypeSerializerSchemaCompatibility}. + * + * @param element type + * @return a {@code Matcher} that matches {@code isCompatibleWithReconfiguredSerializer} {@link + * TypeSerializerSchemaCompatibility}. + */ + public static + Matcher> isCompatibleWithReconfiguredSerializer() { + @SuppressWarnings("unchecked") + Matcher> anything = + (Matcher>) (Matcher) CoreMatchers.anything(); + + return new CompatibleAfterReconfiguration<>(anything); + } + + /** + * Matches {@code isCompatibleWithReconfiguredSerializer} {@link + * TypeSerializerSchemaCompatibility}. + * + * @param reconfiguredSerializerMatcher matches the reconfigured serializer. + * @param element type + * @return a {@code Matcher} that matches {@code isCompatibleWithReconfiguredSerializer} {@link + * TypeSerializerSchemaCompatibility}. + */ + public static + Matcher> isCompatibleWithReconfiguredSerializer( + Matcher> reconfiguredSerializerMatcher) { + + return new CompatibleAfterReconfiguration<>(reconfiguredSerializerMatcher); + } + + /** + * Matches if the expected {@code TypeSerializerSchemaCompatibility} has the same compatibility + * as {@code expectedCompatibility}. + * + * @param expectedCompatibility the compatibility to match to. + * @param element type. + * @return a {@code Matcher} that matches if it has the same compatibility as {@code + * expectedCompatibility}. + */ + public static Matcher> hasSameCompatibilityAs( + TypeSerializerSchemaCompatibility expectedCompatibility) { + + return new SchemaCompatibilitySameAs<>(expectedCompatibility); + } + + // ------------------------------------------------------------------------------------------------------------- + // Helpers + // ------------------------------------------------------------------------------------------------------------- + + private static Matcher propertyMatcher( + Predicate predicate, String matcherDescription) { + return new TypeSafeMatcher() { + + @Override + protected boolean matchesSafely(T item) { + return predicate.test(item); + } + + @Override + public void describeTo(Description description) { + description.appendText(matcherDescription); + } + }; + } + + // ------------------------------------------------------------------------------------------------------------- + // Matchers + // ------------------------------------------------------------------------------------------------------------- + + private static final class CompatibleAfterReconfiguration + extends TypeSafeDiagnosingMatcher> { + + private final Matcher> reconfiguredSerializerMatcher; + + private CompatibleAfterReconfiguration( + Matcher> reconfiguredSerializerMatcher) { + this.reconfiguredSerializerMatcher = checkNotNull(reconfiguredSerializerMatcher); + } + + @Override + protected boolean matchesSafely( + TypeSerializerSchemaCompatibility item, Description mismatchDescription) { + if (!item.isCompatibleWithReconfiguredSerializer()) { + mismatchDescription.appendText( + "serializer schema is not compatible with a reconfigured serializer"); + return false; + } + TypeSerializer reconfiguredSerializer = item.getReconfiguredSerializer(); + if (!reconfiguredSerializerMatcher.matches(reconfiguredSerializer)) { + reconfiguredSerializerMatcher.describeMismatch( + reconfiguredSerializer, mismatchDescription); + return false; + } + return true; + } + + @Override + public void describeTo(Description description) { + description + .appendText("type serializer schema that is compatible after reconfiguration,") + .appendText("with a reconfigured serializer matching ") + .appendDescriptionOf(reconfiguredSerializerMatcher); + } + } + + private static class SchemaCompatibilitySameAs + extends TypeSafeMatcher> { + + private final TypeSerializerSchemaCompatibility expectedCompatibility; + + private SchemaCompatibilitySameAs( + TypeSerializerSchemaCompatibility expectedCompatibility) { + this.expectedCompatibility = checkNotNull(expectedCompatibility); + } + + @Override + protected boolean matchesSafely( + TypeSerializerSchemaCompatibility testResultCompatibility) { + if (expectedCompatibility.isCompatibleAsIs()) { + return testResultCompatibility.isCompatibleAsIs(); + } else if (expectedCompatibility.isIncompatible()) { + return testResultCompatibility.isIncompatible(); + } else if (expectedCompatibility.isCompatibleAfterMigration()) { + return testResultCompatibility.isCompatibleAfterMigration(); + } else if (expectedCompatibility.isCompatibleWithReconfiguredSerializer()) { + return testResultCompatibility.isCompatibleWithReconfiguredSerializer(); + } + return false; + } + + @Override + public void describeTo(Description description) { + description.appendText("same compatibility as ").appendValue(expectedCompatibility); + } + } +} diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/TypeSerializerUpgradeTestBase.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/TypeSerializerUpgradeTestBase.java index 90e218382..ab3baadef 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/TypeSerializerUpgradeTestBase.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/TypeSerializerUpgradeTestBase.java @@ -20,9 +20,7 @@ import org.apache.flink.FlinkVersion; import org.apache.flink.api.common.typeutils.ClassRelocator; -import org.apache.flink.api.common.typeutils.ThreadContextClassLoader; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerMatchers; import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil; @@ -61,7 +59,7 @@ @TestInstance(TestInstance.Lifecycle.PER_CLASS) public abstract class TypeSerializerUpgradeTestBase { - public static final FlinkVersion CURRENT_VERSION = FlinkVersion.v1_17; + public static final FlinkVersion CURRENT_VERSION = FlinkVersion.current(); public static final Set MIGRATION_VERSIONS = FlinkVersion.rangeOf(FlinkVersion.v1_11, CURRENT_VERSION); @@ -136,9 +134,6 @@ public TypeSerializer createPriorSerializer() { try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(setupClassloader)) { return delegateSetup.createPriorSerializer(); - } catch (IOException e) { - throw new RuntimeException( - "Error creating prior serializer via ClassLoaderSafePreUpgradeSetup.", e); } } @@ -147,9 +142,6 @@ public PreviousElementT createTestData() { try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(setupClassloader)) { return delegateSetup.createTestData(); - } catch (IOException e) { - throw new RuntimeException( - "Error creating test data via ThreadContextClassLoader.", e); } } } @@ -179,10 +171,6 @@ public TypeSerializer createUpgradedSerializer() { try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(verifierClassloader)) { return delegateVerifier.createUpgradedSerializer(); - } catch (IOException e) { - throw new RuntimeException( - "Error creating upgraded serializer via ClassLoaderSafeUpgradeVerifier.", - e); } } @@ -191,9 +179,6 @@ public Matcher testDataMatcher() { try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(verifierClassloader)) { return delegateVerifier.testDataMatcher(); - } catch (IOException e) { - throw new RuntimeException( - "Error creating expected test data via ClassLoaderSafeUpgradeVerifier.", e); } } @@ -203,10 +188,6 @@ public Matcher testDataMatcher() { try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(verifierClassloader)) { return delegateVerifier.schemaCompatibilityMatcher(version); - } catch (IOException e) { - throw new RuntimeException( - "Error creating schema compatibility matcher via ClassLoaderSafeUpgradeVerifier.", - e); } } } diff --git a/flink-connector-kafka/src/test/resources/context-state-serializer-1.11/test-data b/flink-connector-kafka/src/test/resources/context-state-serializer-1.11/test-data index 3efe488b1..d13f52b8c 100644 Binary files a/flink-connector-kafka/src/test/resources/context-state-serializer-1.11/test-data and b/flink-connector-kafka/src/test/resources/context-state-serializer-1.11/test-data differ diff --git a/flink-connector-kafka/src/test/resources/context-state-serializer-1.12/test-data b/flink-connector-kafka/src/test/resources/context-state-serializer-1.12/test-data index 3efe488b1..d13f52b8c 100644 Binary files a/flink-connector-kafka/src/test/resources/context-state-serializer-1.12/test-data and b/flink-connector-kafka/src/test/resources/context-state-serializer-1.12/test-data differ diff --git a/flink-connector-kafka/src/test/resources/context-state-serializer-1.13/test-data b/flink-connector-kafka/src/test/resources/context-state-serializer-1.13/test-data index 3efe488b1..d13f52b8c 100644 Binary files a/flink-connector-kafka/src/test/resources/context-state-serializer-1.13/test-data and b/flink-connector-kafka/src/test/resources/context-state-serializer-1.13/test-data differ diff --git a/flink-connector-kafka/src/test/resources/context-state-serializer-1.14/test-data b/flink-connector-kafka/src/test/resources/context-state-serializer-1.14/test-data index 3efe488b1..d13f52b8c 100644 Binary files a/flink-connector-kafka/src/test/resources/context-state-serializer-1.14/test-data and b/flink-connector-kafka/src/test/resources/context-state-serializer-1.14/test-data differ diff --git a/flink-connector-kafka/src/test/resources/context-state-serializer-1.15/test-data b/flink-connector-kafka/src/test/resources/context-state-serializer-1.15/test-data index 3efe488b1..d13f52b8c 100644 Binary files a/flink-connector-kafka/src/test/resources/context-state-serializer-1.15/test-data and b/flink-connector-kafka/src/test/resources/context-state-serializer-1.15/test-data differ diff --git a/flink-connector-kafka/src/test/resources/context-state-serializer-1.16/test-data b/flink-connector-kafka/src/test/resources/context-state-serializer-1.16/test-data index 3efe488b1..d13f52b8c 100644 Binary files a/flink-connector-kafka/src/test/resources/context-state-serializer-1.16/test-data and b/flink-connector-kafka/src/test/resources/context-state-serializer-1.16/test-data differ diff --git a/flink-connector-kafka/src/test/resources/context-state-serializer-1.18/serializer-snapshot b/flink-connector-kafka/src/test/resources/context-state-serializer-1.18/serializer-snapshot new file mode 100644 index 000000000..85a71dc6b Binary files /dev/null and b/flink-connector-kafka/src/test/resources/context-state-serializer-1.18/serializer-snapshot differ diff --git a/flink-connector-kafka/src/test/resources/context-state-serializer-1.18/test-data b/flink-connector-kafka/src/test/resources/context-state-serializer-1.18/test-data new file mode 100644 index 000000000..3efe488b1 Binary files /dev/null and b/flink-connector-kafka/src/test/resources/context-state-serializer-1.18/test-data differ diff --git a/flink-connector-kafka/src/test/resources/context-state-serializer-1.19/serializer-snapshot b/flink-connector-kafka/src/test/resources/context-state-serializer-1.19/serializer-snapshot new file mode 100644 index 000000000..85a71dc6b Binary files /dev/null and b/flink-connector-kafka/src/test/resources/context-state-serializer-1.19/serializer-snapshot differ diff --git a/flink-connector-kafka/src/test/resources/context-state-serializer-1.19/test-data b/flink-connector-kafka/src/test/resources/context-state-serializer-1.19/test-data new file mode 100644 index 000000000..3efe488b1 Binary files /dev/null and b/flink-connector-kafka/src/test/resources/context-state-serializer-1.19/test-data differ diff --git a/flink-connector-kafka/src/test/resources/context-state-serializer-1.20/serializer-snapshot b/flink-connector-kafka/src/test/resources/context-state-serializer-1.20/serializer-snapshot new file mode 100644 index 000000000..85a71dc6b Binary files /dev/null and b/flink-connector-kafka/src/test/resources/context-state-serializer-1.20/serializer-snapshot differ diff --git a/flink-connector-kafka/src/test/resources/context-state-serializer-1.20/test-data b/flink-connector-kafka/src/test/resources/context-state-serializer-1.20/test-data new file mode 100644 index 000000000..3efe488b1 Binary files /dev/null and b/flink-connector-kafka/src/test/resources/context-state-serializer-1.20/test-data differ diff --git a/flink-connector-kafka/src/test/resources/kafka-consumer-migration-test-flink1.17-empty-state-snapshot b/flink-connector-kafka/src/test/resources/kafka-consumer-migration-test-flink1.17-empty-state-snapshot new file mode 100644 index 000000000..79ca38871 Binary files /dev/null and b/flink-connector-kafka/src/test/resources/kafka-consumer-migration-test-flink1.17-empty-state-snapshot differ diff --git a/flink-connector-kafka/src/test/resources/kafka-consumer-migration-test-flink1.17-snapshot b/flink-connector-kafka/src/test/resources/kafka-consumer-migration-test-flink1.17-snapshot new file mode 100644 index 000000000..08f1f7da1 Binary files /dev/null and b/flink-connector-kafka/src/test/resources/kafka-consumer-migration-test-flink1.17-snapshot differ diff --git a/flink-connector-kafka/src/test/resources/kafka-consumer-migration-test-flink1.18-empty-state-snapshot b/flink-connector-kafka/src/test/resources/kafka-consumer-migration-test-flink1.18-empty-state-snapshot new file mode 100644 index 000000000..0fc1a5555 Binary files /dev/null and b/flink-connector-kafka/src/test/resources/kafka-consumer-migration-test-flink1.18-empty-state-snapshot differ diff --git a/flink-connector-kafka/src/test/resources/kafka-consumer-migration-test-flink1.18-snapshot b/flink-connector-kafka/src/test/resources/kafka-consumer-migration-test-flink1.18-snapshot new file mode 100644 index 000000000..36381498b Binary files /dev/null and b/flink-connector-kafka/src/test/resources/kafka-consumer-migration-test-flink1.18-snapshot differ diff --git a/flink-connector-kafka/src/test/resources/kafka-consumer-migration-test-flink1.19-empty-state-snapshot b/flink-connector-kafka/src/test/resources/kafka-consumer-migration-test-flink1.19-empty-state-snapshot new file mode 100644 index 000000000..d9bd99d5a Binary files /dev/null and b/flink-connector-kafka/src/test/resources/kafka-consumer-migration-test-flink1.19-empty-state-snapshot differ diff --git a/flink-connector-kafka/src/test/resources/kafka-consumer-migration-test-flink1.19-snapshot b/flink-connector-kafka/src/test/resources/kafka-consumer-migration-test-flink1.19-snapshot new file mode 100644 index 000000000..ca2179cac Binary files /dev/null and b/flink-connector-kafka/src/test/resources/kafka-consumer-migration-test-flink1.19-snapshot differ diff --git a/flink-connector-kafka/src/test/resources/kafka-consumer-migration-test-flink1.20-empty-state-snapshot b/flink-connector-kafka/src/test/resources/kafka-consumer-migration-test-flink1.20-empty-state-snapshot new file mode 100644 index 000000000..78ba15781 Binary files /dev/null and b/flink-connector-kafka/src/test/resources/kafka-consumer-migration-test-flink1.20-empty-state-snapshot differ diff --git a/flink-connector-kafka/src/test/resources/kafka-consumer-migration-test-flink1.20-snapshot b/flink-connector-kafka/src/test/resources/kafka-consumer-migration-test-flink1.20-snapshot new file mode 100644 index 000000000..995b70e58 Binary files /dev/null and b/flink-connector-kafka/src/test/resources/kafka-consumer-migration-test-flink1.20-snapshot differ diff --git a/flink-connector-kafka/src/test/resources/kafka-migration-kafka-producer-flink-1.17-snapshot b/flink-connector-kafka/src/test/resources/kafka-migration-kafka-producer-flink-1.17-snapshot new file mode 100644 index 000000000..6d5d487df Binary files /dev/null and b/flink-connector-kafka/src/test/resources/kafka-migration-kafka-producer-flink-1.17-snapshot differ diff --git a/flink-connector-kafka/src/test/resources/kafka-migration-kafka-producer-flink-1.18-snapshot b/flink-connector-kafka/src/test/resources/kafka-migration-kafka-producer-flink-1.18-snapshot new file mode 100644 index 000000000..68c9d6326 Binary files /dev/null and b/flink-connector-kafka/src/test/resources/kafka-migration-kafka-producer-flink-1.18-snapshot differ diff --git a/flink-connector-kafka/src/test/resources/kafka-migration-kafka-producer-flink-1.19-snapshot b/flink-connector-kafka/src/test/resources/kafka-migration-kafka-producer-flink-1.19-snapshot new file mode 100644 index 000000000..03b1a24a1 Binary files /dev/null and b/flink-connector-kafka/src/test/resources/kafka-migration-kafka-producer-flink-1.19-snapshot differ diff --git a/flink-connector-kafka/src/test/resources/kafka-migration-kafka-producer-flink-1.20-snapshot b/flink-connector-kafka/src/test/resources/kafka-migration-kafka-producer-flink-1.20-snapshot new file mode 100644 index 000000000..a6a147cae Binary files /dev/null and b/flink-connector-kafka/src/test/resources/kafka-migration-kafka-producer-flink-1.20-snapshot differ diff --git a/flink-connector-kafka/src/test/resources/transaction-state-serializer-1.18/serializer-snapshot b/flink-connector-kafka/src/test/resources/transaction-state-serializer-1.18/serializer-snapshot new file mode 100644 index 000000000..b95eacb63 Binary files /dev/null and b/flink-connector-kafka/src/test/resources/transaction-state-serializer-1.18/serializer-snapshot differ diff --git a/flink-connector-kafka/src/test/resources/transaction-state-serializer-1.18/test-data b/flink-connector-kafka/src/test/resources/transaction-state-serializer-1.18/test-data new file mode 100644 index 000000000..0936509e9 Binary files /dev/null and b/flink-connector-kafka/src/test/resources/transaction-state-serializer-1.18/test-data differ diff --git a/flink-connector-kafka/src/test/resources/transaction-state-serializer-1.19/serializer-snapshot b/flink-connector-kafka/src/test/resources/transaction-state-serializer-1.19/serializer-snapshot new file mode 100644 index 000000000..b95eacb63 Binary files /dev/null and b/flink-connector-kafka/src/test/resources/transaction-state-serializer-1.19/serializer-snapshot differ diff --git a/flink-connector-kafka/src/test/resources/transaction-state-serializer-1.19/test-data b/flink-connector-kafka/src/test/resources/transaction-state-serializer-1.19/test-data new file mode 100644 index 000000000..0936509e9 Binary files /dev/null and b/flink-connector-kafka/src/test/resources/transaction-state-serializer-1.19/test-data differ diff --git a/flink-connector-kafka/src/test/resources/transaction-state-serializer-1.20/serializer-snapshot b/flink-connector-kafka/src/test/resources/transaction-state-serializer-1.20/serializer-snapshot new file mode 100644 index 000000000..b95eacb63 Binary files /dev/null and b/flink-connector-kafka/src/test/resources/transaction-state-serializer-1.20/serializer-snapshot differ diff --git a/flink-connector-kafka/src/test/resources/transaction-state-serializer-1.20/test-data b/flink-connector-kafka/src/test/resources/transaction-state-serializer-1.20/test-data new file mode 100644 index 000000000..0936509e9 Binary files /dev/null and b/flink-connector-kafka/src/test/resources/transaction-state-serializer-1.20/test-data differ diff --git a/pom.xml b/pom.xml index 3fc01c888..c1a7d624c 100644 --- a/pom.xml +++ b/pom.xml @@ -50,7 +50,7 @@ under the License. - 1.18.0 + 1.19.1 3.6.2 7.4.4