Skip to content

Commit

Permalink
removing accidental changes
Browse files Browse the repository at this point in the history
  • Loading branch information
masseyke committed Dec 16, 2024
1 parent a669eae commit 1530356
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public ReindexDataStreamAction() {
* in order to be writable in the _next_ lucene version.
*/
public static Predicate<Index> getOldIndexVersionPredicate(Metadata metadata) {
return index -> true;// metadata.index(index).getCreationVersion().onOrBefore(MINIMUM_WRITEABLE_VERSION_AFTER_UPGRADE);
return index -> metadata.index(index).getCreationVersion().onOrBefore(MINIMUM_WRITEABLE_VERSION_AFTER_UPGRADE);
}

public enum Mode {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,8 @@

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.rollover.RolloverAction;
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
import org.elasticsearch.action.datastreams.GetDataStreamAction;
import org.elasticsearch.action.datastreams.ModifyDataStreamsAction;
import org.elasticsearch.action.support.CountDownActionListener;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamAction;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.Index;
Expand All @@ -27,13 +20,9 @@
import org.elasticsearch.persistent.PersistentTasksExecutor;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.migrate.action.ReindexDataStreamIndexAction;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;

import static org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.getOldIndexVersionPredicate;

Expand Down Expand Up @@ -83,18 +72,15 @@ protected void nodeOperation(AllocatedPersistentTask task, ReindexDataStreamTask
reindexClient.execute(GetDataStreamAction.INSTANCE, request, ActionListener.wrap(response -> {
List<GetDataStreamAction.Response.DataStreamInfo> dataStreamInfos = response.getDataStreams();
if (dataStreamInfos.size() == 1) {
DataStream dataStream = dataStreamInfos.getFirst().getDataStream();
if (getOldIndexVersionPredicate(clusterService.state().metadata()).test(dataStream.getWriteIndex())) {
reindexClient.execute(
RolloverAction.INSTANCE,
new RolloverRequest(sourceDataStream, null),
ActionListener.wrap(
rolloverResponse -> reindexIndices(dataStream, reindexDataStreamTask, reindexClient, sourceDataStream),
e -> completeFailedPersistentTask(reindexDataStreamTask, e)
)
);
} else {
reindexIndices(dataStream, reindexDataStreamTask, reindexClient, sourceDataStream);
List<Index> indices = dataStreamInfos.getFirst().getDataStream().getIndices();
List<Index> indicesToBeReindexed = indices.stream()
.filter(getOldIndexVersionPredicate(clusterService.state().metadata()))
.toList();
reindexDataStreamTask.setPendingIndicesCount(indicesToBeReindexed.size());
for (Index index : indicesToBeReindexed) {
reindexDataStreamTask.incrementInProgressIndicesCount(index.getName());
// TODO This is just a placeholder. This is where the real data stream reindex logic will go
reindexDataStreamTask.reindexSucceeded(index.getName());
}

completeSuccessfulPersistentTask(reindexDataStreamTask);
Expand All @@ -104,93 +90,6 @@ protected void nodeOperation(AllocatedPersistentTask task, ReindexDataStreamTask
}, reindexDataStreamTask::markAsFailed));
}

private void reindexIndices(
DataStream dataStream,
ReindexDataStreamTask reindexDataStreamTask,
ExecuteWithHeadersClient reindexClient,
String sourceDataStream
) {
List<Index> indices = dataStream.getIndices();
List<Index> indicesToBeReindexed = indices.stream()
.filter(getOldIndexVersionPredicate(clusterService.state().metadata()))
.filter(index -> index.getName().equals(dataStream.getWriteIndex().getName()) == false)
.toList();
reindexDataStreamTask.setPendingIndicesCount(indicesToBeReindexed.size());
CountDownActionListener listener = new CountDownActionListener(indicesToBeReindexed.size() + 1, ActionListener.wrap(response1 -> {
completeSuccessfulPersistentTask(reindexDataStreamTask);
}, exception -> { completeFailedPersistentTask(reindexDataStreamTask, exception); }));
List<Index> indicesRemaining = Collections.synchronizedList(new ArrayList<>(indicesToBeReindexed));
final int maxConcurrentIndices = 5;
for (int i = 0; i < maxConcurrentIndices; i++) {
maybeProcessNextIndex(indicesRemaining, reindexDataStreamTask, reindexClient, sourceDataStream, listener);
}
listener.onResponse(null);
}

private void maybeProcessNextIndex(
List<Index> indicesRemaining,
ReindexDataStreamTask reindexDataStreamTask,
ExecuteWithHeadersClient reindexClient,
String sourceDataStream,
CountDownActionListener listener
) {
if (indicesRemaining.isEmpty()) {
return;
}
Index index;
try {
index = indicesRemaining.removeFirst();
} catch (NoSuchElementException e) {
return;
}
reindexDataStreamTask.incrementInProgressIndicesCount(index.getName());
reindexClient.execute(
ReindexDataStreamIndexAction.INSTANCE,
new ReindexDataStreamIndexAction.Request(index.getName()),
ActionListener.wrap(response1 -> {
updateDataStream(sourceDataStream, index.getName(), response1.getDestIndex(), ActionListener.wrap(unused -> {
reindexDataStreamTask.reindexSucceeded(index.getName());
listener.onResponse(null);
maybeProcessNextIndex(indicesRemaining, reindexDataStreamTask, reindexClient, sourceDataStream, listener);
}, exception -> {
reindexDataStreamTask.reindexFailed(index.getName(), exception);
listener.onResponse(null);
}), reindexClient);
}, exception -> {
reindexDataStreamTask.reindexFailed(index.getName(), exception);
listener.onResponse(null);
})
);
}

private void updateDataStream(
String dataStream,
String oldIndex,
String newIndex,
ActionListener<Void> listener,
ExecuteWithHeadersClient reindexClient
) {
reindexClient.execute(
ModifyDataStreamsAction.INSTANCE,
new ModifyDataStreamsAction.Request(
TimeValue.MAX_VALUE,
TimeValue.MAX_VALUE,
List.of(DataStreamAction.removeBackingIndex(dataStream, oldIndex), DataStreamAction.addBackingIndex(dataStream, newIndex))
),
new ActionListener<>() {
@Override
public void onResponse(AcknowledgedResponse response) {
listener.onResponse(null);
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
}
);
}

private void completeSuccessfulPersistentTask(ReindexDataStreamTask persistentTask) {
persistentTask.allReindexesCompleted(threadPool, getTimeToLive(persistentTask));
}
Expand Down

0 comments on commit 1530356

Please sign in to comment.