Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixing dls/fls logic around numeric aggregations #789

Merged
merged 1 commit into from
Jan 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 7 additions & 24 deletions src/main/java/org/opensearch/ad/AnomalyDetectorJobRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.opensearch.ad.transport.ProfileRequest;
import org.opensearch.ad.transport.handler.AnomalyIndexHandler;
import org.opensearch.ad.util.DiscoveryNodeFilterer;
import org.opensearch.ad.util.SecurityUtil;
import org.opensearch.client.Client;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.settings.Settings;
Expand All @@ -76,7 +77,6 @@
import org.opensearch.threadpool.ThreadPool;

import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;

/**
* JobScheduler will call AD job runner to get anomaly result periodically
Expand Down Expand Up @@ -218,28 +218,11 @@ protected void runAdJob(
}
anomalyDetectionIndices.update();

/*
* We need to handle 3 cases:
* 1. Detectors created by older versions and never updated. These detectors wont have User details in the
* detector object. `detector.user` will be null. Insert `all_access, AmazonES_all_access` role.
* 2. Detectors are created when security plugin is disabled, these will have empty User object.
* (`detector.user.name`, `detector.user.roles` are empty )
* 3. Detectors are created when security plugin is enabled, these will have an User object.
* This will inject user role and check if the user role has permissions to call the execute
* Anomaly Result API.
*/
String user;
List<String> roles;
if (jobParameter.getUser() == null) {
// It's possible that user create domain with security disabled, then enable security
// after upgrading. This is for BWC, for old detectors which created when security
// disabled, the user will be null.
user = "";
roles = settings.getAsList("", ImmutableList.of("all_access", "AmazonES_all_access"));
} else {
user = jobParameter.getUser().getName();
roles = jobParameter.getUser().getRoles();
}
User userInfo = SecurityUtil.getUserFromJob(jobParameter, settings);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this ready user information stored in AD job? I think we have no way to get user information from security plugin. Just asking to learn more

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it is from AD job.


String user = userInfo.getName();
List<String> roles = userInfo.getRoles();

