Skip to content

Commit

Permalink
Adding IndicesOptions, reorganizing
Browse files Browse the repository at this point in the history
  • Loading branch information
masseyke committed Dec 16, 2024
1 parent 414a17e commit f8b9408
Showing 1 changed file with 74 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -74,9 +75,9 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
} else if (persistentTask.isAssigned()) {
String nodeId = persistentTask.getExecutorNode();
if (clusterService.localNode().getId().equals(nodeId)) {
getRunningTaskFromNode(persistentTaskId, listener);
fetchAndReportStatusForTaskOnThisNode(persistentTaskId, listener);
} else {
runOnNodeWithTaskIfPossible(task, request, nodeId, listener);
fetchAndReportStatusForTaskOnRemoteNode(task, request, nodeId, listener);
}
} else {
listener.onFailure(new ElasticsearchException("Persistent task with id [{}] is not assigned to a node", persistentTaskId));
Expand All @@ -96,7 +97,7 @@ private Task getRunningPersistentTaskFromTaskManager(String persistentTaskId) {
return optionalTask.<Task>map(Map.Entry::getValue).orElse(null);
}

void getRunningTaskFromNode(String persistentTaskId, ActionListener<Response> listener) {
void fetchAndReportStatusForTaskOnThisNode(String persistentTaskId, ActionListener<Response> listener) {
Task runningTask = getRunningPersistentTaskFromTaskManager(persistentTaskId);
if (runningTask == null) {
listener.onFailure(
Expand All @@ -112,54 +113,81 @@ void getRunningTaskFromNode(String persistentTaskId, ActionListener<Response> li
TaskInfo info = runningTask.taskInfo(clusterService.localNode().getId(), true);
ReindexDataStreamStatus status = (ReindexDataStreamStatus) info.status();
Set<String> inProgressIndices = status.inProgress();
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
String[] indices = inProgressIndices.stream()
.flatMap(index -> Stream.of(index, ReindexDataStreamIndexTransportAction.generateDestIndexName(index)))
.toList()
.toArray(new String[0]);
indicesStatsRequest.indices(indices);
client.execute(IndicesStatsAction.INSTANCE, indicesStatsRequest, new ActionListener<IndicesStatsResponse>() {
@Override
public void onResponse(IndicesStatsResponse indicesStatsResponse) {
Map<String, Tuple<Long, Long>> inProgressMap = new HashMap<>();
for (String index : inProgressIndices) {
DocsStats totalDocsStats = indicesStatsResponse.getIndex(index).getTotal().getDocs();
final long totalDocsInIndex = totalDocsStats == null ? 0 : totalDocsStats.getCount();
IndexStats migratedIndexStats = indicesStatsResponse.getIndex(
ReindexDataStreamIndexTransportAction.generateDestIndexName(index)
);
final long reindexedDocsInIndex;
if (migratedIndexStats == null) {
reindexedDocsInIndex = 0;
} else {
DocsStats reindexedDocsStats = migratedIndexStats.getTotal().getDocs();
reindexedDocsInIndex = reindexedDocsStats == null ? 0 : reindexedDocsStats.getCount();
}
inProgressMap.put(index, Tuple.tuple(totalDocsInIndex, reindexedDocsInIndex));
}
ReindexDataStreamEnrichedStatus enrichedStatus = new ReindexDataStreamEnrichedStatus(
status.persistentTaskStartTime(),
status.totalIndices(),
status.totalIndicesToBeUpgraded(),
status.complete(),
status.exception(),
inProgressMap,
status.pending(),
status.errors()
);
listener.onResponse(new Response(enrichedStatus));
}
if (inProgressIndices.isEmpty()) {
// We have no reason to fetch index stats since there are no in progress indices
reportStatus(Map.of(), status, listener);
} else {
fetchInProgressStatsAndReportStatus(inProgressIndices, status, listener);
}
}
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
/*
* The status is enriched with the information from inProgressMap to create a new ReindexDataStreamEnrichedStatus, which is used in the
* response sent to the listener.
*/
private void reportStatus(Map<String, Tuple<Long, Long>> inProgressMap, ReindexDataStreamStatus status, ActionListener<Response> listener) {
ReindexDataStreamEnrichedStatus enrichedStatus = new ReindexDataStreamEnrichedStatus(
status.persistentTaskStartTime(),
status.totalIndices(),
status.totalIndicesToBeUpgraded(),
status.complete(),
status.exception(),
inProgressMap,
status.pending(),
status.errors()
);
listener.onResponse(new Response(enrichedStatus));
}

/*
* This method feches doc counts for all indices in inProgressIndices (and the indices they are being reindexed into). After
* successfully fetching those, reportStatus is called.
*/
private void fetchInProgressStatsAndReportStatus(Set<String> inProgressIndices, ReindexDataStreamStatus status, ActionListener<Response> listener) {
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
String[] indices = inProgressIndices.stream()
.flatMap(index -> Stream.of(index, ReindexDataStreamIndexTransportAction.generateDestIndexName(index)))
.toList()
.toArray(new String[0]);
indicesStatsRequest.indices(indices);
/*
* It is possible that the destination index will not exist yet, so we want to ignore the fact that it is missing
*/
indicesStatsRequest.indicesOptions(IndicesOptions.fromOptions(true, true, true, true));
client.execute(IndicesStatsAction.INSTANCE, indicesStatsRequest, new ActionListener<IndicesStatsResponse>() {
@Override
public void onResponse(IndicesStatsResponse indicesStatsResponse) {
Map<String, Tuple<Long, Long>> inProgressMap = new HashMap<>();
for (String index : inProgressIndices) {
DocsStats totalDocsStats = indicesStatsResponse.getIndex(index).getTotal().getDocs();
final long totalDocsInIndex = totalDocsStats == null ? 0 : totalDocsStats.getCount();
IndexStats migratedIndexStats = indicesStatsResponse.getIndex(
ReindexDataStreamIndexTransportAction.generateDestIndexName(index)
);
final long reindexedDocsInIndex;
if (migratedIndexStats == null) {
reindexedDocsInIndex = 0;
} else {
DocsStats reindexedDocsStats = migratedIndexStats.getTotal().getDocs();
reindexedDocsInIndex = reindexedDocsStats == null ? 0 : reindexedDocsStats.getCount();
}
inProgressMap.put(index, Tuple.tuple(totalDocsInIndex, reindexedDocsInIndex));
}
});
reportStatus(inProgressMap, status, listener);
}

}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}

private void runOnNodeWithTaskIfPossible(Task thisTask, Request request, String nodeId, ActionListener<Response> listener) {
/*
* The task and its status exist on some other node, so this method forwards the request to that node.
*/
private void fetchAndReportStatusForTaskOnRemoteNode(Task thisTask, Request request, String nodeId, ActionListener<Response> listener) {
DiscoveryNode node = clusterService.state().nodes().get(nodeId);
if (node == null) {
listener.onFailure(
Expand Down

0 comments on commit f8b9408

Please sign in to comment.