
[FLINK-35109] Bump to Flink 1.19 and support Flink 1.20 #122

Merged
merged 2 commits into apache:main from FLINK-35109-add-1.20-support on Oct 10, 2024

Conversation

AHeise
Contributor

@AHeise AHeise commented Sep 30, 2024

Also

  • adds the migration support tests up to 1.20.
  • bumps Kafka-client to 3.6.2
  • fixes various test cases that became flaky with the bumps

@AHeise AHeise force-pushed the FLINK-35109-add-1.20-support branch 4 times, most recently from e460f50 to 4a0b358 on October 2, 2024 07:08
@fapaul fapaul self-requested a review October 2, 2024 14:21
@AHeise AHeise force-pushed the FLINK-35109-add-1.20-support branch 5 times, most recently from c8549b6 to f9597b7 on October 4, 2024 10:20
Contributor

@fapaul fapaul left a comment

The PR feels a bit overloaded. It's great to see so many quality-of-life improvements, but I think they should be separated from the original intent of the PR (and probably also backported).

Regarding the intent of the PR, I am not sure I understand it fully.

  • What is the relation between the supported Flink versions and the connector version?
  • Do we only support the latest two Flink releases with one connector version (in this case can we remove the 1.18 compatibility)?

Comment on lines 88 to 90
} catch (Exception e) {
    // This partially solves FLINK-19204 for Kafka Tests
}
Contributor

I know it's only test code, but this pattern always catches my eye since we also catch exceptions like InterruptedException without restoring the interrupted flag.

Is this necessary to fix on this PR? I would much rather invest in fixing the cause than fighting the symptoms.
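For illustration, a minimal sketch (hypothetical helper, not part of this PR) of how a catch-all in test cleanup can at least preserve the thread's interrupt status, assuming Flink's RunnableWithException as in the snippets below:

    // Hypothetical helper: swallow cleanup failures but keep the interrupt status intact.
    private static void runIgnoringFailures(RunnableWithException action) {
        try {
            action.run();
        } catch (InterruptedException e) {
            // Restore the interrupted flag so callers further up the stack can still react.
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            // Cleanup failures are intentionally ignored in test code.
        }
    }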

Contributor Author

The root cause is that closing the iterator is not graceful. But that's an issue in Flink 1.20 that can't be fixed without a new Flink release.
I'll try to pick a more specific exception, though, to avoid catching InterruptedException.

Contributor Author

I now specifically pass a list of allowed exceptions.

    private void cleanupTopic(String topic) {
        // Deleting a topic that is already gone is fine during cleanup.
        ignoreExceptions(() -> deleteTestTopic(topic), UnknownTopicOrPartitionException.class);
    }

    /** Runs the action and tolerates only the explicitly allowed exception types. */
    private static void ignoreExceptions(
            RunnableWithException e, Class<? extends Exception>... exClasses) {
        try {
            e.run();
        } catch (Exception ex) {
            // Fails the test unless the cause chain contains one of the allowed exceptions.
            assertThatChainOfCauses(ex)
                    .anyMatch(
                            cause -> Arrays.stream(exClasses).anyMatch(cl -> cl.isInstance(cause)));
        }
    }

    private static void cancelJob(TableResult tableResult) {
        // Cancel the job if it is still running; a job that already terminated is acceptable.
        if (tableResult != null) {
            ignoreExceptions(
                    () -> tableResult.getJobClient().ifPresent(JobClient::cancel),
                    FlinkJobNotFoundException.class,
                    FlinkJobTerminatedWithoutCancellationException.class);
        }
    }

/** Configuration for AssertJ that shows full stack traces for unmatched exceptions. */
public class KafkaAssertjConfiguration extends Configuration {
    public KafkaAssertjConfiguration() {
        setMaxStackTraceElementsDisplayed(10000);
    }
}
Contributor

Since we recently reduced the log sizes significantly, what's the impact of this change?

Contributor Author

If you get a test failure, the log is larger now, but you actually see the issue. For green builds, everything is unchanged.
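As a side note: AssertJ discovers such a Configuration subclass via the standard ServiceLoader mechanism, so registering it typically also involves a provider file roughly like the sketch below (the package name is assumed, not taken from the diff):

    # src/test/resources/META-INF/services/org.assertj.core.configuration.Configuration
    # Fully-qualified name of the Configuration subclass above (package assumed).
    org.apache.flink.connector.kafka.testutils.KafkaAssertjConfiguration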

Comment on lines 1514 to 1544
    private void cleanupTopic(String topic) {
        ignoreExceptions(() -> deleteTestTopic(topic));
    }

    private static void ignoreExceptions(RunnableWithException e) {
        try {
            e.run();
        } catch (Exception ex) {
            // ignore
        }
    }

    private static void cancelJob(TableResult tableResult) {
        if (tableResult != null) {
            ignoreExceptions(() -> tableResult.getJobClient().ifPresent(JobClient::cancel));
        }
    }
Contributor

This only looks like a temporary solution. Can we at least file/link a ticket to fix it properly?

Contributor Author

Why do you think it's temporary? Cleanup code can be invoked at any point in the test lifecycle, so the job may or may not have already failed. In the former case, JobClient::cancel throws; in the latter case, the cancel is needed.

@@ -24,9 +24,9 @@
*/
public class DockerImageVersions {

-    public static final String KAFKA = "confluentinc/cp-kafka:7.4.4";
+    public static final String KAFKA = "confluentinc/cp-kafka:7.7.1";
Contributor

CP 7.7 (https://docs.confluent.io/platform/current/release-notes/index.html) uses Kafka 3.7, while in the pom file we only bump the dependency to 3.6.2.

We should either use CP 7.6 or also increase the used Kafka version to 3.7.x to avoid surprises.

Contributor Author

The Kafka version here is used for the server. The pom specifies the client. I wouldn't want to force synchronization, as that is rather unrealistic in real life.

Contributor Author

Btw 3.7.x had a breaking interface change on the things that we hack to get transactions working. That's why I left it at 3.6.2.

.github/workflows/weekly.yml (resolved review thread)
.github/workflows/weekly.yml (resolved review thread)
* A Collection of useful {@link Matcher}s for {@link TypeSerializer} and {@link
* TypeSerializerSchemaCompatibility}.
*
* <p>Note copied from Flink 1.18. Remove when we drop 1.18 support.
Contributor

I do not understand this from a CI perspective: in workflows/weekly.yml we removed all testing for 1.18. Why do we need to maintain the tests, then?

Contributor Author

Good catch.
I originally started with also supporting 1.18 but I still need to copy the file because 1.20 dropped it.
I will amend the comment to point to 1.19 instead.

@AHeise
Contributor Author

AHeise commented Oct 4, 2024

* What is the relation between the supported Flink versions and the connector version?

There is no relation, and that was the intent of factoring it out. We currently encode the supported version in the name of the artifact, e.g., flink-kafka-connector-3.1.0-1.18. With Flink 2.0 and hopefully binary-stable APIs, we would drop the suffix and just say that the connector is compatible with all Flink 2 releases (in some kind of version matrix). Hopefully, that works out.

* Do we only support the latest two Flink releases with one connector version (in this case can we remove the 1.18 compatibility)?

There is no strict guideline whatsoever. It's important that we provide fixes for critical bugs for all maintained Flink releases. I have outlined my proposal for the next 3 supported versions on the dev ML.

@AHeise AHeise force-pushed the FLINK-35109-add-1.20-support branch 8 times, most recently from fbdf041 to 6bed449 on October 9, 2024 09:56
@AHeise
Contributor Author

AHeise commented Oct 9, 2024

I forked out FLINK-36441 for the test fixes.

@AHeise AHeise force-pushed the FLINK-35109-add-1.20-support branch from 6bed449 to 19e1efa on October 9, 2024 12:25
Contributor

@fapaul fapaul left a comment

LGTM % please look at the inline questions

@@ -65,7 +64,6 @@ public void testKafkaDebeziumChangelogSource() throws Exception {
tableConf.set(
ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1));
tableConf.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5000L);
tableConf.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE");
Contributor

What was the intention behind this change?

Contributor Author

The option went from String in Flink 1.19 to an enum in Flink 1.20, so you can't compile the same code against both versions.
I talked to Timo about what the option does, and he said it shouldn't apply here since we don't have aggregates. And lo and behold, nothing changed when I removed the line. (The other options are probably meaningless as well, but I don't want to dig into the table tests.)
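For illustration, a sketch of the incompatibility; the enum name AggregatePhaseStrategy reflects my recollection of the Flink 1.20 API and should be treated as an assumption:

    // Flink 1.19: the option is a ConfigOption<String>.
    tableConf.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE");

    // Flink 1.20: the option takes an enum value instead (AggregatePhaseStrategy, assumed).
    tableConf.set(
            OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY,
            AggregatePhaseStrategy.TWO_PHASE);

    // No single line compiles against both versions, hence the setting was dropped here.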

@@ -42,7 +42,7 @@
public class FlinkKafkaProducerMigrationTest extends KafkaMigrationTestBase {
@Parameterized.Parameters(name = "Migration Savepoint: {0}")
public static Collection<FlinkVersion> parameters() {
return FlinkVersion.rangeOf(FlinkVersion.v1_8, FlinkVersion.v1_16);
Contributor

Why did the start of the range change from 1.8 to 1.12?

Contributor Author

Good catch, reverted that part.

Also
- adds the migration support tests up to 1.20.
- bumps Kafka-client to 3.6.2
@AHeise AHeise force-pushed the FLINK-35109-add-1.20-support branch from 19e1efa to b8b019f on October 10, 2024 08:57
@AHeise AHeise merged commit aedc790 into apache:main Oct 10, 2024
11 checks passed
@AHeise AHeise deleted the FLINK-35109-add-1.20-support branch October 10, 2024 09:54