diff --git a/src/main/java/org/opensearch/ad/ml/IgnoreSimilarExtractor.java b/src/main/java/org/opensearch/ad/ml/IgnoreSimilarExtractor.java index f9141a1d8..bb1c63ebf 100644 --- a/src/main/java/org/opensearch/ad/ml/IgnoreSimilarExtractor.java +++ b/src/main/java/org/opensearch/ad/ml/IgnoreSimilarExtractor.java @@ -53,15 +53,18 @@ public static ThresholdArrays processDetectorRules(AnomalyDetector detector) { if (rules != null) { for (Rule rule : rules) { for (Condition condition : rule.getConditions()) { - processCondition( - condition, - featureNames, - baseDimension, - ignoreSimilarFromAbove, - ignoreSimilarFromBelow, - ignoreSimilarFromAboveByRatio, - ignoreSimilarFromBelowByRatio - ); + if (condition.getThresholdType() != ThresholdType.ACTUAL_IS_BELOW_EXPECTED + || condition.getThresholdType() != ThresholdType.ACTUAL_IS_OVER_EXPECTED) { + processCondition( + condition, + featureNames, + baseDimension, + ignoreSimilarFromAbove, + ignoreSimilarFromBelow, + ignoreSimilarFromAboveByRatio, + ignoreSimilarFromBelowByRatio + ); + } } } } @@ -100,7 +103,10 @@ private static void processCondition( int featureIndex = featureNames.indexOf(featureName); ThresholdType thresholdType = condition.getThresholdType(); - double value = condition.getValue(); + Double value = condition.getValue(); + if (value == null) { + value = 0d; + } switch (thresholdType) { case ACTUAL_OVER_EXPECTED_MARGIN: diff --git a/src/main/java/org/opensearch/ad/ml/ThresholdingResult.java b/src/main/java/org/opensearch/ad/ml/ThresholdingResult.java index f4b7c1fb0..ac6aecc27 100644 --- a/src/main/java/org/opensearch/ad/ml/ThresholdingResult.java +++ b/src/main/java/org/opensearch/ad/ml/ThresholdingResult.java @@ -12,6 +12,7 @@ package org.opensearch.ad.ml; import java.time.Instant; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -19,7 +20,9 @@ import java.util.Optional; import org.apache.commons.lang.builder.ToStringBuilder; +import org.opensearch.ad.model.AnomalyDetector; import org.opensearch.ad.model.AnomalyResult; +import org.opensearch.ad.model.Rule; import org.opensearch.timeseries.ml.IntermediateResult; import org.opensearch.timeseries.model.Config; import org.opensearch.timeseries.model.Entity; @@ -331,6 +334,11 @@ public List toIndexableResults( String taskId, String error ) { + List rules = new ArrayList<>(); + if (detector instanceof AnomalyDetector) { + AnomalyDetector detectorConfig = (AnomalyDetector) detector; + rules = detectorConfig.getRules(); + } return Collections .singletonList( AnomalyResult @@ -358,7 +366,8 @@ public List toIndexableResults( likelihoodOfValues, threshold, currentData, - featureImputed + featureImputed, + rules ) ); } diff --git a/src/main/java/org/opensearch/ad/model/AnomalyDetector.java b/src/main/java/org/opensearch/ad/model/AnomalyDetector.java index 2572299b1..4b065a52d 100644 --- a/src/main/java/org/opensearch/ad/model/AnomalyDetector.java +++ b/src/main/java/org/opensearch/ad/model/AnomalyDetector.java @@ -835,6 +835,17 @@ private void validateRules(List features, List rules) { this.issueType = ValidationIssueType.RULE; return; } + } else if (thresholdType == ThresholdType.ACTUAL_IS_BELOW_EXPECTED + || thresholdType == ThresholdType.ACTUAL_IS_OVER_EXPECTED) { + // Check if both operator and value are null + if (condition.getOperator() != null || condition.getValue() != null) { + this.errorMessage = SUPPRESSION_RULE_ISSUE_PREFIX + + "For threshold type \"" + + thresholdType + + "\", both operator and value must be empty or null."; + this.issueType = ValidationIssueType.RULE; + return; + } } } } diff --git a/src/main/java/org/opensearch/ad/model/AnomalyResult.java b/src/main/java/org/opensearch/ad/model/AnomalyResult.java index f52fe7439..4f2abcc3e 100644 --- a/src/main/java/org/opensearch/ad/model/AnomalyResult.java +++ b/src/main/java/org/opensearch/ad/model/AnomalyResult.java @@ -17,6 +17,7 @@ import java.io.IOException; import java.time.Instant; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Optional; @@ -312,6 +313,7 @@ public AnomalyResult( * @param threshold Current threshold * @param currentData imputed data if any * @param featureImputed whether feature is imputed or not + * @param rules rules we apply on anomaly grade based on condition * @return the converted AnomalyResult instance */ public static AnomalyResult fromRawTRCFResult( @@ -338,15 +340,20 @@ public static AnomalyResult fromRawTRCFResult( double[] likelihoodOfValues, Double threshold, double[] currentData, - boolean[] featureImputed + boolean[] featureImputed, + List rules ) { List convertedRelevantAttribution = null; List convertedPastValuesList = null; List convertedExpectedValues = null; + List featureNamesForComparison = null; int featureSize = featureData == null ? 0 : featureData.size(); if (grade > 0) { + // Get the top feature names based on the relevant attribution criteria + featureNamesForComparison = getTopFeatureNames(featureData, relevantAttribution); + if (relevantAttribution != null) { if (relevantAttribution.length == featureSize) { convertedRelevantAttribution = new ArrayList<>(featureSize); @@ -425,6 +432,66 @@ public static AnomalyResult fromRawTRCFResult( ); } } + for (String featureName : featureNamesForComparison) { + Double valueToCompare = null; + if (convertedPastValuesList != null) { + Double pastValue = convertedPastValuesList + .stream() + .filter(data -> data.getFeatureId().equals(featureName)) + .map(DataByFeatureId::getData) + .findFirst() + .orElse(null); + valueToCompare = pastValue != null ? pastValue : 0d; + } else { + int featureIndex = featureData + .stream() + .filter(data -> data.getFeatureId().equals(featureName)) + .map(featureData::indexOf) + .findFirst() + .orElse(-1); + + valueToCompare = (featureIndex != -1 && currentData != null) ? currentData[featureIndex] : 0d; + } + + Double expectedValue = convertedExpectedValues + .stream() + .flatMap(evList -> evList.getValueList().stream()) + .filter(data -> data.getFeatureId().equals(featureName)) + .map(DataByFeatureId::getData) + .findFirst() + .orElse(null); + + int featureIndex = featureData + .stream() + .filter(data -> data.getFeatureId().equals(featureName)) + .map(featureData::indexOf) + .findFirst() + .orElse(-1); + + if (valueToCompare == null || expectedValue == null) { + continue; // Skip if either valueToCompare or expectedValue is missing + } + + for (Rule rule : rules) { + for (Condition condition : rule.getConditions()) { + if (condition.getFeatureName().equals(featureName)) { + ThresholdType thresholdType = condition.getThresholdType(); + + if (thresholdType == ThresholdType.ACTUAL_IS_BELOW_EXPECTED && valueToCompare < expectedValue) { + grade = 0d; + break; + } else if (thresholdType == ThresholdType.ACTUAL_IS_OVER_EXPECTED && valueToCompare > expectedValue) { + grade = 0d; + break; + } + } + } + if (grade == 0) + break; + } + if (grade == 0) + break; + } } List featureImputedList = new ArrayList<>(); @@ -468,6 +535,31 @@ public static AnomalyResult fromRawTRCFResult( ); } + private static List getTopFeatureNames(List featureData, double[] relevantAttribution) { + List topFeatureNames = new ArrayList<>(); + + if (relevantAttribution == null || relevantAttribution.length == 0 || (relevantAttribution.length != featureData.size())) { + featureData.forEach(feature -> topFeatureNames.add(feature.getFeatureId())); + return topFeatureNames; + } + + // Find the maximum rounded value in a single pass and add corresponding feature names + double maxRoundedAttribution = Arrays + .stream(relevantAttribution) + .map(value -> Math.round(value * 100.0) / 100.0) + .max() + .orElse(Double.NaN); + + // Collect feature names with values that match the max rounded value + for (int i = 0; i < relevantAttribution.length; i++) { + if (Math.round(relevantAttribution[i] * 100.0) / 100.0 == maxRoundedAttribution) { + topFeatureNames.add(featureData.get(i).getFeatureId()); + } + } + + return topFeatureNames; + } + public AnomalyResult(StreamInput input) throws IOException { super(input); this.modelId = input.readOptionalString(); diff --git a/src/main/java/org/opensearch/ad/model/Condition.java b/src/main/java/org/opensearch/ad/model/Condition.java index bfa4907f7..826e29038 100644 --- a/src/main/java/org/opensearch/ad/model/Condition.java +++ b/src/main/java/org/opensearch/ad/model/Condition.java @@ -29,9 +29,9 @@ public class Condition implements Writeable, ToXContentObject { private String featureName; private ThresholdType thresholdType; private Operator operator; - private double value; + private Double value; - public Condition(String featureName, ThresholdType thresholdType, Operator operator, double value) { + public Condition(String featureName, ThresholdType thresholdType, Operator operator, Double value) { this.featureName = featureName; this.thresholdType = thresholdType; this.operator = operator; @@ -42,7 +42,7 @@ public Condition(StreamInput input) throws IOException { this.featureName = input.readString(); this.thresholdType = input.readEnum(ThresholdType.class); this.operator = input.readEnum(Operator.class); - this.value = input.readDouble(); + this.value = input.readBoolean() ? input.readDouble() : null; } /** @@ -56,7 +56,7 @@ public static Condition parse(XContentParser parser) throws IOException { String featureName = null; ThresholdType thresholdType = null; Operator operator = null; - Double value = 0d; + Double value = null; ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser); while (parser.nextToken() != XContentParser.Token.END_OBJECT) { @@ -70,11 +70,19 @@ public static Condition parse(XContentParser parser) throws IOException { case THRESHOLD_TYPE_FIELD: thresholdType = ThresholdType.valueOf(parser.text().toUpperCase(Locale.ROOT)); break; - case OPERATOR_FIELD: - operator = Operator.valueOf(parser.text().toUpperCase(Locale.ROOT)); + case "operator": + if (parser.currentToken() == XContentParser.Token.VALUE_NULL) { + operator = null; // Set operator to null if the field is missing + } else { + operator = Operator.valueOf(parser.text().toUpperCase(Locale.ROOT)); + } break; case VALUE_FIELD: - value = parser.doubleValue(); + if (parser.currentToken() == XContentParser.Token.VALUE_NULL) { + value = null; + } else { + value = parser.doubleValue(); + } break; default: break; @@ -89,8 +97,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws .startObject() .field(FEATURE_NAME_FIELD, featureName) .field(THRESHOLD_TYPE_FIELD, thresholdType) - .field(OPERATOR_FIELD, operator) - .field(VALUE_FIELD, value); + .field(OPERATOR_FIELD, operator); + if (value != null) { + builder.field("value", value); + } return xContentBuilder.endObject(); } @@ -99,7 +109,10 @@ public void writeTo(StreamOutput out) throws IOException { out.writeString(featureName); out.writeEnum(thresholdType); out.writeEnum(operator); - out.writeDouble(value); + out.writeBoolean(value != null); + if (value != null) { + out.writeDouble(value); + } } public String getFeatureName() { @@ -114,7 +127,7 @@ public Operator getOperator() { return operator; } - public double getValue() { + public Double getValue() { return value; } diff --git a/src/main/java/org/opensearch/ad/model/ThresholdType.java b/src/main/java/org/opensearch/ad/model/ThresholdType.java index dd17751eb..f44af8ad0 100644 --- a/src/main/java/org/opensearch/ad/model/ThresholdType.java +++ b/src/main/java/org/opensearch/ad/model/ThresholdType.java @@ -57,7 +57,19 @@ public enum ThresholdType { * should be ignored if the ratio of the deviation from the expected to the actual * (b-a)/|a| is less than or equal to ignoreNearExpectedFromBelowByRatio. */ - EXPECTED_OVER_ACTUAL_RATIO("the ratio of the expected value over the actual value"); + EXPECTED_OVER_ACTUAL_RATIO("the ratio of the expected value over the actual value"), + + /** + * Specifies a threshold for ignoring anomalies based on whether the actual value + * is over the expected value returned from the model. + */ + ACTUAL_IS_OVER_EXPECTED("the actual value is over the expected value"), + + /** + * Specifies a threshold for ignoring anomalies based on whether the actual value + * is below the expected value returned from the model. + * */ + ACTUAL_IS_BELOW_EXPECTED("the actual value is below the expected value"); private final String description; diff --git a/src/main/java/org/opensearch/ad/transport/AnomalyResultResponse.java b/src/main/java/org/opensearch/ad/transport/AnomalyResultResponse.java index 072795ef2..87dc00406 100644 --- a/src/main/java/org/opensearch/ad/transport/AnomalyResultResponse.java +++ b/src/main/java/org/opensearch/ad/transport/AnomalyResultResponse.java @@ -21,7 +21,9 @@ import java.util.List; import java.util.Optional; +import org.opensearch.ad.model.AnomalyDetector; import org.opensearch.ad.model.AnomalyResult; +import org.opensearch.ad.model.Rule; import org.opensearch.commons.authuser.User; import org.opensearch.core.action.ActionResponse; import org.opensearch.core.common.io.stream.InputStreamStreamInput; @@ -30,6 +32,7 @@ import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.timeseries.constant.CommonName; +import org.opensearch.timeseries.model.Config; import org.opensearch.timeseries.model.FeatureData; import org.opensearch.timeseries.transport.ResultResponse; @@ -304,7 +307,7 @@ public static AnomalyResultResponse fromActionResponse(final ActionResponse acti * * Convert AnomalyResultResponse to AnomalyResult * - * @param configId Detector Id + * @param config config * @param dataStartInstant data start time * @param dataEndInstant data end time * @param executionStartInstant execution start time @@ -316,7 +319,7 @@ public static AnomalyResultResponse fromActionResponse(final ActionResponse acti */ @Override public List toIndexableResults( - String configId, + Config config, Instant dataStartInstant, Instant dataEndInstant, Instant executionStartInstant, @@ -325,13 +328,19 @@ public List toIndexableResults( User user, String error ) { + List rules = new ArrayList<>(); + if (config instanceof AnomalyDetector) { + AnomalyDetector detectorConfig = (AnomalyDetector) config; + rules = detectorConfig.getRules(); + } + // Detector interval in milliseconds long detectorIntervalMilli = Duration.between(dataStartInstant, dataEndInstant).toMillis(); return Collections .singletonList( AnomalyResult .fromRawTRCFResult( - configId, + config.getId(), detectorIntervalMilli, taskId, // real time results have no task id anomalyScore, @@ -357,7 +366,8 @@ public List toIndexableResults( // as the single stream has been changed to async mode. The job no longer waits for results before returning. // Therefore, we set the following two fields to null, as we will not record any imputed fields. null, - null + null, + rules ) ); } diff --git a/src/main/java/org/opensearch/forecast/transport/ForecastResultResponse.java b/src/main/java/org/opensearch/forecast/transport/ForecastResultResponse.java index b1c7a8b47..b1dc0bf70 100644 --- a/src/main/java/org/opensearch/forecast/transport/ForecastResultResponse.java +++ b/src/main/java/org/opensearch/forecast/transport/ForecastResultResponse.java @@ -25,6 +25,7 @@ import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.forecast.model.ForecastResult; import org.opensearch.timeseries.constant.CommonName; +import org.opensearch.timeseries.model.Config; import org.opensearch.timeseries.model.FeatureData; import org.opensearch.timeseries.transport.ResultResponse; @@ -169,7 +170,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws * * Convert ForecastResultResponse to ForecastResult * - * @param forecastId Forecaster Id + * @param config config * @param dataStartInstant data start time * @param dataEndInstant data end time * @param executionStartInstant execution start time @@ -181,7 +182,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws */ @Override public List toIndexableResults( - String forecastId, + Config config, Instant dataStartInstant, Instant dataEndInstant, Instant executionStartInstant, @@ -194,7 +195,7 @@ public List toIndexableResults( long forecasterIntervalMilli = Duration.between(dataStartInstant, dataEndInstant).toMillis(); return ForecastResult .fromRawRCFCasterResult( - forecastId, + config.getId(), forecasterIntervalMilli, dataQuality, features, diff --git a/src/main/java/org/opensearch/timeseries/ExecuteResultResponseRecorder.java b/src/main/java/org/opensearch/timeseries/ExecuteResultResponseRecorder.java index 69a6caaf2..7a6f70003 100644 --- a/src/main/java/org/opensearch/timeseries/ExecuteResultResponseRecorder.java +++ b/src/main/java/org/opensearch/timeseries/ExecuteResultResponseRecorder.java @@ -119,7 +119,7 @@ public void indexResult( List analysisResults = response .toIndexableResults( - configId, + config, dataStartTime, dataEndTime, executionStartTime, diff --git a/src/main/java/org/opensearch/timeseries/rest/handler/IndexJobActionHandler.java b/src/main/java/org/opensearch/timeseries/rest/handler/IndexJobActionHandler.java index 324a6cfa1..f185aa620 100644 --- a/src/main/java/org/opensearch/timeseries/rest/handler/IndexJobActionHandler.java +++ b/src/main/java/org/opensearch/timeseries/rest/handler/IndexJobActionHandler.java @@ -161,23 +161,20 @@ public void startJob(Config config, TransportService transportService, ActionLis executionStartTime.toEpochMilli(), executionEndTime.toEpochMilli() ); - client - .execute( - resultAction, - getRequest, - ActionListener - .wrap(response -> recorder.indexResult(executionStartTime, executionEndTime, response, config), exception -> { - - recorder - .indexResultException( - executionStartTime, - executionEndTime, - Throwables.getStackTraceAsString(exception), - null, - config - ); - }) - ); + client.execute(resultAction, getRequest, ActionListener.wrap(response -> { + + recorder.indexResult(executionStartTime, executionEndTime, response, config); + }, exception -> { + + recorder + .indexResultException( + executionStartTime, + executionEndTime, + Throwables.getStackTraceAsString(exception), + null, + config + ); + })); } catch (Exception ex) { listener.onFailure(ex); return; diff --git a/src/main/java/org/opensearch/timeseries/transport/ResultResponse.java b/src/main/java/org/opensearch/timeseries/transport/ResultResponse.java index 38e566f3d..5cbaff410 100644 --- a/src/main/java/org/opensearch/timeseries/transport/ResultResponse.java +++ b/src/main/java/org/opensearch/timeseries/transport/ResultResponse.java @@ -19,6 +19,7 @@ import org.opensearch.core.action.ActionResponse; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.timeseries.model.Config; import org.opensearch.timeseries.model.FeatureData; import org.opensearch.timeseries.model.IndexableResult; @@ -89,7 +90,7 @@ public boolean shouldSave() { } public abstract List toIndexableResults( - String configId, + Config configId, Instant dataStartInstant, Instant dataEndInstant, Instant executionStartInstant, diff --git a/src/test/java/org/opensearch/ad/model/AnomalyResultTests.java b/src/test/java/org/opensearch/ad/model/AnomalyResultTests.java index cac5e8a4d..4626f197e 100644 --- a/src/test/java/org/opensearch/ad/model/AnomalyResultTests.java +++ b/src/test/java/org/opensearch/ad/model/AnomalyResultTests.java @@ -213,7 +213,8 @@ public void testFromRawTRCFResultWithHighConfidence() { likelihoodOfValues, threshold, currentData, - featureImputed + featureImputed, + Collections.emptyList() ); // Assert that the confidence is capped at 1.0