Skip to content

Commit

Permalink
Limiting the number of indices that can be processed concurrently
Browse files Browse the repository at this point in the history
  • Loading branch information
masseyke committed Dec 13, 2024
1 parent c330e31 commit 1ff7bba
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,11 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.migrate.action.ReindexDataStreamIndexAction;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;

import static org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.getOldIndexVersionPredicate;

Expand Down Expand Up @@ -82,47 +85,82 @@ protected void nodeOperation(AllocatedPersistentTask task, ReindexDataStreamTask
if (dataStreamInfos.size() == 1) {
DataStream dataStream = dataStreamInfos.getFirst().getDataStream();
if (getOldIndexVersionPredicate(clusterService.state().metadata()).test(dataStream.getWriteIndex())) {
// placeholder
reindexClient.execute(RolloverAction.INSTANCE, new RolloverRequest(sourceDataStream, null)).actionGet();
}
List<Index> indices = dataStream.getIndices();
List<Index> indicesToBeReindexed = indices.stream()
.filter(getOldIndexVersionPredicate(clusterService.state().metadata()))
.toList();
reindexDataStreamTask.setPendingIndicesCount(indicesToBeReindexed.size());
CountDownActionListener listener = new CountDownActionListener(
indicesToBeReindexed.size() + 1,
ActionListener.wrap(response1 -> {
completeSuccessfulPersistentTask(reindexDataStreamTask);
}, exception -> { completeFailedPersistentTask(reindexDataStreamTask, exception); })
);
// TODO: put all these on a queue, only process N from queue at a time
for (Index index : indicesToBeReindexed) {
reindexDataStreamTask.incrementInProgressIndicesCount();
reindexClient.execute(
ReindexDataStreamIndexAction.INSTANCE,
new ReindexDataStreamIndexAction.Request(index.getName()),
ActionListener.wrap(response1 -> {
updateDataStream(sourceDataStream, index.getName(), response1.getDestIndex(), ActionListener.wrap(unused -> {
reindexDataStreamTask.reindexSucceeded();
listener.onResponse(null);
}, exception -> {
reindexDataStreamTask.reindexFailed(index.getName(), exception);
listener.onResponse(null);
}), reindexClient);
}, exception -> {
reindexDataStreamTask.reindexFailed(index.getName(), exception);
listener.onResponse(null);
})
RolloverAction.INSTANCE,
new RolloverRequest(sourceDataStream, null),
ActionListener.wrap(
rolloverResponse -> reindexIndices(dataStream, reindexDataStreamTask, reindexClient, sourceDataStream),
e -> completeFailedPersistentTask(reindexDataStreamTask, e)
)
);
} else {
reindexIndices(dataStream, reindexDataStreamTask, reindexClient, sourceDataStream);
}
listener.onResponse(null);
} else {
completeFailedPersistentTask(reindexDataStreamTask, new ElasticsearchException("data stream does not exist"));
}
}, exception -> completeFailedPersistentTask(reindexDataStreamTask, exception)));
}

/**
 * Reindexes the backing indices of {@code dataStream} that match the old-index-version predicate, skipping the
 * data stream's current write index, with at most five reindex operations started from this method.
 * <p>
 * The pending-indices count on the task is set to the number of selected indices. A countdown listener sized to
 * that number plus one completes the persistent task; the extra count is consumed by the final
 * {@code onResponse} call at the end of this method, which is also what lets the task complete when no index
 * needs reindexing.
 *
 * @param dataStream            the data stream whose backing indices are examined
 * @param reindexDataStreamTask the persistent task that tracks progress and is completed when the countdown ends
 * @param reindexClient         the client used to execute the per-index reindex action
 * @param sourceDataStream      the name of the data stream, passed through to the per-index processing
 */
