Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use read lock when subscribing to publishers… #44

Merged
merged 1 commit into from
Jan 20, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
*******************************************************************************/
package org.eclipse.sisu.inject;

import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import javax.inject.Inject;

Expand Down Expand Up @@ -48,6 +48,8 @@ public final class DefaultBeanLocator

private final Long[] typeIdHolder = new Long[1];

private final ReentrantReadWriteLock publisherLock = new ReentrantReadWriteLock();

// ----------------------------------------------------------------------
// Public methods
// ----------------------------------------------------------------------
Expand All @@ -73,64 +75,119 @@ public Iterable<BeanEntry> locate( final Key key )
return new LocatedBeans( key, bindings, isImplicit ? implicitBindings : null );
}

public synchronized void watch( final Key key, final Mediator mediator, final Object watcher )
public void watch( final Key key, final Mediator mediator, final Object watcher )
{
final WatchedBeans beans = new WatchedBeans( key, mediator, watcher );
for ( final BindingPublisher p : publishers() )
publisherLock.readLock().lock();
try
{
// subscribe new watcher to existing publishers while holding the read-lock
final WatchedBeans beans = new WatchedBeans( key, mediator, watcher );
for ( final BindingPublisher p : publishers() )
{
p.subscribe( beans );
}
synchronized ( cachedWatchers )
{
cachedWatchers.put( beans, watcher );
}
}
finally
{
p.subscribe( beans );
publisherLock.readLock().unlock();
}
cachedWatchers.put( beans, watcher );
}

public synchronized boolean add( final BindingPublisher publisher )
public boolean add( final BindingPublisher publisher )
{
if ( publishers.contains( publisher ) )
final WatchedBeans[] currentWatchers;
publisherLock.writeLock().lock();
try
{
if ( publishers.contains( publisher ) )
{
return false;
}
Logs.trace( "Add publisher: {}", publisher, null );
synchronized ( cachedBindings ) // block new lookup while we update the cache
{
final int rank = publisher.maxBindingRank();
publishers.insert( publisher, rank );
for ( final RankedBindings bindings : cachedBindings.values() )
{
bindings.add( publisher, rank );
}
}
synchronized ( cachedWatchers )
{
// capture snapshot of current watchers while we hold the write-lock
currentWatchers = cachedWatchers.keySet().toArray( new WatchedBeans[0] );
}
publisherLock.readLock().lock(); // begin downgrade to the read-lock
}
finally
{
return false;
publisherLock.writeLock().unlock();
}
Logs.trace( "Add publisher: {}", publisher, null );
synchronized ( cachedBindings ) // block new lookup while we update the cache
try
{
final int rank = publisher.maxBindingRank();
publishers.insert( publisher, rank );
for ( final RankedBindings bindings : cachedBindings.values() )
// subscribe watchers to the new publisher while holding the read-lock
for ( final WatchedBeans beans : currentWatchers )
{
bindings.add( publisher, rank );
publisher.subscribe( beans );
}
}
// take defensive copy in case publisher.subscribe has side-effect that triggers 'watch'
for ( final WatchedBeans beans : new ArrayList<WatchedBeans>( cachedWatchers.keySet() ) )
finally
{
publisher.subscribe( beans );
publisherLock.readLock().unlock();
}
return true;
}

public synchronized boolean remove( final BindingPublisher publisher )
public boolean remove( final BindingPublisher publisher )
{
final BindingPublisher oldPublisher;
synchronized ( cachedBindings ) // block new lookup while we update the cache
final WatchedBeans[] currentWatchers;
publisherLock.writeLock().lock();
try
{
oldPublisher = publishers.remove( publisher );
if ( null == oldPublisher )
synchronized ( cachedBindings ) // block new lookup while we update the cache
{
return false;
oldPublisher = publishers.remove( publisher );
if ( null == oldPublisher )
{
return false;
}
Logs.trace( "Remove publisher: {}", oldPublisher, null );
for ( final RankedBindings bindings : cachedBindings.values() )
{
bindings.remove( oldPublisher );
}
}
Logs.trace( "Remove publisher: {}", oldPublisher, null );
for ( final RankedBindings bindings : cachedBindings.values() )
synchronized ( cachedWatchers )
{
bindings.remove( oldPublisher );
// capture snapshot of current watchers while we hold the write-lock
currentWatchers = cachedWatchers.keySet().toArray( new WatchedBeans[0] );
}
publisherLock.readLock().lock(); // begin downgrade to the read-lock
}
for ( final WatchedBeans beans : cachedWatchers.keySet() )
finally
{
oldPublisher.unsubscribe( beans );
publisherLock.writeLock().unlock();
}
try
{
// unsubscribe watchers from the old publisher while holding the read-lock
for ( final WatchedBeans beans : currentWatchers )
{
oldPublisher.unsubscribe( beans );
}
}
finally
{
publisherLock.readLock().unlock();
}

// one last round of cleanup in case more was freed
( (MildConcurrentValues) cachedBindings ).compact();

return true;
}

Expand All @@ -139,7 +196,7 @@ public Iterable<BindingPublisher> publishers()
return publishers.snapshot();
}

public synchronized void clear()
public void clear()
{
for ( final BindingPublisher p : publishers() )
{
Expand Down