Skip to content

Commit

Permalink
Graceful Netty server shutdown in case of startup errors (#3558)
Browse files Browse the repository at this point in the history
Co-authored-by: Alexei Talankov <[email protected]>
Co-authored-by: softwaremill-ci <[email protected]>
Co-authored-by: Krzysiek Ciesielski <[email protected]>
Co-authored-by: Flavio Brasil <[email protected]>
  • Loading branch information
5 people authored Mar 20, 2024
1 parent 3880806 commit 973c1b3
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,17 @@ case class NettyCatsServer[F[_]: Async](routes: Vector[Route[F]], options: Netty
socketOverride
)

nettyChannelFutureToScala(channelFuture).map(ch =>
(
ch.localAddress().asInstanceOf[SA],
() => stop(ch, eventLoopGroup, channelGroup, eventExecutor, isShuttingDown, config.gracefulShutdownTimeout)
nettyChannelFutureToScala(channelFuture)
.map(ch =>
(
ch.localAddress().asInstanceOf[SA],
() => stop(ch, eventLoopGroup, channelGroup, eventExecutor, isShuttingDown, config.gracefulShutdownTimeout)
)
)
)
.recoverWith { case e =>
stopRecovering(eventLoopGroup, channelGroup, eventExecutor, isShuttingDown, config.gracefulShutdownTimeout)
.flatMap(_ => Async[F].raiseError(e))
}
}

private def waitForClosedChannels(
Expand All @@ -107,20 +112,41 @@ case class NettyCatsServer[F[_]: Async](routes: Vector[Route[F]], options: Netty
isShuttingDown: AtomicBoolean,
gracefulShutdownTimeout: Option[FiniteDuration]
): F[Unit] = {
shutdownChannelGroup(channelGroup, isShuttingDown, gracefulShutdownTimeout) >>
Async[F].defer {
nettyFutureToScala(ch.close()).flatMap { _ => stopEventLoopGroup(eventLoopGroup, eventExecutor) }
}
}

private def stopRecovering(
eventLoopGroup: EventLoopGroup,
channelGroup: ChannelGroup,
eventExecutor: DefaultEventExecutor,
isShuttingDown: AtomicBoolean,
gracefulShutdownTimeout: Option[FiniteDuration]
): F[Unit] = {
shutdownChannelGroup(channelGroup, isShuttingDown, gracefulShutdownTimeout) >>
stopEventLoopGroup(eventLoopGroup, eventExecutor)
}

private def shutdownChannelGroup(
channelGroup: ChannelGroup,
isShuttingDown: AtomicBoolean,
gracefulShutdownTimeout: Option[FiniteDuration]
) = {
Sync[F].delay(isShuttingDown.set(true)) >>
waitForClosedChannels(
channelGroup,
startNanos = System.nanoTime(),
gracefulShutdownTimeoutNanos = gracefulShutdownTimeout.map(_.toNanos)
) >>
Async[F].defer {
nettyFutureToScala(ch.close()).flatMap { _ =>
if (config.shutdownEventLoopGroupOnClose) {
nettyFutureToScala(eventLoopGroup.shutdownGracefully())
.flatMap(_ => nettyFutureToScala(eventExecutor.shutdownGracefully()).map(_ => ()))
} else Async[F].unit
}
}
)
}

private def stopEventLoopGroup(eventLoopGroup: EventLoopGroup, eventExecutor: DefaultEventExecutor) = {
if (config.shutdownEventLoopGroupOnClose) {
nettyFutureToScala(eventLoopGroup.shutdownGracefully())
.flatMap(_ => nettyFutureToScala(eventExecutor.shutdownGracefully()).map(_ => ()))
} else Async[F].unit
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,22 @@ case class NettyIdServer(routes: Vector[IdRoute], options: NettyIdServerOptions,
eventLoopGroup,
socketOverride
)
channelIdFuture.await()
val channelId = channelIdFuture.channel()

(
channelId.localAddress().asInstanceOf[SA],
() => stop(channelId, eventLoopGroup, channelGroup, eventExecutor, isShuttingDown, config.gracefulShutdownTimeout)
)
try {
channelIdFuture.sync()
val channelId = channelIdFuture.channel()
(
channelId.localAddress().asInstanceOf[SA],
() => stop(channelId, eventLoopGroup, channelGroup, eventExecutor, isShuttingDown, config.gracefulShutdownTimeout)
)
} catch {
case NonFatal(startFailureCause) =>
try {
stopRecovering(eventLoopGroup, channelGroup, eventExecutor, isShuttingDown, config.gracefulShutdownTimeout)
} catch {
case NonFatal(recoveryFailureCause) => startFailureCause.addSuppressed(recoveryFailureCause)
}
throw startFailureCause
}
}

private def waitForClosedChannels(
Expand All @@ -122,6 +131,7 @@ case class NettyIdServer(routes: Vector[IdRoute], options: NettyIdServerOptions,
}
val _ = channelGroup.close().get()
}

private def stop(
ch: Channel,
eventLoopGroup: EventLoopGroup,
Expand All @@ -142,6 +152,25 @@ case class NettyIdServer(routes: Vector[IdRoute], options: NettyIdServerOptions,
val _ = eventExecutor.shutdownGracefully().get()
}
}

private def stopRecovering(
eventLoopGroup: EventLoopGroup,
channelGroup: ChannelGroup,
eventExecutor: DefaultEventExecutor,
isShuttingDown: AtomicBoolean,
gracefulShutdownTimeout: Option[FiniteDuration]
): Unit = {
isShuttingDown.set(true)
waitForClosedChannels(
channelGroup,
startNanos = System.nanoTime(),
gracefulShutdownTimeoutNanos = gracefulShutdownTimeout.map(_.toNanos)
)
if (config.shutdownEventLoopGroupOnClose) {
val _ = eventLoopGroup.shutdownGracefully().get()
val _ = eventExecutor.shutdownGracefully().get()
}
}
}

