From 37a9fb8f7af36a06cbdddbbfdb07e14cad18152f Mon Sep 17 00:00:00 2001 From: Kaituo Li Date: Mon, 25 Apr 2022 16:59:15 -0700 Subject: [PATCH] [Backport to 1.3] Check if indices exist in the presence of empty search results (#495) (#522) * Check if indices exist in the presence of empty search results Previously, CompositeRetriever throws an IllegalArgumentException in the presence of empty results, which gets translated to internal failure and increments AD failure count. When the source index is a regex like blah*, we will get an empty response even if the index does not exist. This PR checks indices exist in the presence of empty search results. If yes, we throw an IndexNotFoundException that ends up being converted to EndRunException; if no, we still throw an IllegalArgumentException. Testing done: 1. added unit tests 2. reproduced manually and verified the change fixed the issue. Signed-off-by: Kaituo Li --- .../ad/feature/CompositeRetriever.java | 62 +++++++++++++++---- .../AnomalyResultTransportAction.java | 4 +- .../ad/transport/MultiEntityResultTests.java | 48 +++++++++++++- 3 files changed, 98 insertions(+), 16 deletions(-) diff --git a/src/main/java/org/opensearch/ad/feature/CompositeRetriever.java b/src/main/java/org/opensearch/ad/feature/CompositeRetriever.java index bb9446a29..dad048ba9 100644 --- a/src/main/java/org/opensearch/ad/feature/CompositeRetriever.java +++ b/src/main/java/org/opensearch/ad/feature/CompositeRetriever.java @@ -14,6 +14,7 @@ import java.io.IOException; import java.time.Clock; import java.util.HashMap; +import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Optional; @@ -25,13 +26,17 @@ import org.opensearch.action.ActionListener; import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchResponse; +import org.opensearch.action.support.IndicesOptions; import org.opensearch.ad.model.AnomalyDetector; import org.opensearch.ad.model.Entity; import org.opensearch.ad.model.Feature; import org.opensearch.ad.util.ParseUtils; import org.opensearch.client.Client; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.Settings; import org.opensearch.common.xcontent.NamedXContentRegistry; +import org.opensearch.index.IndexNotFoundException; import org.opensearch.index.query.BoolQueryBuilder; import org.opensearch.index.query.RangeQueryBuilder; import org.opensearch.search.aggregations.Aggregation; @@ -66,6 +71,8 @@ public class CompositeRetriever extends AbstractRetriever { private final int pageSize; private long expirationEpochMs; private Clock clock; + private IndexNameExpressionResolver indexNameExpressionResolver; + private ClusterService clusterService; public CompositeRetriever( long dataStartEpoch, @@ -77,7 +84,9 @@ public CompositeRetriever( Clock clock, Settings settings, int maxEntitiesPerInterval, - int pageSize + int pageSize, + IndexNameExpressionResolver indexNameExpressionResolver, + ClusterService clusterService ) { this.dataStartEpoch = dataStartEpoch; this.dataEndEpoch = dataEndEpoch; @@ -89,6 +98,8 @@ public CompositeRetriever( this.pageSize = pageSize; this.expirationEpochMs = expirationEpochMs; this.clock = clock; + this.indexNameExpressionResolver = indexNameExpressionResolver; + this.clusterService = clusterService; } // a constructor that provide default value of clock @@ -101,7 +112,9 @@ public CompositeRetriever( long expirationEpochMs, Settings settings, int maxEntitiesPerInterval, - int pageSize + int pageSize, + IndexNameExpressionResolver indexNameExpressionResolver, + ClusterService clusterService ) { this( dataStartEpoch, @@ -113,7 +126,9 @@ public CompositeRetriever( Clock.systemUTC(), settings, maxEntitiesPerInterval, - pageSize + pageSize, + indexNameExpressionResolver, + clusterService ); } @@ -156,11 +171,14 @@ public PageIterator iterator() throws IOException { public class PageIterator { private SearchSourceBuilder source; // a map from categorical field name to values (type: java.lang.Comparable) - Map afterKey; + private Map afterKey; + // number of iterations so far + private int iterations; public PageIterator(SearchSourceBuilder source) { this.source = source; this.afterKey = null; + this.iterations = 0; } /** @@ -168,6 +186,7 @@ public PageIterator(SearchSourceBuilder source) { * @param listener Listener to return results */ public void next(ActionListener listener) { + iterations++; SearchRequest searchRequest = new SearchRequest(anomalyDetector.getIndices().toArray(new String[0]), source); client.search(searchRequest, new ActionListener() { @Override @@ -183,13 +202,13 @@ public void onFailure(Exception e) { } private void processResponse(SearchResponse response, Runnable retry, ActionListener listener) { - if (shouldRetryDueToEmptyPage(response)) { - updateCompositeAfterKey(response, source); - retry.run(); - return; - } - try { + if (shouldRetryDueToEmptyPage(response)) { + updateCompositeAfterKey(response, source); + retry.run(); + return; + } + Page page = analyzePage(response); // we can process at most maxEntities entities if (totalResults <= maxEntities && afterKey != null) { @@ -284,11 +303,28 @@ private boolean shouldRetryDueToEmptyPage(SearchResponse response) { } Optional getComposite(SearchResponse response) { + // When the source index is a regex like blah*, we will get empty response like + // the following even if no index starting with blah exists. + // {"took":0,"timed_out":false,"_shards":{"total":0,"successful":0,"skipped":0,"failed":0},"hits":{"max_score":0.0,"hits":[]}} + // Without regex, we will get IndexNotFoundException instead. + // {"error":{"root_cause":[{"type":"index_not_found_exception","reason":"no such + // index + // [blah]","index":"blah","resource.id":"blah","resource.type":"index_or_alias","index_uuid":"_na_"}],"type":"index_not_found_exception","reason":"no + // such index + // [blah]","index":"blah","resource.id":"blah","resource.type":"index_or_alias","index_uuid":"_na_"},"status":404}% if (response == null || response.getAggregations() == null) { - return Optional.empty(); + List sourceIndices = anomalyDetector.getIndices(); + String[] concreteIndices = indexNameExpressionResolver + .concreteIndexNames(clusterService.state(), IndicesOptions.lenientExpandOpen(), sourceIndices.toArray(new String[0])); + if (concreteIndices.length == 0) { + throw new IndexNotFoundException(String.join(",", sourceIndices)); + } else { + return Optional.empty(); + } } Aggregation agg = response.getAggregations().get(AGG_NAME_COMP); if (agg == null) { + // when current interval has no data return Optional.empty(); } @@ -301,12 +337,12 @@ Optional getComposite(SearchResponse response) { /** * Whether next page exists. Conditions are: - * 1) we haven't fetched any page yet (totalResults == 0) or afterKey is not null + * 1) this is the first time we query (iterations == 0) or afterKey is not null * 2) next detection interval has not started * @return true if the iteration has more pages. */ public boolean hasNext() { - return (totalResults == 0 || (totalResults > 0 && afterKey != null)) && expirationEpochMs > clock.millis(); + return (iterations == 0 || (totalResults > 0 && afterKey != null)) && expirationEpochMs > clock.millis(); } @Override diff --git a/src/main/java/org/opensearch/ad/transport/AnomalyResultTransportAction.java b/src/main/java/org/opensearch/ad/transport/AnomalyResultTransportAction.java index 42142a286..384a8d1f2 100644 --- a/src/main/java/org/opensearch/ad/transport/AnomalyResultTransportAction.java +++ b/src/main/java/org/opensearch/ad/transport/AnomalyResultTransportAction.java @@ -478,7 +478,9 @@ private void executeAnomalyDetection( nextDetectionStartTime, settings, maxEntitiesPerInterval, - pageSize + pageSize, + indexNameExpressionResolver, + clusterService ); PageIterator pageIterator = null; diff --git a/src/test/java/org/opensearch/ad/transport/MultiEntityResultTests.java b/src/test/java/org/opensearch/ad/transport/MultiEntityResultTests.java index e2c232f23..e3a70c376 100644 --- a/src/test/java/org/opensearch/ad/transport/MultiEntityResultTests.java +++ b/src/test/java/org/opensearch/ad/transport/MultiEntityResultTests.java @@ -1098,7 +1098,9 @@ public void testPageToString() { clock, settings, 10000, - 1000 + 1000, + indexNameResolver, + clusterService ); Map results = new HashMap<>(); Entity entity1 = Entity.createEntityByReordering(attrs1); @@ -1122,7 +1124,9 @@ public void testEmptyPageToString() { clock, settings, 10000, - 1000 + 1000, + indexNameResolver, + clusterService ); CompositeRetriever.Page page = retriever.new Page(null); @@ -1283,4 +1287,44 @@ public void testSelectHigherExceptionInModelNode() throws InterruptedException, EndRunException endRunException = (EndRunException) (exceptionCaptor.getValue()); assertTrue(!endRunException.isEndNow()); } + + /** + * A missing index will cause the search result to contain null aggregation + * like {"took":0,"timed_out":false,"_shards":{"total":0,"successful":0,"skipped":0,"failed":0},"hits":{"max_score":0.0,"hits":[]}} + * + * The test verifies we can handle such situation and won't throw exceptions + * @throws InterruptedException while waiting for execution gets interruptted + */ + public void testMissingIndex() throws InterruptedException { + final CountDownLatch inProgressLatch = new CountDownLatch(1); + + doAnswer(invocation -> { + ActionListener listener = invocation.getArgument(1); + listener + .onResponse( + new SearchResponse( + new SearchResponseSections(SearchHits.empty(), null, null, false, null, null, 1), + null, + 1, + 1, + 0, + 0, + ShardSearchFailure.EMPTY_ARRAY, + Clusters.EMPTY + ) + ); + inProgressLatch.countDown(); + return null; + }).when(client).search(any(), any()); + + PlainActionFuture listener = new PlainActionFuture<>(); + + action.doExecute(null, request, listener); + + AnomalyResultResponse response = listener.actionGet(10000L); + assertEquals(Double.NaN, response.getAnomalyGrade(), 0.01); + + assertTrue(inProgressLatch.await(10000L, TimeUnit.MILLISECONDS)); + verify(stateManager, times(1)).setException(eq(detectorId), any(EndRunException.class)); + } }