Skip to content

Commit

Permalink
Correcting index counts in data stream reindex status
Browse files Browse the repository at this point in the history
  • Loading branch information
masseyke committed Jan 7, 2025
1 parent 0f1429d commit 865dacc
Show file tree
Hide file tree
Showing 5 changed files with 173 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamAction;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.persistent.AllocatedPersistentTask;
Expand Down Expand Up @@ -61,6 +62,7 @@ protected ReindexDataStreamTask createTask(
) {
ReindexDataStreamTaskParams params = taskInProgress.getParams();
return new ReindexDataStreamTask(
clusterService,
params.startTime(),
params.totalIndices(),
params.totalIndicesToBeUpgraded(),
Expand All @@ -74,7 +76,12 @@ protected ReindexDataStreamTask createTask(
}

@Override
protected void nodeOperation(AllocatedPersistentTask task, ReindexDataStreamTaskParams params, PersistentTaskState state) {
protected void nodeOperation(
AllocatedPersistentTask task,
ReindexDataStreamTaskParams params,
PersistentTaskState persistentTaskState
) {
ReindexDataStreamPersistentTaskState state = (ReindexDataStreamPersistentTaskState) persistentTaskState;
String sourceDataStream = params.getSourceDataStream();
TaskId taskId = new TaskId(clusterService.localNode().getId(), task.getId());
GetDataStreamAction.Request request = new GetDataStreamAction.Request(TimeValue.MAX_VALUE, new String[] { sourceDataStream });
Expand All @@ -93,33 +100,71 @@ protected void nodeOperation(AllocatedPersistentTask task, ReindexDataStreamTask
RolloverAction.INSTANCE,
rolloverRequest,
ActionListener.wrap(
rolloverResponse -> reindexIndices(dataStream, reindexDataStreamTask, reindexClient, sourceDataStream, taskId),
e -> completeFailedPersistentTask(reindexDataStreamTask, e)
rolloverResponse -> reindexIndices(
dataStream,
dataStream.getIndices().size() + 1,
reindexDataStreamTask,
params,
state,
reindexClient,
sourceDataStream,
taskId
),
e -> completeFailedPersistentTask(reindexDataStreamTask, state, e)
)
);
} else {
reindexIndices(dataStream, reindexDataStreamTask, reindexClient, sourceDataStream, taskId);
reindexIndices(
dataStream,
dataStream.getIndices().size(),
reindexDataStreamTask,
params,
state,
reindexClient,
sourceDataStream,
taskId
);
}
} else {
completeFailedPersistentTask(reindexDataStreamTask, new ElasticsearchException("data stream does not exist"));
completeFailedPersistentTask(reindexDataStreamTask, state, new ElasticsearchException("data stream does not exist"));
}
}, exception -> completeFailedPersistentTask(reindexDataStreamTask, exception)));
}, exception -> completeFailedPersistentTask(reindexDataStreamTask, state, exception)));
}

private void reindexIndices(
DataStream dataStream,
int totalIndicesInDataStream,
ReindexDataStreamTask reindexDataStreamTask,
ReindexDataStreamTaskParams params,
ReindexDataStreamPersistentTaskState state,
ExecuteWithHeadersClient reindexClient,
String sourceDataStream,
TaskId parentTaskId
) {
List<Index> indices = dataStream.getIndices();
List<Index> indicesToBeReindexed = indices.stream().filter(getReindexRequiredPredicate(clusterService.state().metadata())).toList();
final ReindexDataStreamPersistentTaskState updatedState;
if (params.totalIndices() != totalIndicesInDataStream
|| params.totalIndicesToBeUpgraded() != indicesToBeReindexed.size()
|| (state != null
&& (state.totalIndices() != null
&& state.totalIndicesToBeUpgraded() != null
&& (state.totalIndices() != totalIndicesInDataStream
|| state.totalIndicesToBeUpgraded() != indicesToBeReindexed.size())))) {
updatedState = new ReindexDataStreamPersistentTaskState(
totalIndicesInDataStream,
indicesToBeReindexed.size(),
state == null ? null : state.completionTime()
);
reindexDataStreamTask.updatePersistentTaskState(updatedState, ActionListener.noop());
} else {
updatedState = state;
}
reindexDataStreamTask.setPendingIndicesCount(indicesToBeReindexed.size());
// The CountDownActionListener is 1 more than the number of indices so that the count is not 0 if we have no indices
CountDownActionListener listener = new CountDownActionListener(indicesToBeReindexed.size() + 1, ActionListener.wrap(response1 -> {
completeSuccessfulPersistentTask(reindexDataStreamTask);
}, exception -> { completeFailedPersistentTask(reindexDataStreamTask, exception); }));
completeSuccessfulPersistentTask(reindexDataStreamTask, updatedState);
}, exception -> { completeFailedPersistentTask(reindexDataStreamTask, updatedState, exception); }));
List<Index> indicesRemaining = Collections.synchronizedList(new ArrayList<>(indicesToBeReindexed));
final int maxConcurrentIndices = 1;
for (int i = 0; i < maxConcurrentIndices; i++) {
Expand Down Expand Up @@ -191,15 +236,25 @@ public void onFailure(Exception e) {
});
}

