Skip to content

Commit

Permalink
Add APM metrics to HealthPeriodicLogger (elastic#102765)
Browse files Browse the repository at this point in the history
  • Loading branch information
mattc58 authored Dec 12, 2023
1 parent a9546a0 commit 9eea780
Show file tree
Hide file tree
Showing 5 changed files with 353 additions and 25 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/102765.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 102765
summary: "Add APM metrics to `HealthPeriodicLogger`"
area: Health
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,7 @@ public void apply(Settings value, Settings current, Settings previous) {
RemoteClusterPortSettings.MAX_REQUEST_HEADER_SIZE,
HealthPeriodicLogger.POLL_INTERVAL_SETTING,
HealthPeriodicLogger.ENABLED_SETTING,
HealthPeriodicLogger.OUTPUT_MODE_SETTING,
DataStreamLifecycle.CLUSTER_LIFECYCLE_DEFAULT_ROLLOVER_SETTING,
IndicesClusterStateService.SHARD_LOCK_RETRY_INTERVAL_SETTING,
IndicesClusterStateService.SHARD_LOCK_RETRY_TIMEOUT_SETTING,
Expand Down
168 changes: 158 additions & 10 deletions server/src/main/java/org/elasticsearch/health/HealthPeriodicLogger.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,24 @@
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.health.node.selection.HealthNode;
import org.elasticsearch.telemetry.TelemetryProvider;
import org.elasticsearch.telemetry.metric.LongGaugeMetric;
import org.elasticsearch.telemetry.metric.MeterRegistry;

import java.io.Closeable;
import java.time.Clock;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

import static org.elasticsearch.health.HealthStatus.GREEN;
import static org.elasticsearch.health.HealthStatus.RED;

/**
* This class periodically logs the results of the Health API to the standard Elasticsearch server log file.
Expand All @@ -45,6 +53,37 @@ public class HealthPeriodicLogger implements ClusterStateListener, Closeable, Sc
public static final String HEALTH_FIELD_PREFIX = "elasticsearch.health";
public static final String MESSAGE_FIELD = "message";

/**
* Valid modes of output for this logger
*/
public enum OutputMode {
LOGS("logs"),
METRICS("metrics");

private final String mode;

OutputMode(String mode) {
this.mode = mode;
}

public static OutputMode fromString(String mode) {
return valueOf(mode.toUpperCase(Locale.ROOT));
}

@Override
public String toString() {
return this.mode.toLowerCase(Locale.ROOT);
}

static OutputMode parseOutputMode(String value) {
try {
return OutputMode.fromString(value);
} catch (Exception e) {
throw new IllegalArgumentException("Illegal OutputMode:" + value);
}
}
}

public static final Setting<TimeValue> POLL_INTERVAL_SETTING = Setting.timeSetting(
"health.periodic_logger.poll_interval",
TimeValue.timeValueSeconds(60),
Expand All @@ -60,6 +99,14 @@ public class HealthPeriodicLogger implements ClusterStateListener, Closeable, Sc
Setting.Property.NodeScope
);

public static final Setting<List<OutputMode>> OUTPUT_MODE_SETTING = Setting.listSetting(
"health.periodic_logger.output_mode",
List.of(OutputMode.LOGS.toString(), OutputMode.METRICS.toString()),
OutputMode::parseOutputMode,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

/**
* Name constant for the job HealthService schedules
*/
Expand All @@ -81,9 +128,18 @@ public class HealthPeriodicLogger implements ClusterStateListener, Closeable, Sc
private final SetOnce<SchedulerEngine> scheduler = new SetOnce<>();
private volatile TimeValue pollInterval;
private volatile boolean enabled;
private volatile Set<OutputMode> outputModes;

private static final Logger logger = LogManager.getLogger(HealthPeriodicLogger.class);

private final MeterRegistry meterRegistry;
private final Map<String, LongGaugeMetric> redMetrics = new HashMap<>();

// Writers for logs or messages
// default visibility for testing purposes
private final BiConsumer<LongGaugeMetric, Long> metricWriter;
private final Consumer<ESLogMessage> logWriter;

/**
* Creates a new HealthPeriodicLogger.
* This creates a scheduled job using the SchedulerEngine framework and runs it on the current health node.
Expand All @@ -92,26 +148,63 @@ public class HealthPeriodicLogger implements ClusterStateListener, Closeable, Sc
* @param clusterService the cluster service, used to know when the health node changes.
* @param client the client used to call the Health Service.
* @param healthService the Health Service, where the actual Health API logic lives.
* @param telemetryProvider used to get the meter registry for metrics
*/
public static HealthPeriodicLogger create(
Settings settings,
ClusterService clusterService,
Client client,
HealthService healthService
HealthService healthService,
TelemetryProvider telemetryProvider
) {
return HealthPeriodicLogger.create(settings, clusterService, client, healthService, telemetryProvider, null, null);
}

static HealthPeriodicLogger create(
Settings settings,
ClusterService clusterService,
Client client,
HealthService healthService,
TelemetryProvider telemetryProvider,
BiConsumer<LongGaugeMetric, Long> metricWriter,
Consumer<ESLogMessage> logWriter
) {
HealthPeriodicLogger logger = new HealthPeriodicLogger(settings, clusterService, client, healthService);
HealthPeriodicLogger logger = new HealthPeriodicLogger(
settings,
clusterService,
client,
healthService,
telemetryProvider.getMeterRegistry(),
metricWriter,
logWriter
);
logger.registerListeners();
return logger;
}

private HealthPeriodicLogger(Settings settings, ClusterService clusterService, Client client, HealthService healthService) {
private HealthPeriodicLogger(
Settings settings,
ClusterService clusterService,
Client client,
HealthService healthService,
MeterRegistry meterRegistry,
BiConsumer<LongGaugeMetric, Long> metricWriter,
Consumer<ESLogMessage> logWriter
) {
this.settings = settings;
this.clusterService = clusterService;
this.client = client;
this.healthService = healthService;
this.clock = Clock.systemUTC();
this.pollInterval = POLL_INTERVAL_SETTING.get(settings);
this.enabled = ENABLED_SETTING.get(settings);
this.outputModes = new HashSet<>(OUTPUT_MODE_SETTING.get(settings));
this.meterRegistry = meterRegistry;
this.metricWriter = metricWriter == null ? LongGaugeMetric::set : metricWriter;
this.logWriter = logWriter == null ? logger::info : logWriter;

// create metric for overall level metrics
this.redMetrics.put("overall", LongGaugeMetric.create(this.meterRegistry, "es.health.overall.red", "Overall: Red", "{cluster}"));
}

private void registerListeners() {
Expand All @@ -120,6 +213,7 @@ private void registerListeners() {
}
clusterService.getClusterSettings().addSettingsUpdateConsumer(ENABLED_SETTING, this::enable);
clusterService.getClusterSettings().addSettingsUpdateConsumer(POLL_INTERVAL_SETTING, this::updatePollInterval);
clusterService.getClusterSettings().addSettingsUpdateConsumer(OUTPUT_MODE_SETTING, this::updateOutputModes);
}

@Override
Expand Down Expand Up @@ -186,7 +280,6 @@ SchedulerEngine getScheduler() {

/**
* Create a Map of the results, which is then turned into JSON for logging.
*
* The structure looks like:
* {"elasticsearch.health.overall.status": "green", "elasticsearch.health.[other indicators].status": "green"}
* Only the indicator status values are included, along with the computed top-level status.
Expand All @@ -202,7 +295,7 @@ static Map<String, Object> convertToLoggedFields(List<HealthIndicatorResult> ind
final Map<String, Object> result = new HashMap<>();

// overall status
final HealthStatus status = HealthStatus.merge(indicatorResults.stream().map(HealthIndicatorResult::status));
final HealthStatus status = calculateOverallStatus(indicatorResults);
result.put(String.format(Locale.ROOT, "%s.overall.status", HEALTH_FIELD_PREFIX), status.xContentValue());

// top-level status for each indicator
Expand All @@ -228,6 +321,10 @@ static Map<String, Object> convertToLoggedFields(List<HealthIndicatorResult> ind
return result;
}

static HealthStatus calculateOverallStatus(List<HealthIndicatorResult> indicatorResults) {
return HealthStatus.merge(indicatorResults.stream().map(HealthIndicatorResult::status));
}

/**
* Handle the result of the Health Service getHealth call
*/
Expand All @@ -236,13 +333,21 @@ static Map<String, Object> convertToLoggedFields(List<HealthIndicatorResult> ind
@Override
public void onResponse(List<HealthIndicatorResult> healthIndicatorResults) {
try {
Map<String, Object> resultsMap = convertToLoggedFields(healthIndicatorResults);
if (logsEnabled()) {
Map<String, Object> resultsMap = convertToLoggedFields(healthIndicatorResults);

// if we have a valid response, log in JSON format
if (resultsMap.isEmpty() == false) {
ESLogMessage msg = new ESLogMessage().withFields(resultsMap);
logWriter.accept(msg);
}
}

// if we have a valid response, log in JSON format
if (resultsMap.isEmpty() == false) {
ESLogMessage msg = new ESLogMessage().withFields(resultsMap);
logger.info(msg);
// handle metrics
if (metricsEnabled()) {
writeMetrics(healthIndicatorResults);
}

} catch (Exception e) {
logger.warn("Health Periodic Logger error:{}", e.toString());
}
Expand All @@ -254,6 +359,49 @@ public void onFailure(Exception e) {
}
};

/**
* Write (and possibly create) the APM metrics
*/
// default visibility for testing purposes
void writeMetrics(List<HealthIndicatorResult> healthIndicatorResults) {
if (healthIndicatorResults != null) {
for (HealthIndicatorResult result : healthIndicatorResults) {
String metricName = result.name();
LongGaugeMetric metric = this.redMetrics.get(metricName);
if (metric == null) {
metric = LongGaugeMetric.create(
this.meterRegistry,
String.format(Locale.ROOT, "es.health.%s.red", metricName),
String.format(Locale.ROOT, "%s: Red", metricName),
"{cluster}"
);
this.redMetrics.put(metricName, metric);
}
metricWriter.accept(metric, result.status() == RED ? 1L : 0L);
}

metricWriter.accept(this.redMetrics.get("overall"), calculateOverallStatus(healthIndicatorResults) == RED ? 1L : 0L);
}
}

private void updateOutputModes(List<OutputMode> newMode) {
this.outputModes = new HashSet<>(newMode);
}

/**
* Returns true if any of the outputModes are set to logs
*/
private boolean logsEnabled() {
return this.outputModes.contains(OutputMode.LOGS);
}

/**
* Returns true if any of the outputModes are set to metrics
*/
private boolean metricsEnabled() {
return this.outputModes.contains(OutputMode.METRICS);
}

/**
* Create the SchedulerEngine.Job if this node is the health node
*/
Expand Down
21 changes: 18 additions & 3 deletions server/src/main/java/org/elasticsearch/node/NodeConstruction.java
Original file line number Diff line number Diff line change
Expand Up @@ -995,7 +995,15 @@ record PluginServiceInstances(

modules.add(
loadPluginShutdownService(clusterService),
loadDiagnosticServices(settings, discoveryModule.getCoordinator(), clusterService, transportService, featureService, threadPool)
loadDiagnosticServices(
settings,
discoveryModule.getCoordinator(),
clusterService,
transportService,
featureService,
threadPool,
telemetryProvider
)
);

RecoveryPlannerService recoveryPlannerService = getRecoveryPlannerService(threadPool, clusterService, repositoryService);
Expand Down Expand Up @@ -1132,7 +1140,8 @@ private Module loadDiagnosticServices(
ClusterService clusterService,
TransportService transportService,
FeatureService featureService,
ThreadPool threadPool
ThreadPool threadPool,
TelemetryProvider telemetryProvider
) {

MasterHistoryService masterHistoryService = new MasterHistoryService(transportService, threadPool, clusterService);
Expand All @@ -1156,7 +1165,13 @@ private Module loadDiagnosticServices(
Stream.concat(serverHealthIndicatorServices, pluginHealthIndicatorServices).toList(),
threadPool
);
HealthPeriodicLogger healthPeriodicLogger = HealthPeriodicLogger.create(settings, clusterService, client, healthService);
HealthPeriodicLogger healthPeriodicLogger = HealthPeriodicLogger.create(
settings,
clusterService,
client,
healthService,
telemetryProvider
);
HealthMetadataService healthMetadataService = HealthMetadataService.create(clusterService, featureService, settings);
LocalHealthMonitor localHealthMonitor = LocalHealthMonitor.create(
settings,
Expand Down
Loading

0 comments on commit 9eea780

Please sign in to comment.