Skip to content

Commit

Permalink
Setting parent tasks on all requests for data stream reindex (elastic…
Browse files Browse the repository at this point in the history
  • Loading branch information
masseyke committed Dec 19, 2024
1 parent 2866a7c commit 29e1efe
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xcontent.XContentType;
Expand Down Expand Up @@ -96,6 +97,7 @@ protected void doExecute(Task task, CreateIndexFromSourceAction.Request request,
if (mergeMappings.isEmpty() == false) {
createIndexRequest.mapping(mergeMappings);
}
createIndexRequest.setParentTask(new TaskId(clusterService.localNode().getId(), task.getId()));

client.admin().indices().create(createIndexRequest, listener.map(response -> response));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.deprecation.DeprecatedIndexPredicate;
Expand Down Expand Up @@ -76,6 +77,7 @@ protected void doExecute(
) {
var sourceIndexName = request.getSourceIndex();
var destIndexName = generateDestIndexName(sourceIndexName);
TaskId taskId = new TaskId(clusterService.localNode().getId(), task.getId());
IndexMetadata sourceIndex = clusterService.state().getMetadata().index(sourceIndexName);
Settings settingsBefore = sourceIndex.getSettings();

Expand All @@ -89,17 +91,17 @@ protected void doExecute(
);
}

SubscribableListener.<AcknowledgedResponse>newForked(l -> setBlockWrites(sourceIndexName, l))
.<AcknowledgedResponse>andThen(l -> deleteDestIfExists(destIndexName, l))
.<AcknowledgedResponse>andThen(l -> createIndex(sourceIndex, destIndexName, l))
.<BulkByScrollResponse>andThen(l -> reindex(sourceIndexName, destIndexName, l))
.<AddIndexBlockResponse>andThen(l -> addBlockIfFromSource(WRITE, settingsBefore, destIndexName, l))
.<AddIndexBlockResponse>andThen(l -> addBlockIfFromSource(READ_ONLY, settingsBefore, destIndexName, l))
SubscribableListener.<AcknowledgedResponse>newForked(l -> setBlockWrites(sourceIndexName, l, taskId))
.<AcknowledgedResponse>andThen(l -> deleteDestIfExists(destIndexName, l, taskId))
.<AcknowledgedResponse>andThen(l -> createIndex(sourceIndex, destIndexName, l, taskId))
.<BulkByScrollResponse>andThen(l -> reindex(sourceIndexName, destIndexName, l, taskId))
.<AddIndexBlockResponse>andThen(l -> addBlockIfFromSource(WRITE, settingsBefore, destIndexName, l, taskId))
.<AddIndexBlockResponse>andThen(l -> addBlockIfFromSource(READ_ONLY, settingsBefore, destIndexName, l, taskId))
.andThenApply(ignored -> new ReindexDataStreamIndexAction.Response(destIndexName))
.addListener(listener);
}

private void setBlockWrites(String sourceIndexName, ActionListener<AcknowledgedResponse> listener) {
private void setBlockWrites(String sourceIndexName, ActionListener<AcknowledgedResponse> listener, TaskId parentTaskId) {
logger.debug("Setting write block on source index [{}]", sourceIndexName);
addBlockToIndex(WRITE, sourceIndexName, new ActionListener<>() {
@Override
Expand All @@ -121,18 +123,24 @@ public void onFailure(Exception e) {
listener.onFailure(e);
}
}
});
}, parentTaskId);
}