private void completeSuccessfulPersistentTask(ReindexDataStreamTask persistentTask) {
persistentTask.allReindexesCompleted(threadPool, getTimeToLive(persistentTask));
private void completeSuccessfulPersistentTask(
ReindexDataStreamTask persistentTask,
@Nullable ReindexDataStreamPersistentTaskState state
) {
persistentTask.allReindexesCompleted(threadPool, updateCompletionTimeAndgetTimeToLive(persistentTask, state));
}

private void completeFailedPersistentTask(ReindexDataStreamTask persistentTask, Exception e) {
persistentTask.taskFailed(threadPool, getTimeToLive(persistentTask), e);
private void completeFailedPersistentTask(
ReindexDataStreamTask persistentTask,
@Nullable ReindexDataStreamPersistentTaskState state,
Exception e
) {
persistentTask.taskFailed(threadPool, updateCompletionTimeAndgetTimeToLive(persistentTask, state), e);
}

private TimeValue getTimeToLive(ReindexDataStreamTask reindexDataStreamTask) {
private TimeValue updateCompletionTimeAndgetTimeToLive(
ReindexDataStreamTask reindexDataStreamTask,
@Nullable ReindexDataStreamPersistentTaskState state
) {
PersistentTasksCustomMetadata persistentTasksCustomMetadata = clusterService.state()
.getMetadata()
.custom(PersistentTasksCustomMetadata.TYPE);
Expand All @@ -209,16 +264,23 @@ private TimeValue getTimeToLive(ReindexDataStreamTask reindexDataStreamTask) {
if (persistentTask == null) {
return TimeValue.timeValueMillis(0);
}
PersistentTaskState state = persistentTask.getState();
final long completionTime;
if (state == null) {
completionTime = threadPool.absoluteTimeInMillis();
reindexDataStreamTask.updatePersistentTaskState(
new ReindexDataStreamPersistentTaskState(completionTime),
new ReindexDataStreamPersistentTaskState(null, null, completionTime),
ActionListener.noop()
);
} else {
completionTime = ((ReindexDataStreamPersistentTaskState) state).completionTime();
if (state.completionTime() == null) {
completionTime = threadPool.absoluteTimeInMillis();
reindexDataStreamTask.updatePersistentTaskState(
new ReindexDataStreamPersistentTaskState(state.totalIndices(), state.totalIndicesToBeUpgraded(), completionTime),
ActionListener.noop()
);
} else {
completionTime = state.completionTime();
}
}
return TimeValue.timeValueMillis(TASK_KEEP_ALIVE_TIME.millis() - (threadPool.absoluteTimeInMillis() - completionTime));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.persistent.PersistentTaskState;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.xcontent.ConstructingObjectParser;
Expand All @@ -18,22 +19,31 @@

import java.io.IOException;

import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstructorArg;

public record ReindexDataStreamPersistentTaskState(
@Nullable Integer totalIndices,
@Nullable Integer totalIndicesToBeUpgraded,
@Nullable Long completionTime
) implements Task.Status, PersistentTaskState {

public record ReindexDataStreamPersistentTaskState(long completionTime) implements Task.Status, PersistentTaskState {
public static final String NAME = ReindexDataStreamTask.TASK_NAME;
private static final String TOTAL_INDICES_FIELD = "total_indices_in_data_stream";
private static final String TOTAL_INDICES_REQUIRING_UPGRADE_FIELD = "total_indices_requiring_upgrade";
private static final String COMPLETION_TIME_FIELD = "completion_time";
private static final ConstructingObjectParser<ReindexDataStreamPersistentTaskState, Void> PARSER = new ConstructingObjectParser<>(
NAME,
true,
args -> new ReindexDataStreamPersistentTaskState((long) args[0])
args -> new ReindexDataStreamPersistentTaskState((Integer) args[0], (Integer) args[1], (Long) args[2])
);
static {
PARSER.declareLong(constructorArg(), new ParseField(COMPLETION_TIME_FIELD));
PARSER.declareInt(optionalConstructorArg(), new ParseField(TOTAL_INDICES_FIELD));
PARSER.declareInt(optionalConstructorArg(), new ParseField(TOTAL_INDICES_REQUIRING_UPGRADE_FIELD));
PARSER.declareLong(optionalConstructorArg(), new ParseField(COMPLETION_TIME_FIELD));
}

public ReindexDataStreamPersistentTaskState(StreamInput in) throws IOException {
this(in.readLong());
this(in.readOptionalInt(), in.readOptionalInt(), in.readOptionalLong());
}

@Override
Expand All @@ -43,13 +53,23 @@ public String getWriteableName() {

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeLong(completionTime);
out.writeOptionalInt(totalIndices);
out.writeOptionalInt(totalIndicesToBeUpgraded);
out.writeOptionalLong(completionTime);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(COMPLETION_TIME_FIELD, completionTime);
if (totalIndices != null) {
builder.field(TOTAL_INDICES_FIELD, totalIndices);
}
if (totalIndicesToBeUpgraded != null) {
builder.field(TOTAL_INDICES_REQUIRING_UPGRADE_FIELD, totalIndicesToBeUpgraded);
}
if (completionTime != null) {
builder.field(COMPLETION_TIME_FIELD, completionTime);
}
builder.endObject();
return builder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@

package org.elasticsearch.xpack.migrate.task;

import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.concurrent.RunOnce;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.persistent.AllocatedPersistentTask;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;

Expand All @@ -24,9 +26,10 @@

public class ReindexDataStreamTask extends AllocatedPersistentTask {
public static final String TASK_NAME = "reindex-data-stream";
private final ClusterService clusterService;
private final long persistentTaskStartTime;
private final int totalIndices;
private final int totalIndicesToBeUpgraded;
private final int initialTotalIndices;
private final int initialTotalIndicesToBeUpgraded;
private volatile boolean complete = false;
private volatile Exception exception;
private final Set<String> inProgress = Collections.synchronizedSet(new HashSet<>());
Expand All @@ -36,9 +39,10 @@ public class ReindexDataStreamTask extends AllocatedPersistentTask {

@SuppressWarnings("this-escape")
public ReindexDataStreamTask(
ClusterService clusterService,
long persistentTaskStartTime,
int totalIndices,
int totalIndicesToBeUpgraded,
int initialTotalIndices,
int initialTotalIndicesToBeUpgraded,
long id,
String type,
String action,
Expand All @@ -47,9 +51,10 @@ public ReindexDataStreamTask(
Map<String, String> headers
) {
super(id, type, action, description, parentTask, headers);
this.clusterService = clusterService;
this.persistentTaskStartTime = persistentTaskStartTime;
this.totalIndices = totalIndices;
this.totalIndicesToBeUpgraded = totalIndicesToBeUpgraded;
this.initialTotalIndices = initialTotalIndices;
this.initialTotalIndicesToBeUpgraded = initialTotalIndicesToBeUpgraded;
this.completeTask = new RunOnce(() -> {
if (exception == null) {
markAsCompleted();
Expand All @@ -61,6 +66,19 @@ public ReindexDataStreamTask(

@Override
public ReindexDataStreamStatus getStatus() {
PersistentTasksCustomMetadata persistentTasksCustomMetadata = clusterService.state()
.getMetadata()
.custom(PersistentTasksCustomMetadata.TYPE);
int totalIndices = initialTotalIndices;
int totalIndicesToBeUpgraded = initialTotalIndicesToBeUpgraded;
PersistentTasksCustomMetadata.PersistentTask<?> persistentTask = persistentTasksCustomMetadata.getTask(getPersistentTaskId());
if (persistentTask != null) {
ReindexDataStreamPersistentTaskState state = (ReindexDataStreamPersistentTaskState) persistentTask.getState();
if (state != null && state.totalIndices() != null && state.totalIndicesToBeUpgraded() != null) {
totalIndices = Math.toIntExact(state.totalIndices());
totalIndicesToBeUpgraded = Math.toIntExact(state.totalIndicesToBeUpgraded());
}
}
return new ReindexDataStreamStatus(
persistentTaskStartTime,
totalIndices,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,32 @@ protected Writeable.Reader<ReindexDataStreamPersistentTaskState> instanceReader(

@Override
protected ReindexDataStreamPersistentTaskState createTestInstance() {
return new ReindexDataStreamPersistentTaskState(randomNegativeLong());
return new ReindexDataStreamPersistentTaskState(
randomBoolean() ? null : randomNonNegativeInt(),
randomBoolean() ? null : randomNonNegativeInt(),
randomBoolean() ? null : randomNonNegativeLong()
);
}

@Override
protected ReindexDataStreamPersistentTaskState mutateInstance(ReindexDataStreamPersistentTaskState instance) throws IOException {
return new ReindexDataStreamPersistentTaskState(instance.completionTime() + 1);
return switch (randomInt(2)) {
case 0 -> new ReindexDataStreamPersistentTaskState(
instance.totalIndices() == null ? randomNonNegativeInt() : instance.totalIndices() + 1,
instance.totalIndicesToBeUpgraded(),
instance.completionTime()
);
case 1 -> new ReindexDataStreamPersistentTaskState(
instance.totalIndices(),
instance.totalIndicesToBeUpgraded() == null ? randomNonNegativeInt() : instance.totalIndicesToBeUpgraded() + 1,
instance.completionTime()
);
case 2 -> new ReindexDataStreamPersistentTaskState(
instance.totalIndices(),
instance.totalIndicesToBeUpgraded(),
instance.completionTime() == null ? randomNonNegativeLong() : instance.completionTime() + 1
);
default -> throw new IllegalArgumentException("Should never get here");
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,11 @@ private static void createAndRolloverDataStream(String dataStreamName, int numRo
}
}

private void upgradeDataStream(String dataStreamName, int numRollovers) throws Exception {
private void upgradeDataStream(String dataStreamName, int numRolloversOnOldCluster) throws Exception {
final int explicitRolloverOnNewClusterCount = randomIntBetween(0, 2);
for (int i = 0; i < explicitRolloverOnNewClusterCount; i++) {
rollover(dataStreamName);
}
Request reindexRequest = new Request("POST", "/_migration/reindex");
reindexRequest.setJsonEntity(Strings.format("""
{
Expand All @@ -276,18 +280,27 @@ private void upgradeDataStream(String dataStreamName, int numRollovers) throws E
);
assertOK(statusResponse);
assertThat(statusResponseMap.get("complete"), equalTo(true));
/*
* total_indices_in_data_stream is determined at the beginning of the reindex, and does not take into account the write
* index being rolled over
*/
assertThat(statusResponseMap.get("total_indices_in_data_stream"), equalTo(numRollovers + 1));
// The number of rollovers that will have happened when we call reindex:
final int rolloversPerformedByReindex = explicitRolloverOnNewClusterCount == 0 ? 1 : 0;
final int originalWriteIndex = 1;
assertThat(
statusResponseMap.get("total_indices_in_data_stream"),
equalTo(originalWriteIndex + numRolloversOnOldCluster + explicitRolloverOnNewClusterCount + rolloversPerformedByReindex)
);
if (isOriginalClusterSameMajorVersionAsCurrent()) {
// If the original cluster was the same as this one, we don't want any indices reindexed:
assertThat(statusResponseMap.get("total_indices_requiring_upgrade"), equalTo(0));
assertThat(statusResponseMap.get("successes"), equalTo(0));
} else {
assertThat(statusResponseMap.get("total_indices_requiring_upgrade"), equalTo(numRollovers + 1));
assertThat(statusResponseMap.get("successes"), equalTo(numRollovers + 1));
/*
* total_indices_requiring_upgrade is made up of: (the original write index) + numRolloversOnOldCluster. The number of
* rollovers on the upgraded cluster is irrelevant since those will not be reindexed.
*/
assertThat(
statusResponseMap.get("total_indices_requiring_upgrade"),
equalTo(originalWriteIndex + numRolloversOnOldCluster)
);
assertThat(statusResponseMap.get("successes"), equalTo(numRolloversOnOldCluster + 1));
}
}, 60, TimeUnit.SECONDS);
Request cancelRequest = new Request("POST", "_migration/reindex/" + dataStreamName + "/_cancel");
Expand Down

0 comments on commit 865dacc

Please sign in to comment.