diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/KafkaAssertjConfiguration.java b/flink-connector-kafka/src/test/java/org/apache/flink/KafkaAssertjConfiguration.java index 5c4577498..26d080484 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/KafkaAssertjConfiguration.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/KafkaAssertjConfiguration.java @@ -23,6 +23,8 @@ /** Configuration for AssertJ that shows full stack traces for unmatched exceptions. */ public class KafkaAssertjConfiguration extends Configuration { public KafkaAssertjConfiguration() { + // in case of an assertion error, show the full stack trace + // for green builds, this is not changing anything setMaxStackTraceElementsDisplayed(10000); } } diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java index 7fc0bba85..acd0550e4 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java @@ -23,8 +23,8 @@ import org.apache.flink.configuration.CoreOptions; import org.apache.flink.core.execution.JobClient; import org.apache.flink.core.execution.SavepointFormatType; -import org.apache.flink.core.testutils.FlinkAssertions; import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; +import org.apache.flink.runtime.messages.FlinkJobTerminatedWithoutCancellationException; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; @@ -41,7 +41,9 @@ import org.apache.kafka.clients.consumer.NoOffsetForPartitionException; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.assertj.core.api.Assertions; +import org.assertj.core.api.ThrowingConsumer; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -66,6 +68,7 @@ import java.util.stream.IntStream; import static org.apache.flink.core.testutils.CommonTestUtils.waitUtil; +import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches; import static org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestUtils.collectAllRows; import static org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestUtils.collectRows; import static org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestUtils.readLines; @@ -1318,7 +1321,7 @@ public void testStartFromGroupOffsetsEarliest() throws Exception { @Test public void testStartFromGroupOffsetsNone() { Assertions.assertThatThrownBy(() -> testStartFromGroupOffsetsWithNoneResetStrategy()) - .satisfies(FlinkAssertions.anyCauseMatches(NoOffsetForPartitionException.class)); + .satisfies(anyCauseMatches(NoOffsetForPartitionException.class)); } private List appendNewData( @@ -1513,20 +1516,31 @@ private static boolean isCausedByJobFinished(Throwable e) { } private void cleanupTopic(String topic) { - ignoreExceptions(() -> deleteTestTopic(topic)); + ignoreExceptions( + () -> deleteTestTopic(topic), + anyCauseMatches(UnknownTopicOrPartitionException.class)); } - private static void ignoreExceptions(RunnableWithException e) { - try { - e.run(); - } catch (Exception ex) { - // ignore + private static void cancelJob(TableResult tableResult) { + if (tableResult != null && tableResult.getJobClient().isPresent()) { + ignoreExceptions( + () -> tableResult.getJobClient().get().cancel().get(), + anyCauseMatches(FlinkJobTerminatedWithoutCancellationException.class), + anyCauseMatches( + "MiniCluster is not yet running or has already been shut down.")); } } - private static void cancelJob(TableResult tableResult) { - if (tableResult != null) { - ignoreExceptions(() -> tableResult.getJobClient().ifPresent(JobClient::cancel)); + @SafeVarargs + private static void ignoreExceptions( + RunnableWithException runnable, ThrowingConsumer... ignoreIf) { + try { + runnable.run(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (Exception ex) { + // check if the exception is one of the ignored ones + assertThat(ex).satisfiesAnyOf(ignoreIf); } } } diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/TypeSerializerMatchers.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/TypeSerializerMatchers.java index 08eef897b..0815e2990 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/TypeSerializerMatchers.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/TypeSerializerMatchers.java @@ -35,7 +35,7 @@ * A Collection of useful {@link Matcher}s for {@link TypeSerializer} and {@link * TypeSerializerSchemaCompatibility}. * - *

Note copied from Flink 1.18. Remove when we drop 1.18 support. + *

Note copied from Flink 1.19. Remove when we drop 1.19 support. */ public final class TypeSerializerMatchers {