Commit: fixing tests

masseyke committed Nov 30, 2023
1 parent 99f2cd2 commit 3680075

Showing 15 changed files with 150 additions and 129 deletions.
```diff
@@ -77,21 +77,22 @@ public void testCancellationDuringTimeSeriesAggregation() throws Exception {

         for (int i = 0; i < numberOfRefreshes; i++) {
             // Make sure we sometimes have a few segments
-            BulkRequestBuilder bulkRequestBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
-            for (int j = 0; j < numberOfDocsPerRefresh; j++) {
-                bulkRequestBuilder.add(
-                    prepareIndex("test").setOpType(DocWriteRequest.OpType.CREATE)
-                        .setSource(
-                            "@timestamp",
-                            now + (long) i * numberOfDocsPerRefresh + j,
-                            "val",
-                            (double) j,
-                            "dim",
-                            String.valueOf(j % 100)
-                        )
-                );
+            try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)) {
+                for (int j = 0; j < numberOfDocsPerRefresh; j++) {
+                    bulkRequestBuilder.add(
+                        prepareIndex("test").setOpType(DocWriteRequest.OpType.CREATE)
+                            .setSource(
+                                "@timestamp",
+                                now + (long) i * numberOfDocsPerRefresh + j,
+                                "val",
+                                (double) j,
+                                "dim",
+                                String.valueOf(j % 100)
+                            )
+                    );
+                }
+                assertNoFailures(bulkRequestBuilder.get());
             }
-            assertNoFailures(bulkRequestBuilder.get());
         }

         logger.info("Executing search");
```
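This is the shape every test-side change in this commit takes: the `BulkRequestBuilder` moves into a try-with-resources block, which implies the builder is now `Closeable`/`Releasable`. Because the bulk call inside the block is synchronous, the response is fully consumed before the builder is released. A minimal sketch of the pattern, assuming the usual `ESIntegTestCase` helpers (`client()`, `prepareIndex`, `assertNoFailures`); the index and field names are illustrative:

```java
// Sketch only: assumes BulkRequestBuilder implements Closeable/Releasable,
// as the try-with-resources usage in the hunk above implies.
try (BulkRequestBuilder bulk = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)) {
    for (int j = 0; j < docCount; j++) {
        bulk.add(prepareIndex("test-index").setSource("value", j)); // names illustrative
    }
    assertNoFailures(bulk.get()); // get() blocks, so closing on block exit is safe
}
```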
```diff
@@ -64,15 +64,16 @@ public void setup() throws Exception {
         assertTrue(prepareTimeSeriesIndex(mapping, startMillis, endMillis, routingDimensions).isAcknowledged());
         logger.info("Dimensions: " + numberOfDimensions + " docs: " + numberOfDocuments + " start: " + startMillis + " end: " + endMillis);

-        final BulkRequestBuilder bulkIndexRequest = client().prepareBulk();
-        for (int docId = 0; docId < numberOfDocuments; docId++) {
-            final XContentBuilder document = timeSeriesDocument(FOO_DIM_VALUE, BAR_DIM_VALUE, BAZ_DIM_VALUE, docId, timestamps::next);
-            bulkIndexRequest.add(prepareIndex("index").setOpType(DocWriteRequest.OpType.CREATE).setSource(document));
-        }
+        try (BulkRequestBuilder bulkIndexRequest = client().prepareBulk()) {
+            for (int docId = 0; docId < numberOfDocuments; docId++) {
+                final XContentBuilder document = timeSeriesDocument(FOO_DIM_VALUE, BAR_DIM_VALUE, BAZ_DIM_VALUE, docId, timestamps::next);
+                bulkIndexRequest.add(prepareIndex("index").setOpType(DocWriteRequest.OpType.CREATE).setSource(document));
+            }

-        final BulkResponse bulkIndexResponse = bulkIndexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
-        assertFalse(bulkIndexResponse.hasFailures());
-        assertEquals(RestStatus.OK.getStatus(), client().admin().indices().prepareFlush("index").get().getStatus().getStatus());
+            final BulkResponse bulkIndexResponse = bulkIndexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
+            assertFalse(bulkIndexResponse.hasFailures());
+            assertEquals(RestStatus.OK.getStatus(), client().admin().indices().prepareFlush("index").get().getStatus().getStatus());
+        }
     }

     private static XContentBuilder timeSeriesDocument(
```
```diff
@@ -412,12 +412,12 @@ void prepareBulkRequest(long thisBatchStartTimeNS, ScrollConsumableHitsResponse
             /*
              * If we noop-ed the entire batch then just skip to the next batch or the BulkRequest would fail validation.
              */
-            notifyDone(thisBatchStartTimeNS, asyncResponse, 0);
+            notifyDone(thisBatchStartTimeNS, asyncResponse, request);
             return;
         }
         request.timeout(mainRequest.getTimeout());
         request.waitForActiveShards(mainRequest.getWaitForActiveShards());
-        sendBulkRequest(request, () -> notifyDone(thisBatchStartTimeNS, asyncResponse, request.requests().size()));
+        sendBulkRequest(request, () -> notifyDone(thisBatchStartTimeNS, asyncResponse, request));
    }

    /**
@@ -515,7 +515,9 @@ void onBulkResponse(BulkResponse response, Runnable onSuccess) {
        }
    }

-    void notifyDone(long thisBatchStartTimeNS, ScrollConsumableHitsResponse asyncResponse, int batchSize) {
+    void notifyDone(long thisBatchStartTimeNS, ScrollConsumableHitsResponse asyncResponse, BulkRequest request) {
+        int batchSize = request.requests().size();
+        request.close();
        if (task.isCancelled()) {
            logger.debug("[{}]: finishing early because the task was cancelled", task.getId());
            finishHim(null);
```
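The reindex path cannot scope the request to a block: the `BulkRequest` built in `prepareBulkRequest` has to survive into the `sendBulkRequest` callback, so `notifyDone` takes ownership instead, reading the batch size and then closing the request on the noop path and the normal path alike. A hypothetical variant that makes the hand-off explicit (names from the hunk above; the try/finally is an assumption, not the committed code):

```java
// Only the try/finally differs from the committed version.
void notifyDone(long thisBatchStartTimeNS, ScrollConsumableHitsResponse asyncResponse, BulkRequest request) {
    final int batchSize;
    try {
        batchSize = request.requests().size();
    } finally {
        request.close(); // release the batch even if the task below turns out to be cancelled
    }
    if (task.isCancelled()) {
        logger.debug("[{}]: finishing early because the task was cancelled", task.getId());
        finishHim(null);
        return;
    }
    // ... continue with batchSize exactly as before ...
}
```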
```diff
@@ -715,7 +715,7 @@ public void testCancelBeforeOnBulkResponse() throws Exception {

     public void testCancelBeforeStartNextScroll() throws Exception {
         long now = System.nanoTime();
-        cancelTaskCase((DummyAsyncBulkByScrollAction action) -> action.notifyDone(now, null, 0));
+        cancelTaskCase((DummyAsyncBulkByScrollAction action) -> action.notifyDone(now, null, new BulkRequest()));
     }

     public void testCancelBeforeRefreshAndFinish() throws Exception {
```
```diff
@@ -219,14 +219,14 @@ protected void writeBacklog() {
             doc = backlog.poll();
         }

-        client.bulk(bulkRequest, ActionListener.wrap(bulkItemResponses -> {
+        client.bulk(bulkRequest, ActionListener.releaseAfter(ActionListener.wrap(bulkItemResponses -> {
             if (bulkItemResponses.hasFailures()) {
                 logger.warn("Failures bulk indexing the message back log: {}", bulkItemResponses.buildFailureMessage());
             } else {
                 logger.trace("Successfully wrote audit message backlog after upgrading template");
             }
             backlog = null;
-        }, AbstractAuditor::onIndexFailure));
+        }, AbstractAuditor::onIndexFailure), bulkRequest));
     }

     // for testing
```
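`ActionListener.releaseAfter` is the asynchronous counterpart of the try-with-resources blocks above: `client.bulk` returns immediately, so the request must stay alive until the listener fires, and the release is tied to listener completion rather than to a lexical scope. A minimal sketch of the contract, assuming `ActionListener` and `Releasable` as in the Elasticsearch core API; the real implementation also guards against releasing twice:

```java
// Sketch of the semantics, not the actual implementation.
static <Response> ActionListener<Response> releaseAfter(ActionListener<Response> delegate, Releasable resource) {
    return new ActionListener<>() {
        @Override
        public void onResponse(Response response) {
            try {
                delegate.onResponse(response);
            } finally {
                resource.close(); // released after the success path completes
            }
        }

        @Override
        public void onFailure(Exception e) {
            try {
                delegate.onFailure(e);
            } finally {
                resource.close(); // and after the failure path
            }
        }
    };
}
```

The ML and monitoring hunks further down use the same wrapper, passing the `BulkRequestBuilder` itself as the releasable.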
```diff
@@ -182,15 +182,17 @@ private static String randomDateForRange(long start, long end) {
     }

     private static int bulkIndex(Client client, String dataStreamName, Supplier<XContentBuilder> docSourceSupplier, int docCount) {
-        BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();
-        bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
-        for (int i = 0; i < docCount; i++) {
-            IndexRequest indexRequest = new IndexRequest(dataStreamName).opType(DocWriteRequest.OpType.CREATE);
-            XContentBuilder source = docSourceSupplier.get();
-            indexRequest.source(source);
-            bulkRequestBuilder.add(indexRequest);
+        BulkResponse bulkResponse;
+        try (BulkRequestBuilder bulkRequestBuilder = client.prepareBulk()) {
+            bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
+            for (int i = 0; i < docCount; i++) {
+                IndexRequest indexRequest = new IndexRequest(dataStreamName).opType(DocWriteRequest.OpType.CREATE);
+                XContentBuilder source = docSourceSupplier.get();
+                indexRequest.source(source);
+                bulkRequestBuilder.add(indexRequest);
+            }
+            bulkResponse = bulkRequestBuilder.get();
         }
-        BulkResponse bulkResponse = bulkRequestBuilder.get();
         int duplicates = 0;
         for (BulkItemResponse response : bulkResponse.getItems()) {
             if (response.isFailed()) {
```
```diff
@@ -207,9 +207,10 @@ public XContentBuilder indexMapping() throws IOException {
     }

     public void indexDocuments(final String indexName, final List<String> documentsJson) {
-        final BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
-        documentsJson.forEach(document -> bulkRequestBuilder.add(new IndexRequest(indexName).source(document, XContentType.JSON)));
-        assertFalse(bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get().hasFailures());
+        try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk()) {
+            documentsJson.forEach(document -> bulkRequestBuilder.add(new IndexRequest(indexName).source(document, XContentType.JSON)));
+            assertFalse(bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get().hasFailures());
+        }
     }

     public void blockIndexWrites(final String indexName) {
```
```diff
@@ -251,31 +251,32 @@ private void assertTargetIndex(final InternalTestCluster cluster, final String t
     }

     private int bulkIndex(final String indexName, final SourceSupplier sourceSupplier, int docCount) throws IOException {
-        BulkRequestBuilder bulkRequestBuilder = internalCluster().client().prepareBulk();
-        bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
-        for (int i = 0; i < docCount; i++) {
-            IndexRequest indexRequest = new IndexRequest(indexName).opType(DocWriteRequest.OpType.CREATE);
-            XContentBuilder source = sourceSupplier.get();
-            indexRequest.source(source);
-            bulkRequestBuilder.add(indexRequest);
-        }
-        BulkResponse bulkResponse = bulkRequestBuilder.get();
-        int duplicates = 0;
-        for (BulkItemResponse response : bulkResponse.getItems()) {
-            if (response.isFailed()) {
-                if (response.getFailure().getCause() instanceof VersionConflictEngineException) {
-                    // A duplicate event was created by random generator. We should not fail for this
-                    // reason.
-                    logger.debug("We tried to insert a duplicate: [{}]", response.getFailureMessage());
-                    duplicates++;
-                } else {
-                    fail("Failed to index data: " + bulkResponse.buildFailureMessage());
+        try (BulkRequestBuilder bulkRequestBuilder = internalCluster().client().prepareBulk()) {
+            bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
+            for (int i = 0; i < docCount; i++) {
+                IndexRequest indexRequest = new IndexRequest(indexName).opType(DocWriteRequest.OpType.CREATE);
+                XContentBuilder source = sourceSupplier.get();
+                indexRequest.source(source);
+                bulkRequestBuilder.add(indexRequest);
+            }
+            BulkResponse bulkResponse = bulkRequestBuilder.get();
+            int duplicates = 0;
+            for (BulkItemResponse response : bulkResponse.getItems()) {
+                if (response.isFailed()) {
+                    if (response.getFailure().getCause() instanceof VersionConflictEngineException) {
+                        // A duplicate event was created by random generator. We should not fail for this
+                        // reason.
+                        logger.debug("We tried to insert a duplicate: [{}]", response.getFailureMessage());
+                        duplicates++;
+                    } else {
+                        fail("Failed to index data: " + bulkResponse.buildFailureMessage());
+                    }
                 }
             }
+            int docsIndexed = docCount - duplicates;
+            logger.info("Indexed [{}] documents. Dropped [{}] duplicates.", docsIndexed, duplicates);
+            return docsIndexed;
         }
-        int docsIndexed = docCount - duplicates;
-        logger.info("Indexed [{}] documents. Dropped [{}] duplicates.", docsIndexed, duplicates);
-        return docsIndexed;
     }

     private String randomDateForInterval(final DateHistogramInterval interval, final long startTime) {
```
```diff
@@ -130,30 +130,31 @@ public void testWriteThreadLivenessBackToBack() throws Exception {

         XContentBuilder doc = JsonXContent.contentBuilder().startObject().field("custom_id", "key").endObject();

-        BulkRequest bulk = new BulkRequest(enrichedIndexName);
-        bulk.timeout(new TimeValue(10, TimeUnit.SECONDS));
-        for (int idx = 0; idx < 50; idx++) {
-            bulk.add(new IndexRequest().source(doc).setPipeline(enrichPipelineName));
-        }
+        try (BulkRequest bulk = new BulkRequest(enrichedIndexName)) {
+            bulk.timeout(new TimeValue(10, TimeUnit.SECONDS));
+            for (int idx = 0; idx < 50; idx++) {
+                bulk.add(new IndexRequest().source(doc).setPipeline(enrichPipelineName));
+            }

-        BulkResponse bulkItemResponses = client().bulk(bulk).actionGet(new TimeValue(30, TimeUnit.SECONDS));
+            BulkResponse bulkItemResponses = client().bulk(bulk).actionGet(new TimeValue(30, TimeUnit.SECONDS));

-        assertTrue(bulkItemResponses.hasFailures());
-        BulkItemResponse.Failure firstFailure = null;
-        int successfulItems = 0;
-        for (BulkItemResponse item : bulkItemResponses.getItems()) {
-            if (item.isFailed() && firstFailure == null) {
-                firstFailure = item.getFailure();
-            } else if (item.isFailed() == false) {
-                successfulItems++;
+            assertTrue(bulkItemResponses.hasFailures());
+            BulkItemResponse.Failure firstFailure = null;
+            int successfulItems = 0;
+            for (BulkItemResponse item : bulkItemResponses.getItems()) {
+                if (item.isFailed() && firstFailure == null) {
+                    firstFailure = item.getFailure();
+                } else if (item.isFailed() == false) {
+                    successfulItems++;
+                }
             }
-        }
-        assertNotNull(firstFailure);
-        assertThat(firstFailure.getStatus().getStatus(), is(equalTo(429)));
-        assertThat(firstFailure.getMessage(), containsString("Could not perform enrichment, enrich coordination queue at capacity"));
+            assertNotNull(firstFailure);
+            assertThat(firstFailure.getStatus().getStatus(), is(equalTo(429)));
+            assertThat(firstFailure.getMessage(), containsString("Could not perform enrichment, enrich coordination queue at capacity"));

-        client().admin().indices().refresh(new RefreshRequest(enrichedIndexName)).actionGet();
-        assertEquals(successfulItems, client().search(new SearchRequest(enrichedIndexName)).actionGet().getHits().getTotalHits().value);
+            client().admin().indices().refresh(new RefreshRequest(enrichedIndexName)).actionGet();
+            assertEquals(successfulItems, client().search(new SearchRequest(enrichedIndexName)).actionGet().getHits().getTotalHits().value);
+        }
     }

     public void testWriteThreadLivenessWithPipeline() throws Exception {
@@ -253,29 +254,30 @@ public void testWriteThreadLivenessWithPipeline() throws Exception {

         XContentBuilder doc = JsonXContent.contentBuilder().startObject().field("custom_id", "key").endObject();

-        BulkRequest bulk = new BulkRequest(enrichedIndexName);
-        bulk.timeout(new TimeValue(10, TimeUnit.SECONDS));
-        for (int idx = 0; idx < 50; idx++) {
-            bulk.add(new IndexRequest().source(doc).setPipeline(enrichPipelineName1));
-        }
+        try (BulkRequest bulk = new BulkRequest(enrichedIndexName)) {
+            bulk.timeout(new TimeValue(10, TimeUnit.SECONDS));
+            for (int idx = 0; idx < 50; idx++) {
+                bulk.add(new IndexRequest().source(doc).setPipeline(enrichPipelineName1));
+            }

-        BulkResponse bulkItemResponses = client().bulk(bulk).actionGet(new TimeValue(30, TimeUnit.SECONDS));
+            BulkResponse bulkItemResponses = client().bulk(bulk).actionGet(new TimeValue(30, TimeUnit.SECONDS));

-        assertTrue(bulkItemResponses.hasFailures());
-        BulkItemResponse.Failure firstFailure = null;
-        int successfulItems = 0;
-        for (BulkItemResponse item : bulkItemResponses.getItems()) {
-            if (item.isFailed() && firstFailure == null) {
-                firstFailure = item.getFailure();
-            } else if (item.isFailed() == false) {
-                successfulItems++;
+            assertTrue(bulkItemResponses.hasFailures());
+            BulkItemResponse.Failure firstFailure = null;
+            int successfulItems = 0;
+            for (BulkItemResponse item : bulkItemResponses.getItems()) {
+                if (item.isFailed() && firstFailure == null) {
+                    firstFailure = item.getFailure();
+                } else if (item.isFailed() == false) {
+                    successfulItems++;
+                }
             }
-        }
-        assertNotNull(firstFailure);
-        assertThat(firstFailure.getStatus().getStatus(), is(equalTo(429)));
-        assertThat(firstFailure.getMessage(), containsString("Could not perform enrichment, enrich coordination queue at capacity"));
+            assertNotNull(firstFailure);
+            assertThat(firstFailure.getStatus().getStatus(), is(equalTo(429)));
+            assertThat(firstFailure.getMessage(), containsString("Could not perform enrichment, enrich coordination queue at capacity"));

-        client().admin().indices().refresh(new RefreshRequest(enrichedIndexName)).actionGet();
-        assertEquals(successfulItems, client().search(new SearchRequest(enrichedIndexName)).actionGet().getHits().getTotalHits().value);
+            client().admin().indices().refresh(new RefreshRequest(enrichedIndexName)).actionGet();
+            assertEquals(successfulItems, client().search(new SearchRequest(enrichedIndexName)).actionGet().getHits().getTotalHits().value);
+        }
     }
 }
```
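Here the `BulkRequest` itself is the resource under try-with-resources, which works because `actionGet(...)` blocks until the bulk response arrives, so nothing still needs the request once the block exits. Taken together, the commit's rule of thumb looks like this, as a sketch under the same assumptions as above (the index name and document source are illustrative):

```java
// Synchronous call site: the response is in hand before the block exits.
try (BulkRequest bulk = new BulkRequest("my-index")) {
    bulk.add(new IndexRequest().source("{\"field\":1}", XContentType.JSON));
    BulkResponse response = client().bulk(bulk).actionGet(); // blocks here
    assertFalse(response.hasFailures());
}

// Asynchronous call site: the request outlives the method, so the release
// rides along with the listener instead (see the releaseAfter sketch above):
//     client.bulk(request, ActionListener.releaseAfter(listener, request));
```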
```diff
@@ -99,7 +99,7 @@ protected void doExecute(
                 ML_ORIGIN,
                 BulkAction.INSTANCE,
                 bulkRequestBuilder.request(),
-                new ActionListener<BulkResponse>() {
+                ActionListener.releaseAfter(new ActionListener<>() {
                     @Override
                     public void onResponse(BulkResponse response) {
                         jobManager.updateProcessOnCalendarChanged(
@@ -115,7 +115,7 @@ public void onResponse(BulkResponse response) {
                     public void onFailure(Exception e) {
                         listener.onFailure(ExceptionsHelper.serverError("Error indexing event", e));
                     }
-                }
+                }, bulkRequestBuilder)
             );
         }, listener::onFailure);
```
```diff
@@ -105,13 +105,13 @@ protected void doFlush(ActionListener<Void> listener) {
                 client.threadPool().getThreadContext(),
                 MONITORING_ORIGIN,
                 requestBuilder.request(),
-                ActionListener.<BulkResponse>wrap(bulkResponse -> {
+                ActionListener.releaseAfter(ActionListener.<BulkResponse>wrap(bulkResponse -> {
                     if (bulkResponse.hasFailures()) {
                         throwExportException(bulkResponse.getItems(), listener);
                     } else {
                         listener.onResponse(null);
                     }
-                }, e -> listener.onFailure(new ExportException("failed to flush export bulk [{}]", e, name))),
+                }, e -> listener.onFailure(new ExportException("failed to flush export bulk [{}]", e, name))), requestBuilder),
                 client::bulk
             );
         } finally {
```