Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix memory leak in mbean info cache; reference to unregistered mbean was kept forever #64

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,20 @@

import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.xnio.channels.StreamSinkChannel;

import io.undertow.connector.ByteBufferPool;
import io.undertow.connector.PooledByteBuffer;
import io.undertow.server.HttpHandler;
Expand All @@ -34,6 +34,7 @@
import net.thisptr.jmx.exporter.agent.scripting.ScriptEngineRegistry;
import net.thisptr.jmx.exporter.agent.writer.PrometheusMetricWriter;
import net.thisptr.jmx.exporter.agent.writer.PrometheusMetricWriter.WritableByteChannelController;
import org.xnio.channels.StreamSinkChannel;

public class ExporterHttpHandler implements HttpHandler {
private static final Logger LOG = Logger.getLogger(ExporterHttpHandler.class.getName());
Expand Down Expand Up @@ -119,21 +120,38 @@ public void emit(final Sample sample) {
}
}

private void handleGetMetrics(final HttpServerExchange exchange) throws InterruptedException, IOException {
private void handleGetMetrics(final HttpServerExchange exchange) throws IOException {
final OptionsConfig options = getOptions(exchange);

final Map<String, List<PrometheusMetric>> allMetrics = new TreeMap<>();
// Exporter metrics
metricRegistry.forEach((instrumented) -> {
instrumented.toPrometheus((metric) -> {
allMetrics.computeIfAbsent(metric.name, (name) -> new ArrayList<>()).add(metric);

try {
// Exporter metrics
metricRegistry.forEach((instrumented) -> {
instrumented.toPrometheus((metric) -> {
allMetrics.computeIfAbsent(metric.name, (name) -> new ArrayList<>()).add(metric);
});
});
});
// User metrics
scraper.scrape(new PrometheusScrapeOutput((metric) -> {
allMetrics.computeIfAbsent(metric.name, (name) -> new ArrayList<>()).add(metric);
}), options.minimumResponseTime, TimeUnit.MILLISECONDS);
// User metrics
scraper.scrape(new PrometheusScrapeOutput((metric) -> {
allMetrics.computeIfAbsent(metric.name, (name) -> new ArrayList<>()).add(metric);
}), Duration.ofMillis(options.minimumResponseTime));
} catch (Exception e) {
sendErrorResponse(exchange, StatusCodes.INTERNAL_SERVER_ERROR, e.getClass().getSimpleName() + ": " + e.getMessage());
return;
}

sendResponse(exchange, allMetrics, options);
}

private void sendErrorResponse(final HttpServerExchange exchange, int statusCode, String message) throws IOException {
exchange.setStatusCode(statusCode);
exchange.getResponseHeaders().add(Headers.CONTENT_TYPE, "text/plain");
final StreamSinkChannel channel = exchange.getResponseChannel();
channel.write(ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8)));
}

private void sendResponse(final HttpServerExchange exchange, final Map<String, List<PrometheusMetric>> allMetrics, final OptionsConfig options) throws IOException {
exchange.setStatusCode(StatusCodes.OK);
exchange.getResponseHeaders().add(Headers.CONTENT_TYPE, "text/plain; version=0.0.4; charset=utf-8");

Expand Down Expand Up @@ -186,16 +204,16 @@ public void handleRequest(final HttpServerExchange exchange) throws Exception {
return;
}
switch (exchange.getRequestPath()) {
case "/metrics":
if (!exchange.getRequestMethod().equalToString("GET")) {
exchange.setStatusCode(StatusCodes.METHOD_NOT_ALLOWED);
return;
}
handleGetMetrics(exchange);
break;
default:
exchange.setStatusCode(StatusCodes.NOT_FOUND);
break;
case "/metrics":
if (!exchange.getRequestMethod().equalToString("GET")) {
exchange.setStatusCode(StatusCodes.METHOD_NOT_ALLOWED);
return;
}
handleGetMetrics(exchange);
break;
default:
exchange.setStatusCode(StatusCodes.NOT_FOUND);
break;
}
}
}
39 changes: 39 additions & 0 deletions src/main/java/net/thisptr/jmx/exporter/agent/misc/Pacemaker.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package net.thisptr.jmx.exporter.agent.misc;

