Skip to content

Commit

Permalink
Merge pull request #1833 from dilanSachi/goaway-fix
Browse files Browse the repository at this point in the history
Improve the logic of GoAway frame handling in the client side
  • Loading branch information
dilanSachi authored Jan 29, 2024
2 parents 0d1f88a + e48aca0 commit be154a3
Show file tree
Hide file tree
Showing 39 changed files with 1,867 additions and 37 deletions.
16 changes: 14 additions & 2 deletions ballerina/http_client_connection_pool.bal
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ configurable int maxActiveConnections = -1;
configurable int maxIdleConnections = 100;
configurable decimal waitTime = 30;
configurable int maxActiveStreamsPerConnection = 100;
configurable decimal minEvictableIdleTime = 300;
configurable decimal timeBetweenEvictionRuns = 30;
configurable decimal minIdleTimeInStaleState = 300;
configurable decimal timeBetweenStaleEviction = 30;

# Configurations for managing HTTP client connection pool.
#
Expand All @@ -29,14 +33,22 @@ configurable int maxActiveStreamsPerConnection = 100;
# + maxActiveStreamsPerConnection - Maximum active streams per connection. This only applies to HTTP/2. Default value is 100
# + minEvictableIdleTime - Minimum evictable time for an idle connection in seconds. Default value is 5 minutes
# + timeBetweenEvictionRuns - Time between eviction runs in seconds. Default value is 30 seconds
# + minIdleTimeInStaleState - Minimum time in seconds for a connection to be kept open which has received a GOAWAY.
# This only applies for HTTP/2. Default value is 5 minutes. If the value is set to -1,
# the connection will be closed after all in-flight streams are completed
# + timeBetweenStaleEviction - Time between the connection stale eviction runs in seconds. This only applies for HTTP/2.
# Default value is 30 seconds
public type PoolConfiguration record {|
int maxActiveConnections = maxActiveConnections;
int maxIdleConnections = maxIdleConnections;
decimal waitTime = waitTime;
int maxActiveStreamsPerConnection = maxActiveStreamsPerConnection;
decimal minEvictableIdleTime = 300;
decimal timeBetweenEvictionRuns = 30;
decimal minEvictableIdleTime = minEvictableIdleTime;
decimal timeBetweenEvictionRuns = timeBetweenEvictionRuns;
decimal minIdleTimeInStaleState = minIdleTimeInStaleState;
decimal timeBetweenStaleEviction = timeBetweenStaleEviction;
|};

