diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java index be3651e58..b11b729e2 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java @@ -151,20 +151,19 @@ public InfiniteStringsGenerator(KafkaTestEnvironment server, String topic) { @Override public void run() { // we manually feed data into the Kafka sink - OneInputStreamOperatorTestHarness testHarness = null; - try { - Properties producerProperties = - KafkaUtils.getPropertiesFromBrokerList(server.getBrokerConnectionString()); - producerProperties.setProperty("retries", "3"); - StreamSink sink = - server.getProducerSink( - topic, - new SimpleStringSchema(), - producerProperties, - new FlinkFixedPartitioner<>()); + Properties producerProperties = + KafkaUtils.getPropertiesFromBrokerList(server.getBrokerConnectionString()); + producerProperties.setProperty("retries", "3"); - testHarness = new OneInputStreamOperatorTestHarness<>(sink); + StreamSink sink = + server.getProducerSink( + topic, + new SimpleStringSchema(), + producerProperties, + new FlinkFixedPartitioner<>()); + try (OneInputStreamOperatorTestHarness testHarness = + new OneInputStreamOperatorTestHarness<>(sink)) { testHarness.open(); final StringBuilder bld = new StringBuilder(); @@ -183,20 +182,11 @@ public void run() { } } catch (Throwable t) { this.error = t; - } finally { - if (testHarness != null) { - try { - testHarness.close(); - } catch (Throwable t) { - // ignore - } - } } } public void shutdown() { this.running = false; - this.interrupt(); } public Throwable getError() {