Skip to content

Commit

Permalink
Merge pull request #29 from jfenc91/feature_eschanges
Browse files Browse the repository at this point in the history
Fixes #28: Changes to run with long running processes.
  • Loading branch information
ajsquared committed Dec 18, 2015
2 parents a996b6a + 87bc2aa commit 158f6dc
Show file tree
Hide file tree
Showing 10 changed files with 137 additions and 33 deletions.
3 changes: 2 additions & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ Every pull request will be built with [Travis CI](https://travis-ci.org/etsy/sta
- Andrew Stiegmann [stieg](https://github.com/stieg)
- Joe Meissler [stickperson](https://github.com/stickperson)
- Ben Darfler [bdarfler](https://github.com/bdarfler)
- Ihor Bobak [ibobak](https://github.com/ibobak)
- Ihor Bobak [ibobak](https://github.com/ibobak)
- Jeff Fenchel [jfenc91](https://github.com/jfenc91)
13 changes: 8 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,18 @@ packageWhitelist | Colon-delimited whitelist for packages to include (optional,
packageBlacklist | Colon-delimited whitelist for packages to exclude (optional, defaults to exclude nothing)
profilers | Colon-delimited list of profiler class names (optional, defaults to CPUProfiler and MemoryProfiler)
reporter | Class name of the reporter to use (optional, defaults to StatsDReporter)
httpPort | The port on which to bind the embedded HTTP server (optional, defaults to 5005)
httpPort | The port on which to bind the embedded HTTP server (optional, defaults to 5005). If this port is already in use, the next free port will be taken.

### Embedded HTTP Server
statsd-jvm-profiler embeds an HTTP server to support simple interactions with the profiler while it is in operation. You can configure the port on which this server runs with the `httpPort` option.

Endpoint | Usage
--------------- | -----
/profilers | List the currently enabled profilers
/disable/:profiler | Disable the profiler specified by `:profiler`. The name must match what is returned by `/profilers`.
Endpoint | Usage
--------------- | -----
/profilers | List the currently enabled profilers
/isRunning | List the running profilers. This should be the same as /profilers.
/disable/:profiler | Disable the profiler specified by `:profiler`. The name must match what is returned by `/profilers`.
/errors | List the past 10 errors from the running profilers and reporters.
/status/profiler/:profiler | Displays a status message with the number of recorded stats for the requested profiler.

### Reporters
statsd-jvm-profiler supports multiple backends. StatsD is the default, but InfluxDB is also supported. You can select the backend to use by passing the `reporter` argument to the profiler; `StatsDReporter` and `InfluxDBReporter` are the supported values.
Expand Down
20 changes: 11 additions & 9 deletions src/main/java/com/etsy/statsd/profiler/Agent.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,12 @@
import java.lang.instrument.Instrumentation;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicReference;

/**
* javaagent profiler using StatsD as a backend
Expand All @@ -27,6 +25,8 @@
public class Agent {
public static final int EXECUTOR_DELAY = 0;

static AtomicReference<Boolean> isRunning = new AtomicReference<>(true);
static LinkedList<String> errors = new LinkedList<>();
/**
* Start the profiler
*
Expand Down Expand Up @@ -59,12 +59,14 @@ private static void scheduleProfilers(Collection<Profiler> profilers, int httpPo
(ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(profilers.size(), new ProfilerThreadFactory()));

Map<String, ScheduledFuture<?>> runningProfilers = new HashMap<>(profilers.size());
Map<String, Profiler> activeProfilers = new HashMap<>(profilers.size());
for (Profiler profiler : profilers) {
ProfilerWorkerThread worker = new ProfilerWorkerThread(profiler);
runningProfilers.put(profiler.getClass().getSimpleName(),
scheduledExecutorService.scheduleAtFixedRate(worker, EXECUTOR_DELAY, profiler.getPeriod(), profiler.getTimeUnit()));
activeProfilers.put(profiler.getClass().getSimpleName(), profiler);
ProfilerWorkerThread worker = new ProfilerWorkerThread(profiler, errors);
ScheduledFuture future = scheduledExecutorService.scheduleAtFixedRate(worker, EXECUTOR_DELAY, profiler.getPeriod(), profiler.getTimeUnit());
runningProfilers.put(profiler.getClass().getSimpleName(), future);
}
ProfilerServer.startServer(runningProfilers, httpPort);
ProfilerServer.startServer(runningProfilers, activeProfilers, httpPort, isRunning, errors);
}

/**
Expand All @@ -73,7 +75,7 @@ private static void scheduleProfilers(Collection<Profiler> profilers, int httpPo
* @param profilers The profilers to flush at shutdown
*/
private static void registerShutdownHook(Collection<Profiler> profilers) {
Thread shutdownHook = new Thread(new ProfilerShutdownHookWorker(profilers));
Thread shutdownHook = new Thread(new ProfilerShutdownHookWorker(profilers, isRunning));
Runtime.getRuntime().addShutdownHook(shutdownHook);
}

Expand Down
5 changes: 5 additions & 0 deletions src/main/java/com/etsy/statsd/profiler/Profiler.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ public abstract class Profiler {

private Reporter<?> reporter;

private long recordedStats = 0;
public Profiler(Reporter reporter, Arguments arguments) {
Preconditions.checkNotNull(reporter);
this.reporter = reporter;
Expand Down Expand Up @@ -71,6 +72,7 @@ protected boolean emitBounds() {
* @param value The value of the gauge
*/
protected void recordGaugeValue(String key, long value) {
recordedStats++;
reporter.recordGaugeValue(key, value);
}

Expand All @@ -81,6 +83,9 @@ protected void recordGaugeValue(String key, long value) {
* @param gauges A map of gauge names to values
*/
protected void recordGaugeValues(Map<String, Long> gauges) {
recordedStats++;
reporter.recordGaugeValues(gauges);
}

public long getRecordedStats() { return recordedStats; }
}
27 changes: 23 additions & 4 deletions src/main/java/com/etsy/statsd/profiler/server/ProfilerServer.java
Original file line number Diff line number Diff line change
@@ -1,18 +1,27 @@
package com.etsy.statsd.profiler.server;

import com.etsy.statsd.profiler.Profiler;
import com.sun.org.apache.xpath.internal.operations.Bool;
import org.vertx.java.core.AsyncResult;
import org.vertx.java.core.Handler;
import org.vertx.java.core.Vertx;
import org.vertx.java.core.VertxFactory;
import org.vertx.java.core.http.HttpServer;

import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* Sets up a simple embedded HTTP server for interacting with the profiler while it runs
*
* @author Andrew Johnson
*/
public class ProfilerServer {
private static final Logger log = Logger.getLogger(ProfilerServer.class.getName());
private static final Vertx vertx = VertxFactory.newVertx();

/**
Expand All @@ -21,9 +30,19 @@ public class ProfilerServer {
* @param activeProfilers The active profilers
* @param port The port on which to bind the server
*/
public static void startServer(final Map<String, ScheduledFuture<?>> activeProfilers, final int port) {
HttpServer server = vertx.createHttpServer();
server.requestHandler(RequestHandler.getMatcher(activeProfilers));
server.listen(port);
public static void startServer(final Map<String, ScheduledFuture<?>> runningProfilers, final Map<String, Profiler> activeProfilers, final int port, final AtomicReference<Boolean> isRunning, final LinkedList<String> errors) {
final HttpServer server = vertx.createHttpServer();
server.requestHandler(RequestHandler.getMatcher(runningProfilers, activeProfilers, isRunning, errors));
server.listen(port, new Handler<AsyncResult<HttpServer>>() {
@Override
public void handle(AsyncResult<HttpServer> event) {
if (event.failed()) {
server.close();
startServer(runningProfilers, activeProfilers, port + 1, isRunning, errors);
} else if (event.succeeded()) {
log.info("Profiler server started on port " + port);
}
}
});
}
}
65 changes: 57 additions & 8 deletions src/main/java/com/etsy/statsd/profiler/server/RequestHandler.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.etsy.statsd.profiler.server;

import com.etsy.statsd.profiler.Profiler;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
Expand All @@ -9,41 +10,72 @@
import org.vertx.java.core.http.RouteMatcher;

import java.util.Collection;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicReference;

/**
* Handler for HTTP requests to the profiler server
*
* @author Andrew Johnson
*/
public class RequestHandler {

/**
* Construct a RouteMatcher for the supported routes
*
* @param activeProfilers The active profilers
* @return A RouteMatcher that matches all supported routes
*/
public static RouteMatcher getMatcher(final Map<String, ScheduledFuture<?>> activeProfilers) {
public static RouteMatcher getMatcher(final Map<String, ScheduledFuture<?>> runningProfilers, Map<String, Profiler> activeProfilers, AtomicReference<Boolean> isRunning, LinkedList<String> errors) {
RouteMatcher matcher = new RouteMatcher();
matcher.get("/profilers", RequestHandler.handleGetProfilers(activeProfilers));
matcher.get("/disable/:profiler", RequestHandler.handleDisableProfiler(activeProfilers));

matcher.get("/profilers", RequestHandler.handleGetProfilers(runningProfilers));
matcher.get("/disable/:profiler", RequestHandler.handleDisableProfiler(runningProfilers));
matcher.get("/status/profiler/:profiler", RequestHandler.handleProfilerStatus(activeProfilers));
matcher.get("/errors", RequestHandler.handleErrorMessages(errors));
matcher.get("/isRunning", RequestHandler.isRunning(isRunning));
return matcher;
}

/**
* Handle a GET to /isRunning
*
* @return A Handler that returns all running profilers
*/
public static Handler<HttpServerRequest> isRunning(final AtomicReference<Boolean> isRunning) {
return new Handler<HttpServerRequest>() {
@Override
public void handle(HttpServerRequest httpServerRequest) {
httpServerRequest.response().end(String.format("isRunning: %b", isRunning.get()));
}
};
}

/**
* Handle a GET to /profilers
*
* @param activeProfilers The active profilers
* @return A Handler that handles a request to the /profilers endpoint
*/
public static Handler<HttpServerRequest> handleGetProfilers(final Map<String, ScheduledFuture<?>> activeProfilers) {
public static Handler<HttpServerRequest> handleGetProfilers(final Map<String, ScheduledFuture<?>> runningProfilers) {
return new Handler<HttpServerRequest>() {
@Override
public void handle(HttpServerRequest httpServerRequest) {
httpServerRequest.response().end(Joiner.on("\n").join(getEnabledProfilers(activeProfilers)));
httpServerRequest.response().end(Joiner.on("\n").join(getEnabledProfilers(runningProfilers)));
}
};
}

/**
* Handle a GET to /errors
*
* @return The last 10 error stacktraces
*/
public static Handler<HttpServerRequest> handleErrorMessages(final LinkedList<String> errors) {
return new Handler<HttpServerRequest>() {
@Override
public void handle(HttpServerRequest httpServerRequest) {
httpServerRequest.response().end("Errors: " + Joiner.on("\n").join(errors));
}
};
}
Expand All @@ -66,6 +98,23 @@ public void handle(HttpServerRequest httpServerRequest) {
};
}

/**
* Handle a GET to /status/profiler/:profiler
*
* @param activeProfilers The active profilers
* @return A Handler that handles a request to the /disable/:profiler endpoint
*/
public static Handler<HttpServerRequest> handleProfilerStatus(final Map<String, Profiler> activeProfilers) {
return new Handler<HttpServerRequest>() {
@Override
public void handle(HttpServerRequest httpServerRequest) {
String profilerName = httpServerRequest.params().get("profiler");
Profiler profiler = activeProfilers.get(profilerName);
httpServerRequest.response().end(String.format("Recorded stats %d\n", profiler.getRecordedStats()));
}
};
}

/**
* Get all enabled profilers
* @param activeProfilers The active profilers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.etsy.statsd.profiler.Profiler;

import java.util.Collection;
import java.util.concurrent.atomic.AtomicReference;

/**
* Worker thread for profiler shutdown hook
Expand All @@ -11,15 +12,18 @@
*/
public class ProfilerShutdownHookWorker implements Runnable {
private Collection<Profiler> profilers;

public ProfilerShutdownHookWorker(Collection<Profiler> profilers) {
private AtomicReference<Boolean> isRunning;
public ProfilerShutdownHookWorker(Collection<Profiler> profilers, AtomicReference<Boolean> isRunning) {
this.profilers = profilers;
this.isRunning = isRunning;
}

@Override
public void run() {
for (Profiler p : profilers) {
p.flushData();
}

isRunning.set(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,36 @@

import com.etsy.statsd.profiler.Profiler;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.LinkedList;

/**
* Worker thread for executing a profiler
*
* @author Andrew Johnson
*/
public class ProfilerWorkerThread implements Runnable {
private Profiler profiler;
private LinkedList<String> errors;

public ProfilerWorkerThread(Profiler profiler) {
public ProfilerWorkerThread(Profiler profiler, LinkedList<String> errors) {
this.profiler = profiler;
this.errors = errors;
}

@Override
public void run() {
profiler.profile();
try {
profiler.profile();
} catch(Exception e) {
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw);
errors.add(String.format("Received an error running profiler: %s, error: %s", profiler.getClass().getName(), sw.toString()));
if ( errors.size() > 10) {
errors.pollFirst();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import static org.junit.Assert.assertEquals;

Expand All @@ -19,14 +21,16 @@ public void testRunnable() throws InterruptedException {
Profiler mockProfiler1 = new MockProfiler1(output);
Profiler mockProfiler2 = new MockProfiler2(output);
Collection<Profiler> profilers = Arrays.asList(mockProfiler1, mockProfiler2);
AtomicReference<Boolean> isRunning = new AtomicReference<>(true);

Thread t = new Thread(new ProfilerShutdownHookWorker(profilers));
Thread t = new Thread(new ProfilerShutdownHookWorker(profilers, isRunning));
t.run();
t.join();

Set<String> expectedOutput = new HashSet<>();
expectedOutput.add(MockProfiler1.class.getSimpleName() + "-flushData");
expectedOutput.add(MockProfiler2.class.getSimpleName() + "-flushData");
assertEquals(expectedOutput, output);
assertEquals(isRunning.get(), false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import org.junit.Test;

import java.util.HashSet;
import java.util.LinkedList;
import java.util.Set;

import static org.junit.Assert.*;
Expand All @@ -15,7 +16,7 @@ public void testRunnable() throws InterruptedException {
Set<String> output = new HashSet<>();
Profiler mockProfiler1 = new MockProfiler1(output);

Thread t = new Thread(new ProfilerWorkerThread(mockProfiler1));
Thread t = new Thread(new ProfilerWorkerThread(mockProfiler1, new LinkedList<String>()));
t.run();
t.join();

Expand Down

0 comments on commit 158f6dc

Please sign in to comment.