From cb8924286901fb87329cef2473aee07509bda6bc Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Mon, 16 Oct 2023 10:00:54 -0500 Subject: [PATCH] Adding a test for DocumentParsingObserver usage --- .../ingest/IngestServiceTests.java | 72 ++++++++++++++++++- 1 file changed, 71 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index ee0fc053174b5..bafd7352723af 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -64,6 +64,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool.Names; import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentParser; import org.elasticsearch.xcontent.XContentType; import org.elasticsearch.xcontent.cbor.CborXContent; import org.junit.Before; @@ -89,6 +90,7 @@ import java.util.function.Consumer; import java.util.function.IntConsumer; import java.util.function.LongSupplier; +import java.util.function.Supplier; import java.util.stream.Collectors; import static org.elasticsearch.cluster.service.ClusterStateTaskExecutorUtils.executeAndAssertSuccessful; @@ -1162,6 +1164,67 @@ public void testExecuteBulkPipelineDoesNotExist() { verify(completionHandler, times(1)).accept(Thread.currentThread(), null); } + public void testExecuteBulkRequestCallsDocumentParsingObserver() { + /* + * This test makes sure that for both insert and upsert requests, when we call executeBulkRequest DocumentParsingObserver is + * called using a non-null index name. + */ + AtomicInteger setNameCalledCount = new AtomicInteger(0); + AtomicInteger closeCalled = new AtomicInteger(0); + Supplier documentParsingObserverSupplier = () -> new DocumentParsingObserver() { + @Override + public XContentParser wrapParser(XContentParser xContentParser) { + return xContentParser; + } + + @Override + public void setIndexName(String indexName) { + assertNotNull(indexName); + setNameCalledCount.incrementAndGet(); + } + + @Override + public void close() { + closeCalled.incrementAndGet(); + } + }; + IngestService ingestService = createWithProcessors( + Map.of("mock", (factories, tag, description, config) -> mockCompoundProcessor()), + documentParsingObserverSupplier + ); + + PutPipelineRequest putRequest = new PutPipelineRequest( + "_id", + new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), + XContentType.JSON + ); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty + ClusterState previousClusterState = clusterState; + clusterState = executePut(putRequest, clusterState); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + + BulkRequest bulkRequest = new BulkRequest(); + UpdateRequest updateRequest = new UpdateRequest("_index", "_id1").upsert("{}", "{}"); + updateRequest.upsertRequest().setPipeline("_id"); + bulkRequest.add(updateRequest); + IndexRequest indexRequest = new IndexRequest("_index").id("_id1").source(Map.of()).setPipeline("_id1"); + bulkRequest.add(indexRequest); + @SuppressWarnings("unchecked") + BiConsumer failureHandler = mock(BiConsumer.class); + @SuppressWarnings("unchecked") + final BiConsumer completionHandler = mock(BiConsumer.class); + ingestService.executeBulkRequest( + bulkRequest.numberOfActions(), + bulkRequest.requests(), + indexReq -> {}, + failureHandler, + completionHandler, + Names.WRITE + ); + assertThat(setNameCalledCount.get(), equalTo(2)); + assertThat(closeCalled.get(), equalTo(2)); + } + public void testExecuteSuccess() { IngestService ingestService = createWithProcessors( Map.of("mock", (factories, tag, description, config) -> mockCompoundProcessor()) @@ -2518,6 +2581,13 @@ private static IngestService createWithProcessors() { } private static IngestService createWithProcessors(Map processors) { + return createWithProcessors(processors, () -> DocumentParsingObserver.EMPTY_INSTANCE); + } + + private static IngestService createWithProcessors( + Map processors, + Supplier documentParsingObserverSupplier + ) { Client client = mock(Client.class); ThreadPool threadPool = mock(ThreadPool.class); when(threadPool.generic()).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE); @@ -2527,7 +2597,7 @@ private static IngestService createWithProcessors(Map public Map getProcessors(final Processor.Parameters parameters) { return processors; } - }), client, null, () -> DocumentParsingObserver.EMPTY_INSTANCE); + }), client, null, documentParsingObserverSupplier); } private CompoundProcessor mockCompoundProcessor() {