Skip to content

Commit

Permalink
fixing dls/fls logic around numeric aggregations
Browse files Browse the repository at this point in the history
Signed-off-by: Kaituo Li <[email protected]>
  • Loading branch information
kaituo committed Jan 26, 2023
1 parent 1a804d1 commit 9354d97
Show file tree
Hide file tree
Showing 43 changed files with 1,253 additions and 602 deletions.
114 changes: 53 additions & 61 deletions src/main/java/org/opensearch/ad/AnomalyDetectorJobRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.opensearch.ad.transport.AnomalyResultRequest;
import org.opensearch.ad.transport.AnomalyResultResponse;
import org.opensearch.ad.transport.AnomalyResultTransportAction;
import org.opensearch.ad.util.SecurityUtil;
import org.opensearch.client.Client;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
Expand All @@ -53,6 +54,7 @@
import org.opensearch.common.xcontent.XContentParser;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.commons.InjectSecurity;
import org.opensearch.commons.authuser.User;
import org.opensearch.jobscheduler.spi.JobExecutionContext;
import org.opensearch.jobscheduler.spi.LockModel;
import org.opensearch.jobscheduler.spi.ScheduledJobParameter;
Expand All @@ -62,7 +64,6 @@
import org.opensearch.threadpool.ThreadPool;

import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;

/**
* JobScheduler will call AD job runner to get anomaly result periodically
Expand Down Expand Up @@ -145,49 +146,57 @@ public void runJob(ScheduledJobParameter scheduledJobParameter, JobExecutionCont
final LockService lockService = context.getLockService();

Runnable runnable = () -> {
nodeStateManager.getAnomalyDetector(detectorId, ActionListener.wrap(detectorOptional -> {
if (!detectorOptional.isPresent()) {
log.error(new ParameterizedMessage("fail to get detector [{}]", detectorId));
return;
}
AnomalyDetector detector = detectorOptional.get();

if (jobParameter.getLockDurationSeconds() != null) {
lockService
.acquireLock(
jobParameter,
context,
ActionListener
.wrap(
lock -> runAdJob(
jobParameter,
lockService,
lock,
detectionStartTime,
executionStartTime,
recorder,
detector
),
exception -> {
indexAnomalyResultException(
try {
nodeStateManager.getAnomalyDetector(detectorId, ActionListener.wrap(detectorOptional -> {
if (!detectorOptional.isPresent()) {
log.error(new ParameterizedMessage("fail to get detector [{}]", detectorId));
return;
}
AnomalyDetector detector = detectorOptional.get();

if (jobParameter.getLockDurationSeconds() != null) {
lockService
.acquireLock(
jobParameter,
context,
ActionListener
.wrap(
lock -> runAdJob(
jobParameter,
lockService,
null,
lock,
detectionStartTime,
executionStartTime,
exception,
false,
recorder,
detector
);
throw new IllegalStateException("Failed to acquire lock for AD job: " + detectorId);
}
)
);
} else {
log.warn("Can't get lock for AD job: " + detectorId);
}
}, e -> log.error(new ParameterizedMessage("fail to get detector [{}]", detectorId), e)));
),
exception -> {
indexAnomalyResultException(
jobParameter,
lockService,
null,
detectionStartTime,
executionStartTime,
exception,
false,
recorder,
detector
);
throw new IllegalStateException("Failed to acquire lock for AD job: " + detectorId);
}
)
);
} else {
log.warn("Can't get lock for AD job: " + detectorId);
}

}, e -> log.error(new ParameterizedMessage("fail to get detector [{}]", detectorId), e)));
} catch (Exception e) {
// os log won't show anything if there is an exception happens (maybe due to running on a ExecutorService)
// we at least log the error.
log.error("Can't start AD job: " + detectorId, e);
throw e;
}
};

ExecutorService executor = threadPool.executor(AD_THREAD_POOL_NAME);
Expand Down Expand Up @@ -231,28 +240,11 @@ protected void runAdJob(
}
anomalyDetectionIndices.update();

/*
* We need to handle 3 cases:
* 1. Detectors created by older versions and never updated. These detectors wont have User details in the
* detector object. `detector.user` will be null. Insert `all_access, AmazonES_all_access` role.
* 2. Detectors are created when security plugin is disabled, these will have empty User object.
* (`detector.user.name`, `detector.user.roles` are empty )
* 3. Detectors are created when security plugin is enabled, these will have an User object.
* This will inject user role and check if the user role has permissions to call the execute
* Anomaly Result API.
*/
String user;
List<String> roles;
if (jobParameter.getUser() == null) {
// It's possible that user create domain with security disabled, then enable security
// after upgrading. This is for BWC, for old detectors which created when security
// disabled, the user will be null.
user = "";
roles = settings.getAsList("", ImmutableList.of("all_access", "AmazonES_all_access"));
} else {
user = jobParameter.getUser().getName();
roles = jobParameter.getUser().getRoles();
}
User userInfo = SecurityUtil.getUserFromJob(jobParameter, settings);

String user = userInfo.getName();
List<String> roles = userInfo.getRoles();

