diff --git a/daemon/src/main/java/org/apache/maven/cli/DaemonMavenCli.java b/daemon/src/main/java/org/apache/maven/cli/DaemonMavenCli.java index 9939d27d3..7ce839700 100644 --- a/daemon/src/main/java/org/apache/maven/cli/DaemonMavenCli.java +++ b/daemon/src/main/java/org/apache/maven/cli/DaemonMavenCli.java @@ -69,6 +69,7 @@ import org.apache.maven.extension.internal.CoreExtensionEntry; import org.apache.maven.lifecycle.LifecycleExecutionException; import org.apache.maven.model.building.ModelProcessor; +import org.apache.maven.plugin.PluginRealmCache; import org.apache.maven.project.MavenProject; import org.apache.maven.properties.internal.EnvironmentUtils; import org.apache.maven.properties.internal.SystemProperties; @@ -90,6 +91,7 @@ import org.codehaus.plexus.util.StringUtils; import org.eclipse.aether.transfer.TransferListener; import org.jboss.fuse.mvnd.logging.smart.AbstractLoggingSpy; +import org.jboss.fuse.mvnd.plugin.CliPluginRealmCache; import org.slf4j.ILoggerFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -481,6 +483,7 @@ protected void configure() { bind( ILoggerFactory.class ).toInstance( slf4jLoggerFactory ); bind( CoreExports.class ).toInstance( exports ); + bind( PluginRealmCache.class ).toInstance(new CliPluginRealmCache()); } } ); diff --git a/daemon/src/main/java/org/jboss/fuse/mvnd/plugin/CliPluginRealmCache.java b/daemon/src/main/java/org/jboss/fuse/mvnd/plugin/CliPluginRealmCache.java index ce107cc34..751830edf 100644 --- a/daemon/src/main/java/org/jboss/fuse/mvnd/plugin/CliPluginRealmCache.java +++ b/daemon/src/main/java/org/jboss/fuse/mvnd/plugin/CliPluginRealmCache.java @@ -17,18 +17,21 @@ import java.io.File; import java.io.IOException; -import java.nio.file.Files; +import java.nio.file.FileSystems; import java.nio.file.Path; -import java.nio.file.attribute.BasicFileAttributes; -import java.nio.file.attribute.FileTime; +import java.nio.file.StandardWatchEventKinds; +import java.nio.file.WatchEvent; +import 
java.nio.file.WatchEvent.Kind; +import java.nio.file.WatchKey; +import java.nio.file.WatchService; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Objects; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.stream.Collectors; +import java.util.concurrent.atomic.AtomicInteger; import javax.enterprise.inject.Default; import javax.inject.Named; @@ -47,6 +50,8 @@ import org.eclipse.aether.repository.LocalRepository; import org.eclipse.aether.repository.RemoteRepository; import org.eclipse.aether.repository.WorkspaceRepository; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Default PluginCache implementation. Assumes cached data does not change. @@ -142,64 +147,27 @@ public boolean equals( Object o ) CacheKey that = (CacheKey) o; - return parentRealm == that.parentRealm + return parentRealm == that.parentRealm && CliCacheUtils.pluginEquals( plugin, that.plugin ) && Objects.equals( workspace, that.workspace ) && Objects.equals( localRepo, that.localRepo ) - && RepositoryUtils.repositoriesEquals( this.repositories, that.repositories ) + && RepositoryUtils.repositoriesEquals( this.repositories, that.repositories ) && Objects.equals( filter, that.filter ) && Objects.equals( foreignImports, that.foreignImports ); } } - protected static class TimestampedCacheRecord extends CacheRecord { - - static class ArtifactTimestamp { - final Path path; - final FileTime lastModifiedTime; - final Object fileKey; - ArtifactTimestamp(Path path) { - this.path = path; - try { - BasicFileAttributes attrs = Files.readAttributes(path, BasicFileAttributes.class); - this.lastModifiedTime = attrs.lastModifiedTime(); - this.fileKey = attrs.fileKey(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) 
return false; - ArtifactTimestamp that = (ArtifactTimestamp) o; - return path.equals(that.path) && - Objects.equals(lastModifiedTime, that.lastModifiedTime) && - Objects.equals(fileKey, that.fileKey); - } - @Override - public int hashCode() { - return Objects.hash(path, lastModifiedTime, fileKey); - } - } - Set timestamp; - public TimestampedCacheRecord(ClassRealm realm, List artifacts) { + protected static class WatchedCacheRecord extends CacheRecord { + + private volatile boolean valid = true; + public WatchedCacheRecord(ClassRealm realm, List artifacts) { super(realm, artifacts); - timestamp = current(); } + public boolean isValid() { - try { - return Objects.equals(current(), timestamp); - } catch (Exception e) { - return false; - } - } - private Set current() { - return getArtifacts().stream().map(Artifact::getFile) - .map(File::toPath) - .map(ArtifactTimestamp::new) - .collect(Collectors.toSet()); + return valid; } + public void dispose() { ClassRealm realm = getRealm(); try @@ -213,7 +181,147 @@ public void dispose() { } } - protected final Map cache = new ConcurrentHashMap<>(); + /** + * A {@link WatchService} with some methods to watch JARs associated with {@link WatchedCacheRecord}. + */ + static class MultiWatcher { + private final WatchService watchService; + + /** + * Records that have not been invalidated so far. From watched JAR paths to records (because one JAR can be + * present in multiple records) + */ + private final Map> validRecordsByPath = new ConcurrentHashMap<>(); + + /** + * {@link WatchService} can watch only directories but we actually want to watch files. So here we store + * for the given parent directory the count of its child files we watch. 
+ */ + private final Map registrationsByDir = new ConcurrentHashMap<>(); + + public MultiWatcher() { + try { + this.watchService = FileSystems.getDefault().newWatchService(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** + * Watch the JARs associated with the given {@code record} for deletions and modifications. + * + * @param record the {@link WatchedCacheRecord} to watch + */ + public void watch(WatchedCacheRecord record) { + record.getArtifacts().stream() + .map(Artifact::getFile) + .map(File::toPath) + .forEach(p -> { + validRecordsByPath.compute(p, (key, value) -> { + if (value == null) { + value = new ArrayList<>(); + } + value.add(record); + return value; + }); + final Path dir = p.getParent(); + registrationsByDir.compute(dir, (key, value) -> { + if (value == null) { + log.debug("Starting to watch path {}", key); + try { + final WatchKey watchKey = dir.register(watchService, StandardWatchEventKinds.ENTRY_DELETE, + StandardWatchEventKinds.ENTRY_MODIFY); + return new Registration(watchKey); + } catch (IOException e) { + throw new RuntimeException(e); + } + } else { + int cnt = value.count.incrementAndGet(); + log.debug("Already {} watchers for path {}", cnt, key); + return value; + } + }); + }); + } + + /** + * Stop watching the JARs associated with the given {@code record} for deletions and modifications. 
+ * + * @param record the {@link WatchedCacheRecord} to stop watching + */ + public void unwatch(WatchedCacheRecord record) { + record.getArtifacts().stream() + .map(Artifact::getFile) + .map(File::toPath) + .forEach(p -> { + final Path dir = p.getParent(); + registrationsByDir.compute(dir, (key, value) -> { + if (value == null) { + log.debug("Already unwatchers for path {}", key); + return null; + } else { + final int cnt = value.count.decrementAndGet(); + if (cnt <= 0) { + log.debug("Unwatching path {}", key); + value.watchKey.cancel(); + return null; + } else { + log.debug("Still {} watchers for path {}", cnt, key); + return value; + } + } + }); + }); + } + + /** + * Poll all events via {@link WatchKey}s available in {@link #registrationsByDir}. + */ + void processWatcherEvents() { + for (Entry entry : registrationsByDir.entrySet()) { + final Path dir = entry.getKey(); + final WatchKey watchKey = entry.getValue().watchKey; + // TODO: figure out whether watchKey can be invalid and how to handle that situation + for (WatchEvent event : watchKey.pollEvents()) { + Kind kind = event.kind(); + log.debug("Got watcher event {}", kind.name()); + if (kind == StandardWatchEventKinds.ENTRY_DELETE || kind == StandardWatchEventKinds.ENTRY_MODIFY) { + final Path path = dir.resolve((Path) event.context()); + final List records = validRecordsByPath.get(path); + log.debug("Records for path {}: {}", path, records); + if (records != null) { + synchronized(records) { + for (WatchedCacheRecord record : records) { + log.debug("Invalidating recorder of path {}", path); + record.valid = false; + unwatch(record); + } + records.clear(); + } + } + } + // TODO: Handle StandardWatchEventKinds.OVERFLOW - probably by invalidating all related records + } + } + } + + /** + * A watcher registration for a directory storing the {@link WatchKey} and the count of watchers to be able to + * tell when the {@link #watchKey} should be cancelled. 
+ */ + static class Registration { + final AtomicInteger count = new AtomicInteger(1); + final WatchKey watchKey; + public Registration(WatchKey watchKey) { + this.watchKey = watchKey; + } + } + + } + + private static final Logger log = LoggerFactory.getLogger(CliPluginRealmCache.class); + protected final Map cache = new ConcurrentHashMap<>(); + private final MultiWatcher watcher = new MultiWatcher(); public Key createKey(Plugin plugin, ClassLoader parentRealm, Map foreignImports, DependencyFilter dependencyFilter, List repositories, @@ -224,7 +332,8 @@ public Key createKey(Plugin plugin, ClassLoader parentRealm, Map pluginAr throw new IllegalStateException( "Duplicate plugin realm for plugin " + key ); } - TimestampedCacheRecord record = new TimestampedCacheRecord( pluginRealm, pluginArtifacts ); - + WatchedCacheRecord record = new WatchedCacheRecord( pluginRealm, pluginArtifacts ); + watcher.watch(record); cache.put( key, record ); return record; @@ -252,7 +361,7 @@ public CacheRecord put( Key key, ClassRealm pluginRealm, List pluginAr public void flush() { - for ( TimestampedCacheRecord record : cache.values() ) + for ( WatchedCacheRecord record : cache.values() ) { record.dispose(); }