Skip to content

Commit

Permalink
Create a DSL health indicator as part of the health API (elastic#103130)
Browse files Browse the repository at this point in the history
This adds a health indicator named `data_stream_lifecycle` that will
detect data stream backing indices that cannot make progress
(stagnating) due to repeatedly error-ing in their lifecycle execution. 

The output of the indicator looks like this:

```
"data_stream_lifecycle" : {
    "status" : "yellow",
    "symptom" : "2 backing indices have repeatedly encountered errors whilst trying to advance in its lifecycle",
    "details" : {
      "stagnating_backing_indices_count" : 2,
      "stagnating_backing_indices" : [
        {
          "index_name" : ".ds-metrics-foo-2023.12.07-000002",
          "first_occurrence_timestamp" : 1701951305340,
          "retry_count" : 4
        },
        {
          "index_name" : ".ds-metrics-foo-2023.12.07-000001",
          "first_occurrence_timestamp" : 1701951305340,
          "retry_count" : 4
        }
      ],
      "total_backing_indices_in_error" : 2
    },
    "impacts" : [
      {
        "id" : "elasticsearch:health:dsl:impact:stagnating_backing_index",
        "severity" : 3,
        "description" : "Data streams backing indices cannot make progress in their lifecycle. The performance and stability of the indices and/or the cluster could be impacted.",
        "impact_areas" : [
          "deployment_management"
        ]
      }
    ],
    "diagnosis" : [
      {
        "id" : "elasticsearch:health:dsl:diagnosis:stagnating_dsl_backing_index",
        "cause" : "Some backing indices are repeatedly encountering errors in their lifecycle execution.",
        "action" : "Check the current status of the affected indices using the [GET /<affected_index_name>/_lifecycle/explain] API. Please replace the <affected_index_name> in the API with the actual index name (or the data stream name for a wider overview).",
        "help_url" : "https://ela.st/dsl-explain",
        "affected_resources" : {
          "indices" : [
            ".ds-metrics-foo-2023.12.07-000002",
            ".ds-metrics-foo-2023.12.07-000001"
          ]
        }
      }
    ]
  }
```

Documentation will follow in a subsequent PR.
  • Loading branch information
andreidan authored Dec 8, 2023
1 parent 6e0c031 commit de70fcd
Show file tree
Hide file tree
Showing 5 changed files with 320 additions and 1 deletion.
5 changes: 5 additions & 0 deletions docs/changelog/103130.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 103130
summary: Create a DSL health indicator as part of the health API
area: Health
type: feature
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.action.datastreams.lifecycle.ErrorEntry;
import org.elasticsearch.action.datastreams.lifecycle.ExplainIndexDataStreamLifecycle;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.coordination.StableMasterHealthIndicatorService;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamAction;
Expand All @@ -46,6 +47,11 @@
import org.elasticsearch.datastreams.DataStreamsPlugin;
import org.elasticsearch.datastreams.lifecycle.action.ExplainDataStreamLifecycleAction;
import org.elasticsearch.datastreams.lifecycle.action.PutDataStreamLifecycleAction;
import org.elasticsearch.datastreams.lifecycle.health.DataStreamLifecycleHealthIndicatorService;
import org.elasticsearch.health.Diagnosis;
import org.elasticsearch.health.GetHealthAction;
import org.elasticsearch.health.HealthIndicatorResult;
import org.elasticsearch.health.HealthStatus;
import org.elasticsearch.health.node.DataStreamLifecycleHealthInfo;
import org.elasticsearch.health.node.DslErrorInfo;
import org.elasticsearch.health.node.FetchHealthInfoCacheAction;
Expand Down Expand Up @@ -76,9 +82,12 @@
import static org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService.DATA_STREAM_MERGE_POLICY_TARGET_FLOOR_SEGMENT_SETTING;
import static org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService.ONE_HUNDRED_MB;
import static org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService.TARGET_MERGE_FACTOR_VALUE;
import static org.elasticsearch.datastreams.lifecycle.health.DataStreamLifecycleHealthIndicatorService.STAGNATING_BACKING_INDICES_DIAGNOSIS_DEF;
import static org.elasticsearch.datastreams.lifecycle.health.DataStreamLifecycleHealthIndicatorService.STAGNATING_INDEX_IMPACT;
import static org.elasticsearch.index.IndexSettings.LIFECYCLE_ORIGINATION_DATE;
import static org.elasticsearch.indices.ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
Expand Down Expand Up @@ -447,6 +456,27 @@ public void testErrorRecordingOnRollover() throws Exception {
assertThat(errorInfo.retryCount(), greaterThanOrEqualTo(3));
});

