diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/AppContext.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/AppContext.java index 790f97edd..620d8bcee 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/AppContext.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/AppContext.java @@ -22,9 +22,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; -import java.util.ArrayList; import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.stream.Collectors; diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/RcaController.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/RcaController.java index 032ad78e4..6f8325d6d 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/RcaController.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/RcaController.java @@ -30,7 +30,6 @@ import com.amazon.opendistro.elasticsearch.performanceanalyzer.net.NetServer; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.exceptions.MalformedConfig; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.ConnectedComponent; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.MetricsDBProvider; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.Queryable; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.RcaConf; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.Stats; @@ -410,7 +409,15 @@ private boolean updateMutedComponents() { LOG.info("Updating the muted graph nodes to : {}", graphNodesForMute); Stats.getInstance().updateMutedGraphNodes(graphNodesForMute); + // We need to update muted actions in two places. One is in the controller which could + // potentially serve as a snapshot when creating a new rca scheduler, and the other place + // is in the rca scheduler itself. The scheduler maintains a snapshot of the appcontext(see + // the comment above near scheduler instantiation) that it uses to determine staleness. It + // also happens to be the appcontext that will be passed to the graph nodes. appContext.updateMutedActions(actionsForMute); + if (rcaScheduler != null) { + rcaScheduler.updateAppContextWithMutedActions(actionsForMute); + } } catch (Exception e) { LOG.error("Couldn't read/update the muted RCAs", e); StatsCollector.instance().logMetric(MUTE_ERROR_METRIC); diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/scheduler/RCAScheduler.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/scheduler/RCAScheduler.java index 77b9bf576..059252df8 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/scheduler/RCAScheduler.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/scheduler/RCAScheduler.java @@ -21,18 +21,17 @@ import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.Queryable; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.RcaConf; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.ThresholdMain; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.util.InstanceDetails; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.net.WireHopper; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.persistence.Persistable; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.sql.SQLException; import java.util.List; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import org.apache.logging.log4j.LogManager; @@ -138,7 +137,8 @@ public void start() { appContext); schedulerState = RcaSchedulerState.STATE_STARTED; - LOG.info("RCA scheduler thread started successfully on node: {}", appContext.getMyInstanceDetails().getInstanceId()); + LOG.info("RCA scheduler thread started successfully on node: {}", + appContext.getMyInstanceDetails().getInstanceId()); if (schedulerTrackingLatch != null) { schedulerTrackingLatch.countDown(); } @@ -206,6 +206,17 @@ private void createExecutorPools() { rcaSchedulerPeriodicExecutor = Executors.newFixedThreadPool(2, taskThreadFactory); } + /** + * Updates the list of muted actions in the current instance of {@link AppContext}. + * + * @param mutedActions The set of actions names that need to be muted. + */ + public void updateAppContextWithMutedActions(final Set mutedActions) { + if (this.appContext != null) { + this.appContext.updateMutedActions(mutedActions); + } + } + public NodeRole getRole() { return role; } @@ -218,4 +229,9 @@ public void setSchedulerTrackingLatch(final CountDownLatch schedulerTrackingLatc public void setQueryable(Queryable queryable) { this.db = queryable; } + + @VisibleForTesting + public AppContext getAppContext() { + return this.appContext; + } } diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/RcaControllerTest.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/RcaControllerTest.java index f7b85f85d..4b479c121 100644 --- a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/RcaControllerTest.java +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/RcaControllerTest.java @@ -1,6 +1,6 @@ package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca; -import static com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.RcaTestHelper.updateConfFileForMutedRcas; +import static com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.RcaTestHelper.updateConfFileForMutedComponents; import com.amazon.opendistro.elasticsearch.performanceanalyzer.AppContext; import com.amazon.opendistro.elasticsearch.performanceanalyzer.ClientServers; @@ -34,12 +34,12 @@ import java.nio.file.Paths; import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import javax.swing.JSeparator; import org.jooq.tools.json.JSONObject; import org.junit.After; import org.junit.Assert; @@ -195,9 +195,11 @@ public void readAndUpdateMutedRcasBeforeGraphCreation() throws Exception { String rcaConfPath = Paths.get(RcaConsts.TEST_CONFIG_PATH, "rca_muted.conf").toString(); Field rcaConfField = rcaController.getClass().getDeclaredField("rcaConf"); + String mutedRcasComponent = "muted-rcas"; rcaConfField.setAccessible(true); rcaConfField.set(rcaController, new RcaConf(rcaConfPath)); - updateConfFileForMutedRcas(rcaConfPath, Arrays.asList("CPU_Utilization", "Heap_AllocRate")); + updateConfFileForMutedComponents(rcaConfPath, Arrays.asList("CPU_Utilization", + "Heap_AllocRate"), mutedRcasComponent); Field mutedGraphNodesField = Stats.class.getDeclaredField("mutedGraphNodes"); mutedGraphNodesField.setAccessible(true); @@ -214,6 +216,7 @@ public void readAndUpdateMutedRcasBeforeGraphCreation() throws Exception { @Test public void readAndUpdateMutedRcasWithRCAEnableAndDisabled() throws Exception { + String mutedRcaComponent = "muted-rcas"; String mutedRcaConfPath = Paths.get(RcaConsts.TEST_CONFIG_PATH, "rca_muted.conf").toString(); List mutedRcas1 = Arrays.asList("CPU_Utilization", "Heap_AllocRate"); List mutedRcas2 = Arrays.asList("Paging_MajfltRate"); @@ -227,7 +230,7 @@ public void readAndUpdateMutedRcasWithRCAEnableAndDisabled() throws Exception { WaitFor.waitFor(() -> rcaController.getCurrentRole() == AllMetrics.NodeRole.MASTER, 10, TimeUnit.SECONDS); WaitFor.waitFor(() -> RcaControllerHelper.pickRcaConfForRole(AllMetrics.NodeRole.MASTER).getConfigFileLoc() == mutedRcaConfPath, 10, TimeUnit.SECONDS); - updateConfFileForMutedRcas(mutedRcaConfPath, mutedRcas1); + updateConfFileForMutedComponents(mutedRcaConfPath, mutedRcas1, mutedRcaComponent); Assert.assertTrue(check(new MutedRCAEval(rcaController), mutedRcas1)); // Disable RCA @@ -235,7 +238,7 @@ public void readAndUpdateMutedRcasWithRCAEnableAndDisabled() throws Exception { Assert.assertTrue(check(new MutedRCAEval(rcaController), mutedRcas1)); // Update rca.conf - updateConfFileForMutedRcas(mutedRcaConfPath, mutedRcas2); + updateConfFileForMutedComponents(mutedRcaConfPath, mutedRcas2, mutedRcaComponent); // Enable RCA, assert mutedRcas2 is muted nodes changeRcaRunState(RcaState.RUN); @@ -244,6 +247,8 @@ public void readAndUpdateMutedRcasWithRCAEnableAndDisabled() throws Exception { @Test public void readAndUpdateMutedRcas() throws Exception { + String mutedRcaComponent = "muted-rcas"; + String mutedActionComponent = "muted-actions"; String mutedRcaConfPath = Paths.get(RcaConsts.TEST_CONFIG_PATH, "rca_muted.conf").toString(); List mutedRcas1 = Arrays.asList("CPU_Utilization", "Heap_AllocRate"); List mutedRcas2 = Arrays.asList("Paging_MajfltRate"); @@ -261,36 +266,48 @@ public void readAndUpdateMutedRcas() throws Exception { // 1. Muted Graph : "CPU_Utilization, Heap_AllocRate", updating RCA Config with "CPU_Utilization, Heap_AllocRate" // Muted Graph should have "CPU_Utilization, Heap_AllocRate" - updateConfFileForMutedRcas(mutedRcaConfPath, mutedRcas1); + updateConfFileForMutedComponents(mutedRcaConfPath, mutedRcas1, mutedRcaComponent); Assert.assertTrue(check(new MutedRCAEval(rcaController), mutedRcas1)); // 2. Muted Graph : "CPU_Utilization, Heap_AllocRate", updating RCA Config with "" // Muted Graph should have no nodes - updateConfFileForMutedRcas(mutedRcaConfPath, Collections.emptyList()); + updateConfFileForMutedComponents(mutedRcaConfPath, Collections.emptyList(), mutedRcaComponent); Assert.assertTrue(check(new MutedRCAEval(rcaController), Collections.emptyList())); // 3. Muted Graph : "", updating RCA Config with "" // Muted Graph should have no nodes - updateConfFileForMutedRcas(mutedRcaConfPath, Collections.emptyList()); + updateConfFileForMutedComponents(mutedRcaConfPath, Collections.emptyList(), mutedRcaComponent); Assert.assertTrue(check(new MutedRCAEval(rcaController), Collections.emptyList())); // 4. On RCA Config, "muted-rcas" : "CPU_Utilization, Heap_AllocRate", Updating RCA Config with "Paging_MajfltRate" // Muted Graph should retain only "Paging_MajfltRate" - updateConfFileForMutedRcas(mutedRcaConfPath, mutedRcas2); + updateConfFileForMutedComponents(mutedRcaConfPath, mutedRcas2, mutedRcaComponent); Assert.assertTrue(check(new MutedRCAEval(rcaController), mutedRcas2)); // 5. On RCA Config, "muted-rcas" : "Paging_MajfltRate", Updating RCA Config with "Paging_MajfltRate_Check" // Muted Graph should still have "Paging_MajfltRate" - updateConfFileForMutedRcas(mutedRcaConfPath, mutedRcas3); + updateConfFileForMutedComponents(mutedRcaConfPath, mutedRcas3, mutedRcaComponent); Assert.assertTrue(check(new MutedRCAEval(rcaController), mutedRcas2)); // 6. On RCA Config, "muted-rcas" : "CPU_Utilization, Heap_AllocRate" // Updating RCA Config with "Paging_MajfltRate_Check, Paging_MajfltRate" // Muted Graph should have "Paging_MajfltRate" - updateConfFileForMutedRcas(mutedRcaConfPath, mutedRcas1); - updateConfFileForMutedRcas(mutedRcaConfPath, mutedRcas4); + updateConfFileForMutedComponents(mutedRcaConfPath, mutedRcas1, mutedRcaComponent); + updateConfFileForMutedComponents(mutedRcaConfPath, mutedRcas4, mutedRcaComponent); Assert.assertTrue(check(new MutedRCAEval(rcaController), mutedRcas2)); + + // 7. On RCA Config, "muted-actions": ["HeapSizeIncreaseAction"] + // Scheduler should be updated with "HeapSizeIncreaseAction"] + List mutedActions = Collections.singletonList("HeapSizeIncreaseAction"); + updateConfFileForMutedComponents(mutedRcaConfPath, mutedActions, mutedActionComponent); + Assert.assertTrue(check(new MutedActionEval(rcaController), new HashSet<>(mutedActions))); + + // 8. On RCA config, "muted-actions": [], + // Scheduler should be updated with [] + updateConfFileForMutedComponents(mutedRcaConfPath, Collections.emptyList(), + mutedActionComponent); + Assert.assertTrue(check(new MutedActionEval(rcaController), Collections.emptySet())); } @Test @@ -522,6 +539,25 @@ public boolean evaluateAndCheck(List mutedRcas) { } } + } + + class MutedActionEval implements IEval> { + + private final RcaController testController; + + MutedActionEval(final RcaController testController) { + this.testController = testController; + } + @Override + public boolean evaluateAndCheck(Set strings) { + AppContext snapshotAppContext = testController.getRcaScheduler().getAppContext(); + if (strings.isEmpty() && snapshotAppContext.getMutedActions().isEmpty()) { + return true; + } + + return strings.containsAll(snapshotAppContext.getMutedActions()) && snapshotAppContext + .getMutedActions().containsAll(strings); + } } } diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/RcaTestHelper.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/RcaTestHelper.java index a76a520ca..dc92c9050 100644 --- a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/RcaTestHelper.java +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/RcaTestHelper.java @@ -171,7 +171,8 @@ public static void truncate(File file) { } } - public static void updateConfFileForMutedRcas(String rcaConfPath, List mutedRcas) throws Exception { + public static void updateConfFileForMutedComponents(String rcaConfPath, + List mutedComponents, String componentKey) throws Exception { // create the config json Object from rca config file Scanner scanner = new Scanner(new FileInputStream(rcaConfPath), StandardCharsets.UTF_8.name()); @@ -182,8 +183,8 @@ public static void updateConfFileForMutedRcas(String rcaConfPath, List m JsonNode configObject = mapper.readTree(jsonText); // update the `MUTED_RCAS_CONFIG` value in config Object - ArrayNode array = mapper.valueToTree(mutedRcas); - ((ObjectNode) configObject).putArray("muted-rcas").addAll(array); + ArrayNode array = mapper.valueToTree(mutedComponents); + ((ObjectNode) configObject).putArray(componentKey).addAll(array); mapper.writeValue(new FileOutputStream(rcaConfPath), configObject); }