Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-26320 Implement a separate thread pool for the LogCleaner #3712

Merged
merged 1 commit into from
Oct 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,10 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste

private HbckChore hbckChore;
CatalogJanitor catalogJanitorChore;
private DirScanPool cleanerPool;
// Threadpool for scanning the archive directory, used by the HFileCleaner
private DirScanPool hfileCleanerPool;
// Threadpool for scanning the Old logs directory, used by the LogCleaner
private DirScanPool logCleanerPool;
private LogCleaner logCleaner;
private HFileCleaner hfileCleaner;
private ReplicationBarrierCleaner replicationBarrierCleaner;
Expand Down Expand Up @@ -1131,7 +1134,8 @@ private void finishActiveMasterInitialization(MonitoredTask status) throws IOExc
(EnvironmentEdgeManager.currentTime() - masterActiveTime) / 1000.0f));
this.masterFinishedInitializationTime = EnvironmentEdgeManager.currentTime();
configurationManager.registerObserver(this.balancer);
configurationManager.registerObserver(this.cleanerPool);
configurationManager.registerObserver(this.hfileCleanerPool);
configurationManager.registerObserver(this.logCleanerPool);
configurationManager.registerObserver(this.hfileCleaner);
configurationManager.registerObserver(this.logCleaner);
configurationManager.registerObserver(this.regionsRecoveryConfigManager);
Expand Down Expand Up @@ -1493,21 +1497,24 @@ private void startServiceThreads() throws IOException {
ExecutorType.MASTER_TABLE_OPERATIONS).setCorePoolSize(1));
startProcedureExecutor();

// Create cleaner thread pool
cleanerPool = new DirScanPool(conf);
// Create log cleaner thread pool
logCleanerPool = DirScanPool.getLogCleanerScanPool(conf);
Map<String, Object> params = new HashMap<>();
params.put(MASTER, this);
// Start log cleaner thread
int cleanerInterval =
conf.getInt(HBASE_MASTER_CLEANER_INTERVAL, DEFAULT_HBASE_MASTER_CLEANER_INTERVAL);
this.logCleaner = new LogCleaner(cleanerInterval, this, conf,
getMasterWalManager().getFileSystem(), getMasterWalManager().getOldLogDir(), cleanerPool, params);
getMasterWalManager().getFileSystem(), getMasterWalManager().getOldLogDir(),
logCleanerPool, params);
getChoreService().scheduleChore(logCleaner);

// start the hfile archive cleaner thread
Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
// Create archive cleaner thread pool
hfileCleanerPool = DirScanPool.getHFileCleanerScanPool(conf);
this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf,
getMasterFileSystem().getFileSystem(), archiveDir, cleanerPool, params);
getMasterFileSystem().getFileSystem(), archiveDir, hfileCleanerPool, params);
getChoreService().scheduleChore(hfileCleaner);

