Skip to content

Commit

Permalink
Add a not-master state for desired balance
Browse files Browse the repository at this point in the history
The new state prevents a long running desired balance computation to set
result after the node stands down as master.
  • Loading branch information
ywangd committed Nov 17, 2024
1 parent 27eb013 commit 3cb6452
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public DesiredBalance(long lastConvergedIndex, Map<ShardId, ShardAssignment> ass
this(lastConvergedIndex, assignments, Map.of(), ComputationFinishReason.CONVERGED);
}

public static final DesiredBalance NOT_MASTER = new DesiredBalance(-2, Map.of());
public static final DesiredBalance INITIAL = new DesiredBalance(-1, Map.of());

public ShardAssignment getAssignment(ShardId shardId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/**
* A {@link ShardsAllocator} which asynchronously refreshes the desired balance held by the {@link DesiredBalanceComputer} and then takes
Expand All @@ -62,7 +63,7 @@ public class DesiredBalanceShardsAllocator implements ShardsAllocator {
private final AtomicLong indexGenerator = new AtomicLong(-1);
private final ConcurrentLinkedQueue<List<MoveAllocationCommand>> pendingDesiredBalanceMoves = new ConcurrentLinkedQueue<>();
private final MasterServiceTaskQueue<ReconcileDesiredBalanceTask> masterServiceTaskQueue;
private volatile DesiredBalance currentDesiredBalance = DesiredBalance.INITIAL;
private final AtomicReference<DesiredBalance> currentDesiredBalanceRef = new AtomicReference<>(DesiredBalance.NOT_MASTER);
private volatile boolean resetCurrentDesiredBalance = false;
private final Set<String> processedNodeShutdowns = new HashSet<>();
private final DesiredBalanceMetrics desiredBalanceMetrics;
Expand Down Expand Up @@ -129,11 +130,17 @@ protected void processInput(DesiredBalanceInput desiredBalanceInput) {
long index = desiredBalanceInput.index();
logger.debug("Starting desired balance computation for [{}]", index);

final DesiredBalance initialDesiredBalance = getInitialDesiredBalance();
if (initialDesiredBalance == DesiredBalance.NOT_MASTER) {
logger.debug("Abort desired balance computation because node is no longer master");
return;
}

recordTime(
cumulativeComputationTime,
() -> setCurrentDesiredBalance(
desiredBalanceComputer.compute(
getInitialDesiredBalance(),
initialDesiredBalance,
desiredBalanceInput,
pendingDesiredBalanceMoves,
this::isFresh
Expand All @@ -142,7 +149,13 @@ protected void processInput(DesiredBalanceInput desiredBalanceInput) {
);
computationsExecuted.inc();

if (currentDesiredBalance.finishReason() == DesiredBalance.ComputationFinishReason.STOP_EARLY) {
final DesiredBalance currentDesiredBalance = currentDesiredBalanceRef.get();
// Update is either not successful or the node has concurrently stands down as master
if (currentDesiredBalance.lastConvergedIndex() != index) {
assert currentDesiredBalance == DesiredBalance.NOT_MASTER || currentDesiredBalance == DesiredBalance.INITIAL
: currentDesiredBalance;
logger.debug("Desired balance computation for [{}] is discarded as master has changed", index);
} else if (currentDesiredBalance.finishReason() == DesiredBalance.ComputationFinishReason.STOP_EARLY) {
logger.debug(
"Desired balance computation for [{}] terminated early with partial result, scheduling reconciliation",
index
Expand All @@ -160,10 +173,13 @@ protected void processInput(DesiredBalanceInput desiredBalanceInput) {
}

private DesiredBalance getInitialDesiredBalance() {
final DesiredBalance currentDesiredBalance = currentDesiredBalanceRef.get();
if (resetCurrentDesiredBalance) {
logger.info("Resetting current desired balance");
resetCurrentDesiredBalance = false;
return new DesiredBalance(currentDesiredBalance.lastConvergedIndex(), Map.of());
return currentDesiredBalance == DesiredBalance.NOT_MASTER
? DesiredBalance.NOT_MASTER
: new DesiredBalance(currentDesiredBalance.lastConvergedIndex(), Map.of());
} else {
return currentDesiredBalance;
}
Expand Down Expand Up @@ -211,12 +227,14 @@ public void allocate(RoutingAllocation allocation, ActionListener<Void> listener
var index = indexGenerator.incrementAndGet();
logger.debug("Executing allocate for [{}]", index);
queue.add(index, listener);
// This can only run on master, so unset not-master if exists
currentDesiredBalanceRef.compareAndSet(DesiredBalance.NOT_MASTER, DesiredBalance.INITIAL);
desiredBalanceComputation.onNewInput(DesiredBalanceInput.create(index, allocation));

// Starts reconciliation towards desired balance that might have not been updated with a recent calculation yet.
// This is fine as balance should have incremental rather than radical changes.
// This should speed up achieving the desired balance in cases current state is still different from it (due to THROTTLING).
reconcile(currentDesiredBalance, allocation);
reconcile(currentDesiredBalanceRef.get(), allocation);
}

private void processNodeShutdowns(ClusterState clusterState) {
Expand Down Expand Up @@ -259,16 +277,28 @@ private static List<MoveAllocationCommand> getMoveCommands(AllocationCommands co
}

private void setCurrentDesiredBalance(DesiredBalance newDesiredBalance) {
if (logger.isTraceEnabled()) {
var diff = DesiredBalance.hasChanges(currentDesiredBalance, newDesiredBalance)
? "Diff: " + DesiredBalance.humanReadableDiff(currentDesiredBalance, newDesiredBalance)
: "No changes";
logger.trace("Desired balance updated: {}. {}", newDesiredBalance, diff);
// Update current desired balance if the old value has not been changed already by master fail-over
final DesiredBalance updatedDesiredBalance = currentDesiredBalanceRef.updateAndGet(current -> {
if (current != DesiredBalance.NOT_MASTER) {
return newDesiredBalance;
} else {
return current;
}
});

if (updatedDesiredBalance == newDesiredBalance) {
if (logger.isTraceEnabled()) {
var diff = DesiredBalance.hasChanges(updatedDesiredBalance, newDesiredBalance)
? "Diff: " + DesiredBalance.humanReadableDiff(updatedDesiredBalance, newDesiredBalance)
: "No changes";
logger.trace("Desired balance updated: {}. {}", newDesiredBalance, diff);
} else {
logger.debug("Desired balance updated for [{}]", newDesiredBalance.lastConvergedIndex());
}
computedShardMovements.inc(DesiredBalance.shardMovements(updatedDesiredBalance, newDesiredBalance));
} else {
logger.debug("Desired balance updated for [{}]", newDesiredBalance.lastConvergedIndex());
logger.debug("discard desired balance for [{}]", newDesiredBalance.lastConvergedIndex());
}
computedShardMovements.inc(DesiredBalance.shardMovements(currentDesiredBalance, newDesiredBalance));
currentDesiredBalance = newDesiredBalance;
}

protected void submitReconcileTask(DesiredBalance desiredBalance) {
Expand Down Expand Up @@ -308,7 +338,7 @@ public void execute(RoutingAllocation allocation) {
}

public DesiredBalance getDesiredBalance() {
return currentDesiredBalance;
return currentDesiredBalanceRef.get();
}

public void resetDesiredBalance() {
Expand All @@ -317,7 +347,7 @@ public void resetDesiredBalance() {

public DesiredBalanceStats getStats() {
return new DesiredBalanceStats(
Math.max(currentDesiredBalance.lastConvergedIndex(), 0L),
Math.max(currentDesiredBalanceRef.get().lastConvergedIndex(), 0L),
desiredBalanceComputation.isActive(),
computationsSubmitted.count(),
computationsExecuted.count(),
Expand All @@ -334,7 +364,7 @@ public DesiredBalanceStats getStats() {

private void onNoLongerMaster() {
if (indexGenerator.getAndSet(-1) != -1) {
currentDesiredBalance = DesiredBalance.INITIAL;
currentDesiredBalanceRef.set(DesiredBalance.NOT_MASTER);
queue.completeAllAsNotMaster();
pendingDesiredBalanceMoves.clear();
desiredBalanceReconciler.clear();
Expand Down Expand Up @@ -404,7 +434,7 @@ private static void discardSupersededTasks(

// only for tests - in production, this happens after reconciliation
protected final void completeToLastConvergedIndex() {
queue.complete(currentDesiredBalance.lastConvergedIndex());
queue.complete(currentDesiredBalanceRef.get().lastConvergedIndex());
}

private void recordTime(CounterMetric metric, Runnable action) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.sameInstance;

public class DesiredBalanceShardsAllocatorTests extends ESAllocationTestCase {

Expand Down Expand Up @@ -694,6 +695,7 @@ public void onFailure(Exception e) {

try {
assertTrue(listenersCalled.await(10, TimeUnit.SECONDS));
assertThat(desiredBalanceShardsAllocator.getDesiredBalance(), sameInstance(DesiredBalance.NOT_MASTER));
} finally {
clusterService.close();
terminate(threadPool);
Expand Down Expand Up @@ -812,7 +814,7 @@ public void testResetDesiredBalanceOnNoLongerMaster() {
assertThat(
"desired balance should be resetted on no longer master",
desiredBalanceShardsAllocator.getDesiredBalance(),
equalTo(DesiredBalance.INITIAL)
equalTo(DesiredBalance.NOT_MASTER)
);
} finally {
clusterService.close();
Expand Down

0 comments on commit 3cb6452

Please sign in to comment.