Skip to content

Commit

Permalink
fixing dls/fls logic around numeric aggregations
Browse files Browse the repository at this point in the history
Signed-off-by: Kaituo Li <[email protected]>
  • Loading branch information
kaituo committed Feb 2, 2023
1 parent 27104c6 commit 6a6771c
Show file tree
Hide file tree
Showing 42 changed files with 1,231 additions and 749 deletions.
31 changes: 7 additions & 24 deletions src/main/java/org/opensearch/ad/AnomalyDetectorJobRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.opensearch.ad.transport.ProfileRequest;
import org.opensearch.ad.transport.handler.AnomalyIndexHandler;
import org.opensearch.ad.util.DiscoveryNodeFilterer;
import org.opensearch.ad.util.SecurityUtil;
import org.opensearch.client.Client;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.settings.Settings;
Expand All @@ -76,7 +77,6 @@
import org.opensearch.threadpool.ThreadPool;

import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;

/**
* JobScheduler will call AD job runner to get anomaly result periodically
Expand Down Expand Up @@ -218,28 +218,11 @@ protected void runAdJob(
}
anomalyDetectionIndices.update();

/*
* We need to handle 3 cases:
* 1. Detectors created by older versions and never updated. These detectors wont have User details in the
* detector object. `detector.user` will be null. Insert `all_access, AmazonES_all_access` role.
* 2. Detectors are created when security plugin is disabled, these will have empty User object.
* (`detector.user.name`, `detector.user.roles` are empty )
* 3. Detectors are created when security plugin is enabled, these will have an User object.
* This will inject user role and check if the user role has permissions to call the execute
* Anomaly Result API.
*/
String user;
List<String> roles;
if (jobParameter.getUser() == null) {
// It's possible that user create domain with security disabled, then enable security
// after upgrading. This is for BWC, for old detectors which created when security
// disabled, the user will be null.
user = "";
roles = settings.getAsList("", ImmutableList.of("all_access", "AmazonES_all_access"));
} else {
user = jobParameter.getUser().getName();
roles = jobParameter.getUser().getRoles();
}
User userInfo = SecurityUtil.getUserFromJob(jobParameter, settings);

String user = userInfo.getName();
List<String> roles = userInfo.getRoles();

