Skip to content

Commit

Permalink
fixing ent-search
Browse files Browse the repository at this point in the history
  • Loading branch information
masseyke committed Jan 2, 2024
1 parent 11fbb12 commit 097312f
Show file tree
Hide file tree
Showing 6 changed files with 160 additions and 213 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,11 @@ public void emitEvent(
try {
AnalyticsEvent event = eventFactory.fromRequest(request);
IndexRequest eventIndexRequest = createIndexRequest(event);

bulkProcessor.add(eventIndexRequest);
try {
bulkProcessor.add(eventIndexRequest);
} finally {
eventIndexRequest.decRef();
}

if (dropEvent.compareAndSet(true, false)) {
logger.warn("Bulk processor has been flushed. Accepting new events again.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.DelegatingActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
Expand All @@ -30,6 +31,7 @@
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xpack.application.connector.action.PostConnectorAction;
import org.elasticsearch.xpack.application.connector.action.UpdateConnectorConfigurationAction;
Expand Down Expand Up @@ -74,13 +76,15 @@ public ConnectorIndexService(Client client) {
* @param listener The action listener to invoke on response/failure.
*/
public void putConnector(String docId, Connector connector, ActionListener<DocWriteResponse> listener) {
final IndexRequest indexRequest = new IndexRequest(CONNECTOR_INDEX_NAME);
try {
final IndexRequest indexRequest = new IndexRequest(CONNECTOR_INDEX_NAME).opType(DocWriteRequest.OpType.INDEX)
indexRequest.opType(DocWriteRequest.OpType.INDEX)
.id(docId)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(connector.toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS));
clientWithOrigin.index(indexRequest, listener);
clientWithOrigin.index(indexRequest, ActionListener.runAfter(listener, indexRequest::decRef));
} catch (Exception e) {
indexRequest.decRef();
listener.onFailure(e);
}
}
Expand All @@ -93,16 +97,23 @@ public void putConnector(String docId, Connector connector, ActionListener<DocWr
* @param listener The action listener to invoke on response/failure.
*/
public void postConnector(Connector connector, ActionListener<PostConnectorAction.Response> listener) {
final IndexRequest indexRequest = new IndexRequest(CONNECTOR_INDEX_NAME);
try {
final IndexRequest indexRequest = new IndexRequest(CONNECTOR_INDEX_NAME).opType(DocWriteRequest.OpType.INDEX)
indexRequest.opType(DocWriteRequest.OpType.INDEX)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(connector.toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS));

clientWithOrigin.index(
indexRequest,
listener.delegateFailureAndWrap((l, indexResponse) -> l.onResponse(new PostConnectorAction.Response(indexResponse.getId())))
ActionListener.runAfter(
listener.delegateFailureAndWrap(
(l, indexResponse) -> l.onResponse(new PostConnectorAction.Response(indexResponse.getId()))
),
indexRequest::decRef
)
);
} catch (Exception e) {
indexRequest.decRef();
listener.onFailure(e);
}
}
Expand Down Expand Up @@ -214,8 +225,9 @@ public void onFailure(Exception e) {
public void updateConnectorConfiguration(UpdateConnectorConfigurationAction.Request request, ActionListener<UpdateResponse> listener) {
try {
String connectorId = request.getConnectorId();
final UpdateRequest updateRequest = new UpdateRequest(CONNECTOR_INDEX_NAME, connectorId).doc(
new IndexRequest(CONNECTOR_INDEX_NAME).opType(DocWriteRequest.OpType.INDEX)
IndexRequest indexRequest = new IndexRequest(CONNECTOR_INDEX_NAME);
try {
indexRequest.opType(DocWriteRequest.OpType.INDEX)
.id(connectorId)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(
Expand All @@ -225,18 +237,21 @@ public void updateConnectorConfiguration(UpdateConnectorConfigurationAction.Requ
Connector.STATUS_FIELD.getPreferredName(),
ConnectorStatus.CONFIGURED.toString()
)
)
);
clientWithOrigin.update(
updateRequest,
new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, updateResponse) -> {
if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) {
l.onFailure(new ResourceNotFoundException(connectorId));
return;
}
l.onResponse(updateResponse);
})
);
);
final UpdateRequest updateRequest = new UpdateRequest(CONNECTOR_INDEX_NAME, connectorId).doc(indexRequest);
clientWithOrigin.update(
updateRequest,
ActionListener.runAfter(new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, updateResponse) -> {
if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) {
l.onFailure(new ResourceNotFoundException(connectorId));
return;
}
l.onResponse(updateResponse);
}), updateRequest::decRef)
);
} finally {
indexRequest.decRef();
}
} catch (Exception e) {
listener.onFailure(e);
}
Expand All @@ -249,27 +264,7 @@ public void updateConnectorConfiguration(UpdateConnectorConfigurationAction.Requ
* @param listener The listener for handling responses, including successful updates or errors.
*/
public void updateConnectorError(UpdateConnectorErrorAction.Request request, ActionListener<UpdateResponse> listener) {
try {
String connectorId = request.getConnectorId();
final UpdateRequest updateRequest = new UpdateRequest(CONNECTOR_INDEX_NAME, connectorId).doc(
new IndexRequest(CONNECTOR_INDEX_NAME).opType(DocWriteRequest.OpType.INDEX)
.id(connectorId)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(request.toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS))
);
clientWithOrigin.update(
updateRequest,
new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, updateResponse) -> {
if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) {
l.onFailure(new ResourceNotFoundException(connectorId));
return;
}
l.onResponse(updateResponse);
})
);
} catch (Exception e) {
listener.onFailure(e);
}
updateConfiguration(request.getConnectorId(), request, listener);
}

