diff --git a/modules/mapper-extras/src/internalClusterTest/java/org/elasticsearch/index/mapper/RankFeaturesMapperIntegrationIT.java b/modules/mapper-extras/src/internalClusterTest/java/org/elasticsearch/index/mapper/RankFeaturesMapperIntegrationIT.java index c6544bac2b13c..c2b8554c072d8 100644 --- a/modules/mapper-extras/src/internalClusterTest/java/org/elasticsearch/index/mapper/RankFeaturesMapperIntegrationIT.java +++ b/modules/mapper-extras/src/internalClusterTest/java/org/elasticsearch/index/mapper/RankFeaturesMapperIntegrationIT.java @@ -8,6 +8,7 @@ package org.elasticsearch.index.mapper; +import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.mapper.extras.MapperExtrasPlugin; @@ -104,16 +105,17 @@ private void init() throws IOException { .get(); ensureGreen(); - BulkResponse bulk = client().prepareBulk() - .add( + try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk()) { + BulkResponse bulk = bulkRequestBuilder.add( prepareIndex(INDEX_NAME).setId("all") .setSource(Map.of("all_rank_features", Map.of(LOWER_RANKED_FEATURE, 10, HIGHER_RANKED_FEATURE, 20))) ) - .add(prepareIndex(INDEX_NAME).setId("lower").setSource(Map.of("all_rank_features", Map.of(LOWER_RANKED_FEATURE, 10)))) - .add(prepareIndex(INDEX_NAME).setId("higher").setSource(Map.of("all_rank_features", Map.of(HIGHER_RANKED_FEATURE, 20)))) - .get(); - assertFalse(bulk.buildFailureMessage(), bulk.hasFailures()); - assertThat(refresh().getFailedShards(), equalTo(0)); + .add(prepareIndex(INDEX_NAME).setId("lower").setSource(Map.of("all_rank_features", Map.of(LOWER_RANKED_FEATURE, 10)))) + .add(prepareIndex(INDEX_NAME).setId("higher").setSource(Map.of("all_rank_features", Map.of(HIGHER_RANKED_FEATURE, 20)))) + .get(); + assertFalse(bulk.buildFailureMessage(), bulk.hasFailures()); + assertThat(refresh().getFailedShards(), equalTo(0)); + } } } diff --git a/modules/reindex/src/main/java/org/elasticsearch/reindex/AbstractAsyncBulkByScrollAction.java b/modules/reindex/src/main/java/org/elasticsearch/reindex/AbstractAsyncBulkByScrollAction.java index e93aa98361f3b..123b2286b822b 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/reindex/AbstractAsyncBulkByScrollAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/reindex/AbstractAsyncBulkByScrollAction.java @@ -417,13 +417,13 @@ void prepareBulkRequest(long thisBatchStartTimeNS, ScrollConsumableHitsResponse } request.timeout(mainRequest.getTimeout()); request.waitForActiveShards(mainRequest.getWaitForActiveShards()); - sendBulkRequest(request, () -> notifyDone(thisBatchStartTimeNS, asyncResponse, request)); + sendBulkRequest(request, () -> notifyDone(thisBatchStartTimeNS, asyncResponse, request), request::close); } /** * Send a bulk request, handling retries. */ - void sendBulkRequest(BulkRequest request, Runnable onSuccess) { + void sendBulkRequest(BulkRequest request, Runnable onSuccess, Runnable onFailure) { final int requestSize = request.requests().size(); if (logger.isDebugEnabled()) { logger.debug( @@ -438,7 +438,7 @@ void sendBulkRequest(BulkRequest request, Runnable onSuccess) { finishHim(null); return; } - bulkRetry.withBackoff(bulkClient::bulk, request, new ActionListener() { + bulkRetry.withBackoff(bulkClient::bulk, request, new ActionListener<>() { @Override public void onResponse(BulkResponse response) { logger.debug("[{}]: completed [{}] entry bulk request", task.getId(), requestSize); @@ -447,6 +447,7 @@ public void onResponse(BulkResponse response) { @Override public void onFailure(Exception e) { + onFailure.run(); finishHim(e); } }); diff --git a/modules/reindex/src/test/java/org/elasticsearch/reindex/AsyncBulkByScrollActionTests.java b/modules/reindex/src/test/java/org/elasticsearch/reindex/AsyncBulkByScrollActionTests.java index 455cff776cd7c..6c1b2381d24c6 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/reindex/AsyncBulkByScrollActionTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/reindex/AsyncBulkByScrollActionTests.java @@ -628,14 +628,14 @@ private void bulkRetryTestCase(boolean failWithRejection) throws Exception { request.add(new IndexRequest("index").id("id" + i)); } if (failWithRejection) { - action.sendBulkRequest(request, Assert::fail); + action.sendBulkRequest(request, Assert::fail, request::close); BulkByScrollResponse response = listener.get(); assertThat(response.getBulkFailures(), hasSize(1)); assertEquals(response.getBulkFailures().get(0).getStatus(), RestStatus.TOO_MANY_REQUESTS); assertThat(response.getSearchFailures(), empty()); assertNull(response.getReasonCancelled()); } else { - assertExactlyOnce(onSuccess -> action.sendBulkRequest(request, onSuccess)); + assertExactlyOnce(onSuccess -> action.sendBulkRequest(request, onSuccess, request::close)); } } @@ -704,7 +704,7 @@ public void testCancelBeforeScrollResponse() throws Exception { } public void testCancelBeforeSendBulkRequest() throws Exception { - cancelTaskCase((DummyAsyncBulkByScrollAction action) -> action.sendBulkRequest(new BulkRequest(), Assert::fail)); + cancelTaskCase((DummyAsyncBulkByScrollAction action) -> action.sendBulkRequest(new BulkRequest(), Assert::fail, () -> {})); } public void testCancelBeforeOnBulkResponse() throws Exception { diff --git a/x-pack/plugin/sql/src/internalClusterTest/java/org/elasticsearch/xpack/sql/action/SqlClearCursorActionIT.java b/x-pack/plugin/sql/src/internalClusterTest/java/org/elasticsearch/xpack/sql/action/SqlClearCursorActionIT.java index c575b73bdd878..76519abbe78da 100644 --- a/x-pack/plugin/sql/src/internalClusterTest/java/org/elasticsearch/xpack/sql/action/SqlClearCursorActionIT.java +++ b/x-pack/plugin/sql/src/internalClusterTest/java/org/elasticsearch/xpack/sql/action/SqlClearCursorActionIT.java @@ -21,13 +21,14 @@ public class SqlClearCursorActionIT extends AbstractSqlIntegTestCase { public void testSqlClearCursorAction() { assertAcked(indicesAdmin().prepareCreate("test").get()); - BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); - int indexSize = randomIntBetween(100, 300); - logger.info("Indexing {} records", indexSize); - for (int i = 0; i < indexSize; i++) { - bulkRequestBuilder.add(new IndexRequest("test").id("id" + i).source("data", "bar", "count", i)); + try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk()) { + int indexSize = randomIntBetween(100, 300); + logger.info("Indexing {} records", indexSize); + for (int i = 0; i < indexSize; i++) { + bulkRequestBuilder.add(new IndexRequest("test").id("id" + i).source("data", "bar", "count", i)); + } + bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); } - bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); ensureYellow("test"); assertEquals(0, getNumberOfSearchContexts()); @@ -53,41 +54,42 @@ public void testSqlClearCursorAction() { public void testAutoCursorCleanup() { assertAcked(indicesAdmin().prepareCreate("test").get()); - BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); - int indexSize = randomIntBetween(100, 300); - logger.info("Indexing {} records", indexSize); - for (int i = 0; i < indexSize; i++) { - bulkRequestBuilder.add(new IndexRequest("test").id("id" + i).source("data", "bar", "count", i)); + try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk()) { + int indexSize = randomIntBetween(100, 300); + logger.info("Indexing {} records", indexSize); + for (int i = 0; i < indexSize; i++) { + bulkRequestBuilder.add(new IndexRequest("test").id("id" + i).source("data", "bar", "count", i)); + } + bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); + ensureYellow("test"); + + assertEquals(0, getNumberOfSearchContexts()); + + int fetchSize = randomIntBetween(5, 20); + logger.info("Fetching {} records at a time", fetchSize); + SqlQueryResponse sqlQueryResponse = new SqlQueryRequestBuilder(client(), SqlQueryAction.INSTANCE).query("SELECT * FROM test") + .fetchSize(fetchSize) + .get(); + assertEquals(fetchSize, sqlQueryResponse.size()); + + assertThat(getNumberOfSearchContexts(), greaterThan(0L)); + assertThat(sqlQueryResponse.cursor(), notNullValue()); + assertThat(sqlQueryResponse.cursor(), not(equalTo(Cursor.EMPTY))); + + long fetched = sqlQueryResponse.size(); + do { + sqlQueryResponse = new SqlQueryRequestBuilder(client(), SqlQueryAction.INSTANCE).cursor(sqlQueryResponse.cursor()).get(); + fetched += sqlQueryResponse.size(); + } while (sqlQueryResponse.cursor().isEmpty() == false); + assertEquals(indexSize, fetched); + + SqlClearCursorResponse cleanCursorResponse = new SqlClearCursorRequestBuilder(client(), SqlClearCursorAction.INSTANCE).cursor( + sqlQueryResponse.cursor() + ).get(); + assertFalse(cleanCursorResponse.isSucceeded()); + + assertEquals(0, getNumberOfSearchContexts()); } - bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); - ensureYellow("test"); - - assertEquals(0, getNumberOfSearchContexts()); - - int fetchSize = randomIntBetween(5, 20); - logger.info("Fetching {} records at a time", fetchSize); - SqlQueryResponse sqlQueryResponse = new SqlQueryRequestBuilder(client(), SqlQueryAction.INSTANCE).query("SELECT * FROM test") - .fetchSize(fetchSize) - .get(); - assertEquals(fetchSize, sqlQueryResponse.size()); - - assertThat(getNumberOfSearchContexts(), greaterThan(0L)); - assertThat(sqlQueryResponse.cursor(), notNullValue()); - assertThat(sqlQueryResponse.cursor(), not(equalTo(Cursor.EMPTY))); - - long fetched = sqlQueryResponse.size(); - do { - sqlQueryResponse = new SqlQueryRequestBuilder(client(), SqlQueryAction.INSTANCE).cursor(sqlQueryResponse.cursor()).get(); - fetched += sqlQueryResponse.size(); - } while (sqlQueryResponse.cursor().isEmpty() == false); - assertEquals(indexSize, fetched); - - SqlClearCursorResponse cleanCursorResponse = new SqlClearCursorRequestBuilder(client(), SqlClearCursorAction.INSTANCE).cursor( - sqlQueryResponse.cursor() - ).get(); - assertFalse(cleanCursorResponse.isSucceeded()); - - assertEquals(0, getNumberOfSearchContexts()); } private long getNumberOfSearchContexts() {