Skip to content

Commit

Permalink
feat(FileWatcher): observe file changes on file system
Browse files Browse the repository at this point in the history
Features:
1. multi-thread file watcher, thread pool based
2. file
Watcher singleton

BREAKING CHANGE: observe file changes on file system
  • Loading branch information
johnnymillergh committed Oct 20, 2020
1 parent 7c00b2b commit 26ca2eb
Show file tree
Hide file tree
Showing 9 changed files with 213 additions and 12 deletions.
5 changes: 5 additions & 0 deletions media-streaming-spring-boot-autoconfigure/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,10 @@
<artifactId>metadata-extractor</artifactId>
<version>2.15.0</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>29.0-jre</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.jmframework.boot.mediastreamingspringbootautoconfigure.configuration;

import com.jmframework.boot.mediastreamingspringbootautoconfigure.filewatch.FileWatcher;
import com.jmframework.boot.mediastreamingspringbootautoconfigure.filewatch.FileWatcherHandler;
import com.jmframework.boot.mediastreamingspringbootautoconfigure.model.Video;
import com.jmframework.boot.mediastreamingspringbootautoconfigure.repository.VideoRepository;
import com.jmframework.boot.mediastreamingspringbootautoconfigure.services.FileService;
Expand All @@ -19,10 +21,29 @@
public class Bootstrap implements CommandLineRunner {
private final VideoRepository videoRepository;
private final FileService fileService;
private final MediaStreamingProperties mediaStreamingProperties;

@PostConstruct
public void afterInitialization() {
log.debug("Bootstrap initialization is done. Start to process videos.");
log.debug("Starting FileWatcher...");
FileWatcher fileWatcher = new FileWatcher(mediaStreamingProperties.getVideoDirectoryOnFileSystem());
fileWatcher.setFileWatcherHandler(new FileWatcherHandler() {
@Override
public void onCreated(String file) {
log.info("onCreated: {}", file);
}

@Override
public void onDeleted(String file) {
log.info("onDeleted: {}", file);
}

@Override
public void onModified(String file) {
log.info("onModified: {}", file);
}
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import com.jmframework.boot.mediastreamingspringbootautoconfigure.handler.MediaStreamingExceptionHandler;
import com.jmframework.boot.mediastreamingspringbootautoconfigure.handler.VideoRouteHandler;
import com.jmframework.boot.mediastreamingspringbootautoconfigure.repository.VideoRepository;
import com.jmframework.boot.mediastreamingspringbootautoconfigure.repository.impl.InMemoryVideoRepository;
import com.jmframework.boot.mediastreamingspringbootautoconfigure.repository.impl.InMemoryVideoOnFileSystemRepository;
import com.jmframework.boot.mediastreamingspringbootautoconfigure.services.FileService;
import com.jmframework.boot.mediastreamingspringbootautoconfigure.services.VideoService;
import com.jmframework.boot.mediastreamingspringbootautoconfigure.services.impl.FileServiceImpl;
Expand Down Expand Up @@ -44,7 +44,7 @@ public void afterInitialization() {
@Bean
@ConditionalOnMissingBean
public Bootstrap bootstrap(VideoRepository videoRepository, FileService fileService) {
return new Bootstrap(videoRepository, fileService);
return new Bootstrap(videoRepository, fileService, mediaStreamingProperties);
}

@Bean
Expand All @@ -65,7 +65,16 @@ public VideoRouteHandler videoRouteHandler(VideoService videoService, FileServic
return new VideoRouteHandler(videoService, fileService);
}

/**
* Video end point router function.
* <p>
* TODO: solve Nullable Problems
*
* @param videoRouteHandler the video route handler
* @return the router function
*/
@Bean
@SuppressWarnings("NullableProblems")
public RouterFunction<ServerResponse> videoEndPoint(VideoRouteHandler videoRouteHandler) {
log.info("videoEndPoint");
return route()
Expand All @@ -91,7 +100,7 @@ public FileService fileService() {
@Bean
@ConditionalOnMissingBean
public VideoRepository videoRepository() {
return new InMemoryVideoRepository();
return new InMemoryVideoOnFileSystemRepository();
}

@SuppressWarnings("SameParameterValue")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package com.jmframework.boot.mediastreamingspringbootautoconfigure.filewatch;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.Setter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import javax.annotation.PreDestroy;
import java.nio.file.*;
import java.util.Optional;
import java.util.concurrent.*;

import static java.nio.file.StandardWatchEventKinds.*;

/**
* Description: FileWatcher, change description here.
*
* @author Johnny Miller (锺俊), email: [email protected], date: 10/20/2020 3:19 PM
* @see <a href='https://github.com/WhileLoop/file-watcher/'>Inspired by file-watcher</a>
**/
@Slf4j
@Setter
public class FileWatcher {
private static final ThreadFactory NAMED_THREAD_FACTORY =
new ThreadFactoryBuilder().setNameFormat("file-watcher-thread-%d").build();
private static final ExecutorService THREAD_POOL =
new ThreadPoolExecutor(1, 2, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1024), NAMED_THREAD_FACTORY, new ThreadPoolExecutor.AbortPolicy());

private final Path monitoredPath;
private FileWatcherHandler fileWatcherHandler;

public FileWatcher(String directory) {
this(Paths.get(directory));
}

@SneakyThrows
private FileWatcher(Path path) {
this.monitoredPath = path;
this.monitoredPath.register(WatchServiceSingleton.getInstance(),
StandardWatchEventKinds.ENTRY_CREATE,
StandardWatchEventKinds.ENTRY_DELETE,
StandardWatchEventKinds.ENTRY_MODIFY);
THREAD_POOL.execute(this::monitor);
}

private void monitor() {
log.debug("Started watching: {}", this.monitoredPath);
while (true) {
// wait for key to be signaled
Optional<WatchKey> optionalWatchKey = Optional.ofNullable(WatchServiceSingleton.getInstance().poll());
if (optionalWatchKey.isPresent()) {
var watchKey = optionalWatchKey.get();
for (var watchEvent : watchKey.pollEvents()) {
WatchEvent.Kind<?> kind = watchEvent.kind();

// This key is registered only for ENTRY_CREATE events,
// but an OVERFLOW event can occur regardless if events are lost or discarded.
if (kind == OVERFLOW) {
continue;
}

// The filename is the context of the event.
@SuppressWarnings("unchecked")
WatchEvent<Path> event = (WatchEvent<Path>) watchEvent;
Path filename = event.context();

// Resolve the filename against the directory.
// If the filename is "test" and the directory is "foo", the resolved name is "test/foo".
Path child = monitoredPath.resolve(filename);
String file = child.toString();
if (kind == ENTRY_CREATE) {
fileWatcherHandler.onCreated(file);
} else if (kind == ENTRY_DELETE) {
fileWatcherHandler.onDeleted(file);
} else if (kind == ENTRY_MODIFY) {
fileWatcherHandler.onModified(file);
}
}

// Reset the key -- this step is critical if you want to receive further watch events.
// If the key is no longer valid, the directory is inaccessible so exit the loop.
boolean valid = watchKey.reset();
if (!valid) {
log.debug("The watch key wasn't valid. {}", watchKey);
return;
}
}
}
}

@PreDestroy
@SneakyThrows
private void onPreDestroy() {
THREAD_POOL.awaitTermination(5, TimeUnit.SECONDS);
log.debug("THREAD_POOL for FileWatcher was terminated.");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.jmframework.boot.mediastreamingspringbootautoconfigure.filewatch;

/**
* Description: FileWatcherHandler, change description here.
*
* @author Johnny Miller (锺俊), email: [email protected], date: 10/20/2020 3:22 PM
*/
public interface FileWatcherHandler {
/**
* On created.
*
* @param file the file
*/
void onCreated(String file);

/**
* On deleted.
*
* @param file the file
*/
void onDeleted(String file);

/**
* On modified.
*
* @param file the file
*/
void onModified(String file);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.jmframework.boot.mediastreamingspringbootautoconfigure.filewatch;

import lombok.SneakyThrows;

import java.nio.file.FileSystems;
import java.nio.file.WatchService;

/**
* Description: SingletonWatchService, change description here.
*
* @author Johnny Miller (锺俊), email: [email protected], date: 10/20/2020 4:52 PM
**/
public class WatchServiceSingleton {
private static volatile WatchService singletonInstance;

private WatchServiceSingleton() {
}

@SneakyThrows
public static WatchService getInstance() {
if (singletonInstance == null) {
synchronized (WatchServiceSingleton.class) {
if (singletonInstance == null) {
singletonInstance = FileSystems.getDefault().newWatchService();
}
}
}
return singletonInstance;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class VideoRouteHandler {
* @param request the request
* @return the mono
*/

public Mono<ServerResponse> listVideos(ServerRequest request) {

Flux<Video> videos = videoService.getAllVideos();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,12 @@ public interface VideoRepository {
* @return the mono
*/
Mono<Video> addVideo(Video video);

/**
* Delete video by name mono.
*
* @param name the name
* @return the mono
*/
Mono<Video> deleteVideoByName(String name);
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,12 @@
import java.util.concurrent.ConcurrentHashMap;

/**
* Description: InMemoryVideoRepository, change description here.
* Description: InMemoryVideoOnFileSystemRepository, change description here.
*
* @author Johnny Miller (锺俊), email: [email protected]
* date 10/19/2020 5:16 PM
**/
public class InMemoryVideoRepository implements VideoRepository {

public class InMemoryVideoOnFileSystemRepository implements VideoRepository {
private final Map<String, Video> videoCache = new ConcurrentHashMap<>();

@Override
Expand All @@ -33,15 +32,16 @@ public Mono<Video> getVideoByName(String name) {

@Override
public Flux<Video> getAllVideos() {
synchronized (videoCache) {
return Flux.fromIterable(videoCache.values());
}
return Flux.fromIterable(videoCache.values());
}

@Override
public Mono<Video> addVideo(Video video) {
synchronized (videoCache) {
return Mono.fromCallable(() -> videoCache.put(video.getName(), video));
}
return Mono.fromCallable(() -> videoCache.put(video.getName(), video));
}

@Override
public Mono<Video> deleteVideoByName(String name) {
return Mono.fromCallable(() -> videoCache.remove(name));
}
}

0 comments on commit 26ca2eb

Please sign in to comment.