Skip to content

Commit

Permalink
Add support for InfluxDB backend
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrew Johnson committed Apr 10, 2015
1 parent 2807dd6 commit ad81cae
Show file tree
Hide file tree
Showing 10 changed files with 239 additions and 48 deletions.
29 changes: 23 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,33 @@ The profiler is enabled using the JVM's `-javaagent` argument. You are required

An example of setting up Cascading/Scalding jobs to use the profiler can be found in the `example` directory.

### Options
### Global Options

Name | Meaning
-------------- | -------
server | The hostname of the StatsD instance (required)
port | The port number for the StatsD instance (required)
prefix | The prefix for metrics (optional, defaults to statsd-jvm-profiler)
Name | Meaning
---------------- | -------
server | The hostname to which the reporter should send data (required)
port | The port number for the server to which the reporter should send data (required)
prefix | The prefix for metrics (optional, defaults to statsd-jvm-profiler)
packageWhitelist | Colon-delimited whitelist for packages to include (optional, defaults to include everything)
packageBlacklist | Colon-delimited whitelist for packages to exclude (optional, defaults to exclude nothing)
profilers | Colon-delimited list of profiler class names (optional, defaults to CPUProfiler and MemoryProfiler)
reporter | Class name of the reporter to use (optional, defaults to StatsDReporter)

### Reporters
statsd-jvm-profiler supports multiple backends. StatsD is the default, but InfluxDB is also supported. You can select the backend to use by passing the `reporter` argument to the profiler; `StatsDReporter` and `InfluxDBReporter` are the supported values.

Some reporters may require additional arguments.

#### StatsDReporter
This reporter does not have any additional arguments.

#### InfluxDBReporter

Name | Meaning
----------- | -------
username | The username with which to connect to InfluxDB (required)
password | The password with which to connect to InfluxDB (required)
database | The database to which to write metrics (required)

## Metrics

Expand Down
7 changes: 6 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

<groupId>com.etsy</groupId>
<artifactId>statsd-jvm-profiler</artifactId>
<version>0.6.1-SNAPSHOT</version>
<version>0.7.1-SNAPSHOT</version>
<packaging>jar</packaging>

<name>statsd-jvm-profiler</name>
Expand Down Expand Up @@ -65,6 +65,11 @@
<artifactId>guava</artifactId>
<version>18.0</version>
</dependency>
<dependency>
<groupId>org.influxdb</groupId>
<artifactId>influxdb-java</artifactId>
<version>1.5</version>
</dependency>
</dependencies>

<profiles>
Expand Down
54 changes: 34 additions & 20 deletions src/main/java/com/etsy/statsd/profiler/Agent.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.etsy.statsd.profiler;

