Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Reader changes for dynamic enable/disable of RCA graph components (#325)
Browse files Browse the repository at this point in the history
* Common changes needed to support dynamic en/disabling of config overrides

* Add ability to apply the config overrides

* Refactor muting logic for actions and graph nodes

* Add unit tests

* Remove merge conflict markers in comments

* Add licence header to new files

* Filter at the decider instead of collator

* Delete collator test as it is not used to filter actions

* Use the right log levels

* Add abstract SuppressibleAction class to handle muted actions

* Remove unwanted import

* Address PR comments

* Address PR comments

* Address PR comments
  • Loading branch information
ktkrg authored Aug 7, 2020
1 parent ee58207 commit 7ede0b9
Show file tree
Hide file tree
Showing 39 changed files with 846 additions and 141 deletions.
2 changes: 2 additions & 0 deletions pa_config/rca.conf
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@
"NodeTemperatureRca",
"ClusterTemperatureRca"
],
"muted-deciders": [],
"muted-actions": [],

"decider-config-settings": {
// cache decider - Needs to be updated as per the performance test results
Expand Down
2 changes: 2 additions & 0 deletions pa_config/rca_idle_master.conf
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@
"NodeTemperatureRca",
"ClusterTemperatureRca"
],
"muted-deciders": [],
"muted-actions": [],

"decider-config-settings": {
// cache decider - Needs to be updated as per the performance test results
Expand Down
2 changes: 2 additions & 0 deletions pa_config/rca_master.conf
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@
"NodeTemperatureRca",
"ClusterTemperatureRca"
],
"muted-deciders": [],
"muted-actions": [],