/**
Expand All @@ -279,27 +274,7 @@ public void updateConnectorError(UpdateConnectorErrorAction.Request request, Act
* @param listener The listener for handling responses, including successful updates or errors.
*/
public void updateConnectorNameOrDescription(UpdateConnectorNameAction.Request request, ActionListener<UpdateResponse> listener) {
try {
String connectorId = request.getConnectorId();
final UpdateRequest updateRequest = new UpdateRequest(CONNECTOR_INDEX_NAME, connectorId).doc(
new IndexRequest(CONNECTOR_INDEX_NAME).opType(DocWriteRequest.OpType.INDEX)
.id(connectorId)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(request.toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS))
);
clientWithOrigin.update(
updateRequest,
new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, updateResponse) -> {
if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) {
l.onFailure(new ResourceNotFoundException(connectorId));
return;
}
l.onResponse(updateResponse);
})
);
} catch (Exception e) {
listener.onFailure(e);
}
updateConfiguration(request.getConnectorId(), request, listener);
}

/**
Expand All @@ -309,27 +284,7 @@ public void updateConnectorNameOrDescription(UpdateConnectorNameAction.Request r
* @param listener Listener to respond to a successful response or an error.
*/
public void updateConnectorFiltering(UpdateConnectorFilteringAction.Request request, ActionListener<UpdateResponse> listener) {
try {
String connectorId = request.getConnectorId();
final UpdateRequest updateRequest = new UpdateRequest(CONNECTOR_INDEX_NAME, connectorId).doc(
new IndexRequest(CONNECTOR_INDEX_NAME).opType(DocWriteRequest.OpType.INDEX)
.id(connectorId)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(request.toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS))
);
clientWithOrigin.update(
updateRequest,
new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, updateResponse) -> {
if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) {
l.onFailure(new ResourceNotFoundException(connectorId));
return;
}
l.onResponse(updateResponse);
})
);
} catch (Exception e) {
listener.onFailure(e);
}
updateConfiguration(request.getConnectorId(), request, listener);
}

/**
Expand All @@ -339,27 +294,7 @@ public void updateConnectorFiltering(UpdateConnectorFilteringAction.Request requ
* @param listener The listener for handling responses, including successful updates or errors.
*/
public void updateConnectorLastSeen(UpdateConnectorLastSeenAction.Request request, ActionListener<UpdateResponse> listener) {
try {
String connectorId = request.getConnectorId();
final UpdateRequest updateRequest = new UpdateRequest(CONNECTOR_INDEX_NAME, connectorId).doc(
new IndexRequest(CONNECTOR_INDEX_NAME).opType(DocWriteRequest.OpType.INDEX)
.id(connectorId)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(request.toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS))
);
clientWithOrigin.update(
updateRequest,
new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, updateResponse) -> {
if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) {
l.onFailure(new ResourceNotFoundException(connectorId));
return;
}
l.onResponse(updateResponse);
})
);
} catch (Exception e) {
listener.onFailure(e);
}
updateConfiguration(request.getConnectorId(), request, listener);
}

