Skip to content

Commit

Permalink
Sending an index name to DocumentParsingObserver that is not ever null (
Browse files Browse the repository at this point in the history
  • Loading branch information
masseyke authored Oct 16, 2023
1 parent 8c0994e commit 8e0fd22
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 2 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/100862.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 100862
summary: Sending an index name to `DocumentParsingObserver` that is not ever null
area: Ingest Node
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -722,7 +722,8 @@ public void onFailure(Exception e) {
executePipelines(pipelines, indexRequest, ingestDocument, documentListener);
indexRequest.setPipelinesHaveRun();

documentParsingObserver.setIndexName(indexRequest.index());
assert actionRequest.index() != null;
documentParsingObserver.setIndexName(actionRequest.index());
documentParsingObserver.close();

i++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xcontent.cbor.CborXContent;
import org.junit.Before;
Expand All @@ -89,6 +90,7 @@
import java.util.function.Consumer;
import java.util.function.IntConsumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.elasticsearch.cluster.service.ClusterStateTaskExecutorUtils.executeAndAssertSuccessful;
Expand Down Expand Up @@ -1162,6 +1164,67 @@ public void testExecuteBulkPipelineDoesNotExist() {
verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
}

public void testExecuteBulkRequestCallsDocumentParsingObserver() {
/*
* This test makes sure that for both insert and upsert requests, when we call executeBulkRequest DocumentParsingObserver is
* called using a non-null index name.
*/
AtomicInteger setNameCalledCount = new AtomicInteger(0);
AtomicInteger closeCalled = new AtomicInteger(0);
Supplier<DocumentParsingObserver> documentParsingObserverSupplier = () -> new DocumentParsingObserver() {
@Override
public XContentParser wrapParser(XContentParser xContentParser) {
return xContentParser;
}

@Override
public void setIndexName(String indexName) {
assertNotNull(indexName);
setNameCalledCount.incrementAndGet();
}

@Override
public void close() {
closeCalled.incrementAndGet();
}
};
IngestService ingestService = createWithProcessors(
Map.of("mock", (factories, tag, description, config) -> mockCompoundProcessor()),
documentParsingObserverSupplier
);

PutPipelineRequest putRequest = new PutPipelineRequest(
"_id",
new BytesArray("{\"processors\": [{\"mock\" : {}}]}"),
XContentType.JSON
);
ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty
ClusterState previousClusterState = clusterState;
clusterState = executePut(putRequest, clusterState);
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));

BulkRequest bulkRequest = new BulkRequest();
UpdateRequest updateRequest = new UpdateRequest("_index", "_id1").upsert("{}", "{}");
updateRequest.upsertRequest().setPipeline("_id");
bulkRequest.add(updateRequest);
IndexRequest indexRequest = new IndexRequest("_index").id("_id1").source(Map.of()).setPipeline("_id1");
bulkRequest.add(indexRequest);
@SuppressWarnings("unchecked")
BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
@SuppressWarnings("unchecked")
final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
ingestService.executeBulkRequest(
bulkRequest.numberOfActions(),
bulkRequest.requests(),
indexReq -> {},
failureHandler,
completionHandler,
Names.WRITE
);
assertThat(setNameCalledCount.get(), equalTo(2));
assertThat(closeCalled.get(), equalTo(2));
}

public void testExecuteSuccess() {
IngestService ingestService = createWithProcessors(
Map.of("mock", (factories, tag, description, config) -> mockCompoundProcessor())
Expand Down Expand Up @@ -2518,6 +2581,13 @@ private static IngestService createWithProcessors() {
}

private static IngestService createWithProcessors(Map<String, Processor.Factory> processors) {
return createWithProcessors(processors, () -> DocumentParsingObserver.EMPTY_INSTANCE);
}

private static IngestService createWithProcessors(
Map<String, Processor.Factory> processors,
Supplier<DocumentParsingObserver> documentParsingObserverSupplier
) {
Client client = mock(Client.class);
ThreadPool threadPool = mock(ThreadPool.class);
when(threadPool.generic()).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE);
Expand All @@ -2527,7 +2597,7 @@ private static IngestService createWithProcessors(Map<String, Processor.Factory>
public Map<String, Processor.Factory> getProcessors(final Processor.Parameters parameters) {
return processors;
}
}), client, null, () -> DocumentParsingObserver.EMPTY_INSTANCE);
}), client, null, documentParsingObserverSupplier);
}

private CompoundProcessor mockCompoundProcessor() {
Expand Down

0 comments on commit 8e0fd22

Please sign in to comment.