diff --git a/docs/changelog/103130.yaml b/docs/changelog/103130.yaml new file mode 100644 index 0000000000000..3ef56ae84d123 --- /dev/null +++ b/docs/changelog/103130.yaml @@ -0,0 +1,5 @@ +pr: 103130 +summary: Create a DSL health indicator as part of the health API +area: Health +type: feature +issues: [] diff --git a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceIT.java b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceIT.java index d3eaee36f67f7..03bd753e29068 100644 --- a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceIT.java +++ b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceIT.java @@ -30,6 +30,7 @@ import org.elasticsearch.action.datastreams.lifecycle.ErrorEntry; import org.elasticsearch.action.datastreams.lifecycle.ExplainIndexDataStreamLifecycle; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.cluster.coordination.StableMasterHealthIndicatorService; import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.DataStreamAction; @@ -46,6 +47,11 @@ import org.elasticsearch.datastreams.DataStreamsPlugin; import org.elasticsearch.datastreams.lifecycle.action.ExplainDataStreamLifecycleAction; import org.elasticsearch.datastreams.lifecycle.action.PutDataStreamLifecycleAction; +import org.elasticsearch.datastreams.lifecycle.health.DataStreamLifecycleHealthIndicatorService; +import org.elasticsearch.health.Diagnosis; +import org.elasticsearch.health.GetHealthAction; +import org.elasticsearch.health.HealthIndicatorResult; +import org.elasticsearch.health.HealthStatus; import org.elasticsearch.health.node.DataStreamLifecycleHealthInfo; import 
org.elasticsearch.health.node.DslErrorInfo; import org.elasticsearch.health.node.FetchHealthInfoCacheAction; @@ -76,9 +82,12 @@ import static org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService.DATA_STREAM_MERGE_POLICY_TARGET_FLOOR_SEGMENT_SETTING; import static org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService.ONE_HUNDRED_MB; import static org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService.TARGET_MERGE_FACTOR_VALUE; +import static org.elasticsearch.datastreams.lifecycle.health.DataStreamLifecycleHealthIndicatorService.STAGNATING_BACKING_INDICES_DIAGNOSIS_DEF; +import static org.elasticsearch.datastreams.lifecycle.health.DataStreamLifecycleHealthIndicatorService.STAGNATING_INDEX_IMPACT; import static org.elasticsearch.index.IndexSettings.LIFECYCLE_ORIGINATION_DATE; import static org.elasticsearch.indices.ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -447,6 +456,27 @@ public void testErrorRecordingOnRollover() throws Exception { assertThat(errorInfo.retryCount(), greaterThanOrEqualTo(3)); }); + GetHealthAction.Response healthResponse = client().execute(GetHealthAction.INSTANCE, new GetHealthAction.Request(true, 1000)) + .actionGet(); + HealthIndicatorResult masterIsStableIndicator = healthResponse.findIndicator(StableMasterHealthIndicatorService.NAME); + // if the cluster doesn't have a stable master we'll avoid asserting on the health report API as some indicators will not + // be computed + if (masterIsStableIndicator.status() == HealthStatus.GREEN) { + // the shards capacity indicator is dictating the overall status + assertThat(healthResponse.getStatus(), is(HealthStatus.RED)); + HealthIndicatorResult 
dslIndicator = healthResponse.findIndicator(DataStreamLifecycleHealthIndicatorService.NAME); + assertThat(dslIndicator.status(), is(HealthStatus.YELLOW)); + assertThat(dslIndicator.impacts(), is(STAGNATING_INDEX_IMPACT)); + assertThat( + dslIndicator.symptom(), + is("A backing index has repeatedly encountered errors whilst trying to advance in its lifecycle") + ); + + Diagnosis diagnosis = dslIndicator.diagnosisList().get(0); + assertThat(diagnosis.definition(), is(STAGNATING_BACKING_INDICES_DIAGNOSIS_DEF)); + assertThat(diagnosis.affectedResources().get(0).getValues(), containsInAnyOrder(writeIndexName)); + } + // let's reset the cluster max shards per node limit to allow rollover to proceed and check the error store is empty updateClusterSettings(Settings.builder().putNull("*")); @@ -476,6 +506,14 @@ public void testErrorRecordingOnRollover() throws Exception { DataStreamLifecycleHealthInfo dslHealthInfoOnHealthNode = healthNodeResponse.getHealthInfo().dslHealthInfo(); assertThat(dslHealthInfoOnHealthNode, is(DataStreamLifecycleHealthInfo.NO_DSL_ERRORS)); }); + + healthResponse = client().execute(GetHealthAction.INSTANCE, new GetHealthAction.Request(true, 1000)).actionGet(); + masterIsStableIndicator = healthResponse.findIndicator(StableMasterHealthIndicatorService.NAME); + // if the cluster doesn't have a stable master we'll avoid asserting on the health report API as some indicators will not + // be computed + if (masterIsStableIndicator.status() == HealthStatus.GREEN) { + assertThat(healthResponse.getStatus(), is(HealthStatus.GREEN)); + } } public void testErrorRecordingOnRetention() throws Exception { @@ -569,6 +607,30 @@ public void testErrorRecordingOnRetention() throws Exception { assertThat(List.of(firstGenerationIndex, secondGenerationIndex).contains(errorInfo.indexName()), is(true)); }); + GetHealthAction.Response healthResponse = client().execute(GetHealthAction.INSTANCE, new GetHealthAction.Request(true, 1000)) + .actionGet(); + HealthIndicatorResult 
masterIsStableIndicator = healthResponse.findIndicator(StableMasterHealthIndicatorService.NAME); + // if the cluster doesn't have a stable master we'll avoid asserting on the health report API as some indicators will not + // be computed + if (masterIsStableIndicator.status() == HealthStatus.GREEN) { + // the dsl indicator should turn the overall status yellow + assertThat(healthResponse.getStatus(), is(HealthStatus.YELLOW)); + HealthIndicatorResult dslIndicator = healthResponse.findIndicator(DataStreamLifecycleHealthIndicatorService.NAME); + assertThat(dslIndicator.status(), is(HealthStatus.YELLOW)); + assertThat(dslIndicator.impacts(), is(STAGNATING_INDEX_IMPACT)); + assertThat( + dslIndicator.symptom(), + is("2 backing indices have repeatedly encountered errors whilst trying to advance in its lifecycle") + ); + + Diagnosis diagnosis = dslIndicator.diagnosisList().get(0); + assertThat(diagnosis.definition(), is(STAGNATING_BACKING_INDICES_DIAGNOSIS_DEF)); + assertThat( + diagnosis.affectedResources().get(0).getValues(), + containsInAnyOrder(firstGenerationIndex, secondGenerationIndex) + ); + } + // let's mark the index as writeable and make sure it's deleted and the error store is empty updateIndexSettings(Settings.builder().put(READ_ONLY.settingName(), false), firstGenerationIndex); @@ -598,6 +660,20 @@ public void testErrorRecordingOnRetention() throws Exception { DataStreamLifecycleHealthInfo dslHealthInfoOnHealthNode = healthNodeResponse.getHealthInfo().dslHealthInfo(); assertThat(dslHealthInfoOnHealthNode, is(DataStreamLifecycleHealthInfo.NO_DSL_ERRORS)); }); + + healthResponse = client().execute(GetHealthAction.INSTANCE, new GetHealthAction.Request(true, 1000)).actionGet(); + masterIsStableIndicator = healthResponse.findIndicator(StableMasterHealthIndicatorService.NAME); + // if the cluster doesn't have a stable master we'll avoid asserting on the health report API as some indicators will not + // be computed + if (masterIsStableIndicator.status() == 
HealthStatus.GREEN) { + // with the errors resolved, the overall status and the dsl indicator should both be green + assertThat(healthResponse.getStatus(), is(HealthStatus.GREEN)); + HealthIndicatorResult dslIndicator = healthResponse.findIndicator(DataStreamLifecycleHealthIndicatorService.NAME); + assertThat(dslIndicator.status(), is(HealthStatus.GREEN)); + assertThat(dslIndicator.impacts().size(), is(0)); + assertThat(dslIndicator.symptom(), is("Data streams are executing their lifecycles without issues")); + assertThat(dslIndicator.diagnosisList().size(), is(0)); + } } finally { // when the test executes successfully this will not be needed however, otherwise we need to make sure the index is // "delete-able" for test cleanup diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java index 9ac3a1afed5a5..fb93b7d688a74 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java @@ -47,6 +47,7 @@ import org.elasticsearch.datastreams.lifecycle.action.TransportGetDataStreamLifecycleAction; import org.elasticsearch.datastreams.lifecycle.action.TransportGetDataStreamLifecycleStatsAction; import org.elasticsearch.datastreams.lifecycle.action.TransportPutDataStreamLifecycleAction; +import org.elasticsearch.datastreams.lifecycle.health.DataStreamLifecycleHealthIndicatorService; import org.elasticsearch.datastreams.lifecycle.health.DataStreamLifecycleHealthInfoPublisher; import org.elasticsearch.datastreams.lifecycle.rest.RestDataStreamLifecycleStatsAction; import org.elasticsearch.datastreams.lifecycle.rest.RestDeleteDataStreamLifecycleAction; @@ -60,8 +61,10 @@ import org.elasticsearch.datastreams.rest.RestMigrateToDataStreamAction; import org.elasticsearch.datastreams.rest.RestModifyDataStreamsAction; import 
org.elasticsearch.datastreams.rest.RestPromoteDataStreamAction; +import org.elasticsearch.health.HealthIndicatorService; import org.elasticsearch.index.IndexSettingProvider; import org.elasticsearch.plugins.ActionPlugin; +import org.elasticsearch.plugins.HealthPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestHandler; @@ -76,7 +79,7 @@ import static org.elasticsearch.cluster.metadata.DataStreamLifecycle.DATA_STREAM_LIFECYCLE_ORIGIN; -public class DataStreamsPlugin extends Plugin implements ActionPlugin { +public class DataStreamsPlugin extends Plugin implements ActionPlugin, HealthPlugin { public static final Setting TIME_SERIES_POLL_INTERVAL = Setting.timeSetting( "time_series.poll_interval", @@ -112,6 +115,7 @@ public class DataStreamsPlugin extends Plugin implements ActionPlugin { private final SetOnce dataLifecycleInitialisationService = new SetOnce<>(); private final SetOnce dataStreamLifecycleErrorsPublisher = new SetOnce<>(); + private final SetOnce dataStreamLifecycleHealthIndicatorService = new SetOnce<>(); private final Settings settings; public DataStreamsPlugin(Settings settings) { @@ -184,6 +188,8 @@ public Collection createComponents(PluginServices services) { ) ); dataLifecycleInitialisationService.get().init(); + dataStreamLifecycleHealthIndicatorService.set(new DataStreamLifecycleHealthIndicatorService()); + components.add(errorStoreInitialisationService.get()); components.add(dataLifecycleInitialisationService.get()); components.add(dataStreamLifecycleErrorsPublisher.get()); @@ -251,4 +257,9 @@ public void close() throws IOException { throw new ElasticsearchException("unable to close the data stream lifecycle service", e); } } + + @Override + public Collection getHealthIndicatorServices() { + return List.of(dataStreamLifecycleHealthIndicatorService.get()); + } } diff --git 
a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/health/DataStreamLifecycleHealthIndicatorService.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/health/DataStreamLifecycleHealthIndicatorService.java new file mode 100644 index 0000000000000..0628bed0f9019 --- /dev/null +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/health/DataStreamLifecycleHealthIndicatorService.java @@ -0,0 +1,125 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.datastreams.lifecycle.health; + +import org.elasticsearch.health.Diagnosis; +import org.elasticsearch.health.HealthIndicatorDetails; +import org.elasticsearch.health.HealthIndicatorImpact; +import org.elasticsearch.health.HealthIndicatorResult; +import org.elasticsearch.health.HealthIndicatorService; +import org.elasticsearch.health.HealthStatus; +import org.elasticsearch.health.ImpactArea; +import org.elasticsearch.health.SimpleHealthIndicatorDetails; +import org.elasticsearch.health.node.DataStreamLifecycleHealthInfo; +import org.elasticsearch.health.node.DslErrorInfo; +import org.elasticsearch.health.node.HealthInfo; + +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; + +import static java.util.stream.Collectors.toList; + +public class DataStreamLifecycleHealthIndicatorService implements HealthIndicatorService { + + public static final String NAME = "data_stream_lifecycle"; + public static final String DSL_EXPLAIN_HELP_URL = "https://ela.st/explain-data-stream-lifecycle"; + + public static final String STAGNATING_BACKING_INDEX_IMPACT_ID = "stagnating_backing_index"; + + 
public static final List STAGNATING_INDEX_IMPACT = List.of( + new HealthIndicatorImpact( + NAME, + STAGNATING_BACKING_INDEX_IMPACT_ID, + 3, + "Data streams backing indices cannot make progress in their lifecycle. The performance and " + + "stability of the indices and/or the cluster could be impacted.", + List.of(ImpactArea.DEPLOYMENT_MANAGEMENT) + ) + ); + + public static final Diagnosis.Definition STAGNATING_BACKING_INDICES_DIAGNOSIS_DEF = new Diagnosis.Definition( + NAME, + "stagnating_dsl_backing_index", + "Some backing indices are repeatedly encountering errors in their lifecycle execution.", + "Check the current status of the affected indices using the [GET //_lifecycle/explain] API. Please " + + "replace the in the API with the actual index name (or the data stream name for a wider overview).", + DSL_EXPLAIN_HELP_URL + ); + + @Override + public String name() { + return NAME; + } + + @Override + public HealthIndicatorResult calculate(boolean verbose, int maxAffectedResourcesCount, HealthInfo healthInfo) { + DataStreamLifecycleHealthInfo dataStreamLifecycleHealthInfo = healthInfo.dslHealthInfo(); + if (dataStreamLifecycleHealthInfo == null) { + // DSL reports health information on every run, so data will eventually arrive to the health node. In the meantime, let's + // report GREEN health + return createIndicator( + HealthStatus.GREEN, + "No data stream lifecycle health data available yet. 
Health information will be reported after the first run.", + HealthIndicatorDetails.EMPTY, + List.of(), + List.of() + ); + } + + List stagnatingBackingIndices = dataStreamLifecycleHealthInfo.dslErrorsInfo(); + if (stagnatingBackingIndices.isEmpty()) { + return createIndicator( + HealthStatus.GREEN, + "Data streams are executing their lifecycles without issues", + createDetails(verbose, dataStreamLifecycleHealthInfo), + List.of(), + List.of() + ); + } else { + List affectedIndices = stagnatingBackingIndices.stream() + .map(DslErrorInfo::indexName) + .limit(Math.min(maxAffectedResourcesCount, stagnatingBackingIndices.size())) + .collect(toList()); + return createIndicator( + HealthStatus.YELLOW, + (stagnatingBackingIndices.size() > 1 ? stagnatingBackingIndices.size() + " backing indices have" : "A backing index has") + + " repeatedly encountered errors whilst trying to advance in its lifecycle", + createDetails(verbose, dataStreamLifecycleHealthInfo), + STAGNATING_INDEX_IMPACT, + List.of( + new Diagnosis( + STAGNATING_BACKING_INDICES_DIAGNOSIS_DEF, + List.of(new Diagnosis.Resource(Diagnosis.Resource.Type.INDEX, affectedIndices)) + ) + ) + ); + } + } + + private static HealthIndicatorDetails createDetails(boolean verbose, DataStreamLifecycleHealthInfo dataStreamLifecycleHealthInfo) { + if (verbose == false) { + return HealthIndicatorDetails.EMPTY; + } + + var details = new HashMap(); + details.put("total_backing_indices_in_error", dataStreamLifecycleHealthInfo.totalErrorEntriesCount()); + details.put("stagnating_backing_indices_count", dataStreamLifecycleHealthInfo.dslErrorsInfo().size()); + if (dataStreamLifecycleHealthInfo.dslErrorsInfo().isEmpty() == false) { + details.put("stagnating_backing_indices", dataStreamLifecycleHealthInfo.dslErrorsInfo().stream().map(dslError -> { + LinkedHashMap errorDetails = new LinkedHashMap<>(3, 1L); + errorDetails.put("index_name", dslError.indexName()); + errorDetails.put("first_occurrence_timestamp", dslError.firstOccurrence()); + 
errorDetails.put("retry_count", dslError.retryCount()); + return errorDetails; + }).toList()); + } + return new SimpleHealthIndicatorDetails(details); + } +} diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/health/DataStreamLifecycleHealthIndicatorServiceTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/health/DataStreamLifecycleHealthIndicatorServiceTests.java new file mode 100644 index 0000000000000..877b463301311 --- /dev/null +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/health/DataStreamLifecycleHealthIndicatorServiceTests.java @@ -0,0 +1,102 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
+ */ + +package org.elasticsearch.datastreams.lifecycle.health; + +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.common.Strings; +import org.elasticsearch.health.Diagnosis; +import org.elasticsearch.health.HealthIndicatorDetails; +import org.elasticsearch.health.HealthIndicatorResult; +import org.elasticsearch.health.HealthStatus; +import org.elasticsearch.health.node.DataStreamLifecycleHealthInfo; +import org.elasticsearch.health.node.DslErrorInfo; +import org.elasticsearch.health.node.HealthInfo; +import org.elasticsearch.test.ESTestCase; +import org.junit.Before; + +import java.util.List; +import java.util.Locale; +import java.util.Map; + +import static org.elasticsearch.datastreams.lifecycle.health.DataStreamLifecycleHealthIndicatorService.STAGNATING_BACKING_INDICES_DIAGNOSIS_DEF; +import static org.elasticsearch.datastreams.lifecycle.health.DataStreamLifecycleHealthIndicatorService.STAGNATING_INDEX_IMPACT; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.core.IsNot.not; + +public class DataStreamLifecycleHealthIndicatorServiceTests extends ESTestCase { + + private DataStreamLifecycleHealthIndicatorService service; + + @Before + public void setupService() { + service = new DataStreamLifecycleHealthIndicatorService(); + } + + public void testGreenWhenNoDSLHealthData() { + HealthIndicatorResult result = service.calculate(true, new HealthInfo(Map.of(), null)); + assertThat(result.status(), is(HealthStatus.GREEN)); + assertThat( + result.symptom(), + is("No data stream lifecycle health data available yet. 
Health information will be reported after the first run.") + ); + assertThat(result.details(), is(HealthIndicatorDetails.EMPTY)); + assertThat(result.impacts(), is(List.of())); + assertThat(result.diagnosisList(), is(List.of())); + } + + public void testGreenWhenEmptyListOfStagnatingIndices() { + HealthIndicatorResult result = service.calculate(true, new HealthInfo(Map.of(), new DataStreamLifecycleHealthInfo(List.of(), 15))); + assertThat(result.status(), is(HealthStatus.GREEN)); + assertThat(result.symptom(), is("Data streams are executing their lifecycles without issues")); + assertThat(result.details(), is(not(HealthIndicatorDetails.EMPTY))); + assertThat(Strings.toString(result.details()), containsString("\"total_backing_indices_in_error\":15")); + assertThat(result.impacts(), is(List.of())); + assertThat(result.diagnosisList(), is(List.of())); + } + + public void testYellowWhenStagnatingIndicesPresent() { + String secondGenerationIndex = DataStream.getDefaultBackingIndexName("foo", 2L); + String firstGenerationIndex = DataStream.getDefaultBackingIndexName("foo", 1L); + HealthIndicatorResult result = service.calculate( + true, + new HealthInfo( + Map.of(), + new DataStreamLifecycleHealthInfo( + List.of(new DslErrorInfo(secondGenerationIndex, 1L, 200), new DslErrorInfo(firstGenerationIndex, 3L, 100)), + 15 + ) + ) + ); + assertThat(result.status(), is(HealthStatus.YELLOW)); + assertThat(result.symptom(), is("2 backing indices have repeatedly encountered errors whilst trying to advance in its lifecycle")); + assertThat(result.details(), is(not(HealthIndicatorDetails.EMPTY))); + String detailsAsString = Strings.toString(result.details()); + assertThat(detailsAsString, containsString("\"total_backing_indices_in_error\":15")); + assertThat(detailsAsString, containsString("\"stagnating_backing_indices_count\":2")); + assertThat( + detailsAsString, + containsString( + String.format( + Locale.ROOT, + "\"index_name\":\"%s\"," + + 
"\"first_occurrence_timestamp\":1,\"retry_count\":200},{\"index_name\":\"%s\"," + + "\"first_occurrence_timestamp\":3,\"retry_count\":100", + secondGenerationIndex, + firstGenerationIndex + ) + ) + ); + assertThat(result.impacts(), is(STAGNATING_INDEX_IMPACT)); + Diagnosis diagnosis = result.diagnosisList().get(0); + assertThat(diagnosis.definition(), is(STAGNATING_BACKING_INDICES_DIAGNOSIS_DEF)); + assertThat(diagnosis.affectedResources().get(0).getValues(), containsInAnyOrder(secondGenerationIndex, firstGenerationIndex)); + } +}