/**
Expand All @@ -369,27 +304,7 @@ public void updateConnectorLastSeen(UpdateConnectorLastSeenAction.Request reques
* @param listener Listener to respond to a successful response or an error.
*/
public void updateConnectorLastSyncStats(UpdateConnectorLastSyncStatsAction.Request request, ActionListener<UpdateResponse> listener) {
try {
String connectorId = request.getConnectorId();
final UpdateRequest updateRequest = new UpdateRequest(CONNECTOR_INDEX_NAME, connectorId).doc(
new IndexRequest(CONNECTOR_INDEX_NAME).opType(DocWriteRequest.OpType.INDEX)
.id(connectorId)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(request.toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS))
);
clientWithOrigin.update(
updateRequest,
new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, updateResponse) -> {
if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) {
l.onFailure(new ResourceNotFoundException(connectorId));
return;
}
l.onResponse(updateResponse);
})
);
} catch (Exception e) {
listener.onFailure(e);
}
updateConfiguration(request.getConnectorId(), request, listener);
}

/**
Expand All @@ -399,27 +314,7 @@ public void updateConnectorLastSyncStats(UpdateConnectorLastSyncStatsAction.Requ
* @param listener Listener to respond to a successful response or an error.
*/
public void updateConnectorPipeline(UpdateConnectorPipelineAction.Request request, ActionListener<UpdateResponse> listener) {
try {
String connectorId = request.getConnectorId();
final UpdateRequest updateRequest = new UpdateRequest(CONNECTOR_INDEX_NAME, connectorId).doc(
new IndexRequest(CONNECTOR_INDEX_NAME).opType(DocWriteRequest.OpType.INDEX)
.id(connectorId)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(request.toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS))
);
clientWithOrigin.update(
updateRequest,
new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, updateResponse) -> {
if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) {
l.onFailure(new ResourceNotFoundException(connectorId));
return;
}
l.onResponse(updateResponse);
})
);
} catch (Exception e) {
listener.onFailure(e);
}
updateConfiguration(request.getConnectorId(), request, listener);
}

/**
Expand All @@ -429,24 +324,38 @@ public void updateConnectorPipeline(UpdateConnectorPipelineAction.Request reques
* @param listener The listener for handling responses, including successful updates or errors.
*/
public void updateConnectorScheduling(UpdateConnectorSchedulingAction.Request request, ActionListener<UpdateResponse> listener) {
updateConfiguration(request.getConnectorId(), request, listener);
}

/**
* Updates the {@link ConnectorIngestPipeline} property of a {@link Connector}.
*
* @param connectorId Used as the document id
* @param request Request for updating connector ingest pipeline property.
* @param listener Listener to respond to a successful response or an error.
*/
private void updateConfiguration(String connectorId, ActionRequest request, ActionListener<UpdateResponse> listener) {
try {
String connectorId = request.getConnectorId();
final UpdateRequest updateRequest = new UpdateRequest(CONNECTOR_INDEX_NAME, connectorId).doc(
new IndexRequest(CONNECTOR_INDEX_NAME).opType(DocWriteRequest.OpType.INDEX)
IndexRequest indexRequest = new IndexRequest(CONNECTOR_INDEX_NAME);
try {
indexRequest.opType(DocWriteRequest.OpType.INDEX)
.id(connectorId)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(request.toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS))
);
clientWithOrigin.update(
updateRequest,
new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, updateResponse) -> {
if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) {
l.onFailure(new ResourceNotFoundException(connectorId));
return;
}
l.onResponse(updateResponse);
})
);
.source(((ToXContentObject) request).toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS));
final UpdateRequest updateRequest = new UpdateRequest(CONNECTOR_INDEX_NAME, connectorId).doc(indexRequest);
clientWithOrigin.update(
updateRequest,
ActionListener.runAfter(new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, updateResponse) -> {
if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) {
l.onFailure(new ResourceNotFoundException(connectorId));
return;
}
l.onResponse(updateResponse);
}), updateRequest::decRef)
);
} finally {
indexRequest.decRef();
}
} catch (Exception e) {
listener.onFailure(e);
}
Expand Down
Loading

0 comments on commit 097312f

Please sign in to comment.