Skip to content

Commit

Permalink
Addressing apache#16411
Browse files Browse the repository at this point in the history
Added listener method that tracks ZK leader state
  • Loading branch information
Razin Bouzar authored and razinbouzar committed Jun 17, 2024
1 parent 3da8f9d commit e3e2b9f
Showing 1 changed file with 85 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.curator.framework.recipes.leader.Participant;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.druid.concurrent.LifecycleLock;
import org.apache.druid.discovery.DruidLeaderSelector;
import org.apache.druid.guice.annotations.Self;
Expand All @@ -39,9 +40,6 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicReference;

/**
*
*/
public class CuratorDruidLeaderSelector implements DruidLeaderSelector
{
private static final EmittingLogger log = new EmittingLogger(CuratorDruidLeaderSelector.class);
Expand All @@ -65,11 +63,10 @@ public CuratorDruidLeaderSelector(CuratorFramework curator, @Self DruidNode self
this.curator = curator;
this.self = self;
this.latchPath = latchPath;

// Creating a LeaderLatch here allows us to query for the current leader. We will not be considered for leadership
// election until LeaderLatch.start() is called in registerListener(). This allows clients to observe the current
// leader without being involved in the election.
this.leaderLatch.set(createNewLeaderLatch());

// Adding ConnectionStateListener to handle session changes using a method reference
curator.getConnectionStateListenable().addListener(this::handleConnectionStateChanged);
}

private LeaderLatch createNewLeaderLatch()
Expand All @@ -80,55 +77,62 @@ private LeaderLatch createNewLeaderLatch()
private LeaderLatch createNewLeaderLatchWithListener()
{
final LeaderLatch newLeaderLatch = createNewLeaderLatch();
newLeaderLatch.addListener(new LeaderLatchListener()
{
@Override
public void isLeader()
{
try {
if (leader) {
log.warn("I'm being asked to become leader. But I am already the leader. Ignored event.");
return;
}

newLeaderLatch.addListener(
new LeaderLatchListener()
{
@Override
public void isLeader()
{
try {
if (newLeaderLatch.getState().equals(LeaderLatch.State.CLOSED)) {
log.warn("I'm being asked to become leader, but the latch is CLOSED. Ignored event.");
return;
}

if (leader) {
log.warn("I'm being asked to become leader. But I am already the leader. Ignored event.");
return;
}

leader = true;
term++;
listener.becomeLeader();
}
catch (Exception ex) {
log.makeAlert(ex, "listener becomeLeader() failed. Unable to become leader").emit();

recreateLeaderLatch();
}
leader = true;
term++;
listener.becomeLeader();
}
catch (Exception ex) {
log.makeAlert(ex, "listener becomeLeader() failed. Unable to become leader").emit();

// give others a chance to become leader.
CloseableUtils.closeAndSuppressExceptions(
createNewLeaderLatchWithListener(),
e -> log.warn("Could not close old leader latch; continuing with new one anyway.")
);

leader = false;
try {
// Small delay before starting the latch so that others waiting are chosen to become leader.
Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000));
leaderLatch.get().start();
}
catch (Exception e) {
// If an exception gets thrown out here, then the node will zombie out 'cause it won't be looking for
// the latch anymore. I don't believe it's actually possible for an Exception to throw out here, but
// Curator likes to have "throws Exception" on methods so it might happen...
log.makeAlert(e, "I am a zombie").emit();
}
}
}

@Override
public void notLeader()
{
try {
if (!leader) {
log.warn("I'm being asked to stop being leader. But I am not the leader. Ignored event.");
return;
}

leader = false;
listener.stopBeingLeader();
recreateLeaderLatch();
}
catch (Exception ex) {
log.makeAlert(ex, "listener.stopBeingLeader() failed. Unable to stopBeingLeader").emit();
}
@Override
public void notLeader()
{
try {
if (!leader) {
log.warn("I'm being asked to stop being leader. But I am not the leader. Ignored event.");
return;
}
},
listenerExecutor
);

leader = false;
listener.stopBeingLeader();
}
catch (Exception ex) {
log.makeAlert(ex, "listener.stopBeingLeader() failed. Unable to stopBeingLeader").emit();
}
}
}, listenerExecutor);

return leaderLatch.getAndSet(newLeaderLatch);
}
Expand Down Expand Up @@ -175,10 +179,10 @@ public void registerListener(DruidLeaderSelector.Listener listener)
try {
this.listener = listener;
this.listenerExecutor = Execs.singleThreaded(
StringUtils.format(
"LeaderSelector[%s]",
StringUtils.encodeForFormat(latchPath)
)
StringUtils.format(
"LeaderSelector[%s]",
StringUtils.encodeForFormat(latchPath)
)
);

createNewLeaderLatchWithListener();
Expand All @@ -205,25 +209,36 @@ public void unregisterListener()
listenerExecutor.shutdownNow();
}

// Method to handle connection state changes
private void handleConnectionStateChanged(CuratorFramework client, ConnectionState newState)
{
switch (newState) {
case SUSPENDED:
case LOST:
recreateLeaderLatch();
break;
case RECONNECTED:
// Connection reestablished, no action needed here
break;
default:
// Do nothing for other states
break;
}
}

private void recreateLeaderLatch()
{
// give others a chance to become leader.
CloseableUtils.closeAndSuppressExceptions(
createNewLeaderLatchWithListener(),
e -> log.warn("Could not close old leader latch; continuing with new one anyway.")
);
// Close existing leader latch
CloseableUtils.closeAndSuppressExceptions(leaderLatch.get(), e -> log.warn(e, "Failed to close LeaderLatch."));

leader = false;
// Create and start a new leader latch
LeaderLatch newLeaderLatch = createNewLeaderLatchWithListener();
try {
//Small delay before starting the latch so that others waiting are chosen to become leader.
Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000));
leaderLatch.get().start();
newLeaderLatch.start();
}
catch (Exception e) {
// If an exception gets thrown out here, then the node will zombie out 'cause it won't be looking for
// the latch anymore. I don't believe it's actually possible for an Exception to throw out here, but
// Curator likes to have "throws Exception" on methods so it might happen...
log.makeAlert(e, "I am a zombie").emit();
catch (Exception ex) {
throw new RuntimeException("Failed to start new LeaderLatch after session change", ex);
}
leaderLatch.set(newLeaderLatch);
}
}

0 comments on commit e3e2b9f

Please sign in to comment.