Skip to content

Commit

Permalink
Addressing Fabian's review
Browse files Browse the repository at this point in the history
  • Loading branch information
AHeise committed Oct 8, 2024
1 parent e1bc678 commit fbdf041
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
/** Configuration for AssertJ that shows full stack traces for unmatched exceptions. */
public class KafkaAssertjConfiguration extends Configuration {
public KafkaAssertjConfiguration() {
// in case of an assertion error, show the full stack trace
// for green builds, this is not changing anything
setMaxStackTraceElementsDisplayed(10000);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.messages.FlinkJobTerminatedWithoutCancellationException;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
Expand All @@ -41,7 +41,9 @@
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -66,6 +68,7 @@
import java.util.stream.IntStream;

import static org.apache.flink.core.testutils.CommonTestUtils.waitUtil;
import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestUtils.collectAllRows;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestUtils.collectRows;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestUtils.readLines;
Expand Down Expand Up @@ -1318,7 +1321,7 @@ public void testStartFromGroupOffsetsEarliest() throws Exception {
@Test
public void testStartFromGroupOffsetsNone() {
Assertions.assertThatThrownBy(() -> testStartFromGroupOffsetsWithNoneResetStrategy())
.satisfies(FlinkAssertions.anyCauseMatches(NoOffsetForPartitionException.class));
.satisfies(anyCauseMatches(NoOffsetForPartitionException.class));
}

private List<String> appendNewData(
Expand Down Expand Up @@ -1513,20 +1516,31 @@ private static boolean isCausedByJobFinished(Throwable e) {
}

private void cleanupTopic(String topic) {
ignoreExceptions(() -> deleteTestTopic(topic));
ignoreExceptions(
() -> deleteTestTopic(topic),
anyCauseMatches(UnknownTopicOrPartitionException.class));
}

private static void ignoreExceptions(RunnableWithException e) {
try {
e.run();
} catch (Exception ex) {
// ignore
private static void cancelJob(TableResult tableResult) {
if (tableResult != null && tableResult.getJobClient().isPresent()) {
ignoreExceptions(
() -> tableResult.getJobClient().get().cancel().get(),
anyCauseMatches(FlinkJobTerminatedWithoutCancellationException.class),
anyCauseMatches(
"MiniCluster is not yet running or has already been shut down."));
}
}

private static void cancelJob(TableResult tableResult) {
if (tableResult != null) {
ignoreExceptions(() -> tableResult.getJobClient().ifPresent(JobClient::cancel));
@SafeVarargs
private static void ignoreExceptions(
RunnableWithException runnable, ThrowingConsumer<? super Throwable>... ignoreIf) {
try {
runnable.run();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception ex) {
// check if the exception is one of the ignored ones
assertThat(ex).satisfiesAnyOf(ignoreIf);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
* A Collection of useful {@link Matcher}s for {@link TypeSerializer} and {@link
* TypeSerializerSchemaCompatibility}.
*
* <p>Note copied from Flink 1.18. Remove when we drop 1.18 support.
* <p>Note copied from Flink 1.19. Remove when we drop 1.19 support.
*/
public final class TypeSerializerMatchers {

Expand Down

0 comments on commit fbdf041

Please sign in to comment.