diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java
index 7336574171330..46c07c960dfb3 100644
--- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java
+++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java
@@ -19,6 +19,7 @@
 import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.cluster.metadata.DataStreamAction;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.persistent.AllocatedPersistentTask;
@@ -61,6 +62,7 @@ protected ReindexDataStreamTask createTask(
     ) {
         ReindexDataStreamTaskParams params = taskInProgress.getParams();
         return new ReindexDataStreamTask(
+            clusterService,
             params.startTime(),
             params.totalIndices(),
             params.totalIndicesToBeUpgraded(),
@@ -74,7 +76,12 @@ protected ReindexDataStreamTask createTask(
     }
 
     @Override
-    protected void nodeOperation(AllocatedPersistentTask task, ReindexDataStreamTaskParams params, PersistentTaskState state) {
+    protected void nodeOperation(
+        AllocatedPersistentTask task,
+        ReindexDataStreamTaskParams params,
+        PersistentTaskState persistentTaskState
+    ) {
+        ReindexDataStreamPersistentTaskState state = (ReindexDataStreamPersistentTaskState) persistentTaskState;
         String sourceDataStream = params.getSourceDataStream();
         TaskId taskId = new TaskId(clusterService.localNode().getId(), task.getId());
         GetDataStreamAction.Request request = new GetDataStreamAction.Request(TimeValue.MAX_VALUE, new String[] { sourceDataStream });
@@ -93,33 +100,71 @@ protected void nodeOperation(AllocatedPersistentTask task, ReindexDataStreamTask
                         RolloverAction.INSTANCE,
                         rolloverRequest,
                         ActionListener.wrap(
-                            rolloverResponse -> reindexIndices(dataStream, reindexDataStreamTask, reindexClient, sourceDataStream, taskId),
-                            e -> completeFailedPersistentTask(reindexDataStreamTask, e)
+                            rolloverResponse -> reindexIndices(
+                                dataStream,
+                                dataStream.getIndices().size() + 1,
+                                reindexDataStreamTask,
+                                params,
+                                state,
+                                reindexClient,
+                                sourceDataStream,
+                                taskId
+                            ),
+                            e -> completeFailedPersistentTask(reindexDataStreamTask, state, e)
                         )
                     );
                 } else {
-                    reindexIndices(dataStream, reindexDataStreamTask, reindexClient, sourceDataStream, taskId);
+                    reindexIndices(
+                        dataStream,
+                        dataStream.getIndices().size(),
+                        reindexDataStreamTask,
+                        params,
+                        state,
+                        reindexClient,
+                        sourceDataStream,
+                        taskId
+                    );
                 }
             } else {
-                completeFailedPersistentTask(reindexDataStreamTask, new ElasticsearchException("data stream does not exist"));
+                completeFailedPersistentTask(reindexDataStreamTask, state, new ElasticsearchException("data stream does not exist"));
             }
-        }, exception -> completeFailedPersistentTask(reindexDataStreamTask, exception)));
+        }, exception -> completeFailedPersistentTask(reindexDataStreamTask, state, exception)));
     }
 
     private void reindexIndices(
         DataStream dataStream,
+        int totalIndicesInDataStream,
         ReindexDataStreamTask reindexDataStreamTask,
+        ReindexDataStreamTaskParams params,
+        ReindexDataStreamPersistentTaskState state,
         ExecuteWithHeadersClient reindexClient,
         String sourceDataStream,
         TaskId parentTaskId
     ) {
         List<Index> indices = dataStream.getIndices();
         List<Index> indicesToBeReindexed = indices.stream().filter(getReindexRequiredPredicate(clusterService.state().metadata())).toList();
+        final ReindexDataStreamPersistentTaskState updatedState;
+        if (params.totalIndices() != totalIndicesInDataStream
+            || params.totalIndicesToBeUpgraded() != indicesToBeReindexed.size()
+            || (state != null
+                && (state.totalIndices() != null
+                    && state.totalIndicesToBeUpgraded() != null
+                    && (state.totalIndices() != totalIndicesInDataStream
+                        || state.totalIndicesToBeUpgraded() != indicesToBeReindexed.size())))) {
+            updatedState = new ReindexDataStreamPersistentTaskState(
+                totalIndicesInDataStream,
+                indicesToBeReindexed.size(),
+                state == null ? null : state.completionTime()
+            );
+            reindexDataStreamTask.updatePersistentTaskState(updatedState, ActionListener.noop());
+        } else {
+            updatedState = state;
+        }
         reindexDataStreamTask.setPendingIndicesCount(indicesToBeReindexed.size());
         // The CountDownActionListener is 1 more than the number of indices so that the count is not 0 if we have no indices
         CountDownActionListener listener = new CountDownActionListener(indicesToBeReindexed.size() + 1, ActionListener.wrap(response1 -> {
-            completeSuccessfulPersistentTask(reindexDataStreamTask);
-        }, exception -> { completeFailedPersistentTask(reindexDataStreamTask, exception); }));
+            completeSuccessfulPersistentTask(reindexDataStreamTask, updatedState);
+        }, exception -> { completeFailedPersistentTask(reindexDataStreamTask, updatedState, exception); }));
         List<Index> indicesRemaining = Collections.synchronizedList(new ArrayList<>(indicesToBeReindexed));
         final int maxConcurrentIndices = 1;
         for (int i = 0; i < maxConcurrentIndices; i++) {
@@ -191,15 +236,25 @@ public void onFailure(Exception e) {
         });
     }
 
-    private void completeSuccessfulPersistentTask(ReindexDataStreamTask persistentTask) {
-        persistentTask.allReindexesCompleted(threadPool, getTimeToLive(persistentTask));
+    private void completeSuccessfulPersistentTask(
+        ReindexDataStreamTask persistentTask,
+        @Nullable ReindexDataStreamPersistentTaskState state
+    ) {
+        persistentTask.allReindexesCompleted(threadPool, updateCompletionTimeAndgetTimeToLive(persistentTask, state));
     }
 
-    private void completeFailedPersistentTask(ReindexDataStreamTask persistentTask, Exception e) {
-        persistentTask.taskFailed(threadPool, getTimeToLive(persistentTask), e);
+    private void completeFailedPersistentTask(
+        ReindexDataStreamTask persistentTask,
+        @Nullable ReindexDataStreamPersistentTaskState state,
+        Exception e
+    ) {
+        persistentTask.taskFailed(threadPool, updateCompletionTimeAndgetTimeToLive(persistentTask, state), e);
     }
 
-    private TimeValue getTimeToLive(ReindexDataStreamTask reindexDataStreamTask) {
+    private TimeValue updateCompletionTimeAndgetTimeToLive(
+        ReindexDataStreamTask reindexDataStreamTask,
+        @Nullable ReindexDataStreamPersistentTaskState state
+    ) {
         PersistentTasksCustomMetadata persistentTasksCustomMetadata = clusterService.state()
             .getMetadata()
             .custom(PersistentTasksCustomMetadata.TYPE);
@@ -209,16 +264,23 @@ private TimeValue getTimeToLive(ReindexDataStreamTask reindexDataStreamTask) {
         if (persistentTask == null) {
             return TimeValue.timeValueMillis(0);
         }
-        PersistentTaskState state = persistentTask.getState();
         final long completionTime;
         if (state == null) {
             completionTime = threadPool.absoluteTimeInMillis();
             reindexDataStreamTask.updatePersistentTaskState(
-                new ReindexDataStreamPersistentTaskState(completionTime),
+                new ReindexDataStreamPersistentTaskState(null, null, completionTime),
                 ActionListener.noop()
             );
         } else {
-            completionTime = ((ReindexDataStreamPersistentTaskState) state).completionTime();
+            if (state.completionTime() == null) {
+                completionTime = threadPool.absoluteTimeInMillis();
+                reindexDataStreamTask.updatePersistentTaskState(
+                    new ReindexDataStreamPersistentTaskState(state.totalIndices(), state.totalIndicesToBeUpgraded(), completionTime),
+                    ActionListener.noop()
+                );
+            } else {
+                completionTime = state.completionTime();
+            }
         }
         return TimeValue.timeValueMillis(TASK_KEEP_ALIVE_TIME.millis() - (threadPool.absoluteTimeInMillis() - completionTime));
     }
diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskState.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskState.java
index 130a8f7ce372b..8ab22674082e6 100644
--- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskState.java
+++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskState.java
@@ -9,6 +9,7 @@
 
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.core.Nullable;
 import org.elasticsearch.persistent.PersistentTaskState;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.xcontent.ConstructingObjectParser;
@@ -18,22 +19,31 @@
 import java.io.IOException;
 
-import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg;
+import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstructorArg;
+
+public record ReindexDataStreamPersistentTaskState(
+    @Nullable Integer totalIndices,
+    @Nullable Integer totalIndicesToBeUpgraded,
+    @Nullable Long completionTime
+) implements Task.Status, PersistentTaskState {
 
-public record ReindexDataStreamPersistentTaskState(long completionTime) implements Task.Status, PersistentTaskState {
     public static final String NAME = ReindexDataStreamTask.TASK_NAME;
+    private static final String TOTAL_INDICES_FIELD = "total_indices_in_data_stream";
+    private static final String TOTAL_INDICES_REQUIRING_UPGRADE_FIELD = "total_indices_requiring_upgrade";
     private static final String COMPLETION_TIME_FIELD = "completion_time";
 
     private static final ConstructingObjectParser<ReindexDataStreamPersistentTaskState, Void> PARSER = new ConstructingObjectParser<>(
         NAME,
         true,
-        args -> new ReindexDataStreamPersistentTaskState((long) args[0])
+        args -> new ReindexDataStreamPersistentTaskState((Integer) args[0], (Integer) args[1], (Long) args[2])
     );
     static {
-        PARSER.declareLong(constructorArg(), new ParseField(COMPLETION_TIME_FIELD));
+        PARSER.declareInt(optionalConstructorArg(), new ParseField(TOTAL_INDICES_FIELD));
+        PARSER.declareInt(optionalConstructorArg(), new ParseField(TOTAL_INDICES_REQUIRING_UPGRADE_FIELD));
+        PARSER.declareLong(optionalConstructorArg(), new ParseField(COMPLETION_TIME_FIELD));
     }
 
     public ReindexDataStreamPersistentTaskState(StreamInput in) throws IOException {
-        this(in.readLong());
+        this(in.readOptionalInt(), in.readOptionalInt(), in.readOptionalLong());
     }
 
     @Override
@@ -43,13 +53,23 @@ public String getWriteableName() {
 
     @Override
     public void writeTo(StreamOutput out) throws IOException {
-        out.writeLong(completionTime);
+        out.writeOptionalInt(totalIndices);
+        out.writeOptionalInt(totalIndicesToBeUpgraded);
+        out.writeOptionalLong(completionTime);
     }
 
     @Override
     public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
         builder.startObject();
-        builder.field(COMPLETION_TIME_FIELD, completionTime);
+        if (totalIndices != null) {
+            builder.field(TOTAL_INDICES_FIELD, totalIndices);
+        }
+        if (totalIndicesToBeUpgraded != null) {
+            builder.field(TOTAL_INDICES_REQUIRING_UPGRADE_FIELD, totalIndicesToBeUpgraded);
+        }
+        if (completionTime != null) {
+            builder.field(COMPLETION_TIME_FIELD, completionTime);
+        }
         builder.endObject();
         return builder;
     }
diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamTask.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamTask.java
index 7a2b759dfd17a..04295a7521479 100644
--- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamTask.java
+++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamTask.java
@@ -7,10 +7,12 @@
 
 package org.elasticsearch.xpack.migrate.task;
 
+import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.util.concurrent.RunOnce;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.persistent.AllocatedPersistentTask;
+import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
 import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.threadpool.ThreadPool;
 
@@ -24,9 +26,10 @@ public class ReindexDataStreamTask extends AllocatedPersistentTask {
     public static final String TASK_NAME = "reindex-data-stream";
+    private final ClusterService clusterService;
     private final long persistentTaskStartTime;
-    private final int totalIndices;
-    private final int totalIndicesToBeUpgraded;
+    private final int initialTotalIndices;
+    private final int initialTotalIndicesToBeUpgraded;
     private volatile boolean complete = false;
     private volatile Exception exception;
     private final Set inProgress = Collections.synchronizedSet(new HashSet<>());
@@ -36,9 +39,10 @@ public class ReindexDataStreamTask extends AllocatedPersistentTask {
 
     @SuppressWarnings("this-escape")
     public ReindexDataStreamTask(
+        ClusterService clusterService,
         long persistentTaskStartTime,
-        int totalIndices,
-        int totalIndicesToBeUpgraded,
+        int initialTotalIndices,
+        int initialTotalIndicesToBeUpgraded,
         long id,
         String type,
         String action,
@@ -47,9 +51,10 @@ public ReindexDataStreamTask(
         Map<String, String> headers
     ) {
         super(id, type, action, description, parentTask, headers);
+        this.clusterService = clusterService;
         this.persistentTaskStartTime = persistentTaskStartTime;
-        this.totalIndices = totalIndices;
-        this.totalIndicesToBeUpgraded = totalIndicesToBeUpgraded;
+        this.initialTotalIndices = initialTotalIndices;
+        this.initialTotalIndicesToBeUpgraded = initialTotalIndicesToBeUpgraded;
         this.completeTask = new RunOnce(() -> {
             if (exception == null) {
                 markAsCompleted();
@@ -61,6 +66,19 @@ public ReindexDataStreamTask(
 
     @Override
     public ReindexDataStreamStatus getStatus() {
+        PersistentTasksCustomMetadata persistentTasksCustomMetadata = clusterService.state()
+            .getMetadata()
+            .custom(PersistentTasksCustomMetadata.TYPE);
+        int totalIndices = initialTotalIndices;
+        int totalIndicesToBeUpgraded = initialTotalIndicesToBeUpgraded;
+        PersistentTasksCustomMetadata.PersistentTask<?> persistentTask = persistentTasksCustomMetadata.getTask(getPersistentTaskId());
+        if (persistentTask != null) {
+            ReindexDataStreamPersistentTaskState state = (ReindexDataStreamPersistentTaskState) persistentTask.getState();
+            if (state != null && state.totalIndices() != null && state.totalIndicesToBeUpgraded() != null) {
+                totalIndices = Math.toIntExact(state.totalIndices());
+                totalIndicesToBeUpgraded = Math.toIntExact(state.totalIndicesToBeUpgraded());
+            }
+        }
         return new ReindexDataStreamStatus(
             persistentTaskStartTime,
             totalIndices,
diff --git a/x-pack/plugin/migrate/src/test/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskStateTests.java b/x-pack/plugin/migrate/src/test/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskStateTests.java
index a35cd6e5fa474..c2dee83a91dfe 100644
--- a/x-pack/plugin/migrate/src/test/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskStateTests.java
+++ b/x-pack/plugin/migrate/src/test/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskStateTests.java
@@ -26,11 +26,32 @@ protected Writeable.Reader<ReindexDataStreamPersistentTaskState> instanceReader(
 
     @Override
     protected ReindexDataStreamPersistentTaskState createTestInstance() {
-        return new ReindexDataStreamPersistentTaskState(randomNegativeLong());
+        return new ReindexDataStreamPersistentTaskState(
+            randomBoolean() ? null : randomNonNegativeInt(),
+            randomBoolean() ? null : randomNonNegativeInt(),
+            randomBoolean() ? null : randomNonNegativeLong()
+        );
     }
 
     @Override
     protected ReindexDataStreamPersistentTaskState mutateInstance(ReindexDataStreamPersistentTaskState instance) throws IOException {
-        return new ReindexDataStreamPersistentTaskState(instance.completionTime() + 1);
+        return switch (randomInt(2)) {
+            case 0 -> new ReindexDataStreamPersistentTaskState(
+                instance.totalIndices() == null ? randomNonNegativeInt() : instance.totalIndices() + 1,
+                instance.totalIndicesToBeUpgraded(),
+                instance.completionTime()
+            );
+            case 1 -> new ReindexDataStreamPersistentTaskState(
+                instance.totalIndices(),
+                instance.totalIndicesToBeUpgraded() == null ? randomNonNegativeInt() : instance.totalIndicesToBeUpgraded() + 1,
+                instance.completionTime()
+            );
+            case 2 -> new ReindexDataStreamPersistentTaskState(
+                instance.totalIndices(),
+                instance.totalIndicesToBeUpgraded(),
+                instance.completionTime() == null ? randomNonNegativeLong() : instance.completionTime() + 1
+            );
+            default -> throw new IllegalArgumentException("Should never get here");
+        };
     }
 }
diff --git a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataStreamsUpgradeIT.java b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataStreamsUpgradeIT.java
index 3ce92ea29ec19..4c4f915a8fe1d 100644
--- a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataStreamsUpgradeIT.java
+++ b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataStreamsUpgradeIT.java
@@ -255,7 +255,11 @@ private static void createAndRolloverDataStream(String dataStreamName, int numRo
         }
     }
 
-    private void upgradeDataStream(String dataStreamName, int numRollovers) throws Exception {
+    private void upgradeDataStream(String dataStreamName, int numRolloversOnOldCluster) throws Exception {
+        final int explicitRolloverOnNewClusterCount = randomIntBetween(0, 2);
+        for (int i = 0; i < explicitRolloverOnNewClusterCount; i++) {
+            rollover(dataStreamName);
+        }
         Request reindexRequest = new Request("POST", "/_migration/reindex");
         reindexRequest.setJsonEntity(Strings.format("""
             {
@@ -276,18 +280,27 @@ private void upgradeDataStream(String dataStreamName, int numRollovers) throws E
             );
             assertOK(statusResponse);
             assertThat(statusResponseMap.get("complete"), equalTo(true));
-            /*
-             * total_indices_in_data_stream is determined at the beginning of the reindex, and does not take into account the write
-             * index being rolled over
-             */
-            assertThat(statusResponseMap.get("total_indices_in_data_stream"), equalTo(numRollovers + 1));
+            // The number of rollovers that will have happened when we call reindex:
+            final int rolloversPerformedByReindex = explicitRolloverOnNewClusterCount == 0 ? 1 : 0;
+            final int originalWriteIndex = 1;
+            assertThat(
+                statusResponseMap.get("total_indices_in_data_stream"),
+                equalTo(originalWriteIndex + numRolloversOnOldCluster + explicitRolloverOnNewClusterCount + rolloversPerformedByReindex)
+            );
             if (isOriginalClusterSameMajorVersionAsCurrent()) {
                 // If the original cluster was the same as this one, we don't want any indices reindexed:
                 assertThat(statusResponseMap.get("total_indices_requiring_upgrade"), equalTo(0));
                 assertThat(statusResponseMap.get("successes"), equalTo(0));
             } else {
-                assertThat(statusResponseMap.get("total_indices_requiring_upgrade"), equalTo(numRollovers + 1));
-                assertThat(statusResponseMap.get("successes"), equalTo(numRollovers + 1));
+                /*
+                 * total_indices_requiring_upgrade is made up of: (the original write index) + numRolloversOnOldCluster. The number of
+                 * rollovers on the upgraded cluster is irrelevant since those will not be reindexed.
+                 */
+                assertThat(
+                    statusResponseMap.get("total_indices_requiring_upgrade"),
+                    equalTo(originalWriteIndex + numRolloversOnOldCluster)
+                );
+                assertThat(statusResponseMap.get("successes"), equalTo(numRolloversOnOldCluster + 1));
             }
         }, 60, TimeUnit.SECONDS);
         Request cancelRequest = new Request("POST", "_migration/reindex/" + dataStreamName + "/_cancel");
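
For reference, the TimeValue returned by updateCompletionTimeAndgetTimeToLive above is simply the task keep-alive window minus the time elapsed since the recorded completion time. The following standalone sketch (not part of the patch; the 24-hour window, class name, and method name are assumptions for illustration) shows that arithmetic in isolation:

// Illustrative only -- not part of the diff above. Assumes a 24-hour keep-alive
// window; KeepAliveSketch and remainingTimeToLiveMillis are hypothetical names.
import java.util.concurrent.TimeUnit;

public final class KeepAliveSketch {
    private static final long TASK_KEEP_ALIVE_MILLIS = TimeUnit.HOURS.toMillis(24);

    // Remaining time to live: the keep-alive window minus the time elapsed since
    // the task recorded its completion time. Can go negative once the window has
    // already expired.
    static long remainingTimeToLiveMillis(long completionTimeMillis, long nowMillis) {
        return TASK_KEEP_ALIVE_MILLIS - (nowMillis - completionTimeMillis);
    }

    public static void main(String[] args) {
        // A task that completed one hour ago still has roughly 23 hours to live.
        long completionTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1);
        System.out.println("remaining ms: " + remainingTimeToLiveMillis(completionTime, System.currentTimeMillis()));
    }
}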