private void deleteDestIfExists(String destIndexName, ActionListener<AcknowledgedResponse> listener) {
private void deleteDestIfExists(String destIndexName, ActionListener<AcknowledgedResponse> listener, TaskId parentTaskId) {
logger.debug("Attempting to delete index [{}]", destIndexName);
var deleteIndexRequest = new DeleteIndexRequest(destIndexName).indicesOptions(IGNORE_MISSING_OPTIONS)
.masterNodeTimeout(TimeValue.MAX_VALUE);
deleteIndexRequest.setParentTask(parentTaskId);
var errorMessage = String.format(Locale.ROOT, "Failed to acknowledge delete of index [%s]", destIndexName);
client.admin().indices().delete(deleteIndexRequest, failIfNotAcknowledged(listener, errorMessage));
}

private void createIndex(IndexMetadata sourceIndex, String destIndexName, ActionListener<AcknowledgedResponse> listener) {
private void createIndex(
IndexMetadata sourceIndex,
String destIndexName,
ActionListener<AcknowledgedResponse> listener,
TaskId parentTaskId
) {
logger.debug("Creating destination index [{}] for source index [{}]", destIndexName, sourceIndex.getIndex().getName());

// override read-only settings if they exist
Expand All @@ -147,29 +155,32 @@ private void createIndex(IndexMetadata sourceIndex, String destIndexName, Action
removeReadOnlyOverride,
Map.of()
);
request.setParentTask(parentTaskId);
var errorMessage = String.format(Locale.ROOT, "Could not create index [%s]", request.getDestIndex());
client.execute(CreateIndexFromSourceAction.INSTANCE, request, failIfNotAcknowledged(listener, errorMessage));
}

private void reindex(String sourceIndexName, String destIndexName, ActionListener<BulkByScrollResponse> listener) {
private void reindex(String sourceIndexName, String destIndexName, ActionListener<BulkByScrollResponse> listener, TaskId parentTaskId) {
logger.debug("Reindex to destination index [{}] from source index [{}]", destIndexName, sourceIndexName);
var reindexRequest = new ReindexRequest();
reindexRequest.setSourceIndices(sourceIndexName);
reindexRequest.getSearchRequest().allowPartialSearchResults(false);
reindexRequest.getSearchRequest().source().fetchSource(true);
reindexRequest.setDestIndex(destIndexName);
reindexRequest.setParentTask(parentTaskId);
client.execute(ReindexAction.INSTANCE, reindexRequest, listener);
}

private void addBlockIfFromSource(
IndexMetadata.APIBlock block,
Settings settingsBefore,
String destIndexName,
ActionListener<AddIndexBlockResponse> listener
ActionListener<AddIndexBlockResponse> listener,
TaskId parentTaskId
) {
if (settingsBefore.getAsBoolean(block.settingName(), false)) {
var errorMessage = String.format(Locale.ROOT, "Add [%s] block to index [%s] was not acknowledged", block.name(), destIndexName);
addBlockToIndex(block, destIndexName, failIfNotAcknowledged(listener, errorMessage));
addBlockToIndex(block, destIndexName, failIfNotAcknowledged(listener, errorMessage), parentTaskId);
} else {
listener.onResponse(null);
}
Expand All @@ -192,7 +203,14 @@ private static <U extends AcknowledgedResponse> ActionListener<U> failIfNotAckno
});
}

private void addBlockToIndex(IndexMetadata.APIBlock block, String index, ActionListener<AddIndexBlockResponse> listener) {
client.admin().indices().execute(TransportAddIndexBlockAction.TYPE, new AddIndexBlockRequest(block, index), listener);
private void addBlockToIndex(
IndexMetadata.APIBlock block,
String index,
ActionListener<AddIndexBlockResponse> listener,
TaskId parentTaskId
) {
AddIndexBlockRequest addIndexBlockRequest = new AddIndexBlockRequest(block, index);
addIndexBlockRequest.setParentTask(parentTaskId);
client.admin().indices().execute(TransportAddIndexBlockAction.TYPE, addIndexBlockRequest, listener);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ protected ReindexDataStreamTask createTask(
@Override
protected void nodeOperation(AllocatedPersistentTask task, ReindexDataStreamTaskParams params, PersistentTaskState state) {
String sourceDataStream = params.getSourceDataStream();
TaskId taskId = new TaskId(clusterService.localNode().getId(), task.getId());
GetDataStreamAction.Request request = new GetDataStreamAction.Request(TimeValue.MAX_VALUE, new String[] { sourceDataStream });
request.setParentTask(taskId);
assert task instanceof ReindexDataStreamTask;
final ReindexDataStreamTask reindexDataStreamTask = (ReindexDataStreamTask) task;
ExecuteWithHeadersClient reindexClient = new ExecuteWithHeadersClient(client, params.headers());
Expand All @@ -84,16 +86,18 @@ protected void nodeOperation(AllocatedPersistentTask task, ReindexDataStreamTask
if (dataStreamInfos.size() == 1) {
DataStream dataStream = dataStreamInfos.get(0).getDataStream();
if (getReindexRequiredPredicate(clusterService.state().metadata()).test(dataStream.getWriteIndex())) {
RolloverRequest rolloverRequest = new RolloverRequest(sourceDataStream, null);
rolloverRequest.setParentTask(taskId);
reindexClient.execute(
RolloverAction.INSTANCE,
new RolloverRequest(sourceDataStream, null),
rolloverRequest,
ActionListener.wrap(
rolloverResponse -> reindexIndices(dataStream, reindexDataStreamTask, reindexClient, sourceDataStream),
rolloverResponse -> reindexIndices(dataStream, reindexDataStreamTask, reindexClient, sourceDataStream, taskId),
e -> completeFailedPersistentTask(reindexDataStreamTask, e)
)
);
} else {
reindexIndices(dataStream, reindexDataStreamTask, reindexClient, sourceDataStream);
reindexIndices(dataStream, reindexDataStreamTask, reindexClient, sourceDataStream, taskId);
}
} else {
completeFailedPersistentTask(reindexDataStreamTask, new ElasticsearchException("data stream does not exist"));
Expand All @@ -105,7 +109,8 @@ private void reindexIndices(
DataStream dataStream,
ReindexDataStreamTask reindexDataStreamTask,
ExecuteWithHeadersClient reindexClient,
String sourceDataStream
String sourceDataStream,
TaskId parentTaskId
) {
List<Index> indices = dataStream.getIndices();
List<Index> indicesToBeReindexed = indices.stream().filter(getReindexRequiredPredicate(clusterService.state().metadata())).toList();
Expand All @@ -117,7 +122,7 @@ private void reindexIndices(
List<Index> indicesRemaining = Collections.synchronizedList(new ArrayList<>(indicesToBeReindexed));
final int maxConcurrentIndices = 1;
for (int i = 0; i < maxConcurrentIndices; i++) {
maybeProcessNextIndex(indicesRemaining, reindexDataStreamTask, reindexClient, sourceDataStream, listener);
maybeProcessNextIndex(indicesRemaining, reindexDataStreamTask, reindexClient, sourceDataStream, listener, parentTaskId);
}
// This takes care of the additional latch count referenced above:
listener.onResponse(null);
Expand All @@ -128,7 +133,8 @@ private void maybeProcessNextIndex(
ReindexDataStreamTask reindexDataStreamTask,
ExecuteWithHeadersClient reindexClient,
String sourceDataStream,
CountDownActionListener listener
CountDownActionListener listener,
TaskId parentTaskId
) {
if (indicesRemaining.isEmpty()) {
return;
Expand All @@ -140,51 +146,48 @@ private void maybeProcessNextIndex(
return;
}
reindexDataStreamTask.incrementInProgressIndicesCount(index.getName());
reindexClient.execute(
ReindexDataStreamIndexAction.INSTANCE,
new ReindexDataStreamIndexAction.Request(index.getName()),
ActionListener.wrap(response1 -> {
updateDataStream(sourceDataStream, index.getName(), response1.getDestIndex(), ActionListener.wrap(unused -> {
reindexDataStreamTask.reindexSucceeded(index.getName());
listener.onResponse(null);
maybeProcessNextIndex(indicesRemaining, reindexDataStreamTask, reindexClient, sourceDataStream, listener);
}, exception -> {
reindexDataStreamTask.reindexFailed(index.getName(), exception);
listener.onResponse(null);
}), reindexClient);
ReindexDataStreamIndexAction.Request reindexDataStreamIndexRequest = new ReindexDataStreamIndexAction.Request(index.getName());
reindexDataStreamIndexRequest.setParentTask(parentTaskId);
reindexClient.execute(ReindexDataStreamIndexAction.INSTANCE, reindexDataStreamIndexRequest, ActionListener.wrap(response1 -> {
updateDataStream(sourceDataStream, index.getName(), response1.getDestIndex(), ActionListener.wrap(unused -> {
reindexDataStreamTask.reindexSucceeded(index.getName());
listener.onResponse(null);
maybeProcessNextIndex(indicesRemaining, reindexDataStreamTask, reindexClient, sourceDataStream, listener, parentTaskId);
}, exception -> {
reindexDataStreamTask.reindexFailed(index.getName(), exception);
listener.onResponse(null);
})
);
}), reindexClient, parentTaskId);
}, exception -> {
reindexDataStreamTask.reindexFailed(index.getName(), exception);
listener.onResponse(null);
}));
}

private void updateDataStream(
String dataStream,
String oldIndex,
String newIndex,
ActionListener<Void> listener,
ExecuteWithHeadersClient reindexClient
ExecuteWithHeadersClient reindexClient,
TaskId parentTaskId
) {
reindexClient.execute(
ModifyDataStreamsAction.INSTANCE,
new ModifyDataStreamsAction.Request(
TimeValue.MAX_VALUE,
TimeValue.MAX_VALUE,
List.of(DataStreamAction.removeBackingIndex(dataStream, oldIndex), DataStreamAction.addBackingIndex(dataStream, newIndex))
),
new ActionListener<>() {
@Override
public void onResponse(AcknowledgedResponse response) {
listener.onResponse(null);
}
ModifyDataStreamsAction.Request modifyDataStreamRequest = new ModifyDataStreamsAction.Request(
TimeValue.MAX_VALUE,
TimeValue.MAX_VALUE,
List.of(DataStreamAction.removeBackingIndex(dataStream, oldIndex), DataStreamAction.addBackingIndex(dataStream, newIndex))
);
modifyDataStreamRequest.setParentTask(parentTaskId);
reindexClient.execute(ModifyDataStreamsAction.INSTANCE, modifyDataStreamRequest, new ActionListener<>() {
@Override
public void onResponse(AcknowledgedResponse response) {
listener.onResponse(null);
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
);
});
}

private void completeSuccessfulPersistentTask(ReindexDataStreamTask persistentTask) {
Expand Down

0 comments on commit 29e1efe

Please sign in to comment.