import java.time.Duration;

public class Pacemaker {

public static final Duration DEFAULT_PRECISION = Duration.ofMillis(10);

private int i = 0;

private final int n;

private final long durationNanos;

private final long startNanos;

private final long precisionNanos;

public Pacemaker(Duration duration, int n) {
this.startNanos = System.nanoTime();
this.durationNanos = duration.toNanos();
this.precisionNanos = DEFAULT_PRECISION.toNanos();
this.n = n;
}

public void yield() throws InterruptedException {
i++;
final long waitUntilNanos = startNanos + (long) ((i / (double) n) * durationNanos);
final long sleepNanos = waitUntilNanos - System.nanoTime();
if (sleepNanos > precisionNanos) // sleep only when we are more than 10ms ahead, to avoid excessive context switches.
sleepNanos(sleepNanos);
}

private static void sleepNanos(final long totalNanos) throws InterruptedException {
final int nanos = (int) (totalNanos % 1000000L);
final long millis = totalNanos / 1000000L;
Thread.sleep(millis, nanos);
}
}
46 changes: 22 additions & 24 deletions src/main/java/net/thisptr/jmx/exporter/agent/scraper/Scraper.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package net.thisptr.jmx.exporter.agent.scraper;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -24,12 +25,11 @@
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

import net.thisptr.jmx.exporter.agent.config.Config.ScrapeRule;
import net.thisptr.jmx.exporter.agent.misc.AttributeNamePattern;
import net.thisptr.jmx.exporter.agent.misc.FastObjectName;
import net.thisptr.jmx.exporter.agent.misc.Pacemaker;
import net.thisptr.jmx.exporter.agent.misc.Pair;
import net.thisptr.jmx.exporter.agent.utils.MoreCollections;

