Skip to content

Commit

Permalink
Fix closing connection on cancelling requester (#220)
Browse files Browse the repository at this point in the history
  • Loading branch information
olme04 authored Apr 13, 2022
1 parent ee9a99f commit ea22902
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,35 @@ internal suspend inline fun connect(
}

val requester = interceptors.wrapRequester(
RSocketRequester(requestContext + CoroutineName("rSocket-requester"), frameSender, streamsStorage, connection.pool)
RSocketRequester(
requestContext + CoroutineName("rSocket-requester"),
frameSender,
streamsStorage,
connection.pool
)
)
val requestHandler = interceptors.wrapResponder(
with(interceptors.wrapAcceptor(acceptor)) {
ConnectionAcceptorContext(connectionConfig, requester).accept()
}
)
val responder = RSocketResponder(requestContext + CoroutineName("rSocket-responder"), frameSender, requestHandler)
val responder = RSocketResponder(
requestContext + CoroutineName("rSocket-responder"),
frameSender,
requestHandler
)

// link completing of connection and requestHandler
connection.coroutineContext[Job]?.invokeOnCompletion { requestHandler.cancel("Connection closed", it) }
requestHandler.coroutineContext[Job]?.invokeOnCompletion { if (it != null) connection.cancel("Request handler failed", it) }
// link completing of requester, connection and requestHandler
requester.coroutineContext[Job]?.invokeOnCompletion {
connection.cancel("Requester cancelled", it)
}
requestHandler.coroutineContext[Job]?.invokeOnCompletion {
if (it != null) connection.cancel("Request handler failed", it)
}
connection.coroutineContext[Job]?.invokeOnCompletion {
requester.cancel("Connection closed", it)
requestHandler.cancel("Connection closed", it)
}

// start keepalive ticks
(connection + CoroutineName("rSocket-connection-keep-alive")).launch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,4 +404,18 @@ class RSocketRequesterTest : TestWithConnection(), TestWithLeakCheck {
fun rcTerminatedOnConnectionClose() =
streamIsTerminatedOnConnectionClose { requester.requestChannel(Payload.Empty, emptyFlow()).collect() }

@Test
fun cancelRequesterToCloseConnection() = test {
val request = requester.requestStream(Payload.Empty).produceIn(GlobalScope)
connection.test {
awaitFrame { frame ->
assertTrue(frame is RequestFrame)
}
requester.cancel() //cancel requester
expectNoEventsIn(200)
}
assertFalse(connection.isActive)
assertTrue(request.isClosedForReceive)
}

}

0 comments on commit ea22902

Please sign in to comment.