Skip to content

Commit

Permalink
Adding a setting for data stream reindexing concurrency (elastic#119484)
Browse files Browse the repository at this point in the history
  • Loading branch information
masseyke committed Jan 8, 2025
1 parent 9260bcf commit 7e2845b
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.settings.SettingsModule;
Expand Down Expand Up @@ -58,6 +59,7 @@
import java.util.function.Supplier;

import static org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.REINDEX_DATA_STREAM_FEATURE_FLAG;
import static org.elasticsearch.xpack.migrate.task.ReindexDataStreamPersistentTaskExecutor.MAX_CONCURRENT_INDICES_REINDEXED_PER_DATA_STREAM_SETTING;

public class MigratePlugin extends Plugin implements ActionPlugin, PersistentTaskPlugin {

Expand Down Expand Up @@ -153,4 +155,11 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(
return List.of();
}
}

@Override
public List<Setting<?>> getSettings() {
List<Setting<?>> pluginSettings = new ArrayList<>();
pluginSettings.add(MAX_CONCURRENT_INDICES_REINDEXED_PER_DATA_STREAM_SETTING);
return pluginSettings;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

package org.elasticsearch.xpack.migrate.task;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.rollover.RolloverAction;
Expand All @@ -19,6 +21,7 @@
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamAction;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.Index;
Expand All @@ -38,6 +41,19 @@
import static org.elasticsearch.xpack.core.deprecation.DeprecatedIndexPredicate.getReindexRequiredPredicate;

public class ReindexDataStreamPersistentTaskExecutor extends PersistentTasksExecutor<ReindexDataStreamTaskParams> {
/*
* This setting controls how many indices we reindex concurrently for a single data stream. This is not an overall limit -- if five
* data streams are being reindexed, then each of them can have this many indices being reindexed at once. This setting is dynamic,
* but changing it does not have an impact if the task is already running (unless the task is restarted or moves to another node).
*/
public static final Setting<Integer> MAX_CONCURRENT_INDICES_REINDEXED_PER_DATA_STREAM_SETTING = Setting.intSetting(
"migrate.max_concurrent_indices_reindexed_per_data_stream",
1,
1,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);
private static final Logger logger = LogManager.getLogger(ReindexDataStreamPersistentTaskExecutor.class);
private static final TimeValue TASK_KEEP_ALIVE_TIME = TimeValue.timeValueDays(1);
private final Client client;
private final ClusterService clusterService;
Expand Down Expand Up @@ -164,8 +180,9 @@ private void reindexIndices(
CountDownActionListener listener = new CountDownActionListener(indicesToBeReindexed.size() + 1, ActionListener.wrap(response1 -> {
completeSuccessfulPersistentTask(reindexDataStreamTask, updatedState);
}, exception -> { completeFailedPersistentTask(reindexDataStreamTask, updatedState, exception); }));
final int maxConcurrentIndices = clusterService.getClusterSettings().get(MAX_CONCURRENT_INDICES_REINDEXED_PER_DATA_STREAM_SETTING);
List<Index> indicesRemaining = Collections.synchronizedList(new ArrayList<>(indicesToBeReindexed));
final int maxConcurrentIndices = 1;
logger.debug("Reindexing {} indices, with up to {} handled concurrently", indicesRemaining.size(), maxConcurrentIndices);
for (int i = 0; i < maxConcurrentIndices; i++) {
maybeProcessNextIndex(indicesRemaining, reindexDataStreamTask, reindexClient, sourceDataStream, listener, parentTaskId);
}
Expand Down

0 comments on commit 7e2845b

Please sign in to comment.