diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/impl/InternalChannelRegistry.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/impl/InternalChannelRegistry.java index a06929b05e..2ffe332f57 100644 --- a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/impl/InternalChannelRegistry.java +++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/impl/InternalChannelRegistry.java @@ -3,7 +3,6 @@ import static io.smallrye.reactive.messaging.providers.i18n.ProviderMessages.msg; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Flow; import java.util.stream.Collectors; @@ -18,13 +17,13 @@ @ApplicationScoped public class InternalChannelRegistry implements ChannelRegistry { - private final Map>>> publishers = new ConcurrentHashMap<>(); - private final Map>>> subscribers = new ConcurrentHashMap<>(); + private final Map>>> publishers = new HashMap<>(); + private final Map>>> subscribers = new HashMap<>(); - private final Map outgoing = new ConcurrentHashMap<>(); - private final Map incoming = new ConcurrentHashMap<>(); + private final Map outgoing = new HashMap<>(); + private final Map incoming = new HashMap<>(); - private final Map, Map> emitters = new ConcurrentHashMap<>(); + private final Map, Map> emitters = new HashMap<>(); @Override public Flow.Publisher> register(String name, @@ -37,7 +36,7 @@ public Flow.Publisher> register(String name, } @Override - public Flow.Subscriber> register(String name, + public synchronized Flow.Subscriber> register(String name, Flow.Subscriber> subscriber, boolean merge) { Objects.requireNonNull(name, msg.nameMustBeSet()); Objects.requireNonNull(subscriber, msg.subscriberMustBeSet()); @@ -47,21 +46,21 @@ public Flow.Subscriber> register(String name, } @Override - public void register(String name, Emitter emitter) { + public synchronized void register(String name, Emitter emitter) { Objects.requireNonNull(name, msg.nameMustBeSet()); Objects.requireNonNull(emitter, msg.emitterMustBeSet()); register(name, Emitter.class, emitter); } @Override - public void register(String name, MutinyEmitter emitter) { + public synchronized void register(String name, MutinyEmitter emitter) { Objects.requireNonNull(name, msg.nameMustBeSet()); Objects.requireNonNull(emitter, msg.emitterMustBeSet()); register(name, MutinyEmitter.class, emitter); } @Override - public void register(String name, Class emitterType, T emitter) { + public synchronized void register(String name, Class emitterType, T emitter) { Objects.requireNonNull(name, msg.nameMustBeSet()); Objects.requireNonNull(emitter, msg.emitterMustBeSet()); Map map = emitters.computeIfAbsent(emitterType, key -> new HashMap<>()); @@ -69,26 +68,25 @@ public void register(String name, Class emitterType, T emitter) { } @Override - public List>> getPublishers(String name) { + public synchronized List>> getPublishers(String name) { Objects.requireNonNull(name, msg.nameMustBeSet()); return publishers.getOrDefault(name, Collections.emptyList()); } @Override - public Emitter getEmitter(String name) { + public synchronized Emitter getEmitter(String name) { Objects.requireNonNull(name, msg.nameMustBeSet()); return getEmitter(name, Emitter.class); } @Override - public MutinyEmitter getMutinyEmitter(String name) { + public synchronized MutinyEmitter getMutinyEmitter(String name) { Objects.requireNonNull(name, msg.nameMustBeSet()); return getEmitter(name, MutinyEmitter.class); } - @SuppressWarnings("unchecked") @Override - public T getEmitter(String name, Class emitterType) { + public synchronized T getEmitter(String name, Class emitterType) { Objects.requireNonNull(name, msg.nameMustBeSet()); Map typedEmitters = emitters.get(emitterType); if (typedEmitters == null) { @@ -99,7 +97,7 @@ public T getEmitter(String name, Class emitterType) { } @Override - public List>> getSubscribers(String name) { + public synchronized List>> getSubscribers(String name) { Objects.requireNonNull(name, msg.nameMustBeSet()); return subscribers.getOrDefault(name, Collections.emptyList()); } @@ -110,17 +108,17 @@ private void register(Map> multimap, String name, T item) { } @Override - public Set getIncomingNames() { - return publishers.keySet(); + public synchronized Set getIncomingNames() { + return new HashSet<>(publishers.keySet()); } @Override - public Set getOutgoingNames() { - return subscribers.keySet(); + public synchronized Set getOutgoingNames() { + return new HashSet<>(subscribers.keySet()); } @Override - public Set getEmitterNames() { + public synchronized Set getEmitterNames() { return emitters.values().stream().flatMap(m -> m.keySet().stream()).collect(Collectors.toSet()); }