Skip to content

Commit

Permalink
fixing tests
Browse files Browse the repository at this point in the history
  • Loading branch information
masseyke committed Nov 30, 2023
1 parent ab77f83 commit fa59215
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.elasticsearch.index.mapper;

import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.mapper.extras.MapperExtrasPlugin;
Expand Down Expand Up @@ -104,16 +105,17 @@ private void init() throws IOException {
.get();
ensureGreen();

BulkResponse bulk = client().prepareBulk()
.add(
try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk()) {
BulkResponse bulk = bulkRequestBuilder.add(
prepareIndex(INDEX_NAME).setId("all")
.setSource(Map.of("all_rank_features", Map.of(LOWER_RANKED_FEATURE, 10, HIGHER_RANKED_FEATURE, 20)))
)
.add(prepareIndex(INDEX_NAME).setId("lower").setSource(Map.of("all_rank_features", Map.of(LOWER_RANKED_FEATURE, 10))))
.add(prepareIndex(INDEX_NAME).setId("higher").setSource(Map.of("all_rank_features", Map.of(HIGHER_RANKED_FEATURE, 20))))
.get();
assertFalse(bulk.buildFailureMessage(), bulk.hasFailures());
assertThat(refresh().getFailedShards(), equalTo(0));
.add(prepareIndex(INDEX_NAME).setId("lower").setSource(Map.of("all_rank_features", Map.of(LOWER_RANKED_FEATURE, 10))))
.add(prepareIndex(INDEX_NAME).setId("higher").setSource(Map.of("all_rank_features", Map.of(HIGHER_RANKED_FEATURE, 20))))
.get();
assertFalse(bulk.buildFailureMessage(), bulk.hasFailures());
assertThat(refresh().getFailedShards(), equalTo(0));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -417,13 +417,13 @@ void prepareBulkRequest(long thisBatchStartTimeNS, ScrollConsumableHitsResponse
}
request.timeout(mainRequest.getTimeout());
request.waitForActiveShards(mainRequest.getWaitForActiveShards());
sendBulkRequest(request, () -> notifyDone(thisBatchStartTimeNS, asyncResponse, request));
sendBulkRequest(request, () -> notifyDone(thisBatchStartTimeNS, asyncResponse, request), request::close);
}

/**
* Send a bulk request, handling retries.
*/
void sendBulkRequest(BulkRequest request, Runnable onSuccess) {
void sendBulkRequest(BulkRequest request, Runnable onSuccess, Runnable onFailure) {
final int requestSize = request.requests().size();
if (logger.isDebugEnabled()) {
logger.debug(
Expand All @@ -438,7 +438,7 @@ void sendBulkRequest(BulkRequest request, Runnable onSuccess) {
finishHim(null);
return;
}
bulkRetry.withBackoff(bulkClient::bulk, request, new ActionListener<BulkResponse>() {
bulkRetry.withBackoff(bulkClient::bulk, request, new ActionListener<>() {
@Override
public void onResponse(BulkResponse response) {
logger.debug("[{}]: completed [{}] entry bulk request", task.getId(), requestSize);
Expand All @@ -447,6 +447,7 @@ public void onResponse(BulkResponse response) {

@Override
public void onFailure(Exception e) {
onFailure.run();
finishHim(e);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -628,14 +628,14 @@ private void bulkRetryTestCase(boolean failWithRejection) throws Exception {
request.add(new IndexRequest("index").id("id" + i));
}
if (failWithRejection) {
action.sendBulkRequest(request, Assert::fail);
action.sendBulkRequest(request, Assert::fail, request::close);
BulkByScrollResponse response = listener.get();
assertThat(response.getBulkFailures(), hasSize(1));
assertEquals(response.getBulkFailures().get(0).getStatus(), RestStatus.TOO_MANY_REQUESTS);
assertThat(response.getSearchFailures(), empty());
assertNull(response.getReasonCancelled());
} else {
assertExactlyOnce(onSuccess -> action.sendBulkRequest(request, onSuccess));
assertExactlyOnce(onSuccess -> action.sendBulkRequest(request, onSuccess, request::close));
}
}

Expand Down Expand Up @@ -704,7 +704,7 @@ public void testCancelBeforeScrollResponse() throws Exception {
}

public void testCancelBeforeSendBulkRequest() throws Exception {
cancelTaskCase((DummyAsyncBulkByScrollAction action) -> action.sendBulkRequest(new BulkRequest(), Assert::fail));
cancelTaskCase((DummyAsyncBulkByScrollAction action) -> action.sendBulkRequest(new BulkRequest(), Assert::fail, () -> {}));
}

public void testCancelBeforeOnBulkResponse() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@ public class SqlClearCursorActionIT extends AbstractSqlIntegTestCase {

public void testSqlClearCursorAction() {
assertAcked(indicesAdmin().prepareCreate("test").get());
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
int indexSize = randomIntBetween(100, 300);
logger.info("Indexing {} records", indexSize);
for (int i = 0; i < indexSize; i++) {
bulkRequestBuilder.add(new IndexRequest("test").id("id" + i).source("data", "bar", "count", i));
try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk()) {
int indexSize = randomIntBetween(100, 300);
logger.info("Indexing {} records", indexSize);
for (int i = 0; i < indexSize; i++) {
bulkRequestBuilder.add(new IndexRequest("test").id("id" + i).source("data", "bar", "count", i));
}
bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
}
bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
ensureYellow("test");

assertEquals(0, getNumberOfSearchContexts());
Expand All @@ -53,41 +54,42 @@ public void testSqlClearCursorAction() {

public void testAutoCursorCleanup() {
assertAcked(indicesAdmin().prepareCreate("test").get());
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
int indexSize = randomIntBetween(100, 300);
logger.info("Indexing {} records", indexSize);
for (int i = 0; i < indexSize; i++) {
bulkRequestBuilder.add(new IndexRequest("test").id("id" + i).source("data", "bar", "count", i));
try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk()) {
int indexSize = randomIntBetween(100, 300);
logger.info("Indexing {} records", indexSize);
for (int i = 0; i < indexSize; i++) {
bulkRequestBuilder.add(new IndexRequest("test").id("id" + i).source("data", "bar", "count", i));
}
bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
ensureYellow("test");

assertEquals(0, getNumberOfSearchContexts());

int fetchSize = randomIntBetween(5, 20);
logger.info("Fetching {} records at a time", fetchSize);
SqlQueryResponse sqlQueryResponse = new SqlQueryRequestBuilder(client(), SqlQueryAction.INSTANCE).query("SELECT * FROM test")
.fetchSize(fetchSize)
.get();
assertEquals(fetchSize, sqlQueryResponse.size());

assertThat(getNumberOfSearchContexts(), greaterThan(0L));
assertThat(sqlQueryResponse.cursor(), notNullValue());
assertThat(sqlQueryResponse.cursor(), not(equalTo(Cursor.EMPTY)));

long fetched = sqlQueryResponse.size();
do {
sqlQueryResponse = new SqlQueryRequestBuilder(client(), SqlQueryAction.INSTANCE).cursor(sqlQueryResponse.cursor()).get();
fetched += sqlQueryResponse.size();
} while (sqlQueryResponse.cursor().isEmpty() == false);
assertEquals(indexSize, fetched);

SqlClearCursorResponse cleanCursorResponse = new SqlClearCursorRequestBuilder(client(), SqlClearCursorAction.INSTANCE).cursor(
sqlQueryResponse.cursor()
).get();
assertFalse(cleanCursorResponse.isSucceeded());

assertEquals(0, getNumberOfSearchContexts());
}
bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
ensureYellow("test");

assertEquals(0, getNumberOfSearchContexts());

int fetchSize = randomIntBetween(5, 20);
logger.info("Fetching {} records at a time", fetchSize);
SqlQueryResponse sqlQueryResponse = new SqlQueryRequestBuilder(client(), SqlQueryAction.INSTANCE).query("SELECT * FROM test")
.fetchSize(fetchSize)
.get();
assertEquals(fetchSize, sqlQueryResponse.size());

assertThat(getNumberOfSearchContexts(), greaterThan(0L));
assertThat(sqlQueryResponse.cursor(), notNullValue());
assertThat(sqlQueryResponse.cursor(), not(equalTo(Cursor.EMPTY)));

long fetched = sqlQueryResponse.size();
do {
sqlQueryResponse = new SqlQueryRequestBuilder(client(), SqlQueryAction.INSTANCE).cursor(sqlQueryResponse.cursor()).get();
fetched += sqlQueryResponse.size();
} while (sqlQueryResponse.cursor().isEmpty() == false);
assertEquals(indexSize, fetched);

SqlClearCursorResponse cleanCursorResponse = new SqlClearCursorRequestBuilder(client(), SqlClearCursorAction.INSTANCE).cursor(
sqlQueryResponse.cursor()
).get();
assertFalse(cleanCursorResponse.isSucceeded());

assertEquals(0, getNumberOfSearchContexts());
}

private long getNumberOfSearchContexts() {
Expand Down

0 comments on commit fa59215

Please sign in to comment.