Skip to content

Commit

Permalink
Expose metrics for pending acquire operation latency
Browse files Browse the repository at this point in the history
Fixes #2946
  • Loading branch information
violetagg committed Nov 22, 2023
1 parent 707047e commit 9ab1b64
Show file tree
Hide file tree
Showing 11 changed files with 711 additions and 26 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2022-2023 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -127,6 +127,26 @@ public Meter.Type getType() {
}
},

/**
* Time spent in pending acquire a connection from the connection pool.
*/
PENDING_CONNECTIONS_TIME {
@Override
public String getName() {
return "reactor.netty.connection.provider.pending.connections.time";
}

@Override
public KeyName[] getKeyNames() {
return PendingConnectionsTimeTags.values();
}

@Override
public Meter.Type getType() {
return Meter.Type.TIMER;
}
},

/**
* The number of all connections in the connection pool, active or idle.
*/
Expand Down Expand Up @@ -179,4 +199,47 @@ public String asString() {
}
}
}

enum PendingConnectionsTimeTags implements KeyName {

/**
* ID.
*/
ID {
@Override
public String asString() {
return "id";
}
},

/**
* NAME.
*/
NAME {
@Override
public String asString() {
return "name";
}
},

/**
* Remote address.
*/
REMOTE_ADDRESS {
@Override
public String asString() {
return "remote.address";
}
},

/**
* STATUS.
*/
STATUS {
@Override
public String asString() {
return "status";
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2022 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2020-2023 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -96,6 +96,16 @@ protected InstrumentedPool<PooledConnection> createPool(
return new PooledConnectionAllocator(config, poolFactory, remoteAddress, resolverGroup).pool;
}

@Override
protected InstrumentedPool<PooledConnection> createPool(
String id,
TransportConfig config,
PoolFactory<PooledConnection> poolFactory,
SocketAddress remoteAddress,
AddressResolverGroup<?> resolverGroup) {
return new PooledConnectionAllocator(id, name, config, poolFactory, remoteAddress, resolverGroup).pool;
}

static final Logger log = Loggers.getLogger(DefaultPooledConnectionProvider.class);

static final AttributeKey<ConnectionObserver> OWNER = AttributeKey.valueOf("connectionOwner");
Expand Down Expand Up @@ -502,10 +512,23 @@ static final class PooledConnectionAllocator {
PoolFactory<PooledConnection> provider,
SocketAddress remoteAddress,
AddressResolverGroup<?> resolver) {
this(null, null, config, provider, remoteAddress, resolver);
}

PooledConnectionAllocator(
@Nullable String id,
@Nullable String name,
TransportConfig config,
PoolFactory<PooledConnection> provider,
SocketAddress remoteAddress,
AddressResolverGroup<?> resolver) {
this.config = config;
this.remoteAddress = remoteAddress;
this.resolver = resolver;
this.pool = provider.newPool(connectChannel(), null, DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE);
this.pool = id == null ?
provider.newPool(connectChannel(), null, DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE) :
provider.newPool(connectChannel(), DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE,
new MicrometerPoolMetricsRecorder(id, name, remoteAddress));
}

Publisher<PooledConnection> connectChannel() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Copyright (c) 2023 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package reactor.netty.resources;

import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.Timer;
import reactor.core.Disposable;
import reactor.pool.PoolMetricsRecorder;

import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;

import static reactor.netty.Metrics.ERROR;
import static reactor.netty.Metrics.REGISTRY;
import static reactor.netty.Metrics.SUCCESS;
import static reactor.netty.Metrics.formatSocketAddress;
import static reactor.netty.resources.ConnectionProviderMeters.PENDING_CONNECTIONS_TIME;
import static reactor.netty.resources.ConnectionProviderMeters.PendingConnectionsTimeTags.ID;
import static reactor.netty.resources.ConnectionProviderMeters.PendingConnectionsTimeTags.NAME;
import static reactor.netty.resources.ConnectionProviderMeters.PendingConnectionsTimeTags.REMOTE_ADDRESS;
import static reactor.netty.resources.ConnectionProviderMeters.PendingConnectionsTimeTags.STATUS;

final class MicrometerPoolMetricsRecorder implements Disposable, PoolMetricsRecorder {

final Timer pendingSuccessTimer;
final Timer pendingErrorTimer;

MicrometerPoolMetricsRecorder(String id, String poolName, SocketAddress remoteAddress) {
pendingSuccessTimer =
Timer.builder(PENDING_CONNECTIONS_TIME.getName())
.tags(Tags.of(ID.asString(), id, REMOTE_ADDRESS.asString(), formatSocketAddress(remoteAddress),
NAME.asString(), poolName, STATUS.asString(), SUCCESS))
.register(REGISTRY);
pendingErrorTimer =
Timer.builder(PENDING_CONNECTIONS_TIME.getName())
.tags(Tags.of(ID.asString(), id, REMOTE_ADDRESS.asString(), formatSocketAddress(remoteAddress),
NAME.asString(), poolName, STATUS.asString(), ERROR))
.register(REGISTRY);
}

@Override
public void recordAllocationSuccessAndLatency(long latencyMs) {
//noop
}

@Override
public void recordAllocationFailureAndLatency(long latencyMs) {
//noop
}

@Override
public void recordResetLatency(long latencyMs) {
//noop
}

@Override
public void recordDestroyLatency(long latencyMs) {
//noop
}

@Override
public void recordRecycled() {
//noop
}

@Override
public void recordLifetimeDuration(long millisecondsSinceAllocation) {
//noop
}

@Override
public void recordIdleTime(long millisecondsIdle) {
//noop
}

@Override
public void recordSlowPath() {
//noop
}

@Override
public void recordFastPath() {
//noop
}

@Override
public void recordPendingSuccessAndLatency(long latencyMs) {
pendingSuccessTimer.record(latencyMs, TimeUnit.MILLISECONDS);
}

@Override
public void recordPendingFailureAndLatency(long latencyMs) {
pendingErrorTimer.record(latencyMs, TimeUnit.MILLISECONDS);
}

@Override
public void dispose() {
REGISTRY.remove(pendingSuccessTimer);
REGISTRY.remove(pendingErrorTimer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import reactor.pool.Pool;
import reactor.pool.PoolBuilder;
import reactor.pool.PoolConfig;
import reactor.pool.PoolMetricsRecorder;
import reactor.pool.PooledRef;
import reactor.pool.PooledRefMetadata;
import reactor.pool.decorators.GracefulShutdownInstrumentedPool;
Expand Down Expand Up @@ -131,12 +132,16 @@ public final Mono<? extends Connection> acquire(
log.debug("Creating a new [{}] client pool [{}] for [{}]", name, poolFactory, remoteAddress);
}

InstrumentedPool<T> newPool = createPool(config, poolFactory, remoteAddress, resolverGroup);
boolean metricsEnabled = poolFactory.metricsEnabled || config.metricsRecorder() != null;
String id = metricsEnabled ? poolKey.hashCode() + "" : null;

if (poolFactory.metricsEnabled || config.metricsRecorder() != null) {
InstrumentedPool<T> newPool = metricsEnabled && Metrics.isMicrometerAvailable() ?
createPool(id, config, poolFactory, remoteAddress, resolverGroup) :
createPool(config, poolFactory, remoteAddress, resolverGroup);

if (metricsEnabled) {
// registrar is null when metrics are enabled on HttpClient level or
// with the `metrics(boolean metricsEnabled)` method on ConnectionProvider
String id = poolKey.hashCode() + "";
if (poolFactory.registrar != null) {
poolFactory.registrar.get().registerMetrics(name, id, remoteAddress,
new DelegatingConnectionPoolMetrics(newPool.metrics()));
Expand Down Expand Up @@ -204,6 +209,10 @@ public final Mono<Void> disposeLater() {
}
else if (Metrics.isMicrometerAvailable()) {
deRegisterDefaultMetrics(id, remoteAddress);
PoolMetricsRecorder recorder = pool.config().metricsRecorder();
if (recorder instanceof Disposable) {
((Disposable) recorder).dispose();
}
}
});
});
Expand All @@ -215,6 +224,10 @@ else if (Metrics.isMicrometerAvailable()) {
}
else if (Metrics.isMicrometerAvailable()) {
deRegisterDefaultMetrics(id, remoteAddress);
PoolMetricsRecorder recorder = pool.config().metricsRecorder();
if (recorder instanceof Disposable) {
((Disposable) recorder).dispose();
}
}
})
);
Expand Down Expand Up @@ -251,6 +264,10 @@ public final void disposeWhen(SocketAddress address) {
}
else if (Metrics.isMicrometerAvailable()) {
deRegisterDefaultMetrics(id, address);
PoolMetricsRecorder recorder = e.getValue().config().metricsRecorder();
if (recorder instanceof Disposable) {
((Disposable) recorder).dispose();
}
}
})
).subscribe();
Expand Down Expand Up @@ -303,6 +320,15 @@ protected abstract InstrumentedPool<T> createPool(
SocketAddress remoteAddress,
AddressResolverGroup<?> resolverGroup);

