Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Forward port 2.x] Fixing dls/fls logic around numeric aggregations #800

Merged
merged 1 commit into from
Feb 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 53 additions & 61 deletions src/main/java/org/opensearch/ad/AnomalyDetectorJobRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.opensearch.ad.transport.AnomalyResultRequest;
import org.opensearch.ad.transport.AnomalyResultResponse;
import org.opensearch.ad.transport.AnomalyResultTransportAction;
import org.opensearch.ad.util.SecurityUtil;
import org.opensearch.client.Client;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
Expand All @@ -53,6 +54,7 @@
import org.opensearch.common.xcontent.XContentParser;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.commons.InjectSecurity;
import org.opensearch.commons.authuser.User;
import org.opensearch.jobscheduler.spi.JobExecutionContext;
import org.opensearch.jobscheduler.spi.LockModel;
import org.opensearch.jobscheduler.spi.ScheduledJobParameter;
Expand All @@ -62,7 +64,6 @@
import org.opensearch.threadpool.ThreadPool;

import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;

/**
* JobScheduler will call AD job runner to get anomaly result periodically
Expand Down Expand Up @@ -145,49 +146,57 @@ public void runJob(ScheduledJobParameter scheduledJobParameter, JobExecutionCont
final LockService lockService = context.getLockService();

Runnable runnable = () -> {
nodeStateManager.getAnomalyDetector(detectorId, ActionListener.wrap(detectorOptional -> {
if (!detectorOptional.isPresent()) {
log.error(new ParameterizedMessage("fail to get detector [{}]", detectorId));
return;
}
AnomalyDetector detector = detectorOptional.get();

if (jobParameter.getLockDurationSeconds() != null) {
lockService
.acquireLock(
jobParameter,
context,
ActionListener
.wrap(
lock -> runAdJob(
jobParameter,
lockService,
lock,
detectionStartTime,
executionStartTime,
recorder,
detector
),
exception -> {
indexAnomalyResultException(
try {
nodeStateManager.getAnomalyDetector(detectorId, ActionListener.wrap(detectorOptional -> {
if (!detectorOptional.isPresent()) {
log.error(new ParameterizedMessage("fail to get detector [{}]", detectorId));
return;
}
AnomalyDetector detector = detectorOptional.get();

if (jobParameter.getLockDurationSeconds() != null) {
lockService
.acquireLock(
jobParameter,
context,
ActionListener
.wrap(
lock -> runAdJob(
jobParameter,
lockService,
null,
lock,
detectionStartTime,
executionStartTime,
exception,
false,
recorder,
detector
);
throw new IllegalStateException("Failed to acquire lock for AD job: " + detectorId);
}
)
);
} else {
log.warn("Can't get lock for AD job: " + detectorId);
}
}, e -> log.error(new ParameterizedMessage("fail to get detector [{}]", detectorId), e)));
),
exception -> {
indexAnomalyResultException(
jobParameter,
lockService,
null,
detectionStartTime,
executionStartTime,
exception,
false,
recorder,
detector
);
throw new IllegalStateException("Failed to acquire lock for AD job: " + detectorId);
}
)
);
} else {
log.warn("Can't get lock for AD job: " + detectorId);
}

}, e -> log.error(new ParameterizedMessage("fail to get detector [{}]", detectorId), e)));
} catch (Exception e) {
// os log won't show anything if there is an exception happens (maybe due to running on a ExecutorService)
// we at least log the error.
log.error("Can't start AD job: " + detectorId, e);
throw e;
}
};

ExecutorService executor = threadPool.executor(AD_THREAD_POOL_NAME);
Expand Down Expand Up @@ -231,28 +240,11 @@ protected void runAdJob(
}
anomalyDetectionIndices.update();

/*
* We need to handle 3 cases:
* 1. Detectors created by older versions and never updated. These detectors wont have User details in the
* detector object. `detector.user` will be null. Insert `all_access, AmazonES_all_access` role.
* 2. Detectors are created when security plugin is disabled, these will have empty User object.
* (`detector.user.name`, `detector.user.roles` are empty )
* 3. Detectors are created when security plugin is enabled, these will have an User object.
* This will inject user role and check if the user role has permissions to call the execute
* Anomaly Result API.
*/
String user;
List<String> roles;
if (jobParameter.getUser() == null) {
// It's possible that user create domain with security disabled, then enable security
// after upgrading. This is for BWC, for old detectors which created when security
// disabled, the user will be null.
user = "";
roles = settings.getAsList("", ImmutableList.of("all_access", "AmazonES_all_access"));
} else {
user = jobParameter.getUser().getName();
roles = jobParameter.getUser().getRoles();
}
User userInfo = SecurityUtil.getUserFromJob(jobParameter, settings);

String user = userInfo.getName();
List<String> roles = userInfo.getRoles();

