Skip to content

Commit

Permalink
Merge branch 'main' into esql_source_meta
Browse files Browse the repository at this point in the history
  • Loading branch information
nik9000 committed Nov 20, 2023
2 parents 9863411 + 36a2f9b commit aae7cee
Show file tree
Hide file tree
Showing 34 changed files with 1,001 additions and 200 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/102138.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 102138
summary: Skip shards that don't match the source query during checkpointing
area: Transform
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.ListenableActionFuture;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest;
Expand Down Expand Up @@ -64,9 +64,9 @@ protected RestChannelConsumer doPrepareRequest(RestRequest request, NodeClient c
if (validationException != null) {
throw validationException;
}
final var responseFuture = new ListenableActionFuture<BulkByScrollResponse>();
final var task = client.executeLocally(action, internal, responseFuture);
responseFuture.addListener(new LoggingTaskListener<>(task));
final var responseListener = new SubscribableListener<BulkByScrollResponse>();
final var task = client.executeLocally(action, internal, responseListener);
responseListener.addListener(new LoggingTaskListener<>(task));
return sendTask(client.getLocalNodeId(), task);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
import org.elasticsearch.telemetry.metric.MeterRegistry;
import org.elasticsearch.test.BackgroundIndexer;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.junit.annotations.TestIssueLogging;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.xcontent.XContentFactory;
Expand Down Expand Up @@ -179,7 +179,7 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
}

@Override
@TestLogging(reason = "Enable request logging to debug #88841", value = "com.amazonaws.request:DEBUG")
@TestIssueLogging(issueUrl = "https://github.com/elastic/elasticsearch/issues/88841", value = "com.amazonaws.request:DEBUG")
public void testRequestStats() throws Exception {
super.testRequestStats();
}
Expand Down Expand Up @@ -225,6 +225,7 @@ public void testAbortRequestStats() throws Exception {
assertEquals(assertionErrorMsg, mockCalls, sdkRequestCounts);
}

