Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Fix bug in publisher to support cool off period on a per node basis (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
rguo-aws authored Aug 7, 2020
1 parent 87f0721 commit ee58207
Show file tree
Hide file tree
Showing 4 changed files with 330 additions and 43 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster.NodeKey;
import com.google.common.annotations.VisibleForTesting;
import java.time.Clock;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.builder.HashCodeBuilder;

/**
 * Detects whether an action has passed its cool off period.
 *
 * <p>The detector remembers, per (action name, node) pair, the timestamp at which the action was
 * last recorded through {@link #recordAction(Action)}. An action counts as "cooled off" only if
 * its cool off period has elapsed on every node returned by {@code Action#impactedNodes()}.
 *
 * <p>This class performs no synchronization and is backed by a plain {@link HashMap}; callers
 * that share an instance between threads must synchronize externally.
 */
public class CoolOffDetector {
  /** Last recorded execution time (clock millis) for each (action name, node) pair. */
  private final Map<NodeActionKey, Long> actionToExecutionTime;
  /** Timestamp used as the "last execution" of any pair that has never been recorded. */
  private long initTime;
  /** Time source; replaceable through {@link #setClock(Clock)} for tests. */
  private Clock clock;

  /** Creates a detector on the system UTC clock whose init time is the moment of construction. */
  public CoolOffDetector() {
    this.actionToExecutionTime = new HashMap<>();
    this.clock = Clock.systemUTC();
    this.initTime = clock.millis();
  }

  /**
   * Returns true if a given {@link Action}'s last execution time was >=
   * {@link Action#coolOffPeriodInMillis()} ago on every impacted node.
   *
   * <p>If this CoolOffDetector has never recorded the action on a node, the last execution time
   * for that node is defined as the time when the CoolOffDetector object was constructed (or the
   * value passed to {@link #setInitTime(long)}).
   *
   * <p>An action with no impacted nodes is reported as cooled off.
   *
   * @param action The {@link Action} to test
   * @return true if action is cooled off on all impacted nodes of this action
   */
  public boolean isCooledOff(Action action) {
    // Read the clock once so that every impacted node is judged against the same instant,
    // mirroring recordAction() which stamps all nodes with a single timestamp.
    final long now = clock.millis();
    for (NodeKey esNode : action.impactedNodes()) {
      if (!checkCooledOff(action.name(), esNode, action.coolOffPeriodInMillis(), now)) {
        return false;
      }
    }
    return true;
  }

  /**
   * Records the current clock time as the latest execution of this action on each of its
   * impacted nodes.
   *
   * @param action action to be recorded
   */
  public void recordAction(Action action) {
    final long now = clock.millis();
    for (NodeKey esNode : action.impactedNodes()) {
      actionToExecutionTime.put(new NodeActionKey(action.name(), esNode), now);
    }
  }

  /**
   * Checks a single (action name, node) pair against the supplied timestamp.
   *
   * @param actionName name of the action
   * @param esNode node the action would impact
   * @param coolOffPeriod required quiet period in milliseconds
   * @param now timestamp, in clock millis, to measure the elapsed time from
   * @return true if at least {@code coolOffPeriod} ms separate {@code now} from the last
   *     recorded execution (or from the init time when nothing was recorded)
   */
  private boolean checkCooledOff(String actionName, NodeKey esNode, long coolOffPeriod, long now) {
    long lastExecution =
        actionToExecutionTime.getOrDefault(new NodeActionKey(actionName, esNode), initTime);
    return now - lastExecution >= coolOffPeriod;
  }

  /**
   * Replaces the time source used by this detector.
   *
   * @param clock clock to read timestamps from
   */
  @VisibleForTesting
  public void setClock(Clock clock) {
    this.clock = clock;
  }

  /**
   * Overrides the timestamp used as the last execution time of never-recorded actions.
   *
   * @param initTime timestamp in milliseconds
   */
  @VisibleForTesting
  public void setInitTime(long initTime) {
    this.initTime = initTime;
  }

  /**
   * Hash key for the cool off map: the combination of an action name and a node.
   */
  private static class NodeActionKey {
    private final NodeKey nodeKey;
    private final String actionName;

    public NodeActionKey(final String actionName, final NodeKey nodeKey) {
      this.nodeKey = nodeKey;
      this.actionName = actionName;
    }

    public NodeKey getNodeKey() {
      return this.nodeKey;
    }

    public String getActionName() {
      return this.actionName;
    }

    /** Two keys are equal when both their node key and their action name are equal. */
    @Override
    public boolean equals(Object obj) {
      if (obj instanceof CoolOffDetector.NodeActionKey) {
        CoolOffDetector.NodeActionKey key = (CoolOffDetector.NodeActionKey) obj;
        return nodeKey.equals(key.getNodeKey()) && actionName.equals(key.getActionName());
      }
      return false;
    }

    /** Combines the hash codes of the node key and the action name. */
    @Override
    public int hashCode() {
      return new HashCodeBuilder(17, 37)
          .append(nodeKey.hashCode())
          .append(actionName.hashCode())
          .toHashCode();
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,75 +18,48 @@
import com.amazon.opendistro.elasticsearch.performanceanalyzer.PerformanceAnalyzerApp;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.Action;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.ActionListener;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.CoolOffDetector;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.FlipFlopDetector;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.TimedFlipFlopDetector;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.NonLeafNode;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.ExceptionsAndErrors;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.RcaGraphMetrics;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.scheduler.FlowUnitOperationArgWrapper;
import com.google.common.annotations.VisibleForTesting;

import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class Publisher extends NonLeafNode<EmptyFlowUnit> {

private static final Logger LOG = LogManager.getLogger(Publisher.class);
private final long initTime;

private Collator collator;
private FlipFlopDetector flipFlopDetector;
private boolean isMuted = false;
private Map<String, Long> actionToExecutionTime;
private CoolOffDetector coolOffDetector;
private List<ActionListener> actionListeners;

/**
* Creates a publisher that reads decisions from the given collator.
*
* <p>NOTE(review): this listing is a rendered diff, so the removed per-action-name bookkeeping
* (actionToExecutionTime, initTime) appears next to its replacement (coolOffDetector) — confirm
* against the actual file which assignments remain.
*
* @param evalIntervalSeconds passed as the second argument to the superclass constructor
* @param collator node whose flow units are read in operate()
*/
public Publisher(int evalIntervalSeconds, Collator collator) {
super(0, evalIntervalSeconds);
this.collator = collator;
this.actionListeners = new ArrayList<>();
this.actionToExecutionTime = new HashMap<>();
this.coolOffDetector = new CoolOffDetector();
// TODO please bring in guice so we can configure this with DI
this.flipFlopDetector = new TimedFlipFlopDetector(1, TimeUnit.HOURS);
// Captured once at construction; used as the last execution time of never-executed actions.
initTime = Instant.now().toEpochMilli();
}

/**
* Returns true if a given {@link Action}'s last execution time was >= {@link Action#coolOffPeriodInMillis()} ago
*
* <p>If this Publisher has never executed the action, the last execution time is defined as the time that the
* publisher object was constructed.
*
* <p>The lookup is keyed by {@code action.name()} only, so the cool off is tracked per action name
* and not per node.
*
* @param action The {@link Action} to test
* @return true if a given {@link Action}'s last execution time was >= {@link Action#coolOffPeriodInMillis()} ago
*/
public boolean isCooledOff(Action action) {
// Fall back to the construction timestamp when this action name has never been recorded.
long lastExecution = actionToExecutionTime.getOrDefault(action.name(), initTime);
long elapsed = Instant.now().toEpochMilli() - lastExecution;
if (elapsed >= action.coolOffPeriodInMillis()) {
return true;
} else {
// Log how many milliseconds of the cool off period are still outstanding.
LOG.debug("Publisher: Action {} still has {} ms left in its cool off period",
action.name(),
action.coolOffPeriodInMillis() - elapsed);
return false;
}
}

@Override
public EmptyFlowUnit operate() {
// TODO: Need to add dampening, avoidance, state persistence etc.
Decision decision = collator.getFlowUnits().get(0);
for (Action action : decision.getActions()) {
if (isCooledOff(action) && !flipFlopDetector.isFlipFlop(action)) {
if (coolOffDetector.isCooledOff(action) && !flipFlopDetector.isFlipFlop(action)) {
flipFlopDetector.recordAction(action);
actionToExecutionTime.put(action.name(), Instant.now().toEpochMilli());
coolOffDetector.recordAction(action);
for (ActionListener listener : actionListeners) {
listener.actionPublished(action);
}
Expand Down Expand Up @@ -140,12 +113,13 @@ public void handleNodeMuted() {
assert true;
}

/**
* Returns the epoch-millisecond timestamp captured when this Publisher was constructed.
*
* @return construction time in milliseconds since the epoch
*/
public long getInitTime() {
return this.initTime;
}

/**
* Exposes the flip flop detector that operate() consults before publishing an action.
*
* @return the {@link FlipFlopDetector} held by this publisher
*/
@VisibleForTesting
protected FlipFlopDetector getFlipFlopDetector() {
return this.flipFlopDetector;
}

/**
* Exposes the cool off detector that operate() consults before publishing an action.
*
* @return the {@link CoolOffDetector} held by this publisher
*/
@VisibleForTesting
protected CoolOffDetector getCoolOffDetector() {
return this.coolOffDetector;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions;

import static java.time.Instant.ofEpochMilli;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.util.InstanceDetails.Id;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.util.InstanceDetails.Ip;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster.NodeKey;
import java.time.Clock;
import java.time.Duration;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.junit.Assert;
import org.junit.Test;

public class CoolOffDetectorTest {
private static int EVAL_START_TS_IN_SECOND = 310;
private NodeKey node1 = new NodeKey(new Id("node1"), new Ip("127.0.0.1"));
private NodeKey node2 = new NodeKey(new Id("node2"), new Ip("127.0.0.2"));
private NodeKey node3 = new NodeKey(new Id("node3"), new Ip("127.0.0.3"));

@Test
public void testFirstPublish() {
DummyAction action = new DummyAction("action1", node1);
Clock constantClock = Clock.fixed(ofEpochMilli(0), ZoneId.systemDefault());
CoolOffDetector detector = new CoolOffDetector();
detector.setClock(constantClock);
detector.setInitTime(0);

//ts = 0
Assert.assertFalse(detector.isCooledOff(action));
//ts = 150
detector.setClock(Clock.offset(constantClock, Duration.ofSeconds(150)));
Assert.assertFalse(detector.isCooledOff(action));
//ts = 310
detector.setClock(Clock.offset(constantClock, Duration.ofSeconds(310)));
Assert.assertTrue(detector.isCooledOff(action));
}

@Test
public void testCoolOffWithSingleNodeAction() {
DummyAction action1Node1 = new DummyAction("action1", node1);
DummyAction action2Node1 = new DummyAction("action2", node1);
DummyAction action1Node2 = new DummyAction("action1", node2);

CoolOffDetector detector = new CoolOffDetector();
Clock constantClock = Clock.fixed(ofEpochMilli(0), ZoneId.systemDefault());
detector.setClock(constantClock);
detector.setInitTime(0);

//ts = 0
detector.setClock(Clock.offset(constantClock, Duration.ofSeconds(EVAL_START_TS_IN_SECOND)));
Assert.assertTrue(detector.isCooledOff(action1Node1));
detector.recordAction(action1Node1);
//ts = 150s
detector.setClock(Clock.offset(constantClock, Duration.ofSeconds(EVAL_START_TS_IN_SECOND + 150)));
Assert.assertFalse(detector.isCooledOff(action1Node1));
Assert.assertTrue(detector.isCooledOff(action2Node1));
detector.recordAction(action2Node1);
//ts = 310s
detector.setClock(Clock.offset(constantClock, Duration.ofSeconds(EVAL_START_TS_IN_SECOND + 310)));
Assert.assertTrue(detector.isCooledOff(action1Node1));
Assert.assertFalse(detector.isCooledOff(action2Node1));
detector.recordAction(action1Node1);
//ts = 460s
detector.setClock(Clock.offset(constantClock, Duration.ofSeconds(EVAL_START_TS_IN_SECOND + 460)));
Assert.assertTrue(detector.isCooledOff(action1Node2));
Assert.assertFalse(detector.isCooledOff(action1Node1));
Assert.assertTrue(detector.isCooledOff(action2Node1));
}

@Test
public void testCoolOffWithMultiNodesAction() {
CoolOffDetector detector = new CoolOffDetector();
Clock constantClock = Clock.fixed(ofEpochMilli(0), ZoneId.systemDefault());
detector.setClock(constantClock);
detector.setInitTime(0);

DummyAction action1 = new DummyAction("action1", node1, node2);
//ts = 0
detector.setClock(Clock.offset(constantClock, Duration.ofSeconds(EVAL_START_TS_IN_SECOND)));
Assert.assertTrue(detector.isCooledOff(action1));
detector.recordAction(action1);

//ts = 150s
detector.setClock(Clock.offset(constantClock, Duration.ofSeconds(EVAL_START_TS_IN_SECOND + 150)));
DummyAction action2 = new DummyAction("action1", node2, node3);
Assert.assertFalse(detector.isCooledOff(action2));
DummyAction action3 = new DummyAction("action1", node3);
Assert.assertTrue(detector.isCooledOff(action3));
detector.recordAction(action3);

//ts = 310s
detector.setClock(Clock.offset(constantClock, Duration.ofSeconds(EVAL_START_TS_IN_SECOND + 310)));
DummyAction action4 = new DummyAction("action1", node1, node2, node3);
Assert.assertFalse(detector.isCooledOff(action4));
DummyAction action5 = new DummyAction("action2", node1, node2, node3);
Assert.assertTrue(detector.isCooledOff(action5));
DummyAction action6 = new DummyAction("action1", node1, node2);
Assert.assertTrue(detector.isCooledOff(action6));
}

private static class DummyAction implements Action {
private static final long COOL_OFF_PERIOD_IN_MILLIS = 300 * 1_000;
private String name;
private List<NodeKey> nodes;

public DummyAction(String actionName, NodeKey esNode) {
this.name = actionName;
this.nodes = new ArrayList<>();
nodes.add(esNode);
}

public DummyAction(String actionName, NodeKey... esNode) {
this.name = actionName;
this.nodes = new ArrayList<>(Arrays.asList(esNode));
}

@Override
public boolean isActionable() {
return true;
}

@Override
public long coolOffPeriodInMillis() {
return COOL_OFF_PERIOD_IN_MILLIS;
}

@Override
public List<NodeKey> impactedNodes() {
return nodes;
}

@Override
public Map<NodeKey, ImpactVector> impact() {
return null;
}

@Override
public String name() {
return name;
}

@Override
public String summary() {
return "Test";
}
}
}
Loading

0 comments on commit ee58207

Please sign in to comment.