Skip to content

Commit

Permalink
ESQL: Copy blocks on expand
Browse files Browse the repository at this point in the history
Until we have proper reference tracking, let's copy blocks when "expanding"
them. For now that is simple, it works, and it helps make memory tracking cleaner.
I expect we won't copy forever: once we have reference tracking we can likely
share the underlying array instead. But for now, let's copy!

Closes elastic#100548
  • Loading branch information
nik9000 committed Oct 12, 2023
1 parent b1b28ba commit a010443
Show file tree
Hide file tree
Showing 12 changed files with 543 additions and 130 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public static void assertFitsIn(ByteSizeValue max, Function<BigArrays, Releasabl
* Tracking allocations is useful when debugging a leak but shouldn't be enabled by default as this would also be very costly
* since it creates a new Exception every time a new array is created.
*/
private static final boolean TRACK_ALLOCATIONS = false;
private static final boolean TRACK_ALLOCATIONS = true;

private static final ConcurrentMap<Object, Object> ACQUIRED_ARRAYS = new ConcurrentHashMap<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

import java.util.Arrays;
import java.util.BitSet;
import java.util.stream.IntStream;

/**
* Block implementation that stores an array of boolean.
Expand Down Expand Up @@ -83,12 +82,21 @@ public BooleanBlock expand() {
if (firstValueIndexes == null) {
return this;
}
int end = firstValueIndexes[getPositionCount()];
if (nullsMask == null) {
return new BooleanArrayVector(values, end).asBlock();
// TODO use reference counting to share the values
try (var builder = blockFactory.newBooleanBlockBuilder(firstValueIndexes[getPositionCount()])) {
for (int pos = 0; pos < getPositionCount(); pos++) {
if (isNull(pos)) {
builder.appendNull();
continue;
}
int first = getFirstValueIndex(pos);
int end = first + getValueCount(pos);
for (int i = first; i < end; i++) {
builder.appendBoolean(getBoolean(i));
}
}
return builder.mvOrdering(MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING).build();
}
int[] firstValues = IntStream.range(0, end + 1).toArray();
return new BooleanArrayBlock(values, end, firstValues, shiftNullsToExpandedPositions(), MvOrdering.UNORDERED, blockFactory);
}

public static long ramBytesEstimated(boolean[] values, int[] firstValueIndexes, BitSet nullsMask) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.elasticsearch.core.Releasables;

import java.util.BitSet;
import java.util.stream.IntStream;

/**
* Block implementation that stores an array of BytesRef.
Expand Down Expand Up @@ -86,12 +85,22 @@ public BytesRefBlock expand() {
if (firstValueIndexes == null) {
return this;
}
int end = firstValueIndexes[getPositionCount()];
if (nullsMask == null) {
return new BytesRefArrayVector(values, end).asBlock();
// TODO use reference counting to share the values
final BytesRef scratch = new BytesRef();
try (var builder = blockFactory.newBytesRefBlockBuilder(firstValueIndexes[getPositionCount()])) {
for (int pos = 0; pos < getPositionCount(); pos++) {
if (isNull(pos)) {
builder.appendNull();
continue;
}
int first = getFirstValueIndex(pos);
int end = first + getValueCount(pos);
for (int i = first; i < end; i++) {
builder.appendBytesRef(getBytesRef(i, scratch));
}
}
return builder.mvOrdering(MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING).build();
}
int[] firstValues = IntStream.range(0, end + 1).toArray();
return new BytesRefArrayBlock(values, end, firstValues, shiftNullsToExpandedPositions(), MvOrdering.UNORDERED, blockFactory);
}

public static long ramBytesEstimated(BytesRefArray values, int[] firstValueIndexes, BitSet nullsMask) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

import java.util.Arrays;
import java.util.BitSet;
import java.util.stream.IntStream;

/**
* Block implementation that stores an array of double.
Expand Down Expand Up @@ -83,12 +82,21 @@ public DoubleBlock expand() {
if (firstValueIndexes == null) {
return this;
}
int end = firstValueIndexes[getPositionCount()];
if (nullsMask == null) {
return new DoubleArrayVector(values, end).asBlock();
// TODO use reference counting to share the values
try (var builder = blockFactory.newDoubleBlockBuilder(firstValueIndexes[getPositionCount()])) {
for (int pos = 0; pos < getPositionCount(); pos++) {
if (isNull(pos)) {
builder.appendNull();
continue;
}
int first = getFirstValueIndex(pos);
int end = first + getValueCount(pos);
for (int i = first; i < end; i++) {
builder.appendDouble(getDouble(i));
}
}
return builder.mvOrdering(MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING).build();
}
int[] firstValues = IntStream.range(0, end + 1).toArray();
return new DoubleArrayBlock(values, end, firstValues, shiftNullsToExpandedPositions(), MvOrdering.UNORDERED, blockFactory);
}

public static long ramBytesEstimated(double[] values, int[] firstValueIndexes, BitSet nullsMask) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

import java.util.Arrays;
import java.util.BitSet;
import java.util.stream.IntStream;

/**
* Block implementation that stores an array of int.
Expand Down Expand Up @@ -83,12 +82,21 @@ public IntBlock expand() {
if (firstValueIndexes == null) {
return this;
}
int end = firstValueIndexes[getPositionCount()];
if (nullsMask == null) {
return new IntArrayVector(values, end).asBlock();
// TODO use reference counting to share the values
try (var builder = blockFactory.newIntBlockBuilder(firstValueIndexes[getPositionCount()])) {
for (int pos = 0; pos < getPositionCount(); pos++) {
if (isNull(pos)) {
builder.appendNull();
continue;
}
int first = getFirstValueIndex(pos);
int end = first + getValueCount(pos);
for (int i = first; i < end; i++) {
builder.appendInt(getInt(i));
}
}
return builder.mvOrdering(MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING).build();
}
int[] firstValues = IntStream.range(0, end + 1).toArray();
return new IntArrayBlock(values, end, firstValues, shiftNullsToExpandedPositions(), MvOrdering.UNORDERED, blockFactory);
}

public static long ramBytesEstimated(int[] values, int[] firstValueIndexes, BitSet nullsMask) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

import java.util.Arrays;
import java.util.BitSet;
import java.util.stream.IntStream;

/**
* Block implementation that stores an array of long.
Expand Down Expand Up @@ -83,12 +82,21 @@ public LongBlock expand() {
if (firstValueIndexes == null) {
return this;
}
int end = firstValueIndexes[getPositionCount()];
if (nullsMask == null) {
return new LongArrayVector(values, end).asBlock();
// TODO use reference counting to share the values
try (var builder = blockFactory.newLongBlockBuilder(firstValueIndexes[getPositionCount()])) {
for (int pos = 0; pos < getPositionCount(); pos++) {
if (isNull(pos)) {
builder.appendNull();
continue;
}
int first = getFirstValueIndex(pos);
int end = first + getValueCount(pos);
for (int i = first; i < end; i++) {
builder.appendLong(getLong(i));
}
}
return builder.mvOrdering(MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING).build();
}
int[] firstValues = IntStream.range(0, end + 1).toArray();
return new LongArrayBlock(values, end, firstValues, shiftNullsToExpandedPositions(), MvOrdering.UNORDERED, blockFactory);
}

public static long ramBytesEstimated(long[] values, int[] firstValueIndexes, BitSet nullsMask) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import org.apache.lucene.util.RamUsageEstimator;
import java.util.Arrays;
$endif$
import java.util.BitSet;
import java.util.stream.IntStream;

/**
* Block implementation that stores an array of $type$.
Expand Down Expand Up @@ -69,9 +68,9 @@ $endif$

@Override
public $Type$Block filter(int... positions) {
$if(BytesRef)$
$if(BytesRef)$
final BytesRef scratch = new BytesRef();
$endif$
$endif$
try (var builder = blockFactory.new$Type$BlockBuilder(positions.length)) {
for (int pos : positions) {
if (isNull(pos)) {
Expand Down Expand Up @@ -104,12 +103,28 @@ $endif$
if (firstValueIndexes == null) {
return this;
}
int end = firstValueIndexes[getPositionCount()];
if (nullsMask == null) {
return new $Type$ArrayVector(values, end).asBlock();
// TODO use reference counting to share the values
$if(BytesRef)$
final BytesRef scratch = new BytesRef();
$endif$
try (var builder = blockFactory.new$Type$BlockBuilder(firstValueIndexes[getPositionCount()])) {
for (int pos = 0; pos < getPositionCount(); pos++) {
if (isNull(pos)) {
builder.appendNull();
continue;
}
int first = getFirstValueIndex(pos);
int end = first + getValueCount(pos);
for (int i = first; i < end; i++) {
$if(BytesRef)$
builder.append$Type$(get$Type$(i, scratch));
$else$
builder.append$Type$(get$Type$(i));
$endif$
}
}
return builder.mvOrdering(MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING).build();
}
int[] firstValues = IntStream.range(0, end + 1).toArray();
return new $Type$ArrayBlock(values, end, firstValues, shiftNullsToExpandedPositions(), MvOrdering.UNORDERED, blockFactory);
}

public static long ramBytesEstimated($if(BytesRef)$BytesRefArray$else$$type$[]$endif$ values, int[] firstValueIndexes, BitSet nullsMask) {
Expand Down
Loading

0 comments on commit a010443

Please sign in to comment.