Skip to content

Commit

Permalink
Use checked circuit breaker and fix CategorizeBlockHashTests
Browse files Browse the repository at this point in the history
  • Loading branch information
ivancea committed Nov 27, 2024
1 parent 3771532 commit 599b02b
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

public abstract class BlockHashTestCase extends ESTestCase {

final CircuitBreaker breaker = new MockBigArrays.LimitedBreaker("esql-test-breaker", ByteSizeValue.ofGb(1));
final CircuitBreaker breaker = newLimitedBreaker(ByteSizeValue.ofGb(1));
final BigArrays bigArrays = new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, mockBreakerService(breaker));
final MockBlockFactory blockFactory = new MockBlockFactory(breaker, bigArrays);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ public void testCategorizeRaw() {
hash.add(page, new GroupingAggregatorFunction.AddInput() {
@Override
public void add(int positionOffset, IntBlock groupIds) {
groupIds.incRef();
assertEquals(groupIds.getPositionCount(), positions);

assertEquals(0, groupIds.getInt(0));
Expand Down Expand Up @@ -123,15 +122,16 @@ public void testCategorizeIntermediate() {
page2 = new Page(builder.build());
}

Page intermediatePage1, intermediatePage2;

// Fill intermediatePages with the intermediate state from the raw hashes
try (
BlockHash rawHash1 = new CategorizeRawBlockHash(0, blockFactory, true);
BlockHash rawHash2 = new CategorizeRawBlockHash(0, blockFactory, true);
BlockHash intermediateHash = new CategorizedIntermediateBlockHash(0, blockFactory, true)
BlockHash rawHash2 = new CategorizeRawBlockHash(0, blockFactory, true)
) {
rawHash1.add(page1, new GroupingAggregatorFunction.AddInput() {
@Override
public void add(int positionOffset, IntBlock groupIds) {
groupIds.incRef();
assertEquals(groupIds.getPositionCount(), positions1);
assertEquals(0, groupIds.getInt(0));
assertEquals(1, groupIds.getInt(1));
Expand All @@ -152,16 +152,17 @@ public void close() {
fail("hashes should not close AddInput");
}
});
intermediatePage1 = new Page(rawHash1.getKeys()[0]);

Page intermediatePage1 = new Page(rawHash1.getKeys()[0]);
intermediateHash.add(intermediatePage1, new GroupingAggregatorFunction.AddInput() {
rawHash2.add(page2, new GroupingAggregatorFunction.AddInput() {
@Override
public void add(int positionOffset, IntBlock groupIds) {
Set<Integer> values = IntStream.range(0, groupIds.getPositionCount())
.map(groupIds::getInt)
.boxed()
.collect(Collectors.toSet());
assertEquals(values, Set.of(0, 1));
assertEquals(groupIds.getPositionCount(), positions2);
assertEquals(0, groupIds.getInt(0));
assertEquals(1, groupIds.getInt(1));
assertEquals(0, groupIds.getInt(2));
assertEquals(1, groupIds.getInt(3));
assertEquals(2, groupIds.getInt(4));
}

@Override
Expand All @@ -174,18 +175,23 @@ public void close() {
fail("hashes should not close AddInput");
}
});
intermediatePage1.releaseBlocks();
intermediatePage2 = new Page(rawHash2.getKeys()[0]);
} finally {
page1.releaseBlocks();
page2.releaseBlocks();
}

rawHash2.add(page2, new GroupingAggregatorFunction.AddInput() {
try (
BlockHash intermediateHash = new CategorizedIntermediateBlockHash(0, blockFactory, true)
) {
intermediateHash.add(intermediatePage1, new GroupingAggregatorFunction.AddInput() {
@Override
public void add(int positionOffset, IntBlock groupIds) {
groupIds.incRef();
assertEquals(groupIds.getPositionCount(), positions2);
assertEquals(0, groupIds.getInt(0));
assertEquals(1, groupIds.getInt(1));
assertEquals(0, groupIds.getInt(2));
assertEquals(1, groupIds.getInt(3));
assertEquals(2, groupIds.getInt(4));
Set<Integer> values = IntStream.range(0, groupIds.getPositionCount())
.map(groupIds::getInt)
.boxed()
.collect(Collectors.toSet());
assertEquals(values, Set.of(0, 1));
}

@Override
Expand All @@ -199,7 +205,7 @@ public void close() {
}
});

intermediateHash.add(new Page(rawHash2.getKeys()[0]), new GroupingAggregatorFunction.AddInput() {
intermediateHash.add(intermediatePage2, new GroupingAggregatorFunction.AddInput() {
@Override
public void add(int positionOffset, IntBlock groupIds) {
Set<Integer> values = IntStream.range(0, groupIds.getPositionCount())
Expand All @@ -222,8 +228,8 @@ public void close() {
}
});
} finally {
page1.releaseBlocks();
page2.releaseBlocks();
intermediatePage1.releaseBlocks();
intermediatePage2.releaseBlocks();
}
}

Expand Down

0 comments on commit 599b02b

Please sign in to comment.