From 5ef9fc5c7702741bb07179cd47b22c4d97dae893 Mon Sep 17 00:00:00 2001 From: Kaituo Li Date: Tue, 11 Jul 2023 14:32:22 -0700 Subject: [PATCH] [backport to 2.x] Enforce DOCUMENT Replication for AD Indices In this PR, we temporarily enforce DOCUMENT replication for AD indices. This change is necessary due to the current limitation of SegRep, which doesn't support Get/MultiGet by ID. This measure will be in place until SegRep adds support for these operations. This adjustment aligns with the modification made in the referenced PR: opensearch-project/job-scheduler#417 Signed-off-by: Kaituo Li --- .../ad/indices/AnomalyDetectionIndices.java | 33 +++++++++++++++---- 1 file changed, 27 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/opensearch/ad/indices/AnomalyDetectionIndices.java b/src/main/java/org/opensearch/ad/indices/AnomalyDetectionIndices.java index 8c035678d..f90724d16 100644 --- a/src/main/java/org/opensearch/ad/indices/AnomalyDetectionIndices.java +++ b/src/main/java/org/opensearch/ad/indices/AnomalyDetectionIndices.java @@ -22,6 +22,8 @@ import static org.opensearch.ad.settings.AnomalyDetectorSettings.ANOMALY_RESULTS_INDEX_MAPPING_FILE; import static org.opensearch.ad.settings.AnomalyDetectorSettings.CHECKPOINT_INDEX_MAPPING_FILE; import static org.opensearch.ad.settings.AnomalyDetectorSettings.MAX_PRIMARY_SHARDS; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE; +import static org.opensearch.indices.replication.common.ReplicationType.DOCUMENT; import java.io.IOException; import java.net.URL; @@ -529,7 +531,10 @@ public void initAnomalyDetectorIndexIfAbsent(ActionListener * @throws IOException IOException from {@link AnomalyDetectionIndices#getAnomalyDetectorMappings} */ public void initAnomalyDetectorIndex(ActionListener actionListener) throws IOException { - CreateIndexRequest request = new CreateIndexRequest(AnomalyDetector.ANOMALY_DETECTORS_INDEX) + // AD indices need RAW (e.g., we want users to be able to consume AD results as soon as possible and send out an alert if + // anomalies found). + Settings replicationSettings = Settings.builder().put(SETTING_REPLICATION_TYPE, DOCUMENT.name()).build(); + CreateIndexRequest request = new CreateIndexRequest(AnomalyDetector.ANOMALY_DETECTORS_INDEX, replicationSettings) .mapping(getAnomalyDetectorMappings(), XContentType.JSON) .settings(settings); adminClient.indices().create(request, markMappingUpToDate(ADIndex.CONFIG, actionListener)); @@ -594,7 +599,10 @@ public void initAnomalyResultIndexDirectly( ActionListener actionListener ) throws IOException { String mapping = getAnomalyResultMappings(); - CreateIndexRequest request = new CreateIndexRequest(resultIndex).mapping(mapping, XContentType.JSON); + // AD indices need RAW (e.g., we want users to be able to consume AD results as soon as possible and send out an alert if + // anomalies found). + Settings replicationSettings = Settings.builder().put(SETTING_REPLICATION_TYPE, DOCUMENT.name()).build(); + CreateIndexRequest request = new CreateIndexRequest(resultIndex, replicationSettings).mapping(mapping, XContentType.JSON); if (alias != null) { request.alias(new Alias(CommonName.ANOMALY_RESULT_INDEX_ALIAS)); } @@ -613,7 +621,10 @@ public void initAnomalyResultIndexDirectly( */ public void initAnomalyDetectorJobIndex(ActionListener actionListener) { try { - CreateIndexRequest request = new CreateIndexRequest(AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX) + // AD indices need RAW (e.g., we want users to be able to consume AD results as soon as possible and send out an alert if + // anomalies found). + Settings replicationSettings = Settings.builder().put(SETTING_REPLICATION_TYPE, DOCUMENT.name()).build(); + CreateIndexRequest request = new CreateIndexRequest(AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX, replicationSettings) .mapping(getAnomalyDetectorJobMappings(), XContentType.JSON); request .settings( @@ -645,7 +656,10 @@ public void initAnomalyDetectorJobIndex(ActionListener acti */ public void initDetectionStateIndex(ActionListener actionListener) { try { - CreateIndexRequest request = new CreateIndexRequest(CommonName.DETECTION_STATE_INDEX) + // AD indices need RAW (e.g., we want users to be able to consume AD results as soon as possible and send out an alert if + // anomalies found). + Settings replicationSettings = Settings.builder().put(SETTING_REPLICATION_TYPE, DOCUMENT.name()).build(); + CreateIndexRequest request = new CreateIndexRequest(CommonName.DETECTION_STATE_INDEX, replicationSettings) .mapping(getDetectionStateMappings(), XContentType.JSON) .settings(settings); adminClient.indices().create(request, markMappingUpToDate(ADIndex.STATE, actionListener)); @@ -668,7 +682,11 @@ public void initCheckpointIndex(ActionListener actionListen } catch (IOException e) { throw new EndRunException("", "Cannot find checkpoint mapping file", true); } - CreateIndexRequest request = new CreateIndexRequest(CommonName.CHECKPOINT_INDEX_NAME).mapping(mapping, XContentType.JSON); + // AD indices need RAW (e.g., we want users to be able to consume AD results as soon as possible and send out an alert if + // anomalies found). + Settings replicationSettings = Settings.builder().put(SETTING_REPLICATION_TYPE, DOCUMENT.name()).build(); + CreateIndexRequest request = new CreateIndexRequest(CommonName.CHECKPOINT_INDEX_NAME, replicationSettings) + .mapping(mapping, XContentType.JSON); choosePrimaryShards(request); adminClient.indices().create(request, markMappingUpToDate(ADIndex.CHECKPOINT, actionListener)); } @@ -725,7 +743,10 @@ void rolloverAndDeleteHistoryIndex() { } CreateIndexRequest createRequest = rollOverRequest.getCreateIndexRequest(); - createRequest.index(AD_RESULT_HISTORY_INDEX_PATTERN).mapping(adResultMapping, XContentType.JSON); + // time series indices need RAW (e.g., we want users to be able to consume AD results as soon as possible + // and send out an alert if anomalies found). + Settings replicationSettings = Settings.builder().put(SETTING_REPLICATION_TYPE, DOCUMENT.name()).build(); + createRequest.index(AD_RESULT_HISTORY_INDEX_PATTERN).settings(replicationSettings).mapping(adResultMapping, XContentType.JSON); choosePrimaryShards(createRequest);