Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-36441] Ensure producers are not leaked #126

Merged
merged 6 commits into from
Oct 9, 2024

Conversation

AHeise
Copy link
Contributor

@AHeise AHeise commented Oct 8, 2024

Fixes a production and various test issues around producer leaks. The issues appeared when I tried to upgrade Kafka-clients to 3.6.2 but are bugs in our code.

AHeise added 3 commits October 8, 2024 15:56
We have the same code standard (enforced through spotless) and we should have the same IDE configurations.
Make assertj print full stacktraces when encountering unexpected exceptions.
Job may be at various states including race conditions during shutdown. Ideally, the framework would provide idempotence but we can workaround that by ignoring specific exceptions.
@AHeise AHeise force-pushed the FLINK-36441-fix-resource-leaks branch from 91f730b to fbd1b7b Compare October 8, 2024 13:56
Copy link
Contributor

@fapaul fapaul left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am fine with the test only fixes great work but I am skeptical about the new feature/fix we added to the FlinkKafkaProducer.

Comment on lines +1233 to +1241
protected void finishProcessing(@Nullable KafkaTransactionState transaction) {
super.finishProcessing(transaction);
// TwoPhaseCommitSink sets transaction = null on final checkpoint and thus closing will leak
// the producer. For transactional producers, we track the producer in pendingTransactions.
if (transaction != null && !transaction.isTransactional()) {
transaction.producer.flush();
transaction.producer.close(Duration.ZERO);
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel uncomfortable merging this change as part of a "fix-tests-ticket". This is a change in production code to more or less support checkpoints-after-task-finish for the deprecated sink function.

Copy link
Contributor Author

@AHeise AHeise Oct 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel uncomfortable merging this change as part of a "fix-tests-ticket".

I rephrased the ticket. The PR never claimed to be test-only and it fits the theme. We basically roll out the leak detection to all test and fix what we find.

This is a change in production code to more or less support checkpoints-after-task-finish for the deprecated sink function.

No this only fixes the leak that makes the tests run stable. The only reason why tests passed at all is because we rerun them by default a couple of times. The only plausible alternative to fixing the leaks is to remove the leak detection altogether.

Also can you elaborate why fixing a deprecated but still supported feature causes concerns? We do have at least 2 releases depending on it and we shouldn't compromise on stable CI imho.

@AHeise AHeise force-pushed the FLINK-36441-fix-resource-leaks branch from 61da6da to 31a995f Compare October 9, 2024 09:22
AHeise added 3 commits October 9, 2024 11:23
For non-transactional producers, a notifyCheckpointCompleted after finishOperator will set the transaction inside the 2PCSinkFunction to null, such that on close, the producer is leaked. Since transactional producer stores the transactions in pendingTransactions before that, we just need to fix the cases where we don't preCommit/commit. The easiest solution is to actually close the producer on finishOperator - no new record can arrive.
Add leak check in all relevant tests.
The test tried to assert on byte counts which are written async. Commit adds flushing and establishes a baseline so that metadata request don't interfere with assertions.
@AHeise AHeise force-pushed the FLINK-36441-fix-resource-leaks branch from 31a995f to 17b26d2 Compare October 9, 2024 09:23
Copy link
Contributor

@fapaul fapaul left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for fixing 👍

@AHeise AHeise merged commit 5eeafd6 into apache:main Oct 9, 2024
10 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants