Skip to content

Commit

Permalink
Done?
Browse files Browse the repository at this point in the history
  • Loading branch information
nik9000 committed Nov 15, 2023
1 parent f378e3d commit 0755f4d
Show file tree
Hide file tree
Showing 5 changed files with 471 additions and 132 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,9 @@ public class ValuesSourceReaderBenchmark {
private static List<ValuesSourceReaderOperator.FieldInfo> fields(String name) {
return switch (name) {
case "3_stored_keywords" -> List.of(
new ValuesSourceReaderOperator.FieldInfo(name, List.of(blockLoader("stored_keyword_1"))),
new ValuesSourceReaderOperator.FieldInfo(name, List.of(blockLoader("stored_keyword_2"))),
new ValuesSourceReaderOperator.FieldInfo(name, List.of(blockLoader("stored_keyword_3")))
new ValuesSourceReaderOperator.FieldInfo("keyword_1", List.of(blockLoader("stored_keyword_1"))),
new ValuesSourceReaderOperator.FieldInfo("keyword_2", List.of(blockLoader("stored_keyword_2"))),
new ValuesSourceReaderOperator.FieldInfo("keyword_3", List.of(blockLoader("stored_keyword_3")))
);
default -> List.of(new ValuesSourceReaderOperator.FieldInfo(name, List.of(blockLoader(name))));
};
Expand Down Expand Up @@ -314,6 +314,22 @@ public void benchmark() {
if (expected != sum) {
throw new AssertionError("[" + layout + "][" + name + "] expected [" + expected + "] but was [" + sum + "]");
}
boolean foundStoredFieldLoader = false;
ValuesSourceReaderOperator.Status status = (ValuesSourceReaderOperator.Status) op.status();
for (Map.Entry<String, Integer> e : status.readersBuilt().entrySet()) {
if (e.getKey().indexOf("stored_fields") >= 0) {
foundStoredFieldLoader = true;
}
}
if (name.indexOf("stored") >= 0) {
if (foundStoredFieldLoader == false) {
throw new AssertionError("expected to use a stored field loader but only had: " + status.readersBuilt());
}
} else {
if (foundStoredFieldLoader) {
throw new AssertionError("expected not to use a stored field loader but only had: " + status.readersBuilt());
}
}
}

@Setup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,11 @@ public void read(int docId, StoredFields storedFields, Builder builder) throws I
public boolean canReuse(int startingDocID) {
return true;
}

@Override
public String toString() {
return "constant_nulls";
}
}

/**
Expand All @@ -172,6 +177,11 @@ public Block read(BlockFactory factory, Docs docs) {
public boolean canReuse(int startingDocID) {
return true;
}

@Override
public String toString() {
return "constant[" + value + "]";
}
};
}

Expand All @@ -187,6 +197,11 @@ public void read(int docId, StoredFields storedFields, Builder builder) {
public boolean canReuse(int startingDocID) {
return true;
}

@Override
public String toString() {
return "constant[" + value + "]";
}
};
}

Expand All @@ -212,6 +227,89 @@ public String toString() {
};
}

abstract class Delegating implements BlockLoader {
protected final BlockLoader delegate;

protected Delegating(BlockLoader delegate) {
this.delegate = delegate;
}

@Override
public Builder builder(BlockFactory factory, int expectedCount) {
return delegate.builder(factory, expectedCount);
}

@Override
public ColumnAtATimeReader columnAtATimeReader(LeafReaderContext context) throws IOException {
ColumnAtATimeReader reader = delegate.columnAtATimeReader(context);
if (reader == null) {
return null;
}
return new ColumnAtATimeReader() {
@Override
public Block read(BlockFactory factory, Docs docs) throws IOException {
return reader.read(factory, docs);
}

@Override
public boolean canReuse(int startingDocID) {
return reader.canReuse(startingDocID);
}

@Override
public String toString() {
return "Delegating[to=" + delegatingTo() + ", impl=" + reader + "]";
}
};
}

@Override
public RowStrideReader rowStrideReader(LeafReaderContext context) throws IOException {
RowStrideReader reader = delegate.rowStrideReader(context);
if (reader == null) {
return null;
}
return new RowStrideReader() {
@Override
public void read(int docId, StoredFields storedFields, Builder builder) throws IOException {
reader.read(docId, storedFields, builder);
}

@Override
public boolean canReuse(int startingDocID) {
return reader.canReuse(startingDocID);
}

@Override
public String toString() {
return "Delegating[to=" + delegatingTo() + ", impl=" + reader + "]";
}
};
}

@Override
public StoredFieldsSpec rowStrideStoredFieldSpec() {
return delegate.rowStrideStoredFieldSpec();
}

@Override
public boolean supportsOrdinals() {
return delegate.supportsOrdinals();
}

@Override
public SortedSetDocValues ordinals(LeafReaderContext context) throws IOException {
return delegate.ordinals(context);
}

protected abstract String delegatingTo();

@Override
public final String toString() {
return "Delegating[to=" + delegatingTo() + ", impl=" + delegate + "]";
}
}

/**
* A list of documents to load. Documents are always in non-decreasing order.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.lucene.document.Field;
import org.apache.lucene.document.FieldType;
import org.apache.lucene.index.IndexOptions;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.index.Term;
import org.apache.lucene.queries.intervals.Intervals;
import org.apache.lucene.queries.intervals.IntervalsSource;
Expand Down Expand Up @@ -70,6 +72,7 @@
import org.elasticsearch.script.field.DelegateDocValuesField;
import org.elasticsearch.script.field.TextDocValuesField;
import org.elasticsearch.search.aggregations.support.CoreValuesSourceType;
import org.elasticsearch.search.fetch.StoredFieldsSpec;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;

Expand Down Expand Up @@ -939,7 +942,12 @@ public boolean isAggregatable() {
@Override
public BlockLoader blockLoader(BlockLoaderContext blContext) {
if (syntheticSourceDelegate != null) {
return syntheticSourceDelegate.blockLoader(blContext);
return new BlockLoader.Delegating(syntheticSourceDelegate.blockLoader(blContext)) {
@Override
protected String delegatingTo() {
return syntheticSourceDelegate.name();
}
};
}
if (isSyntheticSource) {
if (isStored()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ public int get(int i) {
StoredFieldLoader.fromSpec(storedFieldsSpec).getLoader(ctx(shard, segment), null),
storedFieldsSpec.requiresSource()
);
trackStoredFields(storedFieldsSpec); // TODO when optimization is enabled add it to tracking
}
for (int p = 0; p < docs.getPositionCount(); p++) {
int doc = docs.getInt(p);
Expand Down Expand Up @@ -205,13 +206,13 @@ private void loadFromManyLeaves(Block[] blocks, DocVector docVector) throws IOEx
if (shard != lastShard || segment != lastSegment) {
lastShard = shard;
lastSegment = segment;
if (false == storedFieldsSpecForShard(shard).equals(StoredFieldsSpec.NO_REQUIREMENTS)) {
StoredFieldsSpec storedFieldsSpec = storedFieldsSpecForShard(shard);
StoredFieldsSpec storedFieldsSpec = storedFieldsSpecForShard(shard);
if (false == storedFieldsSpec.equals(StoredFieldsSpec.NO_REQUIREMENTS)) {
storedFields = new BlockLoaderStoredFieldsFromLeafLoader(
StoredFieldLoader.fromSpec(storedFieldsSpec).getLoader(ctx(shard, segment), null),
storedFieldsSpec.requiresSource()
);
// TODO track the number of times we build the stored fields reader
trackStoredFields(storedFieldsSpec);
}
}
if (storedFields != null) {
Expand All @@ -231,6 +232,14 @@ private void loadFromManyLeaves(Block[] blocks, DocVector docVector) throws IOEx
}
}

private void trackStoredFields(StoredFieldsSpec spec) {
readersBuilt.merge(
"stored_fields[" + "requires_source:" + spec.requiresSource() + ", fields:" + spec.requiredStoredFields().size() + "]",
1,
(prev, one) -> prev + one
);
}

/**
* Returns a builder from the first non - {@link BlockLoader#CONSTANT_NULLS} loader
* in the list. If they are all the null loader then returns a null builder.
Expand Down Expand Up @@ -261,13 +270,23 @@ private class FieldWork {
BlockLoader.ColumnAtATimeReader build(BlockLoader loader, LeafReaderContext ctx) throws IOException {
return loader.columnAtATimeReader(ctx);
}

@Override
String type() {
return "column_at_a_time";
}
};

final GuardedReader<BlockLoader.RowStrideReader> rowStride = new GuardedReader<>() {
@Override
BlockLoader.RowStrideReader build(BlockLoader loader, LeafReaderContext ctx) throws IOException {
return loader.rowStrideReader(ctx);
}

@Override
String type() {
return "row_stride";
}
};

FieldWork(FieldInfo info) {
Expand All @@ -280,17 +299,24 @@ private abstract class GuardedReader<V extends BlockLoader.Reader> {
V lastReader;

V reader(int shard, int segment, int startingDocId) throws IOException {
if (lastShard == shard && lastSegment == segment && lastReader != null && lastReader.canReuse(startingDocId)) {
return lastReader;
if (lastShard == shard && lastSegment == segment) {
if (lastReader == null) {
return null;
}
if (lastReader.canReuse(startingDocId)) {
return lastReader;
}
}
lastShard = shard;
lastSegment = segment;
lastReader = build(info.blockLoaders.get(shard), ctx(shard, segment));
readersBuilt.merge(info.name + ":" + lastReader, 1, (prev, one) -> prev + one);
readersBuilt.merge(info.name + ":" + type() + ":" + lastReader, 1, (prev, one) -> prev + one);
return lastReader;
}

abstract V build(BlockLoader loader, LeafReaderContext ctx) throws IOException;

abstract String type();
}
}

Expand Down
Loading

0 comments on commit 0755f4d

Please sign in to comment.