public class Scraper {
private static final Logger LOG = Logger.getLogger(Scraper.class.getName());
Expand All @@ -48,7 +48,7 @@ public class Scraper {
* of this behavior.
* </p>
*
* @see https://docs.oracle.com/javase/8/docs/technotes/guides/jmx/JMX_1_4_specification.pdf
* @see <a href="https://docs.oracle.com/javase/8/docs/technotes/guides/jmx/JMX_1_4_specification.pdf">JMX Specification 1.4</a>
*/
private final LoadingCache<ObjectName, CachedMBeanInfo> mbeanInfoCache = CacheBuilder.newBuilder()
.refreshAfterWrite(60, TimeUnit.SECONDS)
Expand Down Expand Up @@ -101,6 +101,7 @@ public Scraper(final MBeanServer server, final List<ScrapeRule> rules) {
}

private static final RuleMatch DEFAULT_RULE;

static {
final ScrapeRule skipRule = new ScrapeRule();
skipRule.skip = true;
Expand Down Expand Up @@ -219,7 +220,7 @@ public AttributeRule(final MBeanAttributeInfo attribute, final RuleMatch ruleMat
}

public void scrape(final ScrapeOutput output) throws InterruptedException {
scrape(output, 0L, TimeUnit.MILLISECONDS);
scrape(output, Duration.ZERO);
}

/**
Expand Down Expand Up @@ -311,32 +312,29 @@ private CachedMBeanInfo prepare(final ObjectName _name) throws InstanceNotFoundE
return new CachedMBeanInfo(name, info, requests, attributes.toArray(new String[0]));
}

public void scrape(final ScrapeOutput output, final long duration, final TimeUnit unit) throws InterruptedException {
public void scrape(final ScrapeOutput output, final Duration duration) throws InterruptedException {
// We use server.queryNames() here, instead of server.queryMBeans(), to avoid costly server.getMBeanInfo() invoked internally.
final Set<ObjectName> names;
try {
names = server.queryNames(null, null);
} catch (final Throwable th) {
if (LOG.isLoggable(Level.SEVERE))
LOG.log(Level.SEVERE, String.format("MBeanServer#queryNames(null, null) #=> %s. This is bad. We can't do anything but to return an empty response.", th.getClass().getSimpleName()), th);
return;
}
if (names == null) {
LOG.log(Level.SEVERE, "MBeanServer#queryNames(null, null) #=> null. This is bad. We can't do anything but to return an empty response.");
return;
}
final Set<ObjectName> names = server.queryNames(null, null);

MoreCollections.forEachSlowlyOverDuration(names, duration, unit, (_name) -> {
Pacemaker pacemaker = new Pacemaker(duration, names.size());
for (ObjectName name : names) {
try {
scrape(output, _name);
scrape(output, name);
} catch (final InstanceNotFoundException e) {
if (LOG.isLoggable(Level.FINE))
LOG.log(Level.FINE, String.format("MBeanServer#getMBeanInfo(%s) #=> %s. This can happen when the MBean is unregistered after queryNames() and is just a temporary thing.", _name, e.getClass().getSimpleName()), e);
LOG.log(Level.FINE, String.format("MBeanServer#getMBeanInfo(%s) #=> %s. This can happen when the MBean is unregistered after queryNames() and is just a temporary thing.", name, e.getClass().getSimpleName()), e);
} catch (final Throwable th) {
if (LOG.isLoggable(Level.WARNING))
LOG.log(Level.WARNING, String.format("Got an unexpected exception while processing the MBean \"%s\". This is likely a bug in scriptable-jmx-exporter. Please report an issue on GitHub.", _name), th);
LOG.log(Level.WARNING, String.format("Got an unexpected exception while processing the MBean \"%s\". This is likely a bug in scriptable-jmx-exporter. Please report an issue on GitHub.", name), th);
}
});
pacemaker.yield();
}

evictMBeanInfoCacheExcluding(names);
}

private void evictMBeanInfoCacheExcluding(final Set<ObjectName> keepNames) {
mbeanInfoCache.asMap().keySet().retainAll(keepNames);
}

private void fallbackAndHandleNonAttributeList(final ScrapeOutput output, final CachedMBeanInfo info, final AttributeList obtainedAttributes, final long timestamp) {
Expand All @@ -349,7 +347,7 @@ private void fallbackAndHandleNonAttributeList(final ScrapeOutput output, final
final AttributeRule request = info.requests.get(info.attributeNamesToGet[i]);
if (request == null) {
if (LOG.isLoggable(Level.WARNING))
LOG.log(Level.WARNING, String.format("This is not expected happen. The attribute \"{}\" could not be found in requests.", info.attributeNamesToGet[i]));
LOG.log(Level.WARNING, String.format("This is not expected happen. The attribute \"%s\" could not be found in requests.", info.attributeNamesToGet[i]));
continue;
}
Object value = obtainedAttributes.get(i);
Expand Down Expand Up @@ -408,7 +406,7 @@ public void scrape(final ScrapeOutput output, final ObjectName _name) throws Ins
final AttributeRule request = info.requests.get(attribute.getName());
if (request == null) {
if (LOG.isLoggable(Level.FINE))
LOG.log(Level.FINE, String.format("MBeanServer#getAttributes(%s, %s) returned an attribute named \"%s\" which we didn't request. This indicates a bug in the MBean. Ignored.", _name, info.attributeNamesToGet, attribute.getName()));
LOG.log(Level.FINE, String.format("MBeanServer#getAttributes(%s, %s) returned an attribute named \"%s\" which we didn't request. This indicates a bug in the MBean. Ignored.", _name, Arrays.toString(info.attributeNamesToGet), attribute.getName()));
continue;
}
++successfulAttributes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.lang.management.ManagementFactory;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import net.thisptr.jmx.exporter.agent.misc.Pacemaker;
import org.junit.jupiter.api.Test;

import net.thisptr.jmx.exporter.agent.config.Config.ScrapeRule;
Expand Down Expand Up @@ -51,9 +53,9 @@ void testSlowScrape() throws Exception {
final Set<Sample> actual = new HashSet<>();
scraper.scrape((sample) -> {
actual.add(sample);
}, 3, TimeUnit.SECONDS);
}, Duration.ofSeconds(3));

assertTrue(3000L <= System.currentTimeMillis() - start);
assertTrue(3000L - Pacemaker.DEFAULT_PRECISION.toMillis() <= System.currentTimeMillis() - start);
assertTrue(10 < actual.size());
}
}