GetHealthAction.Response healthResponse = client().execute(GetHealthAction.INSTANCE, new GetHealthAction.Request(true, 1000))
.actionGet();
HealthIndicatorResult masterIsStableIndicator = healthResponse.findIndicator(StableMasterHealthIndicatorService.NAME);
// if the cluster doesn't have a stable master we'll avoid asserting on the health report API as some indicators will not
// be computed
if (masterIsStableIndicator.status() == HealthStatus.GREEN) {
// the shards capacity indicator is dictating the overall status
assertThat(healthResponse.getStatus(), is(HealthStatus.RED));
HealthIndicatorResult dslIndicator = healthResponse.findIndicator(DataStreamLifecycleHealthIndicatorService.NAME);
assertThat(dslIndicator.status(), is(HealthStatus.YELLOW));
assertThat(dslIndicator.impacts(), is(STAGNATING_INDEX_IMPACT));
assertThat(
dslIndicator.symptom(),
is("A backing index has repeatedly encountered errors whilst trying to advance in its lifecycle")
);

Diagnosis diagnosis = dslIndicator.diagnosisList().get(0);
assertThat(diagnosis.definition(), is(STAGNATING_BACKING_INDICES_DIAGNOSIS_DEF));
assertThat(diagnosis.affectedResources().get(0).getValues(), containsInAnyOrder(writeIndexName));
}

// let's reset the cluster max shards per node limit to allow rollover to proceed and check the error store is empty
updateClusterSettings(Settings.builder().putNull("*"));

Expand Down Expand Up @@ -476,6 +506,14 @@ public void testErrorRecordingOnRollover() throws Exception {
DataStreamLifecycleHealthInfo dslHealthInfoOnHealthNode = healthNodeResponse.getHealthInfo().dslHealthInfo();
assertThat(dslHealthInfoOnHealthNode, is(DataStreamLifecycleHealthInfo.NO_DSL_ERRORS));
});

healthResponse = client().execute(GetHealthAction.INSTANCE, new GetHealthAction.Request(true, 1000)).actionGet();
masterIsStableIndicator = healthResponse.findIndicator(StableMasterHealthIndicatorService.NAME);
// if the cluster doesn't have a stable master we'll avoid asserting on the health report API as some indicators will not
// be computed
if (masterIsStableIndicator.status() == HealthStatus.GREEN) {
assertThat(healthResponse.getStatus(), is(HealthStatus.GREEN));
}
}

public void testErrorRecordingOnRetention() throws Exception {
Expand Down Expand Up @@ -569,6 +607,30 @@ public void testErrorRecordingOnRetention() throws Exception {
assertThat(List.of(firstGenerationIndex, secondGenerationIndex).contains(errorInfo.indexName()), is(true));
});

GetHealthAction.Response healthResponse = client().execute(GetHealthAction.INSTANCE, new GetHealthAction.Request(true, 1000))
.actionGet();
HealthIndicatorResult masterIsStableIndicator = healthResponse.findIndicator(StableMasterHealthIndicatorService.NAME);
// if the cluster doesn't have a stable master we'll avoid asserting on the health report API as some indicators will not
// be computed
if (masterIsStableIndicator.status() == HealthStatus.GREEN) {
// the dsl indicator should turn the overall status yellow
assertThat(healthResponse.getStatus(), is(HealthStatus.YELLOW));
HealthIndicatorResult dslIndicator = healthResponse.findIndicator(DataStreamLifecycleHealthIndicatorService.NAME);
assertThat(dslIndicator.status(), is(HealthStatus.YELLOW));
assertThat(dslIndicator.impacts(), is(STAGNATING_INDEX_IMPACT));
assertThat(
dslIndicator.symptom(),
is("2 backing indices have repeatedly encountered errors whilst trying to advance in its lifecycle")
);

Diagnosis diagnosis = dslIndicator.diagnosisList().get(0);
assertThat(diagnosis.definition(), is(STAGNATING_BACKING_INDICES_DIAGNOSIS_DEF));
assertThat(
diagnosis.affectedResources().get(0).getValues(),
containsInAnyOrder(firstGenerationIndex, secondGenerationIndex)
);
}

// let's mark the index as writeable and make sure it's deleted and the error store is empty
updateIndexSettings(Settings.builder().put(READ_ONLY.settingName(), false), firstGenerationIndex);

Expand Down Expand Up @@ -598,6 +660,20 @@ public void testErrorRecordingOnRetention() throws Exception {
DataStreamLifecycleHealthInfo dslHealthInfoOnHealthNode = healthNodeResponse.getHealthInfo().dslHealthInfo();
assertThat(dslHealthInfoOnHealthNode, is(DataStreamLifecycleHealthInfo.NO_DSL_ERRORS));
});

