Skip to content

Commit

Permalink
Merge pull request #22 from etsy/influxdb09
Browse files Browse the repository at this point in the history
Fixes #16: Support InfluxDB 0.9
  • Loading branch information
ajsquared committed Jul 19, 2015
2 parents 2e88a48 + d7eff9f commit c44ee20
Show file tree
Hide file tree
Showing 22 changed files with 702 additions and 372 deletions.
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,16 @@ Name | Meaning
username | The username with which to connect to InfluxDB (required)
password | The password with which to connect to InfluxDB (required)
database | The database to which to write metrics (required)
tagMapping | A mapping of tag names from the metric prefix (optional, defaults to no mapping)

##### Tag Mapping
InfluxDB 0.9 supports tagging measurements and querying based on those tags. statsd-jvm-profilers uses these tags to support richer querying of the produced data. For compatibility with other metric backends, the tags are extracted from the metric prefix.

If the `tagMapping` argument is not defined, only the `prefix` tag will be added, with the value of the entire prefix.

`tagMapping` should be a period-delimited set of tag names. It must have the same number of components as `prefix`, or else an exception would be thrown. Each component of `tagMapping` is the name of the tag. The component in the corresponding position of `prefix` will be the value.

If you do not want to include a component of `prefix` as a tag, use the special name `SKIP` in `tagMapping` for that position.

## Metrics

