Skip to content

Commit

Permalink
Update http2 connection evict logic
Browse files Browse the repository at this point in the history
  • Loading branch information
dilanSachi committed Jan 24, 2024
1 parent dbdd12e commit d25b391
Show file tree
Hide file tree
Showing 13 changed files with 155 additions and 43 deletions.
7 changes: 7 additions & 0 deletions ballerina/http_client_connection_pool.bal
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,20 @@ configurable int maxActiveStreamsPerConnection = 100;
# + maxActiveStreamsPerConnection - Maximum active streams per connection. This only applies to HTTP/2. Default value is 100
# + minEvictableIdleTime - Minimum evictable time for an idle connection in seconds. Default value is 5 minutes
# + timeBetweenEvictionRuns - Time between eviction runs in seconds. Default value is 30 seconds
# + minIdleTimeInStaleState - Minimum time in seconds for a connection to be kept open which has received a GOAWAY.
This only applies for HTTP/2. Default value is 5 minutes. If the value is set to -1,
the connection will not be closed until all existing streams are completed
# + timeBetweenStaleCheck - Time between the connection stale check run in seconds. This only applies for HTTP/2.
Default value is 30 seconds
public type PoolConfiguration record {|
int maxActiveConnections = maxActiveConnections;
int maxIdleConnections = maxIdleConnections;
decimal waitTime = waitTime;
int maxActiveStreamsPerConnection = maxActiveStreamsPerConnection;
decimal minEvictableIdleTime = 300;
decimal timeBetweenEvictionRuns = 30;
decimal minIdleTimeInStaleState = 300;
decimal timeBetweenStaleCheck = 30;
|};

//This is a hack to get the global map initialized, without involving locking.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,10 @@ public final class HttpConstants {
"minEvictableIdleTime");
public static final BString CONNECTION_POOLING_TIME_BETWEEN_EVICTION_RUNS = StringUtils.fromString(
"timeBetweenEvictionRuns");
public static final BString CONNECTION_POOLING_IDLE_TIME_STALE_STATE = StringUtils.fromString(
"minIdleTimeInStaleState");
public static final BString CONNECTION_POOLING_TIME_BETWEEN_STALE_CHECK_RUNS = StringUtils.fromString(
"timeBetweenStaleCheck");
public static final String HTTP_CLIENT_CONNECTION_POOL = "PoolConfiguration";
public static final String CONNECTION_MANAGER = "ConnectionManager";
public static final int POOL_CONFIG_INDEX = 1;
Expand Down
17 changes: 15 additions & 2 deletions native/src/main/java/io/ballerina/stdlib/http/api/HttpUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -1359,8 +1359,21 @@ public static void populatePoolingConfig(BMap poolRecord, PoolConfiguration pool

double timeBetweenEvictionRuns =
((BDecimal) poolRecord.get(HttpConstants.CONNECTION_POOLING_TIME_BETWEEN_EVICTION_RUNS)).floatValue();
poolConfiguration.setTimeBetweenEvictionRuns(
timeBetweenEvictionRuns < 0 ? 0 : (long) timeBetweenEvictionRuns * 1000);
if (timeBetweenEvictionRuns > 0) {
poolConfiguration.setTimeBetweenEvictionRuns((long) timeBetweenEvictionRuns * 1000);
}

double minIdleTimeInStaleState =
((BDecimal) poolRecord.get(HttpConstants.CONNECTION_POOLING_IDLE_TIME_STALE_STATE)).floatValue();
poolConfiguration.setMinIdleTimeInStaleState(minIdleTimeInStaleState < -1 ? -1 :
(long) minEvictableIdleTime * 1000);

double timeBetweenStaleCheck =
((BDecimal) poolRecord.get(HttpConstants.CONNECTION_POOLING_TIME_BETWEEN_STALE_CHECK_RUNS))
.floatValue();
if (timeBetweenStaleCheck > 0) {
poolConfiguration.setTimeBetweenStaleCheck((long) timeBetweenStaleCheck * 1000);
}
}

