Skip to content

Commit

Permalink
Add support for index_filter to open pit (elastic#102388)
Browse files Browse the repository at this point in the history
The open point in time API accepts a list of indices and opens a point in time view against those indices.
Like we do already for field caps, this commit allows users to provide an index_filter parameter as part of
the request body, that will be used to execute the can match phase and exclude the indices that can't possibly
match such filter.

Closes elastic#99740
  • Loading branch information
javanna authored Nov 21, 2023
1 parent 8a22586 commit 9cd96df
Show file tree
Hide file tree
Showing 11 changed files with 205 additions and 12 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/102388.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 102388
summary: Add support for `index_filter` to open pit
area: Search
type: enhancement
issues:
- 99740
9 changes: 8 additions & 1 deletion docs/reference/search/point-in-time-api.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@ or alias.
To search a <<point-in-time-api,point in time (PIT)>> for an alias, you
must have the `read` index privilege for the alias's data streams or indices.

[[point-in-time-api-request-body]]
==== {api-request-body-title}

`index_filter`::
(Optional, <<query-dsl,query object>>) Allows you to filter out indices if the provided
query rewrites to `match_none` on every shard.

[[point-in-time-api-example]]
==== {api-examples-title}

Expand Down Expand Up @@ -60,7 +67,7 @@ POST /_search <1>
or <<search-preference,`preference`>>
as these parameters are copied from the point in time.
<2> Just like regular searches, you can <<paginate-search-results,use `from` and
`size` to page through search results>>, up to the first 10,000 hits. If you
`size` to page through search results>>, up to the first 10,000 hits. If you
want to retrieve more hits, use PIT with <<search-after,`search_after`>>.
<3> The `id` parameter tells Elasticsearch to execute the request using contexts
from this point in time.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,10 +389,7 @@ private boolean shouldReplaceIndexWithRemote(String apiName) {

if (apiName.equals("search") || apiName.equals("msearch") || apiName.equals("async_search.submit")) {
final String testCandidateTestPath = testCandidate.getTestPath();
if (testCandidateTestPath.equals("search/350_point_in_time/basic")
|| testCandidateTestPath.equals("search/350_point_in_time/point-in-time with slicing")
|| testCandidateTestPath.equals("search/350_point_in_time/msearch")
|| testCandidateTestPath.equals("search/350_point_in_time/wildcard")
if (testCandidateTestPath.startsWith("search/350_point_in_time")
|| testCandidateTestPath.equals("async_search/20-with-poin-in-time/Async search with point in time")) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
"stability":"stable",
"visibility":"public",
"headers":{
"accept": [ "application/json"]
"accept": [ "application/json"],
"content_type": ["application/json"]
},
"url":{
"paths":[
Expand Down Expand Up @@ -55,6 +56,9 @@
"description": "Specific the time to live for the point in time",
"required": true
}
},
"body":{
"description":"An index_filter specified with the Query DSL"
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,19 @@ setup:
index:
index: test
id: "1"
body: { id: 1, foo: bar, age: 18 }
body: { id: 1, foo: bar, age: 18, birth: "2022-01-01" }

- do:
index:
index: test
id: "42"
body: { id: 42, foo: bar, age: 18 }
body: { id: 42, foo: bar, age: 18, birth: "2022-02-01" }

- do:
index:
index: test
id: "172"
body: { id: 172, foo: bar, age: 24 }
body: { id: 172, foo: bar, age: 24, birth: "2022-03-01" }

- do:
indices.create:
Expand All @@ -28,7 +28,7 @@ setup:
index:
index: test2
id: "45"
body: { id: 45, foo: bar, age: 19 }
body: { id: 45, foo: bar, age: 19, birth: "2023-01-01" }

- do:
indices.refresh:
Expand Down Expand Up @@ -235,3 +235,32 @@ setup:
close_point_in_time:
body:
id: "$point_in_time_id"

---
"point-in-time with index filter":
- skip:
version: " - 8.11.99"
reason: "support for index filter was added in 8.12"
- do:
open_point_in_time:
index: test*
keep_alive: 5m
body: { index_filter: { range: { birth: { gte: "2023-01-01" }}}}
- set: {id: point_in_time_id}

- do:
search:
body:
size: 1
pit:
id: "$point_in_time_id"

- match: {hits.total.value: 1 }
- length: {hits.hits: 1 }
- match: {hits.hits.0._index: test2 }
- match: {hits.hits.0._id: "45" }

- do:
close_point_in_time:
body:
id: "$point_in_time_id"
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFailures;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -152,6 +153,55 @@ public void testMultipleIndices() {
}
}

/**
 * Verifies that the open point-in-time API honours an {@code index_filter}:
 * without a filter the PIT spans every matching index, while a range filter on
 * {@code @timestamp} narrows the PIT to the single index that can match it.
 */
public void testIndexFilter() {
    final int docsPerIndex = randomIntBetween(1, 9);
    // Create index-1..index-3; each doc's @timestamp falls in a month unique
    // to its index (2023-01, 2023-02, 2023-03), so a date range can isolate one index.
    for (int indexOrdinal = 1; indexOrdinal <= 3; indexOrdinal++) {
        final String indexName = "index-" + indexOrdinal;
        createIndex(indexName);
        for (int docOrdinal = 1; docOrdinal <= docsPerIndex; docOrdinal++) {
            client().prepareIndex(indexName)
                .setId(Integer.toString(docOrdinal))
                .setSource("@timestamp", "2023-0" + indexOrdinal + "-0" + docOrdinal)
                .get();
        }
    }
    refresh();

    {
        // Without an index filter the PIT must cover all three indices.
        OpenPointInTimeRequest openRequest = new OpenPointInTimeRequest("*").keepAlive(TimeValue.timeValueMinutes(2));
        final OpenPointInTimeResponse openResponse = client().execute(OpenPointInTimeAction.INSTANCE, openRequest).actionGet();
        try {
            String[] indicesInPit = SearchContextId.decode(writableRegistry(), openResponse.getPointInTimeId()).getActualIndices();
            assertEquals(3, indicesInPit.length);
        } finally {
            closePointInTime(openResponse.getPointInTimeId());
        }
    }
    {
        // A range filter that only index-3 can match must narrow the PIT to that index.
        OpenPointInTimeRequest openRequest = new OpenPointInTimeRequest("*").keepAlive(TimeValue.timeValueMinutes(2));
        openRequest.indexFilter(new RangeQueryBuilder("@timestamp").gte("2023-03-01"));
        final String pitId = client().execute(OpenPointInTimeAction.INSTANCE, openRequest).actionGet().getPointInTimeId();
        try {
            String[] indicesInPit = SearchContextId.decode(writableRegistry(), pitId).getActualIndices();
            assertEquals(1, indicesInPit.length);
            assertEquals("index-3", indicesInPit[0]);
            // Searching the filtered PIT must return only index-3 hits, and echo the same PIT id.
            assertResponse(prepareSearch().setPointInTime(new PointInTimeBuilder(pitId)).setSize(50), searchResponse -> {
                assertNoFailures(searchResponse);
                assertHitCount(searchResponse, docsPerIndex);
                assertNotNull(searchResponse.pointInTimeId());
                assertThat(searchResponse.pointInTimeId(), equalTo(pitId));
                for (SearchHit hit : searchResponse.getHits()) {
                    assertEquals("index-3", hit.getIndex());
                }
            });
        } finally {
            closePointInTime(pitId);
        }
    }
}

public void testRelocation() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(4);
createIndex("test", Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, between(0, 1)).build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ static TransportVersion def(int id) {
public static final TransportVersion SHUTDOWN_MIGRATION_STATUS_INCLUDE_COUNTS = def(8_543_00_0);
public static final TransportVersion TRANSFORM_GET_CHECKPOINT_QUERY_AND_CLUSTER_ADDED = def(8_544_00_0);
public static final TransportVersion GRANT_API_KEY_CLIENT_AUTHENTICATION_ADDED = def(8_545_00_0);
public static final TransportVersion PIT_WITH_INDEX_FILTER = def(8_546_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;

Expand All @@ -38,6 +39,8 @@ public final class OpenPointInTimeRequest extends ActionRequest implements Indic
@Nullable
private String preference;

private QueryBuilder indexFilter;

public static final IndicesOptions DEFAULT_INDICES_OPTIONS = SearchRequest.DEFAULT_INDICES_OPTIONS;

public OpenPointInTimeRequest(String... indices) {
Expand All @@ -54,6 +57,9 @@ public OpenPointInTimeRequest(StreamInput in) throws IOException {
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_500_020)) {
this.maxConcurrentShardRequests = in.readVInt();
}
if (in.getTransportVersion().onOrAfter(TransportVersions.PIT_WITH_INDEX_FILTER)) {
this.indexFilter = in.readOptionalNamedWriteable(QueryBuilder.class);
}
}

@Override
Expand All @@ -67,6 +73,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_500_020)) {
out.writeVInt(maxConcurrentShardRequests);
}
if (out.getTransportVersion().onOrAfter(TransportVersions.PIT_WITH_INDEX_FILTER)) {
out.writeOptionalWriteable(indexFilter);
}
}

@Override
Expand Down Expand Up @@ -153,6 +162,14 @@ public void maxConcurrentShardRequests(int maxConcurrentShardRequests) {
this.maxConcurrentShardRequests = maxConcurrentShardRequests;
}

/**
 * Sets the optional filter applied to the target indices when opening the point in time:
 * shards on which this query rewrites to {@code match_none} are excluded from the PIT.
 *
 * @param indexFilter the filter query, or {@code null} for no filtering
 */
public void indexFilter(QueryBuilder indexFilter) {
    this.indexFilter = indexFilter;
}

/**
 * Returns the index filter set via {@link #indexFilter(QueryBuilder)}, or {@code null} if none was provided.
 */
public QueryBuilder indexFilter() {
    return indexFilter;
}

@Override
public boolean allowsRemoteIndices() {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@
import org.elasticsearch.rest.Scope;
import org.elasticsearch.rest.ServerlessScope;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xcontent.ObjectParser;
import org.elasticsearch.xcontent.ParseField;

import java.io.IOException;
import java.util.List;

import static org.elasticsearch.index.query.AbstractQueryBuilder.parseTopLevelQuery;
import static org.elasticsearch.rest.RestRequest.Method.POST;

@ServerlessScope(Scope.PUBLIC)
Expand All @@ -36,7 +40,7 @@ public List<Route> routes() {
}

@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) {
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
final String[] indices = Strings.splitStringByCommaToArray(request.param("index"));
final OpenPointInTimeRequest openRequest = new OpenPointInTimeRequest(indices);
openRequest.indicesOptions(IndicesOptions.fromRequest(request, OpenPointInTimeRequest.DEFAULT_INDICES_OPTIONS));
Expand All @@ -50,6 +54,19 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
);
openRequest.maxConcurrentShardRequests(maxConcurrentShardRequests);
}
request.withContentOrSourceParamParserOrNull(parser -> {
if (parser != null) {
PARSER.parse(parser, openRequest, null);
}
});

return channel -> client.execute(OpenPointInTimeAction.INSTANCE, openRequest, new RestToXContentListener<>(channel));
}

