[FLINK-36441] Ensure producers are not leaked in tests
Add leak check in all relevant tests.
AHeise committed Oct 8, 2024
1 parent 0e1396e commit b20c9b5
Showing 11 changed files with 129 additions and 86 deletions.
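Note on the underlying problem (an illustrative sketch, not code from this commit): every KafkaProducer starts a background I/O thread whose name begins with "kafka-producer-network-thread", and that thread keeps running until close() is called. A test that drops a producer without closing it therefore leaks the thread into every later test in the same JVM, which is what the checkProducerLeak() hook added below detects.

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;

    // Hedged sketch; the broker address and client id are placeholder values.
    public class ProducerLeakDemo {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.CLIENT_ID_CONFIG, "demo");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            // A thread named "kafka-producer-network-thread | demo" is now alive;
            // without producer.close() it outlives the code that created it.
        }
    }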
@@ -30,6 +30,7 @@
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
@@ -45,6 +46,7 @@
import java.util.function.Consumer;
import java.util.stream.Collectors;

+import static org.apache.flink.connector.kafka.testutils.KafkaUtil.checkProducerLeak;
import static org.apache.flink.connector.kafka.testutils.KafkaUtil.createKafkaContainer;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -57,6 +59,11 @@ class FlinkKafkaInternalProducerITCase {
private static final KafkaContainer KAFKA_CONTAINER =
createKafkaContainer(FlinkKafkaInternalProducerITCase.class).withEmbeddedZookeeper();

+    @AfterEach
+    public void check() {
+        checkProducerLeak();
+    }
+
@Test
void testInitTransactionId() {
final String topic = "test-init-transactions";
@@ -24,6 +24,7 @@
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

@@ -32,16 +33,22 @@
import java.util.Collections;
import java.util.Properties;

+import static org.apache.flink.connector.kafka.testutils.KafkaUtil.checkProducerLeak;
import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link KafkaCommitter}. */
@ExtendWith({TestLoggerExtension.class})
-public class KafkaCommitterTest {
+class KafkaCommitterTest {

private static final int PRODUCER_ID = 0;
private static final short EPOCH = 0;
private static final String TRANSACTIONAL_ID = "transactionalId";

+    @AfterEach
+    public void check() {
+        checkProducerLeak();
+    }
+
/** Causes a network error by inactive broker and tests that a retry will happen. */
@Test
public void testRetryCommittableOnRetriableError() throws IOException, InterruptedException {
@@ -52,7 +52,6 @@ public void testPropertyHandling() {
getBasicBuilder().setProperty("k1", "v1"),
p -> {
Arrays.stream(DEFAULT_KEYS).forEach(k -> assertThat(p).containsKey(k));
-                    p.containsKey("k1");
});

Properties testConf = new Properties();
@@ -86,7 +86,6 @@
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -101,9 +100,9 @@
import java.util.stream.Collectors;
import java.util.stream.LongStream;

+import static org.apache.flink.connector.kafka.testutils.KafkaUtil.checkProducerLeak;
import static org.apache.flink.connector.kafka.testutils.KafkaUtil.createKafkaContainer;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.fail;

/** Tests for using KafkaSink writing to a Kafka cluster. */
public class KafkaSinkITCase extends TestLogger {
@@ -158,6 +157,7 @@ public void setUp() throws ExecutionException, InterruptedException, TimeoutExce

@After
public void tearDown() throws ExecutionException, InterruptedException, TimeoutException {
+        checkProducerLeak();
deleteTestTopic(topic);
}

@@ -329,7 +329,6 @@ private void executeWithMapper(
builder.setTransactionalIdPrefix(transactionalIdPrefix);
stream.sinkTo(builder.build());
env.execute();
-        checkProducerLeak();
}

private void testRecoveryWithAssertion(
@@ -600,40 +599,6 @@ public void snapshotState(FunctionSnapshotContext context) throws Exception {
public void initializeState(FunctionInitializationContext context) throws Exception {}
}

-    private void checkProducerLeak() throws InterruptedException {
-        List<Map.Entry<Thread, StackTraceElement[]>> leaks = null;
-        for (int tries = 0; tries < 10; tries++) {
-            leaks =
-                    Thread.getAllStackTraces().entrySet().stream()
-                            .filter(this::findAliveKafkaThread)
-                            .collect(Collectors.toList());
-            if (leaks.isEmpty()) {
-                return;
-            }
-            Thread.sleep(1000);
-        }
-
-        for (Map.Entry<Thread, StackTraceElement[]> leak : leaks) {
-            leak.getKey().stop();
-        }
-        fail(
-                "Detected producer leaks:\n"
-                        + leaks.stream().map(this::format).collect(Collectors.joining("\n\n")));
-    }
-
-    private String format(Map.Entry<Thread, StackTraceElement[]> leak) {
-        String stackTrace =
-                Arrays.stream(leak.getValue())
-                        .map(StackTraceElement::toString)
-                        .collect(Collectors.joining("\n"));
-        return leak.getKey().getName() + ":\n" + stackTrace;
-    }
-
-    private boolean findAliveKafkaThread(Map.Entry<Thread, StackTraceElement[]> threadStackTrace) {
-        return threadStackTrace.getKey().getState() != Thread.State.TERMINATED
-                && threadStackTrace.getKey().getName().contains("kafka-producer-network-thread");
-    }

/**
 * Exposes information about how many records have been emitted overall and finishes after
* receiving the checkpoint completed event.
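The helper deleted above is now imported from KafkaUtil by every test in this commit. A minimal sketch of that shared utility, assuming the removed code moved over mostly unchanged as a static method; the interruption handling is a guess, since the new @AfterEach hooks do not declare checked exceptions:

    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    import static org.assertj.core.api.Assertions.fail;

    public final class KafkaUtil {

        private KafkaUtil() {}

        /** Fails the current test if a Kafka producer network thread is still alive. */
        public static void checkProducerLeak() {
            List<Map.Entry<Thread, StackTraceElement[]>> leaks = null;
            // Give asynchronous shutdown up to 10 seconds before flagging a leak.
            for (int tries = 0; tries < 10; tries++) {
                leaks =
                        Thread.getAllStackTraces().entrySet().stream()
                                .filter(KafkaUtil::findAliveKafkaThread)
                                .collect(Collectors.toList());
                if (leaks.isEmpty()) {
                    return;
                }
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            // Kill the leaked threads so they do not poison subsequent tests, then fail.
            for (Map.Entry<Thread, StackTraceElement[]> leak : leaks) {
                leak.getKey().stop();
            }
            fail(
                    "Detected producer leaks:\n"
                            + leaks.stream().map(KafkaUtil::format).collect(Collectors.joining("\n\n")));
        }

        private static String format(Map.Entry<Thread, StackTraceElement[]> leak) {
            String stackTrace =
                    Arrays.stream(leak.getValue())
                            .map(StackTraceElement::toString)
                            .collect(Collectors.joining("\n"));
            return leak.getKey().getName() + ":\n" + stackTrace;
        }

        private static boolean findAliveKafkaThread(
                Map.Entry<Thread, StackTraceElement[]> threadStackTrace) {
            return threadStackTrace.getKey().getState() != Thread.State.TERMINATED
                    && threadStackTrace.getKey().getName().contains("kafka-producer-network-thread");
        }
    }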
@@ -29,8 +29,6 @@
import org.junit.After;
import org.junit.ClassRule;
import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;

import java.util.ArrayList;
@@ -44,13 +42,13 @@
import static org.apache.flink.connector.kafka.sink.KafkaTransactionLog.TransactionState.Ongoing;
import static org.apache.flink.connector.kafka.sink.KafkaTransactionLog.TransactionState.PrepareAbort;
import static org.apache.flink.connector.kafka.sink.KafkaTransactionLog.TransactionState.PrepareCommit;
+import static org.apache.flink.connector.kafka.testutils.KafkaUtil.checkProducerLeak;
import static org.apache.flink.connector.kafka.testutils.KafkaUtil.createKafkaContainer;
import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link KafkaTransactionLog} to retrieve abortable Kafka transactions. */
public class KafkaTransactionLogITCase extends TestLogger {

-    private static final Logger LOG = LoggerFactory.getLogger(KafkaSinkITCase.class);
private static final String TOPIC_NAME = "kafkaTransactionLogTest";
private static final String TRANSACTIONAL_ID_PREFIX = "kafka-log";

@@ -63,6 +61,7 @@ public class KafkaTransactionLogITCase extends TestLogger {
@After
public void tearDown() {
openProducers.forEach(Producer::close);
+        checkProducerLeak();
}

@Test
@@ -63,20 +63,21 @@ void testWriteExceptionWhenKafkaUnavailable() throws Exception {

final SinkWriterMetricGroup metricGroup = createSinkWriterMetricGroup();

-        final KafkaWriter<Integer> writer =
+        try (KafkaWriter<Integer> writer =
                createWriterWithConfiguration(
-                        properties, DeliveryGuarantee.AT_LEAST_ONCE, metricGroup);
+                        properties, DeliveryGuarantee.AT_LEAST_ONCE, metricGroup)) {

-        writer.write(1, SINK_WRITER_CONTEXT);
+            writer.write(1, SINK_WRITER_CONTEXT);

-        KAFKA_CONTAINER.stop();
+            KAFKA_CONTAINER.stop();

-        try {
-            writer.getCurrentProducer().flush();
-            assertThatCode(() -> writer.write(1, SINK_WRITER_CONTEXT))
-                    .hasRootCauseExactlyInstanceOf(NetworkException.class);
-        } finally {
-            KAFKA_CONTAINER.start();
+            try {
+                writer.getCurrentProducer().flush();
+                assertThatCode(() -> writer.write(1, SINK_WRITER_CONTEXT))
+                        .hasRootCauseExactlyInstanceOf(NetworkException.class);
+            } finally {
+                KAFKA_CONTAINER.start();
            }
+        }
    }

@@ -86,17 +87,18 @@ void testFlushExceptionWhenKafkaUnavailable() throws Exception {

final SinkWriterMetricGroup metricGroup = createSinkWriterMetricGroup();

-        final KafkaWriter<Integer> writer =
+        try (KafkaWriter<Integer> writer =
                createWriterWithConfiguration(
-                        properties, DeliveryGuarantee.AT_LEAST_ONCE, metricGroup);
-        writer.write(1, SINK_WRITER_CONTEXT);
-
-        KAFKA_CONTAINER.stop();
-        try {
-            assertThatCode(() -> writer.flush(false))
-                    .hasRootCauseExactlyInstanceOf(NetworkException.class);
-        } finally {
-            KAFKA_CONTAINER.start();
+                        properties, DeliveryGuarantee.AT_LEAST_ONCE, metricGroup)) {
+            writer.write(1, SINK_WRITER_CONTEXT);
+
+            KAFKA_CONTAINER.stop();
+            try {
+                assertThatCode(() -> writer.flush(false))
+                        .hasRootCauseExactlyInstanceOf(NetworkException.class);
+            } finally {
+                KAFKA_CONTAINER.start();
            }
+        }
    }

@@ -106,7 +108,7 @@ void testCloseExceptionWhenKafkaUnavailable() throws Exception {

final SinkWriterMetricGroup metricGroup = createSinkWriterMetricGroup();

-        final KafkaWriter<Integer> writer =
+        KafkaWriter<Integer> writer =
createWriterWithConfiguration(
properties, DeliveryGuarantee.AT_LEAST_ONCE, metricGroup);

@@ -119,6 +121,9 @@
// closing producer resource throws exception first
assertThatCode(() -> writer.close())
.hasRootCauseExactlyInstanceOf(NetworkException.class);
+        } catch (Exception e) {
+            writer.close();
+            throw e;
} finally {
KAFKA_CONTAINER.start();
}
@@ -130,26 +135,27 @@
SinkInitContext sinkInitContext =
new SinkInitContext(createSinkWriterMetricGroup(), timeService, null);

-        final KafkaWriter<Integer> writer =
+        try (KafkaWriter<Integer> writer =
                createWriterWithConfiguration(
-                        properties, DeliveryGuarantee.AT_LEAST_ONCE, sinkInitContext);
+                        properties, DeliveryGuarantee.AT_LEAST_ONCE, sinkInitContext)) {

-        KAFKA_CONTAINER.stop();
+            KAFKA_CONTAINER.stop();

-        writer.write(1, SINK_WRITER_CONTEXT);
+            writer.write(1, SINK_WRITER_CONTEXT);

-        try {
-            writer.getCurrentProducer().flush();
+            try {
+                writer.getCurrentProducer().flush();

-            assertThatCode(
-                            () -> {
-                                while (sinkInitContext.getMailboxExecutor().tryYield()) {
-                                    // execute all mails
-                                }
-                            })
-                    .hasRootCauseExactlyInstanceOf(TimeoutException.class);
-        } finally {
-            KAFKA_CONTAINER.start();
+                assertThatCode(
+                                () -> {
+                                    while (sinkInitContext.getMailboxExecutor().tryYield()) {
+                                        // execute all mails
+                                    }
+                                })
+                        .hasRootCauseExactlyInstanceOf(TimeoutException.class);
+            } finally {
+                KAFKA_CONTAINER.start();
            }
+        }
    }

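The conversions above follow one pattern (a generic illustration, not commit code): try-with-resources runs close() even when the body throws, so a failing assertion can no longer strand the writer's Kafka producer. Only testCloseExceptionWhenKafkaUnavailable keeps a manual close() plus catch-and-rethrow, because there close() itself is the call under test.

    // Minimal demonstration that the resource is closed before the catch runs.
    static void demo() throws Exception {
        try (AutoCloseable resource = () -> System.out.println("resource closed")) {
            throw new IllegalStateException("simulated assertion failure");
        } catch (IllegalStateException e) {
            // Prints after "resource closed": the writer analogue is always released.
            System.out.println("caught: " + e.getMessage());
        }
    }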
@@ -165,6 +165,9 @@ void testFlushAsyncErrorPropagationAndErrorCounter() throws Exception {
.as("the exception is not thrown again")
.doesNotThrowAnyException();
        assertThat(numRecordsOutErrors.getCount()).isEqualTo(1L);
+
+        // async exception is checked and thrown on close
+        assertThatCode(writer::close).hasRootCauseInstanceOf(ProducerFencedException.class);
}

@Test
@@ -191,6 +194,9 @@ void testWriteAsyncErrorPropagationAndErrorCounter() throws Exception {
.as("the exception is not thrown again")
.doesNotThrowAnyException();
        assertThat(numRecordsOutErrors.getCount()).isEqualTo(1L);
+
+        // async exception is checked and thrown on close
+        assertThatCode(writer::close).hasRootCauseInstanceOf(ProducerFencedException.class);
}

@Test
@@ -225,6 +231,9 @@ void testMailboxAsyncErrorPropagationAndErrorCounter() throws Exception {
.as("the exception is not thrown again")
.doesNotThrowAnyException();
        assertThat(numRecordsOutErrors.getCount()).isEqualTo(1L);
+
+        // async exception is checked and thrown on close
+        assertThatCode(writer::close).hasRootCauseInstanceOf(ProducerFencedException.class);
}

@Test
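The three assertions added above encode the same contract: the writer remembers the first asynchronous send failure, and close() re-checks it, so a pending ProducerFencedException always surfaces. A hedged sketch of that bookkeeping (field and method names are illustrative, not the actual KafkaWriter internals):

    import java.io.IOException;

    // Illustrative only; the real KafkaWriter logic may differ in detail.
    class AsyncFailureBookkeeping {
        private volatile Exception asyncProducerException;

        /** Producer send callback path: remember only the first failure. */
        void onSendCompletion(Exception exception) {
            if (exception != null && asyncProducerException == null) {
                asyncProducerException = exception;
            }
        }

        /** Invoked from close(): a stored async failure must not pass silently. */
        void close() throws IOException {
            if (asyncProducerException != null) {
                throw new IOException("Failed to send data to Kafka", asyncProducerException);
            }
        }
    }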
@@ -36,6 +36,7 @@
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestInfo;
import org.slf4j.Logger;
@@ -55,6 +56,7 @@
import java.util.concurrent.ScheduledFuture;
import java.util.function.Consumer;

+import static org.apache.flink.connector.kafka.testutils.KafkaUtil.checkProducerLeak;
import static org.apache.flink.connector.kafka.testutils.KafkaUtil.createKafkaContainer;

/** Test base for KafkaWriter. */
@@ -84,6 +86,11 @@ public void setUp(TestInfo testInfo) {
topic = testInfo.getDisplayName().replaceAll("\\W", "");
}

+    @AfterEach
+    public void check() {
+        checkProducerLeak();
+    }
+
protected KafkaWriter<Integer> createWriterWithConfiguration(
Properties config, DeliveryGuarantee guarantee) throws IOException {
return createWriterWithConfiguration(config, guarantee, createSinkWriterMetricGroup());
@@ -23,10 +23,13 @@
import org.apache.flink.streaming.connectors.kafka.KafkaProducerTestBase;
import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl;

+import org.junit.After;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;

+import static org.apache.flink.connector.kafka.testutils.KafkaUtil.checkProducerLeak;

/**
* An IT case class that runs all the IT cases of the legacy {@link
* org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer} with the new {@link KafkaSource}.
@@ -44,6 +47,11 @@ public static void prepare() throws Exception {
.setProducerSemantic(FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
}

+    @After
+    public void check() {
+        checkProducerLeak();
+    }
+
@Test
public void testFailOnNoBroker() throws Exception {
runFailOnNoBrokerTest();