From 29e1efe1c095b07e68cf3af657a20a8c0ce91c6d Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Thu, 19 Dec 2024 14:00:14 -0600 Subject: [PATCH] Setting parent tasks on all requests for data stream reindex (#119034) --- .../CreateIndexFromSourceTransportAction.java | 2 + ...ReindexDataStreamIndexTransportAction.java | 48 +++++++---- ...indexDataStreamPersistentTaskExecutor.java | 79 ++++++++++--------- 3 files changed, 76 insertions(+), 53 deletions(-) diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/CreateIndexFromSourceTransportAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/CreateIndexFromSourceTransportAction.java index 968b2220628a9..7739a9346d47c 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/CreateIndexFromSourceTransportAction.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/CreateIndexFromSourceTransportAction.java @@ -26,6 +26,7 @@ import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xcontent.XContentType; @@ -96,6 +97,7 @@ protected void doExecute(Task task, CreateIndexFromSourceAction.Request request, if (mergeMappings.isEmpty() == false) { createIndexRequest.mapping(mergeMappings); } + createIndexRequest.setParentTask(new TaskId(clusterService.localNode().getId(), task.getId())); client.admin().indices().create(createIndexRequest, listener.map(response -> response)); } diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java index 38b5da6527039..8cba57240ad66 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java @@ -30,6 +30,7 @@ import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.deprecation.DeprecatedIndexPredicate; @@ -76,6 +77,7 @@ protected void doExecute( ) { var sourceIndexName = request.getSourceIndex(); var destIndexName = generateDestIndexName(sourceIndexName); + TaskId taskId = new TaskId(clusterService.localNode().getId(), task.getId()); IndexMetadata sourceIndex = clusterService.state().getMetadata().index(sourceIndexName); Settings settingsBefore = sourceIndex.getSettings(); @@ -89,17 +91,17 @@ protected void doExecute( ); } - SubscribableListener.newForked(l -> setBlockWrites(sourceIndexName, l)) - .andThen(l -> deleteDestIfExists(destIndexName, l)) - .andThen(l -> createIndex(sourceIndex, destIndexName, l)) - .andThen(l -> reindex(sourceIndexName, destIndexName, l)) - .andThen(l -> addBlockIfFromSource(WRITE, settingsBefore, destIndexName, l)) - .andThen(l -> addBlockIfFromSource(READ_ONLY, settingsBefore, destIndexName, l)) + SubscribableListener.newForked(l -> setBlockWrites(sourceIndexName, l, taskId)) + .andThen(l -> deleteDestIfExists(destIndexName, l, taskId)) + .andThen(l -> createIndex(sourceIndex, destIndexName, l, taskId)) + .andThen(l -> reindex(sourceIndexName, destIndexName, l, taskId)) + .andThen(l -> addBlockIfFromSource(WRITE, settingsBefore, destIndexName, l, taskId)) + .andThen(l -> addBlockIfFromSource(READ_ONLY, settingsBefore, destIndexName, l, taskId)) .andThenApply(ignored -> new ReindexDataStreamIndexAction.Response(destIndexName)) .addListener(listener); } - private void setBlockWrites(String sourceIndexName, ActionListener listener) { + private void setBlockWrites(String sourceIndexName, ActionListener listener, TaskId parentTaskId) { logger.debug("Setting write block on source index [{}]", sourceIndexName); addBlockToIndex(WRITE, sourceIndexName, new ActionListener<>() { @Override @@ -121,18 +123,24 @@ public void onFailure(Exception e) { listener.onFailure(e); } } - }); + }, parentTaskId); } - private void deleteDestIfExists(String destIndexName, ActionListener listener) { + private void deleteDestIfExists(String destIndexName, ActionListener listener, TaskId parentTaskId) { logger.debug("Attempting to delete index [{}]", destIndexName); var deleteIndexRequest = new DeleteIndexRequest(destIndexName).indicesOptions(IGNORE_MISSING_OPTIONS) .masterNodeTimeout(TimeValue.MAX_VALUE); + deleteIndexRequest.setParentTask(parentTaskId); var errorMessage = String.format(Locale.ROOT, "Failed to acknowledge delete of index [%s]", destIndexName); client.admin().indices().delete(deleteIndexRequest, failIfNotAcknowledged(listener, errorMessage)); } - private void createIndex(IndexMetadata sourceIndex, String destIndexName, ActionListener listener) { + private void createIndex( + IndexMetadata sourceIndex, + String destIndexName, + ActionListener listener, + TaskId parentTaskId + ) { logger.debug("Creating destination index [{}] for source index [{}]", destIndexName, sourceIndex.getIndex().getName()); // override read-only settings if they exist @@ -147,17 +155,19 @@ private void createIndex(IndexMetadata sourceIndex, String destIndexName, Action removeReadOnlyOverride, Map.of() ); + request.setParentTask(parentTaskId); var errorMessage = String.format(Locale.ROOT, "Could not create index [%s]", request.getDestIndex()); client.execute(CreateIndexFromSourceAction.INSTANCE, request, failIfNotAcknowledged(listener, errorMessage)); } - private void reindex(String sourceIndexName, String destIndexName, ActionListener listener) { + private void reindex(String sourceIndexName, String destIndexName, ActionListener listener, TaskId parentTaskId) { logger.debug("Reindex to destination index [{}] from source index [{}]", destIndexName, sourceIndexName); var reindexRequest = new ReindexRequest(); reindexRequest.setSourceIndices(sourceIndexName); reindexRequest.getSearchRequest().allowPartialSearchResults(false); reindexRequest.getSearchRequest().source().fetchSource(true); reindexRequest.setDestIndex(destIndexName); + reindexRequest.setParentTask(parentTaskId); client.execute(ReindexAction.INSTANCE, reindexRequest, listener); } @@ -165,11 +175,12 @@ private void addBlockIfFromSource( IndexMetadata.APIBlock block, Settings settingsBefore, String destIndexName, - ActionListener listener + ActionListener listener, + TaskId parentTaskId ) { if (settingsBefore.getAsBoolean(block.settingName(), false)) { var errorMessage = String.format(Locale.ROOT, "Add [%s] block to index [%s] was not acknowledged", block.name(), destIndexName); - addBlockToIndex(block, destIndexName, failIfNotAcknowledged(listener, errorMessage)); + addBlockToIndex(block, destIndexName, failIfNotAcknowledged(listener, errorMessage), parentTaskId); } else { listener.onResponse(null); } @@ -192,7 +203,14 @@ private static ActionListener failIfNotAckno }); } - private void addBlockToIndex(IndexMetadata.APIBlock block, String index, ActionListener listener) { - client.admin().indices().execute(TransportAddIndexBlockAction.TYPE, new AddIndexBlockRequest(block, index), listener); + private void addBlockToIndex( + IndexMetadata.APIBlock block, + String index, + ActionListener listener, + TaskId parentTaskId + ) { + AddIndexBlockRequest addIndexBlockRequest = new AddIndexBlockRequest(block, index); + addIndexBlockRequest.setParentTask(parentTaskId); + client.admin().indices().execute(TransportAddIndexBlockAction.TYPE, addIndexBlockRequest, listener); } } diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java index 2a7ee338d1fde..85995d5f9575f 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java @@ -75,7 +75,9 @@ protected ReindexDataStreamTask createTask( @Override protected void nodeOperation(AllocatedPersistentTask task, ReindexDataStreamTaskParams params, PersistentTaskState state) { String sourceDataStream = params.getSourceDataStream(); + TaskId taskId = new TaskId(clusterService.localNode().getId(), task.getId()); GetDataStreamAction.Request request = new GetDataStreamAction.Request(TimeValue.MAX_VALUE, new String[] { sourceDataStream }); + request.setParentTask(taskId); assert task instanceof ReindexDataStreamTask; final ReindexDataStreamTask reindexDataStreamTask = (ReindexDataStreamTask) task; ExecuteWithHeadersClient reindexClient = new ExecuteWithHeadersClient(client, params.headers()); @@ -84,16 +86,18 @@ protected void nodeOperation(AllocatedPersistentTask task, ReindexDataStreamTask if (dataStreamInfos.size() == 1) { DataStream dataStream = dataStreamInfos.get(0).getDataStream(); if (getReindexRequiredPredicate(clusterService.state().metadata()).test(dataStream.getWriteIndex())) { + RolloverRequest rolloverRequest = new RolloverRequest(sourceDataStream, null); + rolloverRequest.setParentTask(taskId); reindexClient.execute( RolloverAction.INSTANCE, - new RolloverRequest(sourceDataStream, null), + rolloverRequest, ActionListener.wrap( - rolloverResponse -> reindexIndices(dataStream, reindexDataStreamTask, reindexClient, sourceDataStream), + rolloverResponse -> reindexIndices(dataStream, reindexDataStreamTask, reindexClient, sourceDataStream, taskId), e -> completeFailedPersistentTask(reindexDataStreamTask, e) ) ); } else { - reindexIndices(dataStream, reindexDataStreamTask, reindexClient, sourceDataStream); + reindexIndices(dataStream, reindexDataStreamTask, reindexClient, sourceDataStream, taskId); } } else { completeFailedPersistentTask(reindexDataStreamTask, new ElasticsearchException("data stream does not exist")); @@ -105,7 +109,8 @@ private void reindexIndices( DataStream dataStream, ReindexDataStreamTask reindexDataStreamTask, ExecuteWithHeadersClient reindexClient, - String sourceDataStream + String sourceDataStream, + TaskId parentTaskId ) { List indices = dataStream.getIndices(); List indicesToBeReindexed = indices.stream().filter(getReindexRequiredPredicate(clusterService.state().metadata())).toList(); @@ -117,7 +122,7 @@ private void reindexIndices( List indicesRemaining = Collections.synchronizedList(new ArrayList<>(indicesToBeReindexed)); final int maxConcurrentIndices = 1; for (int i = 0; i < maxConcurrentIndices; i++) { - maybeProcessNextIndex(indicesRemaining, reindexDataStreamTask, reindexClient, sourceDataStream, listener); + maybeProcessNextIndex(indicesRemaining, reindexDataStreamTask, reindexClient, sourceDataStream, listener, parentTaskId); } // This takes care of the additional latch count referenced above: listener.onResponse(null); @@ -128,7 +133,8 @@ private void maybeProcessNextIndex( ReindexDataStreamTask reindexDataStreamTask, ExecuteWithHeadersClient reindexClient, String sourceDataStream, - CountDownActionListener listener + CountDownActionListener listener, + TaskId parentTaskId ) { if (indicesRemaining.isEmpty()) { return; @@ -140,23 +146,21 @@ private void maybeProcessNextIndex( return; } reindexDataStreamTask.incrementInProgressIndicesCount(index.getName()); - reindexClient.execute( - ReindexDataStreamIndexAction.INSTANCE, - new ReindexDataStreamIndexAction.Request(index.getName()), - ActionListener.wrap(response1 -> { - updateDataStream(sourceDataStream, index.getName(), response1.getDestIndex(), ActionListener.wrap(unused -> { - reindexDataStreamTask.reindexSucceeded(index.getName()); - listener.onResponse(null); - maybeProcessNextIndex(indicesRemaining, reindexDataStreamTask, reindexClient, sourceDataStream, listener); - }, exception -> { - reindexDataStreamTask.reindexFailed(index.getName(), exception); - listener.onResponse(null); - }), reindexClient); + ReindexDataStreamIndexAction.Request reindexDataStreamIndexRequest = new ReindexDataStreamIndexAction.Request(index.getName()); + reindexDataStreamIndexRequest.setParentTask(parentTaskId); + reindexClient.execute(ReindexDataStreamIndexAction.INSTANCE, reindexDataStreamIndexRequest, ActionListener.wrap(response1 -> { + updateDataStream(sourceDataStream, index.getName(), response1.getDestIndex(), ActionListener.wrap(unused -> { + reindexDataStreamTask.reindexSucceeded(index.getName()); + listener.onResponse(null); + maybeProcessNextIndex(indicesRemaining, reindexDataStreamTask, reindexClient, sourceDataStream, listener, parentTaskId); }, exception -> { reindexDataStreamTask.reindexFailed(index.getName(), exception); listener.onResponse(null); - }) - ); + }), reindexClient, parentTaskId); + }, exception -> { + reindexDataStreamTask.reindexFailed(index.getName(), exception); + listener.onResponse(null); + })); } private void updateDataStream( @@ -164,27 +168,26 @@ private void updateDataStream( String oldIndex, String newIndex, ActionListener listener, - ExecuteWithHeadersClient reindexClient + ExecuteWithHeadersClient reindexClient, + TaskId parentTaskId ) { - reindexClient.execute( - ModifyDataStreamsAction.INSTANCE, - new ModifyDataStreamsAction.Request( - TimeValue.MAX_VALUE, - TimeValue.MAX_VALUE, - List.of(DataStreamAction.removeBackingIndex(dataStream, oldIndex), DataStreamAction.addBackingIndex(dataStream, newIndex)) - ), - new ActionListener<>() { - @Override - public void onResponse(AcknowledgedResponse response) { - listener.onResponse(null); - } + ModifyDataStreamsAction.Request modifyDataStreamRequest = new ModifyDataStreamsAction.Request( + TimeValue.MAX_VALUE, + TimeValue.MAX_VALUE, + List.of(DataStreamAction.removeBackingIndex(dataStream, oldIndex), DataStreamAction.addBackingIndex(dataStream, newIndex)) + ); + modifyDataStreamRequest.setParentTask(parentTaskId); + reindexClient.execute(ModifyDataStreamsAction.INSTANCE, modifyDataStreamRequest, new ActionListener<>() { + @Override + public void onResponse(AcknowledgedResponse response) { + listener.onResponse(null); + } - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } + @Override + public void onFailure(Exception e) { + listener.onFailure(e); } - ); + }); } private void completeSuccessfulPersistentTask(ReindexDataStreamTask persistentTask) {