protected InstrumentedPool<T> createPool(
String id,
TransportConfig config,
PoolFactory<T> poolFactory,
SocketAddress remoteAddress,
AddressResolverGroup<?> resolverGroup) {
return createPool(config, poolFactory, remoteAddress, resolverGroup);
}

protected PoolFactory<T> poolFactory(SocketAddress remoteAddress) {
return poolFactoryPerRemoteHost.getOrDefault(remoteAddress, defaultPoolFactory);
}
Expand Down Expand Up @@ -376,6 +402,10 @@ final void disposeInactivePoolsInBackground() {
}
else if (Metrics.isMicrometerAvailable()) {
deRegisterDefaultMetrics(id, address);
PoolMetricsRecorder recorder = e.getValue().config().metricsRecorder();
if (recorder instanceof Disposable) {
((Disposable) recorder).dispose();
}
}
})
).subscribe();
Expand Down Expand Up @@ -470,6 +500,18 @@ public InstrumentedPool<T> newPool(
return newPoolInternal(allocator, destroyHandler, evictionPredicate).buildPool();
}

public InstrumentedPool<T> newPool(
Publisher<T> allocator,
Function<T, Publisher<Void>> destroyHandler,
BiPredicate<T, PooledRefMetadata> evictionPredicate,
PoolMetricsRecorder poolMetricsRecorder) {
if (disposeTimeout != null) {
return newPoolInternal(allocator, destroyHandler, evictionPredicate, poolMetricsRecorder)
.buildPoolAndDecorateWith(InstrumentedPoolDecorators::gracefulShutdown);
}
return newPoolInternal(allocator, destroyHandler, evictionPredicate, poolMetricsRecorder).buildPool();
}

