diff --git a/test/framework/src/main/java/org/elasticsearch/common/util/MockBigArrays.java b/test/framework/src/main/java/org/elasticsearch/common/util/MockBigArrays.java index 0a0592b5a01f2..9ad5d26d66eec 100644 --- a/test/framework/src/main/java/org/elasticsearch/common/util/MockBigArrays.java +++ b/test/framework/src/main/java/org/elasticsearch/common/util/MockBigArrays.java @@ -91,7 +91,7 @@ public static void assertFitsIn(ByteSizeValue max, Function ACQUIRED_ARRAYS = new ConcurrentHashMap<>(); diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanArrayBlock.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanArrayBlock.java index 9a66bf00fa71f..eb9364c57e755 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanArrayBlock.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanArrayBlock.java @@ -11,7 +11,6 @@ import java.util.Arrays; import java.util.BitSet; -import java.util.stream.IntStream; /** * Block implementation that stores an array of boolean. @@ -83,12 +82,21 @@ public BooleanBlock expand() { if (firstValueIndexes == null) { return this; } - int end = firstValueIndexes[getPositionCount()]; - if (nullsMask == null) { - return new BooleanArrayVector(values, end).asBlock(); + // TODO use reference counting to share the values + try (var builder = blockFactory.newBooleanBlockBuilder(firstValueIndexes[getPositionCount()])) { + for (int pos = 0; pos < getPositionCount(); pos++) { + if (isNull(pos)) { + builder.appendNull(); + continue; + } + int first = getFirstValueIndex(pos); + int end = first + getValueCount(pos); + for (int i = first; i < end; i++) { + builder.appendBoolean(getBoolean(i)); + } + } + return builder.mvOrdering(MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING).build(); } - int[] firstValues = IntStream.range(0, end + 1).toArray(); - return new BooleanArrayBlock(values, end, firstValues, shiftNullsToExpandedPositions(), MvOrdering.UNORDERED, blockFactory); } public static long ramBytesEstimated(boolean[] values, int[] firstValueIndexes, BitSet nullsMask) { diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefArrayBlock.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefArrayBlock.java index 9e6631b6807c6..b2729ed370b32 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefArrayBlock.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefArrayBlock.java @@ -13,7 +13,6 @@ import org.elasticsearch.core.Releasables; import java.util.BitSet; -import java.util.stream.IntStream; /** * Block implementation that stores an array of BytesRef. @@ -86,12 +85,22 @@ public BytesRefBlock expand() { if (firstValueIndexes == null) { return this; } - int end = firstValueIndexes[getPositionCount()]; - if (nullsMask == null) { - return new BytesRefArrayVector(values, end).asBlock(); + // TODO use reference counting to share the values + final BytesRef scratch = new BytesRef(); + try (var builder = blockFactory.newBytesRefBlockBuilder(firstValueIndexes[getPositionCount()])) { + for (int pos = 0; pos < getPositionCount(); pos++) { + if (isNull(pos)) { + builder.appendNull(); + continue; + } + int first = getFirstValueIndex(pos); + int end = first + getValueCount(pos); + for (int i = first; i < end; i++) { + builder.appendBytesRef(getBytesRef(i, scratch)); + } + } + return builder.mvOrdering(MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING).build(); } - int[] firstValues = IntStream.range(0, end + 1).toArray(); - return new BytesRefArrayBlock(values, end, firstValues, shiftNullsToExpandedPositions(), MvOrdering.UNORDERED, blockFactory); } public static long ramBytesEstimated(BytesRefArray values, int[] firstValueIndexes, BitSet nullsMask) { diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleArrayBlock.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleArrayBlock.java index f9e1fe0c6e199..b6b1fae0ded03 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleArrayBlock.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleArrayBlock.java @@ -11,7 +11,6 @@ import java.util.Arrays; import java.util.BitSet; -import java.util.stream.IntStream; /** * Block implementation that stores an array of double. @@ -83,12 +82,21 @@ public DoubleBlock expand() { if (firstValueIndexes == null) { return this; } - int end = firstValueIndexes[getPositionCount()]; - if (nullsMask == null) { - return new DoubleArrayVector(values, end).asBlock(); + // TODO use reference counting to share the values + try (var builder = blockFactory.newDoubleBlockBuilder(firstValueIndexes[getPositionCount()])) { + for (int pos = 0; pos < getPositionCount(); pos++) { + if (isNull(pos)) { + builder.appendNull(); + continue; + } + int first = getFirstValueIndex(pos); + int end = first + getValueCount(pos); + for (int i = first; i < end; i++) { + builder.appendDouble(getDouble(i)); + } + } + return builder.mvOrdering(MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING).build(); } - int[] firstValues = IntStream.range(0, end + 1).toArray(); - return new DoubleArrayBlock(values, end, firstValues, shiftNullsToExpandedPositions(), MvOrdering.UNORDERED, blockFactory); } public static long ramBytesEstimated(double[] values, int[] firstValueIndexes, BitSet nullsMask) { diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntArrayBlock.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntArrayBlock.java index 95344bd8367c0..31f71d292f95d 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntArrayBlock.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntArrayBlock.java @@ -11,7 +11,6 @@ import java.util.Arrays; import java.util.BitSet; -import java.util.stream.IntStream; /** * Block implementation that stores an array of int. @@ -83,12 +82,21 @@ public IntBlock expand() { if (firstValueIndexes == null) { return this; } - int end = firstValueIndexes[getPositionCount()]; - if (nullsMask == null) { - return new IntArrayVector(values, end).asBlock(); + // TODO use reference counting to share the values + try (var builder = blockFactory.newIntBlockBuilder(firstValueIndexes[getPositionCount()])) { + for (int pos = 0; pos < getPositionCount(); pos++) { + if (isNull(pos)) { + builder.appendNull(); + continue; + } + int first = getFirstValueIndex(pos); + int end = first + getValueCount(pos); + for (int i = first; i < end; i++) { + builder.appendInt(getInt(i)); + } + } + return builder.mvOrdering(MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING).build(); } - int[] firstValues = IntStream.range(0, end + 1).toArray(); - return new IntArrayBlock(values, end, firstValues, shiftNullsToExpandedPositions(), MvOrdering.UNORDERED, blockFactory); } public static long ramBytesEstimated(int[] values, int[] firstValueIndexes, BitSet nullsMask) { diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongArrayBlock.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongArrayBlock.java index a45abb1ed9248..8a71703441ebb 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongArrayBlock.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongArrayBlock.java @@ -11,7 +11,6 @@ import java.util.Arrays; import java.util.BitSet; -import java.util.stream.IntStream; /** * Block implementation that stores an array of long. @@ -83,12 +82,21 @@ public LongBlock expand() { if (firstValueIndexes == null) { return this; } - int end = firstValueIndexes[getPositionCount()]; - if (nullsMask == null) { - return new LongArrayVector(values, end).asBlock(); + // TODO use reference counting to share the values + try (var builder = blockFactory.newLongBlockBuilder(firstValueIndexes[getPositionCount()])) { + for (int pos = 0; pos < getPositionCount(); pos++) { + if (isNull(pos)) { + builder.appendNull(); + continue; + } + int first = getFirstValueIndex(pos); + int end = first + getValueCount(pos); + for (int i = first; i < end; i++) { + builder.appendLong(getLong(i)); + } + } + return builder.mvOrdering(MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING).build(); } - int[] firstValues = IntStream.range(0, end + 1).toArray(); - return new LongArrayBlock(values, end, firstValues, shiftNullsToExpandedPositions(), MvOrdering.UNORDERED, blockFactory); } public static long ramBytesEstimated(long[] values, int[] firstValueIndexes, BitSet nullsMask) { diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-ArrayBlock.java.st b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-ArrayBlock.java.st index 6a8185b43ecab..49a4c43709cde 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-ArrayBlock.java.st +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-ArrayBlock.java.st @@ -19,7 +19,6 @@ import org.apache.lucene.util.RamUsageEstimator; import java.util.Arrays; $endif$ import java.util.BitSet; -import java.util.stream.IntStream; /** * Block implementation that stores an array of $type$. @@ -69,9 +68,9 @@ $endif$ @Override public $Type$Block filter(int... positions) { - $if(BytesRef)$ +$if(BytesRef)$ final BytesRef scratch = new BytesRef(); - $endif$ +$endif$ try (var builder = blockFactory.new$Type$BlockBuilder(positions.length)) { for (int pos : positions) { if (isNull(pos)) { @@ -104,12 +103,28 @@ $endif$ if (firstValueIndexes == null) { return this; } - int end = firstValueIndexes[getPositionCount()]; - if (nullsMask == null) { - return new $Type$ArrayVector(values, end).asBlock(); + // TODO use reference counting to share the values +$if(BytesRef)$ + final BytesRef scratch = new BytesRef(); +$endif$ + try (var builder = blockFactory.new$Type$BlockBuilder(firstValueIndexes[getPositionCount()])) { + for (int pos = 0; pos < getPositionCount(); pos++) { + if (isNull(pos)) { + builder.appendNull(); + continue; + } + int first = getFirstValueIndex(pos); + int end = first + getValueCount(pos); + for (int i = first; i < end; i++) { +$if(BytesRef)$ + builder.append$Type$(get$Type$(i, scratch)); +$else$ + builder.append$Type$(get$Type$(i)); +$endif$ + } + } + return builder.mvOrdering(MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING).build(); } - int[] firstValues = IntStream.range(0, end + 1).toArray(); - return new $Type$ArrayBlock(values, end, firstValues, shiftNullsToExpandedPositions(), MvOrdering.UNORDERED, blockFactory); } public static long ramBytesEstimated($if(BytesRef)$BytesRefArray$else$$type$[]$endif$ values, int[] firstValueIndexes, BitSet nullsMask) { diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/MvExpandOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/MvExpandOperator.java index c322520d8853b..503913dbb67e8 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/MvExpandOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/MvExpandOperator.java @@ -14,6 +14,8 @@ import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.Page; import org.elasticsearch.core.Releasables; +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; import org.elasticsearch.xcontent.XContentBuilder; import java.io.IOException; @@ -33,6 +35,7 @@ * */ public class MvExpandOperator implements Operator { + private static final Logger logger = LogManager.getLogger(MvExpandOperator.class); public record Factory(int channel, int blockSize) implements OperatorFactory { @Override @@ -53,7 +56,7 @@ public String describe() { private int noops; private Page prev; - private boolean prevCompleted = false; + private boolean prevCompleted; private boolean finished = false; private Block expandingBlock; @@ -81,49 +84,87 @@ public final Page getOutput() { return null; } pagesOut++; - if (prev.getPositionCount() == 0 || expandingBlock.mayHaveMultivaluedFields() == false) { - noops++; - Page result = prev; - prev = null; - return result; - } - try { - return process(); - } finally { - if (prevCompleted && prev != null) { + if (expandedBlock == null) { + /* + * If we're emitting the first block from this page + * then we have to expand it. + */ + logger.trace("starting {}", prev); + expandingBlock = prev.getBlock(channel); + if (expandingBlock.mayHaveMultivaluedFields() == false) { + logger.trace("can't have multivalued fields"); + noops++; + Page result = prev; + prev = null; + expandingBlock = null; + return result; + } + expandedBlock = expandingBlock.expand(); + if (expandedBlock == expandingBlock) { + // The expand was a noop - just return the previous page and clear state. + logger.trace("expanded to same"); + noops++; + Page result = prev; + prev = null; + expandingBlock = null; + expandedBlock = null; + return result; + } + if (prev.getBlockCount() == 1) { + /* + * The expand wasn't a noop, but there's only a single block in the result + * so the expansion didn't really make it take more memory. It should be safe + * to return it directly. + */ + logger.trace("single block output"); + assert channel == 0; prev.releaseBlocks(); prev = null; + expandingBlock = null; + Page result = new Page(expandedBlock); + expandedBlock = null; + return result; } } + logger.trace("slicing"); + return sliceExpandedIntoPages(); } - protected Page process() { - if (expandedBlock == expandingBlock) { - noops++; - prevCompleted = true; - return prev; - } - if (prev.getBlockCount() == 1) { - assert channel == 0; - prevCompleted = true; - return new Page(expandedBlock); - } - + private Page sliceExpandedIntoPages() { + prevCompleted = false; int[] duplicateFilter = nextDuplicateExpandingFilter(); Block[] result = new Block[prev.getBlockCount()]; - int[] expandedMask = new int[duplicateFilter.length]; - for (int i = 0; i < expandedMask.length; i++) { - expandedMask[i] = i + nextItemOnExpanded; - } - nextItemOnExpanded += expandedMask.length; - for (int b = 0; b < result.length; b++) { - result[b] = b == channel ? expandedBlock.filter(expandedMask) : prev.getBlock(b).filter(duplicateFilter); + boolean success = false; + try { + int[] expandedMask = new int[duplicateFilter.length]; + for (int i = 0; i < expandedMask.length; i++) { + expandedMask[i] = i + nextItemOnExpanded; + } + nextItemOnExpanded += expandedMask.length; + for (int b = 0; b < result.length; b++) { + result[b] = b == channel ? expandedBlock.filter(expandedMask) : prev.getBlock(b).filter(duplicateFilter); + } + success = true; + } finally { + if (success == false) { + Releasables.closeExpectNoException(result); + } } if (nextItemOnExpanded == expandedBlock.getPositionCount()) { nextItemOnExpanded = 0; } + if (prevCompleted) { + Releasables.closeExpectNoException(() -> { + if (prev != null) { + prev.releaseBlocks(); + prev = null; + } + }, expandedBlock); + expandingBlock = null; + expandedBlock = null; + } return new Page(result); } @@ -175,9 +216,7 @@ public final void addInput(Page page) { assert prev == null : "has pending input page"; prev = page; this.expandingBlock = prev.getBlock(channel); - this.expandedBlock = expandingBlock.expand(); pagesIn++; - prevCompleted = false; } @Override @@ -197,9 +236,11 @@ public final Status status() { @Override public void close() { - if (prev != null) { - Releasables.closeExpectNoException(() -> prev.releaseBlocks()); - } + Releasables.closeExpectNoException(() -> { + if (prev != null) { + prev.releaseBlocks(); + } + }, expandedBlock); } @Override diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockMultiValuedTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockMultiValuedTests.java index 9a362ad4e3ca3..1b0e61cea8135 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockMultiValuedTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockMultiValuedTests.java @@ -11,7 +11,14 @@ import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; import org.elasticsearch.common.Randomness; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.MockBigArrays; +import org.elasticsearch.common.util.PageCacheRecycler; +import org.elasticsearch.compute.operator.DriverContext; import org.elasticsearch.test.ESTestCase; +import org.junit.After; import java.util.ArrayList; import java.util.Arrays; @@ -45,20 +52,23 @@ public BlockMultiValuedTests(@Name("elementType") ElementType elementType, @Name public void testMultiValued() { int positionCount = randomIntBetween(1, 16 * 1024); - var b = BasicBlockTests.randomBlock(elementType, positionCount, nullAllowed, 0, 10, 0, 0); + var b = BasicBlockTests.randomBlock(blockFactory(), elementType, positionCount, nullAllowed, 0, 10, 0, 0); + try { + assertThat(b.block().getPositionCount(), equalTo(positionCount)); + assertThat(b.block().getTotalValueCount(), equalTo(b.valueCount())); + for (int p = 0; p < positionCount; p++) { + BlockTestUtils.assertPositionValues(b.block(), p, equalTo(b.values().get(p))); + } - assertThat(b.block().getPositionCount(), equalTo(positionCount)); - assertThat(b.block().getTotalValueCount(), equalTo(b.valueCount())); - for (int p = 0; p < positionCount; p++) { - BlockTestUtils.assertPositionValues(b.block(), p, equalTo(b.values().get(p))); + assertThat(b.block().mayHaveMultivaluedFields(), equalTo(b.values().stream().anyMatch(l -> l != null && l.size() > 1))); + } finally { + b.block().close(); } - - assertThat(b.block().mayHaveMultivaluedFields(), equalTo(b.values().stream().anyMatch(l -> l != null && l.size() > 1))); } public void testExpand() { int positionCount = randomIntBetween(1, 16 * 1024); - var b = BasicBlockTests.randomBlock(elementType, positionCount, nullAllowed, 0, 100, 0, 0); + var b = BasicBlockTests.randomBlock(blockFactory(), elementType, positionCount, nullAllowed, 0, 100, 0, 0); assertExpanded(b.block()); } @@ -96,31 +106,37 @@ public void testFilteredJumbledSubsetThenExpanded() { private void assertFiltered(boolean all, boolean shuffled) { int positionCount = randomIntBetween(1, 16 * 1024); - var b = BasicBlockTests.randomBlock(elementType, positionCount, nullAllowed, 0, 10, 0, 0); - int[] positions = randomFilterPositions(b.block(), all, shuffled); - Block filtered = b.block().filter(positions); - - assertThat(filtered.getPositionCount(), equalTo(positions.length)); - - int expectedValueCount = 0; - for (int p : positions) { - List values = b.values().get(p); - if (values != null) { - expectedValueCount += values.size(); + var b = BasicBlockTests.randomBlock(blockFactory(), elementType, positionCount, nullAllowed, 0, 10, 0, 0); + try { + int[] positions = randomFilterPositions(b.block(), all, shuffled); + Block filtered = b.block().filter(positions); + try { + assertThat(filtered.getPositionCount(), equalTo(positions.length)); + + int expectedValueCount = 0; + for (int p : positions) { + List values = b.values().get(p); + if (values != null) { + expectedValueCount += values.size(); + } + } + assertThat(filtered.getTotalValueCount(), equalTo(expectedValueCount)); + for (int r = 0; r < positions.length; r++) { + if (b.values().get(positions[r]) == null) { + assertThat(filtered.getValueCount(r), equalTo(0)); + assertThat(filtered.isNull(r), equalTo(true)); + } else { + assertThat(filtered.getValueCount(r), equalTo(b.values().get(positions[r]).size())); + assertThat(BasicBlockTests.valuesAtPositions(filtered, r, r + 1).get(0), equalTo(b.values().get(positions[r]))); + } + } + } finally { + filtered.close(); } + assertThat(b.block().mayHaveMultivaluedFields(), equalTo(b.values().stream().anyMatch(l -> l != null && l.size() > 1))); + } finally { + b.block().close(); } - assertThat(filtered.getTotalValueCount(), equalTo(expectedValueCount)); - for (int r = 0; r < positions.length; r++) { - if (b.values().get(positions[r]) == null) { - assertThat(filtered.getValueCount(r), equalTo(0)); - assertThat(filtered.isNull(r), equalTo(true)); - } else { - assertThat(filtered.getValueCount(r), equalTo(b.values().get(positions[r]).size())); - assertThat(BasicBlockTests.valuesAtPositions(filtered, r, r + 1).get(0), equalTo(b.values().get(positions[r]))); - } - } - - assertThat(b.block().mayHaveMultivaluedFields(), equalTo(b.values().stream().anyMatch(l -> l != null && l.size() > 1))); } private int[] randomFilterPositions(Block orig, boolean all, boolean shuffled) { @@ -135,30 +151,65 @@ private int[] randomFilterPositions(Block orig, boolean all, boolean shuffled) { } private void assertExpanded(Block orig) { - Block expanded = orig.expand(); - assertThat(expanded.getPositionCount(), equalTo(orig.getTotalValueCount() + orig.nullValuesCount())); - assertThat(expanded.getTotalValueCount(), equalTo(orig.getTotalValueCount())); - - int np = 0; - for (int op = 0; op < orig.getPositionCount(); op++) { - if (orig.isNull(op)) { - assertThat(expanded.isNull(np), equalTo(true)); - assertThat(expanded.getValueCount(np++), equalTo(0)); - continue; - } - List oValues = BasicBlockTests.valuesAtPositions(orig, op, op + 1).get(0); - for (Object ov : oValues) { - assertThat(expanded.isNull(np), equalTo(false)); - assertThat(expanded.getValueCount(np), equalTo(1)); - assertThat(BasicBlockTests.valuesAtPositions(expanded, np, ++np).get(0), equalTo(List.of(ov))); + try (orig; Block expanded = orig.expand()) { + assertThat(expanded.getPositionCount(), equalTo(orig.getTotalValueCount() + orig.nullValuesCount())); + assertThat(expanded.getTotalValueCount(), equalTo(orig.getTotalValueCount())); + + int np = 0; + for (int op = 0; op < orig.getPositionCount(); op++) { + if (orig.isNull(op)) { + assertThat(expanded.isNull(np), equalTo(true)); + assertThat(expanded.getValueCount(np++), equalTo(0)); + continue; + } + List oValues = BasicBlockTests.valuesAtPositions(orig, op, op + 1).get(0); + for (Object ov : oValues) { + assertThat(expanded.isNull(np), equalTo(false)); + assertThat(expanded.getValueCount(np), equalTo(1)); + assertThat(BasicBlockTests.valuesAtPositions(expanded, np, ++np).get(0), equalTo(List.of(ov))); + } } } } private void assertFilteredThenExpanded(boolean all, boolean shuffled) { int positionCount = randomIntBetween(1, 16 * 1024); - var b = BasicBlockTests.randomBlock(elementType, positionCount, nullAllowed, 0, 10, 0, 0); - int[] positions = randomFilterPositions(b.block(), all, shuffled); - assertExpanded(b.block().filter(positions)); + var b = BasicBlockTests.randomBlock(blockFactory(), elementType, positionCount, nullAllowed, 0, 10, 0, 0); + try { + int[] positions = randomFilterPositions(b.block(), all, shuffled); + assertExpanded(b.block().filter(positions)); + } finally { + b.block().close(); + } + } + + private final List breakers = new ArrayList<>(); + private final List blockFactories = new ArrayList<>(); + + /** + * A {@link DriverContext} with a breaking {@link BigArrays} and {@link BlockFactory}. + */ + protected BlockFactory blockFactory() { // TODO move this to driverContext once everyone supports breaking + BigArrays bigArrays = new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, ByteSizeValue.ofGb(1)).withCircuitBreaking(); + CircuitBreaker breaker = bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST); + breakers.add(breaker); + BlockFactory factory = new MockBlockFactory(breaker, bigArrays); + blockFactories.add(factory); + return factory; + } + + @After + public void allBreakersEmpty() throws Exception { + // first check that all big arrays are released, which can affect breakers + MockBigArrays.ensureAllArraysAreReleased(); + + for (CircuitBreaker breaker : breakers) { + for (var factory : blockFactories) { + if (factory instanceof MockBlockFactory mockBlockFactory) { + mockBlockFactory.ensureAllBlocksAreReleased(); + } + } + assertThat("Unexpected used in breaker: " + breaker, breaker.getUsed(), equalTo(0L)); + } } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/MvExpandOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/MvExpandOperatorTests.java index f99685609ff78..a105818a2bf03 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/MvExpandOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/MvExpandOperatorTests.java @@ -40,8 +40,8 @@ protected int remaining() { protected Page createPage(int positionOffset, int length) { idx += length; return new Page( - randomBlock(ElementType.INT, length, true, 1, 10, 0, 0).block(), - randomBlock(ElementType.INT, length, false, 1, 10, 0, 0).block() + randomBlock(blockFactory, ElementType.INT, length, true, 1, 10, 0, 0).block(), + randomBlock(blockFactory, ElementType.INT, length, false, 1, 10, 0, 0).block() ); } }; @@ -257,4 +257,9 @@ protected Page createPage(int positionOffset, int length) { List results = drive(new MvExpandOperator(0, randomIntBetween(1, 1000)), input.iterator(), context); assertSimpleOutput(origInput, results); } + + @Override + protected DriverContext driverContext() { + return breakingDriverContext(); + } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/MvExpandOperatorTests.java.orig b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/MvExpandOperatorTests.java.orig new file mode 100644 index 0000000000000..f99685609ff78 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/MvExpandOperatorTests.java.orig @@ -0,0 +1,260 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.operator; + +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.BlockFactory; +import org.elasticsearch.compute.data.ElementType; +import org.elasticsearch.compute.data.IntBlock; +import org.elasticsearch.compute.data.IntVector; +import org.elasticsearch.compute.data.Page; + +import java.util.Iterator; +import java.util.List; + +import static org.elasticsearch.compute.data.BasicBlockTests.randomBlock; +import static org.elasticsearch.compute.data.BasicBlockTests.valuesAtPositions; +import static org.elasticsearch.compute.data.BlockTestUtils.deepCopyOf; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; + +public class MvExpandOperatorTests extends OperatorTestCase { + @Override + protected SourceOperator simpleInput(BlockFactory blockFactory, int end) { + return new AbstractBlockSourceOperator(blockFactory, 8 * 1024) { + private int idx; + + @Override + protected int remaining() { + return end - idx; + } + + @Override + protected Page createPage(int positionOffset, int length) { + idx += length; + return new Page( + randomBlock(ElementType.INT, length, true, 1, 10, 0, 0).block(), + randomBlock(ElementType.INT, length, false, 1, 10, 0, 0).block() + ); + } + }; + } + + @Override + protected Operator.OperatorFactory simple(BigArrays bigArrays) { + return new MvExpandOperator.Factory(0, randomIntBetween(1, 1000)); + } + + @Override + protected String expectedDescriptionOfSimple() { + return "MvExpandOperator[channel=0]"; + } + + @Override + protected String expectedToStringOfSimple() { + return expectedDescriptionOfSimple(); + } + + class BlockListIterator implements Iterator { + private final Iterator pagesIterator; + private final int channel; + private Block currentBlock; + private int nextPosition; + + BlockListIterator(List pages, int channel) { + this.pagesIterator = pages.iterator(); + this.channel = channel; + this.currentBlock = pagesIterator.next().getBlock(channel); + this.nextPosition = 0; + } + + @Override + public boolean hasNext() { + if (currentBlock == null) { + return false; + } + + return currentBlock.getValueCount(nextPosition) == 0 + || nextPosition < currentBlock.getPositionCount() + || pagesIterator.hasNext(); + } + + @Override + public Object next() { + if (currentBlock != null && currentBlock.getValueCount(nextPosition) == 0) { + nextPosition++; + if (currentBlock.getPositionCount() == nextPosition) { + loadNextBlock(); + } + return null; + } + List items = valuesAtPositions(currentBlock, nextPosition, nextPosition + 1).get(0); + nextPosition++; + if (currentBlock.getPositionCount() == nextPosition) { + loadNextBlock(); + } + return items.size() == 1 ? items.get(0) : items; + } + + private void loadNextBlock() { + if (pagesIterator.hasNext() == false) { + currentBlock = null; + return; + } + this.currentBlock = pagesIterator.next().getBlock(channel); + nextPosition = 0; + } + } + + class BlockListIteratorExpander implements Iterator { + private final Iterator pagesIterator; + private final int channel; + private Block currentBlock; + private int nextPosition; + private int nextInPosition; + + BlockListIteratorExpander(List pages, int channel) { + this.pagesIterator = pages.iterator(); + this.channel = channel; + this.currentBlock = pagesIterator.next().getBlock(channel); + this.nextPosition = 0; + this.nextInPosition = 0; + } + + @Override + public boolean hasNext() { + if (currentBlock == null) { + return false; + } + + return currentBlock.getValueCount(nextPosition) == 0 + || nextInPosition < currentBlock.getValueCount(nextPosition) + || nextPosition < currentBlock.getPositionCount() + || pagesIterator.hasNext(); + } + + @Override + public Object next() { + if (currentBlock != null && currentBlock.getValueCount(nextPosition) == 0) { + nextPosition++; + if (currentBlock.getPositionCount() == nextPosition) { + loadNextBlock(); + } + return null; + } + List items = valuesAtPositions(currentBlock, nextPosition, nextPosition + 1).get(0); + Object result = items == null ? null : items.get(nextInPosition++); + if (nextInPosition == currentBlock.getValueCount(nextPosition)) { + nextPosition++; + nextInPosition = 0; + } + if (currentBlock.getPositionCount() == nextPosition) { + loadNextBlock(); + } + return result; + } + + private void loadNextBlock() { + if (pagesIterator.hasNext() == false) { + currentBlock = null; + return; + } + this.currentBlock = pagesIterator.next().getBlock(channel); + nextPosition = 0; + nextInPosition = 0; + } + } + + @Override + protected void assertSimpleOutput(List input, List results) { + assertThat(results, hasSize(results.size())); + + var inputIter = new BlockListIteratorExpander(input, 0); + var resultIter = new BlockListIteratorExpander(results, 0); + + while (inputIter.hasNext()) { + assertThat(resultIter.hasNext(), equalTo(true)); + assertThat(resultIter.next(), equalTo(inputIter.next())); + } + assertThat(resultIter.hasNext(), equalTo(false)); + + var originalMvIter = new BlockListIterator(input, 0); + var inputIter2 = new BlockListIterator(input, 1); + var resultIter2 = new BlockListIterator(results, 1); + + while (originalMvIter.hasNext()) { + Object originalMv = originalMvIter.next(); + int originalMvSize = originalMv instanceof List l ? l.size() : 1; + assertThat(resultIter2.hasNext(), equalTo(true)); + Object inputValue = inputIter2.next(); + for (int j = 0; j < originalMvSize; j++) { + assertThat(resultIter2.next(), equalTo(inputValue)); + } + } + assertThat(resultIter2.hasNext(), equalTo(false)); + } + + @Override + protected ByteSizeValue smallEnoughToCircuitBreak() { + assumeTrue("doesn't use big arrays so can't break", false); + return null; + } + + public void testNoopStatus() { + MvExpandOperator op = new MvExpandOperator(0, randomIntBetween(1, 1000)); + List result = drive( + op, + List.of(new Page(IntVector.newVectorBuilder(2).appendInt(1).appendInt(2).build().asBlock())).iterator(), + driverContext() + ); + assertThat(result, hasSize(1)); + assertThat(valuesAtPositions(result.get(0).getBlock(0), 0, 2), equalTo(List.of(List.of(1), List.of(2)))); + MvExpandOperator.Status status = op.status(); + assertThat(status.pagesIn(), equalTo(1)); + assertThat(status.pagesOut(), equalTo(1)); + assertThat(status.noops(), equalTo(1)); + } + + public void testExpandStatus() { + MvExpandOperator op = new MvExpandOperator(0, randomIntBetween(1, 1)); + var builder = IntBlock.newBlockBuilder(2).beginPositionEntry().appendInt(1).appendInt(2).endPositionEntry(); + List result = drive(op, List.of(new Page(builder.build())).iterator(), driverContext()); + assertThat(result, hasSize(1)); + assertThat(valuesAtPositions(result.get(0).getBlock(0), 0, 2), equalTo(List.of(List.of(1), List.of(2)))); + MvExpandOperator.Status status = op.status(); + assertThat(status.pagesIn(), equalTo(1)); + assertThat(status.pagesOut(), equalTo(1)); + assertThat(status.noops(), equalTo(0)); + } + + public void testExpandWithBytesRefs() { + DriverContext context = driverContext(); + List input = CannedSourceOperator.collectPages(new AbstractBlockSourceOperator(context.blockFactory(), 8 * 1024) { + private int idx; + + @Override + protected int remaining() { + return 10000 - idx; + } + + @Override + protected Page createPage(int positionOffset, int length) { + idx += length; + return new Page( + randomBlock(context.blockFactory(), ElementType.BYTES_REF, length, true, 1, 10, 0, 0).block(), + randomBlock(context.blockFactory(), ElementType.INT, length, false, 1, 10, 0, 0).block() + ); + } + }); + List origInput = deepCopyOf(input, BlockFactory.getNonBreakingInstance()); + List results = drive(new MvExpandOperator(0, randomIntBetween(1, 1000)), input.iterator(), context); + assertSimpleOutput(origInput, results); + } +} diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OperatorTestCase.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OperatorTestCase.java index 5d881f03bd07f..b8d9783e2aed5 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OperatorTestCase.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OperatorTestCase.java @@ -129,17 +129,17 @@ public final void testSimpleWithCranky() { BlockFactory blockFactory = BlockFactory.getInstance(cranky.getBreaker(CircuitBreaker.REQUEST), bigArrays); DriverContext driverContext = new DriverContext(bigArrays, blockFactory); - boolean[] driverStarted = new boolean[1]; + boolean driverStarted = false; try { Operator operator = simple(bigArrays).get(driverContext); - driverStarted[0] = true; - List result = drive(operator, input.iterator(), driverContext); + driverStarted = true; + drive(operator, input.iterator(), driverContext); // Either we get lucky and cranky doesn't throw and the test completes or we don't and it throws } catch (CircuitBreakingException e) { logger.info("broken", e); assertThat(e.getMessage(), equalTo(CrankyCircuitBreakerService.ERROR_MESSAGE)); } - if (driverStarted[0] == false) { + if (driverStarted == false) { // if drive hasn't even started then we need to release the input pages Releasables.closeExpectNoException(Releasables.wrap(() -> Iterators.map(input.iterator(), p -> p::releaseBlocks))); }