From c363a1997095729c74532b64da7b900a094d2941 Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Wed, 18 Oct 2023 14:47:47 -0500 Subject: [PATCH] more testing --- .../action/bulk/SimulateBulkRequest.java | 45 +++- .../ingest/SimulateIngestService.java | 25 ++- .../ingest/RestSimulateIngestAction.java | 2 +- .../action/bulk/SimulateBulkRequestTests.java | 4 +- .../TransportSimulateBulkActionTests.java | 206 ++++++++++++++++++ .../ingest/SimulateIngestServiceTests.java | 4 +- 6 files changed, 266 insertions(+), 20 deletions(-) create mode 100644 server/src/test/java/org/elasticsearch/action/bulk/TransportSimulateBulkActionTests.java diff --git a/server/src/main/java/org/elasticsearch/action/bulk/SimulateBulkRequest.java b/server/src/main/java/org/elasticsearch/action/bulk/SimulateBulkRequest.java index 945d54abaa3ac..975195c532442 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/SimulateBulkRequest.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/SimulateBulkRequest.java @@ -15,24 +15,57 @@ import java.util.Map; /** - * This extends BulkRequest with support for providing substitute pipeline definitions. + * This extends BulkRequest with support for providing substitute pipeline definitions. In a user request, the pipeline substitutions + * will look something like this: + * + * "pipeline_substitutions": { + * "my-pipeline-1": { + * "processors": [ + * { + * "set": { + * "field": "my-new-boolean-field", + * "value": true + * } + * } + * ] + * }, + * "my-pipeline-2": { + * "processors": [ + * { + * "set": { + * "field": "my-new-boolean-field", + * "value": true + * }, + * "rename": { + * "field": "old_field", + * "target_field": "new field" + * } + * } + * ] + * } + * } + * + * The pipelineSubstitutions Map held by this class is intended to be the result of XContentHelper.convertToMap(). The top-level keys + * are the pipelineIds ("my-pipeline-1" and "my-pipeline-2" in the example above). The values are the Maps of "processors" to the List of + * processor definitions. */ public class SimulateBulkRequest extends BulkRequest { - private Map pipelineSubstitutions = Map.of(); + private Map> pipelineSubstitutions = Map.of(); public SimulateBulkRequest() { super(); } + @SuppressWarnings("unchecked") public SimulateBulkRequest(StreamInput in) throws IOException { super(in); - this.pipelineSubstitutions = in.readMap(); + this.pipelineSubstitutions = (Map>) in.readGenericValue(); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeGenericMap(pipelineSubstitutions); + out.writeGenericValue(pipelineSubstitutions); } /** @@ -40,11 +73,11 @@ public void writeTo(StreamOutput out) throws IOException { * The key of the map is the pipelineId, and the value the pipeline definition as parsed by XContentHelper.convertToMap(). * @param pipelineSubstitutions */ - public void setPipelineSubstitutions(Map pipelineSubstitutions) { + public void setPipelineSubstitutions(Map> pipelineSubstitutions) { this.pipelineSubstitutions = pipelineSubstitutions; } - public Map getPipelineSubstitutions() { + public Map> getPipelineSubstitutions() { return pipelineSubstitutions; } } diff --git a/server/src/main/java/org/elasticsearch/ingest/SimulateIngestService.java b/server/src/main/java/org/elasticsearch/ingest/SimulateIngestService.java index 3e7809970312f..c8add9229c4a2 100644 --- a/server/src/main/java/org/elasticsearch/ingest/SimulateIngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/SimulateIngestService.java @@ -43,16 +43,17 @@ public SimulateIngestService(IngestService ingestService, BulkRequest request) { * @return A transformed version of rawPipelineSubstitutions, where the values are Pipeline objects * @throws Exception */ - @SuppressWarnings("unchecked") - private Map getPipelineSubstitutions(Map rawPipelineSubstitutions, IngestService ingestService) - throws Exception { + private Map getPipelineSubstitutions( + Map> rawPipelineSubstitutions, + IngestService ingestService + ) throws Exception { Map parsedPipelineSubstitutions = new HashMap<>(); - if (pipelineSubstitutions != null) { - for (Map.Entry entry : rawPipelineSubstitutions.entrySet()) { + if (rawPipelineSubstitutions != null) { + for (Map.Entry> entry : rawPipelineSubstitutions.entrySet()) { String pipelineId = entry.getKey(); Pipeline pipeline = Pipeline.create( pipelineId, - (Map) entry.getValue(), + entry.getValue(), ingestService.getProcessorFactories(), ingestService.getScriptService() ); @@ -62,11 +63,17 @@ private Map getPipelineSubstitutions(Map rawPi return parsedPipelineSubstitutions; } + /** + * This method returns the Pipeline for the given pipelineId. If a substitute definition of the pipeline has been defined for the + * cuurent simulate, then that pipeline is returned. Otherwise the pipeline stored in the cluster state is returned. + * @param pipelineId + * @return + */ @Override - public Pipeline getPipeline(String id) { - Pipeline pipeline = pipelineSubstitutions.get(id); + public Pipeline getPipeline(String pipelineId) { + Pipeline pipeline = pipelineSubstitutions.get(pipelineId); if (pipeline == null) { - pipeline = super.getPipeline(id); + pipeline = super.getPipeline(pipelineId); } return pipeline; } diff --git a/server/src/main/java/org/elasticsearch/rest/action/ingest/RestSimulateIngestAction.java b/server/src/main/java/org/elasticsearch/rest/action/ingest/RestSimulateIngestAction.java index bf43643dcbb33..44350c42aca9d 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/ingest/RestSimulateIngestAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/ingest/RestSimulateIngestAction.java @@ -63,7 +63,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC Boolean defaultRequireAlias = null; Tuple sourceTuple = request.contentOrSourceParam(); Map sourceMap = XContentHelper.convertToMap(sourceTuple.v2(), false, sourceTuple.v1()).v2(); - bulkRequest.setPipelineSubstitutions((Map) sourceMap.remove("pipeline_substitutions")); + bulkRequest.setPipelineSubstitutions((Map>) sourceMap.remove("pipeline_substitutions")); bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT)); BytesReference transformedData = convertToBulkRequestXContentBytes(sourceMap); bulkRequest.add( diff --git a/server/src/test/java/org/elasticsearch/action/bulk/SimulateBulkRequestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/SimulateBulkRequestTests.java index 0126612029638..437cafece35c8 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/SimulateBulkRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/SimulateBulkRequestTests.java @@ -19,7 +19,7 @@ public class SimulateBulkRequestTests extends ESTestCase { public void testSerialization() throws Exception { - Map pipelineSubstitutions = getTestPipelineSubstitutions(); + Map> pipelineSubstitutions = getTestPipelineSubstitutions(); SimulateBulkRequest simulateBulkRequest = new SimulateBulkRequest(); simulateBulkRequest.setPipelineSubstitutions(pipelineSubstitutions); /* @@ -30,7 +30,7 @@ public void testSerialization() throws Exception { assertThat(copy.getPipelineSubstitutions(), equalTo(simulateBulkRequest.getPipelineSubstitutions())); } - private Map getTestPipelineSubstitutions() { + private Map> getTestPipelineSubstitutions() { return new HashMap<>() { { put("pipeline1", new HashMap<>() { diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportSimulateBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportSimulateBulkActionTests.java new file mode 100644 index 0000000000000..d8b0048903def --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportSimulateBulkActionTests.java @@ -0,0 +1,206 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.bulk; + +import org.elasticsearch.ResourceAlreadyExistsException; +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.ingest.SimulateIndexResponse; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodeUtils; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.IndexVersion; +import org.elasticsearch.index.IndexingPressure; +import org.elasticsearch.indices.EmptySystemIndices; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.VersionUtils; +import org.elasticsearch.test.index.IndexVersionUtils; +import org.elasticsearch.test.transport.CapturingTransport; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.junit.After; +import org.junit.Before; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; + +import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.mockito.Mockito.mock; + +public class TransportSimulateBulkActionTests extends ESTestCase { + + /** Services needed by bulk action */ + private TransportService transportService; + private ClusterService clusterService; + private TestThreadPool threadPool; + + private TestTransportSimulateBulkAction bulkAction; + + class TestTransportSimulateBulkAction extends TransportSimulateBulkAction { + + volatile boolean failIndexCreation = false; + boolean indexCreated = false; // set when the "real" index is created + Runnable beforeIndexCreation = null; + + TestTransportSimulateBulkAction() { + super( + TransportSimulateBulkActionTests.this.threadPool, + transportService, + clusterService, + null, + null, + new ActionFilters(Collections.emptySet()), + new TransportBulkActionTookTests.Resolver(), + new IndexingPressure(Settings.EMPTY), + EmptySystemIndices.INSTANCE + ); + } + + @Override + void createIndex(String index, TimeValue timeout, ActionListener listener) { + indexCreated = true; + if (beforeIndexCreation != null) { + beforeIndexCreation.run(); + } + if (failIndexCreation) { + listener.onFailure(new ResourceAlreadyExistsException("index already exists")); + } else { + listener.onResponse(null); + } + } + } + + @Before + public void setUp() throws Exception { + super.setUp(); + threadPool = new TestThreadPool(getClass().getName()); + DiscoveryNode discoveryNode = DiscoveryNodeUtils.builder("node") + .version( + VersionUtils.randomCompatibleVersion(random(), Version.CURRENT), + IndexVersion.MINIMUM_COMPATIBLE, + IndexVersionUtils.randomCompatibleVersion(random()) + ) + .build(); + clusterService = createClusterService(threadPool, discoveryNode); + CapturingTransport capturingTransport = new CapturingTransport(); + transportService = capturingTransport.createTransportService( + clusterService.getSettings(), + threadPool, + TransportService.NOOP_TRANSPORT_INTERCEPTOR, + boundAddress -> clusterService.localNode(), + null, + Collections.emptySet() + ); + transportService.start(); + transportService.acceptIncomingRequests(); + bulkAction = new TestTransportSimulateBulkAction(); + } + + @After + public void tearDown() throws Exception { + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); + threadPool = null; + clusterService.close(); + super.tearDown(); + } + + public void testIndexData() { + Task task = mock(Task.class); // unused + BulkRequest bulkRequest = new SimulateBulkRequest(); + int bulkItemCount = randomIntBetween(0, 200); + for (int i = 0; i < bulkItemCount; i++) { + Map source = Map.of(randomAlphaOfLength(10), randomAlphaOfLength(5)); + IndexRequest indexRequest = new IndexRequest(randomAlphaOfLength(10)).id(randomAlphaOfLength(10)).source(source); + for (int j = 0; j < randomIntBetween(0, 10); j++) { + indexRequest.addPipeline(randomAlphaOfLength(12)); + } + bulkRequest.add(); + } + AtomicBoolean onResponseCalled = new AtomicBoolean(false); + ActionListener listener = new ActionListener<>() { + @Override + public void onResponse(BulkResponse response) { + onResponseCalled.set(true); + BulkItemResponse[] responseItems = response.getItems(); + assertThat(responseItems.length, equalTo(bulkRequest.requests().size())); + for (int i = 0; i < responseItems.length; i++) { + BulkItemResponse responseItem = responseItems[i]; + IndexRequest indexRequest = (IndexRequest) bulkRequest.requests().get(i); + assertNull(responseItem.getFailure()); + assertThat(responseItem.getResponse(), instanceOf(SimulateIndexResponse.class)); + SimulateIndexResponse simulateIndexResponse = responseItem.getResponse(); + assertThat(simulateIndexResponse.getIndex(), equalTo(indexRequest.index())); + /* + * SimulateIndexResponse doesn't have an equals() method, and most of its state is private. So we check that + * its toXContent method produces the expected output. + */ + String output = Strings.toString(simulateIndexResponse); + try { + assertEquals( + XContentHelper.stripWhitespace( + Strings.format( + """ + { + "_index": "%s", + "_source": %s, + "executed_pipelines": [%s] + }""", + indexRequest.index(), + indexRequest.source(), + indexRequest.getExecutedPipelines() + .stream() + .map(pipeline -> "\"" + pipeline + "\"") + .collect(Collectors.joining(",")) + ) + ), + output + ); + } catch (IOException e) { + fail(e); + } + } + } + + @Override + public void onFailure(Exception e) { + fail(e, "Unexpected error"); + } + }; + Set autoCreateIndices = Set.of(); // unused + Map indicesThatCannotBeCreated = Map.of(); // unused + long startTime = 0; + bulkAction.indexData( + task, + bulkRequest, + randomAlphaOfLength(10), + listener, + autoCreateIndices, + indicesThatCannotBeCreated, + startTime + ); + assertThat(onResponseCalled.get(), equalTo(true)); + } +} diff --git a/server/src/test/java/org/elasticsearch/ingest/SimulateIngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/SimulateIngestServiceTests.java index 25bd72c3d6477..794fe4085058a 100644 --- a/server/src/test/java/org/elasticsearch/ingest/SimulateIngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/SimulateIngestServiceTests.java @@ -64,7 +64,7 @@ public void testGetPipeline() { { // Here we make sure that if we have a substitution with the same name as the original pipeline that we get the new one back SimulateBulkRequest simulateBulkRequest = new SimulateBulkRequest(); - Map pipelineSubstitutions = new HashMap<>() { + Map> pipelineSubstitutions = new HashMap<>() { { put("pipeline1", new HashMap<>() { { @@ -106,7 +106,7 @@ public void testGetPipeline() { * one). */ SimulateBulkRequest simulateBulkRequest = new SimulateBulkRequest(); - Map pipelineSubstitutions = new HashMap<>() { + Map> pipelineSubstitutions = new HashMap<>() { { put("pipeline2", new HashMap<>() { {