-
Notifications
You must be signed in to change notification settings - Fork 132
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-36441] Ensure producers are not leaked #126
Conversation
We have the same code standard (enforced through spotless) and we should have the same IDE configurations.
Make assertj print full stacktraces when encountering unexpected exceptions.
Job may be at various states including race conditions during shutdown. Ideally, the framework would provide idempotence but we can workaround that by ignoring specific exceptions.
91f730b
to
fbd1b7b
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am fine with the test only fixes great work but I am skeptical about the new feature/fix we added to the FlinkKafkaProducer
.
protected void finishProcessing(@Nullable KafkaTransactionState transaction) { | ||
super.finishProcessing(transaction); | ||
// TwoPhaseCommitSink sets transaction = null on final checkpoint and thus closing will leak | ||
// the producer. For transactional producers, we track the producer in pendingTransactions. | ||
if (transaction != null && !transaction.isTransactional()) { | ||
transaction.producer.flush(); | ||
transaction.producer.close(Duration.ZERO); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel uncomfortable merging this change as part of a "fix-tests-ticket". This is a change in production code to more or less support checkpoints-after-task-finish for the deprecated sink function.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel uncomfortable merging this change as part of a "fix-tests-ticket".
I rephrased the ticket. The PR never claimed to be test-only and it fits the theme. We basically roll out the leak detection to all test and fix what we find.
This is a change in production code to more or less support checkpoints-after-task-finish for the deprecated sink function.
No this only fixes the leak that makes the tests run stable. The only reason why tests passed at all is because we rerun them by default a couple of times. The only plausible alternative to fixing the leaks is to remove the leak detection altogether.
Also can you elaborate why fixing a deprecated but still supported feature causes concerns? We do have at least 2 releases depending on it and we shouldn't compromise on stable CI imho.
61da6da
to
31a995f
Compare
For non-transactional producers, a notifyCheckpointCompleted after finishOperator will set the transaction inside the 2PCSinkFunction to null, such that on close, the producer is leaked. Since transactional producer stores the transactions in pendingTransactions before that, we just need to fix the cases where we don't preCommit/commit. The easiest solution is to actually close the producer on finishOperator - no new record can arrive.
Add leak check in all relevant tests.
The test tried to assert on byte counts which are written async. Commit adds flushing and establishes a baseline so that metadata request don't interfere with assertions.
31a995f
to
17b26d2
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for fixing 👍
Fixes a production and various test issues around producer leaks. The issues appeared when I tried to upgrade Kafka-clients to 3.6.2 but are bugs in our code.