-
Notifications
You must be signed in to change notification settings - Fork 73
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Refinement of Forecasting and AD Precision/Recall Improvements (#1210)
This PR addresses several improvements related to forecasting and anomaly detection (AD) precision/recall. It introduces changes to accommodate forecasting functionality, which is currently disabled as it's not yet released. Additionally, it reverts name changes introduced in a previous PR #1173 due to the unreleased status of forecasting. Changes Made: * Integration of forecasting-related improvements. * Reversion of name changes for compatibility reasons. * Introduce rule based AD. Testing Done: * Verified frontend workflow remains functional: creation, previewing, historical, and real-time detection. * All existing unit and integration tests pass successfully. * Added a new integration test (RuleModelPerfIT) to validate rule-based AD improvements in precision/recall. Next Steps: * Add new AD tests before the 2.15 release to meet coverage requirements. * Conduct backward compatibility tests to ensure compatibility with existing functionality. The merge was attempted earlier to prevent blocking other teammates from submitting their changes. Signed-off-by: Kaituo Li <[email protected]>
- Loading branch information
Showing
630 changed files
with
38,812 additions
and
19,862 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,7 @@ | ||
# indirect dependency of opensearch_py. Lower than 2.30 can cause CVE-2024-35195. | ||
requests>=2.32.0 | ||
numpy==1.23.0 | ||
opensearch_py==2.0.0 | ||
retry==0.9.2 | ||
scipy==1.10.0 | ||
urllib3==1.26.18 | ||
urllib3==1.26.18 |
46 changes: 46 additions & 0 deletions
46
src/main/java/org/opensearch/ad/ADEntityProfileRunner.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
* | ||
* Modifications Copyright OpenSearch Contributors. See | ||
* GitHub history for details. | ||
*/ | ||
|
||
package org.opensearch.ad; | ||
|
||
import org.opensearch.ad.constant.ADCommonName; | ||
import org.opensearch.ad.model.AnomalyDetector; | ||
import org.opensearch.ad.model.AnomalyResult; | ||
import org.opensearch.ad.settings.ADNumericSetting; | ||
import org.opensearch.ad.transport.ADEntityProfileAction; | ||
import org.opensearch.client.Client; | ||
import org.opensearch.core.xcontent.NamedXContentRegistry; | ||
import org.opensearch.timeseries.AnalysisType; | ||
import org.opensearch.timeseries.EntityProfileRunner; | ||
import org.opensearch.timeseries.util.SecurityClientUtil; | ||
|
||
public class ADEntityProfileRunner extends EntityProfileRunner<ADEntityProfileAction> { | ||
|
||
public ADEntityProfileRunner( | ||
Client client, | ||
SecurityClientUtil clientUtil, | ||
NamedXContentRegistry xContentRegistry, | ||
long requiredSamples | ||
) { | ||
super( | ||
client, | ||
clientUtil, | ||
xContentRegistry, | ||
requiredSamples, | ||
AnomalyDetector::parse, | ||
ADNumericSetting.maxCategoricalFields(), | ||
AnalysisType.AD, | ||
ADEntityProfileAction.INSTANCE, | ||
ADCommonName.ANOMALY_RESULT_INDEX_ALIAS, | ||
AnomalyResult.DETECTOR_ID_FIELD | ||
); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,98 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.ad; | ||
|
||
import java.time.Instant; | ||
import java.util.List; | ||
|
||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.opensearch.ad.indices.ADIndex; | ||
import org.opensearch.ad.indices.ADIndexManagement; | ||
import org.opensearch.ad.model.ADTask; | ||
import org.opensearch.ad.model.ADTaskType; | ||
import org.opensearch.ad.model.AnomalyResult; | ||
import org.opensearch.ad.rest.handler.ADIndexJobActionHandler; | ||
import org.opensearch.ad.settings.AnomalyDetectorSettings; | ||
import org.opensearch.ad.task.ADTaskCacheManager; | ||
import org.opensearch.ad.task.ADTaskManager; | ||
import org.opensearch.ad.transport.ADProfileAction; | ||
import org.opensearch.ad.transport.AnomalyResultAction; | ||
import org.opensearch.ad.transport.AnomalyResultRequest; | ||
import org.opensearch.common.settings.Settings; | ||
import org.opensearch.core.action.ActionListener; | ||
import org.opensearch.jobscheduler.spi.LockModel; | ||
import org.opensearch.jobscheduler.spi.utils.LockService; | ||
import org.opensearch.timeseries.AnalysisType; | ||
import org.opensearch.timeseries.JobProcessor; | ||
import org.opensearch.timeseries.TimeSeriesAnalyticsPlugin; | ||
import org.opensearch.timeseries.common.exception.EndRunException; | ||
import org.opensearch.timeseries.model.Config; | ||
import org.opensearch.timeseries.model.Job; | ||
import org.opensearch.timeseries.transport.ResultRequest; | ||
|
||
public class ADJobProcessor extends | ||
JobProcessor<ADIndex, ADIndexManagement, ADTaskCacheManager, ADTaskType, ADTask, ADTaskManager, AnomalyResult, ADProfileAction, ExecuteADResultResponseRecorder, ADIndexJobActionHandler> { | ||
|
||
private static final Logger log = LogManager.getLogger(ADJobProcessor.class); | ||
|
||
private static ADJobProcessor INSTANCE; | ||
|
||
public static ADJobProcessor getInstance() { | ||
if (INSTANCE != null) { | ||
return INSTANCE; | ||
} | ||
synchronized (ADJobProcessor.class) { | ||
if (INSTANCE != null) { | ||
return INSTANCE; | ||
} | ||
INSTANCE = new ADJobProcessor(); | ||
return INSTANCE; | ||
} | ||
} | ||
|
||
private ADJobProcessor() { | ||
// Singleton class, use getJobRunnerInstance method instead of constructor | ||
super(AnalysisType.AD, TimeSeriesAnalyticsPlugin.AD_THREAD_POOL_NAME, AnomalyResultAction.INSTANCE); | ||
} | ||
|
||
public void registerSettings(Settings settings) { | ||
super.registerSettings(settings, AnomalyDetectorSettings.AD_MAX_RETRY_FOR_END_RUN_EXCEPTION); | ||
} | ||
|
||
@Override | ||
protected ResultRequest createResultRequest(String configId, long start, long end) { | ||
return new AnomalyResultRequest(configId, start, end); | ||
} | ||
|
||
@Override | ||
protected void validateResultIndexAndRunJob( | ||
Job jobParameter, | ||
LockService lockService, | ||
LockModel lock, | ||
Instant executionStartTime, | ||
Instant executionEndTime, | ||
String configId, | ||
String user, | ||
List<String> roles, | ||
ExecuteADResultResponseRecorder recorder, | ||
Config detector | ||
) { | ||
String resultIndex = jobParameter.getCustomResultIndex(); | ||
if (resultIndex == null) { | ||
runJob(jobParameter, lockService, lock, executionStartTime, executionEndTime, configId, user, roles, recorder, detector); | ||
return; | ||
} | ||
ActionListener<Boolean> listener = ActionListener.wrap(r -> { log.debug("Custom index is valid"); }, e -> { | ||
Exception exception = new EndRunException(configId, e.getMessage(), false); | ||
handleException(jobParameter, lockService, lock, executionStartTime, executionEndTime, exception, recorder, detector); | ||
}); | ||
indexManagement.validateCustomIndexForBackendJob(resultIndex, configId, user, roles, () -> { | ||
listener.onResponse(true); | ||
runJob(jobParameter, lockService, lock, executionStartTime, executionEndTime, configId, user, roles, recorder, detector); | ||
}, listener); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,85 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.ad; | ||
|
||
import java.util.ArrayList; | ||
import java.util.List; | ||
|
||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.opensearch.ad.model.ADTask; | ||
import org.opensearch.ad.model.ADTaskProfile; | ||
import org.opensearch.ad.transport.ADTaskProfileAction; | ||
import org.opensearch.ad.transport.ADTaskProfileNodeResponse; | ||
import org.opensearch.ad.transport.ADTaskProfileRequest; | ||
import org.opensearch.client.Client; | ||
import org.opensearch.core.action.ActionListener; | ||
import org.opensearch.timeseries.TaskProfileRunner; | ||
import org.opensearch.timeseries.cluster.HashRing; | ||
import org.opensearch.timeseries.model.EntityTaskProfile; | ||
|
||
public class ADTaskProfileRunner implements TaskProfileRunner<ADTask, ADTaskProfile> { | ||
public final Logger logger = LogManager.getLogger(ADTaskProfileRunner.class); | ||
|
||
private final HashRing hashRing; | ||
private final Client client; | ||
|
||
public ADTaskProfileRunner(HashRing hashRing, Client client) { | ||
this.hashRing = hashRing; | ||
this.client = client; | ||
} | ||
|
||
@Override | ||
public void getTaskProfile(ADTask configLevelTask, ActionListener<ADTaskProfile> listener) { | ||
String detectorId = configLevelTask.getConfigId(); | ||
|
||
hashRing.getAllEligibleDataNodesWithKnownVersion(dataNodes -> { | ||
ADTaskProfileRequest adTaskProfileRequest = new ADTaskProfileRequest(detectorId, dataNodes); | ||
client.execute(ADTaskProfileAction.INSTANCE, adTaskProfileRequest, ActionListener.wrap(response -> { | ||
if (response.hasFailures()) { | ||
listener.onFailure(response.failures().get(0)); | ||
return; | ||
} | ||
|
||
List<EntityTaskProfile> adEntityTaskProfiles = new ArrayList<>(); | ||
ADTaskProfile detectorTaskProfile = new ADTaskProfile(configLevelTask); | ||
for (ADTaskProfileNodeResponse node : response.getNodes()) { | ||
ADTaskProfile taskProfile = node.getAdTaskProfile(); | ||
if (taskProfile != null) { | ||
if (taskProfile.getNodeId() != null) { | ||
// HC detector: task profile from coordinating node | ||
// Single entity detector: task profile from worker node | ||
detectorTaskProfile.setTaskId(taskProfile.getTaskId()); | ||
detectorTaskProfile.setRcfTotalUpdates(taskProfile.getRcfTotalUpdates()); | ||
detectorTaskProfile.setThresholdModelTrained(taskProfile.getThresholdModelTrained()); | ||
detectorTaskProfile.setThresholdModelTrainingDataSize(taskProfile.getThresholdModelTrainingDataSize()); | ||
detectorTaskProfile.setModelSizeInBytes(taskProfile.getModelSizeInBytes()); | ||
detectorTaskProfile.setNodeId(taskProfile.getNodeId()); | ||
detectorTaskProfile.setTotalEntitiesCount(taskProfile.getTotalEntitiesCount()); | ||
detectorTaskProfile.setDetectorTaskSlots(taskProfile.getDetectorTaskSlots()); | ||
detectorTaskProfile.setPendingEntitiesCount(taskProfile.getPendingEntitiesCount()); | ||
detectorTaskProfile.setRunningEntitiesCount(taskProfile.getRunningEntitiesCount()); | ||
detectorTaskProfile.setRunningEntities(taskProfile.getRunningEntities()); | ||
detectorTaskProfile.setTaskType(taskProfile.getTaskType()); | ||
} | ||
if (taskProfile.getEntityTaskProfiles() != null) { | ||
adEntityTaskProfiles.addAll(taskProfile.getEntityTaskProfiles()); | ||
} | ||
} | ||
} | ||
if (adEntityTaskProfiles != null && adEntityTaskProfiles.size() > 0) { | ||
detectorTaskProfile.setEntityTaskProfiles(adEntityTaskProfiles); | ||
} | ||
listener.onResponse(detectorTaskProfile); | ||
}, e -> { | ||
logger.error("Failed to get task profile for task " + configLevelTask.getTaskId(), e); | ||
listener.onFailure(e); | ||
})); | ||
}, listener); | ||
|
||
} | ||
|
||
} |
Oops, something went wrong.