Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add proxy address information to the metrics #3081

Merged
merged 4 commits into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion reactor-netty-core/src/main/java/reactor/netty/Metrics.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2023 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2019-2024 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -271,6 +271,8 @@ public class Metrics {
// Tags
public static final String LOCAL_ADDRESS = "local.address";

public static final String PROXY_ADDRESS = "proxy.address";

public static final String REMOTE_ADDRESS = "remote.address";

public static final String URI = "uri";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.socket.DatagramPacket;
import io.netty.handler.proxy.ProxyHandler;
import io.netty.handler.ssl.AbstractSniHandler;
import io.netty.handler.ssl.SslHandler;
import reactor.netty.NettyPipeline;
Expand All @@ -47,6 +48,7 @@ public abstract class AbstractChannelMetricsHandler extends ChannelDuplexHandler
final boolean onServer;

boolean channelOpened;
SocketAddress proxyAddress;

protected AbstractChannelMetricsHandler(@Nullable SocketAddress remoteAddress, boolean onServer) {
this.remoteAddress = remoteAddress;
Expand Down Expand Up @@ -92,6 +94,11 @@ public void channelInactive(ChannelHandlerContext ctx) {
@Override
public void channelRegistered(ChannelHandlerContext ctx) {
if (!onServer) {
ChannelHandler proxyHandler = ctx.pipeline().get(NettyPipeline.ProxyHandler);
if (proxyHandler != null) {
proxyAddress = ((ProxyHandler) proxyHandler).proxyAddress();
}

ctx.pipeline()
.addAfter(NettyPipeline.ChannelMetricsHandler,
NettyPipeline.ConnectMetricsHandler,
Expand Down Expand Up @@ -192,14 +199,29 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
public abstract ChannelMetricsRecorder recorder();

protected void recordException(ChannelHandlerContext ctx, SocketAddress address) {
recorder().incrementErrorsCount(address);
if (proxyAddress == null) {
recorder().incrementErrorsCount(address);
}
else {
recorder().incrementErrorsCount(address, proxyAddress);
}
}

protected void recordRead(ChannelHandlerContext ctx, SocketAddress address, long bytes) {
recorder().recordDataReceived(address, bytes);
if (proxyAddress == null) {
recorder().recordDataReceived(address, bytes);
}
else {
recorder().recordDataReceived(address, proxyAddress, bytes);
}
}

protected void recordWrite(ChannelHandlerContext ctx, SocketAddress address, long bytes) {
recorder().recordDataSent(address, bytes);
if (proxyAddress == null) {
recorder().recordDataSent(address, bytes);
}
else {
recorder().recordDataSent(address, proxyAddress, bytes);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2022-2024 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -120,6 +120,16 @@ public Meter.Type getType() {

public enum ChannelMetersTags implements KeyName {

/**
* Proxy address, when there is a proxy configured.
*/
PROXY_ADDRESS {
@Override
public String asString() {
return "proxy.address";
}
},

/**
* Remote address.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@ public class ChannelMetricsHandler extends AbstractChannelMetricsHandler {

@Override
public ChannelHandler connectMetricsHandler() {
return new ConnectMetricsHandler(recorder());
return new ConnectMetricsHandler(recorder(), proxyAddress);
}

@Override
public ChannelHandler tlsMetricsHandler() {
return new TlsMetricsHandler(recorder, remoteAddress);
return new TlsMetricsHandler(recorder, remoteAddress, proxyAddress);
}

@Override
Expand All @@ -61,9 +61,11 @@ public ChannelMetricsRecorder recorder() {

static final class ConnectMetricsHandler extends ChannelOutboundHandlerAdapter {

final SocketAddress proxyAddress;
final ChannelMetricsRecorder recorder;

ConnectMetricsHandler(ChannelMetricsRecorder recorder) {
ConnectMetricsHandler(ChannelMetricsRecorder recorder, @Nullable SocketAddress proxyAddress) {
this.proxyAddress = proxyAddress;
this.recorder = recorder;
}

Expand All @@ -75,22 +77,34 @@ public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
promise.addListener(future -> {
ctx.pipeline().remove(this);

recorder.recordConnectTime(
remoteAddress,
Duration.ofNanos(System.nanoTime() - connectTimeStart),
future.isSuccess() ? SUCCESS : ERROR);
if (proxyAddress == null) {
recorder.recordConnectTime(
remoteAddress,
Duration.ofNanos(System.nanoTime() - connectTimeStart),
future.isSuccess() ? SUCCESS : ERROR);
}
else {
recorder.recordConnectTime(
remoteAddress,
proxyAddress,
Duration.ofNanos(System.nanoTime() - connectTimeStart),
future.isSuccess() ? SUCCESS : ERROR);
}
});
}
}

static class TlsMetricsHandler extends ChannelInboundHandlerAdapter {

protected final SocketAddress proxyAddress;
protected final ChannelMetricsRecorder recorder;
protected final SocketAddress remoteAddress;

boolean listenerAdded;

TlsMetricsHandler(ChannelMetricsRecorder recorder, @Nullable SocketAddress remoteAddress) {
TlsMetricsHandler(ChannelMetricsRecorder recorder, @Nullable SocketAddress remoteAddress,
@Nullable SocketAddress proxyAddress) {
this.proxyAddress = proxyAddress;
this.recorder = recorder;
this.remoteAddress = remoteAddress;
}
Expand All @@ -110,10 +124,19 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
}

protected void recordTlsHandshakeTime(ChannelHandlerContext ctx, long tlsHandshakeTimeStart, String status) {
recorder.recordTlsHandshakeTime(
remoteAddress != null ? remoteAddress : ctx.channel().remoteAddress(),
Duration.ofNanos(System.nanoTime() - tlsHandshakeTimeStart),
status);
if (proxyAddress == null) {
recorder.recordTlsHandshakeTime(
remoteAddress != null ? remoteAddress : ctx.channel().remoteAddress(),
Duration.ofNanos(System.nanoTime() - tlsHandshakeTimeStart),
status);
}
else {
recorder.recordTlsHandshakeTime(
remoteAddress != null ? remoteAddress : ctx.channel().remoteAddress(),
proxyAddress,
Duration.ofNanos(System.nanoTime() - tlsHandshakeTimeStart),
status);
}
}

private void addListener(ChannelHandlerContext ctx) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2023 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2019-2024 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -33,6 +33,18 @@ public interface ChannelMetricsRecorder {
*/
void recordDataReceived(SocketAddress remoteAddress, long bytes);

/**
* Records the amount of the data that is received, in bytes.
*
* @param remoteAddress The remote peer
* @param proxyAddress The proxy address
* @param bytes The amount of the data that is received, in bytes
* @since 1.1.17
*/
default void recordDataReceived(SocketAddress remoteAddress, SocketAddress proxyAddress, long bytes) {
recordDataReceived(remoteAddress, bytes);
}

/**
* Records the amount of the data that is sent, in bytes.
*
Expand All @@ -42,12 +54,35 @@ public interface ChannelMetricsRecorder {
void recordDataSent(SocketAddress remoteAddress, long bytes);

/**
* Increments the number of the errors that are occurred.
* Records the amount of the data that is sent, in bytes.
*
* @param remoteAddress The remote peer
* @param proxyAddress The proxy address
* @param bytes The amount of the data that is sent, in bytes
* @since 1.1.17
*/
default void recordDataSent(SocketAddress remoteAddress, SocketAddress proxyAddress, long bytes) {
recordDataSent(remoteAddress, bytes);
}

/**
* Increments the number of the errors that have occurred.
*
* @param remoteAddress The remote peer
*/
void incrementErrorsCount(SocketAddress remoteAddress);

/**
* Increments the number of the errors that have occurred.
*
* @param remoteAddress The remote peer
* @param proxyAddress The proxy address
* @since 1.1.17
*/
default void incrementErrorsCount(SocketAddress remoteAddress, SocketAddress proxyAddress) {
incrementErrorsCount(remoteAddress);
}

/**
* Records the time that is spent for TLS handshake.
*
Expand All @@ -57,6 +92,19 @@ public interface ChannelMetricsRecorder {
*/
void recordTlsHandshakeTime(SocketAddress remoteAddress, Duration time, String status);

/**
* Records the time that is spent for TLS handshake.
*
* @param remoteAddress The remote peer
* @param proxyAddress The proxy address
* @param time the time in nanoseconds that is spent for TLS handshake
* @param status the status of the operation
* @since 1.1.17
*/
default void recordTlsHandshakeTime(SocketAddress remoteAddress, SocketAddress proxyAddress, Duration time, String status) {
recordTlsHandshakeTime(remoteAddress, time, status);
}

/**
* Records the time that is spent for connecting to the remote address.
* Relevant only when on the client
Expand All @@ -67,6 +115,20 @@ public interface ChannelMetricsRecorder {
*/
void recordConnectTime(SocketAddress remoteAddress, Duration time, String status);

/**
* Records the time that is spent for connecting to the remote address.
* Relevant only when on the client
*
* @param remoteAddress The remote peer
* @param proxyAddress The proxy address
* @param time the time in nanoseconds that is spent for connecting to the remote address
* @param status the status of the operation
* @since 1.1.17
*/
default void recordConnectTime(SocketAddress remoteAddress, SocketAddress proxyAddress, Duration time, String status) {
recordConnectTime(remoteAddress, time, status);
}

/**
* Records the time that is spent for resolving the remote address.
* Relevant only when on the client.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2022-2024 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -108,6 +108,16 @@ public String asString() {
*/
enum ConnectTimeLowCardinalityTags implements KeyName {

/**
* Proxy address, when there is a proxy configured.
*/
PROXY_ADDRESS {
@Override
public String asString() {
return "proxy.address";
}
},

/**
* Remote address.
*/
Expand Down
Loading