diff --git a/src/main/java/org/opensearch/ad/feature/CompositeRetriever.java b/src/main/java/org/opensearch/ad/feature/CompositeRetriever.java index bb9446a29..dad048ba9 100644 --- a/src/main/java/org/opensearch/ad/feature/CompositeRetriever.java +++ b/src/main/java/org/opensearch/ad/feature/CompositeRetriever.java @@ -14,6 +14,7 @@ import java.io.IOException; import java.time.Clock; import java.util.HashMap; +import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Optional; @@ -25,13 +26,17 @@ import org.opensearch.action.ActionListener; import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchResponse; +import org.opensearch.action.support.IndicesOptions; import org.opensearch.ad.model.AnomalyDetector; import org.opensearch.ad.model.Entity; import org.opensearch.ad.model.Feature; import org.opensearch.ad.util.ParseUtils; import org.opensearch.client.Client; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.Settings; import org.opensearch.common.xcontent.NamedXContentRegistry; +import org.opensearch.index.IndexNotFoundException; import org.opensearch.index.query.BoolQueryBuilder; import org.opensearch.index.query.RangeQueryBuilder; import org.opensearch.search.aggregations.Aggregation; @@ -66,6 +71,8 @@ public class CompositeRetriever extends AbstractRetriever { private final int pageSize; private long expirationEpochMs; private Clock clock; + private IndexNameExpressionResolver indexNameExpressionResolver; + private ClusterService clusterService; public CompositeRetriever( long dataStartEpoch, @@ -77,7 +84,9 @@ public CompositeRetriever( Clock clock, Settings settings, int maxEntitiesPerInterval, - int pageSize + int pageSize, + IndexNameExpressionResolver indexNameExpressionResolver, + ClusterService clusterService ) { this.dataStartEpoch = dataStartEpoch; this.dataEndEpoch = dataEndEpoch; @@ -89,6 +98,8 @@ public CompositeRetriever( this.pageSize = pageSize; this.expirationEpochMs = expirationEpochMs; this.clock = clock; + this.indexNameExpressionResolver = indexNameExpressionResolver; + this.clusterService = clusterService; } // a constructor that provide default value of clock @@ -101,7 +112,9 @@ public CompositeRetriever( long expirationEpochMs, Settings settings, int maxEntitiesPerInterval, - int pageSize + int pageSize, + IndexNameExpressionResolver indexNameExpressionResolver, + ClusterService clusterService ) { this( dataStartEpoch, @@ -113,7 +126,9 @@ public CompositeRetriever( Clock.systemUTC(), settings, maxEntitiesPerInterval, - pageSize + pageSize, + indexNameExpressionResolver, + clusterService ); } @@ -156,11 +171,14 @@ public PageIterator iterator() throws IOException { public class PageIterator { private SearchSourceBuilder source; // a map from categorical field name to values (type: java.lang.Comparable) - Map afterKey; + private Map afterKey; + // number of iterations so far + private int iterations; public PageIterator(SearchSourceBuilder source) { this.source = source; this.afterKey = null; + this.iterations = 0; } /** @@ -168,6 +186,7 @@ public PageIterator(SearchSourceBuilder source) { * @param listener Listener to return results */ public void next(ActionListener listener) { + iterations++; SearchRequest searchRequest = new SearchRequest(anomalyDetector.getIndices().toArray(new String[0]), source); client.search(searchRequest, new ActionListener() { @Override @@ -183,13 +202,13 @@ public void onFailure(Exception e) { } private void processResponse(SearchResponse response, Runnable retry, ActionListener listener) { - if (shouldRetryDueToEmptyPage(response)) { - updateCompositeAfterKey(response, source); - retry.run(); - return; - } - try { + if (shouldRetryDueToEmptyPage(response)) { + updateCompositeAfterKey(response, source); + retry.run(); + return; + } + Page page = analyzePage(response); // we can process at most maxEntities entities if (totalResults <= maxEntities && afterKey != null) { @@ -284,11 +303,28 @@ private boolean shouldRetryDueToEmptyPage(SearchResponse response) { } Optional getComposite(SearchResponse response) { + // When the source index is a regex like blah*, we will get empty response like + // the following even if no index starting with blah exists. + // {"took":0,"timed_out":false,"_shards":{"total":0,"successful":0,"skipped":0,"failed":0},"hits":{"max_score":0.0,"hits":[]}} + // Without regex, we will get IndexNotFoundException instead. + // {"error":{"root_cause":[{"type":"index_not_found_exception","reason":"no such + // index + // [blah]","index":"blah","resource.id":"blah","resource.type":"index_or_alias","index_uuid":"_na_"}],"type":"index_not_found_exception","reason":"no + // such index + // [blah]","index":"blah","resource.id":"blah","resource.type":"index_or_alias","index_uuid":"_na_"},"status":404}% if (response == null || response.getAggregations() == null) { - return Optional.empty(); + List sourceIndices = anomalyDetector.getIndices(); + String[] concreteIndices = indexNameExpressionResolver + .concreteIndexNames(clusterService.state(), IndicesOptions.lenientExpandOpen(), sourceIndices.toArray(new String[0])); + if (concreteIndices.length == 0) { + throw new IndexNotFoundException(String.join(",", sourceIndices)); + } else { + return Optional.empty(); + } } Aggregation agg = response.getAggregations().get(AGG_NAME_COMP); if (agg == null) { + // when current interval has no data return Optional.empty(); } @@ -301,12 +337,12 @@ Optional getComposite(SearchResponse response) { /** * Whether next page exists. Conditions are: - * 1) we haven't fetched any page yet (totalResults == 0) or afterKey is not null + * 1) this is the first time we query (iterations == 0) or afterKey is not null * 2) next detection interval has not started * @return true if the iteration has more pages. */ public boolean hasNext() { - return (totalResults == 0 || (totalResults > 0 && afterKey != null)) && expirationEpochMs > clock.millis(); + return (iterations == 0 || (totalResults > 0 && afterKey != null)) && expirationEpochMs > clock.millis(); } @Override diff --git a/src/main/java/org/opensearch/ad/transport/AnomalyResultTransportAction.java b/src/main/java/org/opensearch/ad/transport/AnomalyResultTransportAction.java index 42142a286..384a8d1f2 100644 --- a/src/main/java/org/opensearch/ad/transport/AnomalyResultTransportAction.java +++ b/src/main/java/org/opensearch/ad/transport/AnomalyResultTransportAction.java @@ -478,7 +478,9 @@ private void executeAnomalyDetection( nextDetectionStartTime, settings, maxEntitiesPerInterval, - pageSize + pageSize, + indexNameExpressionResolver, + clusterService ); PageIterator pageIterator = null; diff --git a/src/test/java/org/opensearch/ad/feature/NoPowermockSearchFeatureDaoTests.java b/src/test/java/org/opensearch/ad/feature/NoPowermockSearchFeatureDaoTests.java index 5005f41da..0268b9d7b 100644 --- a/src/test/java/org/opensearch/ad/feature/NoPowermockSearchFeatureDaoTests.java +++ b/src/test/java/org/opensearch/ad/feature/NoPowermockSearchFeatureDaoTests.java @@ -599,8 +599,10 @@ public void testParseBuckets() throws InstantiationException, new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()), 1 ); - hllpp.collect(0, BitMixer.mix64(randomIntBetween(1, 100))); - hllpp.collect(0, BitMixer.mix64(randomIntBetween(1, 100))); + long hash1 = BitMixer.mix64(randomIntBetween(1, 100)); + long hash2 = BitMixer.mix64(randomIntBetween(1, 100)); + hllpp.collect(0, hash1); + hllpp.collect(0, hash2); Constructor ctor = null; ctor = InternalCardinality.class.getDeclaredConstructor(String.class, AbstractHyperLogLogPlusPlus.class, Map.class); @@ -626,7 +628,8 @@ public void testParseBuckets() throws InstantiationException, assertTrue(parsedResult.isPresent()); double[] parsedCardinality = parsedResult.get(); assertEquals(1, parsedCardinality.length); - assertEquals(2, parsedCardinality[0], 0.001); + double buckets = hash1 == hash2 ? 1 : 2; + assertEquals(buckets, parsedCardinality[0], 0.001); // release MockBigArrays; otherwise, test will fail Releasables.close(hllpp); diff --git a/src/test/java/org/opensearch/ad/transport/MultiEntityResultTests.java b/src/test/java/org/opensearch/ad/transport/MultiEntityResultTests.java index e2c232f23..e3a70c376 100644 --- a/src/test/java/org/opensearch/ad/transport/MultiEntityResultTests.java +++ b/src/test/java/org/opensearch/ad/transport/MultiEntityResultTests.java @@ -1098,7 +1098,9 @@ public void testPageToString() { clock, settings, 10000, - 1000 + 1000, + indexNameResolver, + clusterService ); Map results = new HashMap<>(); Entity entity1 = Entity.createEntityByReordering(attrs1); @@ -1122,7 +1124,9 @@ public void testEmptyPageToString() { clock, settings, 10000, - 1000 + 1000, + indexNameResolver, + clusterService ); CompositeRetriever.Page page = retriever.new Page(null); @@ -1283,4 +1287,44 @@ public void testSelectHigherExceptionInModelNode() throws InterruptedException, EndRunException endRunException = (EndRunException) (exceptionCaptor.getValue()); assertTrue(!endRunException.isEndNow()); } + + /** + * A missing index will cause the search result to contain null aggregation + * like {"took":0,"timed_out":false,"_shards":{"total":0,"successful":0,"skipped":0,"failed":0},"hits":{"max_score":0.0,"hits":[]}} + * + * The test verifies we can handle such situation and won't throw exceptions + * @throws InterruptedException while waiting for execution gets interruptted + */ + public void testMissingIndex() throws InterruptedException { + final CountDownLatch inProgressLatch = new CountDownLatch(1); + + doAnswer(invocation -> { + ActionListener listener = invocation.getArgument(1); + listener + .onResponse( + new SearchResponse( + new SearchResponseSections(SearchHits.empty(), null, null, false, null, null, 1), + null, + 1, + 1, + 0, + 0, + ShardSearchFailure.EMPTY_ARRAY, + Clusters.EMPTY + ) + ); + inProgressLatch.countDown(); + return null; + }).when(client).search(any(), any()); + + PlainActionFuture listener = new PlainActionFuture<>(); + + action.doExecute(null, request, listener); + + AnomalyResultResponse response = listener.actionGet(10000L); + assertEquals(Double.NaN, response.getAnomalyGrade(), 0.01); + + assertTrue(inProgressLatch.await(10000L, TimeUnit.MILLISECONDS)); + verify(stateManager, times(1)).setException(eq(detectorId), any(EndRunException.class)); + } }