Skip to content

Commit

Permalink
more testing
Browse files Browse the repository at this point in the history
  • Loading branch information
masseyke committed Oct 18, 2023
1 parent f1fb903 commit c363a19
Show file tree
Hide file tree
Showing 6 changed files with 266 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,36 +15,69 @@
import java.util.Map;

/**
* This extends BulkRequest with support for providing substitute pipeline definitions.
* This extends BulkRequest with support for providing substitute pipeline definitions. In a user request, the pipeline substitutions
* will look something like this:
*
* "pipeline_substitutions": {
* "my-pipeline-1": {
* "processors": [
* {
* "set": {
* "field": "my-new-boolean-field",
* "value": true
* }
* }
* ]
* },
* "my-pipeline-2": {
* "processors": [
* {
* "set": {
* "field": "my-new-boolean-field",
* "value": true
* },
* "rename": {
* "field": "old_field",
* "target_field": "new field"
* }
* }
* ]
* }
* }
*
* The pipelineSubstitutions Map held by this class is intended to be the result of XContentHelper.convertToMap(). The top-level keys
* are the pipelineIds ("my-pipeline-1" and "my-pipeline-2" in the example above). The values are the Maps of "processors" to the List of
* processor definitions.
*/
public class SimulateBulkRequest extends BulkRequest {
private Map<String, Object> pipelineSubstitutions = Map.of();
private Map<String, Map<String, Object>> pipelineSubstitutions = Map.of();

public SimulateBulkRequest() {
super();
}

@SuppressWarnings("unchecked")
public SimulateBulkRequest(StreamInput in) throws IOException {
super(in);
this.pipelineSubstitutions = in.readMap();
this.pipelineSubstitutions = (Map<String, Map<String, Object>>) in.readGenericValue();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeGenericMap(pipelineSubstitutions);
out.writeGenericValue(pipelineSubstitutions);
}

/**
* This sets the pipeline definitions that are to be used in place of any pre-existing pipeline definitions with the same pipelineId.
* The key of the map is the pipelineId, and the value the pipeline definition as parsed by XContentHelper.convertToMap().
* @param pipelineSubstitutions
*/
public void setPipelineSubstitutions(Map<String, Object> pipelineSubstitutions) {
public void setPipelineSubstitutions(Map<String, Map<String, Object>> pipelineSubstitutions) {
this.pipelineSubstitutions = pipelineSubstitutions;
}

public Map<String, Object> getPipelineSubstitutions() {
public Map<String, Map<String, Object>> getPipelineSubstitutions() {
return pipelineSubstitutions;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,17 @@ public SimulateIngestService(IngestService ingestService, BulkRequest request) {
* @return A transformed version of rawPipelineSubstitutions, where the values are Pipeline objects
* @throws Exception
*/
@SuppressWarnings("unchecked")
private Map<String, Pipeline> getPipelineSubstitutions(Map<String, Object> rawPipelineSubstitutions, IngestService ingestService)
throws Exception {
private Map<String, Pipeline> getPipelineSubstitutions(
Map<String, Map<String, Object>> rawPipelineSubstitutions,
IngestService ingestService
) throws Exception {
Map<String, Pipeline> parsedPipelineSubstitutions = new HashMap<>();
if (pipelineSubstitutions != null) {
for (Map.Entry<String, Object> entry : rawPipelineSubstitutions.entrySet()) {
if (rawPipelineSubstitutions != null) {
for (Map.Entry<String, Map<String, Object>> entry : rawPipelineSubstitutions.entrySet()) {
String pipelineId = entry.getKey();
Pipeline pipeline = Pipeline.create(
pipelineId,
(Map<String, Object>) entry.getValue(),
entry.getValue(),
ingestService.getProcessorFactories(),
ingestService.getScriptService()
);
Expand All @@ -62,11 +63,17 @@ private Map<String, Pipeline> getPipelineSubstitutions(Map<String, Object> rawPi
return parsedPipelineSubstitutions;
}

/**
* This method returns the Pipeline for the given pipelineId. If a substitute definition of the pipeline has been defined for the
* cuurent simulate, then that pipeline is returned. Otherwise the pipeline stored in the cluster state is returned.
* @param pipelineId
* @return
*/
@Override
public Pipeline getPipeline(String id) {
Pipeline pipeline = pipelineSubstitutions.get(id);
public Pipeline getPipeline(String pipelineId) {
Pipeline pipeline = pipelineSubstitutions.get(pipelineId);
if (pipeline == null) {
pipeline = super.getPipeline(id);
pipeline = super.getPipeline(pipelineId);
}
return pipeline;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
Boolean defaultRequireAlias = null;
Tuple<XContentType, BytesReference> sourceTuple = request.contentOrSourceParam();
Map<String, Object> sourceMap = XContentHelper.convertToMap(sourceTuple.v2(), false, sourceTuple.v1()).v2();
bulkRequest.setPipelineSubstitutions((Map<String, Object>) sourceMap.remove("pipeline_substitutions"));
bulkRequest.setPipelineSubstitutions((Map<String, Map<String, Object>>) sourceMap.remove("pipeline_substitutions"));
bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT));
BytesReference transformedData = convertToBulkRequestXContentBytes(sourceMap);
bulkRequest.add(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
public class SimulateBulkRequestTests extends ESTestCase {

public void testSerialization() throws Exception {
Map<String, Object> pipelineSubstitutions = getTestPipelineSubstitutions();
Map<String, Map<String, Object>> pipelineSubstitutions = getTestPipelineSubstitutions();
SimulateBulkRequest simulateBulkRequest = new SimulateBulkRequest();
simulateBulkRequest.setPipelineSubstitutions(pipelineSubstitutions);
/*
Expand All @@ -30,7 +30,7 @@ public void testSerialization() throws Exception {
assertThat(copy.getPipelineSubstitutions(), equalTo(simulateBulkRequest.getPipelineSubstitutions()));
}

private Map<String, Object> getTestPipelineSubstitutions() {
private Map<String, Map<String, Object>> getTestPipelineSubstitutions() {
return new HashMap<>() {
{
put("pipeline1", new HashMap<>() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.bulk;

import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.SimulateIndexResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.IndexingPressure;
import org.elasticsearch.indices.EmptySystemIndices;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.test.index.IndexVersionUtils;
import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.junit.After;
import org.junit.Before;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.mockito.Mockito.mock;

public class TransportSimulateBulkActionTests extends ESTestCase {

/** Services needed by bulk action */
private TransportService transportService;
private ClusterService clusterService;
private TestThreadPool threadPool;

private TestTransportSimulateBulkAction bulkAction;

class TestTransportSimulateBulkAction extends TransportSimulateBulkAction {

volatile boolean failIndexCreation = false;
boolean indexCreated = false; // set when the "real" index is created
Runnable beforeIndexCreation = null;

TestTransportSimulateBulkAction() {
super(
TransportSimulateBulkActionTests.this.threadPool,
transportService,
clusterService,
null,
null,
new ActionFilters(Collections.emptySet()),
new TransportBulkActionTookTests.Resolver(),
new IndexingPressure(Settings.EMPTY),
EmptySystemIndices.INSTANCE
);
}

@Override
void createIndex(String index, TimeValue timeout, ActionListener<CreateIndexResponse> listener) {
indexCreated = true;
if (beforeIndexCreation != null) {
beforeIndexCreation.run();
}
if (failIndexCreation) {
listener.onFailure(new ResourceAlreadyExistsException("index already exists"));
} else {
listener.onResponse(null);
}
}
}

@Before
public void setUp() throws Exception {
super.setUp();
threadPool = new TestThreadPool(getClass().getName());
DiscoveryNode discoveryNode = DiscoveryNodeUtils.builder("node")
.version(
VersionUtils.randomCompatibleVersion(random(), Version.CURRENT),
IndexVersion.MINIMUM_COMPATIBLE,
IndexVersionUtils.randomCompatibleVersion(random())
)
.build();
clusterService = createClusterService(threadPool, discoveryNode);
CapturingTransport capturingTransport = new CapturingTransport();
transportService = capturingTransport.createTransportService(
clusterService.getSettings(),
threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
boundAddress -> clusterService.localNode(),
null,
Collections.emptySet()
);
transportService.start();
transportService.acceptIncomingRequests();
bulkAction = new TestTransportSimulateBulkAction();
}

@After
public void tearDown() throws Exception {
ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
threadPool = null;
clusterService.close();
super.tearDown();
}

public void testIndexData() {
Task task = mock(Task.class); // unused
BulkRequest bulkRequest = new SimulateBulkRequest();
int bulkItemCount = randomIntBetween(0, 200);
for (int i = 0; i < bulkItemCount; i++) {
Map<String, ?> source = Map.of(randomAlphaOfLength(10), randomAlphaOfLength(5));
IndexRequest indexRequest = new IndexRequest(randomAlphaOfLength(10)).id(randomAlphaOfLength(10)).source(source);
for (int j = 0; j < randomIntBetween(0, 10); j++) {
indexRequest.addPipeline(randomAlphaOfLength(12));
}
bulkRequest.add();
}
AtomicBoolean onResponseCalled = new AtomicBoolean(false);
ActionListener<BulkResponse> listener = new ActionListener<>() {
@Override
public void onResponse(BulkResponse response) {
onResponseCalled.set(true);
BulkItemResponse[] responseItems = response.getItems();
assertThat(responseItems.length, equalTo(bulkRequest.requests().size()));
for (int i = 0; i < responseItems.length; i++) {
BulkItemResponse responseItem = responseItems[i];
IndexRequest indexRequest = (IndexRequest) bulkRequest.requests().get(i);
assertNull(responseItem.getFailure());
assertThat(responseItem.getResponse(), instanceOf(SimulateIndexResponse.class));
SimulateIndexResponse simulateIndexResponse = responseItem.getResponse();
assertThat(simulateIndexResponse.getIndex(), equalTo(indexRequest.index()));
/*
* SimulateIndexResponse doesn't have an equals() method, and most of its state is private. So we check that
* its toXContent method produces the expected output.
*/
String output = Strings.toString(simulateIndexResponse);
try {
assertEquals(
XContentHelper.stripWhitespace(
Strings.format(
"""
{
"_index": "%s",
"_source": %s,
"executed_pipelines": [%s]
}""",
indexRequest.index(),
indexRequest.source(),
indexRequest.getExecutedPipelines()
.stream()
.map(pipeline -> "\"" + pipeline + "\"")
.collect(Collectors.joining(","))
)
),
output
);
} catch (IOException e) {
fail(e);
}
}
}

@Override
public void onFailure(Exception e) {
fail(e, "Unexpected error");
}
};
Set<String> autoCreateIndices = Set.of(); // unused
Map<String, IndexNotFoundException> indicesThatCannotBeCreated = Map.of(); // unused
long startTime = 0;
bulkAction.indexData(
task,
bulkRequest,
randomAlphaOfLength(10),
listener,
autoCreateIndices,
indicesThatCannotBeCreated,
startTime
);
assertThat(onResponseCalled.get(), equalTo(true));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void testGetPipeline() {
{
// Here we make sure that if we have a substitution with the same name as the original pipeline that we get the new one back
SimulateBulkRequest simulateBulkRequest = new SimulateBulkRequest();
Map<String, Object> pipelineSubstitutions = new HashMap<>() {
Map<String, Map<String, Object>> pipelineSubstitutions = new HashMap<>() {
{
put("pipeline1", new HashMap<>() {
{
Expand Down Expand Up @@ -106,7 +106,7 @@ public void testGetPipeline() {
* one).
*/
SimulateBulkRequest simulateBulkRequest = new SimulateBulkRequest();
Map<String, Object> pipelineSubstitutions = new HashMap<>() {
Map<String, Map<String, Object>> pipelineSubstitutions = new HashMap<>() {
{
put("pipeline2", new HashMap<>() {
{
Expand Down

0 comments on commit c363a19

Please sign in to comment.