Skip to content

Commit

Permalink
Merge #2937 into 1.1.13
Browse files Browse the repository at this point in the history
  • Loading branch information
pderop committed Oct 18, 2023
2 parents a41c61a + 0e28135 commit 3ef6828
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,8 @@ public final void onError(Throwable t) {
if (log.isDebugEnabled()) {
log.debug(format(channel(), "An outbound error could not be processed"), t);
}
// Let any child class process the outbound error which has not been processed
onUnprocessedOutboundError(t);
return;
}
OUTBOUND_CLOSE.set(this, Operators.cancelledSubscription());
Expand Down Expand Up @@ -572,6 +574,14 @@ protected String asDebugLogMessage(Object o) {
protected void onWritabilityChanged() {
}

/**
* React on an unprocessed outbound error.
*
* @since 1.0.39
*/
protected void onUnprocessedOutboundError(Throwable t) {
}

@Override
public boolean isPersistent() {
return connection.isPersistent();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ class HttpClientOperations extends HttpOperations<NettyInbound, NettyOutbound>
Consumer<HttpClientRequest> redirectRequestConsumer;
HttpHeaders previousRequestHeaders;
BiConsumer<HttpHeaders, HttpClientRequest> redirectRequestBiConsumer;
volatile Throwable unprocessedOutboundError;

final static String INBOUND_CANCEL_LOG = "Http client inbound receiver cancelled, closing channel.";

Expand All @@ -146,6 +147,10 @@ class HttpClientOperations extends HttpOperations<NettyInbound, NettyOutbound>
this.responseTimeout = replaced.responseTimeout;
this.is100Continue = replaced.is100Continue;
this.trailerHeaders = replaced.trailerHeaders;
// No need to copy the unprocessedOutboundError field from the replaced instance. The reason for this is that the
// "unprocessedOutboundError" field contains an error that occurs when the connection of the HttpClientOperations
// is already closed. In essence, this error represents the final state for the HttpClientOperations, and there's
// no need to carry it over because it's considered as a terminal/concluding state.
}

HttpClientOperations(Connection c, ConnectionObserver listener, ClientCookieEncoder encoder,
Expand Down Expand Up @@ -284,6 +289,11 @@ protected void onInboundCancel() {
channel().close();
}

@Override
protected final void onUnprocessedOutboundError(Throwable t) {
this.unprocessedOutboundError = t;
}

@Override
protected void onInboundClose() {
if (isInboundCancelled() || isInboundDisposed()) {
Expand All @@ -292,18 +302,21 @@ protected void onInboundClose() {
}
listener().onStateChange(this, HttpClientState.RESPONSE_INCOMPLETE);
if (responseState == null) {
Throwable exception;
if (markSentHeaderAndBody()) {
listener().onUncaughtException(this, AbortedException.beforeSend());
exception = AbortedException.beforeSend();
}
else if (markSentBody()) {
listener().onUncaughtException(this, new PrematureCloseException("Connection has been closed BEFORE response, while sending request body"));
exception = new PrematureCloseException("Connection has been closed BEFORE response, while sending request body");
}
else {
listener().onUncaughtException(this, new PrematureCloseException("Connection prematurely closed BEFORE response"));
exception = new PrematureCloseException("Connection prematurely closed BEFORE response");
}
listener().onUncaughtException(this, addOutboundErrorCause(exception, unprocessedOutboundError));
return;
}
super.onInboundError(new PrematureCloseException("Connection prematurely closed DURING response"));
super.onInboundError(addOutboundErrorCause(new PrematureCloseException("Connection prematurely closed DURING response"),
unprocessedOutboundError));
}

@Override
Expand Down Expand Up @@ -870,6 +883,14 @@ final void withWebsocketSupport(WebsocketClientSpec websocketClientSpec, boolean
}
}

static Throwable addOutboundErrorCause(Throwable exception, @Nullable Throwable cause) {
if (cause != null) {
cause.setStackTrace(new StackTraceElement[0]);
exception.initCause(cause);
}
return exception;
}

static final class ResponseState {

final HttpResponse response;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import io.netty.handler.codec.http.cookie.ServerCookieDecoder;
import io.netty.handler.codec.http.cookie.ServerCookieEncoder;
import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus;
import io.netty.handler.codec.http2.Http2Exception;
import io.netty.handler.codec.http2.Http2GoAwayFrame;
import io.netty.handler.codec.http2.Http2StreamChannel;
import io.netty.handler.ssl.SniCompletionEvent;
Expand Down Expand Up @@ -3525,4 +3526,50 @@ private void doTestIssue2760(HttpServer server, HttpClient client) {
.expectComplete()
.verify(Duration.ofSeconds(60));
}

@ParameterizedTest
@MethodSource("h2CompatibleCombinations")
void testIssue2927_H2(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols) {
Http2SslContextSpec serverCtx = Http2SslContextSpec.forServer(ssc.certificate(), ssc.privateKey());
Http2SslContextSpec clientCtx = Http2SslContextSpec.forClient()
.configure(builder -> builder.trustManager(InsecureTrustManagerFactory.INSTANCE));

testIssue2927(
server -> server.protocol(serverProtocols).secure(spec -> spec.sslContext(serverCtx)),
client -> client.protocol(clientProtocols).secure(spec -> spec.sslContext(clientCtx)));
}

@ParameterizedTest
@MethodSource("h2cCompatibleCombinations")
void testIssue2927_H2C(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols) {
testIssue2927(server -> server.protocol(serverProtocols), client -> client.protocol(clientProtocols));
}

private void testIssue2927(Function<HttpServer, HttpServer> serverCustomizer, Function<HttpClient, HttpClient> clientCustomizer) {
HttpServer httpServer = serverCustomizer.apply(HttpServer.create())
.http2Settings(spec -> spec.maxHeaderListSize(1024));

disposableServer =
httpServer
.handle((req, res) -> res.sendString(Mono.just("Hello")))
.bindNow();

HttpClient client = clientCustomizer.apply(createClient(disposableServer.port()));

Flux<HttpResponseStatus> responseFlux = Flux.range(0, 2)
.flatMap(i -> i == 1 ?
// For the second request, add some extra headers in order to exceed server max header list size
Flux.range(0, 100).reduce(client, (c, j) -> c.headers(h -> h.set("Foo" + j, "Bar" + j)))
:
Mono.just(client))
.flatMap(cl -> cl
.get()
.uri("/test")
.responseSingle((res, byteBufMono) -> Mono.just(res.status())), 1);

StepVerifier.create(responseFlux)
.expectNextMatches(HttpResponseStatus.OK::equals)
.expectErrorMatches(t -> t instanceof PrematureCloseException && t.getCause() instanceof Http2Exception.HeaderListSizeException)
.verify(Duration.ofSeconds(30));
}
}

0 comments on commit 3ef6828

Please sign in to comment.