
Commit

code review changes
masseyke committed Nov 9, 2023
1 parent 0fa2c56 commit 6be766e
Showing 8 changed files with 42 additions and 36 deletions.
31 changes: 19 additions & 12 deletions docs/reference/ingest/apis/simulate-ingest.asciidoc
@@ -18,7 +18,7 @@ PUT /_ingest/pipeline/my-pipeline
{
"set" : {
"field" : "field1",
"value" : "_value"
"value" : "value1"
}
}
]
@@ -31,7 +31,7 @@ PUT /_ingest/pipeline/my-final-pipeline
{
"set" : {
"field" : "field2",
"value" : "_value"
"value" : "value2"
}
}
]
@@ -70,13 +70,13 @@ POST /_ingest/_simulate
}
}
],
"pipeline_substitutions": {
"pipeline_substitutions": { <1>
"my-pipeline": {
"processors": [
{
"set": {
"field": "field3",
"value": "_value"
"value": "value3"
}
}
]
@@ -85,6 +85,7 @@ POST /_ingest/_simulate
}
----

<1> This replaces the existing `my-pipeline` pipeline with the contents given here for the duration of this request.

[[simulate-ingest-api-request]]
==== {api-request-title}
@@ -111,7 +112,13 @@ new index, executing that index's pipelines as well, the same way that
a non-simulated ingest would. No data is indexed into {es}. Instead,
the transformed document is returned, along with the list of pipelines
that have been executed and the name of the index where the document
would have been indexed if this were not a simulation.
would have been indexed if this were not a simulation. This differs from
the <<simulate-pipeline-api,simulate pipeline API>> in that you specify
a single pipeline for that API, and it only runs that one pipeline. The
simulate pipeline API is more useful for developing a single pipeline,
while the simulate ingest API is more useful for troubleshooting the
interaction of the various pipelines that are applied when ingesting
into an index.
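
For comparison, a minimal simulate pipeline API request looks like the following (an illustrative sketch, not part of this change); it exercises only the single named pipeline rather than the whole ingest path for an index:

[source,console]
----
POST /_ingest/pipeline/my-pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "foo": "bar"
      }
    }
  ]
}
----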


By default, the pipeline definitions that are currently in the system
@@ -215,8 +222,8 @@ The API returns the following response:
"_index": "index",
"_version": -3,
"_source": {
"field1": "_value",
"field2": "_value",
"field1": "value1",
"field2": "value2",
"foo": "bar"
},
"executed_pipelines": [
@@ -231,8 +238,8 @@ The API returns the following response:
"_index": "index",
"_version": -3,
"_source": {
"field1": "_value",
"field2": "_value",
"field1": "value1",
"field2": "value2",
"foo": "rab"
},
"executed_pipelines": [
@@ -299,7 +306,7 @@ The API returns the following response:
"_index": "index",
"_version": -3,
"_source": {
"field2": "_value",
"field2": "value2",
"foo": "BAR"
},
"executed_pipelines": [
@@ -314,7 +321,7 @@ The API returns the following response:
"_index": "index",
"_version": -3,
"_source": {
"field2": "_value",
"field2": "value2",
"foo": "RAB"
},
"executed_pipelines": [
@@ -338,7 +345,7 @@ DELETE /_ingest/pipeline/*
[source,console-result]
----
{
"acknowledged": true
"acknowledged": true
}
----
////
@@ -10,6 +10,7 @@

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.Nullable;

import java.io.IOException;
import java.util.Map;
@@ -50,10 +51,20 @@
* processor definitions.
*/
public class SimulateBulkRequest extends BulkRequest {
private Map<String, Map<String, Object>> pipelineSubstitutions = Map.of();
private final Map<String, Map<String, Object>> pipelineSubstitutions;

public SimulateBulkRequest() {
/**
 * @param pipelineSubstitutions The pipeline definitions that are to be used in place of any pre-existing pipeline definitions with
 * the same pipelineId. The key of the map is the pipelineId, and the value is the pipeline definition as
 * parsed by XContentHelper.convertToMap().
 */
public SimulateBulkRequest(@Nullable Map<String, Map<String, Object>> pipelineSubstitutions) {
super();
this.pipelineSubstitutions = pipelineSubstitutions;
}

@SuppressWarnings("unchecked")
@@ -68,15 +79,6 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeGenericValue(pipelineSubstitutions);
}

/**
* This sets the pipeline definitions that are to be used in place of any pre-existing pipeline definitions with the same pipelineId.
* The key of the map is the pipelineId, and the value the pipeline definition as parsed by XContentHelper.convertToMap().
* @param pipelineSubstitutions
*/
public void setPipelineSubstitutions(Map<String, Map<String, Object>> pipelineSubstitutions) {
this.pipelineSubstitutions = pipelineSubstitutions;
}

public Map<String, Map<String, Object>> getPipelineSubstitutions() {
return pipelineSubstitutions;
}
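
As a usage illustration (not part of this commit), the new constructor takes a substitutions map whose keys are pipeline IDs and whose values are pipeline definitions in the nested-map form produced by XContentHelper.convertToMap(); the pipeline name and processor below are hypothetical:

// Illustrative sketch only; assumes java.util.List and java.util.Map are imported.
// Keys are pipeline IDs, values are pipeline definitions as nested maps.
Map<String, Object> pipelineDefinition = Map.of(
    "processors",
    List.of(Map.of("set", Map.of("field", "field3", "value", "value3")))
);
SimulateBulkRequest request = new SimulateBulkRequest(Map.of("my-pipeline", pipelineDefinition));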
@@ -866,9 +866,8 @@ private void processBulkIndexIngestRequest(
) {
final long ingestStartTimeInNanos = System.nanoTime();
final BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original);
int numberOfActions = original.numberOfActions();
getIngestService(original).executeBulkRequest(
numberOfActions,
original.numberOfActions(),
() -> bulkRequestModifier,
bulkRequestModifier::markItemAsDropped,
bulkRequestModifier::markItemAsFailed,
@@ -59,7 +59,7 @@ public XContentBuilder innerToXContent(XContentBuilder builder, Params params) t
builder.field("_index", getShardId().getIndexName());
builder.field("_version", getVersion());
builder.field("_source", XContentHelper.convertToMap(source, false, sourceXContentType).v2());
assert executedPipelines != null; // This ought to never be null because we always ask to list pipelines in simulate mode
assert executedPipelines != null : "executedPipelines is null when it shouldn't be - we always list pipelines in simulate mode";
builder.array("executed_pipelines", executedPipelines.toArray());
return builder;
}
@@ -67,13 +67,14 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
if (request.getRestApiVersion() == RestApiVersion.V_7 && request.hasParam("type")) {
request.param("type");
}
SimulateBulkRequest bulkRequest = new SimulateBulkRequest();
String defaultIndex = request.param("index");
FetchSourceContext defaultFetchSourceContext = FetchSourceContext.parseFromRestRequest(request);
String defaultPipeline = request.param("pipeline");
Tuple<XContentType, BytesReference> sourceTuple = request.contentOrSourceParam();
Map<String, Object> sourceMap = XContentHelper.convertToMap(sourceTuple.v2(), false, sourceTuple.v1()).v2();
bulkRequest.setPipelineSubstitutions((Map<String, Map<String, Object>>) sourceMap.remove("pipeline_substitutions"));
SimulateBulkRequest bulkRequest = new SimulateBulkRequest(
(Map<String, Map<String, Object>>) sourceMap.remove("pipeline_substitutions")
);
BytesReference transformedData = convertToBulkRequestXContentBytes(sourceMap);
bulkRequest.add(
transformedData,
@@ -20,8 +20,7 @@ public class SimulateBulkRequestTests extends ESTestCase {

public void testSerialization() throws Exception {
Map<String, Map<String, Object>> pipelineSubstitutions = getTestPipelineSubstitutions();
SimulateBulkRequest simulateBulkRequest = new SimulateBulkRequest();
simulateBulkRequest.setPipelineSubstitutions(pipelineSubstitutions);
SimulateBulkRequest simulateBulkRequest = new SimulateBulkRequest(pipelineSubstitutions);
/*
* Note: SimulateBulkRequest does not implement equals or hashCode, so we can't test serialization in the usual way for a
* Writable
@@ -129,7 +129,7 @@ public void tearDown() throws Exception {

public void testIndexData() {
Task task = mock(Task.class); // unused
BulkRequest bulkRequest = new SimulateBulkRequest();
BulkRequest bulkRequest = new SimulateBulkRequest((Map<String, Map<String, Object>>) null);
int bulkItemCount = randomIntBetween(0, 200);
for (int i = 0; i < bulkItemCount; i++) {
Map<String, ?> source = Map.of(randomAlphaOfLength(10), randomAlphaOfLength(5));
@@ -54,7 +54,7 @@ public void testGetPipeline() {
ingestService.innerUpdatePipelines(ingestMetadata);
{
// First we make sure that if there are no substitutions, we get our original pipeline back:
SimulateBulkRequest simulateBulkRequest = new SimulateBulkRequest();
SimulateBulkRequest simulateBulkRequest = new SimulateBulkRequest((Map<String, Map<String, Object>>) null);
SimulateIngestService simulateIngestService = new SimulateIngestService(ingestService, simulateBulkRequest);
Pipeline pipeline = simulateIngestService.getPipeline("pipeline1");
assertThat(pipeline.getProcessors().size(), equalTo(1));
@@ -63,7 +63,6 @@ }
}
{
// Here we make sure that if we have a substitution with the same name as the original pipeline, we get the new one back
SimulateBulkRequest simulateBulkRequest = new SimulateBulkRequest();
Map<String, Map<String, Object>> pipelineSubstitutions = new HashMap<>() {
{
put("pipeline1", new HashMap<>() {
@@ -90,7 +89,7 @@ });
});
}
};
simulateBulkRequest.setPipelineSubstitutions(pipelineSubstitutions);
SimulateBulkRequest simulateBulkRequest = new SimulateBulkRequest(pipelineSubstitutions);
SimulateIngestService simulateIngestService = new SimulateIngestService(ingestService, simulateBulkRequest);
Pipeline pipeline1 = simulateIngestService.getPipeline("pipeline1");
assertThat(pipeline1.getProcessors().size(), equalTo(2));
@@ -105,7 +104,6 @@ * Here we make sure that if we have a substitution for a new pipeline we still get the original one back (as well as the new
* Here we make sure that if we have a substitution for a new pipeline we still get the original one back (as well as the new
* one).
*/
SimulateBulkRequest simulateBulkRequest = new SimulateBulkRequest();
Map<String, Map<String, Object>> pipelineSubstitutions = new HashMap<>() {
{
put("pipeline2", new HashMap<>() {
@@ -119,7 +117,7 @@ });
});
}
};
simulateBulkRequest.setPipelineSubstitutions(pipelineSubstitutions);
SimulateBulkRequest simulateBulkRequest = new SimulateBulkRequest(pipelineSubstitutions);
SimulateIngestService simulateIngestService = new SimulateIngestService(ingestService, simulateBulkRequest);
Pipeline pipeline1 = simulateIngestService.getPipeline("pipeline1");
assertThat(pipeline1.getProcessors().size(), equalTo(1));