object NettyIdServer {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,17 @@ case class NettyFutureServer(routes: Vector[FutureRoute], options: NettyFutureSe
socketOverride
)

nettyChannelFutureToScala(channelFuture).map(ch =>
(
ch.localAddress().asInstanceOf[SA],
() => stop(ch, eventLoopGroup, channelGroup, eventExecutor, isShuttingDown, config.gracefulShutdownTimeout)
nettyChannelFutureToScala(channelFuture)
.map(ch =>
(
ch.localAddress().asInstanceOf[SA],
() => stop(ch, eventLoopGroup, channelGroup, eventExecutor, isShuttingDown, config.gracefulShutdownTimeout)
)
)
)
.recoverWith { case e =>
stopRecovering(eventLoopGroup, channelGroup, eventExecutor, isShuttingDown, config.gracefulShutdownTimeout)
.flatMap(_ => Future.failed(e))
}
}

private def waitForClosedChannels(
Expand Down Expand Up @@ -115,14 +120,32 @@ case class NettyFutureServer(routes: Vector[FutureRoute], options: NettyFutureSe
startNanos = System.nanoTime(),
gracefulShutdownTimeoutNanos = gracefulShutdownTimeout.map(_.toNanos)
).flatMap { _ =>
nettyFutureToScala(ch.close()).flatMap { _ =>
if (config.shutdownEventLoopGroupOnClose) {
nettyFutureToScala(eventLoopGroup.shutdownGracefully())
.flatMap(_ => nettyFutureToScala(eventExecutor.shutdownGracefully()).map(_ => ()))
} else Future.successful(())
}
nettyFutureToScala(ch.close()).flatMap { _ => stopEventLoopGroup(eventLoopGroup, eventExecutor) }
}
}

private def stopRecovering(
eventLoopGroup: EventLoopGroup,
channelGroup: ChannelGroup,
eventExecutor: DefaultEventExecutor,
isShuttingDown: AtomicBoolean,
gracefulShutdownTimeout: Option[FiniteDuration]
): Future[Unit] = {
isShuttingDown.set(true)
waitForClosedChannels(
channelGroup,
startNanos = System.nanoTime(),
gracefulShutdownTimeoutNanos = gracefulShutdownTimeout.map(_.toNanos)
).flatMap { _ => stopEventLoopGroup(eventLoopGroup, eventExecutor) }
}

private def stopEventLoopGroup(eventLoopGroup: EventLoopGroup, eventExecutor: DefaultEventExecutor) = {
if (config.shutdownEventLoopGroupOnClose) {
nettyFutureToScala(eventLoopGroup.shutdownGracefully())
.flatMap(_ => nettyFutureToScala(eventExecutor.shutdownGracefully()).map(_ => ()))
} else Future.successful(())
}

}

