diff --git a/docs/changelog/102138.yaml b/docs/changelog/102138.yaml new file mode 100644 index 0000000000000..3819e3201150e --- /dev/null +++ b/docs/changelog/102138.yaml @@ -0,0 +1,5 @@ +pr: 102138 +summary: Skip shards that don't match the source query during checkpointing +area: Transform +type: enhancement +issues: [] diff --git a/modules/reindex/src/main/java/org/elasticsearch/reindex/AbstractBaseReindexRestHandler.java b/modules/reindex/src/main/java/org/elasticsearch/reindex/AbstractBaseReindexRestHandler.java index 952dd0585e7ba..8e7fab68ac697 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/reindex/AbstractBaseReindexRestHandler.java +++ b/modules/reindex/src/main/java/org/elasticsearch/reindex/AbstractBaseReindexRestHandler.java @@ -11,7 +11,7 @@ import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.support.ActiveShardCount; -import org.elasticsearch.action.support.ListenableActionFuture; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest; @@ -64,9 +64,9 @@ protected RestChannelConsumer doPrepareRequest(RestRequest request, NodeClient c if (validationException != null) { throw validationException; } - final var responseFuture = new ListenableActionFuture(); - final var task = client.executeLocally(action, internal, responseFuture); - responseFuture.addListener(new LoggingTaskListener<>(task)); + final var responseListener = new SubscribableListener(); + final var task = client.executeLocally(action, internal, responseListener); + responseListener.addListener(new LoggingTaskListener<>(task)); return sendTask(client.getLocalNodeId(), task); } diff --git a/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java b/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java index aee61361ebd10..7f46440647a54 100644 --- a/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java +++ b/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java @@ -57,7 +57,7 @@ import org.elasticsearch.telemetry.metric.MeterRegistry; import org.elasticsearch.test.BackgroundIndexer; import org.elasticsearch.test.ESIntegTestCase; -import org.elasticsearch.test.junit.annotations.TestLogging; +import org.elasticsearch.test.junit.annotations.TestIssueLogging; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xcontent.NamedXContentRegistry; import org.elasticsearch.xcontent.XContentFactory; @@ -179,7 +179,7 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { } @Override - @TestLogging(reason = "Enable request logging to debug #88841", value = "com.amazonaws.request:DEBUG") + @TestIssueLogging(issueUrl = "https://github.com/elastic/elasticsearch/issues/88841", value = "com.amazonaws.request:DEBUG") public void testRequestStats() throws Exception { super.testRequestStats(); } @@ -225,6 +225,7 @@ public void testAbortRequestStats() throws Exception { assertEquals(assertionErrorMsg, mockCalls, sdkRequestCounts); } + @TestIssueLogging(issueUrl = "https://github.com/elastic/elasticsearch/issues/101608", value = "com.amazonaws.request:DEBUG") public void testMetrics() throws Exception { // Create the repository and perform some activities final String repository = createRepository(randomRepositoryName()); @@ -281,8 +282,12 @@ public void testMetrics() throws Exception { operation, OperationPurpose.parse((String) metric.attributes().get("purpose")) ); - assertThat(statsCollectors, hasKey(statsKey)); - assertThat(metric.getLong(), equalTo(statsCollectors.get(statsKey).counter.sum())); + assertThat(nodeName + "/" + statsKey + " exists", statsCollectors, hasKey(statsKey)); + assertThat( + nodeName + "/" + statsKey + " has correct sum", + metric.getLong(), + equalTo(statsCollectors.get(statsKey).counter.sum()) + ); aggregatedMetrics.compute(operation.getKey(), (k, v) -> v == null ? metric.getLong() : v + metric.getLong()); }); diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index baae500b70d55..5ad1d43c0d4f8 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -172,6 +172,7 @@ static TransportVersion def(int id) { public static final TransportVersion DATA_STREAM_FAILURE_STORE_ADDED = def(8_541_00_0); public static final TransportVersion ML_INFERENCE_OPENAI_ADDED = def(8_542_00_0); public static final TransportVersion SHUTDOWN_MIGRATION_STATUS_INCLUDE_COUNTS = def(8_543_00_0); + public static final TransportVersion TRANSFORM_GET_CHECKPOINT_QUERY_AND_CLUSTER_ADDED = def(8_544_00_0); /* * STOP! READ THIS FIRST! No, really, diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinValidationService.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinValidationService.java index d9911ad12df84..dc18a7950394a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinValidationService.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinValidationService.java @@ -12,7 +12,7 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.TransportVersion; -import org.elasticsearch.Version; +import org.elasticsearch.TransportVersions; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.cluster.ClusterState; @@ -30,6 +30,7 @@ import org.elasticsearch.core.Nullable; import org.elasticsearch.core.RefCounted; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.core.UpdateForV9; import org.elasticsearch.env.Environment; import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.threadpool.ThreadPool; @@ -148,10 +149,23 @@ public JoinValidationService( } public void validateJoin(DiscoveryNode discoveryNode, ActionListener listener) { - if (discoveryNode.getVersion().onOrAfter(Version.V_8_3_0)) { + // This node isn't in the cluster yet so ClusterState#getMinTransportVersion() doesn't apply, we must obtain a specific connection + // so we can check its transport version to decide how to proceed. + + final Transport.Connection connection; + try { + connection = transportService.getConnection(discoveryNode); + assert connection != null; + } catch (Exception e) { + assert e instanceof NodeNotConnectedException : e; + listener.onFailure(e); + return; + } + + if (connection.getTransportVersion().onOrAfter(TransportVersions.V_8_3_0)) { if (executeRefs.tryIncRef()) { try { - execute(new JoinValidation(discoveryNode, listener)); + execute(new JoinValidation(discoveryNode, connection, listener)); } finally { executeRefs.decRef(); } @@ -159,39 +173,44 @@ public void validateJoin(DiscoveryNode discoveryNode, ActionListener liste listener.onFailure(new NodeClosedException(transportService.getLocalNode())); } } else { - final var responseHandler = TransportResponseHandler.empty(responseExecutor, listener.delegateResponse((l, e) -> { - logger.warn(() -> "failed to validate incoming join request from node [" + discoveryNode + "]", e); - listener.onFailure( - new IllegalStateException( - String.format( - Locale.ROOT, - "failure when sending a join validation request from [%s] to [%s]", - transportService.getLocalNode().descriptionWithoutAttributes(), - discoveryNode.descriptionWithoutAttributes() - ), - e - ) - ); - })); - final var clusterState = clusterStateSupplier.get(); - if (clusterState != null) { - assert clusterState.nodes().isLocalNodeElectedMaster(); - transportService.sendRequest( - discoveryNode, - JOIN_VALIDATE_ACTION_NAME, - new ValidateJoinRequest(clusterState), - REQUEST_OPTIONS, - responseHandler - ); - } else { - transportService.sendRequest( - discoveryNode, - JoinHelper.JOIN_PING_ACTION_NAME, - TransportRequest.Empty.INSTANCE, - REQUEST_OPTIONS, - responseHandler - ); - } + legacyValidateJoin(discoveryNode, listener, connection); + } + } + + @UpdateForV9 + private void legacyValidateJoin(DiscoveryNode discoveryNode, ActionListener listener, Transport.Connection connection) { + final var responseHandler = TransportResponseHandler.empty(responseExecutor, listener.delegateResponse((l, e) -> { + logger.warn(() -> "failed to validate incoming join request from node [" + discoveryNode + "]", e); + listener.onFailure( + new IllegalStateException( + String.format( + Locale.ROOT, + "failure when sending a join validation request from [%s] to [%s]", + transportService.getLocalNode().descriptionWithoutAttributes(), + discoveryNode.descriptionWithoutAttributes() + ), + e + ) + ); + })); + final var clusterState = clusterStateSupplier.get(); + if (clusterState != null) { + assert clusterState.nodes().isLocalNodeElectedMaster(); + transportService.sendRequest( + connection, + JOIN_VALIDATE_ACTION_NAME, + new ValidateJoinRequest(clusterState), + REQUEST_OPTIONS, + responseHandler + ); + } else { + transportService.sendRequest( + connection, + JoinHelper.JOIN_PING_ACTION_NAME, + TransportRequest.Empty.INSTANCE, + REQUEST_OPTIONS, + responseHandler + ); } } @@ -312,27 +331,22 @@ public String toString() { private class JoinValidation extends ActionRunnable { private final DiscoveryNode discoveryNode; + private final Transport.Connection connection; - JoinValidation(DiscoveryNode discoveryNode, ActionListener listener) { + JoinValidation(DiscoveryNode discoveryNode, Transport.Connection connection, ActionListener listener) { super(listener); this.discoveryNode = discoveryNode; + this.connection = connection; } @Override - protected void doRun() throws Exception { - assert discoveryNode.getVersion().onOrAfter(Version.V_8_3_0) : discoveryNode.getVersion(); + protected void doRun() { + assert connection.getTransportVersion().onOrAfter(TransportVersions.V_8_3_0) : discoveryNode.getVersion(); // NB these things never run concurrently to each other, or to the cache cleaner (see IMPLEMENTATION NOTES above) so it is safe // to do these (non-atomic) things to the (unsynchronized) statesByVersion map. - Transport.Connection connection; - try { - connection = transportService.getConnection(discoveryNode); - } catch (NodeNotConnectedException e) { - listener.onFailure(e); - return; - } - var version = connection.getTransportVersion(); - var cachedBytes = statesByVersion.get(version); - var bytes = maybeSerializeClusterState(cachedBytes, discoveryNode, version); + var transportVersion = connection.getTransportVersion(); + var cachedBytes = statesByVersion.get(transportVersion); + var bytes = maybeSerializeClusterState(cachedBytes, discoveryNode, transportVersion); if (bytes == null) { // Normally if we're not the master then the Coordinator sends a ping message just to validate connectivity instead of // getting here. But if we were the master when the Coordinator checked then we might not be the master any more, so we @@ -354,7 +368,7 @@ protected void doRun() throws Exception { transportService.sendRequest( connection, JOIN_VALIDATE_ACTION_NAME, - new BytesTransportRequest(bytes, version), + new BytesTransportRequest(bytes, transportVersion), REQUEST_OPTIONS, new CleanableResponseHandler<>( listener.map(ignored -> null), diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestForceMergeAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestForceMergeAction.java index a04e23f289379..4c9ac8fcb9a3c 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestForceMergeAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestForceMergeAction.java @@ -13,7 +13,7 @@ import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse; import org.elasticsearch.action.support.IndicesOptions; -import org.elasticsearch.action.support.ListenableActionFuture; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.common.Strings; import org.elasticsearch.rest.BaseRestHandler; @@ -65,9 +65,9 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC if (validationException != null) { throw validationException; } - final var responseFuture = new ListenableActionFuture(); - final var task = client.executeLocally(ForceMergeAction.INSTANCE, mergeRequest, responseFuture); - responseFuture.addListener(new LoggingTaskListener<>(task)); + final var responseListener = new SubscribableListener(); + final var task = client.executeLocally(ForceMergeAction.INSTANCE, mergeRequest, responseListener); + responseListener.addListener(new LoggingTaskListener<>(task)); return sendTask(client.getLocalNodeId(), task); } } diff --git a/server/src/main/java/org/elasticsearch/tasks/LoggingTaskListener.java b/server/src/main/java/org/elasticsearch/tasks/LoggingTaskListener.java index c99194d933131..63e17bd62f8ee 100644 --- a/server/src/main/java/org/elasticsearch/tasks/LoggingTaskListener.java +++ b/server/src/main/java/org/elasticsearch/tasks/LoggingTaskListener.java @@ -31,7 +31,6 @@ public LoggingTaskListener(Task task) { @Override public void onResponse(Response response) { logger.info("{} finished with response {}", task.getId(), response); - } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointAction.java index 7ac27d79d3cb8..e492a98748af2 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointAction.java @@ -18,6 +18,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.TaskId; @@ -48,12 +49,21 @@ public static class Request extends ActionRequest implements IndicesRequest.Repl private String[] indices; private final IndicesOptions indicesOptions; + private final QueryBuilder query; + private final String cluster; private final TimeValue timeout; public Request(StreamInput in) throws IOException { super(in); indices = in.readStringArray(); indicesOptions = IndicesOptions.readIndicesOptions(in); + if (in.getTransportVersion().onOrAfter(TransportVersions.TRANSFORM_GET_CHECKPOINT_QUERY_AND_CLUSTER_ADDED)) { + query = in.readOptionalNamedWriteable(QueryBuilder.class); + cluster = in.readOptionalString(); + } else { + query = null; + cluster = null; + } if (in.getTransportVersion().onOrAfter(TransportVersions.TRANSFORM_GET_CHECKPOINT_TIMEOUT_ADDED)) { timeout = in.readOptionalTimeValue(); } else { @@ -61,9 +71,11 @@ public Request(StreamInput in) throws IOException { } } - public Request(String[] indices, IndicesOptions indicesOptions, TimeValue timeout) { + public Request(String[] indices, IndicesOptions indicesOptions, QueryBuilder query, String cluster, TimeValue timeout) { this.indices = indices != null ? indices : Strings.EMPTY_ARRAY; this.indicesOptions = indicesOptions; + this.query = query; + this.cluster = cluster; this.timeout = timeout; } @@ -82,6 +94,14 @@ public IndicesOptions indicesOptions() { return indicesOptions; } + public QueryBuilder getQuery() { + return query; + } + + public String getCluster() { + return cluster; + } + public TimeValue getTimeout() { return timeout; } @@ -98,12 +118,14 @@ public boolean equals(Object obj) { return Arrays.equals(indices, that.indices) && Objects.equals(indicesOptions, that.indicesOptions) + && Objects.equals(query, that.query) + && Objects.equals(cluster, that.cluster) && Objects.equals(timeout, that.timeout); } @Override public int hashCode() { - return Objects.hash(Arrays.hashCode(indices), indicesOptions, timeout); + return Objects.hash(Arrays.hashCode(indices), indicesOptions, query, cluster, timeout); } @Override @@ -111,6 +133,10 @@ public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeStringArray(indices); indicesOptions.writeIndicesOptions(out); + if (out.getTransportVersion().onOrAfter(TransportVersions.TRANSFORM_GET_CHECKPOINT_QUERY_AND_CLUSTER_ADDED)) { + out.writeOptionalNamedWriteable(query); + out.writeOptionalString(cluster); + } if (out.getTransportVersion().onOrAfter(TransportVersions.TRANSFORM_GET_CHECKPOINT_TIMEOUT_ADDED)) { out.writeOptionalTimeValue(timeout); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointActionRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointActionRequestTests.java index 43ec0a0f1b4f5..e96a7741b4f52 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointActionRequestTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointActionRequestTests.java @@ -11,9 +11,10 @@ import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.io.stream.Writeable.Reader; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.TaskId; -import org.elasticsearch.test.AbstractWireSerializingTestCase; import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction.Request; import java.util.ArrayList; @@ -26,7 +27,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; -public class GetCheckpointActionRequestTests extends AbstractWireSerializingTestCase { +public class GetCheckpointActionRequestTests extends AbstractWireSerializingTransformTestCase { @Override protected Request createTestInstance() { @@ -42,9 +43,11 @@ protected Reader instanceReader() { protected Request mutateInstance(Request instance) { List indices = instance.indices() != null ? new ArrayList<>(Arrays.asList(instance.indices())) : new ArrayList<>(); IndicesOptions indicesOptions = instance.indicesOptions(); + QueryBuilder query = instance.getQuery(); + String cluster = instance.getCluster(); TimeValue timeout = instance.getTimeout(); - switch (between(0, 2)) { + switch (between(0, 4)) { case 0: indices.add(randomAlphaOfLengthBetween(1, 20)); break; @@ -58,13 +61,19 @@ protected Request mutateInstance(Request instance) { ); break; case 2: + query = query != null ? null : QueryBuilders.matchAllQuery(); + break; + case 3: + cluster = cluster != null ? null : randomAlphaOfLengthBetween(1, 10); + break; + case 4: timeout = timeout != null ? null : TimeValue.timeValueSeconds(randomIntBetween(1, 300)); break; default: throw new AssertionError("Illegal randomization branch"); } - return new Request(indices.toArray(new String[0]), indicesOptions, timeout); + return new Request(indices.toArray(new String[0]), indicesOptions, query, cluster, timeout); } public void testCreateTask() { @@ -74,7 +83,7 @@ public void testCreateTask() { } public void testCreateTaskWithNullIndices() { - Request request = new Request(null, null, null); + Request request = new Request(null, null, null, null, null); CancellableTask task = request.createTask(123, "type", "action", new TaskId("dummy-node:456"), Map.of()); assertThat(task.getDescription(), is(equalTo("get_checkpoint[0]"))); } @@ -89,6 +98,8 @@ private static Request randomRequest(Integer numIndices) { Boolean.toString(randomBoolean()), SearchRequest.DEFAULT_INDICES_OPTIONS ), + randomBoolean() ? QueryBuilders.matchAllQuery() : null, + randomBoolean() ? randomAlphaOfLengthBetween(1, 10) : null, randomBoolean() ? TimeValue.timeValueSeconds(randomIntBetween(1, 300)) : null ); } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AsyncOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AsyncOperator.java index 8eea50226253b..f2011d1cdb987 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AsyncOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AsyncOperator.java @@ -166,6 +166,7 @@ public void finish() { @Override public boolean isFinished() { + checkFailure(); return finished && checkpoint.getPersistedCheckpoint() == checkpoint.getMaxSeqNo(); } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AsyncOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AsyncOperatorTests.java index b35dc8b7b2e80..00b046abdca24 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AsyncOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AsyncOperatorTests.java @@ -185,7 +185,6 @@ protected void doClose() { operator.close(); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/102264") public void testFailure() throws Exception { DriverContext driverContext = driverContext(); final SequenceLongBlockSourceOperator sourceOperator = new SequenceLongBlockSourceOperator( @@ -226,15 +225,17 @@ protected void doClose() { PlainActionFuture future = new PlainActionFuture<>(); Driver driver = new Driver(driverContext, sourceOperator, List.of(asyncOperator), outputOperator, () -> {}); Driver.start(threadPool.getThreadContext(), threadPool.executor(ESQL_TEST_EXECUTOR), driver, between(1, 1000), future); - assertBusy(() -> { - assertTrue(asyncOperator.isFinished()); - assertTrue(future.isDone()); - }); + assertBusy(() -> assertTrue(future.isDone())); if (failed.get()) { ElasticsearchException error = expectThrows(ElasticsearchException.class, future::actionGet); assertThat(error.getMessage(), containsString("simulated")); + error = expectThrows(ElasticsearchException.class, asyncOperator::isFinished); + assertThat(error.getMessage(), containsString("simulated")); + error = expectThrows(ElasticsearchException.class, asyncOperator::getOutput); + assertThat(error.getMessage(), containsString("simulated")); } else { - future.actionGet(); + assertTrue(asyncOperator.isFinished()); + assertNull(asyncOperator.getOutput()); } } diff --git a/x-pack/plugin/esql/qa/server/single-node/build.gradle b/x-pack/plugin/esql/qa/server/single-node/build.gradle index 3131b4176ee25..2d430965efb21 100644 --- a/x-pack/plugin/esql/qa/server/single-node/build.gradle +++ b/x-pack/plugin/esql/qa/server/single-node/build.gradle @@ -9,10 +9,9 @@ restResources { restApi { include '_common', 'bulk', 'get', 'indices', 'esql', 'xpack', 'enrich', 'cluster' } -} - -artifacts { - restXpackTests(new File(projectDir, "src/yamlRestTest/resources/rest-api-spec/test")) + restTests { + includeXpack 'esql' + } } testClusters.configureEach { diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java index 06fd9bd469b84..a141db7037263 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java @@ -16,6 +16,7 @@ import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.internal.ClusterAdminClient; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; @@ -1242,8 +1243,15 @@ public void testFilterNestedFields() { } public void testStatsNestFields() { - String node1 = internalCluster().startDataOnlyNode(); - String node2 = internalCluster().startDataOnlyNode(); + final String node1, node2; + if (randomBoolean()) { + internalCluster().ensureAtLeastNumDataNodes(2); + node1 = randomDataNode().getName(); + node2 = randomValueOtherThan(node1, () -> randomDataNode().getName()); + } else { + node1 = randomDataNode().getName(); + node2 = randomDataNode().getName(); + } assertAcked( client().admin() .indices() @@ -1276,8 +1284,15 @@ public void testStatsNestFields() { } public void testStatsMissingFields() { - String node1 = internalCluster().startDataOnlyNode(); - String node2 = internalCluster().startDataOnlyNode(); + final String node1, node2; + if (randomBoolean()) { + internalCluster().ensureAtLeastNumDataNodes(2); + node1 = randomDataNode().getName(); + node2 = randomValueOtherThan(node1, () -> randomDataNode().getName()); + } else { + node1 = randomDataNode().getName(); + node2 = randomDataNode().getName(); + } assertAcked( client().admin() .indices() @@ -1292,7 +1307,6 @@ public void testStatsMissingFields() { .setSettings(Settings.builder().put("index.routing.allocation.require._name", node2)) .setMapping("bar_int", "type=integer", "bar_long", "type=long", "bar_float", "type=float", "bar_double", "type=double") ); - var fields = List.of("foo_int", "foo_long", "foo_float", "foo_double"); var functions = List.of("sum", "count", "avg", "count_distinct"); for (String field : fields) { @@ -1510,4 +1524,8 @@ private void clearPersistentSettings(Setting... settings) { var clearSettingsRequest = new ClusterUpdateSettingsRequest().persistentSettings(clearedSettings.build()); admin().cluster().updateSettings(clearSettingsRequest).actionGet(); } + + private DiscoveryNode randomDataNode() { + return randomFrom(clusterService().state().nodes().getDataNodes().values()); + } } diff --git a/x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/100_bug_fix.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/100_bug_fix.yml similarity index 94% rename from x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/100_bug_fix.yml rename to x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/100_bug_fix.yml index d5f5bee46f50a..1876d1a6d3881 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/100_bug_fix.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/100_bug_fix.yml @@ -1,6 +1,8 @@ --- -"Bug fix https://github.com/elastic/elasticsearch/issues/99472": +"Coalesce and to_ip functions": - skip: + version: " - 8.11.99" + reason: "fixes in 8.12 or later" features: warnings - do: bulk: @@ -54,7 +56,10 @@ - match: { values.1: [ 20, null, "255.255.255.255", "255.255.255.255"] } --- -"Bug fix https://github.com/elastic/elasticsearch/issues/101489": +"unsupported and invalid mapped fields": + - skip: + version: " - 8.11.99" + reason: "fixes in 8.12 or later" - do: indices.create: index: index1 diff --git a/x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/10_basic.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/10_basic.yml similarity index 99% rename from x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/10_basic.yml rename to x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/10_basic.yml index a3b2de27bcb5b..e15372bc3088e 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/10_basic.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/10_basic.yml @@ -1,6 +1,8 @@ --- setup: - skip: + version: " - 8.10.99" + reason: "ESQL is available in 8.11+" features: allowed_warnings_regex - do: indices.create: diff --git a/x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/20_aggs.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/20_aggs.yml similarity index 99% rename from x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/20_aggs.yml rename to x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/20_aggs.yml index e94cb6ccd8e3c..4019b3a303345 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/20_aggs.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/20_aggs.yml @@ -1,6 +1,8 @@ --- setup: - skip: + version: " - 8.10.99" + reason: "ESQL is available in 8.11+" features: warnings - do: indices.create: diff --git a/x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/30_types.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/30_types.yml similarity index 99% rename from x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/30_types.yml rename to x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/30_types.yml index ab4d4bb54f37b..098931eb0bf96 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/30_types.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/30_types.yml @@ -1,6 +1,8 @@ --- setup: - skip: + version: " - 8.10.99" + reason: "ESQL is available in 8.11+" features: warnings --- diff --git a/x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/40_tsdb.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/40_tsdb.yml similarity index 99% rename from x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/40_tsdb.yml rename to x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/40_tsdb.yml index 921eb1f2cee1a..49215a6d4c479 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/40_tsdb.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/40_tsdb.yml @@ -1,5 +1,7 @@ setup: - skip: + version: " - 8.10.99" + reason: "ESQL is available in 8.11+" features: allowed_warnings_regex - do: indices.create: diff --git a/x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/40_unsupported_types.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/40_unsupported_types.yml similarity index 99% rename from x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/40_unsupported_types.yml rename to x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/40_unsupported_types.yml index c06456f7f127d..be5b43433983e 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/40_unsupported_types.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/40_unsupported_types.yml @@ -1,5 +1,7 @@ setup: - skip: + version: " - 8.10.99" + reason: "ESQL is available in 8.11+" features: allowed_warnings_regex - do: indices.create: diff --git a/x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/45_non_tsdb_counter.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/45_non_tsdb_counter.yml similarity index 98% rename from x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/45_non_tsdb_counter.yml rename to x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/45_non_tsdb_counter.yml index beb7200f01230..13a88d0c2f79f 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/45_non_tsdb_counter.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/45_non_tsdb_counter.yml @@ -1,5 +1,7 @@ setup: - skip: + version: " - 8.10.99" + reason: "ESQL is available in 8.11+" features: allowed_warnings_regex - do: indices.create: diff --git a/x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/50_index_patterns.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/50_index_patterns.yml similarity index 99% rename from x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/50_index_patterns.yml rename to x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/50_index_patterns.yml index 2098b9ee60d1e..38023b7791709 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/50_index_patterns.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/50_index_patterns.yml @@ -1,5 +1,7 @@ setup: - skip: + version: " - 8.10.99" + reason: "ESQL is available in 8.11+" features: allowed_warnings_regex --- diff --git a/x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/60_enrich.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/60_enrich.yml similarity index 95% rename from x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/60_enrich.yml rename to x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/60_enrich.yml index 84d8682508733..1673453824584 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/60_enrich.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/60_enrich.yml @@ -1,6 +1,8 @@ --- setup: - skip: + version: " - 8.10.99" + reason: "ESQL is available in 8.11+" features: allowed_warnings_regex - do: indices.create: @@ -127,3 +129,8 @@ setup: - match: { values.1: [ "Bob", "nyc", "USA" ] } - match: { values.2: [ "Denise", "sgn", null ] } - match: { values.3: [ "Mario", "rom", "Italy" ] } + + - do: + enrich.delete_policy: + name: cities_policy + - is_true: acknowledged diff --git a/x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/60_usage.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/60_usage.yml similarity index 96% rename from x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/60_usage.yml rename to x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/60_usage.yml index d7998651540d8..ad46a3c2d9c3e 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/60_usage.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/60_usage.yml @@ -1,5 +1,9 @@ --- setup: + - skip: + version: " - 8.10.99" + reason: "ESQL is available in 8.11+" + - do: indices.create: index: test diff --git a/x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/61_enrich_ip.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/61_enrich_ip.yml similarity index 94% rename from x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/61_enrich_ip.yml rename to x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/61_enrich_ip.yml index bd89af2fd3f79..0d49f169fc4b2 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/61_enrich_ip.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/61_enrich_ip.yml @@ -1,6 +1,8 @@ --- setup: - skip: + version: " - 8.10.99" + reason: "ESQL is available in 8.11+" features: allowed_warnings_regex - do: indices.create: @@ -95,3 +97,8 @@ setup: - match: { values.1: [ [ "10.100.0.21", "10.101.0.107" ], [ "Production", "QA" ], [ "OPS","Engineering" ], "sending messages" ] } - match: { values.2: [ "10.101.0.107" , "QA", "Engineering", "network disconnected" ] } - match: { values.3: [ "13.101.0.114" , null, null, "authentication failed" ] } + + - do: + enrich.delete_policy: + name: networks-policy + - is_true: acknowledged diff --git a/x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/70_locale.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/70_locale.yml similarity index 96% rename from x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/70_locale.yml rename to x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/70_locale.yml index a77e0569668de..bcae5e7cf24a2 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/70_locale.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/70_locale.yml @@ -1,6 +1,8 @@ --- setup: - skip: + version: " - 8.10.99" + reason: "ESQL is available in 8.11+" features: allowed_warnings_regex - do: indices.create: diff --git a/x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/80_text.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/80_text.yml similarity index 99% rename from x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/80_text.yml rename to x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/80_text.yml index d6d20fa0a0aee..cef7f88506de8 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/80_text.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/80_text.yml @@ -1,6 +1,8 @@ --- setup: - skip: + version: " - 8.10.99" + reason: "ESQL is available in 8.11+" features: allowed_warnings_regex - do: indices.create: diff --git a/x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/90_non_indexed.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/90_non_indexed.yml similarity index 97% rename from x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/90_non_indexed.yml rename to x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/90_non_indexed.yml index 9138a9454c571..c6124e7f75e96 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/90_non_indexed.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/90_non_indexed.yml @@ -1,5 +1,7 @@ setup: - skip: + version: " - 8.11.99" + reason: "extracting non-indexed fields available in 8.12+" features: allowed_warnings - do: indices.create: diff --git a/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformCCSCanMatchIT.java b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformCCSCanMatchIT.java new file mode 100644 index 0000000000000..d05acc7a7b368 --- /dev/null +++ b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformCCSCanMatchIT.java @@ -0,0 +1,415 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.transform.checkpoint; + +import org.apache.lucene.document.LongPoint; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.PointValues; +import org.apache.lucene.util.SetOnce; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.CollectionUtils; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.engine.EngineConfig; +import org.elasticsearch.index.engine.EngineFactory; +import org.elasticsearch.index.engine.InternalEngine; +import org.elasticsearch.index.engine.InternalEngineFactory; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.shard.IndexLongFieldRange; +import org.elasticsearch.index.shard.ShardLongFieldRange; +import org.elasticsearch.node.NodeRoleSettings; +import org.elasticsearch.plugins.EnginePlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.search.SearchModule; +import org.elasticsearch.search.aggregations.BaseAggregationBuilder; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.test.AbstractMultiClustersTestCase; +import org.elasticsearch.test.hamcrest.ElasticsearchAssertions; +import org.elasticsearch.xcontent.NamedXContentRegistry; +import org.elasticsearch.xcontent.ParseField; +import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.xcontent.json.JsonXContent; +import org.elasticsearch.xpack.core.ClientHelper; +import org.elasticsearch.xpack.core.XPackSettings; +import org.elasticsearch.xpack.core.transform.MockDeprecatedAggregationBuilder; +import org.elasticsearch.xpack.core.transform.MockDeprecatedQueryBuilder; +import org.elasticsearch.xpack.core.transform.TransformNamedXContentProvider; +import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction; +import org.elasticsearch.xpack.core.transform.action.GetTransformStatsAction; +import org.elasticsearch.xpack.core.transform.action.PutTransformAction; +import org.elasticsearch.xpack.core.transform.action.StartTransformAction; +import org.elasticsearch.xpack.core.transform.transforms.DestConfig; +import org.elasticsearch.xpack.core.transform.transforms.QueryConfig; +import org.elasticsearch.xpack.core.transform.transforms.SourceConfig; +import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; +import org.elasticsearch.xpack.core.transform.transforms.TransformStats; +import org.elasticsearch.xpack.core.transform.transforms.latest.LatestConfig; +import org.elasticsearch.xpack.transform.LocalStateTransform; +import org.junit.Before; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static java.util.Collections.emptyList; +import static org.elasticsearch.xpack.core.ClientHelper.TRANSFORM_ORIGIN; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; + +public class TransformCCSCanMatchIT extends AbstractMultiClustersTestCase { + + private static final String REMOTE_CLUSTER = "cluster_a"; + private static final TimeValue TIMEOUT = TimeValue.timeValueMinutes(1); + + private NamedXContentRegistry namedXContentRegistry; + private long timestamp; + private int oldLocalNumShards; + private int localOldDocs; + private int oldRemoteNumShards; + private int remoteOldDocs; + private int newLocalNumShards; + private int localNewDocs; + private int newRemoteNumShards; + private int remoteNewDocs; + + @Before + public void setUpNamedXContentRegistryAndIndices() throws Exception { + SearchModule searchModule = new SearchModule(Settings.EMPTY, emptyList()); + + List namedXContents = searchModule.getNamedXContents(); + namedXContents.add( + new NamedXContentRegistry.Entry( + QueryBuilder.class, + new ParseField(MockDeprecatedQueryBuilder.NAME), + (p, c) -> MockDeprecatedQueryBuilder.fromXContent(p) + ) + ); + namedXContents.add( + new NamedXContentRegistry.Entry( + BaseAggregationBuilder.class, + new ParseField(MockDeprecatedAggregationBuilder.NAME), + (p, c) -> MockDeprecatedAggregationBuilder.fromXContent(p) + ) + ); + + namedXContents.addAll(new TransformNamedXContentProvider().getNamedXContentParsers()); + + namedXContentRegistry = new NamedXContentRegistry(namedXContents); + + timestamp = randomLongBetween(10_000_000, 50_000_000); + + oldLocalNumShards = randomIntBetween(1, 5); + localOldDocs = createIndexAndIndexDocs(LOCAL_CLUSTER, "local_old_index", oldLocalNumShards, timestamp - 10_000, true); + oldRemoteNumShards = randomIntBetween(1, 5); + remoteOldDocs = createIndexAndIndexDocs(REMOTE_CLUSTER, "remote_old_index", oldRemoteNumShards, timestamp - 10_000, true); + + newLocalNumShards = randomIntBetween(1, 5); + localNewDocs = createIndexAndIndexDocs(LOCAL_CLUSTER, "local_new_index", newLocalNumShards, timestamp, randomBoolean()); + newRemoteNumShards = randomIntBetween(1, 5); + remoteNewDocs = createIndexAndIndexDocs(REMOTE_CLUSTER, "remote_new_index", newRemoteNumShards, timestamp, randomBoolean()); + } + + private int createIndexAndIndexDocs(String cluster, String index, int numberOfShards, long timestamp, boolean exposeTimestamp) + throws Exception { + Client client = client(cluster); + ElasticsearchAssertions.assertAcked( + client.admin() + .indices() + .prepareCreate(index) + .setSettings( + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + ) + .setMapping("@timestamp", "type=date", "position", "type=long") + ); + int numDocs = between(100, 500); + for (int i = 0; i < numDocs; i++) { + client.prepareIndex(index).setSource("position", i, "@timestamp", timestamp + i).get(); + } + if (exposeTimestamp) { + client.admin().indices().prepareClose(index).get(); + client.admin() + .indices() + .prepareUpdateSettings(index) + .setSettings(Settings.builder().put(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey(), true).build()) + .get(); + client.admin().indices().prepareOpen(index).get(); + assertBusy(() -> { + IndexLongFieldRange timestampRange = cluster(cluster).clusterService().state().metadata().index(index).getTimestampRange(); + assertTrue(Strings.toString(timestampRange), timestampRange.containsAllShardRanges()); + }); + } else { + client.admin().indices().prepareRefresh(index).get(); + } + return numDocs; + } + + public void testSearchAction_MatchAllQuery() { + testSearchAction(QueryBuilders.matchAllQuery(), true, localOldDocs + localNewDocs + remoteOldDocs + remoteNewDocs, 0); + testSearchAction(QueryBuilders.matchAllQuery(), false, localOldDocs + localNewDocs + remoteOldDocs + remoteNewDocs, 0); + } + + public void testSearchAction_RangeQuery() { + testSearchAction( + QueryBuilders.rangeQuery("@timestamp").from(timestamp), // This query only matches new documents + true, + localNewDocs + remoteNewDocs, + oldLocalNumShards + oldRemoteNumShards + ); + testSearchAction( + QueryBuilders.rangeQuery("@timestamp").from(timestamp), // This query only matches new documents + false, + localNewDocs + remoteNewDocs, + oldLocalNumShards + oldRemoteNumShards + ); + } + + public void testSearchAction_RangeQueryThatMatchesNoShards() { + testSearchAction( + QueryBuilders.rangeQuery("@timestamp").from(100_000_000), // This query matches no documents + true, + 0, + // All but 2 shards are skipped. TBH I don't know why this 2 shards are not skipped + oldLocalNumShards + newLocalNumShards + oldRemoteNumShards + newRemoteNumShards - 2 + ); + testSearchAction( + QueryBuilders.rangeQuery("@timestamp").from(100_000_000), // This query matches no documents + false, + 0, + // All but 1 shards are skipped. TBH I don't know why this 1 shard is not skipped + oldLocalNumShards + newLocalNumShards + oldRemoteNumShards + newRemoteNumShards - 1 + ); + } + + private void testSearchAction(QueryBuilder query, boolean ccsMinimizeRoundtrips, long expectedHitCount, int expectedSkippedShards) { + SearchSourceBuilder source = new SearchSourceBuilder().query(query); + SearchRequest request = new SearchRequest("local_*", "*:remote_*"); + request.source(source).setCcsMinimizeRoundtrips(ccsMinimizeRoundtrips); + SearchResponse response = client().search(request).actionGet(); + ElasticsearchAssertions.assertHitCount(response, expectedHitCount); + int expectedTotalShards = oldLocalNumShards + newLocalNumShards + oldRemoteNumShards + newRemoteNumShards; + assertThat("Response was: " + response, response.getTotalShards(), is(equalTo(expectedTotalShards))); + assertThat("Response was: " + response, response.getSuccessfulShards(), is(equalTo(expectedTotalShards))); + assertThat("Response was: " + response, response.getFailedShards(), is(equalTo(0))); + assertThat("Response was: " + response, response.getSkippedShards(), is(equalTo(expectedSkippedShards))); + } + + public void testGetCheckpointAction_MatchAllQuery() throws InterruptedException { + testGetCheckpointAction( + client(), + null, + new String[] { "local_*" }, + QueryBuilders.matchAllQuery(), + Set.of("local_old_index", "local_new_index") + ); + testGetCheckpointAction( + client().getRemoteClusterClient(REMOTE_CLUSTER, EsExecutors.DIRECT_EXECUTOR_SERVICE), + REMOTE_CLUSTER, + new String[] { "remote_*" }, + QueryBuilders.matchAllQuery(), + Set.of("remote_old_index", "remote_new_index") + ); + } + + public void testGetCheckpointAction_RangeQuery() throws InterruptedException { + testGetCheckpointAction( + client(), + null, + new String[] { "local_*" }, + QueryBuilders.rangeQuery("@timestamp").from(timestamp), + Set.of("local_new_index") + ); + testGetCheckpointAction( + client().getRemoteClusterClient(REMOTE_CLUSTER, EsExecutors.DIRECT_EXECUTOR_SERVICE), + REMOTE_CLUSTER, + new String[] { "remote_*" }, + QueryBuilders.rangeQuery("@timestamp").from(timestamp), + Set.of("remote_new_index") + ); + } + + public void testGetCheckpointAction_RangeQueryThatMatchesNoShards() throws InterruptedException { + testGetCheckpointAction( + client(), + null, + new String[] { "local_*" }, + QueryBuilders.rangeQuery("@timestamp").from(100_000_000), + Set.of() + ); + testGetCheckpointAction( + client().getRemoteClusterClient(REMOTE_CLUSTER, EsExecutors.DIRECT_EXECUTOR_SERVICE), + REMOTE_CLUSTER, + new String[] { "remote_*" }, + QueryBuilders.rangeQuery("@timestamp").from(100_000_000), + Set.of() + ); + } + + private void testGetCheckpointAction(Client client, String cluster, String[] indices, QueryBuilder query, Set expectedIndices) + throws InterruptedException { + final GetCheckpointAction.Request request = new GetCheckpointAction.Request( + indices, + IndicesOptions.LENIENT_EXPAND_OPEN, + query, + cluster, + TIMEOUT + ); + + CountDownLatch latch = new CountDownLatch(1); + SetOnce finalResponse = new SetOnce<>(); + SetOnce finalException = new SetOnce<>(); + ClientHelper.executeAsyncWithOrigin( + client, + TRANSFORM_ORIGIN, + GetCheckpointAction.INSTANCE, + request, + ActionListener.wrap(response -> { + finalResponse.set(response); + latch.countDown(); + }, e -> { + finalException.set(e); + latch.countDown(); + }) + ); + latch.await(10, TimeUnit.SECONDS); + + assertThat(finalException.get(), is(nullValue())); + assertThat("Response was: " + finalResponse.get(), finalResponse.get().getCheckpoints().keySet(), is(equalTo(expectedIndices))); + } + + public void testTransformLifecycle_MatchAllQuery() throws Exception { + testTransformLifecycle(QueryBuilders.matchAllQuery(), localOldDocs + localNewDocs + remoteOldDocs + remoteNewDocs); + } + + public void testTransformLifecycle_RangeQuery() throws Exception { + testTransformLifecycle(QueryBuilders.rangeQuery("@timestamp").from(timestamp), localNewDocs + remoteNewDocs); + } + + public void testTransformLifecycle_RangeQueryThatMatchesNoShards() throws Exception { + testTransformLifecycle(QueryBuilders.rangeQuery("@timestamp").from(100_000_000), 0); + } + + private void testTransformLifecycle(QueryBuilder query, long expectedHitCount) throws Exception { + String transformId = "test-transform-lifecycle"; + { + QueryConfig queryConfig; + try (XContentParser parser = createParser(JsonXContent.jsonXContent, query.toString())) { + queryConfig = QueryConfig.fromXContent(parser, true); + assertNotNull(queryConfig.getQuery()); + } + TransformConfig transformConfig = TransformConfig.builder() + .setId(transformId) + .setSource(new SourceConfig(new String[] { "local_*", "*:remote_*" }, queryConfig, Map.of())) + .setDest(new DestConfig(transformId + "-dest", null, null)) + .setLatestConfig(new LatestConfig(List.of("position"), "@timestamp")) + .build(); + PutTransformAction.Request request = new PutTransformAction.Request(transformConfig, false, TIMEOUT); + AcknowledgedResponse response = client().execute(PutTransformAction.INSTANCE, request).actionGet(); + assertTrue(response.isAcknowledged()); + } + { + StartTransformAction.Request request = new StartTransformAction.Request(transformId, null, TIMEOUT); + StartTransformAction.Response response = client().execute(StartTransformAction.INSTANCE, request).actionGet(); + assertTrue(response.isAcknowledged()); + } + assertBusy(() -> { + GetTransformStatsAction.Request request = new GetTransformStatsAction.Request(transformId, TIMEOUT); + GetTransformStatsAction.Response response = client().execute(GetTransformStatsAction.INSTANCE, request).actionGet(); + assertThat("Stats were: " + response.getTransformsStats(), response.getTransformsStats(), hasSize(1)); + assertThat(response.getTransformsStats().get(0).getState(), is(equalTo(TransformStats.State.STOPPED))); + assertThat(response.getTransformsStats().get(0).getIndexerStats().getNumDocuments(), is(equalTo(expectedHitCount))); + assertThat(response.getTransformsStats().get(0).getIndexerStats().getNumDeletedDocuments(), is(equalTo(0L))); + assertThat(response.getTransformsStats().get(0).getIndexerStats().getSearchFailures(), is(equalTo(0L))); + assertThat(response.getTransformsStats().get(0).getIndexerStats().getIndexFailures(), is(equalTo(0L))); + }); + } + + @Override + protected NamedXContentRegistry xContentRegistry() { + return namedXContentRegistry; + } + + @Override + protected Collection remoteClusterAlias() { + return List.of(REMOTE_CLUSTER); + } + + @Override + protected Collection> nodePlugins(String clusterAlias) { + return CollectionUtils.appendToCopy( + CollectionUtils.appendToCopy(super.nodePlugins(clusterAlias), LocalStateTransform.class), + ExposingTimestampEnginePlugin.class + ); + } + + @Override + protected Settings nodeSettings() { + return Settings.builder() + .put(NodeRoleSettings.NODE_ROLES_SETTING.getKey(), "master, data, ingest, transform, remote_cluster_client") + .put(XPackSettings.SECURITY_ENABLED.getKey(), false) + .build(); + } + + private static class EngineWithExposingTimestamp extends InternalEngine { + EngineWithExposingTimestamp(EngineConfig engineConfig) { + super(engineConfig); + assert IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.get(config().getIndexSettings().getSettings()) : "require read-only index"; + } + + @Override + public ShardLongFieldRange getRawFieldRange(String field) { + try (Searcher searcher = acquireSearcher("test")) { + final DirectoryReader directoryReader = searcher.getDirectoryReader(); + + final byte[] minPackedValue = PointValues.getMinPackedValue(directoryReader, field); + final byte[] maxPackedValue = PointValues.getMaxPackedValue(directoryReader, field); + if (minPackedValue == null || maxPackedValue == null) { + assert minPackedValue == null && maxPackedValue == null + : Arrays.toString(minPackedValue) + "-" + Arrays.toString(maxPackedValue); + return ShardLongFieldRange.EMPTY; + } + + return ShardLongFieldRange.of(LongPoint.decodeDimension(minPackedValue, 0), LongPoint.decodeDimension(maxPackedValue, 0)); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + } + + public static class ExposingTimestampEnginePlugin extends Plugin implements EnginePlugin { + + @Override + public Optional getEngineFactory(IndexSettings indexSettings) { + if (IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.get(indexSettings.getSettings())) { + return Optional.of(EngineWithExposingTimestamp::new); + } else { + return Optional.of(new InternalEngineFactory()); + } + } + } +} diff --git a/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformGetCheckpointIT.java b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformGetCheckpointIT.java index bb159856b965d..82a3ea85bfe6a 100644 --- a/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformGetCheckpointIT.java +++ b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformGetCheckpointIT.java @@ -15,6 +15,7 @@ import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.Strings; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.xcontent.XContentType; import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction; import org.elasticsearch.xpack.transform.TransformSingleNodeTestCase; @@ -25,6 +26,7 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import static org.hamcrest.Matchers.anEmptyMap; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.startsWith; @@ -46,6 +48,8 @@ public void testGetCheckpoint() throws Exception { final GetCheckpointAction.Request request = new GetCheckpointAction.Request( new String[] { indexNamePrefix + "*" }, IndicesOptions.LENIENT_EXPAND_OPEN, + null, + null, TimeValue.timeValueSeconds(5) ); @@ -99,6 +103,40 @@ public void testGetCheckpoint() throws Exception { ); } + public void testGetCheckpointWithQueryThatFiltersOutEverything() throws Exception { + final String indexNamePrefix = "test_index-"; + final int indices = randomIntBetween(1, 5); + final int shards = randomIntBetween(1, 5); + final int docsToCreatePerShard = randomIntBetween(0, 10); + + for (int i = 0; i < indices; ++i) { + indicesAdmin().prepareCreate(indexNamePrefix + i) + .setSettings(indexSettings(shards, 1)) + .setMapping("field", "type=long", "@timestamp", "type=date") + .get(); + for (int j = 0; j < shards; ++j) { + for (int d = 0; d < docsToCreatePerShard; ++d) { + client().prepareIndex(indexNamePrefix + i) + .setSource(Strings.format("{ \"field\":%d, \"@timestamp\": %d }", j, 10_000_000 + d + i + j), XContentType.JSON) + .get(); + } + } + } + indicesAdmin().refresh(new RefreshRequest(indexNamePrefix + "*")); + + final GetCheckpointAction.Request request = new GetCheckpointAction.Request( + new String[] { indexNamePrefix + "*" }, + IndicesOptions.LENIENT_EXPAND_OPEN, + // This query does not match any documents + QueryBuilders.rangeQuery("@timestamp").gte(20_000_000), + null, + TimeValue.timeValueSeconds(5) + ); + + final GetCheckpointAction.Response response = client().execute(GetCheckpointAction.INSTANCE, request).get(); + assertThat("Response was: " + response.getCheckpoints(), response.getCheckpoints(), is(anEmptyMap())); + } + public void testGetCheckpointTimeoutExceeded() throws Exception { final String indexNamePrefix = "test_index-"; final int indices = 100; @@ -111,6 +149,8 @@ public void testGetCheckpointTimeoutExceeded() throws Exception { final GetCheckpointAction.Request request = new GetCheckpointAction.Request( new String[] { indexNamePrefix + "*" }, IndicesOptions.LENIENT_EXPAND_OPEN, + null, + null, TimeValue.ZERO ); diff --git a/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformGetCheckpointTests.java b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformGetCheckpointTests.java index 1411576e61d58..300a075c9f1b2 100644 --- a/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformGetCheckpointTests.java +++ b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformGetCheckpointTests.java @@ -15,6 +15,7 @@ import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; +import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -71,6 +72,7 @@ public class TransformGetCheckpointTests extends ESSingleNodeTestCase { private IndicesService indicesService; private ThreadPool threadPool; private IndexNameExpressionResolver indexNameExpressionResolver; + private Client client; private MockTransport mockTransport; private Task transformTask; private final String indexNamePattern = "test_index-"; @@ -133,6 +135,8 @@ protected void onSendRequest(long requestId, String action, TransportRequest req .putCompatibilityVersions("node01", TransportVersions.V_8_5_0, Map.of()) .build(); + client = mock(Client.class); + transformTask = new Task( 1L, "persistent", @@ -157,6 +161,8 @@ public void testEmptyCheckpoint() throws InterruptedException { GetCheckpointAction.Request request = new GetCheckpointAction.Request( Strings.EMPTY_ARRAY, IndicesOptions.LENIENT_EXPAND_OPEN, + null, + null, TimeValue.timeValueSeconds(5) ); assertCheckpointAction(request, response -> { @@ -170,6 +176,8 @@ public void testSingleIndexRequest() throws InterruptedException { GetCheckpointAction.Request request = new GetCheckpointAction.Request( new String[] { indexNamePattern + "0" }, IndicesOptions.LENIENT_EXPAND_OPEN, + null, + null, TimeValue.timeValueSeconds(5) ); @@ -189,6 +197,8 @@ public void testMultiIndexRequest() throws InterruptedException { GetCheckpointAction.Request request = new GetCheckpointAction.Request( testIndices, IndicesOptions.LENIENT_EXPAND_OPEN, + null, + null, TimeValue.timeValueSeconds(5) ); assertCheckpointAction(request, response -> { @@ -208,7 +218,7 @@ public void testMultiIndexRequest() throws InterruptedException { class TestTransportGetCheckpointAction extends TransportGetCheckpointAction { TestTransportGetCheckpointAction() { - super(transportService, new ActionFilters(emptySet()), indicesService, clusterService, indexNameExpressionResolver); + super(transportService, new ActionFilters(emptySet()), indicesService, clusterService, indexNameExpressionResolver, client); } @Override diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointAction.java index 5acc2d4541559..675d71a5d1db9 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointAction.java @@ -14,9 +14,15 @@ import org.elasticsearch.action.NoShardAvailableActionException; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.UnavailableShardsException; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchShardsAction; +import org.elasticsearch.action.search.SearchShardsGroup; +import org.elasticsearch.action.search.SearchShardsRequest; +import org.elasticsearch.action.search.SearchShardsResponse; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.GroupedActionListener; import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -31,10 +37,12 @@ import org.elasticsearch.indices.IndicesService; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.transport.ActionNotFoundTransportException; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction; import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction.Request; import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction.Response; @@ -57,6 +65,7 @@ public class TransportGetCheckpointAction extends HandledTransportAction listener) { - final ClusterState state = clusterService.state(); - resolveIndicesAndGetCheckpoint(task, request, listener, state); + final ClusterState clusterState = clusterService.state(); + resolveIndicesAndGetCheckpoint(task, request, listener, clusterState); } - protected void resolveIndicesAndGetCheckpoint(Task task, Request request, ActionListener listener, final ClusterState state) { + protected void resolveIndicesAndGetCheckpoint( + Task task, + Request request, + ActionListener listener, + final ClusterState clusterState + ) { + final String nodeId = clusterState.nodes().getLocalNode().getId(); + final TaskId parentTaskId = new TaskId(nodeId, task.getId()); + // note: when security is turned on, the indices are already resolved // TODO: do a quick check and only resolve if necessary?? - String[] concreteIndices = this.indexNameExpressionResolver.concreteIndexNames(state, request); - - Map> nodesAndShards = resolveIndicesToPrimaryShards(state, concreteIndices); - - if (nodesAndShards.size() == 0) { + String[] concreteIndices = this.indexNameExpressionResolver.concreteIndexNames(clusterState, request); + Map> nodesAndShards = resolveIndicesToPrimaryShards(clusterState, concreteIndices); + if (nodesAndShards.isEmpty()) { listener.onResponse(new Response(Collections.emptyMap())); return; } - new AsyncGetCheckpointsFromNodesAction(state, task, nodesAndShards, new OriginalIndices(request), request.getTimeout(), listener) - .start(); + if (request.getQuery() == null) { // If there is no query, then there is no point in filtering + getCheckpointsFromNodes(clusterState, task, nodesAndShards, new OriginalIndices(request), request.getTimeout(), listener); + return; + } + + SearchShardsRequest searchShardsRequest = new SearchShardsRequest( + request.indices(), + SearchRequest.DEFAULT_INDICES_OPTIONS, + request.getQuery(), + null, + null, + false, + request.getCluster() + ); + searchShardsRequest.setParentTask(parentTaskId); + ClientHelper.executeAsyncWithOrigin( + client, + ClientHelper.TRANSFORM_ORIGIN, + SearchShardsAction.INSTANCE, + searchShardsRequest, + ActionListener.wrap(searchShardsResponse -> { + Map> filteredNodesAndShards = filterOutSkippedShards(nodesAndShards, searchShardsResponse); + getCheckpointsFromNodes( + clusterState, + task, + filteredNodesAndShards, + new OriginalIndices(request), + request.getTimeout(), + listener + ); + }, e -> { + // search_shards API failed so we just log the error here and continue just like there was no query + logger.atWarn().withThrowable(e).log("search_shards API failed for cluster [{}]", request.getCluster()); + logger.atTrace() + .withThrowable(e) + .log("search_shards API failed for cluster [{}], request was [{}]", request.getCluster(), searchShardsRequest); + getCheckpointsFromNodes(clusterState, task, nodesAndShards, new OriginalIndices(request), request.getTimeout(), listener); + }) + ); } - private static Map> resolveIndicesToPrimaryShards(ClusterState state, String[] concreteIndices) { + private static Map> resolveIndicesToPrimaryShards(ClusterState clusterState, String[] concreteIndices) { if (concreteIndices.length == 0) { return Collections.emptyMap(); } - final DiscoveryNodes nodes = state.nodes(); + final DiscoveryNodes nodes = clusterState.nodes(); Map> nodesAndShards = new HashMap<>(); - ShardsIterator shardsIt = state.routingTable().allShards(concreteIndices); + ShardsIterator shardsIt = clusterState.routingTable().allShards(concreteIndices); for (ShardRouting shard : shardsIt) { // only take primary shards, which should be exactly 1, this isn't strictly necessary // and we should consider taking any shard copy, but then we need another way to de-dup @@ -112,7 +166,7 @@ private static Map> resolveIndicesToPrimaryShards(ClusterSt } if (shard.assignedToNode() && nodes.get(shard.currentNodeId()) != null) { // special case: The minimum TransportVersion in the cluster is on an old version - if (state.getMinTransportVersion().before(TransportVersions.V_8_2_0)) { + if (clusterState.getMinTransportVersion().before(TransportVersions.V_8_2_0)) { throw new ActionNotFoundTransportException(GetCheckpointNodeAction.NAME); } @@ -125,111 +179,128 @@ private static Map> resolveIndicesToPrimaryShards(ClusterSt return nodesAndShards; } - protected class AsyncGetCheckpointsFromNodesAction { - private final Task task; - private final ActionListener listener; - private final Map> nodesAndShards; - private final OriginalIndices originalIndices; - private final TimeValue timeout; - private final DiscoveryNodes nodes; - private final String localNodeId; - - protected AsyncGetCheckpointsFromNodesAction( - ClusterState clusterState, - Task task, - Map> nodesAndShards, - OriginalIndices originalIndices, - TimeValue timeout, - ActionListener listener - ) { - this.task = task; - this.listener = listener; - this.nodesAndShards = nodesAndShards; - this.originalIndices = originalIndices; - this.timeout = timeout; - this.nodes = clusterState.nodes(); - this.localNodeId = clusterService.localNode().getId(); + static Map> filterOutSkippedShards( + Map> nodesAndShards, + SearchShardsResponse searchShardsResponse + ) { + Map> filteredNodesAndShards = new HashMap<>(nodesAndShards.size()); + // Create a deep copy of the given nodes and shards map. + for (Map.Entry> nodeAndShardsEntry : nodesAndShards.entrySet()) { + String node = nodeAndShardsEntry.getKey(); + Set shards = nodeAndShardsEntry.getValue(); + filteredNodesAndShards.put(node, new HashSet<>(shards)); } - - public void start() { - GroupedActionListener groupedListener = new GroupedActionListener<>( - nodesAndShards.size(), - ActionListener.wrap(responses -> listener.onResponse(mergeNodeResponses(responses)), listener::onFailure) - ); - - for (Entry> oneNodeAndItsShards : nodesAndShards.entrySet()) { - if (task instanceof CancellableTask) { - // There is no point continuing this work if the task has been cancelled. - if (((CancellableTask) task).notifyIfCancelled(listener)) { - return; + // Remove (node, shard) pairs for all the skipped shards. + for (SearchShardsGroup shardGroup : searchShardsResponse.getGroups()) { + if (shardGroup.skipped()) { + for (String allocatedNode : shardGroup.allocatedNodes()) { + Set shards = filteredNodesAndShards.get(allocatedNode); + if (shards != null) { + shards.remove(shardGroup.shardId()); + if (shards.isEmpty()) { + // Remove node if no shards were left. + filteredNodesAndShards.remove(allocatedNode); + } } } - if (localNodeId.equals(oneNodeAndItsShards.getKey())) { - TransportGetCheckpointNodeAction.getGlobalCheckpoints( - indicesService, - task, - oneNodeAndItsShards.getValue(), - timeout, - Clock.systemUTC(), - groupedListener - ); - continue; - } + } + } + return filteredNodesAndShards; + } - GetCheckpointNodeAction.Request nodeCheckpointsRequest = new GetCheckpointNodeAction.Request( - oneNodeAndItsShards.getValue(), - originalIndices, - timeout - ); - DiscoveryNode node = nodes.get(oneNodeAndItsShards.getKey()); - - // paranoia: this should not be possible using the same cluster state - if (node == null) { - listener.onFailure( - new UnavailableShardsException( - oneNodeAndItsShards.getValue().iterator().next(), - "Node not found for [{}] shards", - oneNodeAndItsShards.getValue().size() - ) - ); + private void getCheckpointsFromNodes( + ClusterState clusterState, + Task task, + Map> nodesAndShards, + OriginalIndices originalIndices, + TimeValue timeout, + ActionListener listener + ) { + if (nodesAndShards.isEmpty()) { + listener.onResponse(new Response(Map.of())); + return; + } + + final String localNodeId = clusterService.localNode().getId(); + + GroupedActionListener groupedListener = new GroupedActionListener<>( + nodesAndShards.size(), + ActionListener.wrap(responses -> listener.onResponse(mergeNodeResponses(responses)), listener::onFailure) + ); + + for (Entry> oneNodeAndItsShards : nodesAndShards.entrySet()) { + if (task instanceof CancellableTask) { + // There is no point continuing this work if the task has been cancelled. + if (((CancellableTask) task).notifyIfCancelled(listener)) { return; } - - logger.trace("get checkpoints from node {}", node); - transportService.sendChildRequest( - node, - GetCheckpointNodeAction.NAME, - nodeCheckpointsRequest, + } + if (localNodeId.equals(oneNodeAndItsShards.getKey())) { + TransportGetCheckpointNodeAction.getGlobalCheckpoints( + indicesService, task, - TransportRequestOptions.EMPTY, - new ActionListenerResponseHandler<>( - groupedListener, - GetCheckpointNodeAction.Response::new, - TransportResponseHandler.TRANSPORT_WORKER + oneNodeAndItsShards.getValue(), + timeout, + Clock.systemUTC(), + groupedListener + ); + continue; + } + + DiscoveryNodes nodes = clusterState.nodes(); + DiscoveryNode node = nodes.get(oneNodeAndItsShards.getKey()); + + // paranoia: this should not be possible using the same cluster state + if (node == null) { + listener.onFailure( + new UnavailableShardsException( + oneNodeAndItsShards.getValue().iterator().next(), + "Node not found for [{}] shards", + oneNodeAndItsShards.getValue().size() ) ); + return; } + + logger.trace("get checkpoints from node {}", node); + GetCheckpointNodeAction.Request nodeCheckpointsRequest = new GetCheckpointNodeAction.Request( + oneNodeAndItsShards.getValue(), + originalIndices, + timeout + ); + transportService.sendChildRequest( + node, + GetCheckpointNodeAction.NAME, + nodeCheckpointsRequest, + task, + TransportRequestOptions.EMPTY, + new ActionListenerResponseHandler<>( + groupedListener, + GetCheckpointNodeAction.Response::new, + TransportResponseHandler.TRANSPORT_WORKER + ) + ); } + } - private static Response mergeNodeResponses(Collection responses) { - // the final list should be ordered by key - Map checkpointsByIndexReduced = new TreeMap<>(); - - // merge the node responses - for (GetCheckpointNodeAction.Response response : responses) { - response.getCheckpoints().forEach((index, checkpoint) -> { - if (checkpointsByIndexReduced.containsKey(index)) { - long[] shardCheckpoints = checkpointsByIndexReduced.get(index); - for (int i = 0; i < checkpoint.length; ++i) { - shardCheckpoints[i] = Math.max(shardCheckpoints[i], checkpoint[i]); - } - } else { - checkpointsByIndexReduced.put(index, checkpoint); - } - }); - } + private static Response mergeNodeResponses(Collection responses) { + // the final list should be ordered by key + Map checkpointsByIndexReduced = new TreeMap<>(); - return new Response(checkpointsByIndexReduced); + // merge the node responses + for (GetCheckpointNodeAction.Response response : responses) { + response.getCheckpoints().forEach((index, checkpoint) -> { + if (checkpointsByIndexReduced.containsKey(index)) { + long[] shardCheckpoints = checkpointsByIndexReduced.get(index); + for (int i = 0; i < checkpoint.length; ++i) { + shardCheckpoints[i] = Math.max(shardCheckpoints[i], checkpoint[i]); + } + } else { + checkpointsByIndexReduced.put(index, checkpoint); + } + }); } + + return new Response(checkpointsByIndexReduced); } } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsAction.java index 13abc427460be..f7e60b13b50a6 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsAction.java @@ -121,14 +121,15 @@ protected void taskOperation( TransformTask transformTask, ActionListener listener ) { - // Little extra insurance, make sure we only return transforms that aren't cancelled ClusterState clusterState = clusterService.state(); String nodeId = clusterState.nodes().getLocalNode().getId(); final TaskId parentTaskId = new TaskId(nodeId, actionTask.getId()); + // If the _stats request is cancelled there is no point in continuing this work on the task level if (actionTask.notifyIfCancelled(listener)) { return; } + // Little extra insurance, make sure we only return transforms that aren't cancelled if (transformTask.isCancelled()) { listener.onResponse(new Response(Collections.emptyList())); return; diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java index aa1332b95fe84..b9b7d9d8477cb 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java @@ -23,6 +23,7 @@ import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.core.Strings; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.transport.ActionNotFoundTransportException; import org.elasticsearch.transport.RemoteClusterService; import org.elasticsearch.xpack.core.ClientHelper; @@ -134,14 +135,16 @@ protected void getIndexCheckpoints(TimeValue timeout, ActionListener> remoteIndex : resolvedIndexes.getRemoteIndicesPerClusterAlias().entrySet()) { + String cluster = remoteIndex.getKey(); ParentTaskAssigningClient remoteClient = new ParentTaskAssigningClient( - client.getRemoteClusterClient(remoteIndex.getKey(), EsExecutors.DIRECT_EXECUTOR_SERVICE), + client.getRemoteClusterClient(cluster, EsExecutors.DIRECT_EXECUTOR_SERVICE), client.getParentTask() ); getCheckpointsFromOneCluster( @@ -149,7 +152,8 @@ protected void getIndexCheckpoints(TimeValue timeout, ActionListener headers, String[] indices, + QueryBuilder query, String cluster, ActionListener> listener ) { if (fallbackToBWC.contains(cluster)) { getCheckpointsFromOneClusterBWC(client, timeout, headers, indices, cluster, listener); } else { - getCheckpointsFromOneClusterV2(client, timeout, headers, indices, cluster, ActionListener.wrap(response -> { + getCheckpointsFromOneClusterV2(client, timeout, headers, indices, query, cluster, ActionListener.wrap(response -> { logger.debug( "[{}] Successfully retrieved checkpoints from cluster [{}] using transform checkpoint API", transformConfig.getId(), @@ -200,12 +205,15 @@ private static void getCheckpointsFromOneClusterV2( TimeValue timeout, Map headers, String[] indices, + QueryBuilder query, String cluster, ActionListener> listener ) { GetCheckpointAction.Request getCheckpointRequest = new GetCheckpointAction.Request( indices, IndicesOptions.LENIENT_EXPAND_OPEN, + query, + cluster, timeout ); ActionListener checkpointListener; @@ -239,7 +247,6 @@ private static void getCheckpointsFromOneClusterV2( getCheckpointRequest, checkpointListener ); - } /** diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointActionTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointActionTests.java new file mode 100644 index 0000000000000..0d2d9619aca68 --- /dev/null +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointActionTests.java @@ -0,0 +1,134 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.transform.action; + +import org.elasticsearch.action.search.SearchShardsGroup; +import org.elasticsearch.action.search.SearchShardsResponse; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.ESTestCase; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.hamcrest.Matchers.anEmptyMap; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +public class TransportGetCheckpointActionTests extends ESTestCase { + + private static final String NODE_0 = "node-0"; + private static final String NODE_1 = "node-1"; + private static final String NODE_2 = "node-2"; + private static final Index INDEX_A = new Index("my-index-A", "A"); + private static final Index INDEX_B = new Index("my-index-B", "B"); + private static final Index INDEX_C = new Index("my-index-C", "C"); + private static final ShardId SHARD_A_0 = new ShardId(INDEX_A, 0); + private static final ShardId SHARD_A_1 = new ShardId(INDEX_A, 1); + private static final ShardId SHARD_B_0 = new ShardId(INDEX_B, 0); + private static final ShardId SHARD_B_1 = new ShardId(INDEX_B, 1); + + private static final Map> NODES_AND_SHARDS = Map.of( + NODE_0, + Set.of(SHARD_A_0, SHARD_A_1, SHARD_B_0, SHARD_B_1), + NODE_1, + Set.of(SHARD_A_0, SHARD_A_1, SHARD_B_0, SHARD_B_1), + NODE_2, + Set.of(SHARD_A_0, SHARD_A_1, SHARD_B_0, SHARD_B_1) + ); + + public void testFilterOutSkippedShards_EmptyNodesAndShards() { + SearchShardsResponse searchShardsResponse = new SearchShardsResponse( + Set.of( + new SearchShardsGroup(SHARD_A_0, List.of(NODE_0, NODE_1), true), + new SearchShardsGroup(SHARD_B_0, List.of(NODE_1, NODE_2), false), + new SearchShardsGroup(SHARD_B_1, List.of(NODE_0, NODE_2), true) + ), + Set.of(), + Map.of() + ); + Map> filteredNodesAndShards = TransportGetCheckpointAction.filterOutSkippedShards( + Map.of(), + searchShardsResponse + ); + assertThat(filteredNodesAndShards, is(anEmptyMap())); + } + + public void testFilterOutSkippedShards_EmptySearchShardsResponse() { + SearchShardsResponse searchShardsResponse = new SearchShardsResponse(Set.of(), Set.of(), Map.of()); + Map> filteredNodesAndShards = TransportGetCheckpointAction.filterOutSkippedShards( + NODES_AND_SHARDS, + searchShardsResponse + ); + assertThat(filteredNodesAndShards, is(equalTo(NODES_AND_SHARDS))); + } + + public void testFilterOutSkippedShards_SomeNodesEmptyAfterFiltering() { + SearchShardsResponse searchShardsResponse = new SearchShardsResponse( + Set.of( + new SearchShardsGroup(SHARD_A_0, List.of(NODE_0, NODE_2), true), + new SearchShardsGroup(SHARD_A_1, List.of(NODE_0, NODE_2), true), + new SearchShardsGroup(SHARD_B_0, List.of(NODE_0, NODE_2), true), + new SearchShardsGroup(SHARD_B_1, List.of(NODE_0, NODE_2), true) + ), + Set.of(), + Map.of() + ); + Map> filteredNodesAndShards = TransportGetCheckpointAction.filterOutSkippedShards( + NODES_AND_SHARDS, + searchShardsResponse + ); + Map> expectedFilteredNodesAndShards = Map.of(NODE_1, Set.of(SHARD_A_0, SHARD_A_1, SHARD_B_0, SHARD_B_1)); + assertThat(filteredNodesAndShards, is(equalTo(expectedFilteredNodesAndShards))); + } + + public void testFilterOutSkippedShards_AllNodesEmptyAfterFiltering() { + SearchShardsResponse searchShardsResponse = new SearchShardsResponse( + Set.of( + new SearchShardsGroup(SHARD_A_0, List.of(NODE_0, NODE_1, NODE_2), true), + new SearchShardsGroup(SHARD_A_1, List.of(NODE_0, NODE_1, NODE_2), true), + new SearchShardsGroup(SHARD_B_0, List.of(NODE_0, NODE_1, NODE_2), true), + new SearchShardsGroup(SHARD_B_1, List.of(NODE_0, NODE_1, NODE_2), true) + ), + Set.of(), + Map.of() + ); + Map> filteredNodesAndShards = TransportGetCheckpointAction.filterOutSkippedShards( + NODES_AND_SHARDS, + searchShardsResponse + ); + assertThat(filteredNodesAndShards, is(equalTo(Map.of()))); + } + + public void testFilterOutSkippedShards() { + SearchShardsResponse searchShardsResponse = new SearchShardsResponse( + Set.of( + new SearchShardsGroup(SHARD_A_0, List.of(NODE_0, NODE_1), true), + new SearchShardsGroup(SHARD_B_0, List.of(NODE_1, NODE_2), false), + new SearchShardsGroup(SHARD_B_1, List.of(NODE_0, NODE_2), true), + new SearchShardsGroup(new ShardId(INDEX_C, 0), List.of(NODE_0, NODE_1, NODE_2), true) + ), + Set.of(), + Map.of() + ); + Map> filteredNodesAndShards = TransportGetCheckpointAction.filterOutSkippedShards( + NODES_AND_SHARDS, + searchShardsResponse + ); + Map> expectedFilteredNodesAndShards = Map.of( + NODE_0, + Set.of(SHARD_A_1, SHARD_B_0), + NODE_1, + Set.of(SHARD_A_1, SHARD_B_0, SHARD_B_1), + NODE_2, + Set.of(SHARD_A_0, SHARD_A_1, SHARD_B_0) + ); + assertThat(filteredNodesAndShards, is(equalTo(expectedFilteredNodesAndShards))); + } +}