Skip to content

Commit

Permalink
Makes maximumPoolSize configurable for FileStatusManager (#12)
Browse files Browse the repository at this point in the history
* Makes maximumPoolSize configurable for FileStatusManager

* Fixes passed arguments
  • Loading branch information
StrongestNumber9 authored Jul 26, 2023
1 parent 0a2e2b0 commit b6fe522
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 17 deletions.
41 changes: 25 additions & 16 deletions src/main/java/com/teragrep/rlo_12/DirectoryEventWatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,21 +76,7 @@ public DirectoryEventWatcher(Path directory,
Pattern filePattern,
Supplier<Consumer<MonitoredFile>> readConsumerSupplier
) throws IOException {
this.pollingInterval = Long.MAX_VALUE;
this.pollingIntervalTimeUnit = TimeUnit.DAYS;
this.initialDirectory = directory.toAbsolutePath();
this.filePatternMatcher = filePattern.matcher("");
this.recursive = recursive;

this.directoryWatcher = initialDirectory.getFileSystem().newWatchService();

FileStatusManager fileStatusManager = new FileStatusManager(transferQueue, readConsumerSupplier);

Thread fileStatusManagerThread = new Thread(fileStatusManager);
fileStatusManagerThread.start();

initialScan(initialDirectory);

this(directory, recursive, filePattern, readConsumerSupplier, Long.MAX_VALUE, TimeUnit.DAYS);
}

/**
Expand All @@ -110,6 +96,29 @@ public DirectoryEventWatcher(Path directory,
Supplier<Consumer<MonitoredFile>> readConsumerSupplier,
long interval,
TimeUnit intervalTimeUnit
) throws IOException {
this(directory, recursive, filePattern, readConsumerSupplier, interval, intervalTimeUnit, 8);
}

/**
* FileEventWatcher
*
* @param directory Directory to monitor for changes
* @param recursive Recurse to sub-directories
* @param filePattern Filename pattern to match for from (sub-)directories
* @param readConsumerSupplier MonitoredFile Consumer Supplier which processes the events
* @param interval Polling interval
* @param intervalTimeUnit Polling interval TimeUnit
* @param maximumPoolSize Maximum thread count
* @throws IOException Path.register throws IOException on initial directory
*/
public DirectoryEventWatcher(Path directory,
boolean recursive,
Pattern filePattern,
Supplier<Consumer<MonitoredFile>> readConsumerSupplier,
long interval,
TimeUnit intervalTimeUnit,
int maximumPoolSize
) throws IOException {
this.pollingInterval = interval;
this.pollingIntervalTimeUnit = intervalTimeUnit;
Expand All @@ -119,7 +128,7 @@ public DirectoryEventWatcher(Path directory,

this.directoryWatcher = initialDirectory.getFileSystem().newWatchService();

FileStatusManager fileStatusManager = new FileStatusManager(transferQueue, readConsumerSupplier);
FileStatusManager fileStatusManager = new FileStatusManager(transferQueue, readConsumerSupplier, maximumPoolSize);

Thread fileStatusManagerThread = new Thread(fileStatusManager);
fileStatusManagerThread.start();
Expand Down
5 changes: 4 additions & 1 deletion src/main/java/com/teragrep/rlo_12/FileStatusManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,12 @@ class FileStatusManager implements Runnable {
private final Supplier<Consumer<MonitoredFile>> consumerSupplier;

FileStatusManager(TransferQueue<MonitoredFile> transferQueue, Supplier<Consumer<MonitoredFile>> consumerSupplier) {
this(transferQueue, consumerSupplier, 8);
}
FileStatusManager(TransferQueue<MonitoredFile> transferQueue, Supplier<Consumer<MonitoredFile>> consumerSupplier, int maximumPoolSize) {
this.transferQueue = transferQueue;
this.consumerSupplier = consumerSupplier;
this.threadPoolExecutor = new ThreadPoolExecutor(1, 8, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
this.threadPoolExecutor = new ThreadPoolExecutor(1, maximumPoolSize, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
}

@Override
Expand Down

0 comments on commit b6fe522

Please sign in to comment.