//This is a hack to get the global map initialized, without involving locking.
class ConnectionManager {
public PoolConfiguration & readonly poolConfig = {};
Expand Down
1 change: 1 addition & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

### Added
- [Expose HTTP connection eviction configurations in the client level](https://github.com/ballerina-platform/ballerina-library/issues/5951)
- [Handle GO_AWAY received HTTP/2 clients gracefully](https://github.com/ballerina-platform/ballerina-library/issues/4806)

### Fixed
- [Remove unused import from Http2StateUtil](https://github.com/ballerina-platform/ballerina-library/issues/5966)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,10 @@ public final class HttpConstants {
"minEvictableIdleTime");
public static final BString CONNECTION_POOLING_TIME_BETWEEN_EVICTION_RUNS = StringUtils.fromString(
"timeBetweenEvictionRuns");
public static final BString CONNECTION_POOLING_IDLE_TIME_STALE_STATE = StringUtils.fromString(
"minIdleTimeInStaleState");
public static final BString CONNECTION_POOLING_TIME_BETWEEN_STALE_CHECK_RUNS = StringUtils.fromString(
"timeBetweenStaleEviction");
public static final String HTTP_CLIENT_CONNECTION_POOL = "PoolConfiguration";
public static final String CONNECTION_MANAGER = "ConnectionManager";
public static final int POOL_CONFIG_INDEX = 1;
Expand Down
17 changes: 15 additions & 2 deletions native/src/main/java/io/ballerina/stdlib/http/api/HttpUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -1359,8 +1359,21 @@ public static void populatePoolingConfig(BMap poolRecord, PoolConfiguration pool

double timeBetweenEvictionRuns =
((BDecimal) poolRecord.get(HttpConstants.CONNECTION_POOLING_TIME_BETWEEN_EVICTION_RUNS)).floatValue();
poolConfiguration.setTimeBetweenEvictionRuns(
timeBetweenEvictionRuns < 0 ? 0 : (long) timeBetweenEvictionRuns * 1000);
if (timeBetweenEvictionRuns > 0) {
poolConfiguration.setTimeBetweenEvictionRuns((long) timeBetweenEvictionRuns * 1000);
}

double minIdleTimeInStaleState =
((BDecimal) poolRecord.get(HttpConstants.CONNECTION_POOLING_IDLE_TIME_STALE_STATE)).floatValue();
poolConfiguration.setMinIdleTimeInStaleState(minIdleTimeInStaleState < -1 ? -1 :
(long) minEvictableIdleTime * 1000);

double timeBetweenStaleEviction =
((BDecimal) poolRecord.get(HttpConstants.CONNECTION_POOLING_TIME_BETWEEN_STALE_CHECK_RUNS))
.floatValue();
if (timeBetweenStaleEviction > 0) {
poolConfiguration.setTimeBetweenStaleEviction((long) timeBetweenStaleEviction * 1000);
}
}

private static int validateConfig(long value, String configName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,8 @@ public final class Constants {
= "Idle timeout triggered before initiating outbound request";
public static final String IDLE_TIMEOUT_TRIGGERED_WHILE_WRITING_OUTBOUND_REQUEST_HEADERS
= "Idle timeout triggered while writing outbound request headers";
public static final String IDLE_TIMEOUT_TRIGGERED_WHILE_SENDING_RST_STREAM
= "Idle timeout triggered while sending RST_STREAM frame";
public static final String IDLE_TIMEOUT_TRIGGERED_WHILE_WRITING_OUTBOUND_REQUEST_BODY
= "Idle timeout triggered while writing outbound request entity body";
public static final String IDLE_TIMEOUT_TRIGGERED_BEFORE_INITIATING_INBOUND_RESPONSE
Expand Down Expand Up @@ -386,6 +388,8 @@ public final class Constants {
= "Remote host closed the connection before initiating outbound request";
public static final String REMOTE_SERVER_CLOSED_WHILE_WRITING_OUTBOUND_REQUEST_HEADERS
= "Remote host closed the connection while writing outbound request headers";
public static final String REMOTE_SERVER_CLOSED_WHILE_SENDING_RST_STREAM
= "Remote host closed the connection while sending RST_STREAM frame";
public static final String REMOTE_SERVER_CLOSED_WHILE_WRITING_OUTBOUND_REQUEST_BODY
= "Remote host closed the connection while writing outbound request entity body";
public static final String REMOTE_SERVER_CLOSED_BEFORE_INITIATING_INBOUND_RESPONSE
Expand All @@ -396,6 +400,32 @@ public final class Constants {
= "Remote host closed the connection while reading inbound response body";
public static final String REMOTE_SERVER_CLOSED_BEFORE_READING_100_CONTINUE_RESPONSE
= "Remote host closed the connection before reading 100 continue response";
// Server GOAWAY error scenarios
public static final String REMOTE_SERVER_SENT_GOAWAY_WHILE_WRITING_OUTBOUND_REQUEST_HEADERS
= "Remote host sent GOAWAY while writing outbound request headers";
public static final String REMOTE_SERVER_SENT_GOAWAY_WHILE_SENDING_RST_STREAM
= "Remote host sent GOAWAY while sending RST_STREAM frame";
public static final String REMOTE_SERVER_SENT_GOAWAY_WHILE_WRITING_OUTBOUND_REQUEST_BODY
= "Remote host sent GOAWAY while writing outbound request entity body";
public static final String REMOTE_SERVER_SENT_GOAWAY_BEFORE_INITIATING_INBOUND_RESPONSE
= "Remote host sent GOAWAY before initiating inbound response";
public static final String REMOTE_SERVER_SENT_GOAWAY_WHILE_READING_INBOUND_RESPONSE_HEADERS
= "Remote host sent GOAWAY while reading inbound response headers";
public static final String REMOTE_SERVER_SENT_GOAWAY_WHILE_READING_INBOUND_RESPONSE_BODY
= "Remote host sent GOAWAY while reading inbound response body";
// Server send RST_STREAM error scenarios
public static final String REMOTE_SERVER_SENT_RST_STREAM_WHILE_WRITING_OUTBOUND_REQUEST_HEADERS
= "Remote host sent RST_STREAM while writing outbound request headers";
public static final String REMOTE_SERVER_SENT_RST_STREAM_WHILE_SENDING_RST_STREAM
= "Remote host sent RST_STREAM while sending RST_STREAM frame";
public static final String REMOTE_SERVER_SENT_RST_STREAM_WHILE_WRITING_OUTBOUND_REQUEST_BODY
= "Remote host sent RST_STREAM while writing outbound request entity body";
public static final String REMOTE_SERVER_SENT_RST_STREAM_BEFORE_INITIATING_INBOUND_RESPONSE
= "Remote host sent RST_STREAM before initiating inbound response";
public static final String REMOTE_SERVER_SENT_RST_STREAM_WHILE_READING_INBOUND_RESPONSE_HEADERS
= "Remote host sent RST_STREAM while reading inbound response headers";
public static final String REMOTE_SERVER_SENT_RST_STREAM_WHILE_READING_INBOUND_RESPONSE_BODY
= "Remote host sent RST_STREAM while reading inbound response body";

public static final String REMOTE_CLIENT_TO_HOST_CONNECTION_CLOSED
= "Connection between remote client and host is closed";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public HttpResponseFuture send(OutboundMsgHolder outboundMsgHolder, HttpCarbonMe
this.configHashCode);
if (http2) {
// See whether an already upgraded HTTP/2 connection is available
Http2ClientChannel activeHttp2ClientChannel = http2ConnectionManager.borrowChannel(route);
Http2ClientChannel activeHttp2ClientChannel = http2ConnectionManager.fetchChannel(route);

if (activeHttp2ClientChannel != null) {
outboundMsgHolder.setHttp2ClientChannel(activeHttp2ClientChannel);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,11 @@ private static int getNextStreamId(Http2Connection conn) {
* @throws Http2Exception if a protocol-related error occurred
*/
private static void createStream(Http2Connection conn, int streamId) throws Http2Exception {
conn.local().createStream(streamId, false);
try {
conn.local().createStream(streamId, false);
} catch (Http2Exception.StreamException exception) {
throw new Http2Exception(exception.error(), "Error occurred while creating stream", exception);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Stream created streamId: {}", streamId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,8 @@ private void configureSslForHttp2(SocketChannel ch, ChannelPipeline clientPipeli
sslConfig.getCacheSize()));
}
}
clientPipeline.addLast(new Http2PipelineConfiguratorForClient(targetHandler, connectionAvailabilityFuture));
clientPipeline.addLast(
new ALPNClientHandler(targetHandler, connectionAvailabilityFuture));
clientPipeline
.addLast(Constants.HTTP2_EXCEPTION_HANDLER, new Http2ExceptionHandler(http2ConnectionHandler));
}
Expand Down Expand Up @@ -329,13 +330,13 @@ public void setHttp2ClientChannel(Http2ClientChannel http2ClientChannel) {
/**
* A handler to create the pipeline based on the ALPN negotiated protocol.
*/
class Http2PipelineConfiguratorForClient extends ApplicationProtocolNegotiationHandler {
class ALPNClientHandler extends ApplicationProtocolNegotiationHandler {

private TargetHandler targetHandler;
private ConnectionAvailabilityFuture connectionAvailabilityFuture;

public Http2PipelineConfiguratorForClient(TargetHandler targetHandler,
ConnectionAvailabilityFuture connectionAvailabilityFuture) {
public ALPNClientHandler(TargetHandler targetHandler,
ConnectionAvailabilityFuture connectionAvailabilityFuture) {
super(ApplicationProtocolNames.HTTP_1_1);
this.targetHandler = targetHandler;
this.connectionAvailabilityFuture = connectionAvailabilityFuture;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public ConnectionManager(PoolConfiguration poolConfiguration) {
globalConnPool = new ConcurrentHashMap<>();
globalFactoryObjects = new ConcurrentHashMap<>();
http2ConnectionManager = new Http2ConnectionManager(poolConfiguration);
connectionManagerId = "-" + UUID.randomUUID().toString();
connectionManagerId = "-" + UUID.randomUUID();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public class PoolConfiguration {
private int eventGroupExecutorThreads = 15;
private long maxWaitTime = 60000L;
private int http2MaxActiveStreamsPerConnection = Integer.MAX_VALUE;
private long minIdleTimeInStaleState = 300000;
private long timeBetweenStaleEviction = 30000;

public PoolConfiguration() {
}
Expand Down Expand Up @@ -142,4 +144,20 @@ public int getHttp2MaxActiveStreamsPerConnection() {
public void setHttp2MaxActiveStreamsPerConnection(int http2MaxActiveStreamsPerConnection) {
this.http2MaxActiveStreamsPerConnection = http2MaxActiveStreamsPerConnection;
}

public long getMinIdleTimeInStaleState() {
return minIdleTimeInStaleState;
}

public void setMinIdleTimeInStaleState(long minIdleTimeInStaleState) {
this.minIdleTimeInStaleState = minIdleTimeInStaleState;
}

public long getTimeBetweenStaleEviction() {
return timeBetweenStaleEviction;
}

public void setTimeBetweenStaleEviction(long timeBetweenStaleEviction) {
this.timeBetweenStaleEviction = timeBetweenStaleEviction;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ synchronized Http2ClientChannel fetchTargetChannel() {
}
Channel channel = http2ClientChannel.getChannel();
if (channel == null) { // if channel is not active, forget it and fetch next one
http2ClientChannels.remove(http2ClientChannel);
removeChannel(http2ClientChannel);
return fetchTargetChannel();
}
// increment and get active stream count
Expand All @@ -89,7 +89,7 @@ synchronized Http2ClientChannel fetchTargetChannel() {
return http2ClientChannel;
} else if (activeStreamCount == maxActiveStreams) { // no more streams except this one can be opened
http2ClientChannel.markAsExhausted();
http2ClientChannels.remove(http2ClientChannel);
removeChannel(http2ClientChannel);
// When the stream count reaches maxActiveStreams, a new channel will be added only if the
// channel queue is empty. This process is synchronized as transport thread can return channels
// after being reset. If such channel is returned before the new CountDownLatch, the subsequent
Expand All @@ -104,7 +104,7 @@ synchronized Http2ClientChannel fetchTargetChannel() {
}
return http2ClientChannel;
} else {
http2ClientChannels.remove(http2ClientChannel);
removeChannel(http2ClientChannel);
return fetchTargetChannel(); // fetch the next one from the queue
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.ballerina.stdlib.http.transport.contract.Constants;
import io.ballerina.stdlib.http.transport.contractimpl.common.HttpRoute;
import io.ballerina.stdlib.http.transport.contractimpl.common.states.Http2MessageStateContext;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.handler.codec.http2.Http2Connection;
Expand Down Expand Up @@ -60,6 +61,8 @@ public class Http2ClientChannel {
private int socketIdleTimeout = Constants.ENDPOINT_TIMEOUT;
private Map<String, Http2DataEventListener> dataEventListeners;
private StreamCloseListener streamCloseListener;
private long timeSinceMarkedAsStale = 0;
private AtomicBoolean isStale = new AtomicBoolean(false);

public Http2ClientChannel(Http2ConnectionManager http2ConnectionManager, Http2Connection connection,
HttpRoute httpRoute, Channel channel) {
Expand Down Expand Up @@ -253,7 +256,7 @@ void destroy() {
this.connection.removeListener(streamCloseListener);
inFlightMessages.clear();
promisedMessages.clear();
http2ConnectionManager.removeClientChannel(httpRoute, this);
removeFromConnectionPool();
}

/**
Expand All @@ -271,6 +274,10 @@ private void handleConnectionClose() {
}
}

ConcurrentHashMap<Integer, OutboundMsgHolder> getInFlightMessages() {
return inFlightMessages;
}

/**
* Listener which listen to the stream closure event.
*/
Expand All @@ -289,13 +296,50 @@ public void onStreamClosed(Http2Stream stream) {
activeStreams.decrementAndGet();
http2ClientChannel.getDataEventListeners().
forEach(dataEventListener -> dataEventListener.onStreamClose(stream.id()));
if (isExhausted.getAndSet(false)) {
if (!isStale.get() && isExhausted.getAndSet(false)) {
http2ConnectionManager.returnClientChannel(httpRoute, http2ClientChannel);
}
}

@Override
public void onGoAwayReceived(int lastStreamId, long errorCode, ByteBuf debugData) {
markAsStale();
http2ClientChannel.inFlightMessages.forEach((streamId, outboundMsgHolder) -> {
if (streamId > lastStreamId) {
http2ClientChannel.removeInFlightMessage(streamId);
activeStreams.decrementAndGet();
http2ClientChannel.getDataEventListeners().forEach(
dataEventListener -> dataEventListener.onStreamClose(streamId));
Http2MessageStateContext messageStateContext =
outboundMsgHolder.getRequest().getHttp2MessageStateContext();
if (messageStateContext != null) {
messageStateContext.getSenderState().handleServerGoAway(outboundMsgHolder);
}
}
});
}
}

void removeFromConnectionPool() {
http2ConnectionManager.removeClientChannel(httpRoute, this);
}

void markAsStale() {
synchronized (http2ConnectionManager) {
isStale.set(true);
http2ConnectionManager.markClientChannelAsStale(httpRoute, this);
}
}

boolean hasInFlightMessages() {
return !inFlightMessages.isEmpty();
}

void setTimeSinceMarkedAsStale(long timeSinceMarkedAsStale) {
this.timeSinceMarkedAsStale = timeSinceMarkedAsStale;
}

long getTimeSinceMarkedAsStale() {
return timeSinceMarkedAsStale;
}
}
Loading

0 comments on commit be154a3

Please sign in to comment.