Skip to content

Commit

Permalink
[test] Add tests for metric pending.connections.time with status ERROR
Browse files Browse the repository at this point in the history
Related to #3060
  • Loading branch information
violetagg committed Mar 8, 2024
1 parent 320ee50 commit 3edf435
Showing 1 changed file with 56 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,20 @@
import static org.assertj.core.api.Assertions.assertThat;
import static reactor.netty.Metrics.ACTIVE_CONNECTIONS;
import static reactor.netty.Metrics.ACTIVE_STREAMS;
import static reactor.netty.Metrics.ERROR;
import static reactor.netty.Metrics.MAX_CONNECTIONS;
import static reactor.netty.Metrics.CONNECTION_PROVIDER_PREFIX;
import static reactor.netty.Metrics.IDLE_CONNECTIONS;
import static reactor.netty.Metrics.PENDING_CONNECTIONS;
import static reactor.netty.Metrics.MAX_PENDING_CONNECTIONS;
import static reactor.netty.Metrics.NAME;
import static reactor.netty.Metrics.PENDING_STREAMS;
import static reactor.netty.Metrics.STATUS;
import static reactor.netty.Metrics.TOTAL_CONNECTIONS;
import static reactor.netty.http.client.HttpClientState.STREAM_CONFIGURED;
import static reactor.netty.micrometer.GaugeAssert.assertGauge;
import static reactor.netty.micrometer.TimerAssert.assertTimer;
import static reactor.netty.resources.ConnectionProviderMeters.PENDING_CONNECTIONS_TIME;

/**
* This test class verifies {@link ConnectionProvider} metrics functionality.
Expand Down Expand Up @@ -300,6 +304,58 @@ void testConnectionPoolPendingAcquireSize() throws Exception {
assertGauge(registry, CONNECTION_PROVIDER_PREFIX + PENDING_STREAMS, NAME, "http2.testConnectionPoolPendingAcquireSize").isNull();
}

@Test
void testIssue3060PendingAcquireMaxCountReached() throws Exception {
testIssue3060(
ConnectionProvider.builder("testIssue3060")
.maxConnections(1)
.pendingAcquireMaxCount(1)
.metrics(true)
.build());
}

@Test
void testIssue3060PendingAcquireTimeoutReached() throws Exception {
testIssue3060(
ConnectionProvider.builder("testIssue3060")
.maxConnections(2)
.pendingAcquireTimeout(Duration.ofMillis(10))
.metrics(true)
.build());
}

private void testIssue3060(ConnectionProvider provider) throws Exception {
try {
disposableServer =
createServer().handle((req, res) -> res.header("Connection", "close")
.sendString(Mono.just("testIssue3060")
.delayElement(Duration.ofMillis(50))))
.bindNow();

CountDownLatch latch = new CountDownLatch(1);
HttpClient client =
createClient(provider, () -> disposableServer.address())
.doOnResponse((res, conn) -> conn.channel().closeFuture().addListener(f -> latch.countDown()));

Flux.range(0, 3)
.flatMapDelayError(i ->
client.get()
.uri("/")
.responseContent()
.aggregate()
.asString(), 256, 32)
.materialize()
.blockLast(Duration.ofSeconds(30));

assertThat(latch.await(30, TimeUnit.SECONDS)).as("latch await").isTrue();
assertTimer(registry, PENDING_CONNECTIONS_TIME.getName(), NAME, "testIssue3060", STATUS, ERROR).hasCountEqualTo(1);
}
finally {
provider.disposeLater()
.block(Duration.ofSeconds(5));
}
}

private double getGaugeValue(String gaugeName, String poolName) {
Gauge gauge = registry.find(gaugeName).tag(NAME, poolName).gauge();
double result = -1;
Expand Down

0 comments on commit 3edf435

Please sign in to comment.