diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index c849b0a8181ae..3fd9c073485e3 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -296,7 +296,6 @@ protected void doRun() { protected void doInternalExecute(Task task, BulkRequest bulkRequest, String executorName, ActionListener listener) { final long startTime = relativeTime(); - final AtomicArray responses = new AtomicArray<>(bulkRequest.requests.size()); boolean hasIndexRequestsWithPipelines = false; final Metadata metadata = clusterService.state().getMetadata(); @@ -364,7 +363,7 @@ protected void doInternalExecute(Task task, BulkRequest bulkRequest, String exec } // Step 3: create all the indices that are missing, if there are any missing. start the bulk after all the creates come back. - indexData(task, bulkRequest, executorName, listener, responses, autoCreateIndices, indicesThatCannotBeCreated, startTime); + indexData(task, bulkRequest, executorName, listener, autoCreateIndices, indicesThatCannotBeCreated, startTime); } protected void indexData( @@ -372,11 +371,11 @@ protected void indexData( BulkRequest bulkRequest, String executorName, ActionListener listener, - AtomicArray responses, Set autoCreateIndices, Map indicesThatCannotBeCreated, long startTime ) { + final AtomicArray responses = new AtomicArray<>(bulkRequest.requests.size()); if (autoCreateIndices.isEmpty()) { executeBulk(task, bulkRequest, startTime, listener, executorName, responses, indicesThatCannotBeCreated); } else { diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportSimulateBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportSimulateBulkAction.java index c4352e23abe0c..f26bc28ee3b8a 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportSimulateBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportSimulateBulkAction.java @@ -64,13 +64,14 @@ protected void indexData( BulkRequest bulkRequest, String executorName, ActionListener listener, - AtomicArray responses, Set autoCreateIndices, Map indicesThatCannotBeCreated, long startTime ) { + final AtomicArray responses = new AtomicArray<>(bulkRequest.requests.size()); for (int i = 0; i < bulkRequest.requests.size(); i++) { DocWriteRequest request = bulkRequest.requests.get(i); + assert request instanceof IndexRequest; // This action is only ever called with IndexRequests responses.set( i, BulkItemResponse.success( diff --git a/server/src/main/java/org/elasticsearch/action/ingest/SimulateIndexResponse.java b/server/src/main/java/org/elasticsearch/action/ingest/SimulateIndexResponse.java index 118027b117b2d..6e5be05e0374d 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/SimulateIndexResponse.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/SimulateIndexResponse.java @@ -50,6 +50,7 @@ public SimulateIndexResponse(String index, BytesReference source, XContentType s public XContentBuilder innerToXContent(XContentBuilder builder, Params params) throws IOException { builder.field("_index", getShardId().getIndexName()); builder.field("_source", XContentHelper.convertToMap(source, false, sourceXContentType).v2()); + assert executedPipelines != null; // This ought to never be null because we always ask to list pipelines in simulate mode builder.array("executed_pipelines", executedPipelines.toArray()); return builder; } diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index 3b114cf0a618e..f8aa3b9431081 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -2604,12 +2604,32 @@ private static IngestService createWithProcessors( ThreadPool threadPool = mock(ThreadPool.class); when(threadPool.generic()).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE); when(threadPool.executor(anyString())).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE); - return new IngestService(mock(ClusterService.class), threadPool, null, null, null, List.of(new IngestPlugin() { - @Override - public Map getProcessors(final Processor.Parameters parameters) { - return processors; - } - }), client, null, documentParsingObserverSupplier); + IngestService ingestService = new IngestService( + mock(ClusterService.class), + threadPool, + null, + null, + null, + List.of(new IngestPlugin() { + @Override + public Map getProcessors(final Processor.Parameters parameters) { + return processors; + } + }), + client, + null, + documentParsingObserverSupplier + ); + if (randomBoolean()) { + /* + * Testing the copy constructor directly is difficult because there is no equals() method in IngestService, but there is a lot + * of private internal state. Here we use the copy constructor half the time in all of the unit tests, with the assumption that + * if none of our tests observe any difference then the copy constructor is working as expected. + */ + return new IngestService(ingestService); + } else { + return ingestService; + } } private CompoundProcessor mockCompoundProcessor() {