Skip to content

Commit

Permalink
using delete by query instead of bulk delete
Browse files Browse the repository at this point in the history
Signed-off-by: Hailong Cui <[email protected]>
  • Loading branch information
Hailong-am committed Dec 4, 2023
1 parent bc35a62 commit fc3227e
Showing 1 changed file with 33 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@

import org.opensearch.OpenSearchSecurityException;
import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.update.UpdateRequest;
Expand All @@ -28,8 +25,12 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.ExistsQueryBuilder;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.query.TermQueryBuilder;
import org.opensearch.index.reindex.BulkByScrollResponse;
import org.opensearch.index.reindex.DeleteByQueryAction;
import org.opensearch.index.reindex.DeleteByQueryRequest;
import org.opensearch.ml.common.conversation.ActionConstants;
import org.opensearch.ml.common.conversation.ConversationalIndexConstants;
import org.opensearch.ml.common.conversation.Interaction;
Expand Down Expand Up @@ -221,22 +222,10 @@ public void getTraces(String parentInteractionId, ActionListener<List<Interactio
@VisibleForTesting
void innerGetTraces(String parentInteractionId, ActionListener<List<Interaction>> listener) {
SearchRequest searchRequest = Requests.searchRequest(indexName);

// Build the query
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();

// Add the ExistsQueryBuilder for checking null values
ExistsQueryBuilder existsQueryBuilder = QueryBuilders.existsQuery(ConversationalIndexConstants.INTERACTIONS_TRACE_NUMBER_FIELD);
boolQueryBuilder.must(existsQueryBuilder);

// Add the TermQueryBuilder for another field
TermQueryBuilder termQueryBuilder = QueryBuilders
.termQuery(ConversationalIndexConstants.PARENT_INTERACTIONS_ID_FIELD, parentInteractionId);
boolQueryBuilder.must(termQueryBuilder);

QueryBuilder traceQueryBuilder = buildTraceQueryBuilder(parentInteractionId);
// Set the query to the search source
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(boolQueryBuilder);
searchSourceBuilder.query(traceQueryBuilder);
searchRequest.source(searchSourceBuilder);

searchRequest.source().sort(ConversationalIndexConstants.INTERACTIONS_TRACE_NUMBER_FIELD, SortOrder.ASC);
Expand All @@ -260,6 +249,21 @@ void innerGetTraces(String parentInteractionId, ActionListener<List<Interaction>
}
}

private QueryBuilder buildTraceQueryBuilder(String parentInteractionId) {
// Build the query
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();

// Add the ExistsQueryBuilder for checking null values
ExistsQueryBuilder existsQueryBuilder = QueryBuilders.existsQuery(ConversationalIndexConstants.INTERACTIONS_TRACE_NUMBER_FIELD);
boolQueryBuilder.must(existsQueryBuilder);

// Add the TermQueryBuilder for another field
TermQueryBuilder termQueryBuilder = QueryBuilders
.termQuery(ConversationalIndexConstants.PARENT_INTERACTIONS_ID_FIELD, parentInteractionId);
boolQueryBuilder.must(termQueryBuilder);
return boolQueryBuilder;
}

/**
* Get the interactions associate with this conversation, sorted by recency
* @param interactionId the parent interaction id whose traces to get
Expand Down Expand Up @@ -296,24 +300,23 @@ public void updateInteraction(String interactionId, Map<String, Object> updateCo
* @param listener callback for delete result
*/
public void deleteInteraction(String interactionId, ActionListener<Boolean> listener) {
BulkRequest bulkRequest = new BulkRequest(indexName);
bulkRequest.add(new DeleteRequest(indexName, interactionId));
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
// interaction itself
boolQueryBuilder.should(QueryBuilders.idsQuery().addIds(interactionId));
// interaction trace
boolQueryBuilder.should(buildTraceQueryBuilder(interactionId));

innerGetTraces(interactionId, ActionListener.wrap(traces -> {
traces.forEach(trace -> bulkRequest.add(new DeleteRequest(indexName, trace.getId())));
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(indexName);
deleteByQueryRequest.setQuery(boolQueryBuilder);

innerDeleteInteraction(bulkRequest, interactionId, listener);
}, e -> {
// delete interaction only if we can't get trace
innerDeleteInteraction(bulkRequest, interactionId, listener);
}));
innerDeleteInteraction(deleteByQueryRequest, interactionId, listener);
}

@VisibleForTesting
void innerDeleteInteraction(BulkRequest bulkRequest, String interactionId, ActionListener<Boolean> listener) {
void innerDeleteInteraction(DeleteByQueryRequest deleteByQueryRequest, String interactionId, ActionListener<Boolean> listener) {
try (ThreadContext.StoredContext ignored = client.threadPool().getThreadContext().stashContext()) {
ActionListener<BulkResponse> al = ActionListener.wrap(bulkResponse -> {
if (bulkResponse != null && bulkResponse.hasFailures()) {
ActionListener<BulkByScrollResponse> al = ActionListener.wrap(bulkResponse -> {
if (bulkResponse != null && (!bulkResponse.getBulkFailures().isEmpty() || !bulkResponse.getSearchFailures().isEmpty())) {
log.info("Failed to delete the interaction with ID: {}", interactionId);
listener.onResponse(false);
return;
Expand All @@ -325,7 +328,7 @@ void innerDeleteInteraction(BulkRequest bulkRequest, String interactionId, Actio
listener.onFailure(exception);
});
// bulk delete interaction and its trace
client.bulk(bulkRequest, al);
client.execute(DeleteByQueryAction.INSTANCE, deleteByQueryRequest, al);
} catch (Exception e) {
log.error("Failed to delete interaction with ID {}. Details {}:", interactionId, e);
listener.onFailure(e);
Expand Down

0 comments on commit fc3227e

Please sign in to comment.