Skip to content

Commit

Permalink
ESQL: Copy blocks when expanding
Browse files Browse the repository at this point in the history
Until we get lovely reference tracking, let's copy the blocks. This
fixes the memory tracking around mv_expand, but it requires a copy. For
now, that's simple and it works. I expect we won't copy forever. We can
likely share the array once we get reference tracking. But for now,
let's copy!
  • Loading branch information
nik9000 committed Oct 9, 2023
1 parent bd1c09f commit 62df953
Show file tree
Hide file tree
Showing 10 changed files with 253 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

import java.util.Arrays;
import java.util.BitSet;
import java.util.stream.IntStream;

/**
* Block implementation that stores an array of boolean.
Expand Down Expand Up @@ -83,12 +82,21 @@ public BooleanBlock expand() {
if (firstValueIndexes == null) {
return this;
}
int end = firstValueIndexes[getPositionCount()];
if (nullsMask == null) {
return new BooleanArrayVector(values, end).asBlock();
// TODO use reference counting to share the values
try (var builder = blockFactory.newBooleanBlockBuilder(firstValueIndexes[getPositionCount()])) {
for (int pos = 0; pos < getPositionCount(); pos++) {
if (isNull(pos)) {
builder.appendNull();
continue;
}
int first = getFirstValueIndex(pos);
int end = first + getValueCount(pos);
for (int i = first; i < end; i++) {
builder.appendBoolean(getBoolean(i));
}
}
return builder.mvOrdering(mvOrdering()).build();
}
int[] firstValues = IntStream.range(0, end + 1).toArray();
return new BooleanArrayBlock(values, end, firstValues, shiftNullsToExpandedPositions(), MvOrdering.UNORDERED, blockFactory);
}

public static long ramBytesEstimated(boolean[] values, int[] firstValueIndexes, BitSet nullsMask) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.elasticsearch.core.Releasables;

import java.util.BitSet;
import java.util.stream.IntStream;

/**
* Block implementation that stores an array of BytesRef.
Expand Down Expand Up @@ -86,12 +85,22 @@ public BytesRefBlock expand() {
if (firstValueIndexes == null) {
return this;
}
int end = firstValueIndexes[getPositionCount()];
if (nullsMask == null) {
return new BytesRefArrayVector(values, end).asBlock();
// TODO use reference counting to share the values
final BytesRef scratch = new BytesRef();
try (var builder = blockFactory.newBytesRefBlockBuilder(firstValueIndexes[getPositionCount()])) {
for (int pos = 0; pos < getPositionCount(); pos++) {
if (isNull(pos)) {
builder.appendNull();
continue;
}
int first = getFirstValueIndex(pos);
int end = first + getValueCount(pos);
for (int i = first; i < end; i++) {
builder.appendBytesRef(getBytesRef(i, scratch));
}
}
return builder.mvOrdering(mvOrdering()).build();
}
int[] firstValues = IntStream.range(0, end + 1).toArray();
return new BytesRefArrayBlock(values, end, firstValues, shiftNullsToExpandedPositions(), MvOrdering.UNORDERED, blockFactory);
}

public static long ramBytesEstimated(BytesRefArray values, int[] firstValueIndexes, BitSet nullsMask) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

import java.util.Arrays;
import java.util.BitSet;
import java.util.stream.IntStream;

/**
* Block implementation that stores an array of double.
Expand Down Expand Up @@ -83,12 +82,21 @@ public DoubleBlock expand() {
if (firstValueIndexes == null) {
return this;
}
int end = firstValueIndexes[getPositionCount()];
if (nullsMask == null) {
return new DoubleArrayVector(values, end).asBlock();
// TODO use reference counting to share the values
try (var builder = blockFactory.newDoubleBlockBuilder(firstValueIndexes[getPositionCount()])) {
for (int pos = 0; pos < getPositionCount(); pos++) {
if (isNull(pos)) {
builder.appendNull();
continue;
}
int first = getFirstValueIndex(pos);
int end = first + getValueCount(pos);
for (int i = first; i < end; i++) {
builder.appendDouble(getDouble(i));
}
}
return builder.mvOrdering(mvOrdering()).build();
}
int[] firstValues = IntStream.range(0, end + 1).toArray();
return new DoubleArrayBlock(values, end, firstValues, shiftNullsToExpandedPositions(), MvOrdering.UNORDERED, blockFactory);
}

public static long ramBytesEstimated(double[] values, int[] firstValueIndexes, BitSet nullsMask) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

import java.util.Arrays;
import java.util.BitSet;
import java.util.stream.IntStream;

/**
* Block implementation that stores an array of int.
Expand Down Expand Up @@ -83,12 +82,21 @@ public IntBlock expand() {
if (firstValueIndexes == null) {
return this;
}
int end = firstValueIndexes[getPositionCount()];
if (nullsMask == null) {
return new IntArrayVector(values, end).asBlock();
// TODO use reference counting to share the values
try (var builder = blockFactory.newIntBlockBuilder(firstValueIndexes[getPositionCount()])) {
for (int pos = 0; pos < getPositionCount(); pos++) {
if (isNull(pos)) {
builder.appendNull();
continue;
}
int first = getFirstValueIndex(pos);
int end = first + getValueCount(pos);
for (int i = first; i < end; i++) {
builder.appendInt(getInt(i));
}
}
return builder.mvOrdering(mvOrdering()).build();
}
int[] firstValues = IntStream.range(0, end + 1).toArray();
return new IntArrayBlock(values, end, firstValues, shiftNullsToExpandedPositions(), MvOrdering.UNORDERED, blockFactory);
}

public static long ramBytesEstimated(int[] values, int[] firstValueIndexes, BitSet nullsMask) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

import java.util.Arrays;
import java.util.BitSet;
import java.util.stream.IntStream;

/**
* Block implementation that stores an array of long.
Expand Down Expand Up @@ -83,12 +82,21 @@ public LongBlock expand() {
if (firstValueIndexes == null) {
return this;
}
int end = firstValueIndexes[getPositionCount()];
if (nullsMask == null) {
return new LongArrayVector(values, end).asBlock();
// TODO use reference counting to share the values
try (var builder = blockFactory.newLongBlockBuilder(firstValueIndexes[getPositionCount()])) {
for (int pos = 0; pos < getPositionCount(); pos++) {
if (isNull(pos)) {
builder.appendNull();
continue;
}
int first = getFirstValueIndex(pos);
int end = first + getValueCount(pos);
for (int i = first; i < end; i++) {
builder.appendLong(getLong(i));
}
}
return builder.mvOrdering(mvOrdering()).build();
}
int[] firstValues = IntStream.range(0, end + 1).toArray();
return new LongArrayBlock(values, end, firstValues, shiftNullsToExpandedPositions(), MvOrdering.UNORDERED, blockFactory);
}

public static long ramBytesEstimated(long[] values, int[] firstValueIndexes, BitSet nullsMask) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import org.apache.lucene.util.RamUsageEstimator;
import java.util.Arrays;
$endif$
import java.util.BitSet;
import java.util.stream.IntStream;

/**
* Block implementation that stores an array of $type$.
Expand Down Expand Up @@ -69,9 +68,9 @@ $endif$

@Override
public $Type$Block filter(int... positions) {
$if(BytesRef)$
$if(BytesRef)$
final BytesRef scratch = new BytesRef();
$endif$
$endif$
try (var builder = blockFactory.new$Type$BlockBuilder(positions.length)) {
for (int pos : positions) {
if (isNull(pos)) {
Expand Down Expand Up @@ -104,12 +103,28 @@ $endif$
if (firstValueIndexes == null) {
return this;
}
int end = firstValueIndexes[getPositionCount()];
if (nullsMask == null) {
return new $Type$ArrayVector(values, end).asBlock();
// TODO use reference counting to share the values
$if(BytesRef)$
final BytesRef scratch = new BytesRef();
$endif$
try (var builder = blockFactory.new$Type$BlockBuilder(firstValueIndexes[getPositionCount()])) {
for (int pos = 0; pos < getPositionCount(); pos++) {
if (isNull(pos)) {
builder.appendNull();
continue;
}
int first = getFirstValueIndex(pos);
int end = first + getValueCount(pos);
for (int i = first; i < end; i++) {
$if(BytesRef)$
builder.append$Type$(get$Type$(i, scratch));
$else$
builder.append$Type$(get$Type$(i));
$endif$
}
}
return builder.mvOrdering(mvOrdering()).build();
}
int[] firstValues = IntStream.range(0, end + 1).toArray();
return new $Type$ArrayBlock(values, end, firstValues, shiftNullsToExpandedPositions(), MvOrdering.UNORDERED, blockFactory);
}

public static long ramBytesEstimated($if(BytesRef)$BytesRefArray$else$$type$[]$endif$ values, int[] firstValueIndexes, BitSet nullsMask) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
Expand Down Expand Up @@ -60,18 +61,30 @@ protected Page process(Page page) {
noops++;
return page;
}
if (page.getBlockCount() == 1) {
assert channel == 0;
return new Page(expandedBlock);
}

int[] duplicateFilter = buildDuplicateExpandingFilter(expandingBlock, expandedBlock.getPositionCount());
try {
if (page.getBlockCount() == 1) {
assert channel == 0;
return new Page(expandedBlock);
}

Block[] result = new Block[page.getBlockCount()];
for (int b = 0; b < result.length; b++) {
result[b] = b == channel ? expandedBlock : page.getBlock(b).filter(duplicateFilter);
int[] duplicateFilter = buildDuplicateExpandingFilter(expandingBlock, expandedBlock.getPositionCount());

Block[] result = new Block[page.getBlockCount()];
boolean success = false;
try {
for (int b = 0; b < result.length; b++) {
result[b] = b == channel ? expandedBlock : page.getBlock(b).filter(duplicateFilter);
}
success = true;
return new Page(result);
} finally {
if (success == false) {
Releasables.close(result);
}
}
} finally {
page.releaseBlocks();
}
return new Page(result);
}

private int[] buildDuplicateExpandingFilter(Block expandingBlock, int newPositions) {
Expand Down
Loading

0 comments on commit 62df953

Please sign in to comment.