diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClient.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClient.java index 0cb2da1a3a..1190b59949 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClient.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClient.java @@ -15,6 +15,7 @@ */ package reactor.netty.http.client; +import java.io.IOException; import java.net.SocketAddress; import java.net.URI; import java.time.Duration; @@ -25,6 +26,7 @@ import java.util.function.BiPredicate; import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.Predicate; import java.util.function.Supplier; import io.netty.buffer.ByteBuf; @@ -49,6 +51,7 @@ import reactor.netty.Connection; import reactor.netty.ConnectionObserver; import reactor.netty.NettyOutbound; +import reactor.netty.channel.AbortedException; import reactor.netty.channel.ChannelMetricsRecorder; import reactor.netty.http.Http2SettingsSpec; import reactor.netty.http.HttpProtocol; @@ -644,14 +647,39 @@ public final RequestSender delete() { * @since 0.9.6 */ public final HttpClient disableRetry(boolean disableRetry) { - if (disableRetry == configuration().retryDisabled) { + if (RequestRetryConfig.DISABLED == configuration().retryConfig) { // yes instance comparison.. return this; } HttpClient dup = duplicate(); - dup.configuration().retryDisabled = disableRetry; + dup.configuration().retryConfig = RequestRetryConfig.DISABLED; return dup; } + public final HttpClient retryConfig(final RequestRetryConfig retryConfig) { + Objects.requireNonNull(retryConfig, "retryConfig"); + HttpClient dup = duplicate(); + dup.configuration().retryConfig = retryConfig; + return dup; + } + + public static class RequestRetryConfig { + + public final static RequestRetryConfig DEFAULT = new RequestRetryConfig(1, AbortedException::isConnectionReset); + public final static RequestRetryConfig DISABLED = new RequestRetryConfig(0, anyException -> false); + + final int maxRetries; + final Predicate isRetriable; + + public RequestRetryConfig(final int maxRetries, final Predicate isRetriable) { + this.maxRetries = maxRetries; + this.isRetriable = isRetriable; + } + + boolean isRetrieable(final IOException ioe) { + return isRetriable.test(ioe); + } + } + /** * Setup a callback called when {@link HttpClientRequest} has been sent * and {@link HttpClientState#REQUEST_SENT} has been emitted. diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConfig.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConfig.java index 062517539d..997ca98950 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConfig.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConfig.java @@ -83,6 +83,7 @@ import reactor.util.Loggers; import reactor.util.annotation.Nullable; import reactor.util.context.Context; +import reactor.util.retry.Retry; import static reactor.netty.ReactorNetty.format; import static reactor.netty.http.client.Http2ConnectionProvider.OWNER; @@ -314,7 +315,12 @@ public WebsocketClientSpec websocketClientSpec() { BiConsumer redirectRequestBiConsumer; Consumer redirectRequestConsumer; Duration responseTimeout; + + HttpClient.RequestRetryConfig retryConfig; + + // TODO consolidate this with config concept boolean retryDisabled; + SslProvider sslProvider; URI uri; String uriStr; @@ -332,7 +338,7 @@ public WebsocketClientSpec websocketClientSpec() { this.method = HttpMethod.GET; this.protocols = new HttpProtocol[]{HttpProtocol.HTTP11}; this._protocols = h11; - this.retryDisabled = false; + this.retryConfig = HttpClient.RequestRetryConfig.DEFAULT; } HttpClientConfig(HttpClientConfig parent) { @@ -361,6 +367,7 @@ public WebsocketClientSpec websocketClientSpec() { this.redirectRequestBiConsumer = parent.redirectRequestBiConsumer; this.redirectRequestConsumer = parent.redirectRequestConsumer; this.responseTimeout = parent.responseTimeout; + this.retryConfig = parent.retryConfig; this.retryDisabled = parent.retryDisabled; this.sslProvider = parent.sslProvider; this.uri = parent.uri; diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConnect.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConnect.java index 615374ed6a..b2fa611509 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConnect.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConnect.java @@ -15,6 +15,7 @@ */ package reactor.netty.http.client; +import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.URI; @@ -269,7 +270,7 @@ public void subscribe(CoreSubscriber actual) { .acquire(_config, observer, handler, resolver) .subscribe(new ClientTransportSubscriber(sink)); - }).retryWhen(Retry.indefinitely().filter(handler)) + }).retryWhen(Retry.max(config.retryConfig.maxRetries).filter(handler)) .subscribe(actual); } @@ -347,7 +348,7 @@ public void onUncaughtException(Connection connection, Throwable error) { handler.previousRequestHeaders = ops.requestHeaders; } } - else if (handler.shouldRetry && AbortedException.isConnectionReset(error)) { + else if (handler.canRetry(error)) { HttpClientOperations ops = connection.as(HttpClientOperations.class); if (ops != null && ops.hasSentHeaders()) { // In some cases the channel close event may be delayed and thus the connection to be @@ -468,7 +469,12 @@ static final class HttpClientHandler extends SocketAddress volatile String resourceUrl; volatile UriEndpoint fromURI; volatile Supplier[] redirectedFrom; - volatile boolean shouldRetry; + + final RequestRetryConfig retryConfig; + + // TODO not happy with name as it collides with config concept.. + volatile boolean shouldRetry = true; + volatile HttpHeaders previousRequestHeaders; HttpClientHandler(HttpClientConfig configuration) { @@ -488,7 +494,7 @@ static final class HttpClientHandler extends SocketAddress new UriEndpointFactory(configuration.remoteAddress(), configuration.isSecure(), URI_ADDRESS_MAPPER); this.websocketClientSpec = configuration.websocketClientSpec; - this.shouldRetry = !configuration.retryDisabled; + this.retryConfig = configuration.retryConfig; this.handler = configuration.body; if (configuration.uri == null) { @@ -682,14 +688,22 @@ public boolean test(Throwable throwable) { redirect(re.location); return true; } - if (shouldRetry && AbortedException.isConnectionReset(throwable)) { - shouldRetry = false; + if (shouldRetry && canRetry(throwable)) { redirect(toURI.toString()); return true; } return false; } + /** + * Signals that the request can be retried. + */ + boolean canRetry(final Throwable err) { + return shouldRetry && + err instanceof IOException && + retryConfig.isRetrieable((IOException)err); + } + @Override public String toString() { return "{" + "uri=" + toURI + ", method=" + method + '}';