Skip to content

Commit

Permalink
unit testing
Browse files Browse the repository at this point in the history
  • Loading branch information
masseyke committed Oct 18, 2023
1 parent be5f759 commit 61aeb42
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,6 @@ protected void doRun() {

protected void doInternalExecute(Task task, BulkRequest bulkRequest, String executorName, ActionListener<BulkResponse> listener) {
final long startTime = relativeTime();
final AtomicArray<BulkItemResponse> responses = new AtomicArray<>(bulkRequest.requests.size());

boolean hasIndexRequestsWithPipelines = false;
final Metadata metadata = clusterService.state().getMetadata();
Expand Down Expand Up @@ -364,19 +363,19 @@ protected void doInternalExecute(Task task, BulkRequest bulkRequest, String exec
}

// Step 3: create all the indices that are missing, if there are any missing. start the bulk after all the creates come back.
indexData(task, bulkRequest, executorName, listener, responses, autoCreateIndices, indicesThatCannotBeCreated, startTime);
indexData(task, bulkRequest, executorName, listener, autoCreateIndices, indicesThatCannotBeCreated, startTime);
}

protected void indexData(
Task task,
BulkRequest bulkRequest,
String executorName,
ActionListener<BulkResponse> listener,
AtomicArray<BulkItemResponse> responses,
Set<String> autoCreateIndices,
Map<String, IndexNotFoundException> indicesThatCannotBeCreated,
long startTime
) {
final AtomicArray<BulkItemResponse> responses = new AtomicArray<>(bulkRequest.requests.size());
if (autoCreateIndices.isEmpty()) {
executeBulk(task, bulkRequest, startTime, listener, executorName, responses, indicesThatCannotBeCreated);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,14 @@ protected void indexData(
BulkRequest bulkRequest,
String executorName,
ActionListener<BulkResponse> listener,
AtomicArray<BulkItemResponse> responses,
Set<String> autoCreateIndices,
Map<String, IndexNotFoundException> indicesThatCannotBeCreated,
long startTime
) {
final AtomicArray<BulkItemResponse> responses = new AtomicArray<>(bulkRequest.requests.size());
for (int i = 0; i < bulkRequest.requests.size(); i++) {
DocWriteRequest<?> request = bulkRequest.requests.get(i);
assert request instanceof IndexRequest; // This action is only ever called with IndexRequests
responses.set(
i,
BulkItemResponse.success(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public SimulateIndexResponse(String index, BytesReference source, XContentType s
public XContentBuilder innerToXContent(XContentBuilder builder, Params params) throws IOException {
builder.field("_index", getShardId().getIndexName());
builder.field("_source", XContentHelper.convertToMap(source, false, sourceXContentType).v2());
assert executedPipelines != null; // This ought to never be null because we always ask to list pipelines in simulate mode
builder.array("executed_pipelines", executedPipelines.toArray());
return builder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2604,12 +2604,32 @@ private static IngestService createWithProcessors(
ThreadPool threadPool = mock(ThreadPool.class);
when(threadPool.generic()).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE);
when(threadPool.executor(anyString())).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE);
return new IngestService(mock(ClusterService.class), threadPool, null, null, null, List.of(new IngestPlugin() {
@Override
public Map<String, Processor.Factory> getProcessors(final Processor.Parameters parameters) {
return processors;
}
}), client, null, documentParsingObserverSupplier);
IngestService ingestService = new IngestService(
mock(ClusterService.class),
threadPool,
null,
null,
null,
List.of(new IngestPlugin() {
@Override
public Map<String, Processor.Factory> getProcessors(final Processor.Parameters parameters) {
return processors;
}
}),
client,
null,
documentParsingObserverSupplier
);
if (randomBoolean()) {
/*
* Testing the copy constructor directly is difficult because there is no equals() method in IngestService, but there is a lot
* of private internal state. Here we use the copy constructor half the time in all of the unit tests, with the assumption that
* if none of our tests observe any difference then the copy constructor is working as expected.
*/
return new IngestService(ingestService);
} else {
return ingestService;
}
}

private CompoundProcessor mockCompoundProcessor() {
Expand Down

0 comments on commit 61aeb42

Please sign in to comment.