Expand Down
5 changes: 3 additions & 2 deletions example/StatsDProfilerFlowListener.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import scala.collection.JavaConversions._
* Flow listener for setting up JobConf to enable statsd-jvm-profiler
*/
class StatsDProfilerFlowListener extends FlowListener {
val baseParamsFormat = "-javaagent:%s=server=%s,port=%s,prefix=bigdata.profiler.%s.%s.%s.%%s.%%s,packageWhitelist=%s,packageBlacklist=%s,username=%s,password=%s,database=%s,reporter=%s"
val baseParamsFormat = "-javaagent:%s=server=%s,port=%s,prefix=bigdata.profiler.%s.%s.%s.%%s.%%s,packageWhitelist=%s,packageBlacklist=%s,username=%s,password=%s,database=%s,reporter=%s,tagMapping=%s"

override def onStarting(flow: Flow[_]): Unit = {
val profilerProps = loadProperties("statsd-jvm-profiler.properties")
Expand All @@ -29,8 +29,9 @@ class StatsDProfilerFlowListener extends FlowListener {
val influxdbPassword = profilerProps.getProperty("influxdb.password")
val influxdbDatabase = profilerProps.getProperty("influxdb.database")
val dashboardUrl = profilerProps.getProperty("dashboard.url")
val tagMapping = profilerProps.getProperty("tagMapping")

val baseParams = baseParamsFormat.format(jarPath, host, port, userName, jobName, flowId, packageWhiteList, packageBlacklist, influxdbUser, influxdbPassword, influxdbDatabase, reporter)
val baseParams = baseParamsFormat.format(jarPath, host, port, userName, jobName, flowId, packageWhiteList, packageBlacklist, influxdbUser, influxdbPassword, influxdbDatabase, reporter, tagMapping)

flow.getFlowSteps.toList foreach { fs: FlowStep[_] =>
val stepNum = fs.getStepNum.toString
Expand Down
1 change: 1 addition & 0 deletions example/statsd-jvm-profiler.properties
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ influxdb.user=profiler
influxdb.password=password
influxdb.database=profiler
reporter=InfluxDBReporter
tagMapping=SKIP.SKIP.username.job.flow.stage.phase
package.blacklist=java.util.zip.ZipFile.getEntry:java.lang.ClassLoader.defineClass1
package.whitelist=com.etsy:com.twitter.scalding:cascading
dashboard.url=dashboard.host:3888/render?user=%%{user}&job=%%{job_name}&run=%%{flow_id}&stage=%%{stage}&phase=%s
11 changes: 9 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,13 @@
</snapshotRepository>
</distributionManagement>

<repositories>
<repository>
<id>jitpack.io</id>
<url>https://jitpack.io</url>
</repository>
</repositories>

<dependencies>
<dependency>
<groupId>com.timgroup</groupId>
Expand All @@ -55,9 +62,9 @@
<version>18.0</version>
</dependency>
<dependency>
<groupId>org.influxdb</groupId>
<groupId>com.github.influxdb</groupId>
<artifactId>influxdb-java</artifactId>
<version>1.5</version>
<version>influxdb-java-2.0</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
Expand Down
13 changes: 12 additions & 1 deletion src/main/java/com/etsy/statsd/profiler/Profiler.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.etsy.statsd.profiler.reporter.Reporter;
import com.google.common.base.Preconditions;

import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
Expand All @@ -13,7 +14,7 @@
public abstract class Profiler {
public static final Class<?>[] CONSTRUCTOR_PARAM_TYPES = new Class<?>[]{Reporter.class, Arguments.class};

private Reporter reporter;
private Reporter<?> reporter;

public Profiler(Reporter reporter, Arguments arguments) {
Preconditions.checkNotNull(reporter);
Expand Down Expand Up @@ -61,4 +62,14 @@ public Profiler(Reporter reporter, Arguments arguments) {
protected void recordGaugeValue(String key, long value) {
reporter.recordGaugeValue(key, value);
}

/**
* Record multiple gauge values
* This is useful for reporters that can send points in batch
*
* @param gauges A map of gauge names to values
*/
protected void recordGaugeValues(Map<String, Long> gauges) {
reporter.recordGaugeValues(gauges);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

import java.lang.management.ThreadInfo;
import java.util.*;
Expand Down Expand Up @@ -111,9 +112,11 @@ private List<String> parsePackageList(String packages) {
* @param flushAll Indicate if all data should be flushed
*/
private void recordMethodCounts(boolean flushAll) {
Map<String, Long> metrics = Maps.newHashMap();
for (Map.Entry<String, Long> entry : traces.getDataToFlush(flushAll).entrySet()) {
recordGaugeValue("cpu.trace." + entry.getKey(), entry.getValue());
metrics.put("cpu.trace." + entry.getKey(), entry.getValue());
}
recordGaugeValues(metrics);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
import com.etsy.statsd.profiler.Arguments;
import com.etsy.statsd.profiler.Profiler;
import com.etsy.statsd.profiler.reporter.Reporter;
import com.google.common.collect.Maps;

import java.lang.management.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

Expand Down Expand Up @@ -64,43 +66,46 @@ protected void handleArguments(Arguments arguments) { /* No arguments needed */
* Records all memory statistics
*/
private void recordStats() {
int finalizationPendingCount = memoryMXBean.getObjectPendingFinalizationCount();
long finalizationPendingCount = memoryMXBean.getObjectPendingFinalizationCount();
MemoryUsage heap = memoryMXBean.getHeapMemoryUsage();
MemoryUsage nonHeap = memoryMXBean.getNonHeapMemoryUsage();
Map<String, Long> metrics = Maps.newHashMap();

recordGaugeValue("pending-finalization-count", finalizationPendingCount);
recordMemoryUsage("heap.total", heap);
recordMemoryUsage("nonheap.total", nonHeap);
metrics.put("pending-finalization-count", finalizationPendingCount);
recordMemoryUsage("heap.total", heap, metrics);
recordMemoryUsage("nonheap.total", nonHeap, metrics);

for (GarbageCollectorMXBean gcMXBean : gcMXBeans) {
recordGaugeValue("gc." + gcMXBean.getName() + ".count", gcMXBean.getCollectionCount());
metrics.put("gc." + gcMXBean.getName() + ".count", gcMXBean.getCollectionCount());

final long time = gcMXBean.getCollectionTime();
final long prevTime = gcTimes.get(gcMXBean).get();
final long runtime = time - prevTime;

recordGaugeValue("gc." + gcMXBean.getName() + ".time", time);
recordGaugeValue("gc." + gcMXBean.getName() + ".runtime", runtime);
metrics.put("gc." + gcMXBean.getName() + ".time", time);
metrics.put("gc." + gcMXBean.getName() + ".runtime", runtime);

if (runtime > 0) gcTimes.get(gcMXBean).set(time);
}

int loadedClassCount = classLoadingMXBean.getLoadedClassCount();
long loadedClassCount = classLoadingMXBean.getLoadedClassCount();
long totalLoadedClassCount = classLoadingMXBean.getTotalLoadedClassCount();
long unloadedClassCount = classLoadingMXBean.getUnloadedClassCount();

recordGaugeValue("loaded-class-count", loadedClassCount);
recordGaugeValue("total-loaded-class-count", totalLoadedClassCount);
recordGaugeValue("unloaded-class-count", unloadedClassCount);
metrics.put("loaded-class-count", loadedClassCount);
metrics.put("total-loaded-class-count", totalLoadedClassCount);
metrics.put("unloaded-class-count", unloadedClassCount);

for (MemoryPoolMXBean memoryPoolMXBean: memoryPoolMXBeans) {
String type = poolTypeToMetricName(memoryPoolMXBean.getType());
String name = poolNameToMetricName(memoryPoolMXBean.getName());
String prefix = type + '.' + name;
MemoryUsage usage = memoryPoolMXBean.getUsage();

recordMemoryUsage(prefix, usage);
recordMemoryUsage(prefix, usage, metrics);
}

recordGaugeValues(metrics);
}

/**
Expand All @@ -109,11 +114,11 @@ private void recordStats() {
* @param prefix The prefix to use for this object
* @param memory The MemoryUsage object containing the memory usage info
*/
private void recordMemoryUsage(String prefix, MemoryUsage memory) {
recordGaugeValue(prefix + ".init", memory.getInit());
recordGaugeValue(prefix + ".used", memory.getUsed());
recordGaugeValue(prefix + ".committed", memory.getCommitted());
recordGaugeValue(prefix + ".max", memory.getMax());
private void recordMemoryUsage(String prefix, MemoryUsage memory, Map<String, Long> metrics) {
metrics.put(prefix + ".init", memory.getInit());
metrics.put(prefix + ".used", memory.getUsed());
metrics.put(prefix + ".committed", memory.getCommitted());
metrics.put(prefix + ".max", memory.getMax());
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
package com.etsy.statsd.profiler.reporter;

import com.etsy.statsd.profiler.Arguments;
import com.etsy.statsd.profiler.util.TagUtil;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.Serie;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;

import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
Expand All @@ -18,15 +24,21 @@ public class InfluxDBReporter extends Reporter<InfluxDB> {
public static final String USERNAME_ARG = "username";
public static final String PASSWORD_ARG = "password";
public static final String DATABASE_ARG = "database";
public static final String TAG_MAPPING_ARG = "tagMapping";

private String prefix;
private String username;
private String password;
private String database;
private String tagMapping;
private Map<String, String> tags;

public InfluxDBReporter(Arguments arguments) {
super(arguments);
this.prefix = arguments.metricsPrefix;
// If we have a tag mapping it must match the number of components of the prefix
Preconditions.checkArgument(tagMapping == null || tagMapping.split("\\.").length == prefix.split("\\.").length);
tags = TagUtil.getTags(tagMapping, prefix);
}

/**
Expand All @@ -37,11 +49,24 @@ public InfluxDBReporter(Arguments arguments) {
*/
@Override
public void recordGaugeValue(String key, long value) {
Serie s = new Serie.Builder(String.format("%s.%s", prefix, key))
.columns(VALUE_COLUMN)
.values(value)
Map<String, Long> gauges = ImmutableMap.of(key, value);
recordGaugeValues(gauges);
}

/**
* Record multiple gauge values in InfluxDB
*
* @param gauges A map of gauge names to values
*/
@Override
public void recordGaugeValues(Map<String, Long> gauges) {
long time = System.currentTimeMillis();
BatchPoints batchPoints = BatchPoints.database(database)
.build();
client.write(database, TimeUnit.MILLISECONDS, s);
for (Map.Entry<String, Long> gauge: gauges.entrySet()) {
batchPoints.point(constructPoint(time, gauge.getKey(), gauge.getValue()));
}
client.write(batchPoints);
}

/**
Expand All @@ -66,9 +91,21 @@ protected void handleArguments(Arguments arguments) {
username = arguments.remainingArgs.get(USERNAME_ARG);
password = arguments.remainingArgs.get(PASSWORD_ARG);
database = arguments.remainingArgs.get(DATABASE_ARG);
tagMapping = arguments.remainingArgs.get(TAG_MAPPING_ARG);

Preconditions.checkNotNull(username);
Preconditions.checkNotNull(password);
Preconditions.checkNotNull(database);
}

private Point constructPoint(long time, String key, long value) {
Point.Builder builder = Point.measurement(key)
.time(time, TimeUnit.MILLISECONDS)
.field(VALUE_COLUMN, value);
for (Map.Entry<String, String> entry : tags.entrySet()) {
builder = builder.tag(entry.getKey(), entry.getValue());
}

return builder.build();
}
}
10 changes: 10 additions & 0 deletions src/main/java/com/etsy/statsd/profiler/reporter/Reporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import com.etsy.statsd.profiler.Arguments;
import com.google.common.base.Preconditions;

import java.util.Map;

/**
* Interface for reporters
*
Expand Down Expand Up @@ -30,6 +32,14 @@ public Reporter(Arguments arguments) {
*/
public abstract void recordGaugeValue(String key, long value);

/**
* Record multiple gauge values
* This is useful for reporters that can send points in batch
*
* @param gauges A map of gauge names to values
*/
public abstract void recordGaugeValues(Map<String, Long> gauges);

/**
* Construct the underlying client implementation for this reporter
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import com.timgroup.statsd.NonBlockingStatsDClient;
import com.timgroup.statsd.StatsDClient;

import java.util.Map;

/**
* Reporter that sends data to StatsD
*
Expand All @@ -25,6 +27,19 @@ public void recordGaugeValue(String key, long value) {
client.recordGaugeValue(key, value);
}

/**
* Record multiple gauge values in StatsD
* This simply loops over calling recordGaugeValue
*
* @param gauges A map of gauge names to values
*/
@Override
public void recordGaugeValues(Map<String, Long> gauges) {
for (Map.Entry<String, Long> gauge : gauges.entrySet()) {
recordGaugeValue(gauge.getKey(), gauge.getValue());
}
}

/**
* Construct a StatsD client
*
Expand Down
Loading

0 comments on commit c44ee20

Please sign in to comment.