diff --git a/docs/changelog/102190.yaml b/docs/changelog/102190.yaml new file mode 100644 index 0000000000000..cd04e041fca5e --- /dev/null +++ b/docs/changelog/102190.yaml @@ -0,0 +1,5 @@ +pr: 102190 +summary: Track pages in ESQL enrich request/response +area: ES|QL +type: enhancement +issues: [] diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractEsqlIntegTestCase.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractEsqlIntegTestCase.java index 5134e05b4cc3d..768353a1c8d35 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractEsqlIntegTestCase.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractEsqlIntegTestCase.java @@ -12,11 +12,13 @@ import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.CollectionUtils; import org.elasticsearch.compute.operator.exchange.ExchangeService; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.indices.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.junit.annotations.TestLogging; @@ -29,6 +31,7 @@ import java.util.List; import java.util.concurrent.TimeUnit; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; @TestLogging(value = "org.elasticsearch.xpack.esql.session:DEBUG", reason = "to better understand planning") @@ -76,6 +79,24 @@ protected Collection> nodePlugins() { return CollectionUtils.appendToCopy(super.nodePlugins(), EsqlPlugin.class); } + protected void setRequestCircuitBreakerLimit(ByteSizeValue limit) { + if (limit != null) { + assertAcked( + clusterAdmin().prepareUpdateSettings() + .setPersistentSettings( + Settings.builder().put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), limit).build() + ) + ); + } else { + assertAcked( + clusterAdmin().prepareUpdateSettings() + .setPersistentSettings( + Settings.builder().putNull(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.getKey()).build() + ) + ); + } + } + protected EsqlQueryResponse run(String esqlCommands) { return run(esqlCommands, randomPragmas()); } diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EnrichIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EnrichIT.java index 83b7c0543c43b..46aaa6fab16a5 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EnrichIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EnrichIT.java @@ -9,10 +9,16 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.TransportAction; +import org.elasticsearch.client.internal.Client; import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.compute.operator.exchange.ExchangeService; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; import org.elasticsearch.ingest.common.IngestCommonPlugin; import org.elasticsearch.license.LicenseService; import org.elasticsearch.license.XPackLicenseState; @@ -43,9 +49,11 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.transport.AbstractSimpleTransportTestCase.IGNORE_DESERIALIZATION_ERRORS_SETTING; import static org.hamcrest.Matchers.closeTo; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; @@ -61,17 +69,64 @@ protected Collection> nodePlugins() { plugins.add(LocalStateEnrich.class); plugins.add(IngestCommonPlugin.class); plugins.add(ReindexPlugin.class); + plugins.add(InternalTransportSettingPlugin.class); return plugins; } + public static class InternalTransportSettingPlugin extends Plugin { + @Override + public List> getSettings() { + return List.of(IGNORE_DESERIALIZATION_ERRORS_SETTING); + } + } + @Override protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { return Settings.builder() .put(super.nodeSettings(nodeOrdinal, otherSettings)) .put(XPackSettings.SECURITY_ENABLED.getKey(), false) + .put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "128mb") + /* + * Force standard settings for the request breaker or we may not break at all. + * Without this we can randomly decide to use the `noop` breaker for request + * and it won't break..... + */ + .put( + HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING.getKey(), + HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING.getDefault(Settings.EMPTY) + ) + .put( + HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_TYPE_SETTING.getKey(), + HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_TYPE_SETTING.getDefault(Settings.EMPTY) + ) + .put(ExchangeService.INACTIVE_SINKS_INTERVAL_SETTING, TimeValue.timeValueMillis(between(500, 2000))) + // allow reading pages from network can trip the circuit breaker + .put(IGNORE_DESERIALIZATION_ERRORS_SETTING.getKey(), true) .build(); } + @Override + protected EsqlQueryResponse run(EsqlQueryRequest request) { + final Client client; + if (randomBoolean()) { + client = client(randomFrom(clusterService().state().nodes().getCoordinatingOnlyNodes().values()).getName()); + } else { + client = client(); + } + if (randomBoolean()) { + setRequestCircuitBreakerLimit(ByteSizeValue.ofBytes(between(256, 4096))); + try { + return client.execute(EsqlQueryAction.INSTANCE, request).actionGet(2, TimeUnit.MINUTES); + } catch (Exception e) { + logger.info("request failed", e); + ensureBlocksReleased(); + } finally { + setRequestCircuitBreakerLimit(null); + } + } + return client.execute(EsqlQueryAction.INSTANCE, request).actionGet(30, TimeUnit.SECONDS); + } + @Before public void setupEnrichPolicies() { client().admin() @@ -129,6 +184,13 @@ public void setupMainIndex() { client().admin().indices().prepareRefresh("listens").get(); } + @Before + public void ensureAtLeastOneCoordinatingNodeOnly() { + if (clusterService().state().nodes().getCoordinatingOnlyNodes().isEmpty()) { + internalCluster().startCoordinatingOnlyNode(Settings.EMPTY); + } + } + record Listen(long timestamp, String songId, double duration) { } diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionBreakerIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionBreakerIT.java index 342df5209ec95..55a21cd7e4403 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionBreakerIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionBreakerIT.java @@ -26,7 +26,6 @@ import java.util.List; import java.util.concurrent.TimeUnit; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.transport.AbstractSimpleTransportTestCase.IGNORE_DESERIALIZATION_ERRORS_SETTING; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -74,24 +73,6 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { .build(); } - private void setRequestCircuitBreakerLimit(ByteSizeValue limit) { - if (limit != null) { - assertAcked( - clusterAdmin().prepareUpdateSettings() - .setPersistentSettings( - Settings.builder().put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), limit).build() - ) - ); - } else { - assertAcked( - clusterAdmin().prepareUpdateSettings() - .setPersistentSettings( - Settings.builder().putNull(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.getKey()).build() - ) - ); - } - } - @Override protected EsqlQueryResponse run(EsqlQueryRequest request) { setRequestCircuitBreakerLimit(ByteSizeValue.ofBytes(between(256, 2048))); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java index 98c1397d97860..7dd9f01a9d6c9 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java @@ -38,6 +38,9 @@ import org.elasticsearch.compute.operator.OutputOperator; import org.elasticsearch.compute.operator.ProjectOperator; import org.elasticsearch.compute.operator.SourceOperator; +import org.elasticsearch.core.AbstractRefCounted; +import org.elasticsearch.core.RefCounted; +import org.elasticsearch.core.Releasables; import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.query.SearchExecutionContext; import org.elasticsearch.index.shard.ShardId; @@ -125,7 +128,12 @@ public EnrichLookupService( this.executor = transportService.getThreadPool().executor(EsqlPlugin.ESQL_THREAD_POOL_NAME); this.bigArrays = bigArrays; this.blockFactory = blockFactory; - transportService.registerRequestHandler(LOOKUP_ACTION_NAME, this.executor, LookupRequest::new, new TransportHandler()); + transportService.registerRequestHandler( + LOOKUP_ACTION_NAME, + this.executor, + in -> new LookupRequest(in, blockFactory), + new TransportHandler() + ); } public void lookupAsync( @@ -164,7 +172,11 @@ public void lookupAsync( lookupRequest, parentTask, TransportRequestOptions.EMPTY, - new ActionListenerResponseHandler<>(listener.map(r -> r.page), LookupResponse::new, executor) + new ActionListenerResponseHandler<>( + listener.map(LookupResponse::takePage), + in -> new LookupResponse(in, blockFactory), + executor + ) ); } }, listener::onFailure)); @@ -226,11 +238,11 @@ private void doLookup( ActionListener listener ) { Block inputBlock = inputPage.getBlock(0); - if (inputBlock.areAllValuesNull()) { - listener.onResponse(createNullResponse(inputPage.getPositionCount(), extractFields)); - return; - } try { + if (inputBlock.areAllValuesNull()) { + listener.onResponse(createNullResponse(inputPage.getPositionCount(), extractFields)); + return; + } ShardSearchRequest shardSearchRequest = new ShardSearchRequest(shardId, 0, AliasFilter.EMPTY); SearchContext searchContext = searchService.createSearchContext(shardSearchRequest, SearchService.NO_TIMEOUT); listener = ActionListener.runBefore(listener, searchContext::close); @@ -255,9 +267,7 @@ private void doLookup( extractField instanceof Alias a ? ((NamedExpression) a.child()).name() : extractField.name(), EsqlDataTypes.isUnsupported(extractField.dataType()) ); - intermediateOperators.add( - new ValuesSourceReaderOperator(BlockFactory.getNonBreakingInstance(), sources, 0, extractField.name()) - ); + intermediateOperators.add(new ValuesSourceReaderOperator(blockFactory, sources, 0, extractField.name())); } // drop docs block intermediateOperators.add(droppingBlockOperator(extractFields.size() + 2, 0)); @@ -297,12 +307,18 @@ private void doLookup( } } - private static Page createNullResponse(int positionCount, List extractFields) { + private Page createNullResponse(int positionCount, List extractFields) { final Block[] blocks = new Block[extractFields.size()]; - for (int i = 0; i < extractFields.size(); i++) { - blocks[i] = Block.constantNullBlock(positionCount); + try { + for (int i = 0; i < extractFields.size(); i++) { + blocks[i] = blockFactory.newConstantNullBlock(positionCount); + } + return new Page(blocks); + } finally { + if (blocks[blocks.length - 1] == null) { + Releasables.close(blocks); + } } - return new Page(blocks); } private static Operator droppingBlockOperator(int totalBlocks, int droppingPosition) { @@ -340,6 +356,9 @@ private static class LookupRequest extends TransportRequest implements IndicesRe private final String matchField; private final Page inputPage; private final List extractFields; + // TODO: Remove this workaround once we have Block RefCount + private final Page toRelease; + private final RefCounted refs = AbstractRefCounted.of(this::releasePage); LookupRequest( String sessionId, @@ -354,17 +373,18 @@ private static class LookupRequest extends TransportRequest implements IndicesRe this.matchType = matchType; this.matchField = matchField; this.inputPage = inputPage; + this.toRelease = null; this.extractFields = extractFields; } - LookupRequest(StreamInput in) throws IOException { + LookupRequest(StreamInput in, BlockFactory blockFactory) throws IOException { super(in); this.sessionId = in.readString(); this.shardId = new ShardId(in); this.matchType = in.readString(); this.matchField = in.readString(); - // TODO real BlockFactory - this.inputPage = new Page(new BlockStreamInput(in, BlockFactory.getNonBreakingInstance())); + this.inputPage = new Page(new BlockStreamInput(in, blockFactory)); + this.toRelease = inputPage; PlanStreamInput planIn = new PlanStreamInput(in, PlanNameRegistry.INSTANCE, in.namedWriteableRegistry(), null); this.extractFields = planIn.readCollectionAsList(readerFromPlanReader(PlanStreamInput::readNamedExpression)); } @@ -400,6 +420,32 @@ public String getDescription() { } }; } + + private void releasePage() { + if (toRelease != null) { + Releasables.closeExpectNoException(toRelease::releaseBlocks); + } + } + + @Override + public void incRef() { + refs.incRef(); + } + + @Override + public boolean tryIncRef() { + return refs.tryIncRef(); + } + + @Override + public boolean decRef() { + return refs.decRef(); + } + + @Override + public boolean hasReferences() { + return refs.hasReferences(); + } } private static String lookupDescription( @@ -427,20 +473,52 @@ private static String lookupDescription( } private static class LookupResponse extends TransportResponse { - private final Page page; + private Page page; + private final RefCounted refs = AbstractRefCounted.of(this::releasePage); LookupResponse(Page page) { this.page = page; } - LookupResponse(StreamInput in) throws IOException { - // TODO real BlockFactory - this.page = new Page(new BlockStreamInput(in, BlockFactory.getNonBreakingInstance())); + LookupResponse(StreamInput in, BlockFactory blockFactory) throws IOException { + this.page = new Page(new BlockStreamInput(in, blockFactory)); } @Override public void writeTo(StreamOutput out) throws IOException { page.writeTo(out); } + + Page takePage() { + var p = page; + page = null; + return p; + } + + private void releasePage() { + if (page != null) { + Releasables.closeExpectNoException(page::releaseBlocks); + } + } + + @Override + public void incRef() { + refs.incRef(); + } + + @Override + public boolean tryIncRef() { + return refs.tryIncRef(); + } + + @Override + public boolean decRef() { + return refs.decRef(); + } + + @Override + public boolean hasReferences() { + return refs.hasReferences(); + } } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/MergePositionsOperator.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/MergePositionsOperator.java index e9a9512fe23c9..d1eb9e8d28d78 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/MergePositionsOperator.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/MergePositionsOperator.java @@ -81,23 +81,27 @@ public boolean needsInput() { @Override public void addInput(Page page) { - final IntBlock positions = page.getBlock(positionChannel); - final int currentPosition = positions.getInt(0); - if (singleMode) { - fillNullUpToPosition(currentPosition); - for (int i = 0; i < mergingChannels.length; i++) { - int channel = mergingChannels[i]; - outputBuilders[i].appendAllValuesToCurrentPosition(page.getBlock(channel)); - } - filledPositions++; - } else { - if (positionBuilder != null && positionBuilder.position != currentPosition) { - flushPositionBuilder(); - } - if (positionBuilder == null) { - positionBuilder = new PositionBuilder(currentPosition, mergingTypes); + try { + final IntBlock positions = page.getBlock(positionChannel); + final int currentPosition = positions.getInt(0); + if (singleMode) { + fillNullUpToPosition(currentPosition); + for (int i = 0; i < mergingChannels.length; i++) { + int channel = mergingChannels[i]; + outputBuilders[i].appendAllValuesToCurrentPosition(page.getBlock(channel)); + } + filledPositions++; + } else { + if (positionBuilder != null && positionBuilder.position != currentPosition) { + flushPositionBuilder(); + } + if (positionBuilder == null) { + positionBuilder = new PositionBuilder(currentPosition, mergingTypes); + } + positionBuilder.combine(page, mergingChannels); } - positionBuilder.combine(page, mergingChannels); + } finally { + Releasables.closeExpectNoException(page::releaseBlocks); } }