Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Reader changes for dynamic enable/disable of RCA graph components #325

Merged
merged 18 commits into from
Aug 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pa_config/rca.conf
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@
"NodeTemperatureRca",
"ClusterTemperatureRca"
],
"muted-deciders": [],
"muted-actions": [],

"decider-config-settings": {
// cache decider - Needs to be updated as per the performance test results
Expand Down
2 changes: 2 additions & 0 deletions pa_config/rca_idle_master.conf
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@
"NodeTemperatureRca",
"ClusterTemperatureRca"
],
"muted-deciders": [],
"muted-actions": [],

"decider-config-settings": {
// cache decider - Needs to be updated as per the performance test results
Expand Down
2 changes: 2 additions & 0 deletions pa_config/rca_master.conf
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@
"NodeTemperatureRca",
"ClusterTemperatureRca"
],
"muted-deciders": [],
"muted-actions": [],

"decider-config-settings": {
// cache decider - Needs to be updated as per the performance test results
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.common.collect.ImmutableSet;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
Expand All @@ -39,10 +40,12 @@ public class AppContext {
// initiate a node config cache within each AppContext space
// to store node config settings from ES
private final NodeConfigCache nodeConfigCache;
private volatile Set<String> mutedActions;

public AppContext() {
this.clusterDetailsEventProcessor = null;
this.nodeConfigCache = new NodeConfigCache();
this.mutedActions = ImmutableSet.of();
}

public AppContext(AppContext other) {
Expand Down Expand Up @@ -123,4 +126,12 @@ public InstanceDetails getInstanceById(InstanceDetails.Id instanceIdKey) {
.findFirst()
.orElse(new InstanceDetails(AllMetrics.NodeRole.UNKNOWN));
}

public boolean isActionMuted(final String action) {
return this.mutedActions.contains(action);
}

public void updateMutedActions(final Set<String> mutedActions) {
this.mutedActions = ImmutableSet.copyOf(mutedActions);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistro.elasticsearch.performanceanalyzer.config.overrides;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.PerformanceAnalyzerApp;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.RcaConf;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.reader.ClusterDetailsEventProcessor;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
* Class the manages applying the various overrides for RCAs, deciders, and action nodes as
* written by the NodeDetails collector and read by the {@link ClusterDetailsEventProcessor}
*/
public class ConfigOverridesApplier {

private static final Logger LOG = LogManager.getLogger(ConfigOverridesApplier.class);
private long lastAppliedTimestamp;

public void applyOverride(final String overridesJson, final String lastUpdatedTimestampString) {
if (!valid(overridesJson, lastUpdatedTimestampString)) {
LOG.warn("Received invalid overrides or timestamp. Overrides json: {}, last updated "
+ "timestamp: {}", overridesJson, lastAppliedTimestamp);
return;
}

try {
long lastUpdatedTimestamp = Long.parseLong(lastUpdatedTimestampString);
yojs marked this conversation as resolved.
Show resolved Hide resolved
LOG.debug("Last updated(writer): {}, Last applied(reader): {}", lastUpdatedTimestamp,
lastAppliedTimestamp);
if (lastUpdatedTimestamp > lastAppliedTimestamp) {
apply(ConfigOverridesHelper.deserialize(overridesJson));
} else {
LOG.debug("Not applying override. Last updated timestamp {} is behind last applied "
+ "timestamp {}", lastUpdatedTimestamp, lastAppliedTimestamp);
}
} catch (IOException ioe) {
LOG.error("Unable to deserialize overrides JSON:" + overridesJson, ioe);
} catch (NumberFormatException nfe) {
LOG.error("Unable to parse the lastUpdatedTimestamp {} string as a number.",
lastUpdatedTimestampString, nfe);
}
}

private void apply(final ConfigOverrides overrides) {
if (PerformanceAnalyzerApp.getRcaController() != null && PerformanceAnalyzerApp.getRcaController().isRcaEnabled()) {
LOG.info("Applying overrides: {}", overrides.getEnable().getRcas());
RcaConf rcaConf = PerformanceAnalyzerApp.getRcaController().getRcaConf();
yojs marked this conversation as resolved.
Show resolved Hide resolved
if (rcaConf != null) {
Set<String> currentMutedRcaSet = new HashSet<>(rcaConf.getMutedRcaList());
Set<String> currentMutedDeciderSet = new HashSet<>(rcaConf.getMutedDeciderList());
Set<String> currentMutedActionSet = new HashSet<>(rcaConf.getMutedActionList());
// check and remove any nodes that are in the disabled list that were enabled just now.
if (overrides.getEnable() != null) {
if (overrides.getEnable().getRcas() != null) {
currentMutedRcaSet.removeAll(overrides.getEnable().getRcas());
}
if (overrides.getEnable().getDeciders() != null) {
currentMutedDeciderSet.removeAll(overrides.getEnable().getDeciders());
}
if (overrides.getEnable().getActions() != null) {
currentMutedActionSet.removeAll(overrides.getEnable().getActions());
}
}

khushbr marked this conversation as resolved.
Show resolved Hide resolved
// union the remaining already disabled nodes with the new set of disabled nodes.
if (overrides.getDisable() != null) {
if (overrides.getDisable().getRcas() != null) {
currentMutedRcaSet.addAll(overrides.getDisable().getRcas());
}
if (overrides.getDisable().getDeciders() != null) {
currentMutedDeciderSet.addAll(overrides.getDisable().getDeciders());
}
if (overrides.getDisable().getActions() != null) {
currentMutedActionSet.addAll(overrides.getDisable().getActions());
}
}

LOG.info("New set of muted rcas: {}", currentMutedRcaSet);
LOG.info("New set of muted deciders: {}", currentMutedDeciderSet);
LOG.info("New set of muted actions: {}", currentMutedActionSet);
boolean updateSuccess = rcaConf.updateAllRcaConfFiles(currentMutedRcaSet,
currentMutedDeciderSet, currentMutedActionSet);
if (updateSuccess) {
this.lastAppliedTimestamp = System.currentTimeMillis();
}
}
}
}

private boolean valid(final String overridesJson, final String timestamp) {
if (overridesJson == null || timestamp == null) {
return false;
}

if (overridesJson.isEmpty() || timestamp.isEmpty()) {
return false;
}

return NumberUtils.isCreatable(timestamp);
}

@VisibleForTesting
public long getLastAppliedTimestamp() {
yojs marked this conversation as resolved.
Show resolved Hide resolved
return lastAppliedTimestamp;
}

@VisibleForTesting
public void setLastAppliedTimestamp(final long lastAppliedTimestamp) {
this.lastAppliedTimestamp = lastAppliedTimestamp;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,31 @@

package com.amazon.opendistro.elasticsearch.performanceanalyzer.config.overrides;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;

/**
* Class responsible for holding the latest config overrides across the cluster.
*/
public class ConfigOverridesWrapper {

private volatile ConfigOverrides currentClusterConfigOverrides;
private volatile long lastUpdatedTimestamp;
private final ObjectMapper mapper;

public ConfigOverridesWrapper() {
this(new ObjectMapper());
}

/**
* Ctor used only for unit test purposes.
* @param mapper The object mapper instance.
*/
@VisibleForTesting
public ConfigOverridesWrapper(final ObjectMapper mapper) {
khushbr marked this conversation as resolved.
Show resolved Hide resolved
this.currentClusterConfigOverrides = new ConfigOverrides();
this.mapper = mapper;
}

public ConfigOverrides getCurrentClusterConfigOverrides() {
return currentClusterConfigOverrides;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.Stats;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster.NodeKey;

import java.util.List;
Expand Down Expand Up @@ -44,4 +45,7 @@ public interface Action {

/** Returns a summary for the configured action */
String summary();

/** Returns if this action is explicitly muted through configuration */
boolean isMuted();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import static com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.ImpactVector.Dimension.HEAP;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.AppContext;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.ResourceEnum;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.collector.NodeConfigCache;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster.NodeKey;
Expand All @@ -36,7 +37,7 @@

// TODO: Split the cache action into separate actions for different caches.

public class ModifyCacheMaxSizeAction implements Action {
public class ModifyCacheMaxSizeAction extends SuppressibleAction {
private static final Logger LOG = LogManager.getLogger(ModifyCacheMaxSizeAction.class);
public static final String NAME = "modifyCacheCapacity";
public static final long COOL_OFF_PERIOD_IN_MILLIS = 300 * 1_000;
Expand All @@ -58,10 +59,12 @@ public ModifyCacheMaxSizeAction(
final ResourceEnum cacheType,
final NodeConfigCache nodeConfigCache,
final double cacheSizeUpperBound,
final boolean increase) {
final boolean increase,
final AppContext appContext) {
// TODO: Add lower bound for caches
// TODO: Address cache scaling down when JVM decider is available

super(appContext);
this.esNode = esNode;
this.cacheType = cacheType;
this.nodeConfigCache = nodeConfigCache;
Expand All @@ -82,7 +85,7 @@ public String name() {
}

@Override
public boolean isActionable() {
public boolean canUpdate() {
if (currentCacheMaxSizeInBytes == null) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.ImpactVector.Dimension.HEAP;
import static com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.ImpactVector.Dimension.NETWORK;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.AppContext;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.ResourceEnum;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster.NodeKey;

Expand All @@ -27,7 +28,7 @@
import java.util.List;
import java.util.Map;

public class ModifyQueueCapacityAction implements Action {
public class ModifyQueueCapacityAction extends SuppressibleAction {

public static final String NAME = "modify_queue_capacity";
public static final long COOL_OFF_PERIOD_IN_MILLIS = 300 * 1_000;
Expand All @@ -40,7 +41,9 @@ public class ModifyQueueCapacityAction implements Action {
private Map<ResourceEnum, Integer> lowerBound = new HashMap<>();
private Map<ResourceEnum, Integer> upperBound = new HashMap<>();

public ModifyQueueCapacityAction(NodeKey esNode, ResourceEnum threadPool, int currentCapacity, boolean increase) {
public ModifyQueueCapacityAction(NodeKey esNode, ResourceEnum threadPool, int currentCapacity,
boolean increase, AppContext appContext) {
super(appContext);
setBounds();
int STEP_SIZE = 50;
this.esNode = esNode;
Expand All @@ -56,7 +59,7 @@ public String name() {
}

@Override
public boolean isActionable() {
public boolean canUpdate() {
return desiredCapacity != currentCapacity;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.AppContext;

/**
* Actions that can be suppressed through a config.
*/
public abstract class SuppressibleAction implements Action {

private final AppContext appContext;

public SuppressibleAction(final AppContext appContext) {
this.appContext = appContext;
}

/**
* Returns if this action is explicitly muted through configuration
*/
@Override
public boolean isMuted() {
return appContext.isActionMuted(name());
}

/**
* Returns true if the configured action is actionable, false otherwise.
* The method is also declared final to enforce checking for muted-ness of an action before
* declaring it actionable.
*
* <p>Actions that want to define the actionable criterion differently should not inherit from
* {@link SuppressibleAction}, instead provide their own implementation. This way, we get to
* close {@link SuppressibleAction} while still keeping the {@link Action} interface open for
* modification.</p>
*
* <p>Examples of non-actionable actions are invalid actions or actions that are explicitly
* disabled in the conf file.
*/
@Override
public final boolean isActionable() {
return !isMuted() && canUpdate();
}

/**
* Returns true if the configured action is valid, false otherwise.
*
* <p>Examples of invalid actions are resource configurations where limits have been
* reached.</p>
*
* @return true if valid, false otherwise.
*/
public abstract boolean canUpdate();
}
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,8 @@ private ModifyCacheMaxSizeAction configureCacheMaxSize(
final double cacheUpperBound = getCacheUpperBound(cacheType);
final ModifyCacheMaxSizeAction action =
new ModifyCacheMaxSizeAction(
esNode, cacheType, getAppContext().getNodeConfigCache(), cacheUpperBound, increase);
esNode, cacheType, getAppContext().getNodeConfigCache(), cacheUpperBound, increase,
getAppContext());
if (action.isActionable()) {
return action;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.ImpactVector;

import java.util.Arrays;
import java.util.List;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.PerformanceAnalyzerApp;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.Action;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.NonLeafNode;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.ExceptionsAndErrors;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.RcaGraphMetrics;
Expand Down
Loading