diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/MigratePlugin.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/MigratePlugin.java index 05e5c57252e13..55ec4065be8c4 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/MigratePlugin.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/MigratePlugin.java @@ -16,6 +16,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.IndexScopedSettings; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsFilter; import org.elasticsearch.common.settings.SettingsModule; @@ -58,6 +59,7 @@ import java.util.function.Supplier; import static org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.REINDEX_DATA_STREAM_FEATURE_FLAG; +import static org.elasticsearch.xpack.migrate.task.ReindexDataStreamPersistentTaskExecutor.MAX_CONCURRENT_INDICES_REINDEXED_PER_DATA_STREAM_SETTING; public class MigratePlugin extends Plugin implements ActionPlugin, PersistentTaskPlugin { @@ -153,4 +155,11 @@ public List> getPersistentTasksExecutor( return List.of(); } } + + @Override + public List> getSettings() { + List> pluginSettings = new ArrayList<>(); + pluginSettings.add(MAX_CONCURRENT_INDICES_REINDEXED_PER_DATA_STREAM_SETTING); + return pluginSettings; + } } diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java index 69a9e9801a4d3..20cdd27c72917 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java @@ -7,6 +7,8 @@ package org.elasticsearch.xpack.migrate.task; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.rollover.RolloverAction; @@ -19,6 +21,7 @@ import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.DataStreamAction; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.Index; @@ -38,6 +41,19 @@ import static org.elasticsearch.xpack.core.deprecation.DeprecatedIndexPredicate.getReindexRequiredPredicate; public class ReindexDataStreamPersistentTaskExecutor extends PersistentTasksExecutor { + /* + * This setting controls how many indices we reindex concurrently for a single data stream. This is not an overall limit -- if five + * data streams are being reindexed, then each of them can have this many indices being reindexed at once. This setting is dynamic, + * but changing it does not have an impact if the task is already running (unless the task is restarted or moves to another node). + */ + public static final Setting MAX_CONCURRENT_INDICES_REINDEXED_PER_DATA_STREAM_SETTING = Setting.intSetting( + "migrate.max_concurrent_indices_reindexed_per_data_stream", + 1, + 1, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + private static final Logger logger = LogManager.getLogger(ReindexDataStreamPersistentTaskExecutor.class); private static final TimeValue TASK_KEEP_ALIVE_TIME = TimeValue.timeValueDays(1); private final Client client; private final ClusterService clusterService; @@ -164,8 +180,9 @@ private void reindexIndices( CountDownActionListener listener = new CountDownActionListener(indicesToBeReindexed.size() + 1, ActionListener.wrap(response1 -> { completeSuccessfulPersistentTask(reindexDataStreamTask, updatedState); }, exception -> { completeFailedPersistentTask(reindexDataStreamTask, updatedState, exception); })); + final int maxConcurrentIndices = clusterService.getClusterSettings().get(MAX_CONCURRENT_INDICES_REINDEXED_PER_DATA_STREAM_SETTING); List indicesRemaining = Collections.synchronizedList(new ArrayList<>(indicesToBeReindexed)); - final int maxConcurrentIndices = 1; + logger.debug("Reindexing {} indices, with up to {} handled concurrently", indicesRemaining.size(), maxConcurrentIndices); for (int i = 0; i < maxConcurrentIndices; i++) { maybeProcessNextIndex(indicesRemaining, reindexDataStreamTask, reindexClient, sourceDataStream, listener, parentTaskId); }