From 313fe619e605095d0001b4d8d7dc1ca61121cfb6 Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Wed, 6 Mar 2024 11:08:05 +0200 Subject: [PATCH 1/4] Add proxy address information to the metrics Related to #3060 --- .../src/main/java/reactor/netty/Metrics.java | 4 +- .../AbstractChannelMetricsHandler.java | 28 +++- .../reactor/netty/channel/ChannelMeters.java | 12 +- .../netty/channel/ChannelMetricsHandler.java | 47 +++++-- .../netty/channel/ChannelMetricsRecorder.java | 64 ++++++++- .../netty/channel/ConnectObservations.java | 12 +- .../ContextAwareChannelMetricsHandler.java | 88 +++++++++--- .../ContextAwareChannelMetricsRecorder.java | 94 ++++++++++++- .../java/reactor/netty/channel/MeterKey.java | 32 ++++- .../MicrometerChannelMetricsHandler.java | 45 ++++-- .../MicrometerChannelMetricsRecorder.java | 112 +++++++++++++-- .../netty/tcp/TlsHandshakeObservations.java | 12 +- .../http/MicrometerHttpMetricsRecorder.java | 8 +- .../AbstractHttpClientMetricsHandler.java | 47 +++++-- .../ContextAwareHttpClientMetricsHandler.java | 45 ++++-- ...ContextAwareHttpClientMetricsRecorder.java | 123 ++++++++++++++++- .../http/client/Http2ConnectionProvider.java | 11 +- .../netty/http/client/HttpClientConfig.java | 50 ++++--- .../netty/http/client/HttpClientMeters.java | 22 ++- .../http/client/HttpClientMetricsHandler.java | 3 +- .../client/HttpClientMetricsRecorder.java | 84 +++++++++++- .../http/client/HttpClientObservations.java | 12 +- .../MicrometerHttpClientMetricsHandler.java | 44 ++++-- .../MicrometerHttpClientMetricsRecorder.java | 128 +++++++++++++++++- .../MicrometerHttpServerMetricsRecorder.java | 8 +- .../http/client/HttpClientProxyTest.java | 12 +- 26 files changed, 1016 insertions(+), 131 deletions(-) diff --git a/reactor-netty-core/src/main/java/reactor/netty/Metrics.java b/reactor-netty-core/src/main/java/reactor/netty/Metrics.java index 1b13934c65..bcecf745fa 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/Metrics.java +++ b/reactor-netty-core/src/main/java/reactor/netty/Metrics.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2023 VMware, Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2019-2024 VMware, Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -271,6 +271,8 @@ public class Metrics { // Tags public static final String LOCAL_ADDRESS = "local.address"; + public static final String PROXY_ADDRESS = "proxy.address"; + public static final String REMOTE_ADDRESS = "remote.address"; public static final String URI = "uri"; diff --git a/reactor-netty-core/src/main/java/reactor/netty/channel/AbstractChannelMetricsHandler.java b/reactor-netty-core/src/main/java/reactor/netty/channel/AbstractChannelMetricsHandler.java index ce2b44e405..cfd6ab9ac6 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/channel/AbstractChannelMetricsHandler.java +++ b/reactor-netty-core/src/main/java/reactor/netty/channel/AbstractChannelMetricsHandler.java @@ -21,6 +21,7 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import io.netty.channel.socket.DatagramPacket; +import io.netty.handler.proxy.ProxyHandler; import io.netty.handler.ssl.AbstractSniHandler; import io.netty.handler.ssl.SslHandler; import reactor.netty.NettyPipeline; @@ -47,6 +48,7 @@ public abstract class AbstractChannelMetricsHandler extends ChannelDuplexHandler final boolean onServer; boolean channelOpened; + SocketAddress proxyAddress; protected AbstractChannelMetricsHandler(@Nullable SocketAddress remoteAddress, boolean onServer) { this.remoteAddress = remoteAddress; @@ -92,6 +94,11 @@ public void channelInactive(ChannelHandlerContext ctx) { @Override public void channelRegistered(ChannelHandlerContext ctx) { if (!onServer) { + ProxyHandler proxyHandler = ctx.pipeline().get(ProxyHandler.class); + if (proxyHandler != null) { + proxyAddress = proxyHandler.proxyAddress(); + } + ctx.pipeline() .addAfter(NettyPipeline.ChannelMetricsHandler, NettyPipeline.ConnectMetricsHandler, @@ -192,14 +199,29 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { public abstract ChannelMetricsRecorder recorder(); protected void recordException(ChannelHandlerContext ctx, SocketAddress address) { - recorder().incrementErrorsCount(address); + if (proxyAddress == null) { + recorder().incrementErrorsCount(address); + } + else { + recorder().incrementErrorsCount(address, proxyAddress); + } } protected void recordRead(ChannelHandlerContext ctx, SocketAddress address, long bytes) { - recorder().recordDataReceived(address, bytes); + if (proxyAddress == null) { + recorder().recordDataReceived(address, bytes); + } + else { + recorder().recordDataReceived(address, proxyAddress, bytes); + } } protected void recordWrite(ChannelHandlerContext ctx, SocketAddress address, long bytes) { - recorder().recordDataSent(address, bytes); + if (proxyAddress == null) { + recorder().recordDataSent(address, bytes); + } + else { + recorder().recordDataSent(address, proxyAddress, bytes); + } } } diff --git a/reactor-netty-core/src/main/java/reactor/netty/channel/ChannelMeters.java b/reactor-netty-core/src/main/java/reactor/netty/channel/ChannelMeters.java index fa94c90866..ac11fd43d6 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/channel/ChannelMeters.java +++ b/reactor-netty-core/src/main/java/reactor/netty/channel/ChannelMeters.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022 VMware, Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2022-2024 VMware, Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -120,6 +120,16 @@ public Meter.Type getType() { public enum ChannelMetersTags implements KeyName { + /** + * Proxy address, when there is a proxy configured. + */ + PROXY_ADDRESS { + @Override + public String asString() { + return "proxy.address"; + } + }, + /** * Remote address. */ diff --git a/reactor-netty-core/src/main/java/reactor/netty/channel/ChannelMetricsHandler.java b/reactor-netty-core/src/main/java/reactor/netty/channel/ChannelMetricsHandler.java index d07d4eda78..45a8db42b8 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/channel/ChannelMetricsHandler.java +++ b/reactor-netty-core/src/main/java/reactor/netty/channel/ChannelMetricsHandler.java @@ -46,12 +46,12 @@ public class ChannelMetricsHandler extends AbstractChannelMetricsHandler { @Override public ChannelHandler connectMetricsHandler() { - return new ConnectMetricsHandler(recorder()); + return new ConnectMetricsHandler(recorder(), proxyAddress); } @Override public ChannelHandler tlsMetricsHandler() { - return new TlsMetricsHandler(recorder, remoteAddress); + return new TlsMetricsHandler(recorder, remoteAddress, proxyAddress); } @Override @@ -61,9 +61,11 @@ public ChannelMetricsRecorder recorder() { static final class ConnectMetricsHandler extends ChannelOutboundHandlerAdapter { + final SocketAddress proxyAddress; final ChannelMetricsRecorder recorder; - ConnectMetricsHandler(ChannelMetricsRecorder recorder) { + ConnectMetricsHandler(ChannelMetricsRecorder recorder, @Nullable SocketAddress proxyAddress) { + this.proxyAddress = proxyAddress; this.recorder = recorder; } @@ -75,22 +77,34 @@ public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, promise.addListener(future -> { ctx.pipeline().remove(this); - recorder.recordConnectTime( - remoteAddress, - Duration.ofNanos(System.nanoTime() - connectTimeStart), - future.isSuccess() ? SUCCESS : ERROR); + if (proxyAddress == null) { + recorder.recordConnectTime( + remoteAddress, + Duration.ofNanos(System.nanoTime() - connectTimeStart), + future.isSuccess() ? SUCCESS : ERROR); + } + else { + recorder.recordConnectTime( + remoteAddress, + proxyAddress, + Duration.ofNanos(System.nanoTime() - connectTimeStart), + future.isSuccess() ? SUCCESS : ERROR); + } }); } } static class TlsMetricsHandler extends ChannelInboundHandlerAdapter { + protected final SocketAddress proxyAddress; protected final ChannelMetricsRecorder recorder; protected final SocketAddress remoteAddress; boolean listenerAdded; - TlsMetricsHandler(ChannelMetricsRecorder recorder, @Nullable SocketAddress remoteAddress) { + TlsMetricsHandler(ChannelMetricsRecorder recorder, @Nullable SocketAddress remoteAddress, + @Nullable SocketAddress proxyAddress) { + this.proxyAddress = proxyAddress; this.recorder = recorder; this.remoteAddress = remoteAddress; } @@ -110,10 +124,19 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { } protected void recordTlsHandshakeTime(ChannelHandlerContext ctx, long tlsHandshakeTimeStart, String status) { - recorder.recordTlsHandshakeTime( - remoteAddress != null ? remoteAddress : ctx.channel().remoteAddress(), - Duration.ofNanos(System.nanoTime() - tlsHandshakeTimeStart), - status); + if (proxyAddress == null) { + recorder.recordTlsHandshakeTime( + remoteAddress != null ? remoteAddress : ctx.channel().remoteAddress(), + Duration.ofNanos(System.nanoTime() - tlsHandshakeTimeStart), + status); + } + else { + recorder.recordTlsHandshakeTime( + remoteAddress != null ? remoteAddress : ctx.channel().remoteAddress(), + proxyAddress, + Duration.ofNanos(System.nanoTime() - tlsHandshakeTimeStart), + status); + } } private void addListener(ChannelHandlerContext ctx) { diff --git a/reactor-netty-core/src/main/java/reactor/netty/channel/ChannelMetricsRecorder.java b/reactor-netty-core/src/main/java/reactor/netty/channel/ChannelMetricsRecorder.java index d48626a6b0..3cd94abd05 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/channel/ChannelMetricsRecorder.java +++ b/reactor-netty-core/src/main/java/reactor/netty/channel/ChannelMetricsRecorder.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2023 VMware, Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2019-2024 VMware, Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -33,6 +33,18 @@ public interface ChannelMetricsRecorder { */ void recordDataReceived(SocketAddress remoteAddress, long bytes); + /** + * Records the amount of the data that is received, in bytes. + * + * @param remoteAddress The remote peer + * @param proxyAddress The proxy address + * @param bytes The amount of the data that is received, in bytes + * @since 1.1.17 + */ + default void recordDataReceived(SocketAddress remoteAddress, SocketAddress proxyAddress, long bytes) { + recordDataReceived(remoteAddress, bytes); + } + /** * Records the amount of the data that is sent, in bytes. * @@ -41,6 +53,18 @@ public interface ChannelMetricsRecorder { */ void recordDataSent(SocketAddress remoteAddress, long bytes); + /** + * Records the amount of the data that is sent, in bytes. + * + * @param remoteAddress The remote peer + * @param proxyAddress The proxy address + * @param bytes The amount of the data that is sent, in bytes + * @since 1.1.17 + */ + default void recordDataSent(SocketAddress remoteAddress, SocketAddress proxyAddress, long bytes) { + recordDataSent(remoteAddress, bytes); + } + /** * Increments the number of the errors that are occurred. * @@ -48,6 +72,17 @@ public interface ChannelMetricsRecorder { */ void incrementErrorsCount(SocketAddress remoteAddress); + /** + * Increments the number of the errors that are occurred. + * + * @param remoteAddress The remote peer + * @param proxyAddress The proxy address + * @since 1.1.17 + */ + default void incrementErrorsCount(SocketAddress remoteAddress, SocketAddress proxyAddress) { + incrementErrorsCount(remoteAddress); + } + /** * Records the time that is spent for TLS handshake. * @@ -57,6 +92,19 @@ public interface ChannelMetricsRecorder { */ void recordTlsHandshakeTime(SocketAddress remoteAddress, Duration time, String status); + /** + * Records the time that is spent for TLS handshake. + * + * @param remoteAddress The remote peer + * @param proxyAddress The proxy address + * @param time the time in nanoseconds that is spent for TLS handshake + * @param status the status of the operation + * @since 1.1.17 + */ + default void recordTlsHandshakeTime(SocketAddress remoteAddress, SocketAddress proxyAddress, Duration time, String status) { + recordTlsHandshakeTime(remoteAddress, time, status); + } + /** * Records the time that is spent for connecting to the remote address. * Relevant only when on the client @@ -67,6 +115,20 @@ public interface ChannelMetricsRecorder { */ void recordConnectTime(SocketAddress remoteAddress, Duration time, String status); + /** + * Records the time that is spent for connecting to the remote address. + * Relevant only when on the client + * + * @param remoteAddress The remote peer + * @param proxyAddress The proxy address + * @param time the time in nanoseconds that is spent for connecting to the remote address + * @param status the status of the operation + * @since 1.1.17 + */ + default void recordConnectTime(SocketAddress remoteAddress, SocketAddress proxyAddress, Duration time, String status) { + recordConnectTime(remoteAddress, time, status); + } + /** * Records the time that is spent for resolving the remote address. * Relevant only when on the client. diff --git a/reactor-netty-core/src/main/java/reactor/netty/channel/ConnectObservations.java b/reactor-netty-core/src/main/java/reactor/netty/channel/ConnectObservations.java index ca8403fbbd..460c6dbf24 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/channel/ConnectObservations.java +++ b/reactor-netty-core/src/main/java/reactor/netty/channel/ConnectObservations.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022 VMware, Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2022-2024 VMware, Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -108,6 +108,16 @@ public String asString() { */ enum ConnectTimeLowCardinalityTags implements KeyName { + /** + * Proxy address, when there is a proxy configured. + */ + PROXY_ADDRESS { + @Override + public String asString() { + return "proxy.address"; + } + }, + /** * Remote address. */ diff --git a/reactor-netty-core/src/main/java/reactor/netty/channel/ContextAwareChannelMetricsHandler.java b/reactor-netty-core/src/main/java/reactor/netty/channel/ContextAwareChannelMetricsHandler.java index 08c554318a..02ac3cc606 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/channel/ContextAwareChannelMetricsHandler.java +++ b/reactor-netty-core/src/main/java/reactor/netty/channel/ContextAwareChannelMetricsHandler.java @@ -48,12 +48,12 @@ final class ContextAwareChannelMetricsHandler extends AbstractChannelMetricsHand @Override public ChannelHandler connectMetricsHandler() { - return new ContextAwareConnectMetricsHandler(recorder()); + return new ContextAwareConnectMetricsHandler(recorder(), proxyAddress); } @Override public ChannelHandler tlsMetricsHandler() { - return new TlsMetricsHandler(recorder, remoteAddress); + return new TlsMetricsHandler(recorder, remoteAddress, proxyAddress); } @Override @@ -66,10 +66,20 @@ protected void recordException(ChannelHandlerContext ctx, SocketAddress address) Connection connection = Connection.from(ctx.channel()); ChannelOperations ops = connection.as(ChannelOperations.class); if (ops != null) { - recorder().incrementErrorsCount(ops.currentContext(), address); + if (proxyAddress == null) { + recorder().incrementErrorsCount(ops.currentContext(), address); + } + else { + recorder().incrementErrorsCount(ops.currentContext(), address, proxyAddress); + } } else if (connection instanceof ConnectionObserver) { - recorder().incrementErrorsCount(((ConnectionObserver) connection).currentContext(), address); + if (proxyAddress == null) { + recorder().incrementErrorsCount(((ConnectionObserver) connection).currentContext(), address); + } + else { + recorder().incrementErrorsCount(((ConnectionObserver) connection).currentContext(), address, proxyAddress); + } } else { super.recordException(ctx, address); @@ -80,7 +90,12 @@ else if (connection instanceof ConnectionObserver) { protected void recordRead(ChannelHandlerContext ctx, SocketAddress address, long bytes) { ChannelOperations ops = ChannelOperations.get(ctx.channel()); if (ops != null) { - recorder().recordDataReceived(ops.currentContext(), address, bytes); + if (proxyAddress == null) { + recorder().recordDataReceived(ops.currentContext(), address, bytes); + } + else { + recorder().recordDataReceived(ops.currentContext(), address, proxyAddress, bytes); + } } else { super.recordRead(ctx, address, bytes); @@ -91,7 +106,12 @@ protected void recordRead(ChannelHandlerContext ctx, SocketAddress address, long protected void recordWrite(ChannelHandlerContext ctx, SocketAddress address, long bytes) { ChannelOperations ops = ChannelOperations.get(ctx.channel()); if (ops != null) { - recorder().recordDataSent(ops.currentContext(), address, bytes); + if (proxyAddress == null) { + recorder().recordDataSent(ops.currentContext(), address, bytes); + } + else { + recorder().recordDataSent(ops.currentContext(), address, proxyAddress, bytes); + } } else { super.recordWrite(ctx, address, bytes); @@ -100,9 +120,11 @@ protected void recordWrite(ChannelHandlerContext ctx, SocketAddress address, lon static final class ContextAwareConnectMetricsHandler extends ChannelOutboundHandlerAdapter { + final SocketAddress proxyAddress; final ContextAwareChannelMetricsRecorder recorder; - ContextAwareConnectMetricsHandler(ContextAwareChannelMetricsRecorder recorder) { + ContextAwareConnectMetricsHandler(ContextAwareChannelMetricsRecorder recorder, @Nullable SocketAddress proxyAddress) { + this.proxyAddress = proxyAddress; this.recorder = recorder; } @@ -120,33 +142,59 @@ public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, void recordConnectTime(ChannelHandlerContext ctx, SocketAddress address, long connectTimeStart, String status) { Connection connection = Connection.from(ctx.channel()); if (connection instanceof ConnectionObserver) { - recorder.recordConnectTime( - ((ConnectionObserver) connection).currentContext(), - address, - Duration.ofNanos(System.nanoTime() - connectTimeStart), - status); + if (proxyAddress == null) { + recorder.recordConnectTime( + ((ConnectionObserver) connection).currentContext(), + address, + Duration.ofNanos(System.nanoTime() - connectTimeStart), + status); + } + else { + recorder.recordConnectTime( + ((ConnectionObserver) connection).currentContext(), + address, + proxyAddress, + Duration.ofNanos(System.nanoTime() - connectTimeStart), + status); + } } else { - recorder.recordConnectTime(address, Duration.ofNanos(System.nanoTime() - connectTimeStart), status); + if (proxyAddress == null) { + recorder.recordConnectTime(address, Duration.ofNanos(System.nanoTime() - connectTimeStart), status); + } + else { + recorder.recordConnectTime(address, proxyAddress, Duration.ofNanos(System.nanoTime() - connectTimeStart), status); + } } } } static final class TlsMetricsHandler extends ChannelMetricsHandler.TlsMetricsHandler { - TlsMetricsHandler(ContextAwareChannelMetricsRecorder recorder, @Nullable SocketAddress remoteAddress) { - super(recorder, remoteAddress); + TlsMetricsHandler(ContextAwareChannelMetricsRecorder recorder, @Nullable SocketAddress remoteAddress, + @Nullable SocketAddress proxyAddress) { + super(recorder, remoteAddress, proxyAddress); } @Override protected void recordTlsHandshakeTime(ChannelHandlerContext ctx, long tlsHandshakeTimeStart, String status) { Connection connection = Connection.from(ctx.channel()); if (connection instanceof ConnectionObserver) { - ((ContextAwareChannelMetricsRecorder) recorder).recordTlsHandshakeTime( - ((ConnectionObserver) connection).currentContext(), - remoteAddress != null ? remoteAddress : ctx.channel().remoteAddress(), - Duration.ofNanos(System.nanoTime() - tlsHandshakeTimeStart), - status); + if (proxyAddress == null) { + ((ContextAwareChannelMetricsRecorder) recorder).recordTlsHandshakeTime( + ((ConnectionObserver) connection).currentContext(), + remoteAddress != null ? remoteAddress : ctx.channel().remoteAddress(), + Duration.ofNanos(System.nanoTime() - tlsHandshakeTimeStart), + status); + } + else { + ((ContextAwareChannelMetricsRecorder) recorder).recordTlsHandshakeTime( + ((ConnectionObserver) connection).currentContext(), + remoteAddress != null ? remoteAddress : ctx.channel().remoteAddress(), + proxyAddress, + Duration.ofNanos(System.nanoTime() - tlsHandshakeTimeStart), + status); + } } } } diff --git a/reactor-netty-core/src/main/java/reactor/netty/channel/ContextAwareChannelMetricsRecorder.java b/reactor-netty-core/src/main/java/reactor/netty/channel/ContextAwareChannelMetricsRecorder.java index e82edbca08..1c3bc30f8d 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/channel/ContextAwareChannelMetricsRecorder.java +++ b/reactor-netty-core/src/main/java/reactor/netty/channel/ContextAwareChannelMetricsRecorder.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2023 VMware, Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2021-2024 VMware, Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -37,6 +37,18 @@ public abstract class ContextAwareChannelMetricsRecorder implements ChannelMetri */ public abstract void incrementErrorsCount(ContextView contextView, SocketAddress remoteAddress); + /** + * Increments the number of the errors that are occurred. + * + * @param contextView The current {@link ContextView} associated with the Mono/Flux pipeline + * @param remoteAddress The remote peer + * @param proxyAddress The proxy address + * @since 1.1.17 + */ + public void incrementErrorsCount(ContextView contextView, SocketAddress remoteAddress, SocketAddress proxyAddress) { + incrementErrorsCount(contextView, remoteAddress); + } + /** * Records the time that is spent for connecting to the remote address. * Relevant only when on the client. @@ -48,6 +60,21 @@ public abstract class ContextAwareChannelMetricsRecorder implements ChannelMetri */ public abstract void recordConnectTime(ContextView contextView, SocketAddress remoteAddress, Duration time, String status); + /** + * Records the time that is spent for connecting to the remote address. + * Relevant only when on the client. + * + * @param contextView The current {@link ContextView} associated with the Mono/Flux pipeline + * @param remoteAddress The remote peer + * @param proxyAddress The proxy address + * @param time The time in nanoseconds that is spent for connecting to the remote address + * @param status The status of the operation + * @since 1.1.17 + */ + public void recordConnectTime(ContextView contextView, SocketAddress remoteAddress, SocketAddress proxyAddress, Duration time, String status) { + recordConnectTime(contextView, remoteAddress, time, status); + } + /** * Records the amount of the data that is received, in bytes. * @@ -57,6 +84,19 @@ public abstract class ContextAwareChannelMetricsRecorder implements ChannelMetri */ public abstract void recordDataReceived(ContextView contextView, SocketAddress remoteAddress, long bytes); + /** + * Records the amount of the data that is received, in bytes. + * + * @param contextView The current {@link ContextView} associated with the Mono/Flux pipeline + * @param remoteAddress The remote peer + * @param proxyAddress The proxy address + * @param bytes The amount of the data that is received, in bytes + * @since 1.1.17 + */ + public void recordDataReceived(ContextView contextView, SocketAddress remoteAddress, SocketAddress proxyAddress, long bytes) { + recordDataReceived(contextView, remoteAddress, bytes); + } + /** * Records the amount of the data that is sent, in bytes. * @@ -66,6 +106,19 @@ public abstract class ContextAwareChannelMetricsRecorder implements ChannelMetri */ public abstract void recordDataSent(ContextView contextView, SocketAddress remoteAddress, long bytes); + /** + * Records the amount of the data that is sent, in bytes. + * + * @param contextView The current {@link ContextView} associated with the Mono/Flux pipeline + * @param remoteAddress The remote peer + * @param proxyAddress The proxy address + * @param bytes The amount of the data that is sent, in bytes + * @since 1.1.17 + */ + public void recordDataSent(ContextView contextView, SocketAddress remoteAddress, SocketAddress proxyAddress, long bytes) { + recordDataSent(contextView, remoteAddress, bytes); + } + /** * Records the time that is spent for TLS handshake. * @@ -76,28 +129,67 @@ public abstract class ContextAwareChannelMetricsRecorder implements ChannelMetri */ public abstract void recordTlsHandshakeTime(ContextView contextView, SocketAddress remoteAddress, Duration time, String status); + /** + * Records the time that is spent for TLS handshake. + * + * @param contextView The current {@link ContextView} associated with the Mono/Flux pipeline + * @param remoteAddress The remote peer + * @param proxyAddress The proxy address + * @param time The time in nanoseconds that is spent for TLS handshake + * @param status The status of the operation + * @since 1.1.17 + */ + public void recordTlsHandshakeTime(ContextView contextView, SocketAddress remoteAddress, SocketAddress proxyAddress, Duration time, String status) { + recordTlsHandshakeTime(contextView, remoteAddress, time, status); + } + @Override public void incrementErrorsCount(SocketAddress remoteAddress) { incrementErrorsCount(Context.empty(), remoteAddress); } + @Override + public void incrementErrorsCount(SocketAddress remoteAddress, SocketAddress proxyAddress) { + incrementErrorsCount(Context.empty(), remoteAddress, proxyAddress); + } + @Override public void recordConnectTime(SocketAddress remoteAddress, Duration time, String status) { recordConnectTime(Context.empty(), remoteAddress, time, status); } + @Override + public void recordConnectTime(SocketAddress remoteAddress, SocketAddress proxyAddress, Duration time, String status) { + recordConnectTime(Context.empty(), remoteAddress, proxyAddress, time, status); + } + @Override public void recordDataReceived(SocketAddress remoteAddress, long bytes) { recordDataReceived(Context.empty(), remoteAddress, bytes); } + @Override + public void recordDataReceived(SocketAddress remoteAddress, SocketAddress proxyAddress, long bytes) { + recordDataReceived(Context.empty(), remoteAddress, proxyAddress, bytes); + } + @Override public void recordDataSent(SocketAddress remoteAddress, long bytes) { recordDataSent(Context.empty(), remoteAddress, bytes); } + @Override + public void recordDataSent(SocketAddress remoteAddress, SocketAddress proxyAddress, long bytes) { + recordDataSent(Context.empty(), remoteAddress, proxyAddress, bytes); + } + @Override public void recordTlsHandshakeTime(SocketAddress remoteAddress, Duration time, String status) { recordTlsHandshakeTime(Context.empty(), remoteAddress, time, status); } + + @Override + public void recordTlsHandshakeTime(SocketAddress remoteAddress, SocketAddress proxyAddress, Duration time, String status) { + recordTlsHandshakeTime(Context.empty(), remoteAddress, proxyAddress, time, status); + } } diff --git a/reactor-netty-core/src/main/java/reactor/netty/channel/MeterKey.java b/reactor-netty-core/src/main/java/reactor/netty/channel/MeterKey.java index ea8d0a7084..91207c4003 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/channel/MeterKey.java +++ b/reactor-netty-core/src/main/java/reactor/netty/channel/MeterKey.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2023 VMware, Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2020-2024 VMware, Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,13 +29,41 @@ public final class MeterKey { private final String uri; private final String remoteAddress; + private final String proxyAddress; private final String method; private final String status; + /** + * Creates new meter key. + * + * @param uri the requested URI + * @param remoteAddress the remote address + * @param method the HTTP method + * @param status the HTTP status + * @deprecated as of 1.1.17. Prefer using {@link #MeterKey(String, String, String, String, String)} constructor. + * This method will be removed in version 1.3.0. + */ + @Deprecated public MeterKey(@Nullable String uri, @Nullable String remoteAddress, @Nullable String method, @Nullable String status) { + this(uri, remoteAddress, null, method, status); + } + + /** + * Creates new meter key. + * + * @param uri the requested URI + * @param remoteAddress the remote address + * @param proxyAddress the proxy address + * @param method the HTTP method + * @param status the HTTP status + * @since 1.1.17 + */ + public MeterKey(@Nullable String uri, @Nullable String remoteAddress, @Nullable String proxyAddress, + @Nullable String method, @Nullable String status) { this.uri = uri; this.remoteAddress = remoteAddress; + this.proxyAddress = proxyAddress; this.method = method; this.status = status; } @@ -51,6 +79,7 @@ public boolean equals(Object o) { MeterKey meterKey = (MeterKey) o; return Objects.equals(uri, meterKey.uri) && Objects.equals(remoteAddress, meterKey.remoteAddress) && + Objects.equals(proxyAddress, meterKey.proxyAddress) && Objects.equals(method, meterKey.method) && Objects.equals(status, meterKey.status); } @@ -60,6 +89,7 @@ public int hashCode() { int result = 1; result = 31 * result + Objects.hashCode(uri); result = 31 * result + Objects.hashCode(remoteAddress); + result = 31 * result + Objects.hashCode(proxyAddress); result = 31 * result + Objects.hashCode(method); result = 31 * result + Objects.hashCode(status); return result; diff --git a/reactor-netty-core/src/main/java/reactor/netty/channel/MicrometerChannelMetricsHandler.java b/reactor-netty-core/src/main/java/reactor/netty/channel/MicrometerChannelMetricsHandler.java index faed1ed81e..27d393f8e3 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/channel/MicrometerChannelMetricsHandler.java +++ b/reactor-netty-core/src/main/java/reactor/netty/channel/MicrometerChannelMetricsHandler.java @@ -39,12 +39,14 @@ import static reactor.netty.Metrics.SUCCESS; import static reactor.netty.Metrics.TLS_HANDSHAKE_TIME; import static reactor.netty.Metrics.UNKNOWN; +import static reactor.netty.Metrics.formatSocketAddress; import static reactor.netty.Metrics.updateChannelContext; import static reactor.netty.channel.ConnectObservations.ConnectTimeHighCardinalityTags.NET_PEER_NAME; import static reactor.netty.channel.ConnectObservations.ConnectTimeHighCardinalityTags.NET_PEER_PORT; import static reactor.netty.channel.ConnectObservations.ConnectTimeHighCardinalityTags.REACTOR_NETTY_PROTOCOL; import static reactor.netty.channel.ConnectObservations.ConnectTimeHighCardinalityTags.REACTOR_NETTY_STATUS; import static reactor.netty.channel.ConnectObservations.ConnectTimeHighCardinalityTags.REACTOR_NETTY_TYPE; +import static reactor.netty.channel.ConnectObservations.ConnectTimeLowCardinalityTags.PROXY_ADDRESS; import static reactor.netty.channel.ConnectObservations.ConnectTimeLowCardinalityTags.REMOTE_ADDRESS; import static reactor.netty.channel.ConnectObservations.ConnectTimeLowCardinalityTags.STATUS; @@ -67,11 +69,11 @@ public final class MicrometerChannelMetricsHandler extends AbstractChannelMetric @Override public ChannelHandler connectMetricsHandler() { - return new ConnectMetricsHandler(recorder); + return new ConnectMetricsHandler(recorder, proxyAddress); } @Override public ChannelHandler tlsMetricsHandler() { - return new TlsMetricsHandler(recorder, onServer, remoteAddress); + return new TlsMetricsHandler(recorder, onServer, remoteAddress, proxyAddress); } @Override @@ -87,6 +89,7 @@ static final class ConnectMetricsHandler extends Observation.Context static final String CONTEXTUAL_NAME = "connect"; static final String TYPE = "client"; + final String proxyAddress; final MicrometerChannelMetricsRecorder recorder; // remote address and status are not known beforehand @@ -95,7 +98,8 @@ static final class ConnectMetricsHandler extends Observation.Context String status = UNKNOWN; ContextView parentContextView; - ConnectMetricsHandler(MicrometerChannelMetricsRecorder recorder) { + ConnectMetricsHandler(MicrometerChannelMetricsRecorder recorder, @Nullable SocketAddress proxyAddress) { + this.proxyAddress = formatSocketAddress(proxyAddress); this.recorder = recorder; } @@ -106,7 +110,12 @@ public Observation.Context get() { @Override public Timer getTimer() { - return recorder.getConnectTimer(getName(), netPeerName + ":" + netPeerPort, status); + if (proxyAddress == null) { + return recorder.getConnectTimer(getName(), netPeerName + ":" + netPeerPort, status); + } + else { + return recorder.getConnectTimer(getName(), netPeerName + ":" + netPeerPort, proxyAddress, status); + } } @Override @@ -196,7 +205,13 @@ public KeyValues getHighCardinalityKeyValues() { @Override public KeyValues getLowCardinalityKeyValues() { - return KeyValues.of(REMOTE_ADDRESS.asString(), netPeerName + ":" + netPeerPort, STATUS.asString(), status); + if (proxyAddress == null) { + return KeyValues.of(REMOTE_ADDRESS.asString(), netPeerName + ":" + netPeerPort, STATUS.asString(), status); + } + else { + return KeyValues.of(REMOTE_ADDRESS.asString(), netPeerName + ":" + netPeerPort, + PROXY_ADDRESS.asString(), proxyAddress, STATUS.asString(), status); + } } @Override @@ -228,6 +243,7 @@ static final class TlsMetricsHandler extends Observation.Context static final String TYPE_CLIENT = "client"; static final String TYPE_SERVER = "server"; + final String proxyAddress; final MicrometerChannelMetricsRecorder recorder; final SocketAddress remoteAddress; final String type; @@ -239,7 +255,9 @@ static final class TlsMetricsHandler extends Observation.Context String status = UNKNOWN; ContextView parentContextView; - TlsMetricsHandler(MicrometerChannelMetricsRecorder recorder, boolean onServer, @Nullable SocketAddress remoteAddress) { + TlsMetricsHandler(MicrometerChannelMetricsRecorder recorder, boolean onServer, + @Nullable SocketAddress remoteAddress, @Nullable SocketAddress proxyAddress) { + this.proxyAddress = formatSocketAddress(proxyAddress); this.recorder = recorder; this.remoteAddress = remoteAddress; this.type = onServer ? TYPE_SERVER : TYPE_CLIENT; @@ -331,7 +349,13 @@ public KeyValues getHighCardinalityKeyValues() { @Override public KeyValues getLowCardinalityKeyValues() { - return KeyValues.of(REMOTE_ADDRESS.asString(), netPeerName + ':' + netPeerPort, STATUS.asString(), status); + if (proxyAddress == null) { + return KeyValues.of(REMOTE_ADDRESS.asString(), netPeerName + ':' + netPeerPort, STATUS.asString(), status); + } + else { + return KeyValues.of(REMOTE_ADDRESS.asString(), netPeerName + ':' + netPeerPort, + PROXY_ADDRESS.asString(), proxyAddress, STATUS.asString(), status); + } } @Override @@ -351,7 +375,12 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { @Override public Timer getTimer() { - return recorder.getTlsHandshakeTimer(getName(), netPeerName + ':' + netPeerPort, status); + if (proxyAddress == null) { + return recorder.getTlsHandshakeTimer(getName(), netPeerName + ':' + netPeerPort, status); + } + else { + return recorder.getTlsHandshakeTimer(getName(), netPeerName + ':' + netPeerPort, proxyAddress, status); + } } } } diff --git a/reactor-netty-core/src/main/java/reactor/netty/channel/MicrometerChannelMetricsRecorder.java b/reactor-netty-core/src/main/java/reactor/netty/channel/MicrometerChannelMetricsRecorder.java index 18ee19062d..2675c74eed 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/channel/MicrometerChannelMetricsRecorder.java +++ b/reactor-netty-core/src/main/java/reactor/netty/channel/MicrometerChannelMetricsRecorder.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2023 VMware, Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2019-2024 VMware, Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -36,6 +36,7 @@ import static reactor.netty.Metrics.DATA_RECEIVED; import static reactor.netty.Metrics.DATA_SENT; import static reactor.netty.Metrics.ERRORS; +import static reactor.netty.Metrics.PROXY_ADDRESS; import static reactor.netty.Metrics.REGISTRY; import static reactor.netty.Metrics.REMOTE_ADDRESS; import static reactor.netty.Metrics.STATUS; @@ -49,11 +50,14 @@ * @since 0.9 */ public class MicrometerChannelMetricsRecorder implements ChannelMetricsRecorder { - final ConcurrentMap dataReceivedCache = new ConcurrentHashMap<>(); + final ConcurrentMap dataReceivedCacheNoProxy = new ConcurrentHashMap<>(); + final ConcurrentMap dataReceivedCache = new ConcurrentHashMap<>(); - final ConcurrentMap dataSentCache = new ConcurrentHashMap<>(); + final ConcurrentMap dataSentCacheNoProxy = new ConcurrentHashMap<>(); + final ConcurrentMap dataSentCache = new ConcurrentHashMap<>(); - final ConcurrentMap errorsCache = new ConcurrentHashMap<>(); + final ConcurrentMap errorsCacheNoProxy = new ConcurrentHashMap<>(); + final ConcurrentMap errorsCache = new ConcurrentHashMap<>(); final ConcurrentMap connectTimeCache = new ConcurrentHashMap<>(); @@ -74,7 +78,7 @@ public MicrometerChannelMetricsRecorder(String name, String protocol) { @Override public void recordDataReceived(SocketAddress remoteAddress, long bytes) { String address = formatSocketAddress(remoteAddress); - DistributionSummary ds = MapUtils.computeIfAbsent(dataReceivedCache, address, + DistributionSummary ds = MapUtils.computeIfAbsent(dataReceivedCacheNoProxy, address, key -> filter(DistributionSummary.builder(name + DATA_RECEIVED) .baseUnit(ChannelMeters.DATA_RECEIVED.getBaseUnit()) .tags(ChannelMeters.ChannelMetersTags.URI.asString(), protocol, @@ -85,10 +89,27 @@ public void recordDataReceived(SocketAddress remoteAddress, long bytes) { } } + @Override + public void recordDataReceived(SocketAddress remoteAddress, SocketAddress proxyAddress, long bytes) { + String address = formatSocketAddress(remoteAddress); + String proxyAddr = formatSocketAddress(proxyAddress); + MeterKey meterKey = new MeterKey(null, address, proxyAddr, null, null); + DistributionSummary ds = MapUtils.computeIfAbsent(dataReceivedCache, meterKey, + key -> filter(DistributionSummary.builder(name + DATA_RECEIVED) + .baseUnit(ChannelMeters.DATA_RECEIVED.getBaseUnit()) + .tags(ChannelMeters.ChannelMetersTags.URI.asString(), protocol, + ChannelMeters.ChannelMetersTags.REMOTE_ADDRESS.asString(), address, + ChannelMeters.ChannelMetersTags.PROXY_ADDRESS.asString(), proxyAddr) + .register(REGISTRY))); + if (ds != null) { + ds.record(bytes); + } + } + @Override public void recordDataSent(SocketAddress remoteAddress, long bytes) { String address = formatSocketAddress(remoteAddress); - DistributionSummary ds = MapUtils.computeIfAbsent(dataSentCache, address, + DistributionSummary ds = MapUtils.computeIfAbsent(dataSentCacheNoProxy, address, key -> filter(DistributionSummary.builder(name + DATA_SENT) .baseUnit(ChannelMeters.DATA_SENT.getBaseUnit()) .tags(ChannelMeters.ChannelMetersTags.URI.asString(), protocol, @@ -99,10 +120,27 @@ public void recordDataSent(SocketAddress remoteAddress, long bytes) { } } + @Override + public void recordDataSent(SocketAddress remoteAddress, SocketAddress proxyAddress, long bytes) { + String address = formatSocketAddress(remoteAddress); + String proxyAddr = formatSocketAddress(proxyAddress); + MeterKey meterKey = new MeterKey(null, address, proxyAddr, null, null); + DistributionSummary ds = MapUtils.computeIfAbsent(dataSentCache, meterKey, + key -> filter(DistributionSummary.builder(name + DATA_SENT) + .baseUnit(ChannelMeters.DATA_SENT.getBaseUnit()) + .tags(ChannelMeters.ChannelMetersTags.URI.asString(), protocol, + ChannelMeters.ChannelMetersTags.REMOTE_ADDRESS.asString(), address, + ChannelMeters.ChannelMetersTags.PROXY_ADDRESS.asString(), proxyAddr) + .register(REGISTRY))); + if (ds != null) { + ds.record(bytes); + } + } + @Override public void incrementErrorsCount(SocketAddress remoteAddress) { String address = formatSocketAddress(remoteAddress); - Counter c = MapUtils.computeIfAbsent(errorsCache, address, + Counter c = MapUtils.computeIfAbsent(errorsCacheNoProxy, address, key -> filter(Counter.builder(name + ERRORS) .tags(ChannelMeters.ChannelMetersTags.URI.asString(), protocol, ChannelMeters.ChannelMetersTags.REMOTE_ADDRESS.asString(), address) @@ -112,6 +150,22 @@ public void incrementErrorsCount(SocketAddress remoteAddress) { } } + @Override + public void incrementErrorsCount(SocketAddress remoteAddress, SocketAddress proxyAddress) { + String address = formatSocketAddress(remoteAddress); + String proxyAddr = formatSocketAddress(proxyAddress); + MeterKey meterKey = new MeterKey(null, address, proxyAddr, null, null); + Counter c = MapUtils.computeIfAbsent(errorsCache, meterKey, + key -> filter(Counter.builder(name + ERRORS) + .tags(ChannelMeters.ChannelMetersTags.URI.asString(), protocol, + ChannelMeters.ChannelMetersTags.REMOTE_ADDRESS.asString(), address, + ChannelMeters.ChannelMetersTags.PROXY_ADDRESS.asString(), proxyAddr) + .register(REGISTRY))); + if (c != null) { + c.increment(); + } + } + @Override public void recordTlsHandshakeTime(SocketAddress remoteAddress, Duration time, String status) { String address = formatSocketAddress(remoteAddress); @@ -123,13 +177,32 @@ public void recordTlsHandshakeTime(SocketAddress remoteAddress, Duration time, S @Nullable public final Timer getTlsHandshakeTimer(String name, String address, String status) { - MeterKey meterKey = new MeterKey(null, address, null, status); + MeterKey meterKey = new MeterKey(null, address, null, null, status); return MapUtils.computeIfAbsent(tlsHandshakeTimeCache, meterKey, key -> filter(Timer.builder(name) .tags(REMOTE_ADDRESS, address, STATUS, status) .register(REGISTRY))); } + @Override + public void recordTlsHandshakeTime(SocketAddress remoteAddress, SocketAddress proxyAddress, Duration time, String status) { + String address = formatSocketAddress(remoteAddress); + String proxyAddr = formatSocketAddress(proxyAddress); + Timer timer = getTlsHandshakeTimer(name + TLS_HANDSHAKE_TIME, address, proxyAddr, status); + if (timer != null) { + timer.record(time); + } + } + + @Nullable + public final Timer getTlsHandshakeTimer(String name, String address, String proxyAddr, String status) { + MeterKey meterKey = new MeterKey(null, address, proxyAddr, null, status); + return MapUtils.computeIfAbsent(tlsHandshakeTimeCache, meterKey, + key -> filter(Timer.builder(name) + .tags(REMOTE_ADDRESS, address, PROXY_ADDRESS, proxyAddr, STATUS, status) + .register(REGISTRY))); + } + @Override public void recordConnectTime(SocketAddress remoteAddress, Duration time, String status) { String address = formatSocketAddress(remoteAddress); @@ -141,13 +214,32 @@ public void recordConnectTime(SocketAddress remoteAddress, Duration time, String @Nullable final Timer getConnectTimer(String name, String address, String status) { - MeterKey meterKey = new MeterKey(null, address, null, status); + MeterKey meterKey = new MeterKey(null, address, null, null, status); return MapUtils.computeIfAbsent(connectTimeCache, meterKey, key -> filter(Timer.builder(name) .tags(REMOTE_ADDRESS, address, STATUS, status) .register(REGISTRY))); } + @Override + public void recordConnectTime(SocketAddress remoteAddress, SocketAddress proxyAddress, Duration time, String status) { + String address = formatSocketAddress(remoteAddress); + String proxyAddr = formatSocketAddress(proxyAddress); + Timer timer = getConnectTimer(name + CONNECT_TIME, address, proxyAddr, status); + if (timer != null) { + timer.record(time); + } + } + + @Nullable + final Timer getConnectTimer(String name, String address, String proxyAddr, String status) { + MeterKey meterKey = new MeterKey(null, address, proxyAddr, null, status); + return MapUtils.computeIfAbsent(connectTimeCache, meterKey, + key -> filter(Timer.builder(name) + .tags(REMOTE_ADDRESS, address, PROXY_ADDRESS, proxyAddr, STATUS, status) + .register(REGISTRY))); + } + @Override public void recordResolveAddressTime(SocketAddress remoteAddress, Duration time, String status) { String address = formatSocketAddress(remoteAddress); @@ -159,7 +251,7 @@ public void recordResolveAddressTime(SocketAddress remoteAddress, Duration time, @Nullable public final Timer getResolveAddressTimer(String name, String address, String status) { - MeterKey meterKey = new MeterKey(null, address, null, status); + MeterKey meterKey = new MeterKey(null, address, null, null, status); return MapUtils.computeIfAbsent(addressResolverTimeCache, meterKey, key -> filter(Timer.builder(name) .tags(REMOTE_ADDRESS, address, STATUS, status) diff --git a/reactor-netty-core/src/main/java/reactor/netty/tcp/TlsHandshakeObservations.java b/reactor-netty-core/src/main/java/reactor/netty/tcp/TlsHandshakeObservations.java index a3dd717b8f..27ff43cbde 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/tcp/TlsHandshakeObservations.java +++ b/reactor-netty-core/src/main/java/reactor/netty/tcp/TlsHandshakeObservations.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022 VMware, Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2022-2024 VMware, Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -87,6 +87,16 @@ public String asString() { */ enum TlsHandshakeTimeLowCardinalityTags implements KeyName { + /** + * Proxy address, when there is a proxy configured. + */ + PROXY_ADDRESS { + @Override + public String asString() { + return "proxy.address"; + } + }, + /** * Remote address. */ diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/MicrometerHttpMetricsRecorder.java b/reactor-netty-http/src/main/java/reactor/netty/http/MicrometerHttpMetricsRecorder.java index 7373fc6b6c..3b4eddbec7 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/MicrometerHttpMetricsRecorder.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/MicrometerHttpMetricsRecorder.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2022 VMware, Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2019-2024 VMware, Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -61,7 +61,7 @@ protected MicrometerHttpMetricsRecorder(String name, String protocol) { @Override public void recordDataReceived(SocketAddress remoteAddress, String uri, long bytes) { String address = Metrics.formatSocketAddress(remoteAddress); - MeterKey meterKey = new MeterKey(uri, address, null, null); + MeterKey meterKey = new MeterKey(uri, address, null, null, null); DistributionSummary dataReceived = MapUtils.computeIfAbsent(dataReceivedCache, meterKey, key -> filter(DistributionSummary.builder(name() + DATA_RECEIVED) .baseUnit(ChannelMeters.DATA_RECEIVED.getBaseUnit()) @@ -75,7 +75,7 @@ public void recordDataReceived(SocketAddress remoteAddress, String uri, long byt @Override public void recordDataSent(SocketAddress remoteAddress, String uri, long bytes) { String address = Metrics.formatSocketAddress(remoteAddress); - MeterKey meterKey = new MeterKey(uri, address, null, null); + MeterKey meterKey = new MeterKey(uri, address, null, null, null); DistributionSummary dataSent = MapUtils.computeIfAbsent(dataSentCache, meterKey, key -> filter(DistributionSummary.builder(name() + DATA_SENT) .baseUnit(ChannelMeters.DATA_SENT.getBaseUnit()) @@ -89,7 +89,7 @@ public void recordDataSent(SocketAddress remoteAddress, String uri, long bytes) @Override public void incrementErrorsCount(SocketAddress remoteAddress, String uri) { String address = Metrics.formatSocketAddress(remoteAddress); - MeterKey meterKey = new MeterKey(uri, address, null, null); + MeterKey meterKey = new MeterKey(uri, address, null, null, null); Counter errors = MapUtils.computeIfAbsent(errorsCache, meterKey, key -> filter(Counter.builder(name() + ERRORS) .tags(REMOTE_ADDRESS.asString(), address, URI.asString(), uri) diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/AbstractHttpClientMetricsHandler.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/AbstractHttpClientMetricsHandler.java index 51d0df6053..aabe04fb62 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/AbstractHttpClientMetricsHandler.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/AbstractHttpClientMetricsHandler.java @@ -46,6 +46,7 @@ abstract class AbstractHttpClientMetricsHandler extends ChannelDuplexHandler { private static final Logger log = Loggers.getLogger(AbstractHttpClientMetricsHandler.class); + final SocketAddress proxyAddress; final SocketAddress remoteAddress; String path; @@ -72,7 +73,8 @@ abstract class AbstractHttpClientMetricsHandler extends ChannelDuplexHandler { int lastWriteSeq; - protected AbstractHttpClientMetricsHandler(SocketAddress remoteAddress, @Nullable Function uriTagValue) { + protected AbstractHttpClientMetricsHandler(SocketAddress remoteAddress, @Nullable SocketAddress proxyAddress, @Nullable Function uriTagValue) { + this.proxyAddress = proxyAddress; this.remoteAddress = remoteAddress; this.uriTagValue = uriTagValue; } @@ -85,6 +87,7 @@ protected AbstractHttpClientMetricsHandler(AbstractHttpClientMetricsHandler copy this.dataSentTime = copy.dataSentTime; this.method = copy.method; this.path = copy.path; + this.proxyAddress = copy.proxyAddress; this.remoteAddress = copy.remoteAddress; this.status = copy.status; this.uriTagValue = copy.uriTagValue; @@ -206,24 +209,48 @@ else if (msg instanceof ByteBuf) { protected abstract HttpClientMetricsRecorder recorder(); protected void recordException(ChannelHandlerContext ctx) { - recorder().incrementErrorsCount(remoteAddress, path != null ? path : resolveUri(ctx)); + if (proxyAddress == null) { + recorder().incrementErrorsCount(remoteAddress, path != null ? path : resolveUri(ctx)); + } + else { + recorder().incrementErrorsCount(remoteAddress, proxyAddress, path != null ? path : resolveUri(ctx)); + } } protected void recordRead(Channel channel, SocketAddress address) { - recorder().recordDataReceivedTime(address, path, method, status, - Duration.ofNanos(System.nanoTime() - dataReceivedTime)); + if (proxyAddress == null) { + recorder().recordDataReceivedTime(address, path, method, status, + Duration.ofNanos(System.nanoTime() - dataReceivedTime)); + + recorder().recordResponseTime(address, path, method, status, + Duration.ofNanos(System.nanoTime() - dataSentTime)); + + recorder().recordDataReceived(address, path, dataReceived); + } + else { + recorder().recordDataReceivedTime(address, proxyAddress, path, method, status, + Duration.ofNanos(System.nanoTime() - dataReceivedTime)); - recorder().recordResponseTime(address, path, method, status, - Duration.ofNanos(System.nanoTime() - dataSentTime)); + recorder().recordResponseTime(address, proxyAddress, path, method, status, + Duration.ofNanos(System.nanoTime() - dataSentTime)); - recorder().recordDataReceived(address, path, dataReceived); + recorder().recordDataReceived(address, proxyAddress, path, dataReceived); + } } protected void recordWrite(SocketAddress address) { - recorder().recordDataSentTime(address, path, method, - Duration.ofNanos(System.nanoTime() - dataSentTime)); + if (proxyAddress == null) { + recorder().recordDataSentTime(address, path, method, + Duration.ofNanos(System.nanoTime() - dataSentTime)); + + recorder().recordDataSent(address, path, dataSent); + } + else { + recorder().recordDataSentTime(address, proxyAddress, path, method, + Duration.ofNanos(System.nanoTime() - dataSentTime)); - recorder().recordDataSent(address, path, dataSent); + recorder().recordDataSent(address, proxyAddress, path, dataSent); + } } protected void reset() { diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/ContextAwareHttpClientMetricsHandler.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/ContextAwareHttpClientMetricsHandler.java index ba53b0b41a..e7dc926169 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/ContextAwareHttpClientMetricsHandler.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/ContextAwareHttpClientMetricsHandler.java @@ -36,8 +36,9 @@ final class ContextAwareHttpClientMetricsHandler extends AbstractHttpClientMetri ContextAwareHttpClientMetricsHandler(ContextAwareHttpClientMetricsRecorder recorder, SocketAddress remoteAddress, + SocketAddress proxyAddress, @Nullable Function uriTagValue) { - super(remoteAddress, uriTagValue); + super(remoteAddress, proxyAddress, uriTagValue); this.recorder = recorder; } @@ -54,7 +55,12 @@ protected ContextAwareHttpClientMetricsRecorder recorder() { @Override protected void recordException(ChannelHandlerContext ctx) { if (contextView != null) { - recorder().incrementErrorsCount(contextView, remoteAddress, path); + if (proxyAddress == null) { + recorder().incrementErrorsCount(contextView, remoteAddress, path); + } + else { + recorder().incrementErrorsCount(contextView, remoteAddress, proxyAddress, path); + } } else { super.recordException(ctx); @@ -64,10 +70,18 @@ protected void recordException(ChannelHandlerContext ctx) { @Override protected void recordWrite(SocketAddress address) { if (contextView != null) { - recorder.recordDataSentTime(contextView, address, path, method, - Duration.ofNanos(System.nanoTime() - dataSentTime)); + if (proxyAddress == null) { + recorder.recordDataSentTime(contextView, address, path, method, + Duration.ofNanos(System.nanoTime() - dataSentTime)); - recorder.recordDataSent(contextView, address, path, dataSent); + recorder.recordDataSent(contextView, address, path, dataSent); + } + else { + recorder.recordDataSentTime(contextView, address, proxyAddress, path, method, + Duration.ofNanos(System.nanoTime() - dataSentTime)); + + recorder.recordDataSent(contextView, address, proxyAddress, path, dataSent); + } } else { super.recordWrite(address); @@ -77,13 +91,24 @@ protected void recordWrite(SocketAddress address) { @Override protected void recordRead(Channel channel, SocketAddress address) { if (contextView != null) { - recorder.recordDataReceivedTime(contextView, address, path, method, status, - Duration.ofNanos(System.nanoTime() - dataReceivedTime)); + if (proxyAddress == null) { + recorder.recordDataReceivedTime(contextView, address, path, method, status, + Duration.ofNanos(System.nanoTime() - dataReceivedTime)); + + recorder.recordResponseTime(contextView, address, path, method, status, + Duration.ofNanos(System.nanoTime() - dataSentTime)); + + recorder.recordDataReceived(contextView, address, path, dataReceived); + } + else { + recorder.recordDataReceivedTime(contextView, address, proxyAddress, path, method, status, + Duration.ofNanos(System.nanoTime() - dataReceivedTime)); - recorder.recordResponseTime(contextView, address, path, method, status, - Duration.ofNanos(System.nanoTime() - dataSentTime)); + recorder.recordResponseTime(contextView, address, proxyAddress, path, method, status, + Duration.ofNanos(System.nanoTime() - dataSentTime)); - recorder.recordDataReceived(contextView, address, path, dataReceived); + recorder.recordDataReceived(contextView, address, proxyAddress, path, dataReceived); + } } else { super.recordRead(channel, address); diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/ContextAwareHttpClientMetricsRecorder.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/ContextAwareHttpClientMetricsRecorder.java index 8b2193cfe5..4997598c4b 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/ContextAwareHttpClientMetricsRecorder.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/ContextAwareHttpClientMetricsRecorder.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2023 VMware, Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2021-2024 VMware, Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -44,6 +44,23 @@ public abstract class ContextAwareHttpClientMetricsRecorder extends ContextAware public abstract void recordDataReceivedTime(ContextView contextView, SocketAddress remoteAddress, String uri, String method, String status, Duration time); + /** + * Records the time that is spent in consuming incoming data. + * + * @param contextView The current {@link ContextView} associated with the Mono/Flux + * @param remoteAddress The remote peer + * @param proxyAddress the proxy address + * @param uri The requested URI + * @param method The HTTP method + * @param status The HTTP status + * @param time The time in nanoseconds that is spent in consuming incoming data + * @since 1.1.17 + */ + public void recordDataReceivedTime(ContextView contextView, SocketAddress remoteAddress, SocketAddress proxyAddress, + String uri, String method, String status, Duration time) { + recordDataReceivedTime(contextView, remoteAddress, uri, method, status, time); + } + /** * Records the time that is spent in sending outgoing data. * @@ -56,6 +73,22 @@ public abstract void recordDataReceivedTime(ContextView contextView, SocketAddre public abstract void recordDataSentTime(ContextView contextView, SocketAddress remoteAddress, String uri, String method, Duration time); + /** + * Records the time that is spent in sending outgoing data. + * + * @param contextView The current {@link ContextView} associated with the Mono/Flux + * @param remoteAddress The remote peer + * @param proxyAddress the proxy address + * @param uri The requested URI + * @param method The HTTP method + * @param time The time in nanoseconds that is spent in sending outgoing data + * @since 1.1.17 + */ + public void recordDataSentTime(ContextView contextView, SocketAddress remoteAddress, SocketAddress proxyAddress, + String uri, String method, Duration time) { + recordDataSentTime(contextView, remoteAddress, uri, method, time); + } + /** * Records the total time for the request/response. * @@ -69,18 +102,106 @@ public abstract void recordDataSentTime(ContextView contextView, SocketAddress r public abstract void recordResponseTime(ContextView contextView, SocketAddress remoteAddress, String uri, String method, String status, Duration time); + /** + * Records the total time for the request/response. + * + * @param contextView The current {@link ContextView} associated with the Mono/Flux + * @param remoteAddress The remote peer + * @param proxyAddress the proxy address + * @param uri The requested URI + * @param method The HTTP method + * @param status The HTTP status + * @param time The total time in nanoseconds for the request/response + * @since 1.1.17 + */ + public void recordResponseTime(ContextView contextView, SocketAddress remoteAddress, SocketAddress proxyAddress, + String uri, String method, String status, Duration time) { + recordResponseTime(contextView, remoteAddress, uri, method, status, time); + } + + /** + * Increments the number of the errors that are occurred. + * + * @param contextView The current {@link ContextView} associated with the Mono/Flux + * @param remoteAddress The remote peer + * @param proxyAddress the proxy address + * @param uri The requested URI + * @since 1.1.17 + */ + public void incrementErrorsCount(ContextView contextView, SocketAddress remoteAddress, SocketAddress proxyAddress, String uri) { + incrementErrorsCount(contextView, remoteAddress, uri); + } + + /** + * Records the amount of the data that is received, in bytes. + * + * @param contextView The current {@link ContextView} associated with the Mono/Flux + * @param remoteAddress The remote peer + * @param proxyAddress the proxy address + * @param uri The requested URI + * @param bytes The amount of the data that is received, in bytes + * @since 1.1.17 + */ + public void recordDataReceived(ContextView contextView, SocketAddress remoteAddress, SocketAddress proxyAddress, String uri, long bytes) { + recordDataReceived(contextView, remoteAddress, uri, bytes); + } + + /** + * Records the amount of the data that is sent, in bytes. + * + * @param contextView The current {@link ContextView} associated with the Mono/Flux + * @param remoteAddress The remote peer + * @param proxyAddress the proxy address + * @param uri The requested URI + * @param bytes The amount of the data that is sent, in bytes + * @since 1.1.17 + */ + public void recordDataSent(ContextView contextView, SocketAddress remoteAddress, SocketAddress proxyAddress, String uri, long bytes) { + recordDataSent(contextView, remoteAddress, uri, bytes); + } + @Override public void recordDataReceivedTime(SocketAddress remoteAddress, String uri, String method, String status, Duration time) { recordDataReceivedTime(Context.empty(), remoteAddress, uri, method, status, time); } + @Override + public void recordDataReceivedTime(SocketAddress remoteAddress, SocketAddress proxyAddress, String uri, String method, String status, Duration time) { + recordDataReceivedTime(Context.empty(), remoteAddress, proxyAddress, uri, method, status, time); + } + @Override public void recordDataSentTime(SocketAddress remoteAddress, String uri, String method, Duration time) { recordDataSentTime(Context.empty(), remoteAddress, uri, method, time); } + @Override + public void recordDataSentTime(SocketAddress remoteAddress, SocketAddress proxyAddress, String uri, String method, Duration time) { + recordDataSentTime(Context.empty(), remoteAddress, proxyAddress, uri, method, time); + } + @Override public void recordResponseTime(SocketAddress remoteAddress, String uri, String method, String status, Duration time) { recordResponseTime(Context.empty(), remoteAddress, uri, method, status, time); } + + @Override + public void recordResponseTime(SocketAddress remoteAddress, SocketAddress proxyAddress, String uri, String method, String status, Duration time) { + recordResponseTime(Context.empty(), remoteAddress, proxyAddress, uri, method, status, time); + } + + @Override + public void recordDataReceived(SocketAddress remoteAddress, SocketAddress proxyAddress, String uri, long bytes) { + recordDataReceived(Context.empty(), remoteAddress, proxyAddress, uri, bytes); + } + + @Override + public void recordDataSent(SocketAddress remoteAddress, SocketAddress proxyAddress, String uri, long bytes) { + recordDataSent(Context.empty(), remoteAddress, proxyAddress, uri, bytes); + } + + @Override + public void incrementErrorsCount(SocketAddress remoteAddress, SocketAddress proxyAddress, String uri) { + incrementErrorsCount(Context.empty(), remoteAddress, proxyAddress, uri); + } } diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java index e02b9844dc..1b25507f94 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java @@ -41,6 +41,7 @@ import reactor.netty.channel.ChannelOperations; import reactor.netty.resources.ConnectionProvider; import reactor.netty.resources.PooledConnectionProvider; +import reactor.netty.transport.ClientTransportConfig; import reactor.netty.transport.TransportConfig; import reactor.netty.internal.shaded.reactor.pool.InstrumentedPool; import reactor.netty.internal.shaded.reactor.pool.PooledRef; @@ -118,13 +119,15 @@ protected CoreSubscriber> createDisposableAcquire( Context currentContext) { boolean acceptGzip = false; ChannelMetricsRecorder metricsRecorder = config.metricsRecorder() != null ? config.metricsRecorder().get() : null; + SocketAddress proxyAddress = ((ClientTransportConfig) config).proxyProvider() != null ? + ((ClientTransportConfig) config).proxyProvider().getAddress().get() : null; Function uriTagValue = null; if (config instanceof HttpClientConfig) { acceptGzip = ((HttpClientConfig) config).acceptGzip; uriTagValue = ((HttpClientConfig) config).uriTagValue; } return new DisposableAcquire(connectionObserver, config.channelOperationsProvider(), - acceptGzip, metricsRecorder, pendingAcquireTimeout, pool, remoteAddress, sink, currentContext, uriTagValue); + acceptGzip, metricsRecorder, pendingAcquireTimeout, pool, proxyAddress, remoteAddress, sink, currentContext, uriTagValue); } @Override @@ -238,6 +241,7 @@ static final class DisposableAcquire final ChannelMetricsRecorder metricsRecorder; final long pendingAcquireTimeout; final InstrumentedPool pool; + final SocketAddress proxyAddress; final boolean retried; final MonoSink sink; final Function uriTagValue; @@ -253,6 +257,7 @@ static final class DisposableAcquire @Nullable ChannelMetricsRecorder metricsRecorder, long pendingAcquireTimeout, InstrumentedPool pool, + @Nullable SocketAddress proxyAddress, @Nullable SocketAddress remoteAddress, MonoSink sink, Context currentContext, @@ -265,6 +270,7 @@ static final class DisposableAcquire this.metricsRecorder = metricsRecorder; this.pendingAcquireTimeout = pendingAcquireTimeout; this.pool = pool; + this.proxyAddress = proxyAddress; this.remoteAddress = remoteAddress; this.retried = false; this.sink = sink; @@ -280,6 +286,7 @@ static final class DisposableAcquire this.metricsRecorder = parent.metricsRecorder; this.pendingAcquireTimeout = parent.pendingAcquireTimeout; this.pool = parent.pool; + this.proxyAddress = parent.proxyAddress; this.remoteAddress = parent.remoteAddress; this.retried = true; this.sink = parent.sink; @@ -405,7 +412,7 @@ public void operationComplete(Future future) { setChannelContext(ch, currentContext()); } HttpClientConfig.addStreamHandlers(ch, obs.then(new HttpClientConfig.StreamConnectionObserver(currentContext())), - opsFactory, acceptGzip, metricsRecorder, remoteAddress, -1, uriTagValue); + opsFactory, acceptGzip, metricsRecorder, proxyAddress, remoteAddress, -1, uriTagValue); ChannelOperations ops = ChannelOperations.get(ch); if (ops != null) { diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConfig.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConfig.java index fde118d7ee..2416cbe368 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConfig.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConfig.java @@ -537,6 +537,7 @@ static void addStreamHandlers( ChannelOperations.OnSetup opsFactory, boolean acceptGzip, @Nullable ChannelMetricsRecorder metricsRecorder, + @Nullable SocketAddress proxyAddress, SocketAddress remoteAddress, long responseTimeoutMillis, @Nullable Function uriTagValue) { @@ -575,13 +576,13 @@ else if (metricsRecorder instanceof ContextAwareHttpClientMetricsRecorder) { } else { if (metricsRecorder instanceof MicrometerHttpClientMetricsRecorder) { - handler = new MicrometerHttpClientMetricsHandler((MicrometerHttpClientMetricsRecorder) metricsRecorder, remoteAddress, uriTagValue); + handler = new MicrometerHttpClientMetricsHandler((MicrometerHttpClientMetricsRecorder) metricsRecorder, remoteAddress, proxyAddress, uriTagValue); } else if (metricsRecorder instanceof ContextAwareHttpClientMetricsRecorder) { - handler = new ContextAwareHttpClientMetricsHandler((ContextAwareHttpClientMetricsRecorder) metricsRecorder, remoteAddress, uriTagValue); + handler = new ContextAwareHttpClientMetricsHandler((ContextAwareHttpClientMetricsRecorder) metricsRecorder, remoteAddress, proxyAddress, uriTagValue); } else { - handler = new HttpClientMetricsHandler((HttpClientMetricsRecorder) metricsRecorder, remoteAddress, uriTagValue); + handler = new HttpClientMetricsHandler((HttpClientMetricsRecorder) metricsRecorder, remoteAddress, proxyAddress, uriTagValue); } } pipeline.addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.HttpMetricsHandler, handler); @@ -630,6 +631,7 @@ static void configureHttp11OrH2CleartextPipeline( @Nullable ChannelMetricsRecorder metricsRecorder, ConnectionObserver observer, ChannelOperations.OnSetup opsFactory, + @Nullable SocketAddress proxyAddress, SocketAddress remoteAddress, @Nullable Function uriTagValue) { HttpClientCodec httpClientCodec = @@ -656,7 +658,7 @@ static void configureHttp11OrH2CleartextPipeline( Http2FrameCodec http2FrameCodec = http2FrameCodecBuilder.build(); Http2ClientUpgradeCodec upgradeCodec = new Http2ClientUpgradeCodec(http2FrameCodec, - new H2CleartextCodec(http2FrameCodec, opsFactory, acceptGzip, metricsRecorder, remoteAddress, uriTagValue)); + new H2CleartextCodec(http2FrameCodec, opsFactory, acceptGzip, metricsRecorder, proxyAddress, remoteAddress, uriTagValue)); HttpClientUpgradeHandler upgradeHandler = new ReactorNettyHttpClientUpgradeHandler(httpClientCodec, upgradeCodec, decoder.h2cMaxContentLength()); @@ -673,13 +675,13 @@ static void configureHttp11OrH2CleartextPipeline( if (metricsRecorder instanceof HttpClientMetricsRecorder) { ChannelHandler handler; if (metricsRecorder instanceof MicrometerHttpClientMetricsRecorder) { - handler = new MicrometerHttpClientMetricsHandler((MicrometerHttpClientMetricsRecorder) metricsRecorder, remoteAddress, uriTagValue); + handler = new MicrometerHttpClientMetricsHandler((MicrometerHttpClientMetricsRecorder) metricsRecorder, remoteAddress, proxyAddress, uriTagValue); } else if (metricsRecorder instanceof ContextAwareHttpClientMetricsRecorder) { - handler = new ContextAwareHttpClientMetricsHandler((ContextAwareHttpClientMetricsRecorder) metricsRecorder, remoteAddress, uriTagValue); + handler = new ContextAwareHttpClientMetricsHandler((ContextAwareHttpClientMetricsRecorder) metricsRecorder, remoteAddress, proxyAddress, uriTagValue); } else { - handler = new HttpClientMetricsHandler((HttpClientMetricsRecorder) metricsRecorder, remoteAddress, uriTagValue); + handler = new HttpClientMetricsHandler((HttpClientMetricsRecorder) metricsRecorder, remoteAddress, proxyAddress, uriTagValue); } p.addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.HttpMetricsHandler, handler); } @@ -692,6 +694,7 @@ static void configureHttp11Pipeline(ChannelPipeline p, boolean acceptGzip, HttpResponseDecoderSpec decoder, @Nullable ChannelMetricsRecorder metricsRecorder, + @Nullable SocketAddress proxyAddress, SocketAddress remoteAddress, @Nullable Function uriTagValue) { p.addBefore(NettyPipeline.ReactiveBridge, @@ -714,13 +717,13 @@ static void configureHttp11Pipeline(ChannelPipeline p, if (metricsRecorder instanceof HttpClientMetricsRecorder) { ChannelHandler handler; if (metricsRecorder instanceof MicrometerHttpClientMetricsRecorder) { - handler = new MicrometerHttpClientMetricsHandler((MicrometerHttpClientMetricsRecorder) metricsRecorder, remoteAddress, uriTagValue); + handler = new MicrometerHttpClientMetricsHandler((MicrometerHttpClientMetricsRecorder) metricsRecorder, remoteAddress, proxyAddress, uriTagValue); } else if (metricsRecorder instanceof ContextAwareHttpClientMetricsRecorder) { - handler = new ContextAwareHttpClientMetricsHandler((ContextAwareHttpClientMetricsRecorder) metricsRecorder, remoteAddress, uriTagValue); + handler = new ContextAwareHttpClientMetricsHandler((ContextAwareHttpClientMetricsRecorder) metricsRecorder, remoteAddress, proxyAddress, uriTagValue); } else { - handler = new HttpClientMetricsHandler((HttpClientMetricsRecorder) metricsRecorder, remoteAddress, uriTagValue); + handler = new HttpClientMetricsHandler((HttpClientMetricsRecorder) metricsRecorder, remoteAddress, proxyAddress, uriTagValue); } p.addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.HttpMetricsHandler, handler); } @@ -768,6 +771,7 @@ static final class H2CleartextCodec extends ChannelHandlerAdapter { final Http2FrameCodec http2FrameCodec; final ChannelMetricsRecorder metricsRecorder; final ChannelOperations.OnSetup opsFactory; + final SocketAddress proxyAddress; final SocketAddress remoteAddress; final Function uriTagValue; @@ -776,12 +780,14 @@ static final class H2CleartextCodec extends ChannelHandlerAdapter { ChannelOperations.OnSetup opsFactory, boolean acceptGzip, @Nullable ChannelMetricsRecorder metricsRecorder, + @Nullable SocketAddress proxyAddress, SocketAddress remoteAddress, @Nullable Function uriTagValue) { this.acceptGzip = acceptGzip; this.http2FrameCodec = http2FrameCodec; this.metricsRecorder = metricsRecorder; this.opsFactory = opsFactory; + this.proxyAddress = proxyAddress; this.remoteAddress = remoteAddress; this.uriTagValue = uriTagValue; } @@ -802,12 +808,12 @@ public void handlerAdded(ChannelHandlerContext ctx) { if (responseTimeoutHandler != null) { pipeline.remove(NettyPipeline.ResponseTimeoutHandler); http2MultiplexHandler = new Http2MultiplexHandler(H2InboundStreamHandler.INSTANCE, - new H2Codec(owner, obs, opsFactory, acceptGzip, metricsRecorder, + new H2Codec(owner, obs, opsFactory, acceptGzip, metricsRecorder, proxyAddress, remoteAddress, responseTimeoutHandler.getReaderIdleTimeInMillis(), uriTagValue)); } else { http2MultiplexHandler = new Http2MultiplexHandler(H2InboundStreamHandler.INSTANCE, - new H2Codec(owner, obs, opsFactory, acceptGzip, metricsRecorder, remoteAddress, uriTagValue)); + new H2Codec(owner, obs, opsFactory, acceptGzip, metricsRecorder, proxyAddress, remoteAddress, uriTagValue)); } pipeline.addAfter(ctx.name(), NettyPipeline.HttpCodec, http2FrameCodec) .addAfter(NettyPipeline.HttpCodec, NettyPipeline.H2MultiplexHandler, http2MultiplexHandler); @@ -827,6 +833,7 @@ static final class H2Codec extends ChannelInitializer { final ChannelOperations.OnSetup opsFactory; final Http2ConnectionProvider.DisposableAcquire owner; final long responseTimeoutMillis; + final SocketAddress proxyAddress; final SocketAddress remoteAddress; final Function uriTagValue; @@ -836,10 +843,11 @@ static final class H2Codec extends ChannelInitializer { ChannelOperations.OnSetup opsFactory, boolean acceptGzip, @Nullable ChannelMetricsRecorder metricsRecorder, + @Nullable SocketAddress proxyAddress, SocketAddress remoteAddress, @Nullable Function uriTagValue) { // Handle outbound and upgrade streams - this(owner, observer, opsFactory, acceptGzip, metricsRecorder, remoteAddress, -1, uriTagValue); + this(owner, observer, opsFactory, acceptGzip, metricsRecorder, proxyAddress, remoteAddress, -1, uriTagValue); } H2Codec( @@ -848,6 +856,7 @@ static final class H2Codec extends ChannelInitializer { ChannelOperations.OnSetup opsFactory, boolean acceptGzip, @Nullable ChannelMetricsRecorder metricsRecorder, + @Nullable SocketAddress proxyAddress, SocketAddress remoteAddress, long responseTimeoutMillis, @Nullable Function uriTagValue) { @@ -858,6 +867,7 @@ static final class H2Codec extends ChannelInitializer { this.opsFactory = opsFactory; this.owner = owner; this.responseTimeoutMillis = responseTimeoutMillis; + this.proxyAddress = proxyAddress; this.remoteAddress = remoteAddress; this.uriTagValue = uriTagValue; } @@ -870,7 +880,7 @@ protected void initChannel(Channel ch) { setChannelContext(ch, owner.currentContext()); } addStreamHandlers(ch, observer.then(new StreamConnectionObserver(owner.currentContext())), opsFactory, - acceptGzip, metricsRecorder, remoteAddress, responseTimeoutMillis, uriTagValue); + acceptGzip, metricsRecorder, proxyAddress, remoteAddress, responseTimeoutMillis, uriTagValue); } else { // Handle server pushes (inbound streams) @@ -898,6 +908,7 @@ static final class H2OrHttp11Codec extends ChannelInboundHandlerAdapter { final Http2Settings http2Settings; final ChannelMetricsRecorder metricsRecorder; final ConnectionObserver observer; + final SocketAddress proxyAddress; final SocketAddress remoteAddress; final Function uriTagValue; @@ -907,6 +918,7 @@ static final class H2OrHttp11Codec extends ChannelInboundHandlerAdapter { this.http2Settings = initializer.http2Settings; this.metricsRecorder = initializer.metricsRecorder; this.observer = observer; + this.proxyAddress = initializer.proxyAddress; this.remoteAddress = remoteAddress; this.uriTagValue = initializer.uriTagValue; } @@ -925,7 +937,7 @@ public void channelActive(ChannelHandlerContext ctx) { configureHttp2Pipeline(ctx.channel().pipeline(), acceptGzip, decoder, http2Settings, observer); } else if (ApplicationProtocolNames.HTTP_1_1.equals(protocol)) { - configureHttp11Pipeline(ctx.channel().pipeline(), acceptGzip, decoder, metricsRecorder, remoteAddress, uriTagValue); + configureHttp11Pipeline(ctx.channel().pipeline(), acceptGzip, decoder, metricsRecorder, proxyAddress, remoteAddress, uriTagValue); } else { throw new IllegalStateException("unknown protocol: " + protocol); @@ -949,6 +961,7 @@ static final class HttpClientChannelInitializer implements ChannelPipelineConfig final ChannelMetricsRecorder metricsRecorder; final ChannelOperations.OnSetup opsFactory; final int protocols; + final SocketAddress proxyAddress; final SslProvider sslProvider; final Function uriTagValue; @@ -959,6 +972,7 @@ static final class HttpClientChannelInitializer implements ChannelPipelineConfig this.metricsRecorder = config.metricsRecorderInternal(); this.opsFactory = config.channelOperationsProvider(); this.protocols = config._protocols; + this.proxyAddress = config.proxyProvider() != null ? config.proxyProvider().getAddress().get() : null; this.sslProvider = config.sslProvider; this.uriTagValue = config.uriTagValue; } @@ -974,7 +988,7 @@ public void onChannelInit(ConnectionObserver observer, Channel channel, @Nullabl new H2OrHttp11Codec(this, observer, remoteAddress)); } else if ((protocols & h11) == h11) { - configureHttp11Pipeline(channel.pipeline(), acceptGzip, decoder, metricsRecorder, remoteAddress, uriTagValue); + configureHttp11Pipeline(channel.pipeline(), acceptGzip, decoder, metricsRecorder, proxyAddress, remoteAddress, uriTagValue); } else if ((protocols & h2) == h2) { configureHttp2Pipeline(channel.pipeline(), acceptGzip, decoder, http2Settings, observer); @@ -982,10 +996,10 @@ else if ((protocols & h2) == h2) { } else { if ((protocols & h11orH2C) == h11orH2C) { - configureHttp11OrH2CleartextPipeline(channel.pipeline(), acceptGzip, decoder, http2Settings, metricsRecorder, observer, opsFactory, remoteAddress, uriTagValue); + configureHttp11OrH2CleartextPipeline(channel.pipeline(), acceptGzip, decoder, http2Settings, metricsRecorder, observer, opsFactory, proxyAddress, remoteAddress, uriTagValue); } else if ((protocols & h11) == h11) { - configureHttp11Pipeline(channel.pipeline(), acceptGzip, decoder, metricsRecorder, remoteAddress, uriTagValue); + configureHttp11Pipeline(channel.pipeline(), acceptGzip, decoder, metricsRecorder, proxyAddress, remoteAddress, uriTagValue); } else if ((protocols & h2c) == h2c) { configureHttp2Pipeline(channel.pipeline(), acceptGzip, decoder, http2Settings, observer); diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientMeters.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientMeters.java index 3608066ef2..bc45810c1e 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientMeters.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientMeters.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022 VMware, Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2022-2024 VMware, Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -79,6 +79,16 @@ public String asString() { } }, + /** + * Proxy address, when there is a proxy configured. + */ + PROXY_ADDRESS { + @Override + public String asString() { + return "proxy.address"; + } + }, + /** * Remote address. */ @@ -122,6 +132,16 @@ public String asString() { } }, + /** + * Proxy address, when there is a proxy configured. + */ + PROXY_ADDRESS { + @Override + public String asString() { + return "proxy.address"; + } + }, + /** * Remote address. */ diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientMetricsHandler.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientMetricsHandler.java index a6df476d73..49ebfc81f0 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientMetricsHandler.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientMetricsHandler.java @@ -31,8 +31,9 @@ final class HttpClientMetricsHandler extends AbstractHttpClientMetricsHandler { HttpClientMetricsHandler(HttpClientMetricsRecorder recorder, SocketAddress remoteAddress, + @Nullable SocketAddress proxyAddress, @Nullable Function uriTagValue) { - super(remoteAddress, uriTagValue); + super(remoteAddress, proxyAddress, uriTagValue); this.recorder = recorder; } diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientMetricsRecorder.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientMetricsRecorder.java index c1d59e3ef0..b3dc7b4359 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientMetricsRecorder.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientMetricsRecorder.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2023 VMware, Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2019-2024 VMware, Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -38,6 +38,21 @@ public interface HttpClientMetricsRecorder extends HttpMetricsRecorder { */ void recordDataReceivedTime(SocketAddress remoteAddress, String uri, String method, String status, Duration time); + /** + * Records the time that is spent in consuming incoming data. + * + * @param remoteAddress The remote peer + * @param proxyAddress the proxy address + * @param uri the requested URI + * @param method the HTTP method + * @param status the HTTP status + * @param time the time in nanoseconds that is spent in consuming incoming data + * @since 1.1.17 + */ + default void recordDataReceivedTime(SocketAddress remoteAddress, SocketAddress proxyAddress, String uri, String method, String status, Duration time) { + recordDataReceivedTime(remoteAddress, uri, method, status, time); + } + /** * Records the time that is spent in sending outgoing data. * @@ -48,6 +63,20 @@ public interface HttpClientMetricsRecorder extends HttpMetricsRecorder { */ void recordDataSentTime(SocketAddress remoteAddress, String uri, String method, Duration time); + /** + * Records the time that is spent in sending outgoing data. + * + * @param remoteAddress The remote peer + * @param proxyAddress the proxy address + * @param uri the requested URI + * @param method the HTTP method + * @param time the time in nanoseconds that is spent in sending outgoing data + * @since 1.1.17 + */ + default void recordDataSentTime(SocketAddress remoteAddress, SocketAddress proxyAddress, String uri, String method, Duration time) { + recordDataSentTime(remoteAddress, uri, method, time); + } + /** * Records the total time for the request/response. * @@ -58,4 +87,57 @@ public interface HttpClientMetricsRecorder extends HttpMetricsRecorder { * @param time the total time in nanoseconds for the request/response */ void recordResponseTime(SocketAddress remoteAddress, String uri, String method, String status, Duration time); + + /** + * Records the total time for the request/response. + * + * @param remoteAddress The remote peer + * @param proxyAddress the proxy address + * @param uri the requested URI + * @param method the HTTP method + * @param status the HTTP status + * @param time the total time in nanoseconds for the request/response + * @since 1.1.17 + */ + default void recordResponseTime(SocketAddress remoteAddress, SocketAddress proxyAddress, String uri, String method, String status, Duration time) { + recordResponseTime(remoteAddress, uri, method, status, time); + } + + /** + * Records the amount of the data that is received, in bytes. + * + * @param remoteAddress The remote peer + * @param proxyAddress the proxy address + * @param uri the requested URI + * @param bytes The amount of the data that is received, in bytes + * @since 1.1.17 + */ + default void recordDataReceived(SocketAddress remoteAddress, SocketAddress proxyAddress, String uri, long bytes) { + recordDataReceived(remoteAddress, uri, bytes); + } + + /** + * Records the amount of the data that is sent, in bytes. + * + * @param remoteAddress The remote peer + * @param proxyAddress the proxy address + * @param uri the requested URI + * @param bytes The amount of the data that is sent, in bytes + * @since 1.1.17 + */ + default void recordDataSent(SocketAddress remoteAddress, SocketAddress proxyAddress, String uri, long bytes) { + recordDataSent(remoteAddress, uri, bytes); + } + + /** + * Increments the number of the errors that are occurred. + * + * @param remoteAddress The remote peer + * @param proxyAddress the proxy address + * @param uri the requested URI + * @since 1.1.17 + */ + default void incrementErrorsCount(SocketAddress remoteAddress, SocketAddress proxyAddress, String uri) { + incrementErrorsCount(remoteAddress, uri); + } } diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientObservations.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientObservations.java index 83592c1c5e..f9d2e0f0cd 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientObservations.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientObservations.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022 VMware, Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2022-2024 VMware, Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -117,6 +117,16 @@ public String asString() { } }, + /** + * Proxy address, when there is a proxy configured. + */ + PROXY_ADDRESS { + @Override + public String asString() { + return "proxy.address"; + } + }, + /** * Remote address. */ diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/MicrometerHttpClientMetricsHandler.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/MicrometerHttpClientMetricsHandler.java index 81ca07808f..8286558cfa 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/MicrometerHttpClientMetricsHandler.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/MicrometerHttpClientMetricsHandler.java @@ -36,6 +36,7 @@ import static reactor.netty.Metrics.OBSERVATION_REGISTRY; import static reactor.netty.Metrics.RESPONSE_TIME; import static reactor.netty.Metrics.UNKNOWN; +import static reactor.netty.Metrics.formatSocketAddress; import static reactor.netty.Metrics.updateChannelContext; import static reactor.netty.ReactorNetty.setChannelContext; import static reactor.netty.http.client.HttpClientObservations.ResponseTimeHighCardinalityTags.HTTP_STATUS_CODE; @@ -44,6 +45,7 @@ import static reactor.netty.http.client.HttpClientObservations.ResponseTimeHighCardinalityTags.NET_PEER_PORT; import static reactor.netty.http.client.HttpClientObservations.ResponseTimeHighCardinalityTags.REACTOR_NETTY_TYPE; import static reactor.netty.http.client.HttpClientObservations.ResponseTimeLowCardinalityTags.METHOD; +import static reactor.netty.http.client.HttpClientObservations.ResponseTimeLowCardinalityTags.PROXY_ADDRESS; import static reactor.netty.http.client.HttpClientObservations.ResponseTimeLowCardinalityTags.REMOTE_ADDRESS; import static reactor.netty.http.client.HttpClientObservations.ResponseTimeLowCardinalityTags.STATUS; import static reactor.netty.http.client.HttpClientObservations.ResponseTimeLowCardinalityTags.URI; @@ -64,8 +66,9 @@ final class MicrometerHttpClientMetricsHandler extends AbstractHttpClientMetrics MicrometerHttpClientMetricsHandler(MicrometerHttpClientMetricsRecorder recorder, SocketAddress remoteAddress, + @Nullable SocketAddress proxyAddress, @Nullable Function uriTagValue) { - super(remoteAddress, uriTagValue); + super(remoteAddress, proxyAddress, uriTagValue); this.recorder = recorder; } @@ -85,10 +88,18 @@ protected HttpClientMetricsRecorder recorder() { @Override protected void recordRead(Channel channel, SocketAddress address) { - recorder().recordDataReceivedTime(address, path, method, status, - Duration.ofNanos(System.nanoTime() - dataReceivedTime)); + if (proxyAddress == null) { + recorder().recordDataReceivedTime(address, path, method, status, + Duration.ofNanos(System.nanoTime() - dataReceivedTime)); - recorder().recordDataReceived(address, path, dataReceived); + recorder().recordDataReceived(address, path, dataReceived); + } + else { + recorder().recordDataReceivedTime(address, proxyAddress, path, method, status, + Duration.ofNanos(System.nanoTime() - dataReceivedTime)); + + recorder().recordDataReceived(address, proxyAddress, path, dataReceived); + } // Cannot invoke the recorder anymore: // 1. The recorder is one instance only, it is invoked for all requests that can happen @@ -122,7 +133,7 @@ protected void startRead(HttpResponse msg) { protected void startWrite(HttpRequest msg, Channel channel, SocketAddress address) { super.startWrite(msg, channel, address); - responseTimeHandlerContext = new ResponseTimeHandlerContext(recorder, msg, path, address); + responseTimeHandlerContext = new ResponseTimeHandlerContext(recorder, msg, path, address, proxyAddress); responseTimeObservation = Observation.createNotStarted(recorder.name() + RESPONSE_TIME, responseTimeHandlerContext, OBSERVATION_REGISTRY); parentContextView = updateChannelContext(channel, responseTimeObservation); responseTimeObservation.start(); @@ -144,12 +155,14 @@ static final class ResponseTimeHandlerContext extends RequestReplySenderContext< final String netPeerName; final String netPeerPort; final String path; + final String proxyAddress; final MicrometerHttpClientMetricsRecorder recorder; // status might not be known beforehand String status = UNKNOWN; - ResponseTimeHandlerContext(MicrometerHttpClientMetricsRecorder recorder, HttpRequest request, String path, SocketAddress remoteAddress) { + ResponseTimeHandlerContext(MicrometerHttpClientMetricsRecorder recorder, HttpRequest request, String path, + SocketAddress remoteAddress, SocketAddress proxyAddress) { super((carrier, key, value) -> Objects.requireNonNull(carrier).headers().set(key, value)); this.recorder = recorder; this.method = request.method().name(); @@ -163,6 +176,7 @@ static final class ResponseTimeHandlerContext extends RequestReplySenderContext< this.netPeerPort = ""; } this.path = path; + this.proxyAddress = formatSocketAddress(proxyAddress); setCarrier(request); setContextualName(HTTP_PREFIX + this.method); } @@ -174,7 +188,12 @@ public Observation.Context get() { @Override public Timer getTimer() { - return recorder.getResponseTimeTimer(getName(), netPeerName + ":" + netPeerPort, path, method, status); + if (proxyAddress == null) { + return recorder.getResponseTimeTimer(getName(), netPeerName + ":" + netPeerPort, path, method, status); + } + else { + return recorder.getResponseTimeTimer(getName(), netPeerName + ":" + netPeerPort, proxyAddress, path, method, status); + } } @Override @@ -186,8 +205,15 @@ public KeyValues getHighCardinalityKeyValues() { @Override public KeyValues getLowCardinalityKeyValues() { - return KeyValues.of(METHOD.asString(), method, REMOTE_ADDRESS.asString(), netPeerName + ":" + netPeerPort, - STATUS.asString(), status, URI.asString(), path); + if (proxyAddress == null) { + return KeyValues.of(METHOD.asString(), method, REMOTE_ADDRESS.asString(), netPeerName + ":" + netPeerPort, + STATUS.asString(), status, URI.asString(), path); + } + else { + return KeyValues.of(METHOD.asString(), method, REMOTE_ADDRESS.asString(), netPeerName + ":" + netPeerPort, + PROXY_ADDRESS.asString(), proxyAddress, + STATUS.asString(), status, URI.asString(), path); + } } } } diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/MicrometerHttpClientMetricsRecorder.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/MicrometerHttpClientMetricsRecorder.java index f2e202c620..94d4c869c0 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/MicrometerHttpClientMetricsRecorder.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/MicrometerHttpClientMetricsRecorder.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2023 VMware, Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2019-2024 VMware, Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,7 +15,11 @@ */ package reactor.netty.http.client; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.DistributionSummary; import io.micrometer.core.instrument.Timer; +import reactor.netty.Metrics; +import reactor.netty.channel.ChannelMeters; import reactor.netty.channel.MeterKey; import reactor.netty.http.MicrometerHttpMetricsRecorder; import reactor.netty.internal.util.MapUtils; @@ -23,11 +27,17 @@ import java.net.SocketAddress; import java.time.Duration; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import static reactor.netty.Metrics.DATA_RECEIVED; import static reactor.netty.Metrics.DATA_RECEIVED_TIME; +import static reactor.netty.Metrics.DATA_SENT; import static reactor.netty.Metrics.DATA_SENT_TIME; +import static reactor.netty.Metrics.ERRORS; import static reactor.netty.Metrics.HTTP_CLIENT_PREFIX; import static reactor.netty.Metrics.METHOD; +import static reactor.netty.Metrics.PROXY_ADDRESS; import static reactor.netty.Metrics.REGISTRY; import static reactor.netty.Metrics.REMOTE_ADDRESS; import static reactor.netty.Metrics.RESPONSE_TIME; @@ -45,6 +55,12 @@ final class MicrometerHttpClientMetricsRecorder extends MicrometerHttpMetricsRec static final MicrometerHttpClientMetricsRecorder INSTANCE = new MicrometerHttpClientMetricsRecorder(); + private final ConcurrentMap dataReceivedCache = new ConcurrentHashMap<>(); + + private final ConcurrentMap dataSentCache = new ConcurrentHashMap<>(); + + private final ConcurrentMap errorsCache = new ConcurrentHashMap<>(); + private MicrometerHttpClientMetricsRecorder() { super(HTTP_CLIENT_PREFIX, "http"); } @@ -52,10 +68,28 @@ private MicrometerHttpClientMetricsRecorder() { @Override public void recordDataReceivedTime(SocketAddress remoteAddress, String uri, String method, String status, Duration time) { String address = formatSocketAddress(remoteAddress); - MeterKey meterKey = new MeterKey(uri, address, method, status); + MeterKey meterKey = new MeterKey(uri, address, null, method, status); + Timer dataReceivedTime = MapUtils.computeIfAbsent(dataReceivedTimeCache, meterKey, + key -> filter(Timer.builder(name() + DATA_RECEIVED_TIME) + .tags(HttpClientMeters.DataReceivedTimeTags.REMOTE_ADDRESS.asString(), address, + HttpClientMeters.DataReceivedTimeTags.URI.asString(), uri, + HttpClientMeters.DataReceivedTimeTags.METHOD.asString(), method, + HttpClientMeters.DataReceivedTimeTags.STATUS.asString(), status) + .register(REGISTRY))); + if (dataReceivedTime != null) { + dataReceivedTime.record(time); + } + } + + @Override + public void recordDataReceivedTime(SocketAddress remoteAddress, SocketAddress proxyAddress, String uri, String method, String status, Duration time) { + String address = formatSocketAddress(remoteAddress); + String proxyAddr = formatSocketAddress(proxyAddress); + MeterKey meterKey = new MeterKey(uri, address, proxyAddr, method, status); Timer dataReceivedTime = MapUtils.computeIfAbsent(dataReceivedTimeCache, meterKey, key -> filter(Timer.builder(name() + DATA_RECEIVED_TIME) .tags(HttpClientMeters.DataReceivedTimeTags.REMOTE_ADDRESS.asString(), address, + HttpClientMeters.DataReceivedTimeTags.PROXY_ADDRESS.asString(), proxyAddr, HttpClientMeters.DataReceivedTimeTags.URI.asString(), uri, HttpClientMeters.DataReceivedTimeTags.METHOD.asString(), method, HttpClientMeters.DataReceivedTimeTags.STATUS.asString(), status) @@ -68,7 +102,7 @@ public void recordDataReceivedTime(SocketAddress remoteAddress, String uri, Stri @Override public void recordDataSentTime(SocketAddress remoteAddress, String uri, String method, Duration time) { String address = formatSocketAddress(remoteAddress); - MeterKey meterKey = new MeterKey(uri, address, method, null); + MeterKey meterKey = new MeterKey(uri, address, null, method, null); Timer dataSentTime = MapUtils.computeIfAbsent(dataSentTimeCache, meterKey, key -> filter(Timer.builder(name() + DATA_SENT_TIME) .tags(HttpClientMeters.DataSentTimeTags.REMOTE_ADDRESS.asString(), address, @@ -80,6 +114,23 @@ public void recordDataSentTime(SocketAddress remoteAddress, String uri, String m } } + @Override + public void recordDataSentTime(SocketAddress remoteAddress, SocketAddress proxyAddress, String uri, String method, Duration time) { + String address = formatSocketAddress(remoteAddress); + String proxyAddr = formatSocketAddress(proxyAddress); + MeterKey meterKey = new MeterKey(uri, address, proxyAddr, method, null); + Timer dataSentTime = MapUtils.computeIfAbsent(dataSentTimeCache, meterKey, + key -> filter(Timer.builder(name() + DATA_SENT_TIME) + .tags(HttpClientMeters.DataSentTimeTags.REMOTE_ADDRESS.asString(), address, + HttpClientMeters.DataSentTimeTags.PROXY_ADDRESS.asString(), proxyAddr, + HttpClientMeters.DataSentTimeTags.URI.asString(), uri, + HttpClientMeters.DataSentTimeTags.METHOD.asString(), method) + .register(REGISTRY))); + if (dataSentTime != null) { + dataSentTime.record(time); + } + } + @Override public void recordResponseTime(SocketAddress remoteAddress, String uri, String method, String status, Duration time) { String address = formatSocketAddress(remoteAddress); @@ -91,10 +142,79 @@ public void recordResponseTime(SocketAddress remoteAddress, String uri, String m @Nullable final Timer getResponseTimeTimer(String name, String address, String uri, String method, String status) { - MeterKey meterKey = new MeterKey(uri, address, method, status); + MeterKey meterKey = new MeterKey(uri, address, null, method, status); return MapUtils.computeIfAbsent(responseTimeCache, meterKey, key -> filter(Timer.builder(name) .tags(REMOTE_ADDRESS, address, URI, uri, METHOD, method, STATUS, status) .register(REGISTRY))); } + + @Override + public void recordResponseTime(SocketAddress remoteAddress, SocketAddress proxyAddress, String uri, String method, String status, Duration time) { + String address = formatSocketAddress(remoteAddress); + String proxyAddr = formatSocketAddress(proxyAddress); + Timer responseTime = getResponseTimeTimer(name() + RESPONSE_TIME, address, proxyAddr, uri, method, status); + if (responseTime != null) { + responseTime.record(time); + } + } + + @Nullable + final Timer getResponseTimeTimer(String name, String address, String proxyAddress, String uri, String method, String status) { + MeterKey meterKey = new MeterKey(uri, address, proxyAddress, method, status); + return MapUtils.computeIfAbsent(responseTimeCache, meterKey, + key -> filter(Timer.builder(name) + .tags(REMOTE_ADDRESS, address, PROXY_ADDRESS, proxyAddress, URI, uri, METHOD, method, STATUS, status) + .register(REGISTRY))); + } + + @Override + public void recordDataReceived(SocketAddress remoteAddress, SocketAddress proxyAddress, String uri, long bytes) { + String address = Metrics.formatSocketAddress(remoteAddress); + String proxyAddr = formatSocketAddress(proxyAddress); + MeterKey meterKey = new MeterKey(uri, address, proxyAddr, null, null); + DistributionSummary dataReceived = MapUtils.computeIfAbsent(dataReceivedCache, meterKey, + key -> filter(DistributionSummary.builder(name() + DATA_RECEIVED) + .baseUnit(ChannelMeters.DATA_RECEIVED.getBaseUnit()) + .tags(ChannelMeters.ChannelMetersTags.REMOTE_ADDRESS.asString(), address, + ChannelMeters.ChannelMetersTags.PROXY_ADDRESS.asString(), proxyAddr, + ChannelMeters.ChannelMetersTags.URI.asString(), uri) + .register(REGISTRY))); + if (dataReceived != null) { + dataReceived.record(bytes); + } + } + + @Override + public void recordDataSent(SocketAddress remoteAddress, SocketAddress proxyAddress, String uri, long bytes) { + String address = Metrics.formatSocketAddress(remoteAddress); + String proxyAddr = formatSocketAddress(proxyAddress); + MeterKey meterKey = new MeterKey(uri, address, proxyAddr, null, null); + DistributionSummary dataSent = MapUtils.computeIfAbsent(dataSentCache, meterKey, + key -> filter(DistributionSummary.builder(name() + DATA_SENT) + .baseUnit(ChannelMeters.DATA_SENT.getBaseUnit()) + .tags(ChannelMeters.ChannelMetersTags.REMOTE_ADDRESS.asString(), address, + ChannelMeters.ChannelMetersTags.PROXY_ADDRESS.asString(), proxyAddr, + ChannelMeters.ChannelMetersTags.URI.asString(), uri) + .register(REGISTRY))); + if (dataSent != null) { + dataSent.record(bytes); + } + } + + @Override + public void incrementErrorsCount(SocketAddress remoteAddress, SocketAddress proxyAddress, String uri) { + String address = Metrics.formatSocketAddress(remoteAddress); + String proxyAddr = formatSocketAddress(proxyAddress); + MeterKey meterKey = new MeterKey(uri, address, proxyAddr, null, null); + Counter errors = MapUtils.computeIfAbsent(errorsCache, meterKey, + key -> filter(Counter.builder(name() + ERRORS) + .tags(ChannelMeters.ChannelMetersTags.REMOTE_ADDRESS.asString(), address, + ChannelMeters.ChannelMetersTags.PROXY_ADDRESS.asString(), proxyAddr, + ChannelMeters.ChannelMetersTags.URI.asString(), uri) + .register(REGISTRY))); + if (errors != null) { + errors.increment(); + } + } } diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/server/MicrometerHttpServerMetricsRecorder.java b/reactor-netty-http/src/main/java/reactor/netty/http/server/MicrometerHttpServerMetricsRecorder.java index d807079f85..20c89712cc 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/server/MicrometerHttpServerMetricsRecorder.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/server/MicrometerHttpServerMetricsRecorder.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2023 VMware, Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2019-2024 VMware, Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -67,7 +67,7 @@ private MicrometerHttpServerMetricsRecorder() { @Override public void recordDataReceivedTime(String uri, String method, Duration time) { - MeterKey meterKey = new MeterKey(uri, null, method, null); + MeterKey meterKey = new MeterKey(uri, null, null, method, null); Timer dataReceivedTime = MapUtils.computeIfAbsent(dataReceivedTimeCache, meterKey, key -> filter(Timer.builder(name() + DATA_RECEIVED_TIME) .tags(HttpServerMeters.DataReceivedTimeTags.URI.asString(), uri, @@ -80,7 +80,7 @@ public void recordDataReceivedTime(String uri, String method, Duration time) { @Override public void recordDataSentTime(String uri, String method, String status, Duration time) { - MeterKey meterKey = new MeterKey(uri, null, method, status); + MeterKey meterKey = new MeterKey(uri, null, null, method, status); Timer dataSentTime = MapUtils.computeIfAbsent(dataSentTimeCache, meterKey, key -> filter(Timer.builder(name() + DATA_SENT_TIME) .tags(HttpServerMeters.DataSentTimeTags.URI.asString(), uri, @@ -102,7 +102,7 @@ public void recordResponseTime(String uri, String method, String status, Duratio @Nullable final Timer getResponseTimeTimer(String name, String uri, String method, String status) { - MeterKey meterKey = new MeterKey(uri, null, method, status); + MeterKey meterKey = new MeterKey(uri, null, null, method, status); return MapUtils.computeIfAbsent(responseTimeCache, meterKey, key -> filter(Timer.builder(name) .tags(URI, uri, METHOD, method, STATUS, status) diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientProxyTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientProxyTest.java index c85c4106b2..e2a0ec12c9 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientProxyTest.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientProxyTest.java @@ -64,6 +64,7 @@ import static reactor.netty.Metrics.METHOD; import static reactor.netty.Metrics.NAME; import static reactor.netty.Metrics.PENDING_CONNECTIONS; +import static reactor.netty.Metrics.PROXY_ADDRESS; import static reactor.netty.Metrics.REMOTE_ADDRESS; import static reactor.netty.Metrics.RESPONSE_TIME; import static reactor.netty.Metrics.STATUS; @@ -370,23 +371,24 @@ void testIssue3060(Hoverfly hoverfly) { .verify(Duration.ofSeconds(30)); String serverAddress = disposableServer.host() + ":" + disposableServer.port(); + String proxyAddress = "localhost:" + hoverfly.getHoverflyConfig().getProxyPort(); - String[] summaryTags1 = new String[] {REMOTE_ADDRESS, serverAddress, URI, "/"}; + String[] summaryTags1 = new String[] {REMOTE_ADDRESS, serverAddress, PROXY_ADDRESS, proxyAddress, URI, "/"}; assertDistributionSummary(registry, HTTP_CLIENT_PREFIX + DATA_RECEIVED, summaryTags1).isNotNull(); assertDistributionSummary(registry, HTTP_CLIENT_PREFIX + DATA_SENT, summaryTags1).isNotNull(); - String[] summaryTags2 = new String[] {REMOTE_ADDRESS, serverAddress, URI, "http"}; + String[] summaryTags2 = new String[] {REMOTE_ADDRESS, serverAddress, PROXY_ADDRESS, proxyAddress, URI, "http"}; assertDistributionSummary(registry, HTTP_CLIENT_PREFIX + DATA_RECEIVED, summaryTags2).isNotNull(); assertDistributionSummary(registry, HTTP_CLIENT_PREFIX + DATA_SENT, summaryTags2).isNotNull(); - String[] timerTags1 = new String[] {REMOTE_ADDRESS, serverAddress, STATUS, SUCCESS}; + String[] timerTags1 = new String[] {REMOTE_ADDRESS, serverAddress, PROXY_ADDRESS, proxyAddress, STATUS, SUCCESS}; assertTimer(registry, HTTP_CLIENT_PREFIX + CONNECT_TIME, timerTags1).isNotNull(); - String[] timerTags2 = new String[] {REMOTE_ADDRESS, serverAddress, STATUS, "200", URI, "/", METHOD, "GET"}; + String[] timerTags2 = new String[] {REMOTE_ADDRESS, serverAddress, PROXY_ADDRESS, proxyAddress, STATUS, "200", URI, "/", METHOD, "GET"}; assertTimer(registry, HTTP_CLIENT_PREFIX + DATA_RECEIVED_TIME, timerTags2).isNotNull(); assertTimer(registry, HTTP_CLIENT_PREFIX + RESPONSE_TIME, timerTags2).isNotNull(); - String[] timerTags3 = new String[] {REMOTE_ADDRESS, serverAddress, URI, "/", METHOD, "GET"}; + String[] timerTags3 = new String[] {REMOTE_ADDRESS, serverAddress, PROXY_ADDRESS, proxyAddress, URI, "/", METHOD, "GET"}; assertTimer(registry, HTTP_CLIENT_PREFIX + DATA_SENT_TIME, timerTags3).isNotNull(); String[] gaugeTags = new String[] {REMOTE_ADDRESS, serverAddress, NAME, "http"}; From 8bbb1b50047b54f8608dc1283c124e34ac35973d Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Wed, 6 Mar 2024 21:27:44 +0200 Subject: [PATCH 2/4] Trivial change to trigger the build --- .../src/main/java/reactor/netty/channel/MeterKey.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/reactor-netty-core/src/main/java/reactor/netty/channel/MeterKey.java b/reactor-netty-core/src/main/java/reactor/netty/channel/MeterKey.java index 91207c4003..86db9a7c59 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/channel/MeterKey.java +++ b/reactor-netty-core/src/main/java/reactor/netty/channel/MeterKey.java @@ -34,7 +34,7 @@ public final class MeterKey { private final String status; /** - * Creates new meter key. + * Creates a new meter key. * * @param uri the requested URI * @param remoteAddress the remote address @@ -50,7 +50,7 @@ public MeterKey(@Nullable String uri, @Nullable String remoteAddress, } /** - * Creates new meter key. + * Creates a new meter key. * * @param uri the requested URI * @param remoteAddress the remote address From df0415ccd59a8d6bf0ccec11f919a35a7a91c8ca Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Wed, 6 Mar 2024 22:17:27 +0200 Subject: [PATCH 3/4] Fix the build --- .../reactor/netty/channel/AbstractChannelMetricsHandler.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/reactor-netty-core/src/main/java/reactor/netty/channel/AbstractChannelMetricsHandler.java b/reactor-netty-core/src/main/java/reactor/netty/channel/AbstractChannelMetricsHandler.java index cfd6ab9ac6..a3864f7186 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/channel/AbstractChannelMetricsHandler.java +++ b/reactor-netty-core/src/main/java/reactor/netty/channel/AbstractChannelMetricsHandler.java @@ -94,9 +94,9 @@ public void channelInactive(ChannelHandlerContext ctx) { @Override public void channelRegistered(ChannelHandlerContext ctx) { if (!onServer) { - ProxyHandler proxyHandler = ctx.pipeline().get(ProxyHandler.class); + ChannelHandler proxyHandler = ctx.pipeline().get(NettyPipeline.ProxyHandler); if (proxyHandler != null) { - proxyAddress = proxyHandler.proxyAddress(); + proxyAddress = ((ProxyHandler) proxyHandler).proxyAddress(); } ctx.pipeline() From 11b80b6119cf21a520bb93f86e074453f80eec2f Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Thu, 7 Mar 2024 10:23:33 +0200 Subject: [PATCH 4/4] Fix javadoc --- .../java/reactor/netty/channel/ChannelMetricsRecorder.java | 4 ++-- .../netty/channel/ContextAwareChannelMetricsRecorder.java | 4 ++-- .../http/client/ContextAwareHttpClientMetricsRecorder.java | 2 +- .../reactor/netty/http/client/HttpClientMetricsRecorder.java | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/reactor-netty-core/src/main/java/reactor/netty/channel/ChannelMetricsRecorder.java b/reactor-netty-core/src/main/java/reactor/netty/channel/ChannelMetricsRecorder.java index 3cd94abd05..13b406ee47 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/channel/ChannelMetricsRecorder.java +++ b/reactor-netty-core/src/main/java/reactor/netty/channel/ChannelMetricsRecorder.java @@ -66,14 +66,14 @@ default void recordDataSent(SocketAddress remoteAddress, SocketAddress proxyAddr } /** - * Increments the number of the errors that are occurred. + * Increments the number of the errors that have occurred. * * @param remoteAddress The remote peer */ void incrementErrorsCount(SocketAddress remoteAddress); /** - * Increments the number of the errors that are occurred. + * Increments the number of the errors that have occurred. * * @param remoteAddress The remote peer * @param proxyAddress The proxy address diff --git a/reactor-netty-core/src/main/java/reactor/netty/channel/ContextAwareChannelMetricsRecorder.java b/reactor-netty-core/src/main/java/reactor/netty/channel/ContextAwareChannelMetricsRecorder.java index 1c3bc30f8d..aac34e9646 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/channel/ContextAwareChannelMetricsRecorder.java +++ b/reactor-netty-core/src/main/java/reactor/netty/channel/ContextAwareChannelMetricsRecorder.java @@ -30,7 +30,7 @@ public abstract class ContextAwareChannelMetricsRecorder implements ChannelMetricsRecorder { /** - * Increments the number of the errors that are occurred. + * Increments the number of the errors that have occurred. * * @param contextView The current {@link ContextView} associated with the Mono/Flux pipeline * @param remoteAddress The remote peer @@ -38,7 +38,7 @@ public abstract class ContextAwareChannelMetricsRecorder implements ChannelMetri public abstract void incrementErrorsCount(ContextView contextView, SocketAddress remoteAddress); /** - * Increments the number of the errors that are occurred. + * Increments the number of the errors that have occurred. * * @param contextView The current {@link ContextView} associated with the Mono/Flux pipeline * @param remoteAddress The remote peer diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/ContextAwareHttpClientMetricsRecorder.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/ContextAwareHttpClientMetricsRecorder.java index 4997598c4b..b740fb5a98 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/ContextAwareHttpClientMetricsRecorder.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/ContextAwareHttpClientMetricsRecorder.java @@ -120,7 +120,7 @@ public void recordResponseTime(ContextView contextView, SocketAddress remoteAddr } /** - * Increments the number of the errors that are occurred. + * Increments the number of the errors that have occurred. * * @param contextView The current {@link ContextView} associated with the Mono/Flux * @param remoteAddress The remote peer diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientMetricsRecorder.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientMetricsRecorder.java index b3dc7b4359..0bf3262ec5 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientMetricsRecorder.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientMetricsRecorder.java @@ -130,7 +130,7 @@ default void recordDataSent(SocketAddress remoteAddress, SocketAddress proxyAddr } /** - * Increments the number of the errors that are occurred. + * Increments the number of the errors that have occurred. * * @param remoteAddress The remote peer * @param proxyAddress the proxy address