@TestIssueLogging(issueUrl = "https://github.com/elastic/elasticsearch/issues/101608", value = "com.amazonaws.request:DEBUG")
public void testMetrics() throws Exception {
// Create the repository and perform some activities
final String repository = createRepository(randomRepositoryName());
Expand Down Expand Up @@ -281,8 +282,12 @@ public void testMetrics() throws Exception {
operation,
OperationPurpose.parse((String) metric.attributes().get("purpose"))
);
assertThat(statsCollectors, hasKey(statsKey));
assertThat(metric.getLong(), equalTo(statsCollectors.get(statsKey).counter.sum()));
assertThat(nodeName + "/" + statsKey + " exists", statsCollectors, hasKey(statsKey));
assertThat(
nodeName + "/" + statsKey + " has correct sum",
metric.getLong(),
equalTo(statsCollectors.get(statsKey).counter.sum())
);

aggregatedMetrics.compute(operation.getKey(), (k, v) -> v == null ? metric.getLong() : v + metric.getLong());
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ static TransportVersion def(int id) {
public static final TransportVersion DATA_STREAM_FAILURE_STORE_ADDED = def(8_541_00_0);
public static final TransportVersion ML_INFERENCE_OPENAI_ADDED = def(8_542_00_0);
public static final TransportVersion SHUTDOWN_MIGRATION_STATUS_INCLUDE_COUNTS = def(8_543_00_0);
public static final TransportVersion TRANSFORM_GET_CHECKPOINT_QUERY_AND_CLUSTER_ADDED = def(8_544_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.Version;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.cluster.ClusterState;
Expand All @@ -30,6 +30,7 @@
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.RefCounted;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.UpdateForV9;
import org.elasticsearch.env.Environment;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -148,50 +149,68 @@ public JoinValidationService(
}

public void validateJoin(DiscoveryNode discoveryNode, ActionListener<Void> listener) {
if (discoveryNode.getVersion().onOrAfter(Version.V_8_3_0)) {
// This node isn't in the cluster yet so ClusterState#getMinTransportVersion() doesn't apply, we must obtain a specific connection
// so we can check its transport version to decide how to proceed.

final Transport.Connection connection;
try {
connection = transportService.getConnection(discoveryNode);
assert connection != null;
} catch (Exception e) {
assert e instanceof NodeNotConnectedException : e;
listener.onFailure(e);
return;
}

if (connection.getTransportVersion().onOrAfter(TransportVersions.V_8_3_0)) {
if (executeRefs.tryIncRef()) {
try {
execute(new JoinValidation(discoveryNode, listener));
execute(new JoinValidation(discoveryNode, connection, listener));
} finally {
executeRefs.decRef();
}
} else {
listener.onFailure(new NodeClosedException(transportService.getLocalNode()));
}
} else {
final var responseHandler = TransportResponseHandler.empty(responseExecutor, listener.delegateResponse((l, e) -> {
logger.warn(() -> "failed to validate incoming join request from node [" + discoveryNode + "]", e);
listener.onFailure(
new IllegalStateException(
String.format(
Locale.ROOT,
"failure when sending a join validation request from [%s] to [%s]",
transportService.getLocalNode().descriptionWithoutAttributes(),
discoveryNode.descriptionWithoutAttributes()
),
e
)
);
}));
final var clusterState = clusterStateSupplier.get();
if (clusterState != null) {
assert clusterState.nodes().isLocalNodeElectedMaster();
transportService.sendRequest(
discoveryNode,
JOIN_VALIDATE_ACTION_NAME,
new ValidateJoinRequest(clusterState),
REQUEST_OPTIONS,
responseHandler
);
} else {
transportService.sendRequest(
discoveryNode,
JoinHelper.JOIN_PING_ACTION_NAME,
TransportRequest.Empty.INSTANCE,
REQUEST_OPTIONS,
responseHandler
);
}
legacyValidateJoin(discoveryNode, listener, connection);
}
}

/**
 * Sends a join-validation request over {@code connection} using the legacy message formats. The visible caller
 * uses this path when the connection's transport version is older than 8.3.0.
 * <p>
 * If a cluster state is available it is sent in a {@link ValidateJoinRequest}; otherwise an empty ping request is
 * sent instead. A failure is logged and reported to {@code listener} wrapped in an {@link IllegalStateException}
 * that names the local and the joining node.
 *
 * @param discoveryNode the node asking to join, used for log and error messages
 * @param listener      notified with the outcome of the validation request
 * @param connection    the connection on which the request is sent
 */
@UpdateForV9
private void legacyValidateJoin(DiscoveryNode discoveryNode, ActionListener<Void> listener, Transport.Connection connection) {
    // Log the raw failure, then hand the caller an exception that identifies both ends of the exchange.
    final var failureWrappingListener = listener.delegateResponse((delegate, cause) -> {
        logger.warn(() -> "failed to validate incoming join request from node [" + discoveryNode + "]", cause);
        final String description = String.format(
            Locale.ROOT,
            "failure when sending a join validation request from [%s] to [%s]",
            transportService.getLocalNode().descriptionWithoutAttributes(),
            discoveryNode.descriptionWithoutAttributes()
        );
        listener.onFailure(new IllegalStateException(description, cause));
    });
    final var responseHandler = TransportResponseHandler.empty(responseExecutor, failureWrappingListener);

    final var currentState = clusterStateSupplier.get();
    if (currentState == null) {
        // No cluster state to validate against: just send an empty ping request.
        transportService.sendRequest(
            connection,
            JoinHelper.JOIN_PING_ACTION_NAME,
            TransportRequest.Empty.INSTANCE,
            REQUEST_OPTIONS,
            responseHandler
        );
        return;
    }

    assert currentState.nodes().isLocalNodeElectedMaster();
    transportService.sendRequest(
        connection,
        JOIN_VALIDATE_ACTION_NAME,
        new ValidateJoinRequest(currentState),
        REQUEST_OPTIONS,
        responseHandler
    );
}

Expand Down Expand Up @@ -312,27 +331,22 @@ public String toString() {

private class JoinValidation extends ActionRunnable<Void> {
private final DiscoveryNode discoveryNode;
private final Transport.Connection connection;

JoinValidation(DiscoveryNode discoveryNode, ActionListener<Void> listener) {
JoinValidation(DiscoveryNode discoveryNode, Transport.Connection connection, ActionListener<Void> listener) {
super(listener);
this.discoveryNode = discoveryNode;
this.connection = connection;
}

@Override
protected void doRun() throws Exception {
assert discoveryNode.getVersion().onOrAfter(Version.V_8_3_0) : discoveryNode.getVersion();
protected void doRun() {
assert connection.getTransportVersion().onOrAfter(TransportVersions.V_8_3_0) : discoveryNode.getVersion();
// NB these things never run concurrently to each other, or to the cache cleaner (see IMPLEMENTATION NOTES above) so it is safe
// to do these (non-atomic) things to the (unsynchronized) statesByVersion map.
Transport.Connection connection;
try {
connection = transportService.getConnection(discoveryNode);
} catch (NodeNotConnectedException e) {
listener.onFailure(e);
return;
}
var version = connection.getTransportVersion();
var cachedBytes = statesByVersion.get(version);
var bytes = maybeSerializeClusterState(cachedBytes, discoveryNode, version);
var transportVersion = connection.getTransportVersion();
var cachedBytes = statesByVersion.get(transportVersion);
var bytes = maybeSerializeClusterState(cachedBytes, discoveryNode, transportVersion);
if (bytes == null) {
// Normally if we're not the master then the Coordinator sends a ping message just to validate connectivity instead of
// getting here. But if we were the master when the Coordinator checked then we might not be the master any more, so we
Expand All @@ -354,7 +368,7 @@ protected void doRun() throws Exception {
transportService.sendRequest(
connection,
JOIN_VALIDATE_ACTION_NAME,
new BytesTransportRequest(bytes, version),
new BytesTransportRequest(bytes, transportVersion),
REQUEST_OPTIONS,
new CleanableResponseHandler<>(
listener.map(ignored -> null),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.ListenableActionFuture;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.rest.BaseRestHandler;
Expand Down Expand Up @@ -65,9 +65,9 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
if (validationException != null) {
throw validationException;
}
final var responseFuture = new ListenableActionFuture<ForceMergeResponse>();
final var task = client.executeLocally(ForceMergeAction.INSTANCE, mergeRequest, responseFuture);
responseFuture.addListener(new LoggingTaskListener<>(task));
final var responseListener = new SubscribableListener<ForceMergeResponse>();
final var task = client.executeLocally(ForceMergeAction.INSTANCE, mergeRequest, responseListener);
responseListener.addListener(new LoggingTaskListener<>(task));
return sendTask(client.getLocalNodeId(), task);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ public LoggingTaskListener(Task task) {
/**
 * Logs at info level that the task finished, together with the response it produced.
 *
 * @param response the response the task completed with
 */
@Override
public void onResponse(Response response) {
    final var finishedTaskId = task.getId();
    logger.info("{} finished with response {}", finishedTaskId, response);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.TaskId;

Expand Down Expand Up @@ -48,22 +49,33 @@ public static class Request extends ActionRequest implements IndicesRequest.Repl

private String[] indices;
private final IndicesOptions indicesOptions;
private final QueryBuilder query;
private final String cluster;
private final TimeValue timeout;

public Request(StreamInput in) throws IOException {
super(in);
indices = in.readStringArray();
indicesOptions = IndicesOptions.readIndicesOptions(in);
if (in.getTransportVersion().onOrAfter(TransportVersions.TRANSFORM_GET_CHECKPOINT_QUERY_AND_CLUSTER_ADDED)) {
query = in.readOptionalNamedWriteable(QueryBuilder.class);
cluster = in.readOptionalString();
} else {
query = null;
cluster = null;
}
if (in.getTransportVersion().onOrAfter(TransportVersions.TRANSFORM_GET_CHECKPOINT_TIMEOUT_ADDED)) {
timeout = in.readOptionalTimeValue();
} else {
timeout = null;
}
}

public Request(String[] indices, IndicesOptions indicesOptions, TimeValue timeout) {
public Request(String[] indices, IndicesOptions indicesOptions, QueryBuilder query, String cluster, TimeValue timeout) {
this.indices = indices != null ? indices : Strings.EMPTY_ARRAY;
this.indicesOptions = indicesOptions;
this.query = query;
this.cluster = cluster;
this.timeout = timeout;
}

Expand All @@ -82,6 +94,14 @@ public IndicesOptions indicesOptions() {
return indicesOptions;
}

/**
 * Returns the query carried by this request.
 *
 * @return the query, or {@code null} if none was supplied or the request was read from a stream whose transport
 *         version predates {@code TRANSFORM_GET_CHECKPOINT_QUERY_AND_CLUSTER_ADDED}
 */
public QueryBuilder getQuery() {
    return this.query;
}

/**
 * Returns the cluster value carried by this request.
 *
 * @return the cluster, or {@code null} if none was supplied or the request was read from a stream whose transport
 *         version predates {@code TRANSFORM_GET_CHECKPOINT_QUERY_AND_CLUSTER_ADDED}
 */
public String getCluster() {
    return this.cluster;
}

/**
 * Returns the timeout carried by this request.
 *
 * @return the timeout, or {@code null} if none was supplied or the request was read from a stream whose transport
 *         version predates {@code TRANSFORM_GET_CHECKPOINT_TIMEOUT_ADDED}
 */
public TimeValue getTimeout() {
    return this.timeout;
}
Expand All @@ -98,19 +118,25 @@ public boolean equals(Object obj) {

return Arrays.equals(indices, that.indices)
&& Objects.equals(indicesOptions, that.indicesOptions)
&& Objects.equals(query, that.query)
&& Objects.equals(cluster, that.cluster)
&& Objects.equals(timeout, that.timeout);
}

@Override
public int hashCode() {
return Objects.hash(Arrays.hashCode(indices), indicesOptions, timeout);
return Objects.hash(Arrays.hashCode(indices), indicesOptions, query, cluster, timeout);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringArray(indices);
indicesOptions.writeIndicesOptions(out);
if (out.getTransportVersion().onOrAfter(TransportVersions.TRANSFORM_GET_CHECKPOINT_QUERY_AND_CLUSTER_ADDED)) {
out.writeOptionalNamedWriteable(query);
out.writeOptionalString(cluster);
}
if (out.getTransportVersion().onOrAfter(TransportVersions.TRANSFORM_GET_CHECKPOINT_TIMEOUT_ADDED)) {
out.writeOptionalTimeValue(timeout);
}
Expand Down
Loading

0 comments on commit aae7cee

Please sign in to comment.