import com.etsy.statsd.profiler.reporter.Reporter;
import com.etsy.statsd.profiler.reporter.StatsDReporter;
import com.etsy.statsd.profiler.worker.ProfilerShutdownHookWorker;
import com.etsy.statsd.profiler.worker.ProfilerThreadFactory;
import com.etsy.statsd.profiler.worker.ProfilerWorkerThread;
Expand Down Expand Up @@ -32,26 +31,15 @@ public class Agent {
*/
public static void premain(final String args, final Instrumentation instrumentation) {
Arguments arguments = Arguments.parseArgs(args);
String statsdServer = arguments.statsdServer;
int statsdPort = arguments.statsdPort;
String server = arguments.server;
int port = arguments.port;
String prefix = arguments.metricsPrefix.or("statsd-jvm-profiler");

Reporter reporter = new StatsDReporter(statsdServer, statsdPort, prefix);
Reporter reporter = instantiate(arguments.reporter, Reporter.CONSTRUCTOR_PARAM_TYPES, server, port, prefix, arguments);

Collection<Profiler> profilers = new ArrayList<Profiler>();
for (Class<? extends Profiler> profiler : arguments.profilers) {
try {
Constructor<? extends Profiler> constructor = profiler.getConstructor(Reporter.class, Arguments.class);
profilers.add(constructor.newInstance(reporter, arguments));
} catch (NoSuchMethodException e) {
handleInitializationException(profiler, e);
} catch (InvocationTargetException e) {
handleInitializationException(profiler, e);
} catch (InstantiationException e) {
handleInitializationException(profiler, e);
} catch (IllegalAccessException e) {
handleInitializationException(profiler, e);
}
profilers.add(instantiate(profiler, Profiler.CONSTRUCTOR_PARAM_TYPES, reporter, arguments));
}

scheduleProfilers(profilers);
Expand Down Expand Up @@ -88,10 +76,36 @@ private static void registerShutdownHook(Collection<Profiler> profilers) {
/**
* Uniformed handling of initialization exception since Java 6 can't do multiple catch
*
* @param profiler
* @param cause
* @param clazz The class that could not be instantiated
* @param cause The underlying exception
*/
private static void handleInitializationException(final Class<? extends Profiler> profiler, final Exception cause) {
throw new RuntimeException("Unable to instantiate " + profiler.getSimpleName(), cause);
private static void handleInitializationException(final Class<?> clazz, final Exception cause) {
throw new RuntimeException("Unable to instantiate " + clazz.getSimpleName(), cause);
}

/**
* Instantiate an object
*
* @param clazz A Class representing the type of object to instantiate
* @param parameterTypes The parameter types for the constructor
* @param initArgs The values to pass to the constructor
* @param <T> The type of the object to instantiate
* @return A new instance of type T
*/
private static <T> T instantiate(final Class<T> clazz, Class<?>[] parameterTypes, Object... initArgs) {
try {
Constructor<T> constructor = clazz.getConstructor(parameterTypes);
return constructor.newInstance(initArgs);
} catch (NoSuchMethodException e) {
handleInitializationException(clazz, e);
} catch (InvocationTargetException e) {
handleInitializationException(clazz, e);
} catch (InstantiationException e) {
handleInitializationException(clazz, e);
} catch (IllegalAccessException e) {
handleInitializationException(clazz, e);
}

return null;
}
}
41 changes: 32 additions & 9 deletions src/main/java/com/etsy/statsd/profiler/Arguments.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import com.etsy.statsd.profiler.profilers.CPUProfiler;
import com.etsy.statsd.profiler.profilers.MemoryProfiler;
import com.etsy.statsd.profiler.reporter.Reporter;
import com.etsy.statsd.profiler.reporter.StatsDReporter;
import com.google.common.base.Optional;

import java.util.*;
Expand All @@ -12,12 +14,13 @@
* @author Andrew Johnson
*/
public class Arguments {
private static final String STATSD_SERVER = "server";
private static final String STATSD_PORT = "port";
private static final String SERVER = "server";
private static final String PORT = "port";
private static final String METRICS_PREFIX = "prefix";
private static final String PROFILERS = "profilers";
private static final String REPORTER = "reporter";

private static final Collection<String> REQUIRED = Arrays.asList(STATSD_SERVER, STATSD_PORT);
private static final Collection<String> REQUIRED = Arrays.asList(SERVER, PORT);

/**
* Parses arguments into an Arguments object
Expand Down Expand Up @@ -45,25 +48,45 @@ public static Arguments parseArgs(final String args) {
return new Arguments(parsed);
}

public String statsdServer;
public int statsdPort;
public String server;
public int port;
public Optional<String> metricsPrefix;
public Set<Class<? extends Profiler>> profilers;
public Map<String, String> remainingArgs;
public Class<? extends Reporter<?>> reporter;

private Arguments(Map<String, String> parsedArgs) {
statsdServer = parsedArgs.get(STATSD_SERVER);
statsdPort = Integer.parseInt(parsedArgs.get(STATSD_PORT));
server = parsedArgs.get(SERVER);
port = Integer.parseInt(parsedArgs.get(PORT));
metricsPrefix = Optional.fromNullable(parsedArgs.get(METRICS_PREFIX));
profilers = parseProfilerArg(parsedArgs.get(PROFILERS));
reporter = parserReporterArg(parsedArgs.get(REPORTER));

parsedArgs.remove(STATSD_SERVER);
parsedArgs.remove(STATSD_PORT);
parsedArgs.remove(SERVER);
parsedArgs.remove(PORT);
parsedArgs.remove(METRICS_PREFIX);
parsedArgs.remove(PROFILERS);
remainingArgs = parsedArgs;
}

@SuppressWarnings("unchecked")
private Class<? extends Reporter<?>> parserReporterArg(String reporterArg) {
if (reporterArg == null) {
return StatsDReporter.class;
} else {
try {
return (Class<? extends Reporter<?>>) Class.forName(reporterArg);
} catch (ClassNotFoundException e) {
// This might indicate the package was left off, so we'll try with the default package
try {
return (Class<? extends Reporter<?>>) Class.forName("com.etsy.statsd.profiler.reporter." + reporterArg);
} catch (ClassNotFoundException inner) {
throw new IllegalArgumentException("Reporter " + reporterArg + " not found", inner);
}
}
}
}

@SuppressWarnings("unchecked")
private Set<Class<? extends Profiler>> parseProfilerArg(String profilerArg) {
Set<Class<? extends Profiler>> profilers = new HashSet<Class<? extends Profiler>>();
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/com/etsy/statsd/profiler/Profiler.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
* @author Andrew Johnson
*/
public abstract class Profiler {
public static final Class<?>[] CONSTRUCTOR_PARAM_TYPES = new Class<?>[]{Reporter.class, Arguments.class};

private Reporter reporter;

public Profiler(Reporter reporter, Arguments arguments) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package com.etsy.statsd.profiler.reporter;

import com.etsy.statsd.profiler.Arguments;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.Serie;

import java.util.concurrent.TimeUnit;

/**
* Reporter that sends data to InfluxDB
*
* @author Andrew Johnson
*/
public class InfluxDBReporter extends Reporter<InfluxDB> {
public static final String VALUE_COLUMN = "value";
public static final String USERNAME_ARG = "username";
public static final String PASSWORD_ARG = "password";
public static final String DATABASE_ARG = "database";

private String prefix;
private String username;
private String password;
private String database;

public InfluxDBReporter(String server, int port, String prefix, Arguments arguments) {
super(server, port, prefix, arguments);
this.prefix = prefix;
}

/**
* Record a gauge value in InfluxDB
*
* @param key The key for the gauge
* @param value The value of the gauge
*/
@Override
public void recordGaugeValue(String key, long value) {
Serie s = new Serie.Builder(String.format("%s.%s", prefix, key))
.columns(VALUE_COLUMN)
.values(value)
.build();
client.write(database, TimeUnit.MILLISECONDS, s);
}

/**
*
* @param server The server to which to report data
* @param port The port on which the server is running
* @param prefix The prefix for metrics
* @return An InfluxDB client
*/
@Override
protected InfluxDB createClient(String server, int port, String prefix) {
return InfluxDBFactory.connect(String.format("http://%s:%d", server, port), username, password);
}

/**
* Handle remaining arguments
*
* @param arguments The arguments given to the profiler agent
*/
@Override
protected void handleArguments(Arguments arguments) {
username = arguments.remainingArgs.get(USERNAME_ARG);
password = arguments.remainingArgs.get(PASSWORD_ARG);
database = arguments.remainingArgs.get(DATABASE_ARG);
}
}
35 changes: 33 additions & 2 deletions src/main/java/com/etsy/statsd/profiler/reporter/Reporter.java
Original file line number Diff line number Diff line change
@@ -1,16 +1,47 @@
package com.etsy.statsd.profiler.reporter;

import com.etsy.statsd.profiler.Arguments;

/**
* Interface for reporters
*
* @author Andrew Johnson
*/
public interface Reporter {
public abstract class Reporter<T> {
public static final Class<?>[] CONSTRUCTOR_PARAM_TYPES =new Class<?>[]{String.class, int.class, String.class, Arguments.class};

/**
* The underlying implementation for this reporter
*/
protected T client;

public Reporter(String server, int port, String prefix, Arguments arguments) {
handleArguments(arguments);
client = createClient(server, port, prefix);
}

/**
* Record a gauge value
*
* @param key The name of the gauge
* @param value The value of the gauge
*/
void recordGaugeValue(String key, long value);
public abstract void recordGaugeValue(String key, long value);

/**
* Construct the underlying client implementation for this reporter
*
* @param server The server to which to report data
* @param port The port on which the server is running
* @param prefix The prefix for metrics
* @return An instance of T, the client implementation
*/
protected abstract T createClient(String server, int port, String prefix);

/**
* Handle any additional arguments necessary for this reporter
*
* @param arguments The arguments given to the profiler agent
*/
protected abstract void handleArguments(Arguments arguments);
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.etsy.statsd.profiler.reporter;

import com.etsy.statsd.profiler.Arguments;
import com.timgroup.statsd.NonBlockingStatsDClient;
import com.timgroup.statsd.StatsDClient;

Expand All @@ -8,11 +9,9 @@
*
* @author Andrew Johnson
*/
public class StatsDReporter implements Reporter {
private StatsDClient client;

public StatsDReporter(String server, int port, String prefix) {
client = new NonBlockingStatsDClient(prefix, server, port);
public class StatsDReporter extends Reporter<StatsDClient> {
public StatsDReporter(String server, int port, String prefix, Arguments arguments) {
super(server, port, prefix, arguments);
}

/**
Expand All @@ -25,4 +24,25 @@ public StatsDReporter(String server, int port, String prefix) {
public void recordGaugeValue(String key, long value) {
client.recordGaugeValue(key, value);
}

/**
* Construct a StatsD client
*
* @param server The hostname of the StatsD server
* @param port The port on which StatsD is running
* @param prefix The prefix for all metrics sent
* @return A StatsD client
*/
@Override
protected StatsDClient createClient(String server, int port, String prefix) {
return new NonBlockingStatsDClient(prefix, server, port);
}

/**
* Handle additional arguments
*
* @param arguments The arguments given to the profiler agent
*/
@Override
protected void handleArguments(Arguments arguments) { }
}
Loading

0 comments on commit ad81cae

Please sign in to comment.