From f9597b7b3e3051c574c36dfbb9a18b6f9dd2ffb0 Mon Sep 17 00:00:00 2001 From: Arvid Heise Date: Mon, 30 Sep 2024 14:33:46 +0200 Subject: [PATCH] [FLINK-35109] Bump to Flink 1.19 and support Flink 1.20 Also - adds the migration support tests up to 1.20. - bumps Kafka-client to 3.6.2 --- .github/workflows/push_pr.yml | 8 +- .github/workflows/weekly.yml | 21 +- flink-connector-kafka/pom.xml | 12 +- .../FlinkKafkaConsumerBaseMigrationTest.java | 2 +- .../FlinkKafkaProducerMigrationTest.java | 2 +- .../kafka/KafkaSerializerUpgradeTest.java | 2 +- .../table/KafkaChangelogTableITCase.java | 4 - .../testutils/ThreadContextClassLoader.java | 41 ++++ .../testutils/TypeSerializerMatchers.java | 225 ++++++++++++++++++ .../TypeSerializerUpgradeTestBase.java | 21 +- .../context-state-serializer-1.11/test-data | Bin 19 -> 19 bytes .../context-state-serializer-1.12/test-data | Bin 19 -> 19 bytes .../context-state-serializer-1.13/test-data | Bin 19 -> 19 bytes .../context-state-serializer-1.14/test-data | Bin 19 -> 19 bytes .../context-state-serializer-1.15/test-data | Bin 19 -> 19 bytes .../context-state-serializer-1.16/test-data | Bin 19 -> 19 bytes .../serializer-snapshot | Bin 0 -> 126 bytes .../context-state-serializer-1.18/test-data | Bin 0 -> 19 bytes .../serializer-snapshot | Bin 0 -> 126 bytes .../context-state-serializer-1.19/test-data | Bin 0 -> 19 bytes .../serializer-snapshot | Bin 0 -> 126 bytes .../context-state-serializer-1.20/test-data | Bin 0 -> 19 bytes ...ration-test-flink1.17-empty-state-snapshot | Bin 0 -> 1465 bytes ...consumer-migration-test-flink1.17-snapshot | Bin 0 -> 1519 bytes ...ration-test-flink1.18-empty-state-snapshot | Bin 0 -> 1466 bytes ...consumer-migration-test-flink1.18-snapshot | Bin 0 -> 1520 bytes ...ration-test-flink1.19-empty-state-snapshot | Bin 0 -> 1466 bytes ...consumer-migration-test-flink1.19-snapshot | Bin 0 -> 1520 bytes ...ration-test-flink1.20-empty-state-snapshot | Bin 0 -> 1466 bytes ...consumer-migration-test-flink1.20-snapshot | Bin 0 -> 1520 bytes ...gration-kafka-producer-flink-1.17-snapshot | Bin 0 -> 1242 bytes ...gration-kafka-producer-flink-1.18-snapshot | Bin 0 -> 1243 bytes ...gration-kafka-producer-flink-1.19-snapshot | Bin 0 -> 1243 bytes ...gration-kafka-producer-flink-1.20-snapshot | Bin 0 -> 1243 bytes .../serializer-snapshot | Bin 0 -> 134 bytes .../test-data | Bin 0 -> 17 bytes .../serializer-snapshot | Bin 0 -> 134 bytes .../test-data | Bin 0 -> 17 bytes .../serializer-snapshot | Bin 0 -> 134 bytes .../test-data | Bin 0 -> 17 bytes pom.xml | 2 +- 41 files changed, 286 insertions(+), 54 deletions(-) create mode 100644 flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThreadContextClassLoader.java create mode 100644 flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/TypeSerializerMatchers.java create mode 100644 flink-connector-kafka/src/test/resources/context-state-serializer-1.18/serializer-snapshot create mode 100644 flink-connector-kafka/src/test/resources/context-state-serializer-1.18/test-data create mode 100644 flink-connector-kafka/src/test/resources/context-state-serializer-1.19/serializer-snapshot create mode 100644 flink-connector-kafka/src/test/resources/context-state-serializer-1.19/test-data create mode 100644 flink-connector-kafka/src/test/resources/context-state-serializer-1.20/serializer-snapshot create mode 100644 flink-connector-kafka/src/test/resources/context-state-serializer-1.20/test-data create mode 100644 flink-connector-kafka/src/test/resources/kafka-consumer-migration-test-flink1.17-empty-state-snapshot create mode 100644 flink-connector-kafka/src/test/resources/kafka-consumer-migration-test-flink1.17-snapshot create mode 100644 flink-connector-kafka/src/test/resources/kafka-consumer-migration-test-flink1.18-empty-state-snapshot create mode 100644 flink-connector-kafka/src/test/resources/kafka-consumer-migration-test-flink1.18-snapshot create mode 100644 flink-connector-kafka/src/test/resources/kafka-consumer-migration-test-flink1.19-empty-state-snapshot create mode 100644 flink-connector-kafka/src/test/resources/kafka-consumer-migration-test-flink1.19-snapshot create mode 100644 flink-connector-kafka/src/test/resources/kafka-consumer-migration-test-flink1.20-empty-state-snapshot create mode 100644 flink-connector-kafka/src/test/resources/kafka-consumer-migration-test-flink1.20-snapshot create mode 100644 flink-connector-kafka/src/test/resources/kafka-migration-kafka-producer-flink-1.17-snapshot create mode 100644 flink-connector-kafka/src/test/resources/kafka-migration-kafka-producer-flink-1.18-snapshot create mode 100644 flink-connector-kafka/src/test/resources/kafka-migration-kafka-producer-flink-1.19-snapshot create mode 100644 flink-connector-kafka/src/test/resources/kafka-migration-kafka-producer-flink-1.20-snapshot create mode 100644 flink-connector-kafka/src/test/resources/transaction-state-serializer-1.18/serializer-snapshot create mode 100644 flink-connector-kafka/src/test/resources/transaction-state-serializer-1.18/test-data create mode 100644 flink-connector-kafka/src/test/resources/transaction-state-serializer-1.19/serializer-snapshot create mode 100644 flink-connector-kafka/src/test/resources/transaction-state-serializer-1.19/test-data create mode 100644 flink-connector-kafka/src/test/resources/transaction-state-serializer-1.20/serializer-snapshot create mode 100644 flink-connector-kafka/src/test/resources/transaction-state-serializer-1.20/test-data diff --git a/.github/workflows/push_pr.yml b/.github/workflows/push_pr.yml index 951579406..fd1028711 100644 --- a/.github/workflows/push_pr.yml +++ b/.github/workflows/push_pr.yml @@ -28,10 +28,10 @@ jobs: compile_and_test: strategy: matrix: - flink: [ 1.18.1 ] - jdk: [ '8, 11, 17' ] + flink: [ 1.19.1 ] + jdk: [ '8, 11, 17, 21' ] include: - - flink: 1.19.0 + - flink: 1.20.0 jdk: '8, 11, 17, 21' uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils with: @@ -40,7 +40,7 @@ jobs: python_test: strategy: matrix: - flink: [ 1.18.1, 1.19.0 ] + flink: [ 1.19.1, 1.20.0 ] uses: apache/flink-connector-shared-utils/.github/workflows/python_ci.yml@ci_utils with: flink_version: ${{ matrix.flink }} \ No newline at end of file diff --git a/.github/workflows/weekly.yml b/.github/workflows/weekly.yml index 1caecd5c9..da4426dfd 100644 --- a/.github/workflows/weekly.yml +++ b/.github/workflows/weekly.yml @@ -30,34 +30,21 @@ jobs: strategy: matrix: flink_branches: [{ - flink: 1.18-SNAPSHOT, - branch: main - }, { flink: 1.19-SNAPSHOT, - jdk: '8, 11, 17, 21', branch: main }, { flink: 1.20-SNAPSHOT, - jdk: '8, 11, 17, 21', branch: main }, { - flink: 1.18.1, + flink: 1.19.1, branch: v3.2 }, { - flink: 1.19.0, - branch: v3.2, - jdk: '8, 11, 17, 21', - }, { - flink: 1.18.1, - branch: v3.1 - }, { - flink: 1.19.0, - branch: v3.1, - jdk: '8, 11, 17, 21', + flink: 1.20.0, + branch: v3.2 }] uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils with: flink_version: ${{ matrix.flink_branches.flink }} connector_branch: ${{ matrix.flink_branches.branch }} - jdk_version: ${{ matrix.flink_branches.jdk || '8, 11, 17' }} + jdk_version: ${{ matrix.flink_branches.jdk || '8, 11, 17, 21' }} run_dependency_convergence: false \ No newline at end of file diff --git a/flink-connector-kafka/pom.xml b/flink-connector-kafka/pom.xml index 4a10bdc9a..85d437413 100644 --- a/flink-connector-kafka/pom.xml +++ b/flink-connector-kafka/pom.xml @@ -39,7 +39,8 @@ under the License. FlinkKafkaProducerBaseTest --> --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED + KafkaProducerExactlyOnceITCase --> --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED + @@ -81,10 +82,10 @@ under the License. ${kafka.version} - - com.google.guava - guava - + + com.google.guava + guava + @@ -211,6 +212,7 @@ under the License. ${flink.version} test + org.apache.flink flink-table-planner_${scala.binary.version} diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java index 296545cad..47bce8bd9 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java @@ -92,7 +92,7 @@ public class FlinkKafkaConsumerBaseMigrationTest { @Parameterized.Parameters(name = "Migration Savepoint: {0}") public static Collection parameters() { - return FlinkVersion.rangeOf(FlinkVersion.v1_8, FlinkVersion.v1_16); + return FlinkVersion.rangeOf(FlinkVersion.v1_8, FlinkVersion.current()); } public FlinkKafkaConsumerBaseMigrationTest(FlinkVersion testMigrateVersion) { diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerMigrationTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerMigrationTest.java index 8a413f423..10732c949 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerMigrationTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerMigrationTest.java @@ -42,7 +42,7 @@ public class FlinkKafkaProducerMigrationTest extends KafkaMigrationTestBase { @Parameterized.Parameters(name = "Migration Savepoint: {0}") public static Collection parameters() { - return FlinkVersion.rangeOf(FlinkVersion.v1_8, FlinkVersion.v1_16); + return FlinkVersion.rangeOf(FlinkVersion.v1_12, FlinkVersion.current()); } public FlinkKafkaProducerMigrationTest(FlinkVersion testMigrateVersion) { diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializerUpgradeTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializerUpgradeTest.java index c9e82be5d..90ce2e5eb 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializerUpgradeTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializerUpgradeTest.java @@ -20,9 +20,9 @@ import org.apache.flink.FlinkVersion; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerMatchers; import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; import org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer; +import org.apache.flink.streaming.connectors.kafka.testutils.TypeSerializerMatchers; import org.apache.flink.streaming.connectors.kafka.testutils.TypeSerializerUpgradeTestBase; import org.hamcrest.Matcher; diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java index e8bc9e373..632b74ac9 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java @@ -29,7 +29,6 @@ import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.config.ExecutionConfigOptions; -import org.apache.flink.table.api.config.OptimizerConfigOptions; import org.apache.kafka.clients.producer.ProducerConfig; import org.junit.Before; @@ -65,7 +64,6 @@ public void testKafkaDebeziumChangelogSource() throws Exception { tableConf.set( ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1)); tableConf.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5000L); - tableConf.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE"); // ---------- Write the Debezium json into Kafka ------------------- List lines = readLines("debezium-data-schema-exclude.txt"); @@ -194,7 +192,6 @@ public void testKafkaCanalChangelogSource() throws Exception { tableConf.set( ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1)); tableConf.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5000L); - tableConf.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE"); // ---------- Write the Canal json into Kafka ------------------- List lines = readLines("canal-data.txt"); @@ -335,7 +332,6 @@ public void testKafkaMaxwellChangelogSource() throws Exception { tableConf.set( ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1)); tableConf.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5000L); - tableConf.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE"); // ---------- Write the Maxwell json into Kafka ------------------- List lines = readLines("maxwell-data.txt"); diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThreadContextClassLoader.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThreadContextClassLoader.java new file mode 100644 index 000000000..cdaec31c1 --- /dev/null +++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThreadContextClassLoader.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.testutils; + +import java.io.Closeable; + +/** + * Utility class to temporarily use a different classloader as the thread context classloader. + * + *

Temporarily copied from flink-core to avoid dependency on flink-core. + */ +public class ThreadContextClassLoader implements Closeable { + + private final ClassLoader originalThreadContextClassLoader; + + public ThreadContextClassLoader(ClassLoader newThreadContextClassLoader) { + this.originalThreadContextClassLoader = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(newThreadContextClassLoader); + } + + @Override + public void close() { + Thread.currentThread().setContextClassLoader(originalThreadContextClassLoader); + } +} diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/TypeSerializerMatchers.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/TypeSerializerMatchers.java new file mode 100644 index 000000000..08eef897b --- /dev/null +++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/TypeSerializerMatchers.java @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.testutils; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; + +import org.hamcrest.CoreMatchers; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeDiagnosingMatcher; +import org.hamcrest.TypeSafeMatcher; + +import java.util.function.Predicate; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A Collection of useful {@link Matcher}s for {@link TypeSerializer} and {@link + * TypeSerializerSchemaCompatibility}. + * + *

Note copied from Flink 1.18. Remove when we drop 1.18 support. + */ +public final class TypeSerializerMatchers { + + private TypeSerializerMatchers() {} + + // ------------------------------------------------------------------------------------------------------------- + // Matcher Factories + // ------------------------------------------------------------------------------------------------------------- + + /** + * Matches {@code compatibleAsIs} {@link TypeSerializerSchemaCompatibility}. + * + * @param element type + * @return a {@code Matcher} that matches {@code compatibleAsIs} {@link + * TypeSerializerSchemaCompatibility}. + */ + public static Matcher> isCompatibleAsIs() { + return propertyMatcher( + TypeSerializerSchemaCompatibility::isCompatibleAsIs, + "type serializer schema that is a compatible as is"); + } + + /** + * Matches {@code isIncompatible} {@link TypeSerializerSchemaCompatibility}. + * + * @param element type + * @return a {@code Matcher} that matches {@code isIncompatible} {@link + * TypeSerializerSchemaCompatibility}. + */ + public static Matcher> isIncompatible() { + return propertyMatcher( + TypeSerializerSchemaCompatibility::isIncompatible, + "type serializer schema that is incompatible"); + } + + /** + * Matches {@code isCompatibleAfterMigration} {@link TypeSerializerSchemaCompatibility}. + * + * @param element type + * @return a {@code Matcher} that matches {@code isCompatibleAfterMigration} {@link + * TypeSerializerSchemaCompatibility}. + */ + public static Matcher> isCompatibleAfterMigration() { + return propertyMatcher( + TypeSerializerSchemaCompatibility::isCompatibleAfterMigration, + "type serializer schema that is compatible after migration"); + } + + /** + * Matches {@code isCompatibleWithReconfiguredSerializer} {@link + * TypeSerializerSchemaCompatibility}. + * + * @param element type + * @return a {@code Matcher} that matches {@code isCompatibleWithReconfiguredSerializer} {@link + * TypeSerializerSchemaCompatibility}. + */ + public static + Matcher> isCompatibleWithReconfiguredSerializer() { + @SuppressWarnings("unchecked") + Matcher> anything = + (Matcher>) (Matcher) CoreMatchers.anything(); + + return new CompatibleAfterReconfiguration<>(anything); + } + + /** + * Matches {@code isCompatibleWithReconfiguredSerializer} {@link + * TypeSerializerSchemaCompatibility}. + * + * @param reconfiguredSerializerMatcher matches the reconfigured serializer. + * @param element type + * @return a {@code Matcher} that matches {@code isCompatibleWithReconfiguredSerializer} {@link + * TypeSerializerSchemaCompatibility}. + */ + public static + Matcher> isCompatibleWithReconfiguredSerializer( + Matcher> reconfiguredSerializerMatcher) { + + return new CompatibleAfterReconfiguration<>(reconfiguredSerializerMatcher); + } + + /** + * Matches if the expected {@code TypeSerializerSchemaCompatibility} has the same compatibility + * as {@code expectedCompatibility}. + * + * @param expectedCompatibility the compatibility to match to. + * @param element type. + * @return a {@code Matcher} that matches if it has the same compatibility as {@code + * expectedCompatibility}. + */ + public static Matcher> hasSameCompatibilityAs( + TypeSerializerSchemaCompatibility expectedCompatibility) { + + return new SchemaCompatibilitySameAs<>(expectedCompatibility); + } + + // ------------------------------------------------------------------------------------------------------------- + // Helpers + // ------------------------------------------------------------------------------------------------------------- + + private static Matcher propertyMatcher( + Predicate predicate, String matcherDescription) { + return new TypeSafeMatcher() { + + @Override + protected boolean matchesSafely(T item) { + return predicate.test(item); + } + + @Override + public void describeTo(Description description) { + description.appendText(matcherDescription); + } + }; + } + + // ------------------------------------------------------------------------------------------------------------- + // Matchers + // ------------------------------------------------------------------------------------------------------------- + + private static final class CompatibleAfterReconfiguration + extends TypeSafeDiagnosingMatcher> { + + private final Matcher> reconfiguredSerializerMatcher; + + private CompatibleAfterReconfiguration( + Matcher> reconfiguredSerializerMatcher) { + this.reconfiguredSerializerMatcher = checkNotNull(reconfiguredSerializerMatcher); + } + + @Override + protected boolean matchesSafely( + TypeSerializerSchemaCompatibility item, Description mismatchDescription) { + if (!item.isCompatibleWithReconfiguredSerializer()) { + mismatchDescription.appendText( + "serializer schema is not compatible with a reconfigured serializer"); + return false; + } + TypeSerializer reconfiguredSerializer = item.getReconfiguredSerializer(); + if (!reconfiguredSerializerMatcher.matches(reconfiguredSerializer)) { + reconfiguredSerializerMatcher.describeMismatch( + reconfiguredSerializer, mismatchDescription); + return false; + } + return true; + } + + @Override + public void describeTo(Description description) { + description + .appendText("type serializer schema that is compatible after reconfiguration,") + .appendText("with a reconfigured serializer matching ") + .appendDescriptionOf(reconfiguredSerializerMatcher); + } + } + + private static class SchemaCompatibilitySameAs + extends TypeSafeMatcher> { + + private final TypeSerializerSchemaCompatibility expectedCompatibility; + + private SchemaCompatibilitySameAs( + TypeSerializerSchemaCompatibility expectedCompatibility) { + this.expectedCompatibility = checkNotNull(expectedCompatibility); + } + + @Override + protected boolean matchesSafely( + TypeSerializerSchemaCompatibility testResultCompatibility) { + if (expectedCompatibility.isCompatibleAsIs()) { + return testResultCompatibility.isCompatibleAsIs(); + } else if (expectedCompatibility.isIncompatible()) { + return testResultCompatibility.isIncompatible(); + } else if (expectedCompatibility.isCompatibleAfterMigration()) { + return testResultCompatibility.isCompatibleAfterMigration(); + } else if (expectedCompatibility.isCompatibleWithReconfiguredSerializer()) { + return testResultCompatibility.isCompatibleWithReconfiguredSerializer(); + } + return false; + } + + @Override + public void describeTo(Description description) { + description.appendText("same compatibility as ").appendValue(expectedCompatibility); + } + } +} diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/TypeSerializerUpgradeTestBase.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/TypeSerializerUpgradeTestBase.java index 90e218382..ab3baadef 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/TypeSerializerUpgradeTestBase.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/TypeSerializerUpgradeTestBase.java @@ -20,9 +20,7 @@ import org.apache.flink.FlinkVersion; import org.apache.flink.api.common.typeutils.ClassRelocator; -import org.apache.flink.api.common.typeutils.ThreadContextClassLoader; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerMatchers; import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil; @@ -61,7 +59,7 @@ @TestInstance(TestInstance.Lifecycle.PER_CLASS) public abstract class TypeSerializerUpgradeTestBase { - public static final FlinkVersion CURRENT_VERSION = FlinkVersion.v1_17; + public static final FlinkVersion CURRENT_VERSION = FlinkVersion.current(); public static final Set MIGRATION_VERSIONS = FlinkVersion.rangeOf(FlinkVersion.v1_11, CURRENT_VERSION); @@ -136,9 +134,6 @@ public TypeSerializer createPriorSerializer() { try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(setupClassloader)) { return delegateSetup.createPriorSerializer(); - } catch (IOException e) { - throw new RuntimeException( - "Error creating prior serializer via ClassLoaderSafePreUpgradeSetup.", e); } } @@ -147,9 +142,6 @@ public PreviousElementT createTestData() { try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(setupClassloader)) { return delegateSetup.createTestData(); - } catch (IOException e) { - throw new RuntimeException( - "Error creating test data via ThreadContextClassLoader.", e); } } } @@ -179,10 +171,6 @@ public TypeSerializer createUpgradedSerializer() { try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(verifierClassloader)) { return delegateVerifier.createUpgradedSerializer(); - } catch (IOException e) { - throw new RuntimeException( - "Error creating upgraded serializer via ClassLoaderSafeUpgradeVerifier.", - e); } } @@ -191,9 +179,6 @@ public Matcher testDataMatcher() { try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(verifierClassloader)) { return delegateVerifier.testDataMatcher(); - } catch (IOException e) { - throw new RuntimeException( - "Error creating expected test data via ClassLoaderSafeUpgradeVerifier.", e); } } @@ -203,10 +188,6 @@ public Matcher testDataMatcher() { try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(verifierClassloader)) { return delegateVerifier.schemaCompatibilityMatcher(version); - } catch (IOException e) { - throw new RuntimeException( - "Error creating schema compatibility matcher via ClassLoaderSafeUpgradeVerifier.", - e); } } } diff --git a/flink-connector-kafka/src/test/resources/context-state-serializer-1.11/test-data b/flink-connector-kafka/src/test/resources/context-state-serializer-1.11/test-data index 3efe488b18be8cab3ffc86511ae5a20321effd7f..d13f52b8c771029793a85c406fc90f3d1380cb6b 100644 GIT binary patch literal 19 acmZQzU|?ooHn*^3U^X!|V_-HkG6nz*^8xAr literal 19 acmZQzU|?ooHZ(G3U^X!|V_-J7umk`NKLP3h diff --git a/flink-connector-kafka/src/test/resources/context-state-serializer-1.12/test-data b/flink-connector-kafka/src/test/resources/context-state-serializer-1.12/test-data index 3efe488b18be8cab3ffc86511ae5a20321effd7f..d13f52b8c771029793a85c406fc90f3d1380cb6b 100644 GIT binary patch literal 19 acmZQzU|?ooHn*^3U^X!|V_-HkG6nz*^8xAr literal 19 acmZQzU|?ooHZ(G3U^X!|V_-J7umk`NKLP3h diff --git a/flink-connector-kafka/src/test/resources/context-state-serializer-1.13/test-data b/flink-connector-kafka/src/test/resources/context-state-serializer-1.13/test-data index 3efe488b18be8cab3ffc86511ae5a20321effd7f..d13f52b8c771029793a85c406fc90f3d1380cb6b 100644 GIT binary patch literal 19 acmZQzU|?ooHn*^3U^X!|V_-HkG6nz*^8xAr literal 19 acmZQzU|?ooHZ(G3U^X!|V_-J7umk`NKLP3h diff --git a/flink-connector-kafka/src/test/resources/context-state-serializer-1.14/test-data b/flink-connector-kafka/src/test/resources/context-state-serializer-1.14/test-data index 3efe488b18be8cab3ffc86511ae5a20321effd7f..d13f52b8c771029793a85c406fc90f3d1380cb6b 100644 GIT binary patch literal 19 acmZQzU|?ooHn*^3U^X!|V_-HkG6nz*^8xAr literal 19 acmZQzU|?ooHZ(G3U^X!|V_-J7umk`NKLP3h diff --git a/flink-connector-kafka/src/test/resources/context-state-serializer-1.15/test-data b/flink-connector-kafka/src/test/resources/context-state-serializer-1.15/test-data index 3efe488b18be8cab3ffc86511ae5a20321effd7f..d13f52b8c771029793a85c406fc90f3d1380cb6b 100644 GIT binary patch literal 19 acmZQzU|?ooHn*^3U^X!|V_-HkG6nz*^8xAr literal 19 acmZQzU|?ooHZ(G3U^X!|V_-J7umk`NKLP3h diff --git a/flink-connector-kafka/src/test/resources/context-state-serializer-1.16/test-data b/flink-connector-kafka/src/test/resources/context-state-serializer-1.16/test-data index 3efe488b18be8cab3ffc86511ae5a20321effd7f..d13f52b8c771029793a85c406fc90f3d1380cb6b 100644 GIT binary patch literal 19 acmZQzU|?ooHn*^3U^X!|V_-HkG6nz*^8xAr literal 19 acmZQzU|?ooHZ(G3U^X!|V_-J7umk`NKLP3h diff --git a/flink-connector-kafka/src/test/resources/context-state-serializer-1.18/serializer-snapshot b/flink-connector-kafka/src/test/resources/context-state-serializer-1.18/serializer-snapshot new file mode 100644 index 0000000000000000000000000000000000000000..85a71dc6ba48967d2886e2d23db00f3ace9f3e00 GIT binary patch literal 126 zcmZ|F!3lsc3_#JKt9Z^5f)_7BCkQoaX-y=JAWp9i;O+By0MLM|tw;yOjd3YSGFCUH uJ;{nuML52qVx}b%4j&l5oJ{94vURHgh8=%|}I0JIM-2QB~r literal 0 HcmV?d00001 diff --git a/flink-connector-kafka/src/test/resources/context-state-serializer-1.18/test-data b/flink-connector-kafka/src/test/resources/context-state-serializer-1.18/test-data new file mode 100644 index 0000000000000000000000000000000000000000..3efe488b18be8cab3ffc86511ae5a20321effd7f GIT binary patch literal 19 acmZQzU|?ooHZ(G3U^X!|V_-J7umk`NKLP3h literal 0 HcmV?d00001 diff --git a/flink-connector-kafka/src/test/resources/context-state-serializer-1.19/serializer-snapshot b/flink-connector-kafka/src/test/resources/context-state-serializer-1.19/serializer-snapshot new file mode 100644 index 0000000000000000000000000000000000000000..85a71dc6ba48967d2886e2d23db00f3ace9f3e00 GIT binary patch literal 126 zcmZ|F!3lsc3_#JKt9Z^5f)_7BCkQoaX-y=JAWp9i;O+By0MLM|tw;yOjd3YSGFCUH uJ;{nuML52qVx}b%4j&l5oJ{94vURHgh8=%|}I0JIM-2QB~r literal 0 HcmV?d00001 diff --git a/flink-connector-kafka/src/test/resources/context-state-serializer-1.19/test-data b/flink-connector-kafka/src/test/resources/context-state-serializer-1.19/test-data new file mode 100644 index 0000000000000000000000000000000000000000..3efe488b18be8cab3ffc86511ae5a20321effd7f GIT binary patch literal 19 acmZQzU|?ooHZ(G3U^X!|V_-J7umk`NKLP3h literal 0 HcmV?d00001 diff --git a/flink-connector-kafka/src/test/resources/context-state-serializer-1.20/serializer-snapshot b/flink-connector-kafka/src/test/resources/context-state-serializer-1.20/serializer-snapshot new file mode 100644 index 0000000000000000000000000000000000000000..85a71dc6ba48967d2886e2d23db00f3ace9f3e00 GIT binary patch literal 126 zcmZ|F!3lsc3_#JKt9Z^5f)_7BCkQoaX-y=JAWp9i;O+By0MLM|tw;yOjd3YSGFCUH uJ;{nuML52qVx}b%4j&l5oJ{94vURHgh8=%|}I0JIM-2QB~r literal 0 HcmV?d00001 diff --git a/flink-connector-kafka/src/test/resources/context-state-serializer-1.20/test-data b/flink-connector-kafka/src/test/resources/context-state-serializer-1.20/test-data new file mode 100644 index 0000000000000000000000000000000000000000..3efe488b18be8cab3ffc86511ae5a20321effd7f GIT binary patch literal 19 acmZQzU|?ooHZ(G3U^X!|V_-J7umk`NKLP3h literal 0 HcmV?d00001 diff --git a/flink-connector-kafka/src/test/resources/kafka-consumer-migration-test-flink1.17-empty-state-snapshot b/flink-connector-kafka/src/test/resources/kafka-consumer-migration-test-flink1.17-empty-state-snapshot new file mode 100644 index 0000000000000000000000000000000000000000..79ca38871d54646bf0a29ab74a5245ef27cf9888 GIT binary patch literal 1465 zcmcIkzi$&U6n>W`3lazfYo!iRC4|m_P_C^I!!!{4*!d@zkPtf(_BFq%mZD0y>&wG()$3FC&s$+ zPtXF=!Y;MiLB~&8oo<}?NziSEancDlc9XER)7@@01E}oom~-K9{iz@A2HiO5gnkr0 z4?5;=2^vE75Y90pEHS+AUp-8w4`{zcn=Z5 zIVD<1h59({W5l?ol4FzOmc1wTE!&f)1StRd^6pVyCu z;V3sm24@oaOV<>Fogt`doKwP5MnZ0e1Shm1vdpN`P@h>n7P6>r#FXtXyPOihNCN)v zv5&*qc&0{WI|ek|h-`MX4D(*Ckzu7wA%+re7CGA~fu?jvufHw6{fgf&fD=MlnMpA+ mEi`QS*< zXpyqWOHpd3nc$v?V@0*6G-|3GW8?N6vbT5M_Z-Mw#%os!&5H#O3u`l8u@DtKoSrUt zMVzLHGsUYj%{725H!j(K1~Eo9jsugG<#N@pSE`L#-LID$b3v_M4Q5yBL1n42SeYwB zc4f)zd&Z~k_~A;qQ7cyiKMd}ctLCctiOx3#3251aUezWy)Vd@Y->)r>s1ZP)&_h?z4`9pjTkZ8*- zK^Dw1l86hz0WhT{#SY^w5(&;}q=i(Zjd2?Z{T~p%HA*gIA*YPCE$OL-4AEunHpv67|?J!wAtkoEYCPmj=p}nhQ(1-TRBeU6Q>0kmiQBe{K-WpUWxBs|r7{RLZ l>BD#ToP*6T{cQY4tIpf)U+%cGM!(um*FUen>i-(7e*h{2^8o+= literal 0 HcmV?d00001 diff --git a/flink-connector-kafka/src/test/resources/kafka-consumer-migration-test-flink1.18-empty-state-snapshot b/flink-connector-kafka/src/test/resources/kafka-consumer-migration-test-flink1.18-empty-state-snapshot new file mode 100644 index 0000000000000000000000000000000000000000..0fc1a5555c4ee18363c72d756d145de97ca36480 GIT binary patch literal 1466 zcmcIkL2DC16n>i~9t1_vYoUiw5ZOsAlC+42G+E2iZBvr1f|u|z%%)>DGt100Cf@u7 zo)x@!_UhH6-u)Av1o197vuT@@)=HtyVP|G}?|a{S^SuWEOK`NjqCI%9Y-Gl%pP@3` za4CG5B$^pt8#GLN&e(%;f1_5d*KotH(K`AMYY*0ZT&H!PR@bZjI@?HSzY0(at@X8k zqvn{-Y^A*ybQ*EH)9c2KIOw&)Zrlks_v5g=+uLck0x0e8+H>x3>uDp}4|?686E>po zdC;+c3(%BmKrlmkz(|sEF|-#>Uf>vsnP%+J@Kh6ZC=4Gl5+7zM>oUbL<*%6P3e5C@ zG!|$HZhUtIDZuIlfX;mevIcjb{myAQpREgg&X5?YsU+L>b1snrglE9CQ4B|13<#A% zFlwaIWQfTS2^WSbfi}l&caPm;?j64+K=Id?caO6?ud9P~qS(Gdnhvjhg?JVY;NC(N zajYa6Fkv5_lC9a^LWAXoQaE+t?_bW|`*{g_3t#y714>%K<^rj?Sg9P1q#(hBQ7cy| zno=ZuP(GFi_7^qVxcPbK!^)da1@I!cu1~aVU^TKp6{e+C1gBai<|QgtadN91V=?t; zl3607bBX+=YjVNP5p=nmG0GE8Lt&?cpsXp=)T+`_pIbeVDz9$Dwd*gpyaK>P0si;c zvv4k+tx?g90Ry)qmt7@AF{m_C)Y=wetl&FcRsbdUYW literal 0 HcmV?d00001 diff --git a/flink-connector-kafka/src/test/resources/kafka-consumer-migration-test-flink1.18-snapshot b/flink-connector-kafka/src/test/resources/kafka-consumer-migration-test-flink1.18-snapshot new file mode 100644 index 0000000000000000000000000000000000000000..36381498b767cc8dc18d1ba1f2c8dd04ebaddc0b GIT binary patch literal 1520 zcmcIkF>4e-6n?Y0vk(l5Rze`f5fItAm_!p4i_K*XOE$T1w=p2XF}s<)jJG$l%uM3N zN`Ha1f{nF}g{_rnDOgzA2-+w}8Y{t>yCioZF%n3&n4Q^u?|a{S@0$kzL$LFpg4rM# z(IRJwn-i&-W`esSO%>Iy(nM2*jE&QCC7YK_DIMPXdd{6C7ZvVjncR+s&n7gcMGdgMiZV=kyiwP1E73aU$u#p;|7 z#g!$q?;4-K>xC|b2gZ&7wC+8S3AnWSr%TKF%pc)%fJ94f z3A|vILy0&L8~{^VQqpF;i4(y&O|*~-w@BI|m~l-dCnm=QdyZ^#mv?&$P}qL|{C3{W z>*QchvMB{_Qf?Zb;Q8Krj%ZTyq$FE5{MF;xqhH6cH}KA@ zpHNg1W(P?1#hQqATX5`mXkz3lct!A(HA}nlAn~APz2UcuFGrugaR7yIR_$q7!&qp5 z%2WxX2$m?3>W4(qvXxtT7mLbUo!k&^{6gD12ivdpN` zQ14sa5wcs|kSTk=?1Ri?=|~`_9=jLL#xpf4*fF5tLTIxqXM{J)m5eB53b85SVmD_y zCGZ~I&XZ3g&pwbBLx2J(C^IQK<_hgyC4)Y&Up_ROotE||P)roXq2SGag-ZL6`-Ty$ mqR$_HIF}KLzxJ~69P!eTJg%y#OF}5IAdRQs;9~F1)Vxds9A5~dHr6I>~mD@R8FSyx+M z`!{NWY0S2pEvH@eo9$l5uli1}?sojPyVv#I=3(!kS$81YJ=EvK;nuUN*L8Xwr|nif z_l48ee^XH7Vt{argJXiih_a!+P;`PP7^zuI#)?J~i7``jM36s@Bhn#);)uQ?qQh`3 zkGax7X}Iy-6eI;Z7XTXfIYx-F zGe!c%g+xOf4l$xk5y7y|v0?7MdCb=GEdesWzPx)9CwW~RtPw@y3TYa=<`v>;IDmT_ zRm2m)(SR`h@B!MN?{%zjt||ncneg{7XK($yguRAOef$AgDPV7d)LN_oAB{Le&XfdN zuAJ343+W)Yln3r_YPNdw^TCJhH=j~qd2n5>w5(vq(?EG5xmE-XBoEa~EJ)tSEx*Jf z;_)=rL`LTl`AgR%f?XrXTqh<04Qb#qJtbH`YCMXxDmC@B)l)8#>Uva~{xZu-0Zawp z{~miD&cxF-%9t^r;I?P7%SV_E^0f#{sS7a?a3{&xObN82JAM6a>+M(kJ`I)&8L215 nRJYKol{EUsxpi(fizi)Apgxuad7KT_70R8yo*G7r4A$Epi}~d! literal 0 HcmV?d00001 diff --git a/flink-connector-kafka/src/test/resources/kafka-consumer-migration-test-flink1.19-snapshot b/flink-connector-kafka/src/test/resources/kafka-consumer-migration-test-flink1.19-snapshot new file mode 100644 index 0000000000000000000000000000000000000000..ca2179cac8eb25a206b944ee7571503938491b09 GIT binary patch literal 1520 zcmcIkziSjh6n?w8vk(l5Rze`f5fItA{5WEwSZpq9ShC55yNv-6j>$NC8EzS^iKg5Z zXxS`B5^*6s1g5m4xW#w_5y3enT1bVOIB8FMy`UF1y5L`uqO|^IjmWK^v%kP@h7ica3VOX_O+~GA~HZl zs)SJlBT6Lt0hY9A$WIub7Sa<)?f z?bGc({y6sZJ$^m{P6#<=CPmj=q5Z35(1-SGM`p9z(!m7U#EPOw@W!A*g@eZf!w5E< lPanR!mmR$MrJs%eXv2BC^UED|?$NLI)Ai3=ulm0R>mRk^^0)v1 literal 0 HcmV?d00001 diff --git a/flink-connector-kafka/src/test/resources/kafka-consumer-migration-test-flink1.20-empty-state-snapshot b/flink-connector-kafka/src/test/resources/kafka-consumer-migration-test-flink1.20-empty-state-snapshot new file mode 100644 index 0000000000000000000000000000000000000000..78ba15781b599180fdcfbca6e66b1c8015ec88fa GIT binary patch literal 1466 zcmcIk&ubGw6n>ke9t1_vYoUiw5ZOs>lbRwP(qt`5woOU43SPp?I-5?jnOSD0G4bYK z;90?oXRls8>fL|BlOWy&XSP3*(pV|fIqb|V?|tukZ@%{cAP>jODVT!;i&~^Ca#NI= zX(qTU;#g7bDvg>d#~M3O#PY_AhimKTK3I)=Zn<2oxEok4yOoU!tDMtI0#0ldAb7ABFM@aM}rH7g&ipWE**^rX(FimKON`?u0MP-L$ zs`iC8KzX?F-4-MV%NGDz_al&1xclsPM$7taUEp($#6V62X`0VmB02ESfhjF14jJ#0 zNN`RgEu<*Hm&eIX3M*w2!%a@|FOFUtiumPP4qO4%UjId4)7BUh4|+3>?6{ znJVH)3euEY_sNi4A}Ma`w*8OV}Iu+{YiVpd@V0keZ4$62qb3#2eGd z$W`#V;4$kLPvn8US2 o#-@c%Tgjl$oOkDDbNr;!3DiSHNg#NCTA||E>zQG6oWXSa1GK&6TmS$7 literal 0 HcmV?d00001 diff --git a/flink-connector-kafka/src/test/resources/kafka-consumer-migration-test-flink1.20-snapshot b/flink-connector-kafka/src/test/resources/kafka-consumer-migration-test-flink1.20-snapshot new file mode 100644 index 0000000000000000000000000000000000000000..995b70e585d57305de822cf042f57e5e03ca7be9 GIT binary patch literal 1520 zcmcIkL2DC16n?u+JO~y=PeP%GP!QQk8$%_Ehi$r+rQ4A`>Ou+7A<@Cma zDaA7yI~f*=D$1F|lSC5bNQD)VmbS5n4tam~eb0hId9z%Zn~!E4Qo5En<ON^Q}D z{Q9ci4~);>af5ZQ+3@PV8~Asa+&`ascr+`~w1XQZoWDv{_i zMLPtA-7F;yA}CJjLn0asXL5@x4KxAgzL^5qFmnu`aUX(|;L^tL0WIURbd1j-5^b@~ z(Xw8SBw~Yq2uvzLaEG!MiaBE>R$NHb#z`9^$`la{>l~NOIW)~(+v_brZs*;zTiGD5 z(}Oj=TRuga2CsICcp47i%2*X~PjJ*COgBD8OT+gfR=D5_fwxWgt0%L!exATy!`m-^ zKwb)17$Y?jYs@(htt_GJr4UxwMZk2b8gJHPBnYmnm>s-0<|wBuf&`A~T*-mY71O5m``nW`>-xD3m-EW3 z@M~eb>9LX}h*C=C^!g)+fRUw`ajE**SjqRS>iZ0Lf>F=IE9&kVvcK1mB};0?=PVnq zFojkEU-Hq6i0F_n7E}hxb(8~YfgbXTWP!rggT6IY2)_J!%$m_U|5;P|BRap*_)YEF z|Bni7sX2aX(ubT$HSD+5|LrbRRw86(s5gWV?Dct=U?LJ!FPc@{)-BY?L$~$c1aA*o myPH{(GC8a~2Yku#?NMJYqg;+sO_u=MW!iAcfa^K$zn>?CqEK)E literal 0 HcmV?d00001 diff --git a/flink-connector-kafka/src/test/resources/kafka-migration-kafka-producer-flink-1.18-snapshot b/flink-connector-kafka/src/test/resources/kafka-migration-kafka-producer-flink-1.18-snapshot new file mode 100644 index 0000000000000000000000000000000000000000..68c9d63268d2ae438e744457e4b771c35d0634e2 GIT binary patch literal 1243 zcmbtT-A>yu6t+vt-K1V{i?oa0O^#Y+KEw{Fc+P5;x6iSsmt6>_pH}5G1ZX? z+kU-bo*aUY61cpOe6+{fPZ$d7R7A55@69!-biBM;OjBeTu|>RA9~9dh;wWKU+gkS$bgIwQ@O1HC+O{U#5+A8E`Y_{qVU3#DG%Q literal 0 HcmV?d00001 diff --git a/flink-connector-kafka/src/test/resources/kafka-migration-kafka-producer-flink-1.19-snapshot b/flink-connector-kafka/src/test/resources/kafka-migration-kafka-producer-flink-1.19-snapshot new file mode 100644 index 0000000000000000000000000000000000000000..03b1a24a17bdedc09cb5e1cdab80657ce8f2dce4 GIT binary patch literal 1243 zcmbtTPjAyO6t}yE+r$oBkvQ~*9Hnivt;cT5M6|B05*H*c<&n6nmDa9e4-n!LaOAj8 zwVn0>I3T_QNZ28x%LLsbo|MFX{=GlH-*XT`Tc|YLG>Zm$D%b~2bPhs6qPY~XAUq{= zE;M=9!3JX=V~uO%C=Hs~&5iFXMlG+mmjdj*C9^%>Cw{lrCcWN%O1f$4cX#(0>v(`r zb7;aI-Cwaxj?uHxIEeam6is8=r@{0njA;}eOlUY7PESWi0dgiovn~R5UiJr*U>XNe z*dK(igUEOr=(EhvTv$OeXYOplMdm73PEP z*3d{pZz~g~FN`iR%Q-Cg$A=(d0V_3^+Vr!v*6+8h_cPpy$AgfrsRtX#zS%$)R@|MP zi(f?c4X90~bz5dvp z1g`J(`rbF!*WL=a7U|p~D-y=knk^VlEU8jTmWP;gE|B~oc6WR?)?&;X9ja+Heb zG^A;BprygjmGPn@txG^tMi%VyEr@`Vm5hbb{Z!ZbJ<9qz!$vUbdw5OV-9YyH2C`(u ztl25g<|{2}l)&d=6cb7J#d671pk0SKpc$AcuSpgtY~7h_Q-k3B*ZZs)qx02<(r?lE znZ_?_H~)Q9XiLxWzb3sexYEOZS^e+sLS-#NW_A6B5Q0x9B1|xm39^gk6t}j6T6t*0 s{7&#@?`QlYOHw9>ljo>D<(S`^%T+Izqg2x+z}scoXqN%oIqyH82Q!IM&j0`b literal 0 HcmV?d00001 diff --git a/flink-connector-kafka/src/test/resources/transaction-state-serializer-1.18/serializer-snapshot b/flink-connector-kafka/src/test/resources/transaction-state-serializer-1.18/serializer-snapshot new file mode 100644 index 0000000000000000000000000000000000000000..b95eacb633e5812b3ee57980a0dd34732d9d81cc GIT binary patch literal 134 zcmaLNu?@mN3`SuqtU~QEXlRfw3;Z%pVqLH!pJ@=oqbxx8r0)^YMBm7jaX>cZWjF6U yDitsDl_lI&Dsphg(!sZLxS#pRc)msRIwyQU%EDWrv}qc4`~0QmIK~!5M3*1FD=|s{ literal 0 HcmV?d00001 diff --git a/flink-connector-kafka/src/test/resources/transaction-state-serializer-1.18/test-data b/flink-connector-kafka/src/test/resources/transaction-state-serializer-1.18/test-data new file mode 100644 index 0000000000000000000000000000000000000000..0936509e94a5ecf733366d7275336f06d56d70a1 GIT binary patch literal 17 UcmZQ%U@cZWjF6U yDitsDl_lI&Dsphg(!sZLxS#pRc)msRIwyQU%EDWrv}qc4`~0QmIK~!5M3*1FD=|s{ literal 0 HcmV?d00001 diff --git a/flink-connector-kafka/src/test/resources/transaction-state-serializer-1.19/test-data b/flink-connector-kafka/src/test/resources/transaction-state-serializer-1.19/test-data new file mode 100644 index 0000000000000000000000000000000000000000..0936509e94a5ecf733366d7275336f06d56d70a1 GIT binary patch literal 17 UcmZQ%U@cZWjF6U yDitsDl_lI&Dsphg(!sZLxS#pRc)msRIwyQU%EDWrv}qc4`~0QmIK~!5M3*1FD=|s{ literal 0 HcmV?d00001 diff --git a/flink-connector-kafka/src/test/resources/transaction-state-serializer-1.20/test-data b/flink-connector-kafka/src/test/resources/transaction-state-serializer-1.20/test-data new file mode 100644 index 0000000000000000000000000000000000000000..0936509e94a5ecf733366d7275336f06d56d70a1 GIT binary patch literal 17 UcmZQ%U@ - 1.18.0 + 1.19.1 3.6.2 7.4.4