From 62df9531b07304267b86fceeea20d8383e1eee62 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Mon, 9 Oct 2023 16:08:32 -0400 Subject: [PATCH] ESQL: Copy blocks when expanding Until we get lovely reference tracking, let's copy the blocks. This fixes the memory tracking around mv_expand, but it requires a copy. For now, that's simple and it works. I expect we won't copy forever. We can likely share the array once we get reference tracking. But for now, let's copy! --- .../compute/data/BooleanArrayBlock.java | 20 ++- .../compute/data/BytesRefArrayBlock.java | 21 ++- .../compute/data/DoubleArrayBlock.java | 20 ++- .../compute/data/IntArrayBlock.java | 20 ++- .../compute/data/LongArrayBlock.java | 20 ++- .../compute/data/X-ArrayBlock.java.st | 31 +++- .../compute/operator/MvExpandOperator.java | 33 ++-- .../compute/data/BlockMultiValuedTests.java | 151 ++++++++++++------ .../operator/MvExpandOperatorTests.java | 44 +++-- .../compute/operator/OperatorTestCase.java | 8 +- 10 files changed, 253 insertions(+), 115 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanArrayBlock.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanArrayBlock.java index adf1282c21fb0..ef4ab424de9e4 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanArrayBlock.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanArrayBlock.java @@ -11,7 +11,6 @@ import java.util.Arrays; import java.util.BitSet; -import java.util.stream.IntStream; /** * Block implementation that stores an array of boolean. @@ -83,12 +82,21 @@ public BooleanBlock expand() { if (firstValueIndexes == null) { return this; } - int end = firstValueIndexes[getPositionCount()]; - if (nullsMask == null) { - return new BooleanArrayVector(values, end).asBlock(); + // TODO use reference counting to share the values + try (var builder = blockFactory.newBooleanBlockBuilder(firstValueIndexes[getPositionCount()])) { + for (int pos = 0; pos < getPositionCount(); pos++) { + if (isNull(pos)) { + builder.appendNull(); + continue; + } + int first = getFirstValueIndex(pos); + int end = first + getValueCount(pos); + for (int i = first; i < end; i++) { + builder.appendBoolean(getBoolean(i)); + } + } + return builder.mvOrdering(mvOrdering()).build(); } - int[] firstValues = IntStream.range(0, end + 1).toArray(); - return new BooleanArrayBlock(values, end, firstValues, shiftNullsToExpandedPositions(), MvOrdering.UNORDERED, blockFactory); } public static long ramBytesEstimated(boolean[] values, int[] firstValueIndexes, BitSet nullsMask) { diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefArrayBlock.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefArrayBlock.java index f46615307f767..b3bacb1936706 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefArrayBlock.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefArrayBlock.java @@ -13,7 +13,6 @@ import org.elasticsearch.core.Releasables; import java.util.BitSet; -import java.util.stream.IntStream; /** * Block implementation that stores an array of BytesRef. @@ -86,12 +85,22 @@ public BytesRefBlock expand() { if (firstValueIndexes == null) { return this; } - int end = firstValueIndexes[getPositionCount()]; - if (nullsMask == null) { - return new BytesRefArrayVector(values, end).asBlock(); + // TODO use reference counting to share the values + final BytesRef scratch = new BytesRef(); + try (var builder = blockFactory.newBytesRefBlockBuilder(firstValueIndexes[getPositionCount()])) { + for (int pos = 0; pos < getPositionCount(); pos++) { + if (isNull(pos)) { + builder.appendNull(); + continue; + } + int first = getFirstValueIndex(pos); + int end = first + getValueCount(pos); + for (int i = first; i < end; i++) { + builder.appendBytesRef(getBytesRef(i, scratch)); + } + } + return builder.mvOrdering(mvOrdering()).build(); } - int[] firstValues = IntStream.range(0, end + 1).toArray(); - return new BytesRefArrayBlock(values, end, firstValues, shiftNullsToExpandedPositions(), MvOrdering.UNORDERED, blockFactory); } public static long ramBytesEstimated(BytesRefArray values, int[] firstValueIndexes, BitSet nullsMask) { diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleArrayBlock.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleArrayBlock.java index b0d77dd71271e..7573513b8f71d 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleArrayBlock.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleArrayBlock.java @@ -11,7 +11,6 @@ import java.util.Arrays; import java.util.BitSet; -import java.util.stream.IntStream; /** * Block implementation that stores an array of double. @@ -83,12 +82,21 @@ public DoubleBlock expand() { if (firstValueIndexes == null) { return this; } - int end = firstValueIndexes[getPositionCount()]; - if (nullsMask == null) { - return new DoubleArrayVector(values, end).asBlock(); + // TODO use reference counting to share the values + try (var builder = blockFactory.newDoubleBlockBuilder(firstValueIndexes[getPositionCount()])) { + for (int pos = 0; pos < getPositionCount(); pos++) { + if (isNull(pos)) { + builder.appendNull(); + continue; + } + int first = getFirstValueIndex(pos); + int end = first + getValueCount(pos); + for (int i = first; i < end; i++) { + builder.appendDouble(getDouble(i)); + } + } + return builder.mvOrdering(mvOrdering()).build(); } - int[] firstValues = IntStream.range(0, end + 1).toArray(); - return new DoubleArrayBlock(values, end, firstValues, shiftNullsToExpandedPositions(), MvOrdering.UNORDERED, blockFactory); } public static long ramBytesEstimated(double[] values, int[] firstValueIndexes, BitSet nullsMask) { diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntArrayBlock.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntArrayBlock.java index 97791a03c6044..7bd62df6441f0 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntArrayBlock.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntArrayBlock.java @@ -11,7 +11,6 @@ import java.util.Arrays; import java.util.BitSet; -import java.util.stream.IntStream; /** * Block implementation that stores an array of int. @@ -83,12 +82,21 @@ public IntBlock expand() { if (firstValueIndexes == null) { return this; } - int end = firstValueIndexes[getPositionCount()]; - if (nullsMask == null) { - return new IntArrayVector(values, end).asBlock(); + // TODO use reference counting to share the values + try (var builder = blockFactory.newIntBlockBuilder(firstValueIndexes[getPositionCount()])) { + for (int pos = 0; pos < getPositionCount(); pos++) { + if (isNull(pos)) { + builder.appendNull(); + continue; + } + int first = getFirstValueIndex(pos); + int end = first + getValueCount(pos); + for (int i = first; i < end; i++) { + builder.appendInt(getInt(i)); + } + } + return builder.mvOrdering(mvOrdering()).build(); } - int[] firstValues = IntStream.range(0, end + 1).toArray(); - return new IntArrayBlock(values, end, firstValues, shiftNullsToExpandedPositions(), MvOrdering.UNORDERED, blockFactory); } public static long ramBytesEstimated(int[] values, int[] firstValueIndexes, BitSet nullsMask) { diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongArrayBlock.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongArrayBlock.java index dddc5296e471e..45bf29fe00933 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongArrayBlock.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongArrayBlock.java @@ -11,7 +11,6 @@ import java.util.Arrays; import java.util.BitSet; -import java.util.stream.IntStream; /** * Block implementation that stores an array of long. @@ -83,12 +82,21 @@ public LongBlock expand() { if (firstValueIndexes == null) { return this; } - int end = firstValueIndexes[getPositionCount()]; - if (nullsMask == null) { - return new LongArrayVector(values, end).asBlock(); + // TODO use reference counting to share the values + try (var builder = blockFactory.newLongBlockBuilder(firstValueIndexes[getPositionCount()])) { + for (int pos = 0; pos < getPositionCount(); pos++) { + if (isNull(pos)) { + builder.appendNull(); + continue; + } + int first = getFirstValueIndex(pos); + int end = first + getValueCount(pos); + for (int i = first; i < end; i++) { + builder.appendLong(getLong(i)); + } + } + return builder.mvOrdering(mvOrdering()).build(); } - int[] firstValues = IntStream.range(0, end + 1).toArray(); - return new LongArrayBlock(values, end, firstValues, shiftNullsToExpandedPositions(), MvOrdering.UNORDERED, blockFactory); } public static long ramBytesEstimated(long[] values, int[] firstValueIndexes, BitSet nullsMask) { diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-ArrayBlock.java.st b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-ArrayBlock.java.st index 1f9fb93bc65c6..b951847b91fc5 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-ArrayBlock.java.st +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-ArrayBlock.java.st @@ -19,7 +19,6 @@ import org.apache.lucene.util.RamUsageEstimator; import java.util.Arrays; $endif$ import java.util.BitSet; -import java.util.stream.IntStream; /** * Block implementation that stores an array of $type$. @@ -69,9 +68,9 @@ $endif$ @Override public $Type$Block filter(int... positions) { - $if(BytesRef)$ +$if(BytesRef)$ final BytesRef scratch = new BytesRef(); - $endif$ +$endif$ try (var builder = blockFactory.new$Type$BlockBuilder(positions.length)) { for (int pos : positions) { if (isNull(pos)) { @@ -104,12 +103,28 @@ $endif$ if (firstValueIndexes == null) { return this; } - int end = firstValueIndexes[getPositionCount()]; - if (nullsMask == null) { - return new $Type$ArrayVector(values, end).asBlock(); + // TODO use reference counting to share the values +$if(BytesRef)$ + final BytesRef scratch = new BytesRef(); +$endif$ + try (var builder = blockFactory.new$Type$BlockBuilder(firstValueIndexes[getPositionCount()])) { + for (int pos = 0; pos < getPositionCount(); pos++) { + if (isNull(pos)) { + builder.appendNull(); + continue; + } + int first = getFirstValueIndex(pos); + int end = first + getValueCount(pos); + for (int i = first; i < end; i++) { +$if(BytesRef)$ + builder.append$Type$(get$Type$(i, scratch)); +$else$ + builder.append$Type$(get$Type$(i)); +$endif$ + } + } + return builder.mvOrdering(mvOrdering()).build(); } - int[] firstValues = IntStream.range(0, end + 1).toArray(); - return new $Type$ArrayBlock(values, end, firstValues, shiftNullsToExpandedPositions(), MvOrdering.UNORDERED, blockFactory); } public static long ramBytesEstimated($if(BytesRef)$BytesRefArray$else$$type$[]$endif$ values, int[] firstValueIndexes, BitSet nullsMask) { diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/MvExpandOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/MvExpandOperator.java index f6156507dffa2..a0a2b4e119075 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/MvExpandOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/MvExpandOperator.java @@ -13,6 +13,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.Page; +import org.elasticsearch.core.Releasables; import org.elasticsearch.xcontent.XContentBuilder; import java.io.IOException; @@ -60,18 +61,30 @@ protected Page process(Page page) { noops++; return page; } - if (page.getBlockCount() == 1) { - assert channel == 0; - return new Page(expandedBlock); - } - - int[] duplicateFilter = buildDuplicateExpandingFilter(expandingBlock, expandedBlock.getPositionCount()); + try { + if (page.getBlockCount() == 1) { + assert channel == 0; + return new Page(expandedBlock); + } - Block[] result = new Block[page.getBlockCount()]; - for (int b = 0; b < result.length; b++) { - result[b] = b == channel ? expandedBlock : page.getBlock(b).filter(duplicateFilter); + int[] duplicateFilter = buildDuplicateExpandingFilter(expandingBlock, expandedBlock.getPositionCount()); + + Block[] result = new Block[page.getBlockCount()]; + boolean success = false; + try { + for (int b = 0; b < result.length; b++) { + result[b] = b == channel ? expandedBlock : page.getBlock(b).filter(duplicateFilter); + } + success = true; + return new Page(result); + } finally { + if (success == false) { + Releasables.close(result); + } + } + } finally { + page.releaseBlocks(); } - return new Page(result); } private int[] buildDuplicateExpandingFilter(Block expandingBlock, int newPositions) { diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockMultiValuedTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockMultiValuedTests.java index 9a362ad4e3ca3..1b0e61cea8135 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockMultiValuedTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockMultiValuedTests.java @@ -11,7 +11,14 @@ import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; import org.elasticsearch.common.Randomness; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.MockBigArrays; +import org.elasticsearch.common.util.PageCacheRecycler; +import org.elasticsearch.compute.operator.DriverContext; import org.elasticsearch.test.ESTestCase; +import org.junit.After; import java.util.ArrayList; import java.util.Arrays; @@ -45,20 +52,23 @@ public BlockMultiValuedTests(@Name("elementType") ElementType elementType, @Name public void testMultiValued() { int positionCount = randomIntBetween(1, 16 * 1024); - var b = BasicBlockTests.randomBlock(elementType, positionCount, nullAllowed, 0, 10, 0, 0); + var b = BasicBlockTests.randomBlock(blockFactory(), elementType, positionCount, nullAllowed, 0, 10, 0, 0); + try { + assertThat(b.block().getPositionCount(), equalTo(positionCount)); + assertThat(b.block().getTotalValueCount(), equalTo(b.valueCount())); + for (int p = 0; p < positionCount; p++) { + BlockTestUtils.assertPositionValues(b.block(), p, equalTo(b.values().get(p))); + } - assertThat(b.block().getPositionCount(), equalTo(positionCount)); - assertThat(b.block().getTotalValueCount(), equalTo(b.valueCount())); - for (int p = 0; p < positionCount; p++) { - BlockTestUtils.assertPositionValues(b.block(), p, equalTo(b.values().get(p))); + assertThat(b.block().mayHaveMultivaluedFields(), equalTo(b.values().stream().anyMatch(l -> l != null && l.size() > 1))); + } finally { + b.block().close(); } - - assertThat(b.block().mayHaveMultivaluedFields(), equalTo(b.values().stream().anyMatch(l -> l != null && l.size() > 1))); } public void testExpand() { int positionCount = randomIntBetween(1, 16 * 1024); - var b = BasicBlockTests.randomBlock(elementType, positionCount, nullAllowed, 0, 100, 0, 0); + var b = BasicBlockTests.randomBlock(blockFactory(), elementType, positionCount, nullAllowed, 0, 100, 0, 0); assertExpanded(b.block()); } @@ -96,31 +106,37 @@ public void testFilteredJumbledSubsetThenExpanded() { private void assertFiltered(boolean all, boolean shuffled) { int positionCount = randomIntBetween(1, 16 * 1024); - var b = BasicBlockTests.randomBlock(elementType, positionCount, nullAllowed, 0, 10, 0, 0); - int[] positions = randomFilterPositions(b.block(), all, shuffled); - Block filtered = b.block().filter(positions); - - assertThat(filtered.getPositionCount(), equalTo(positions.length)); - - int expectedValueCount = 0; - for (int p : positions) { - List values = b.values().get(p); - if (values != null) { - expectedValueCount += values.size(); + var b = BasicBlockTests.randomBlock(blockFactory(), elementType, positionCount, nullAllowed, 0, 10, 0, 0); + try { + int[] positions = randomFilterPositions(b.block(), all, shuffled); + Block filtered = b.block().filter(positions); + try { + assertThat(filtered.getPositionCount(), equalTo(positions.length)); + + int expectedValueCount = 0; + for (int p : positions) { + List values = b.values().get(p); + if (values != null) { + expectedValueCount += values.size(); + } + } + assertThat(filtered.getTotalValueCount(), equalTo(expectedValueCount)); + for (int r = 0; r < positions.length; r++) { + if (b.values().get(positions[r]) == null) { + assertThat(filtered.getValueCount(r), equalTo(0)); + assertThat(filtered.isNull(r), equalTo(true)); + } else { + assertThat(filtered.getValueCount(r), equalTo(b.values().get(positions[r]).size())); + assertThat(BasicBlockTests.valuesAtPositions(filtered, r, r + 1).get(0), equalTo(b.values().get(positions[r]))); + } + } + } finally { + filtered.close(); } + assertThat(b.block().mayHaveMultivaluedFields(), equalTo(b.values().stream().anyMatch(l -> l != null && l.size() > 1))); + } finally { + b.block().close(); } - assertThat(filtered.getTotalValueCount(), equalTo(expectedValueCount)); - for (int r = 0; r < positions.length; r++) { - if (b.values().get(positions[r]) == null) { - assertThat(filtered.getValueCount(r), equalTo(0)); - assertThat(filtered.isNull(r), equalTo(true)); - } else { - assertThat(filtered.getValueCount(r), equalTo(b.values().get(positions[r]).size())); - assertThat(BasicBlockTests.valuesAtPositions(filtered, r, r + 1).get(0), equalTo(b.values().get(positions[r]))); - } - } - - assertThat(b.block().mayHaveMultivaluedFields(), equalTo(b.values().stream().anyMatch(l -> l != null && l.size() > 1))); } private int[] randomFilterPositions(Block orig, boolean all, boolean shuffled) { @@ -135,30 +151,65 @@ private int[] randomFilterPositions(Block orig, boolean all, boolean shuffled) { } private void assertExpanded(Block orig) { - Block expanded = orig.expand(); - assertThat(expanded.getPositionCount(), equalTo(orig.getTotalValueCount() + orig.nullValuesCount())); - assertThat(expanded.getTotalValueCount(), equalTo(orig.getTotalValueCount())); - - int np = 0; - for (int op = 0; op < orig.getPositionCount(); op++) { - if (orig.isNull(op)) { - assertThat(expanded.isNull(np), equalTo(true)); - assertThat(expanded.getValueCount(np++), equalTo(0)); - continue; - } - List oValues = BasicBlockTests.valuesAtPositions(orig, op, op + 1).get(0); - for (Object ov : oValues) { - assertThat(expanded.isNull(np), equalTo(false)); - assertThat(expanded.getValueCount(np), equalTo(1)); - assertThat(BasicBlockTests.valuesAtPositions(expanded, np, ++np).get(0), equalTo(List.of(ov))); + try (orig; Block expanded = orig.expand()) { + assertThat(expanded.getPositionCount(), equalTo(orig.getTotalValueCount() + orig.nullValuesCount())); + assertThat(expanded.getTotalValueCount(), equalTo(orig.getTotalValueCount())); + + int np = 0; + for (int op = 0; op < orig.getPositionCount(); op++) { + if (orig.isNull(op)) { + assertThat(expanded.isNull(np), equalTo(true)); + assertThat(expanded.getValueCount(np++), equalTo(0)); + continue; + } + List oValues = BasicBlockTests.valuesAtPositions(orig, op, op + 1).get(0); + for (Object ov : oValues) { + assertThat(expanded.isNull(np), equalTo(false)); + assertThat(expanded.getValueCount(np), equalTo(1)); + assertThat(BasicBlockTests.valuesAtPositions(expanded, np, ++np).get(0), equalTo(List.of(ov))); + } } } } private void assertFilteredThenExpanded(boolean all, boolean shuffled) { int positionCount = randomIntBetween(1, 16 * 1024); - var b = BasicBlockTests.randomBlock(elementType, positionCount, nullAllowed, 0, 10, 0, 0); - int[] positions = randomFilterPositions(b.block(), all, shuffled); - assertExpanded(b.block().filter(positions)); + var b = BasicBlockTests.randomBlock(blockFactory(), elementType, positionCount, nullAllowed, 0, 10, 0, 0); + try { + int[] positions = randomFilterPositions(b.block(), all, shuffled); + assertExpanded(b.block().filter(positions)); + } finally { + b.block().close(); + } + } + + private final List breakers = new ArrayList<>(); + private final List blockFactories = new ArrayList<>(); + + /** + * A {@link DriverContext} with a breaking {@link BigArrays} and {@link BlockFactory}. + */ + protected BlockFactory blockFactory() { // TODO move this to driverContext once everyone supports breaking + BigArrays bigArrays = new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, ByteSizeValue.ofGb(1)).withCircuitBreaking(); + CircuitBreaker breaker = bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST); + breakers.add(breaker); + BlockFactory factory = new MockBlockFactory(breaker, bigArrays); + blockFactories.add(factory); + return factory; + } + + @After + public void allBreakersEmpty() throws Exception { + // first check that all big arrays are released, which can affect breakers + MockBigArrays.ensureAllArraysAreReleased(); + + for (CircuitBreaker breaker : breakers) { + for (var factory : blockFactories) { + if (factory instanceof MockBlockFactory mockBlockFactory) { + mockBlockFactory.ensureAllBlocksAreReleased(); + } + } + assertThat("Unexpected used in breaker: " + breaker, breaker.getUsed(), equalTo(0L)); + } } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/MvExpandOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/MvExpandOperatorTests.java index 69c965fc91323..e7388cff6840d 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/MvExpandOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/MvExpandOperatorTests.java @@ -10,7 +10,9 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.compute.data.BasicBlockTests; +import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BlockFactory; +import org.elasticsearch.compute.data.BlockTestUtils; import org.elasticsearch.compute.data.ElementType; import org.elasticsearch.compute.data.IntBlock; import org.elasticsearch.compute.data.IntVector; @@ -38,8 +40,8 @@ protected int remaining() { protected Page createPage(int positionOffset, int length) { idx += length; return new Page( - randomBlock(ElementType.INT, length, true, 1, 10, 0, 0).block(), - randomBlock(ElementType.INT, length, false, 1, 10, 0, 0).block() + randomBlock(blockFactory, ElementType.INT, length, true, 1, 10, 0, 0).block(), + randomBlock(blockFactory, ElementType.INT, length, false, 1, 10, 0, 0).block() ); } }; @@ -64,8 +66,8 @@ protected String expectedToStringOfSimple() { protected void assertSimpleOutput(List input, List results) { assertThat(results, hasSize(results.size())); for (int i = 0; i < results.size(); i++) { - IntBlock origExpanded = input.get(i).getBlock(0); - IntBlock resultExpanded = results.get(i).getBlock(0); + Block origExpanded = input.get(i).getBlock(0); + Block resultExpanded = results.get(i).getBlock(0); int np = 0; for (int op = 0; op < origExpanded.getPositionCount(); op++) { if (origExpanded.isNull(op)) { @@ -81,8 +83,8 @@ protected void assertSimpleOutput(List input, List results) { } } - IntBlock origDuplicated = input.get(i).getBlock(1); - IntBlock resultDuplicated = results.get(i).getBlock(1); + Block origDuplicated = input.get(i).getBlock(1); + Block resultDuplicated = results.get(i).getBlock(1); np = 0; for (int op = 0; op < origDuplicated.getPositionCount(); op++) { int copies = origExpanded.isNull(op) ? 1 : origExpanded.getValueCount(op); @@ -134,10 +136,32 @@ public void testExpandStatus() { assertThat(status.noops(), equalTo(0)); } - // TODO: remove this once possible - // https://github.com/elastic/elasticsearch/issues/99826 + public void testExpandWithBytesRefs() { + DriverContext context = driverContext(); + List input = CannedSourceOperator.collectPages(new AbstractBlockSourceOperator(context.blockFactory(), 8 * 1024) { + private int idx; + + @Override + protected int remaining() { + return 10000 - idx; + } + + @Override + protected Page createPage(int positionOffset, int length) { + idx += length; + return new Page( + randomBlock(context.blockFactory(), ElementType.BYTES_REF, length, true, 1, 10, 0, 0).block(), + randomBlock(context.blockFactory(), ElementType.INT, length, false, 1, 10, 0, 0).block() + ); + } + }); + List origInput = BlockTestUtils.deepCopyOf(input, BlockFactory.getNonBreakingInstance()); + List results = drive(new MvExpandOperator(0), input.iterator(), context); + assertSimpleOutput(origInput, results); + } + @Override - protected boolean canLeak() { - return true; + protected DriverContext driverContext() { + return breakingDriverContext(); } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OperatorTestCase.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OperatorTestCase.java index 63f601669636c..5d881f03bd07f 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OperatorTestCase.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OperatorTestCase.java @@ -212,7 +212,7 @@ protected final void assertSimple(DriverContext context, int size) { unreleasedInputs++; } } - if ((canLeak() == false) && unreleasedInputs > 0) { + if (unreleasedInputs > 0) { throw new AssertionError("[" + unreleasedInputs + "] unreleased input blocks"); } } @@ -308,12 +308,6 @@ protected void start(Driver driver, ActionListener driverListener) { } } - // TODO: Remove this once all operators do not leak anymore - // https://github.com/elastic/elasticsearch/issues/99826 - protected boolean canLeak() { - return false; - } - public static void assertDriverContext(DriverContext driverContext) { assertTrue(driverContext.isFinished()); assertThat(driverContext.getSnapshot().releasables(), empty());