private void reindexIndices(
    DataStream dataStream,
    ReindexDataStreamTask reindexDataStreamTask,
    ExecuteWithHeadersClient reindexClient,
    String sourceDataStream
) {
    // Upper bound on how many indices are started from this method; further indices are picked up by callbacks.
    final int concurrencyLimit = 5;

    List<Index> oldIndices = dataStream.getIndices()
        .stream()
        .filter(getOldIndexVersionPredicate(clusterService.state().metadata()))
        .filter(candidate -> candidate.getName().equals(dataStream.getWriteIndex().getName()) == false)
        .toList();
    final int oldIndexCount = oldIndices.size();
    reindexDataStreamTask.setPendingIndicesCount(oldIndexCount);

    // One count per index plus one for this method itself (released by the last statement below).
    CountDownActionListener completionListener = new CountDownActionListener(
        oldIndexCount + 1,
        ActionListener.wrap(
            ignored -> completeSuccessfulPersistentTask(reindexDataStreamTask),
            failure -> completeFailedPersistentTask(reindexDataStreamTask, failure)
        )
    );

    // Mutable, synchronized copy that serves as the shared work queue for the callbacks.
    List<Index> workQueue = Collections.synchronizedList(new ArrayList<>(oldIndices));
    for (int slot = 0; slot < concurrencyLimit; slot++) {
        maybeProcessNextIndex(workQueue, reindexDataStreamTask, reindexClient, sourceDataStream, completionListener);
    }
    completionListener.onResponse(null);
}

/**
 * Takes the next index off {@code indicesRemaining}, if any, and starts reindexing it. When that index has been
 * fully handled, the listener is counted down once and the next queued index is started.
 * <p>
 * The queue is continued after failures as well as successes. If only the success path continued, every failed
 * index would permanently give up one of the concurrent slots; once all slots had failed, the remaining indices
 * would never be started and {@code listener} would never reach zero.
 *
 * @param indicesRemaining      shared queue of indices still waiting to be reindexed; entries are removed from the front
 * @param reindexDataStreamTask the task on which in-progress, succeeded and failed indices are recorded
 * @param reindexClient         the client used to run the reindex action and the data stream update
 * @param sourceDataStream      the name of the data stream whose backing index is swapped after reindexing
 * @param listener              counted down exactly once for each index taken from the queue, whatever the outcome
 */
private void maybeProcessNextIndex(
    List<Index> indicesRemaining,
    ReindexDataStreamTask reindexDataStreamTask,
    ExecuteWithHeadersClient reindexClient,
    String sourceDataStream,
    CountDownActionListener listener
) {
    if (indicesRemaining.isEmpty()) {
        return;
    }
    Index index;
    try {
        index = indicesRemaining.removeFirst();
    } catch (NoSuchElementException ignored) {
        // Another callback drained the queue between the isEmpty() check and removeFirst(); nothing left to start.
        // NOTE(review): the check and the removal are two separate calls on the list. Confirm that the list
        // implementation passed in reports this race as NoSuchElementException rather than
        // IndexOutOfBoundsException; if not, guard both calls with synchronized (indicesRemaining).
        return;
    }
    reindexDataStreamTask.incrementInProgressIndicesCount();
    reindexClient.execute(
        ReindexDataStreamIndexAction.INSTANCE,
        new ReindexDataStreamIndexAction.Request(index.getName()),
        ActionListener.wrap(response1 -> {
            updateDataStream(sourceDataStream, index.getName(), response1.getDestIndex(), ActionListener.wrap(unused -> {
                reindexDataStreamTask.reindexSucceeded();
                listener.onResponse(null);
                maybeProcessNextIndex(indicesRemaining, reindexDataStreamTask, reindexClient, sourceDataStream, listener);
            }, exception -> {
                // Updating the data stream failed: record it, then keep the queue moving.
                reindexDataStreamTask.reindexFailed(index.getName(), exception);
                listener.onResponse(null);
                maybeProcessNextIndex(indicesRemaining, reindexDataStreamTask, reindexClient, sourceDataStream, listener);
            }), reindexClient);
        }, exception -> {
            // The reindex itself failed: record it, then keep the queue moving.
            reindexDataStreamTask.reindexFailed(index.getName(), exception);
            listener.onResponse(null);
            maybeProcessNextIndex(indicesRemaining, reindexDataStreamTask, reindexClient, sourceDataStream, listener);
        })
    );
}

