Skip to content

Commit

Permalink
HBASE-26320 Implement a separate thread pool for the LogCleaner (#3712)…
Browse files Browse the repository at this point in the history
… (#4460)

This avoids starvation when the archive directory is large and takes a long time
to iterate through.

Signed-off-by: Duo Zhang <[email protected]>
Signed-off-by: Anoop Sam John <[email protected]>
Signed-off-by: Pankaj <[email protected]>
  • Loading branch information
sunhelly committed May 24, 2022
1 parent 2294834 commit 4be25a3
Show file tree
Hide file tree
Showing 13 changed files with 137 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,10 @@ public class HMaster extends HRegionServer implements MasterServices {

private HbckChore hbckChore;
CatalogJanitor catalogJanitorChore;
private DirScanPool cleanerPool;
// Threadpool for scanning the archive directory, used by the HFileCleaner
private DirScanPool hfileCleanerPool;
// Threadpool for scanning the Old logs directory, used by the LogCleaner
private DirScanPool logCleanerPool;
private LogCleaner logCleaner;
private HFileCleaner hfileCleaner;
private ReplicationBarrierCleaner replicationBarrierCleaner;
Expand Down Expand Up @@ -1151,7 +1154,8 @@ private void finishActiveMasterInitialization(MonitoredTask status)
(EnvironmentEdgeManager.currentTime() - masterActiveTime) / 1000.0f));
this.masterFinishedInitializationTime = EnvironmentEdgeManager.currentTime();
configurationManager.registerObserver(this.balancer);
configurationManager.registerObserver(this.cleanerPool);
configurationManager.registerObserver(this.hfileCleanerPool);
configurationManager.registerObserver(this.logCleanerPool);
configurationManager.registerObserver(this.hfileCleaner);
configurationManager.registerObserver(this.logCleaner);
configurationManager.registerObserver(this.regionsRecoveryConfigManager);
Expand Down Expand Up @@ -1515,22 +1519,24 @@ executorService.new ExecutorConfig().setExecutorType(ExecutorType.MASTER_MERGE_O
.setExecutorType(ExecutorType.MASTER_TABLE_OPERATIONS).setCorePoolSize(1));
startProcedureExecutor();

// Create cleaner thread pool
cleanerPool = new DirScanPool(conf);
// Create log cleaner thread pool
logCleanerPool = DirScanPool.getLogCleanerScanPool(conf);
Map<String, Object> params = new HashMap<>();
params.put(MASTER, this);
// Start log cleaner thread
int cleanerInterval =
conf.getInt(HBASE_MASTER_CLEANER_INTERVAL, DEFAULT_HBASE_MASTER_CLEANER_INTERVAL);
this.logCleaner =
new LogCleaner(cleanerInterval, this, conf, getMasterWalManager().getFileSystem(),
getMasterWalManager().getOldLogDir(), cleanerPool, params);
getMasterWalManager().getOldLogDir(), logCleanerPool, params);
getChoreService().scheduleChore(logCleaner);

// start the hfile archive cleaner thread
Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
// Create archive cleaner thread pool
hfileCleanerPool = DirScanPool.getHFileCleanerScanPool(conf);
this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf,
getMasterFileSystem().getFileSystem(), archiveDir, cleanerPool, params);
getMasterFileSystem().getFileSystem(), archiveDir, hfileCleanerPool, params);
getChoreService().scheduleChore(hfileCleaner);

