Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check if indices exist in the presence of empty search results #495

Merged
merged 3 commits into from
Apr 11, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 49 additions & 13 deletions src/main/java/org/opensearch/ad/feature/CompositeRetriever.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.io.IOException;
import java.time.Clock;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
Expand All @@ -25,13 +26,17 @@
import org.opensearch.action.ActionListener;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.Entity;
import org.opensearch.ad.model.Feature;
import org.opensearch.ad.util.ParseUtils;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.NamedXContentRegistry;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.RangeQueryBuilder;
import org.opensearch.search.aggregations.Aggregation;
Expand Down Expand Up @@ -66,6 +71,8 @@ public class CompositeRetriever extends AbstractRetriever {
private final int pageSize;
private long expirationEpochMs;
private Clock clock;
private IndexNameExpressionResolver indexNameExpressionResolver;
private ClusterService clusterService;

public CompositeRetriever(
long dataStartEpoch,
Expand All @@ -77,7 +84,9 @@ public CompositeRetriever(
Clock clock,
Settings settings,
int maxEntitiesPerInterval,
int pageSize
int pageSize,
IndexNameExpressionResolver indexNameExpressionResolver,
ClusterService clusterService
) {
this.dataStartEpoch = dataStartEpoch;
this.dataEndEpoch = dataEndEpoch;
Expand All @@ -89,6 +98,8 @@ public CompositeRetriever(
this.pageSize = pageSize;
this.expirationEpochMs = expirationEpochMs;
this.clock = clock;
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.clusterService = clusterService;
}

// a constructor that provide default value of clock
Expand All @@ -101,7 +112,9 @@ public CompositeRetriever(
long expirationEpochMs,
Settings settings,
int maxEntitiesPerInterval,
int pageSize
int pageSize,
IndexNameExpressionResolver indexNameExpressionResolver,
ClusterService clusterService
) {
this(
dataStartEpoch,
Expand All @@ -113,7 +126,9 @@ public CompositeRetriever(
Clock.systemUTC(),
settings,
maxEntitiesPerInterval,
pageSize
pageSize,
indexNameExpressionResolver,
clusterService
);
}

Expand Down Expand Up @@ -156,18 +171,22 @@ public PageIterator iterator() throws IOException {
public class PageIterator {
private SearchSourceBuilder source;
// a map from categorical field name to values (type: java.lang.Comparable)
Map<String, Object> afterKey;
private Map<String, Object> afterKey;
// number of iterations so far
private int iterations;

/**
 * Creates an iterator that pages through composite-aggregation results
 * produced by the given search source.
 *
 * @param source search source containing the composite aggregation to page over
 */
public PageIterator(SearchSourceBuilder source) {
    // Fresh iterator state: nothing fetched yet, so there is no after-key
    // and the iteration counter starts at zero.
    this.iterations = 0;
    this.afterKey = null;
    this.source = source;
}

/**
* Results are returned using listener
* @param listener Listener to return results
*/
public void next(ActionListener<Page> listener) {
iterations++;
SearchRequest searchRequest = new SearchRequest(anomalyDetector.getIndices().toArray(new String[0]), source);
client.search(searchRequest, new ActionListener<SearchResponse>() {
@Override
Expand All @@ -183,13 +202,13 @@ public void onFailure(Exception e) {
}

private void processResponse(SearchResponse response, Runnable retry, ActionListener<Page> listener) {
if (shouldRetryDueToEmptyPage(response)) {
updateCompositeAfterKey(response, source);
retry.run();
return;
}

try {
if (shouldRetryDueToEmptyPage(response)) {
updateCompositeAfterKey(response, source);
retry.run();
return;
}

Page page = analyzePage(response);
// we can process at most maxEntities entities
if (totalResults <= maxEntities && afterKey != null) {
Expand Down Expand Up @@ -284,11 +303,28 @@ private boolean shouldRetryDueToEmptyPage(SearchResponse response) {
}

Optional<CompositeAggregation> getComposite(SearchResponse response) {
// When the source index is a regex like blah*, we will get empty response like
// the following even if no index starting with blah exists.
// {"took":0,"timed_out":false,"_shards":{"total":0,"successful":0,"skipped":0,"failed":0},"hits":{"max_score":0.0,"hits":[]}}
// Without regex, we will get IndexNotFoundException instead.
// {"error":{"root_cause":[{"type":"index_not_found_exception","reason":"no such
// index
// [blah]","index":"blah","resource.id":"blah","resource.type":"index_or_alias","index_uuid":"_na_"}],"type":"index_not_found_exception","reason":"no
// such index
// [blah]","index":"blah","resource.id":"blah","resource.type":"index_or_alias","index_uuid":"_na_"},"status":404}%
if (response == null || response.getAggregations() == null) {
return Optional.empty();
List<String> sourceIndices = anomalyDetector.getIndices();
String[] concreteIndices = indexNameExpressionResolver
amitgalitz marked this conversation as resolved.
Show resolved Hide resolved
.concreteIndexNames(clusterService.state(), IndicesOptions.lenientExpandOpen(), sourceIndices.toArray(new String[0]));
if (concreteIndices.length == 0) {
throw new IndexNotFoundException(String.join(",", sourceIndices));
} else {
return Optional.empty();
}
}
Aggregation agg = response.getAggregations().get(AGG_NAME_COMP);
if (agg == null) {
// when current interval has no data
return Optional.empty();
}

Expand All @@ -301,12 +337,12 @@ Optional<CompositeAggregation> getComposite(SearchResponse response) {

/**
* Whether next page exists. Conditions are:
* 1) we haven't fetched any page yet (totalResults == 0) or afterKey is not null
* 1) this is the first time we query (iterations == 0) or afterKey is not null
* 2) next detection interval has not started
* @return true if the iteration has more pages.
*/
public boolean hasNext() {
return (totalResults == 0 || (totalResults > 0 && afterKey != null)) && expirationEpochMs > clock.millis();
return (iterations == 0 || (totalResults > 0 && afterKey != null)) && expirationEpochMs > clock.millis();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible iterations == 0 and afterKey == null? Not sure if this is enough afterKey != null && expirationEpochMs > clock.millis()

Copy link
Collaborator Author

@kaituo kaituo Apr 8, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible iterations == 0 and afterKey == null

It is possible.

afterKey != null && expirationEpochMs > clock.millis()

This is not enough as at the beginning afterKey = null.

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,9 @@ private void executeAnomalyDetection(
nextDetectionStartTime,
settings,
maxEntitiesPerInterval,
pageSize
pageSize,
indexNameExpressionResolver,
clusterService
);

PageIterator pageIterator = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1098,7 +1098,9 @@ public void testPageToString() {
clock,
settings,
10000,
1000
1000,
indexNameResolver,
clusterService
);
Map<Entity, double[]> results = new HashMap<>();
Entity entity1 = Entity.createEntityByReordering(attrs1);
Expand All @@ -1122,7 +1124,9 @@ public void testEmptyPageToString() {
clock,
settings,
10000,
1000
1000,
indexNameResolver,
clusterService
);

CompositeRetriever.Page page = retriever.new Page(null);
Expand Down Expand Up @@ -1283,4 +1287,44 @@ public void testSelectHigherExceptionInModelNode() throws InterruptedException,
EndRunException endRunException = (EndRunException) (exceptionCaptor.getValue());
assertTrue(!endRunException.isEndNow());
}

/**
 * A missing index will cause the search result to contain a null aggregation,
 * like {"took":0,"timed_out":false,"_shards":{"total":0,"successful":0,"skipped":0,"failed":0},"hits":{"max_score":0.0,"hits":[]}}
 *
 * The test verifies we can handle such a situation and won't throw exceptions.
 * @throws InterruptedException if waiting for execution is interrupted
 */
public void testMissingIndex() throws InterruptedException {
// Latch released once the stubbed search callback has fired, so the test can
// confirm the search path was actually exercised before verifying mocks.
final CountDownLatch inProgressLatch = new CountDownLatch(1);

// Stub client.search to immediately answer with an "empty" SearchResponse:
// empty hits, null aggregations, zero total/successful shards — the shape
// returned when a wildcard index pattern matches no concrete index.
doAnswer(invocation -> {
ActionListener<SearchResponse> listener = invocation.getArgument(1);
listener
.onResponse(
new SearchResponse(
new SearchResponseSections(SearchHits.empty(), null, null, false, null, null, 1),
null,
1,
1,
0,
0,
ShardSearchFailure.EMPTY_ARRAY,
Clusters.EMPTY
)
);
inProgressLatch.countDown();
return null;
}).when(client).search(any(), any());

PlainActionFuture<AnomalyResultResponse> listener = new PlainActionFuture<>();

// Run the detection; the empty search response should be handled gracefully.
action.doExecute(null, request, listener);

// NaN anomaly grade indicates no result was produced (no data / no index),
// rather than an exception propagating to the caller.
AnomalyResultResponse response = listener.actionGet(10000L);
assertEquals(Double.NaN, response.getAnomalyGrade(), 0.01);

assertTrue(inProgressLatch.await(10000L, TimeUnit.MILLISECONDS));
// The missing index is surfaced as an EndRunException recorded in the
// detector's state, not thrown — TODO confirm this matches getComposite's
// IndexNotFoundException-to-EndRunException translation upstream.
verify(stateManager, times(1)).setException(eq(detectorId), any(EndRunException.class));
}
}