String resultIndex = jobParameter.getResultIndex();
if (resultIndex == null) {
runAnomalyDetectionJob(
Expand Down Expand Up @@ -302,7 +294,7 @@ private void runAnomalyDetectionJob(
ExecuteADResultResponseRecorder recorder,
AnomalyDetector detector
) {

// using one thread in the write threadpool
try (InjectSecurity injectSecurity = new InjectSecurity(detectorId, settings, client.threadPool().getThreadContext())) {
// Injecting user role to verify if the user has permissions for our API.
injectSecurity.inject(user, roles);
Expand Down
25 changes: 14 additions & 11 deletions src/main/java/org/opensearch/ad/AnomalyDetectorPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@
import org.opensearch.ad.util.ClientUtil;
import org.opensearch.ad.util.DiscoveryNodeFilterer;
import org.opensearch.ad.util.IndexUtils;
import org.opensearch.ad.util.SecurityClientUtil;
import org.opensearch.ad.util.Throttler;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
Expand Down Expand Up @@ -232,6 +233,7 @@ public class AnomalyDetectorPlugin extends Plugin implements ActionPlugin, Scrip
private ThreadPool threadPool;
private ADStats adStats;
private ClientUtil clientUtil;
private SecurityClientUtil securityClientUtil;
private DiscoveryNodeFilterer nodeFilter;
private IndexUtils indexUtils;
private ADTaskManager adTaskManager;
Expand Down Expand Up @@ -344,11 +346,21 @@ public Collection<Object> createComponents(
SingleFeatureLinearUniformInterpolator singleFeatureLinearUniformInterpolator =
new IntegerSensitiveSingleFeatureLinearUniformInterpolator();
Interpolator interpolator = new LinearUniformInterpolator(singleFeatureLinearUniformInterpolator);
stateManager = new NodeStateManager(
client,
xContentRegistry,
settings,
clientUtil,
getClock(),
AnomalyDetectorSettings.HOURLY_MAINTENANCE,
clusterService
);
securityClientUtil = new SecurityClientUtil(stateManager, settings);
SearchFeatureDao searchFeatureDao = new SearchFeatureDao(
client,
xContentRegistry,
interpolator,
clientUtil,
securityClientUtil,
settings,
clusterService,
AnomalyDetectorSettings.NUM_SAMPLES_PER_TREE
Expand All @@ -373,16 +385,6 @@ public Collection<Object> createComponents(
adCircuitBreakerService
);

stateManager = new NodeStateManager(
client,
xContentRegistry,
settings,
clientUtil,
getClock(),
AnomalyDetectorSettings.HOURLY_MAINTENANCE,
clusterService
);

FeatureManager featureManager = new FeatureManager(
searchFeatureDao,
interpolator,
Expand Down Expand Up @@ -731,6 +733,7 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
threadPool,
clusterService,
client,
securityClientUtil,
adCircuitBreakerService,
featureManager,
adTaskManager,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.opensearch.ad.util.DiscoveryNodeFilterer;
import org.opensearch.ad.util.ExceptionUtil;
import org.opensearch.ad.util.MultiResponsesDelegateActionListener;
import org.opensearch.ad.util.SecurityClientUtil;
import org.opensearch.client.Client;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
Expand All @@ -78,6 +79,7 @@
public class AnomalyDetectorProfileRunner extends AbstractProfileRunner {
private final Logger logger = LogManager.getLogger(AnomalyDetectorProfileRunner.class);
private Client client;
private SecurityClientUtil clientUtil;
private NamedXContentRegistry xContentRegistry;
private DiscoveryNodeFilterer nodeFilter;
private final TransportService transportService;
Expand All @@ -86,6 +88,7 @@ public class AnomalyDetectorProfileRunner extends AbstractProfileRunner {

public AnomalyDetectorProfileRunner(
Client client,
SecurityClientUtil clientUtil,
NamedXContentRegistry xContentRegistry,
DiscoveryNodeFilterer nodeFilter,
long requiredSamples,
Expand All @@ -94,6 +97,7 @@ public AnomalyDetectorProfileRunner(
) {
super(requiredSamples);
this.client = client;
this.clientUtil = clientUtil;
this.xContentRegistry = xContentRegistry;
this.nodeFilter = nodeFilter;
if (requiredSamples <= 0) {
Expand Down Expand Up @@ -293,7 +297,7 @@ private void profileEntityStats(MultiResponsesDelegateActionListener<DetectorPro
searchSourceBuilder.aggregation(aggBuilder);

SearchRequest request = new SearchRequest(detector.getIndices().toArray(new String[0]), searchSourceBuilder);
client.search(request, ActionListener.wrap(searchResponse -> {
final ActionListener<SearchResponse> searchResponseListener = ActionListener.wrap(searchResponse -> {
Map<String, Aggregation> aggMap = searchResponse.getAggregations().asMap();
InternalCardinality totalEntities = (InternalCardinality) aggMap.get(CommonName.TOTAL_ENTITIES);
long value = totalEntities.getValue();
Expand All @@ -303,7 +307,17 @@ private void profileEntityStats(MultiResponsesDelegateActionListener<DetectorPro
}, searchException -> {
logger.warn(CommonErrorMessages.FAIL_TO_GET_TOTAL_ENTITIES + detector.getDetectorId());
listener.onFailure(searchException);
}));
});
// using the original context in listener as user roles have no permissions for internal operations like fetching a
// checkpoint
clientUtil
.<SearchRequest, SearchResponse>asyncRequestWithInjectedSecurity(
request,
client::search,
detector.getDetectorId(),
client,
searchResponseListener
);
} else {
// Run a composite query and count the number of buckets to decide cardinality of multiple category fields
AggregationBuilder bucketAggs = AggregationBuilders
Expand All @@ -316,7 +330,7 @@ private void profileEntityStats(MultiResponsesDelegateActionListener<DetectorPro
SearchRequest searchRequest = new SearchRequest()
.indices(detector.getIndices().toArray(new String[0]))
.source(searchSourceBuilder);
client.search(searchRequest, ActionListener.wrap(searchResponse -> {
final ActionListener<SearchResponse> searchResponseListener = ActionListener.wrap(searchResponse -> {
DetectorProfile.Builder profileBuilder = new DetectorProfile.Builder();
Aggregations aggs = searchResponse.getAggregations();
if (aggs == null) {
Expand All @@ -342,7 +356,17 @@ private void profileEntityStats(MultiResponsesDelegateActionListener<DetectorPro
}, searchException -> {
logger.warn(CommonErrorMessages.FAIL_TO_GET_TOTAL_ENTITIES + detector.getDetectorId());
listener.onFailure(searchException);
}));
});
// using the original context in listener as user roles have no permissions for internal operations like fetching a
// checkpoint
clientUtil
.<SearchRequest, SearchResponse>asyncRequestWithInjectedSecurity(
searchRequest,
client::search,
detector.getDetectorId(),
client,
searchResponseListener
);
}

}
Expand Down
21 changes: 17 additions & 4 deletions src/main/java/org/opensearch/ad/EntityProfileRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.opensearch.action.ActionListener;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.ad.constant.CommonErrorMessages;
import org.opensearch.ad.constant.CommonName;
import org.opensearch.ad.model.AnomalyDetector;
Expand All @@ -43,6 +44,7 @@
import org.opensearch.ad.transport.EntityProfileResponse;
import org.opensearch.ad.util.MultiResponsesDelegateActionListener;
import org.opensearch.ad.util.ParseUtils;
import org.opensearch.ad.util.SecurityClientUtil;
import org.opensearch.client.Client;
import org.opensearch.cluster.routing.Preference;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
Expand All @@ -64,11 +66,13 @@ public class EntityProfileRunner extends AbstractProfileRunner {
static final String EMPTY_ENTITY_ATTRIBUTES = "Empty entity attributes";
static final String NO_ENTITY = "Cannot find entity";
private Client client;
private SecurityClientUtil clientUtil;
private NamedXContentRegistry xContentRegistry;

public EntityProfileRunner(Client client, NamedXContentRegistry xContentRegistry, long requiredSamples) {
public EntityProfileRunner(Client client, SecurityClientUtil clientUtil, NamedXContentRegistry xContentRegistry, long requiredSamples) {
super(requiredSamples);
this.client = client;
this.clientUtil = clientUtil;
this.xContentRegistry = xContentRegistry;
}

Expand Down Expand Up @@ -165,8 +169,7 @@ private void validateEntity(

SearchRequest searchRequest = new SearchRequest(detector.getIndices().toArray(new String[0]), searchSourceBuilder)
.preference(Preference.LOCAL.toString());

client.search(searchRequest, ActionListener.wrap(searchResponse -> {
final ActionListener<SearchResponse> searchResponseListener = ActionListener.wrap(searchResponse -> {
try {
if (searchResponse.getHits().getHits().length == 0) {
listener.onFailure(new IllegalArgumentException(NO_ENTITY));
Expand All @@ -177,7 +180,17 @@ private void validateEntity(
listener.onFailure(new IllegalArgumentException(NO_ENTITY));
return;
}
}, e -> listener.onFailure(new IllegalArgumentException(NO_ENTITY))));
}, e -> listener.onFailure(new IllegalArgumentException(NO_ENTITY)));
// using the original context in listener as user roles have no permissions for internal operations like fetching a
// checkpoint
clientUtil
.<SearchRequest, SearchResponse>asyncRequestWithInjectedSecurity(
searchRequest,
client::search,
detector.getDetectorId(),
client,
searchResponseListener
);

}

Expand Down
Loading