diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java index 425461d1f4ba1..fa73dca8f556f 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java @@ -11,27 +11,37 @@ import org.apache.lucene.util.Accountable; import org.apache.lucene.util.RamUsageEstimator; import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.core.AbstractRefCounted; import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.RefCounted; +import org.elasticsearch.core.Releasable; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.transport.LeakTracker; import java.io.IOException; import java.util.Objects; -public class BulkItemRequest implements Writeable, Accountable { +public class BulkItemRequest implements Writeable, Accountable, RefCounted, Releasable { private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(BulkItemRequest.class); private final int id; private final DocWriteRequest request; private volatile BulkItemResponse primaryResponse; + private final RefCounted refCounted; BulkItemRequest(@Nullable ShardId shardId, StreamInput in) throws IOException { id = in.readVInt(); request = DocWriteRequest.readDocumentRequest(shardId, in); + if (this.request instanceof IndexRequest indexRequest) { + indexRequest.incRef(); + } + this.refCounted = LeakTracker.wrap(new BulkItemRequestRefCounted()); if (in.readBoolean()) { if (shardId == null) { primaryResponse = new BulkItemResponse(in); @@ -45,6 +55,10 @@ public class BulkItemRequest implements Writeable, Accountable { public BulkItemRequest(int id, DocWriteRequest request) { this.id = id; this.request = request; + this.refCounted = LeakTracker.wrap(new BulkItemRequestRefCounted()); + if (this.request instanceof IndexRequest indexRequest) { + indexRequest.incRef(); + } } public int id() { @@ -111,4 +125,41 @@ public void writeThin(StreamOutput out) throws IOException { public long ramBytesUsed() { return SHALLOW_SIZE + request.ramBytesUsed(); } + + @Override + public void incRef() { + refCounted.incRef(); + } + + @Override + public boolean tryIncRef() { + return refCounted.tryIncRef(); + } + + @Override + public boolean decRef() { + boolean success = refCounted.decRef(); + if (this.request instanceof IndexRequest indexRequest) { + success = indexRequest.decRef() && success; + } + return success; + } + + @Override + public boolean hasReferences() { + return refCounted.hasReferences(); + } + + @Override + public void close() { + decRef(); + } + + private static class BulkItemRequestRefCounted extends AbstractRefCounted { + @Override + protected void closeInternal() { + // nothing to close + } + } + } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkPrimaryExecutionContext.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkPrimaryExecutionContext.java index 4291ba5895beb..49c2822599be8 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkPrimaryExecutionContext.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkPrimaryExecutionContext.java @@ -292,7 +292,14 @@ public void markAsCompleted(BulkItemResponse translatedResponse) { assert translatedResponse.getItemId() == getCurrentItem().id(); if (translatedResponse.isFailed() == false && requestToExecute != null && requestToExecute != getCurrent()) { - request.items()[currentIndex] = new BulkItemRequest(request.items()[currentIndex].id(), requestToExecute); + BulkItemRequest oldBulkItemRequest = request.items()[currentIndex]; + try { + request.items()[currentIndex] = new BulkItemRequest(request.items()[currentIndex].id(), requestToExecute); + } finally { + if (oldBulkItemRequest != null) { + oldBulkItemRequest.decRef(); + } + } } getCurrentItem().setPrimaryResponse(translatedResponse); currentItemState = ItemProcessingState.COMPLETED; diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java index 1fbe482fbabaa..a9cb51415124f 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java @@ -159,6 +159,7 @@ BulkRequest internalAdd(IndexRequest request) { Objects.requireNonNull(request, "'request' must not be null"); applyGlobalMandatoryParameters(request); + request.incRef(); requests.add(request); // lack of source is validated in validate() method sizeInBytes += (request.source() != null ? request.source().length() : 0) + REQUEST_OVERHEAD; @@ -481,7 +482,15 @@ public boolean tryIncRef() { @Override public boolean decRef() { - return refCounted.decRef(); + boolean success = refCounted.decRef(); + if (refCounted.hasReferences() == false) { + for (DocWriteRequest request : requests) { + if (request instanceof IndexRequest indexRequest) { + success = indexRequest.decRef() && success; + } + } + } + return success; } @Override diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java index dcc883758e742..0399e2c73c4e6 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java @@ -48,6 +48,9 @@ public BulkShardRequest(StreamInput in) throws IOException { public BulkShardRequest(ShardId shardId, RefreshPolicy refreshPolicy, BulkItemRequest[] items) { super(shardId); this.items = items; + for (BulkItemRequest item : items) { + item.incRef(); + } this.refCounted = LeakTracker.wrap(new BulkRequestRefCounted()); setRefreshPolicy(refreshPolicy); } @@ -175,7 +178,13 @@ public boolean tryIncRef() { @Override public boolean decRef() { - return refCounted.decRef(); + boolean success = refCounted.decRef(); + if (refCounted.hasReferences() == false) { + for (BulkItemRequest item : items) { + success = item.decRef() && success; + } + } + return success; } @Override diff --git a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java index 8b5e077fd85b8..a8cff692bc40d 100644 --- a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -30,12 +30,16 @@ import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.core.AbstractRefCounted; import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.RefCounted; +import org.elasticsearch.core.Releasable; import org.elasticsearch.index.Index; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.plugins.internal.DocumentParsingObserver; +import org.elasticsearch.transport.LeakTracker; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentFactory; import org.elasticsearch.xcontent.XContentType; @@ -68,7 +72,11 @@ * @see IndexResponse * @see org.elasticsearch.client.internal.Client#index(IndexRequest) */ -public class IndexRequest extends ReplicatedWriteRequest implements DocWriteRequest, CompositeIndicesRequest { +public class IndexRequest extends ReplicatedWriteRequest + implements + DocWriteRequest, + CompositeIndicesRequest, + Releasable { private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(IndexRequest.class); private static final TransportVersion PIPELINES_HAVE_RUN_FIELD_ADDED = TransportVersions.V_8_500_049; @@ -136,6 +144,7 @@ public class IndexRequest extends ReplicatedWriteRequest implement */ private Object rawTimestamp; private boolean pipelinesHaveRun = false; + private final RefCounted refCounted; public IndexRequest(StreamInput in) throws IOException { this(null, in); @@ -143,6 +152,7 @@ public IndexRequest(StreamInput in) throws IOException { public IndexRequest(@Nullable ShardId shardId, StreamInput in) throws IOException { super(shardId, in); + this.refCounted = LeakTracker.wrap(new IndexRequestRefCounted()); if (in.getTransportVersion().before(TransportVersions.V_8_0_0)) { String type = in.readOptionalString(); assert MapperService.SINGLE_MAPPING_NAME.equals(type) : "Expected [_doc] but received [" + type + "]"; @@ -193,6 +203,7 @@ public IndexRequest(@Nullable ShardId shardId, StreamInput in) throws IOExceptio public IndexRequest() { super(NO_SHARD_ID); + this.refCounted = LeakTracker.wrap(new IndexRequestRefCounted()); } /** @@ -202,6 +213,7 @@ public IndexRequest() { public IndexRequest(String index) { super(NO_SHARD_ID); this.index = index; + this.refCounted = LeakTracker.wrap(new IndexRequestRefCounted()); } @Override @@ -875,4 +887,37 @@ public List getExecutedPipelines() { return Collections.unmodifiableList(executedPipelines); } } + + @Override + public void incRef() { + refCounted.incRef(); + } + + @Override + public boolean tryIncRef() { + return refCounted.tryIncRef(); + } + + @Override + public boolean decRef() { + return refCounted.decRef(); + } + + @Override + public boolean hasReferences() { + return refCounted.hasReferences(); + } + + @Override + public void close() { + decRef(); + } + + private static class IndexRequestRefCounted extends AbstractRefCounted { + + @Override + protected void closeInternal() { + // nothing to close + } + } } diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkPrimaryExecutionContextTests.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkPrimaryExecutionContextTests.java index 0a41451cbc57b..c0de5031265e7 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkPrimaryExecutionContextTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkPrimaryExecutionContextTests.java @@ -69,11 +69,22 @@ private BulkShardRequest generateRandomRequest() { }; items[i] = new BulkItemRequest(i, request); } - return new BulkShardRequest(new ShardId("index", "_na_", 0), randomFrom(WriteRequest.RefreshPolicy.values()), items); + BulkShardRequest bulkShardRequest = new BulkShardRequest( + new ShardId("index", "_na_", 0), + randomFrom(WriteRequest.RefreshPolicy.values()), + items + ); + /* + * We're responsible for the lifecycle of the IndexRequests and the BulkItemRequests since they were created here, so we decref them + * here, so that when the returned BulkShardRequest is decRef'd, the reference counts correctly go to 0. + */ + for (BulkItemRequest bulkItemRequest : items) { + bulkItemRequest.decRef(); + } + return bulkShardRequest; } public void testTranslogLocation() { - try (BulkShardRequest shardRequest = generateRandomRequest()) { Translog.Location expectedLocation = null; @@ -85,6 +96,7 @@ public void testTranslogLocation() { BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(shardRequest, primary); while (context.hasMoreOperationsToExecute()) { + IndexRequest nullableIndexRequest = null; final Engine.Result result; final DocWriteRequest current = context.getCurrent(); final boolean failure = rarely(); @@ -106,7 +118,9 @@ public void testTranslogLocation() { } } case UPDATE -> { - context.setRequestToExecute(new IndexRequest(current.index()).id(current.id())); + IndexRequest indexRequest = new IndexRequest(current.index()).id(current.id()); + context.setRequestToExecute(indexRequest); + nullableIndexRequest = indexRequest; if (failure) { result = new Engine.IndexResult(new ElasticsearchException("bla"), 1, 1, 1, current.id()); } else { @@ -128,6 +142,9 @@ public void testTranslogLocation() { } context.markOperationAsExecuted(result); context.markAsCompleted(context.getExecutionResult()); + if (nullableIndexRequest != null) { + nullableIndexRequest.decRef(); + } } assertThat(context.getLocationToSync(), equalTo(expectedLocation)); diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessor2Tests.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessor2Tests.java index 30810e85382ca..fc9991bc7f463 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessor2Tests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessor2Tests.java @@ -103,7 +103,12 @@ public void afterBulk(long executionId, BulkRequest request, Exception failure) .setFlushInterval(TimeValue.timeValueMillis(1)) .build(); try { - bulkProcessor.add(new IndexRequest()); + IndexRequest indexRequest = new IndexRequest(); + try { + bulkProcessor.add(indexRequest); + } finally { + indexRequest.decRef(); + } assertTrue(countDownLatch.await(5, TimeUnit.MINUTES)); assertThat(bulkProcessor.getTotalBytesInFlight(), equalTo(0L)); } finally { @@ -180,8 +185,12 @@ public void testConcurrentExecutions() throws Exception { ); try { IndexRequest indexRequest = new IndexRequest(); - for (final AtomicInteger i = new AtomicInteger(0); i.getAndIncrement() < maxDocuments;) { - bulkProcessor.add(indexRequest); + try { + for (final AtomicInteger i = new AtomicInteger(0); i.getAndIncrement() < maxDocuments;) { + bulkProcessor.add(indexRequest); + } + } finally { + indexRequest.decRef(); } assertBusy(() -> { @@ -289,6 +298,7 @@ public void testConcurrentExecutionsWithFlush() throws Exception { IndexRequest indexRequest = new IndexRequest(); List> futures = new ArrayList<>(); CountDownLatch startGate = new CountDownLatch(1 + concurrentClients); + CountDownLatch allIndexRequestsAdded = new CountDownLatch(maxDocuments); for (final AtomicInteger i = new AtomicInteger(0); i.getAndIncrement() < maxDocuments;) { futures.add(executorService.submit(() -> { try { @@ -297,6 +307,7 @@ public void testConcurrentExecutionsWithFlush() throws Exception { startGate.await(); // alternate between ways to add to the bulk processor bulkProcessor.add(indexRequest); + allIndexRequestsAdded.countDown(); } catch (Exception e) { throw ExceptionsHelper.convertToRuntime(e); } @@ -304,6 +315,8 @@ public void testConcurrentExecutionsWithFlush() throws Exception { } startGate.countDown(); startGate.await(); + allIndexRequestsAdded.await(); + indexRequest.decRef(); for (Future f : futures) { try { @@ -399,6 +412,7 @@ public void testRejections() throws Exception { break; } } + indexRequest.decRef(); assertThat(rejectedRequests, equalTo(true)); } finally { bulkProcessor.awaitClose(1, TimeUnit.SECONDS); @@ -448,7 +462,12 @@ public void afterBulk(long executionId, BulkRequest request, Exception failure) .build(); try { for (int i = 0; i < numberOfRequests; i++) { - bulkProcessor.add(new IndexRequest().source(Collections.singletonMap("this_is_a_key" + i, "this_is_a_value" + i))); + IndexRequest indexRequest = new IndexRequest().source(Collections.singletonMap("this_is_a_key" + i, "this_is_a_value" + i)); + try { + bulkProcessor.add(indexRequest); + } finally { + indexRequest.decRef(); + } } } catch (EsRejectedExecutionException e) { // We are throwing more data at the processor than the max bytes in flight setting can handle @@ -507,19 +526,20 @@ public void afterBulk(long executionId, BulkRequest request, Exception failure) final AtomicBoolean haveSeenRejections = new AtomicBoolean(false); try { for (int i = 0; i < numberOfRequests; i++) { - bulkProcessor.addWithBackpressure( - new IndexRequest().source(Collections.singletonMap("this_is_a_key" + i, "this_is_a_value" + i)), - abort::get - ); + IndexRequest indexRequest = new IndexRequest().source(Collections.singletonMap("this_is_a_key" + i, "this_is_a_value" + i)); + bulkProcessor.addWithBackpressure(indexRequest, abort::get); + indexRequest.decRef(); } assertTrue(countDownLatch.await(5, TimeUnit.MINUTES)); assertThat(bulkProcessor.getTotalBytesInFlight(), equalTo(0L)); abort.set(true); try { - bulkProcessor.addWithBackpressure( - new IndexRequest().source(Collections.singletonMap("this_is_a_key", "this_is_a_value")), - abort::get - ); + IndexRequest indexRequest = new IndexRequest().source(Collections.singletonMap("this_is_a_key", "this_is_a_value")); + try { + bulkProcessor.addWithBackpressure(indexRequest, abort::get); + } finally { + indexRequest.decRef(); + } } catch (EsRejectedExecutionException e) { haveSeenRejections.set(true); } diff --git a/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/DownsampleActionSingleNodeTests.java b/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/DownsampleActionSingleNodeTests.java index 107140163060d..4370e8dcc7064 100644 --- a/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/DownsampleActionSingleNodeTests.java +++ b/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/DownsampleActionSingleNodeTests.java @@ -1014,10 +1014,12 @@ private void bulkIndex(SourceSupplier sourceSupplier) throws IOException { } private void bulkIndex(final String indexName, final SourceSupplier sourceSupplier, int docCount) throws IOException { + List indexRequests = new ArrayList<>(); try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk()) { bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); for (int i = 0; i < docCount; i++) { IndexRequest indexRequest = new IndexRequest(indexName).opType(DocWriteRequest.OpType.CREATE); + indexRequests.add(indexRequest); XContentBuilder source = sourceSupplier.get(); indexRequest.source(source); bulkRequestBuilder.add(indexRequest); @@ -1040,6 +1042,9 @@ private void bulkIndex(final String indexName, final SourceSupplier sourceSuppli logger.info("Indexed [{}] documents. Dropped [{}] duplicates.", docsIndexed, duplicates); assertHitCount(client().prepareSearch(indexName).setSize(0), docsIndexed); } + for (IndexRequest indexRequest : indexRequests) { + indexRequest.decRef(); + } } private void prepareSourceIndex(final String sourceIndex, boolean blockWrite) { diff --git a/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/DownsampleDataStreamTests.java b/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/DownsampleDataStreamTests.java index 5892d35669e8f..e9e10ac220bc9 100644 --- a/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/DownsampleDataStreamTests.java +++ b/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/DownsampleDataStreamTests.java @@ -53,6 +53,7 @@ import java.io.IOException; import java.time.Instant; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.List; @@ -216,25 +217,26 @@ private void putComposableIndexTemplate(final String id, final List patt } private void indexDocs(final String dataStream, int numDocs, long startTime) { + List indexRequests = new ArrayList<>(); try (BulkRequest bulkRequest = new BulkRequest()) { for (int i = 0; i < numDocs; i++) { final String timestamp = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(startTime + i); - bulkRequest.add( - new IndexRequest(dataStream).opType(DocWriteRequest.OpType.CREATE) - .source( - String.format( - Locale.ROOT, - "{\"%s\":\"%s\",\"%s\":\"%s\",\"%s\":\"%s\"}", - DEFAULT_TIMESTAMP_FIELD, - timestamp, - "routing_field", - 0, - "counter", - i + 1 - ), - XContentType.JSON - ) - ); + IndexRequest indexRequest = new IndexRequest(dataStream).opType(DocWriteRequest.OpType.CREATE) + .source( + String.format( + Locale.ROOT, + "{\"%s\":\"%s\",\"%s\":\"%s\",\"%s\":\"%s\"}", + DEFAULT_TIMESTAMP_FIELD, + timestamp, + "routing_field", + 0, + "counter", + i + 1 + ), + XContentType.JSON + ); + indexRequests.add(indexRequest); + bulkRequest.add(indexRequest); } final BulkResponse bulkResponse = client().bulk(bulkRequest).actionGet(); final BulkItemResponse[] items = bulkResponse.getItems(); @@ -243,5 +245,8 @@ private void indexDocs(final String dataStream, int numDocs, long startTime) { final RefreshResponse refreshResponse = indicesAdmin().refresh(new RefreshRequest(dataStream)).actionGet(); assertThat(refreshResponse.getStatus().getStatus(), equalTo(RestStatus.OK.getStatus())); } + for (IndexRequest indexRequest : indexRequests) { + indexRequest.decRef(); + } } } diff --git a/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJobIndexServiceTests.java b/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJobIndexServiceTests.java index 85d8826b98683..f559017db188d 100644 --- a/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJobIndexServiceTests.java +++ b/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJobIndexServiceTests.java @@ -77,14 +77,17 @@ public void setup() throws Exception { } private void createConnector(Connector connector) throws IOException, InterruptedException, ExecutionException, TimeoutException { - final IndexRequest indexRequest = new IndexRequest(ConnectorIndexService.CONNECTOR_INDEX_NAME).opType(DocWriteRequest.OpType.INDEX) - .id(connector.getConnectorId()) - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) - .source(connector.toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS)); - ActionFuture index = client().index(indexRequest); - - // wait 10 seconds for connector creation - index.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + try ( + IndexRequest indexRequest = new IndexRequest(ConnectorIndexService.CONNECTOR_INDEX_NAME).opType(DocWriteRequest.OpType.INDEX) + .id(connector.getConnectorId()) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .source(connector.toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS)) + ) { + ActionFuture index = client().index(indexRequest); + + // wait 10 seconds for connector creation + index.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + } } public void testCreateConnectorSyncJob() throws Exception { diff --git a/x-pack/plugin/mapper-unsigned-long/src/test/java/org/elasticsearch/xpack/unsignedlong/UnsignedLongTests.java b/x-pack/plugin/mapper-unsigned-long/src/test/java/org/elasticsearch/xpack/unsignedlong/UnsignedLongTests.java index 6fc609d122bc2..78cd15754e880 100644 --- a/x-pack/plugin/mapper-unsigned-long/src/test/java/org/elasticsearch/xpack/unsignedlong/UnsignedLongTests.java +++ b/x-pack/plugin/mapper-unsigned-long/src/test/java/org/elasticsearch/xpack/unsignedlong/UnsignedLongTests.java @@ -84,14 +84,19 @@ public void setupSuiteScopeCluster() throws Exception { indexRandom(true, builders); prepareCreate("idx2").setMapping("ul_field", "type=long").setSettings(settings).get(); + List indexRequests = new ArrayList<>(); try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk()) { bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); for (int i = 0; i < 4; i++) { IndexRequest indexRequest = new IndexRequest("idx2").source("ul_field", values[i]); + indexRequests.add(indexRequest); bulkRequestBuilder.add(indexRequest); } bulkRequestBuilder.get(); } + for (IndexRequest indexRequest : indexRequests) { + indexRequest.decRef(); + } ensureSearchable(); } diff --git a/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/persistence/TransformConfigManagerTests.java b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/persistence/TransformConfigManagerTests.java index d6141acfd5726..2e2febe867cc1 100644 --- a/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/persistence/TransformConfigManagerTests.java +++ b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/persistence/TransformConfigManagerTests.java @@ -342,10 +342,13 @@ public void testExpandIds() throws Exception { try (XContentBuilder builder = XContentFactory.jsonBuilder()) { XContentBuilder source = transformConfig.toXContent(builder, new ToXContent.MapParams(TO_XCONTENT_PARAMS)); - IndexRequest request = new IndexRequest(oldIndex).source(source) - .id(docId) - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - client().index(request).actionGet(); + try ( + IndexRequest request = new IndexRequest(oldIndex).source(source) + .id(docId) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + ) { + client().index(request).actionGet(); + } } // check that transformConfig2 gets returned, not the one from the old index or both @@ -407,10 +410,13 @@ public void testGetAllTransformIdsAndGetAllOutdatedTransformIds() throws Excepti try (XContentBuilder builder = XContentFactory.jsonBuilder()) { XContentBuilder source = transformConfig.toXContent(builder, new ToXContent.MapParams(TO_XCONTENT_PARAMS)); - IndexRequest request = new IndexRequest(oldIndex).source(source) - .id(docId) - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - client().index(request).actionGet(); + try ( + IndexRequest request = new IndexRequest(oldIndex).source(source) + .id(docId) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + ) { + client().index(request).actionGet(); + } } assertAsync(listener -> transformConfigManager.getAllTransformIds(null, listener), transformIds, null, null); @@ -428,10 +434,13 @@ public void testGetAllTransformIdsAndGetAllOutdatedTransformIds() throws Excepti try (XContentBuilder builder = XContentFactory.jsonBuilder()) { XContentBuilder source = transformConfig.toXContent(builder, new ToXContent.MapParams(TO_XCONTENT_PARAMS)); - IndexRequest request = new IndexRequest(oldIndex).source(source) - .id(docId) - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - client().index(request).actionGet(); + try ( + IndexRequest request = new IndexRequest(oldIndex).source(source) + .id(docId) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + ) { + client().index(request).actionGet(); + } } // add a new checkpoint doc for the old transform to check id expansion ignores other documents, see gh#80073 @@ -532,10 +541,13 @@ public void testDeleteOldTransformConfigurations() throws Exception { try (XContentBuilder builder = XContentFactory.jsonBuilder()) { XContentBuilder source = transformConfig.toXContent(builder, new ToXContent.MapParams(TO_XCONTENT_PARAMS)); - IndexRequest request = new IndexRequest(oldIndex).source(source) - .id(docId) - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - client().index(request).actionGet(); + try ( + IndexRequest request = new IndexRequest(oldIndex).source(source) + .id(docId) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + ) { + client().index(request).actionGet(); + } } assertAsync(listener -> transformConfigManager.putTransformConfiguration(transformConfig, listener), true, null, null); @@ -565,8 +577,9 @@ public void testDeleteOldTransformStoredDocuments() throws Exception { try (XContentBuilder builder = XContentFactory.jsonBuilder()) { XContentBuilder source = transformStoredDoc.toXContent(builder, new ToXContent.MapParams(TO_XCONTENT_PARAMS)); - IndexRequest request = new IndexRequest(oldIndex).source(source).id(docId); - client().index(request).actionGet(); + try (IndexRequest request = new IndexRequest(oldIndex).source(source).id(docId)) { + client().index(request).actionGet(); + } } // Put when referencing the old index should create the doc in the new index, even if we have seqNo|primaryTerm info @@ -702,10 +715,13 @@ public void testDeleteOldIndices() throws Exception { try (XContentBuilder builder = XContentFactory.jsonBuilder()) { XContentBuilder source = transformConfigOld.toXContent(builder, new ToXContent.MapParams(TO_XCONTENT_PARAMS)); - IndexRequest request = new IndexRequest(oldIndex).source(source) - .id(docId) - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - client().index(request).actionGet(); + try ( + IndexRequest request = new IndexRequest(oldIndex).source(source) + .id(docId) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + ) { + client().index(request).actionGet(); + } } // create config in new index