Skip to content

Commit

Permalink
support storing anomaly result to custom index (#276)
Browse files Browse the repository at this point in the history
* support storing anomaly result to custom index

Signed-off-by: Yaliang Wu <[email protected]>

* refactor search result action

Signed-off-by: Yaliang Wu <[email protected]>

* address comments; validate more about result index

* add more comment; remove custom result index debug log

* address comments from kaituo

* fix failed test

Signed-off-by: Yaliang Wu <[email protected]>
  • Loading branch information
ylwu-amzn authored Nov 2, 2021
1 parent ca905f1 commit 44baa81
Show file tree
Hide file tree
Showing 62 changed files with 1,339 additions and 407 deletions.
3 changes: 3 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,9 @@ List<String> jacocoExclusions = [
'org.opensearch.ad.transport.SearchTopAnomalyResult*',
'org.opensearch.ad.rest.RestSearchTopAnomalyResultAction',
'org.opensearch.ad.model.AnomalyResultBucket',
//TODO: custom result index caused coverage drop
'org.opensearch.ad.indices.AnomalyDetectionIndices',
'org.opensearch.ad.transport.handler.AnomalyResultBulkIndexHandler'
]

jacocoTestCoverageVerification {
Expand Down
99 changes: 69 additions & 30 deletions src/main/java/org/opensearch/ad/AnomalyDetectorJobRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public class AnomalyDetectorJobRunner implements ScheduledJobRunner {
private ThreadPool threadPool;
private AnomalyIndexHandler<AnomalyResult> anomalyResultHandler;
private ConcurrentHashMap<String, Integer> detectorEndRunExceptionCount;
private AnomalyDetectionIndices indexUtil;
private AnomalyDetectionIndices anomalyDetectionIndices;
private DiscoveryNodeFilterer nodeFilter;
private ADTaskManager adTaskManager;

Expand Down Expand Up @@ -133,25 +133,25 @@ public void setAdTaskManager(ADTaskManager adTaskManager) {
this.adTaskManager = adTaskManager;
}

public void setIndexUtil(AnomalyDetectionIndices indexUtil) {
this.indexUtil = indexUtil;
public void setAnomalyDetectionIndices(AnomalyDetectionIndices anomalyDetectionIndices) {
this.anomalyDetectionIndices = anomalyDetectionIndices;
}

public void setNodeFilter(DiscoveryNodeFilterer nodeFilter) {
this.nodeFilter = nodeFilter;
}

@Override
public void runJob(ScheduledJobParameter jobParameter, JobExecutionContext context) {
String detectorId = jobParameter.getName();
public void runJob(ScheduledJobParameter scheduledJobParameter, JobExecutionContext context) {
String detectorId = scheduledJobParameter.getName();
log.info("Start to run AD job {}", detectorId);
adTaskManager.refreshRealtimeJobRunTime(detectorId);
if (!(jobParameter instanceof AnomalyDetectorJob)) {
if (!(scheduledJobParameter instanceof AnomalyDetectorJob)) {
throw new IllegalArgumentException(
"Job parameter is not instance of AnomalyDetectorJob, type: " + jobParameter.getClass().getCanonicalName()
"Job parameter is not instance of AnomalyDetectorJob, type: " + scheduledJobParameter.getClass().getCanonicalName()
);
}

AnomalyDetectorJob jobParameter = (AnomalyDetectorJob) scheduledJobParameter;
Instant executionStartTime = Instant.now();
IntervalSchedule schedule = (IntervalSchedule) jobParameter.getSchedule();
Instant detectionStartTime = executionStartTime.minus(schedule.getInterval(), schedule.getUnit());
Expand Down Expand Up @@ -197,7 +197,7 @@ public void runJob(ScheduledJobParameter jobParameter, JobExecutionContext conte
* @param executionStartTime detection end time
*/
protected void runAdJob(
ScheduledJobParameter jobParameter,
AnomalyDetectorJob jobParameter,
LockService lockService,
LockModel lock,
Instant detectionStartTime,
Expand All @@ -216,7 +216,8 @@ protected void runAdJob(
);
return;
}
indexUtil.update();
anomalyDetectionIndices.update();

/*
* We need to handle 3 cases:
* 1. Detectors created by older versions and never updated. These detectors wont have User details in the
Expand All @@ -229,14 +230,41 @@ protected void runAdJob(
*/
String user;
List<String> roles;
AnomalyDetectorJob job = (AnomalyDetectorJob) jobParameter;
if (job.getUser() == null) {
if (jobParameter.getUser() == null) {
// It's possible that user create domain with security disabled, then enable security
// after upgrading. This is for BWC, for old detectors which created when security
// disabled, the user will be null.
user = "";
roles = settings.getAsList("", ImmutableList.of("all_access", "AmazonES_all_access"));
} else {
user = job.getUser().getName();
roles = job.getUser().getRoles();
user = jobParameter.getUser().getName();
roles = jobParameter.getUser().getRoles();
}
String resultIndex = jobParameter.getResultIndex();
if (resultIndex == null) {
runAnomalyDetectionJob(jobParameter, lockService, lock, detectionStartTime, executionStartTime, detectorId, user, roles);
return;
}
ActionListener<Boolean> listener = ActionListener.wrap(r -> { log.debug("Custom index is valid"); }, e -> {
Exception exception = new EndRunException(detectorId, e.getMessage(), true);
handleAdException(jobParameter, lockService, lock, detectionStartTime, executionStartTime, exception);
});
anomalyDetectionIndices.validateCustomIndexForBackendJob(resultIndex, detectorId, user, roles, () -> {
listener.onResponse(true);
runAnomalyDetectionJob(jobParameter, lockService, lock, detectionStartTime, executionStartTime, detectorId, user, roles);
}, listener);
}

private void runAnomalyDetectionJob(
AnomalyDetectorJob jobParameter,
LockService lockService,
LockModel lock,
Instant detectionStartTime,
Instant executionStartTime,
String detectorId,
String user,
List<String> roles
) {

try (InjectSecurity injectSecurity = new InjectSecurity(detectorId, settings, client.threadPool().getThreadContext())) {
// Injecting user role to verify if the user has permissions for our API.
Expand Down Expand Up @@ -305,7 +333,7 @@ protected void runAdJob(
* @param exception exception
*/
protected void handleAdException(
ScheduledJobParameter jobParameter,
AnomalyDetectorJob jobParameter,
LockService lockService,
LockModel lock,
Instant detectionStartTime,
Expand Down Expand Up @@ -376,7 +404,7 @@ protected void handleAdException(
}

private void stopAdJobForEndRunException(
ScheduledJobParameter jobParameter,
AnomalyDetectorJob jobParameter,
LockService lockService,
LockModel lock,
Instant detectionStartTime,
Expand Down Expand Up @@ -425,7 +453,8 @@ private void stopAdJob(String detectorId, AnomalyDetectorFunction function) {
Instant.now(),
Instant.now(),
job.getLockDurationSeconds(),
job.getUser()
job.getUser(),
job.getResultIndex()
);
IndexRequest indexRequest = new IndexRequest(AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
Expand Down Expand Up @@ -457,7 +486,7 @@ private void stopAdJob(String detectorId, AnomalyDetectorFunction function) {
}

private void indexAnomalyResult(
ScheduledJobParameter jobParameter,
AnomalyDetectorJob jobParameter,
LockService lockService,
LockModel lock,
Instant detectionStartTime,
Expand All @@ -476,10 +505,10 @@ private void indexAnomalyResult(
updateRealtimeTask(response, detectorId);
return;
}
IntervalTimeConfiguration windowDelay = (IntervalTimeConfiguration) ((AnomalyDetectorJob) jobParameter).getWindowDelay();
IntervalTimeConfiguration windowDelay = (IntervalTimeConfiguration) jobParameter.getWindowDelay();
Instant dataStartTime = detectionStartTime.minus(windowDelay.getInterval(), windowDelay.getUnit());
Instant dataEndTime = executionStartTime.minus(windowDelay.getInterval(), windowDelay.getUnit());
User user = ((AnomalyDetectorJob) jobParameter).getUser();
User user = jobParameter.getUser();

if (response.getError() != null) {
log.info("Anomaly result action run successfully for {} with error {}", detectorId, response.getError());
Expand All @@ -496,10 +525,13 @@ private void indexAnomalyResult(
Instant.now(),
response.getError(),
user,
indexUtil.getSchemaVersion(ADIndex.RESULT)
anomalyDetectionIndices.getSchemaVersion(ADIndex.RESULT)
);
anomalyResultHandler.index(anomalyResult, detectorId);
String resultIndex = jobParameter.getResultIndex();
anomalyResultHandler.index(anomalyResult, detectorId, resultIndex);
updateRealtimeTask(response, detectorId);
} catch (EndRunException e) {
handleAdException(jobParameter, lockService, lock, detectionStartTime, executionStartTime, e);
} catch (Exception e) {
log.error("Failed to index anomaly result for " + detectorId, e);
} finally {
Expand Down Expand Up @@ -538,7 +570,7 @@ private void updateRealtimeTask(AnomalyResultResponse response, String detectorI
}

private void indexAnomalyResultException(
ScheduledJobParameter jobParameter,
AnomalyDetectorJob jobParameter,
LockService lockService,
LockModel lock,
Instant detectionStartTime,
Expand All @@ -557,7 +589,7 @@ private void indexAnomalyResultException(
}

private void indexAnomalyResultException(
ScheduledJobParameter jobParameter,
AnomalyDetectorJob jobParameter,
LockService lockService,
LockModel lock,
Instant detectionStartTime,
Expand All @@ -578,7 +610,7 @@ private void indexAnomalyResultException(
}

private void indexAnomalyResultException(
ScheduledJobParameter jobParameter,
AnomalyDetectorJob jobParameter,
LockService lockService,
LockModel lock,
Instant detectionStartTime,
Expand All @@ -589,10 +621,10 @@ private void indexAnomalyResultException(
) {
String detectorId = jobParameter.getName();
try {
IntervalTimeConfiguration windowDelay = (IntervalTimeConfiguration) ((AnomalyDetectorJob) jobParameter).getWindowDelay();
IntervalTimeConfiguration windowDelay = (IntervalTimeConfiguration) jobParameter.getWindowDelay();
Instant dataStartTime = detectionStartTime.minus(windowDelay.getInterval(), windowDelay.getUnit());
Instant dataEndTime = executionStartTime.minus(windowDelay.getInterval(), windowDelay.getUnit());
User user = ((AnomalyDetectorJob) jobParameter).getUser();
User user = jobParameter.getUser();

AnomalyResult anomalyResult = new AnomalyResult(
detectorId,
Expand All @@ -606,9 +638,16 @@ private void indexAnomalyResultException(
Instant.now(),
errorMessage,
user,
indexUtil.getSchemaVersion(ADIndex.RESULT)
anomalyDetectionIndices.getSchemaVersion(ADIndex.RESULT)
);
anomalyResultHandler.index(anomalyResult, detectorId);
String resultIndex = jobParameter.getResultIndex();
if (resultIndex != null && !anomalyDetectionIndices.doesIndexExist(resultIndex)) {
// Set result index as null, will write exception to default result index.
anomalyResultHandler.index(anomalyResult, detectorId, null);
} else {
anomalyResultHandler.index(anomalyResult, detectorId, resultIndex);
}

updateLatestRealtimeTask(detectorId, taskState, null, null, errorMessage);
} catch (Exception e) {
log.error("Failed to index anomaly result for " + detectorId, e);
Expand Down Expand Up @@ -650,7 +689,7 @@ private void updateLatestRealtimeTask(
);
}

private void releaseLock(ScheduledJobParameter jobParameter, LockService lockService, LockModel lock) {
private void releaseLock(AnomalyDetectorJob jobParameter, LockService lockService, LockModel lock) {
lockService
.release(
lock,
Expand Down
8 changes: 2 additions & 6 deletions src/main/java/org/opensearch/ad/AnomalyDetectorPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,6 @@
import org.opensearch.ad.util.DiscoveryNodeFilterer;
import org.opensearch.ad.util.IndexUtils;
import org.opensearch.ad.util.Throttler;
import org.opensearch.ad.util.ThrowingConsumerWrapper;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.node.DiscoveryNodes;
Expand Down Expand Up @@ -263,8 +262,7 @@ public List<RestHandler> getRestHandlers(
settings,
threadPool,
CommonName.ANOMALY_RESULT_INDEX_ALIAS,
ThrowingConsumerWrapper.throwingConsumerWrapper(anomalyDetectionIndices::initAnomalyResultIndexDirectly),
anomalyDetectionIndices::doesAnomalyResultIndexExist,
anomalyDetectionIndices,
this.clientUtil,
this.indexUtils,
clusterService
Expand All @@ -275,7 +273,7 @@ public List<RestHandler> getRestHandlers(
jobRunner.setThreadPool(threadPool);
jobRunner.setAnomalyResultHandler(anomalyResultHandler);
jobRunner.setSettings(settings);
jobRunner.setIndexUtil(anomalyDetectionIndices);
jobRunner.setAnomalyDetectionIndices(anomalyDetectionIndices);
jobRunner.setNodeFilter(nodeFilter);
jobRunner.setAdTaskManager(adTaskManager);

Expand Down Expand Up @@ -691,8 +689,6 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
client,
settings,
threadPool,
ThrowingConsumerWrapper.throwingConsumerWrapper(anomalyDetectionIndices::initAnomalyResultIndexDirectly),
anomalyDetectionIndices::doesAnomalyResultIndexExist,
this.clientUtil,
this.indexUtils,
clusterService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ private void confirmMultiEntityDetectorInitStatus(
long totalUpdates,
MultiResponsesDelegateActionListener<DetectorProfile> listener
) {
SearchRequest searchLatestResult = createInittedEverRequest(detector.getDetectorId(), enabledTime);
SearchRequest searchLatestResult = createInittedEverRequest(detector.getDetectorId(), enabledTime, detector.getResultIndex());
client.search(searchLatestResult, onInittedEver(enabledTime, profile, profilesToCollect, detector, totalUpdates, listener));
}

Expand Down Expand Up @@ -609,7 +609,7 @@ private void processInitResponse(
* @param enabledTime the time when AD job is enabled in milliseconds
* @return the search request
*/
private SearchRequest createInittedEverRequest(String detectorId, long enabledTime) {
private SearchRequest createInittedEverRequest(String detectorId, long enabledTime, String resultIndex) {
BoolQueryBuilder filterQuery = new BoolQueryBuilder();
filterQuery.filter(QueryBuilders.termQuery(AnomalyResult.DETECTOR_ID_FIELD, detectorId));
filterQuery.filter(QueryBuilders.rangeQuery(AnomalyResult.EXECUTION_END_TIME_FIELD).gte(enabledTime));
Expand All @@ -619,6 +619,9 @@ private SearchRequest createInittedEverRequest(String detectorId, long enabledTi

SearchRequest request = new SearchRequest(CommonName.ANOMALY_RESULT_INDEX_ALIAS);
request.source(source);
if (resultIndex != null) {
request.indices(resultIndex);
}
return request;
}
}
12 changes: 10 additions & 2 deletions src/main/java/org/opensearch/ad/EntityProfileRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,12 @@ private void getJob(

if (profilesToCollect.contains(EntityProfileName.ENTITY_INFO)) {
long enabledTimeMs = job.getEnabledTime().toEpochMilli();
SearchRequest lastSampleTimeRequest = createLastSampleTimeRequest(detectorId, enabledTimeMs, entityValue);
SearchRequest lastSampleTimeRequest = createLastSampleTimeRequest(
detectorId,
enabledTimeMs,
entityValue,
detector.getResultIndex()
);

EntityProfile.Builder builder = new EntityProfile.Builder();

Expand Down Expand Up @@ -387,7 +392,7 @@ private void sendInitState(
delegateListener.onResponse(builder.build());
}

private SearchRequest createLastSampleTimeRequest(String detectorId, long enabledTime, Entity entity) {
private SearchRequest createLastSampleTimeRequest(String detectorId, long enabledTime, Entity entity, String resultIndex) {
BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();

String path = "entity";
Expand Down Expand Up @@ -449,6 +454,9 @@ private SearchRequest createLastSampleTimeRequest(String detectorId, long enable

SearchRequest request = new SearchRequest(CommonName.ANOMALY_RESULT_INDEX_ALIAS);
request.source(source);
if (resultIndex != null) {
request.indices(resultIndex);
}
return request;
}
}
12 changes: 12 additions & 0 deletions src/main/java/org/opensearch/ad/constant/CommonErrorMessages.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@

package org.opensearch.ad.constant;

import static org.opensearch.ad.constant.CommonName.CUSTOM_RESULT_INDEX_PREFIX;
import static org.opensearch.ad.model.AnomalyDetector.MAX_RESULT_INDEX_NAME_SIZE;

import java.util.Locale;

public class CommonErrorMessages {
Expand All @@ -24,6 +27,7 @@ public class CommonErrorMessages {
public static final String MEMORY_CIRCUIT_BROKEN_ERR_MSG = "AD memory circuit is broken.";
public static final String DISABLED_ERR_MSG = "AD plugin is disabled. To enable update plugins.anomaly_detection.enabled to true";
public static final String CAN_NOT_CHANGE_CATEGORY_FIELD = "Can't change detector category field";
public static final String CAN_NOT_CHANGE_RESULT_INDEX = "Can't change detector result index";
public static final String CREATE_INDEX_NOT_ACKNOWLEDGED = "Create index %S not acknowledged";
// We need this invalid query tag to show proper error message on frontend
// refer to AD Dashboard code: https://tinyurl.com/8b5n8hat
Expand Down Expand Up @@ -83,4 +87,12 @@ public static String getTooManyCategoricalFieldErr(int limit) {
public static String FAIL_TO_DELETE_AD_RESULT = "Fail to delete anomaly result";
public static String FAIL_TO_GET_STATS = "Fail to get stats";
public static String FAIL_TO_SEARCH = "Fail to search";

public static String CAN_NOT_FIND_RESULT_INDEX = "Can't find result index ";
public static String INVALID_RESULT_INDEX_PREFIX = "Result index must start with " + CUSTOM_RESULT_INDEX_PREFIX;
public static String INVALID_RESULT_INDEX_NAME_SIZE = "Result index name size must contains less than "
+ MAX_RESULT_INDEX_NAME_SIZE
+ " characters";
public static String INVALID_CHAR_IN_RESULT_INDEX_NAME =
"Result index name has invalid character. Valid characters are a-z, 0-9, -(hyphen) and _(underscore)";
}
7 changes: 7 additions & 0 deletions src/main/java/org/opensearch/ad/constant/CommonName.java
Original file line number Diff line number Diff line change
Expand Up @@ -135,4 +135,11 @@ public class CommonName {
// ======================================
// detector validation aspect
public static final String DETECTOR = "detector";

// ======================================
// Used for custom AD result index
// ======================================
public static final String DUMMY_AD_RESULT_ID = "dummy_ad_result_id";
public static final String DUMMY_DETECTOR_ID = "dummy_detector_id";
public static final String CUSTOM_RESULT_INDEX_PREFIX = "opensearch-ad-plugin-result-";
}
Loading

0 comments on commit 44baa81

Please sign in to comment.