// Regions Reopen based on very high storeFileRefCount is considered enabled
Expand Down Expand Up @@ -1557,9 +1564,13 @@ protected void stopServiceThreads() {
}
stopChoreService();
stopExecutorService();
if (cleanerPool != null) {
cleanerPool.shutdownNow();
cleanerPool = null;
if (hfileCleanerPool != null) {
hfileCleanerPool.shutdownNow();
hfileCleanerPool = null;
}
if (logCleanerPool != null) {
logCleanerPool.shutdownNow();
logCleanerPool = null;
}
if (maintenanceRegionServer != null) {
maintenanceRegionServer.getRegionServer().stop(HBASE_MASTER_CLEANER_INTERVAL);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,20 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
private static final int AVAIL_PROCESSORS = Runtime.getRuntime().availableProcessors();

/**
 * Configuration key for the size of the thread pool used to scan the archive directory for the
 * HFileCleaner.
 * If the value is an integer and >= 1, it is used as the pool size;
 * if 0.0 < value <= 1.0, the pool size is (available processors * value).
 * Note that 1.0 is different from 1: the former means 100% of the cores are used,
 * while the latter means only 1 thread is used by the chore to scan the directory.
 */
public static final String CHORE_POOL_SIZE = "hbase.cleaner.scan.dir.concurrent.size";
// Default for CHORE_POOL_SIZE: a fractional value, i.e. 25% of the available processors.
static final String DEFAULT_CHORE_POOL_SIZE = "0.25";
/**
 * Configuration key for the size of the thread pool used to scan the old logs directory for the
 * LogCleaner. It follows the same configuration mechanism as CHORE_POOL_SIZE, but defaults to a
 * single thread.
 */
public static final String LOG_CLEANER_CHORE_SIZE = "hbase.log.cleaner.scan.dir.concurrent.size";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So now we will have (by default) a pool of 1 thread for scan and cleaning of Archived HFiles.
For old WALs the thread pool's default size will be 25% of #cores
Can we add some code level comments here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the pool of 1 thread (by default) is for scanning and cleaning the old WALs, and the pool with 25% of #cores (by default) is for archived store files.
+1 to adding code-level comments.

static final String DEFAULT_LOG_CLEANER_CHORE_POOL_SIZE = "1";

private final DirScanPool pool;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,41 @@ public class DirScanPool implements ConfigurationObserver {
private final ThreadPoolExecutor pool;
private int cleanerLatch;
private boolean reconfigNotification;
private Type dirScanPoolType;
private final String name;

public DirScanPool(Configuration conf) {
String poolSize = conf.get(CleanerChore.CHORE_POOL_SIZE, CleanerChore.DEFAULT_CHORE_POOL_SIZE);
/**
 * The kinds of directory scan pool. Each constant carries the configuration key that sizes
 * its pool together with the default value for that key.
 */
private enum Type {
  LOG_CLEANER(CleanerChore.LOG_CLEANER_CHORE_SIZE,
    CleanerChore.DEFAULT_LOG_CLEANER_CHORE_POOL_SIZE),
  HFILE_CLEANER(CleanerChore.CHORE_POOL_SIZE, CleanerChore.DEFAULT_CHORE_POOL_SIZE);

  /** Configuration key that holds the pool size for this kind of pool. */
  private final String cleanerPoolSizeConfigName;

  /** Default value paired with {@link #cleanerPoolSizeConfigName}. */
  private final String cleanerPoolSizeConfigDefault;

  // Enum constructors are implicitly private, so no modifier is needed here.
  Type(String configName, String configDefault) {
    cleanerPoolSizeConfigName = configName;
    cleanerPoolSizeConfigDefault = configDefault;
  }
}

private DirScanPool(Configuration conf, Type dirScanPoolType) {
this.dirScanPoolType = dirScanPoolType;
this.name = dirScanPoolType.name().toLowerCase();
String poolSize = conf.get(dirScanPoolType.cleanerPoolSizeConfigName,
dirScanPoolType.cleanerPoolSizeConfigDefault);
size = CleanerChore.calculatePoolSize(poolSize);
// poolSize may be 0 or 0.0 from a careless configuration,
// double check to make sure.
size = size == 0 ? CleanerChore.calculatePoolSize(CleanerChore.DEFAULT_CHORE_POOL_SIZE) : size;
pool = initializePool(size);
LOG.info("Cleaner pool size is {}", size);
size = size == 0 ?
CleanerChore.calculatePoolSize(dirScanPoolType.cleanerPoolSizeConfigDefault) : size;
pool = initializePool(size, name);
LOG.info("{} Cleaner pool size is {}", name, size);
cleanerLatch = 0;
}

private static ThreadPoolExecutor initializePool(int size) {
private static ThreadPoolExecutor initializePool(int size, String name) {
return Threads.getBoundedCachedThreadPool(size, 1, TimeUnit.MINUTES,
new ThreadFactoryBuilder().setNameFormat("dir-scan-pool-%d").setDaemon(true)
new ThreadFactoryBuilder().setNameFormat(name + "-dir-scan-pool-%d").setDaemon(true)
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
}

Expand All @@ -64,9 +84,11 @@ private static ThreadPoolExecutor initializePool(int size) {
@Override
public synchronized void onConfigurationChange(Configuration conf) {
int newSize = CleanerChore.calculatePoolSize(
conf.get(CleanerChore.CHORE_POOL_SIZE, CleanerChore.DEFAULT_CHORE_POOL_SIZE));
conf.get(dirScanPoolType.cleanerPoolSizeConfigName,
dirScanPoolType.cleanerPoolSizeConfigDefault));
if (newSize == size) {
LOG.trace("Size from configuration is same as previous={}, no need to update.", newSize);
LOG.trace("{} Cleaner Size from configuration is same as previous={}, no need to update.",
name, newSize);
return;
}
size = newSize;
Expand Down Expand Up @@ -109,11 +131,19 @@ synchronized void tryUpdatePoolSize(long timeout) {
break;
}
}
LOG.info("Update chore's pool size from {} to {}", pool.getPoolSize(), size);
LOG.info("Update {} chore's pool size from {} to {}", name, pool.getPoolSize(), size);
pool.setCorePoolSize(size);
}

/**
 * @return the value of this pool's {@code size} field
 */
public int getSize() {
  return this.size;
}

/**
 * Creates a scan pool of type {@code HFILE_CLEANER}.
 * @param conf configuration handed to the pool's constructor
 * @return a new pool instance
 */
public static DirScanPool getHFileCleanerScanPool(Configuration conf) {
  final DirScanPool hfileCleanerScanPool = new DirScanPool(conf, Type.HFILE_CLEANER);
  return hfileCleanerScanPool;
}

/**
 * Creates a scan pool of type {@code LOG_CLEANER}.
 * @param conf configuration handed to the pool's constructor
 * @return a new pool instance
 */
public static DirScanPool getLogCleanerScanPool(Configuration conf) {
  final DirScanPool logCleanerScanPool = new DirScanPool(conf, Type.LOG_CLEANER);
  return logCleanerScanPool;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public static void setupCluster() throws Exception {
// We don't want the cleaner to remove files. The tests do that.
UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().cancel(true);

POOL = new DirScanPool(UTIL.getConfiguration());
POOL = DirScanPool.getHFileCleanerScanPool(UTIL.getConfiguration());
}

private static void setupConf(Configuration conf) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public static void setupCluster() throws Exception {
String archivingZNode = ZKTableArchiveClient.getArchiveZNode(UTIL.getConfiguration(), watcher);
ZKUtil.createWithParents(watcher, archivingZNode);
rss = mock(RegionServerServices.class);
POOL = new DirScanPool(UTIL.getConfiguration());
POOL= DirScanPool.getHFileCleanerScanPool(UTIL.getConfiguration());
}

private static void setupConf(Configuration conf) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public class TestCleanerChore {

@BeforeClass
public static void setup() {
POOL = new DirScanPool(UTIL.getConfiguration());
POOL = DirScanPool.getHFileCleanerScanPool(UTIL.getConfiguration());
}

@AfterClass
Expand Down Expand Up @@ -469,6 +469,57 @@ public void testOnConfigurationChange() throws Exception {
t.join();
}

/**
 * Verifies that a log cleaner scan pool picks up a change to
 * {@link CleanerChore#LOG_CLEANER_CHORE_SIZE} through
 * {@link DirScanPool#onConfigurationChange(Configuration)} while a chore is running.
 * The test is a no-op on single-processor machines, where the pool size cannot be varied.
 */
@Test
public void testOnConfigurationChangeLogCleaner() throws Exception {
  int availableProcessorNum = Runtime.getRuntime().availableProcessors();
  if (availableProcessorNum == 1) { // no need to run this test
    return;
  }

  // have at least 2 available processors/cores
  int initPoolSize = availableProcessorNum / 2;
  int changedPoolSize = availableProcessorNum;

  Stoppable stop = new StoppableImplementation();
  Configuration conf = UTIL.getConfiguration();
  Path testDir = UTIL.getDataTestDir();
  FileSystem fs = UTIL.getTestFileSystem();
  String confKey = "hbase.test.cleaner.delegates";
  conf.set(confKey, AlwaysDelete.class.getName());
  conf.set(CleanerChore.LOG_CLEANER_CHORE_SIZE, String.valueOf(initPoolSize));

  // Build the pool only after the initial size has been configured; creating it earlier would
  // size it from whatever the configuration held before, and initPoolSize would never apply.
  DirScanPool pool = DirScanPool.getLogCleanerScanPool(conf);
  try {
    final AllValidPaths chore =
      new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey, pool);
    chore.setEnabled(true);
    // Create subdirs under testDir
    int dirNums = 6;
    Path[] subdirs = new Path[dirNums];
    for (int i = 0; i < dirNums; i++) {
      subdirs[i] = new Path(testDir, "subdir-" + i);
      fs.mkdirs(subdirs[i]);
    }
    // Under each subdirs create 6 files
    for (Path subdir : subdirs) {
      createFiles(fs, subdir, 6);
    }
    // Start chore
    Thread t = new Thread(new Runnable() {
      @Override
      public void run() {
        chore.chore();
      }
    });
    t.setDaemon(true);
    t.start();
    // Change size of chore's pool
    conf.set(CleanerChore.LOG_CLEANER_CHORE_SIZE, String.valueOf(changedPoolSize));
    pool.onConfigurationChange(conf);
    assertEquals(changedPoolSize, chore.getChorePoolSize());
    // Stop chore
    t.join();
  } finally {
    // This pool is local to the test, so release its threads even if an assertion fails.
    pool.shutdownNow();
  }
}

@Test
public void testMinimumNumberOfThreads() throws Exception {
Configuration conf = UTIL.getConfiguration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public class TestHFileCleaner {
public static void setupCluster() throws Exception {
// have to use a minidfs cluster because the localfs doesn't modify file times correctly
UTIL.startMiniDFSCluster(1);
POOL = new DirScanPool(UTIL.getConfiguration());
POOL = DirScanPool.getHFileCleanerScanPool(UTIL.getConfiguration());
}

@AfterClass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public class TestHFileLinkCleaner {

@BeforeClass
public static void setUp() {
POOL = new DirScanPool(TEST_UTIL.getConfiguration());
POOL = DirScanPool.getHFileCleanerScanPool(TEST_UTIL.getConfiguration());
}

@AfterClass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public class TestLogsCleaner {
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.startMiniZKCluster();
TEST_UTIL.startMiniDFSCluster(1);
POOL = new DirScanPool(TEST_UTIL.getConfiguration());
POOL = DirScanPool.getLogCleanerScanPool(TEST_UTIL.getConfiguration());
}

@AfterClass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ public class MasterRegionTestBase {

protected ChoreService choreService;

protected DirScanPool cleanerPool;
protected DirScanPool hfileCleanerPool;

protected DirScanPool logCleanerPool;

protected static byte[] CF1 = Bytes.toBytes("f1");

Expand Down Expand Up @@ -80,7 +82,8 @@ public void setUp() throws IOException {
htu.getConfiguration().setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false);
configure(htu.getConfiguration());
choreService = new ChoreService(getClass().getSimpleName());
cleanerPool = new DirScanPool(htu.getConfiguration());
hfileCleanerPool = DirScanPool.getHFileCleanerScanPool(htu.getConfiguration());
logCleanerPool = DirScanPool.getLogCleanerScanPool(htu.getConfiguration());
Server server = mock(Server.class);
when(server.getConfiguration()).thenReturn(htu.getConfiguration());
when(server.getServerName())
Expand All @@ -103,7 +106,8 @@ public void setUp() throws IOException {
@After
public void tearDown() throws IOException {
region.close(true);
cleanerPool.shutdownNow();
hfileCleanerPool.shutdownNow();
logCleanerPool.shutdownNow();
choreService.shutdown();
htu.cleanupTestDir();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public void stop(String why) {
public boolean isStopped() {
return stopped;
}
}, conf, fs, globalArchivePath, cleanerPool);
}, conf, fs, globalArchivePath, hfileCleanerPool);
choreService.scheduleChore(hfileCleaner);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public void stop(String why) {
public boolean isStopped() {
return stopped;
}
}, conf, fs, globalWALArchiveDir, cleanerPool, null);
}, conf, fs, globalWALArchiveDir, logCleanerPool, null);
choreService.scheduleChore(logCleaner);
}

Expand Down