From 951acf2ca25d0f32fb486a6c757e9a102a8d2f9a Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 20 Nov 2023 14:51:13 +0000 Subject: [PATCH 1/8] Migrate JoinValidationService to TransportVersion (#102372) We introduced a new join validation protocol in version 8.3, effectively a different transport protocol. However today we are still checking the node's release version when deciding which validation protocol to use. This commit migrates to using the `TransportVersion` of the relevant connection. --- .../coordination/JoinValidationService.java | 114 ++++++++++-------- 1 file changed, 64 insertions(+), 50 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinValidationService.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinValidationService.java index d9911ad12df84..dc18a7950394a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinValidationService.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinValidationService.java @@ -12,7 +12,7 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.TransportVersion; -import org.elasticsearch.Version; +import org.elasticsearch.TransportVersions; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.cluster.ClusterState; @@ -30,6 +30,7 @@ import org.elasticsearch.core.Nullable; import org.elasticsearch.core.RefCounted; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.core.UpdateForV9; import org.elasticsearch.env.Environment; import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.threadpool.ThreadPool; @@ -148,10 +149,23 @@ public JoinValidationService( } public void validateJoin(DiscoveryNode discoveryNode, ActionListener listener) { - if (discoveryNode.getVersion().onOrAfter(Version.V_8_3_0)) { + // This node isn't in the cluster yet so ClusterState#getMinTransportVersion() doesn't apply, we must obtain a specific connection + // so we can check its transport version to decide how to proceed. + + final Transport.Connection connection; + try { + connection = transportService.getConnection(discoveryNode); + assert connection != null; + } catch (Exception e) { + assert e instanceof NodeNotConnectedException : e; + listener.onFailure(e); + return; + } + + if (connection.getTransportVersion().onOrAfter(TransportVersions.V_8_3_0)) { if (executeRefs.tryIncRef()) { try { - execute(new JoinValidation(discoveryNode, listener)); + execute(new JoinValidation(discoveryNode, connection, listener)); } finally { executeRefs.decRef(); } @@ -159,39 +173,44 @@ public void validateJoin(DiscoveryNode discoveryNode, ActionListener liste listener.onFailure(new NodeClosedException(transportService.getLocalNode())); } } else { - final var responseHandler = TransportResponseHandler.empty(responseExecutor, listener.delegateResponse((l, e) -> { - logger.warn(() -> "failed to validate incoming join request from node [" + discoveryNode + "]", e); - listener.onFailure( - new IllegalStateException( - String.format( - Locale.ROOT, - "failure when sending a join validation request from [%s] to [%s]", - transportService.getLocalNode().descriptionWithoutAttributes(), - discoveryNode.descriptionWithoutAttributes() - ), - e - ) - ); - })); - final var clusterState = clusterStateSupplier.get(); - if (clusterState != null) { - assert clusterState.nodes().isLocalNodeElectedMaster(); - transportService.sendRequest( - discoveryNode, - JOIN_VALIDATE_ACTION_NAME, - new ValidateJoinRequest(clusterState), - REQUEST_OPTIONS, - responseHandler - ); - } else { - transportService.sendRequest( - discoveryNode, - JoinHelper.JOIN_PING_ACTION_NAME, - TransportRequest.Empty.INSTANCE, - REQUEST_OPTIONS, - responseHandler - ); - } + legacyValidateJoin(discoveryNode, listener, connection); + } + } + + @UpdateForV9 + private void legacyValidateJoin(DiscoveryNode discoveryNode, ActionListener listener, Transport.Connection connection) { + final var responseHandler = TransportResponseHandler.empty(responseExecutor, listener.delegateResponse((l, e) -> { + logger.warn(() -> "failed to validate incoming join request from node [" + discoveryNode + "]", e); + listener.onFailure( + new IllegalStateException( + String.format( + Locale.ROOT, + "failure when sending a join validation request from [%s] to [%s]", + transportService.getLocalNode().descriptionWithoutAttributes(), + discoveryNode.descriptionWithoutAttributes() + ), + e + ) + ); + })); + final var clusterState = clusterStateSupplier.get(); + if (clusterState != null) { + assert clusterState.nodes().isLocalNodeElectedMaster(); + transportService.sendRequest( + connection, + JOIN_VALIDATE_ACTION_NAME, + new ValidateJoinRequest(clusterState), + REQUEST_OPTIONS, + responseHandler + ); + } else { + transportService.sendRequest( + connection, + JoinHelper.JOIN_PING_ACTION_NAME, + TransportRequest.Empty.INSTANCE, + REQUEST_OPTIONS, + responseHandler + ); } } @@ -312,27 +331,22 @@ public String toString() { private class JoinValidation extends ActionRunnable { private final DiscoveryNode discoveryNode; + private final Transport.Connection connection; - JoinValidation(DiscoveryNode discoveryNode, ActionListener listener) { + JoinValidation(DiscoveryNode discoveryNode, Transport.Connection connection, ActionListener listener) { super(listener); this.discoveryNode = discoveryNode; + this.connection = connection; } @Override - protected void doRun() throws Exception { - assert discoveryNode.getVersion().onOrAfter(Version.V_8_3_0) : discoveryNode.getVersion(); + protected void doRun() { + assert connection.getTransportVersion().onOrAfter(TransportVersions.V_8_3_0) : discoveryNode.getVersion(); // NB these things never run concurrently to each other, or to the cache cleaner (see IMPLEMENTATION NOTES above) so it is safe // to do these (non-atomic) things to the (unsynchronized) statesByVersion map. - Transport.Connection connection; - try { - connection = transportService.getConnection(discoveryNode); - } catch (NodeNotConnectedException e) { - listener.onFailure(e); - return; - } - var version = connection.getTransportVersion(); - var cachedBytes = statesByVersion.get(version); - var bytes = maybeSerializeClusterState(cachedBytes, discoveryNode, version); + var transportVersion = connection.getTransportVersion(); + var cachedBytes = statesByVersion.get(transportVersion); + var bytes = maybeSerializeClusterState(cachedBytes, discoveryNode, transportVersion); if (bytes == null) { // Normally if we're not the master then the Coordinator sends a ping message just to validate connectivity instead of // getting here. But if we were the master when the Coordinator checked then we might not be the master any more, so we @@ -354,7 +368,7 @@ protected void doRun() throws Exception { transportService.sendRequest( connection, JOIN_VALIDATE_ACTION_NAME, - new BytesTransportRequest(bytes, version), + new BytesTransportRequest(bytes, transportVersion), REQUEST_OPTIONS, new CleanableResponseHandler<>( listener.map(ignored -> null), From 893e0bed9f853e1b5f4fc9ded1b20b6dc1b4edc8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Przemys=C5=82aw=20Witek?= Date: Mon, 20 Nov 2023 16:00:57 +0100 Subject: [PATCH 2/8] [Transform] Skip shards that don't match the source query during checkpointing (#102138) --- docs/changelog/102138.yaml | 5 + .../org/elasticsearch/TransportVersions.java | 1 + .../transform/action/GetCheckpointAction.java | 30 +- .../GetCheckpointActionRequestTests.java | 21 +- .../checkpoint/TransformCCSCanMatchIT.java | 415 ++++++++++++++++++ .../checkpoint/TransformGetCheckpointIT.java | 40 ++ .../TransformGetCheckpointTests.java | 12 +- .../action/TransportGetCheckpointAction.java | 285 +++++++----- .../TransportGetTransformStatsAction.java | 3 +- .../checkpoint/DefaultCheckpointProvider.java | 15 +- .../TransportGetCheckpointActionTests.java | 134 ++++++ 11 files changed, 841 insertions(+), 120 deletions(-) create mode 100644 docs/changelog/102138.yaml create mode 100644 x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformCCSCanMatchIT.java create mode 100644 x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointActionTests.java diff --git a/docs/changelog/102138.yaml b/docs/changelog/102138.yaml new file mode 100644 index 0000000000000..3819e3201150e --- /dev/null +++ b/docs/changelog/102138.yaml @@ -0,0 +1,5 @@ +pr: 102138 +summary: Skip shards that don't match the source query during checkpointing +area: Transform +type: enhancement +issues: [] diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index baae500b70d55..5ad1d43c0d4f8 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -172,6 +172,7 @@ static TransportVersion def(int id) { public static final TransportVersion DATA_STREAM_FAILURE_STORE_ADDED = def(8_541_00_0); public static final TransportVersion ML_INFERENCE_OPENAI_ADDED = def(8_542_00_0); public static final TransportVersion SHUTDOWN_MIGRATION_STATUS_INCLUDE_COUNTS = def(8_543_00_0); + public static final TransportVersion TRANSFORM_GET_CHECKPOINT_QUERY_AND_CLUSTER_ADDED = def(8_544_00_0); /* * STOP! READ THIS FIRST! No, really, diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointAction.java index 7ac27d79d3cb8..e492a98748af2 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointAction.java @@ -18,6 +18,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.TaskId; @@ -48,12 +49,21 @@ public static class Request extends ActionRequest implements IndicesRequest.Repl private String[] indices; private final IndicesOptions indicesOptions; + private final QueryBuilder query; + private final String cluster; private final TimeValue timeout; public Request(StreamInput in) throws IOException { super(in); indices = in.readStringArray(); indicesOptions = IndicesOptions.readIndicesOptions(in); + if (in.getTransportVersion().onOrAfter(TransportVersions.TRANSFORM_GET_CHECKPOINT_QUERY_AND_CLUSTER_ADDED)) { + query = in.readOptionalNamedWriteable(QueryBuilder.class); + cluster = in.readOptionalString(); + } else { + query = null; + cluster = null; + } if (in.getTransportVersion().onOrAfter(TransportVersions.TRANSFORM_GET_CHECKPOINT_TIMEOUT_ADDED)) { timeout = in.readOptionalTimeValue(); } else { @@ -61,9 +71,11 @@ public Request(StreamInput in) throws IOException { } } - public Request(String[] indices, IndicesOptions indicesOptions, TimeValue timeout) { + public Request(String[] indices, IndicesOptions indicesOptions, QueryBuilder query, String cluster, TimeValue timeout) { this.indices = indices != null ? indices : Strings.EMPTY_ARRAY; this.indicesOptions = indicesOptions; + this.query = query; + this.cluster = cluster; this.timeout = timeout; } @@ -82,6 +94,14 @@ public IndicesOptions indicesOptions() { return indicesOptions; } + public QueryBuilder getQuery() { + return query; + } + + public String getCluster() { + return cluster; + } + public TimeValue getTimeout() { return timeout; } @@ -98,12 +118,14 @@ public boolean equals(Object obj) { return Arrays.equals(indices, that.indices) && Objects.equals(indicesOptions, that.indicesOptions) + && Objects.equals(query, that.query) + && Objects.equals(cluster, that.cluster) && Objects.equals(timeout, that.timeout); } @Override public int hashCode() { - return Objects.hash(Arrays.hashCode(indices), indicesOptions, timeout); + return Objects.hash(Arrays.hashCode(indices), indicesOptions, query, cluster, timeout); } @Override @@ -111,6 +133,10 @@ public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeStringArray(indices); indicesOptions.writeIndicesOptions(out); + if (out.getTransportVersion().onOrAfter(TransportVersions.TRANSFORM_GET_CHECKPOINT_QUERY_AND_CLUSTER_ADDED)) { + out.writeOptionalNamedWriteable(query); + out.writeOptionalString(cluster); + } if (out.getTransportVersion().onOrAfter(TransportVersions.TRANSFORM_GET_CHECKPOINT_TIMEOUT_ADDED)) { out.writeOptionalTimeValue(timeout); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointActionRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointActionRequestTests.java index 43ec0a0f1b4f5..e96a7741b4f52 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointActionRequestTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointActionRequestTests.java @@ -11,9 +11,10 @@ import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.io.stream.Writeable.Reader; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.TaskId; -import org.elasticsearch.test.AbstractWireSerializingTestCase; import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction.Request; import java.util.ArrayList; @@ -26,7 +27,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; -public class GetCheckpointActionRequestTests extends AbstractWireSerializingTestCase { +public class GetCheckpointActionRequestTests extends AbstractWireSerializingTransformTestCase { @Override protected Request createTestInstance() { @@ -42,9 +43,11 @@ protected Reader instanceReader() { protected Request mutateInstance(Request instance) { List indices = instance.indices() != null ? new ArrayList<>(Arrays.asList(instance.indices())) : new ArrayList<>(); IndicesOptions indicesOptions = instance.indicesOptions(); + QueryBuilder query = instance.getQuery(); + String cluster = instance.getCluster(); TimeValue timeout = instance.getTimeout(); - switch (between(0, 2)) { + switch (between(0, 4)) { case 0: indices.add(randomAlphaOfLengthBetween(1, 20)); break; @@ -58,13 +61,19 @@ protected Request mutateInstance(Request instance) { ); break; case 2: + query = query != null ? null : QueryBuilders.matchAllQuery(); + break; + case 3: + cluster = cluster != null ? null : randomAlphaOfLengthBetween(1, 10); + break; + case 4: timeout = timeout != null ? null : TimeValue.timeValueSeconds(randomIntBetween(1, 300)); break; default: throw new AssertionError("Illegal randomization branch"); } - return new Request(indices.toArray(new String[0]), indicesOptions, timeout); + return new Request(indices.toArray(new String[0]), indicesOptions, query, cluster, timeout); } public void testCreateTask() { @@ -74,7 +83,7 @@ public void testCreateTask() { } public void testCreateTaskWithNullIndices() { - Request request = new Request(null, null, null); + Request request = new Request(null, null, null, null, null); CancellableTask task = request.createTask(123, "type", "action", new TaskId("dummy-node:456"), Map.of()); assertThat(task.getDescription(), is(equalTo("get_checkpoint[0]"))); } @@ -89,6 +98,8 @@ private static Request randomRequest(Integer numIndices) { Boolean.toString(randomBoolean()), SearchRequest.DEFAULT_INDICES_OPTIONS ), + randomBoolean() ? QueryBuilders.matchAllQuery() : null, + randomBoolean() ? randomAlphaOfLengthBetween(1, 10) : null, randomBoolean() ? TimeValue.timeValueSeconds(randomIntBetween(1, 300)) : null ); } diff --git a/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformCCSCanMatchIT.java b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformCCSCanMatchIT.java new file mode 100644 index 0000000000000..d05acc7a7b368 --- /dev/null +++ b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformCCSCanMatchIT.java @@ -0,0 +1,415 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.transform.checkpoint; + +import org.apache.lucene.document.LongPoint; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.PointValues; +import org.apache.lucene.util.SetOnce; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.CollectionUtils; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.engine.EngineConfig; +import org.elasticsearch.index.engine.EngineFactory; +import org.elasticsearch.index.engine.InternalEngine; +import org.elasticsearch.index.engine.InternalEngineFactory; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.shard.IndexLongFieldRange; +import org.elasticsearch.index.shard.ShardLongFieldRange; +import org.elasticsearch.node.NodeRoleSettings; +import org.elasticsearch.plugins.EnginePlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.search.SearchModule; +import org.elasticsearch.search.aggregations.BaseAggregationBuilder; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.test.AbstractMultiClustersTestCase; +import org.elasticsearch.test.hamcrest.ElasticsearchAssertions; +import org.elasticsearch.xcontent.NamedXContentRegistry; +import org.elasticsearch.xcontent.ParseField; +import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.xcontent.json.JsonXContent; +import org.elasticsearch.xpack.core.ClientHelper; +import org.elasticsearch.xpack.core.XPackSettings; +import org.elasticsearch.xpack.core.transform.MockDeprecatedAggregationBuilder; +import org.elasticsearch.xpack.core.transform.MockDeprecatedQueryBuilder; +import org.elasticsearch.xpack.core.transform.TransformNamedXContentProvider; +import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction; +import org.elasticsearch.xpack.core.transform.action.GetTransformStatsAction; +import org.elasticsearch.xpack.core.transform.action.PutTransformAction; +import org.elasticsearch.xpack.core.transform.action.StartTransformAction; +import org.elasticsearch.xpack.core.transform.transforms.DestConfig; +import org.elasticsearch.xpack.core.transform.transforms.QueryConfig; +import org.elasticsearch.xpack.core.transform.transforms.SourceConfig; +import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; +import org.elasticsearch.xpack.core.transform.transforms.TransformStats; +import org.elasticsearch.xpack.core.transform.transforms.latest.LatestConfig; +import org.elasticsearch.xpack.transform.LocalStateTransform; +import org.junit.Before; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static java.util.Collections.emptyList; +import static org.elasticsearch.xpack.core.ClientHelper.TRANSFORM_ORIGIN; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; + +public class TransformCCSCanMatchIT extends AbstractMultiClustersTestCase { + + private static final String REMOTE_CLUSTER = "cluster_a"; + private static final TimeValue TIMEOUT = TimeValue.timeValueMinutes(1); + + private NamedXContentRegistry namedXContentRegistry; + private long timestamp; + private int oldLocalNumShards; + private int localOldDocs; + private int oldRemoteNumShards; + private int remoteOldDocs; + private int newLocalNumShards; + private int localNewDocs; + private int newRemoteNumShards; + private int remoteNewDocs; + + @Before + public void setUpNamedXContentRegistryAndIndices() throws Exception { + SearchModule searchModule = new SearchModule(Settings.EMPTY, emptyList()); + + List namedXContents = searchModule.getNamedXContents(); + namedXContents.add( + new NamedXContentRegistry.Entry( + QueryBuilder.class, + new ParseField(MockDeprecatedQueryBuilder.NAME), + (p, c) -> MockDeprecatedQueryBuilder.fromXContent(p) + ) + ); + namedXContents.add( + new NamedXContentRegistry.Entry( + BaseAggregationBuilder.class, + new ParseField(MockDeprecatedAggregationBuilder.NAME), + (p, c) -> MockDeprecatedAggregationBuilder.fromXContent(p) + ) + ); + + namedXContents.addAll(new TransformNamedXContentProvider().getNamedXContentParsers()); + + namedXContentRegistry = new NamedXContentRegistry(namedXContents); + + timestamp = randomLongBetween(10_000_000, 50_000_000); + + oldLocalNumShards = randomIntBetween(1, 5); + localOldDocs = createIndexAndIndexDocs(LOCAL_CLUSTER, "local_old_index", oldLocalNumShards, timestamp - 10_000, true); + oldRemoteNumShards = randomIntBetween(1, 5); + remoteOldDocs = createIndexAndIndexDocs(REMOTE_CLUSTER, "remote_old_index", oldRemoteNumShards, timestamp - 10_000, true); + + newLocalNumShards = randomIntBetween(1, 5); + localNewDocs = createIndexAndIndexDocs(LOCAL_CLUSTER, "local_new_index", newLocalNumShards, timestamp, randomBoolean()); + newRemoteNumShards = randomIntBetween(1, 5); + remoteNewDocs = createIndexAndIndexDocs(REMOTE_CLUSTER, "remote_new_index", newRemoteNumShards, timestamp, randomBoolean()); + } + + private int createIndexAndIndexDocs(String cluster, String index, int numberOfShards, long timestamp, boolean exposeTimestamp) + throws Exception { + Client client = client(cluster); + ElasticsearchAssertions.assertAcked( + client.admin() + .indices() + .prepareCreate(index) + .setSettings( + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + ) + .setMapping("@timestamp", "type=date", "position", "type=long") + ); + int numDocs = between(100, 500); + for (int i = 0; i < numDocs; i++) { + client.prepareIndex(index).setSource("position", i, "@timestamp", timestamp + i).get(); + } + if (exposeTimestamp) { + client.admin().indices().prepareClose(index).get(); + client.admin() + .indices() + .prepareUpdateSettings(index) + .setSettings(Settings.builder().put(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey(), true).build()) + .get(); + client.admin().indices().prepareOpen(index).get(); + assertBusy(() -> { + IndexLongFieldRange timestampRange = cluster(cluster).clusterService().state().metadata().index(index).getTimestampRange(); + assertTrue(Strings.toString(timestampRange), timestampRange.containsAllShardRanges()); + }); + } else { + client.admin().indices().prepareRefresh(index).get(); + } + return numDocs; + } + + public void testSearchAction_MatchAllQuery() { + testSearchAction(QueryBuilders.matchAllQuery(), true, localOldDocs + localNewDocs + remoteOldDocs + remoteNewDocs, 0); + testSearchAction(QueryBuilders.matchAllQuery(), false, localOldDocs + localNewDocs + remoteOldDocs + remoteNewDocs, 0); + } + + public void testSearchAction_RangeQuery() { + testSearchAction( + QueryBuilders.rangeQuery("@timestamp").from(timestamp), // This query only matches new documents + true, + localNewDocs + remoteNewDocs, + oldLocalNumShards + oldRemoteNumShards + ); + testSearchAction( + QueryBuilders.rangeQuery("@timestamp").from(timestamp), // This query only matches new documents + false, + localNewDocs + remoteNewDocs, + oldLocalNumShards + oldRemoteNumShards + ); + } + + public void testSearchAction_RangeQueryThatMatchesNoShards() { + testSearchAction( + QueryBuilders.rangeQuery("@timestamp").from(100_000_000), // This query matches no documents + true, + 0, + // All but 2 shards are skipped. TBH I don't know why this 2 shards are not skipped + oldLocalNumShards + newLocalNumShards + oldRemoteNumShards + newRemoteNumShards - 2 + ); + testSearchAction( + QueryBuilders.rangeQuery("@timestamp").from(100_000_000), // This query matches no documents + false, + 0, + // All but 1 shards are skipped. TBH I don't know why this 1 shard is not skipped + oldLocalNumShards + newLocalNumShards + oldRemoteNumShards + newRemoteNumShards - 1 + ); + } + + private void testSearchAction(QueryBuilder query, boolean ccsMinimizeRoundtrips, long expectedHitCount, int expectedSkippedShards) { + SearchSourceBuilder source = new SearchSourceBuilder().query(query); + SearchRequest request = new SearchRequest("local_*", "*:remote_*"); + request.source(source).setCcsMinimizeRoundtrips(ccsMinimizeRoundtrips); + SearchResponse response = client().search(request).actionGet(); + ElasticsearchAssertions.assertHitCount(response, expectedHitCount); + int expectedTotalShards = oldLocalNumShards + newLocalNumShards + oldRemoteNumShards + newRemoteNumShards; + assertThat("Response was: " + response, response.getTotalShards(), is(equalTo(expectedTotalShards))); + assertThat("Response was: " + response, response.getSuccessfulShards(), is(equalTo(expectedTotalShards))); + assertThat("Response was: " + response, response.getFailedShards(), is(equalTo(0))); + assertThat("Response was: " + response, response.getSkippedShards(), is(equalTo(expectedSkippedShards))); + } + + public void testGetCheckpointAction_MatchAllQuery() throws InterruptedException { + testGetCheckpointAction( + client(), + null, + new String[] { "local_*" }, + QueryBuilders.matchAllQuery(), + Set.of("local_old_index", "local_new_index") + ); + testGetCheckpointAction( + client().getRemoteClusterClient(REMOTE_CLUSTER, EsExecutors.DIRECT_EXECUTOR_SERVICE), + REMOTE_CLUSTER, + new String[] { "remote_*" }, + QueryBuilders.matchAllQuery(), + Set.of("remote_old_index", "remote_new_index") + ); + } + + public void testGetCheckpointAction_RangeQuery() throws InterruptedException { + testGetCheckpointAction( + client(), + null, + new String[] { "local_*" }, + QueryBuilders.rangeQuery("@timestamp").from(timestamp), + Set.of("local_new_index") + ); + testGetCheckpointAction( + client().getRemoteClusterClient(REMOTE_CLUSTER, EsExecutors.DIRECT_EXECUTOR_SERVICE), + REMOTE_CLUSTER, + new String[] { "remote_*" }, + QueryBuilders.rangeQuery("@timestamp").from(timestamp), + Set.of("remote_new_index") + ); + } + + public void testGetCheckpointAction_RangeQueryThatMatchesNoShards() throws InterruptedException { + testGetCheckpointAction( + client(), + null, + new String[] { "local_*" }, + QueryBuilders.rangeQuery("@timestamp").from(100_000_000), + Set.of() + ); + testGetCheckpointAction( + client().getRemoteClusterClient(REMOTE_CLUSTER, EsExecutors.DIRECT_EXECUTOR_SERVICE), + REMOTE_CLUSTER, + new String[] { "remote_*" }, + QueryBuilders.rangeQuery("@timestamp").from(100_000_000), + Set.of() + ); + } + + private void testGetCheckpointAction(Client client, String cluster, String[] indices, QueryBuilder query, Set expectedIndices) + throws InterruptedException { + final GetCheckpointAction.Request request = new GetCheckpointAction.Request( + indices, + IndicesOptions.LENIENT_EXPAND_OPEN, + query, + cluster, + TIMEOUT + ); + + CountDownLatch latch = new CountDownLatch(1); + SetOnce finalResponse = new SetOnce<>(); + SetOnce finalException = new SetOnce<>(); + ClientHelper.executeAsyncWithOrigin( + client, + TRANSFORM_ORIGIN, + GetCheckpointAction.INSTANCE, + request, + ActionListener.wrap(response -> { + finalResponse.set(response); + latch.countDown(); + }, e -> { + finalException.set(e); + latch.countDown(); + }) + ); + latch.await(10, TimeUnit.SECONDS); + + assertThat(finalException.get(), is(nullValue())); + assertThat("Response was: " + finalResponse.get(), finalResponse.get().getCheckpoints().keySet(), is(equalTo(expectedIndices))); + } + + public void testTransformLifecycle_MatchAllQuery() throws Exception { + testTransformLifecycle(QueryBuilders.matchAllQuery(), localOldDocs + localNewDocs + remoteOldDocs + remoteNewDocs); + } + + public void testTransformLifecycle_RangeQuery() throws Exception { + testTransformLifecycle(QueryBuilders.rangeQuery("@timestamp").from(timestamp), localNewDocs + remoteNewDocs); + } + + public void testTransformLifecycle_RangeQueryThatMatchesNoShards() throws Exception { + testTransformLifecycle(QueryBuilders.rangeQuery("@timestamp").from(100_000_000), 0); + } + + private void testTransformLifecycle(QueryBuilder query, long expectedHitCount) throws Exception { + String transformId = "test-transform-lifecycle"; + { + QueryConfig queryConfig; + try (XContentParser parser = createParser(JsonXContent.jsonXContent, query.toString())) { + queryConfig = QueryConfig.fromXContent(parser, true); + assertNotNull(queryConfig.getQuery()); + } + TransformConfig transformConfig = TransformConfig.builder() + .setId(transformId) + .setSource(new SourceConfig(new String[] { "local_*", "*:remote_*" }, queryConfig, Map.of())) + .setDest(new DestConfig(transformId + "-dest", null, null)) + .setLatestConfig(new LatestConfig(List.of("position"), "@timestamp")) + .build(); + PutTransformAction.Request request = new PutTransformAction.Request(transformConfig, false, TIMEOUT); + AcknowledgedResponse response = client().execute(PutTransformAction.INSTANCE, request).actionGet(); + assertTrue(response.isAcknowledged()); + } + { + StartTransformAction.Request request = new StartTransformAction.Request(transformId, null, TIMEOUT); + StartTransformAction.Response response = client().execute(StartTransformAction.INSTANCE, request).actionGet(); + assertTrue(response.isAcknowledged()); + } + assertBusy(() -> { + GetTransformStatsAction.Request request = new GetTransformStatsAction.Request(transformId, TIMEOUT); + GetTransformStatsAction.Response response = client().execute(GetTransformStatsAction.INSTANCE, request).actionGet(); + assertThat("Stats were: " + response.getTransformsStats(), response.getTransformsStats(), hasSize(1)); + assertThat(response.getTransformsStats().get(0).getState(), is(equalTo(TransformStats.State.STOPPED))); + assertThat(response.getTransformsStats().get(0).getIndexerStats().getNumDocuments(), is(equalTo(expectedHitCount))); + assertThat(response.getTransformsStats().get(0).getIndexerStats().getNumDeletedDocuments(), is(equalTo(0L))); + assertThat(response.getTransformsStats().get(0).getIndexerStats().getSearchFailures(), is(equalTo(0L))); + assertThat(response.getTransformsStats().get(0).getIndexerStats().getIndexFailures(), is(equalTo(0L))); + }); + } + + @Override + protected NamedXContentRegistry xContentRegistry() { + return namedXContentRegistry; + } + + @Override + protected Collection remoteClusterAlias() { + return List.of(REMOTE_CLUSTER); + } + + @Override + protected Collection> nodePlugins(String clusterAlias) { + return CollectionUtils.appendToCopy( + CollectionUtils.appendToCopy(super.nodePlugins(clusterAlias), LocalStateTransform.class), + ExposingTimestampEnginePlugin.class + ); + } + + @Override + protected Settings nodeSettings() { + return Settings.builder() + .put(NodeRoleSettings.NODE_ROLES_SETTING.getKey(), "master, data, ingest, transform, remote_cluster_client") + .put(XPackSettings.SECURITY_ENABLED.getKey(), false) + .build(); + } + + private static class EngineWithExposingTimestamp extends InternalEngine { + EngineWithExposingTimestamp(EngineConfig engineConfig) { + super(engineConfig); + assert IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.get(config().getIndexSettings().getSettings()) : "require read-only index"; + } + + @Override + public ShardLongFieldRange getRawFieldRange(String field) { + try (Searcher searcher = acquireSearcher("test")) { + final DirectoryReader directoryReader = searcher.getDirectoryReader(); + + final byte[] minPackedValue = PointValues.getMinPackedValue(directoryReader, field); + final byte[] maxPackedValue = PointValues.getMaxPackedValue(directoryReader, field); + if (minPackedValue == null || maxPackedValue == null) { + assert minPackedValue == null && maxPackedValue == null + : Arrays.toString(minPackedValue) + "-" + Arrays.toString(maxPackedValue); + return ShardLongFieldRange.EMPTY; + } + + return ShardLongFieldRange.of(LongPoint.decodeDimension(minPackedValue, 0), LongPoint.decodeDimension(maxPackedValue, 0)); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + } + + public static class ExposingTimestampEnginePlugin extends Plugin implements EnginePlugin { + + @Override + public Optional getEngineFactory(IndexSettings indexSettings) { + if (IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.get(indexSettings.getSettings())) { + return Optional.of(EngineWithExposingTimestamp::new); + } else { + return Optional.of(new InternalEngineFactory()); + } + } + } +} diff --git a/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformGetCheckpointIT.java b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformGetCheckpointIT.java index bb159856b965d..82a3ea85bfe6a 100644 --- a/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformGetCheckpointIT.java +++ b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformGetCheckpointIT.java @@ -15,6 +15,7 @@ import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.Strings; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.xcontent.XContentType; import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction; import org.elasticsearch.xpack.transform.TransformSingleNodeTestCase; @@ -25,6 +26,7 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import static org.hamcrest.Matchers.anEmptyMap; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.startsWith; @@ -46,6 +48,8 @@ public void testGetCheckpoint() throws Exception { final GetCheckpointAction.Request request = new GetCheckpointAction.Request( new String[] { indexNamePrefix + "*" }, IndicesOptions.LENIENT_EXPAND_OPEN, + null, + null, TimeValue.timeValueSeconds(5) ); @@ -99,6 +103,40 @@ public void testGetCheckpoint() throws Exception { ); } + public void testGetCheckpointWithQueryThatFiltersOutEverything() throws Exception { + final String indexNamePrefix = "test_index-"; + final int indices = randomIntBetween(1, 5); + final int shards = randomIntBetween(1, 5); + final int docsToCreatePerShard = randomIntBetween(0, 10); + + for (int i = 0; i < indices; ++i) { + indicesAdmin().prepareCreate(indexNamePrefix + i) + .setSettings(indexSettings(shards, 1)) + .setMapping("field", "type=long", "@timestamp", "type=date") + .get(); + for (int j = 0; j < shards; ++j) { + for (int d = 0; d < docsToCreatePerShard; ++d) { + client().prepareIndex(indexNamePrefix + i) + .setSource(Strings.format("{ \"field\":%d, \"@timestamp\": %d }", j, 10_000_000 + d + i + j), XContentType.JSON) + .get(); + } + } + } + indicesAdmin().refresh(new RefreshRequest(indexNamePrefix + "*")); + + final GetCheckpointAction.Request request = new GetCheckpointAction.Request( + new String[] { indexNamePrefix + "*" }, + IndicesOptions.LENIENT_EXPAND_OPEN, + // This query does not match any documents + QueryBuilders.rangeQuery("@timestamp").gte(20_000_000), + null, + TimeValue.timeValueSeconds(5) + ); + + final GetCheckpointAction.Response response = client().execute(GetCheckpointAction.INSTANCE, request).get(); + assertThat("Response was: " + response.getCheckpoints(), response.getCheckpoints(), is(anEmptyMap())); + } + public void testGetCheckpointTimeoutExceeded() throws Exception { final String indexNamePrefix = "test_index-"; final int indices = 100; @@ -111,6 +149,8 @@ public void testGetCheckpointTimeoutExceeded() throws Exception { final GetCheckpointAction.Request request = new GetCheckpointAction.Request( new String[] { indexNamePrefix + "*" }, IndicesOptions.LENIENT_EXPAND_OPEN, + null, + null, TimeValue.ZERO ); diff --git a/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformGetCheckpointTests.java b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformGetCheckpointTests.java index 1411576e61d58..300a075c9f1b2 100644 --- a/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformGetCheckpointTests.java +++ b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformGetCheckpointTests.java @@ -15,6 +15,7 @@ import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; +import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -71,6 +72,7 @@ public class TransformGetCheckpointTests extends ESSingleNodeTestCase { private IndicesService indicesService; private ThreadPool threadPool; private IndexNameExpressionResolver indexNameExpressionResolver; + private Client client; private MockTransport mockTransport; private Task transformTask; private final String indexNamePattern = "test_index-"; @@ -133,6 +135,8 @@ protected void onSendRequest(long requestId, String action, TransportRequest req .putCompatibilityVersions("node01", TransportVersions.V_8_5_0, Map.of()) .build(); + client = mock(Client.class); + transformTask = new Task( 1L, "persistent", @@ -157,6 +161,8 @@ public void testEmptyCheckpoint() throws InterruptedException { GetCheckpointAction.Request request = new GetCheckpointAction.Request( Strings.EMPTY_ARRAY, IndicesOptions.LENIENT_EXPAND_OPEN, + null, + null, TimeValue.timeValueSeconds(5) ); assertCheckpointAction(request, response -> { @@ -170,6 +176,8 @@ public void testSingleIndexRequest() throws InterruptedException { GetCheckpointAction.Request request = new GetCheckpointAction.Request( new String[] { indexNamePattern + "0" }, IndicesOptions.LENIENT_EXPAND_OPEN, + null, + null, TimeValue.timeValueSeconds(5) ); @@ -189,6 +197,8 @@ public void testMultiIndexRequest() throws InterruptedException { GetCheckpointAction.Request request = new GetCheckpointAction.Request( testIndices, IndicesOptions.LENIENT_EXPAND_OPEN, + null, + null, TimeValue.timeValueSeconds(5) ); assertCheckpointAction(request, response -> { @@ -208,7 +218,7 @@ public void testMultiIndexRequest() throws InterruptedException { class TestTransportGetCheckpointAction extends TransportGetCheckpointAction { TestTransportGetCheckpointAction() { - super(transportService, new ActionFilters(emptySet()), indicesService, clusterService, indexNameExpressionResolver); + super(transportService, new ActionFilters(emptySet()), indicesService, clusterService, indexNameExpressionResolver, client); } @Override diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointAction.java index 5acc2d4541559..675d71a5d1db9 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointAction.java @@ -14,9 +14,15 @@ import org.elasticsearch.action.NoShardAvailableActionException; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.UnavailableShardsException; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchShardsAction; +import org.elasticsearch.action.search.SearchShardsGroup; +import org.elasticsearch.action.search.SearchShardsRequest; +import org.elasticsearch.action.search.SearchShardsResponse; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.GroupedActionListener; import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -31,10 +37,12 @@ import org.elasticsearch.indices.IndicesService; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.transport.ActionNotFoundTransportException; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction; import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction.Request; import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction.Response; @@ -57,6 +65,7 @@ public class TransportGetCheckpointAction extends HandledTransportAction listener) { - final ClusterState state = clusterService.state(); - resolveIndicesAndGetCheckpoint(task, request, listener, state); + final ClusterState clusterState = clusterService.state(); + resolveIndicesAndGetCheckpoint(task, request, listener, clusterState); } - protected void resolveIndicesAndGetCheckpoint(Task task, Request request, ActionListener listener, final ClusterState state) { + protected void resolveIndicesAndGetCheckpoint( + Task task, + Request request, + ActionListener listener, + final ClusterState clusterState + ) { + final String nodeId = clusterState.nodes().getLocalNode().getId(); + final TaskId parentTaskId = new TaskId(nodeId, task.getId()); + // note: when security is turned on, the indices are already resolved // TODO: do a quick check and only resolve if necessary?? - String[] concreteIndices = this.indexNameExpressionResolver.concreteIndexNames(state, request); - - Map> nodesAndShards = resolveIndicesToPrimaryShards(state, concreteIndices); - - if (nodesAndShards.size() == 0) { + String[] concreteIndices = this.indexNameExpressionResolver.concreteIndexNames(clusterState, request); + Map> nodesAndShards = resolveIndicesToPrimaryShards(clusterState, concreteIndices); + if (nodesAndShards.isEmpty()) { listener.onResponse(new Response(Collections.emptyMap())); return; } - new AsyncGetCheckpointsFromNodesAction(state, task, nodesAndShards, new OriginalIndices(request), request.getTimeout(), listener) - .start(); + if (request.getQuery() == null) { // If there is no query, then there is no point in filtering + getCheckpointsFromNodes(clusterState, task, nodesAndShards, new OriginalIndices(request), request.getTimeout(), listener); + return; + } + + SearchShardsRequest searchShardsRequest = new SearchShardsRequest( + request.indices(), + SearchRequest.DEFAULT_INDICES_OPTIONS, + request.getQuery(), + null, + null, + false, + request.getCluster() + ); + searchShardsRequest.setParentTask(parentTaskId); + ClientHelper.executeAsyncWithOrigin( + client, + ClientHelper.TRANSFORM_ORIGIN, + SearchShardsAction.INSTANCE, + searchShardsRequest, + ActionListener.wrap(searchShardsResponse -> { + Map> filteredNodesAndShards = filterOutSkippedShards(nodesAndShards, searchShardsResponse); + getCheckpointsFromNodes( + clusterState, + task, + filteredNodesAndShards, + new OriginalIndices(request), + request.getTimeout(), + listener + ); + }, e -> { + // search_shards API failed so we just log the error here and continue just like there was no query + logger.atWarn().withThrowable(e).log("search_shards API failed for cluster [{}]", request.getCluster()); + logger.atTrace() + .withThrowable(e) + .log("search_shards API failed for cluster [{}], request was [{}]", request.getCluster(), searchShardsRequest); + getCheckpointsFromNodes(clusterState, task, nodesAndShards, new OriginalIndices(request), request.getTimeout(), listener); + }) + ); } - private static Map> resolveIndicesToPrimaryShards(ClusterState state, String[] concreteIndices) { + private static Map> resolveIndicesToPrimaryShards(ClusterState clusterState, String[] concreteIndices) { if (concreteIndices.length == 0) { return Collections.emptyMap(); } - final DiscoveryNodes nodes = state.nodes(); + final DiscoveryNodes nodes = clusterState.nodes(); Map> nodesAndShards = new HashMap<>(); - ShardsIterator shardsIt = state.routingTable().allShards(concreteIndices); + ShardsIterator shardsIt = clusterState.routingTable().allShards(concreteIndices); for (ShardRouting shard : shardsIt) { // only take primary shards, which should be exactly 1, this isn't strictly necessary // and we should consider taking any shard copy, but then we need another way to de-dup @@ -112,7 +166,7 @@ private static Map> resolveIndicesToPrimaryShards(ClusterSt } if (shard.assignedToNode() && nodes.get(shard.currentNodeId()) != null) { // special case: The minimum TransportVersion in the cluster is on an old version - if (state.getMinTransportVersion().before(TransportVersions.V_8_2_0)) { + if (clusterState.getMinTransportVersion().before(TransportVersions.V_8_2_0)) { throw new ActionNotFoundTransportException(GetCheckpointNodeAction.NAME); } @@ -125,111 +179,128 @@ private static Map> resolveIndicesToPrimaryShards(ClusterSt return nodesAndShards; } - protected class AsyncGetCheckpointsFromNodesAction { - private final Task task; - private final ActionListener listener; - private final Map> nodesAndShards; - private final OriginalIndices originalIndices; - private final TimeValue timeout; - private final DiscoveryNodes nodes; - private final String localNodeId; - - protected AsyncGetCheckpointsFromNodesAction( - ClusterState clusterState, - Task task, - Map> nodesAndShards, - OriginalIndices originalIndices, - TimeValue timeout, - ActionListener listener - ) { - this.task = task; - this.listener = listener; - this.nodesAndShards = nodesAndShards; - this.originalIndices = originalIndices; - this.timeout = timeout; - this.nodes = clusterState.nodes(); - this.localNodeId = clusterService.localNode().getId(); + static Map> filterOutSkippedShards( + Map> nodesAndShards, + SearchShardsResponse searchShardsResponse + ) { + Map> filteredNodesAndShards = new HashMap<>(nodesAndShards.size()); + // Create a deep copy of the given nodes and shards map. + for (Map.Entry> nodeAndShardsEntry : nodesAndShards.entrySet()) { + String node = nodeAndShardsEntry.getKey(); + Set shards = nodeAndShardsEntry.getValue(); + filteredNodesAndShards.put(node, new HashSet<>(shards)); } - - public void start() { - GroupedActionListener groupedListener = new GroupedActionListener<>( - nodesAndShards.size(), - ActionListener.wrap(responses -> listener.onResponse(mergeNodeResponses(responses)), listener::onFailure) - ); - - for (Entry> oneNodeAndItsShards : nodesAndShards.entrySet()) { - if (task instanceof CancellableTask) { - // There is no point continuing this work if the task has been cancelled. - if (((CancellableTask) task).notifyIfCancelled(listener)) { - return; + // Remove (node, shard) pairs for all the skipped shards. + for (SearchShardsGroup shardGroup : searchShardsResponse.getGroups()) { + if (shardGroup.skipped()) { + for (String allocatedNode : shardGroup.allocatedNodes()) { + Set shards = filteredNodesAndShards.get(allocatedNode); + if (shards != null) { + shards.remove(shardGroup.shardId()); + if (shards.isEmpty()) { + // Remove node if no shards were left. + filteredNodesAndShards.remove(allocatedNode); + } } } - if (localNodeId.equals(oneNodeAndItsShards.getKey())) { - TransportGetCheckpointNodeAction.getGlobalCheckpoints( - indicesService, - task, - oneNodeAndItsShards.getValue(), - timeout, - Clock.systemUTC(), - groupedListener - ); - continue; - } + } + } + return filteredNodesAndShards; + } - GetCheckpointNodeAction.Request nodeCheckpointsRequest = new GetCheckpointNodeAction.Request( - oneNodeAndItsShards.getValue(), - originalIndices, - timeout - ); - DiscoveryNode node = nodes.get(oneNodeAndItsShards.getKey()); - - // paranoia: this should not be possible using the same cluster state - if (node == null) { - listener.onFailure( - new UnavailableShardsException( - oneNodeAndItsShards.getValue().iterator().next(), - "Node not found for [{}] shards", - oneNodeAndItsShards.getValue().size() - ) - ); + private void getCheckpointsFromNodes( + ClusterState clusterState, + Task task, + Map> nodesAndShards, + OriginalIndices originalIndices, + TimeValue timeout, + ActionListener listener + ) { + if (nodesAndShards.isEmpty()) { + listener.onResponse(new Response(Map.of())); + return; + } + + final String localNodeId = clusterService.localNode().getId(); + + GroupedActionListener groupedListener = new GroupedActionListener<>( + nodesAndShards.size(), + ActionListener.wrap(responses -> listener.onResponse(mergeNodeResponses(responses)), listener::onFailure) + ); + + for (Entry> oneNodeAndItsShards : nodesAndShards.entrySet()) { + if (task instanceof CancellableTask) { + // There is no point continuing this work if the task has been cancelled. + if (((CancellableTask) task).notifyIfCancelled(listener)) { return; } - - logger.trace("get checkpoints from node {}", node); - transportService.sendChildRequest( - node, - GetCheckpointNodeAction.NAME, - nodeCheckpointsRequest, + } + if (localNodeId.equals(oneNodeAndItsShards.getKey())) { + TransportGetCheckpointNodeAction.getGlobalCheckpoints( + indicesService, task, - TransportRequestOptions.EMPTY, - new ActionListenerResponseHandler<>( - groupedListener, - GetCheckpointNodeAction.Response::new, - TransportResponseHandler.TRANSPORT_WORKER + oneNodeAndItsShards.getValue(), + timeout, + Clock.systemUTC(), + groupedListener + ); + continue; + } + + DiscoveryNodes nodes = clusterState.nodes(); + DiscoveryNode node = nodes.get(oneNodeAndItsShards.getKey()); + + // paranoia: this should not be possible using the same cluster state + if (node == null) { + listener.onFailure( + new UnavailableShardsException( + oneNodeAndItsShards.getValue().iterator().next(), + "Node not found for [{}] shards", + oneNodeAndItsShards.getValue().size() ) ); + return; } + + logger.trace("get checkpoints from node {}", node); + GetCheckpointNodeAction.Request nodeCheckpointsRequest = new GetCheckpointNodeAction.Request( + oneNodeAndItsShards.getValue(), + originalIndices, + timeout + ); + transportService.sendChildRequest( + node, + GetCheckpointNodeAction.NAME, + nodeCheckpointsRequest, + task, + TransportRequestOptions.EMPTY, + new ActionListenerResponseHandler<>( + groupedListener, + GetCheckpointNodeAction.Response::new, + TransportResponseHandler.TRANSPORT_WORKER + ) + ); } + } - private static Response mergeNodeResponses(Collection responses) { - // the final list should be ordered by key - Map checkpointsByIndexReduced = new TreeMap<>(); - - // merge the node responses - for (GetCheckpointNodeAction.Response response : responses) { - response.getCheckpoints().forEach((index, checkpoint) -> { - if (checkpointsByIndexReduced.containsKey(index)) { - long[] shardCheckpoints = checkpointsByIndexReduced.get(index); - for (int i = 0; i < checkpoint.length; ++i) { - shardCheckpoints[i] = Math.max(shardCheckpoints[i], checkpoint[i]); - } - } else { - checkpointsByIndexReduced.put(index, checkpoint); - } - }); - } + private static Response mergeNodeResponses(Collection responses) { + // the final list should be ordered by key + Map checkpointsByIndexReduced = new TreeMap<>(); - return new Response(checkpointsByIndexReduced); + // merge the node responses + for (GetCheckpointNodeAction.Response response : responses) { + response.getCheckpoints().forEach((index, checkpoint) -> { + if (checkpointsByIndexReduced.containsKey(index)) { + long[] shardCheckpoints = checkpointsByIndexReduced.get(index); + for (int i = 0; i < checkpoint.length; ++i) { + shardCheckpoints[i] = Math.max(shardCheckpoints[i], checkpoint[i]); + } + } else { + checkpointsByIndexReduced.put(index, checkpoint); + } + }); } + + return new Response(checkpointsByIndexReduced); } } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsAction.java index 13abc427460be..f7e60b13b50a6 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsAction.java @@ -121,14 +121,15 @@ protected void taskOperation( TransformTask transformTask, ActionListener listener ) { - // Little extra insurance, make sure we only return transforms that aren't cancelled ClusterState clusterState = clusterService.state(); String nodeId = clusterState.nodes().getLocalNode().getId(); final TaskId parentTaskId = new TaskId(nodeId, actionTask.getId()); + // If the _stats request is cancelled there is no point in continuing this work on the task level if (actionTask.notifyIfCancelled(listener)) { return; } + // Little extra insurance, make sure we only return transforms that aren't cancelled if (transformTask.isCancelled()) { listener.onResponse(new Response(Collections.emptyList())); return; diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java index aa1332b95fe84..b9b7d9d8477cb 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java @@ -23,6 +23,7 @@ import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.core.Strings; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.transport.ActionNotFoundTransportException; import org.elasticsearch.transport.RemoteClusterService; import org.elasticsearch.xpack.core.ClientHelper; @@ -134,14 +135,16 @@ protected void getIndexCheckpoints(TimeValue timeout, ActionListener> remoteIndex : resolvedIndexes.getRemoteIndicesPerClusterAlias().entrySet()) { + String cluster = remoteIndex.getKey(); ParentTaskAssigningClient remoteClient = new ParentTaskAssigningClient( - client.getRemoteClusterClient(remoteIndex.getKey(), EsExecutors.DIRECT_EXECUTOR_SERVICE), + client.getRemoteClusterClient(cluster, EsExecutors.DIRECT_EXECUTOR_SERVICE), client.getParentTask() ); getCheckpointsFromOneCluster( @@ -149,7 +152,8 @@ protected void getIndexCheckpoints(TimeValue timeout, ActionListener headers, String[] indices, + QueryBuilder query, String cluster, ActionListener> listener ) { if (fallbackToBWC.contains(cluster)) { getCheckpointsFromOneClusterBWC(client, timeout, headers, indices, cluster, listener); } else { - getCheckpointsFromOneClusterV2(client, timeout, headers, indices, cluster, ActionListener.wrap(response -> { + getCheckpointsFromOneClusterV2(client, timeout, headers, indices, query, cluster, ActionListener.wrap(response -> { logger.debug( "[{}] Successfully retrieved checkpoints from cluster [{}] using transform checkpoint API", transformConfig.getId(), @@ -200,12 +205,15 @@ private static void getCheckpointsFromOneClusterV2( TimeValue timeout, Map headers, String[] indices, + QueryBuilder query, String cluster, ActionListener> listener ) { GetCheckpointAction.Request getCheckpointRequest = new GetCheckpointAction.Request( indices, IndicesOptions.LENIENT_EXPAND_OPEN, + query, + cluster, timeout ); ActionListener checkpointListener; @@ -239,7 +247,6 @@ private static void getCheckpointsFromOneClusterV2( getCheckpointRequest, checkpointListener ); - } /** diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointActionTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointActionTests.java new file mode 100644 index 0000000000000..0d2d9619aca68 --- /dev/null +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointActionTests.java @@ -0,0 +1,134 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.transform.action; + +import org.elasticsearch.action.search.SearchShardsGroup; +import org.elasticsearch.action.search.SearchShardsResponse; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.ESTestCase; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.hamcrest.Matchers.anEmptyMap; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +public class TransportGetCheckpointActionTests extends ESTestCase { + + private static final String NODE_0 = "node-0"; + private static final String NODE_1 = "node-1"; + private static final String NODE_2 = "node-2"; + private static final Index INDEX_A = new Index("my-index-A", "A"); + private static final Index INDEX_B = new Index("my-index-B", "B"); + private static final Index INDEX_C = new Index("my-index-C", "C"); + private static final ShardId SHARD_A_0 = new ShardId(INDEX_A, 0); + private static final ShardId SHARD_A_1 = new ShardId(INDEX_A, 1); + private static final ShardId SHARD_B_0 = new ShardId(INDEX_B, 0); + private static final ShardId SHARD_B_1 = new ShardId(INDEX_B, 1); + + private static final Map> NODES_AND_SHARDS = Map.of( + NODE_0, + Set.of(SHARD_A_0, SHARD_A_1, SHARD_B_0, SHARD_B_1), + NODE_1, + Set.of(SHARD_A_0, SHARD_A_1, SHARD_B_0, SHARD_B_1), + NODE_2, + Set.of(SHARD_A_0, SHARD_A_1, SHARD_B_0, SHARD_B_1) + ); + + public void testFilterOutSkippedShards_EmptyNodesAndShards() { + SearchShardsResponse searchShardsResponse = new SearchShardsResponse( + Set.of( + new SearchShardsGroup(SHARD_A_0, List.of(NODE_0, NODE_1), true), + new SearchShardsGroup(SHARD_B_0, List.of(NODE_1, NODE_2), false), + new SearchShardsGroup(SHARD_B_1, List.of(NODE_0, NODE_2), true) + ), + Set.of(), + Map.of() + ); + Map> filteredNodesAndShards = TransportGetCheckpointAction.filterOutSkippedShards( + Map.of(), + searchShardsResponse + ); + assertThat(filteredNodesAndShards, is(anEmptyMap())); + } + + public void testFilterOutSkippedShards_EmptySearchShardsResponse() { + SearchShardsResponse searchShardsResponse = new SearchShardsResponse(Set.of(), Set.of(), Map.of()); + Map> filteredNodesAndShards = TransportGetCheckpointAction.filterOutSkippedShards( + NODES_AND_SHARDS, + searchShardsResponse + ); + assertThat(filteredNodesAndShards, is(equalTo(NODES_AND_SHARDS))); + } + + public void testFilterOutSkippedShards_SomeNodesEmptyAfterFiltering() { + SearchShardsResponse searchShardsResponse = new SearchShardsResponse( + Set.of( + new SearchShardsGroup(SHARD_A_0, List.of(NODE_0, NODE_2), true), + new SearchShardsGroup(SHARD_A_1, List.of(NODE_0, NODE_2), true), + new SearchShardsGroup(SHARD_B_0, List.of(NODE_0, NODE_2), true), + new SearchShardsGroup(SHARD_B_1, List.of(NODE_0, NODE_2), true) + ), + Set.of(), + Map.of() + ); + Map> filteredNodesAndShards = TransportGetCheckpointAction.filterOutSkippedShards( + NODES_AND_SHARDS, + searchShardsResponse + ); + Map> expectedFilteredNodesAndShards = Map.of(NODE_1, Set.of(SHARD_A_0, SHARD_A_1, SHARD_B_0, SHARD_B_1)); + assertThat(filteredNodesAndShards, is(equalTo(expectedFilteredNodesAndShards))); + } + + public void testFilterOutSkippedShards_AllNodesEmptyAfterFiltering() { + SearchShardsResponse searchShardsResponse = new SearchShardsResponse( + Set.of( + new SearchShardsGroup(SHARD_A_0, List.of(NODE_0, NODE_1, NODE_2), true), + new SearchShardsGroup(SHARD_A_1, List.of(NODE_0, NODE_1, NODE_2), true), + new SearchShardsGroup(SHARD_B_0, List.of(NODE_0, NODE_1, NODE_2), true), + new SearchShardsGroup(SHARD_B_1, List.of(NODE_0, NODE_1, NODE_2), true) + ), + Set.of(), + Map.of() + ); + Map> filteredNodesAndShards = TransportGetCheckpointAction.filterOutSkippedShards( + NODES_AND_SHARDS, + searchShardsResponse + ); + assertThat(filteredNodesAndShards, is(equalTo(Map.of()))); + } + + public void testFilterOutSkippedShards() { + SearchShardsResponse searchShardsResponse = new SearchShardsResponse( + Set.of( + new SearchShardsGroup(SHARD_A_0, List.of(NODE_0, NODE_1), true), + new SearchShardsGroup(SHARD_B_0, List.of(NODE_1, NODE_2), false), + new SearchShardsGroup(SHARD_B_1, List.of(NODE_0, NODE_2), true), + new SearchShardsGroup(new ShardId(INDEX_C, 0), List.of(NODE_0, NODE_1, NODE_2), true) + ), + Set.of(), + Map.of() + ); + Map> filteredNodesAndShards = TransportGetCheckpointAction.filterOutSkippedShards( + NODES_AND_SHARDS, + searchShardsResponse + ); + Map> expectedFilteredNodesAndShards = Map.of( + NODE_0, + Set.of(SHARD_A_1, SHARD_B_0), + NODE_1, + Set.of(SHARD_A_1, SHARD_B_0, SHARD_B_1), + NODE_2, + Set.of(SHARD_A_0, SHARD_A_1, SHARD_B_0) + ); + assertThat(filteredNodesAndShards, is(equalTo(expectedFilteredNodesAndShards))); + } +} From 4106ad274dc64fab48f61ce6392202a1e76f0634 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 20 Nov 2023 07:07:28 -0800 Subject: [PATCH 3/8] Fix AsyncOperatorTests#testFailure (#102335) There is a bug in the test where we check the failed flag immediately after the Driver starts, instead of waiting until the Driver has completed. Also, we should check for failure in the AsyncOperator. Closes #102264 --- .../compute/operator/AsyncOperator.java | 1 + .../compute/operator/AsyncOperatorTests.java | 13 +++++++------ 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AsyncOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AsyncOperator.java index 8eea50226253b..f2011d1cdb987 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AsyncOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AsyncOperator.java @@ -166,6 +166,7 @@ public void finish() { @Override public boolean isFinished() { + checkFailure(); return finished && checkpoint.getPersistedCheckpoint() == checkpoint.getMaxSeqNo(); } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AsyncOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AsyncOperatorTests.java index b35dc8b7b2e80..00b046abdca24 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AsyncOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AsyncOperatorTests.java @@ -185,7 +185,6 @@ protected void doClose() { operator.close(); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/102264") public void testFailure() throws Exception { DriverContext driverContext = driverContext(); final SequenceLongBlockSourceOperator sourceOperator = new SequenceLongBlockSourceOperator( @@ -226,15 +225,17 @@ protected void doClose() { PlainActionFuture future = new PlainActionFuture<>(); Driver driver = new Driver(driverContext, sourceOperator, List.of(asyncOperator), outputOperator, () -> {}); Driver.start(threadPool.getThreadContext(), threadPool.executor(ESQL_TEST_EXECUTOR), driver, between(1, 1000), future); - assertBusy(() -> { - assertTrue(asyncOperator.isFinished()); - assertTrue(future.isDone()); - }); + assertBusy(() -> assertTrue(future.isDone())); if (failed.get()) { ElasticsearchException error = expectThrows(ElasticsearchException.class, future::actionGet); assertThat(error.getMessage(), containsString("simulated")); + error = expectThrows(ElasticsearchException.class, asyncOperator::isFinished); + assertThat(error.getMessage(), containsString("simulated")); + error = expectThrows(ElasticsearchException.class, asyncOperator::getOutput); + assertThat(error.getMessage(), containsString("simulated")); } else { - future.actionGet(); + assertTrue(asyncOperator.isFinished()); + assertNull(asyncOperator.getOutput()); } } From 588eabe185ad319c0268a13480465966cef058cd Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 20 Nov 2023 07:44:22 -0800 Subject: [PATCH 4/8] Avoid spawn new nodes in EsqlActionIT (#102363) We don't need to launch new nodes in these tests if we already have at least two data nodes. --- .../xpack/esql/action/EsqlActionIT.java | 28 +++++++++++++++---- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java index 06fd9bd469b84..a141db7037263 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java @@ -16,6 +16,7 @@ import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.internal.ClusterAdminClient; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; @@ -1242,8 +1243,15 @@ public void testFilterNestedFields() { } public void testStatsNestFields() { - String node1 = internalCluster().startDataOnlyNode(); - String node2 = internalCluster().startDataOnlyNode(); + final String node1, node2; + if (randomBoolean()) { + internalCluster().ensureAtLeastNumDataNodes(2); + node1 = randomDataNode().getName(); + node2 = randomValueOtherThan(node1, () -> randomDataNode().getName()); + } else { + node1 = randomDataNode().getName(); + node2 = randomDataNode().getName(); + } assertAcked( client().admin() .indices() @@ -1276,8 +1284,15 @@ public void testStatsNestFields() { } public void testStatsMissingFields() { - String node1 = internalCluster().startDataOnlyNode(); - String node2 = internalCluster().startDataOnlyNode(); + final String node1, node2; + if (randomBoolean()) { + internalCluster().ensureAtLeastNumDataNodes(2); + node1 = randomDataNode().getName(); + node2 = randomValueOtherThan(node1, () -> randomDataNode().getName()); + } else { + node1 = randomDataNode().getName(); + node2 = randomDataNode().getName(); + } assertAcked( client().admin() .indices() @@ -1292,7 +1307,6 @@ public void testStatsMissingFields() { .setSettings(Settings.builder().put("index.routing.allocation.require._name", node2)) .setMapping("bar_int", "type=integer", "bar_long", "type=long", "bar_float", "type=float", "bar_double", "type=double") ); - var fields = List.of("foo_int", "foo_long", "foo_float", "foo_double"); var functions = List.of("sum", "count", "avg", "count_distinct"); for (String field : fields) { @@ -1510,4 +1524,8 @@ private void clearPersistentSettings(Setting... settings) { var clearSettingsRequest = new ClusterUpdateSettingsRequest().persistentSettings(clearedSettings.build()); admin().cluster().updateSettings(clearSettingsRequest).actionGet(); } + + private DiscoveryNode randomDataNode() { + return randomFrom(clusterService().state().nodes().getDataNodes().values()); + } } From 7cc711ad25898d1659662a464df84175606bb289 Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 20 Nov 2023 17:22:19 +0000 Subject: [PATCH 5/8] Add test logging to S3BlobStoreRepositoryTests#testMetrics (#102387) Relates #101608 --- .../repositories/s3/S3BlobStoreRepositoryTests.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java b/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java index aee61361ebd10..67b0202ac1eae 100644 --- a/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java +++ b/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java @@ -57,7 +57,7 @@ import org.elasticsearch.telemetry.metric.MeterRegistry; import org.elasticsearch.test.BackgroundIndexer; import org.elasticsearch.test.ESIntegTestCase; -import org.elasticsearch.test.junit.annotations.TestLogging; +import org.elasticsearch.test.junit.annotations.TestIssueLogging; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xcontent.NamedXContentRegistry; import org.elasticsearch.xcontent.XContentFactory; @@ -179,7 +179,7 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { } @Override - @TestLogging(reason = "Enable request logging to debug #88841", value = "com.amazonaws.request:DEBUG") + @TestIssueLogging(issueUrl = "https://github.com/elastic/elasticsearch/issues/88841", value = "com.amazonaws.request:DEBUG") public void testRequestStats() throws Exception { super.testRequestStats(); } @@ -225,6 +225,7 @@ public void testAbortRequestStats() throws Exception { assertEquals(assertionErrorMsg, mockCalls, sdkRequestCounts); } + @TestIssueLogging(issueUrl = "https://github.com/elastic/elasticsearch/issues/101608", value = "com.amazonaws.request:DEBUG") public void testMetrics() throws Exception { // Create the repository and perform some activities final String repository = createRepository(randomRepositoryName()); From c0782c1289059cbbebc8171de3e7688e604d9b14 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 20 Nov 2023 09:45:16 -0800 Subject: [PATCH 6/8] Move ESQL yaml tests to core (#102340) This PR moves ESQL YAML tests to the core so that we can run them with different modules (e.g., the mixed cluster tests) and even with serverless in the future. YAML tests for other plugins are also in the core. --- x-pack/plugin/esql/qa/server/single-node/build.gradle | 7 +++---- .../resources/rest-api-spec/test/esql}/100_bug_fix.yml | 9 +++++++-- .../resources/rest-api-spec/test/esql}/10_basic.yml | 2 ++ .../resources/rest-api-spec/test/esql}/20_aggs.yml | 2 ++ .../resources/rest-api-spec/test/esql}/30_types.yml | 2 ++ .../resources/rest-api-spec/test/esql}/40_tsdb.yml | 2 ++ .../rest-api-spec/test/esql}/40_unsupported_types.yml | 2 ++ .../rest-api-spec/test/esql}/45_non_tsdb_counter.yml | 2 ++ .../rest-api-spec/test/esql}/50_index_patterns.yml | 2 ++ .../resources/rest-api-spec/test/esql}/60_enrich.yml | 7 +++++++ .../resources/rest-api-spec/test/esql}/60_usage.yml | 4 ++++ .../resources/rest-api-spec/test/esql}/61_enrich_ip.yml | 7 +++++++ .../resources/rest-api-spec/test/esql}/70_locale.yml | 2 ++ .../resources/rest-api-spec/test/esql}/80_text.yml | 2 ++ .../rest-api-spec/test/esql}/90_non_indexed.yml | 2 ++ 15 files changed, 48 insertions(+), 6 deletions(-) rename x-pack/plugin/{esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test => src/yamlRestTest/resources/rest-api-spec/test/esql}/100_bug_fix.yml (94%) rename x-pack/plugin/{esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test => src/yamlRestTest/resources/rest-api-spec/test/esql}/10_basic.yml (99%) rename x-pack/plugin/{esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test => src/yamlRestTest/resources/rest-api-spec/test/esql}/20_aggs.yml (99%) rename x-pack/plugin/{esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test => src/yamlRestTest/resources/rest-api-spec/test/esql}/30_types.yml (99%) rename x-pack/plugin/{esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test => src/yamlRestTest/resources/rest-api-spec/test/esql}/40_tsdb.yml (99%) rename x-pack/plugin/{esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test => src/yamlRestTest/resources/rest-api-spec/test/esql}/40_unsupported_types.yml (99%) rename x-pack/plugin/{esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test => src/yamlRestTest/resources/rest-api-spec/test/esql}/45_non_tsdb_counter.yml (98%) rename x-pack/plugin/{esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test => src/yamlRestTest/resources/rest-api-spec/test/esql}/50_index_patterns.yml (99%) rename x-pack/plugin/{esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test => src/yamlRestTest/resources/rest-api-spec/test/esql}/60_enrich.yml (95%) rename x-pack/plugin/{esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test => src/yamlRestTest/resources/rest-api-spec/test/esql}/60_usage.yml (96%) rename x-pack/plugin/{esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test => src/yamlRestTest/resources/rest-api-spec/test/esql}/61_enrich_ip.yml (94%) rename x-pack/plugin/{esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test => src/yamlRestTest/resources/rest-api-spec/test/esql}/70_locale.yml (96%) rename x-pack/plugin/{esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test => src/yamlRestTest/resources/rest-api-spec/test/esql}/80_text.yml (99%) rename x-pack/plugin/{esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test => src/yamlRestTest/resources/rest-api-spec/test/esql}/90_non_indexed.yml (97%) diff --git a/x-pack/plugin/esql/qa/server/single-node/build.gradle b/x-pack/plugin/esql/qa/server/single-node/build.gradle index 3131b4176ee25..2d430965efb21 100644 --- a/x-pack/plugin/esql/qa/server/single-node/build.gradle +++ b/x-pack/plugin/esql/qa/server/single-node/build.gradle @@ -9,10 +9,9 @@ restResources { restApi { include '_common', 'bulk', 'get', 'indices', 'esql', 'xpack', 'enrich', 'cluster' } -} - -artifacts { - restXpackTests(new File(projectDir, "src/yamlRestTest/resources/rest-api-spec/test")) + restTests { + includeXpack 'esql' + } } testClusters.configureEach { diff --git a/x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/100_bug_fix.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/100_bug_fix.yml similarity index 94% rename from x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/100_bug_fix.yml rename to x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/100_bug_fix.yml index d5f5bee46f50a..1876d1a6d3881 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/100_bug_fix.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/100_bug_fix.yml @@ -1,6 +1,8 @@ --- -"Bug fix https://github.com/elastic/elasticsearch/issues/99472": +"Coalesce and to_ip functions": - skip: + version: " - 8.11.99" + reason: "fixes in 8.12 or later" features: warnings - do: bulk: @@ -54,7 +56,10 @@ - match: { values.1: [ 20, null, "255.255.255.255", "255.255.255.255"] } --- -"Bug fix https://github.com/elastic/elasticsearch/issues/101489": +"unsupported and invalid mapped fields": + - skip: + version: " - 8.11.99" + reason: "fixes in 8.12 or later" - do: indices.create: index: index1 diff --git a/x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/10_basic.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/10_basic.yml similarity index 99% rename from x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/10_basic.yml rename to x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/10_basic.yml index a3b2de27bcb5b..e15372bc3088e 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/10_basic.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/10_basic.yml @@ -1,6 +1,8 @@ --- setup: - skip: + version: " - 8.10.99" + reason: "ESQL is available in 8.11+" features: allowed_warnings_regex - do: indices.create: diff --git a/x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/20_aggs.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/20_aggs.yml similarity index 99% rename from x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/20_aggs.yml rename to x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/20_aggs.yml index e94cb6ccd8e3c..4019b3a303345 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/20_aggs.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/20_aggs.yml @@ -1,6 +1,8 @@ --- setup: - skip: + version: " - 8.10.99" + reason: "ESQL is available in 8.11+" features: warnings - do: indices.create: diff --git a/x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/30_types.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/30_types.yml similarity index 99% rename from x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/30_types.yml rename to x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/30_types.yml index bf159455d00ca..406ae169872a2 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/30_types.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/30_types.yml @@ -1,6 +1,8 @@ --- setup: - skip: + version: " - 8.10.99" + reason: "ESQL is available in 8.11+" features: warnings --- diff --git a/x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/40_tsdb.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/40_tsdb.yml similarity index 99% rename from x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/40_tsdb.yml rename to x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/40_tsdb.yml index 6a90fc5a7b8f8..1f9dc67dbfbbd 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/40_tsdb.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/40_tsdb.yml @@ -1,5 +1,7 @@ setup: - skip: + version: " - 8.10.99" + reason: "ESQL is available in 8.11+" features: allowed_warnings_regex - do: indices.create: diff --git a/x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/40_unsupported_types.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/40_unsupported_types.yml similarity index 99% rename from x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/40_unsupported_types.yml rename to x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/40_unsupported_types.yml index c06456f7f127d..be5b43433983e 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/40_unsupported_types.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/40_unsupported_types.yml @@ -1,5 +1,7 @@ setup: - skip: + version: " - 8.10.99" + reason: "ESQL is available in 8.11+" features: allowed_warnings_regex - do: indices.create: diff --git a/x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/45_non_tsdb_counter.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/45_non_tsdb_counter.yml similarity index 98% rename from x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/45_non_tsdb_counter.yml rename to x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/45_non_tsdb_counter.yml index beb7200f01230..13a88d0c2f79f 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/45_non_tsdb_counter.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/45_non_tsdb_counter.yml @@ -1,5 +1,7 @@ setup: - skip: + version: " - 8.10.99" + reason: "ESQL is available in 8.11+" features: allowed_warnings_regex - do: indices.create: diff --git a/x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/50_index_patterns.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/50_index_patterns.yml similarity index 99% rename from x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/50_index_patterns.yml rename to x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/50_index_patterns.yml index 2098b9ee60d1e..38023b7791709 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/50_index_patterns.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/50_index_patterns.yml @@ -1,5 +1,7 @@ setup: - skip: + version: " - 8.10.99" + reason: "ESQL is available in 8.11+" features: allowed_warnings_regex --- diff --git a/x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/60_enrich.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/60_enrich.yml similarity index 95% rename from x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/60_enrich.yml rename to x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/60_enrich.yml index 84d8682508733..1673453824584 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/60_enrich.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/60_enrich.yml @@ -1,6 +1,8 @@ --- setup: - skip: + version: " - 8.10.99" + reason: "ESQL is available in 8.11+" features: allowed_warnings_regex - do: indices.create: @@ -127,3 +129,8 @@ setup: - match: { values.1: [ "Bob", "nyc", "USA" ] } - match: { values.2: [ "Denise", "sgn", null ] } - match: { values.3: [ "Mario", "rom", "Italy" ] } + + - do: + enrich.delete_policy: + name: cities_policy + - is_true: acknowledged diff --git a/x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/60_usage.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/60_usage.yml similarity index 96% rename from x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/60_usage.yml rename to x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/60_usage.yml index d7998651540d8..ad46a3c2d9c3e 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/60_usage.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/60_usage.yml @@ -1,5 +1,9 @@ --- setup: + - skip: + version: " - 8.10.99" + reason: "ESQL is available in 8.11+" + - do: indices.create: index: test diff --git a/x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/61_enrich_ip.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/61_enrich_ip.yml similarity index 94% rename from x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/61_enrich_ip.yml rename to x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/61_enrich_ip.yml index bd89af2fd3f79..0d49f169fc4b2 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/61_enrich_ip.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/61_enrich_ip.yml @@ -1,6 +1,8 @@ --- setup: - skip: + version: " - 8.10.99" + reason: "ESQL is available in 8.11+" features: allowed_warnings_regex - do: indices.create: @@ -95,3 +97,8 @@ setup: - match: { values.1: [ [ "10.100.0.21", "10.101.0.107" ], [ "Production", "QA" ], [ "OPS","Engineering" ], "sending messages" ] } - match: { values.2: [ "10.101.0.107" , "QA", "Engineering", "network disconnected" ] } - match: { values.3: [ "13.101.0.114" , null, null, "authentication failed" ] } + + - do: + enrich.delete_policy: + name: networks-policy + - is_true: acknowledged diff --git a/x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/70_locale.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/70_locale.yml similarity index 96% rename from x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/70_locale.yml rename to x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/70_locale.yml index a77e0569668de..bcae5e7cf24a2 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/70_locale.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/70_locale.yml @@ -1,6 +1,8 @@ --- setup: - skip: + version: " - 8.10.99" + reason: "ESQL is available in 8.11+" features: allowed_warnings_regex - do: indices.create: diff --git a/x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/80_text.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/80_text.yml similarity index 99% rename from x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/80_text.yml rename to x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/80_text.yml index d6d20fa0a0aee..cef7f88506de8 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/80_text.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/80_text.yml @@ -1,6 +1,8 @@ --- setup: - skip: + version: " - 8.10.99" + reason: "ESQL is available in 8.11+" features: allowed_warnings_regex - do: indices.create: diff --git a/x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/90_non_indexed.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/90_non_indexed.yml similarity index 97% rename from x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/90_non_indexed.yml rename to x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/90_non_indexed.yml index 9138a9454c571..c6124e7f75e96 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/90_non_indexed.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/90_non_indexed.yml @@ -1,5 +1,7 @@ setup: - skip: + version: " - 8.11.99" + reason: "extracting non-indexed fields available in 8.12+" features: allowed_warnings - do: indices.create: From 86e05996c171b8068104d5dcb130149cf7d56b29 Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 20 Nov 2023 18:11:04 +0000 Subject: [PATCH 7/8] Add descriptions to assertions in testMetrics (#102386) In #101608 we saw one of these assertions fail, but it's impossible to know which one without some more details. This commit adds descriptions to the assertions in the loop. --- .../repositories/s3/S3BlobStoreRepositoryTests.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java b/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java index 67b0202ac1eae..7f46440647a54 100644 --- a/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java +++ b/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java @@ -282,8 +282,12 @@ public void testMetrics() throws Exception { operation, OperationPurpose.parse((String) metric.attributes().get("purpose")) ); - assertThat(statsCollectors, hasKey(statsKey)); - assertThat(metric.getLong(), equalTo(statsCollectors.get(statsKey).counter.sum())); + assertThat(nodeName + "/" + statsKey + " exists", statsCollectors, hasKey(statsKey)); + assertThat( + nodeName + "/" + statsKey + " has correct sum", + metric.getLong(), + equalTo(statsCollectors.get(statsKey).counter.sum()) + ); aggregatedMetrics.compute(operation.getKey(), (k, v) -> v == null ? metric.getLong() : v + metric.getLong()); }); From 36a2f9bc43900a77e6d88c5c22b7664be5891104 Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 20 Nov 2023 18:44:21 +0000 Subject: [PATCH 8/8] Remove exception mangling around LoggingTaskListener (#102369) Replaces `ListenableActionFuture` with `SubscribableListener` at both call sites. --- .../reindex/AbstractBaseReindexRestHandler.java | 8 ++++---- .../rest/action/admin/indices/RestForceMergeAction.java | 8 ++++---- .../java/org/elasticsearch/tasks/LoggingTaskListener.java | 1 - 3 files changed, 8 insertions(+), 9 deletions(-) diff --git a/modules/reindex/src/main/java/org/elasticsearch/reindex/AbstractBaseReindexRestHandler.java b/modules/reindex/src/main/java/org/elasticsearch/reindex/AbstractBaseReindexRestHandler.java index 952dd0585e7ba..8e7fab68ac697 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/reindex/AbstractBaseReindexRestHandler.java +++ b/modules/reindex/src/main/java/org/elasticsearch/reindex/AbstractBaseReindexRestHandler.java @@ -11,7 +11,7 @@ import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.support.ActiveShardCount; -import org.elasticsearch.action.support.ListenableActionFuture; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest; @@ -64,9 +64,9 @@ protected RestChannelConsumer doPrepareRequest(RestRequest request, NodeClient c if (validationException != null) { throw validationException; } - final var responseFuture = new ListenableActionFuture(); - final var task = client.executeLocally(action, internal, responseFuture); - responseFuture.addListener(new LoggingTaskListener<>(task)); + final var responseListener = new SubscribableListener(); + final var task = client.executeLocally(action, internal, responseListener); + responseListener.addListener(new LoggingTaskListener<>(task)); return sendTask(client.getLocalNodeId(), task); } diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestForceMergeAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestForceMergeAction.java index a04e23f289379..4c9ac8fcb9a3c 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestForceMergeAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestForceMergeAction.java @@ -13,7 +13,7 @@ import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse; import org.elasticsearch.action.support.IndicesOptions; -import org.elasticsearch.action.support.ListenableActionFuture; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.common.Strings; import org.elasticsearch.rest.BaseRestHandler; @@ -65,9 +65,9 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC if (validationException != null) { throw validationException; } - final var responseFuture = new ListenableActionFuture(); - final var task = client.executeLocally(ForceMergeAction.INSTANCE, mergeRequest, responseFuture); - responseFuture.addListener(new LoggingTaskListener<>(task)); + final var responseListener = new SubscribableListener(); + final var task = client.executeLocally(ForceMergeAction.INSTANCE, mergeRequest, responseListener); + responseListener.addListener(new LoggingTaskListener<>(task)); return sendTask(client.getLocalNodeId(), task); } } diff --git a/server/src/main/java/org/elasticsearch/tasks/LoggingTaskListener.java b/server/src/main/java/org/elasticsearch/tasks/LoggingTaskListener.java index c99194d933131..63e17bd62f8ee 100644 --- a/server/src/main/java/org/elasticsearch/tasks/LoggingTaskListener.java +++ b/server/src/main/java/org/elasticsearch/tasks/LoggingTaskListener.java @@ -31,7 +31,6 @@ public LoggingTaskListener(Task task) { @Override public void onResponse(Response response) { logger.info("{} finished with response {}", task.getId(), response); - } @Override