Skip to content

Commit

Permalink
Add proxy address information to the metrics (#3081)
Browse files Browse the repository at this point in the history
Related to #3060
  • Loading branch information
violetagg authored Mar 7, 2024
1 parent 4a55911 commit 1401280
Show file tree
Hide file tree
Showing 26 changed files with 1,018 additions and 133 deletions.
4 changes: 3 additions & 1 deletion reactor-netty-core/src/main/java/reactor/netty/Metrics.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2023 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2019-2024 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -271,6 +271,8 @@ public class Metrics {
// Tags
public static final String LOCAL_ADDRESS = "local.address";

public static final String PROXY_ADDRESS = "proxy.address";

public static final String REMOTE_ADDRESS = "remote.address";

public static final String URI = "uri";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.socket.DatagramPacket;
import io.netty.handler.proxy.ProxyHandler;
import io.netty.handler.ssl.AbstractSniHandler;
import io.netty.handler.ssl.SslHandler;
import reactor.netty.NettyPipeline;
Expand All @@ -47,6 +48,7 @@ public abstract class AbstractChannelMetricsHandler extends ChannelDuplexHandler
final boolean onServer;

boolean channelOpened;
SocketAddress proxyAddress;

protected AbstractChannelMetricsHandler(@Nullable SocketAddress remoteAddress, boolean onServer) {
this.remoteAddress = remoteAddress;
Expand Down Expand Up @@ -92,6 +94,11 @@ public void channelInactive(ChannelHandlerContext ctx) {
@Override
public void channelRegistered(ChannelHandlerContext ctx) {
if (!onServer) {
ChannelHandler proxyHandler = ctx.pipeline().get(NettyPipeline.ProxyHandler);
if (proxyHandler != null) {
proxyAddress = ((ProxyHandler) proxyHandler).proxyAddress();
}

ctx.pipeline()
.addAfter(NettyPipeline.ChannelMetricsHandler,
NettyPipeline.ConnectMetricsHandler,
Expand Down Expand Up @@ -192,14 +199,29 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
public abstract ChannelMetricsRecorder recorder();

protected void recordException(ChannelHandlerContext ctx, SocketAddress address) {
recorder().incrementErrorsCount(address);
if (proxyAddress == null) {
recorder().incrementErrorsCount(address);
}
else {
recorder().incrementErrorsCount(address, proxyAddress);
}
}

protected void recordRead(ChannelHandlerContext ctx, SocketAddress address, long bytes) {
recorder().recordDataReceived(address, bytes);
if (proxyAddress == null) {
recorder().recordDataReceived(address, bytes);
}
else {
recorder().recordDataReceived(address, proxyAddress, bytes);
}
}

protected void recordWrite(ChannelHandlerContext ctx, SocketAddress address, long bytes) {
recorder().recordDataSent(address, bytes);
if (proxyAddress == null) {
recorder().recordDataSent(address, bytes);
}
else {
recorder().recordDataSent(address, proxyAddress, bytes);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2022-2024 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -120,6 +120,16 @@ public Meter.Type getType() {

public enum ChannelMetersTags implements KeyName {

/**
* Proxy address, when there is a proxy configured.
*/
PROXY_ADDRESS {
@Override
public String asString() {
return "proxy.address";
}
},

/**
* Remote address.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@ public class ChannelMetricsHandler extends AbstractChannelMetricsHandler {

@Override
public ChannelHandler connectMetricsHandler() {
return new ConnectMetricsHandler(recorder());
return new ConnectMetricsHandler(recorder(), proxyAddress);
}

@Override
public ChannelHandler tlsMetricsHandler() {
return new TlsMetricsHandler(recorder, remoteAddress);
return new TlsMetricsHandler(recorder, remoteAddress, proxyAddress);
}

@Override
Expand All @@ -61,9 +61,11 @@ public ChannelMetricsRecorder recorder() {

static final class ConnectMetricsHandler extends ChannelOutboundHandlerAdapter {

final SocketAddress proxyAddress;
final ChannelMetricsRecorder recorder;

ConnectMetricsHandler(ChannelMetricsRecorder recorder) {
ConnectMetricsHandler(ChannelMetricsRecorder recorder, @Nullable SocketAddress proxyAddress) {
this.proxyAddress = proxyAddress;
this.recorder = recorder;
}

Expand All @@ -75,22 +77,34 @@ public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
promise.addListener(future -> {
ctx.pipeline().remove(this);

recorder.recordConnectTime(
remoteAddress,
Duration.ofNanos(System.nanoTime() - connectTimeStart),
future.isSuccess() ? SUCCESS : ERROR);
if (proxyAddress == null) {
recorder.recordConnectTime(
remoteAddress,
Duration.ofNanos(System.nanoTime() - connectTimeStart),
future.isSuccess() ? SUCCESS : ERROR);
}
else {
recorder.recordConnectTime(
remoteAddress,
proxyAddress,
Duration.ofNanos(System.nanoTime() - connectTimeStart),
future.isSuccess() ? SUCCESS : ERROR);
}
});
}
}

static class TlsMetricsHandler extends ChannelInboundHandlerAdapter {

protected final SocketAddress proxyAddress;
protected final ChannelMetricsRecorder recorder;
protected final SocketAddress remoteAddress;

boolean listenerAdded;

TlsMetricsHandler(ChannelMetricsRecorder recorder, @Nullable SocketAddress remoteAddress) {
TlsMetricsHandler(ChannelMetricsRecorder recorder, @Nullable SocketAddress remoteAddress,
@Nullable SocketAddress proxyAddress) {
this.proxyAddress = proxyAddress;
this.recorder = recorder;
this.remoteAddress = remoteAddress;
}
Expand All @@ -110,10 +124,19 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
}

protected void recordTlsHandshakeTime(ChannelHandlerContext ctx, long tlsHandshakeTimeStart, String status) {
recorder.recordTlsHandshakeTime(
remoteAddress != null ? remoteAddress : ctx.channel().remoteAddress(),
Duration.ofNanos(System.nanoTime() - tlsHandshakeTimeStart),
status);
if (proxyAddress == null) {
recorder.recordTlsHandshakeTime(
remoteAddress != null ? remoteAddress : ctx.channel().remoteAddress(),
Duration.ofNanos(System.nanoTime() - tlsHandshakeTimeStart),
status);
}
else {
recorder.recordTlsHandshakeTime(
remoteAddress != null ? remoteAddress : ctx.channel().remoteAddress(),
proxyAddress,
Duration.ofNanos(System.nanoTime() - tlsHandshakeTimeStart),
status);
}
}

private void addListener(ChannelHandlerContext ctx) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2023 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2019-2024 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -33,6 +33,18 @@ public interface ChannelMetricsRecorder {
*/
void recordDataReceived(SocketAddress remoteAddress, long bytes);

/**
* Records the amount of the data that is received, in bytes.
*
* @param remoteAddress The remote peer
* @param proxyAddress The proxy address
* @param bytes The amount of the data that is received, in bytes
* @since 1.1.17
*/
default void recordDataReceived(SocketAddress remoteAddress, SocketAddress proxyAddress, long bytes) {
recordDataReceived(remoteAddress, bytes);
}

/**
* Records the amount of the data that is sent, in bytes.
*
Expand All @@ -42,12 +54,35 @@ public interface ChannelMetricsRecorder {
void recordDataSent(SocketAddress remoteAddress, long bytes);

/**
* Increments the number of the errors that are occurred.
* Records the amount of the data that is sent, in bytes.
*
* @param remoteAddress The remote peer
* @param proxyAddress The proxy address
* @param bytes The amount of the data that is sent, in bytes
* @since 1.1.17
*/
default void recordDataSent(SocketAddress remoteAddress, SocketAddress proxyAddress, long bytes) {
recordDataSent(remoteAddress, bytes);
}

/**
* Increments the number of the errors that have occurred.
*
* @param remoteAddress The remote peer
*/
void incrementErrorsCount(SocketAddress remoteAddress);

/**
* Increments the number of the errors that have occurred.
*
* @param remoteAddress The remote peer
* @param proxyAddress The proxy address
* @since 1.1.17
*/
default void incrementErrorsCount(SocketAddress remoteAddress, SocketAddress proxyAddress) {
incrementErrorsCount(remoteAddress);
}

/**
* Records the time that is spent for TLS handshake.
*
Expand All @@ -57,6 +92,19 @@ public interface ChannelMetricsRecorder {
*/
void recordTlsHandshakeTime(SocketAddress remoteAddress, Duration time, String status);

/**
* Records the time that is spent for TLS handshake.
*
* @param remoteAddress The remote peer
* @param proxyAddress The proxy address
* @param time the time in nanoseconds that is spent for TLS handshake
* @param status the status of the operation
* @since 1.1.17
*/
default void recordTlsHandshakeTime(SocketAddress remoteAddress, SocketAddress proxyAddress, Duration time, String status) {
recordTlsHandshakeTime(remoteAddress, time, status);
}

/**
* Records the time that is spent for connecting to the remote address.
* Relevant only when on the client
Expand All @@ -67,6 +115,20 @@ public interface ChannelMetricsRecorder {
*/
void recordConnectTime(SocketAddress remoteAddress, Duration time, String status);

/**
* Records the time that is spent for connecting to the remote address.
* Relevant only when on the client
*
* @param remoteAddress The remote peer
* @param proxyAddress The proxy address
* @param time the time in nanoseconds that is spent for connecting to the remote address
* @param status the status of the operation
* @since 1.1.17
*/
default void recordConnectTime(SocketAddress remoteAddress, SocketAddress proxyAddress, Duration time, String status) {
recordConnectTime(remoteAddress, time, status);
}

/**
* Records the time that is spent for resolving the remote address.
* Relevant only when on the client.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2022-2024 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -108,6 +108,16 @@ public String asString() {
*/
enum ConnectTimeLowCardinalityTags implements KeyName {

/**
* Proxy address, when there is a proxy configured.
*/
PROXY_ADDRESS {
@Override
public String asString() {
return "proxy.address";
}
},

/**
* Remote address.
*/
Expand Down
Loading

0 comments on commit 1401280

Please sign in to comment.