object NettyFutureServer {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import java.net.{InetSocketAddress, SocketAddress}
import java.nio.file.{Path, Paths}
import java.util.UUID
import java.util.concurrent.atomic.AtomicBoolean

import scala.concurrent.ExecutionContext.Implicits
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
Expand Down Expand Up @@ -98,12 +97,17 @@ case class NettyZioServer[R](routes: Vector[RIO[R, Route[RIO[R, *]]]], options:
socketOverride
)
}
binding <- nettyChannelFutureToScala(channelFuture).map(ch =>
(
ch.localAddress().asInstanceOf[SA],
() => stop(ch, eventLoopGroup, channelGroup, eventExecutor, isShuttingDown, config.gracefulShutdownTimeout)
binding <- nettyChannelFutureToScala(channelFuture)
.map(ch =>
(
ch.localAddress().asInstanceOf[SA],
() => stop(ch, eventLoopGroup, channelGroup, eventExecutor, isShuttingDown, config.gracefulShutdownTimeout)
)
)
)
.catchAll { e =>
stopRecovering(eventLoopGroup, channelGroup, eventExecutor, isShuttingDown, config.gracefulShutdownTimeout) *>
ZIO.fail(e)
}
} yield binding

private def waitForClosedChannels(
Expand All @@ -126,20 +130,43 @@ case class NettyZioServer[R](routes: Vector[RIO[R, Route[RIO[R, *]]]], options:
isShuttingDown: AtomicBoolean,
gracefulShutdownTimeout: Option[FiniteDuration]
): RIO[R, Unit] = {
stopChannelGroup(channelGroup, isShuttingDown, gracefulShutdownTimeout) *>
ZIO.suspend {
nettyFutureToScala(ch.close()).flatMap { _ =>
stopEventLoopGroup(eventLoopGroup, eventExecutor)
}
}
}

private def stopRecovering(
eventLoopGroup: EventLoopGroup,
channelGroup: ChannelGroup,
eventExecutor: DefaultEventExecutor,
isShuttingDown: AtomicBoolean,
gracefulShutdownTimeout: Option[FiniteDuration]
): RIO[R, Unit] = {
stopChannelGroup(channelGroup, isShuttingDown, gracefulShutdownTimeout) *>
stopEventLoopGroup(eventLoopGroup, eventExecutor)
}

private def stopChannelGroup(
channelGroup: ChannelGroup,
isShuttingDown: AtomicBoolean,
gracefulShutdownTimeout: Option[FiniteDuration]
) = {
ZIO.attempt(isShuttingDown.set(true)) *>
waitForClosedChannels(
channelGroup,
startNanos = System.nanoTime(),
gracefulShutdownTimeoutNanos = gracefulShutdownTimeout.map(_.toNanos)
) *>
ZIO.suspend {
nettyFutureToScala(ch.close()).flatMap { _ =>
if (config.shutdownEventLoopGroupOnClose) {
nettyFutureToScala(eventLoopGroup.shutdownGracefully())
.flatMap(_ => nettyFutureToScala(eventExecutor.shutdownGracefully()).map(_ => ()))
} else ZIO.succeed(())
}
}
)
}

private def stopEventLoopGroup(eventLoopGroup: EventLoopGroup, eventExecutor: DefaultEventExecutor) = {
if (config.shutdownEventLoopGroupOnClose) {
nettyFutureToScala(eventLoopGroup.shutdownGracefully())
.flatMap(_ => nettyFutureToScala(eventExecutor.shutdownGracefully()).map(_ => ()))
} else ZIO.succeed(())
}
}

Expand Down

0 comments on commit 973c1b3

Please sign in to comment.