From 1fe2d644acf15d2c8e08606b3161c8def1477d50 Mon Sep 17 00:00:00 2001 From: Oleh Dokuka <5380167+OlegDokuka@users.noreply.github.com> Date: Tue, 16 Aug 2022 10:54:21 +0300 Subject: [PATCH] moves error propagation out of the synchronise to avoid deadlock (#1060) (cherry picked from commit 6426e45619ec32acef0c6c7184020776a9063ffe) --- .../java/io/rsocket/core/RSocketRequester.java | 16 +++++++++------- .../java/io/rsocket/core/RSocketResponder.java | 11 +++++++---- 2 files changed, 16 insertions(+), 11 deletions(-) diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java index c10e86d56..bf298706a 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java @@ -345,15 +345,17 @@ private void terminate(Throwable e) { requesterLeaseTracker.dispose(e); } + final Collection activeStreamsCopy; synchronized (this) { final IntObjectMap activeStreams = this.activeStreams; - final Collection activeStreamsCopy = new ArrayList<>(activeStreams.values()); - for (FrameHandler handler : activeStreamsCopy) { - if (handler != null) { - try { - handler.handleError(e); - } catch (Throwable ignored) { - } + activeStreamsCopy = new ArrayList<>(activeStreams.values()); + } + + for (FrameHandler handler : activeStreamsCopy) { + if (handler != null) { + try { + handler.handleError(e); + } catch (Throwable ignored) { } } } diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java index b2f084f51..ce4fe70a3 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java @@ -185,15 +185,18 @@ final void doOnDispose() { requestHandler.dispose(); } - private synchronized void cleanUpSendingSubscriptions() { - final IntObjectMap activeStreams = this.activeStreams; - final Collection activeStreamsCopy = new ArrayList<>(activeStreams.values()); + private void cleanUpSendingSubscriptions() { + final Collection activeStreamsCopy; + synchronized (this) { + final IntObjectMap activeStreams = this.activeStreams; + activeStreamsCopy = new ArrayList<>(activeStreams.values()); + } + for (FrameHandler handler : activeStreamsCopy) { if (handler != null) { handler.handleCancel(); } } - activeStreams.clear(); } final void handleFrame(ByteBuf frame) {