Skip to content

Commit

Permalink
Fix apache#33 Maven mojo change ignored
Browse files Browse the repository at this point in the history
  • Loading branch information
ppalaga committed Jun 29, 2020
1 parent ccf4f93 commit d1892e1
Show file tree
Hide file tree
Showing 2 changed files with 168 additions and 56 deletions.
3 changes: 3 additions & 0 deletions daemon/src/main/java/org/apache/maven/cli/DaemonMavenCli.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.apache.maven.extension.internal.CoreExtensionEntry;
import org.apache.maven.lifecycle.LifecycleExecutionException;
import org.apache.maven.model.building.ModelProcessor;
import org.apache.maven.plugin.PluginRealmCache;
import org.apache.maven.project.MavenProject;
import org.apache.maven.properties.internal.EnvironmentUtils;
import org.apache.maven.properties.internal.SystemProperties;
Expand All @@ -90,6 +91,7 @@
import org.codehaus.plexus.util.StringUtils;
import org.eclipse.aether.transfer.TransferListener;
import org.jboss.fuse.mvnd.logging.smart.AbstractLoggingSpy;
import org.jboss.fuse.mvnd.plugin.CliPluginRealmCache;
import org.slf4j.ILoggerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -481,6 +483,7 @@ protected void configure()
{
bind( ILoggerFactory.class ).toInstance( slf4jLoggerFactory );
bind( CoreExports.class ).toInstance( exports );
bind( PluginRealmCache.class ).toInstance(new CliPluginRealmCache());
}
} );

Expand Down
221 changes: 165 additions & 56 deletions daemon/src/main/java/org/jboss/fuse/mvnd/plugin/CliPluginRealmCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,21 @@

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;
import java.nio.file.attribute.FileTime;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchEvent.Kind;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.concurrent.atomic.AtomicInteger;

import javax.enterprise.inject.Default;
import javax.inject.Named;
Expand All @@ -47,6 +50,8 @@
import org.eclipse.aether.repository.LocalRepository;
import org.eclipse.aether.repository.RemoteRepository;
import org.eclipse.aether.repository.WorkspaceRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Default PluginCache implementation. Assumes cached data does not change.
Expand Down Expand Up @@ -142,64 +147,27 @@ public boolean equals( Object o )

CacheKey that = (CacheKey) o;

return parentRealm == that.parentRealm
return parentRealm == that.parentRealm
&& CliCacheUtils.pluginEquals( plugin, that.plugin )
&& Objects.equals( workspace, that.workspace )
&& Objects.equals( localRepo, that.localRepo )
&& RepositoryUtils.repositoriesEquals( this.repositories, that.repositories )
&& RepositoryUtils.repositoriesEquals( this.repositories, that.repositories )
&& Objects.equals( filter, that.filter )
&& Objects.equals( foreignImports, that.foreignImports );
}
}