String resultIndex = jobParameter.getResultIndex();
if (resultIndex == null) {
runAnomalyDetectionJob(jobParameter, lockService, lock, detectionStartTime, executionStartTime, detectorId, user, roles);
Expand All @@ -265,7 +248,7 @@ private void runAnomalyDetectionJob(
String user,
List<String> roles
) {

// using one thread in the write threadpool
try (InjectSecurity injectSecurity = new InjectSecurity(detectorId, settings, client.threadPool().getThreadContext())) {
// Injecting user role to verify if the user has permissions for our API.
injectSecurity.inject(user, roles);
Expand Down
25 changes: 14 additions & 11 deletions src/main/java/org/opensearch/ad/AnomalyDetectorPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@
import org.opensearch.ad.util.ClientUtil;
import org.opensearch.ad.util.DiscoveryNodeFilterer;
import org.opensearch.ad.util.IndexUtils;
import org.opensearch.ad.util.SecurityClientUtil;
import org.opensearch.ad.util.Throttler;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
Expand Down Expand Up @@ -230,6 +231,7 @@ public class AnomalyDetectorPlugin extends Plugin implements ActionPlugin, Scrip
private ThreadPool threadPool;
private ADStats adStats;
private ClientUtil clientUtil;
private SecurityClientUtil securityClientUtil;
private DiscoveryNodeFilterer nodeFilter;
private IndexUtils indexUtils;
private ADTaskCacheManager adTaskCacheManager;
Expand Down Expand Up @@ -352,11 +354,21 @@ public Collection<Object> createComponents(
SingleFeatureLinearUniformInterpolator singleFeatureLinearUniformInterpolator =
new IntegerSensitiveSingleFeatureLinearUniformInterpolator();
Interpolator interpolator = new LinearUniformInterpolator(singleFeatureLinearUniformInterpolator);
NodeStateManager stateManager = new NodeStateManager(
client,
xContentRegistry,
settings,
clientUtil,
getClock(),
AnomalyDetectorSettings.HOURLY_MAINTENANCE,
clusterService
);
securityClientUtil = new SecurityClientUtil(stateManager, settings);
SearchFeatureDao searchFeatureDao = new SearchFeatureDao(
client,
xContentRegistry,
interpolator,
clientUtil,
securityClientUtil,
settings,
clusterService,
AnomalyDetectorSettings.NUM_SAMPLES_PER_TREE
Expand All @@ -381,16 +393,6 @@ public Collection<Object> createComponents(
adCircuitBreakerService
);

NodeStateManager stateManager = new NodeStateManager(
client,
xContentRegistry,
settings,
clientUtil,
getClock(),
AnomalyDetectorSettings.HOURLY_MAINTENANCE,
clusterService
);

FeatureManager featureManager = new FeatureManager(
searchFeatureDao,
interpolator,
Expand Down Expand Up @@ -699,6 +701,7 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
threadPool,
clusterService,
client,
securityClientUtil,
adCircuitBreakerService,
featureManager,
adTaskManager,
Expand Down
32 changes: 28 additions & 4 deletions src/main/java/org/opensearch/ad/AnomalyDetectorProfileRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.opensearch.ad.util.DiscoveryNodeFilterer;
import org.opensearch.ad.util.ExceptionUtil;
import org.opensearch.ad.util.MultiResponsesDelegateActionListener;
import org.opensearch.ad.util.SecurityClientUtil;
import org.opensearch.client.Client;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
Expand All @@ -81,6 +82,7 @@
public class AnomalyDetectorProfileRunner extends AbstractProfileRunner {
private final Logger logger = LogManager.getLogger(AnomalyDetectorProfileRunner.class);
private Client client;
private SecurityClientUtil clientUtil;
private NamedXContentRegistry xContentRegistry;
private DiscoveryNodeFilterer nodeFilter;
private final TransportService transportService;
Expand All @@ -89,6 +91,7 @@ public class AnomalyDetectorProfileRunner extends AbstractProfileRunner {

public AnomalyDetectorProfileRunner(
Client client,
SecurityClientUtil clientUtil,
NamedXContentRegistry xContentRegistry,
DiscoveryNodeFilterer nodeFilter,
long requiredSamples,
Expand All @@ -97,6 +100,7 @@ public AnomalyDetectorProfileRunner(
) {
super(requiredSamples);
this.client = client;
this.clientUtil = clientUtil;
this.xContentRegistry = xContentRegistry;
this.nodeFilter = nodeFilter;
if (requiredSamples <= 0) {
Expand Down Expand Up @@ -296,7 +300,7 @@ private void profileEntityStats(MultiResponsesDelegateActionListener<DetectorPro
searchSourceBuilder.aggregation(aggBuilder);

SearchRequest request = new SearchRequest(detector.getIndices().toArray(new String[0]), searchSourceBuilder);
client.search(request, ActionListener.wrap(searchResponse -> {
final ActionListener<SearchResponse> searchResponseListener = ActionListener.wrap(searchResponse -> {
Map<String, Aggregation> aggMap = searchResponse.getAggregations().asMap();
InternalCardinality totalEntities = (InternalCardinality) aggMap.get(CommonName.TOTAL_ENTITIES);
long value = totalEntities.getValue();
Expand All @@ -306,7 +310,17 @@ private void profileEntityStats(MultiResponsesDelegateActionListener<DetectorPro
}, searchException -> {
logger.warn(CommonErrorMessages.FAIL_TO_GET_TOTAL_ENTITIES + detector.getDetectorId());
listener.onFailure(searchException);
}));
});
// using the original context in listener as user roles have no permissions for internal operations like fetching a
// checkpoint
clientUtil
.<SearchRequest, SearchResponse>asyncRequestWithInjectedSecurity(
request,
client::search,
detector.getDetectorId(),
client,
searchResponseListener
);
} else {
// Run a composite query and count the number of buckets to decide cardinality of multiple category fields
AggregationBuilder bucketAggs = AggregationBuilders
Expand All @@ -319,7 +333,7 @@ private void profileEntityStats(MultiResponsesDelegateActionListener<DetectorPro
SearchRequest searchRequest = new SearchRequest()
.indices(detector.getIndices().toArray(new String[0]))
.source(searchSourceBuilder);
client.search(searchRequest, ActionListener.wrap(searchResponse -> {
final ActionListener<SearchResponse> searchResponseListener = ActionListener.wrap(searchResponse -> {
DetectorProfile.Builder profileBuilder = new DetectorProfile.Builder();
Aggregations aggs = searchResponse.getAggregations();
if (aggs == null) {
Expand All @@ -345,7 +359,17 @@ private void profileEntityStats(MultiResponsesDelegateActionListener<DetectorPro
}, searchException -> {
logger.warn(CommonErrorMessages.FAIL_TO_GET_TOTAL_ENTITIES + detector.getDetectorId());
listener.onFailure(searchException);
}));
});
// using the original context in listener as user roles have no permissions for internal operations like fetching a
// checkpoint
clientUtil
.<SearchRequest, SearchResponse>asyncRequestWithInjectedSecurity(
searchRequest,
client::search,
detector.getDetectorId(),
client,
searchResponseListener
);
}

}
Expand Down
21 changes: 17 additions & 4 deletions src/main/java/org/opensearch/ad/EntityProfileRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.opensearch.action.ActionListener;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.ad.constant.CommonErrorMessages;
import org.opensearch.ad.constant.CommonName;
import org.opensearch.ad.model.AnomalyDetector;
Expand All @@ -43,6 +44,7 @@
import org.opensearch.ad.transport.EntityProfileResponse;
import org.opensearch.ad.util.MultiResponsesDelegateActionListener;
import org.opensearch.ad.util.ParseUtils;
import org.opensearch.ad.util.SecurityClientUtil;
import org.opensearch.client.Client;
import org.opensearch.cluster.routing.Preference;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
Expand All @@ -64,11 +66,13 @@ public class EntityProfileRunner extends AbstractProfileRunner {
static final String EMPTY_ENTITY_ATTRIBUTES = "Empty entity attributes";
static final String NO_ENTITY = "Cannot find entity";
private Client client;
private SecurityClientUtil clientUtil;
private NamedXContentRegistry xContentRegistry;

public EntityProfileRunner(Client client, NamedXContentRegistry xContentRegistry, long requiredSamples) {
public EntityProfileRunner(Client client, SecurityClientUtil clientUtil, NamedXContentRegistry xContentRegistry, long requiredSamples) {
super(requiredSamples);
this.client = client;
this.clientUtil = clientUtil;
this.xContentRegistry = xContentRegistry;
}

Expand Down Expand Up @@ -165,8 +169,7 @@ private void validateEntity(

SearchRequest searchRequest = new SearchRequest(detector.getIndices().toArray(new String[0]), searchSourceBuilder)
.preference(Preference.LOCAL.toString());

client.search(searchRequest, ActionListener.wrap(searchResponse -> {
final ActionListener<SearchResponse> searchResponseListener = ActionListener.wrap(searchResponse -> {
try {
if (searchResponse.getHits().getHits().length == 0) {
listener.onFailure(new IllegalArgumentException(NO_ENTITY));
Expand All @@ -177,7 +180,17 @@ private void validateEntity(
listener.onFailure(new IllegalArgumentException(NO_ENTITY));
return;
}
}, e -> listener.onFailure(new IllegalArgumentException(NO_ENTITY))));
}, e -> listener.onFailure(new IllegalArgumentException(NO_ENTITY)));
// using the original context in listener as user roles have no permissions for internal operations like fetching a
// checkpoint
clientUtil
.<SearchRequest, SearchResponse>asyncRequestWithInjectedSecurity(
searchRequest,
client::search,
detector.getDetectorId(),
client,
searchResponseListener
);

}

Expand Down
30 changes: 26 additions & 4 deletions src/main/java/org/opensearch/ad/feature/CompositeRetriever.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.opensearch.ad.model.Entity;
import org.opensearch.ad.model.Feature;
import org.opensearch.ad.util.ParseUtils;
import org.opensearch.ad.util.SecurityClientUtil;
import org.opensearch.client.Client;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.NamedXContentRegistry;
Expand Down Expand Up @@ -61,6 +62,7 @@ public class CompositeRetriever extends AbstractRetriever {
private final AnomalyDetector anomalyDetector;
private final NamedXContentRegistry xContent;
private final Client client;
private final SecurityClientUtil clientUtil;
private int totalResults;
private int maxEntities;
private final int pageSize;
Expand All @@ -73,6 +75,7 @@ public CompositeRetriever(
AnomalyDetector anomalyDetector,
NamedXContentRegistry xContent,
Client client,
SecurityClientUtil clientUtil,
long expirationEpochMs,
Clock clock,
Settings settings,
Expand All @@ -84,6 +87,7 @@ public CompositeRetriever(
this.anomalyDetector = anomalyDetector;
this.xContent = xContent;
this.client = client;
this.clientUtil = clientUtil;
this.totalResults = 0;
this.maxEntities = maxEntitiesPerInterval;
this.pageSize = pageSize;
Expand All @@ -98,6 +102,7 @@ public CompositeRetriever(
AnomalyDetector anomalyDetector,
NamedXContentRegistry xContent,
Client client,
SecurityClientUtil clientUtil,
long expirationEpochMs,
Settings settings,
int maxEntitiesPerInterval,
Expand All @@ -109,6 +114,7 @@ public CompositeRetriever(
anomalyDetector,
xContent,
client,
clientUtil,
expirationEpochMs,
Clock.systemUTC(),
settings,
Expand Down Expand Up @@ -157,19 +163,25 @@ public class PageIterator {
private SearchSourceBuilder source;
// a map from categorical field name to values (type: java.lang.Comparable)
Map<String, Object> afterKey;
// number of iterations so far
private int iterations;

public PageIterator(SearchSourceBuilder source) {
this.source = source;
this.afterKey = null;
this.iterations = 0;
}

/**
* Results are returned using listener
* @param listener Listener to return results
*/
public void next(ActionListener<Page> listener) {
iterations++;

// inject user role while searching.
SearchRequest searchRequest = new SearchRequest(anomalyDetector.getIndices().toArray(new String[0]), source);
client.search(searchRequest, new ActionListener<SearchResponse>() {
final ActionListener<SearchResponse> searchResponseListener = new ActionListener<SearchResponse>() {
@Override
public void onResponse(SearchResponse response) {
processResponse(response, () -> client.search(searchRequest, this), listener);
Expand All @@ -179,7 +191,17 @@ public void onResponse(SearchResponse response) {
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
};
// using the original context in listener as user roles have no permissions for internal operations like fetching a
// checkpoint
clientUtil
.<SearchRequest, SearchResponse>asyncRequestWithInjectedSecurity(
searchRequest,
client::search,
anomalyDetector.getDetectorId(),
client,
searchResponseListener
);
}

private void processResponse(SearchResponse response, Runnable retry, ActionListener<Page> listener) {
Expand Down Expand Up @@ -301,12 +323,12 @@ Optional<CompositeAggregation> getComposite(SearchResponse response) {

/**
* Whether next page exists. Conditions are:
* 1) we haven't fetched any page yet (totalResults == 0) or afterKey is not null
* 1)this is the first time we query (iterations == 0) or afterKey is not null
* 2) next detection interval has not started
* @return true if the iteration has more pages.
*/
public boolean hasNext() {
return (totalResults == 0 || (totalResults > 0 && afterKey != null)) && expirationEpochMs > clock.millis();
return (iterations == 0 || (totalResults > 0 && afterKey != null)) && expirationEpochMs > clock.millis();
}

@Override
Expand Down
Loading

0 comments on commit 6a6771c

Please sign in to comment.