diff --git a/modules/aggregations/src/internalClusterTest/java/org/elasticsearch/aggregations/bucket/SearchCancellationIT.java b/modules/aggregations/src/internalClusterTest/java/org/elasticsearch/aggregations/bucket/SearchCancellationIT.java index a6e530a9d66cf..d775a1bae8b91 100644 --- a/modules/aggregations/src/internalClusterTest/java/org/elasticsearch/aggregations/bucket/SearchCancellationIT.java +++ b/modules/aggregations/src/internalClusterTest/java/org/elasticsearch/aggregations/bucket/SearchCancellationIT.java @@ -77,21 +77,22 @@ public void testCancellationDuringTimeSeriesAggregation() throws Exception { for (int i = 0; i < numberOfRefreshes; i++) { // Make sure we sometimes have a few segments - BulkRequestBuilder bulkRequestBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - for (int j = 0; j < numberOfDocsPerRefresh; j++) { - bulkRequestBuilder.add( - prepareIndex("test").setOpType(DocWriteRequest.OpType.CREATE) - .setSource( - "@timestamp", - now + (long) i * numberOfDocsPerRefresh + j, - "val", - (double) j, - "dim", - String.valueOf(j % 100) - ) - ); + try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)) { + for (int j = 0; j < numberOfDocsPerRefresh; j++) { + bulkRequestBuilder.add( + prepareIndex("test").setOpType(DocWriteRequest.OpType.CREATE) + .setSource( + "@timestamp", + now + (long) i * numberOfDocsPerRefresh + j, + "val", + (double) j, + "dim", + String.valueOf(j % 100) + ) + ); + } + assertNoFailures(bulkRequestBuilder.get()); } - assertNoFailures(bulkRequestBuilder.get()); } logger.info("Executing search"); diff --git a/modules/aggregations/src/internalClusterTest/java/org/elasticsearch/aggregations/bucket/TimeSeriesNestedAggregationsIT.java b/modules/aggregations/src/internalClusterTest/java/org/elasticsearch/aggregations/bucket/TimeSeriesNestedAggregationsIT.java index 5c58b7f7bff5a..cd69526c76481 100644 --- a/modules/aggregations/src/internalClusterTest/java/org/elasticsearch/aggregations/bucket/TimeSeriesNestedAggregationsIT.java +++ b/modules/aggregations/src/internalClusterTest/java/org/elasticsearch/aggregations/bucket/TimeSeriesNestedAggregationsIT.java @@ -64,15 +64,16 @@ public void setup() throws Exception { assertTrue(prepareTimeSeriesIndex(mapping, startMillis, endMillis, routingDimensions).isAcknowledged()); logger.info("Dimensions: " + numberOfDimensions + " docs: " + numberOfDocuments + " start: " + startMillis + " end: " + endMillis); - final BulkRequestBuilder bulkIndexRequest = client().prepareBulk(); - for (int docId = 0; docId < numberOfDocuments; docId++) { - final XContentBuilder document = timeSeriesDocument(FOO_DIM_VALUE, BAR_DIM_VALUE, BAZ_DIM_VALUE, docId, timestamps::next); - bulkIndexRequest.add(prepareIndex("index").setOpType(DocWriteRequest.OpType.CREATE).setSource(document)); + try (BulkRequestBuilder bulkIndexRequest = client().prepareBulk()) { + for (int docId = 0; docId < numberOfDocuments; docId++) { + final XContentBuilder document = timeSeriesDocument(FOO_DIM_VALUE, BAR_DIM_VALUE, BAZ_DIM_VALUE, docId, timestamps::next); + bulkIndexRequest.add(prepareIndex("index").setOpType(DocWriteRequest.OpType.CREATE).setSource(document)); + } + + final BulkResponse bulkIndexResponse = bulkIndexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); + assertFalse(bulkIndexResponse.hasFailures()); + assertEquals(RestStatus.OK.getStatus(), client().admin().indices().prepareFlush("index").get().getStatus().getStatus()); } - - final BulkResponse bulkIndexResponse = bulkIndexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); - assertFalse(bulkIndexResponse.hasFailures()); - assertEquals(RestStatus.OK.getStatus(), client().admin().indices().prepareFlush("index").get().getStatus().getStatus()); } private static XContentBuilder timeSeriesDocument( diff --git a/modules/reindex/src/main/java/org/elasticsearch/reindex/AbstractAsyncBulkByScrollAction.java b/modules/reindex/src/main/java/org/elasticsearch/reindex/AbstractAsyncBulkByScrollAction.java index fcea4618f4cd4..e93aa98361f3b 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/reindex/AbstractAsyncBulkByScrollAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/reindex/AbstractAsyncBulkByScrollAction.java @@ -412,12 +412,12 @@ void prepareBulkRequest(long thisBatchStartTimeNS, ScrollConsumableHitsResponse /* * If we noop-ed the entire batch then just skip to the next batch or the BulkRequest would fail validation. */ - notifyDone(thisBatchStartTimeNS, asyncResponse, 0); + notifyDone(thisBatchStartTimeNS, asyncResponse, request); return; } request.timeout(mainRequest.getTimeout()); request.waitForActiveShards(mainRequest.getWaitForActiveShards()); - sendBulkRequest(request, () -> notifyDone(thisBatchStartTimeNS, asyncResponse, request.requests().size())); + sendBulkRequest(request, () -> notifyDone(thisBatchStartTimeNS, asyncResponse, request)); } /** @@ -515,7 +515,9 @@ void onBulkResponse(BulkResponse response, Runnable onSuccess) { } } - void notifyDone(long thisBatchStartTimeNS, ScrollConsumableHitsResponse asyncResponse, int batchSize) { + void notifyDone(long thisBatchStartTimeNS, ScrollConsumableHitsResponse asyncResponse, BulkRequest request) { + int batchSize = request.requests().size(); + request.close(); if (task.isCancelled()) { logger.debug("[{}]: finishing early because the task was cancelled", task.getId()); finishHim(null); diff --git a/modules/reindex/src/test/java/org/elasticsearch/reindex/AsyncBulkByScrollActionTests.java b/modules/reindex/src/test/java/org/elasticsearch/reindex/AsyncBulkByScrollActionTests.java index 3c5a3eb2e40f9..455cff776cd7c 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/reindex/AsyncBulkByScrollActionTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/reindex/AsyncBulkByScrollActionTests.java @@ -715,7 +715,7 @@ public void testCancelBeforeOnBulkResponse() throws Exception { public void testCancelBeforeStartNextScroll() throws Exception { long now = System.nanoTime(); - cancelTaskCase((DummyAsyncBulkByScrollAction action) -> action.notifyDone(now, null, 0)); + cancelTaskCase((DummyAsyncBulkByScrollAction action) -> action.notifyDone(now, null, new BulkRequest())); } public void testCancelBeforeRefreshAndFinish() throws Exception { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditor.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditor.java index f4c3704cd65c1..99151d21ccecc 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditor.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditor.java @@ -219,14 +219,14 @@ protected void writeBacklog() { doc = backlog.poll(); } - client.bulk(bulkRequest, ActionListener.wrap(bulkItemResponses -> { + client.bulk(bulkRequest, ActionListener.releaseAfter(ActionListener.wrap(bulkItemResponses -> { if (bulkItemResponses.hasFailures()) { logger.warn("Failures bulk indexing the message back log: {}", bulkItemResponses.buildFailureMessage()); } else { logger.trace("Successfully wrote audit message backlog after upgrading template"); } backlog = null; - }, AbstractAuditor::onIndexFailure)); + }, AbstractAuditor::onIndexFailure), bulkRequest)); } // for testing diff --git a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDriver.java b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDriver.java index db6ab6d01613d..fff83f0962221 100644 --- a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDriver.java +++ b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDriver.java @@ -182,15 +182,17 @@ private static String randomDateForRange(long start, long end) { } private static int bulkIndex(Client client, String dataStreamName, Supplier docSourceSupplier, int docCount) { - BulkRequestBuilder bulkRequestBuilder = client.prepareBulk(); - bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - for (int i = 0; i < docCount; i++) { - IndexRequest indexRequest = new IndexRequest(dataStreamName).opType(DocWriteRequest.OpType.CREATE); - XContentBuilder source = docSourceSupplier.get(); - indexRequest.source(source); - bulkRequestBuilder.add(indexRequest); + BulkResponse bulkResponse; + try (BulkRequestBuilder bulkRequestBuilder = client.prepareBulk()) { + bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + for (int i = 0; i < docCount; i++) { + IndexRequest indexRequest = new IndexRequest(dataStreamName).opType(DocWriteRequest.OpType.CREATE); + XContentBuilder source = docSourceSupplier.get(); + indexRequest.source(source); + bulkRequestBuilder.add(indexRequest); + } + bulkResponse = bulkRequestBuilder.get(); } - BulkResponse bulkResponse = bulkRequestBuilder.get(); int duplicates = 0; for (BulkItemResponse response : bulkResponse.getItems()) { if (response.isFailed()) { diff --git a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DownsampleTransportFailureIT.java b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DownsampleTransportFailureIT.java index d94d609cf3470..2a58dc2888799 100644 --- a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DownsampleTransportFailureIT.java +++ b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DownsampleTransportFailureIT.java @@ -207,9 +207,10 @@ public XContentBuilder indexMapping() throws IOException { } public void indexDocuments(final String indexName, final List documentsJson) { - final BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); - documentsJson.forEach(document -> bulkRequestBuilder.add(new IndexRequest(indexName).source(document, XContentType.JSON))); - assertFalse(bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get().hasFailures()); + try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk()) { + documentsJson.forEach(document -> bulkRequestBuilder.add(new IndexRequest(indexName).source(document, XContentType.JSON))); + assertFalse(bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get().hasFailures()); + } } public void blockIndexWrites(final String indexName) { diff --git a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/ILMDownsampleDisruptionIT.java b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/ILMDownsampleDisruptionIT.java index a023f171ad209..6d3c348c10627 100644 --- a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/ILMDownsampleDisruptionIT.java +++ b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/ILMDownsampleDisruptionIT.java @@ -251,31 +251,32 @@ private void assertTargetIndex(final InternalTestCluster cluster, final String t } private int bulkIndex(final String indexName, final SourceSupplier sourceSupplier, int docCount) throws IOException { - BulkRequestBuilder bulkRequestBuilder = internalCluster().client().prepareBulk(); - bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - for (int i = 0; i < docCount; i++) { - IndexRequest indexRequest = new IndexRequest(indexName).opType(DocWriteRequest.OpType.CREATE); - XContentBuilder source = sourceSupplier.get(); - indexRequest.source(source); - bulkRequestBuilder.add(indexRequest); - } - BulkResponse bulkResponse = bulkRequestBuilder.get(); - int duplicates = 0; - for (BulkItemResponse response : bulkResponse.getItems()) { - if (response.isFailed()) { - if (response.getFailure().getCause() instanceof VersionConflictEngineException) { - // A duplicate event was created by random generator. We should not fail for this - // reason. - logger.debug("We tried to insert a duplicate: [{}]", response.getFailureMessage()); - duplicates++; - } else { - fail("Failed to index data: " + bulkResponse.buildFailureMessage()); + try (BulkRequestBuilder bulkRequestBuilder = internalCluster().client().prepareBulk()) { + bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + for (int i = 0; i < docCount; i++) { + IndexRequest indexRequest = new IndexRequest(indexName).opType(DocWriteRequest.OpType.CREATE); + XContentBuilder source = sourceSupplier.get(); + indexRequest.source(source); + bulkRequestBuilder.add(indexRequest); + } + BulkResponse bulkResponse = bulkRequestBuilder.get(); + int duplicates = 0; + for (BulkItemResponse response : bulkResponse.getItems()) { + if (response.isFailed()) { + if (response.getFailure().getCause() instanceof VersionConflictEngineException) { + // A duplicate event was created by random generator. We should not fail for this + // reason. + logger.debug("We tried to insert a duplicate: [{}]", response.getFailureMessage()); + duplicates++; + } else { + fail("Failed to index data: " + bulkResponse.buildFailureMessage()); + } } } + int docsIndexed = docCount - duplicates; + logger.info("Indexed [{}] documents. Dropped [{}] duplicates.", docsIndexed, duplicates); + return docsIndexed; } - int docsIndexed = docCount - duplicates; - logger.info("Indexed [{}] documents. Dropped [{}] duplicates.", docsIndexed, duplicates); - return docsIndexed; } private String randomDateForInterval(final DateHistogramInterval interval, final long startTime) { diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichResiliencyTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichResiliencyTests.java index 049684a2c778d..bf9f284868e40 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichResiliencyTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichResiliencyTests.java @@ -130,30 +130,31 @@ public void testWriteThreadLivenessBackToBack() throws Exception { XContentBuilder doc = JsonXContent.contentBuilder().startObject().field("custom_id", "key").endObject(); - BulkRequest bulk = new BulkRequest(enrichedIndexName); - bulk.timeout(new TimeValue(10, TimeUnit.SECONDS)); - for (int idx = 0; idx < 50; idx++) { - bulk.add(new IndexRequest().source(doc).setPipeline(enrichPipelineName)); - } + try (BulkRequest bulk = new BulkRequest(enrichedIndexName)) { + bulk.timeout(new TimeValue(10, TimeUnit.SECONDS)); + for (int idx = 0; idx < 50; idx++) { + bulk.add(new IndexRequest().source(doc).setPipeline(enrichPipelineName)); + } - BulkResponse bulkItemResponses = client().bulk(bulk).actionGet(new TimeValue(30, TimeUnit.SECONDS)); + BulkResponse bulkItemResponses = client().bulk(bulk).actionGet(new TimeValue(30, TimeUnit.SECONDS)); - assertTrue(bulkItemResponses.hasFailures()); - BulkItemResponse.Failure firstFailure = null; - int successfulItems = 0; - for (BulkItemResponse item : bulkItemResponses.getItems()) { - if (item.isFailed() && firstFailure == null) { - firstFailure = item.getFailure(); - } else if (item.isFailed() == false) { - successfulItems++; + assertTrue(bulkItemResponses.hasFailures()); + BulkItemResponse.Failure firstFailure = null; + int successfulItems = 0; + for (BulkItemResponse item : bulkItemResponses.getItems()) { + if (item.isFailed() && firstFailure == null) { + firstFailure = item.getFailure(); + } else if (item.isFailed() == false) { + successfulItems++; + } } - } - assertNotNull(firstFailure); - assertThat(firstFailure.getStatus().getStatus(), is(equalTo(429))); - assertThat(firstFailure.getMessage(), containsString("Could not perform enrichment, enrich coordination queue at capacity")); + assertNotNull(firstFailure); + assertThat(firstFailure.getStatus().getStatus(), is(equalTo(429))); + assertThat(firstFailure.getMessage(), containsString("Could not perform enrichment, enrich coordination queue at capacity")); - client().admin().indices().refresh(new RefreshRequest(enrichedIndexName)).actionGet(); - assertEquals(successfulItems, client().search(new SearchRequest(enrichedIndexName)).actionGet().getHits().getTotalHits().value); + client().admin().indices().refresh(new RefreshRequest(enrichedIndexName)).actionGet(); + assertEquals(successfulItems, client().search(new SearchRequest(enrichedIndexName)).actionGet().getHits().getTotalHits().value); + } } public void testWriteThreadLivenessWithPipeline() throws Exception { @@ -253,29 +254,30 @@ public void testWriteThreadLivenessWithPipeline() throws Exception { XContentBuilder doc = JsonXContent.contentBuilder().startObject().field("custom_id", "key").endObject(); - BulkRequest bulk = new BulkRequest(enrichedIndexName); - bulk.timeout(new TimeValue(10, TimeUnit.SECONDS)); - for (int idx = 0; idx < 50; idx++) { - bulk.add(new IndexRequest().source(doc).setPipeline(enrichPipelineName1)); - } + try (BulkRequest bulk = new BulkRequest(enrichedIndexName)) { + bulk.timeout(new TimeValue(10, TimeUnit.SECONDS)); + for (int idx = 0; idx < 50; idx++) { + bulk.add(new IndexRequest().source(doc).setPipeline(enrichPipelineName1)); + } - BulkResponse bulkItemResponses = client().bulk(bulk).actionGet(new TimeValue(30, TimeUnit.SECONDS)); + BulkResponse bulkItemResponses = client().bulk(bulk).actionGet(new TimeValue(30, TimeUnit.SECONDS)); - assertTrue(bulkItemResponses.hasFailures()); - BulkItemResponse.Failure firstFailure = null; - int successfulItems = 0; - for (BulkItemResponse item : bulkItemResponses.getItems()) { - if (item.isFailed() && firstFailure == null) { - firstFailure = item.getFailure(); - } else if (item.isFailed() == false) { - successfulItems++; + assertTrue(bulkItemResponses.hasFailures()); + BulkItemResponse.Failure firstFailure = null; + int successfulItems = 0; + for (BulkItemResponse item : bulkItemResponses.getItems()) { + if (item.isFailed() && firstFailure == null) { + firstFailure = item.getFailure(); + } else if (item.isFailed() == false) { + successfulItems++; + } } - } - assertNotNull(firstFailure); - assertThat(firstFailure.getStatus().getStatus(), is(equalTo(429))); - assertThat(firstFailure.getMessage(), containsString("Could not perform enrichment, enrich coordination queue at capacity")); + assertNotNull(firstFailure); + assertThat(firstFailure.getStatus().getStatus(), is(equalTo(429))); + assertThat(firstFailure.getMessage(), containsString("Could not perform enrichment, enrich coordination queue at capacity")); - client().admin().indices().refresh(new RefreshRequest(enrichedIndexName)).actionGet(); - assertEquals(successfulItems, client().search(new SearchRequest(enrichedIndexName)).actionGet().getHits().getTotalHits().value); + client().admin().indices().refresh(new RefreshRequest(enrichedIndexName)).actionGet(); + assertEquals(successfulItems, client().search(new SearchRequest(enrichedIndexName)).actionGet().getHits().getTotalHits().value); + } } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPostCalendarEventsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPostCalendarEventsAction.java index d31488cda02bd..4e0806336be10 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPostCalendarEventsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPostCalendarEventsAction.java @@ -99,7 +99,7 @@ protected void doExecute( ML_ORIGIN, BulkAction.INSTANCE, bulkRequestBuilder.request(), - new ActionListener() { + ActionListener.releaseAfter(new ActionListener<>() { @Override public void onResponse(BulkResponse response) { jobManager.updateProcessOnCalendarChanged( @@ -115,7 +115,7 @@ public void onResponse(BulkResponse response) { public void onFailure(Exception e) { listener.onFailure(ExceptionsHelper.serverError("Error indexing event", e)); } - } + }, bulkRequestBuilder) ); }, listener::onFailure); diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalBulk.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalBulk.java index 54c90de64eaee..a7dfecc6cce08 100644 --- a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalBulk.java +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalBulk.java @@ -105,13 +105,13 @@ protected void doFlush(ActionListener listener) { client.threadPool().getThreadContext(), MONITORING_ORIGIN, requestBuilder.request(), - ActionListener.wrap(bulkResponse -> { + ActionListener.releaseAfter(ActionListener.wrap(bulkResponse -> { if (bulkResponse.hasFailures()) { throwExportException(bulkResponse.getItems(), listener); } else { listener.onResponse(null); } - }, e -> listener.onFailure(new ExportException("failed to flush export bulk [{}]", e, name))), + }, e -> listener.onFailure(new ExportException("failed to flush export bulk [{}]", e, name))), requestBuilder), client::bulk ); } finally { diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/ApiKeyService.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/ApiKeyService.java index 2d700e23f127c..fe0331bb784ae 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/ApiKeyService.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/ApiKeyService.java @@ -448,14 +448,17 @@ private void createApiKeyAndIndexIt( SECURITY_ORIGIN, BulkAction.INSTANCE, bulkRequest, - TransportBulkAction.unwrappingSingleItemBulkResponse(ActionListener.wrap(indexResponse -> { - assert request.getId().equals(indexResponse.getId()); - assert indexResponse.getResult() == DocWriteResponse.Result.CREATED; - final ListenableFuture listenableFuture = new ListenableFuture<>(); - listenableFuture.onResponse(new CachedApiKeyHashResult(true, apiKey)); - apiKeyAuthCache.put(request.getId(), listenableFuture); - listener.onResponse(new CreateApiKeyResponse(request.getName(), request.getId(), apiKey, expiration)); - }, listener::onFailure)) + ActionListener.releaseAfter( + TransportBulkAction.unwrappingSingleItemBulkResponse(ActionListener.wrap(indexResponse -> { + assert request.getId().equals(indexResponse.getId()); + assert indexResponse.getResult() == DocWriteResponse.Result.CREATED; + final ListenableFuture listenableFuture = new ListenableFuture<>(); + listenableFuture.onResponse(new CachedApiKeyHashResult(true, apiKey)); + apiKeyAuthCache.put(request.getId(), listenableFuture); + listener.onResponse(new CreateApiKeyResponse(request.getName(), request.getId(), apiKey, expiration)); + }, listener::onFailure)), + bulkRequestBuilder + ) ) ); } catch (IOException e) { @@ -574,9 +577,12 @@ private void updateApiKeys( client.threadPool().getThreadContext(), SECURITY_ORIGIN, bulkRequestBuilder.request(), - ActionListener.wrap( - bulkResponse -> buildResponseAndClearCache(bulkResponse, responseBuilder, listener), - ex -> listener.onFailure(traceLog("execute bulk request for update", ex)) + ActionListener.releaseAfter( + ActionListener.wrap( + bulkResponse -> buildResponseAndClearCache(bulkResponse, responseBuilder, listener), + ex -> listener.onFailure(traceLog("execute bulk request for update", ex)) + ), + bulkRequestBuilder ), client::bulk ) @@ -1688,7 +1694,7 @@ private void indexInvalidation(Collection apiKeyIds, ActionListenerwrap(bulkResponse -> { + ActionListener.releaseAfter(ActionListener.wrap(bulkResponse -> { ArrayList failedRequestResponses = new ArrayList<>(); ArrayList previouslyInvalidated = new ArrayList<>(); ArrayList invalidated = new ArrayList<>(); @@ -1718,7 +1724,7 @@ private void indexInvalidation(Collection apiKeyIds, ActionListenerwrap(bulkResponse -> { + ActionListener.releaseAfter(ActionListener.wrap(bulkResponse -> { ArrayList retryTokenDocIds = new ArrayList<>(); ArrayList failedRequestResponses = new ArrayList<>(); ArrayList previouslyInvalidated = new ArrayList<>(); @@ -1055,7 +1055,7 @@ private void indexInvalidation( } else { listener.onFailure(e); } - }), + }), bulkRequestBuilder), client::bulk ) ); diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/store/NativePrivilegeStore.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/store/NativePrivilegeStore.java index 004874f5b63b9..28b613d7163c7 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/store/NativePrivilegeStore.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/store/NativePrivilegeStore.java @@ -393,10 +393,13 @@ public void putPrivileges( client.threadPool().getThreadContext(), SECURITY_ORIGIN, bulkRequestBuilder.request(), - ActionListener.wrap(bulkResponse -> handleBulkResponse(bulkResponse, listener), ex -> { - logger.warn(Strings.format("Failed to write application privileges to %s", securityIndexManager.aliasName()), ex); - listener.onFailure(ex); - }), + ActionListener.releaseAfter( + ActionListener.wrap(bulkResponse -> handleBulkResponse(bulkResponse, listener), ex -> { + logger.warn(Strings.format("Failed to write application privileges to %s", securityIndexManager.aliasName()), ex); + listener.onFailure(ex); + }), + bulkRequestBuilder + ), client::bulk ); }); diff --git a/x-pack/plugin/sql/src/internalClusterTest/java/org/elasticsearch/xpack/sql/action/SqlSearchPageTimeoutIT.java b/x-pack/plugin/sql/src/internalClusterTest/java/org/elasticsearch/xpack/sql/action/SqlSearchPageTimeoutIT.java index 5eec6b8b802d1..e4c85ab1723e0 100644 --- a/x-pack/plugin/sql/src/internalClusterTest/java/org/elasticsearch/xpack/sql/action/SqlSearchPageTimeoutIT.java +++ b/x-pack/plugin/sql/src/internalClusterTest/java/org/elasticsearch/xpack/sql/action/SqlSearchPageTimeoutIT.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.sql.action; +import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.support.WriteRequest; @@ -62,11 +63,12 @@ public void testSearchContextIsCleanedUpAfterPageTimeout(String query) throws Ex private void setupTestIndex() { assertAcked(indicesAdmin().prepareCreate("test").get()); - client().prepareBulk() - .add(new IndexRequest("test").id("1").source("field", "bar")) - .add(new IndexRequest("test").id("2").source("field", "baz")) - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) - .get(); + try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk()) { + bulkRequestBuilder.add(new IndexRequest("test").id("1").source("field", "bar")) + .add(new IndexRequest("test").id("2").source("field", "baz")) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .get(); + } ensureYellow("test"); }