Skip to content

Commit

Permalink
Merge pull request #120 from NipunaMadhushan/java21
Browse files Browse the repository at this point in the history
Add metrics logs observer
  • Loading branch information
NipunaMadhushan authored Dec 20, 2024
2 parents be9da2c + bd58117 commit 3c54658
Show file tree
Hide file tree
Showing 7 changed files with 369 additions and 4 deletions.
12 changes: 12 additions & 0 deletions ballerina/init.bal
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ function init() {
externPrintError("failed to enable tracing");
}
}

if (observe:isMetricsLogsEnabled()) {
var err = externEnableMetricsLogging();
if (err is error) {
externPrintError("failed to enable tracing");
}
}
}

function externEnableMetrics(string provider) returns error? = @java:Method {
Expand All @@ -43,6 +50,11 @@ function externEnableTracing(string provider) returns error? = @java:Method {
name: "enableTracing"
} external;

function externEnableMetricsLogging() returns error? = @java:Method {
'class: "io.ballerina.stdlib.observe.internal.NativeFunctions",
name: "enableMetricsLogging"
} external;

function externPrintError(string message) = @java:Method {
'class: "io.ballerina.stdlib.observe.internal.NativeFunctions",
name: "printError"
Expand Down
4 changes: 2 additions & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ org.gradle.caching=true
group=io.ballerina
version=1.4.0-SNAPSHOT

ballerinaLangVersion=2201.11.0-20241218-101200-109f6cc7
ballerinaLangVersion=2201.11.0-20241219-143200-53f6d83e
githubSpotbugsVersion=6.0.18
githubJohnrengelmanShadowVersion=8.1.1
underCouchDownloadVersion=5.4.0
Expand All @@ -28,7 +28,7 @@ openTelemetryVersion=1.7.0
nettyCodecVersion=4.1.100.Final
gsonVersion=2.10.1

observeVersion=1.4.0-20241218-111700-4d29d40
observeVersion=1.4.0-20241220-100300-6b4cf7a

# Test Dependency Versions
testngVersion=7.6.1
Expand Down
7 changes: 7 additions & 0 deletions native/spotbugs-exclude.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<FindBugsFilter>
<Match>
<Class name="io.ballerina.stdlib.observe.observers.BallerinaMetricsLogsObserver"/>
<Bug pattern="ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD"/>
</Match>
</FindBugsFilter>
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,20 @@
*/
package io.ballerina.stdlib.observe.internal;

import io.ballerina.runtime.api.Environment;
import io.ballerina.runtime.api.values.BError;
import io.ballerina.runtime.api.values.BString;
import io.ballerina.runtime.observability.ObserveUtils;
import io.ballerina.runtime.observability.metrics.BallerinaMetricsObserver;
import io.ballerina.runtime.observability.metrics.DefaultMetricRegistry;
import io.ballerina.runtime.observability.metrics.MetricRegistry;
import io.ballerina.runtime.observability.metrics.noop.NoOpMetricProvider;
import io.ballerina.runtime.observability.metrics.spi.MetricProvider;
import io.ballerina.runtime.observability.tracer.BallerinaTracingObserver;
import io.ballerina.runtime.observability.tracer.TracersStore;
import io.ballerina.runtime.observability.tracer.noop.NoOpTracerProvider;
import io.ballerina.runtime.observability.tracer.spi.TracerProvider;
import io.ballerina.stdlib.observe.observers.BallerinaMetricsLogsObserver;
import io.ballerina.stdlib.observe.observers.BallerinaMetricsObserver;
import io.ballerina.stdlib.observe.observers.BallerinaTracingObserver;

import java.io.PrintStream;
import java.util.ServiceLoader;
Expand Down Expand Up @@ -94,6 +96,15 @@ public static BError enableTracing(BString providerName) {
}
}

public static BError enableMetricsLogging(Environment env) {
try {
ObserveUtils.addObserver(new BallerinaMetricsLogsObserver(env));
return null;
} catch (BError e) {
return e;
}
}

public static void printError(BString message) {
errStream.println("error: " + message.getValue());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/*
* Copyright (c) 2024, WSO2 LLC. (https://www.wso2.com) All Rights Reserved.
*
* WSO2 LLC. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.ballerina.stdlib.observe.observers;

import io.ballerina.runtime.api.Environment;
import io.ballerina.runtime.api.Module;
import io.ballerina.runtime.api.creators.ValueCreator;
import io.ballerina.runtime.api.utils.StringUtils;
import io.ballerina.runtime.api.values.BMap;
import io.ballerina.runtime.api.values.BString;
import io.ballerina.runtime.observability.BallerinaObserver;
import io.ballerina.runtime.observability.ObserverContext;
import io.ballerina.runtime.observability.metrics.Tag;

import java.io.PrintStream;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import static io.ballerina.runtime.observability.ObservabilityConstants.PROPERTY_KEY_HTTP_STATUS_CODE;
import static io.ballerina.runtime.observability.ObservabilityConstants.STATUS_CODE_GROUP_SUFFIX;
import static io.ballerina.runtime.observability.ObservabilityConstants.TAG_KEY_HTTP_STATUS_CODE_GROUP;

public class BallerinaMetricsLogsObserver implements BallerinaObserver {
private static final String ORG_NAME = "ballerinax";
private static final String MODULE_NAME = "metrics.logs";
private static final String METRIC_LOG_FUNCTION_NAME = "printMetricsLog";
private static final String PROPERTY_START_TIME = "_observation_start_time_";
private static final PrintStream consoleError = System.err;

private static Environment environment;

public BallerinaMetricsLogsObserver(Environment environment) {
BallerinaMetricsLogsObserver.environment = environment;
}

@Override
public void startServerObservation(ObserverContext observerContext) {
startObservation(observerContext);
}

@Override
public void startClientObservation(ObserverContext observerContext) {
startObservation(observerContext);
}

@Override
public void stopServerObservation(ObserverContext observerContext) {
if (!observerContext.isStarted()) {
// Do not collect metrics if the observation hasn't started
return;
}
stopObservation(observerContext);
}

@Override
public void stopClientObservation(ObserverContext observerContext) {
if (!observerContext.isStarted()) {
// Do not collect metrics if the observation hasn't started
return;
}
stopObservation(observerContext);
}

private void startObservation(ObserverContext observerContext) {
if (observerContext.getProperty(PROPERTY_START_TIME) == null) {
observerContext.addProperty(PROPERTY_START_TIME, System.nanoTime());
}
}

private void stopObservation(ObserverContext observerContext) {
Set<Tag> tags = new HashSet<>();
Map<String, Tag> customTags = observerContext.customMetricTags;
if (customTags != null) {
tags.addAll(customTags.values());
}
tags.addAll(observerContext.getAllTags());

// Add status_code_group tag
Integer statusCode = (Integer) observerContext.getProperty(PROPERTY_KEY_HTTP_STATUS_CODE);
if (statusCode != null && statusCode > 0) {
tags.add(Tag.of(TAG_KEY_HTTP_STATUS_CODE_GROUP, (statusCode / 100) + STATUS_CODE_GROUP_SUFFIX));
}

try {
Long startTime = (Long) observerContext.getProperty(PROPERTY_START_TIME);
long duration = System.nanoTime() - startTime;

Optional<String> protocolValue = Optional.empty();
if (tags.stream().anyMatch(tag -> tag.getKey().equals("protocol"))) {
protocolValue = tags.stream().filter(tag -> tag.getKey().equals("protocol")).map(Tag::getValue)
.findFirst();
}
String protocol = protocolValue.orElse("http");

BMap<BString, Object> logAttributes = ValueCreator.createMapValue();
logAttributes.put(StringUtils.fromString("protocol"), StringUtils.fromString(protocol));
tags.stream().filter(tag -> !tag.getKey().equals("protocol"))
.forEach(tag -> logAttributes.put(StringUtils.fromString(tag.getKey()),
StringUtils.fromString(tag.getValue())));
logAttributes.put(StringUtils.fromString("response_time_seconds"),
StringUtils.fromString(String.valueOf(duration / 1E9)));

printMetricLog(logAttributes);
} catch (RuntimeException e) {
handleError("multiple metrics", tags, e);
}
}

private void handleError(String metricName, Set<Tag> tags, RuntimeException e) {
// Metric Provider may throw exceptions if there is a mismatch in tags.
consoleError.println("error: error collecting metric logs for " + metricName + " with tags " + tags +
": " + e.getMessage());
}

private static void printMetricLog(BMap<BString, Object> logAttributes) {
// TODO: Remove version when the API is finalized, and add the configured org name.
Module metricsLogsModule = new Module(ORG_NAME, MODULE_NAME, "1");
environment.getRuntime().callFunction(metricsLogsModule, METRIC_LOG_FUNCTION_NAME, null, logAttributes);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* Copyright (c) 2024, WSO2 LLC. (https://www.wso2.com) All Rights Reserved.
*
* WSO2 LLC. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.ballerina.stdlib.observe.observers;

import io.ballerina.runtime.observability.BallerinaObserver;
import io.ballerina.runtime.observability.ObserverContext;
import io.ballerina.runtime.observability.metrics.DefaultMetricRegistry;
import io.ballerina.runtime.observability.metrics.Gauge;
import io.ballerina.runtime.observability.metrics.MetricId;
import io.ballerina.runtime.observability.metrics.MetricRegistry;
import io.ballerina.runtime.observability.metrics.StatisticConfig;
import io.ballerina.runtime.observability.metrics.Tag;

import java.io.PrintStream;
import java.time.Duration;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import static io.ballerina.runtime.observability.ObservabilityConstants.PROPERTY_KEY_HTTP_STATUS_CODE;
import static io.ballerina.runtime.observability.ObservabilityConstants.STATUS_CODE_GROUP_SUFFIX;
import static io.ballerina.runtime.observability.ObservabilityConstants.TAG_KEY_HTTP_STATUS_CODE_GROUP;

/**
* Observe the runtime and collect measurements.
*/
public class BallerinaMetricsObserver implements BallerinaObserver {

private static final String PROPERTY_START_TIME = "_observation_start_time_";
private static final String PROPERTY_IN_PROGRESS_COUNTER = "_observation_in_progress_counter_";

private static final PrintStream consoleError = System.err;

private static final MetricRegistry metricRegistry = DefaultMetricRegistry.getInstance();

private static final StatisticConfig[] responseTimeStatisticConfigs = new StatisticConfig[]{
StatisticConfig.builder()
.expiry(Duration.ofSeconds(10))
.percentiles(StatisticConfig.DEFAULT.getPercentiles())
.build(),
StatisticConfig.builder()
.expiry(Duration.ofMinutes(1))
.percentiles(StatisticConfig.DEFAULT.getPercentiles())
.build(),
StatisticConfig.builder()
.expiry(Duration.ofMinutes(5))
.percentiles(StatisticConfig.DEFAULT.getPercentiles())
.build()
};

@Override
public void startServerObservation(ObserverContext observerContext) {
startObservation(observerContext);
}

@Override
public void startClientObservation(ObserverContext observerContext) {
startObservation(observerContext);
}

@Override
public void stopServerObservation(ObserverContext observerContext) {
if (!observerContext.isStarted()) {
// Do not collect metrics if the observation hasn't started
return;
}
stopObservation(observerContext);
}

@Override
public void stopClientObservation(ObserverContext observerContext) {
if (!observerContext.isStarted()) {
// Do not collect metrics if the observation hasn't started
return;
}
stopObservation(observerContext);
}

private void startObservation(ObserverContext observerContext) {
observerContext.addProperty(PROPERTY_START_TIME, System.nanoTime());
Set<Tag> tags = observerContext.getAllTags();
try {
Gauge inProgressGauge = metricRegistry.gauge(new MetricId("inprogress_requests", "In-progress requests",
tags));
inProgressGauge.increment();
/*
* The in progress counter is stored so that the same counter can be decremted when the observation
* ends. This is needed as the the program may add tags to the context causing the tags to be
* different at the end compared to the start.
*/
observerContext.addProperty(PROPERTY_IN_PROGRESS_COUNTER, inProgressGauge);
} catch (RuntimeException e) {
handleError("inprogress_requests", tags, e);
}
}

private void stopObservation(ObserverContext observerContext) {
Set<Tag> tags = new HashSet<>();
Map<String, Tag> customTags = observerContext.customMetricTags;
if (customTags != null) {
tags.addAll(customTags.values());
}
tags.addAll(observerContext.getAllTags());

// Add status_code_group tag
Integer statusCode = (Integer) observerContext.getProperty(PROPERTY_KEY_HTTP_STATUS_CODE);
if (statusCode != null && statusCode > 0) {
tags.add(Tag.of(TAG_KEY_HTTP_STATUS_CODE_GROUP, (statusCode / 100) + STATUS_CODE_GROUP_SUFFIX));
}

try {
Long startTime = (Long) observerContext.getProperty(PROPERTY_START_TIME);
long duration = System.nanoTime() - startTime;
((Gauge) observerContext.getProperty(PROPERTY_IN_PROGRESS_COUNTER)).decrement();
metricRegistry.gauge(new MetricId("response_time_seconds",
"Response time", tags), responseTimeStatisticConfigs).setValue(duration / 1E9);
metricRegistry.counter(new MetricId("response_time_nanoseconds_total",
"Total response response time for all requests", tags)).increment(duration);
metricRegistry.counter(new MetricId("requests_total",
"Total number of requests", tags)).increment();
if (statusCode != null && 400 <= statusCode && statusCode < 600) {
metricRegistry.counter(new MetricId("response_errors_total",
"Total number of response errors", tags)).increment();
}
} catch (RuntimeException e) {
handleError("multiple metrics", tags, e);
}
}

private void handleError(String metricName, Set<Tag> tags, RuntimeException e) {
// Metric Provider may throw exceptions if there is a mismatch in tags.
consoleError.println("error: error collecting metrics for " + metricName + " with tags " + tags +
": " + e.getMessage());
}
}
Loading

0 comments on commit 3c54658

Please sign in to comment.