private void updateDataStream(
String dataStream,
String oldIndex,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,32 +94,25 @@ public class DataStreamsUpgradeIT extends AbstractUpgradeTestCase {
}
""";

private static final String BULK = """
{"create": {}}
{"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507",
"ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}}}}
{"create": {}}
{"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "hamster", "uid":"947e4ced-1786-4e53-9e0c-5c447e959508",
"ip": "10.10.55.1", "network": {"tx": 2005177954, "rx": 801479970}}}}
{"create": {}}
{"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "cow", "uid":"947e4ced-1786-4e53-9e0c-5c447e959509",
"ip": "10.10.55.1", "network": {"tx": 2006223737, "rx": 802337279}}}}
{"create": {}}
{"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "rat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959510",
"ip": "10.10.55.2", "network": {"tx": 2012916202, "rx": 803685721}}}}
{"create": {}}
{"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9",
"ip": "10.10.55.3", "network": {"tx": 1434521831, "rx": 530575198}}}}
{"create": {}}
{"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "tiger", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea10",
"ip": "10.10.55.3", "network": {"tx": 1434577921, "rx": 530600088}}}}
{"create": {}}
{"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "lion", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876e11",
"ip": "10.10.55.3", "network": {"tx": 1434587694, "rx": 530604797}}}}
{"create": {}}
{"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "elephant", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876eb4",
"ip": "10.10.55.3", "network": {"tx": 1434595272, "rx": 530605511}}}}
""";
/**
 * Bulk request body containing eight {@code create} actions, each followed by a pod-metrics document.
 * Every document carries the literal {@code $now} as its {@code @timestamp}; presumably the code that sends
 * this body substitutes a real timestamp first (confirm against the caller).
 */
private static final String BULK =
"""
{"create": {}}
{"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}}}}
{"create": {}}
{"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "hamster", "uid":"947e4ced-1786-4e53-9e0c-5c447e959508", "ip": "10.10.55.1", "network": {"tx": 2005177954, "rx": 801479970}}}}
{"create": {}}
{"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "cow", "uid":"947e4ced-1786-4e53-9e0c-5c447e959509", "ip": "10.10.55.1", "network": {"tx": 2006223737, "rx": 802337279}}}}
{"create": {}}
{"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "rat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959510", "ip": "10.10.55.2", "network": {"tx": 2012916202, "rx": 803685721}}}}
{"create": {}}
{"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.3", "network": {"tx": 1434521831, "rx": 530575198}}}}
{"create": {}}
{"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "tiger", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea10", "ip": "10.10.55.3", "network": {"tx": 1434577921, "rx": 530600088}}}}
{"create": {}}
{"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "lion", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876e11", "ip": "10.10.55.3", "network": {"tx": 1434587694, "rx": 530604797}}}}
{"create": {}}
{"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "elephant", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876eb4", "ip": "10.10.55.3", "network": {"tx": 1434595272, "rx": 530605511}}}}
""";

public void testDataStreams() throws IOException {
if (CLUSTER_TYPE == ClusterType.OLD) {
Expand Down Expand Up @@ -278,7 +271,7 @@ public void testUpgradeDataStream() throws Exception {
putIndexTemplateRequest.setJsonEntity(INDEX_TEMPLATE.replace("$TEMPLATE", TEMPLATE).replace("$PATTERN", dataStreamName));
assertOK(client().performRequest(putIndexTemplateRequest));

performOldClustertOperations(templateName, dataStreamName);
performOldClusterOperations(templateName, dataStreamName);
} else if (CLUSTER_TYPE == ClusterType.MIXED) {
// nothing
} else if (CLUSTER_TYPE == ClusterType.UPGRADED) {
Expand Down Expand Up @@ -313,7 +306,7 @@ private void performUpgradedClusterOperations(String dataStreamName) throws Exce
assertOK(cancelResponse);
}

private static void performOldClustertOperations(String templateName, String dataStreamName) throws IOException {
private static void performOldClusterOperations(String templateName, String dataStreamName) throws IOException {
bulkLoadData(dataStreamName);

var dataStreams = getDataStream(dataStreamName);
Expand Down

0 comments on commit 1ff7bba

Please sign in to comment.