Skip to content

Commit

Permalink
Code Refactoring for CommonName (#867)
Browse files Browse the repository at this point in the history
In this pull request, I have refactored the code related to shared names in both AD and forecasting modules to CommonNames. Additionally, the previously used CommonName has been renamed to ADCommonName. For the Forecasting module, I have introduced new names in ForecastCommonNames.

Testing done:
* gradle build

Signed-off-by: Kaituo Li <[email protected]>
  • Loading branch information
kaituo committed May 9, 2023
1 parent 1f8415a commit 7edb33b
Show file tree
Hide file tree
Showing 152 changed files with 916 additions and 819 deletions.
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,8 @@ List<String> jacocoExclusions = [

// Class containing just constants. Don't need to test
'org.opensearch.ad.constant.*',
'org.opensearch.forecast.constant.*',
'org.opensearch.timeseries.constant.*',

//'org.opensearch.ad.common.exception.AnomalyDetectionException',
'org.opensearch.ad.util.ClientUtil',
Expand Down
5 changes: 3 additions & 2 deletions src/main/java/org/opensearch/ad/AnomalyDetectorJobRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule;
import org.opensearch.jobscheduler.spi.utils.LockService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.timeseries.constant.CommonName;

import com.google.common.base.Throwables;

Expand Down Expand Up @@ -514,7 +515,7 @@ private void stopAdJobForEndRunException(
}

private void stopAdJob(String detectorId, AnomalyDetectorFunction function) {
GetRequest getRequest = new GetRequest(AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX).id(detectorId);
GetRequest getRequest = new GetRequest(CommonName.JOB_INDEX).id(detectorId);
ActionListener<GetResponse> listener = ActionListener.wrap(response -> {
if (response.isExists()) {
try (
Expand All @@ -537,7 +538,7 @@ private void stopAdJob(String detectorId, AnomalyDetectorFunction function) {
job.getUser(),
job.getResultIndex()
);
IndexRequest indexRequest = new IndexRequest(AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX)
IndexRequest indexRequest = new IndexRequest(CommonName.JOB_INDEX)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(newJob.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), XCONTENT_WITH_TYPE))
.id(detectorId);
Expand Down
25 changes: 13 additions & 12 deletions src/main/java/org/opensearch/ad/AnomalyDetectorPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import org.opensearch.ad.cluster.ADDataMigrator;
import org.opensearch.ad.cluster.ClusterManagerEventListener;
import org.opensearch.ad.cluster.HashRing;
import org.opensearch.ad.constant.CommonName;
import org.opensearch.ad.constant.ADCommonName;
import org.opensearch.ad.dataprocessor.IntegerSensitiveSingleFeatureLinearUniformInterpolator;
import org.opensearch.ad.dataprocessor.Interpolator;
import org.opensearch.ad.dataprocessor.LinearUniformInterpolator;
Expand Down Expand Up @@ -85,7 +85,6 @@
import org.opensearch.ad.settings.NumericSetting;
import org.opensearch.ad.stats.ADStat;
import org.opensearch.ad.stats.ADStats;
import org.opensearch.ad.stats.StatNames;
import org.opensearch.ad.stats.suppliers.CounterSupplier;
import org.opensearch.ad.stats.suppliers.IndexStatusSupplier;
import org.opensearch.ad.stats.suppliers.ModelsOnNodeCountSupplier;
Expand Down Expand Up @@ -195,6 +194,8 @@
import org.opensearch.threadpool.ExecutorBuilder;
import org.opensearch.threadpool.ScalingExecutorBuilder;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.timeseries.constant.CommonName;
import org.opensearch.timeseries.stats.StatNames;
import org.opensearch.watcher.ResourceWatcherService;

import com.amazon.randomcutforest.parkservices.state.ThresholdedRandomCutForestMapper;
Expand Down Expand Up @@ -429,7 +430,7 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
CheckpointDao checkpoint = new CheckpointDao(
client,
clientUtil,
CommonName.CHECKPOINT_INDEX_NAME,
ADCommonName.CHECKPOINT_INDEX_NAME,
gson,
mapper,
converter,
Expand All @@ -454,7 +455,7 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
CheckPointMaintainRequestAdapter adapter = new CheckPointMaintainRequestAdapter(
cacheProvider,
checkpoint,
CommonName.CHECKPOINT_INDEX_NAME,
ADCommonName.CHECKPOINT_INDEX_NAME,
AnomalyDetectorSettings.CHECKPOINT_SAVING_FREQ,
getClock(),
clusterService,
Expand All @@ -477,7 +478,7 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
AnomalyDetectorSettings.MAINTENANCE_FREQ_CONSTANT,
AnomalyDetectorSettings.QUEUE_MAINTENANCE,
checkpoint,
CommonName.CHECKPOINT_INDEX_NAME,
ADCommonName.CHECKPOINT_INDEX_NAME,
AnomalyDetectorSettings.HOURLY_MAINTENANCE,
stateManager,
AnomalyDetectorSettings.HOURLY_MAINTENANCE
Expand Down Expand Up @@ -625,23 +626,23 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
)
.put(
StatNames.ANOMALY_DETECTORS_INDEX_STATUS.getName(),
new ADStat<>(true, new IndexStatusSupplier(indexUtils, AnomalyDetector.ANOMALY_DETECTORS_INDEX))
new ADStat<>(true, new IndexStatusSupplier(indexUtils, CommonName.CONFIG_INDEX))
)
.put(
StatNames.ANOMALY_RESULTS_INDEX_STATUS.getName(),
new ADStat<>(true, new IndexStatusSupplier(indexUtils, CommonName.ANOMALY_RESULT_INDEX_ALIAS))
new ADStat<>(true, new IndexStatusSupplier(indexUtils, ADCommonName.ANOMALY_RESULT_INDEX_ALIAS))
)
.put(
StatNames.MODELS_CHECKPOINT_INDEX_STATUS.getName(),
new ADStat<>(true, new IndexStatusSupplier(indexUtils, CommonName.CHECKPOINT_INDEX_NAME))
new ADStat<>(true, new IndexStatusSupplier(indexUtils, ADCommonName.CHECKPOINT_INDEX_NAME))
)
.put(
StatNames.ANOMALY_DETECTION_JOB_INDEX_STATUS.getName(),
new ADStat<>(true, new IndexStatusSupplier(indexUtils, AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX))
new ADStat<>(true, new IndexStatusSupplier(indexUtils, CommonName.JOB_INDEX))
)
.put(
StatNames.ANOMALY_DETECTION_STATE_STATUS.getName(),
new ADStat<>(true, new IndexStatusSupplier(indexUtils, CommonName.DETECTION_STATE_INDEX))
new ADStat<>(true, new IndexStatusSupplier(indexUtils, ADCommonName.DETECTION_STATE_INDEX))
)
.put(StatNames.DETECTOR_COUNT.getName(), new ADStat<>(true, new SettableSupplier()))
.put(StatNames.SINGLE_ENTITY_DETECTOR_COUNT.getName(), new ADStat<>(true, new SettableSupplier()))
Expand Down Expand Up @@ -752,7 +753,7 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
client,
settings,
threadPool,
CommonName.ANOMALY_RESULT_INDEX_ALIAS,
ADCommonName.ANOMALY_RESULT_INDEX_ALIAS,
anomalyDetectionIndices,
this.clientUtil,
this.indexUtils,
Expand Down Expand Up @@ -1010,7 +1011,7 @@ public String getJobType() {

@Override
public String getJobIndex() {
return AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX;
return CommonName.JOB_INDEX;
}

@Override
Expand Down
19 changes: 9 additions & 10 deletions src/main/java/org/opensearch/ad/AnomalyDetectorProfileRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@

import static org.opensearch.ad.constant.CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG;
import static org.opensearch.ad.constant.CommonErrorMessages.FAIL_TO_PARSE_DETECTOR_MSG;
import static org.opensearch.ad.model.AnomalyDetector.ANOMALY_DETECTORS_INDEX;
import static org.opensearch.ad.model.AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX;
import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.rest.RestStatus.BAD_REQUEST;
import static org.opensearch.rest.RestStatus.INTERNAL_SERVER_ERROR;
Expand All @@ -35,8 +33,8 @@
import org.opensearch.action.search.SearchResponse;
import org.opensearch.ad.common.exception.NotSerializedADExceptionName;
import org.opensearch.ad.common.exception.ResourceNotFoundException;
import org.opensearch.ad.constant.ADCommonName;
import org.opensearch.ad.constant.CommonErrorMessages;
import org.opensearch.ad.constant.CommonName;
import org.opensearch.ad.model.ADTaskType;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyDetectorJob;
Expand Down Expand Up @@ -74,6 +72,7 @@
import org.opensearch.search.aggregations.metrics.CardinalityAggregationBuilder;
import org.opensearch.search.aggregations.metrics.InternalCardinality;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.timeseries.constant.CommonName;
import org.opensearch.transport.TransportService;

public class AnomalyDetectorProfileRunner extends AbstractProfileRunner {
Expand Down Expand Up @@ -121,7 +120,7 @@ private void calculateTotalResponsesToWait(
Set<DetectorProfileName> profilesToCollect,
ActionListener<DetectorProfile> listener
) {
GetRequest getDetectorRequest = new GetRequest(ANOMALY_DETECTORS_INDEX, detectorId);
GetRequest getDetectorRequest = new GetRequest(CommonName.CONFIG_INDEX, detectorId);
client.get(getDetectorRequest, ActionListener.wrap(getDetectorResponse -> {
if (getDetectorResponse != null && getDetectorResponse.isExists()) {
try (
Expand Down Expand Up @@ -151,7 +150,7 @@ private void prepareProfile(
Set<DetectorProfileName> profilesToCollect
) {
String detectorId = detector.getDetectorId();
GetRequest getRequest = new GetRequest(ANOMALY_DETECTOR_JOB_INDEX, detectorId);
GetRequest getRequest = new GetRequest(CommonName.JOB_INDEX, detectorId);
client.get(getRequest, ActionListener.wrap(getResponse -> {
if (getResponse != null && getResponse.isExists()) {
try (
Expand Down Expand Up @@ -292,14 +291,14 @@ private void profileEntityStats(MultiResponsesDelegateActionListener<DetectorPro
if (categoryField.size() == 1) {
// Run a cardinality aggregation to count the cardinality of single category fields
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
CardinalityAggregationBuilder aggBuilder = new CardinalityAggregationBuilder(CommonName.TOTAL_ENTITIES);
CardinalityAggregationBuilder aggBuilder = new CardinalityAggregationBuilder(ADCommonName.TOTAL_ENTITIES);
aggBuilder.field(categoryField.get(0));
searchSourceBuilder.aggregation(aggBuilder);

SearchRequest request = new SearchRequest(detector.getIndices().toArray(new String[0]), searchSourceBuilder);
final ActionListener<SearchResponse> searchResponseListener = ActionListener.wrap(searchResponse -> {
Map<String, Aggregation> aggMap = searchResponse.getAggregations().asMap();
InternalCardinality totalEntities = (InternalCardinality) aggMap.get(CommonName.TOTAL_ENTITIES);
InternalCardinality totalEntities = (InternalCardinality) aggMap.get(ADCommonName.TOTAL_ENTITIES);
long value = totalEntities.getValue();
DetectorProfile.Builder profileBuilder = new DetectorProfile.Builder();
DetectorProfile profile = profileBuilder.totalEntities(value).build();
Expand All @@ -322,7 +321,7 @@ private void profileEntityStats(MultiResponsesDelegateActionListener<DetectorPro
// Run a composite query and count the number of buckets to decide cardinality of multiple category fields
AggregationBuilder bucketAggs = AggregationBuilders
.composite(
CommonName.TOTAL_ENTITIES,
ADCommonName.TOTAL_ENTITIES,
detector.getCategoryField().stream().map(f -> new TermsValuesSourceBuilder(f).field(f)).collect(Collectors.toList())
)
.size(maxTotalEntitiesToTrack);
Expand All @@ -344,7 +343,7 @@ private void profileEntityStats(MultiResponsesDelegateActionListener<DetectorPro
return;
}

Aggregation aggrResult = aggs.get(CommonName.TOTAL_ENTITIES);
Aggregation aggrResult = aggs.get(ADCommonName.TOTAL_ENTITIES);
if (aggrResult == null) {
listener.onFailure(new IllegalArgumentException("Fail to find valid aggregation result"));
return;
Expand Down Expand Up @@ -558,7 +557,7 @@ private ActionListener<RCFPollingResponse> onPollRCFUpdates(
NotSerializedADExceptionName.RESOURCE_NOT_FOUND_EXCEPTION_NAME_UNDERSCORE.getName()
)
|| (ExceptionUtil.isIndexNotAvailable(causeException)
&& causeException.getMessage().contains(CommonName.CHECKPOINT_INDEX_NAME))) {
&& causeException.getMessage().contains(ADCommonName.CHECKPOINT_INDEX_NAME))) {
// cannot find checkpoint
// We don't want to show the estimated time remaining to initialize
// a detector before cold start finishes, where the actual
Expand Down
15 changes: 7 additions & 8 deletions src/main/java/org/opensearch/ad/EntityProfileRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@

package org.opensearch.ad;

import static org.opensearch.ad.model.AnomalyDetector.ANOMALY_DETECTORS_INDEX;
import static org.opensearch.ad.model.AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX;
import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken;

import java.util.List;
Expand All @@ -27,8 +25,8 @@
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.ad.constant.ADCommonName;
import org.opensearch.ad.constant.CommonErrorMessages;
import org.opensearch.ad.constant.CommonName;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyDetectorJob;
import org.opensearch.ad.model.AnomalyResult;
Expand Down Expand Up @@ -58,6 +56,7 @@
import org.opensearch.index.query.TermQueryBuilder;
import org.opensearch.search.aggregations.AggregationBuilders;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.timeseries.constant.CommonName;

public class EntityProfileRunner extends AbstractProfileRunner {
private final Logger logger = LogManager.getLogger(EntityProfileRunner.class);
Expand Down Expand Up @@ -94,7 +93,7 @@ public void profile(
listener.onFailure(new IllegalArgumentException(CommonErrorMessages.EMPTY_PROFILES_COLLECT));
return;
}
GetRequest getDetectorRequest = new GetRequest(ANOMALY_DETECTORS_INDEX, detectorId);
GetRequest getDetectorRequest = new GetRequest(CommonName.CONFIG_INDEX, detectorId);

client.get(getDetectorRequest, ActionListener.wrap(getResponse -> {
if (getResponse != null && getResponse.isExists()) {
Expand Down Expand Up @@ -220,7 +219,7 @@ private void getJob(
EntityProfileResponse entityProfileResponse,
ActionListener<EntityProfile> listener
) {
GetRequest getRequest = new GetRequest(ANOMALY_DETECTOR_JOB_INDEX, detectorId);
GetRequest getRequest = new GetRequest(CommonName.JOB_INDEX, detectorId);
client.get(getRequest, ActionListener.wrap(getResponse -> {
if (getResponse != null && getResponse.isExists()) {
try (
Expand Down Expand Up @@ -457,15 +456,15 @@ private SearchRequest createLastSampleTimeRequest(String detectorId, long enable

boolQueryBuilder.filter(QueryBuilders.termQuery(AnomalyResult.DETECTOR_ID_FIELD, detectorId));

boolQueryBuilder.filter(QueryBuilders.rangeQuery(AnomalyResult.EXECUTION_END_TIME_FIELD).gte(enabledTime));
boolQueryBuilder.filter(QueryBuilders.rangeQuery(CommonName.EXECUTION_END_TIME_FIELD).gte(enabledTime));

SearchSourceBuilder source = new SearchSourceBuilder()
.query(boolQueryBuilder)
.aggregation(AggregationBuilders.max(CommonName.AGG_NAME_MAX_TIME).field(AnomalyResult.EXECUTION_END_TIME_FIELD))
.aggregation(AggregationBuilders.max(ADCommonName.AGG_NAME_MAX_TIME).field(CommonName.EXECUTION_END_TIME_FIELD))
.trackTotalHits(false)
.size(0);

SearchRequest request = new SearchRequest(CommonName.ANOMALY_RESULT_INDEX_ALIAS);
SearchRequest request = new SearchRequest(ADCommonName.ANOMALY_RESULT_INDEX_ALIAS);
request.source(source);
if (resultIndex != null) {
request.indices(resultIndex);
Expand Down
9 changes: 5 additions & 4 deletions src/main/java/org/opensearch/ad/NodeStateManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.get.GetResponse;
import org.opensearch.ad.common.exception.EndRunException;
import org.opensearch.ad.constant.ADCommonName;
import org.opensearch.ad.constant.CommonErrorMessages;
import org.opensearch.ad.constant.CommonName;
import org.opensearch.ad.ml.SingleStreamModelIdMapper;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyDetectorJob;
Expand All @@ -48,6 +48,7 @@
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.timeseries.constant.CommonName;

/**
* NodeStateManager is used to manage states shared by transport and ml components
Expand Down Expand Up @@ -130,7 +131,7 @@ public void getAnomalyDetector(String adID, ActionListener<Optional<AnomalyDetec
if (state != null && state.getDetectorDef() != null) {
listener.onResponse(Optional.of(state.getDetectorDef()));
} else {
GetRequest request = new GetRequest(AnomalyDetector.ANOMALY_DETECTORS_INDEX, adID);
GetRequest request = new GetRequest(CommonName.CONFIG_INDEX, adID);
clientUtil.<GetRequest, GetResponse>asyncRequest(request, client::get, onGetDetectorResponse(adID, listener));
}
}
Expand Down Expand Up @@ -182,7 +183,7 @@ public void getDetectorCheckpoint(String adID, ActionListener<Boolean> listener)
return;
}

GetRequest request = new GetRequest(CommonName.CHECKPOINT_INDEX_NAME, SingleStreamModelIdMapper.getRcfModelId(adID, 0));
GetRequest request = new GetRequest(ADCommonName.CHECKPOINT_INDEX_NAME, SingleStreamModelIdMapper.getRcfModelId(adID, 0));

clientUtil.<GetRequest, GetResponse>asyncRequest(request, client::get, onGetCheckpointResponse(adID, listener));
}
Expand Down Expand Up @@ -375,7 +376,7 @@ public void getAnomalyDetectorJob(String adID, ActionListener<Optional<AnomalyDe
if (state != null && state.getDetectorJob() != null) {
listener.onResponse(Optional.of(state.getDetectorJob()));
} else {
GetRequest request = new GetRequest(AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX, adID);
GetRequest request = new GetRequest(CommonName.JOB_INDEX, adID);
clientUtil.<GetRequest, GetResponse>asyncRequest(request, client::get, onGetDetectorJobResponse(adID, listener));
}
}
Expand Down
Loading

0 comments on commit 7edb33b

Please sign in to comment.