private static int validateConfig(long value, String configName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public class PoolConfiguration {
private int eventGroupExecutorThreads = 15;
private long maxWaitTime = 60000L;
private int http2MaxActiveStreamsPerConnection = Integer.MAX_VALUE;
private long minIdleTimeInStaleState = 300000;
private long timeBetweenStaleCheck = 30000;

public PoolConfiguration() {
}
Expand Down Expand Up @@ -142,4 +144,20 @@ public int getHttp2MaxActiveStreamsPerConnection() {
public void setHttp2MaxActiveStreamsPerConnection(int http2MaxActiveStreamsPerConnection) {
this.http2MaxActiveStreamsPerConnection = http2MaxActiveStreamsPerConnection;
}

public long getMinIdleTimeInStaleState() {
return minIdleTimeInStaleState;
}

public void setMinIdleTimeInStaleState(long minIdleTimeInStaleState) {
this.minIdleTimeInStaleState = minIdleTimeInStaleState;
}

public long getTimeBetweenStaleCheck() {
return timeBetweenStaleCheck;
}

public void setTimeBetweenStaleCheck(long timeBetweenStaleCheck) {
this.timeBetweenStaleCheck = timeBetweenStaleCheck;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,10 @@ private void handleConnectionClose() {
}
}

ConcurrentHashMap<Integer, OutboundMsgHolder> getInFlightMessages() {
return inFlightMessages;
}

/**
* Listener which listen to the stream closure event.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package io.ballerina.stdlib.http.transport.contractimpl.sender.http2;

import io.ballerina.stdlib.http.transport.contractimpl.common.HttpRoute;
import io.ballerina.stdlib.http.transport.contractimpl.common.states.Http2MessageStateContext;
import io.ballerina.stdlib.http.transport.contractimpl.sender.channel.pool.PoolConfiguration;
import io.netty.channel.DefaultEventLoop;
import io.netty.util.concurrent.DefaultPromise;
Expand Down Expand Up @@ -171,21 +172,35 @@ private void initiateConnectionEvictionTask() {
@Override
public void run() {
http2StaleClientChannels.forEach(http2ClientChannel -> {
if ((System.currentTimeMillis() - http2ClientChannel.getTimeSinceMarkedAsStale()) >
poolConfiguration.getMinEvictableIdleTime()
&& !http2ClientChannel.hasInFlightMessages()) {
boolean result = http2StaleClientChannels.remove(http2ClientChannel);
if (!result) {
logger.warn("Specified channel does not exist in the stale list.");
if (poolConfiguration.getMinIdleTimeInStaleState() == -1) {
if (!http2ClientChannel.hasInFlightMessages()) {
closeChannelAndEvict(http2ClientChannel);
}
http2ClientChannel.getConnection()
.close(new DefaultPromise(new DefaultEventLoop()));
} else if ((System.currentTimeMillis() - http2ClientChannel.getTimeSinceMarkedAsStale()) >
poolConfiguration.getMinIdleTimeInStaleState()) {
http2ClientChannel.getInFlightMessages().forEach((streamId, outboundMsgHolder) -> {
Http2MessageStateContext messageStateContext =
outboundMsgHolder.getRequest().getHttp2MessageStateContext();
if (messageStateContext != null) {
messageStateContext.getSenderState().handleConnectionClose(outboundMsgHolder);
}
});
closeChannelAndEvict(http2ClientChannel);
}
});
}

public void closeChannelAndEvict(Http2ClientChannel http2ClientChannel) {
boolean result = http2StaleClientChannels.remove(http2ClientChannel);
if (!result) {
logger.warn("Specified channel does not exist in the stale list.");
}
http2ClientChannel.getConnection()
.close(new DefaultPromise(new DefaultEventLoop()));
}
};
timer.schedule(timerTask, poolConfiguration.getTimeBetweenEvictionRuns(),
poolConfiguration.getTimeBetweenEvictionRuns());
timer.schedule(timerTask, poolConfiguration.getTimeBetweenStaleCheck(),
poolConfiguration.getTimeBetweenStaleCheck());
}

private Http2ChannelPool.PerRouteConnectionPool fetchPerRoutePool(HttpRoute httpRoute) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,14 @@ public class FrameLevelTestUtils {
0x00, 0x00, 0x00, 0x00, 0x05, 0x00, 0x00, 0x00, 0x0b};
public static final byte[] GO_AWAY_FRAME_MAX_STREAM_07 = new byte[]{0x00, 0x00, 0x08, 0x07, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x07, 0x00, 0x00, 0x00, 0x0b};
public static final byte[] GO_AWAY_FRAME_MAX_STREAM_09 = new byte[]{0x00, 0x00, 0x08, 0x07, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x09, 0x00, 0x00, 0x00, 0x0b};
public static final byte[] RST_STREAM_FRAME_STREAM_03 = new byte[]{0x00, 0x00, 0x04, 0x03, 0x00, 0x00, 0x00,
0x00, 0x03, 0x00, 0x00, 0x00, 0x02};
public static final byte[] RST_STREAM_FRAME_STREAM_07 = new byte[]{0x00, 0x00, 0x04, 0x03, 0x00, 0x00, 0x00,
0x00, 0x07, 0x00, 0x00, 0x00, 0x02};
public static final String DATA_VALUE_HELLO_WORLD_03 = "hello world3";
public static final String DATA_VALUE_HELLO_WORLD_04 = "hello world4";
public static final String DATA_VALUE_HELLO_WORLD_05 = "hello world5";
public static final String DATA_VALUE_HELLO_WORLD_07 = "hello world7";

public static HttpClientConnector setupHttp2PriorKnowledgeClient() {
HttpWsConnectorFactory connectorFactory = new DefaultHttpWsConnectorFactory();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.io.IOException;
Expand All @@ -44,17 +43,26 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static io.ballerina.stdlib.http.transport.contract.Constants.REMOTE_SERVER_CLOSED_BEFORE_INITIATING_INBOUND_RESPONSE;
import static io.ballerina.stdlib.http.transport.contract.Constants.REMOTE_SERVER_CLOSED_WHILE_READING_INBOUND_RESPONSE_BODY;
import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.DATA_FRAME_STREAM_03;
import static io.ballerina.stdlib.http.transport.http2.frameleveltests
.FrameLevelTestUtils.DATA_FRAME_STREAM_03_DIFFERENT_DATA;
import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.DATA_FRAME_STREAM_05;
import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.DATA_VALUE_HELLO_WORLD_03;
import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.DATA_VALUE_HELLO_WORLD_04;
import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.DATA_VALUE_HELLO_WORLD_05;
import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.END_SLEEP_TIME;
import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.GO_AWAY_FRAME_MAX_STREAM_05;
import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.HEADER_FRAME_STREAM_03;
import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.HEADER_FRAME_STREAM_05;
import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.SETTINGS_FRAME;
import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.SETTINGS_FRAME_WITH_ACK;
import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.SLEEP_TIME;
import static io.ballerina.stdlib.http.transport.util.TestUtil.getDecoderErrorMessage;
import static io.ballerina.stdlib.http.transport.util.TestUtil.getErrorResponseMessage;
import static io.ballerina.stdlib.http.transport.util.TestUtil.getResponseMessage;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertEqualsNoOrder;
import static org.testng.Assert.fail;

Expand All @@ -69,17 +77,12 @@ public class Http2ConnectionEvictionAfterTcpServerGoAwayScenarioTest {
private ServerSocket serverSocket;
private int numOfConnections = 0;

@BeforeClass
public void setup() throws InterruptedException {
runTcpServer(TestUtil.HTTP_SERVER_PORT);
h2ClientWithPriorKnowledge = setupHttp2PriorKnowledgeClient();
}

public HttpClientConnector setupHttp2PriorKnowledgeClient() {
public HttpClientConnector setupHttp2PriorKnowledgeClient(long minIdleTimeInStaleState,
long timeBetweenStaleCheck) {
HttpWsConnectorFactory connectorFactory = new DefaultHttpWsConnectorFactory();
PoolConfiguration poolConfiguration = new PoolConfiguration();
poolConfiguration.setMinEvictableIdleTime(5000);
poolConfiguration.setTimeBetweenEvictionRuns(1000);
poolConfiguration.setMinIdleTimeInStaleState(minIdleTimeInStaleState);
poolConfiguration.setTimeBetweenStaleCheck(timeBetweenStaleCheck);
TransportsConfiguration transportsConfiguration = new TransportsConfiguration();
SenderConfiguration senderConfiguration = new SenderConfiguration();
senderConfiguration.setPoolConfiguration(poolConfiguration);
Expand All @@ -91,15 +94,18 @@ public HttpClientConnector setupHttp2PriorKnowledgeClient() {
}

@Test
private void testConnectionEvictionAfterServerGoAwayScenario() {
private void testConnectionEvictionAfterAllStreamsAreClosedScenario() {
try {
runTcpServer(TestUtil.HTTP_SERVER_PORT);
// Setting to -1 will make the runner to wait until all pending streams are completed
h2ClientWithPriorKnowledge = setupHttp2PriorKnowledgeClient(-1, 1000);
CountDownLatch latch1 = new CountDownLatch(2);
DefaultHttpConnectorListener msgListener1 = TestUtil.sendRequestAsync(latch1, h2ClientWithPriorKnowledge);
DefaultHttpConnectorListener msgListener2 = TestUtil.sendRequestAsync(latch1, h2ClientWithPriorKnowledge);
latch1.await(TestUtil.HTTP2_RESPONSE_TIME_OUT, TimeUnit.SECONDS);

CountDownLatch latch2 = new CountDownLatch(1);
DefaultHttpConnectorListener msgListener3 = TestUtil.sendRequestAsync(latch1, h2ClientWithPriorKnowledge);
DefaultHttpConnectorListener msgListener3 = TestUtil.sendRequestAsync(latch2, h2ClientWithPriorKnowledge);
latch2.await(TestUtil.HTTP2_RESPONSE_TIME_OUT, TimeUnit.SECONDS);

HttpCarbonMessage response1 = msgListener1.getHttpResponseMessage();
Expand All @@ -110,15 +116,50 @@ private void testConnectionEvictionAfterServerGoAwayScenario() {
Object responseVal2 = response2.getHttpContent().content().toString(CharsetUtil.UTF_8);
Object responseVal3 = response3.getHttpContent().content().toString(CharsetUtil.UTF_8);

assertEqualsNoOrder(List.of(responseVal1, responseVal2, responseVal3),
List.of("hello world3", "hello world4", "hello world5"));
} catch (InterruptedException e) {
LOGGER.error("Interrupted exception occurred");
assertEqualsNoOrder(List.of(responseVal1, responseVal2), List.of(DATA_VALUE_HELLO_WORLD_03,
DATA_VALUE_HELLO_WORLD_05));
assertEquals(responseVal3, DATA_VALUE_HELLO_WORLD_04);
} catch (InterruptedException | IOException e) {
LOGGER.error("Exception occurred");
fail();
}
}

private void runTcpServer(int port) {
@Test
private void testConnectionEvictionBeforeAllStreamsAreClosedScenario() {
try {
runTcpServer(TestUtil.HTTP_SERVER_PORT);
// Setting to -1 will make the runner to wait until all pending streams are completed
h2ClientWithPriorKnowledge = setupHttp2PriorKnowledgeClient(5000, 1000);
CountDownLatch latch1 = new CountDownLatch(2);
DefaultHttpConnectorListener msgListener1 = TestUtil.sendRequestAsync(latch1, h2ClientWithPriorKnowledge);
DefaultHttpConnectorListener msgListener2 = TestUtil.sendRequestAsync(latch1, h2ClientWithPriorKnowledge);
latch1.await(TestUtil.HTTP2_RESPONSE_TIME_OUT, TimeUnit.SECONDS);

CountDownLatch latch2 = new CountDownLatch(1);
DefaultHttpConnectorListener msgListener3 = TestUtil.sendRequestAsync(latch2, h2ClientWithPriorKnowledge);
latch2.await(TestUtil.HTTP2_RESPONSE_TIME_OUT, TimeUnit.SECONDS);

String errorMsg1 = msgListener1.getHttpErrorMessage() != null ? getErrorResponseMessage(msgListener1) :
getDecoderErrorMessage(msgListener1);
String errorMsg2 = msgListener2.getHttpErrorMessage() != null ? getErrorResponseMessage(msgListener2) :
getDecoderErrorMessage(msgListener2);

assertEqualsNoOrder(List.of(errorMsg1, errorMsg2),
List.of(REMOTE_SERVER_CLOSED_BEFORE_INITIATING_INBOUND_RESPONSE,
REMOTE_SERVER_CLOSED_WHILE_READING_INBOUND_RESPONSE_BODY));
assertEquals(getResponseMessage(msgListener3), DATA_VALUE_HELLO_WORLD_04);
} catch (InterruptedException | IOException e) {
LOGGER.error("Exception occurred");
fail();
}
}

private void runTcpServer(int port) throws IOException {
if (serverSocket != null) {
serverSocket.close();
}
numOfConnections = 0;
new Thread(() -> {
try {
serverSocket = new ServerSocket(port);
Expand All @@ -129,12 +170,12 @@ private void runTcpServer(int port) {
try (OutputStream outputStream = clientSocket.getOutputStream()) {
if (numOfConnections == 0) {
sendGoAwayForASingleStream(outputStream);
numOfConnections += 1;
} else {
// If the connection successfully closed, a new socket connection
// will be opened and it will come here
sendSuccessfulRequest(outputStream);
}
numOfConnections += 1;
} catch (Exception e) {
LOGGER.error(e.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@
import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.DATA_FRAME_STREAM_05;
import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.DATA_FRAME_STREAM_07;
import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.DATA_FRAME_STREAM_09;
import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.DATA_VALUE_HELLO_WORLD_03;
import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.DATA_VALUE_HELLO_WORLD_05;
import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.DATA_VALUE_HELLO_WORLD_07;
import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.END_SLEEP_TIME;
import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.GO_AWAY_FRAME_MAX_STREAM_07;
import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.HEADER_FRAME_STREAM_03;
Expand Down Expand Up @@ -90,8 +93,8 @@ private void testGoAwayWhenReceivingHeadersInAMultipleStreamScenario() {
Object responseValOrError4 = msgListener4.getHttpResponseMessage() == null ?
getErrorResponseMessage(msgListener4) : getResponseMessage(msgListener4);
assertEqualsNoOrder(List.of(responseValOrError1, responseValOrError2, responseValOrError3,
responseValOrError4), List.of("hello world3", "hello world5", "hello world7",
Constants.REMOTE_SERVER_SENT_GOAWAY_BEFORE_INITIATING_INBOUND_RESPONSE));
responseValOrError4), List.of(DATA_VALUE_HELLO_WORLD_03, DATA_VALUE_HELLO_WORLD_05,
DATA_VALUE_HELLO_WORLD_07, Constants.REMOTE_SERVER_SENT_GOAWAY_BEFORE_INITIATING_INBOUND_RESPONSE));
}

private void runTcpServer(int port) {
Expand Down
Loading

0 comments on commit d25b391

Please sign in to comment.