Skip to content

Commit

Permalink
fixup! [FLINK-36441] Ensure producers are not leaked in tests
Browse files Browse the repository at this point in the history
  • Loading branch information
AHeise committed Oct 8, 2024
1 parent b20c9b5 commit 91f730b
Showing 1 changed file with 11 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -151,20 +151,19 @@ public InfiniteStringsGenerator(KafkaTestEnvironment server, String topic) {
@Override
public void run() {
// we manually feed data into the Kafka sink
OneInputStreamOperatorTestHarness<String, Object> testHarness = null;
try {
Properties producerProperties =
KafkaUtils.getPropertiesFromBrokerList(server.getBrokerConnectionString());
producerProperties.setProperty("retries", "3");

StreamSink<String> sink =
server.getProducerSink(
topic,
new SimpleStringSchema(),
producerProperties,
new FlinkFixedPartitioner<>());
Properties producerProperties =
KafkaUtils.getPropertiesFromBrokerList(server.getBrokerConnectionString());
producerProperties.setProperty("retries", "3");

testHarness = new OneInputStreamOperatorTestHarness<>(sink);
StreamSink<String> sink =
server.getProducerSink(
topic,
new SimpleStringSchema(),
producerProperties,
new FlinkFixedPartitioner<>());
try (OneInputStreamOperatorTestHarness<String, Object> testHarness =
new OneInputStreamOperatorTestHarness<>(sink)) {
testHarness.open();

final StringBuilder bld = new StringBuilder();
Expand All @@ -183,20 +182,11 @@ public void run() {
}
} catch (Throwable t) {
this.error = t;
} finally {
if (testHarness != null) {
try {
testHarness.close();
} catch (Throwable t) {
// ignore
}
}
}
}

public void shutdown() {
this.running = false;
this.interrupt();
}

public Throwable getError() {
Expand Down

0 comments on commit 91f730b

Please sign in to comment.