From be5f7598324ab95f2c26d0d5d6464259d8869e25 Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Tue, 17 Oct 2023 17:52:02 -0500 Subject: [PATCH] Adding a simulate ingest API --- .../elasticsearch/action/ActionModule.java | 5 + .../action/bulk/SimulateBulkAction.java | 27 ++++ .../action/bulk/SimulateBulkRequest.java | 62 ++++++++ .../action/bulk/TransportBulkAction.java | 51 +++++- .../bulk/TransportSimulateBulkAction.java | 96 ++++++++++++ .../action/index/IndexResponse.java | 2 +- .../action/ingest/SimulateIndexResponse.java | 82 ++++++++++ .../elasticsearch/ingest/IngestService.java | 24 ++- .../ingest/SimulateIngestService.java | 44 ++++++ .../ingest/RestSimulateIngestAction.java | 142 +++++++++++++++++ .../action/bulk/SimulateBulkRequestTests.java | 105 +++++++++++++ .../ingest/SimulateIndexResponseTests.java | 72 +++++++++ .../ingest/SimulateIngestServiceTests.java | 145 ++++++++++++++++++ .../xpack/security/operator/Constants.java | 2 + 14 files changed, 853 insertions(+), 6 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/action/bulk/SimulateBulkAction.java create mode 100644 server/src/main/java/org/elasticsearch/action/bulk/SimulateBulkRequest.java create mode 100644 server/src/main/java/org/elasticsearch/action/bulk/TransportSimulateBulkAction.java create mode 100644 server/src/main/java/org/elasticsearch/action/ingest/SimulateIndexResponse.java create mode 100644 server/src/main/java/org/elasticsearch/ingest/SimulateIngestService.java create mode 100644 server/src/main/java/org/elasticsearch/rest/action/ingest/RestSimulateIngestAction.java create mode 100644 server/src/test/java/org/elasticsearch/action/bulk/SimulateBulkRequestTests.java create mode 100644 server/src/test/java/org/elasticsearch/action/ingest/SimulateIndexResponseTests.java create mode 100644 server/src/test/java/org/elasticsearch/ingest/SimulateIngestServiceTests.java diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index 2d43a2b26b960..0c87934eba0f2 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -208,8 +208,10 @@ import org.elasticsearch.action.admin.indices.validate.query.TransportValidateQueryAction; import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryAction; import org.elasticsearch.action.bulk.BulkAction; +import org.elasticsearch.action.bulk.SimulateBulkAction; import org.elasticsearch.action.bulk.TransportBulkAction; import org.elasticsearch.action.bulk.TransportShardBulkAction; +import org.elasticsearch.action.bulk.TransportSimulateBulkAction; import org.elasticsearch.action.delete.DeleteAction; import org.elasticsearch.action.delete.TransportDeleteAction; import org.elasticsearch.action.explain.ExplainAction; @@ -444,6 +446,7 @@ import org.elasticsearch.rest.action.ingest.RestDeletePipelineAction; import org.elasticsearch.rest.action.ingest.RestGetPipelineAction; import org.elasticsearch.rest.action.ingest.RestPutPipelineAction; +import org.elasticsearch.rest.action.ingest.RestSimulateIngestAction; import org.elasticsearch.rest.action.ingest.RestSimulatePipelineAction; import org.elasticsearch.rest.action.search.RestClearScrollAction; import org.elasticsearch.rest.action.search.RestCountAction; @@ -763,6 +766,7 @@ public void reg actions.register(MultiGetAction.INSTANCE, TransportMultiGetAction.class); actions.register(TransportShardMultiGetAction.TYPE, TransportShardMultiGetAction.class); actions.register(BulkAction.INSTANCE, TransportBulkAction.class); + actions.register(SimulateBulkAction.INSTANCE, TransportSimulateBulkAction.class); actions.register(TransportShardBulkAction.TYPE, TransportShardBulkAction.class); actions.register(SearchAction.INSTANCE, TransportSearchAction.class); actions.register(SearchScrollAction.INSTANCE, TransportSearchScrollAction.class); @@ -949,6 +953,7 @@ public void initRestHandlers(Supplier nodesInCluster) { registerHandler.accept(new RestGetComposableIndexTemplateAction()); registerHandler.accept(new RestDeleteComposableIndexTemplateAction()); registerHandler.accept(new RestSimulateIndexTemplateAction()); + registerHandler.accept(new RestSimulateIngestAction()); registerHandler.accept(new RestSimulateTemplateAction()); registerHandler.accept(new RestPutMappingAction()); diff --git a/server/src/main/java/org/elasticsearch/action/bulk/SimulateBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/SimulateBulkAction.java new file mode 100644 index 0000000000000..15d26335cda15 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/bulk/SimulateBulkAction.java @@ -0,0 +1,27 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.bulk; + +import org.elasticsearch.action.ActionType; +import org.elasticsearch.transport.TransportRequestOptions; + +public class SimulateBulkAction extends ActionType { + + public static final SimulateBulkAction INSTANCE = new SimulateBulkAction(); + public static final String NAME = "indices:admin/simulate/bulk"; + + private static final TransportRequestOptions TRANSPORT_REQUEST_OPTIONS = TransportRequestOptions.of( + null, + TransportRequestOptions.Type.BULK + ); + + private SimulateBulkAction() { + super(NAME, BulkResponse::new); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/bulk/SimulateBulkRequest.java b/server/src/main/java/org/elasticsearch/action/bulk/SimulateBulkRequest.java new file mode 100644 index 0000000000000..9dec83423aec2 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/bulk/SimulateBulkRequest.java @@ -0,0 +1,62 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.bulk; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.ingest.IngestService; +import org.elasticsearch.ingest.Pipeline; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +/** + * This extends BulkRequest with support for providing substitute pipeline definitions. + */ +public class SimulateBulkRequest extends BulkRequest { + // Non-private for unit testing + Map pipelineSubstitutions = Map.of(); + + public SimulateBulkRequest() { + super(); + } + + public SimulateBulkRequest(StreamInput in) throws IOException { + super(in); + this.pipelineSubstitutions = in.readMap(); + } + + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeGenericMap(pipelineSubstitutions); + } + + public void setPipelineSubstitutions(Map pipelineSubstitutions) { + this.pipelineSubstitutions = pipelineSubstitutions; + } + + @SuppressWarnings("unchecked") + public Map getPipelineSubstitutions(IngestService ingestService) throws Exception { + Map parsedPipelineSubstitutions = new HashMap<>(); + if (pipelineSubstitutions != null) { + for (Map.Entry entry : pipelineSubstitutions.entrySet()) { + String pipelineId = entry.getKey(); + Pipeline pipeline = Pipeline.create( + pipelineId, + (Map) entry.getValue(), + ingestService.getProcessorFactories(), + ingestService.getScriptService() + ); + parsedPipelineSubstitutions.put(pipelineId, pipeline); + } + } + return parsedPipelineSubstitutions; + } +} diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 13d10be86bd68..c849b0a8181ae 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -142,7 +142,35 @@ public TransportBulkAction( SystemIndices systemIndices, LongSupplier relativeTimeProvider ) { - super(BulkAction.NAME, transportService, actionFilters, BulkRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE); + this( + BulkAction.NAME, + threadPool, + transportService, + clusterService, + ingestService, + client, + actionFilters, + indexNameExpressionResolver, + indexingPressure, + systemIndices, + relativeTimeProvider + ); + } + + TransportBulkAction( + String actionName, + ThreadPool threadPool, + TransportService transportService, + ClusterService clusterService, + IngestService ingestService, + NodeClient client, + ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, + IndexingPressure indexingPressure, + SystemIndices systemIndices, + LongSupplier relativeTimeProvider + ) { + super(actionName, transportService, actionFilters, BulkRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE); Objects.requireNonNull(relativeTimeProvider); this.threadPool = threadPool; this.clusterService = clusterService; @@ -336,6 +364,19 @@ protected void doInternalExecute(Task task, BulkRequest bulkRequest, String exec } // Step 3: create all the indices that are missing, if there are any missing. start the bulk after all the creates come back. + indexData(task, bulkRequest, executorName, listener, responses, autoCreateIndices, indicesThatCannotBeCreated, startTime); + } + + protected void indexData( + Task task, + BulkRequest bulkRequest, + String executorName, + ActionListener listener, + AtomicArray responses, + Set autoCreateIndices, + Map indicesThatCannotBeCreated, + long startTime + ) { if (autoCreateIndices.isEmpty()) { executeBulk(task, bulkRequest, startTime, listener, executorName, responses, indicesThatCannotBeCreated); } else { @@ -386,6 +427,10 @@ protected void doRun() { } } + protected IngestService getIngestService(BulkRequest request) { + return ingestService; + } + static void prohibitAppendWritesInBackingIndices(DocWriteRequest writeRequest, Metadata metadata) { DocWriteRequest.OpType opType = writeRequest.opType(); if ((opType == OpType.CREATE || opType == OpType.INDEX) == false) { @@ -491,7 +536,7 @@ private static boolean setResponseFailureIfIndexMatches( return false; } - private long buildTookInMillis(long startTimeNanos) { + protected long buildTookInMillis(long startTimeNanos) { return TimeUnit.NANOSECONDS.toMillis(relativeTime() - startTimeNanos); } @@ -809,7 +854,7 @@ private void processBulkIndexIngestRequest( ) { final long ingestStartTimeInNanos = System.nanoTime(); final BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original); - ingestService.executeBulkRequest( + getIngestService(original).executeBulkRequest( original.numberOfActions(), () -> bulkRequestModifier, bulkRequestModifier::markItemAsDropped, diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportSimulateBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportSimulateBulkAction.java new file mode 100644 index 0000000000000..c4352e23abe0c --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportSimulateBulkAction.java @@ -0,0 +1,96 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.bulk; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.ingest.SimulateIndexResponse; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.util.concurrent.AtomicArray; +import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.IndexingPressure; +import org.elasticsearch.indices.SystemIndices; +import org.elasticsearch.ingest.IngestService; +import org.elasticsearch.ingest.SimulateIngestService; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.util.Map; +import java.util.Set; + +public class TransportSimulateBulkAction extends TransportBulkAction { + @Inject + public TransportSimulateBulkAction( + ThreadPool threadPool, + TransportService transportService, + ClusterService clusterService, + IngestService ingestService, + NodeClient client, + ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, + IndexingPressure indexingPressure, + SystemIndices systemIndices + ) { + super( + SimulateBulkAction.NAME, + threadPool, + transportService, + clusterService, + ingestService, + client, + actionFilters, + indexNameExpressionResolver, + indexingPressure, + systemIndices, + System::nanoTime + ); + } + + @Override + protected void indexData( + Task task, + BulkRequest bulkRequest, + String executorName, + ActionListener listener, + AtomicArray responses, + Set autoCreateIndices, + Map indicesThatCannotBeCreated, + long startTime + ) { + for (int i = 0; i < bulkRequest.requests.size(); i++) { + DocWriteRequest request = bulkRequest.requests.get(i); + responses.set( + i, + BulkItemResponse.success( + 0, + DocWriteRequest.OpType.CREATE, + new SimulateIndexResponse( + request.index(), + ((IndexRequest) request).source(), + ((IndexRequest) request).getContentType(), + ((IndexRequest) request).getExecutedPipelines() + ) + ) + ); + } + listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTime))); + } + + @Override + protected IngestService getIngestService(BulkRequest request) { + IngestService rawIngestService = super.getIngestService(request); + return new SimulateIngestService(rawIngestService, request); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/index/IndexResponse.java b/server/src/main/java/org/elasticsearch/action/index/IndexResponse.java index 9dccdfc64620e..a9c0c8ef42380 100644 --- a/server/src/main/java/org/elasticsearch/action/index/IndexResponse.java +++ b/server/src/main/java/org/elasticsearch/action/index/IndexResponse.java @@ -37,7 +37,7 @@ public class IndexResponse extends DocWriteResponse { * information about the pipelines executed. An empty list means that there were no pipelines executed. */ @Nullable - private final List executedPipelines; + protected final List executedPipelines; public IndexResponse(ShardId shardId, StreamInput in) throws IOException { super(shardId, in); diff --git a/server/src/main/java/org/elasticsearch/action/ingest/SimulateIndexResponse.java b/server/src/main/java/org/elasticsearch/action/ingest/SimulateIndexResponse.java new file mode 100644 index 0000000000000..118027b117b2d --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/ingest/SimulateIndexResponse.java @@ -0,0 +1,82 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.ingest; + +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.support.replication.ReplicationResponse; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentType; + +import java.io.IOException; +import java.util.List; + +/** + * This is an IndexResponse that is specifically for simulate requests. Unlike typical IndexResponses, we need to include the original + * source in a SimulateIndexResponse, and don't need most other fields. This has to extend IndexResponse though so that it can be used by + * BulkItemResponse in IngestService. + */ +public class SimulateIndexResponse extends IndexResponse { + private final BytesReference source; + private final XContentType sourceXContentType; + + public SimulateIndexResponse(StreamInput in) throws IOException { + super(in); + this.source = in.readBytesReference(); + this.sourceXContentType = XContentType.valueOf(in.readString()); + setShardInfo(new ReplicationResponse.ShardInfo(0, 0)); + } + + public SimulateIndexResponse(String index, BytesReference source, XContentType sourceXContentType, List pipelines) { + // We don't actually care about most of the IndexResponse fields: + super(new ShardId(index, "", 0), "", 0, 0, 0, true, pipelines); + this.source = source; + this.sourceXContentType = sourceXContentType; + setShardInfo(new ReplicationResponse.ShardInfo(0, 0)); + } + + @Override + public XContentBuilder innerToXContent(XContentBuilder builder, Params params) throws IOException { + builder.field("_index", getShardId().getIndexName()); + builder.field("_source", XContentHelper.convertToMap(source, false, sourceXContentType).v2()); + builder.array("executed_pipelines", executedPipelines.toArray()); + return builder; + } + + @Override + public RestStatus status() { + return RestStatus.CREATED; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeBytesReference(source); + out.writeString(sourceXContentType.name()); + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("SimulateIndexResponse["); + builder.append("index=").append(getIndex()); + try { + builder.append(",source=").append(XContentHelper.convertToJson(source, false, sourceXContentType)); + } catch (IOException e) { + throw new RuntimeException(e); + } + builder.append(",pipelines=[").append(String.join(", ", executedPipelines)); + return builder.append("]]").toString(); + } +} diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 3adaab078ad4a..84b9e09fd1005 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -113,8 +113,8 @@ public class IngestService implements ClusterStateApplier, ReportingService pipelines = Map.of(); private final ThreadPool threadPool; - private final IngestMetric totalMetrics = new IngestMetric(); - private final List> ingestClusterStateListeners = new CopyOnWriteArrayList<>(); + private final IngestMetric totalMetrics; + private final List> ingestClusterStateListeners; private volatile ClusterState state; private static BiFunction createScheduler(ThreadPool threadPool) { @@ -184,6 +184,8 @@ public IngestService( MatcherWatchdog matcherWatchdog, Supplier documentParsingObserverSupplier ) { + this.totalMetrics = new IngestMetric(); + this.ingestClusterStateListeners = new CopyOnWriteArrayList<>(); this.clusterService = clusterService; this.scriptService = scriptService; this.documentParsingObserverSupplier = documentParsingObserverSupplier; @@ -206,6 +208,24 @@ public IngestService( this.taskQueue = clusterService.createTaskQueue("ingest-pipelines", Priority.NORMAL, PIPELINE_TASK_EXECUTOR); } + /** + * This copy constructor returns a copy of the given ingestService, using all of the same internal state. The returned copy is not + * registered to listen to any cluster state changes + * @param ingestService + */ + IngestService(IngestService ingestService) { + this.totalMetrics = new IngestMetric(); + this.ingestClusterStateListeners = new CopyOnWriteArrayList<>(); + this.clusterService = ingestService.clusterService; + this.scriptService = ingestService.scriptService; + this.documentParsingObserverSupplier = ingestService.documentParsingObserverSupplier; + this.processorFactories = ingestService.processorFactories; + this.threadPool = ingestService.threadPool; + this.taskQueue = ingestService.taskQueue; + this.pipelines = ingestService.pipelines; + this.state = ingestService.state; + } + private static Map processorFactories(List ingestPlugins, Processor.Parameters parameters) { Map processorFactories = new TreeMap<>(); for (IngestPlugin ingestPlugin : ingestPlugins) { diff --git a/server/src/main/java/org/elasticsearch/ingest/SimulateIngestService.java b/server/src/main/java/org/elasticsearch/ingest/SimulateIngestService.java new file mode 100644 index 0000000000000..343109fbf67ec --- /dev/null +++ b/server/src/main/java/org/elasticsearch/ingest/SimulateIngestService.java @@ -0,0 +1,44 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.ingest; + +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.SimulateBulkRequest; + +import java.util.Map; + +/** + * This is an implementation of IngestService that allows us to substitute pipeline definitions so that users can simulate ingest using + * pipelines that they define on the fly. + */ +public class SimulateIngestService extends IngestService { + private final Map pipelineSubstitutions; + + public SimulateIngestService(IngestService ingestService, BulkRequest request) { + super(ingestService); + if (request instanceof SimulateBulkRequest simulateBulkRequest) { + try { + pipelineSubstitutions = simulateBulkRequest.getPipelineSubstitutions(ingestService); + } catch (Exception e) { + throw new RuntimeException(e); + } + } else { + pipelineSubstitutions = Map.of(); + } + } + + @Override + public Pipeline getPipeline(String id) { + Pipeline pipeline = pipelineSubstitutions.get(id); + if (pipeline == null) { + pipeline = super.getPipeline(id); + } + return pipeline; + } +} diff --git a/server/src/main/java/org/elasticsearch/rest/action/ingest/RestSimulateIngestAction.java b/server/src/main/java/org/elasticsearch/rest/action/ingest/RestSimulateIngestAction.java new file mode 100644 index 0000000000000..bf43643dcbb33 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/rest/action/ingest/RestSimulateIngestAction.java @@ -0,0 +1,142 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.rest.action.ingest; + +import org.elasticsearch.action.bulk.BulkShardRequest; +import org.elasticsearch.action.bulk.SimulateBulkAction; +import org.elasticsearch.action.bulk.SimulateBulkRequest; +import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.core.RestApiVersion; +import org.elasticsearch.core.Tuple; +import org.elasticsearch.ingest.ConfigurationUtils; +import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.Scope; +import org.elasticsearch.rest.ServerlessScope; +import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.search.fetch.subphase.FetchSourceContext; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentFactory; +import org.elasticsearch.xcontent.XContentType; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.rest.RestRequest.Method.GET; +import static org.elasticsearch.rest.RestRequest.Method.POST; + +@ServerlessScope(Scope.PUBLIC) +public class RestSimulateIngestAction extends BaseRestHandler { + + @Override + public List routes() { + return List.of(new Route(GET, "/_ingest/_simulate"), new Route(POST, "/_ingest/_simulate")); + } + + @Override + public String getName() { + return "ingest_simulate_ingest_action"; + } + + @Override + @SuppressWarnings("unchecked") + public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { + if (request.getRestApiVersion() == RestApiVersion.V_7 && request.hasParam("type")) { + request.param("type"); + } + SimulateBulkRequest bulkRequest = new SimulateBulkRequest(); + String defaultIndex = request.param("index"); + String defaultRouting = null; + FetchSourceContext defaultFetchSourceContext = FetchSourceContext.parseFromRestRequest(request); + String defaultPipeline = request.param("pipeline"); + Boolean defaultRequireAlias = null; + Tuple sourceTuple = request.contentOrSourceParam(); + Map sourceMap = XContentHelper.convertToMap(sourceTuple.v2(), false, sourceTuple.v1()).v2(); + bulkRequest.setPipelineSubstitutions((Map) sourceMap.remove("pipeline_substitutions")); + bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT)); + BytesReference transformedData = convertToBulkRequestXContentBytes(sourceMap); + bulkRequest.add( + transformedData, + defaultIndex, + defaultRouting, + defaultFetchSourceContext, + defaultPipeline, + defaultRequireAlias, + true, + true, + request.getXContentType(), + request.getRestApiVersion() + ); + return channel -> client.execute(SimulateBulkAction.INSTANCE, bulkRequest, new RestToXContentListener<>(channel)); + } + + private BytesReference convertToBulkRequestXContentBytes(Map sourceMap) throws IOException { + List> docs = ConfigurationUtils.readList(null, null, sourceMap, "docs"); + if (docs.isEmpty()) { + throw new IllegalArgumentException("must specify at least one document in [docs]"); + } + ByteBuffer[] buffers = new ByteBuffer[2 * docs.size()]; + int bufferCount = 0; + for (Object object : docs) { + if ((object instanceof Map) == false) { + throw new IllegalArgumentException("malformed [docs] section, should include an inner object"); + } + @SuppressWarnings("unchecked") + Map dataMap = (Map) object; + Map document = ConfigurationUtils.readMap(null, null, dataMap, "_source"); + String index = ConfigurationUtils.readStringOrIntProperty( + null, + null, + dataMap, + IngestDocument.Metadata.INDEX.getFieldName(), + "_index" + ); + String id = ConfigurationUtils.readStringOrIntProperty(null, null, dataMap, IngestDocument.Metadata.ID.getFieldName(), "_id"); + String routing = ConfigurationUtils.readOptionalStringOrIntProperty( + null, + null, + dataMap, + IngestDocument.Metadata.ROUTING.getFieldName() + ); + XContentBuilder actionXContentBuilder = XContentFactory.contentBuilder(XContentType.JSON).lfAtEnd(); + actionXContentBuilder.startObject().field("index").startObject(); + if ("_index".equals(index) == false) { + actionXContentBuilder.field("_index", index); + } + if ("id".equals(id) == false) { + actionXContentBuilder.field("_id", id); + } + actionXContentBuilder.endObject().endObject(); + buffers[bufferCount++] = ByteBuffer.wrap(BytesReference.bytes(actionXContentBuilder).toBytesRef().bytes); + XContentBuilder dataXContentBuilder = XContentFactory.contentBuilder(XContentType.JSON).lfAtEnd(); + dataXContentBuilder.startObject(); + for (String key : document.keySet()) { + dataXContentBuilder.field(key, document.get(key)); + } + dataXContentBuilder.endObject(); + buffers[bufferCount++] = ByteBuffer.wrap(BytesReference.bytes(dataXContentBuilder).toBytesRef().bytes); + } + return BytesReference.fromByteBuffers(buffers); + } + + @Override + public boolean supportsContentStream() { + return true; + } + + @Override + public boolean allowsUnsafeBuffers() { + return true; + } +} diff --git a/server/src/test/java/org/elasticsearch/action/bulk/SimulateBulkRequestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/SimulateBulkRequestTests.java new file mode 100644 index 0000000000000..106a34f0dc970 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/bulk/SimulateBulkRequestTests.java @@ -0,0 +1,105 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.bulk; + +import org.elasticsearch.ingest.IngestService; +import org.elasticsearch.ingest.Pipeline; +import org.elasticsearch.ingest.Processor; +import org.elasticsearch.test.ESTestCase; +import org.mockito.ArgumentCaptor; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class SimulateBulkRequestTests extends ESTestCase { + + public void testSerialization() throws Exception { + Map pipelineSubstitutions = getTestPipelineSubstitutions(); + SimulateBulkRequest simulateBulkRequest = new SimulateBulkRequest(); + simulateBulkRequest.setPipelineSubstitutions(pipelineSubstitutions); + /* + * Note: SimulateBulkRequest does not implement equals or hashCode, so we can't test serialization in the usual way for a + * Writable + */ + SimulateBulkRequest copy = copyWriteable(simulateBulkRequest, null, SimulateBulkRequest::new); + assertThat(copy.pipelineSubstitutions, equalTo(simulateBulkRequest.pipelineSubstitutions)); + } + + public void testGetPipelineSubstitutions() throws Exception { + Map pipelineSubstitutions = getTestPipelineSubstitutions(); + SimulateBulkRequest simulateBulkRequest = new SimulateBulkRequest(); + simulateBulkRequest.setPipelineSubstitutions(pipelineSubstitutions); + Map returnedPipelineSubstitions = simulateBulkRequest.getPipelineSubstitutions(getTestIngestService()); + assertThat(returnedPipelineSubstitions.size(), equalTo(pipelineSubstitutions.size())); + for (String pipelineId : pipelineSubstitutions.keySet()) { + List processors = returnedPipelineSubstitions.get(pipelineId).getProcessors(); + } + } + + private Map getTestPipelineSubstitutions() { + return new HashMap<>() { + { + put("pipeline1", new HashMap<>() { + { + put("processors", List.of(new HashMap<>() { + { + put("processor2", new HashMap<>()); + } + }, new HashMap<>() { + { + put("processor3", new HashMap<>()); + } + })); + } + }); + put("pipeline2", new HashMap<>() { + { + put("processors", List.of(new HashMap<>() { + { + put("processor3", new HashMap<>()); + } + })); + } + }); + } + }; + } + + private IngestService getTestIngestService() throws Exception { + IngestService ingestService = mock(IngestService.class); + Processor.Factory processorFactory = mock(Processor.Factory.class); + ArgumentCaptor messageCapture = ArgumentCaptor.forClass(String.class); + when(processorFactory.create(any(), messageCapture.capture(), any(), any())).thenReturn(new Processor() { + @Override + public String getType() { + return messageCapture.getValue(); + } + + @Override + public String getTag() { + return null; + } + + @Override + public String getDescription() { + return null; + } + }); + when(ingestService.getProcessorFactories()).thenReturn( + Map.of("processor1", processorFactory, "processor2", processorFactory, "processor3", processorFactory) + ); + return ingestService; + } +} diff --git a/server/src/test/java/org/elasticsearch/action/ingest/SimulateIndexResponseTests.java b/server/src/test/java/org/elasticsearch/action/ingest/SimulateIndexResponseTests.java new file mode 100644 index 0000000000000..b4aa67df5d0f9 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/ingest/SimulateIndexResponseTests.java @@ -0,0 +1,72 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.ingest; + +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.RandomObjects; +import org.elasticsearch.xcontent.XContentType; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +import static org.hamcrest.Matchers.equalTo; + +public class SimulateIndexResponseTests extends ESTestCase { + + public void testToXContent() throws IOException { + String index = randomAlphaOfLength(5); + final List pipelines = new ArrayList<>(); + for (int i = 0; i < randomIntBetween(0, 20); i++) { + pipelines.add(randomAlphaOfLength(20)); + } + String source = """ + {"doc": {"key1": "val1", "key2": "val2"}}"""; + BytesReference sourceBytes = BytesReference.fromByteBuffer(ByteBuffer.wrap(source.getBytes(StandardCharsets.UTF_8))); + SimulateIndexResponse indexResponse = new SimulateIndexResponse(index, sourceBytes, XContentType.JSON, pipelines); + String output = Strings.toString(indexResponse); + assertEquals(XContentHelper.stripWhitespace(Strings.format(""" + { + "_index": "%s", + "_source": %s, + "executed_pipelines": [%s] + }""", index, source, pipelines.stream().map(pipeline -> "\"" + pipeline + "\"").collect(Collectors.joining(",")))), output); + } + + public void testSerialization() throws IOException { + // Note: SimulateIndexRequest does not implement equals or hashCode, so we can't test serialization in the usual way for a Writable + SimulateIndexResponse response = randomIndexResponse(); + IndexResponse copy = copyWriteable(response, null, SimulateIndexResponse::new); + assertThat(Strings.toString(response), equalTo(Strings.toString(copy))); + } + + /** + * Returns a tuple of {@link IndexResponse}s. + *

+ * The left element is the actual {@link IndexResponse} to serialize while the right element is the + * expected {@link IndexResponse} after parsing. + */ + private static SimulateIndexResponse randomIndexResponse() { + String index = randomAlphaOfLength(5); + final List pipelines = new ArrayList<>(); + for (int i = 0; i < randomIntBetween(0, 20); i++) { + pipelines.add(randomAlphaOfLength(20)); + } + XContentType xContentType = randomFrom(XContentType.values()); + BytesReference sourceBytes = RandomObjects.randomSource(random(), xContentType); + return new SimulateIndexResponse(index, sourceBytes, xContentType, pipelines); + } +} diff --git a/server/src/test/java/org/elasticsearch/ingest/SimulateIngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/SimulateIngestServiceTests.java new file mode 100644 index 0000000000000..25bd72c3d6477 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/ingest/SimulateIngestServiceTests.java @@ -0,0 +1,145 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.ingest; + +import org.elasticsearch.action.bulk.SimulateBulkRequest; +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.plugins.IngestPlugin; +import org.elasticsearch.plugins.internal.DocumentParsingObserver; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xcontent.XContentType; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class SimulateIngestServiceTests extends ESTestCase { + + public void testGetPipeline() { + PipelineConfiguration pipelineConfiguration = new PipelineConfiguration("pipeline1", new BytesArray(""" + {"processors": [{"processor1" : {}}]}"""), XContentType.JSON); + IngestMetadata ingestMetadata = new IngestMetadata(Map.of("pipeline1", pipelineConfiguration)); + Map processors = new HashMap<>(); + processors.put( + "processor1", + (factories, tag, description, config) -> new FakeProcessor("processor1", tag, description, ingestDocument -> {}) { + } + ); + processors.put( + "processor2", + (factories, tag, description, config) -> new FakeProcessor("processor2", tag, description, ingestDocument -> {}) { + } + ); + processors.put( + "processor3", + (factories, tag, description, config) -> new FakeProcessor("processor3", tag, description, ingestDocument -> {}) { + } + ); + IngestService ingestService = createWithProcessors(processors); + ingestService.innerUpdatePipelines(ingestMetadata); + { + // First we make sure that if there are no substitutions that we get our original pipeline back: + SimulateBulkRequest simulateBulkRequest = new SimulateBulkRequest(); + SimulateIngestService simulateIngestService = new SimulateIngestService(ingestService, simulateBulkRequest); + Pipeline pipeline = simulateIngestService.getPipeline("pipeline1"); + assertThat(pipeline.getProcessors().size(), equalTo(1)); + assertThat(pipeline.getProcessors().get(0).getType(), equalTo("processor1")); + assertNull(simulateIngestService.getPipeline("pipeline2")); + } + { + // Here we make sure that if we have a substitution with the same name as the original pipeline that we get the new one back + SimulateBulkRequest simulateBulkRequest = new SimulateBulkRequest(); + Map pipelineSubstitutions = new HashMap<>() { + { + put("pipeline1", new HashMap<>() { + { + put("processors", List.of(new HashMap<>() { + { + put("processor2", new HashMap<>()); + } + }, new HashMap<>() { + { + put("processor3", new HashMap<>()); + } + })); + } + }); + put("pipeline2", new HashMap<>() { + { + put("processors", List.of(new HashMap<>() { + { + put("processor3", new HashMap<>()); + } + })); + } + }); + } + }; + simulateBulkRequest.setPipelineSubstitutions(pipelineSubstitutions); + SimulateIngestService simulateIngestService = new SimulateIngestService(ingestService, simulateBulkRequest); + Pipeline pipeline1 = simulateIngestService.getPipeline("pipeline1"); + assertThat(pipeline1.getProcessors().size(), equalTo(2)); + assertThat(pipeline1.getProcessors().get(0).getType(), equalTo("processor2")); + assertThat(pipeline1.getProcessors().get(1).getType(), equalTo("processor3")); + Pipeline pipeline2 = simulateIngestService.getPipeline("pipeline2"); + assertThat(pipeline2.getProcessors().size(), equalTo(1)); + assertThat(pipeline2.getProcessors().get(0).getType(), equalTo("processor3")); + } + { + /* + * Here we make sure that if we have a substitution for a new pipeline we still get the original one back (as well as the new + * one). + */ + SimulateBulkRequest simulateBulkRequest = new SimulateBulkRequest(); + Map pipelineSubstitutions = new HashMap<>() { + { + put("pipeline2", new HashMap<>() { + { + put("processors", List.of(new HashMap<>() { + { + put("processor3", new HashMap<>()); + } + })); + } + }); + } + }; + simulateBulkRequest.setPipelineSubstitutions(pipelineSubstitutions); + SimulateIngestService simulateIngestService = new SimulateIngestService(ingestService, simulateBulkRequest); + Pipeline pipeline1 = simulateIngestService.getPipeline("pipeline1"); + assertThat(pipeline1.getProcessors().size(), equalTo(1)); + assertThat(pipeline1.getProcessors().get(0).getType(), equalTo("processor1")); + Pipeline pipeline2 = simulateIngestService.getPipeline("pipeline2"); + assertThat(pipeline2.getProcessors().size(), equalTo(1)); + assertThat(pipeline2.getProcessors().get(0).getType(), equalTo("processor3")); + } + } + + private static IngestService createWithProcessors(Map processors) { + Client client = mock(Client.class); + ThreadPool threadPool = mock(ThreadPool.class); + when(threadPool.generic()).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE); + when(threadPool.executor(anyString())).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE); + return new IngestService(mock(ClusterService.class), threadPool, null, null, null, List.of(new IngestPlugin() { + @Override + public Map getProcessors(final Processor.Parameters parameters) { + return processors; + } + }), client, null, () -> DocumentParsingObserver.EMPTY_INSTANCE); + } +} diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java index 9f490792d800f..e36a79fcf0c6f 100644 --- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java +++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java @@ -75,6 +75,7 @@ public class Constants { "cluster:admin/synonym_rules/get", "cluster:admin/synonym_rules/put", "cluster:admin/settings/update", + "indices:admin/simulate/bulk", "cluster:admin/slm/delete", "cluster:admin/slm/execute", "cluster:admin/slm/execute/get_expired_snapshots", @@ -472,6 +473,7 @@ public class Constants { "indices:admin/seq_no/remove_retention_lease", "indices:admin/seq_no/renew_retention_lease", "indices:admin/settings/update", + "indices:admin/simulate/bulk", "indices:admin/shards/search_shards", "indices:admin/search/search_shards", "indices:admin/template/delete",