String resultIndex = jobParameter.getResultIndex();
if (resultIndex == null) {
runAnomalyDetectionJob(
Expand Down Expand Up @@ -302,7 +294,7 @@ private void runAnomalyDetectionJob(
ExecuteADResultResponseRecorder recorder,
AnomalyDetector detector
) {

// using one thread in the write threadpool
try (InjectSecurity injectSecurity = new InjectSecurity(detectorId, settings, client.threadPool().getThreadContext())) {
// Injecting user role to verify if the user has permissions for our API.
injectSecurity.inject(user, roles);
Expand Down
29 changes: 14 additions & 15 deletions src/main/java/org/opensearch/ad/AnomalyDetectorPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,7 @@
import org.opensearch.ad.transport.handler.AnomalyIndexHandler;
import org.opensearch.ad.transport.handler.AnomalyResultBulkIndexHandler;
import org.opensearch.ad.transport.handler.MultiEntityResultHandler;
import org.opensearch.ad.util.ClientUtil;
import org.opensearch.ad.util.DiscoveryNodeFilterer;
import org.opensearch.ad.util.IndexUtils;
import org.opensearch.ad.util.Throttler;
import org.opensearch.ad.util.*;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.node.DiscoveryNodes;
Expand Down Expand Up @@ -232,6 +229,7 @@ public class AnomalyDetectorPlugin extends Plugin implements ActionPlugin, Scrip
private ThreadPool threadPool;
private ADStats adStats;
private ClientUtil clientUtil;
private SecurityClientUtil securityClientUtil;
private DiscoveryNodeFilterer nodeFilter;
private IndexUtils indexUtils;
private ADTaskManager adTaskManager;
Expand Down Expand Up @@ -344,11 +342,21 @@ public Collection<Object> createComponents(
SingleFeatureLinearUniformInterpolator singleFeatureLinearUniformInterpolator =
new IntegerSensitiveSingleFeatureLinearUniformInterpolator();
Interpolator interpolator = new LinearUniformInterpolator(singleFeatureLinearUniformInterpolator);
stateManager = new NodeStateManager(
client,
xContentRegistry,
settings,
clientUtil,
getClock(),
AnomalyDetectorSettings.HOURLY_MAINTENANCE,
clusterService
);
securityClientUtil = new SecurityClientUtil(stateManager, settings);
SearchFeatureDao searchFeatureDao = new SearchFeatureDao(
client,
xContentRegistry,
interpolator,
clientUtil,
securityClientUtil,
settings,
clusterService,
AnomalyDetectorSettings.NUM_SAMPLES_PER_TREE
Expand All @@ -373,16 +381,6 @@ public Collection<Object> createComponents(
adCircuitBreakerService
);

stateManager = new NodeStateManager(
client,
xContentRegistry,
settings,
clientUtil,
getClock(),
AnomalyDetectorSettings.HOURLY_MAINTENANCE,
clusterService
);

FeatureManager featureManager = new FeatureManager(
searchFeatureDao,
interpolator,
Expand Down Expand Up @@ -731,6 +729,7 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
threadPool,
clusterService,
client,
securityClientUtil,
adCircuitBreakerService,
featureManager,
adTaskManager,
Expand Down
35 changes: 28 additions & 7 deletions src/main/java/org/opensearch/ad/AnomalyDetectorProfileRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,7 @@
import org.opensearch.ad.transport.RCFPollingAction;
import org.opensearch.ad.transport.RCFPollingRequest;
import org.opensearch.ad.transport.RCFPollingResponse;
import org.opensearch.ad.util.DiscoveryNodeFilterer;
import org.opensearch.ad.util.ExceptionUtil;
import org.opensearch.ad.util.MultiResponsesDelegateActionListener;
import org.opensearch.ad.util.*;
import org.opensearch.client.Client;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
Expand All @@ -78,6 +76,7 @@
public class AnomalyDetectorProfileRunner extends AbstractProfileRunner {
private final Logger logger = LogManager.getLogger(AnomalyDetectorProfileRunner.class);
private Client client;
private SecurityClientUtil clientUtil;
private NamedXContentRegistry xContentRegistry;
private DiscoveryNodeFilterer nodeFilter;
private final TransportService transportService;
Expand All @@ -86,6 +85,7 @@ public class AnomalyDetectorProfileRunner extends AbstractProfileRunner {

public AnomalyDetectorProfileRunner(
Client client,
SecurityClientUtil clientUtil,
NamedXContentRegistry xContentRegistry,
DiscoveryNodeFilterer nodeFilter,
long requiredSamples,
Expand All @@ -94,6 +94,7 @@ public AnomalyDetectorProfileRunner(
) {
super(requiredSamples);
this.client = client;
this.clientUtil = clientUtil;
this.xContentRegistry = xContentRegistry;
this.nodeFilter = nodeFilter;
if (requiredSamples <= 0) {
Expand Down Expand Up @@ -293,7 +294,7 @@ private void profileEntityStats(MultiResponsesDelegateActionListener<DetectorPro
searchSourceBuilder.aggregation(aggBuilder);

SearchRequest request = new SearchRequest(detector.getIndices().toArray(new String[0]), searchSourceBuilder);
client.search(request, ActionListener.wrap(searchResponse -> {
final ActionListener<SearchResponse> searchResponseListener = ActionListener.wrap(searchResponse -> {
Map<String, Aggregation> aggMap = searchResponse.getAggregations().asMap();
InternalCardinality totalEntities = (InternalCardinality) aggMap.get(CommonName.TOTAL_ENTITIES);
long value = totalEntities.getValue();
Expand All @@ -303,7 +304,17 @@ private void profileEntityStats(MultiResponsesDelegateActionListener<DetectorPro
}, searchException -> {
logger.warn(CommonErrorMessages.FAIL_TO_GET_TOTAL_ENTITIES + detector.getDetectorId());
listener.onFailure(searchException);
}));
});
// using the original context in listener as user roles have no permissions for internal operations like fetching a
// checkpoint
clientUtil
.<SearchRequest, SearchResponse>asyncRequestWithInjectedSecurity(
request,
client::search,
detector.getDetectorId(),
client,
searchResponseListener
);
} else {
// Run a composite query and count the number of buckets to decide cardinality of multiple category fields
AggregationBuilder bucketAggs = AggregationBuilders
Expand All @@ -316,7 +327,7 @@ private void profileEntityStats(MultiResponsesDelegateActionListener<DetectorPro
SearchRequest searchRequest = new SearchRequest()
.indices(detector.getIndices().toArray(new String[0]))
.source(searchSourceBuilder);
client.search(searchRequest, ActionListener.wrap(searchResponse -> {
final ActionListener<SearchResponse> searchResponseListener = ActionListener.wrap(searchResponse -> {
DetectorProfile.Builder profileBuilder = new DetectorProfile.Builder();
Aggregations aggs = searchResponse.getAggregations();
if (aggs == null) {
Expand All @@ -342,7 +353,17 @@ private void profileEntityStats(MultiResponsesDelegateActionListener<DetectorPro
}, searchException -> {
logger.warn(CommonErrorMessages.FAIL_TO_GET_TOTAL_ENTITIES + detector.getDetectorId());
listener.onFailure(searchException);
}));
});
// using the original context in listener as user roles have no permissions for internal operations like fetching a
// checkpoint
clientUtil
.<SearchRequest, SearchResponse>asyncRequestWithInjectedSecurity(
searchRequest,
client::search,
detector.getDetectorId(),
client,
searchResponseListener
);
}

}
Expand Down
21 changes: 17 additions & 4 deletions src/main/java/org/opensearch/ad/EntityProfileRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.opensearch.action.ActionListener;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.ad.constant.CommonErrorMessages;
import org.opensearch.ad.constant.CommonName;
import org.opensearch.ad.model.AnomalyDetector;
Expand All @@ -43,6 +44,7 @@
import org.opensearch.ad.transport.EntityProfileResponse;
import org.opensearch.ad.util.MultiResponsesDelegateActionListener;
import org.opensearch.ad.util.ParseUtils;
import org.opensearch.ad.util.SecurityClientUtil;
import org.opensearch.client.Client;
import org.opensearch.cluster.routing.Preference;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
Expand All @@ -64,11 +66,13 @@ public class EntityProfileRunner extends AbstractProfileRunner {
static final String EMPTY_ENTITY_ATTRIBUTES = "Empty entity attributes";
static final String NO_ENTITY = "Cannot find entity";
private Client client;
private SecurityClientUtil clientUtil;
private NamedXContentRegistry xContentRegistry;

public EntityProfileRunner(Client client, NamedXContentRegistry xContentRegistry, long requiredSamples) {
public EntityProfileRunner(Client client, SecurityClientUtil clientUtil, NamedXContentRegistry xContentRegistry, long requiredSamples) {
super(requiredSamples);
this.client = client;
this.clientUtil = clientUtil;
this.xContentRegistry = xContentRegistry;
}

Expand Down Expand Up @@ -165,8 +169,7 @@ private void validateEntity(

SearchRequest searchRequest = new SearchRequest(detector.getIndices().toArray(new String[0]), searchSourceBuilder)
.preference(Preference.LOCAL.toString());

client.search(searchRequest, ActionListener.wrap(searchResponse -> {
final ActionListener<SearchResponse> searchResponseListener = ActionListener.wrap(searchResponse -> {
try {
if (searchResponse.getHits().getHits().length == 0) {
listener.onFailure(new IllegalArgumentException(NO_ENTITY));
Expand All @@ -177,7 +180,17 @@ private void validateEntity(
listener.onFailure(new IllegalArgumentException(NO_ENTITY));
return;
}
}, e -> listener.onFailure(new IllegalArgumentException(NO_ENTITY))));
}, e -> listener.onFailure(new IllegalArgumentException(NO_ENTITY)));
// using the original context in listener as user roles have no permissions for internal operations like fetching a
// checkpoint
clientUtil
.<SearchRequest, SearchResponse>asyncRequestWithInjectedSecurity(
searchRequest,
client::search,
detector.getDetectorId(),
client,
searchResponseListener
);

}

Expand Down
Loading

0 comments on commit 9354d97

Please sign in to comment.