From 097312f851d2c07b26bf54bc109d86bf421bbd5f Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Tue, 2 Jan 2024 16:48:23 -0600 Subject: [PATCH] fixing ent-search --- .../ingest/AnalyticsEventEmitter.java | 7 +- .../connector/ConnectorIndexService.java | 227 ++++++------------ .../syncjob/ConnectorSyncJobIndexService.java | 105 +++++--- .../rules/QueryRulesIndexService.java | 6 +- .../search/SearchApplicationIndexService.java | 18 +- .../ConnectorSyncJobIndexServiceTests.java | 10 +- 6 files changed, 160 insertions(+), 213 deletions(-) diff --git a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/analytics/ingest/AnalyticsEventEmitter.java b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/analytics/ingest/AnalyticsEventEmitter.java index aa22dfbe2e3cb..7460c8cc1477a 100644 --- a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/analytics/ingest/AnalyticsEventEmitter.java +++ b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/analytics/ingest/AnalyticsEventEmitter.java @@ -78,8 +78,11 @@ public void emitEvent( try { AnalyticsEvent event = eventFactory.fromRequest(request); IndexRequest eventIndexRequest = createIndexRequest(event); - - bulkProcessor.add(eventIndexRequest); + try { + bulkProcessor.add(eventIndexRequest); + } finally { + eventIndexRequest.decRef(); + } if (dropEvent.compareAndSet(true, false)) { logger.warn("Bulk processor has been flushed. Accepting new events again."); diff --git a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/ConnectorIndexService.java b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/ConnectorIndexService.java index 8a1b336bfa1e3..b5a50cb1f8a07 100644 --- a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/ConnectorIndexService.java +++ b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/ConnectorIndexService.java @@ -10,6 +10,7 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.DelegatingActionListener; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteResponse; @@ -30,6 +31,7 @@ import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.xcontent.ToXContent; +import org.elasticsearch.xcontent.ToXContentObject; import org.elasticsearch.xcontent.XContentType; import org.elasticsearch.xpack.application.connector.action.PostConnectorAction; import org.elasticsearch.xpack.application.connector.action.UpdateConnectorConfigurationAction; @@ -74,13 +76,15 @@ public ConnectorIndexService(Client client) { * @param listener The action listener to invoke on response/failure. */ public void putConnector(String docId, Connector connector, ActionListener listener) { + final IndexRequest indexRequest = new IndexRequest(CONNECTOR_INDEX_NAME); try { - final IndexRequest indexRequest = new IndexRequest(CONNECTOR_INDEX_NAME).opType(DocWriteRequest.OpType.INDEX) + indexRequest.opType(DocWriteRequest.OpType.INDEX) .id(docId) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) .source(connector.toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS)); - clientWithOrigin.index(indexRequest, listener); + clientWithOrigin.index(indexRequest, ActionListener.runAfter(listener, indexRequest::decRef)); } catch (Exception e) { + indexRequest.decRef(); listener.onFailure(e); } } @@ -93,16 +97,23 @@ public void putConnector(String docId, Connector connector, ActionListener listener) { + final IndexRequest indexRequest = new IndexRequest(CONNECTOR_INDEX_NAME); try { - final IndexRequest indexRequest = new IndexRequest(CONNECTOR_INDEX_NAME).opType(DocWriteRequest.OpType.INDEX) + indexRequest.opType(DocWriteRequest.OpType.INDEX) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) .source(connector.toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS)); clientWithOrigin.index( indexRequest, - listener.delegateFailureAndWrap((l, indexResponse) -> l.onResponse(new PostConnectorAction.Response(indexResponse.getId()))) + ActionListener.runAfter( + listener.delegateFailureAndWrap( + (l, indexResponse) -> l.onResponse(new PostConnectorAction.Response(indexResponse.getId())) + ), + indexRequest::decRef + ) ); } catch (Exception e) { + indexRequest.decRef(); listener.onFailure(e); } } @@ -214,8 +225,9 @@ public void onFailure(Exception e) { public void updateConnectorConfiguration(UpdateConnectorConfigurationAction.Request request, ActionListener listener) { try { String connectorId = request.getConnectorId(); - final UpdateRequest updateRequest = new UpdateRequest(CONNECTOR_INDEX_NAME, connectorId).doc( - new IndexRequest(CONNECTOR_INDEX_NAME).opType(DocWriteRequest.OpType.INDEX) + IndexRequest indexRequest = new IndexRequest(CONNECTOR_INDEX_NAME); + try { + indexRequest.opType(DocWriteRequest.OpType.INDEX) .id(connectorId) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) .source( @@ -225,18 +237,21 @@ public void updateConnectorConfiguration(UpdateConnectorConfigurationAction.Requ Connector.STATUS_FIELD.getPreferredName(), ConnectorStatus.CONFIGURED.toString() ) - ) - ); - clientWithOrigin.update( - updateRequest, - new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, updateResponse) -> { - if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) { - l.onFailure(new ResourceNotFoundException(connectorId)); - return; - } - l.onResponse(updateResponse); - }) - ); + ); + final UpdateRequest updateRequest = new UpdateRequest(CONNECTOR_INDEX_NAME, connectorId).doc(indexRequest); + clientWithOrigin.update( + updateRequest, + ActionListener.runAfter(new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, updateResponse) -> { + if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) { + l.onFailure(new ResourceNotFoundException(connectorId)); + return; + } + l.onResponse(updateResponse); + }), updateRequest::decRef) + ); + } finally { + indexRequest.decRef(); + } } catch (Exception e) { listener.onFailure(e); } @@ -249,27 +264,7 @@ public void updateConnectorConfiguration(UpdateConnectorConfigurationAction.Requ * @param listener The listener for handling responses, including successful updates or errors. */ public void updateConnectorError(UpdateConnectorErrorAction.Request request, ActionListener listener) { - try { - String connectorId = request.getConnectorId(); - final UpdateRequest updateRequest = new UpdateRequest(CONNECTOR_INDEX_NAME, connectorId).doc( - new IndexRequest(CONNECTOR_INDEX_NAME).opType(DocWriteRequest.OpType.INDEX) - .id(connectorId) - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) - .source(request.toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS)) - ); - clientWithOrigin.update( - updateRequest, - new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, updateResponse) -> { - if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) { - l.onFailure(new ResourceNotFoundException(connectorId)); - return; - } - l.onResponse(updateResponse); - }) - ); - } catch (Exception e) { - listener.onFailure(e); - } + updateConfiguration(request.getConnectorId(), request, listener); } /** @@ -279,27 +274,7 @@ public void updateConnectorError(UpdateConnectorErrorAction.Request request, Act * @param listener The listener for handling responses, including successful updates or errors. */ public void updateConnectorNameOrDescription(UpdateConnectorNameAction.Request request, ActionListener listener) { - try { - String connectorId = request.getConnectorId(); - final UpdateRequest updateRequest = new UpdateRequest(CONNECTOR_INDEX_NAME, connectorId).doc( - new IndexRequest(CONNECTOR_INDEX_NAME).opType(DocWriteRequest.OpType.INDEX) - .id(connectorId) - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) - .source(request.toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS)) - ); - clientWithOrigin.update( - updateRequest, - new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, updateResponse) -> { - if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) { - l.onFailure(new ResourceNotFoundException(connectorId)); - return; - } - l.onResponse(updateResponse); - }) - ); - } catch (Exception e) { - listener.onFailure(e); - } + updateConfiguration(request.getConnectorId(), request, listener); } /** @@ -309,27 +284,7 @@ public void updateConnectorNameOrDescription(UpdateConnectorNameAction.Request r * @param listener Listener to respond to a successful response or an error. */ public void updateConnectorFiltering(UpdateConnectorFilteringAction.Request request, ActionListener listener) { - try { - String connectorId = request.getConnectorId(); - final UpdateRequest updateRequest = new UpdateRequest(CONNECTOR_INDEX_NAME, connectorId).doc( - new IndexRequest(CONNECTOR_INDEX_NAME).opType(DocWriteRequest.OpType.INDEX) - .id(connectorId) - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) - .source(request.toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS)) - ); - clientWithOrigin.update( - updateRequest, - new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, updateResponse) -> { - if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) { - l.onFailure(new ResourceNotFoundException(connectorId)); - return; - } - l.onResponse(updateResponse); - }) - ); - } catch (Exception e) { - listener.onFailure(e); - } + updateConfiguration(request.getConnectorId(), request, listener); } /** @@ -339,27 +294,7 @@ public void updateConnectorFiltering(UpdateConnectorFilteringAction.Request requ * @param listener The listener for handling responses, including successful updates or errors. */ public void updateConnectorLastSeen(UpdateConnectorLastSeenAction.Request request, ActionListener listener) { - try { - String connectorId = request.getConnectorId(); - final UpdateRequest updateRequest = new UpdateRequest(CONNECTOR_INDEX_NAME, connectorId).doc( - new IndexRequest(CONNECTOR_INDEX_NAME).opType(DocWriteRequest.OpType.INDEX) - .id(connectorId) - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) - .source(request.toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS)) - ); - clientWithOrigin.update( - updateRequest, - new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, updateResponse) -> { - if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) { - l.onFailure(new ResourceNotFoundException(connectorId)); - return; - } - l.onResponse(updateResponse); - }) - ); - } catch (Exception e) { - listener.onFailure(e); - } + updateConfiguration(request.getConnectorId(), request, listener); } /** @@ -369,27 +304,7 @@ public void updateConnectorLastSeen(UpdateConnectorLastSeenAction.Request reques * @param listener Listener to respond to a successful response or an error. */ public void updateConnectorLastSyncStats(UpdateConnectorLastSyncStatsAction.Request request, ActionListener listener) { - try { - String connectorId = request.getConnectorId(); - final UpdateRequest updateRequest = new UpdateRequest(CONNECTOR_INDEX_NAME, connectorId).doc( - new IndexRequest(CONNECTOR_INDEX_NAME).opType(DocWriteRequest.OpType.INDEX) - .id(connectorId) - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) - .source(request.toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS)) - ); - clientWithOrigin.update( - updateRequest, - new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, updateResponse) -> { - if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) { - l.onFailure(new ResourceNotFoundException(connectorId)); - return; - } - l.onResponse(updateResponse); - }) - ); - } catch (Exception e) { - listener.onFailure(e); - } + updateConfiguration(request.getConnectorId(), request, listener); } /** @@ -399,27 +314,7 @@ public void updateConnectorLastSyncStats(UpdateConnectorLastSyncStatsAction.Requ * @param listener Listener to respond to a successful response or an error. */ public void updateConnectorPipeline(UpdateConnectorPipelineAction.Request request, ActionListener listener) { - try { - String connectorId = request.getConnectorId(); - final UpdateRequest updateRequest = new UpdateRequest(CONNECTOR_INDEX_NAME, connectorId).doc( - new IndexRequest(CONNECTOR_INDEX_NAME).opType(DocWriteRequest.OpType.INDEX) - .id(connectorId) - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) - .source(request.toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS)) - ); - clientWithOrigin.update( - updateRequest, - new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, updateResponse) -> { - if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) { - l.onFailure(new ResourceNotFoundException(connectorId)); - return; - } - l.onResponse(updateResponse); - }) - ); - } catch (Exception e) { - listener.onFailure(e); - } + updateConfiguration(request.getConnectorId(), request, listener); } /** @@ -429,24 +324,38 @@ public void updateConnectorPipeline(UpdateConnectorPipelineAction.Request reques * @param listener The listener for handling responses, including successful updates or errors. */ public void updateConnectorScheduling(UpdateConnectorSchedulingAction.Request request, ActionListener listener) { + updateConfiguration(request.getConnectorId(), request, listener); + } + + /** + * Updates the {@link ConnectorIngestPipeline} property of a {@link Connector}. + * + * @param connectorId Used as the document id + * @param request Request for updating connector ingest pipeline property. + * @param listener Listener to respond to a successful response or an error. + */ + private void updateConfiguration(String connectorId, ActionRequest request, ActionListener listener) { try { - String connectorId = request.getConnectorId(); - final UpdateRequest updateRequest = new UpdateRequest(CONNECTOR_INDEX_NAME, connectorId).doc( - new IndexRequest(CONNECTOR_INDEX_NAME).opType(DocWriteRequest.OpType.INDEX) + IndexRequest indexRequest = new IndexRequest(CONNECTOR_INDEX_NAME); + try { + indexRequest.opType(DocWriteRequest.OpType.INDEX) .id(connectorId) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) - .source(request.toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS)) - ); - clientWithOrigin.update( - updateRequest, - new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, updateResponse) -> { - if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) { - l.onFailure(new ResourceNotFoundException(connectorId)); - return; - } - l.onResponse(updateResponse); - }) - ); + .source(((ToXContentObject) request).toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS)); + final UpdateRequest updateRequest = new UpdateRequest(CONNECTOR_INDEX_NAME, connectorId).doc(indexRequest); + clientWithOrigin.update( + updateRequest, + ActionListener.runAfter(new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, updateResponse) -> { + if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) { + l.onFailure(new ResourceNotFoundException(connectorId)); + return; + } + l.onResponse(updateResponse); + }), updateRequest::decRef) + ); + } finally { + indexRequest.decRef(); + } } catch (Exception e) { listener.onFailure(e); } diff --git a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJobIndexService.java b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJobIndexService.java index a7d20414d4631..aac32dc5b6022 100644 --- a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJobIndexService.java +++ b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJobIndexService.java @@ -99,10 +99,6 @@ public void createConnectorSyncJob( try { String syncJobId = generateId(); - final IndexRequest indexRequest = new IndexRequest(CONNECTOR_SYNC_JOB_INDEX_NAME).id(syncJobId) - .opType(DocWriteRequest.OpType.INDEX) - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - ConnectorSyncJob syncJob = new ConnectorSyncJob.Builder().setId(syncJobId) .setJobType(jobType) .setTriggerMethod(triggerMethod) @@ -116,14 +112,27 @@ public void createConnectorSyncJob( .setDeletedDocumentCount(ZERO) .build(); - indexRequest.source(syncJob.toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS)); - - clientWithOrigin.index( - indexRequest, - l.delegateFailureAndWrap( - (ll, indexResponse) -> ll.onResponse(new PostConnectorSyncJobAction.Response(indexResponse.getId())) - ) - ); + final IndexRequest indexRequest = new IndexRequest(CONNECTOR_SYNC_JOB_INDEX_NAME); + try { + indexRequest.id(syncJobId) + .opType(DocWriteRequest.OpType.INDEX) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + + indexRequest.source(syncJob.toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS)); + + clientWithOrigin.index( + indexRequest, + ActionListener.runAfter( + l.delegateFailureAndWrap( + (ll, indexResponse) -> ll.onResponse(new PostConnectorSyncJobAction.Response(indexResponse.getId())) + ), + indexRequest::decRef + ) + ); + } catch (Exception e) { + indexRequest.decRef(); + throw e; + } } catch (IOException e) { l.onFailure(e); } @@ -176,15 +185,19 @@ public void checkInConnectorSyncJob(String connectorSyncJobId, ActionListener(connectorSyncJobId, listener, (l, updateResponse) -> { - if (updateResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) { - l.onFailure(new ResourceNotFoundException(connectorSyncJobId)); - return; - } - l.onResponse(updateResponse); - }) + ActionListener.runAfter( + new DelegatingIndexNotFoundOrDocumentMissingActionListener<>(connectorSyncJobId, listener, (l, updateResponse) -> { + if (updateResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) { + l.onFailure(new ResourceNotFoundException(connectorSyncJobId)); + return; + } + l.onResponse(updateResponse); + }), + updateRequest::decRef + ) ); } catch (Exception e) { + updateRequest.decRef(); listener.onFailure(e); } } @@ -249,15 +262,19 @@ public void cancelConnectorSyncJob(String connectorSyncJobId, ActionListener(connectorSyncJobId, listener, (l, updateResponse) -> { - if (updateResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) { - l.onFailure(new ResourceNotFoundException(connectorSyncJobId)); - return; - } - l.onResponse(updateResponse); - }) + ActionListener.runAfter( + new DelegatingIndexNotFoundOrDocumentMissingActionListener<>(connectorSyncJobId, listener, (l, updateResponse) -> { + if (updateResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) { + l.onFailure(new ResourceNotFoundException(connectorSyncJobId)); + return; + } + l.onResponse(updateResponse); + }), + updateRequest::decRef + ) ); } catch (Exception e) { + updateRequest.decRef(); listener.onFailure(e); } } @@ -393,15 +410,19 @@ public void updateConnectorSyncJobIngestionStats( try { clientWithOrigin.update( updateRequest, - new DelegatingIndexNotFoundOrDocumentMissingActionListener<>(syncJobId, listener, (l, updateResponse) -> { - if (updateResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) { - l.onFailure(new ResourceNotFoundException(syncJobId)); - return; - } - l.onResponse(updateResponse); - }) + ActionListener.runAfter( + new DelegatingIndexNotFoundOrDocumentMissingActionListener<>(syncJobId, listener, (l, updateResponse) -> { + if (updateResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) { + l.onFailure(new ResourceNotFoundException(syncJobId)); + return; + } + l.onResponse(updateResponse); + }), + updateRequest::decRef + ) ); } catch (Exception e) { + updateRequest.decRef(); listener.onFailure(e); } @@ -481,15 +502,19 @@ public void updateConnectorSyncJobError(String connectorSyncJobId, String error, try { clientWithOrigin.update( updateRequest, - new DelegatingIndexNotFoundOrDocumentMissingActionListener<>(connectorSyncJobId, listener, (l, updateResponse) -> { - if (updateResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) { - l.onFailure(new ResourceNotFoundException(connectorSyncJobId)); - return; - } - l.onResponse(updateResponse); - }) + ActionListener.runAfter( + new DelegatingIndexNotFoundOrDocumentMissingActionListener<>(connectorSyncJobId, listener, (l, updateResponse) -> { + if (updateResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) { + l.onFailure(new ResourceNotFoundException(connectorSyncJobId)); + return; + } + l.onResponse(updateResponse); + }), + updateRequest::decRef + ) ); } catch (Exception e) { + updateRequest.decRef(); listener.onFailure(e); } } diff --git a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/rules/QueryRulesIndexService.java b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/rules/QueryRulesIndexService.java index 4e001d38bf279..62000df551a45 100644 --- a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/rules/QueryRulesIndexService.java +++ b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/rules/QueryRulesIndexService.java @@ -232,15 +232,17 @@ private static List parseCriteria(List> r * @param listener The action listener to invoke on response/failure. */ public void putQueryRuleset(QueryRuleset queryRuleset, ActionListener listener) { + final IndexRequest indexRequest = new IndexRequest(QUERY_RULES_ALIAS_NAME); try { validateQueryRuleset(queryRuleset); - final IndexRequest indexRequest = new IndexRequest(QUERY_RULES_ALIAS_NAME).opType(DocWriteRequest.OpType.INDEX) + indexRequest.opType(DocWriteRequest.OpType.INDEX) .id(queryRuleset.id()) .opType(DocWriteRequest.OpType.INDEX) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) .source(queryRuleset.toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS)); - clientWithOrigin.index(indexRequest, listener); + clientWithOrigin.index(indexRequest, ActionListener.runAfter(listener, indexRequest::decRef)); } catch (Exception e) { + indexRequest.decRef(); listener.onFailure(e); } diff --git a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/search/SearchApplicationIndexService.java b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/search/SearchApplicationIndexService.java index 46e4d45f2c146..b31474dad6d95 100644 --- a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/search/SearchApplicationIndexService.java +++ b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/search/SearchApplicationIndexService.java @@ -291,12 +291,18 @@ private void updateSearchApplication(SearchApplication app, boolean create, Acti .endObject(); } DocWriteRequest.OpType opType = (create ? DocWriteRequest.OpType.CREATE : DocWriteRequest.OpType.INDEX); - final IndexRequest indexRequest = new IndexRequest(SEARCH_APPLICATION_ALIAS_NAME).opType(DocWriteRequest.OpType.INDEX) - .id(app.name()) - .opType(opType) - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) - .source(buffer.bytes(), XContentType.JSON); - clientWithOrigin.index(indexRequest, listener); + final IndexRequest indexRequest = new IndexRequest(SEARCH_APPLICATION_ALIAS_NAME); + try { + indexRequest.opType(DocWriteRequest.OpType.INDEX) + .id(app.name()) + .opType(opType) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .source(buffer.bytes(), XContentType.JSON); + clientWithOrigin.index(indexRequest, ActionListener.runAfter(listener, indexRequest::decRef)); + } catch (Exception e) { + indexRequest.decRef(); + throw e; + } } catch (Exception e) { listener.onFailure(e); } diff --git a/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJobIndexServiceTests.java b/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJobIndexServiceTests.java index f559017db188d..a8960fc591ce1 100644 --- a/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJobIndexServiceTests.java +++ b/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJobIndexServiceTests.java @@ -77,16 +77,18 @@ public void setup() throws Exception { } private void createConnector(Connector connector) throws IOException, InterruptedException, ExecutionException, TimeoutException { - try ( - IndexRequest indexRequest = new IndexRequest(ConnectorIndexService.CONNECTOR_INDEX_NAME).opType(DocWriteRequest.OpType.INDEX) + IndexRequest indexRequest = new IndexRequest(ConnectorIndexService.CONNECTOR_INDEX_NAME); + try { + indexRequest.opType(DocWriteRequest.OpType.INDEX) .id(connector.getConnectorId()) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) - .source(connector.toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS)) - ) { + .source(connector.toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS)); ActionFuture index = client().index(indexRequest); // wait 10 seconds for connector creation index.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + } finally { + indexRequest.decRef(); } }