diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/Connect.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/Connect.kt index baf9b0a0..8a86a837 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/Connect.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/Connect.kt @@ -45,18 +45,35 @@ internal suspend inline fun connect( } val requester = interceptors.wrapRequester( - RSocketRequester(requestContext + CoroutineName("rSocket-requester"), frameSender, streamsStorage, connection.pool) + RSocketRequester( + requestContext + CoroutineName("rSocket-requester"), + frameSender, + streamsStorage, + connection.pool + ) ) val requestHandler = interceptors.wrapResponder( with(interceptors.wrapAcceptor(acceptor)) { ConnectionAcceptorContext(connectionConfig, requester).accept() } ) - val responder = RSocketResponder(requestContext + CoroutineName("rSocket-responder"), frameSender, requestHandler) + val responder = RSocketResponder( + requestContext + CoroutineName("rSocket-responder"), + frameSender, + requestHandler + ) - // link completing of connection and requestHandler - connection.coroutineContext[Job]?.invokeOnCompletion { requestHandler.cancel("Connection closed", it) } - requestHandler.coroutineContext[Job]?.invokeOnCompletion { if (it != null) connection.cancel("Request handler failed", it) } + // link completing of requester, connection and requestHandler + requester.coroutineContext[Job]?.invokeOnCompletion { + connection.cancel("Requester cancelled", it) + } + requestHandler.coroutineContext[Job]?.invokeOnCompletion { + if (it != null) connection.cancel("Request handler failed", it) + } + connection.coroutineContext[Job]?.invokeOnCompletion { + requester.cancel("Connection closed", it) + requestHandler.cancel("Connection closed", it) + } // start keepalive ticks (connection + CoroutineName("rSocket-connection-keep-alive")).launch { diff --git a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/RSocketRequesterTest.kt b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/RSocketRequesterTest.kt index 5c51beae..03b1e6eb 100644 --- a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/RSocketRequesterTest.kt +++ b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/RSocketRequesterTest.kt @@ -404,4 +404,18 @@ class RSocketRequesterTest : TestWithConnection(), TestWithLeakCheck { fun rcTerminatedOnConnectionClose() = streamIsTerminatedOnConnectionClose { requester.requestChannel(Payload.Empty, emptyFlow()).collect() } + @Test + fun cancelRequesterToCloseConnection() = test { + val request = requester.requestStream(Payload.Empty).produceIn(GlobalScope) + connection.test { + awaitFrame { frame -> + assertTrue(frame is RequestFrame) + } + requester.cancel() //cancel requester + expectNoEventsIn(200) + } + assertFalse(connection.isActive) + assertTrue(request.isClosedForReceive) + } + }