diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java index ae9bf6e97..d1a37e7e8 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java @@ -50,6 +50,8 @@ import io.rsocket.keepalive.KeepAliveSupport; import io.rsocket.lease.RequesterLeaseHandler; import java.nio.channels.ClosedChannelException; +import java.util.ArrayList; +import java.util.Collection; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.Consumer; @@ -772,20 +774,26 @@ private void terminate(Throwable e) { leaseHandler.dispose(); // Iterate explicitly to handle collisions with concurrent removals - for (IntObjectMap.PrimitiveEntry> entry : receivers.entries()) { + final IntObjectMap> receivers = this.receivers; + // copy to avoid collection modification from the foreach loop + final Collection> receiversCopy = + new ArrayList<>(receivers.values()); + for (Processor handler : receiversCopy) { try { - entry.value().onError(e); + handler.onError(e); } catch (Throwable ex) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("Dropped exception", ex); } } } - // Iterate explicitly to handle collisions with concurrent removals - for (IntObjectMap.PrimitiveEntry entry : senders.entries()) { + final IntObjectMap senders = this.senders; + // copy to avoid collection modification from the foreach loop + final Collection sendersCopy = new ArrayList<>(senders.values()); + for (Subscription subscription : sendersCopy) { try { - entry.value().cancel(); + subscription.cancel(); } catch (Throwable ex) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("Dropped exception", ex); diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java index 54f339c12..edb01ba16 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java @@ -40,6 +40,8 @@ import io.rsocket.internal.UnboundedProcessor; import io.rsocket.lease.ResponderLeaseHandler; import java.nio.channels.ClosedChannelException; +import java.util.ArrayList; +import java.util.Collection; import java.util.concurrent.CancellationException; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.Consumer; @@ -264,9 +266,12 @@ private void cleanup(Throwable e) { private synchronized void cleanUpSendingSubscriptions() { // Iterate explicitly to handle collisions with concurrent removals - for (IntObjectMap.PrimitiveEntry entry : sendingSubscriptions.entries()) { + final IntObjectMap sendingSubscriptions = this.sendingSubscriptions; + final Collection sendingSubscriptionsCopy = + new ArrayList<>(sendingSubscriptions.values()); + for (Subscription subscription : sendingSubscriptionsCopy) { try { - entry.value().cancel(); + subscription.cancel(); } catch (Throwable ex) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("Dropped exception", ex);