Skip to content

Commit

Permalink
Add unit test for detecting a dead lock
Browse files Browse the repository at this point in the history
  • Loading branch information
lhotari committed Apr 20, 2022
1 parent 925dbd4 commit ae574d2
Showing 1 changed file with 48 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@
package org.apache.pulsar.broker.admin;

import com.google.common.collect.Sets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Phaser;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.admin.PulsarAdminException;
Expand All @@ -32,8 +37,6 @@
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

@Test(groups = "broker-admin")
@Slf4j
Expand Down Expand Up @@ -93,6 +96,49 @@ public void testHealthCheckup() throws Exception {
);
}

@Test(expectedExceptions= PulsarAdminException.class, expectedExceptionsMessageRegExp = ".*Deadlocked threads detected.*")
public void testHealthCheckupDetectsDeadlock() throws Exception {
// simulate a deadlock in the Test JVM
// the broker used in unit tests runs in the test JVM and the
// healthcheck implementation should detect this deadlock
Lock lock1 = new ReentrantReadWriteLock().writeLock();
Lock lock2 = new ReentrantReadWriteLock().writeLock();
final Phaser phaser = new Phaser(3);
Thread thread1=new Thread(() -> {
phaser.arriveAndAwaitAdvance();
deadlock(lock1, lock2, 1000L);
}, "deadlockthread-1");
Thread thread2=new Thread(() -> {
phaser.arriveAndAwaitAdvance();
deadlock(lock2, lock1, 2000L);
}, "deadlockthread-2");
thread1.start();
thread2.start();
phaser.arriveAndAwaitAdvance();
Thread.sleep(5000L);

try {
admin.brokers().healthcheck(TopicVersion.V2);
} finally {
// unlock the deadlock
thread1.interrupt();
thread2.interrupt();
}
}

private void deadlock(Lock lock1, Lock lock2, long millis) {
lock1.lock();
try {
Thread.sleep(millis);
lock2.lock();
lock2.unlock();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
lock1.unlock();
}
}

@Test
public void testHealthCheckupV1() throws Exception {
final int times = 30;
Expand Down

0 comments on commit ae574d2

Please sign in to comment.