// Parser for the optional request body of the open point-in-time API.
private static final ObjectParser<OpenPointInTimeRequest, Void> PARSER = new ObjectParser<>("open_point_in_time_request");
private static final ParseField INDEX_FILTER_FIELD = new ParseField("index_filter");

static {
    // The body currently supports a single field: "index_filter", a top-level Query DSL
    // object that is set on the request via OpenPointInTimeRequest#indexFilter.
    PARSER.declareObject(OpenPointInTimeRequest::indexFilter, (p, c) -> parseTopLevelQuery(p), INDEX_FILTER_FIELD);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

package org.elasticsearch.action.search;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.IndicesRequest;
Expand All @@ -28,6 +30,7 @@
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.ShardSearchContextId;
Expand All @@ -47,6 +50,9 @@
import java.util.function.BiFunction;

public class TransportOpenPointInTimeAction extends HandledTransportAction<OpenPointInTimeRequest, OpenPointInTimeResponse> {

private static final Logger logger = LogManager.getLogger(TransportOpenPointInTimeAction.class);

public static final String OPEN_SHARD_READER_CONTEXT_NAME = "indices:data/read/open_reader_context";

private final TransportSearchAction transportSearchAction;
Expand Down Expand Up @@ -93,7 +99,8 @@ protected void doExecute(Task task, OpenPointInTimeRequest request, ActionListen
.indicesOptions(request.indicesOptions())
.preference(request.preference())
.routing(request.routing())
.allowPartialSearchResults(false);
.allowPartialSearchResults(false)
.source(new SearchSourceBuilder().query(request.indexFilter()));
searchRequest.setMaxConcurrentShardRequests(request.maxConcurrentShardRequests());
searchRequest.setCcsMinimizeRoundtrips(false);
transportSearchAction.executeRequest((SearchTask) task, searchRequest, listener.map(r -> {
Expand Down Expand Up @@ -125,6 +132,63 @@ public SearchPhase newSearchPhase(
boolean preFilter,
ThreadPool threadPool,
SearchResponse.Clusters clusters
) {
if (SearchService.canRewriteToMatchNone(searchRequest.source())) {
return new CanMatchPreFilterSearchPhase(
logger,
searchTransportService,
connectionLookup,
aliasFilter,
concreteIndexBoosts,
threadPool.executor(ThreadPool.Names.SEARCH_COORDINATION),
searchRequest,
shardIterators,
timeProvider,
task,
false,
searchService.getCoordinatorRewriteContextProvider(timeProvider::absoluteStartMillis),
listener.delegateFailureAndWrap(
(searchResponseActionListener, searchShardIterators) -> openPointInTimePhase(
task,
searchRequest,
executor,
searchShardIterators,
timeProvider,
connectionLookup,
clusterState,
aliasFilter,
concreteIndexBoosts,
clusters
).start()
)
);
} else {
return openPointInTimePhase(
task,
searchRequest,
executor,
shardIterators,
timeProvider,
connectionLookup,
clusterState,
aliasFilter,
concreteIndexBoosts,
clusters
);
}
}

SearchPhase openPointInTimePhase(
SearchTask task,
SearchRequest searchRequest,
Executor executor,
GroupShardsIterator<SearchShardIterator> shardIterators,
TransportSearchAction.SearchTimeProvider timeProvider,
BiFunction<String, String, Transport.Connection> connectionLookup,
ClusterState clusterState,
Map<String, AliasFilter> aliasFilter,
Map<String, Float> concreteIndexBoosts,
SearchResponse.Clusters clusters
) {
assert searchRequest.getMaxConcurrentShardRequests() == pitRequest.maxConcurrentShardRequests()
: searchRequest.getMaxConcurrentShardRequests() + " != " + pitRequest.maxConcurrentShardRequests();
Expand Down
Loading

0 comments on commit 9cd96df

Please sign in to comment.