Skip to content

Commit

Permalink
perf(media-streaming): release resources when shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
johnnymillergh committed Oct 20, 2020
1 parent 58f423c commit 44a54cc
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import reactor.core.publisher.Mono;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.nio.file.Path;

/**
Expand All @@ -24,12 +25,12 @@ public class MediaStreamingBootstrap implements CommandLineRunner {
private final VideoRepository videoRepository;
private final FileService fileService;
private final MediaStreamingProperties mediaStreamingProperties;
private FileWatcher fileWatcher;

@PostConstruct
private void postConstruct() {
log.debug("MediaStreamingBootstrap initialization is done. Start to process videos.");
log.debug("Starting FileWatcher...");
FileWatcher fileWatcher;
try {
fileWatcher = new FileWatcher(mediaStreamingProperties.getVideoDirectoryOnFileSystem());
} catch (Exception e) {
Expand Down Expand Up @@ -80,4 +81,10 @@ public void run(String... args) {
.doOnNext(video -> log.debug("Registered video: " + video.getName()))
.subscribe();
}

@PreDestroy
private void preDestroy() {
log.debug("Destroying {}, fileWatcher: {}", this.getClass().getSimpleName(), fileWatcher);
fileWatcher.destroy();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,13 @@ private static long lengthOf(ResourceRegion resourceRegion) {
}

@Override
@SuppressWarnings("NullableProblems")
public boolean canWrite(ResolvableType elementType, MediaType mediaType) {
return resourceRegionEncoder.canEncode(elementType, mediaType);
}

@Override
@SuppressWarnings("NullableProblems")
public List<MediaType> getWritableMediaTypes() {
return this.mediaTypes;
}
Expand All @@ -108,14 +110,15 @@ public List<MediaType> getWritableMediaTypes() {
* @return indicates completion or error
*/
@Override
@SuppressWarnings("NullableProblems")
public Mono<Void> write(Publisher<? extends ResourceRegion> inputStream,
ResolvableType elementType, MediaType mediaType,
ReactiveHttpOutputMessage message,
Map<String, Object> hints) {

return Mono.from(inputStream)
.flatMap(resource ->
writeResource(resource, elementType, mediaType, message, hints));
writeResource(resource, elementType, mediaType, message, hints));
}

private Mono<Void> writeResource(ResourceRegion resourceRegion,
Expand All @@ -125,8 +128,8 @@ private Mono<Void> writeResource(ResourceRegion resourceRegion,

HttpHeaders headers = message.getHeaders();

Mono<MediaType> mediaTypeMono = Mono.fromCallable(() -> getResourceMediaType(mediaType,
resourceRegion.getResource()));
Mono<MediaType> mediaTypeMono = Mono
.fromCallable(() -> getResourceMediaType(mediaType, resourceRegion.getResource()));

Mono<Long> headersMono = Mono.fromCallable(() -> lengthOf(resourceRegion))
.filter(length -> length > -1)
Expand All @@ -140,16 +143,14 @@ private Mono<Void> writeResource(ResourceRegion resourceRegion,
return resourceType;
})
.flatMap(resourceType ->
zeroCopy(resourceRegion.getResource(), resourceRegion, message)
.orElseGet(() -> {

Mono<ResourceRegion> input = Mono.just(resourceRegion);
DataBufferFactory bufferFactory = message.bufferFactory();
Flux<DataBuffer> body =
this.resourceRegionEncoder.encode(input, bufferFactory, elementType,
resourceType, hints);
return message.writeWith(body);
}));
zeroCopy(resourceRegion.getResource(), resourceRegion, message)
.orElseGet(() -> {
Mono<ResourceRegion> input = Mono.just(resourceRegion);
DataBufferFactory bufferFactory = message.bufferFactory();
Flux<DataBuffer> body = this.resourceRegionEncoder
.encode(input, bufferFactory, elementType, resourceType, hints);
return message.writeWith(body);
}));
}

/**
Expand All @@ -168,6 +169,7 @@ private Mono<Void> writeResource(ResourceRegion resourceRegion,
* @return a {@link Mono} that indicates completion of writing or error
*/
@Override
@SuppressWarnings("NullableProblems")
public Mono<Void> write(Publisher<? extends ResourceRegion> inputStream,
ResolvableType actualType, ResolvableType elementType,
MediaType mediaType, ServerHttpRequest request,
Expand All @@ -188,7 +190,7 @@ public Mono<Void> write(Publisher<? extends ResourceRegion> inputStream,

headers.setContentType(resourceMediaType);
headers.add(HttpHeaders.CONTENT_RANGE,
"bytes " + startPosition + '-' + endPosition + '/' + contentLength);
"bytes " + startPosition + '-' + endPosition + '/' + contentLength);
headers.setContentLength(endPosition - startPosition + 1);

response.setStatusCode(HttpStatus.PARTIAL_CONTENT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import javax.annotation.PreDestroy;
import java.nio.file.*;
import java.util.Optional;
import java.util.concurrent.*;
Expand All @@ -25,7 +24,8 @@ public class FileWatcher {
new ThreadFactoryBuilder().setNameFormat("file-watcher-thread-%d").build();
private static final ExecutorService THREAD_POOL =
new ThreadPoolExecutor(1, 2, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1024), NAMED_THREAD_FACTORY, new ThreadPoolExecutor.AbortPolicy());
new LinkedBlockingQueue<>(1024), NAMED_THREAD_FACTORY,
new ThreadPoolExecutor.AbortPolicy());

private final Path monitoredPath;
private FileWatcherHandler fileWatcherHandler;
Expand All @@ -38,9 +38,9 @@ public FileWatcher(String directory) {
private FileWatcher(Path path) {
this.monitoredPath = path;
this.monitoredPath.register(WatchServiceSingleton.getInstance(),
StandardWatchEventKinds.ENTRY_CREATE,
StandardWatchEventKinds.ENTRY_DELETE,
StandardWatchEventKinds.ENTRY_MODIFY);
StandardWatchEventKinds.ENTRY_CREATE,
StandardWatchEventKinds.ENTRY_DELETE,
StandardWatchEventKinds.ENTRY_MODIFY);
THREAD_POOL.execute(this::monitor);
}

Expand Down Expand Up @@ -88,9 +88,8 @@ private void monitor() {
}
}

@PreDestroy
@SneakyThrows
private void onPreDestroy() {
public void destroy() {
THREAD_POOL.awaitTermination(5, TimeUnit.SECONDS);
log.debug("THREAD_POOL for FileWatcher was terminated.");
}
Expand Down

0 comments on commit 44a54cc

Please sign in to comment.