"decider-config-settings": {
// cache decider - Needs to be updated as per the performance test results
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.common.collect.ImmutableSet;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
Expand All @@ -39,10 +40,12 @@ public class AppContext {
// initiate a node config cache within each AppContext space
// to store node config settings from ES
private final NodeConfigCache nodeConfigCache;
private volatile Set<String> mutedActions;

/**
 * Builds a context with no cluster details processor, a new {@link NodeConfigCache} and an
 * empty set of muted actions.
 */
public AppContext() {
  this.mutedActions = ImmutableSet.of();
  this.nodeConfigCache = new NodeConfigCache();
  this.clusterDetailsEventProcessor = null;
}

public AppContext(AppContext other) {
Expand Down Expand Up @@ -123,4 +126,12 @@ public InstanceDetails getInstanceById(InstanceDetails.Id instanceIdKey) {
.findFirst()
.orElse(new InstanceDetails(AllMetrics.NodeRole.UNKNOWN));
}

/**
 * Checks the supplied action name against the currently stored set of muted actions.
 *
 * @param action the action name to look up.
 * @return true if the name is present in the muted set, false otherwise.
 */
public boolean isActionMuted(final String action) {
  // Take a single read of the volatile field and query that snapshot.
  final Set<String> mutedSnapshot = this.mutedActions;
  return mutedSnapshot.contains(action);
}

/**
 * Replaces the stored muted actions with an immutable copy of the supplied set.
 *
 * @param mutedActions the action names that should be considered muted from now on.
 */
public void updateMutedActions(final Set<String> mutedActions) {
  final Set<String> frozenCopy = ImmutableSet.copyOf(mutedActions);
  this.mutedActions = frozenCopy;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistro.elasticsearch.performanceanalyzer.config.overrides;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.PerformanceAnalyzerApp;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.RcaConf;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.reader.ClusterDetailsEventProcessor;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * Class that manages applying the various overrides for RCAs, deciders, and action nodes as
 * written by the NodeDetails collector and read by the {@link ClusterDetailsEventProcessor}.
 *
 * <p>An override is applied only when its "last updated" timestamp is strictly greater than
 * the timestamp recorded by this applier after its last successful application.</p>
 */
public class ConfigOverridesApplier {

  private static final Logger LOG = LogManager.getLogger(ConfigOverridesApplier.class);
  private long lastAppliedTimestamp;

  /**
   * Validates, deserializes and applies the given overrides if they are newer than the ones
   * applied last.
   *
   * <p>Invalid input, JSON that cannot be deserialized and timestamps that cannot be parsed as
   * a {@code long} are logged and otherwise ignored; no exception is propagated for them.</p>
   *
   * @param overridesJson the serialized overrides.
   * @param lastUpdatedTimestampString the timestamp, as a string, at which the overrides were
   *     last updated.
   */
  public void applyOverride(final String overridesJson, final String lastUpdatedTimestampString) {
    if (!valid(overridesJson, lastUpdatedTimestampString)) {
      // Report the timestamp that was actually received (and rejected), not the reader's
      // last applied timestamp.
      LOG.warn("Received invalid overrides or timestamp. Overrides json: {}, last updated "
          + "timestamp: {}", overridesJson, lastUpdatedTimestampString);
      return;
    }

    try {
      // valid() only pre-screens the string; Long.parseLong may still reject it, which is
      // handled by the NumberFormatException branch below.
      final long lastUpdatedTimestamp = Long.parseLong(lastUpdatedTimestampString);
      LOG.debug("Last updated(writer): {}, Last applied(reader): {}", lastUpdatedTimestamp,
          lastAppliedTimestamp);
      if (lastUpdatedTimestamp > lastAppliedTimestamp) {
        apply(ConfigOverridesHelper.deserialize(overridesJson));
      } else {
        LOG.debug("Not applying override. Last updated timestamp {} is behind last applied "
            + "timestamp {}", lastUpdatedTimestamp, lastAppliedTimestamp);
      }
    } catch (IOException ioe) {
      LOG.error("Unable to deserialize overrides JSON:" + overridesJson, ioe);
    } catch (NumberFormatException nfe) {
      LOG.error("Unable to parse the lastUpdatedTimestamp {} string as a number.",
          lastUpdatedTimestampString, nfe);
    }
  }

  /**
   * Merges the overrides into the muted RCA/decider/action lists of the current
   * {@link RcaConf} and asks the conf to persist the result. Does nothing when the RCA
   * controller is absent, RCA is disabled, or no conf is available.
   *
   * @param overrides the deserialized overrides; a null value is logged and ignored.
   */
  private void apply(final ConfigOverrides overrides) {
    if (overrides == null) {
      // Defensive: nothing to merge if deserialization produced no object.
      LOG.warn("Deserialized overrides are null. Not applying.");
      return;
    }

    if (PerformanceAnalyzerApp.getRcaController() == null
        || !PerformanceAnalyzerApp.getRcaController().isRcaEnabled()) {
      return;
    }

    // The enable section is optional (it is null-checked below), so it must not be
    // dereferenced unconditionally just for logging.
    final Object enabledRcas =
        overrides.getEnable() == null ? null : overrides.getEnable().getRcas();
    LOG.info("Applying overrides: {}", enabledRcas);

    final RcaConf rcaConf = PerformanceAnalyzerApp.getRcaController().getRcaConf();
    if (rcaConf == null) {
      return;
    }

    final Set<String> currentMutedRcaSet = new HashSet<>(rcaConf.getMutedRcaList());
    final Set<String> currentMutedDeciderSet = new HashSet<>(rcaConf.getMutedDeciderList());
    final Set<String> currentMutedActionSet = new HashSet<>(rcaConf.getMutedActionList());

    // check and remove any nodes that are in the disabled list that were enabled just now.
    if (overrides.getEnable() != null) {
      if (overrides.getEnable().getRcas() != null) {
        currentMutedRcaSet.removeAll(overrides.getEnable().getRcas());
      }
      if (overrides.getEnable().getDeciders() != null) {
        currentMutedDeciderSet.removeAll(overrides.getEnable().getDeciders());
      }
      if (overrides.getEnable().getActions() != null) {
        currentMutedActionSet.removeAll(overrides.getEnable().getActions());
      }
    }

    // union the remaining already disabled nodes with the new set of disabled nodes.
    // Disables are merged after enables, so a name present in both ends up muted.
    if (overrides.getDisable() != null) {
      if (overrides.getDisable().getRcas() != null) {
        currentMutedRcaSet.addAll(overrides.getDisable().getRcas());
      }
      if (overrides.getDisable().getDeciders() != null) {
        currentMutedDeciderSet.addAll(overrides.getDisable().getDeciders());
      }
      if (overrides.getDisable().getActions() != null) {
        currentMutedActionSet.addAll(overrides.getDisable().getActions());
      }
    }

    LOG.info("New set of muted rcas: {}", currentMutedRcaSet);
    LOG.info("New set of muted deciders: {}", currentMutedDeciderSet);
    LOG.info("New set of muted actions: {}", currentMutedActionSet);
    final boolean updateSuccess = rcaConf.updateAllRcaConfFiles(currentMutedRcaSet,
        currentMutedDeciderSet, currentMutedActionSet);
    if (updateSuccess) {
      // NOTE(review): this records the reader's wall-clock time rather than the writer's
      // lastUpdatedTimestamp, so the comparison in applyOverride relies on the two clocks
      // being reasonably in sync - confirm this is intended.
      this.lastAppliedTimestamp = System.currentTimeMillis();
    }
  }

  /**
   * Pre-screens the inputs of {@link #applyOverride(String, String)}.
   *
   * @param overridesJson the serialized overrides.
   * @param timestamp the last updated timestamp as a string.
   * @return false if either argument is null or empty, otherwise whether the timestamp is
   *     accepted by {@link NumberUtils#isCreatable(String)}.
   */
  private boolean valid(final String overridesJson, final String timestamp) {
    if (overridesJson == null || timestamp == null) {
      return false;
    }

    if (overridesJson.isEmpty() || timestamp.isEmpty()) {
      return false;
    }

    return NumberUtils.isCreatable(timestamp);
  }

  /**
   * @return the timestamp recorded after the last successful application of overrides.
   */
  @VisibleForTesting
  public long getLastAppliedTimestamp() {
    return lastAppliedTimestamp;
  }

  /**
   * Overwrites the last applied timestamp.
   *
   * @param lastAppliedTimestamp the value to store.
   */
  @VisibleForTesting
  public void setLastAppliedTimestamp(final long lastAppliedTimestamp) {
    this.lastAppliedTimestamp = lastAppliedTimestamp;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,31 @@

package com.amazon.opendistro.elasticsearch.performanceanalyzer.config.overrides;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;

/**
* Class responsible for holding the latest config overrides across the cluster.
*/
public class ConfigOverridesWrapper {

private volatile ConfigOverrides currentClusterConfigOverrides;
private volatile long lastUpdatedTimestamp;
private final ObjectMapper mapper;

/**
 * Creates a wrapper that uses a newly constructed {@link ObjectMapper}; delegates to the
 * constructor that accepts a mapper.
 */
public ConfigOverridesWrapper() {
this(new ObjectMapper());
}

/**
 * Creates a wrapper around the supplied mapper, starting out with an empty
 * {@link ConfigOverrides} instance. Exposed so unit tests can inject their own mapper.
 *
 * @param mapper The object mapper instance.
 */
@VisibleForTesting
public ConfigOverridesWrapper(final ObjectMapper mapper) {
  this.mapper = mapper;
  this.currentClusterConfigOverrides = new ConfigOverrides();
}

public ConfigOverrides getCurrentClusterConfigOverrides() {
return currentClusterConfigOverrides;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.Stats;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster.NodeKey;

import java.util.List;
Expand Down Expand Up @@ -44,4 +45,7 @@ public interface Action {

/** Returns a summary for the configured action */
String summary();

/** Returns whether this action is explicitly muted through configuration. */
boolean isMuted();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import static com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.ImpactVector.Dimension.HEAP;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.AppContext;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.ResourceEnum;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.collector.NodeConfigCache;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster.NodeKey;
Expand All @@ -36,7 +37,7 @@

// TODO: Split the cache action into separate actions for different caches.

public class ModifyCacheMaxSizeAction implements Action {
public class ModifyCacheMaxSizeAction extends SuppressibleAction {
private static final Logger LOG = LogManager.getLogger(ModifyCacheMaxSizeAction.class);
public static final String NAME = "modifyCacheCapacity";
public static final long COOL_OFF_PERIOD_IN_MILLIS = 300 * 1_000;
Expand All @@ -58,10 +59,12 @@ public ModifyCacheMaxSizeAction(
final ResourceEnum cacheType,
final NodeConfigCache nodeConfigCache,
final double cacheSizeUpperBound,
final boolean increase) {
final boolean increase,
final AppContext appContext) {
// TODO: Add lower bound for caches
// TODO: Address cache scaling down when JVM decider is available

super(appContext);
this.esNode = esNode;
this.cacheType = cacheType;
this.nodeConfigCache = nodeConfigCache;
Expand All @@ -82,7 +85,7 @@ public String name() {
}

@Override
public boolean isActionable() {
public boolean canUpdate() {
if (currentCacheMaxSizeInBytes == null) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.ImpactVector.Dimension.HEAP;
import static com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.ImpactVector.Dimension.NETWORK;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.AppContext;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.ResourceEnum;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster.NodeKey;

Expand All @@ -27,7 +28,7 @@
import java.util.List;
import java.util.Map;

public class ModifyQueueCapacityAction implements Action {
public class ModifyQueueCapacityAction extends SuppressibleAction {

public static final String NAME = "modify_queue_capacity";
public static final long COOL_OFF_PERIOD_IN_MILLIS = 300 * 1_000;
Expand All @@ -40,7 +41,9 @@ public class ModifyQueueCapacityAction implements Action {
private Map<ResourceEnum, Integer> lowerBound = new HashMap<>();
private Map<ResourceEnum, Integer> upperBound = new HashMap<>();

public ModifyQueueCapacityAction(NodeKey esNode, ResourceEnum threadPool, int currentCapacity, boolean increase) {
public ModifyQueueCapacityAction(NodeKey esNode, ResourceEnum threadPool, int currentCapacity,
boolean increase, AppContext appContext) {
super(appContext);
setBounds();
int STEP_SIZE = 50;
this.esNode = esNode;
Expand All @@ -56,7 +59,7 @@ public String name() {
}

@Override
public boolean isActionable() {
public boolean canUpdate() {
return desiredCapacity != currentCapacity;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.AppContext;

/**
 * Base class for actions whose execution can be switched off (muted) via configuration.
 */
public abstract class SuppressibleAction implements Action {

  private final AppContext appContext;

  /**
   * @param appContext the context that is consulted to find out whether this action is muted.
   */
  public SuppressibleAction(final AppContext appContext) {
    this.appContext = appContext;
  }

  /**
   * Tells whether this action has been explicitly muted through configuration, by looking up
   * the action's {@link #name()} in the app context.
   *
   * @return true if the action is muted, false otherwise.
   */
  @Override
  public boolean isMuted() {
    final String actionName = name();
    return appContext.isActionMuted(actionName);
  }

  /**
   * Tells whether the configured action is actionable: it must not be muted and
   * {@link #canUpdate()} must hold. {@link #canUpdate()} is not consulted for a muted action.
   *
   * <p>The method is final so that the muted check cannot be bypassed by subclasses. Actions
   * that need a different notion of "actionable" should implement {@link Action} directly
   * instead of extending {@link SuppressibleAction}.</p>
   *
   * <p>Examples of non-actionable actions are invalid actions or actions that are explicitly
   * disabled in the conf file.</p>
   *
   * @return true if the action is not muted and can update, false otherwise.
   */
  @Override
  public final boolean isActionable() {
    if (isMuted()) {
      return false;
    }
    return canUpdate();
  }

  /**
   * Tells whether the configured action is valid.
   *
   * <p>Examples of invalid actions are resource configurations where limits have been
   * reached.</p>
   *
   * @return true if valid, false otherwise.
   */
  public abstract boolean canUpdate();
}
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,8 @@ private ModifyCacheMaxSizeAction configureCacheMaxSize(
final double cacheUpperBound = getCacheUpperBound(cacheType);
final ModifyCacheMaxSizeAction action =
new ModifyCacheMaxSizeAction(
esNode, cacheType, getAppContext().getNodeConfigCache(), cacheUpperBound, increase);
esNode, cacheType, getAppContext().getNodeConfigCache(), cacheUpperBound, increase,
getAppContext());
if (action.isActionable()) {
return action;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.ImpactVector;

import java.util.Arrays;
import java.util.List;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.PerformanceAnalyzerApp;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.Action;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.NonLeafNode;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.ExceptionsAndErrors;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.RcaGraphMetrics;
Expand Down
Loading

0 comments on commit 7ede0b9

Please sign in to comment.