Skip to content

Commit

Permalink
Supports OpenSearch V2 through ES_* environment variables (#229)
Browse files Browse the repository at this point in the history
Add Zipkin Dependencies support of for OpenSearch storage (see please
openzipkin/zipkin#3765). Tested with Zipkin
3.4.0-SNAPSHOT locally:

---------

Signed-off-by: Andriy Redko <drreta@gmail.com>
  • Loading branch information
reta authored May 25, 2024
1 parent 8e8ea60 commit c2311f9
Show file tree
Hide file tree
Showing 19 changed files with 1,101 additions and 12 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ jobs:
- name: zipkin-dependencies-elasticsearch-v8
module: zipkin-dependencies-elasticsearch
groups: docker,elasticsearch8
- name: zipkin-dependencies-opensearch-v2
module: zipkin-dependencies-opensearch
groups: docker,opensearch2
- name: zipkin-dependencies-mysql
module: zipkin-dependencies-mysql
groups: docker
Expand Down
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ are supported, including Cassandra, MySQL and Elasticsearch.

* `STORAGE_TYPE=cassandra3` : requires Cassandra 3.11.3+; tested against the latest patch of 4.0
* `STORAGE_TYPE=mysql` : requires MySQL 5.6+; tested against MySQL 10.11
* `STORAGE_TYPE=elasticsearch` : requires Elasticsearch 7+; tested against last minor release of 7.x and 8.x
* `STORAGE_TYPE=elasticsearch` : requires Elasticsearch 7+ or OpenSearch 2.x; tested against last minor release of Elasticsearch 7.x and 8.x, OpenSearch 2.x

## Quick-start

Expand Down Expand Up @@ -92,20 +92,20 @@ $ STORAGE_TYPE=mysql MYSQL_USER=root java -jar zipkin-dependencies.jar
```

### Elasticsearch Storage
Elasticsearch is used when `STORAGE_TYPE=elasticsearch`. The schema is compatible with Zipkin's [Elasticsearch storage component](https://github.com/openzipkin/zipkin/tree/master/zipkin-storage/elasticsearch).
Elasticsearch/OpenSearch is used when `STORAGE_TYPE=elasticsearch`. The schema is compatible with Zipkin's [Elasticsearch storage component](https://github.com/openzipkin/zipkin/tree/master/zipkin-storage/elasticsearch).

* `ES_INDEX`: The index prefix to use when generating daily index names. Defaults to zipkin.
* `ES_DATE_SEPARATOR`: The separator used when generating dates in index.
Defaults to '-' so the queried index look like zipkin-yyyy-DD-mm
Could for example be changed to '.' to give zipkin-yyyy.MM.dd
* `ES_HOSTS`: A comma separated list of elasticsearch hosts advertising http. Defaults to
* `ES_HOSTS`: A comma separated list of Elasticsearch / OpenSearch hosts advertising http. Defaults to
localhost. Add port section if not listening on port 9200. Only one of these hosts
needs to be available to fetch the remaining nodes in the cluster. It is
recommended to set this to all the master nodes of the cluster. Use url format for
SSL. For example, "https://yourhost:8888"
* `ES_NODES_WAN_ONLY`: Set to true to only use the values set in ES_HOSTS, for example if your
elasticsearch cluster is in Docker. Defaults to false
* `ES_USERNAME` and `ES_PASSWORD`: Elasticsearch basic authentication. Use when X-Pack security
Elasticsearch / OpenSearch cluster is in Docker. Defaults to false
* `ES_USERNAME` and `ES_PASSWORD`: Elasticsearch / OpenSearch basic authentication. Use when X-Pack security
(formerly Shield) is in place. By default no username or
password is provided to elasticsearch.

Expand Down
7 changes: 7 additions & 0 deletions docker/examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@ $ STORAGE_TYPE=elasticsearch
$ docker-compose -f docker-compose.yml -f docker-compose-${STORAGE_TYPE}.yml up
```

The `elasticsearch` storage type is also compatible with OpenSearch,
you can start the example setup like that:

```
$ docker-compose -f docker-compose.yml -f docker-compose-opensearch.yml up
```

This starts zipkin, the corresponding storage and makes an example request.
After that, it runs the dependencies job on-demand.

Expand Down
40 changes: 40 additions & 0 deletions docker/examples/docker-compose-opensearch.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#
# Copyright The OpenZipkin Authors
# SPDX-License-Identifier: Apache-2.0
#

# This file uses the version 2 docker-compose file format, described here:
# https://docs.docker.com/compose/compose-file/#version-2
#
# It extends the default configuration from docker-compose.yml to run the
# zipkin-opensearch2 container instead of the zipkin-mysql container.

version: '2.4'

services:
storage:
image: ghcr.io/openzipkin/zipkin-opensearch2:${TAG:-latest}
container_name: opensearch
# Uncomment to expose the storage port for testing
# ports:
# - 9200:9200

# Use OpenSearch instead of in-memory storage
zipkin:
extends:
file: docker-compose.yml
service: zipkin
environment:
- STORAGE_TYPE=elasticsearch
# Point the zipkin at the storage backend
- ES_HOSTS=opensearch:9200
# Uncomment to see requests to and from elasticsearch
# - ES_HTTP_LOGGING=BODY

dependencies:
extends:
file: docker-compose.yml
service: dependencies
environment:
- STORAGE_TYPE=elasticsearch
- ES_HOSTS=opensearch
13 changes: 13 additions & 0 deletions main/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,19 @@
<artifactId>zipkin-dependencies-elasticsearch</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>zipkin-dependencies-opensearch</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.linecorp.armeria</groupId>
<artifactId>armeria-junit5</artifactId>
<version>${armeria.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
25 changes: 18 additions & 7 deletions main/src/main/java/zipkin2/dependencies/ZipkinDependenciesJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.util.LinkedHashMap;
import java.util.TimeZone;
import zipkin2.dependencies.elasticsearch.ElasticsearchDependenciesJob;
import zipkin2.dependencies.opensearch.OpensearchDependenciesJob;
import zipkin2.dependencies.mysql.MySQLDependenciesJob;

public final class ZipkinDependenciesJob {
Expand Down Expand Up @@ -61,13 +62,23 @@ public static void main(String[] args) throws UnsupportedEncodingException {
.run();
break;
case "elasticsearch":
ElasticsearchDependenciesJob.builder()
.logInitializer(logInitializer)
.jars(jarPath)
.day(day)
.conf(sparkConf)
.build()
.run();
if (ZipkinElasticsearchStorage.flavor().equalsIgnoreCase("elasticsearch")) {
ElasticsearchDependenciesJob.builder()
.logInitializer(logInitializer)
.jars(jarPath)
.day(day)
.conf(sparkConf)
.build()
.run();
} else { // "opensearch"
OpensearchDependenciesJob.builder()
.logInitializer(logInitializer)
.jars(jarPath)
.day(day)
.conf(sparkConf)
.build()
.run();
}
break;
default:
throw new UnsupportedOperationException("Unsupported STORAGE_TYPE: " + storageType + "\n"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* Copyright The OpenZipkin Authors
* SPDX-License-Identifier: Apache-2.0
*/

package zipkin2.dependencies;

import java.io.IOException;
import java.net.Authenticator;
import java.net.PasswordAuthentication;
import java.net.Socket;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandlers;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509ExtendedTrustManager;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class ZipkinElasticsearchStorage {
private static final Logger LOG = LoggerFactory.getLogger(ZipkinElasticsearchStorage.class);
private static final Pattern DISTRIBUTION = Pattern.compile("\"distribution\"\s*[:]\s*\"([^\"]+)\"");

static final String HOSTS = getEnv("ES_HOSTS", "127.0.0.1");
static final String USERNAME = getEnv("ES_USERNAME", null);
static final String PASSWORD = getEnv("ES_PASSWORD", null);

static TrustManager[] TRUST_ALL = new TrustManager [] {
new X509ExtendedTrustManager() {
@Override
public X509Certificate[] getAcceptedIssuers() {
return null;
}

@Override
public void checkClientTrusted(X509Certificate[] certs, String authType) {
}

@Override
public void checkServerTrusted(X509Certificate[] certs, String authType) {
}

@Override
public void checkServerTrusted(X509Certificate[] chain, String authType, SSLEngine engine) throws CertificateException {
}

@Override
public void checkServerTrusted(X509Certificate[] chain, String authType, Socket socket) throws CertificateException {
}

@Override
public void checkClientTrusted(X509Certificate[] chain, String authType, SSLEngine engine) throws CertificateException {
}

@Override
public void checkClientTrusted(X509Certificate[] chain, String authType, Socket socket) throws CertificateException {
}
}
};

static String flavor() {
return flavor(HOSTS, USERNAME, PASSWORD);
}

static String flavor(String hosts, String username, String password) {
final HttpClient.Builder builder = HttpClient
.newBuilder()
.connectTimeout(Duration.ofSeconds(5));

if (username != null && password != null) {
builder.authenticator(new Authenticator() {
@Override
protected PasswordAuthentication getPasswordAuthentication() {
return new PasswordAuthentication(username, password.toCharArray());
}
});
}

try {
final SSLContext sslContext = SSLContext.getInstance("TLS");
sslContext.init(null, TRUST_ALL, new SecureRandom());

final HttpClient client = builder.sslContext(sslContext).build();
try {
for (String host: parseHosts(hosts)) {
final HttpRequest request = HttpRequest.newBuilder().GET().uri(URI.create(host)).build();
try {
final HttpResponse<String> response = client.send(request, BodyHandlers.ofString());
final Matcher matcher = DISTRIBUTION.matcher(response.body());
if (matcher.find()) {
return matcher.group(1).toLowerCase();
}
} catch (InterruptedException | IOException ex) {
LOG.warn("Unable issue HTTP GET request to '" + host + "'", ex);
}
}
} finally {
if (client instanceof AutoCloseable) {
try {
// Since JDK-21, the HttpClient is AutoCloseable
((AutoCloseable) client).close();
} catch (Exception ex) {
/* Ignore */
}
}
}
} catch (final NoSuchAlgorithmException | KeyManagementException ex) {
LOG.warn("Unable to configure HttpClient", ex);
}

return "elasticsearch";
}

private static String getEnv(String key, String defaultValue) {
String result = System.getenv(key);
return result != null && !result.isEmpty() ? result : defaultValue;
}

static String[] parseHosts(String hosts) {
final String[] hostParts = hosts.split(",", -1);

// Detect default scheme to use if not specified
String defaultScheme = "http";
for (int i = 0; i < hostParts.length; i++) {
String host = hostParts[i];
if (host.startsWith("https")) {
defaultScheme = "https";
break;
}
}

Collection<String> list = new ArrayList<>();
for (int i = 0; i < hostParts.length; i++) {
String host = hostParts[i];
URI httpUri = host.startsWith("http") ? URI.create(host) : URI.create(defaultScheme + "://" + host);

int port = httpUri.getPort();
if (port == -1) {
port = 9200; /* default Elasticsearch / OpenSearch port */
}

list.add(httpUri.getScheme() + "://" + httpUri.getHost() + ":" + port);
}

return list.toArray(new String[0]);
}
}
Loading

0 comments on commit c2311f9

Please sign in to comment.