[FLINK-35109] Bump to Flink 1.19 and support Flink 1.20 #122
Conversation
Force-pushed from e460f50 to 4a0b358
Force-pushed from c8549b6 to f9597b7
The PR feels a bit overloaded. It's great to see so many quality-of-life improvements, but I think they should be separated from the original intent of the PR (and probably also be backported).
Regarding the intent of the PR, I am not sure I understand it fully.
- What is the relation between the supported Flink versions and the connector version?
- Do we only support the latest two Flink releases with one connector version (in this case can we remove the 1.18 compatibility)?
} catch (Exception e) {
    // This partially solves FLINK-19204 for Kafka Tests
}
I know it's only test code, but this pattern always catches my eye, since we also catch exceptions like InterruptedException, which clears the thread's interrupted flag without restoring it.
Is it necessary to fix this in this PR? I would much rather invest in fixing the cause than fighting the symptoms.
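For reference, the usual Java idiom when such an exception really must be swallowed is to restore the interrupt status; a minimal sketch (not part of this PR, the helper name is illustrative):

final class InterruptAwareCleanup {
    // Hypothetical helper, only to illustrate the idiom discussed above.
    static void closeQuietly(AutoCloseable resource) {
        try {
            resource.close();
        } catch (InterruptedException ie) {
            // Catching InterruptedException clears the thread's interrupt status;
            // restore it so callers further up the stack can still react to it.
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            // Other cleanup failures are deliberately ignored here.
        }
    }
}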
The root cause is that closing the iterator is not graceful. But that's an issue in Flink 1.20 that can't be fixed without a new Flink release.
I'll try to pick a more specific exception though to avoid catching IE.
I now specifically pass a list of allowed exceptions.
private void cleanupTopic(String topic) {
    ignoreExceptions(() -> deleteTestTopic(topic), UnknownTopicOrPartitionException.class);
}

private static void ignoreExceptions(
        RunnableWithException e, Class<? extends Exception>... exClasses) {
    try {
        e.run();
    } catch (Exception ex) {
        assertThatChainOfCauses(ex)
                .anyMatch(
                        cause -> Arrays.stream(exClasses).anyMatch(cl -> cl.isInstance(cause)));
    }
}

private static void cancelJob(TableResult tableResult) {
    if (tableResult != null) {
        ignoreExceptions(
                () -> tableResult.getJobClient().ifPresent(JobClient::cancel),
                FlinkJobNotFoundException.class,
                FlinkJobTerminatedWithoutCancellationException.class);
    }
}
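For context, a sketch of how these helpers would typically be invoked from test teardown (the JUnit 5 hook and the field names tableResult and testTopic are assumptions, not taken from the PR):

@AfterEach
void cleanupAfterTest() {
    // Both calls are safe regardless of how far the test got:
    // the job may already be gone, and the topic may never have been created.
    cancelJob(tableResult);
    cleanupTopic(testTopic);
}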
/** Configuration for AssertJ that shows full stack traces for unmatched exceptions. */
public class KafkaAssertjConfiguration extends Configuration {
    public KafkaAssertjConfiguration() {
        setMaxStackTraceElementsDisplayed(10000);
Since we recently reduced the log sizes significantly, what's the impact of this change?
If you get a test failure, the log size is larger now. But you actually see the issue. For green builds, everything is unchanged.
private void cleanupTopic(String topic) {
    ignoreExceptions(() -> deleteTestTopic(topic));
}

private static void ignoreExceptions(RunnableWithException e) {
    try {
        e.run();
    } catch (Exception ex) {
        // ignore
    }
}

private static void cancelJob(TableResult tableResult) {
    if (tableResult != null) {
        ignoreExceptions(() -> tableResult.getJobClient().ifPresent(JobClient::cancel));
    }
}
This only looks like a temporary solution. Can we at least file/link a ticket to fix it properly?
Why do you think it's temporary? Cleanup code can be invoked at any point in the test lifecycle, so the job may or may not have already failed. In the former case, JobClient::cancel throws; in the latter, the cancel is needed.
@@ -24,9 +24,9 @@
 */
public class DockerImageVersions {

-   public static final String KAFKA = "confluentinc/cp-kafka:7.4.4";
+   public static final String KAFKA = "confluentinc/cp-kafka:7.7.1";
CP 7.7 (https://docs.confluent.io/platform/current/release-notes/index.html) uses Kafka 3.7, while in the pom file we only bump the dependency to 3.6.2.
We should either use CP 7.6 or also increase the used Kafka version to 3.7.x to avoid surprises.
The Kafka version here is used for the server; the pom specifies the client. I wouldn't want to force synchronization, as this is rather unrealistic in real life.
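To make the split concrete, a hedged sketch (assuming the broker is started via Testcontainers, as the connector's integration tests do; only the image string comes from this PR, the class name is illustrative):

import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

final class BrokerVersusClientVersion {
    // The broker runs whatever Kafka version the Docker image ships (CP 7.7.x bundles Kafka 3.7),
    // while the producer/consumer classes on the classpath come from the pom's kafka-clients 3.6.2.
    // The two are resolved independently, so bumping one does not force the other.
    static KafkaContainer newBroker() {
        return new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.7.1"));
    }
}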
Btw 3.7.x had a breaking interface change on the things that we hack to get transactions working. That's why I left it at 3.6.2.
 * A Collection of useful {@link Matcher}s for {@link TypeSerializer} and {@link
 * TypeSerializerSchemaCompatibility}.
 *
 * <p>Note copied from Flink 1.18. Remove when we drop 1.18 support.
I do not understand this from a CI perspective: in workflows/weekly.yaml we removed all testing for 1.18. Why do we need to maintain the tests, then?
Good catch.
I originally started with also supporting 1.18 but I still need to copy the file because 1.20 dropped it.
I will amend the comment to point to 1.19 instead.
There is no relation, and that was the intent of factoring it out. We currently encode the supported version in the name of the artifact, e.g.,
There is no strict guideline whatsoever. It's important that we maintain bugfixes for critical bugs across all maintained Flink releases. I have outlined my proposal for the next 3 supported versions on the dev ML.
Force-pushed from fbdf041 to 6bed449
I forked out FLINK-36441 for the test fixes.
Force-pushed from 6bed449 to 19e1efa
Workaround for FLINK-36454.
LGTM % please look at the inline questions
@@ -65,7 +64,6 @@ public void testKafkaDebeziumChangelogSource() throws Exception {
        tableConf.set(
                ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1));
        tableConf.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5000L);
-       tableConf.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE");
What was the intention behind this change?
The option went from String in Flink 1.19 to an enum in Flink 1.20, so you can't compile the same code against both versions.
I talked to Timo about what the option does, and he said it shouldn't apply here since we don't have aggregates. And lo and behold, nothing changed when I removed the line. (The other options are probably meaningless as well, but I don't want to dig into the table tests.)
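A hedged illustration of the incompatibility (assuming the 1.20 option type is the AggregatePhaseStrategy enum; the two calls below cannot coexist in code that compiles against both versions, which is why the line was dropped):

// Against Flink 1.19, where the option is a ConfigOption<String>:
tableConf.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE");

// Against Flink 1.20, where the option takes the enum instead:
tableConf.set(
        OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY,
        AggregatePhaseStrategy.TWO_PHASE);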
@@ -42,7 +42,7 @@
public class FlinkKafkaProducerMigrationTest extends KafkaMigrationTestBase {

    @Parameterized.Parameters(name = "Migration Savepoint: {0}")
    public static Collection<FlinkVersion> parameters() {
        return FlinkVersion.rangeOf(FlinkVersion.v1_8, FlinkVersion.v1_16);
Why did the start of the range change from 1.8 to 1.12?
Good catch, reverted that part.
Also
- adds the migration support tests up to 1.20.
- bumps kafka-client to 3.6.2
Force-pushed from 19e1efa to b8b019f
Also