Making IndexRequest RefCounted
masseyke committed Dec 8, 2023
1 parent 2e5c7c8 commit 0aa10e1
Showing 12 changed files with 258 additions and 66 deletions.
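The effect of the change, end to end: IndexRequest now carries its own reference count, and the bulk-layer wrappers (BulkRequest, BulkItemRequest, BulkShardRequest) take and release references to the requests they hold. A minimal lifecycle sketch, assuming standalone use of the classes touched in this diff; the index name, id, and field/value below are hypothetical:

import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;

class RefCountingLifecycleSketch {
    static void example() {
        // A freshly constructed IndexRequest starts with one reference, owned by its creator.
        IndexRequest indexRequest = new IndexRequest("my-index").id("1").source("field", "value");
        BulkRequest bulkRequest = new BulkRequest();
        bulkRequest.add(indexRequest); // internalAdd() takes its own reference via incRef()
        indexRequest.decRef();         // drop the creator's reference; the bulk request still holds one
        // ... hand bulkRequest off for execution ...
        bulkRequest.decRef();          // last reference gone: decRef() also releases the contained IndexRequest
    }
}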
----- changed file -----
@@ -11,27 +11,37 @@
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.core.AbstractRefCounted;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.RefCounted;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.transport.LeakTracker;

import java.io.IOException;
import java.util.Objects;

public class BulkItemRequest implements Writeable, Accountable {
public class BulkItemRequest implements Writeable, Accountable, RefCounted, Releasable {

private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(BulkItemRequest.class);

private final int id;
private final DocWriteRequest<?> request;
private volatile BulkItemResponse primaryResponse;
private final RefCounted refCounted;

BulkItemRequest(@Nullable ShardId shardId, StreamInput in) throws IOException {
id = in.readVInt();
request = DocWriteRequest.readDocumentRequest(shardId, in);
if (this.request instanceof IndexRequest indexRequest) {
indexRequest.incRef();
}
this.refCounted = LeakTracker.wrap(new BulkItemRequestRefCounted());
if (in.readBoolean()) {
if (shardId == null) {
primaryResponse = new BulkItemResponse(in);
@@ -45,6 +55,10 @@ public class BulkItemRequest implements Writeable, Accountable {
public BulkItemRequest(int id, DocWriteRequest<?> request) {
this.id = id;
this.request = request;
this.refCounted = LeakTracker.wrap(new BulkItemRequestRefCounted());
if (this.request instanceof IndexRequest indexRequest) {
indexRequest.incRef();
}
}

public int id() {
@@ -111,4 +125,41 @@ public void writeThin(StreamOutput out) throws IOException {
public long ramBytesUsed() {
return SHALLOW_SIZE + request.ramBytesUsed();
}

@Override
public void incRef() {
refCounted.incRef();
}

@Override
public boolean tryIncRef() {
return refCounted.tryIncRef();
}

@Override
public boolean decRef() {
boolean success = refCounted.decRef();
if (this.request instanceof IndexRequest indexRequest) {
success = indexRequest.decRef() && success;
}
return success;
}

@Override
public boolean hasReferences() {
return refCounted.hasReferences();
}

@Override
public void close() {
decRef();
}

private static class BulkItemRequestRefCounted extends AbstractRefCounted {
@Override
protected void closeInternal() {
// nothing to close
}
}

}
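Note that BulkItemRequest.decRef() above forwards a decRef() to a wrapped IndexRequest on every call, not only when its own count drops to zero, and the constructors take a reference on the IndexRequest they receive. A short ownership sketch, assuming direct construction; the index and field names are hypothetical:

import org.elasticsearch.action.bulk.BulkItemRequest;
import org.elasticsearch.action.index.IndexRequest;

class BulkItemRequestOwnershipSketch {
    static void example() {
        IndexRequest indexRequest = new IndexRequest("my-index").source("field", "value");
        // The BulkItemRequest constructor incRef()s the IndexRequest, so the creator can drop its own reference.
        try (BulkItemRequest item = new BulkItemRequest(0, indexRequest)) {
            indexRequest.decRef();
            // ... use item ...
        } // close() calls decRef(), which also decRef()s the wrapped IndexRequest
    }
}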
----- changed file -----
@@ -292,7 +292,14 @@ public void markAsCompleted(BulkItemResponse translatedResponse) {
assert translatedResponse.getItemId() == getCurrentItem().id();

if (translatedResponse.isFailed() == false && requestToExecute != null && requestToExecute != getCurrent()) {
request.items()[currentIndex] = new BulkItemRequest(request.items()[currentIndex].id(), requestToExecute);
BulkItemRequest oldBulkItemRequest = request.items()[currentIndex];
try {
request.items()[currentIndex] = new BulkItemRequest(request.items()[currentIndex].id(), requestToExecute);
} finally {
if (oldBulkItemRequest != null) {
oldBulkItemRequest.decRef();
}
}
}
getCurrentItem().setPrimaryResponse(translatedResponse);
currentItemState = ItemProcessingState.COMPLETED;
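The markAsCompleted() change follows a swap-then-release pattern: install the replacement item first, then decRef() the displaced item in a finally block so its reference is dropped even if constructing the replacement throws. A toy sketch of the same pattern; Slot and replacementFactory are hypothetical names, not part of the codebase:

import java.util.function.Supplier;

import org.elasticsearch.core.RefCounted;

// Toy holder that owns one reference to whatever it currently contains.
final class Slot<T extends RefCounted> {
    private T value;

    void replace(Supplier<T> replacementFactory) {
        T old = value;
        try {
            value = replacementFactory.get(); // the slot takes over the new value's reference
        } finally {
            if (old != null) {
                old.decRef(); // released even if building the replacement failed
            }
        }
    }
}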
----- changed file -----
@@ -159,6 +159,7 @@ BulkRequest internalAdd(IndexRequest request) {
Objects.requireNonNull(request, "'request' must not be null");
applyGlobalMandatoryParameters(request);

request.incRef();
requests.add(request);
// lack of source is validated in validate() method
sizeInBytes += (request.source() != null ? request.source().length() : 0) + REQUEST_OVERHEAD;
@@ -481,7 +482,15 @@ public boolean tryIncRef() {

@Override
public boolean decRef() {
return refCounted.decRef();
boolean success = refCounted.decRef();
if (refCounted.hasReferences() == false) {
for (DocWriteRequest<?> request : requests) {
if (request instanceof IndexRequest indexRequest) {
success = indexRequest.decRef() && success;
}
}
}
return success;
}

@Override
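Unlike BulkItemRequest, BulkRequest only cascades to its contained IndexRequests once its own reference count reaches zero. A sketch of that deferred release, assuming standalone use; the index and field names are hypothetical:

import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;

class BulkRequestDeferredReleaseSketch {
    static void example() {
        BulkRequest bulkRequest = new BulkRequest();
        IndexRequest doc = new IndexRequest("my-index").source("field", "value");
        bulkRequest.add(doc); // internalAdd() incRef()s the IndexRequest
        doc.decRef();         // the bulk request is now the only holder of the document

        bulkRequest.incRef(); // e.g. a second component retains the bulk request
        bulkRequest.decRef(); // 2 -> 1: the contained IndexRequest is not released yet
        bulkRequest.decRef(); // 1 -> 0: hasReferences() is false, so the IndexRequest is released too
    }
}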
----- changed file -----
@@ -48,6 +48,9 @@ public BulkShardRequest(StreamInput in) throws IOException {
public BulkShardRequest(ShardId shardId, RefreshPolicy refreshPolicy, BulkItemRequest[] items) {
super(shardId);
this.items = items;
for (BulkItemRequest item : items) {
item.incRef();
}
this.refCounted = LeakTracker.wrap(new BulkRequestRefCounted());
setRefreshPolicy(refreshPolicy);
}
@@ -175,7 +178,13 @@ public boolean tryIncRef() {

@Override
public boolean decRef() {
return refCounted.decRef();
boolean success = refCounted.decRef();
if (refCounted.hasReferences() == false) {
for (BulkItemRequest item : items) {
success = item.decRef() && success;
}
}
return success;
}

@Override
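Because the BulkShardRequest constructor now takes a reference on every item, code that builds the items can hand ownership over by decRef()'ing them afterwards, which is the pattern the updated test below uses. A compact sketch of that hand-off; the index name, field, and shard values are hypothetical:

import org.elasticsearch.action.bulk.BulkItemRequest;
import org.elasticsearch.action.bulk.BulkShardRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.index.shard.ShardId;

class BulkShardRequestHandOffSketch {
    static BulkShardRequest build() {
        BulkItemRequest[] items = new BulkItemRequest[] {
            new BulkItemRequest(0, new IndexRequest("my-index").source("field", "value")) };
        BulkShardRequest shardRequest = new BulkShardRequest(
            new ShardId("my-index", "_na_", 0),
            WriteRequest.RefreshPolicy.NONE,
            items
        ); // the constructor incRef()s every item
        for (BulkItemRequest item : items) {
            item.decRef(); // only the shard request holds the items (and their IndexRequests) now
        }
        return shardRequest; // a single shardRequest.decRef() by the caller releases everything
    }
}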
----- changed file -----
@@ -30,12 +30,16 @@
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.AbstractRefCounted;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.RefCounted;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.plugins.internal.DocumentParsingObserver;
import org.elasticsearch.transport.LeakTracker;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
import org.elasticsearch.xcontent.XContentType;
@@ -68,7 +72,11 @@
* @see IndexResponse
* @see org.elasticsearch.client.internal.Client#index(IndexRequest)
*/
public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implements DocWriteRequest<IndexRequest>, CompositeIndicesRequest {
public class IndexRequest extends ReplicatedWriteRequest<IndexRequest>
implements
DocWriteRequest<IndexRequest>,
CompositeIndicesRequest,
Releasable {

private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(IndexRequest.class);
private static final TransportVersion PIPELINES_HAVE_RUN_FIELD_ADDED = TransportVersions.V_8_500_049;
@@ -136,13 +144,15 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
*/
private Object rawTimestamp;
private boolean pipelinesHaveRun = false;
private final RefCounted refCounted;

public IndexRequest(StreamInput in) throws IOException {
this(null, in);
}

public IndexRequest(@Nullable ShardId shardId, StreamInput in) throws IOException {
super(shardId, in);
this.refCounted = LeakTracker.wrap(new IndexRequestRefCounted());
if (in.getTransportVersion().before(TransportVersions.V_8_0_0)) {
String type = in.readOptionalString();
assert MapperService.SINGLE_MAPPING_NAME.equals(type) : "Expected [_doc] but received [" + type + "]";
@@ -193,6 +203,7 @@ public IndexRequest(@Nullable ShardId shardId, StreamInput in) throws IOExceptio

public IndexRequest() {
super(NO_SHARD_ID);
this.refCounted = LeakTracker.wrap(new IndexRequestRefCounted());
}

/**
@@ -202,6 +213,7 @@ public IndexRequest() {
public IndexRequest(String index) {
super(NO_SHARD_ID);
this.index = index;
this.refCounted = LeakTracker.wrap(new IndexRequestRefCounted());
}

@Override
@@ -875,4 +887,37 @@ public List<String> getExecutedPipelines() {
return Collections.unmodifiableList(executedPipelines);
}
}

@Override
public void incRef() {
refCounted.incRef();
}

@Override
public boolean tryIncRef() {
return refCounted.tryIncRef();
}

@Override
public boolean decRef() {
return refCounted.decRef();
}

@Override
public boolean hasReferences() {
return refCounted.hasReferences();
}

@Override
public void close() {
decRef();
}

private static class IndexRequestRefCounted extends AbstractRefCounted {

@Override
protected void closeInternal() {
// nothing to close
}
}
}
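With IndexRequest now Releasable and its counter wrapped in a LeakTracker, the simplest correct handling for a short-lived request is try-with-resources; anything that retains the request longer takes its own reference with incRef()/decRef(). A minimal sketch; the index name, id, and field are hypothetical:

import org.elasticsearch.action.index.IndexRequest;

class IndexRequestReleasableSketch {
    static void example() {
        // A fresh IndexRequest starts with one reference, and close() simply decRef()s it.
        try (IndexRequest request = new IndexRequest("my-index")) {
            request.id("1").source("field", "value");
            // ... components that keep the request beyond this scope call incRef()/decRef() themselves ...
        } // the LeakTracker wrapper can flag requests whose references are never fully released
    }
}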
----- changed file -----
@@ -69,11 +69,22 @@ private BulkShardRequest generateRandomRequest() {
};
items[i] = new BulkItemRequest(i, request);
}
return new BulkShardRequest(new ShardId("index", "_na_", 0), randomFrom(WriteRequest.RefreshPolicy.values()), items);
BulkShardRequest bulkShardRequest = new BulkShardRequest(
new ShardId("index", "_na_", 0),
randomFrom(WriteRequest.RefreshPolicy.values()),
items
);
/*
* We are responsible for the lifecycle of the IndexRequests and the BulkItemRequests because we created them here, so we decRef
* them now; that way, when the returned BulkShardRequest is decRef'd, the reference counts correctly drop to 0.
*/
for (BulkItemRequest bulkItemRequest : items) {
bulkItemRequest.decRef();
}
return bulkShardRequest;
}

public void testTranslogLocation() {

try (BulkShardRequest shardRequest = generateRandomRequest()) {

Translog.Location expectedLocation = null;
@@ -85,6 +96,7 @@ public void testTranslogLocation() {

BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(shardRequest, primary);
while (context.hasMoreOperationsToExecute()) {
IndexRequest nullableIndexRequest = null;
final Engine.Result result;
final DocWriteRequest<?> current = context.getCurrent();
final boolean failure = rarely();
@@ -106,7 +118,9 @@
}
}
case UPDATE -> {
context.setRequestToExecute(new IndexRequest(current.index()).id(current.id()));
IndexRequest indexRequest = new IndexRequest(current.index()).id(current.id());
context.setRequestToExecute(indexRequest);
nullableIndexRequest = indexRequest;
if (failure) {
result = new Engine.IndexResult(new ElasticsearchException("bla"), 1, 1, 1, current.id());
} else {
@@ -128,6 +142,9 @@
}
context.markOperationAsExecuted(result);
context.markAsCompleted(context.getExecutionResult());
if (nullableIndexRequest != null) {
nullableIndexRequest.decRef();
}
}

assertThat(context.getLocationToSync(), equalTo(expectedLocation));
(remaining changed files not shown)
