Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Fix actions not muting #506

Merged
merged 1 commit into from
Nov 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import com.amazon.opendistro.elasticsearch.performanceanalyzer.net.NetServer;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.exceptions.MalformedConfig;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.ConnectedComponent;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.MetricsDBProvider;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.Queryable;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.RcaConf;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.Stats;
Expand Down Expand Up @@ -410,7 +409,15 @@ private boolean updateMutedComponents() {

LOG.info("Updating the muted graph nodes to : {}", graphNodesForMute);
Stats.getInstance().updateMutedGraphNodes(graphNodesForMute);
// We need to update muted actions in two places. One is in the controller which could
// potentially serve as a snapshot when creating a new rca scheduler, and the other place
// is in the rca scheduler itself. The scheduler maintains a snapshot of the appcontext(see
// the comment above near scheduler instantiation) that it uses to determine staleness. It
// also happens to be the appcontext that will be passed to the graph nodes.
appContext.updateMutedActions(actionsForMute);
if (rcaScheduler != null) {
rcaScheduler.updateAppContextWithMutedActions(actionsForMute);
}
Comment on lines +418 to +420
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This fixes this issue but we can run into such inconsistencies where changes may not reflect until a scheduler restart. Can the scheduler use a reference of the AppContext of RcaController instead of a copy ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That was my first thought as well. But I saw this comment on L215 and decided to go this way.

// RcaScheduler should be started with a snapshot of the AppContext as RcaController
// monitors it for stale state and always restarts the scheduler if it finds its state
// stale.

I think you have the best context regarding AppContext usage, if this comment is stale, then I can go ahead and use the same AppContext reference in both the controller and the scheduler.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will have to handle the case of changed role for the RCAScheduler. we can do the cleanup for a follow up PR. This is a bug fix, so I won't block this on the proper fix.

} catch (Exception e) {
LOG.error("Couldn't read/update the muted RCAs", e);
StatsCollector.instance().logMetric(MUTE_ERROR_METRIC);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,17 @@
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.Queryable;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.RcaConf;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.ThresholdMain;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.util.InstanceDetails;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.net.WireHopper;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.persistence.Persistable;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.sql.SQLException;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -138,7 +137,8 @@ public void start() {
appContext);

schedulerState = RcaSchedulerState.STATE_STARTED;
LOG.info("RCA scheduler thread started successfully on node: {}", appContext.getMyInstanceDetails().getInstanceId());
LOG.info("RCA scheduler thread started successfully on node: {}",
appContext.getMyInstanceDetails().getInstanceId());
if (schedulerTrackingLatch != null) {
schedulerTrackingLatch.countDown();
}
Expand Down Expand Up @@ -206,6 +206,17 @@ private void createExecutorPools() {
rcaSchedulerPeriodicExecutor = Executors.newFixedThreadPool(2, taskThreadFactory);
}

/**
* Updates the list of muted actions in the current instance of {@link AppContext}.
*
* @param mutedActions The set of actions names that need to be muted.
*/
public void updateAppContextWithMutedActions(final Set<String> mutedActions) {
if (this.appContext != null) {
this.appContext.updateMutedActions(mutedActions);
}
}

public NodeRole getRole() {
return role;
}
Expand All @@ -218,4 +229,9 @@ public void setSchedulerTrackingLatch(final CountDownLatch schedulerTrackingLatc
public void setQueryable(Queryable queryable) {
this.db = queryable;
}

@VisibleForTesting
public AppContext getAppContext() {
return this.appContext;
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca;

import static com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.RcaTestHelper.updateConfFileForMutedRcas;
import static com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.RcaTestHelper.updateConfFileForMutedComponents;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.AppContext;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.ClientServers;
Expand Down Expand Up @@ -34,12 +34,12 @@
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.swing.JSeparator;
import org.jooq.tools.json.JSONObject;
import org.junit.After;
import org.junit.Assert;
Expand Down Expand Up @@ -195,9 +195,11 @@ public void readAndUpdateMutedRcasBeforeGraphCreation() throws Exception {

String rcaConfPath = Paths.get(RcaConsts.TEST_CONFIG_PATH, "rca_muted.conf").toString();
Field rcaConfField = rcaController.getClass().getDeclaredField("rcaConf");
String mutedRcasComponent = "muted-rcas";
rcaConfField.setAccessible(true);
rcaConfField.set(rcaController, new RcaConf(rcaConfPath));
updateConfFileForMutedRcas(rcaConfPath, Arrays.asList("CPU_Utilization", "Heap_AllocRate"));
updateConfFileForMutedComponents(rcaConfPath, Arrays.asList("CPU_Utilization",
"Heap_AllocRate"), mutedRcasComponent);

Field mutedGraphNodesField = Stats.class.getDeclaredField("mutedGraphNodes");
mutedGraphNodesField.setAccessible(true);
Expand All @@ -214,6 +216,7 @@ public void readAndUpdateMutedRcasBeforeGraphCreation() throws Exception {

@Test
public void readAndUpdateMutedRcasWithRCAEnableAndDisabled() throws Exception {
String mutedRcaComponent = "muted-rcas";
String mutedRcaConfPath = Paths.get(RcaConsts.TEST_CONFIG_PATH, "rca_muted.conf").toString();
List<String> mutedRcas1 = Arrays.asList("CPU_Utilization", "Heap_AllocRate");
List<String> mutedRcas2 = Arrays.asList("Paging_MajfltRate");
Expand All @@ -227,15 +230,15 @@ public void readAndUpdateMutedRcasWithRCAEnableAndDisabled() throws Exception {
WaitFor.waitFor(() -> rcaController.getCurrentRole() == AllMetrics.NodeRole.MASTER, 10, TimeUnit.SECONDS);
WaitFor.waitFor(() -> RcaControllerHelper.pickRcaConfForRole(AllMetrics.NodeRole.MASTER).getConfigFileLoc() == mutedRcaConfPath,
10, TimeUnit.SECONDS);
updateConfFileForMutedRcas(mutedRcaConfPath, mutedRcas1);
updateConfFileForMutedComponents(mutedRcaConfPath, mutedRcas1, mutedRcaComponent);
Assert.assertTrue(check(new MutedRCAEval(rcaController), mutedRcas1));

// Disable RCA
changeRcaRunState(RcaState.STOP);
Assert.assertTrue(check(new MutedRCAEval(rcaController), mutedRcas1));

// Update rca.conf
updateConfFileForMutedRcas(mutedRcaConfPath, mutedRcas2);
updateConfFileForMutedComponents(mutedRcaConfPath, mutedRcas2, mutedRcaComponent);

// Enable RCA, assert mutedRcas2 is muted nodes
changeRcaRunState(RcaState.RUN);
Expand All @@ -244,6 +247,8 @@ public void readAndUpdateMutedRcasWithRCAEnableAndDisabled() throws Exception {

@Test
public void readAndUpdateMutedRcas() throws Exception {
String mutedRcaComponent = "muted-rcas";
String mutedActionComponent = "muted-actions";
String mutedRcaConfPath = Paths.get(RcaConsts.TEST_CONFIG_PATH, "rca_muted.conf").toString();
List<String> mutedRcas1 = Arrays.asList("CPU_Utilization", "Heap_AllocRate");
List<String> mutedRcas2 = Arrays.asList("Paging_MajfltRate");
Expand All @@ -261,36 +266,48 @@ public void readAndUpdateMutedRcas() throws Exception {

// 1. Muted Graph : "CPU_Utilization, Heap_AllocRate", updating RCA Config with "CPU_Utilization, Heap_AllocRate"
// Muted Graph should have "CPU_Utilization, Heap_AllocRate"
updateConfFileForMutedRcas(mutedRcaConfPath, mutedRcas1);
updateConfFileForMutedComponents(mutedRcaConfPath, mutedRcas1, mutedRcaComponent);
Assert.assertTrue(check(new MutedRCAEval(rcaController), mutedRcas1));

// 2. Muted Graph : "CPU_Utilization, Heap_AllocRate", updating RCA Config with ""
// Muted Graph should have no nodes
updateConfFileForMutedRcas(mutedRcaConfPath, Collections.emptyList());
updateConfFileForMutedComponents(mutedRcaConfPath, Collections.emptyList(), mutedRcaComponent);
Assert.assertTrue(check(new MutedRCAEval(rcaController), Collections.emptyList()));


// 3. Muted Graph : "", updating RCA Config with ""
// Muted Graph should have no nodes
updateConfFileForMutedRcas(mutedRcaConfPath, Collections.emptyList());
updateConfFileForMutedComponents(mutedRcaConfPath, Collections.emptyList(), mutedRcaComponent);
Assert.assertTrue(check(new MutedRCAEval(rcaController), Collections.emptyList()));

// 4. On RCA Config, "muted-rcas" : "CPU_Utilization, Heap_AllocRate", Updating RCA Config with "Paging_MajfltRate"
// Muted Graph should retain only "Paging_MajfltRate"
updateConfFileForMutedRcas(mutedRcaConfPath, mutedRcas2);
updateConfFileForMutedComponents(mutedRcaConfPath, mutedRcas2, mutedRcaComponent);
Assert.assertTrue(check(new MutedRCAEval(rcaController), mutedRcas2));

// 5. On RCA Config, "muted-rcas" : "Paging_MajfltRate", Updating RCA Config with "Paging_MajfltRate_Check"
// Muted Graph should still have "Paging_MajfltRate"
updateConfFileForMutedRcas(mutedRcaConfPath, mutedRcas3);
updateConfFileForMutedComponents(mutedRcaConfPath, mutedRcas3, mutedRcaComponent);
Assert.assertTrue(check(new MutedRCAEval(rcaController), mutedRcas2));

// 6. On RCA Config, "muted-rcas" : "CPU_Utilization, Heap_AllocRate"
// Updating RCA Config with "Paging_MajfltRate_Check, Paging_MajfltRate"
// Muted Graph should have "Paging_MajfltRate"
updateConfFileForMutedRcas(mutedRcaConfPath, mutedRcas1);
updateConfFileForMutedRcas(mutedRcaConfPath, mutedRcas4);
updateConfFileForMutedComponents(mutedRcaConfPath, mutedRcas1, mutedRcaComponent);
updateConfFileForMutedComponents(mutedRcaConfPath, mutedRcas4, mutedRcaComponent);
Assert.assertTrue(check(new MutedRCAEval(rcaController), mutedRcas2));

// 7. On RCA Config, "muted-actions": ["HeapSizeIncreaseAction"]
// Scheduler should be updated with "HeapSizeIncreaseAction"]
List<String> mutedActions = Collections.singletonList("HeapSizeIncreaseAction");
updateConfFileForMutedComponents(mutedRcaConfPath, mutedActions, mutedActionComponent);
Assert.assertTrue(check(new MutedActionEval(rcaController), new HashSet<>(mutedActions)));

// 8. On RCA config, "muted-actions": [],
// Scheduler should be updated with []
updateConfFileForMutedComponents(mutedRcaConfPath, Collections.emptyList(),
mutedActionComponent);
Assert.assertTrue(check(new MutedActionEval(rcaController), Collections.emptySet()));
}

@Test
Expand Down Expand Up @@ -522,6 +539,25 @@ public boolean evaluateAndCheck(List<String> mutedRcas) {

}
}
}

class MutedActionEval implements IEval<Set<String>> {

private final RcaController testController;

MutedActionEval(final RcaController testController) {
this.testController = testController;
}

@Override
public boolean evaluateAndCheck(Set<String> strings) {
AppContext snapshotAppContext = testController.getRcaScheduler().getAppContext();
if (strings.isEmpty() && snapshotAppContext.getMutedActions().isEmpty()) {
return true;
}

return strings.containsAll(snapshotAppContext.getMutedActions()) && snapshotAppContext
.getMutedActions().containsAll(strings);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,8 @@ public static void truncate(File file) {
}
}

public static void updateConfFileForMutedRcas(String rcaConfPath, List<String> mutedRcas) throws Exception {
public static void updateConfFileForMutedComponents(String rcaConfPath,
List<String> mutedComponents, String componentKey) throws Exception {

// create the config json Object from rca config file
Scanner scanner = new Scanner(new FileInputStream(rcaConfPath), StandardCharsets.UTF_8.name());
Expand All @@ -182,8 +183,8 @@ public static void updateConfFileForMutedRcas(String rcaConfPath, List<String> m
JsonNode configObject = mapper.readTree(jsonText);

// update the `MUTED_RCAS_CONFIG` value in config Object
ArrayNode array = mapper.valueToTree(mutedRcas);
((ObjectNode) configObject).putArray("muted-rcas").addAll(array);
ArrayNode array = mapper.valueToTree(mutedComponents);
((ObjectNode) configObject).putArray(componentKey).addAll(array);
mapper.writeValue(new FileOutputStream(rcaConfPath), configObject);
}

Expand Down