Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CURATOR-696. Fix double leader for LeaderLatch #500

Merged
merged 4 commits into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions curator-recipes/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ public void processResult(CuratorFramework client, CuratorEvent event) throws Ex
getChildren();
}
} else {
log.error("getChildren() failed. rc = " + event.getResultCode());
log.error("getChildren() failed. rc = {}", event.getResultCode());
}
}
};
Expand Down Expand Up @@ -548,43 +548,57 @@ private void checkLeadership(List<String> children) throws Exception {
log.debug("checkLeadership with id: {}, ourPath: {}, children: {}", id, localOurPath, sortedChildren);

if (ourIndex < 0) {
log.error("Can't find our node. Resetting. Index: " + ourIndex);
log.error("Can't find our node. Resetting. Index: {}", ourIndex);
reset();
} else if (ourIndex == 0) {
lastPathIsLeader.set(localOurPath);
setLeadership(true);
} else {
setLeadership(false);
String watchPath = sortedChildren.get(ourIndex - 1);
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
if (state.get() == State.STARTED && event.getType() == Event.EventType.NodeDeleted) {
try {
getChildren();
} catch (Exception ex) {
ThreadUtils.checkInterrupted(ex);
log.error("An error occurred checking the leadership.", ex);
return;
}

if (ourIndex == 0) {
client.getData()
.inBackground((client, event) -> {
final long ephemeralOwner =
event.getStat() != null ? event.getStat().getEphemeralOwner() : -1;
final long thisSessionId =
client.getZookeeperClient().getZooKeeper().getSessionId();
if (ephemeralOwner != thisSessionId) {
// this node is gone - reset
reset();
} else {
lastPathIsLeader.set(localOurPath);
setLeadership(true);
}
Comment on lines +559 to 569
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the major fix.

}
}
};
})
.forPath(localOurPath);
return;
}

BackgroundCallback callback = new BackgroundCallback() {
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
if (event.getResultCode() == KeeperException.Code.NONODE.intValue()) {
// previous node is gone - retry getChildren
setLeadership(false);
String watchPath = sortedChildren.get(ourIndex - 1);
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
if (state.get() == State.STARTED && event.getType() == Event.EventType.NodeDeleted) {
try {
getChildren();
} catch (Exception ex) {
ThreadUtils.checkInterrupted(ex);
log.error("An error occurred checking the leadership.", ex);
}
}
};
// use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
client.getData()
.usingWatcher(watcher)
.inBackground(callback)
.forPath(ZKPaths.makePath(latchPath, watchPath));
}
}
};

BackgroundCallback callback = new BackgroundCallback() {
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
if (event.getResultCode() == KeeperException.Code.NONODE.intValue()) {
// previous node is gone - retry getChildren
getChildren();
}
}
};
// use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
client.getData().usingWatcher(watcher).inBackground(callback).forPath(ZKPaths.makePath(latchPath, watchPath));
}

private void getChildren() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.curator.framework.recipes.leader;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
Expand Down Expand Up @@ -72,9 +73,12 @@
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Tag(CuratorTestBase.zk35TestCompatibilityGroup)
public class TestLeaderLatch extends BaseClassForTests {
private static final Logger LOG = LoggerFactory.getLogger(TestLeaderLatch.class);
private static final String PATH_NAME = "/one/two/me";
private static final int MAX_LOOPS = 5;

Expand Down Expand Up @@ -208,6 +212,58 @@ public void testWatchedNodeDeletedOnReconnect() throws Exception {
}
}

@Test
public void testSessionInterruptionDoNotCauseBrainSplit() throws Exception {
final String latchPath = "/testSessionInterruptionDoNotCauseBrainSplit";
final Timing2 timing = new Timing2();
final BlockingQueue<TestEvent> events0 = new LinkedBlockingQueue<>();
final BlockingQueue<TestEvent> events1 = new LinkedBlockingQueue<>();

final List<Closeable> closeableResources = new ArrayList<>();
try {
final String id0 = "id0";
final CuratorFramework client0 = createAndStartClient(server.getConnectString(), timing, id0, null);
closeableResources.add(client0);
final LeaderLatch latch0 = createAndStartLeaderLatch(client0, latchPath, id0, events0);
closeableResources.add(latch0);

assertThat(events0.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS))
.isNotNull()
.isEqualTo(new TestEvent(id0, TestEventType.GAINED_LEADERSHIP));

final String id1 = "id1";
final CuratorFramework client1 = createAndStartClient(server.getConnectString(), timing, id1, null);
closeableResources.add(client1);
final LeaderLatch latch1 = createAndStartLeaderLatch(client1, latchPath, id1, events1);
closeableResources.add(latch1);

// wait for the non-leading LeaderLatch (i.e. latch1) instance to be done with its creation
// this call is time-consuming but necessary because we don't have a handle to detect the end of the reset
// call
timing.forWaiting().sleepABit();

assertTrue(latch0.hasLeadership());
assertFalse(latch1.hasLeadership());

client0.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();

assertThat(events1.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS))
.isNotNull()
.isEqualTo(new TestEvent(id1, TestEventType.GAINED_LEADERSHIP));

assertThat(events0.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS))
.isNotNull()
.isEqualTo(new TestEvent(id0, TestEventType.LOST_LEADERSHIP));
// No leadership grained to old leader after session changed, hence no brain split.
assertThat(events0.poll(20, TimeUnit.MILLISECONDS))
.isNotEqualTo(new TestEvent(id0, TestEventType.GAINED_LEADERSHIP));
} finally {
// reverse is necessary for closing the LeaderLatch instances before closing the corresponding client
Collections.reverse(closeableResources);
closeableResources.forEach(CloseableUtils::closeQuietly);
}
}

@Test
public void testResettingOfLeadershipAfterConcurrentLeadershipChange() throws Exception {
final String latchPath = "/test";
Expand Down Expand Up @@ -316,7 +372,9 @@ private static CuratorFramework createAndStartClient(

client.getConnectionStateListenable().addListener((client1, newState) -> {
if (newState == ConnectionState.CONNECTED) {
events.add(new TestEvent(id, TestEventType.GAINED_CONNECTION));
if (events != null) {
events.add(new TestEvent(id, TestEventType.GAINED_CONNECTION));
}
}
});

Expand Down Expand Up @@ -366,6 +424,11 @@ public boolean equals(Object o) {
TestEvent testEvent = (TestEvent) o;
return Objects.equals(id, testEvent.id) && eventType == testEvent.eventType;
}

@Override
public String toString() {
return "TestEvent{" + "eventType=" + eventType + ", id='" + id + '\'' + '}';
}
}

@Test
Expand Down
Loading