[ILM] Fix downsample to skip already downsampled indices (elastic#102250)

This fixes a bug in ILM where, if an ILM policy is updated so that the
next phase (e.g. warm) contains the downsample action from the previous
phase (e.g. hot) while the managed index is waiting to enter that next
phase, an index that had already been downsampled in the hot phase would
be deleted when the warm phase ran its downsample action.

Fixes elastic#102249
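For illustration, the scenario above looks roughly like this (the phase names, rollover condition, and 5m interval are hypothetical and not taken from this change). A policy initially downsamples in the hot phase:

{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "rollover": { "max_age": "1d" },
          "downsample": { "fixed_interval": "5m" }
        }
      },
      "warm": { "min_age": "1d", "actions": {} }
    }
  }
}

While the downsample index produced in the hot phase (which has replaced the original backing index in the data stream) is waiting to enter the warm phase, the policy is updated so that the same downsample action now lives in warm:

{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "rollover": { "max_age": "1d" }
        }
      },
      "warm": {
        "min_age": "1d",
        "actions": {
          "downsample": { "fixed_interval": "5m" }
        }
      }
    }
  }
}

Before this fix, the already downsampled index would be deleted when the warm phase ran its downsample action. With this change, the action's initial branching step recognises that the index is already the downsample target for that fixed interval and skips the action, as exercised by the new integration test below using the warm and cold phases.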
andreidan authored Nov 15, 2023
1 parent ac880b7 commit ffed132
Showing 4 changed files with 236 additions and 1 deletion.
6 changes: 6 additions & 0 deletions docs/changelog/102250.yaml
@@ -0,0 +1,6 @@
pr: 102250
summary: "[ILM] Fix downsample to skip already downsampled indices"
area: ILM+SLM
type: bug
issues:
- 102249
@@ -6,6 +6,8 @@
*/
package org.elasticsearch.xpack.core.ilm;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.downsample.DownsampleConfig;
import org.elasticsearch.client.internal.Client;
@@ -32,6 +34,7 @@
import java.util.Objects;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.action.downsample.DownsampleConfig.generateDownsampleIndexName;
import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstructorArg;

@@ -40,6 +43,8 @@
*/
public class DownsampleAction implements LifecycleAction {

private static final Logger logger = LogManager.getLogger(DownsampleAction.class);

public static final String NAME = "downsample";
public static final String DOWNSAMPLED_INDEX_PREFIX = "downsample-";
public static final String CONDITIONAL_TIME_SERIES_CHECK_KEY = BranchingStep.NAME + "-on-timeseries-check";
@@ -155,7 +160,30 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey) {
(index, clusterState) -> {
IndexMetadata indexMetadata = clusterState.metadata().index(index);
assert indexMetadata != null : "invalid cluster metadata. index [" + index.getName() + "] metadata not found";
return IndexSettings.MODE.get(indexMetadata.getSettings()) == IndexMode.TIME_SERIES;
if (IndexSettings.MODE.get(indexMetadata.getSettings()) != IndexMode.TIME_SERIES) {
return false;
}

if (index.getName().equals(generateDownsampleIndexName(DOWNSAMPLED_INDEX_PREFIX, indexMetadata, fixedInterval))) {
var downsampleStatus = IndexMetadata.INDEX_DOWNSAMPLE_STATUS.get(indexMetadata.getSettings());
if (downsampleStatus == IndexMetadata.DownsampleTaskStatus.UNKNOWN) {
// This isn't a downsample index, but it has the name of our target downsample index - very bad, we'll skip the
// downsample action to avoid blocking the lifecycle of this index - if there
// is another downsample action configured in the next phase, it'll be able to proceed successfully
logger.warn(
"index [{}] as part of policy [{}] cannot be downsampled at interval [{}] in phase [{}] because it has"
+ " the name of the target downsample index and is itself not a downsampled index. Skipping the downsample "
+ "action.",
index.getName(),
indexMetadata.getLifecyclePolicyName(),
fixedInterval,
phase
);
}
return false;
}

return true;
}
);

@@ -6,8 +6,16 @@
*/
package org.elasticsearch.xpack.core.ilm;

import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.test.EqualsHashCodeTestUtils;
import org.elasticsearch.xcontent.XContentParser;
@@ -19,7 +27,9 @@

import static org.elasticsearch.xpack.core.ilm.DownsampleAction.CONDITIONAL_DATASTREAM_CHECK_KEY;
import static org.elasticsearch.xpack.core.ilm.DownsampleAction.CONDITIONAL_TIME_SERIES_CHECK_KEY;
import static org.elasticsearch.xpack.core.ilm.DownsampleAction.DOWNSAMPLED_INDEX_PREFIX;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;

public class DownsampleActionTests extends AbstractActionTestCase<DownsampleAction> {

@@ -132,6 +142,92 @@ public void testToSteps() {
assertThat(steps.get(14).getNextStepKey(), equalTo(nextStepKey));
}

public void testDownsamplingPrerequisitesStep() {
DateHistogramInterval fixedInterval = ConfigTestHelpers.randomInterval();
DownsampleAction action = new DownsampleAction(fixedInterval, WAIT_TIMEOUT);
String phase = randomAlphaOfLengthBetween(1, 10);
StepKey nextStepKey = new StepKey(
randomAlphaOfLengthBetween(1, 10),
randomAlphaOfLengthBetween(1, 10),
randomAlphaOfLengthBetween(1, 10)
);
{
// non time series indices skip the action
BranchingStep branchingStep = getFirstBranchingStep(action, phase, nextStepKey);
IndexMetadata indexMetadata = newIndexMeta("test", Settings.EMPTY);

ClusterState state = ClusterState.builder(ClusterName.DEFAULT)
.metadata(Metadata.builder().put(indexMetadata, true).build())
.build();

branchingStep.performAction(indexMetadata.getIndex(), state);
assertThat(branchingStep.getNextStepKey(), is(nextStepKey));
}
{
// time series indices execute the action
BranchingStep branchingStep = getFirstBranchingStep(action, phase, nextStepKey);
Settings settings = Settings.builder()
.put(IndexSettings.MODE.getKey(), IndexMode.TIME_SERIES)
.put("index.routing_path", "uid")
.build();
IndexMetadata indexMetadata = newIndexMeta("test", settings);

ClusterState state = ClusterState.builder(ClusterName.DEFAULT)
.metadata(Metadata.builder().put(indexMetadata, true).build())
.build();

branchingStep.performAction(indexMetadata.getIndex(), state);
assertThat(branchingStep.getNextStepKey().name(), is(CheckNotDataStreamWriteIndexStep.NAME));
}
{
// already downsampled indices for the interval skip the action
BranchingStep branchingStep = getFirstBranchingStep(action, phase, nextStepKey);
Settings settings = Settings.builder()
.put(IndexSettings.MODE.getKey(), IndexMode.TIME_SERIES)
.put("index.routing_path", "uid")
.put(IndexMetadata.INDEX_DOWNSAMPLE_STATUS_KEY, IndexMetadata.DownsampleTaskStatus.SUCCESS)
.put(IndexMetadata.INDEX_DOWNSAMPLE_ORIGIN_NAME.getKey(), "test")
.build();
String indexName = DOWNSAMPLED_INDEX_PREFIX + fixedInterval + "-test";
IndexMetadata indexMetadata = newIndexMeta(indexName, settings);

ClusterState state = ClusterState.builder(ClusterName.DEFAULT)
.metadata(Metadata.builder().put(indexMetadata, true).build())
.build();

branchingStep.performAction(indexMetadata.getIndex(), state);
assertThat(branchingStep.getNextStepKey(), is(nextStepKey));
}
{
// indices with the same name as the target downsample index that are NOT downsample indices skip the action
BranchingStep branchingStep = getFirstBranchingStep(action, phase, nextStepKey);
String indexName = DOWNSAMPLED_INDEX_PREFIX + fixedInterval + "-test";
IndexMetadata indexMetadata = newIndexMeta(indexName, Settings.EMPTY);

ClusterState state = ClusterState.builder(ClusterName.DEFAULT)
.metadata(Metadata.builder().put(indexMetadata, true).build())
.build();

branchingStep.performAction(indexMetadata.getIndex(), state);
assertThat(branchingStep.getNextStepKey(), is(nextStepKey));
}
}

private static BranchingStep getFirstBranchingStep(DownsampleAction action, String phase, StepKey nextStepKey) {
List<Step> steps = action.toSteps(null, phase, nextStepKey);
assertNotNull(steps);
assertEquals(15, steps.size());

assertTrue(steps.get(0) instanceof BranchingStep);
assertThat(steps.get(0).getKey().name(), equalTo(CONDITIONAL_TIME_SERIES_CHECK_KEY));

return (BranchingStep) steps.get(0);
}

public static IndexMetadata newIndexMeta(String name, Settings indexSettings) {
return IndexMetadata.builder(name).settings(indexSettings(IndexVersion.current(), 1, 1).put(indexSettings)).build();
}

public void testEqualsAndHashCode() {
EqualsHashCodeTestUtils.checkEqualsAndHashCode(createTestInstance(), this::copy, this::notCopy);
}
@@ -490,6 +490,111 @@ public void testDownsampleTwice() throws Exception {
}
}

public void testDownsampleTwiceSameInterval() throws Exception {
// Create the ILM policy
Request request = new Request("PUT", "_ilm/policy/" + policy);
request.setJsonEntity("""
{
"policy": {
"phases": {
"warm": {
"actions": {
"downsample": {
"fixed_interval" : "5m"
}
}
},
"cold": {
"min_age": "365d",
"actions": {}
}
}
}
}
""");
assertOK(client().performRequest(request));

// Create a template
Request createIndexTemplateRequest = new Request("POST", "/_index_template/" + dataStream);
createIndexTemplateRequest.setJsonEntity(
Strings.format(TEMPLATE, dataStream, "2006-01-08T23:40:53.384Z", "2021-01-08T23:40:53.384Z", policy)
);
assertOK(client().performRequest(createIndexTemplateRequest));

index(client(), dataStream, true, null, "@timestamp", "2020-01-01T05:10:00Z", "volume", 11.0, "metricset", randomAlphaOfLength(5));

String firstBackingIndex = getBackingIndices(client(), dataStream).get(0);
logger.info("--> firstBackingIndex: {}", firstBackingIndex);
assertBusy(
() -> assertThat(
"index must wait in the " + CheckNotDataStreamWriteIndexStep.NAME + " until it is not the write index anymore",
explainIndex(client(), firstBackingIndex).get("step"),
is(CheckNotDataStreamWriteIndexStep.NAME)
),
30,
TimeUnit.SECONDS
);

// before we rollover, update template to not contain time boundaries anymore (rollover is blocked otherwise due to index time
// boundaries overlapping after rollover)
Request updateIndexTemplateRequest = new Request("POST", "/_index_template/" + dataStream);
updateIndexTemplateRequest.setJsonEntity(Strings.format(TEMPLATE_NO_TIME_BOUNDARIES, dataStream, policy));
assertOK(client().performRequest(updateIndexTemplateRequest));

// Manual rollover the original index such that it's not the write index in the data stream anymore
rolloverMaxOneDocCondition(client(), dataStream);

String downsampleIndexName = "downsample-5m-" + firstBackingIndex;
// wait for the downsample index to get to the end of the warm phase
assertBusy(() -> {
assertThat(indexExists(downsampleIndexName), is(true));
assertThat(indexExists(firstBackingIndex), is(false));

assertThat(explainIndex(client(), downsampleIndexName).get("step"), is(PhaseCompleteStep.NAME));
assertThat(explainIndex(client(), downsampleIndexName).get("phase"), is("warm"));

Map<String, Object> settings = getOnlyIndexSettings(client(), downsampleIndexName);
assertEquals(firstBackingIndex, settings.get(IndexMetadata.INDEX_DOWNSAMPLE_ORIGIN_NAME.getKey()));
assertEquals(firstBackingIndex, settings.get(IndexMetadata.INDEX_DOWNSAMPLE_SOURCE_NAME.getKey()));
assertEquals(DownsampleTaskStatus.SUCCESS.toString(), settings.get(IndexMetadata.INDEX_DOWNSAMPLE_STATUS.getKey()));
assertEquals(policy, settings.get(LifecycleSettings.LIFECYCLE_NAME_SETTING.getKey()));
}, 60, TimeUnit.SECONDS);

// update the policy to now contain the downsample action in cold, whilst not existing in warm anymore (this will have our already
// downsampled index attempt to go through the downsample action again when in cold)

Request updatePolicyRequest = new Request("PUT", "_ilm/policy/" + policy);
updatePolicyRequest.setJsonEntity("""
{
"policy": {
"phases": {
"warm": {
"actions": {
}
},
"cold": {
"min_age": "0ms",
"actions": {
"downsample": {
"fixed_interval" : "5m"
}
}
}
}
}
}
""");
assertOK(client().performRequest(updatePolicyRequest));

// the downsample index (already part of the data stream as we created it in the warm phase previously) should continue to exist and
// reach the cold/complete/complete step
assertBusy(() -> {
assertThat(indexExists(downsampleIndexName), is(true));
assertThat(explainIndex(client(), downsampleIndexName).get("step"), is(PhaseCompleteStep.NAME));
assertThat(explainIndex(client(), downsampleIndexName).get("phase"), is("cold"));
}, 60, TimeUnit.SECONDS);
}

/**
* Gets the generated rollup index name for a given index by looking at newly created indices that match the rollup index name pattern
*