protected static class TimestampedCacheRecord extends CacheRecord {

static class ArtifactTimestamp {
final Path path;
final FileTime lastModifiedTime;
final Object fileKey;
ArtifactTimestamp(Path path) {
this.path = path;
try {
BasicFileAttributes attrs = Files.readAttributes(path, BasicFileAttributes.class);
this.lastModifiedTime = attrs.lastModifiedTime();
this.fileKey = attrs.fileKey();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ArtifactTimestamp that = (ArtifactTimestamp) o;
return path.equals(that.path) &&
Objects.equals(lastModifiedTime, that.lastModifiedTime) &&
Objects.equals(fileKey, that.fileKey);
}
@Override
public int hashCode() {
return Objects.hash(path, lastModifiedTime, fileKey);
}
}
Set<ArtifactTimestamp> timestamp;
public TimestampedCacheRecord(ClassRealm realm, List<Artifact> artifacts) {
protected static class WatchedCacheRecord extends CacheRecord {

private volatile boolean valid = true;
public WatchedCacheRecord(ClassRealm realm, List<Artifact> artifacts) {
super(realm, artifacts);
timestamp = current();
}

public boolean isValid() {
try {
return Objects.equals(current(), timestamp);
} catch (Exception e) {
return false;
}
}
private Set<ArtifactTimestamp> current() {
return getArtifacts().stream().map(Artifact::getFile)
.map(File::toPath)
.map(ArtifactTimestamp::new)
.collect(Collectors.toSet());
return valid;
}

public void dispose() {
ClassRealm realm = getRealm();
try
Expand All @@ -213,7 +181,147 @@ public void dispose() {
}
}

protected final Map<Key, TimestampedCacheRecord> cache = new ConcurrentHashMap<>();
/**
* A {@link WatchService} with some methods to watch JARs associated with {@link WatchedCacheRecord}.
*/
static class MultiWatcher {
private final WatchService watchService;

/**
* Records that have no been invalidated so far. From watched JAR paths to records (because one JAR can be
* present in multiple records)
*/
private final Map<Path, List<WatchedCacheRecord>> validRecordsByPath = new ConcurrentHashMap<>();

/**
* {@link WatchService} can watch only directories but we actually want to watch files. So here we store
* for the given parent directory the count of its child files we watch.
*/
private final Map<Path, Registration> registrationsByDir = new ConcurrentHashMap<>();

public MultiWatcher() {
try {
this.watchService = FileSystems.getDefault().newWatchService();
} catch (IOException e) {
throw new RuntimeException(e);
}
}

/**
* Watch the JARs associated with the given {@code record} for deletions and modifications.
*
* @param record the {@link WatchedCacheRecord} to watch
*/
public void watch(WatchedCacheRecord record) {
record.getArtifacts().stream()
.map(Artifact::getFile)
.map(File::toPath)
.forEach(p -> {
validRecordsByPath.compute(p, (key, value) -> {
if (value == null) {
value = new ArrayList<>();
}
value.add(record);
return value;
});
final Path dir = p.getParent();
registrationsByDir.compute(dir, (key, value) -> {
if (value == null) {
log.debug("Starting to watch path {}", key);
try {
final WatchKey watchKey = dir.register(watchService, StandardWatchEventKinds.ENTRY_DELETE,
StandardWatchEventKinds.ENTRY_MODIFY);
return new Registration(watchKey);
} catch (IOException e) {
throw new RuntimeException(e);
}
} else {
int cnt = value.count.incrementAndGet();
log.debug("Already {} watchers for path {}", cnt, key);
return value;
}
});
});
}

/**
* Stopn watching the JARs associated with the given {@code record} for deletions and modifications.
*
* @param record the {@link WatchedCacheRecord} to stop watching
*/
public void unwatch(WatchedCacheRecord record) {
record.getArtifacts().stream()
.map(Artifact::getFile)
.map(File::toPath)
.forEach(p -> {
final Path dir = p.getParent();
registrationsByDir.compute(dir, (key, value) -> {
if (value == null) {
log.debug("Already unwatchers for path {}", key);
return null;
} else {
final int cnt = value.count.decrementAndGet();
if (cnt <= 0) {
log.debug("Unwatching path {}", key);
value.watchKey.cancel();
return null;
} else {
log.debug("Still {} watchers for path {}", cnt, key);
return value;
}
}
});
});
}

/**
* Poll all events via {@link WatchKey}s available in {@link #registrationsByDir}.
*/
void processWatcherEvents() {
for (Entry<Path, Registration> entry : registrationsByDir.entrySet()) {
final Path dir = entry.getKey();
final WatchKey watchKey = entry.getValue().watchKey;
// TODO: figure out whether watchKey can be invalid and how to handle that situation
for (WatchEvent<?> event : watchKey.pollEvents()) {
Kind<?> kind = event.kind();
log.debug("Got watcher event {}", kind.name());
if (kind == StandardWatchEventKinds.ENTRY_DELETE || kind == StandardWatchEventKinds.ENTRY_MODIFY) {
final Path path = dir.resolve((Path) event.context());
final List<WatchedCacheRecord> records = validRecordsByPath.get(path);
log.debug("Records for path {}: {}", path, records);
if (records != null) {
synchronized(records) {
for (WatchedCacheRecord record : records) {
log.debug("Invalidating recorder of path {}", path);
record.valid = false;
unwatch(record);
}
records.clear();
}
}
}
// TODO: Handle StandardWatchEventKinds.OVERFLOW - probably by invalidating all related records
}
}
}

/**
* A watcher registration for a directory storing the {@link WatchKey} and the count of watchers to be able to
* tell when the {@link #watchKey} should be cancelled.
*/
static class Registration {
final AtomicInteger count = new AtomicInteger(1);
final WatchKey watchKey;
public Registration(WatchKey watchKey) {
this.watchKey = watchKey;
}
}

}

private static final Logger log = LoggerFactory.getLogger(CliPluginRealmCache.class);
protected final Map<Key, WatchedCacheRecord> cache = new ConcurrentHashMap<>();
private final MultiWatcher watcher = new MultiWatcher();

public Key createKey(Plugin plugin, ClassLoader parentRealm, Map<String, ClassLoader> foreignImports,
DependencyFilter dependencyFilter, List<RemoteRepository> repositories,
Expand All @@ -224,7 +332,8 @@ public Key createKey(Plugin plugin, ClassLoader parentRealm, Map<String, ClassLo

public CacheRecord get( Key key )
{
TimestampedCacheRecord record = cache.get( key );
watcher.processWatcherEvents();
WatchedCacheRecord record = cache.get( key );
if (record != null && !record.isValid()) {
record.dispose();
record = null;
Expand All @@ -243,16 +352,16 @@ public CacheRecord put( Key key, ClassRealm pluginRealm, List<Artifact> pluginAr
throw new IllegalStateException( "Duplicate plugin realm for plugin " + key );
}

TimestampedCacheRecord record = new TimestampedCacheRecord( pluginRealm, pluginArtifacts );

WatchedCacheRecord record = new WatchedCacheRecord( pluginRealm, pluginArtifacts );
watcher.watch(record);
cache.put( key, record );

return record;
}

public void flush()
{
for ( TimestampedCacheRecord record : cache.values() )
for ( WatchedCacheRecord record : cache.values() )
{
record.dispose();
}
Expand Down

0 comments on commit d1892e1

Please sign in to comment.