Use separate search execution context for each pipeline (elastic#100563)
Synthetic source doesn't seem to work correctly with either inter-segment or intra-segment parallelism; neither form of parallelism existed when synthetic source was developed. The new test fails with either the doc or the segment data_partitioning mode. While we work on a proper fix, this PR introduces a workaround: it creates a separate search execution context for each execution pipeline, restoring the sequential execution invariants. I believe the overhead added by this workaround is minimal.
dnhatn authored Oct 10, 2023
1 parent cabb296 commit a8d79b3
Showing 7 changed files with 111 additions and 15 deletions.
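To see the kind of invariant the description refers to, consider a minimal, hypothetical Java sketch (the class names below are illustrative and do not exist in Elasticsearch). Synthetic _source is rebuilt from stateful, forward-only readers, so sequential execution guarantees that each reader sees doc IDs in non-decreasing order; once two pipelines share one reader, that guarantee is gone:

// Hypothetical stand-in for a stateful, forward-only loader.
class StatefulLoader {
    private int lastDoc = -1;

    String load(int doc) {
        if (doc < lastDoc) {
            // Forward-only iterators cannot step backwards.
            throw new IllegalStateException("doc " + doc + " requested after doc " + lastDoc);
        }
        lastDoc = doc;
        return "synthetic-source-for-" + doc;
    }
}

class SharedLoaderHazard {
    public static void main(String[] args) {
        StatefulLoader shared = new StatefulLoader();
        shared.load(0);    // pipeline A reads from its partition
        shared.load(500);  // pipeline B reads from a later partition
        shared.load(1);    // pipeline A again -> IllegalStateException
        // One loader (one search execution context) per pipeline avoids this.
    }
}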
1 change: 1 addition & 0 deletions x-pack/plugin/esql/build.gradle
@@ -34,6 +34,7 @@ dependencies {
testImplementation('org.webjars.npm:fontsource__roboto-mono:4.5.7')

internalClusterTestImplementation project(":client:rest-high-level")
internalClusterTestImplementation project(":modules:mapper-extras")
}

/*
@@ -29,6 +29,7 @@
import org.elasticsearch.search.aggregations.support.FieldContext;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.internal.SearchContext;
+import org.elasticsearch.search.internal.ShardSearchRequest;

import java.io.IOException;
import java.util.ArrayList;
@@ -49,7 +50,20 @@ public static List<ValueSourceInfo> sources(
List<ValueSourceInfo> sources = new ArrayList<>(searchContexts.size());

for (SearchContext searchContext : searchContexts) {
-SearchExecutionContext ctx = searchContext.getSearchExecutionContext();
+// TODO: remove this workaround
+// Create a separate SearchExecutionContext for each ValuesReader, as it seems that
+// the synthetic source doesn't work properly with inter-segment or intra-segment parallelism.
+ShardSearchRequest shardRequest = searchContext.request();
+SearchExecutionContext ctx = searchContext.readerContext()
+    .indexService()
+    .newSearchExecutionContext(
+        shardRequest.shardId().id(),
+        shardRequest.shardRequestIndex(),
+        searchContext.searcher(),
+        shardRequest::nowInMillis,
+        shardRequest.getClusterAlias(),
+        shardRequest.getRuntimeMappings()
+    );
var fieldType = ctx.getFieldType(fieldName);
if (fieldType == null) {
sources.add(new ValueSourceInfo(new NullValueSourceType(), new NullValueSource(), elementType, ctx.getIndexReader()));
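Note that the replacement context is built from the same searcher and the same shard request (shard id, shard request index, nowInMillis, cluster alias, runtime mappings), so every pipeline still sees an identical view of the index; only the mutable per-context machinery stops being shared. This is why the overhead should amount to little more than one extra SearchExecutionContext per pipeline.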
@@ -28,6 +28,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
+import java.util.function.Supplier;

/**
* Operator that extracts doc_values from a Lucene index out of pages that have been produced by {@link LuceneSourceOperator}
@@ -42,12 +43,12 @@ public class ValuesSourceReaderOperator extends AbstractPageMappingOperator {
* @param docChannel the channel containing the shard, leaf/segment and doc id
* @param field the lucene field being loaded
*/
-public record ValuesSourceReaderOperatorFactory(List<ValueSourceInfo> sources, int docChannel, String field)
+public record ValuesSourceReaderOperatorFactory(Supplier<List<ValueSourceInfo>> sources, int docChannel, String field)
implements
OperatorFactory {
@Override
public Operator get(DriverContext driverContext) {
-return new ValuesSourceReaderOperator(sources, docChannel, field);
+return new ValuesSourceReaderOperator(sources.get(), docChannel, field);
}

@Override
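The factory change above is the heart of the workaround's plumbing: instead of a pre-built List<ValueSourceInfo> shared by every driver, the factory holds a Supplier and invokes it once per driver, so each pipeline gets freshly built sources. A self-contained sketch of the pattern, with placeholder types (Reader and ReaderFactory are illustrative, not the real operator classes):

import java.util.List;
import java.util.function.Supplier;

record Reader(List<String> sources) {}

// Deferring construction into get() yields one private copy per pipeline.
record ReaderFactory(Supplier<List<String>> sources) {
    Reader get() {
        return new Reader(sources.get()); // fresh list for this pipeline
    }
}

class SupplierPerPipeline {
    public static void main(String[] args) {
        ReaderFactory factory = new ReaderFactory(() -> List.of("uid", "name"));
        Reader pipelineA = factory.get();
        Reader pipelineB = factory.get();
        System.out.println(pipelineA.sources() != pipelineB.sources()); // true: nothing shared
    }
}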
@@ -42,6 +42,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.function.Supplier;

import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.joining;
@@ -51,7 +52,7 @@
*/
public class OrdinalsGroupingOperator implements Operator {
public record OrdinalsGroupingOperatorFactory(
List<ValueSourceInfo> sources,
Supplier<List<ValueSourceInfo>> sources,
int docChannel,
String groupingField,
List<Factory> aggregators,
@@ -61,7 +62,15 @@ public record OrdinalsGroupingOperatorFactory(

@Override
public Operator get(DriverContext driverContext) {
-return new OrdinalsGroupingOperator(sources, docChannel, groupingField, aggregators, maxPageSize, bigArrays, driverContext);
+return new OrdinalsGroupingOperator(
+    sources.get(),
+    docChannel,
+    groupingField,
+    aggregators,
+    maxPageSize,
+    bigArrays,
+    driverContext
+);
}

@Override
@@ -109,7 +109,7 @@ static Operator.OperatorFactory factory(IndexReader reader, ValuesSourceType vsT
FieldContext fc = new FieldContext(ft.name(), fd, ft);
ValuesSource vs = vsType.getField(fc, null);
return new ValuesSourceReaderOperator.ValuesSourceReaderOperatorFactory(
-List.of(new ValueSourceInfo(vsType, vs, elementType, reader)),
+() -> List.of(new ValueSourceInfo(vsType, vs, elementType, reader)),
0,
ft.name()
);
@@ -0,0 +1,69 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.action;

import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.index.mapper.extras.MapperExtrasPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.json.JsonXContent;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;

public class SyntheticSourceIT extends AbstractEsqlIntegTestCase {
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
var plugins = new ArrayList<>(super.nodePlugins());
plugins.add(MapperExtrasPlugin.class);
return plugins;
}

public void testMatchOnlyText() throws Exception {
XContentBuilder mapping = JsonXContent.contentBuilder();
mapping.startObject();
{
mapping.startObject("_source");
mapping.field("mode", "synthetic");
mapping.endObject();
}
{
mapping.startObject("properties");
mapping.startObject("uid");
mapping.field("type", "keyword");
mapping.endObject();
mapping.startObject("name");
mapping.field("type", "match_only_text");
mapping.endObject();
mapping.endObject();
}
mapping.endObject();

assertAcked(client().admin().indices().prepareCreate("test").setMapping(mapping));

int numDocs = between(10, 1000);
for (int i = 0; i < numDocs; i++) {
IndexRequestBuilder indexRequest = client().prepareIndex("test").setSource("uid", "u" + i);
if (randomInt(100) < 5) {
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
}
indexRequest.get();
}
client().admin().indices().prepareRefresh("test").get();
try (EsqlQueryResponse resp = run("from test | keep uid, name | sort uid asc | limit 1")) {
Iterator<Object> row = resp.values().next();
assertThat(row.next(), equalTo("u0"));
assertNull(row.next());
}
}
}
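For reference, the XContentBuilder calls in testMatchOnlyText build a mapping equivalent to the following JSON: synthetic _source, a keyword uid, and a match_only_text name (the field the test checks synthetic-source loading against):

{
  "_source": { "mode": "synthetic" },
  "properties": {
    "uid": { "type": "keyword" },
    "name": { "type": "match_only_text" }
  }
}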
@@ -15,6 +15,7 @@
import org.elasticsearch.compute.lucene.LuceneOperator;
import org.elasticsearch.compute.lucene.LuceneSourceOperator;
import org.elasticsearch.compute.lucene.LuceneTopNSourceOperator;
+import org.elasticsearch.compute.lucene.ValueSourceInfo;
import org.elasticsearch.compute.lucene.ValueSources;
import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator;
import org.elasticsearch.compute.operator.Operator;
@@ -39,10 +40,12 @@
import org.elasticsearch.xpack.esql.type.EsqlDataTypes;
import org.elasticsearch.xpack.ql.expression.Attribute;
import org.elasticsearch.xpack.ql.expression.FieldAttribute;
+import org.elasticsearch.xpack.ql.type.DataType;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
+import java.util.function.Supplier;

import static org.elasticsearch.common.lucene.search.Queries.newNonNestedFilter;
import static org.elasticsearch.compute.lucene.LuceneSourceOperator.NO_LIMIT;
@@ -74,19 +77,18 @@ public final PhysicalOperation fieldExtractPhysicalOperation(FieldExtractExec fi
layout.append(attr);
Layout previousLayout = op.layout;

-var sources = ValueSources.sources(
+DataType dataType = attr.dataType();
+String fieldName = attr.name();
+Supplier<List<ValueSourceInfo>> sources = () -> ValueSources.sources(
searchContexts,
-attr.name(),
-EsqlDataTypes.isUnsupported(attr.dataType()),
-LocalExecutionPlanner.toElementType(attr.dataType())
+fieldName,
+EsqlDataTypes.isUnsupported(dataType),
+LocalExecutionPlanner.toElementType(dataType)
);

int docChannel = previousLayout.get(sourceAttr.id()).channel();

-op = op.with(
-    new ValuesSourceReaderOperator.ValuesSourceReaderOperatorFactory(sources, docChannel, attr.name()),
-    layout.build()
-);
+op = op.with(new ValuesSourceReaderOperator.ValuesSourceReaderOperatorFactory(sources, docChannel, fieldName), layout.build());
}
return op;
}
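Extracting dataType and fieldName into locals before the lambda is presumably deliberate: ValueSources.sources now runs inside a deferred Supplier, and capturing plain final locals keeps the lambda's captured state explicit and avoids re-reading the attribute on every get().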
@@ -173,7 +175,7 @@ public final Operator.OperatorFactory ordinalGroupingOperatorFactory(
// The grouping-by values are ready, let's group on them directly.
// Costin: why are they ready and not already exposed in the layout?
return new OrdinalsGroupingOperator.OrdinalsGroupingOperatorFactory(
-ValueSources.sources(
+() -> ValueSources.sources(
searchContexts,
attrSource.name(),
EsqlDataTypes.isUnsupported(attrSource.dataType()),
