diff --git a/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java b/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java index dab46aed5b4bc..df6131d0f3779 100644 --- a/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java +++ b/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java @@ -19,6 +19,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.RefCounted; import org.elasticsearch.index.Index; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.shard.ShardId; @@ -34,7 +35,7 @@ * Generic interface to group ActionRequest, which perform writes to a single document * Action requests implementing this can be part of {@link org.elasticsearch.action.bulk.BulkRequest} */ -public interface DocWriteRequest extends IndicesRequest, Accountable { +public interface DocWriteRequest extends IndicesRequest, Accountable, RefCounted { // Flag set for disallowing index auto creation for an individual write request. String REQUIRE_ALIAS = "require_alias"; diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java index 425461d1f4ba1..58a71070816ed 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java @@ -16,12 +16,13 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.Releasable; import org.elasticsearch.index.shard.ShardId; import java.io.IOException; import java.util.Objects; -public class BulkItemRequest implements Writeable, Accountable { +public class BulkItemRequest implements Writeable, Accountable, Releasable { private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(BulkItemRequest.class); @@ -111,4 +112,9 @@ public void writeThin(StreamOutput out) throws IOException { public long ramBytesUsed() { return SHALLOW_SIZE + request.ramBytesUsed(); } + + @Override + public void close() { + request.decRef(); + } } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java index bd929b9a2204e..73ffaa8b47849 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java @@ -18,6 +18,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.core.Releasable; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.transport.RawIndexingDataTransportRequest; @@ -27,11 +28,13 @@ public final class BulkShardRequest extends ReplicatedWriteRequest implements Accountable, - RawIndexingDataTransportRequest { + RawIndexingDataTransportRequest, + Releasable { private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(BulkShardRequest.class); private final BulkItemRequest[] items; + private boolean closed = false; public BulkShardRequest(StreamInput in) throws IOException { super(in); @@ -154,4 +157,14 @@ public long ramBytesUsed() { } return sum; } + + @Override + public void close() { + if (closed == false) { + for (BulkItemRequest item : items) { + item.close(); + } + closed = true; + } + } } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 9266ee3ee0b68..9acdb80750b7c 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -618,22 +618,25 @@ private static Engine.Result performOpOnReplica( switch (docWriteRequest.opType()) { case CREATE, INDEX -> { final IndexRequest indexRequest = (IndexRequest) docWriteRequest; - final SourceToParse sourceToParse = new SourceToParse( - indexRequest.id(), - indexRequest.source(), - indexRequest.getContentType(), - indexRequest.routing(), - Map.of(), - false - ); - result = replica.applyIndexOperationOnReplica( - primaryResponse.getSeqNo(), - primaryResponse.getPrimaryTerm(), - primaryResponse.getVersion(), - indexRequest.getAutoGeneratedTimestamp(), - indexRequest.isRetry(), - sourceToParse - ); + try ( + SourceToParse sourceToParse = new SourceToParse( + indexRequest.id(), + indexRequest.source(), + indexRequest.getContentType(), + indexRequest.routing(), + Map.of(), + false + ) + ) { + result = replica.applyIndexOperationOnReplica( + primaryResponse.getSeqNo(), + primaryResponse.getPrimaryTerm(), + primaryResponse.getVersion(), + indexRequest.getAutoGeneratedTimestamp(), + indexRequest.isRetry(), + sourceToParse + ); + } } case DELETE -> { DeleteRequest deleteRequest = (DeleteRequest) docWriteRequest; diff --git a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java index 8b5e077fd85b8..e3c685cd401c5 100644 --- a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -25,6 +25,7 @@ import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.bytes.ReleasableBytesReference; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.lucene.uid.Versions; @@ -91,7 +92,7 @@ public class IndexRequest extends ReplicatedWriteRequest implement @Nullable private String routing; - private BytesReference source; + private ReleasableBytesReference source; private OpType opType = OpType.INDEX; @@ -149,7 +150,7 @@ public IndexRequest(@Nullable ShardId shardId, StreamInput in) throws IOExceptio } id = in.readOptionalString(); routing = in.readOptionalString(); - source = in.readBytesReference(); + source = in.readReleasableBytesReference(); opType = OpType.fromId(in.readByte()); version = in.readLong(); versionType = VersionType.fromValue(in.readByte()); @@ -378,7 +379,7 @@ public boolean isPipelineResolved() { /** * The source of the document to index, recopied to a new array if it is unsafe. */ - public BytesReference source() { + public ReleasableBytesReference source() { return source; } @@ -489,7 +490,7 @@ public IndexRequest source(XContentType xContentType, Object... source) { * Sets the document to index in bytes form. */ public IndexRequest source(BytesReference source, XContentType xContentType) { - this.source = Objects.requireNonNull(source); + this.source = ReleasableBytesReference.wrap(Objects.requireNonNull(source)); this.contentType = Objects.requireNonNull(xContentType); return this; } @@ -875,4 +876,25 @@ public List getExecutedPipelines() { return Collections.unmodifiableList(executedPipelines); } } + + @Override + public void incRef() { + source.tryIncRef(); + } + + @Override + public boolean tryIncRef() { + return source.tryIncRef(); + } + + @Override + public boolean decRef() { + return source.decRef(); + } + + @Override + public boolean hasReferences() { + return source.hasReferences(); + } + } diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 0abe7ad678dc5..67d70cf4dea59 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -592,8 +592,10 @@ protected void handleReplicaRequest( final Task task ) { Releasable releasable = checkReplicaLimits(replicaRequest.getRequest()); - ActionListener listener = ActionListener.runBefore(new ChannelActionListener<>(channel), releasable::close); - + ActionListener listener = ActionListener.runBefore(new ChannelActionListener<>(channel), () -> { + releasable.close(); + replicaRequest.close(); + }); try { new AsyncReplicaAction(replicaRequest, listener, (ReplicationTask) task).run(); } catch (RuntimeException e) { @@ -1299,7 +1301,8 @@ public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, l /** a wrapper class to encapsulate a request when being sent to a specific allocation id **/ public static class ConcreteShardRequest extends TransportRequest implements - RawIndexingDataTransportRequest { + RawIndexingDataTransportRequest, + Releasable { /** {@link AllocationId#getId()} of the shard this request is sent to **/ private final String targetAllocationID; @@ -1418,9 +1421,16 @@ public boolean isRawIndexingData() { public String toString() { return "request: " + request + ", target allocation id: " + targetAllocationID + ", primary term: " + primaryTerm; } + + @Override + public void close() { + if (request instanceof Releasable releasable) { // TODO + releasable.close(); + } + } } - protected static final class ConcreteReplicaRequest extends ConcreteShardRequest { + protected static final class ConcreteReplicaRequest extends ConcreteShardRequest implements Releasable { private final long globalCheckpoint; private final long maxSeqNoOfUpdatesOrDeletes; diff --git a/server/src/main/java/org/elasticsearch/common/xcontent/XContentHelper.java b/server/src/main/java/org/elasticsearch/common/xcontent/XContentHelper.java index 3bfe5078a3487..630b4ca6b2a7a 100644 --- a/server/src/main/java/org/elasticsearch/common/xcontent/XContentHelper.java +++ b/server/src/main/java/org/elasticsearch/common/xcontent/XContentHelper.java @@ -112,7 +112,8 @@ public static XContentParser createParser(XContentParserConfiguration config, By } else { // TODO now that we have config we make a method on bytes to do this building wihout needing this check everywhere if (bytes.hasArray()) { - return xContentType.xContent().createParser(config, bytes.array(), bytes.arrayOffset(), bytes.length()); + return xContentType.xContent().createParser(config, bytes.array(), bytes.arrayOffset(), bytes.length()); // TODO + // incrementRef } return xContentType.xContent().createParser(config, bytes.streamInput()); } diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index ed7fab325408e..241fe2d1b600e 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -1497,7 +1497,7 @@ public long startTime() { public abstract TYPE operationType(); } - public static class Index extends Operation { + public static class Index extends Operation implements Releasable { private final ParsedDocument doc; private final long autoGeneratedIdTimestamp; @@ -1608,6 +1608,11 @@ public long getIfSeqNo() { public long getIfPrimaryTerm() { return ifPrimaryTerm; } + + @Override + public void close() { + doc.close(); + } } public static class Delete extends Operation { diff --git a/server/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java b/server/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java index 17af6259ca27c..c972c17f78a54 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java @@ -87,7 +87,7 @@ public ParsedDocument parseDocument(SourceToParse source, MappingLookup mappingL : DocumentParsingObserver.EMPTY_INSTANCE; try ( XContentParser parser = documentParsingObserver.wrapParser( - XContentHelper.createParser(parserConfiguration, source.source(), xContentType) + XContentHelper.createParser(parserConfiguration, source.source(), xContentType) // here we grab it ) ) { context = new RootDocumentParserContext(mappingLookup, mappingParserContext, source, parser); @@ -116,7 +116,7 @@ public ParsedDocument parseDocument(SourceToParse source, MappingLookup mappingL context.id(), source.routing(), context.reorderParentAndGetDocs(), - context.sourceToParse().source(), + context.sourceToParse().source(), // pass it along context.sourceToParse().getXContentType(), dynamicUpdate ) { diff --git a/server/src/main/java/org/elasticsearch/index/mapper/ParsedDocument.java b/server/src/main/java/org/elasticsearch/index/mapper/ParsedDocument.java index 05f05dd5be941..39d696baf3bd9 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/ParsedDocument.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/ParsedDocument.java @@ -13,6 +13,8 @@ import org.apache.lucene.util.BytesRef; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.bytes.ReleasableBytesReference; +import org.elasticsearch.core.Releasable; import org.elasticsearch.index.mapper.MapperService.MergeReason; import org.elasticsearch.xcontent.XContentType; @@ -22,7 +24,7 @@ /** * The result of parsing a document. */ -public class ParsedDocument { +public class ParsedDocument implements Releasable { private final Field version; @@ -33,7 +35,7 @@ public class ParsedDocument { private final List documents; - private BytesReference source; + private ReleasableBytesReference source; private XContentType xContentType; private Mapping dynamicMappingsUpdate; @@ -57,7 +59,7 @@ public static ParsedDocument noopTombstone(String reason) { "", null, Collections.singletonList(document), - new BytesArray("{}"), + ReleasableBytesReference.wrap(new BytesArray("{}")), XContentType.JSON, null ); @@ -81,7 +83,7 @@ public static ParsedDocument deleteTombstone(String id) { id, null, Collections.singletonList(document), - new BytesArray("{}"), + ReleasableBytesReference.wrap(new BytesArray("{}")), XContentType.JSON, null ); @@ -97,6 +99,20 @@ public ParsedDocument( XContentType xContentType, Mapping dynamicMappingsUpdate ) { + this(version, seqID, id, routing, documents, ReleasableBytesReference.wrap(source), xContentType, dynamicMappingsUpdate); + } + + public ParsedDocument( + Field version, + SeqNoFieldMapper.SequenceIDFields seqID, + String id, + String routing, + List documents, + ReleasableBytesReference source, + XContentType xContentType, + Mapping dynamicMappingsUpdate + ) { + source.incRef(); this.version = version; this.seqID = seqID; this.id = id; @@ -143,7 +159,7 @@ public XContentType getXContentType() { return this.xContentType; } - public void setSource(BytesReference source, XContentType xContentType) { + public void setSource(ReleasableBytesReference source, XContentType xContentType) { this.source = source; this.xContentType = xContentType; } @@ -172,4 +188,9 @@ public String toString() { public String documentDescription() { return "id"; } + + @Override + public void close() { + source.decRef(); + } } diff --git a/server/src/main/java/org/elasticsearch/index/mapper/SourceFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/SourceFieldMapper.java index 42121147d7f09..2fde4a8ddd68d 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/SourceFieldMapper.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/SourceFieldMapper.java @@ -276,7 +276,7 @@ public boolean isComplete() { @Override public void preParse(DocumentParserContext context) throws IOException { - BytesReference originalSource = context.sourceToParse().source(); + BytesReference originalSource = context.sourceToParse().source(); // TODO ?? XContentType contentType = context.sourceToParse().getXContentType(); final BytesReference adaptedSource = applyFilters(originalSource, contentType); diff --git a/server/src/main/java/org/elasticsearch/index/mapper/SourceToParse.java b/server/src/main/java/org/elasticsearch/index/mapper/SourceToParse.java index d3d04b63b0a04..ffdadb7315513 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/SourceToParse.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/SourceToParse.java @@ -8,17 +8,18 @@ package org.elasticsearch.index.mapper; -import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.bytes.ReleasableBytesReference; import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.Releasable; import org.elasticsearch.xcontent.XContentType; import java.util.Map; import java.util.Objects; -public class SourceToParse { +public class SourceToParse implements Releasable { - private final BytesReference source; + private final ReleasableBytesReference source; private final String id; @@ -40,7 +41,12 @@ public SourceToParse( this.id = id; // we always convert back to byte array, since we store it and Field only supports bytes.. // so, we might as well do it here, and improve the performance of working with direct byte arrays - this.source = new BytesArray(Objects.requireNonNull(source).toBytesRef()); + Objects.requireNonNull(source); + if (source instanceof ReleasableBytesReference releasableSource) { + this.source = releasableSource.retain(); + } else { + this.source = ReleasableBytesReference.wrap(source); + } this.xContentType = Objects.requireNonNull(xContentType); this.routing = routing; this.dynamicTemplates = Objects.requireNonNull(dynamicTemplates); @@ -48,6 +54,10 @@ public SourceToParse( } public SourceToParse(String id, BytesReference source, XContentType xContentType) { + this(id, ReleasableBytesReference.wrap(source), xContentType); + } + + public SourceToParse(String id, ReleasableBytesReference source, XContentType xContentType) { this(id, source, xContentType, null, Map.of(), false); } @@ -55,7 +65,7 @@ public boolean toBeReported() { return toBeReported; } - public BytesReference source() { + public ReleasableBytesReference source() { return this.source; } @@ -88,4 +98,9 @@ public Map dynamicTemplates() { public XContentType getXContentType() { return this.xContentType; } + + @Override + public void close() { + this.source.decRef(); + } } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 8b6f6afb72042..0b9a5f831975a 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -973,7 +973,9 @@ private Engine.IndexResult applyIndexOperation( ); Mapping update = operation.parsedDoc().dynamicMappingsUpdate(); if (update != null) { - return new Engine.IndexResult(update, operation.parsedDoc().id()); + String id = operation.parsedDoc().id(); + operation.close(); + return new Engine.IndexResult(update, id); // TODO come back to this } } catch (Exception e) { // We treat any exception during parsing and or mapping update as a document level failure @@ -983,7 +985,7 @@ private Engine.IndexResult applyIndexOperation( verifyNotClosed(e); return new Engine.IndexResult(e, version, opPrimaryTerm, seqNo, sourceToParse.id()); } - + operation.close(); return index(engine, operation); } @@ -1086,6 +1088,7 @@ private Engine.IndexResult index(Engine engine, Engine.Index index) throws IOExc throw e; } indexingOperationListeners.postIndex(shardId, preIndex, result); + // preIndex.close(); return result; } finally { active.set(true); diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexingOperationListener.java b/server/src/main/java/org/elasticsearch/index/shard/IndexingOperationListener.java index 64c1201df60b5..5da7b0c071c32 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexingOperationListener.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexingOperationListener.java @@ -83,6 +83,7 @@ public Engine.Index preIndex(ShardId shardId, Engine.Index operation) { logger.warn(() -> "preIndex listener [" + listener + "] failed", e); } } + // operation.incRef return operation; } diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 21001f0ac2fac..25b5aadefa6c2 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -59,6 +59,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.bytes.ReleasableBytesReference; import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.uid.Versions; @@ -417,7 +418,31 @@ protected static ParsedDocument testParsedDocument( } else { document.add(new StoredField(SourceFieldMapper.NAME, ref.bytes, ref.offset, ref.length)); } - return new ParsedDocument(versionField, seqID, id, routing, Arrays.asList(document), source, XContentType.JSON, mappingUpdate); + final ParsedDocument parsedDocument; + if (source instanceof ReleasableBytesReference releasableSource) { + parsedDocument = new ParsedDocument( + versionField, + seqID, + id, + routing, + Arrays.asList(document), + releasableSource, + XContentType.JSON, + mappingUpdate + ); + } else { + parsedDocument = new ParsedDocument( + versionField, + seqID, + id, + routing, + Arrays.asList(document), + source, + XContentType.JSON, + mappingUpdate + ); + } + return parsedDocument; } public static CheckedBiFunction nestedParsedDocFactory() throws Exception { @@ -449,7 +474,9 @@ public static CheckedBiFunction ne source.endObject(); } source.endObject(); - return nestedMapper.parse(new SourceToParse(docId, BytesReference.bytes(source), XContentType.JSON)); + return nestedMapper.parse( + new SourceToParse(docId, ReleasableBytesReference.wrap(BytesReference.bytes(source)), XContentType.JSON) + ); }; } diff --git a/test/framework/src/main/java/org/elasticsearch/index/mapper/MapperServiceTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/mapper/MapperServiceTestCase.java index aecd81882c108..76d1be3a88f04 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/mapper/MapperServiceTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/mapper/MapperServiceTestCase.java @@ -27,6 +27,7 @@ import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.bytes.ReleasableBytesReference; import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; @@ -292,7 +293,7 @@ protected static SourceToParse source( * Build a {@link SourceToParse} with an id of {@code "1"}. */ protected static SourceToParse source(String source) { - return new SourceToParse("1", new BytesArray(source), XContentType.JSON); + return new SourceToParse("1", ReleasableBytesReference.wrap(new BytesArray(source)), XContentType.JSON); } /**