From 599b02b444d13e133c78b650731ff77635bba0a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Cea=20Fontenla?= Date: Wed, 27 Nov 2024 13:33:09 +0100 Subject: [PATCH] Use checked circuit breaker and fix CategorizeBlockHashTests --- .../blockhash/BlockHashTestCase.java | 2 +- .../blockhash/CategorizeBlockHashTests.java | 52 +++++++++++-------- 2 files changed, 30 insertions(+), 24 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/BlockHashTestCase.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/BlockHashTestCase.java index 41d29e846c2e3..fa93c0aa1c375 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/BlockHashTestCase.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/BlockHashTestCase.java @@ -21,7 +21,7 @@ public abstract class BlockHashTestCase extends ESTestCase { - final CircuitBreaker breaker = new MockBigArrays.LimitedBreaker("esql-test-breaker", ByteSizeValue.ofGb(1)); + final CircuitBreaker breaker = newLimitedBreaker(ByteSizeValue.ofGb(1)); final BigArrays bigArrays = new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, mockBreakerService(breaker)); final MockBlockFactory blockFactory = new MockBlockFactory(breaker, bigArrays); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/CategorizeBlockHashTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/CategorizeBlockHashTests.java index 5c711fa81bd6a..70c9c36527fb3 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/CategorizeBlockHashTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/CategorizeBlockHashTests.java @@ -68,7 +68,6 @@ public void testCategorizeRaw() { hash.add(page, new GroupingAggregatorFunction.AddInput() { @Override public void add(int positionOffset, IntBlock groupIds) { - groupIds.incRef(); assertEquals(groupIds.getPositionCount(), positions); assertEquals(0, groupIds.getInt(0)); @@ -123,15 +122,16 @@ public void testCategorizeIntermediate() { page2 = new Page(builder.build()); } + Page intermediatePage1, intermediatePage2; + + // Fill intermediatePages with the intermediate state from the raw hashes try ( BlockHash rawHash1 = new CategorizeRawBlockHash(0, blockFactory, true); - BlockHash rawHash2 = new CategorizeRawBlockHash(0, blockFactory, true); - BlockHash intermediateHash = new CategorizedIntermediateBlockHash(0, blockFactory, true) + BlockHash rawHash2 = new CategorizeRawBlockHash(0, blockFactory, true) ) { rawHash1.add(page1, new GroupingAggregatorFunction.AddInput() { @Override public void add(int positionOffset, IntBlock groupIds) { - groupIds.incRef(); assertEquals(groupIds.getPositionCount(), positions1); assertEquals(0, groupIds.getInt(0)); assertEquals(1, groupIds.getInt(1)); @@ -152,16 +152,17 @@ public void close() { fail("hashes should not close AddInput"); } }); + intermediatePage1 = new Page(rawHash1.getKeys()[0]); - Page intermediatePage1 = new Page(rawHash1.getKeys()[0]); - intermediateHash.add(intermediatePage1, new GroupingAggregatorFunction.AddInput() { + rawHash2.add(page2, new GroupingAggregatorFunction.AddInput() { @Override public void add(int positionOffset, IntBlock groupIds) { - Set values = IntStream.range(0, groupIds.getPositionCount()) - .map(groupIds::getInt) - .boxed() - .collect(Collectors.toSet()); - assertEquals(values, Set.of(0, 1)); + assertEquals(groupIds.getPositionCount(), positions2); + assertEquals(0, groupIds.getInt(0)); + assertEquals(1, groupIds.getInt(1)); + assertEquals(0, groupIds.getInt(2)); + assertEquals(1, groupIds.getInt(3)); + assertEquals(2, groupIds.getInt(4)); } @Override @@ -174,18 +175,23 @@ public void close() { fail("hashes should not close AddInput"); } }); - intermediatePage1.releaseBlocks(); + intermediatePage2 = new Page(rawHash2.getKeys()[0]); + } finally { + page1.releaseBlocks(); + page2.releaseBlocks(); + } - rawHash2.add(page2, new GroupingAggregatorFunction.AddInput() { + try ( + BlockHash intermediateHash = new CategorizedIntermediateBlockHash(0, blockFactory, true) + ) { + intermediateHash.add(intermediatePage1, new GroupingAggregatorFunction.AddInput() { @Override public void add(int positionOffset, IntBlock groupIds) { - groupIds.incRef(); - assertEquals(groupIds.getPositionCount(), positions2); - assertEquals(0, groupIds.getInt(0)); - assertEquals(1, groupIds.getInt(1)); - assertEquals(0, groupIds.getInt(2)); - assertEquals(1, groupIds.getInt(3)); - assertEquals(2, groupIds.getInt(4)); + Set values = IntStream.range(0, groupIds.getPositionCount()) + .map(groupIds::getInt) + .boxed() + .collect(Collectors.toSet()); + assertEquals(values, Set.of(0, 1)); } @Override @@ -199,7 +205,7 @@ public void close() { } }); - intermediateHash.add(new Page(rawHash2.getKeys()[0]), new GroupingAggregatorFunction.AddInput() { + intermediateHash.add(intermediatePage2, new GroupingAggregatorFunction.AddInput() { @Override public void add(int positionOffset, IntBlock groupIds) { Set values = IntStream.range(0, groupIds.getPositionCount()) @@ -222,8 +228,8 @@ public void close() { } }); } finally { - page1.releaseBlocks(); - page2.releaseBlocks(); + intermediatePage1.releaseBlocks(); + intermediatePage2.releaseBlocks(); } }