Skip to content

Commit

Permalink
Cleanup the examples a bit and update the query example with
Browse files Browse the repository at this point in the history
filters from 2.2.

Signed-off-by: Chris Larsen <clarsen@yahoo-inc.com>
  • Loading branch information
manolama committed Dec 21, 2016
1 parent 039bc41 commit 8993ccf
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 89 deletions.
97 changes: 32 additions & 65 deletions src/examples/AddDataExample.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

import com.stumbleupon.async.Callback;
import com.stumbleupon.async.Deferred;
Expand All @@ -33,36 +32,39 @@
public class AddDataExample {
private static String pathToConfigFile;

public static void processArgs(String[] args) {
// Set these as arguments so you don't have to keep path information in
public static void processArgs(final String[] args) {
// Set these as arguments so you don't have to keep path information in
// source files
if (args == null) {
System.err.println("First (and only) argument must be the full path to the opentsdb.conf file. (e.g. /User/thisUser/opentsdb/src/opentsdb.conf");
} else {
if (args != null && args.length > 0) {
pathToConfigFile = args[0];
}
}

public static void main(String[] args) throws Exception {



public static void main(final String[] args) throws Exception {
processArgs(args);

// Create a config object with a path to the file for parsing. Or manually
// override settings.
// e.g. config.overrideConfig("tsd.storage.hbase.zk_quorum", "localhost");
Config config = new Config(pathToConfigFile);
final Config config;
if (pathToConfigFile != null && !pathToConfigFile.isEmpty()) {
config = new Config(pathToConfigFile);
} else {
// Search for a default config from /etc/opentsdb/opentsdb.conf, etc.
config = new Config(true);
}
final TSDB tsdb = new TSDB(config);

// Declare new metric
String metricName = "dummyFromjavaAPI";
String metricName = "my.tsdb.test.metric";
// First check to see it doesn't already exist
byte[] byteMetricUID; // we don't actually need this for the first
// .addPoint() call below.
// TODO: Ideally we could just call a not-yet-implemented tsdb.uIdExists()
// function.
// Note, however, that this is optional. If autometric is enabled, the UID will be assigned in call to addPoint().
// Note, however, that this is optional. If auto metric is enabled
// (tsd.core.auto_create_metrics), the UID will be assigned in call to
// addPoint().
try {
byteMetricUID = tsdb.getUID(UniqueIdType.METRIC, metricName);
} catch (IllegalArgumentException iae) {
Expand All @@ -75,100 +77,65 @@ public static void main(String[] args) throws Exception {
}

// Make a single datum
long timestamp = System.currentTimeMillis();
long timestamp = System.currentTimeMillis() / 1000;
long value = 314159;
// Make key-val
Map<String, String> tags = new HashMap<String, String>(1);
tags.put("dummy-key", "dummy-val1");



tags.put("script", "example1");

// Start timer
long startTime1 = System.currentTimeMillis();

// Write a number of data points at 30 second intervals. Each write will
// return a deferred (similar to a Java Future or JS Promise) that will
// be called on completion with either a "null" value on success or an
// exception.
int n = 100;
ArrayList<Deferred<Object>> deferreds = new ArrayList<Deferred<Object>>(n);
for (int i = 0; i < n; i++) {
Deferred<Object> deferred = tsdb.addPoint(metricName, timestamp, value + i, tags);
deferreds.add(deferred);

timestamp += 30;
}

// Add the callbacks to the deferred object. (They might have already
// returned, btw)
// This will cause the calling thread to wait until the add has completed.

System.out.println("Waiting for deferred result to return...");
Deferred.groupInOrder(deferreds)
.addErrback(new AddDataExample().new errBack())
.addCallback(new AddDataExample().new succBack())
// Block the thread until the deferred returns it's result.
.join();

// Block the thread until the deferred returns it's result.
// deferred.join();
// Alternatively you can add another callback here or use a join with a
// timeout argument.

// End timer.
long elapsedTime1 = System.currentTimeMillis() - startTime1;
System.out.println("\nAdding " + n + " points took: " + elapsedTime1
+ " milliseconds.\n");


// Gracefully shutdown connection to TSDB
tsdb.shutdown();


// Gracefully shutdown connection to TSDB. This is CRITICAL as it will
// flush any pending operations to HBase.
tsdb.shutdown().join();
}

// This is an optional errorback to handle when there is a failure.
class errBack implements Callback<String, Exception> {
public String call(final Exception e) throws Exception {
String message = ">>>>>>>>>>>Failure!>>>>>>>>>>>";
System.err.println(message);
System.err.println(message + " " + e.getMessage());
e.printStackTrace();
return message;
}
};

// This is an optional success callback to handle when there is a success.
class succBack implements Callback<Object, ArrayList<Object>> {
public Object call(ArrayList<Object> results) {
for (Object res : results) {
if (res != null) {
if (res.toString().equals("MultiActionSuccess")) {
System.err.println(">>>>>>>>>>>Success!>>>>>>>>>>>");
}
} else {
System.err.println(">>>>>>>>>>>" + res.getClass() + ">>>>>>>>>>>");
}
}
public Object call(final ArrayList<Object> results) {
System.out.println("Successfully wrote " + results.size() + " data points");
return null;
}
};

public static long[] makeRandomValues(int num, Random rand) {
long[] values = new long[num];
for (int i = 0; i < num; i++) {
// define datum
values[i] = rand.nextInt();
}
return values;
}

public static long[] makeTimestamps(int num, Random rand) {

long[] timestamps = new long[num];
for (int i = 0; i < num; i++) {
// ensures that we won't try to put two data points in the same
// millisecond
try {
Thread.sleep(1); // in millis
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
// define datum
timestamps[i] = System.currentTimeMillis();
}
return timestamps;
}

}
80 changes: 56 additions & 24 deletions src/examples/QueryExample.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// This file is part of OpenTSDB.
// Copyright (C) 2010-2012 The OpenTSDB Authors.
// Copyright (C) 2015 The OpenTSDB Authors.
//
// This program is free software: you can redistribute it and/or modify it
// under the terms of the GNU Lesser General Public License as published by
Expand All @@ -14,7 +14,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.stumbleupon.async.Callback;
Expand All @@ -27,32 +27,41 @@
import net.opentsdb.core.TSDB;
import net.opentsdb.core.TSQuery;
import net.opentsdb.core.TSSubQuery;
import net.opentsdb.query.filter.TagVFilter;
import net.opentsdb.utils.Config;
import net.opentsdb.utils.DateTime;

/**
* One example on how to query.
* Taken from <a href="https://groups.google.com/forum/#!searchin/opentsdb/java$20api/opentsdb/6MKs-FkSLoA/gifHF327CIAJ">this thread</a>
* Taken from
* <a href="https://groups.google.com/forum/#!searchin/opentsdb/java$20api/opentsdb/6MKs-FkSLoA/gifHF327CIAJ">
* this thread</a>
* The metric and key query arguments assume that you've input data from the Quick Start tutorial
* <a href="http://opentsdb.net/docs/build/html/user_guide/quickstart.html">here.</a>
*/
public class QueryExample {

public static void main(String[] args) throws IOException {
public static void main(final String[] args) throws IOException {

// Set these as arguments so you don't have to keep path information in
// source files
String pathToConfigFile = args[0]; // e.g. "/User/thisUser/opentsdb/src/opentsdb.conf"
String hostValue = args[1]; // e.g. "myComputerName"
String pathToConfigFile = (args != null && args.length > 0 ? args[0] : null);

// Create a config object with a path to the file for parsing. Or manually
// override settings.
// e.g. config.overrideConfig("tsd.storage.hbase.zk_quorum", "localhost");
Config config;
config = new Config(pathToConfigFile);
final Config config;
if (pathToConfigFile != null && !pathToConfigFile.isEmpty()) {
config = new Config(pathToConfigFile);
} else {
// Search for a default config from /etc/opentsdb/opentsdb.conf, etc.
config = new Config(true);
}
final TSDB tsdb = new TSDB(config);

// main query
final TSQuery query = new TSQuery();

// use any string format from
// http://opentsdb.net/docs/build/html/user_guide/query/dates.html
query.setStart("1h-ago");
Expand All @@ -61,13 +70,18 @@ public static void main(String[] args) throws IOException {
// at least one sub query required. This is where you specify the metric and
// tags
final TSSubQuery subQuery = new TSSubQuery();
subQuery.setMetric("proc.loadavg.1m");

// tags are optional but you can create and populate a map
final HashMap<String, String> tags = new HashMap<String, String>(1);
tags.put("host", hostValue);
subQuery.setTags(tags);

subQuery.setMetric("my.tsdb.test.metric");

// filters are optional but useful.
final List<TagVFilter> filters = new ArrayList<TagVFilter>(1);
filters.add(new TagVFilter.Builder()
.setType("literal_or")
.setFilter("example1")
.setTagk("script")
.setGroupBy(true)
.build());
subQuery.setFilters(filters);

// you do have to set an aggregator. Just provide the name as a string
subQuery.setAggregator("sum");

Expand All @@ -88,8 +102,8 @@ public static void main(String[] args) throws IOException {
final int nqueries = tsdbqueries.length;
final ArrayList<DataPoints[]> results = new ArrayList<DataPoints[]>(
nqueries);
final ArrayList<Deferred<DataPoints[]>> deferreds = new ArrayList<Deferred<DataPoints[]>>(
nqueries);
final ArrayList<Deferred<DataPoints[]>> deferreds =
new ArrayList<Deferred<DataPoints[]>>(nqueries);

// this executes each of the sub queries asynchronously and puts the
// deferred in an array so we can wait for them to complete.
Expand All @@ -98,7 +112,7 @@ public static void main(String[] args) throws IOException {
}

// Start timer
long startTime = System.nanoTime();
long startTime = DateTime.nanoTime();

// This is a required callback class to store the results after each
// query has finished
Expand All @@ -109,18 +123,30 @@ public Object call(final ArrayList<DataPoints[]> queryResults)
return null;
}
}

// Make sure to handle any errors that might crop up
class QueriesEB implements Callback<Object, Exception> {
@Override
public Object call(final Exception e) throws Exception {
System.err.println("Queries failed");
e.printStackTrace();
return null;
}
}

// this will cause the calling thread to wait until ALL of the queries
// have completed.
try {
Deferred.groupInOrder(deferreds).addCallback(new QueriesCB())
.joinUninterruptibly();
Deferred.groupInOrder(deferreds)
.addCallback(new QueriesCB())
.addErrback(new QueriesEB())
.join();
} catch (Exception e) {
e.printStackTrace();
}

// End timer.
long elapsedTime = (System.nanoTime() - startTime) / (1000*1000);
double elapsedTime = DateTime.msFromNanoDiff(DateTime.nanoTime(), startTime);
System.out.println("Query returned in: " + elapsedTime + " milliseconds.");

// now all of the results are in so we just iterate over each set of
Expand Down Expand Up @@ -156,11 +182,17 @@ public Object call(final ArrayList<DataPoints[]> queryResults)
+ (dp.isInteger() ? dp.longValue() : dp.doubleValue()));
}
System.out.println("");

// Gracefully shutdown connection to TSDB
tsdb.shutdown();
}
}

// Gracefully shutdown connection to TSDB
try {
tsdb.shutdown().join();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}

}

0 comments on commit 8993ccf

Please sign in to comment.