Skip to content

Commit

Permalink
all server tests pass
Browse files Browse the repository at this point in the history
  • Loading branch information
masseyke committed Dec 12, 2023
1 parent d379d20 commit e64684a
Show file tree
Hide file tree
Showing 25 changed files with 348 additions and 213 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.elasticsearch.reindex;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.ActiveShardCount;
Expand Down Expand Up @@ -65,7 +66,7 @@ protected RestChannelConsumer doPrepareRequest(RestRequest request, NodeClient c
throw validationException;
}
final var responseListener = new SubscribableListener<BulkByScrollResponse>();
final var task = client.executeLocally(action, internal, responseListener);
final var task = client.executeLocally(action, internal, ActionListener.runAfter(responseListener, internal::decRef));
responseListener.addListener(new LoggingTaskListener<>(task));
return sendTask(client.getLocalNodeId(), task);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@ public BulkRequest add(IndexRequest request) {
BulkRequest internalAdd(IndexRequest request) {
Objects.requireNonNull(request, "'request' must not be null");
applyGlobalMandatoryParameters(request);

request.incRef();
requests.add(request);
// lack of source is validated in validate() method
Expand Down Expand Up @@ -210,7 +209,7 @@ public BulkRequest add(DeleteRequest request) {
* The list of requests in this bulk request.
*/
public List<DocWriteRequest<?>> requests() {
return this.requests;
return Collections.unmodifiableList(this.requests);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,19 +374,20 @@ public void parse(
}
}
} else if ("create".equals(action)) {
IndexRequest indexRequest = new IndexRequest(index).id(id)
.routing(routing)
.version(version)
.versionType(versionType)
.create(true)
.setPipeline(pipeline)
.setIfSeqNo(ifSeqNo)
.setIfPrimaryTerm(ifPrimaryTerm)
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType)
.setDynamicTemplates(dynamicTemplates)
.setRequireAlias(requireAlias)
.setListExecutedPipelines(listExecutedPipelines);
IndexRequest indexRequest = new IndexRequest(index);
try {
indexRequest.id(id)
.routing(routing)
.version(version)
.versionType(versionType)
.create(true)
.setPipeline(pipeline)
.setIfSeqNo(ifSeqNo)
.setIfPrimaryTerm(ifPrimaryTerm)
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType)
.setDynamicTemplates(dynamicTemplates)
.setRequireAlias(requireAlias)
.setListExecutedPipelines(listExecutedPipelines);
indexRequestConsumer.accept(indexRequest, type);
} finally {
indexRequest.decRef();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.Assertions;
import org.elasticsearch.core.RefCounted;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.Index;
Expand Down Expand Up @@ -415,6 +416,9 @@ public void onFailure(Exception e) {
for (int i = 0; i < bulkRequest.requests.size(); i++) {
DocWriteRequest<?> request = bulkRequest.requests.get(i);
if (request != null && setResponseFailureIfIndexMatches(responses, i, request, index, e)) {
if (bulkRequest.requests.get(i) instanceof RefCounted refCounted) {
refCounted.decRef();
}
bulkRequest.requests.set(i, null);
}
}
Expand Down Expand Up @@ -648,20 +652,16 @@ protected void doRun() {
new ShardId(concreteIndex, shardId),
shard -> new ArrayList<>()
);
System.out.println("**** creating");
try {
shardRequests.add(new BulkItemRequest(i, docWriteRequest));
} catch (Exception e) {
e.printStackTrace();
throw e;
}
System.out.println("**** created");
shardRequests.add(new BulkItemRequest(i, docWriteRequest));
} catch (ElasticsearchParseException | IllegalArgumentException | RoutingMissingException | ResourceNotFoundException e) {
String name = ia != null ? ia.getName() : docWriteRequest.index();
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(name, docWriteRequest.id(), e);
BulkItemResponse bulkItemResponse = BulkItemResponse.failure(i, docWriteRequest.opType(), failure);
responses.set(i, bulkItemResponse);
// make sure the request gets never processed again
if (bulkRequest.requests.get(i) instanceof RefCounted refCounted) {
refCounted.decRef();
}
bulkRequest.requests.set(i, null);
}
}
Expand All @@ -685,7 +685,6 @@ protected void doRun() {
);
for (BulkItemRequest request : requests) {
// The BulkShardRequest constructor has incremented the ref, and we are no longer directly referencing this object
System.out.println("**** decreffing");
request.decRef();
}
bulkShardRequest.waitForActiveShards(bulkRequest.waitForActiveShards());
Expand Down Expand Up @@ -828,6 +827,9 @@ private void addFailure(DocWriteRequest<?> request, int idx, Exception unavailab
BulkItemResponse bulkItemResponse = BulkItemResponse.failure(idx, request.opType(), failure);
responses.set(idx, bulkItemResponse);
// make sure the request gets never processed again
if (bulkRequest.requests.get(idx) instanceof RefCounted refCounted) {
refCounted.decRef();
}
bulkRequest.requests.set(idx, null);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,40 +191,43 @@ protected void shardOperation(final UpdateRequest request, final ActionListener<
final BulkRequest bulkRequest = toSingleItemBulkRequest(upsertRequest);
client.bulk(
bulkRequest,
ActionListener.releaseAfter(unwrappingSingleItemBulkResponse(ActionListener.<IndexResponse>wrap(response -> {
UpdateResponse update = new UpdateResponse(
response.getShardInfo(),
response.getShardId(),
response.getId(),
response.getSeqNo(),
response.getPrimaryTerm(),
response.getVersion(),
response.getResult()
);
if (request.fetchSource() != null && request.fetchSource().fetchSource()) {
Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(
upsertSourceBytes,
true,
upsertRequest.getContentType()
);
update.setGetResult(
UpdateHelper.extractGetResult(
request,
request.concreteIndex(),
response.getSeqNo(),
response.getPrimaryTerm(),
response.getVersion(),
sourceAndContent.v2(),
sourceAndContent.v1(),
upsertSourceBytes
)
ActionListener.runAfter(
ActionListener.releaseAfter(unwrappingSingleItemBulkResponse(ActionListener.<IndexResponse>wrap(response -> {
UpdateResponse update = new UpdateResponse(
response.getShardInfo(),
response.getShardId(),
response.getId(),
response.getSeqNo(),
response.getPrimaryTerm(),
response.getVersion(),
response.getResult()
);
} else {
update.setGetResult(null);
}
update.setForcedRefresh(response.forcedRefresh());
listener.onResponse(update);
}, exception -> handleUpdateFailureWithRetry(listener, request, exception, retryCount))), bulkRequest)
if (request.fetchSource() != null && request.fetchSource().fetchSource()) {
Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(
upsertSourceBytes,
true,
upsertRequest.getContentType()
);
update.setGetResult(
UpdateHelper.extractGetResult(
request,
request.concreteIndex(),
response.getSeqNo(),
response.getPrimaryTerm(),
response.getVersion(),
sourceAndContent.v2(),
sourceAndContent.v1(),
upsertSourceBytes
)
);
} else {
update.setGetResult(null);
}
update.setForcedRefresh(response.forcedRefresh());
listener.onResponse(update);
}, exception -> handleUpdateFailureWithRetry(listener, request, exception, retryCount))), bulkRequest),
upsertRequest::decRef
)
);
}
case UPDATED -> {
Expand All @@ -234,31 +237,34 @@ protected void shardOperation(final UpdateRequest request, final ActionListener<
final BulkRequest bulkRequest = toSingleItemBulkRequest(indexRequest);
client.bulk(
bulkRequest,
ActionListener.releaseAfter(unwrappingSingleItemBulkResponse(ActionListener.<IndexResponse>wrap(response -> {
UpdateResponse update = new UpdateResponse(
response.getShardInfo(),
response.getShardId(),
response.getId(),
response.getSeqNo(),
response.getPrimaryTerm(),
response.getVersion(),
response.getResult()
);
update.setGetResult(
UpdateHelper.extractGetResult(
request,
request.concreteIndex(),
ActionListener.runAfter(
ActionListener.releaseAfter(unwrappingSingleItemBulkResponse(ActionListener.<IndexResponse>wrap(response -> {
UpdateResponse update = new UpdateResponse(
response.getShardInfo(),
response.getShardId(),
response.getId(),
response.getSeqNo(),
response.getPrimaryTerm(),
response.getVersion(),
result.updatedSourceAsMap(),
result.updateSourceContentType(),
indexSourceBytes
)
);
update.setForcedRefresh(response.forcedRefresh());
listener.onResponse(update);
}, exception -> handleUpdateFailureWithRetry(listener, request, exception, retryCount))), bulkRequest)
response.getResult()
);
update.setGetResult(
UpdateHelper.extractGetResult(
request,
request.concreteIndex(),
response.getSeqNo(),
response.getPrimaryTerm(),
response.getVersion(),
result.updatedSourceAsMap(),
result.updateSourceContentType(),
indexSourceBytes
)
);
update.setForcedRefresh(response.forcedRefresh());
listener.onResponse(update);
}, exception -> handleUpdateFailureWithRetry(listener, request, exception, retryCount))), bulkRequest),
indexRequest::decRef
)
);
}
case DELETED -> {
Expand Down Expand Up @@ -295,14 +301,18 @@ protected void shardOperation(final UpdateRequest request, final ActionListener<
}
case NOOP -> {
UpdateResponse update = result.action();
IndexService indexServiceOrNull = indicesService.indexService(shardId.getIndex());
if (indexServiceOrNull != null) {
IndexShard shard = indexService.getShardOrNull(shardId.getId());
if (shard != null) {
shard.noopUpdate();
try {
IndexService indexServiceOrNull = indicesService.indexService(shardId.getIndex());
if (indexServiceOrNull != null) {
IndexShard shard = indexService.getShardOrNull(shardId.getId());
if (shard != null) {
shard.noopUpdate();
}
}
listener.onResponse(update);
} finally {
update.decRef();
}
listener.onResponse(update);
}
default -> throw new IllegalStateException("Illegal result " + result.getResponseResult());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ public UpdateRequest(@Nullable ShardId shardId, StreamInput in) throws IOExcepti
fetchSourceContext = in.readOptionalWriteable(FetchSourceContext::readFrom);
if (in.readBoolean()) {
upsertRequest = new IndexRequest(shardId, in);
upsertRequest.incRef();
}
docAsUpsert = in.readBoolean();
ifSeqNo = in.readZLong();
Expand Down Expand Up @@ -717,6 +718,7 @@ private IndexRequest safeDoc() {
*/
public UpdateRequest upsert(IndexRequest upsertRequest) {
this.upsertRequest = upsertRequest;
this.upsertRequest.incRef();
return this;
}

Expand Down Expand Up @@ -1019,8 +1021,13 @@ public boolean tryIncRef() {
public boolean decRef() {
boolean success = refCounted.decRef();
// new RuntimeException("decRef").printStackTrace(System.out);
if (refCounted.hasReferences() == false && doc != null) {
success = doc.decRef() && success;
if (refCounted.hasReferences() == false) {
if (doc != null) {
success = doc.decRef() && success;
}
if (upsertRequest != null) {
success = upsertRequest.decRef() && success;
}
}
return success;
}
Expand Down
Loading

0 comments on commit e64684a

Please sign in to comment.