From f8b94087fe88d3ca106d36726cac32f0f618b85c Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Mon, 16 Dec 2024 16:09:58 -0600 Subject: [PATCH] Adding IndicesOptions, reorganizing --- ...MigrationReindexStatusTransportAction.java | 120 +++++++++++------- 1 file changed, 74 insertions(+), 46 deletions(-) diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/GetMigrationReindexStatusTransportAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/GetMigrationReindexStatusTransportAction.java index 9dfc7173436b9..90152aa276e9a 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/GetMigrationReindexStatusTransportAction.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/GetMigrationReindexStatusTransportAction.java @@ -17,6 +17,7 @@ import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; @@ -74,9 +75,9 @@ protected void doExecute(Task task, Request request, ActionListener li } else if (persistentTask.isAssigned()) { String nodeId = persistentTask.getExecutorNode(); if (clusterService.localNode().getId().equals(nodeId)) { - getRunningTaskFromNode(persistentTaskId, listener); + fetchAndReportStatusForTaskOnThisNode(persistentTaskId, listener); } else { - runOnNodeWithTaskIfPossible(task, request, nodeId, listener); + fetchAndReportStatusForTaskOnRemoteNode(task, request, nodeId, listener); } } else { listener.onFailure(new ElasticsearchException("Persistent task with id [{}] is not assigned to a node", persistentTaskId)); @@ -96,7 +97,7 @@ private Task getRunningPersistentTaskFromTaskManager(String persistentTaskId) { return optionalTask.map(Map.Entry::getValue).orElse(null); } - void getRunningTaskFromNode(String persistentTaskId, ActionListener listener) { + void fetchAndReportStatusForTaskOnThisNode(String persistentTaskId, ActionListener listener) { Task runningTask = getRunningPersistentTaskFromTaskManager(persistentTaskId); if (runningTask == null) { listener.onFailure( @@ -112,54 +113,81 @@ void getRunningTaskFromNode(String persistentTaskId, ActionListener li TaskInfo info = runningTask.taskInfo(clusterService.localNode().getId(), true); ReindexDataStreamStatus status = (ReindexDataStreamStatus) info.status(); Set inProgressIndices = status.inProgress(); - IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest(); - String[] indices = inProgressIndices.stream() - .flatMap(index -> Stream.of(index, ReindexDataStreamIndexTransportAction.generateDestIndexName(index))) - .toList() - .toArray(new String[0]); - indicesStatsRequest.indices(indices); - client.execute(IndicesStatsAction.INSTANCE, indicesStatsRequest, new ActionListener() { - @Override - public void onResponse(IndicesStatsResponse indicesStatsResponse) { - Map> inProgressMap = new HashMap<>(); - for (String index : inProgressIndices) { - DocsStats totalDocsStats = indicesStatsResponse.getIndex(index).getTotal().getDocs(); - final long totalDocsInIndex = totalDocsStats == null ? 0 : totalDocsStats.getCount(); - IndexStats migratedIndexStats = indicesStatsResponse.getIndex( - ReindexDataStreamIndexTransportAction.generateDestIndexName(index) - ); - final long reindexedDocsInIndex; - if (migratedIndexStats == null) { - reindexedDocsInIndex = 0; - } else { - DocsStats reindexedDocsStats = migratedIndexStats.getTotal().getDocs(); - reindexedDocsInIndex = reindexedDocsStats == null ? 0 : reindexedDocsStats.getCount(); - } - inProgressMap.put(index, Tuple.tuple(totalDocsInIndex, reindexedDocsInIndex)); - } - ReindexDataStreamEnrichedStatus enrichedStatus = new ReindexDataStreamEnrichedStatus( - status.persistentTaskStartTime(), - status.totalIndices(), - status.totalIndicesToBeUpgraded(), - status.complete(), - status.exception(), - inProgressMap, - status.pending(), - status.errors() - ); - listener.onResponse(new Response(enrichedStatus)); - } + if (inProgressIndices.isEmpty()) { + // We have no reason to fetch index stats since there are no in progress indices + reportStatus(Map.of(), status, listener); + } else { + fetchInProgressStatsAndReportStatus(inProgressIndices, status, listener); + } + } + } - @Override - public void onFailure(Exception e) { - listener.onFailure(e); + /* + * The status is enriched with the information from inProgressMap to create a new ReindexDataStreamEnrichedStatus, which is used in the + * response sent to the listener. + */ + private void reportStatus(Map> inProgressMap, ReindexDataStreamStatus status, ActionListener listener) { + ReindexDataStreamEnrichedStatus enrichedStatus = new ReindexDataStreamEnrichedStatus( + status.persistentTaskStartTime(), + status.totalIndices(), + status.totalIndicesToBeUpgraded(), + status.complete(), + status.exception(), + inProgressMap, + status.pending(), + status.errors() + ); + listener.onResponse(new Response(enrichedStatus)); + } + + /* + * This method feches doc counts for all indices in inProgressIndices (and the indices they are being reindexed into). After + * successfully fetching those, reportStatus is called. + */ + private void fetchInProgressStatsAndReportStatus(Set inProgressIndices, ReindexDataStreamStatus status, ActionListener listener) { + IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest(); + String[] indices = inProgressIndices.stream() + .flatMap(index -> Stream.of(index, ReindexDataStreamIndexTransportAction.generateDestIndexName(index))) + .toList() + .toArray(new String[0]); + indicesStatsRequest.indices(indices); + /* + * It is possible that the destination index will not exist yet, so we want to ignore the fact that it is missing + */ + indicesStatsRequest.indicesOptions(IndicesOptions.fromOptions(true, true, true, true)); + client.execute(IndicesStatsAction.INSTANCE, indicesStatsRequest, new ActionListener() { + @Override + public void onResponse(IndicesStatsResponse indicesStatsResponse) { + Map> inProgressMap = new HashMap<>(); + for (String index : inProgressIndices) { + DocsStats totalDocsStats = indicesStatsResponse.getIndex(index).getTotal().getDocs(); + final long totalDocsInIndex = totalDocsStats == null ? 0 : totalDocsStats.getCount(); + IndexStats migratedIndexStats = indicesStatsResponse.getIndex( + ReindexDataStreamIndexTransportAction.generateDestIndexName(index) + ); + final long reindexedDocsInIndex; + if (migratedIndexStats == null) { + reindexedDocsInIndex = 0; + } else { + DocsStats reindexedDocsStats = migratedIndexStats.getTotal().getDocs(); + reindexedDocsInIndex = reindexedDocsStats == null ? 0 : reindexedDocsStats.getCount(); + } + inProgressMap.put(index, Tuple.tuple(totalDocsInIndex, reindexedDocsInIndex)); } - }); + reportStatus(inProgressMap, status, listener); + } - } + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }); } - private void runOnNodeWithTaskIfPossible(Task thisTask, Request request, String nodeId, ActionListener listener) { + /* + * The task and its status exist on some other node, so this method forwards the request to that node. + */ + private void fetchAndReportStatusForTaskOnRemoteNode(Task thisTask, Request request, String nodeId, ActionListener listener) { DiscoveryNode node = clusterService.state().nodes().get(nodeId); if (node == null) { listener.onFailure(