ESQL: Emit null for missing _source fields
When ESQL loads from `_source` it was accidentally emitting empty arrays
instead of null values for missing fields: they came out as `[]` when
they should have come out as `null`.
nik9000 committed Nov 21, 2023
1 parent 7345e64 commit 196388c
Showing 3 changed files with 129 additions and 1 deletion.
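
For illustration, a minimal, self-contained sketch of the contract the fix enforces (the `Builder` here is a hypothetical stand-in for `BlockLoader.Builder`, and `read` only mirrors the shape of `BlockSourceReader.read` in the first hunk below): a null values list and an empty values list should both surface as a null position, not as an empty array.

import java.util.ArrayList;
import java.util.List;

class MissingFieldContractSketch {
    // Hypothetical stand-in for BlockLoader.Builder, for illustration only.
    static class Builder {
        final List<Object> positions = new ArrayList<>();

        void appendNull() {
            positions.add(null);
        }

        void append(List<?> values) {
            positions.add(values);
        }
    }

    // Mirrors the shape of BlockSourceReader.read after the fix.
    static void read(List<?> fetched, Builder builder) {
        // The fix adds the isEmpty() half: an empty fetch result is a missing field.
        if (fetched == null || fetched.isEmpty()) {
            builder.appendNull(); // null position, not []
            return;
        }
        builder.append(fetched);
    }

    public static void main(String[] args) {
        Builder builder = new Builder();
        read(null, builder);           // field absent from _source
        read(List.of(), builder);      // field present but empty: "field": []
        read(List.of("foo"), builder); // field with a value
        System.out.println(builder.positions); // prints [null, null, [foo]]
    }
}
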
@@ -35,7 +35,7 @@ public abstract class BlockSourceReader implements BlockLoader.RowStrideReader {
     public final void read(int docId, BlockLoader.StoredFields storedFields, BlockLoader.Builder builder) throws IOException {
         List<Object> values = fetcher.fetchValues(storedFields.source(), docId, ignoredValues);
         ignoredValues.clear(); // TODO do something with these?
-        if (values == null) {
+        if (values == null || values.isEmpty()) {
             builder.appendNull();
             return;
         }
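
A design note, not stated in the commit (my reading of the tests below): the value fetcher yields an empty list both when the field is entirely absent from `_source` and when `_source` holds an explicit empty array (`"field": []`), so the `isEmpty()` check also turns empty arrays into null positions. `testMissing` and `testEmptyArray` in the new test file pin down both cases.
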
@@ -0,0 +1,96 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.index.mapper;

import org.apache.lucene.document.StoredField;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.store.Directory;
import org.apache.lucene.tests.index.RandomIndexWriter;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.index.fieldvisitor.StoredFieldLoader;
import org.elasticsearch.search.fetch.StoredFieldsSpec;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.json.JsonXContent;

import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.nullValue;

public class BlockSourceReaderTests extends ESTestCase {
    public void testSingle() throws IOException {
        withIndex(
            source -> source.field("field", "foo"),
            ctx -> loadBlock(ctx, block -> assertThat(block.get(0), equalTo(new BytesRef("foo"))))
        );
    }

    public void testMissing() throws IOException {
        withIndex(source -> {}, ctx -> loadBlock(ctx, block -> assertThat(block.get(0), nullValue())));
    }

    public void testArray() throws IOException {
        withIndex(
            source -> source.startArray("field").value("foo").value("bar").endArray(),
            ctx -> loadBlock(ctx, block -> assertThat(block.get(0), equalTo(List.of(new BytesRef("foo"), new BytesRef("bar")))))
        );
    }

    public void testEmptyArray() throws IOException {
        withIndex(source -> source.startArray("field").endArray(), ctx -> loadBlock(ctx, block -> assertThat(block.get(0), nullValue())));
    }

    private void loadBlock(LeafReaderContext ctx, Consumer<TestBlock> test) throws IOException {
        BlockLoader loader = new BlockSourceReader.BytesRefsBlockLoader(SourceValueFetcher.toString(Set.of("field")));
        assertThat(loader.columnAtATimeReader(ctx), nullValue());
        BlockLoader.RowStrideReader reader = loader.rowStrideReader(ctx);
        assertThat(loader.rowStrideStoredFieldSpec(), equalTo(StoredFieldsSpec.NEEDS_SOURCE));
        BlockLoaderStoredFieldsFromLeafLoader storedFields = new BlockLoaderStoredFieldsFromLeafLoader(
            StoredFieldLoader.fromSpec(loader.rowStrideStoredFieldSpec()).getLoader(ctx, null),
            true
        );
        BlockLoader.Builder builder = loader.builder(TestBlock.FACTORY, 1);
        storedFields.advanceTo(0);
        reader.read(0, storedFields, builder);
        TestBlock block = (TestBlock) builder.build();
        assertThat(block.size(), equalTo(1));
        test.accept(block);
    }

    private void withIndex(CheckedConsumer<XContentBuilder, IOException> buildSource, CheckedConsumer<LeafReaderContext, IOException> test)
        throws IOException {
        try (
            Directory directory = newDirectory();
            RandomIndexWriter writer = new RandomIndexWriter(
                random(),
                directory,
                newIndexWriterConfig().setMergePolicy(NoMergePolicy.INSTANCE)
            )
        ) {
            XContentBuilder source = JsonXContent.contentBuilder();
            source.startObject();
            buildSource.accept(source);
            source.endObject();
            writer.addDocument(List.of(new StoredField(SourceFieldMapper.NAME, BytesReference.bytes(source).toBytesRef())));
            try (IndexReader reader = writer.getReader()) {
                assertThat(reader.leaves(), hasSize(1));
                test.accept(reader.leaves().get(0));
            }
        }
    }
}
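
Assuming the standard Elasticsearch Gradle setup and that this test class lives in the `server` module (a guess from its `org.elasticsearch.index.mapper` package; the commit view does not show file paths), the new tests could be run with something like:

./gradlew ':server:test' --tests "org.elasticsearch.index.mapper.BlockSourceReaderTests"
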
@@ -21,6 +21,7 @@
 import org.elasticsearch.common.util.MockBigArrays;
 import org.elasticsearch.common.util.PageCacheRecycler;
 import org.elasticsearch.compute.operator.DriverContext;
+import org.elasticsearch.index.mapper.BlockLoader;
 import org.elasticsearch.indices.CrankyCircuitBreakerService;
 import org.elasticsearch.test.ESTestCase;
 import org.junit.After;
@@ -122,6 +123,37 @@ protected DriverContext breakingDriverContext() { // TODO move this to driverContext
         return new DriverContext(bigArrays, factory);
     }
 
+    public void testAllNull() throws IOException {
+        BlockFactory factory = breakingDriverContext().blockFactory();
+        int count = 1000;
+        try (Directory directory = newDirectory(); RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) {
+            for (int i = 0; i < count; i++) {
+                for (BytesRef v : new BytesRef[] { new BytesRef("a"), new BytesRef("b"), new BytesRef("c"), new BytesRef("d") }) {
+                    indexWriter.addDocument(List.of(new SortedDocValuesField("f", v)));
+                }
+            }
+            try (IndexReader reader = indexWriter.getReader()) {
+                for (LeafReaderContext ctx : reader.leaves()) {
+                    SortedDocValues docValues = ctx.reader().getSortedDocValues("f");
+                    try (SingletonOrdinalsBuilder builder = new SingletonOrdinalsBuilder(factory, docValues, ctx.reader().numDocs())) {
+                        for (int i = 0; i < ctx.reader().maxDoc(); i++) {
+                            if (ctx.reader().getLiveDocs() == null || ctx.reader().getLiveDocs().get(i)) {
+                                assertThat(docValues.advanceExact(i), equalTo(true));
+                                builder.appendNull();
+                            }
+                        }
+                        try (BytesRefBlock built = builder.build()) {
+                            for (int p = 0; p < built.getPositionCount(); p++) {
+                                assertThat(built.isNull(p), equalTo(true));
+                            }
+                            assertThat(built.areAllValuesNull(), equalTo(true));
+                        }
+                    }
+                }
+            }
+        }
+    }
+
     @After
     public void allBreakersEmpty() throws Exception {
         // first check that all big arrays are released, which can affect breakers
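
What this test pins down (my reading; the commit does not say): even though doc values exist for every document (`advanceExact` is asserted to return true), a `SingletonOrdinalsBuilder` that only ever receives `appendNull()` must build a block where every position reports `isNull(p)` and the block-level `areAllValuesNull()` returns true.
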
