Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Add cache decider and modify cache action (#303)
Browse files Browse the repository at this point in the history
  • Loading branch information
sruti1312 authored Jul 27, 2020
1 parent e98aacf commit 9cacb33
Show file tree
Hide file tree
Showing 4 changed files with 580 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions;

import static com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.ImpactVector.Dimension.HEAP;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.ResourceEnum;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster.NodeKey;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ModifyCacheCapacityAction implements Action {
public static final String NAME = "modifyCacheCapacity";
public static final long COOL_OFF_PERIOD_IN_MILLIS = 300 * 1_000;

private long currentCapacityInBytes;
private long desiredCapacityInBytes;
private ResourceEnum cacheType;
private NodeKey esNode;

private Map<ResourceEnum, Long> stepSizeInBytes = new HashMap<>();
private Map<ResourceEnum, Long> upperBoundInBytes = new HashMap<>();

public ModifyCacheCapacityAction(
final NodeKey esNode,
final ResourceEnum cacheType,
final long currentCapacityInBytes,
final boolean increase) {
// TODO: Also consume NodeConfigurationRca
setBounds();
setStepSize();

this.esNode = esNode;
this.cacheType = cacheType;
this.currentCapacityInBytes = currentCapacityInBytes;
long desiredCapacity =
increase ? currentCapacityInBytes + getStepSize(cacheType) : currentCapacityInBytes;
setDesiredCapacity(desiredCapacity);
}

@Override
public String name() {
return NAME;
}

@Override
public boolean isActionable() {
return desiredCapacityInBytes != currentCapacityInBytes;
}

@Override
public long coolOffPeriodInMillis() {
return COOL_OFF_PERIOD_IN_MILLIS;
}

@Override
public List<NodeKey> impactedNodes() {
return Collections.singletonList(esNode);
}

@Override
public Map<NodeKey, ImpactVector> impact() {
final ImpactVector impactVector = new ImpactVector();
if (desiredCapacityInBytes > currentCapacityInBytes) {
impactVector.increasesPressure(HEAP);
} else if (desiredCapacityInBytes < currentCapacityInBytes) {
impactVector.decreasesPressure(HEAP);
}
return Collections.singletonMap(esNode, impactVector);
}

@Override
public void execute() {
// Making this a no-op for now
// TODO: Modify based on downstream CoS agent API calls
assert true;
}

@Override
public String summary() {
if (!isActionable()) {
return String.format("No action to take for: [%s]", NAME);
}
return String.format("Update [%s] capacity from [%d] to [%d] on node [%s]",
cacheType.toString(), currentCapacityInBytes, desiredCapacityInBytes, esNode.getNodeId());
}

@Override
public String toString() {
return summary();
}

private void setBounds() {
// This is intentionally not made static because different nodes can
// have different bounds based on instance types

// TODO: Read the upperBound from NodeConfigurationRca.
// Field data cache used when sorting on or computing aggregation on the field (in Bytes)
upperBoundInBytes.put(ResourceEnum.FIELD_DATA_CACHE, 12000 * 1_000_000L);

// Shard request cache (in Bytes)
upperBoundInBytes.put(ResourceEnum.SHARD_REQUEST_CACHE, 120000 * 1_000L);
}

private void setStepSize() {
// TODO: Update the step size to also include percentage of heap size along with absolute value
// Field data cache having step size of 512MB
stepSizeInBytes.put(ResourceEnum.FIELD_DATA_CACHE, 512 * 1_000_000L);

// Shard request cache step size of 512KB
stepSizeInBytes.put(ResourceEnum.SHARD_REQUEST_CACHE, 512 * 1_000L);
}

private long getStepSize(final ResourceEnum cacheType) {
return stepSizeInBytes.get(cacheType);
}

private void setDesiredCapacity(final long desiredCapacity) {
this.desiredCapacityInBytes = Math.min(desiredCapacity, upperBoundInBytes.get(cacheType));
}

public long getCurrentCapacityInBytes() {
return currentCapacityInBytes;
}

public long getDesiredCapacityInBytes() {
return desiredCapacityInBytes;
}

public ResourceEnum getCacheType() {
return cacheType;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.Action;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.ModifyCacheCapacityAction;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.ResourceEnum;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Rca;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.flow_units.ResourceFlowUnit;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotClusterSummary;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotResourceSummary;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster.BaseClusterRca;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster.FieldDataCacheClusterRca;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster.NodeKey;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster.ShardRequestCacheClusterRca;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.List;

// TODO: 1. Read current cache capacity, total cache capacity, upper bound, lower bound from NodeConfigurationRca
public class CacheHealthDecider extends Decider {

public static final String NAME = "cacheHealthDecider";

private final FieldDataCacheClusterRca fieldDataCacheClusterRca;
private final ShardRequestCacheClusterRca shardRequestCacheClusterRca;

List<String> actionsByUserPriority = new ArrayList<>();
private int counter = 0;

public CacheHealthDecider(final long evalIntervalSeconds,
final int decisionFrequency,
final FieldDataCacheClusterRca fieldDataCacheClusterRca,
final ShardRequestCacheClusterRca shardRequestCacheClusterRca) {
// TODO: Also consume NodeConfigurationRca
super(evalIntervalSeconds, decisionFrequency);

this.fieldDataCacheClusterRca = fieldDataCacheClusterRca;
this.shardRequestCacheClusterRca = shardRequestCacheClusterRca;

configureActionPriority();
}

@Override
public String name() {
return NAME;
}

@Override
public Decision operate() {
final ImmutableList<BaseClusterRca> cacheClusterRca =
ImmutableList.<BaseClusterRca>builder()
.add(shardRequestCacheClusterRca)
.add(fieldDataCacheClusterRca)
.build();

Decision decision = new Decision(System.currentTimeMillis(), NAME);
counter += 1;
if (counter < decisionFrequency) {
return decision;
}
counter = 0;

// TODO: Tune only one resource at a time based on action priorities
cacheClusterRca.forEach(rca -> getActionsFromRca(rca, decision));
return decision;
}

private <R extends BaseClusterRca> void getActionsFromRca(
final R cacheClusterRca, final Decision decision) {
if (!cacheClusterRca.getFlowUnits().isEmpty()) {
final ResourceFlowUnit<HotClusterSummary> flowUnit = cacheClusterRca.getFlowUnits().get(0);
if (!flowUnit.hasResourceSummary()) {
return;
}

final HotClusterSummary clusterSummary = flowUnit.getSummary();

clusterSummary
.getHotNodeSummaryList()
.forEach(
hotNodeSummary -> {
final NodeKey esNode =
new NodeKey(hotNodeSummary.getNodeID(), hotNodeSummary.getHostAddress());
for (final HotResourceSummary resource :
hotNodeSummary.getHotResourceSummaryList()) {
decision.addAction(
computeBestAction(esNode, resource.getResource().getResourceEnum()));
}
});
}
}

private void configureActionPriority() {
// TODO: Input from user configured yml
this.actionsByUserPriority.add(ModifyCacheCapacityAction.NAME);
}

/**
* Evaluate the most relevant action for a node
*
* <p>Action relevance decided based on user configured priorities for now, this can be modified
* to consume better signals going forward.
*/
private Action computeBestAction(final NodeKey esNode, final ResourceEnum cacheType) {
Action action = null;
for (String actionName : actionsByUserPriority) {
action =
getAction(actionName, esNode, cacheType, getNodeCacheCapacityInBytes(esNode, cacheType), true);
if (action != null) {
break;
}
}
return action;
}

private Action getAction(final String actionName,
final NodeKey esNode,
final ResourceEnum cacheType,
final long currentCapacityInBytes,
final boolean increase) {
if (ModifyCacheCapacityAction.NAME.equals(actionName)) {
return configureCacheCapacity(esNode, cacheType, currentCapacityInBytes, increase);
}
return null;
}

private ModifyCacheCapacityAction configureCacheCapacity(
final NodeKey esNode,
final ResourceEnum cacheType,
final long currentCapacityInBytes,
final boolean increase) {
final ModifyCacheCapacityAction action =
new ModifyCacheCapacityAction(esNode, cacheType, currentCapacityInBytes, increase);
if (action.isActionable()) {
return action;
}
return null;
}

private long getNodeCacheCapacityInBytes(final NodeKey esNode, final ResourceEnum cacheType) {
// TODO: use NodeConfigurationRca to return capacity, for now returning random value in Bytes
if (cacheType.equals(ResourceEnum.FIELD_DATA_CACHE)) {
return 1000L;
}
return 1000L;
}
}
Loading

0 comments on commit 9cacb33

Please sign in to comment.