public InstrumentedPool<T> newPool(
Publisher<T> allocator,
@Nullable reactor.pool.AllocationStrategy allocationStrategy, // this is not used but kept for backwards compatibility
Expand All @@ -483,10 +525,31 @@ public InstrumentedPool<T> newPool(
return newPoolInternal(allocator, destroyHandler, defaultEvictionPredicate).build(poolFactory);
}

public InstrumentedPool<T> newPool(
Publisher<T> allocator,
Function<T, Publisher<Void>> destroyHandler,
BiPredicate<T, PooledRefMetadata> defaultEvictionPredicate,
PoolMetricsRecorder poolMetricsRecorder,
Function<PoolConfig<T>, InstrumentedPool<T>> poolFactory) {
if (disposeTimeout != null) {
return newPoolInternal(allocator, destroyHandler, defaultEvictionPredicate, poolMetricsRecorder)
.build(poolFactory.andThen(InstrumentedPoolDecorators::gracefulShutdown));
}
return newPoolInternal(allocator, destroyHandler, defaultEvictionPredicate, poolMetricsRecorder).build(poolFactory);
}

PoolBuilder<T, PoolConfig<T>> newPoolInternal(
Publisher<T> allocator,
Function<T, Publisher<Void>> destroyHandler,
BiPredicate<T, PooledRefMetadata> defaultEvictionPredicate) {
return newPoolInternal(allocator, destroyHandler, defaultEvictionPredicate, null);
}

PoolBuilder<T, PoolConfig<T>> newPoolInternal(
Publisher<T> allocator,
Function<T, Publisher<Void>> destroyHandler,
BiPredicate<T, PooledRefMetadata> defaultEvictionPredicate,
@Nullable PoolMetricsRecorder poolMetricsRecorder) {
PoolBuilder<T, PoolConfig<T>> poolBuilder =
PoolBuilder.from(allocator)
.destroyHandler(destroyHandler)
Expand Down Expand Up @@ -535,6 +598,10 @@ PoolBuilder<T, PoolConfig<T>> newPoolInternal(
poolBuilder = poolBuilder.idleResourceReuseMruOrder();
}

if (poolMetricsRecorder != null) {
poolBuilder.metricsRecorder(poolMetricsRecorder);
}

return poolBuilder;
}

Expand Down
Loading

0 comments on commit 9ab1b64

Please sign in to comment.