Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Fix actions not muting (#506)
Browse files Browse the repository at this point in the history
  • Loading branch information
ktkrg authored Nov 6, 2020
1 parent d3c6ccc commit c5fd658
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import com.amazon.opendistro.elasticsearch.performanceanalyzer.net.NetServer;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.exceptions.MalformedConfig;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.ConnectedComponent;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.MetricsDBProvider;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.Queryable;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.RcaConf;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.Stats;
Expand Down Expand Up @@ -410,7 +409,15 @@ private boolean updateMutedComponents() {

LOG.info("Updating the muted graph nodes to : {}", graphNodesForMute);
Stats.getInstance().updateMutedGraphNodes(graphNodesForMute);
// We need to update muted actions in two places. One is in the controller, which could
// potentially serve as a snapshot when creating a new rca scheduler, and the other place
// is in the rca scheduler itself. The scheduler maintains a snapshot of the AppContext (see
// the comment above near scheduler instantiation) that it uses to determine staleness. It
// also happens to be the AppContext that will be passed to the graph nodes.
appContext.updateMutedActions(actionsForMute);
if (rcaScheduler != null) {
rcaScheduler.updateAppContextWithMutedActions(actionsForMute);
}
} catch (Exception e) {
LOG.error("Couldn't read/update the muted RCAs", e);
StatsCollector.instance().logMetric(MUTE_ERROR_METRIC);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,17 @@
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.Queryable;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.RcaConf;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.ThresholdMain;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.util.InstanceDetails;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.net.WireHopper;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.persistence.Persistable;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.sql.SQLException;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -138,7 +137,8 @@ public void start() {
appContext);

schedulerState = RcaSchedulerState.STATE_STARTED;
LOG.info("RCA scheduler thread started successfully on node: {}", appContext.getMyInstanceDetails().getInstanceId());
LOG.info("RCA scheduler thread started successfully on node: {}",
appContext.getMyInstanceDetails().getInstanceId());
if (schedulerTrackingLatch != null) {
schedulerTrackingLatch.countDown();
}
Expand Down Expand Up @@ -206,6 +206,17 @@ private void createExecutorPools() {
rcaSchedulerPeriodicExecutor = Executors.newFixedThreadPool(2, taskThreadFactory);
}

/**
 * Forwards the given muted action names to the {@link AppContext} instance held by this
 * scheduler. When this scheduler holds no {@link AppContext}, the call is a no-op.
 *
 * @param mutedActions The set of action names that need to be muted.
 */
public void updateAppContextWithMutedActions(final Set<String> mutedActions) {
  // Nothing to update when no AppContext is held.
  if (this.appContext == null) {
    return;
  }
  this.appContext.updateMutedActions(mutedActions);
}

/**
 * Returns the node role currently stored on this scheduler.
 *
 * @return The value of the {@code role} field.
 */
public NodeRole getRole() {
  return this.role;
}
Expand All @@ -218,4 +229,9 @@ public void setSchedulerTrackingLatch(final CountDownLatch schedulerTrackingLatc
/**
 * Replaces the {@link Queryable} stored in this scheduler's {@code db} field.
 *
 * @param queryable The {@link Queryable} instance to store.
 */
public void setQueryable(Queryable queryable) {
  this.db = queryable;
}

/**
 * Exposes the {@link AppContext} instance held by this scheduler; intended for tests only.
 *
 * @return The value of the {@code appContext} field, exactly as stored (no copy is made).
 */
@VisibleForTesting
public AppContext getAppContext() {
  return appContext;
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca;

import static com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.RcaTestHelper.updateConfFileForMutedRcas;
import static com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.RcaTestHelper.updateConfFileForMutedComponents;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.AppContext;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.ClientServers;
Expand Down Expand Up @@ -34,12 +34,12 @@
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.swing.JSeparator;
import org.jooq.tools.json.JSONObject;
import org.junit.After;
import org.junit.Assert;
Expand Down Expand Up @@ -195,9 +195,11 @@ public void readAndUpdateMutedRcasBeforeGraphCreation() throws Exception {

String rcaConfPath = Paths.get(RcaConsts.TEST_CONFIG_PATH, "rca_muted.conf").toString();
Field rcaConfField = rcaController.getClass().getDeclaredField("rcaConf");
String mutedRcasComponent = "muted-rcas";
rcaConfField.setAccessible(true);
rcaConfField.set(rcaController, new RcaConf(rcaConfPath));
updateConfFileForMutedRcas(rcaConfPath, Arrays.asList("CPU_Utilization", "Heap_AllocRate"));
updateConfFileForMutedComponents(rcaConfPath, Arrays.asList("CPU_Utilization",
"Heap_AllocRate"), mutedRcasComponent);

Field mutedGraphNodesField = Stats.class.getDeclaredField("mutedGraphNodes");
mutedGraphNodesField.setAccessible(true);
Expand All @@ -214,6 +216,7 @@ public void readAndUpdateMutedRcasBeforeGraphCreation() throws Exception {

@Test
public void readAndUpdateMutedRcasWithRCAEnableAndDisabled() throws Exception {
String mutedRcaComponent = "muted-rcas";
String mutedRcaConfPath = Paths.get(RcaConsts.TEST_CONFIG_PATH, "rca_muted.conf").toString();
List<String> mutedRcas1 = Arrays.asList("CPU_Utilization", "Heap_AllocRate");
List<String> mutedRcas2 = Arrays.asList("Paging_MajfltRate");
Expand All @@ -227,15 +230,15 @@ public void readAndUpdateMutedRcasWithRCAEnableAndDisabled() throws Exception {
WaitFor.waitFor(() -> rcaController.getCurrentRole() == AllMetrics.NodeRole.MASTER, 10, TimeUnit.SECONDS);
WaitFor.waitFor(() -> RcaControllerHelper.pickRcaConfForRole(AllMetrics.NodeRole.MASTER).getConfigFileLoc() == mutedRcaConfPath,
10, TimeUnit.SECONDS);
updateConfFileForMutedRcas(mutedRcaConfPath, mutedRcas1);
updateConfFileForMutedComponents(mutedRcaConfPath, mutedRcas1, mutedRcaComponent);
Assert.assertTrue(check(new MutedRCAEval(rcaController), mutedRcas1));

// Disable RCA
changeRcaRunState(RcaState.STOP);
Assert.assertTrue(check(new MutedRCAEval(rcaController), mutedRcas1));

// Update rca.conf
updateConfFileForMutedRcas(mutedRcaConfPath, mutedRcas2);
updateConfFileForMutedComponents(mutedRcaConfPath, mutedRcas2, mutedRcaComponent);

// Enable RCA, assert mutedRcas2 is muted nodes
changeRcaRunState(RcaState.RUN);
Expand All @@ -244,6 +247,8 @@ public void readAndUpdateMutedRcasWithRCAEnableAndDisabled() throws Exception {

@Test
public void readAndUpdateMutedRcas() throws Exception {
String mutedRcaComponent = "muted-rcas";
String mutedActionComponent = "muted-actions";
String mutedRcaConfPath = Paths.get(RcaConsts.TEST_CONFIG_PATH, "rca_muted.conf").toString();
List<String> mutedRcas1 = Arrays.asList("CPU_Utilization", "Heap_AllocRate");
List<String> mutedRcas2 = Arrays.asList("Paging_MajfltRate");
Expand All @@ -261,36 +266,48 @@ public void readAndUpdateMutedRcas() throws Exception {

// 1. Muted Graph : "CPU_Utilization, Heap_AllocRate", updating RCA Config with "CPU_Utilization, Heap_AllocRate"
// Muted Graph should have "CPU_Utilization, Heap_AllocRate"
updateConfFileForMutedRcas(mutedRcaConfPath, mutedRcas1);
updateConfFileForMutedComponents(mutedRcaConfPath, mutedRcas1, mutedRcaComponent);
Assert.assertTrue(check(new MutedRCAEval(rcaController), mutedRcas1));

// 2. Muted Graph : "CPU_Utilization, Heap_AllocRate", updating RCA Config with ""
// Muted Graph should have no nodes
updateConfFileForMutedRcas(mutedRcaConfPath, Collections.emptyList());
updateConfFileForMutedComponents(mutedRcaConfPath, Collections.emptyList(), mutedRcaComponent);
Assert.assertTrue(check(new MutedRCAEval(rcaController), Collections.emptyList()));


// 3. Muted Graph : "", updating RCA Config with ""
// Muted Graph should have no nodes
updateConfFileForMutedRcas(mutedRcaConfPath, Collections.emptyList());
updateConfFileForMutedComponents(mutedRcaConfPath, Collections.emptyList(), mutedRcaComponent);
Assert.assertTrue(check(new MutedRCAEval(rcaController), Collections.emptyList()));

// 4. On RCA Config, "muted-rcas" : "CPU_Utilization, Heap_AllocRate", Updating RCA Config with "Paging_MajfltRate"
// Muted Graph should retain only "Paging_MajfltRate"
updateConfFileForMutedRcas(mutedRcaConfPath, mutedRcas2);
updateConfFileForMutedComponents(mutedRcaConfPath, mutedRcas2, mutedRcaComponent);
Assert.assertTrue(check(new MutedRCAEval(rcaController), mutedRcas2));

// 5. On RCA Config, "muted-rcas" : "Paging_MajfltRate", Updating RCA Config with "Paging_MajfltRate_Check"
// Muted Graph should still have "Paging_MajfltRate"
updateConfFileForMutedRcas(mutedRcaConfPath, mutedRcas3);
updateConfFileForMutedComponents(mutedRcaConfPath, mutedRcas3, mutedRcaComponent);
Assert.assertTrue(check(new MutedRCAEval(rcaController), mutedRcas2));

// 6. On RCA Config, "muted-rcas" : "CPU_Utilization, Heap_AllocRate"
// Updating RCA Config with "Paging_MajfltRate_Check, Paging_MajfltRate"
// Muted Graph should have "Paging_MajfltRate"
updateConfFileForMutedRcas(mutedRcaConfPath, mutedRcas1);
updateConfFileForMutedRcas(mutedRcaConfPath, mutedRcas4);
updateConfFileForMutedComponents(mutedRcaConfPath, mutedRcas1, mutedRcaComponent);
updateConfFileForMutedComponents(mutedRcaConfPath, mutedRcas4, mutedRcaComponent);
Assert.assertTrue(check(new MutedRCAEval(rcaController), mutedRcas2));

// 7. On RCA Config, "muted-actions": ["HeapSizeIncreaseAction"]
// Scheduler should be updated with ["HeapSizeIncreaseAction"]
List<String> mutedActions = Collections.singletonList("HeapSizeIncreaseAction");
updateConfFileForMutedComponents(mutedRcaConfPath, mutedActions, mutedActionComponent);
Assert.assertTrue(check(new MutedActionEval(rcaController), new HashSet<>(mutedActions)));

// 8. On RCA config, "muted-actions": [],
// Scheduler should be updated with []
updateConfFileForMutedComponents(mutedRcaConfPath, Collections.emptyList(),
mutedActionComponent);
Assert.assertTrue(check(new MutedActionEval(rcaController), Collections.emptySet()));
}

@Test
Expand Down Expand Up @@ -522,6 +539,25 @@ public boolean evaluateAndCheck(List<String> mutedRcas) {

}
}
}

/**
 * {@link IEval} implementation that compares an expected set of muted action names with the
 * muted actions reported by the {@link AppContext} of the controller's RCA scheduler.
 */
class MutedActionEval implements IEval<Set<String>> {

  private final RcaController testController;

  MutedActionEval(final RcaController testController) {
    this.testController = testController;
  }

  /**
   * Checks whether the scheduler's {@link AppContext} currently reports exactly the expected
   * muted actions.
   *
   * @param strings The expected muted action names.
   * @return {@code true} when both collections are empty, or when each one contains every
   *     element of the other; {@code false} otherwise.
   */
  @Override
  public boolean evaluateAndCheck(Set<String> strings) {
    final AppContext schedulerContext = testController.getRcaScheduler().getAppContext();

    // Fast path: nothing expected and nothing muted.
    final boolean bothEmpty =
        strings.isEmpty() && schedulerContext.getMutedActions().isEmpty();
    if (bothEmpty) {
      return true;
    }

    // Otherwise require containment in both directions.
    if (!strings.containsAll(schedulerContext.getMutedActions())) {
      return false;
    }
    return schedulerContext.getMutedActions().containsAll(strings);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,8 @@ public static void truncate(File file) {
}
}

public static void updateConfFileForMutedRcas(String rcaConfPath, List<String> mutedRcas) throws Exception {
public static void updateConfFileForMutedComponents(String rcaConfPath,
List<String> mutedComponents, String componentKey) throws Exception {

// create the config json Object from rca config file
Scanner scanner = new Scanner(new FileInputStream(rcaConfPath), StandardCharsets.UTF_8.name());
Expand All @@ -182,8 +183,8 @@ public static void updateConfFileForMutedRcas(String rcaConfPath, List<String> m
JsonNode configObject = mapper.readTree(jsonText);

// replace the array stored under `componentKey` in the config Object
ArrayNode array = mapper.valueToTree(mutedRcas);
((ObjectNode) configObject).putArray("muted-rcas").addAll(array);
ArrayNode array = mapper.valueToTree(mutedComponents);
((ObjectNode) configObject).putArray(componentKey).addAll(array);
mapper.writeValue(new FileOutputStream(rcaConfPath), configObject);
}

Expand Down

0 comments on commit c5fd658

Please sign in to comment.