Skip to content

Commit

Permalink
perf(FileWatcher): recursively monitor directory
Browse files Browse the repository at this point in the history
fixed file not deleted under sub directory problem

BREAKING CHANGE: recursively monitor directory; fixed file not deleted
under sub directory problem
  • Loading branch information
johnnymillergh committed Oct 27, 2020
1 parent 65b2dbf commit 2195010
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public VideoRouteHandler videoRouteHandler(VideoService videoService, FileServic
@Deprecated
@SuppressWarnings("NullableProblems")
public RouterFunction<ServerResponse> videoEndPoint(VideoRouteHandler videoRouteHandler) {
log.info("videoEndPoint");
log.debug("videoEndPoint");
return route()
.nest(path("/videos"), builder -> builder.GET("", videoRouteHandler::listVideos)
.nest(path("/{name}"), videoBuilder -> videoBuilder.GET("", param("partial"),
Expand All @@ -89,8 +89,8 @@ public RouterFunction<ServerResponse> videoEndPoint(VideoRouteHandler videoRoute
}

@Bean
public VideoController videoController(VideoRepository videoRepository) {
return new VideoController(videoRepository);
public VideoController videoController(VideoService videoService) {
return new VideoController(videoService);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,12 @@ private void postConstruct() {
fileWatcher = new FileWatcher(mediaStreamingProperties.getVideoDirectoryOnFileSystem());
} catch (Exception e) {
log.error("Cannot build FileWatcher, file observation failed! " +
"Check `media-streaming.videoDirectoryOnFileSystem` configuration.", e);
"Check `media-streaming.videoDirectoryOnFileSystem` configuration.", e);
return;
}
fileWatcher.setFileWatcherHandler(new FileWatcherHandler() {
@Override
public void onCreated(Path file) {
log.debug("Created file observed: {}", file);
Video video = new Video();
video.setName(file.getFileName().toString());
video.setLocation(file);
Expand All @@ -51,13 +50,11 @@ public void onCreated(Path file) {

@Override
public void onDeleted(Path file) {
log.debug("Deleted file observed: {}", file);
Mono.just(file).then(videoRepository.deleteVideoByPath(file)).subscribe();
}

@Override
public void onModified(Path file) {
log.info("Modified file observed: {}", file);
Video video = new Video();
video.setName(file.getFileName().toString());
video.setLocation(file);
Expand All @@ -69,7 +66,7 @@ public void onModified(Path file) {
@Override
public void run(String... args) {
fileService.getAllFiles()
.doOnNext(path -> log.debug("found file in path: " + path.toUri() + " FileName: " + path.getFileName()))
.doOnNext(path -> log.debug("Found file in path: {}, file name: {}", path.toUri(), path.getFileName()))
.flatMap(path -> {
Video video = new Video();
video.setName(path.getFileName().toString());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.github.johnnymillergh.boot.mediastreamingspringbootautoconfigure.controller;

import com.github.johnnymillergh.boot.mediastreamingspringbootautoconfigure.model.Video;
import com.github.johnnymillergh.boot.mediastreamingspringbootautoconfigure.repository.VideoRepository;
import com.github.johnnymillergh.boot.mediastreamingspringbootautoconfigure.services.VideoService;
import lombok.RequiredArgsConstructor;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
Expand All @@ -19,10 +19,10 @@
@RestController
@RequiredArgsConstructor
public class VideoController {
private final VideoRepository videoRepository;
private final VideoService videoService;

@GetMapping(value = "/video-annotation", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<List<Video>> getVideo() {
return Flux.interval(Duration.ofSeconds(2)).map(aLong -> videoRepository.getAllVideoList());
return Flux.interval(Duration.ofSeconds(2)).map(aLong -> videoService.getAllVideoList());
}
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
package com.github.johnnymillergh.boot.mediastreamingspringbootautoconfigure.filewatch;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.sun.nio.file.SensitivityWatchEventModifier;
import lombok.Setter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import lombok.val;

import java.io.IOException;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.Optional;
import java.util.concurrent.*;
import java.util.function.Consumer;

import static java.nio.file.StandardWatchEventKinds.*;

Expand All @@ -23,9 +28,38 @@ public class FileWatcher {
private static final ThreadFactory NAMED_THREAD_FACTORY =
new ThreadFactoryBuilder().setNameFormat("file-watcher-thread-%d").build();
private static final ExecutorService THREAD_POOL =
new ThreadPoolExecutor(1, 2, 0L, TimeUnit.MILLISECONDS,
new ThreadPoolExecutor(1, 4, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1024), NAMED_THREAD_FACTORY,
new ThreadPoolExecutor.AbortPolicy());
private static final ConcurrentHashMap<WatchKey, Path> WATCH_KEY_MAP = new ConcurrentHashMap<>(256);
/**
* Register path. Store WatchKey for each directory in WATCH_KEY_MAP.
*/
private static final Consumer<Path> REGISTER = (path) -> {
if (!path.toFile().exists() || !path.toFile().isDirectory()) {
throw new RuntimeException(String.format("Folder %s does not exist or is not a directory!", path));
}
if (WATCH_KEY_MAP.containsValue(path)) {
log.debug("Found duplicated path in WATCH_KEY_MAP, will not register again. Path: {}", path);
return;
}
try {
Files.walkFileTree(path, new SimpleFileVisitor<>() {
@Override
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
WatchKey watchKey = dir.register(WatchServiceSingleton.getInstance(),
new WatchEvent.Kind[]{ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY},
SensitivityWatchEventModifier.HIGH);
WATCH_KEY_MAP.put(watchKey, dir);
log.debug("Registering {} in watcher service. WATCH_KEY_MAP size: {}", dir,
WATCH_KEY_MAP.values().size());
return FileVisitResult.CONTINUE;
}
});
} catch (IOException e) {
throw new RuntimeException(String.format("Error registering path: %s", path));
}
};

private final Path monitoredPath;
private FileWatcherHandler fileWatcherHandler;
Expand All @@ -37,67 +71,93 @@ public FileWatcher(String directory) {
@SneakyThrows
private FileWatcher(Path path) {
this.monitoredPath = path;
this.monitoredPath.register(WatchServiceSingleton.getInstance(),
StandardWatchEventKinds.ENTRY_CREATE,
StandardWatchEventKinds.ENTRY_DELETE,
StandardWatchEventKinds.ENTRY_MODIFY);
THREAD_POOL.execute(this::monitor);
log.debug("Starting Recursive Watcher");
REGISTER.accept(monitoredPath);
THREAD_POOL.execute(this::monitorRecursively);
}

private void monitor() {
log.debug("Started watching: {}", this.monitoredPath);
/**
* Monitor directory recursively.
*
* @author Johnny Miller (锺俊), email: [email protected], date: 10/27/2020 9:46 AM
*/
private void monitorRecursively() {
while (true) {
// wait for key to be signaled
Optional<WatchKey> optionalWatchKey;
// Wait for key to be signaled
final Optional<WatchKey> optionalWatchKey;
try {
optionalWatchKey = Optional.ofNullable(WatchServiceSingleton.getInstance().poll());
} catch (ClosedWatchServiceException e) {
log.error("Detected closed WatchService.", e);
log.error("Detected closed WatchService. Terminating followup FileWatcher operations.", e);
return;
}
if (optionalWatchKey.isPresent()) {
var watchKey = optionalWatchKey.get();
for (var watchEvent : watchKey.pollEvents()) {
WatchEvent.Kind<?> kind = watchEvent.kind();

// This key is registered only for ENTRY_CREATE events,
// but an OVERFLOW event can occur regardless if events are lost or discarded.
if (kind == OVERFLOW) {
continue;
}

// The filename is the context of the event.
@SuppressWarnings("unchecked")
WatchEvent<Path> event = (WatchEvent<Path>) watchEvent;
Path filename = event.context();

// Resolve the filename against the directory.
// If the filename is "test" and the directory is "foo", the resolved name is "foo/test".
Path child = monitoredPath.resolve(filename);
if (kind == ENTRY_CREATE) {
fileWatcherHandler.onCreated(child);
} else if (kind == ENTRY_DELETE) {
fileWatcherHandler.onDeleted(child);
} else if (kind == ENTRY_MODIFY) {
fileWatcherHandler.onModified(child);
}
if (optionalWatchKey.isPresent()) {
val watchKey = optionalWatchKey.get();
val optionalDirectory = Optional.ofNullable(WATCH_KEY_MAP.get(watchKey));
if (optionalDirectory.isEmpty()) {
log.warn("WatchKey {} not recognized!", watchKey);
continue;
}

watchKey.pollEvents()
.stream()
// This watcherKey is registered only for ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY events,
// but an OVERFLOW event can occur regardless if events are lost or discarded.
.filter(watchEvent -> (watchEvent.kind() != OVERFLOW))
// Iterate WatchEvent
.forEach(watchEvent -> {
// The filename is the context of the event if possible.
@SuppressWarnings("unchecked") val absolutePath =
optionalDirectory.get().resolve(((WatchEvent<Path>) watchEvent).context());
val file = absolutePath.toFile();
val kind = watchEvent.kind();
if (file.isDirectory()) {
log.debug("Absolute path found, directory: {}", absolutePath);
REGISTER.accept(absolutePath);
} else {
log.debug("Detected file changed. File: {}, WatchEvent kind: {}", file, kind);
val optionalFileWatcherHandler = Optional.ofNullable(this.fileWatcherHandler);
if (optionalFileWatcherHandler.isEmpty()) {
log.warn("FileWatcherHandler is null! FileWatcher will not work properly.");
}
optionalFileWatcherHandler.ifPresent(handler -> {
if (kind == ENTRY_CREATE) {
this.fileWatcherHandler.onCreated(absolutePath);
} else if (kind == ENTRY_DELETE) {
this.fileWatcherHandler.onDeleted(absolutePath);
} else if (kind == ENTRY_MODIFY) {
this.fileWatcherHandler.onModified(absolutePath);
}
});
}
});

// IMPORTANT: The key must be reset after processed
// Reset the key -- this step is critical if you want to receive further watch events.
// If the key is no longer valid, the directory is inaccessible so exit the loop.
boolean valid = watchKey.reset();
val valid = watchKey.reset();
if (!valid) {
log.debug("The watch key wasn't valid. {}", watchKey);
return;
break;
}
}
}
}

/**
* Destroy FileWatcher.
* <p>
* 1. Close WatchService
* <p>
* 2. Terminate THREAD_POOL
*
* @author Johnny Miller (锺俊), email: [email protected], date: 10/27/2020 10:01 AM
*/
@SneakyThrows
public void destroy() {
WatchServiceSingleton.close();
THREAD_POOL.awaitTermination(5, TimeUnit.SECONDS);
log.debug("THREAD_POOL for FileWatcher was terminated.");
val terminated = THREAD_POOL.awaitTermination(5, TimeUnit.SECONDS);
log.debug("Terminated THREAD_POOL for FileWatcher. Termination Result: {}", terminated);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.List;

/**
* Description: VideoService, change description here.
*
Expand Down Expand Up @@ -44,4 +46,11 @@ public interface VideoService {
* @return the long
*/
long lengthOf(UrlResource urlResource);

/**
* Gets all video list.
*
* @return the all video list
*/
List<Video> getAllVideoList();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.io.IOException;
import java.net.MalformedURLException;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import static java.lang.Long.min;
Expand Down Expand Up @@ -110,6 +111,11 @@ public long lengthOf(UrlResource urlResource) {
return fileLength;
}

@Override
public List<Video> getAllVideoList() {
return videoRepository.getAllVideoList();
}

public long getChunkSize(int size) {
long responseSize;
switch (size) {
Expand Down

0 comments on commit 2195010

Please sign in to comment.