diff --git a/orion-server/src/main/java/com/pinterest/orion/core/actions/memq/MemqBrokerDecommissionAction.java b/orion-server/src/main/java/com/pinterest/orion/core/actions/memq/MemqBrokerDecommissionAction.java index 52f25a38..eaddf33e 100644 --- a/orion-server/src/main/java/com/pinterest/orion/core/actions/memq/MemqBrokerDecommissionAction.java +++ b/orion-server/src/main/java/com/pinterest/orion/core/actions/memq/MemqBrokerDecommissionAction.java @@ -38,6 +38,14 @@ public boolean decommission(Node node) throws Exception { getResult().appendOut("Host " + hostName + " is in pending termination status."); // Wait for the host to terminate. Check every 5 minutes for 30 minutes. // If the host is still not terminated after 30 minutes, mark the action as failed. + if (waitForTermination(clusterId, instanceId, hostName)) return false; + // Remove the node from the Orion cluster and mark the action as succeeded + super.decommission(node); + markSucceeded(); + return true; + } + + private boolean waitForTermination(String clusterId, String instanceId, String hostName) throws InterruptedException { long startTime = System.currentTimeMillis(); while (true) { Thread.sleep(getTerminationCheckTimeIntervalMs()); @@ -49,15 +57,12 @@ public boolean decommission(Node node) throws Exception { } else if (elapsedTime > getTerminationCheckTimeoutMs()) { markFailed(String.format("Timed out waiting for host %s(%s) in cluster %s to terminate.", hostName, instanceId, clusterId)); - return false; + return true; } getResult().appendOut(String.format("Host %s(%s) in cluster %s is still terminating after %d ms.", hostName, instanceId, clusterId, elapsedTime)); } - // Remove the node from the Orion cluster and mark the action as succeeded - super.decommission(node); - markSucceeded(); - return true; + return false; } @Override diff --git a/orion-server/src/main/java/com/pinterest/orion/core/actions/memq/MemqBrokerReplacementAction.java b/orion-server/src/main/java/com/pinterest/orion/core/actions/memq/MemqBrokerReplacementAction.java index 94415b19..367b53e1 100644 --- a/orion-server/src/main/java/com/pinterest/orion/core/actions/memq/MemqBrokerReplacementAction.java +++ b/orion-server/src/main/java/com/pinterest/orion/core/actions/memq/MemqBrokerReplacementAction.java @@ -40,34 +40,15 @@ public void runAction() throws Exception { markFailed(String.format("Failed to replace host %s(%s) in cluster %s.", hostName, instanceId, clusterId)); } - // Check if the host is pending termination. - // The host should be in pending termination status after the API call. - Thread.sleep(getPostTerminationCheckWaitTimeMs()); - if (!getEC2Helper().isHostPendingTermination(hostName)) { - markFailed(String.format("Failed post termination check for host %s(%s) in cluster %s.", - hostName, instanceId, clusterId)); - } - getResult().appendOut("Host " + hostName + " is in pending termination status."); - // Wait for the host to terminate. - long startTime = System.currentTimeMillis(); - while (true) { - Thread.sleep(getTerminationCheckTimeIntervalMs()); - long elapsedTime = System.currentTimeMillis() - startTime; - if (getEC2Helper().isHostTerminated(hostName)) { - getResult().appendOut(String.format("Host %s(%s) in cluster %s has been terminated.", - hostName, instanceId, clusterId)); - break; - } else if (elapsedTime > getTerminationCheckTimeoutMs()) { - markFailed(String.format("Timed out waiting for host %s(%s) in cluster %s to terminate.", - hostName, instanceId, clusterId)); - break; - } - getResult().appendOut(String.format("Host %s(%s) in cluster %s is still terminating after %d ms.", - hostName, instanceId, clusterId, elapsedTime)); - } + long startTime = waitForHostTermination(clusterId, instanceId, hostName); // Wait for the replacement host to be added to the cluster. // If the broker count is >= the initial count, the replacement is successful. node.getCluster().getNodeMap().remove(node.getCurrentNodeInfo().getNodeId()); + waitForReplacementHost(clusterId, startBrokerCount, startTime); + markSucceeded(); + } + + private void waitForReplacementHost(String clusterId, int startBrokerCount, long startTime) throws InterruptedException { while (true) { int currentBrokerCount = getRunningBrokerCount(getClusterBrokerPrefix(clusterId)); long elapsedTime = System.currentTimeMillis() - startTime; @@ -93,7 +74,35 @@ public void runAction() throws Exception { Thread.sleep(getReplacementCheckTimeIntervalMs()); } getResult().appendOut("Successfully replace node " + nodeId + " in cluster " + clusterId); - markSucceeded(); + } + + private long waitForHostTermination(String clusterId, String instanceId, String hostName) throws InterruptedException { + // Check if the host is pending termination. + // The host should be in pending termination status after the API call. + Thread.sleep(getPostTerminationCheckWaitTimeMs()); + if (!getEC2Helper().isHostPendingTermination(hostName)) { + markFailed(String.format("Failed post termination check for host %s(%s) in cluster %s.", + hostName, instanceId, clusterId)); + } + getResult().appendOut("Host " + hostName + " is in pending termination status."); + // Wait for the host to terminate. + long startTime = System.currentTimeMillis(); + while (true) { + Thread.sleep(getTerminationCheckTimeIntervalMs()); + long elapsedTime = System.currentTimeMillis() - startTime; + if (getEC2Helper().isHostTerminated(hostName)) { + getResult().appendOut(String.format("Host %s(%s) in cluster %s has been terminated.", + hostName, instanceId, clusterId)); + break; + } else if (elapsedTime > getTerminationCheckTimeoutMs()) { + markFailed(String.format("Timed out waiting for host %s(%s) in cluster %s to terminate.", + hostName, instanceId, clusterId)); + break; + } + getResult().appendOut(String.format("Host %s(%s) in cluster %s is still terminating after %d ms.", + hostName, instanceId, clusterId, elapsedTime)); + } + return startTime; } @Override