From e4b4cef80e127d1dc16c99c5a87fd2dd340cb2b6 Mon Sep 17 00:00:00 2001 From: Bryan Beaudreault Date: Sat, 8 Apr 2023 10:23:55 -0400 Subject: [PATCH] HBASE-27780 FileChangeWatcher improvements (#5164) Signed-off-by: Duo Zhang --- .../hadoop/hbase/io/FileChangeWatcher.java | 24 +++++++--- .../hadoop/hbase/io/crypto/tls/X509Util.java | 23 +++++---- .../hbase/io/TestFileChangeWatcher.java | 48 +++++++++++++++++-- 3 files changed, 74 insertions(+), 21 deletions(-) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/FileChangeWatcher.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/FileChangeWatcher.java index 77e0e4e750ce..70300da45d92 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/FileChangeWatcher.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/FileChangeWatcher.java @@ -27,7 +27,6 @@ import java.nio.file.WatchService; import java.util.function.Consumer; import org.apache.yetus.audience.InterfaceAudience; -import org.apache.zookeeper.server.ZooKeeperThread; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,7 +71,8 @@ public enum State { * relative to dirPath. * @throws IOException if there is an error creating the WatchService. */ - public FileChangeWatcher(Path dirPath, Consumer> callback) throws IOException { + public FileChangeWatcher(Path dirPath, String threadNameSuffix, Consumer> callback) + throws IOException { FileSystem fs = dirPath.getFileSystem(); WatchService watchService = fs.newWatchService(); @@ -83,7 +83,7 @@ public FileChangeWatcher(Path dirPath, Consumer> callback) throws StandardWatchEventKinds.ENTRY_DELETE, StandardWatchEventKinds.ENTRY_MODIFY, StandardWatchEventKinds.OVERFLOW }); state = State.NEW; - this.watcherThread = new WatcherThread(watchService, callback); + this.watcherThread = new WatcherThread(threadNameSuffix, watchService, callback); this.watcherThread.setDaemon(true); } @@ -172,20 +172,30 @@ public void stop() { } } + String getWatcherThreadName() { + return watcherThread.getName(); + } + + private static void handleException(Thread thread, Throwable e) { + LOG.warn("Exception occurred from thread {}", thread.getName(), e); + } + /** * Inner class that implements the watcher thread logic. */ - private class WatcherThread extends ZooKeeperThread { + private class WatcherThread extends Thread { - private static final String THREAD_NAME = "FileChangeWatcher"; + private static final String THREAD_NAME_PREFIX = "FileChangeWatcher-"; final WatchService watchService; final Consumer> callback; - WatcherThread(WatchService watchService, Consumer> callback) { - super(THREAD_NAME); + WatcherThread(String threadNameSuffix, WatchService watchService, + Consumer> callback) { + super(THREAD_NAME_PREFIX + threadNameSuffix); this.watchService = watchService; this.callback = callback; + setUncaughtExceptionHandler(FileChangeWatcher::handleException); } @Override diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/tls/X509Util.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/tls/X509Util.java index f120b457b5a3..ac910c4d1239 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/tls/X509Util.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/tls/X509Util.java @@ -74,11 +74,11 @@ public final class X509Util { static final String CONFIG_PREFIX = "hbase.rpc.tls."; public static final String TLS_CONFIG_PROTOCOL = CONFIG_PREFIX + "protocol"; public static final String TLS_CONFIG_KEYSTORE_LOCATION = CONFIG_PREFIX + "keystore.location"; - static final String TLS_CONFIG_KEYSTORE_TYPE = CONFIG_PREFIX + "keystore.type"; - static final String TLS_CONFIG_KEYSTORE_PASSWORD = CONFIG_PREFIX + "keystore.password"; - static final String TLS_CONFIG_TRUSTSTORE_LOCATION = CONFIG_PREFIX + "truststore.location"; - static final String TLS_CONFIG_TRUSTSTORE_TYPE = CONFIG_PREFIX + "truststore.type"; - static final String TLS_CONFIG_TRUSTSTORE_PASSWORD = CONFIG_PREFIX + "truststore.password"; + public static final String TLS_CONFIG_KEYSTORE_TYPE = CONFIG_PREFIX + "keystore.type"; + public static final String TLS_CONFIG_KEYSTORE_PASSWORD = CONFIG_PREFIX + "keystore.password"; + public static final String TLS_CONFIG_TRUSTSTORE_LOCATION = CONFIG_PREFIX + "truststore.location"; + public static final String TLS_CONFIG_TRUSTSTORE_TYPE = CONFIG_PREFIX + "truststore.type"; + public static final String TLS_CONFIG_TRUSTSTORE_PASSWORD = CONFIG_PREFIX + "truststore.password"; public static final String TLS_CONFIG_CLR = CONFIG_PREFIX + "clr"; public static final String TLS_CONFIG_OCSP = CONFIG_PREFIX + "ocsp"; public static final String TLS_CONFIG_REVERSE_DNS_LOOKUP_ENABLED = @@ -417,7 +417,11 @@ public static void enableCertFileReloading(Configuration config, String keyStoreLocation = config.get(TLS_CONFIG_KEYSTORE_LOCATION, ""); keystoreWatcher.set(newFileChangeWatcher(keyStoreLocation, resetContext)); String trustStoreLocation = config.get(TLS_CONFIG_TRUSTSTORE_LOCATION, ""); - trustStoreWatcher.set(newFileChangeWatcher(trustStoreLocation, resetContext)); + // we are using the same callback for both. there's no reason to kick off two + // threads if keystore/truststore are both at the same location + if (!keyStoreLocation.equals(trustStoreLocation)) { + trustStoreWatcher.set(newFileChangeWatcher(trustStoreLocation, resetContext)); + } } private static FileChangeWatcher newFileChangeWatcher(String fileLocation, Runnable resetContext) @@ -430,9 +434,10 @@ private static FileChangeWatcher newFileChangeWatcher(String fileLocation, Runna if (parentPath == null) { throw new IOException("Key/trust store path does not have a parent: " + filePath); } - FileChangeWatcher fileChangeWatcher = new FileChangeWatcher(parentPath, watchEvent -> { - handleWatchEvent(filePath, watchEvent, resetContext); - }); + FileChangeWatcher fileChangeWatcher = + new FileChangeWatcher(parentPath, Objects.toString(filePath.getFileName()), watchEvent -> { + handleWatchEvent(filePath, watchEvent, resetContext); + }); fileChangeWatcher.start(); return fileChangeWatcher; } diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestFileChangeWatcher.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestFileChangeWatcher.java index cee368596292..ea00110d816c 100644 --- a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestFileChangeWatcher.java +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestFileChangeWatcher.java @@ -17,8 +17,11 @@ */ package org.apache.hadoop.hbase.io; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import java.io.File; @@ -29,11 +32,15 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseCommonTestingUtil; +import org.apache.hadoop.hbase.io.crypto.tls.X509Util; import org.apache.hadoop.hbase.testclassification.IOTests; import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.hamcrest.Matchers; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.ClassRule; @@ -76,12 +83,43 @@ public static void cleanupTempDir() { UTIL.cleanupTestDir(); } + @Test + public void testEnableCertFileReloading() throws IOException { + Configuration myConf = new Configuration(); + String sharedPath = "/tmp/foo.jks"; + myConf.set(X509Util.TLS_CONFIG_KEYSTORE_LOCATION, sharedPath); + myConf.set(X509Util.TLS_CONFIG_TRUSTSTORE_LOCATION, sharedPath); + AtomicReference keystoreWatcher = new AtomicReference<>(); + AtomicReference truststoreWatcher = new AtomicReference<>(); + X509Util.enableCertFileReloading(myConf, keystoreWatcher, truststoreWatcher, () -> { + }); + assertNotNull(keystoreWatcher.get()); + assertThat(keystoreWatcher.get().getWatcherThreadName(), Matchers.endsWith("-foo.jks")); + assertNull(truststoreWatcher.get()); + + keystoreWatcher.getAndSet(null).stop(); + truststoreWatcher.set(null); + + String truststorePath = "/tmp/bar.jks"; + myConf.set(X509Util.TLS_CONFIG_TRUSTSTORE_LOCATION, truststorePath); + X509Util.enableCertFileReloading(myConf, keystoreWatcher, truststoreWatcher, () -> { + }); + + assertNotNull(keystoreWatcher.get()); + assertThat(keystoreWatcher.get().getWatcherThreadName(), Matchers.endsWith("-foo.jks")); + assertNotNull(truststoreWatcher.get()); + assertThat(truststoreWatcher.get().getWatcherThreadName(), Matchers.endsWith("-bar.jks")); + + keystoreWatcher.getAndSet(null).stop(); + truststoreWatcher.getAndSet(null).stop(); + } + @Test public void testCallbackWorksOnFileChanges() throws IOException, InterruptedException { FileChangeWatcher watcher = null; try { final List> events = new ArrayList<>(); - watcher = new FileChangeWatcher(tempDir.toPath(), event -> { + watcher = new FileChangeWatcher(tempDir.toPath(), "test", event -> { LOG.info("Got an update: {} {}", event.kind(), event.context()); // Filter out the extra ENTRY_CREATE events that are // sometimes seen at the start. Even though we create the watcher @@ -124,7 +162,7 @@ public void testCallbackWorksOnFileTouched() throws IOException, InterruptedExce FileChangeWatcher watcher = null; try { final List> events = new ArrayList<>(); - watcher = new FileChangeWatcher(tempDir.toPath(), event -> { + watcher = new FileChangeWatcher(tempDir.toPath(), "test", event -> { LOG.info("Got an update: {} {}", event.kind(), event.context()); // Filter out the extra ENTRY_CREATE events that are // sometimes seen at the start. Even though we create the watcher @@ -164,7 +202,7 @@ public void testCallbackWorksOnFileAdded() throws IOException, InterruptedExcept FileChangeWatcher watcher = null; try { final List> events = new ArrayList<>(); - watcher = new FileChangeWatcher(tempDir.toPath(), event -> { + watcher = new FileChangeWatcher(tempDir.toPath(), "test", event -> { LOG.info("Got an update: {} {}", event.kind(), event.context()); synchronized (events) { events.add(event); @@ -198,7 +236,7 @@ public void testCallbackWorksOnFileDeleted() throws IOException, InterruptedExce FileChangeWatcher watcher = null; try { final List> events = new ArrayList<>(); - watcher = new FileChangeWatcher(tempDir.toPath(), event -> { + watcher = new FileChangeWatcher(tempDir.toPath(), "test", event -> { LOG.info("Got an update: {} {}", event.kind(), event.context()); // Filter out the extra ENTRY_CREATE events that are // sometimes seen at the start. Even though we create the watcher @@ -238,7 +276,7 @@ public void testCallbackErrorDoesNotCrashWatcherThread() FileChangeWatcher watcher = null; try { final AtomicInteger callCount = new AtomicInteger(0); - watcher = new FileChangeWatcher(tempDir.toPath(), event -> { + watcher = new FileChangeWatcher(tempDir.toPath(), "test", event -> { LOG.info("Got an update: {} {}", event.kind(), event.context()); int oldValue; synchronized (callCount) {