diff --git a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java index b2179f5d69d4..acb9c72f1124 100644 --- a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java +++ b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java @@ -24,6 +24,7 @@ import org.apache.curator.framework.recipes.leader.LeaderLatch; import org.apache.curator.framework.recipes.leader.LeaderLatchListener; import org.apache.curator.framework.recipes.leader.Participant; +import org.apache.curator.framework.state.ConnectionState; import org.apache.druid.concurrent.LifecycleLock; import org.apache.druid.discovery.DruidLeaderSelector; import org.apache.druid.guice.annotations.Self; @@ -39,9 +40,6 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicReference; -/** - * - */ public class CuratorDruidLeaderSelector implements DruidLeaderSelector { private static final EmittingLogger log = new EmittingLogger(CuratorDruidLeaderSelector.class); @@ -65,11 +63,10 @@ public CuratorDruidLeaderSelector(CuratorFramework curator, @Self DruidNode self this.curator = curator; this.self = self; this.latchPath = latchPath; - - // Creating a LeaderLatch here allows us to query for the current leader. We will not be considered for leadership - // election until LeaderLatch.start() is called in registerListener(). This allows clients to observe the current - // leader without being involved in the election. this.leaderLatch.set(createNewLeaderLatch()); + + // Adding ConnectionStateListener to handle session changes using a method reference + curator.getConnectionStateListenable().addListener(this::handleConnectionStateChanged); } private LeaderLatch createNewLeaderLatch() @@ -80,55 +77,62 @@ private LeaderLatch createNewLeaderLatch() private LeaderLatch createNewLeaderLatchWithListener() { final LeaderLatch newLeaderLatch = createNewLeaderLatch(); + newLeaderLatch.addListener(new LeaderLatchListener() + { + @Override + public void isLeader() + { + try { + if (leader) { + log.warn("I'm being asked to become leader. But I am already the leader. Ignored event."); + return; + } - newLeaderLatch.addListener( - new LeaderLatchListener() - { - @Override - public void isLeader() - { - try { - if (newLeaderLatch.getState().equals(LeaderLatch.State.CLOSED)) { - log.warn("I'm being asked to become leader, but the latch is CLOSED. Ignored event."); - return; - } - - if (leader) { - log.warn("I'm being asked to become leader. But I am already the leader. Ignored event."); - return; - } - - leader = true; - term++; - listener.becomeLeader(); - } - catch (Exception ex) { - log.makeAlert(ex, "listener becomeLeader() failed. Unable to become leader").emit(); - - recreateLeaderLatch(); - } + leader = true; + term++; + listener.becomeLeader(); + } + catch (Exception ex) { + log.makeAlert(ex, "listener becomeLeader() failed. Unable to become leader").emit(); + + // give others a chance to become leader. + CloseableUtils.closeAndSuppressExceptions( + createNewLeaderLatchWithListener(), + e -> log.warn("Could not close old leader latch; continuing with new one anyway.") + ); + + leader = false; + try { + // Small delay before starting the latch so that others waiting are chosen to become leader. + Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000)); + leaderLatch.get().start(); } + catch (Exception e) { + // If an exception gets thrown out here, then the node will zombie out 'cause it won't be looking for + // the latch anymore. I don't believe it's actually possible for an Exception to throw out here, but + // Curator likes to have "throws Exception" on methods so it might happen... + log.makeAlert(e, "I am a zombie").emit(); + } + } + } - @Override - public void notLeader() - { - try { - if (!leader) { - log.warn("I'm being asked to stop being leader. But I am not the leader. Ignored event."); - return; - } - - leader = false; - listener.stopBeingLeader(); - recreateLeaderLatch(); - } - catch (Exception ex) { - log.makeAlert(ex, "listener.stopBeingLeader() failed. Unable to stopBeingLeader").emit(); - } + @Override + public void notLeader() + { + try { + if (!leader) { + log.warn("I'm being asked to stop being leader. But I am not the leader. Ignored event."); + return; } - }, - listenerExecutor - ); + + leader = false; + listener.stopBeingLeader(); + } + catch (Exception ex) { + log.makeAlert(ex, "listener.stopBeingLeader() failed. Unable to stopBeingLeader").emit(); + } + } + }, listenerExecutor); return leaderLatch.getAndSet(newLeaderLatch); } @@ -175,10 +179,10 @@ public void registerListener(DruidLeaderSelector.Listener listener) try { this.listener = listener; this.listenerExecutor = Execs.singleThreaded( - StringUtils.format( - "LeaderSelector[%s]", - StringUtils.encodeForFormat(latchPath) - ) + StringUtils.format( + "LeaderSelector[%s]", + StringUtils.encodeForFormat(latchPath) + ) ); createNewLeaderLatchWithListener(); @@ -205,25 +209,36 @@ public void unregisterListener() listenerExecutor.shutdownNow(); } + // Method to handle connection state changes + private void handleConnectionStateChanged(CuratorFramework client, ConnectionState newState) + { + switch (newState) { + case SUSPENDED: + case LOST: + recreateLeaderLatch(); + break; + case RECONNECTED: + // Connection reestablished, no action needed here + break; + default: + // Do nothing for other states + break; + } + } + private void recreateLeaderLatch() { - // give others a chance to become leader. - CloseableUtils.closeAndSuppressExceptions( - createNewLeaderLatchWithListener(), - e -> log.warn("Could not close old leader latch; continuing with new one anyway.") - ); + // Close existing leader latch + CloseableUtils.closeAndSuppressExceptions(leaderLatch.get(), e -> log.warn(e, "Failed to close LeaderLatch.")); - leader = false; + // Create and start a new leader latch + LeaderLatch newLeaderLatch = createNewLeaderLatchWithListener(); try { - //Small delay before starting the latch so that others waiting are chosen to become leader. - Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000)); - leaderLatch.get().start(); + newLeaderLatch.start(); } - catch (Exception e) { - // If an exception gets thrown out here, then the node will zombie out 'cause it won't be looking for - // the latch anymore. I don't believe it's actually possible for an Exception to throw out here, but - // Curator likes to have "throws Exception" on methods so it might happen... - log.makeAlert(e, "I am a zombie").emit(); + catch (Exception ex) { + throw new RuntimeException("Failed to start new LeaderLatch after session change", ex); } + leaderLatch.set(newLeaderLatch); } }