Avoiding allocating duplicate byte arrays in bulk request
masseyke committed Nov 21, 2023
1 parent 94e510e commit e49fca0
Showing 16 changed files with 176 additions and 47 deletions.
DocWriteRequest.java
@@ -19,6 +19,7 @@
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.lucene.uid.Versions;
 import org.elasticsearch.core.Nullable;
+import org.elasticsearch.core.RefCounted;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.VersionType;
 import org.elasticsearch.index.shard.ShardId;
@@ -34,7 +35,7 @@
  * Generic interface to group ActionRequest, which perform writes to a single document
  * Action requests implementing this can be part of {@link org.elasticsearch.action.bulk.BulkRequest}
  */
-public interface DocWriteRequest<T> extends IndicesRequest, Accountable {
+public interface DocWriteRequest<T> extends IndicesRequest, Accountable, RefCounted {

     // Flag set for disallowing index auto creation for an individual write request.
     String REQUIRE_ALIAS = "require_alias";
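This hunk is the heart of the change: every DocWriteRequest is now ref-counted, so the bulk path can share a single source buffer across parsing, replication, and translog writes instead of copying it per consumer. As a rough illustration of the contract, here is a simplified stand-in, not the org.elasticsearch.core.RefCounted source; only the four method names are taken from this diff, everything else is hypothetical:

    // Simplified stand-in for the ref-counting contract; illustrative only.
    interface RefCounted {
        void incRef();           // take a reference; the caller must later call decRef()
        boolean tryIncRef();     // take a reference only if the object has not been released
        boolean decRef();        // drop a reference; true means this call released the object
        boolean hasReferences(); // true while at least one reference is still outstanding
    }

    // A hypothetical owner of shared bytes that frees them exactly once, on the last decRef().
    final class SharedBuffer implements RefCounted {
        private final java.util.concurrent.atomic.AtomicInteger refs = new java.util.concurrent.atomic.AtomicInteger(1);

        @Override
        public void incRef() {
            if (tryIncRef() == false) {
                throw new IllegalStateException("buffer already released");
            }
        }

        @Override
        public boolean tryIncRef() {
            int current = refs.get();
            while (current > 0) {
                if (refs.compareAndSet(current, current + 1)) {
                    return true;
                }
                current = refs.get();
            }
            return false; // already released
        }

        @Override
        public boolean decRef() {
            if (refs.decrementAndGet() == 0) {
                // last reference gone: this is where the byte array would be freed
                return true;
            }
            return false;
        }

        @Override
        public boolean hasReferences() {
            return refs.get() > 0;
        }
    }

In the rest of the diff, ReleasableBytesReference plays the SharedBuffer role for the real request source.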
BulkItemRequest.java
@@ -16,12 +16,13 @@
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.core.Nullable;
+import org.elasticsearch.core.Releasable;
 import org.elasticsearch.index.shard.ShardId;

 import java.io.IOException;
 import java.util.Objects;

-public class BulkItemRequest implements Writeable, Accountable {
+public class BulkItemRequest implements Writeable, Accountable, Releasable {

     private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(BulkItemRequest.class);

@@ -111,4 +112,9 @@ public void writeThin(StreamOutput out) throws IOException {
     public long ramBytesUsed() {
         return SHALLOW_SIZE + request.ramBytesUsed();
     }
+
+    @Override
+    public void close() {
+        request.decRef();
+    }
 }
BulkShardRequest.java
@@ -18,6 +18,7 @@
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.util.set.Sets;
+import org.elasticsearch.core.Releasable;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.transport.RawIndexingDataTransportRequest;

@@ -27,11 +28,13 @@
 public final class BulkShardRequest extends ReplicatedWriteRequest<BulkShardRequest>
     implements
         Accountable,
-        RawIndexingDataTransportRequest {
+        RawIndexingDataTransportRequest,
+        Releasable {

     private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(BulkShardRequest.class);

     private final BulkItemRequest[] items;
+    private boolean closed = false;

     public BulkShardRequest(StreamInput in) throws IOException {
         super(in);
@@ -154,4 +157,14 @@ public long ramBytesUsed() {
         }
         return sum;
     }
+
+    @Override
+    public void close() {
+        if (closed == false) {
+            for (BulkItemRequest item : items) {
+                item.close();
+            }
+            closed = true;
+        }
+    }
 }
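Here close() fans out to every item, and the closed flag makes a second close a no-op, which matters when the request can be released both on a failure path and in a finally block. A minimal, self-contained sketch of the same pattern (hypothetical types; the commit itself uses a plain boolean, which assumes close is only ever called from one thread, so an AtomicBoolean variant is shown for contrast):

    import java.util.concurrent.atomic.AtomicBoolean;

    // Illustrative idempotent fan-out close: releasing the parent releases each
    // child exactly once, no matter how many times close() is invoked.
    final class FanOutCloser implements AutoCloseable {
        private final AutoCloseable[] items;
        private final AtomicBoolean closed = new AtomicBoolean();

        FanOutCloser(AutoCloseable... items) {
            this.items = items;
        }

        @Override
        public void close() throws Exception {
            if (closed.compareAndSet(false, true)) { // the second call is a no-op
                for (AutoCloseable item : items) {
                    item.close(); // in the commit: BulkItemRequest.close() -> request.decRef()
                }
            }
        }
    }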
TransportShardBulkAction.java
@@ -618,22 +618,25 @@ private static Engine.Result performOpOnReplica(
         switch (docWriteRequest.opType()) {
             case CREATE, INDEX -> {
                 final IndexRequest indexRequest = (IndexRequest) docWriteRequest;
-                final SourceToParse sourceToParse = new SourceToParse(
-                    indexRequest.id(),
-                    indexRequest.source(),
-                    indexRequest.getContentType(),
-                    indexRequest.routing(),
-                    Map.of(),
-                    false
-                );
-                result = replica.applyIndexOperationOnReplica(
-                    primaryResponse.getSeqNo(),
-                    primaryResponse.getPrimaryTerm(),
-                    primaryResponse.getVersion(),
-                    indexRequest.getAutoGeneratedTimestamp(),
-                    indexRequest.isRetry(),
-                    sourceToParse
-                );
+                try (
+                    SourceToParse sourceToParse = new SourceToParse(
+                        indexRequest.id(),
+                        indexRequest.source(),
+                        indexRequest.getContentType(),
+                        indexRequest.routing(),
+                        Map.of(),
+                        false
+                    )
+                ) {
+                    result = replica.applyIndexOperationOnReplica(
+                        primaryResponse.getSeqNo(),
+                        primaryResponse.getPrimaryTerm(),
+                        primaryResponse.getVersion(),
+                        indexRequest.getAutoGeneratedTimestamp(),
+                        indexRequest.isRetry(),
+                        sourceToParse
+                    );
+                }
             }
             case DELETE -> {
                 DeleteRequest deleteRequest = (DeleteRequest) docWriteRequest;
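The replica write path now scopes the lifetime of the source with try-with-resources, so the reference taken for parsing is returned even if applyIndexOperationOnReplica throws. This only compiles if SourceToParse became closeable in this commit; that hunk is not among the files loaded on this page, so treat it as an inference. The shape of the guarantee, as a self-contained sketch with all names hypothetical:

    // Hypothetical scoped holder: valid only inside the try block that owns it.
    final class ScopedSource implements AutoCloseable {
        private boolean released = false;

        byte[] bytes() {
            if (released) {
                throw new IllegalStateException("source already released");
            }
            return new byte[0]; // stand-in for the shared source bytes
        }

        @Override
        public void close() {
            released = true; // in the commit this would be a decRef(), not a hard free
        }
    }

    class ReplicaPathDemo {
        static void apply() {
            try (ScopedSource source = new ScopedSource()) {
                source.bytes(); // parse and index while the reference is held
            } // close() runs here on success and on exception alike
        }
    }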
IndexRequest.java
@@ -25,6 +25,7 @@
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.bytes.ReleasableBytesReference;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.lucene.uid.Versions;
@@ -91,7 +92,7 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
     @Nullable
     private String routing;

-    private BytesReference source;
+    private ReleasableBytesReference source;

     private OpType opType = OpType.INDEX;

@@ -149,7 +150,7 @@ public IndexRequest(@Nullable ShardId shardId, StreamInput in) throws IOExceptio
         }
         id = in.readOptionalString();
         routing = in.readOptionalString();
-        source = in.readBytesReference();
+        source = in.readReleasableBytesReference();
         opType = OpType.fromId(in.readByte());
         version = in.readLong();
         versionType = VersionType.fromValue(in.readByte());
@@ -378,7 +379,7 @@ public boolean isPipelineResolved() {
     /**
      * The source of the document to index, recopied to a new array if it is unsafe.
      */
-    public BytesReference source() {
+    public ReleasableBytesReference source() {
         return source;
     }

@@ -489,7 +490,7 @@ public IndexRequest source(XContentType xContentType, Object... source) {
      * Sets the document to index in bytes form.
      */
     public IndexRequest source(BytesReference source, XContentType xContentType) {
-        this.source = Objects.requireNonNull(source);
+        this.source = ReleasableBytesReference.wrap(Objects.requireNonNull(source));
         this.contentType = Objects.requireNonNull(xContentType);
         return this;
     }
@@ -875,4 +876,25 @@ public List<String> getExecutedPipelines() {
             return Collections.unmodifiableList(executedPipelines);
         }
     }
+
+    @Override
+    public void incRef() {
+        source.tryIncRef();
+    }
+
+    @Override
+    public boolean tryIncRef() {
+        return source.tryIncRef();
+    }
+
+    @Override
+    public boolean decRef() {
+        return source.decRef();
+    }
+
+    @Override
+    public boolean hasReferences() {
+        return source.hasReferences();
+    }
+
 }
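IndexRequest carries no reference count of its own: all four RefCounted methods forward to the ReleasableBytesReference holding the source, so whoever drops the last reference to the request frees the underlying bytes. Note that in the diff above, incRef() forwards to source.tryIncRef() and discards the result; the sketch below uses the stricter incRef() for clarity. Reusing the SharedBuffer stand-in from the first sketch (illustrative, not the Elasticsearch classes):

    // Illustrative delegation of a request's ref-count to the buffer it owns.
    final class SketchIndexRequest implements RefCounted {
        private final SharedBuffer source; // stands in for ReleasableBytesReference

        SketchIndexRequest(SharedBuffer source) {
            this.source = source;
        }

        @Override
        public void incRef() {
            source.incRef();
        }

        @Override
        public boolean tryIncRef() {
            return source.tryIncRef();
        }

        @Override
        public boolean decRef() {
            return source.decRef(); // the last decRef anywhere frees the source bytes
        }

        @Override
        public boolean hasReferences() {
            return source.hasReferences();
        }
    }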
TransportReplicationAction.java
@@ -592,8 +592,10 @@ protected void handleReplicaRequest(
         final Task task
     ) {
         Releasable releasable = checkReplicaLimits(replicaRequest.getRequest());
-        ActionListener<ReplicaResponse> listener = ActionListener.runBefore(new ChannelActionListener<>(channel), releasable::close);
-
+        ActionListener<ReplicaResponse> listener = ActionListener.runBefore(new ChannelActionListener<>(channel), () -> {
+            releasable.close();
+            replicaRequest.close();
+        });
         try {
             new AsyncReplicaAction(replicaRequest, listener, (ReplicationTask) task).run();
         } catch (RuntimeException e) {
@@ -1299,7 +1301,8 @@ public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, l
     /** a wrapper class to encapsulate a request when being sent to a specific allocation id **/
     public static class ConcreteShardRequest<R extends TransportRequest> extends TransportRequest
         implements
-            RawIndexingDataTransportRequest {
+            RawIndexingDataTransportRequest,
+            Releasable {

         /** {@link AllocationId#getId()} of the shard this request is sent to **/
         private final String targetAllocationID;
@@ -1418,9 +1421,16 @@ public boolean isRawIndexingData() {
         public String toString() {
             return "request: " + request + ", target allocation id: " + targetAllocationID + ", primary term: " + primaryTerm;
         }
+
+        @Override
+        public void close() {
+            if (request instanceof Releasable releasable) { // TODO
+                releasable.close();
+            }
+        }
     }

-    protected static final class ConcreteReplicaRequest<R extends TransportRequest> extends ConcreteShardRequest<R> {
+    protected static final class ConcreteReplicaRequest<R extends TransportRequest> extends ConcreteShardRequest<R> implements Releasable {

         private final long globalCheckpoint;
         private final long maxSeqNoOfUpdatesOrDeletes;
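On the transport side, the replica request is released only after the response (or failure) has been handed to the channel: ActionListener.runBefore wraps the channel listener with a cleanup action, and ConcreteShardRequest.close() forwards to the wrapped request when that request is Releasable. A rough model of the runBefore composition (a hypothetical single-callback simplification; the real ActionListener also covers the failure path):

    import java.util.function.Consumer;

    // Hypothetical model of "run this cleanup just before delivering the result".
    final class RunBefore<T> implements Consumer<T> {
        private final Consumer<T> delegate;
        private final Runnable cleanup;

        RunBefore(Consumer<T> delegate, Runnable cleanup) {
            this.delegate = delegate;
            this.cleanup = cleanup;
        }

        @Override
        public void accept(T response) {
            cleanup.run();             // e.g. releasable.close(); replicaRequest.close();
            delegate.accept(response); // then hand the response to the channel
        }
    }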
XContentHelper.java
@@ -112,7 +112,8 @@ public static XContentParser createParser(XContentParserConfiguration config, By
         } else {
             // TODO now that we have config we make a method on bytes to do this building wihout needing this check everywhere
             if (bytes.hasArray()) {
-                return xContentType.xContent().createParser(config, bytes.array(), bytes.arrayOffset(), bytes.length());
+                return xContentType.xContent().createParser(config, bytes.array(), bytes.arrayOffset(), bytes.length()); // TODO
+                // incrementRef
             }
             return xContentType.xContent().createParser(config, bytes.streamInput());
         }
Engine.java
@@ -1497,7 +1497,7 @@ public long startTime() {
         public abstract TYPE operationType();
     }

-    public static class Index extends Operation {
+    public static class Index extends Operation implements Releasable {

         private final ParsedDocument doc;
         private final long autoGeneratedIdTimestamp;
@@ -1608,6 +1608,11 @@ public long getIfSeqNo() {
         public long getIfPrimaryTerm() {
             return ifPrimaryTerm;
         }
+
+        @Override
+        public void close() {
+            doc.close();
+        }
     }

     public static class Delete extends Operation {
DocumentParser.java
@@ -87,7 +87,7 @@ public ParsedDocument parseDocument(SourceToParse source, MappingLookup mappingL
             : DocumentParsingObserver.EMPTY_INSTANCE;
         try (
             XContentParser parser = documentParsingObserver.wrapParser(
-                XContentHelper.createParser(parserConfiguration, source.source(), xContentType)
+                XContentHelper.createParser(parserConfiguration, source.source(), xContentType) // here we grab it
             )
         ) {
             context = new RootDocumentParserContext(mappingLookup, mappingParserContext, source, parser);
@@ -116,7 +116,7 @@ public ParsedDocument parseDocument(SourceToParse source, MappingLookup mappingL
             context.id(),
             source.routing(),
             context.reorderParentAndGetDocs(),
-            context.sourceToParse().source(),
+            context.sourceToParse().source(), // pass it along
             context.sourceToParse().getXContentType(),
             dynamicUpdate
         ) {
ParsedDocument.java
@@ -13,6 +13,8 @@
 import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.bytes.ReleasableBytesReference;
+import org.elasticsearch.core.Releasable;
 import org.elasticsearch.index.mapper.MapperService.MergeReason;
 import org.elasticsearch.xcontent.XContentType;
@@ -22,7 +24,7 @@
 /**
  * The result of parsing a document.
  */
-public class ParsedDocument {
+public class ParsedDocument implements Releasable {

     private final Field version;

@@ -33,7 +35,7 @@ public class ParsedDocument {

     private final List<LuceneDocument> documents;

-    private BytesReference source;
+    private ReleasableBytesReference source;
     private XContentType xContentType;

     private Mapping dynamicMappingsUpdate;
@@ -57,7 +59,7 @@ public static ParsedDocument noopTombstone(String reason) {
            "",
            null,
            Collections.singletonList(document),
-           new BytesArray("{}"),
+           ReleasableBytesReference.wrap(new BytesArray("{}")),
            XContentType.JSON,
            null
        );
@@ -81,7 +83,7 @@ public static ParsedDocument deleteTombstone(String id) {
            id,
            null,
            Collections.singletonList(document),
-           new BytesArray("{}"),
+           ReleasableBytesReference.wrap(new BytesArray("{}")),
            XContentType.JSON,
            null
        );
@@ -97,6 +99,20 @@ public ParsedDocument(
         XContentType xContentType,
         Mapping dynamicMappingsUpdate
     ) {
+        this(version, seqID, id, routing, documents, ReleasableBytesReference.wrap(source), xContentType, dynamicMappingsUpdate);
+    }
+
+    public ParsedDocument(
+        Field version,
+        SeqNoFieldMapper.SequenceIDFields seqID,
+        String id,
+        String routing,
+        List<LuceneDocument> documents,
+        ReleasableBytesReference source,
+        XContentType xContentType,
+        Mapping dynamicMappingsUpdate
+    ) {
+        source.incRef();
         this.version = version;
         this.seqID = seqID;
         this.id = id;
@@ -143,7 +159,7 @@ public XContentType getXContentType() {
         return this.xContentType;
     }

-    public void setSource(BytesReference source, XContentType xContentType) {
+    public void setSource(ReleasableBytesReference source, XContentType xContentType) {
         this.source = source;
         this.xContentType = xContentType;
     }
@@ -172,4 +188,9 @@ public String toString() {
     public String documentDescription() {
         return "id";
     }
+
+    @Override
+    public void close() {
+        source.decRef();
+    }
 }
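ParsedDocument makes the ownership chain explicit: the new constructor takes its own reference up front (source.incRef()) and close() gives it back, so the document's lifetime is decoupled from whoever built it, and Engine.Index.close() above simply forwards to doc.close(). The acquire-in-constructor / release-in-close discipline, again with the hypothetical SharedBuffer stand-in from the first sketch:

    // Illustrative acquire-in-constructor / release-in-close ownership.
    final class SketchParsedDocument implements AutoCloseable {
        private final SharedBuffer source;

        SketchParsedDocument(SharedBuffer source) {
            source.incRef(); // own one reference for this object's whole lifetime
            this.source = source;
        }

        @Override
        public void close() {
            source.decRef(); // balanced release; the last owner frees the bytes
        }
    }

    // A caller may drop its own reference right after construction; the document
    // keeps the buffer alive until it is closed:
    //   SharedBuffer buf = new SharedBuffer();                    // refs = 1 (caller)
    //   SketchParsedDocument doc = new SketchParsedDocument(buf); // refs = 2
    //   buf.decRef();                                             // refs = 1 (doc still owns it)
    //   doc.close();                                              // refs = 0 -> bytes freed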
SourceFieldMapper.java
@@ -276,7 +276,7 @@ public boolean isComplete() {

     @Override
     public void preParse(DocumentParserContext context) throws IOException {
-        BytesReference originalSource = context.sourceToParse().source();
+        BytesReference originalSource = context.sourceToParse().source(); // TODO ??
         XContentType contentType = context.sourceToParse().getXContentType();
         final BytesReference adaptedSource = applyFilters(originalSource, contentType);

… (5 more changed files not shown)