Skip to content

Commit

Permalink
HTTP/3: resolve the remote address
Browse files Browse the repository at this point in the history
  • Loading branch information
violetagg committed Nov 11, 2024
1 parent fe25831 commit ff1a2d1
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 27 deletions.
5 changes: 2 additions & 3 deletions docs/modules/ROOT/pages/http-client.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -345,11 +345,10 @@ The following listing presents a simple `HTTP3` example:

{examples-link}/http3/Application.java
----
include::{examples-dir}/http3/Application.java[lines=18..49]
include::{examples-dir}/http3/Application.java[lines=18..46]
----
<1> Configures the client to support only `HTTP/3`
<2> Configures `SSL`
<3> Configures `HTTP/3` settings
<2> Configures `HTTP/3` settings

include::partial$proxy.adoc[]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,21 @@
import reactor.netty.http.client.HttpClient;
import reactor.util.function.Tuple2;

import java.net.InetSocketAddress;
import java.time.Duration;

public class Application {

public static void main(String[] args) throws Exception {
HttpClient client =
HttpClient.create()
.remoteAddress(() -> new InetSocketAddress("www.google.com", 443))
.protocol(HttpProtocol.HTTP3) //<1>
.secure() //<2>
.http3Settings(spec -> spec.idleTimeout(Duration.ofSeconds(5)) //<3>
.http3Settings(spec -> spec.idleTimeout(Duration.ofSeconds(5)) //<2>
.maxData(10000000)
.maxStreamDataBidirectionalLocal(1000000));

Tuple2<String, HttpHeaders> response =
client.get()
.uri("/")
.uri("https://www.google.com/")
.responseSingle((res, bytes) -> bytes.asString()
.zipWith(Mono.just(res.responseHeaders())))
.block();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package reactor.netty.http.client;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.handler.logging.LoggingHandler;
Expand All @@ -25,6 +26,7 @@
import io.netty.incubator.codec.quic.QuicChannelBootstrap;
import io.netty.incubator.codec.quic.QuicStreamChannel;
import io.netty.incubator.codec.quic.QuicStreamChannelBootstrap;
import io.netty.resolver.AddressResolver;
import io.netty.resolver.AddressResolverGroup;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.Future;
Expand Down Expand Up @@ -495,8 +497,9 @@ static final class PooledConnectionAllocator {
poolConfig -> new Http3Pool(poolConfig, poolFactory.allocationStrategy()));
}

@SuppressWarnings({"unchecked", "FutureReturnValueIgnored"})
Publisher<Connection> connectChannel() {
return parent.acquire(config, new DelegatingConnectionObserver(), () -> remoteAddress, resolver)
return parent.acquire(config, new DelegatingConnectionObserver(), () -> remoteAddress, null)
.flatMap(conn ->
Mono.create(sink -> {
Channel channel = conn.channel();
Expand All @@ -506,22 +509,47 @@ Publisher<Connection> connectChannel() {
Http3ChannelInitializer.HttpTrafficHandler initializer =
channel.pipeline().remove(Http3ChannelInitializer.HttpTrafficHandler.class);

QuicChannelBootstrap bootstrap =
QuicChannel.newBootstrap(channel)
.handler(initializer.quicChannelInitializer)
.remoteAddress(remoteAddress);
attributes(bootstrap, config.attributes());
channelOptions(bootstrap, config.options());
bootstrap.option(ChannelOption.AUTO_READ, true);
bootstrap.connect()
.addListener(f -> {
if (!f.isSuccess()) {
sink.error(f.cause());
}
else {
sink.success(Connection.from((Channel) f.get()));
}
});
AddressResolver<SocketAddress> addrResolver;
try {
addrResolver = (AddressResolver<SocketAddress>) resolver.getResolver(channel.eventLoop());
}
catch (Throwable t) {
// "FutureReturnValueIgnored" this is deliberate
channel.close();
sink.error(t);
return;
}

if (!addrResolver.isSupported(remoteAddress) || addrResolver.isResolved(remoteAddress)) {
connect(channel, config, initializer.quicChannelInitializer, remoteAddress, sink);
}
else {
Future<SocketAddress> resolveFuture = addrResolver.resolve(remoteAddress);
if (resolveFuture.isDone()) {
Throwable cause = resolveFuture.cause();
if (cause != null) {
// "FutureReturnValueIgnored" this is deliberate
channel.close();
sink.error(cause);
}
else {
connect(channel, config, initializer.quicChannelInitializer, resolveFuture.getNow(), sink);
}
}
else {
resolveFuture.addListener(future -> {
Throwable cause = future.cause();
if (cause != null) {
// "FutureReturnValueIgnored" this is deliberate
channel.close();
sink.error(cause);
}
else {
connect(channel, config, initializer.quicChannelInitializer, (SocketAddress) future.getNow(), sink);
}
});
}
}
}));
}

Expand All @@ -539,6 +567,26 @@ static void channelOptions(QuicChannelBootstrap bootstrap, Map<ChannelOption<?>,
}
}

static void connect(Channel channel, TransportConfig config, ChannelHandler handler, SocketAddress remoteAddress,
MonoSink<Connection> sink) {
QuicChannelBootstrap bootstrap =
QuicChannel.newBootstrap(channel)
.handler(handler)
.remoteAddress(remoteAddress);
attributes(bootstrap, config.attributes());
channelOptions(bootstrap, config.options());
bootstrap.option(ChannelOption.AUTO_READ, true);
bootstrap.connect()
.addListener(f -> {
if (!f.isSuccess()) {
sink.error(f.cause());
}
else {
sink.success(Connection.from((Channel) f.get()));
}
});
}

static final BiPredicate<Connection, PooledRefMetadata> DEFAULT_EVICTION_PREDICATE =
(connection, metadata) -> false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,8 +256,7 @@ else if (_config.checkProtocol(HttpClientConfig.h3)) {
.then(_config.connectionObserver())
.then(new HttpIOHandlerObserver(sink, handler));

AddressResolverGroup<?> resolver =
!_config.checkProtocol(HttpClientConfig.h3) ? _config.resolverInternal() : null;
AddressResolverGroup<?> resolver = _config.resolverInternal();

_config.httpConnectionProvider()
.acquire(_config, observer, handler, resolver)
Expand Down

0 comments on commit ff1a2d1

Please sign in to comment.