From 9cd96df179d581567c7e45ea4d8bccc4fb89e083 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Tue, 21 Nov 2023 15:35:49 +0100 Subject: [PATCH] Add support for index_filter to open pit (#102388) The open point in time API accepts a list of indices and opens a point in time view against those indices. Like we do already for field caps, this commit allows users to provide an index_filter parameter as part of the request body, that will be used to execute the can match phase and exclude the indices that can't possibly match such filter. Closes #99740 --- docs/changelog/102388.yaml | 6 ++ .../search/point-in-time-api.asciidoc | 9 ++- .../rest/yaml/CcsCommonYamlTestSuiteIT.java | 5 +- .../rest-api-spec/api/open_point_in_time.json | 6 +- .../test/search/350_point_in_time.yml | 37 +++++++++-- .../action/search/PointInTimeIT.java | 50 ++++++++++++++ .../org/elasticsearch/TransportVersions.java | 1 + .../action/search/OpenPointInTimeRequest.java | 17 +++++ .../search/RestOpenPointInTimeAction.java | 19 +++++- .../TransportOpenPointInTimeAction.java | 66 ++++++++++++++++++- x-pack/qa/runtime-fields/build.gradle | 1 + 11 files changed, 205 insertions(+), 12 deletions(-) create mode 100644 docs/changelog/102388.yaml diff --git a/docs/changelog/102388.yaml b/docs/changelog/102388.yaml new file mode 100644 index 0000000000000..3e65e46949bda --- /dev/null +++ b/docs/changelog/102388.yaml @@ -0,0 +1,6 @@ +pr: 102388 +summary: Add support for `index_filter` to open pit +area: Search +type: enhancement +issues: + - 99740 diff --git a/docs/reference/search/point-in-time-api.asciidoc b/docs/reference/search/point-in-time-api.asciidoc index 0403f9b04b2d1..2e32324cb44d9 100644 --- a/docs/reference/search/point-in-time-api.asciidoc +++ b/docs/reference/search/point-in-time-api.asciidoc @@ -22,6 +22,13 @@ or alias. To search a <> for an alias, you must have the `read` index privilege for the alias's data streams or indices. +[[point-in-time-api-request-body]] +==== {api-request-body-title} + +`index_filter`:: +(Optional, <> Allows to filter indices if the provided +query rewrites to `match_none` on every shard. + [[point-in-time-api-example]] ==== {api-examples-title} @@ -60,7 +67,7 @@ POST /_search <1> or <> as these parameters are copied from the point in time. <2> Just like regular searches, you can <>, up to the first 10,000 hits. If you +`size` to page through search results>>, up to the first 10,000 hits. If you want to retrieve more hits, use PIT with <>. <3> The `id` parameter tells Elasticsearch to execute the request using contexts from this point in time. diff --git a/qa/ccs-common-rest/src/yamlRestTest/java/org/elasticsearch/test/rest/yaml/CcsCommonYamlTestSuiteIT.java b/qa/ccs-common-rest/src/yamlRestTest/java/org/elasticsearch/test/rest/yaml/CcsCommonYamlTestSuiteIT.java index 5ad525b472b12..7c1514d2d1a6a 100644 --- a/qa/ccs-common-rest/src/yamlRestTest/java/org/elasticsearch/test/rest/yaml/CcsCommonYamlTestSuiteIT.java +++ b/qa/ccs-common-rest/src/yamlRestTest/java/org/elasticsearch/test/rest/yaml/CcsCommonYamlTestSuiteIT.java @@ -389,10 +389,7 @@ private boolean shouldReplaceIndexWithRemote(String apiName) { if (apiName.equals("search") || apiName.equals("msearch") || apiName.equals("async_search.submit")) { final String testCandidateTestPath = testCandidate.getTestPath(); - if (testCandidateTestPath.equals("search/350_point_in_time/basic") - || testCandidateTestPath.equals("search/350_point_in_time/point-in-time with slicing") - || testCandidateTestPath.equals("search/350_point_in_time/msearch") - || testCandidateTestPath.equals("search/350_point_in_time/wildcard") + if (testCandidateTestPath.startsWith("search/350_point_in_time") || testCandidateTestPath.equals("async_search/20-with-poin-in-time/Async search with point in time")) { return false; } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/open_point_in_time.json b/rest-api-spec/src/main/resources/rest-api-spec/api/open_point_in_time.json index a25c3fee32571..bce8dfd794dca 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/open_point_in_time.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/open_point_in_time.json @@ -7,7 +7,8 @@ "stability":"stable", "visibility":"public", "headers":{ - "accept": [ "application/json"] + "accept": [ "application/json"], + "content_type": ["application/json"] }, "url":{ "paths":[ @@ -55,6 +56,9 @@ "description": "Specific the time to live for the point in time", "required": true } + }, + "body":{ + "description":"An index_filter specified with the Query DSL" } } } diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/search/350_point_in_time.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/search/350_point_in_time.yml index bc3479b705180..7e78450931df5 100644 --- a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/search/350_point_in_time.yml +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/search/350_point_in_time.yml @@ -6,19 +6,19 @@ setup: index: index: test id: "1" - body: { id: 1, foo: bar, age: 18 } + body: { id: 1, foo: bar, age: 18, birth: "2022-01-01" } - do: index: index: test id: "42" - body: { id: 42, foo: bar, age: 18 } + body: { id: 42, foo: bar, age: 18, birth: "2022-02-01" } - do: index: index: test id: "172" - body: { id: 172, foo: bar, age: 24 } + body: { id: 172, foo: bar, age: 24, birth: "2022-03-01" } - do: indices.create: @@ -28,7 +28,7 @@ setup: index: index: test2 id: "45" - body: { id: 45, foo: bar, age: 19 } + body: { id: 45, foo: bar, age: 19, birth: "2023-01-01" } - do: indices.refresh: @@ -235,3 +235,32 @@ setup: close_point_in_time: body: id: "$point_in_time_id" + +--- +"point-in-time with index filter": + - skip: + version: " - 8.11.99" + reason: "support for index filter was added in 8.12" + - do: + open_point_in_time: + index: test* + keep_alive: 5m + body: { index_filter: { range: { birth: { gte: "2023-01-01" }}}} + - set: {id: point_in_time_id} + + - do: + search: + body: + size: 1 + pit: + id: "$point_in_time_id" + + - match: {hits.total.value: 1 } + - length: {hits.hits: 1 } + - match: {hits.hits.0._index: test2 } + - match: {hits.hits.0._id: "45" } + + - do: + close_point_in_time: + body: + id: "$point_in_time_id" diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/search/PointInTimeIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/search/PointInTimeIT.java index bb7658f5011e3..27100e925a0a2 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/search/PointInTimeIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/search/PointInTimeIT.java @@ -50,6 +50,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFailures; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse; import static org.hamcrest.Matchers.arrayWithSize; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; @@ -152,6 +153,55 @@ public void testMultipleIndices() { } } + public void testIndexFilter() { + int numDocs = randomIntBetween(1, 9); + for (int i = 1; i <= 3; i++) { + String index = "index-" + i; + createIndex(index); + for (int j = 1; j <= numDocs; j++) { + String id = Integer.toString(j); + client().prepareIndex(index).setId(id).setSource("@timestamp", "2023-0" + i + "-0" + j).get(); + } + } + refresh(); + + { + + OpenPointInTimeRequest request = new OpenPointInTimeRequest("*").keepAlive(TimeValue.timeValueMinutes(2)); + final OpenPointInTimeResponse response = client().execute(OpenPointInTimeAction.INSTANCE, request).actionGet(); + try { + SearchContextId searchContextId = SearchContextId.decode(writableRegistry(), response.getPointInTimeId()); + String[] actualIndices = searchContextId.getActualIndices(); + assertEquals(3, actualIndices.length); + } finally { + closePointInTime(response.getPointInTimeId()); + } + } + { + OpenPointInTimeRequest request = new OpenPointInTimeRequest("*").keepAlive(TimeValue.timeValueMinutes(2)); + request.indexFilter(new RangeQueryBuilder("@timestamp").gte("2023-03-01")); + final OpenPointInTimeResponse response = client().execute(OpenPointInTimeAction.INSTANCE, request).actionGet(); + String pitId = response.getPointInTimeId(); + try { + SearchContextId searchContextId = SearchContextId.decode(writableRegistry(), pitId); + String[] actualIndices = searchContextId.getActualIndices(); + assertEquals(1, actualIndices.length); + assertEquals("index-3", actualIndices[0]); + assertResponse(prepareSearch().setPointInTime(new PointInTimeBuilder(pitId)).setSize(50), resp -> { + assertNoFailures(resp); + assertHitCount(resp, numDocs); + assertNotNull(resp.pointInTimeId()); + assertThat(resp.pointInTimeId(), equalTo(pitId)); + for (SearchHit hit : resp.getHits()) { + assertEquals("index-3", hit.getIndex()); + } + }); + } finally { + closePointInTime(pitId); + } + } + } + public void testRelocation() throws Exception { internalCluster().ensureAtLeastNumDataNodes(4); createIndex("test", Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, between(0, 1)).build()); diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index bde73ec5b801f..535192eeefd47 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -174,6 +174,7 @@ static TransportVersion def(int id) { public static final TransportVersion SHUTDOWN_MIGRATION_STATUS_INCLUDE_COUNTS = def(8_543_00_0); public static final TransportVersion TRANSFORM_GET_CHECKPOINT_QUERY_AND_CLUSTER_ADDED = def(8_544_00_0); public static final TransportVersion GRANT_API_KEY_CLIENT_AUTHENTICATION_ADDED = def(8_545_00_0); + public static final TransportVersion PIT_WITH_INDEX_FILTER = def(8_546_00_0); /* * STOP! READ THIS FIRST! No, really, diff --git a/server/src/main/java/org/elasticsearch/action/search/OpenPointInTimeRequest.java b/server/src/main/java/org/elasticsearch/action/search/OpenPointInTimeRequest.java index 633e56b97a833..39813a883c428 100644 --- a/server/src/main/java/org/elasticsearch/action/search/OpenPointInTimeRequest.java +++ b/server/src/main/java/org/elasticsearch/action/search/OpenPointInTimeRequest.java @@ -17,6 +17,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; @@ -38,6 +39,8 @@ public final class OpenPointInTimeRequest extends ActionRequest implements Indic @Nullable private String preference; + private QueryBuilder indexFilter; + public static final IndicesOptions DEFAULT_INDICES_OPTIONS = SearchRequest.DEFAULT_INDICES_OPTIONS; public OpenPointInTimeRequest(String... indices) { @@ -54,6 +57,9 @@ public OpenPointInTimeRequest(StreamInput in) throws IOException { if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_500_020)) { this.maxConcurrentShardRequests = in.readVInt(); } + if (in.getTransportVersion().onOrAfter(TransportVersions.PIT_WITH_INDEX_FILTER)) { + this.indexFilter = in.readOptionalNamedWriteable(QueryBuilder.class); + } } @Override @@ -67,6 +73,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_500_020)) { out.writeVInt(maxConcurrentShardRequests); } + if (out.getTransportVersion().onOrAfter(TransportVersions.PIT_WITH_INDEX_FILTER)) { + out.writeOptionalWriteable(indexFilter); + } } @Override @@ -153,6 +162,14 @@ public void maxConcurrentShardRequests(int maxConcurrentShardRequests) { this.maxConcurrentShardRequests = maxConcurrentShardRequests; } + public void indexFilter(QueryBuilder indexFilter) { + this.indexFilter = indexFilter; + } + + public QueryBuilder indexFilter() { + return indexFilter; + } + @Override public boolean allowsRemoteIndices() { return true; diff --git a/server/src/main/java/org/elasticsearch/action/search/RestOpenPointInTimeAction.java b/server/src/main/java/org/elasticsearch/action/search/RestOpenPointInTimeAction.java index 815deac07dfcd..627fdd88cc308 100644 --- a/server/src/main/java/org/elasticsearch/action/search/RestOpenPointInTimeAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/RestOpenPointInTimeAction.java @@ -17,9 +17,13 @@ import org.elasticsearch.rest.Scope; import org.elasticsearch.rest.ServerlessScope; import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.xcontent.ObjectParser; +import org.elasticsearch.xcontent.ParseField; +import java.io.IOException; import java.util.List; +import static org.elasticsearch.index.query.AbstractQueryBuilder.parseTopLevelQuery; import static org.elasticsearch.rest.RestRequest.Method.POST; @ServerlessScope(Scope.PUBLIC) @@ -36,7 +40,7 @@ public List routes() { } @Override - public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) { + public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { final String[] indices = Strings.splitStringByCommaToArray(request.param("index")); final OpenPointInTimeRequest openRequest = new OpenPointInTimeRequest(indices); openRequest.indicesOptions(IndicesOptions.fromRequest(request, OpenPointInTimeRequest.DEFAULT_INDICES_OPTIONS)); @@ -50,6 +54,19 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC ); openRequest.maxConcurrentShardRequests(maxConcurrentShardRequests); } + request.withContentOrSourceParamParserOrNull(parser -> { + if (parser != null) { + PARSER.parse(parser, openRequest, null); + } + }); + return channel -> client.execute(OpenPointInTimeAction.INSTANCE, openRequest, new RestToXContentListener<>(channel)); } + + private static final ObjectParser PARSER = new ObjectParser<>("open_point_in_time_request"); + private static final ParseField INDEX_FILTER_FIELD = new ParseField("index_filter"); + + static { + PARSER.declareObject(OpenPointInTimeRequest::indexFilter, (p, c) -> parseTopLevelQuery(p), INDEX_FILTER_FIELD); + } } diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java index ae3c735e079e9..b247a915ad923 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java @@ -8,6 +8,8 @@ package org.elasticsearch.action.search; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.IndicesRequest; @@ -28,6 +30,7 @@ import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchService; import org.elasticsearch.search.SearchShardTarget; +import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.internal.ShardSearchContextId; @@ -47,6 +50,9 @@ import java.util.function.BiFunction; public class TransportOpenPointInTimeAction extends HandledTransportAction { + + private static final Logger logger = LogManager.getLogger(TransportOpenPointInTimeAction.class); + public static final String OPEN_SHARD_READER_CONTEXT_NAME = "indices:data/read/open_reader_context"; private final TransportSearchAction transportSearchAction; @@ -93,7 +99,8 @@ protected void doExecute(Task task, OpenPointInTimeRequest request, ActionListen .indicesOptions(request.indicesOptions()) .preference(request.preference()) .routing(request.routing()) - .allowPartialSearchResults(false); + .allowPartialSearchResults(false) + .source(new SearchSourceBuilder().query(request.indexFilter())); searchRequest.setMaxConcurrentShardRequests(request.maxConcurrentShardRequests()); searchRequest.setCcsMinimizeRoundtrips(false); transportSearchAction.executeRequest((SearchTask) task, searchRequest, listener.map(r -> { @@ -125,6 +132,63 @@ public SearchPhase newSearchPhase( boolean preFilter, ThreadPool threadPool, SearchResponse.Clusters clusters + ) { + if (SearchService.canRewriteToMatchNone(searchRequest.source())) { + return new CanMatchPreFilterSearchPhase( + logger, + searchTransportService, + connectionLookup, + aliasFilter, + concreteIndexBoosts, + threadPool.executor(ThreadPool.Names.SEARCH_COORDINATION), + searchRequest, + shardIterators, + timeProvider, + task, + false, + searchService.getCoordinatorRewriteContextProvider(timeProvider::absoluteStartMillis), + listener.delegateFailureAndWrap( + (searchResponseActionListener, searchShardIterators) -> openPointInTimePhase( + task, + searchRequest, + executor, + searchShardIterators, + timeProvider, + connectionLookup, + clusterState, + aliasFilter, + concreteIndexBoosts, + clusters + ).start() + ) + ); + } else { + return openPointInTimePhase( + task, + searchRequest, + executor, + shardIterators, + timeProvider, + connectionLookup, + clusterState, + aliasFilter, + concreteIndexBoosts, + clusters + ); + } + } + + SearchPhase openPointInTimePhase( + SearchTask task, + SearchRequest searchRequest, + Executor executor, + GroupShardsIterator shardIterators, + TransportSearchAction.SearchTimeProvider timeProvider, + BiFunction connectionLookup, + ClusterState clusterState, + Map aliasFilter, + Map concreteIndexBoosts, + SearchResponse.Clusters clusters ) { assert searchRequest.getMaxConcurrentShardRequests() == pitRequest.maxConcurrentShardRequests() : searchRequest.getMaxConcurrentShardRequests() + " != " + pitRequest.maxConcurrentShardRequests(); diff --git a/x-pack/qa/runtime-fields/build.gradle b/x-pack/qa/runtime-fields/build.gradle index 1a9e913932eb7..dd7d0abc24b19 100644 --- a/x-pack/qa/runtime-fields/build.gradle +++ b/x-pack/qa/runtime-fields/build.gradle @@ -74,6 +74,7 @@ subprojects { 'search/115_multiple_field_collapsing/two levels fields collapsing', // Field collapsing on a runtime field does not work 'search/111_field_collapsing_with_max_score/*', // Field collapsing on a runtime field does not work 'field_caps/30_index_filter/Field caps with index filter', // We don't support filtering field caps on runtime fields. What should we do? + 'search/350_point_in_time/point-in-time with index filter', // We don't support filtering pit on runtime fields. 'aggregations/filters_bucket/cache busting', // runtime keyword does not support split_queries_on_whitespace 'search/140_pre_filter_search_shards/pre_filter_shard_size with shards that have no hit', //completion suggester does not return options when the context field is a geo_point runtime field