Skip to content

Commit

Permalink
perf(FileWatcher): gracefully destroy FileWatcher
Browse files Browse the repository at this point in the history
@Setter(AccessLevel.NONE)
private volatile boolean terminated = false;
  • Loading branch information
johnnymillergh committed Oct 27, 2020
1 parent 2195010 commit f1b9720
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.sun.nio.file.SensitivityWatchEventModifier;
import lombok.AccessLevel;
import lombok.Setter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -26,11 +27,12 @@
@Setter
public class FileWatcher {
private static final ThreadFactory NAMED_THREAD_FACTORY =
new ThreadFactoryBuilder().setNameFormat("file-watcher-thread-%d").build();
new ThreadFactoryBuilder().setNameFormat("file-watcher-%d").build();
private static final ExecutorService THREAD_POOL =
new ThreadPoolExecutor(1, 4, 0L, TimeUnit.MILLISECONDS,
new ThreadPoolExecutor(2, 4, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1024), NAMED_THREAD_FACTORY,
new ThreadPoolExecutor.AbortPolicy());
private static final long INTERVAL = 10L;
private static final ConcurrentHashMap<WatchKey, Path> WATCH_KEY_MAP = new ConcurrentHashMap<>(256);
/**
* Register path. Store WatchKey for each directory in WATCH_KEY_MAP.
Expand Down Expand Up @@ -60,7 +62,8 @@ public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) th
throw new RuntimeException(String.format("Error registering path: %s", path));
}
};

@Setter(AccessLevel.NONE)
private volatile boolean terminated = false;
private final Path monitoredPath;
private FileWatcherHandler fileWatcherHandler;

Expand All @@ -82,7 +85,7 @@ private FileWatcher(Path path) {
* @author Johnny Miller (锺俊), email: [email protected], date: 10/27/2020 9:46 AM
*/
private void monitorRecursively() {
while (true) {
while (!terminated) {
// Wait for key to be signaled
final Optional<WatchKey> optionalWatchKey;
try {
Expand Down Expand Up @@ -113,10 +116,10 @@ private void monitorRecursively() {
val file = absolutePath.toFile();
val kind = watchEvent.kind();
if (file.isDirectory()) {
log.debug("Absolute path found, directory: {}", absolutePath);
log.debug("Absolute path found. Path: {}", absolutePath);
REGISTER.accept(absolutePath);
} else {
log.debug("Detected file changed. File: {}, WatchEvent kind: {}", file, kind);
log.debug("Detected file change. File: {}, WatchEvent kind: {}", file, kind);
val optionalFileWatcherHandler = Optional.ofNullable(this.fileWatcherHandler);
if (optionalFileWatcherHandler.isEmpty()) {
log.warn("FileWatcherHandler is null! FileWatcher will not work properly.");
Expand Down Expand Up @@ -157,7 +160,44 @@ private void monitorRecursively() {
@SneakyThrows
public void destroy() {
WatchServiceSingleton.close();
val terminated = THREAD_POOL.awaitTermination(5, TimeUnit.SECONDS);
log.debug("Terminated THREAD_POOL for FileWatcher. Termination Result: {}", terminated);
synchronized (this) {
this.terminated = true;
}
this.shutdownAndAwaitTermination();
}

/**
* Shutdown and await termination. The following method shuts down an ExecutorService in two phases, first by
* calling shutdown to reject incoming tasks, and then calling shutdownNow, if necessary, to cancel any lingering
* tasks.
*
* @author Johnny Miller (锺俊), email: [email protected], date: 10/27/2020 1:29 PM
* @see
* <a href="https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/ExecutorService.html">ExecutorService</a>
*/
private void shutdownAndAwaitTermination() {
// Disable new tasks from being submitted
THREAD_POOL.shutdown();
log.debug("Shutdown THREAD_POOL for FileWatcher done.");
try {
// Wait a while for existing tasks to terminate
if (!THREAD_POOL.awaitTermination(INTERVAL, TimeUnit.SECONDS)) {
THREAD_POOL.shutdownNow(); // Cancel currently executing tasks
// Wait a while for tasks to respond to being cancelled,
// true if this executor terminated and false if the timeout elapsed before termination
if (!THREAD_POOL.awaitTermination(INTERVAL, TimeUnit.SECONDS)) {
log.debug("THREAD_POOL for FileWatcher did not terminate. Current thread: {}, state: {}",
Thread.currentThread(), Thread.currentThread().getState());
}
}
} catch (InterruptedException e) {
log.debug("InterruptedException occurred when shutting down THREAD_POOL", e);
// (Re-)Cancel if current thread also interrupted
THREAD_POOL.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
log.debug("THREAD_POOL for FileWatcher terminated. Current thread: {}, state: {}", Thread.currentThread(),
Thread.currentThread().getState());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,11 @@ public static WatchService getInstance() {
synchronized (WatchServiceSingleton.class) {
if (singletonInstance == null) {
singletonInstance = FileSystems.getDefault().newWatchService();
log.debug("WatchService instance initiated.");
}
}
}
log.debug("WatchService is not null, returned directly");
return singletonInstance;
}

Expand All @@ -54,6 +56,7 @@ public static synchronized void close() {
Optional.ofNullable(singletonInstance).ifPresent(watchService -> {
try {
watchService.close();
log.debug("WatchService closed.");
} catch (IOException e) {
log.error("Exception occurred when closing WatchService.", e);
}
Expand Down

0 comments on commit f1b9720

Please sign in to comment.