diff --git a/server/src/main/java/org/elasticsearch/action/ingest/SimulateIndexResponse.java b/server/src/main/java/org/elasticsearch/action/ingest/SimulateIndexResponse.java index 7ad74a6d895b6..903be7962ce20 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/SimulateIndexResponse.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/SimulateIndexResponse.java @@ -47,7 +47,7 @@ public SimulateIndexResponse( List pipelines ) { // We don't actually care about most of the IndexResponse fields: - super(new ShardId(index, "", 0), id == null ? "" : id, 0, 0, version, true, pipelines); + super(new ShardId(index, "", 0), id == null ? "" : id, 0, 0, version, true, pipelines); this.source = source; this.sourceXContentType = sourceXContentType; setShardInfo(new ReplicationResponse.ShardInfo(0, 0)); diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 84b9e09fd1005..3a2a810dc61b5 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -113,8 +113,8 @@ public class IngestService implements ClusterStateApplier, ReportingService pipelines = Map.of(); private final ThreadPool threadPool; - private final IngestMetric totalMetrics; - private final List> ingestClusterStateListeners; + private final IngestMetric totalMetrics = new IngestMetric(); + private final List> ingestClusterStateListeners = new CopyOnWriteArrayList<>(); private volatile ClusterState state; private static BiFunction createScheduler(ThreadPool threadPool) { @@ -184,8 +184,6 @@ public IngestService( MatcherWatchdog matcherWatchdog, Supplier documentParsingObserverSupplier ) { - this.totalMetrics = new IngestMetric(); - this.ingestClusterStateListeners = new CopyOnWriteArrayList<>(); this.clusterService = clusterService; this.scriptService = scriptService; this.documentParsingObserverSupplier = documentParsingObserverSupplier; @@ -214,8 +212,6 @@ public IngestService( * @param ingestService */ IngestService(IngestService ingestService) { - this.totalMetrics = new IngestMetric(); - this.ingestClusterStateListeners = new CopyOnWriteArrayList<>(); this.clusterService = ingestService.clusterService; this.scriptService = ingestService.scriptService; this.documentParsingObserverSupplier = ingestService.documentParsingObserverSupplier; diff --git a/server/src/main/java/org/elasticsearch/ingest/SimulateIngestService.java b/server/src/main/java/org/elasticsearch/ingest/SimulateIngestService.java index c8add9229c4a2..8185a9c880a45 100644 --- a/server/src/main/java/org/elasticsearch/ingest/SimulateIngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/SimulateIngestService.java @@ -30,7 +30,7 @@ public SimulateIngestService(IngestService ingestService, BulkRequest request) { throw new RuntimeException(e); } } else { - pipelineSubstitutions = Map.of(); + throw new IllegalArgumentException("Expecting a SimulateBulkRequest but got " + request.getClass()); } } diff --git a/server/src/main/java/org/elasticsearch/rest/action/ingest/RestSimulateIngestAction.java b/server/src/main/java/org/elasticsearch/rest/action/ingest/RestSimulateIngestAction.java index e5159a54d8a9e..8b7f5205cb96d 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/ingest/RestSimulateIngestAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/ingest/RestSimulateIngestAction.java @@ -95,8 +95,9 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC * The simulate ingest API is intended to have inputs and outputs that are formatted similarly to the simulate pipeline API for the * sake of consistency. But internally it uses the same code as the _bulk API, so that we have confidence that we are simulating what * really happens on ingest. This method transforms simulate-style inputs into an input that the bulk API can accept. + * Non-private for unit testing */ - private BytesReference convertToBulkRequestXContentBytes(Map sourceMap) throws IOException { + static BytesReference convertToBulkRequestXContentBytes(Map sourceMap) throws IOException { List> docs = ConfigurationUtils.readList(null, null, sourceMap, "docs"); if (docs.isEmpty()) { throw new IllegalArgumentException("must specify at least one document in [docs]"); @@ -108,20 +109,14 @@ private BytesReference convertToBulkRequestXContentBytes(Map sou throw new IllegalArgumentException("malformed [docs] section, should include an inner object"); } Map document = ConfigurationUtils.readMap(null, null, doc, "_source"); - String index = ConfigurationUtils.readStringOrIntProperty( - null, - null, - doc, - IngestDocument.Metadata.INDEX.getFieldName(), - "_index" - ); - String id = ConfigurationUtils.readStringOrIntProperty(null, null, doc, IngestDocument.Metadata.ID.getFieldName(), "_id"); + String index = ConfigurationUtils.readOptionalStringProperty(null, null, doc, IngestDocument.Metadata.INDEX.getFieldName()); + String id = ConfigurationUtils.readOptionalStringProperty(null, null, doc, IngestDocument.Metadata.ID.getFieldName()); XContentBuilder actionXContentBuilder = XContentFactory.contentBuilder(XContentType.JSON).lfAtEnd(); actionXContentBuilder.startObject().field("index").startObject(); - if ("_index".equals(index) == false) { + if (index != null) { actionXContentBuilder.field("_index", index); } - if ("id".equals(id) == false) { + if (id != null) { actionXContentBuilder.field("_id", id); } actionXContentBuilder.endObject().endObject(); diff --git a/server/src/test/java/org/elasticsearch/action/bulk/SimulateBulkRequestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/SimulateBulkRequestTests.java index b2148b4991d31..7fe036f97596e 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/SimulateBulkRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/SimulateBulkRequestTests.java @@ -10,6 +10,7 @@ import org.elasticsearch.test.ESTestCase; +import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -19,7 +20,12 @@ public class SimulateBulkRequestTests extends ESTestCase { public void testSerialization() throws Exception { - Map> pipelineSubstitutions = getTestPipelineSubstitutions(); + testSerialization(getTestPipelineSubstitutions()); + testSerialization(null); + testSerialization(Map.of()); + } + + private void testSerialization(Map> pipelineSubstitutions) throws IOException { SimulateBulkRequest simulateBulkRequest = new SimulateBulkRequest(pipelineSubstitutions); /* * Note: SimulateBulkRequest does not implement equals or hashCode, so we can't test serialization in the usual way for a diff --git a/server/src/test/java/org/elasticsearch/rest/action/ingest/RestSimulateIngestActionTests.java b/server/src/test/java/org/elasticsearch/rest/action/ingest/RestSimulateIngestActionTests.java new file mode 100644 index 0000000000000..40c21ee54c2d1 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/rest/action/ingest/RestSimulateIngestActionTests.java @@ -0,0 +1,143 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.rest.action.ingest; + +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xcontent.XContentType; + +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; + +public class RestSimulateIngestActionTests extends ESTestCase { + + public void testConvertToBulkRequestXContentBytes() throws Exception { + { + // No index, no id, which we expect to be fine: + String simulateRequestJson = """ + { + "docs": [ + { + "_source": { + "my-keyword-field": "FOO" + } + }, + { + "_source": { + "my-keyword-field": "BAR" + } + } + ], + "pipeline_substitutions": { + "my-pipeline-2": { + "processors": [ + { + "set": { + "field": "my-new-boolean-field", + "value": true + } + } + ] + } + } + } + """; + String bulkRequestJson = """ + {"index":{}} + {"my-keyword-field":"FOO"} + {"index":{}} + {"my-keyword-field":"BAR"} + """; + testInputJsonConvertsToOutputJson(simulateRequestJson, bulkRequestJson); + } + + { + // index and id: + String simulateRequestJson = """ + { + "docs": [ + { + "_index": "index", + "_id": "123", + "_source": { + "foo": "bar" + } + }, + { + "_index": "index", + "_id": "456", + "_source": { + "foo": "rab" + } + } + ] + } + """; + String bulkRequestJson = """ + {"index":{"_index":"index","_id":"123"}} + {"foo":"bar"} + {"index":{"_index":"index","_id":"456"}} + {"foo":"rab"} + """; + testInputJsonConvertsToOutputJson(simulateRequestJson, bulkRequestJson); + } + + { + // We expect an IllegalArgumentException if there are no docs: + String simulateRequestJson = """ + { + "docs": [ + ] + } + """; + String bulkRequestJson = """ + {"index":{"_index":"index","_id":"123"}} + {"foo":"bar"} + {"index":{"_index":"index","_id":"456"}} + {"foo":"rab"} + """; + expectThrows(IllegalArgumentException.class, () -> testInputJsonConvertsToOutputJson(simulateRequestJson, bulkRequestJson)); + } + + { + // non-trivial source: + String simulateRequestJson = """ + { + "docs": [ + { + "_index": "index", + "_id": "123", + "_source": { + "foo": "bar", + "some_object": { + "prop1": "val1", + "some_array": [1, 2, 3, 4] + } + } + } + ] + } + """; + String bulkRequestJson = """ + {"index":{"_index":"index","_id":"123"}} + {"some_object":{"prop1":"val1","some_array":[1,2,3,4]},"foo":"bar"} + """; + testInputJsonConvertsToOutputJson(simulateRequestJson, bulkRequestJson); + } + } + + private void testInputJsonConvertsToOutputJson(String inputJson, String expectedOutputJson) throws Exception { + Map sourceMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), inputJson, false); + BytesReference bulkXcontentBytes = RestSimulateIngestAction.convertToBulkRequestXContentBytes(sourceMap); + String bulkRequestJson = XContentHelper.convertToJson(bulkXcontentBytes, false, XContentType.JSON); + assertThat(bulkRequestJson, equalTo(expectedOutputJson)); + } +}