// Regions Reopen based on very high storeFileRefCount is considered enabled
Expand Down Expand Up @@ -1583,9 +1589,13 @@ protected void stopServiceThreads() {
this.mobCompactThread.close();
}
super.stopServiceThreads();
if (cleanerPool != null) {
cleanerPool.shutdownNow();
cleanerPool = null;
if (hfileCleanerPool != null) {
hfileCleanerPool.shutdownNow();
hfileCleanerPool = null;
}
if (logCleanerPool != null) {
logCleanerPool.shutdownNow();
logCleanerPool = null;
}

LOG.debug("Stopping service threads");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,19 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
private static final int AVAIL_PROCESSORS = Runtime.getRuntime().availableProcessors();

/**
* If it is an integer and >= 1, it would be the size; if 0.0 < size <= 1.0, size would be
* available processors * size. Pay attention that 1.0 is different from 1, former indicates it
* will use 100% of cores, while latter will use only 1 thread for chore to scan dir.
* Configures the threadpool used for scanning the archive directory for the HFileCleaner. If it
* is an integer and >= 1, it would be the size; if 0.0 < size <= 1.0, size would be available
* processors * size. Pay attention that 1.0 is different from 1: the former indicates it will use
* 100% of cores, while the latter will use only 1 thread for chore to scan dir.
*/
public static final String CHORE_POOL_SIZE = "hbase.cleaner.scan.dir.concurrent.size";
static final String DEFAULT_CHORE_POOL_SIZE = "0.25";
/**
* Configures the threadpool used for scanning the Old logs directory for the LogCleaner. Follows
* the same configuration mechanism as CHORE_POOL_SIZE, but has a default of 1 thread.
*/
public static final String LOG_CLEANER_CHORE_SIZE = "hbase.log.cleaner.scan.dir.concurrent.size";
static final String DEFAULT_LOG_CLEANER_CHORE_POOL_SIZE = "1";

private final DirScanPool pool;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,42 @@ public class DirScanPool implements ConfigurationObserver {
private final ThreadPoolExecutor pool;
private int cleanerLatch;
private boolean reconfigNotification;
private Type dirScanPoolType;
private final String name;

public DirScanPool(Configuration conf) {
String poolSize = conf.get(CleanerChore.CHORE_POOL_SIZE, CleanerChore.DEFAULT_CHORE_POOL_SIZE);
/**
 * Kind of directory scan pool. Each constant carries the configuration key that sizes the pool
 * and the default value used when that key is not set. The lower-cased constant name is also
 * used as the pool's name in thread names and log messages.
 */
private enum Type {
// Sized by CleanerChore.LOG_CLEANER_CHORE_SIZE ("hbase.log.cleaner.scan.dir.concurrent.size"),
// default "1".
LOG_CLEANER(CleanerChore.LOG_CLEANER_CHORE_SIZE,
CleanerChore.DEFAULT_LOG_CLEANER_CHORE_POOL_SIZE),
// Sized by CleanerChore.CHORE_POOL_SIZE ("hbase.cleaner.scan.dir.concurrent.size"),
// default "0.25".
HFILE_CLEANER(CleanerChore.CHORE_POOL_SIZE, CleanerChore.DEFAULT_CHORE_POOL_SIZE);

// Configuration key read to obtain the pool size for this type.
private final String cleanerPoolSizeConfigName;
// Value used when the configuration key is absent; it is passed through
// CleanerChore.calculatePoolSize like any configured value.
private final String cleanerPoolSizeConfigDefault;

private Type(String cleanerPoolSizeConfigName, String cleanerPoolSizeConfigDefault) {
this.cleanerPoolSizeConfigName = cleanerPoolSizeConfigName;
this.cleanerPoolSizeConfigDefault = cleanerPoolSizeConfigDefault;
}
}

private DirScanPool(Configuration conf, Type dirScanPoolType) {
this.dirScanPoolType = dirScanPoolType;
this.name = dirScanPoolType.name().toLowerCase();
String poolSize = conf.get(dirScanPoolType.cleanerPoolSizeConfigName,
dirScanPoolType.cleanerPoolSizeConfigDefault);
size = CleanerChore.calculatePoolSize(poolSize);
// poolSize may be 0 or 0.0 from a careless configuration,
// double check to make sure.
size = size == 0 ? CleanerChore.calculatePoolSize(CleanerChore.DEFAULT_CHORE_POOL_SIZE) : size;
pool = initializePool(size);
LOG.info("Cleaner pool size is {}", size);
size = size == 0
? CleanerChore.calculatePoolSize(dirScanPoolType.cleanerPoolSizeConfigDefault)
: size;
pool = initializePool(size, name);
LOG.info("{} Cleaner pool size is {}", name, size);
cleanerLatch = 0;
}

private static ThreadPoolExecutor initializePool(int size) {
private static ThreadPoolExecutor initializePool(int size, String name) {
return Threads.getBoundedCachedThreadPool(size, 1, TimeUnit.MINUTES,
new ThreadFactoryBuilder().setNameFormat("dir-scan-pool-%d").setDaemon(true)
new ThreadFactoryBuilder().setNameFormat(name + "-dir-scan-pool-%d").setDaemon(true)
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
}

Expand All @@ -63,10 +84,11 @@ private static ThreadPoolExecutor initializePool(int size) {
*/
@Override
public synchronized void onConfigurationChange(Configuration conf) {
int newSize = CleanerChore.calculatePoolSize(
conf.get(CleanerChore.CHORE_POOL_SIZE, CleanerChore.DEFAULT_CHORE_POOL_SIZE));
int newSize = CleanerChore.calculatePoolSize(conf.get(dirScanPoolType.cleanerPoolSizeConfigName,
dirScanPoolType.cleanerPoolSizeConfigDefault));
if (newSize == size) {
LOG.trace("Size from configuration is same as previous={}, no need to update.", newSize);
LOG.trace("{} Cleaner Size from configuration is same as previous={}, no need to update.",
name, newSize);
return;
}
size = newSize;
Expand Down Expand Up @@ -109,11 +131,19 @@ synchronized void tryUpdatePoolSize(long timeout) {
break;
}
}
LOG.info("Update chore's pool size from {} to {}", pool.getPoolSize(), size);
LOG.info("Update {} chore's pool size from {} to {}", name, pool.getPoolSize(), size);
pool.setCorePoolSize(size);
}

/**
 * Returns the pool size most recently computed from configuration (set at construction and
 * again by onConfigurationChange). NOTE(review): the executor's core pool size is applied
 * separately in tryUpdatePoolSize, so this may presumably differ from the executor's current
 * size for a while after a configuration change -- confirm against the full class.
 * @return the configured target number of scan threads
 */
public int getSize() {
return size;
}

/**
 * Creates a scan pool of type HFILE_CLEANER, i.e. one sized from
 * {@code CleanerChore.CHORE_POOL_SIZE} with default {@code CleanerChore.DEFAULT_CHORE_POOL_SIZE}.
 * Every call builds a new pool; the caller is responsible for shutting it down.
 * @param conf configuration the pool size is read from
 * @return a new pool for the HFileCleaner
 */
public static DirScanPool getHFileCleanerScanPool(Configuration conf) {
return new DirScanPool(conf, Type.HFILE_CLEANER);
}

/**
 * Creates a scan pool of type LOG_CLEANER, i.e. one sized from
 * {@code CleanerChore.LOG_CLEANER_CHORE_SIZE} with default
 * {@code CleanerChore.DEFAULT_LOG_CLEANER_CHORE_POOL_SIZE}.
 * Every call builds a new pool; the caller is responsible for shutting it down.
 * @param conf configuration the pool size is read from
 * @return a new pool for the LogCleaner
 */
public static DirScanPool getLogCleanerScanPool(Configuration conf) {
return new DirScanPool(conf, Type.LOG_CLEANER);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public static void setupCluster() throws Exception {
// We don't want the cleaner to remove files. The tests do that.
UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().cancel(true);

POOL = new DirScanPool(UTIL.getConfiguration());
POOL = DirScanPool.getHFileCleanerScanPool(UTIL.getConfiguration());
}

private static void setupConf(Configuration conf) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public static void setupCluster() throws Exception {
String archivingZNode = ZKTableArchiveClient.getArchiveZNode(UTIL.getConfiguration(), watcher);
ZKUtil.createWithParents(watcher, archivingZNode);
rss = mock(RegionServerServices.class);
POOL = new DirScanPool(UTIL.getConfiguration());
POOL = DirScanPool.getHFileCleanerScanPool(UTIL.getConfiguration());
}

private static void setupConf(Configuration conf) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public class TestCleanerChore {

@BeforeClass
public static void setup() {
POOL = new DirScanPool(UTIL.getConfiguration());
POOL = DirScanPool.getHFileCleanerScanPool(UTIL.getConfiguration());
}

@AfterClass
Expand Down Expand Up @@ -470,6 +470,57 @@ public void testOnConfigurationChange() throws Exception {
t.join();
}

/**
 * Checks that a log-cleaner scan pool picks up a new value of
 * {@code CleanerChore.LOG_CLEANER_CHORE_SIZE} via {@code onConfigurationChange} while a chore
 * using that pool is running. Skipped on single-processor machines, where the initial and the
 * changed size could not differ.
 */
@Test
public void testOnConfigurationChangeLogCleaner() throws Exception {
  int availableProcessorNum = Runtime.getRuntime().availableProcessors();
  if (availableProcessorNum == 1) { // no need to run this test
    return;
  }

  // This test creates its own pool instead of using the shared class-level POOL, so it has to
  // release it itself; otherwise the pool's threads outlive the test.
  DirScanPool pool = DirScanPool.getLogCleanerScanPool(UTIL.getConfiguration());
  try {
    // have at least 2 available processors/cores
    int initPoolSize = availableProcessorNum / 2;
    int changedPoolSize = availableProcessorNum;

    Stoppable stop = new StoppableImplementation();
    Configuration conf = UTIL.getConfiguration();
    Path testDir = UTIL.getDataTestDir();
    FileSystem fs = UTIL.getTestFileSystem();
    String confKey = "hbase.test.cleaner.delegates";
    conf.set(confKey, AlwaysDelete.class.getName());
    conf.set(CleanerChore.LOG_CLEANER_CHORE_SIZE, String.valueOf(initPoolSize));
    final AllValidPaths chore =
      new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey, pool);
    chore.setEnabled(true);
    // Create subdirs under testDir
    int dirNums = 6;
    Path[] subdirs = new Path[dirNums];
    for (int i = 0; i < dirNums; i++) {
      subdirs[i] = new Path(testDir, "subdir-" + i);
      fs.mkdirs(subdirs[i]);
    }
    // Under each subdirs create 6 files
    for (Path subdir : subdirs) {
      createFiles(fs, subdir, 6);
    }
    // Start chore
    Thread t = new Thread(new Runnable() {
      @Override
      public void run() {
        chore.chore();
      }
    });
    t.setDaemon(true);
    t.start();
    // Change size of chore's pool
    conf.set(CleanerChore.LOG_CLEANER_CHORE_SIZE, String.valueOf(changedPoolSize));
    pool.onConfigurationChange(conf);
    assertEquals(changedPoolSize, chore.getChorePoolSize());
    // Wait for the chore run started above to finish
    t.join();
  } finally {
    pool.shutdownNow();
  }
}

@Test
public void testMinimumNumberOfThreads() throws Exception {
Configuration conf = UTIL.getConfiguration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public class TestHFileCleaner {
public static void setupCluster() throws Exception {
// have to use a minidfs cluster because the localfs doesn't modify file times correctly
UTIL.startMiniDFSCluster(1);
POOL = new DirScanPool(UTIL.getConfiguration());
POOL = DirScanPool.getHFileCleanerScanPool(UTIL.getConfiguration());
}

@AfterClass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public class TestHFileLinkCleaner {

@BeforeClass
public static void setUp() {
POOL = new DirScanPool(TEST_UTIL.getConfiguration());
POOL = DirScanPool.getHFileCleanerScanPool(TEST_UTIL.getConfiguration());
}

@AfterClass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public class TestLogsCleaner {
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.startMiniZKCluster();
TEST_UTIL.startMiniDFSCluster(1);
POOL = new DirScanPool(TEST_UTIL.getConfiguration());
POOL = DirScanPool.getLogCleanerScanPool(TEST_UTIL.getConfiguration());
}

@AfterClass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ public class MasterRegionTestBase {

protected ChoreService choreService;

protected DirScanPool cleanerPool;
protected DirScanPool hfileCleanerPool;

protected DirScanPool logCleanerPool;

protected static byte[] CF1 = Bytes.toBytes("f1");

Expand Down Expand Up @@ -90,7 +92,8 @@ protected final void createMasterRegion() throws IOException {
Configuration conf = htu.getConfiguration();
configure(conf);
choreService = new ChoreService(getClass().getSimpleName());
cleanerPool = new DirScanPool(htu.getConfiguration());
hfileCleanerPool = DirScanPool.getHFileCleanerScanPool(htu.getConfiguration());
logCleanerPool = DirScanPool.getLogCleanerScanPool(htu.getConfiguration());
Server server = mock(Server.class);
when(server.getConfiguration()).thenReturn(conf);
when(server.getServerName())
Expand All @@ -117,7 +120,8 @@ protected final void createMasterRegion() throws IOException {
@After
public void tearDown() throws IOException {
region.close(true);
cleanerPool.shutdownNow();
hfileCleanerPool.shutdownNow();
logCleanerPool.shutdownNow();
choreService.shutdown();
htu.cleanupTestDir();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public void stop(String why) {
public boolean isStopped() {
return stopped;
}
}, conf, fs, globalArchivePath, cleanerPool);
}, conf, fs, globalArchivePath, hfileCleanerPool);
choreService.scheduleChore(hfileCleaner);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public void stop(String why) {
public boolean isStopped() {
return stopped;
}
}, conf, fs, globalWALArchiveDir, cleanerPool, null);
}, conf, fs, globalWALArchiveDir, logCleanerPool, null);
choreService.scheduleChore(logCleaner);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ public void testDisableSnapshotAndNotDeleteBackReference() throws Exception {

// Initialize cleaner
HFileCleaner cleaner = new HFileCleaner(10000, Mockito.mock(Stoppable.class), conf, fs,
archiveDir, new DirScanPool(UTIL.getConfiguration()));
archiveDir, DirScanPool.getHFileCleanerScanPool(UTIL.getConfiguration()));
// Link backref and HFile cannot be removed
cleaner.choreForTesting();
assertTrue(fs.exists(linkBackRef));
Expand Down

0 comments on commit 4be25a3

Please sign in to comment.