Skip to content

Commit

Permalink
Merge pull request #2212 from cescoffier/fix-sync-take-2
Browse files Browse the repository at this point in the history
Fix the synchronization protocol of the InternalChannelRegistry
  • Loading branch information
ozangunalp authored Jul 6, 2023
2 parents 31375e6 + 4ffe0a1 commit 119757f
Showing 1 changed file with 24 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import static io.smallrye.reactive.messaging.providers.i18n.ProviderMessages.msg;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.stream.Collectors;

Expand All @@ -17,13 +19,13 @@
@ApplicationScoped
public class InternalChannelRegistry implements ChannelRegistry {

private final Map<String, List<Flow.Publisher<? extends Message<?>>>> publishers = new HashMap<>();
private final Map<String, List<Flow.Subscriber<? extends Message<?>>>> subscribers = new HashMap<>();
private final Map<String, List<Flow.Publisher<? extends Message<?>>>> publishers = new ConcurrentHashMap<>();
private final Map<String, List<Flow.Subscriber<? extends Message<?>>>> subscribers = new ConcurrentHashMap<>();

private final Map<String, Boolean> outgoing = new HashMap<>();
private final Map<String, Boolean> incoming = new HashMap<>();
private final Map<String, Boolean> outgoing = new ConcurrentHashMap<>();
private final Map<String, Boolean> incoming = new ConcurrentHashMap<>();

private final Map<Class<?>, Map<String, Object>> emitters = new HashMap<>();
private final Map<Class<?>, Map<String, Object>> emitters = new ConcurrentHashMap<>();

@Override
public Flow.Publisher<? extends Message<?>> register(String name,
Expand All @@ -36,7 +38,7 @@ public Flow.Publisher<? extends Message<?>> register(String name,
}

@Override
public synchronized Flow.Subscriber<? extends Message<?>> register(String name,
public Flow.Subscriber<? extends Message<?>> register(String name,
Flow.Subscriber<? extends Message<?>> subscriber, boolean merge) {
Objects.requireNonNull(name, msg.nameMustBeSet());
Objects.requireNonNull(subscriber, msg.subscriberMustBeSet());
Expand All @@ -46,47 +48,48 @@ public synchronized Flow.Subscriber<? extends Message<?>> register(String name,
}

@Override
public synchronized void register(String name, Emitter<?> emitter) {
public void register(String name, Emitter<?> emitter) {
Objects.requireNonNull(name, msg.nameMustBeSet());
Objects.requireNonNull(emitter, msg.emitterMustBeSet());
register(name, Emitter.class, emitter);
}

@Override
public synchronized void register(String name, MutinyEmitter<?> emitter) {
public void register(String name, MutinyEmitter<?> emitter) {
Objects.requireNonNull(name, msg.nameMustBeSet());
Objects.requireNonNull(emitter, msg.emitterMustBeSet());
register(name, MutinyEmitter.class, emitter);
}

@Override
public synchronized <T> void register(String name, Class<T> emitterType, T emitter) {
public <T> void register(String name, Class<T> emitterType, T emitter) {
Objects.requireNonNull(name, msg.nameMustBeSet());
Objects.requireNonNull(emitter, msg.emitterMustBeSet());
Map<String, Object> map = emitters.computeIfAbsent(emitterType, key -> new HashMap<>());
Map<String, Object> map = emitters.computeIfAbsent(emitterType, key -> new ConcurrentHashMap<>());
map.put(name, emitter);
}

@Override
public synchronized List<Flow.Publisher<? extends Message<?>>> getPublishers(String name) {
public List<Flow.Publisher<? extends Message<?>>> getPublishers(String name) {
Objects.requireNonNull(name, msg.nameMustBeSet());
return publishers.getOrDefault(name, Collections.emptyList());
}

@Override
public synchronized Emitter<?> getEmitter(String name) {
public Emitter<?> getEmitter(String name) {
Objects.requireNonNull(name, msg.nameMustBeSet());
return getEmitter(name, Emitter.class);
}

@Override
public synchronized MutinyEmitter<?> getMutinyEmitter(String name) {
public MutinyEmitter<?> getMutinyEmitter(String name) {
Objects.requireNonNull(name, msg.nameMustBeSet());
return getEmitter(name, MutinyEmitter.class);
}

@SuppressWarnings("unchecked")
@Override
public synchronized <T> T getEmitter(String name, Class<? super T> emitterType) {
public <T> T getEmitter(String name, Class<? super T> emitterType) {
Objects.requireNonNull(name, msg.nameMustBeSet());
Map<String, Object> typedEmitters = emitters.get(emitterType);
if (typedEmitters == null) {
Expand All @@ -97,28 +100,28 @@ public synchronized <T> T getEmitter(String name, Class<? super T> emitterType)
}

@Override
public synchronized List<Flow.Subscriber<? extends Message<?>>> getSubscribers(String name) {
public List<Flow.Subscriber<? extends Message<?>>> getSubscribers(String name) {
Objects.requireNonNull(name, msg.nameMustBeSet());
return subscribers.getOrDefault(name, Collections.emptyList());
}

private <T> void register(Map<String, List<T>> multimap, String name, T item) {
List<T> list = multimap.computeIfAbsent(name, key -> new ArrayList<>());
List<T> list = multimap.computeIfAbsent(name, key -> new CopyOnWriteArrayList<>());
list.add(item);
}

@Override
public synchronized Set<String> getIncomingNames() {
return new HashSet<>(publishers.keySet());
public Set<String> getIncomingNames() {
return publishers.keySet();
}

@Override
public synchronized Set<String> getOutgoingNames() {
return new HashSet<>(subscribers.keySet());
public Set<String> getOutgoingNames() {
return subscribers.keySet();
}

@Override
public synchronized Set<String> getEmitterNames() {
public Set<String> getEmitterNames() {
return emitters.values().stream().flatMap(m -> m.keySet().stream()).collect(Collectors.toSet());
}

Expand Down

0 comments on commit 119757f

Please sign in to comment.