String resultIndex = jobParameter.getResultIndex();
if (resultIndex == null) {
runAnomalyDetectionJob(jobParameter, lockService, lock, detectionStartTime, executionStartTime, detectorId, user, roles);
Expand All @@ -265,7 +248,7 @@ private void runAnomalyDetectionJob(
String user,
List<String> roles
) {

// using one thread in the write threadpool
try (InjectSecurity injectSecurity = new InjectSecurity(detectorId, settings, client.threadPool().getThreadContext())) {
// Injecting user role to verify if the user has permissions for our API.
injectSecurity.inject(user, roles);
Expand Down
29 changes: 14 additions & 15 deletions src/main/java/org/opensearch/ad/AnomalyDetectorPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,7 @@
import org.opensearch.ad.transport.handler.AnomalyIndexHandler;
import org.opensearch.ad.transport.handler.AnomalyResultBulkIndexHandler;
import org.opensearch.ad.transport.handler.MultiEntityResultHandler;
import org.opensearch.ad.util.ClientUtil;
import org.opensearch.ad.util.DiscoveryNodeFilterer;
import org.opensearch.ad.util.IndexUtils;
import org.opensearch.ad.util.Throttler;
import org.opensearch.ad.util.*;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: should we avoid using import *?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, will fix in other PRs since 1.3.8 is due today.

import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.node.DiscoveryNodes;
Expand Down Expand Up @@ -230,6 +227,7 @@ public class AnomalyDetectorPlugin extends Plugin implements ActionPlugin, Scrip
private ThreadPool threadPool;
private ADStats adStats;
private ClientUtil clientUtil;
private SecurityClientUtil securityClientUtil;
private DiscoveryNodeFilterer nodeFilter;
private IndexUtils indexUtils;
private ADTaskCacheManager adTaskCacheManager;
Expand Down Expand Up @@ -352,11 +350,21 @@ public Collection<Object> createComponents(
SingleFeatureLinearUniformInterpolator singleFeatureLinearUniformInterpolator =
new IntegerSensitiveSingleFeatureLinearUniformInterpolator();
Interpolator interpolator = new LinearUniformInterpolator(singleFeatureLinearUniformInterpolator);
NodeStateManager stateManager = new NodeStateManager(
client,
xContentRegistry,
settings,
clientUtil,
getClock(),
AnomalyDetectorSettings.HOURLY_MAINTENANCE,
clusterService
);
securityClientUtil = new SecurityClientUtil(stateManager, settings);
SearchFeatureDao searchFeatureDao = new SearchFeatureDao(
client,
xContentRegistry,
interpolator,
clientUtil,
securityClientUtil,
settings,
clusterService,
AnomalyDetectorSettings.NUM_SAMPLES_PER_TREE
Expand All @@ -381,16 +389,6 @@ public Collection<Object> createComponents(
adCircuitBreakerService
);

NodeStateManager stateManager = new NodeStateManager(
client,
xContentRegistry,
settings,
clientUtil,
getClock(),
AnomalyDetectorSettings.HOURLY_MAINTENANCE,
clusterService
);

FeatureManager featureManager = new FeatureManager(
searchFeatureDao,
interpolator,
Expand Down Expand Up @@ -699,6 +697,7 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
threadPool,
clusterService,
client,
securityClientUtil,
adCircuitBreakerService,
featureManager,
adTaskManager,
Expand Down
35 changes: 28 additions & 7 deletions src/main/java/org/opensearch/ad/AnomalyDetectorProfileRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,7 @@
import org.opensearch.ad.transport.RCFPollingAction;
import org.opensearch.ad.transport.RCFPollingRequest;
import org.opensearch.ad.transport.RCFPollingResponse;
import org.opensearch.ad.util.DiscoveryNodeFilterer;
import org.opensearch.ad.util.ExceptionUtil;
import org.opensearch.ad.util.MultiResponsesDelegateActionListener;
import org.opensearch.ad.util.*;
import org.opensearch.client.Client;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
Expand All @@ -81,6 +79,7 @@
public class AnomalyDetectorProfileRunner extends AbstractProfileRunner {
private final Logger logger = LogManager.getLogger(AnomalyDetectorProfileRunner.class);
private Client client;
private SecurityClientUtil clientUtil;
private NamedXContentRegistry xContentRegistry;
private DiscoveryNodeFilterer nodeFilter;
private final TransportService transportService;
Expand All @@ -89,6 +88,7 @@ public class AnomalyDetectorProfileRunner extends AbstractProfileRunner {

public AnomalyDetectorProfileRunner(
Client client,
SecurityClientUtil clientUtil,
NamedXContentRegistry xContentRegistry,
DiscoveryNodeFilterer nodeFilter,
long requiredSamples,
Expand All @@ -97,6 +97,7 @@ public AnomalyDetectorProfileRunner(
) {
super(requiredSamples);
this.client = client;
this.clientUtil = clientUtil;
this.xContentRegistry = xContentRegistry;
this.nodeFilter = nodeFilter;
if (requiredSamples <= 0) {
Expand Down Expand Up @@ -296,7 +297,7 @@ private void profileEntityStats(MultiResponsesDelegateActionListener<DetectorPro
searchSourceBuilder.aggregation(aggBuilder);

SearchRequest request = new SearchRequest(detector.getIndices().toArray(new String[0]), searchSourceBuilder);
client.search(request, ActionListener.wrap(searchResponse -> {
final ActionListener<SearchResponse> searchResponseListener = ActionListener.wrap(searchResponse -> {
Map<String, Aggregation> aggMap = searchResponse.getAggregations().asMap();
InternalCardinality totalEntities = (InternalCardinality) aggMap.get(CommonName.TOTAL_ENTITIES);
long value = totalEntities.getValue();
Expand All @@ -306,7 +307,17 @@ private void profileEntityStats(MultiResponsesDelegateActionListener<DetectorPro
}, searchException -> {
logger.warn(CommonErrorMessages.FAIL_TO_GET_TOTAL_ENTITIES + detector.getDetectorId());
listener.onFailure(searchException);
}));
});
// using the original context in listener as user roles have no permissions for internal operations like fetching a
// checkpoint
clientUtil
.<SearchRequest, SearchResponse>asyncRequestWithInjectedSecurity(
request,
client::search,
detector.getDetectorId(),
client,
searchResponseListener
);
} else {
// Run a composite query and count the number of buckets to decide cardinality of multiple category fields
AggregationBuilder bucketAggs = AggregationBuilders
Expand All @@ -319,7 +330,7 @@ private void profileEntityStats(MultiResponsesDelegateActionListener<DetectorPro
SearchRequest searchRequest = new SearchRequest()
.indices(detector.getIndices().toArray(new String[0]))
.source(searchSourceBuilder);
client.search(searchRequest, ActionListener.wrap(searchResponse -> {
final ActionListener<SearchResponse> searchResponseListener = ActionListener.wrap(searchResponse -> {
DetectorProfile.Builder profileBuilder = new DetectorProfile.Builder();
Aggregations aggs = searchResponse.getAggregations();
if (aggs == null) {
Expand All @@ -345,7 +356,17 @@ private void profileEntityStats(MultiResponsesDelegateActionListener<DetectorPro
}, searchException -> {
logger.warn(CommonErrorMessages.FAIL_TO_GET_TOTAL_ENTITIES + detector.getDetectorId());
listener.onFailure(searchException);
}));
});
// using the original context in listener as user roles have no permissions for internal operations like fetching a
// checkpoint
clientUtil
.<SearchRequest, SearchResponse>asyncRequestWithInjectedSecurity(
searchRequest,
client::search,
detector.getDetectorId(),
client,
searchResponseListener
);
}

}
Expand Down
21 changes: 17 additions & 4 deletions src/main/java/org/opensearch/ad/EntityProfileRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.opensearch.action.ActionListener;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.ad.constant.CommonErrorMessages;
import org.opensearch.ad.constant.CommonName;
import org.opensearch.ad.model.AnomalyDetector;
Expand All @@ -43,6 +44,7 @@
import org.opensearch.ad.transport.EntityProfileResponse;
import org.opensearch.ad.util.MultiResponsesDelegateActionListener;
import org.opensearch.ad.util.ParseUtils;
import org.opensearch.ad.util.SecurityClientUtil;
import org.opensearch.client.Client;
import org.opensearch.cluster.routing.Preference;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
Expand All @@ -64,11 +66,13 @@ public class EntityProfileRunner extends AbstractProfileRunner {
static final String EMPTY_ENTITY_ATTRIBUTES = "Empty entity attributes";
static final String NO_ENTITY = "Cannot find entity";
private Client client;
private SecurityClientUtil clientUtil;
private NamedXContentRegistry xContentRegistry;

public EntityProfileRunner(Client client, NamedXContentRegistry xContentRegistry, long requiredSamples) {
public EntityProfileRunner(Client client, SecurityClientUtil clientUtil, NamedXContentRegistry xContentRegistry, long requiredSamples) {
super(requiredSamples);
this.client = client;
this.clientUtil = clientUtil;
this.xContentRegistry = xContentRegistry;
}

Expand Down Expand Up @@ -165,8 +169,7 @@ private void validateEntity(

SearchRequest searchRequest = new SearchRequest(detector.getIndices().toArray(new String[0]), searchSourceBuilder)
.preference(Preference.LOCAL.toString());

client.search(searchRequest, ActionListener.wrap(searchResponse -> {
final ActionListener<SearchResponse> searchResponseListener = ActionListener.wrap(searchResponse -> {
try {
if (searchResponse.getHits().getHits().length == 0) {
listener.onFailure(new IllegalArgumentException(NO_ENTITY));
Expand All @@ -177,7 +180,17 @@ private void validateEntity(
listener.onFailure(new IllegalArgumentException(NO_ENTITY));
return;
}
}, e -> listener.onFailure(new IllegalArgumentException(NO_ENTITY))));
}, e -> listener.onFailure(new IllegalArgumentException(NO_ENTITY)));
// using the original context in listener as user roles have no permissions for internal operations like fetching a
// checkpoint
clientUtil
.<SearchRequest, SearchResponse>asyncRequestWithInjectedSecurity(
searchRequest,
client::search,
detector.getDetectorId(),
client,
searchResponseListener
);

}

Expand Down
23 changes: 21 additions & 2 deletions src/main/java/org/opensearch/ad/feature/CompositeRetriever.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.opensearch.ad.model.Entity;
import org.opensearch.ad.model.Feature;
import org.opensearch.ad.util.ParseUtils;
import org.opensearch.ad.util.SecurityClientUtil;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -66,6 +67,7 @@ public class CompositeRetriever extends AbstractRetriever {
private final AnomalyDetector anomalyDetector;
private final NamedXContentRegistry xContent;
private final Client client;
private final SecurityClientUtil clientUtil;
private int totalResults;
private int maxEntities;
private final int pageSize;
Expand All @@ -80,6 +82,7 @@ public CompositeRetriever(
AnomalyDetector anomalyDetector,
NamedXContentRegistry xContent,
Client client,
SecurityClientUtil clientUtil,
long expirationEpochMs,
Clock clock,
Settings settings,
Expand All @@ -93,6 +96,7 @@ public CompositeRetriever(
this.anomalyDetector = anomalyDetector;
this.xContent = xContent;
this.client = client;
this.clientUtil = clientUtil;
this.totalResults = 0;
this.maxEntities = maxEntitiesPerInterval;
this.pageSize = pageSize;
Expand All @@ -109,6 +113,7 @@ public CompositeRetriever(
AnomalyDetector anomalyDetector,
NamedXContentRegistry xContent,
Client client,
SecurityClientUtil clientUtil,
long expirationEpochMs,
Settings settings,
int maxEntitiesPerInterval,
Expand All @@ -122,6 +127,7 @@ public CompositeRetriever(
anomalyDetector,
xContent,
client,
clientUtil,
expirationEpochMs,
Clock.systemUTC(),
settings,
Expand Down Expand Up @@ -187,8 +193,11 @@ public PageIterator(SearchSourceBuilder source) {
*/
public void next(ActionListener<Page> listener) {
iterations++;

// inject user role while searching.

SearchRequest searchRequest = new SearchRequest(anomalyDetector.getIndices().toArray(new String[0]), source);
client.search(searchRequest, new ActionListener<SearchResponse>() {
final ActionListener<SearchResponse> searchResponseListener = new ActionListener<SearchResponse>() {
@Override
public void onResponse(SearchResponse response) {
processResponse(response, () -> client.search(searchRequest, this), listener);
Expand All @@ -198,7 +207,17 @@ public void onResponse(SearchResponse response) {
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
};
// using the original context in listener as user roles have no permissions for internal operations like fetching a
// checkpoint
clientUtil
.<SearchRequest, SearchResponse>asyncRequestWithInjectedSecurity(
searchRequest,
client::search,
anomalyDetector.getDetectorId(),
client,
searchResponseListener
);
}

private void processResponse(SearchResponse response, Runnable retry, ActionListener<Page> listener) {
Expand Down
Loading