From c480750a01406ee7c7695666afe9985f0afef23d Mon Sep 17 00:00:00 2001
From: Zach York
Date: Thu, 16 Sep 2021 16:54:50 -0700
Subject: [PATCH] HBASE-26320 Implement a separate thread pool for the
 LogCleaner

This avoids starvation when the archive directory is large and takes a long
time to iterate through.
---
 .../apache/hadoop/hbase/master/HMaster.java   | 29 ++++++----
 .../hbase/master/cleaner/CleanerChore.java    |  7 +++
 .../hbase/master/cleaner/DirScanPool.java     | 50 +++++++++++++----
 .../hbase/backup/TestHFileArchiving.java      |  2 +-
 .../TestZooKeeperTableArchiveClient.java      |  2 +-
 .../master/cleaner/TestCleanerChore.java      | 53 ++++++++++++++++++-
 .../master/cleaner/TestHFileCleaner.java      |  2 +-
 .../master/cleaner/TestHFileLinkCleaner.java  |  2 +-
 .../hbase/master/cleaner/TestLogsCleaner.java |  2 +-
 .../master/region/MasterRegionTestBase.java   | 10 ++--
 .../region/TestMasterRegionCompaction.java    |  2 +-
 .../region/TestMasterRegionWALCleaner.java    |  2 +-
 12 files changed, 133 insertions(+), 30 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index b822921cbaa1..989739677503 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -367,7 +367,10 @@ public class HMaster extends HBaseServerBase implements Maste
   private HbckChore hbckChore;
   CatalogJanitor catalogJanitorChore;
 
-  private DirScanPool cleanerPool;
+  // Threadpool for scanning the archive directory, used by the HFileCleaner
+  private DirScanPool hfileCleanerPool;
+  // Threadpool for scanning the Old logs directory, used by the LogCleaner
+  private DirScanPool logCleanerPool;
   private LogCleaner logCleaner;
   private HFileCleaner hfileCleaner;
   private ReplicationBarrierCleaner replicationBarrierCleaner;
@@ -1131,7 +1134,8 @@ private void finishActiveMasterInitialization(MonitoredTask status) throws IOExc
       (EnvironmentEdgeManager.currentTime() - masterActiveTime) / 1000.0f));
     this.masterFinishedInitializationTime = EnvironmentEdgeManager.currentTime();
     configurationManager.registerObserver(this.balancer);
-    configurationManager.registerObserver(this.cleanerPool);
+    configurationManager.registerObserver(this.hfileCleanerPool);
+    configurationManager.registerObserver(this.logCleanerPool);
     configurationManager.registerObserver(this.hfileCleaner);
     configurationManager.registerObserver(this.logCleaner);
     configurationManager.registerObserver(this.regionsRecoveryConfigManager);
@@ -1493,21 +1497,24 @@ private void startServiceThreads() throws IOException {
       ExecutorType.MASTER_TABLE_OPERATIONS).setCorePoolSize(1));
     startProcedureExecutor();
 
-    // Create cleaner thread pool
-    cleanerPool = new DirScanPool(conf);
+    // Create log cleaner thread pool
+    logCleanerPool = DirScanPool.getLogCleanerScanPool(conf);
     Map<String, Object> params = new HashMap<>();
     params.put(MASTER, this);
 
     // Start log cleaner thread
     int cleanerInterval =
       conf.getInt(HBASE_MASTER_CLEANER_INTERVAL, DEFAULT_HBASE_MASTER_CLEANER_INTERVAL);
     this.logCleaner = new LogCleaner(cleanerInterval, this, conf,
-      getMasterWalManager().getFileSystem(), getMasterWalManager().getOldLogDir(), cleanerPool, params);
+      getMasterWalManager().getFileSystem(), getMasterWalManager().getOldLogDir(),
+      logCleanerPool, params);
     getChoreService().scheduleChore(logCleaner);
     // start the hfile archive cleaner thread
     Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
+    // Create archive cleaner thread pool
+    hfileCleanerPool = DirScanPool.getHFileCleanerScanPool(conf);
     this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf,
-      getMasterFileSystem().getFileSystem(), archiveDir, cleanerPool, params);
+      getMasterFileSystem().getFileSystem(), archiveDir, hfileCleanerPool, params);
     getChoreService().scheduleChore(hfileCleaner);
 
     // Regions Reopen based on very high storeFileRefCount is considered enabled
@@ -1557,9 +1564,13 @@ protected void stopServiceThreads() {
     }
     stopChoreService();
     stopExecutorService();
-    if (cleanerPool != null) {
-      cleanerPool.shutdownNow();
-      cleanerPool = null;
+    if (hfileCleanerPool != null) {
+      hfileCleanerPool.shutdownNow();
+      hfileCleanerPool = null;
+    }
+    if (logCleanerPool != null) {
+      logCleanerPool.shutdownNow();
+      logCleanerPool = null;
     }
     if (maintenanceRegionServer != null) {
       maintenanceRegionServer.getRegionServer().stop(HBASE_MASTER_CLEANER_INTERVAL);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
index 4ae428d51c62..8454eae3ea57 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
@@ -57,6 +57,7 @@ public abstract class CleanerChore extends Schedu
   private static final int AVAIL_PROCESSORS = Runtime.getRuntime().availableProcessors();
 
   /**
+   * Configures the threadpool used for scanning the archive directory for the HFileCleaner.
    * If it is an integer and >= 1, it would be the size;
    * if 0.0 < size <= 1.0, size would be available processors * size.
    * Pay attention that 1.0 is different from 1, former indicates it will use 100% of cores,
@@ -64,6 +65,12 @@ public abstract class CleanerChore extends Schedu
    */
  public static final String CHORE_POOL_SIZE = "hbase.cleaner.scan.dir.concurrent.size";
  static final String DEFAULT_CHORE_POOL_SIZE = "0.25";
+  /**
+   * Configures the threadpool used for scanning the Old logs directory for the LogCleaner.
+   * Follows the same configuration mechanism as CHORE_POOL_SIZE, but has a default of 1 thread.
+   */
+  public static final String LOG_CLEANER_CHORE_SIZE = "hbase.log.cleaner.scan.dir.concurrent.size";
+  static final String DEFAULT_LOG_CLEANER_CHORE_POOL_SIZE = "1";
 
   private final DirScanPool pool;
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/DirScanPool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/DirScanPool.java
index 164752b97314..6e1426985cc4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/DirScanPool.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/DirScanPool.java
@@ -39,21 +39,41 @@ public class DirScanPool implements ConfigurationObserver {
   private final ThreadPoolExecutor pool;
   private int cleanerLatch;
   private boolean reconfigNotification;
+  private Type dirScanPoolType;
+  private final String name;
 
-  public DirScanPool(Configuration conf) {
-    String poolSize = conf.get(CleanerChore.CHORE_POOL_SIZE, CleanerChore.DEFAULT_CHORE_POOL_SIZE);
+  private enum Type {
+    LOG_CLEANER(CleanerChore.LOG_CLEANER_CHORE_SIZE,
+      CleanerChore.DEFAULT_LOG_CLEANER_CHORE_POOL_SIZE),
+    HFILE_CLEANER(CleanerChore.CHORE_POOL_SIZE, CleanerChore.DEFAULT_CHORE_POOL_SIZE);
+
+    private final String cleanerPoolSizeConfigName;
+    private final String cleanerPoolSizeConfigDefault;
+
+    private Type(String cleanerPoolSizeConfigName, String cleanerPoolSizeConfigDefault) {
+      this.cleanerPoolSizeConfigName = cleanerPoolSizeConfigName;
+      this.cleanerPoolSizeConfigDefault = cleanerPoolSizeConfigDefault;
+    }
+  }
+
+  private DirScanPool(Configuration conf, Type dirScanPoolType) {
+    this.dirScanPoolType = dirScanPoolType;
+    this.name = dirScanPoolType.name().toLowerCase();
+    String poolSize = conf.get(dirScanPoolType.cleanerPoolSizeConfigName,
+      dirScanPoolType.cleanerPoolSizeConfigDefault);
     size = CleanerChore.calculatePoolSize(poolSize);
     // poolSize may be 0 or 0.0 from a careless configuration,
     // double check to make sure.
-    size = size == 0 ? CleanerChore.calculatePoolSize(CleanerChore.DEFAULT_CHORE_POOL_SIZE) : size;
-    pool = initializePool(size);
-    LOG.info("Cleaner pool size is {}", size);
+    size = size == 0 ?
+      CleanerChore.calculatePoolSize(dirScanPoolType.cleanerPoolSizeConfigDefault) : size;
+    pool = initializePool(size, name);
+    LOG.info("{} Cleaner pool size is {}", name, size);
     cleanerLatch = 0;
   }
 
-  private static ThreadPoolExecutor initializePool(int size) {
+  private static ThreadPoolExecutor initializePool(int size, String name) {
     return Threads.getBoundedCachedThreadPool(size, 1, TimeUnit.MINUTES,
-      new ThreadFactoryBuilder().setNameFormat("dir-scan-pool-%d").setDaemon(true)
+      new ThreadFactoryBuilder().setNameFormat(name + "-dir-scan-pool-%d").setDaemon(true)
         .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
   }
 
@@ -64,9 +84,11 @@ private static ThreadPoolExecutor initializePool(int size) {
   @Override
   public synchronized void onConfigurationChange(Configuration conf) {
     int newSize = CleanerChore.calculatePoolSize(
-      conf.get(CleanerChore.CHORE_POOL_SIZE, CleanerChore.DEFAULT_CHORE_POOL_SIZE));
+      conf.get(dirScanPoolType.cleanerPoolSizeConfigName,
+        dirScanPoolType.cleanerPoolSizeConfigDefault));
     if (newSize == size) {
-      LOG.trace("Size from configuration is same as previous={}, no need to update.", newSize);
+      LOG.trace("{} Cleaner Size from configuration is same as previous={}, no need to update.",
+        name, newSize);
       return;
     }
     size = newSize;
@@ -109,11 +131,19 @@ synchronized void tryUpdatePoolSize(long timeout) {
         break;
       }
     }
-    LOG.info("Update chore's pool size from {} to {}", pool.getPoolSize(), size);
+    LOG.info("Update {} chore's pool size from {} to {}", name, pool.getPoolSize(), size);
     pool.setCorePoolSize(size);
   }
 
   public int getSize() {
     return size;
   }
+
+  public static DirScanPool getHFileCleanerScanPool(Configuration conf) {
+    return new DirScanPool(conf, Type.HFILE_CLEANER);
+  }
+
+  public static DirScanPool getLogCleanerScanPool(Configuration conf) {
+    return new DirScanPool(conf, Type.LOG_CLEANER);
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java
index 70d39eb3154c..997fac56fe94 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java
@@ -111,7 +111,7 @@ public static void setupCluster() throws Exception {
 
     // We don't want the cleaner to remove files. The tests do that.
     UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().cancel(true);
-    POOL = new DirScanPool(UTIL.getConfiguration());
+    POOL = DirScanPool.getHFileCleanerScanPool(UTIL.getConfiguration());
   }
 
   private static void setupConf(Configuration conf) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java
index 6355abd0f367..92b6b78e14cb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java
@@ -119,7 +119,7 @@ public static void setupCluster() throws Exception {
     String archivingZNode = ZKTableArchiveClient.getArchiveZNode(UTIL.getConfiguration(), watcher);
     ZKUtil.createWithParents(watcher, archivingZNode);
     rss = mock(RegionServerServices.class);
-    POOL = new DirScanPool(UTIL.getConfiguration());
+    POOL = DirScanPool.getHFileCleanerScanPool(UTIL.getConfiguration());
   }
 
   private static void setupConf(Configuration conf) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerChore.java
index 0f75030cf836..c3e7741aa2ec 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerChore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerChore.java
@@ -61,7 +61,7 @@ public class TestCleanerChore {
 
   @BeforeClass
   public static void setup() {
-    POOL = new DirScanPool(UTIL.getConfiguration());
+    POOL = DirScanPool.getHFileCleanerScanPool(UTIL.getConfiguration());
   }
 
   @AfterClass
@@ -469,6 +469,57 @@ public void testOnConfigurationChange() throws Exception {
     t.join();
   }
 
+  @Test
+  public void testOnConfigurationChangeLogCleaner() throws Exception {
+    int availableProcessorNum = Runtime.getRuntime().availableProcessors();
+    if (availableProcessorNum == 1) { // no need to run this test
+      return;
+    }
+
+    DirScanPool pool = DirScanPool.getLogCleanerScanPool(UTIL.getConfiguration());
+
+    // have at least 2 available processors/cores
+    int initPoolSize = availableProcessorNum / 2;
+    int changedPoolSize = availableProcessorNum;
+
+    Stoppable stop = new StoppableImplementation();
+    Configuration conf = UTIL.getConfiguration();
+    Path testDir = UTIL.getDataTestDir();
+    FileSystem fs = UTIL.getTestFileSystem();
+    String confKey = "hbase.test.cleaner.delegates";
+    conf.set(confKey, AlwaysDelete.class.getName());
+    conf.set(CleanerChore.LOG_CLEANER_CHORE_SIZE, String.valueOf(initPoolSize));
+    final AllValidPaths chore =
+      new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey, pool);
+    chore.setEnabled(true);
+    // Create subdirs under testDir
+    int dirNums = 6;
+    Path[] subdirs = new Path[dirNums];
+    for (int i = 0; i < dirNums; i++) {
+      subdirs[i] = new Path(testDir, "subdir-" + i);
+      fs.mkdirs(subdirs[i]);
+    }
+    // Under each subdir, create 6 files
+    for (Path subdir : subdirs) {
+      createFiles(fs, subdir, 6);
+    }
+    // Start chore
+    Thread t = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        chore.chore();
+      }
+    });
+    t.setDaemon(true);
+    t.start();
+    // Change size of chore's pool
+    conf.set(CleanerChore.LOG_CLEANER_CHORE_SIZE, String.valueOf(changedPoolSize));
+    pool.onConfigurationChange(conf);
+    assertEquals(changedPoolSize, chore.getChorePoolSize());
+    // Stop chore
+    t.join();
+  }
+
   @Test
   public void testMinimumNumberOfThreads() throws Exception {
     Configuration conf = UTIL.getConfiguration();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
index 6d08a5045bd4..0408ad1b1685 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
@@ -71,7 +71,7 @@ public class TestHFileCleaner {
   public static void setupCluster() throws Exception {
     // have to use a minidfs cluster because the localfs doesn't modify file times correctly
     UTIL.startMiniDFSCluster(1);
-    POOL = new DirScanPool(UTIL.getConfiguration());
+    POOL = DirScanPool.getHFileCleanerScanPool(UTIL.getConfiguration());
   }
 
   @AfterClass
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileLinkCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileLinkCleaner.java
index 5f92e34462ce..32ffaeca2372 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileLinkCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileLinkCleaner.java
@@ -67,7 +67,7 @@ public class TestHFileLinkCleaner {
 
   @BeforeClass
   public static void setUp() {
-    POOL = new DirScanPool(TEST_UTIL.getConfiguration());
+    POOL = DirScanPool.getHFileCleanerScanPool(TEST_UTIL.getConfiguration());
   }
 
   @AfterClass
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
index d00aec849d8d..e924e6449d3f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
@@ -94,7 +94,7 @@ public class TestLogsCleaner {
   public static void setUpBeforeClass() throws Exception {
     TEST_UTIL.startMiniZKCluster();
     TEST_UTIL.startMiniDFSCluster(1);
-    POOL = new DirScanPool(TEST_UTIL.getConfiguration());
+    POOL = DirScanPool.getLogCleanerScanPool(TEST_UTIL.getConfiguration());
   }
 
   @AfterClass
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/region/MasterRegionTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/region/MasterRegionTestBase.java
index ea6d9d093b19..721e4d1b189c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/region/MasterRegionTestBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/region/MasterRegionTestBase.java
@@ -48,7 +48,9 @@ public class MasterRegionTestBase {
 
   protected ChoreService choreService;
 
-  protected DirScanPool cleanerPool;
+  protected DirScanPool hfileCleanerPool;
+
+  protected DirScanPool logCleanerPool;
 
   protected static byte[] CF1 = Bytes.toBytes("f1");
 
@@ -80,7 +82,8 @@ public void setUp() throws IOException {
     htu.getConfiguration().setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false);
     configure(htu.getConfiguration());
     choreService = new ChoreService(getClass().getSimpleName());
-    cleanerPool = new DirScanPool(htu.getConfiguration());
+    hfileCleanerPool = DirScanPool.getHFileCleanerScanPool(htu.getConfiguration());
+    logCleanerPool = DirScanPool.getLogCleanerScanPool(htu.getConfiguration());
     Server server = mock(Server.class);
     when(server.getConfiguration()).thenReturn(htu.getConfiguration());
     when(server.getServerName())
@@ -103,7 +106,8 @@ public void setUp() throws IOException {
   @After
   public void tearDown() throws IOException {
     region.close(true);
-    cleanerPool.shutdownNow();
+    hfileCleanerPool.shutdownNow();
+    logCleanerPool.shutdownNow();
     choreService.shutdown();
     htu.cleanupTestDir();
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/region/TestMasterRegionCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/region/TestMasterRegionCompaction.java
index 713fc3096f77..6759903608e8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/region/TestMasterRegionCompaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/region/TestMasterRegionCompaction.java
@@ -77,7 +77,7 @@ public void stop(String why) {
       public boolean isStopped() {
         return stopped;
       }
-    }, conf, fs, globalArchivePath, cleanerPool);
+    }, conf, fs, globalArchivePath, hfileCleanerPool);
     choreService.scheduleChore(hfileCleaner);
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/region/TestMasterRegionWALCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/region/TestMasterRegionWALCleaner.java
index 39497b07e52f..f936e9e4f592 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/region/TestMasterRegionWALCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/region/TestMasterRegionWALCleaner.java
@@ -72,7 +72,7 @@ public void stop(String why) {
      public boolean isStopped() {
        return stopped;
      }
-    }, conf, fs, globalWALArchiveDir, cleanerPool, null);
+    }, conf, fs, globalWALArchiveDir, logCleanerPool, null);
     choreService.scheduleChore(logCleaner);
   }
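
Usage sketch (illustrative, not part of the patch): how the two scan pools introduced
above are sized and wired, mirroring what HMaster now does in startServiceThreads() and
stopServiceThreads(). The configuration keys and factory methods come from the diff; the
concrete pool-size values and the standalone class/main() are made up for illustration.

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.master.cleaner.CleanerChore;
  import org.apache.hadoop.hbase.master.cleaner.DirScanPool;

  public class CleanerPoolSizingSketch {
    public static void main(String[] args) {
      Configuration conf = HBaseConfiguration.create();
      // HFileCleaner scan pool: existing key, defaults to 0.25 (25% of available cores).
      conf.set(CleanerChore.CHORE_POOL_SIZE, "0.25");
      // LogCleaner scan pool: new key introduced by this patch, defaults to a single thread.
      conf.set(CleanerChore.LOG_CLEANER_CHORE_SIZE, "2");

      // Each cleaner now gets its own DirScanPool, so a slow archive scan cannot
      // starve the old logs scan.
      DirScanPool hfilePool = DirScanPool.getHFileCleanerScanPool(conf);
      DirScanPool logPool = DirScanPool.getLogCleanerScanPool(conf);
      System.out.println("hfile pool=" + hfilePool.getSize() + ", log pool=" + logPool.getSize());

      // A runtime configuration change is applied per pool; each pool reads only its own key.
      conf.set(CleanerChore.LOG_CLEANER_CHORE_SIZE, "4");
      logPool.onConfigurationChange(conf);

      hfilePool.shutdownNow();
      logPool.shutdownNow();
    }
  }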