diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java index 2b90e3b0081..50cede05dec 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java @@ -121,6 +121,7 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry private OpStatsLogger getEntryStats; private OpStatsLogger pageScanStats; private Counter retryCounter; + protected LedgerDirsManager ledgerDirsManager; public InterleavedLedgerStorage() { activeLedgers = new SnapshotMap<>(); @@ -184,6 +185,7 @@ public void initializeWithEntryLogger(ServerConfiguration conf, checkNotNull(checkpointer, "invalid null checkpointer"); this.entryLogger = (DefaultEntryLogger) entryLogger; this.entryLogger.addListener(this); + this.ledgerDirsManager = ledgerDirsManager; ledgerCache = new LedgerCacheImpl(conf, activeLedgers, null == indexDirsManager ? ledgerDirsManager : indexDirsManager, statsLogger); gcThread = new GarbageCollectorThread(conf, ledgerManager, ledgerDirsManager, @@ -201,21 +203,26 @@ private LedgerDirsListener getLedgerDirsListener() { @Override public void diskAlmostFull(File disk) { - if (gcThread.isForceGCAllowWhenNoSpace) { - gcThread.enableForceGC(); - } else { - gcThread.suspendMajorGC(); + if (ledgerDirsManager.getAllLedgerDirs().contains(disk)) { + if (gcThread.isForceGCAllowWhenNoSpace) { + gcThread.enableForceGC(); + } else { + gcThread.suspendMajorGC(); + } } } @Override public void diskFull(File disk) { - if (gcThread.isForceGCAllowWhenNoSpace) { - gcThread.enableForceGC(); - } else { - gcThread.suspendMajorGC(); - gcThread.suspendMinorGC(); + if (ledgerDirsManager.getAllLedgerDirs().contains(disk)) { + if (gcThread.isForceGCAllowWhenNoSpace) { + gcThread.enableForceGC(); + } else { + gcThread.suspendMajorGC(); + gcThread.suspendMinorGC(); + } } + } @Override @@ -230,25 +237,30 @@ public void allDisksFull(boolean highPriorityWritesAllowed) { @Override public void diskWritable(File disk) { - // we have enough space now - if (gcThread.isForceGCAllowWhenNoSpace) { - // disable force gc. - gcThread.disableForceGC(); - } else { - // resume compaction to normal. - gcThread.resumeMajorGC(); - gcThread.resumeMinorGC(); + if (ledgerDirsManager.getAllLedgerDirs().contains(disk)) { + // we have enough space now + if (gcThread.isForceGCAllowWhenNoSpace) { + // disable force gc. + gcThread.disableForceGC(); + } else { + // resume compaction to normal. + gcThread.resumeMajorGC(); + gcThread.resumeMinorGC(); + } } + } @Override public void diskJustWritable(File disk) { - if (gcThread.isForceGCAllowWhenNoSpace) { - // if a disk is just writable, we still need force gc. - gcThread.enableForceGC(); - } else { - // still under warn threshold, only resume minor compaction. - gcThread.resumeMinorGC(); + if (ledgerDirsManager.getAllLedgerDirs().contains(disk)) { + if (gcThread.isForceGCAllowWhenNoSpace) { + // if a disk is just writable, we still need force gc. + gcThread.enableForceGC(); + } else { + // still under warn threshold, only resume minor compaction. + gcThread.resumeMinorGC(); + } } } }; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java index 667dd466b8a..3b1270c0175 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java @@ -144,6 +144,7 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage private final Counter flushExecutorTime; private final boolean singleLedgerDirs; + private final LedgerDirsManager ledgerDirsManager; public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager, @@ -225,9 +226,7 @@ public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager le }); ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener()); - if (!ledgerBaseDir.equals(indexBaseDir)) { - indexDirsManager.addLedgerDirsListener(getLedgerDirsListener()); - } + this.ledgerDirsManager = ledgerDirsManager; } @Override @@ -1147,21 +1146,26 @@ private LedgerDirsManager.LedgerDirsListener getLedgerDirsListener() { @Override public void diskAlmostFull(File disk) { - if (gcThread.isForceGCAllowWhenNoSpace()) { - gcThread.enableForceGC(); - } else { - gcThread.suspendMajorGC(); + if (ledgerDirsManager.getAllLedgerDirs().contains(disk)) { + if (gcThread.isForceGCAllowWhenNoSpace()) { + gcThread.enableForceGC(); + } else { + gcThread.suspendMajorGC(); + } } } @Override public void diskFull(File disk) { - if (gcThread.isForceGCAllowWhenNoSpace()) { - gcThread.enableForceGC(); - } else { - gcThread.suspendMajorGC(); - gcThread.suspendMinorGC(); + if (ledgerDirsManager.getAllLedgerDirs().contains(disk)) { + if (gcThread.isForceGCAllowWhenNoSpace()) { + gcThread.enableForceGC(); + } else { + gcThread.suspendMajorGC(); + gcThread.suspendMinorGC(); + } } + } @Override @@ -1176,25 +1180,29 @@ public void allDisksFull(boolean highPriorityWritesAllowed) { @Override public void diskWritable(File disk) { - // we have enough space now - if (gcThread.isForceGCAllowWhenNoSpace()) { - // disable force gc. - gcThread.disableForceGC(); - } else { - // resume compaction to normal. - gcThread.resumeMajorGC(); - gcThread.resumeMinorGC(); + if (ledgerDirsManager.getAllLedgerDirs().contains(disk)) { + // we have enough space now + if (gcThread.isForceGCAllowWhenNoSpace()) { + // disable force gc. + gcThread.disableForceGC(); + } else { + // resume compaction to normal. + gcThread.resumeMajorGC(); + gcThread.resumeMinorGC(); + } } } @Override public void diskJustWritable(File disk) { - if (gcThread.isForceGCAllowWhenNoSpace()) { - // if a disk is just writable, we still need force gc. - gcThread.enableForceGC(); - } else { - // still under warn threshold, only resume minor compaction. - gcThread.resumeMinorGC(); + if (ledgerDirsManager.getAllLedgerDirs().contains(disk)) { + if (gcThread.isForceGCAllowWhenNoSpace()) { + // if a disk is just writable, we still need force gc. + gcThread.enableForceGC(); + } else { + // still under warn threshold, only resume minor compaction. + gcThread.resumeMinorGC(); + } } } }; diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerDirsManagerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerDirsManagerTest.java index 2a171373fe8..4001d651106 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerDirsManagerTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerDirsManagerTest.java @@ -36,6 +36,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener; @@ -246,6 +247,83 @@ public void testIsReadOnlyModeOnAnyDiskFullEnabled() throws Exception { testAnyLedgerFullTransitToReadOnly(false); } + @Test + public void testTriggerLedgerDirListener() throws Exception { + ledgerMonitor.shutdown(); + + final float nospace = 0.90f; + final float lwm = 0.80f; + HashMap usageMap; + + File tmpDir1 = createTempDir("bkTest", ".dir"); + File curDir1 = BookieImpl.getCurrentDirectory(tmpDir1); + BookieImpl.checkDirectoryStructure(curDir1); + + File tmpDir2 = createTempDir("bkTest", ".dir"); + File curDir2 = BookieImpl.getCurrentDirectory(tmpDir2); + BookieImpl.checkDirectoryStructure(curDir2); + + conf.setDiskUsageThreshold(nospace); + conf.setDiskLowWaterMarkUsageThreshold(lwm); + conf.setDiskUsageWarnThreshold(nospace); + conf.setReadOnlyModeOnAnyDiskFullEnabled(false); + conf.setLedgerDirNames(new String[] { tmpDir1.toString(), tmpDir2.toString() }); + + mockDiskChecker = new MockDiskChecker(nospace, warnThreshold); + dirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), + new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()), statsLogger); + ledgerMonitor = new LedgerDirsMonitor(conf, mockDiskChecker, Collections.singletonList(dirsManager)); + usageMap = new HashMap<>(); + usageMap.put(curDir1, 0.1f); + usageMap.put(curDir2, 0.1f); + mockDiskChecker.setUsageMap(usageMap); + ledgerMonitor.init(); + final MockLedgerDirsListener mockLedgerDirsListener = new MockLedgerDirsListener(); + dirsManager.addLedgerDirsListener(mockLedgerDirsListener); + ledgerMonitor.start(); + + final CountDownLatch diskAlmostFull = new CountDownLatch(1); + final CountDownLatch diskFull = new CountDownLatch(1); + + dirsManager.addLedgerDirsListener(new LedgerDirsListener() { + @Override + public void diskAlmostFull(File disk) { + if (disk.equals(curDir1)) { + diskAlmostFull.countDown(); + } + + } + + @Override + public void diskFull(File disk) { + if (disk.equals(curDir1)) { + diskFull.countDown(); + } + + + } + }); + + Thread.sleep((diskCheckInterval * 2) + 100); + assertFalse(mockLedgerDirsListener.readOnly); + + // diskAlmostFull + setUsageAndThenVerify(curDir1, nospace - 0.6f, curDir2, nospace - 0.20f, mockDiskChecker, + mockLedgerDirsListener, false); + assertEquals(1, diskAlmostFull.getCount()); + setUsageAndThenVerify(curDir1, nospace - 0.2f, curDir2, nospace - 0.60f, mockDiskChecker, + mockLedgerDirsListener, false); + assertEquals(0, diskAlmostFull.getCount()); + + // diskFull + setUsageAndThenVerify(curDir1, nospace - 0.6f, curDir2, nospace + 0.05f, mockDiskChecker, + mockLedgerDirsListener, false); + assertEquals(1, diskFull.getCount()); + setUsageAndThenVerify(curDir1, nospace + 0.05f, curDir2, nospace - 0.20f, mockDiskChecker, + mockLedgerDirsListener, true); + assertEquals(0, diskFull.getCount()); + } + public void testAnyLedgerFullTransitToReadOnly(boolean isReadOnlyModeOnAnyDiskFullEnabled) throws Exception { ledgerMonitor.shutdown();