healthResponse = client().execute(GetHealthAction.INSTANCE, new GetHealthAction.Request(true, 1000)).actionGet();
masterIsStableIndicator = healthResponse.findIndicator(StableMasterHealthIndicatorService.NAME);
// if the cluster doesn't have a stable master we'll avoid asserting on the health report API as some indicators will not
// be computed
if (masterIsStableIndicator.status() == HealthStatus.GREEN) {
// the dsl indicator should turn the overall status yellow
assertThat(healthResponse.getStatus(), is(HealthStatus.GREEN));
HealthIndicatorResult dslIndicator = healthResponse.findIndicator(DataStreamLifecycleHealthIndicatorService.NAME);
assertThat(dslIndicator.status(), is(HealthStatus.GREEN));
assertThat(dslIndicator.impacts().size(), is(0));
assertThat(dslIndicator.symptom(), is("Data streams are executing their lifecycles without issues"));
assertThat(dslIndicator.diagnosisList().size(), is(0));
}
} finally {
// when the test executes successfully this will not be needed however, otherwise we need to make sure the index is
// "delete-able" for test cleanup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.elasticsearch.datastreams.lifecycle.action.TransportGetDataStreamLifecycleAction;
import org.elasticsearch.datastreams.lifecycle.action.TransportGetDataStreamLifecycleStatsAction;
import org.elasticsearch.datastreams.lifecycle.action.TransportPutDataStreamLifecycleAction;
import org.elasticsearch.datastreams.lifecycle.health.DataStreamLifecycleHealthIndicatorService;
import org.elasticsearch.datastreams.lifecycle.health.DataStreamLifecycleHealthInfoPublisher;
import org.elasticsearch.datastreams.lifecycle.rest.RestDataStreamLifecycleStatsAction;
import org.elasticsearch.datastreams.lifecycle.rest.RestDeleteDataStreamLifecycleAction;
Expand All @@ -60,8 +61,10 @@
import org.elasticsearch.datastreams.rest.RestMigrateToDataStreamAction;
import org.elasticsearch.datastreams.rest.RestModifyDataStreamsAction;
import org.elasticsearch.datastreams.rest.RestPromoteDataStreamAction;
import org.elasticsearch.health.HealthIndicatorService;
import org.elasticsearch.index.IndexSettingProvider;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.HealthPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
Expand All @@ -76,7 +79,7 @@

import static org.elasticsearch.cluster.metadata.DataStreamLifecycle.DATA_STREAM_LIFECYCLE_ORIGIN;

public class DataStreamsPlugin extends Plugin implements ActionPlugin {
public class DataStreamsPlugin extends Plugin implements ActionPlugin, HealthPlugin {

public static final Setting<TimeValue> TIME_SERIES_POLL_INTERVAL = Setting.timeSetting(
"time_series.poll_interval",
Expand Down Expand Up @@ -112,6 +115,7 @@ public class DataStreamsPlugin extends Plugin implements ActionPlugin {

private final SetOnce<DataStreamLifecycleService> dataLifecycleInitialisationService = new SetOnce<>();
private final SetOnce<DataStreamLifecycleHealthInfoPublisher> dataStreamLifecycleErrorsPublisher = new SetOnce<>();
private final SetOnce<DataStreamLifecycleHealthIndicatorService> dataStreamLifecycleHealthIndicatorService = new SetOnce<>();
private final Settings settings;

public DataStreamsPlugin(Settings settings) {
Expand Down Expand Up @@ -184,6 +188,8 @@ public Collection<?> createComponents(PluginServices services) {
)
);
dataLifecycleInitialisationService.get().init();
dataStreamLifecycleHealthIndicatorService.set(new DataStreamLifecycleHealthIndicatorService());

components.add(errorStoreInitialisationService.get());
components.add(dataLifecycleInitialisationService.get());
components.add(dataStreamLifecycleErrorsPublisher.get());
Expand Down Expand Up @@ -251,4 +257,9 @@ public void close() throws IOException {
throw new ElasticsearchException("unable to close the data stream lifecycle service", e);
}
}

@Override
public Collection<HealthIndicatorService> getHealthIndicatorServices() {
return List.of(dataStreamLifecycleHealthIndicatorService.get());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.datastreams.lifecycle.health;

import org.elasticsearch.health.Diagnosis;
import org.elasticsearch.health.HealthIndicatorDetails;
import org.elasticsearch.health.HealthIndicatorImpact;
import org.elasticsearch.health.HealthIndicatorResult;
import org.elasticsearch.health.HealthIndicatorService;
import org.elasticsearch.health.HealthStatus;
import org.elasticsearch.health.ImpactArea;
import org.elasticsearch.health.SimpleHealthIndicatorDetails;
import org.elasticsearch.health.node.DataStreamLifecycleHealthInfo;
import org.elasticsearch.health.node.DslErrorInfo;
import org.elasticsearch.health.node.HealthInfo;

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;

import static java.util.stream.Collectors.toList;

public class DataStreamLifecycleHealthIndicatorService implements HealthIndicatorService {

public static final String NAME = "data_stream_lifecycle";
public static final String DSL_EXPLAIN_HELP_URL = "https://ela.st/explain-data-stream-lifecycle";

public static final String STAGNATING_BACKING_INDEX_IMPACT_ID = "stagnating_backing_index";

public static final List<HealthIndicatorImpact> STAGNATING_INDEX_IMPACT = List.of(
new HealthIndicatorImpact(
NAME,
STAGNATING_BACKING_INDEX_IMPACT_ID,
3,
"Data streams backing indices cannot make progress in their lifecycle. The performance and "
+ "stability of the indices and/or the cluster could be impacted.",
List.of(ImpactArea.DEPLOYMENT_MANAGEMENT)
)
);

public static final Diagnosis.Definition STAGNATING_BACKING_INDICES_DIAGNOSIS_DEF = new Diagnosis.Definition(
NAME,
"stagnating_dsl_backing_index",
"Some backing indices are repeatedly encountering errors in their lifecycle execution.",
"Check the current status of the affected indices using the [GET /<affected_index_name>/_lifecycle/explain] API. Please "
+ "replace the <affected_index_name> in the API with the actual index name (or the data stream name for a wider overview).",
DSL_EXPLAIN_HELP_URL
);

@Override
public String name() {
return NAME;
}

@Override
public HealthIndicatorResult calculate(boolean verbose, int maxAffectedResourcesCount, HealthInfo healthInfo) {
DataStreamLifecycleHealthInfo dataStreamLifecycleHealthInfo = healthInfo.dslHealthInfo();
if (dataStreamLifecycleHealthInfo == null) {
// DSL reports health information on every run, so data will eventually arrive to the health node. In the meantime, let's
// report UNKNOWN health
return createIndicator(
HealthStatus.GREEN,
"No data stream lifecycle health data available yet. Health information will be reported after the first run.",
HealthIndicatorDetails.EMPTY,
List.of(),
List.of()
);
}

List<DslErrorInfo> stagnatingBackingIndices = dataStreamLifecycleHealthInfo.dslErrorsInfo();
if (stagnatingBackingIndices.isEmpty()) {
return createIndicator(
HealthStatus.GREEN,
"Data streams are executing their lifecycles without issues",
createDetails(verbose, dataStreamLifecycleHealthInfo),
List.of(),
List.of()
);
} else {
List<String> affectedIndices = stagnatingBackingIndices.stream()
.map(DslErrorInfo::indexName)
.limit(Math.min(maxAffectedResourcesCount, stagnatingBackingIndices.size()))
.collect(toList());
return createIndicator(
HealthStatus.YELLOW,
(stagnatingBackingIndices.size() > 1 ? stagnatingBackingIndices.size() + " backing indices have" : "A backing index has")
+ " repeatedly encountered errors whilst trying to advance in its lifecycle",
createDetails(verbose, dataStreamLifecycleHealthInfo),
STAGNATING_INDEX_IMPACT,
List.of(
new Diagnosis(
STAGNATING_BACKING_INDICES_DIAGNOSIS_DEF,
List.of(new Diagnosis.Resource(Diagnosis.Resource.Type.INDEX, affectedIndices))
)
)
);
}
}

private static HealthIndicatorDetails createDetails(boolean verbose, DataStreamLifecycleHealthInfo dataStreamLifecycleHealthInfo) {
if (verbose == false) {
return HealthIndicatorDetails.EMPTY;
}

var details = new HashMap<String, Object>();
details.put("total_backing_indices_in_error", dataStreamLifecycleHealthInfo.totalErrorEntriesCount());
details.put("stagnating_backing_indices_count", dataStreamLifecycleHealthInfo.dslErrorsInfo().size());
if (dataStreamLifecycleHealthInfo.dslErrorsInfo().isEmpty() == false) {
details.put("stagnating_backing_indices", dataStreamLifecycleHealthInfo.dslErrorsInfo().stream().map(dslError -> {
LinkedHashMap<String, Object> errorDetails = new LinkedHashMap<>(3, 1L);
errorDetails.put("index_name", dslError.indexName());
errorDetails.put("first_occurrence_timestamp", dslError.firstOccurrence());
errorDetails.put("retry_count", dslError.retryCount());
return errorDetails;
}).toList());
}
return new SimpleHealthIndicatorDetails(details);
}
}
Loading

0 comments on commit de70fcd

Please sign in to comment.