Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix getLedgerDirsListener #4015

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry
private OpStatsLogger getEntryStats;
private OpStatsLogger pageScanStats;
private Counter retryCounter;
protected LedgerDirsManager ledgerDirsManager;

public InterleavedLedgerStorage() {
activeLedgers = new SnapshotMap<>();
Expand Down Expand Up @@ -184,6 +185,7 @@ public void initializeWithEntryLogger(ServerConfiguration conf,
checkNotNull(checkpointer, "invalid null checkpointer");
this.entryLogger = (DefaultEntryLogger) entryLogger;
this.entryLogger.addListener(this);
this.ledgerDirsManager = ledgerDirsManager;
ledgerCache = new LedgerCacheImpl(conf, activeLedgers,
null == indexDirsManager ? ledgerDirsManager : indexDirsManager, statsLogger);
gcThread = new GarbageCollectorThread(conf, ledgerManager, ledgerDirsManager,
Expand All @@ -201,21 +203,26 @@ private LedgerDirsListener getLedgerDirsListener() {

@Override
public void diskAlmostFull(File disk) {
if (gcThread.isForceGCAllowWhenNoSpace) {
gcThread.enableForceGC();
} else {
gcThread.suspendMajorGC();
if (ledgerDirsManager.getAllLedgerDirs().contains(disk)) {
if (gcThread.isForceGCAllowWhenNoSpace) {
gcThread.enableForceGC();
} else {
gcThread.suspendMajorGC();
}
}
}

@Override
public void diskFull(File disk) {
if (gcThread.isForceGCAllowWhenNoSpace) {
gcThread.enableForceGC();
} else {
gcThread.suspendMajorGC();
gcThread.suspendMinorGC();
if (ledgerDirsManager.getAllLedgerDirs().contains(disk)) {
if (gcThread.isForceGCAllowWhenNoSpace) {
gcThread.enableForceGC();
} else {
gcThread.suspendMajorGC();
gcThread.suspendMinorGC();
}
}

}

@Override
Expand All @@ -230,25 +237,30 @@ public void allDisksFull(boolean highPriorityWritesAllowed) {

@Override
public void diskWritable(File disk) {
// we have enough space now
if (gcThread.isForceGCAllowWhenNoSpace) {
// disable force gc.
gcThread.disableForceGC();
} else {
// resume compaction to normal.
gcThread.resumeMajorGC();
gcThread.resumeMinorGC();
if (ledgerDirsManager.getAllLedgerDirs().contains(disk)) {
// we have enough space now
if (gcThread.isForceGCAllowWhenNoSpace) {
// disable force gc.
gcThread.disableForceGC();
} else {
// resume compaction to normal.
gcThread.resumeMajorGC();
gcThread.resumeMinorGC();
}
}

}

@Override
public void diskJustWritable(File disk) {
if (gcThread.isForceGCAllowWhenNoSpace) {
// if a disk is just writable, we still need force gc.
gcThread.enableForceGC();
} else {
// still under warn threshold, only resume minor compaction.
gcThread.resumeMinorGC();
if (ledgerDirsManager.getAllLedgerDirs().contains(disk)) {
if (gcThread.isForceGCAllowWhenNoSpace) {
// if a disk is just writable, we still need force gc.
gcThread.enableForceGC();
} else {
// still under warn threshold, only resume minor compaction.
gcThread.resumeMinorGC();
}
}
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage

private final Counter flushExecutorTime;
private final boolean singleLedgerDirs;
private final LedgerDirsManager ledgerDirsManager;

public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager,
LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager,
Expand Down Expand Up @@ -225,9 +226,7 @@ public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager le
});

ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener());
if (!ledgerBaseDir.equals(indexBaseDir)) {
indexDirsManager.addLedgerDirsListener(getLedgerDirsListener());
}
this.ledgerDirsManager = ledgerDirsManager;
}

@Override
Expand Down Expand Up @@ -1147,21 +1146,26 @@ private LedgerDirsManager.LedgerDirsListener getLedgerDirsListener() {

@Override
public void diskAlmostFull(File disk) {
if (gcThread.isForceGCAllowWhenNoSpace()) {
gcThread.enableForceGC();
} else {
gcThread.suspendMajorGC();
if (ledgerDirsManager.getAllLedgerDirs().contains(disk)) {
if (gcThread.isForceGCAllowWhenNoSpace()) {
gcThread.enableForceGC();
} else {
gcThread.suspendMajorGC();
}
}
}

@Override
public void diskFull(File disk) {
if (gcThread.isForceGCAllowWhenNoSpace()) {
gcThread.enableForceGC();
} else {
gcThread.suspendMajorGC();
gcThread.suspendMinorGC();
if (ledgerDirsManager.getAllLedgerDirs().contains(disk)) {
if (gcThread.isForceGCAllowWhenNoSpace()) {
gcThread.enableForceGC();
} else {
gcThread.suspendMajorGC();
gcThread.suspendMinorGC();
}
}

}

@Override
Expand All @@ -1176,25 +1180,29 @@ public void allDisksFull(boolean highPriorityWritesAllowed) {

@Override
public void diskWritable(File disk) {
// we have enough space now
if (gcThread.isForceGCAllowWhenNoSpace()) {
// disable force gc.
gcThread.disableForceGC();
} else {
// resume compaction to normal.
gcThread.resumeMajorGC();
gcThread.resumeMinorGC();
if (ledgerDirsManager.getAllLedgerDirs().contains(disk)) {
// we have enough space now
if (gcThread.isForceGCAllowWhenNoSpace()) {
// disable force gc.
gcThread.disableForceGC();
} else {
// resume compaction to normal.
gcThread.resumeMajorGC();
gcThread.resumeMinorGC();
}
}
}

@Override
public void diskJustWritable(File disk) {
if (gcThread.isForceGCAllowWhenNoSpace()) {
// if a disk is just writable, we still need force gc.
gcThread.enableForceGC();
} else {
// still under warn threshold, only resume minor compaction.
gcThread.resumeMinorGC();
if (ledgerDirsManager.getAllLedgerDirs().contains(disk)) {
if (gcThread.isForceGCAllowWhenNoSpace()) {
// if a disk is just writable, we still need force gc.
gcThread.enableForceGC();
} else {
// still under warn threshold, only resume minor compaction.
gcThread.resumeMinorGC();
}
}
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
Expand Down Expand Up @@ -246,6 +247,83 @@ public void testIsReadOnlyModeOnAnyDiskFullEnabled() throws Exception {
testAnyLedgerFullTransitToReadOnly(false);
}

@Test
public void testTriggerLedgerDirListener() throws Exception {
ledgerMonitor.shutdown();

final float nospace = 0.90f;
final float lwm = 0.80f;
HashMap<File, Float> usageMap;

File tmpDir1 = createTempDir("bkTest", ".dir");
File curDir1 = BookieImpl.getCurrentDirectory(tmpDir1);
BookieImpl.checkDirectoryStructure(curDir1);

File tmpDir2 = createTempDir("bkTest", ".dir");
File curDir2 = BookieImpl.getCurrentDirectory(tmpDir2);
BookieImpl.checkDirectoryStructure(curDir2);

conf.setDiskUsageThreshold(nospace);
conf.setDiskLowWaterMarkUsageThreshold(lwm);
conf.setDiskUsageWarnThreshold(nospace);
conf.setReadOnlyModeOnAnyDiskFullEnabled(false);
conf.setLedgerDirNames(new String[] { tmpDir1.toString(), tmpDir2.toString() });

mockDiskChecker = new MockDiskChecker(nospace, warnThreshold);
dirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()), statsLogger);
ledgerMonitor = new LedgerDirsMonitor(conf, mockDiskChecker, Collections.singletonList(dirsManager));
usageMap = new HashMap<>();
usageMap.put(curDir1, 0.1f);
usageMap.put(curDir2, 0.1f);
mockDiskChecker.setUsageMap(usageMap);
ledgerMonitor.init();
final MockLedgerDirsListener mockLedgerDirsListener = new MockLedgerDirsListener();
dirsManager.addLedgerDirsListener(mockLedgerDirsListener);
ledgerMonitor.start();

final CountDownLatch diskAlmostFull = new CountDownLatch(1);
final CountDownLatch diskFull = new CountDownLatch(1);

dirsManager.addLedgerDirsListener(new LedgerDirsListener() {
@Override
public void diskAlmostFull(File disk) {
if (disk.equals(curDir1)) {
diskAlmostFull.countDown();
}

}

@Override
public void diskFull(File disk) {
if (disk.equals(curDir1)) {
diskFull.countDown();
}


}
});

Thread.sleep((diskCheckInterval * 2) + 100);
assertFalse(mockLedgerDirsListener.readOnly);

// diskAlmostFull
setUsageAndThenVerify(curDir1, nospace - 0.6f, curDir2, nospace - 0.20f, mockDiskChecker,
mockLedgerDirsListener, false);
assertEquals(1, diskAlmostFull.getCount());
setUsageAndThenVerify(curDir1, nospace - 0.2f, curDir2, nospace - 0.60f, mockDiskChecker,
mockLedgerDirsListener, false);
assertEquals(0, diskAlmostFull.getCount());

// diskFull
setUsageAndThenVerify(curDir1, nospace - 0.6f, curDir2, nospace + 0.05f, mockDiskChecker,
mockLedgerDirsListener, false);
assertEquals(1, diskFull.getCount());
setUsageAndThenVerify(curDir1, nospace + 0.05f, curDir2, nospace - 0.20f, mockDiskChecker,
mockLedgerDirsListener, true);
assertEquals(0, diskFull.getCount());
}

public void testAnyLedgerFullTransitToReadOnly(boolean isReadOnlyModeOnAnyDiskFullEnabled) throws Exception {
ledgerMonitor.shutdown();

Expand Down