ESQL: Load values a different way (elastic#101235)
This changes how we load values in ESQL, delegating to the
`MappedFieldType` like we do with doc values and synthetic
source. This gives us a much more OO way of wiring up the
loading, which makes that path much easier to read. And! It
means those code paths look like doc values. So there's
symmetry. It's like it rhymes. A condensed sketch of the new
entry point follows the list below.

There are a few side effects here:
1. It's fairly simple to load from ordinals efficiently. I
   wrote some block-at-a-time code for resolving ordinals
   and it's about twice as fast. With more work it should
   be possible to make custom ordinal-shaped blocks move
   through the system to save space and speed things up.
2. Most fields can now be loaded from `_source`. Everything
   that can be loaded from `_source` in scripts will load
   from `_source` in ESQL.
3. We get a *lot* more tests for loading fields in
   different configurations by piggybacking on the synthetic
   source testing framework. 
4. Loading from `_source` no longer sorts the fields. Same
   for stored fields. Now we keep them in whatever order they
   were stored in. This is a pretty marginal time save because
   loading from `_source` is so much more time consuming than
   the sort. But it's something.
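
Here is that sketch, pieced together from the benchmark and mapper diffs
below; it is an illustration of the new caller-facing path, not verbatim code
from this commit. The `null` passed to `blockLoader` stands in for the
`BlockLoaderContext` argument, which the benchmark below also passes as null;
real callers supply a context that can resolve source paths.

    // Each MappedFieldType now hands back a BlockLoader, so ESQL asks the
    // field type itself instead of wiring up per-type ValuesSource plumbing.
    BlockLoader keywordLoader = new KeywordFieldMapper.KeywordFieldType("keyword").blockLoader(null);
    BlockLoader longLoader = new NumberFieldMapper.NumberFieldType("long", NumberFieldMapper.NumberType.LONG)
        .blockLoader(null);

    // The benchmark adapts a loader to the reader-scoped factory that the
    // ValuesSourceReaderOperator consumes:
    ValuesSourceReaderOperator op = new ValuesSourceReaderOperator(
        List.of(BlockReaderFactories.loaderToFactory(reader, keywordLoader)),
        0,
        "keyword"
    );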
nik9000 authored Oct 25, 2023
1 parent 9796de4 commit 4ca793e
Showing 92 changed files with 3,595 additions and 1,388 deletions.
@@ -24,28 +24,19 @@
 import org.elasticsearch.compute.data.DocVector;
 import org.elasticsearch.compute.data.DoubleBlock;
 import org.elasticsearch.compute.data.DoubleVector;
-import org.elasticsearch.compute.data.ElementType;
 import org.elasticsearch.compute.data.IntBlock;
 import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.LongBlock;
 import org.elasticsearch.compute.data.LongVector;
 import org.elasticsearch.compute.data.Page;
+import org.elasticsearch.compute.lucene.BlockReaderFactories;
 import org.elasticsearch.compute.lucene.LuceneSourceOperator;
-import org.elasticsearch.compute.lucene.ValueSourceInfo;
 import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator;
 import org.elasticsearch.compute.operator.topn.TopNOperator;
 import org.elasticsearch.core.IOUtils;
-import org.elasticsearch.index.fielddata.FieldData;
-import org.elasticsearch.index.fielddata.IndexFieldDataCache;
-import org.elasticsearch.index.fielddata.IndexNumericFieldData;
-import org.elasticsearch.index.fielddata.plain.SortedDoublesIndexFieldData;
-import org.elasticsearch.index.fielddata.plain.SortedNumericIndexFieldData;
-import org.elasticsearch.index.fielddata.plain.SortedSetOrdinalsIndexFieldData;
+import org.elasticsearch.index.mapper.BlockLoader;
 import org.elasticsearch.index.mapper.KeywordFieldMapper;
-import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
-import org.elasticsearch.script.field.KeywordDocValuesField;
-import org.elasticsearch.search.aggregations.support.CoreValuesSourceType;
-import org.elasticsearch.search.aggregations.support.FieldContext;
+import org.elasticsearch.index.mapper.NumberFieldMapper;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
 import org.openjdk.jmh.annotations.Fork;
@@ -101,54 +92,18 @@ public class ValuesSourceReaderBenchmark {
         }
     }
 
-    private static ValueSourceInfo info(IndexReader reader, String name) {
+    private static BlockLoader blockLoader(String name) {
         return switch (name) {
-            case "long" -> numericInfo(reader, name, IndexNumericFieldData.NumericType.LONG, ElementType.LONG);
-            case "int" -> numericInfo(reader, name, IndexNumericFieldData.NumericType.INT, ElementType.INT);
-            case "double" -> {
-                SortedDoublesIndexFieldData fd = new SortedDoublesIndexFieldData(
-                    name,
-                    IndexNumericFieldData.NumericType.DOUBLE,
-                    CoreValuesSourceType.NUMERIC,
-                    null
-                );
-                FieldContext context = new FieldContext(name, fd, null);
-                yield new ValueSourceInfo(
-                    CoreValuesSourceType.NUMERIC,
-                    CoreValuesSourceType.NUMERIC.getField(context, null),
-                    ElementType.DOUBLE,
-                    reader
-                );
-            }
-            case "keyword" -> {
-                SortedSetOrdinalsIndexFieldData fd = new SortedSetOrdinalsIndexFieldData(
-                    new IndexFieldDataCache.None(),
-                    "keyword",
-                    CoreValuesSourceType.KEYWORD,
-                    new NoneCircuitBreakerService(),
-                    (dv, n) -> new KeywordDocValuesField(FieldData.toString(dv), n)
-                );
-                FieldContext context = new FieldContext(name, fd, null);
-                yield new ValueSourceInfo(
-                    CoreValuesSourceType.KEYWORD,
-                    CoreValuesSourceType.KEYWORD.getField(context, null),
-                    ElementType.BYTES_REF,
-                    reader
-                );
-            }
+            case "long" -> numericBlockLoader(name, NumberFieldMapper.NumberType.LONG);
+            case "int" -> numericBlockLoader(name, NumberFieldMapper.NumberType.INTEGER);
+            case "double" -> numericBlockLoader(name, NumberFieldMapper.NumberType.DOUBLE);
+            case "keyword" -> new KeywordFieldMapper.KeywordFieldType(name).blockLoader(null);
             default -> throw new IllegalArgumentException("can't read [" + name + "]");
         };
     }
 
-    private static ValueSourceInfo numericInfo(
-        IndexReader reader,
-        String name,
-        IndexNumericFieldData.NumericType numericType,
-        ElementType elementType
-    ) {
-        SortedNumericIndexFieldData fd = new SortedNumericIndexFieldData(name, numericType, CoreValuesSourceType.NUMERIC, null);
-        FieldContext context = new FieldContext(name, fd, null);
-        return new ValueSourceInfo(CoreValuesSourceType.NUMERIC, CoreValuesSourceType.NUMERIC.getField(context, null), elementType, reader);
+    private static BlockLoader numericBlockLoader(String name, NumberFieldMapper.NumberType numberType) {
+        return new NumberFieldMapper.NumberFieldType(name, numberType).blockLoader(null);
     }
 
     /**
@@ -176,7 +131,11 @@ private static ValueSourceInfo numericInfo(
     @Benchmark
     @OperationsPerInvocation(INDEX_SIZE)
     public void benchmark() {
-        ValuesSourceReaderOperator op = new ValuesSourceReaderOperator(List.of(info(reader, name)), 0, name);
+        ValuesSourceReaderOperator op = new ValuesSourceReaderOperator(
+            List.of(BlockReaderFactories.loaderToFactory(reader, blockLoader(name))),
+            0,
+            name
+        );
         long sum = 0;
         for (Page page : pages) {
             op.addInput(page);
@@ -203,13 +162,24 @@ public void benchmark() {
                     BytesRef scratch = new BytesRef();
                     BytesRefVector values = op.getOutput().<BytesRefBlock>getBlock(1).asVector();
                     for (int p = 0; p < values.getPositionCount(); p++) {
-                        sum += Integer.parseInt(values.getBytesRef(p, scratch).utf8ToString());
+                        BytesRef r = values.getBytesRef(p, scratch);
+                        r.offset++;
+                        r.length--;
+                        sum += Integer.parseInt(r.utf8ToString());
                     }
                 }
             }
         }
-        long expected = INDEX_SIZE;
-        expected = expected * (expected - 1) / 2;
+        long expected;
+        if (name.equals("keyword")) {
+            expected = 0;
+            for (int i = 0; i < INDEX_SIZE; i++) {
+                expected += i % 1000;
+            }
+        } else {
+            expected = INDEX_SIZE;
+            expected = expected * (expected - 1) / 2;
+        }
         if (expected != sum) {
             throw new AssertionError("[" + layout + "][" + name + "] expected [" + expected + "] but was [" + sum + "]");
         }
@@ -225,16 +195,13 @@ private void setupIndex() throws IOException {
         directory = new ByteBuffersDirectory();
         try (IndexWriter iw = new IndexWriter(directory, new IndexWriterConfig().setMergePolicy(NoMergePolicy.INSTANCE))) {
             for (int i = 0; i < INDEX_SIZE; i++) {
+                String c = Character.toString('a' - ((i % 1000) % 26) + 26);
                 iw.addDocument(
                     List.of(
                         new NumericDocValuesField("long", i),
                         new NumericDocValuesField("int", i),
                         new NumericDocValuesField("double", NumericUtils.doubleToSortableLong(i)),
-                        new KeywordFieldMapper.KeywordField(
-                            "keyword",
-                            new BytesRef(Integer.toString(i)),
-                            KeywordFieldMapper.Defaults.FIELD_TYPE
-                        )
+                        new KeywordFieldMapper.KeywordField("keyword", new BytesRef(c + i % 1000), KeywordFieldMapper.Defaults.FIELD_TYPE)
                     )
                 );
                 if (i % COMMIT_INTERVAL == 0) {
5 changes: 5 additions & 0 deletions docs/changelog/101235.yaml
@@ -0,0 +1,5 @@
+pr: 101235
+summary: Load values a different way
+area: ES|QL
+type: enhancement
+issues: []
@@ -39,6 +39,9 @@
 import org.elasticsearch.index.fielddata.StoredFieldSortedBinaryIndexFieldData;
 import org.elasticsearch.index.fieldvisitor.LeafStoredFieldLoader;
 import org.elasticsearch.index.fieldvisitor.StoredFieldLoader;
+import org.elasticsearch.index.mapper.BlockLoader;
+import org.elasticsearch.index.mapper.BlockSourceReader;
+import org.elasticsearch.index.mapper.BlockStoredFieldsReader;
 import org.elasticsearch.index.mapper.DocumentParserContext;
 import org.elasticsearch.index.mapper.FieldMapper;
 import org.elasticsearch.index.mapper.MapperBuilderContext;
@@ -318,6 +321,14 @@ public Query phrasePrefixQuery(TokenStream stream, int slop, int maxExpansions,
         return toQuery(query, queryShardContext);
     }
 
+    @Override
+    public BlockLoader blockLoader(BlockLoaderContext blContext) {
+        if (textFieldType.isSyntheticSource()) {
+            return BlockStoredFieldsReader.bytesRefsFromStrings(storedFieldNameForSyntheticSource());
+        }
+        return BlockSourceReader.bytesRefs(SourceValueFetcher.toString(blContext.sourcePaths(name())));
+    }
+
     @Override
     public IndexFieldData.Builder fielddataBuilder(FieldDataContext fieldDataContext) {
         if (fieldDataContext.fielddataOperation() != FielddataOperation.SCRIPT) {
@@ -30,6 +30,9 @@
 import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
 import org.elasticsearch.index.fielddata.SourceValueFetcherSortedDoubleIndexFieldData;
 import org.elasticsearch.index.fielddata.plain.SortedNumericIndexFieldData;
+import org.elasticsearch.index.mapper.BlockDocValuesReader;
+import org.elasticsearch.index.mapper.BlockLoader;
+import org.elasticsearch.index.mapper.BlockSourceReader;
 import org.elasticsearch.index.mapper.DocumentParserContext;
 import org.elasticsearch.index.mapper.FieldMapper;
 import org.elasticsearch.index.mapper.MapperBuilderContext;
@@ -303,6 +306,19 @@ public Query rangeQuery(
         return NumberFieldMapper.NumberType.LONG.rangeQuery(name(), lo, hi, true, true, hasDocValues(), context, isIndexed());
     }
 
+    @Override
+    public BlockLoader blockLoader(BlockLoaderContext blContext) {
+        if (indexMode == IndexMode.TIME_SERIES && metricType == TimeSeriesParams.MetricType.COUNTER) {
+            // Counters are not supported by ESQL so we load them in null
+            return BlockDocValuesReader.nulls();
+        }
+        if (hasDocValues()) {
+            double scalingFactorInverse = 1d / scalingFactor;
+            return BlockDocValuesReader.doubles(name(), l -> l * scalingFactorInverse);
+        }
+        return BlockSourceReader.doubles(sourceValueFetcher(blContext.sourcePaths(name())));
+    }
+
     @Override
     public IndexFieldData.Builder fielddataBuilder(FieldDataContext fieldDataContext) {
         FielddataOperation operation = fieldDataContext.fielddataOperation();
@@ -20,6 +20,7 @@
 import org.apache.lucene.tests.analysis.CannedTokenStream;
 import org.apache.lucene.tests.analysis.Token;
 import org.apache.lucene.tests.index.RandomIndexWriter;
+import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.index.mapper.DocumentMapper;
@@ -42,6 +43,7 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static org.hamcrest.Matchers.containsString;
@@ -261,4 +263,9 @@ public void testDocValuesLoadedFromSynthetic() throws IOException {
     protected IngestScriptSupport ingestScriptSupport() {
         throw new AssumptionViolatedException("not supported");
     }
+
+    @Override
+    protected Function<Object, Object> loadBlockExpected() {
+        return v -> ((BytesRef) v).utf8ToString();
+    }
 }
@@ -30,17 +30,20 @@
 import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.XContentFactory;
 import org.elasticsearch.xcontent.XContentType;
+import org.hamcrest.Matcher;
 import org.junit.AssumptionViolatedException;
 
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
+import java.util.function.Function;
 
 import static java.util.Collections.singletonList;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notANumber;
 
 public class ScaledFloatFieldMapperTests extends MapperTestCase {
 
@@ -368,13 +371,15 @@ private static class ScaledFloatSyntheticSourceSupport implements SyntheticSourceSupport {
         public SyntheticSourceExample example(int maxValues) {
             if (randomBoolean()) {
                 Tuple<Double, Double> v = generateValue();
-                return new SyntheticSourceExample(v.v1(), v.v2(), this::mapping);
+                return new SyntheticSourceExample(v.v1(), v.v2(), roundDocValues(v.v2()), this::mapping);
             }
             List<Tuple<Double, Double>> values = randomList(1, maxValues, this::generateValue);
             List<Double> in = values.stream().map(Tuple::v1).toList();
             List<Double> outList = values.stream().map(Tuple::v2).sorted().toList();
             Object out = outList.size() == 1 ? outList.get(0) : outList;
-            return new SyntheticSourceExample(in, out, this::mapping);
+            List<Double> outBlockList = values.stream().map(v -> roundDocValues(v.v2())).sorted().toList();
+            Object outBlock = outBlockList.size() == 1 ? outBlockList.get(0) : outBlockList;
+            return new SyntheticSourceExample(in, out, outBlock, this::mapping);
         }
 
         private Tuple<Double, Double> generateValue() {
@@ -398,6 +403,11 @@ private double round(double d) {
             return decoded;
         }
 
+        private double roundDocValues(double d) {
+            long encoded = Math.round(d * scalingFactor);
+            return encoded * (1 / scalingFactor);
+        }
+
         private void mapping(XContentBuilder b) throws IOException {
             b.field("type", "scaled_float");
             b.field("scaling_factor", scalingFactor);
@@ -427,6 +437,16 @@ public List<SyntheticSourceInvalidExample> invalidExample() throws IOException {
         }
     }
 
+    @Override
+    protected Function<Object, Object> loadBlockExpected() {
+        return v -> (Number) v;
+    }
+
+    @Override
+    protected Matcher<?> blockItemMatcher(Object expected) {
+        return "NaN".equals(expected) ? notANumber() : equalTo(expected);
+    }
+
     @Override
     protected IngestScriptSupport ingestScriptSupport() {
         throw new AssumptionViolatedException("not supported");
@@ -91,8 +91,12 @@ public synchronized void clearField(final String fieldName) {
      */
     @SuppressWarnings("unchecked")
     public <IFD extends IndexFieldData<?>> IFD getForField(MappedFieldType fieldType, FieldDataContext fieldDataContext) {
+        return getFromBuilder(fieldType, fieldType.fielddataBuilder(fieldDataContext));
+    }
+
+    @SuppressWarnings("unchecked")
+    public <IFD extends IndexFieldData<?>> IFD getFromBuilder(MappedFieldType fieldType, IndexFieldData.Builder builder) {
         final String fieldName = fieldType.name();
-        IndexFieldData.Builder builder = fieldType.fielddataBuilder(fieldDataContext);
         IndexFieldDataCache cache;
         synchronized (this) {
             cache = fieldDataCaches.get(fieldName);