Skip to content

Commit

Permalink
Fix Possible Race Condition
Browse files Browse the repository at this point in the history
fixes zio#3268

Refactor 'refreshIdleTimeoutHandler' so that 'ChannelPipeline.replace' does not crash the fiber on unhandled exceptions.
  • Loading branch information
scottweaver committed Jan 9, 2025
1 parent 6d9dce1 commit 2036c22
Showing 1 changed file with 26 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -142,23 +142,31 @@ private[netty] object NettyConnectionPool {
}

/**
* Refreshes the idle timeout handler on the channel pipeline.
* Attempts to refresh the idle timeout handler on the channel pipeline.
* @return
* true if the handler was successfully refreshed prior to the channel being
* closed
* closed and the channel is still open.
*
* true if the `timeout` is None and the channel is still open.
*
* false of the channel is closed.
*/
private def refreshIdleTimeoutHandler(
channel: JChannel,
timeout: Duration,
): Boolean = {
channel
.pipeline()
.replace(
Names.ReadTimeoutHandler,
Names.ReadTimeoutHandler,
new ReadTimeoutHandler(timeout.toMillis, TimeUnit.MILLISECONDS),
)
channel.isOpen
timeout: Option[Duration],
)(implicit trace: Trace): Task[Boolean] = {
ZIO.attempt {
timeout.map { timeout =>
channel
.pipeline()
.replace(
Names.ReadTimeoutHandler,
Names.ReadTimeoutHandler,
new ReadTimeoutHandler(timeout.toMillis, TimeUnit.MILLISECONDS),
)
}
}.when(channel.isOpen)
.as(channel.isOpen)
}

private final class ReadTimeoutErrorHandler(nettyRuntime: NettyRuntime)(implicit trace: Trace)
Expand Down Expand Up @@ -256,10 +264,12 @@ private[netty] object NettyConnectionPool {
// We retry a few times hoping to obtain an open channel
// NOTE: We need to release the channel before retrying, so that it can be closed and removed from the pool
// We do that in a forked fiber so that we don't "block" the current fiber while the new resource is obtained
if (channel.isOpen && idleTimeout.fold(true)(refreshIdleTimeoutHandler(channel, _)))
ZIO.succeed(channel)
else
invalidate(channel) *> release.forkDaemon *> ZIO.fail(None)

refreshIdleTimeoutHandler(channel, idleTimeout).flatMap {
case true => ZIO.succeed(channel)
case false => invalidate(channel) *> release.forkDaemon *> ZIO.fail(None)
}

}
.retry(retrySchedule(key))
.catchAll {
Expand Down

0 comments on commit 2036c22

Please sign in to comment.