Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Graceful Netty server shutdown in case of startup errors #3558

Merged
merged 23 commits into from
Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
035cda1
Graceful Netty shutdown in case of startup errors
atalankov Mar 1, 2024
04c96a7
Graceful Netty shutdown in case of startup errors
goawash Mar 1, 2024
be8da97
Merge branch 'netty-binding-error-shutdown' of https://github.com/goa…
goawash Mar 4, 2024
135c76c
Merge branch 'master' into netty-binding-error-shutdown
goawash Mar 4, 2024
460cad2
Update scala-library, scala-reflect to 2.13.13 (#3537)
softwaremill-ci Feb 28, 2024
17577f6
Update logback-classic to 1.5.1 (#3541)
softwaremill-ci Feb 29, 2024
18775ca
Update aws-lambda-java-runtime-interface-client to 2.4.2 (#3542)
softwaremill-ci Feb 29, 2024
d193826
Make some Endpoint method lazy vals (#3551)
kciesielski Feb 29, 2024
32fa9a4
Optimize Http4sServerRequest.uri (#3543)
kciesielski Feb 29, 2024
53d9844
avoid allocation in Validator.All.contramap (#3555)
fwbrasil Feb 29, 2024
68700a7
Update scala3-library, ... to 3.3.3 (#3556)
softwaremill-ci Mar 1, 2024
470f462
Update sbt-projectmatrix to 0.10.0 (#3559)
softwaremill-ci Mar 2, 2024
4229466
Update play, play-netty-server, ... to 3.0.2 (#3562)
softwaremill-ci Mar 2, 2024
4e27233
Update jsoniter-scala-core, ... to 2.28.3 (#3560)
softwaremill-ci Mar 2, 2024
01611c5
Update play, play-akka-http-server, ... to 2.9.2 (#3561)
softwaremill-ci Mar 2, 2024
8c295e6
Update logback-classic to 1.5.2 (#3563)
softwaremill-ci Mar 3, 2024
e8b6c87
Update iron to 2.5.0 (#3564)
softwaremill-ci Mar 4, 2024
b9eaa2b
Merge branch 'netty-binding-error-shutdown' of https://github.com/goa…
goawash Mar 4, 2024
c854adc
added for other netty-based servers
goawash Mar 4, 2024
476f5cd
Formatting
goawash Mar 13, 2024
c75be38
Hook the recovery logic
kciesielski Mar 20, 2024
f29622e
Merge branch 'master' into netty-binding-error-shutdown
kciesielski Mar 20, 2024
dfa4f02
Use sync instead of await to actually get an exception on failure
kciesielski Mar 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,17 @@ case class NettyCatsServer[F[_]: Async](routes: Vector[Route[F]], options: Netty
socketOverride
)

nettyChannelFutureToScala(channelFuture).map(ch =>
(
ch.localAddress().asInstanceOf[SA],
() => stop(ch, eventLoopGroup, channelGroup, eventExecutor, isShuttingDown, config.gracefulShutdownTimeout)
nettyChannelFutureToScala(channelFuture)
.map(ch =>
(
ch.localAddress().asInstanceOf[SA],
() => stop(ch, eventLoopGroup, channelGroup, eventExecutor, isShuttingDown, config.gracefulShutdownTimeout)
)
)
)
.recoverWith { case e =>
stopRecovering(eventLoopGroup, channelGroup, eventExecutor, isShuttingDown, config.gracefulShutdownTimeout)
.flatMap(_ => Async[F].raiseError(e))
}
}

private def waitForClosedChannels(
Expand All @@ -107,20 +112,41 @@ case class NettyCatsServer[F[_]: Async](routes: Vector[Route[F]], options: Netty
isShuttingDown: AtomicBoolean,
gracefulShutdownTimeout: Option[FiniteDuration]
): F[Unit] = {
shutdownChannelGroup(channelGroup, isShuttingDown, gracefulShutdownTimeout) >>
Async[F].defer {
nettyFutureToScala(ch.close()).flatMap { _ => stopEventLoopGroup(eventLoopGroup, eventExecutor) }
}
}

private def stopRecovering(
eventLoopGroup: EventLoopGroup,
channelGroup: ChannelGroup,
eventExecutor: DefaultEventExecutor,
isShuttingDown: AtomicBoolean,
gracefulShutdownTimeout: Option[FiniteDuration]
): F[Unit] = {
shutdownChannelGroup(channelGroup, isShuttingDown, gracefulShutdownTimeout) >>
stopEventLoopGroup(eventLoopGroup, eventExecutor)
}

private def shutdownChannelGroup(
channelGroup: ChannelGroup,
isShuttingDown: AtomicBoolean,
gracefulShutdownTimeout: Option[FiniteDuration]
) = {
Sync[F].delay(isShuttingDown.set(true)) >>
waitForClosedChannels(
channelGroup,
startNanos = System.nanoTime(),
gracefulShutdownTimeoutNanos = gracefulShutdownTimeout.map(_.toNanos)
) >>
Async[F].defer {
nettyFutureToScala(ch.close()).flatMap { _ =>
if (config.shutdownEventLoopGroupOnClose) {
nettyFutureToScala(eventLoopGroup.shutdownGracefully())
.flatMap(_ => nettyFutureToScala(eventExecutor.shutdownGracefully()).map(_ => ()))
} else Async[F].unit
}
}
)
}

private def stopEventLoopGroup(eventLoopGroup: EventLoopGroup, eventExecutor: DefaultEventExecutor) = {
if (config.shutdownEventLoopGroupOnClose) {
nettyFutureToScala(eventLoopGroup.shutdownGracefully())
.flatMap(_ => nettyFutureToScala(eventExecutor.shutdownGracefully()).map(_ => ()))
} else Async[F].unit
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,22 @@ case class NettyIdServer(routes: Vector[IdRoute], options: NettyIdServerOptions,
eventLoopGroup,
socketOverride
)
channelIdFuture.await()
val channelId = channelIdFuture.channel()

(
channelId.localAddress().asInstanceOf[SA],
() => stop(channelId, eventLoopGroup, channelGroup, eventExecutor, isShuttingDown, config.gracefulShutdownTimeout)
)
try {
channelIdFuture.sync()
val channelId = channelIdFuture.channel()
(
channelId.localAddress().asInstanceOf[SA],
() => stop(channelId, eventLoopGroup, channelGroup, eventExecutor, isShuttingDown, config.gracefulShutdownTimeout)
)
} catch {
case NonFatal(startFailureCause) =>
try {
stopRecovering(eventLoopGroup, channelGroup, eventExecutor, isShuttingDown, config.gracefulShutdownTimeout)
} catch {
case NonFatal(recoveryFailureCause) => startFailureCause.addSuppressed(recoveryFailureCause)
}
throw startFailureCause
}
}

private def waitForClosedChannels(
Expand All @@ -122,6 +131,7 @@ case class NettyIdServer(routes: Vector[IdRoute], options: NettyIdServerOptions,
}
val _ = channelGroup.close().get()
}

private def stop(
ch: Channel,
eventLoopGroup: EventLoopGroup,
Expand All @@ -142,6 +152,25 @@ case class NettyIdServer(routes: Vector[IdRoute], options: NettyIdServerOptions,
val _ = eventExecutor.shutdownGracefully().get()
}
}

private def stopRecovering(
kciesielski marked this conversation as resolved.
Show resolved Hide resolved
eventLoopGroup: EventLoopGroup,
channelGroup: ChannelGroup,
eventExecutor: DefaultEventExecutor,
isShuttingDown: AtomicBoolean,
gracefulShutdownTimeout: Option[FiniteDuration]
): Unit = {
isShuttingDown.set(true)
waitForClosedChannels(
channelGroup,
startNanos = System.nanoTime(),
gracefulShutdownTimeoutNanos = gracefulShutdownTimeout.map(_.toNanos)
)
if (config.shutdownEventLoopGroupOnClose) {
val _ = eventLoopGroup.shutdownGracefully().get()
val _ = eventExecutor.shutdownGracefully().get()
}
}
}

object NettyIdServer {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,17 @@ case class NettyFutureServer(routes: Vector[FutureRoute], options: NettyFutureSe
socketOverride
)

nettyChannelFutureToScala(channelFuture).map(ch =>
(
ch.localAddress().asInstanceOf[SA],
() => stop(ch, eventLoopGroup, channelGroup, eventExecutor, isShuttingDown, config.gracefulShutdownTimeout)
nettyChannelFutureToScala(channelFuture)
.map(ch =>
(
ch.localAddress().asInstanceOf[SA],
() => stop(ch, eventLoopGroup, channelGroup, eventExecutor, isShuttingDown, config.gracefulShutdownTimeout)
)
)
)
.recoverWith { case e =>
stopRecovering(eventLoopGroup, channelGroup, eventExecutor, isShuttingDown, config.gracefulShutdownTimeout)
.flatMap(_ => Future.failed(e))
}
}

private def waitForClosedChannels(
Expand Down Expand Up @@ -115,14 +120,32 @@ case class NettyFutureServer(routes: Vector[FutureRoute], options: NettyFutureSe
startNanos = System.nanoTime(),
gracefulShutdownTimeoutNanos = gracefulShutdownTimeout.map(_.toNanos)
).flatMap { _ =>
nettyFutureToScala(ch.close()).flatMap { _ =>
if (config.shutdownEventLoopGroupOnClose) {
nettyFutureToScala(eventLoopGroup.shutdownGracefully())
.flatMap(_ => nettyFutureToScala(eventExecutor.shutdownGracefully()).map(_ => ()))
} else Future.successful(())
}
nettyFutureToScala(ch.close()).flatMap { _ => stopEventLoopGroup(eventLoopGroup, eventExecutor) }
}
}

private def stopRecovering(
eventLoopGroup: EventLoopGroup,
channelGroup: ChannelGroup,
eventExecutor: DefaultEventExecutor,
isShuttingDown: AtomicBoolean,
gracefulShutdownTimeout: Option[FiniteDuration]
): Future[Unit] = {
isShuttingDown.set(true)
waitForClosedChannels(
channelGroup,
startNanos = System.nanoTime(),
gracefulShutdownTimeoutNanos = gracefulShutdownTimeout.map(_.toNanos)
).flatMap { _ => stopEventLoopGroup(eventLoopGroup, eventExecutor) }
}

private def stopEventLoopGroup(eventLoopGroup: EventLoopGroup, eventExecutor: DefaultEventExecutor) = {
if (config.shutdownEventLoopGroupOnClose) {
nettyFutureToScala(eventLoopGroup.shutdownGracefully())
.flatMap(_ => nettyFutureToScala(eventExecutor.shutdownGracefully()).map(_ => ()))
} else Future.successful(())
}

}

object NettyFutureServer {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import java.net.{InetSocketAddress, SocketAddress}
import java.nio.file.{Path, Paths}
import java.util.UUID
import java.util.concurrent.atomic.AtomicBoolean

import scala.concurrent.ExecutionContext.Implicits
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
Expand Down Expand Up @@ -98,12 +97,17 @@ case class NettyZioServer[R](routes: Vector[RIO[R, Route[RIO[R, *]]]], options:
socketOverride
)
}
binding <- nettyChannelFutureToScala(channelFuture).map(ch =>
(
ch.localAddress().asInstanceOf[SA],
() => stop(ch, eventLoopGroup, channelGroup, eventExecutor, isShuttingDown, config.gracefulShutdownTimeout)
binding <- nettyChannelFutureToScala(channelFuture)
.map(ch =>
(
ch.localAddress().asInstanceOf[SA],
() => stop(ch, eventLoopGroup, channelGroup, eventExecutor, isShuttingDown, config.gracefulShutdownTimeout)
)
)
)
.catchAll { e =>
stopRecovering(eventLoopGroup, channelGroup, eventExecutor, isShuttingDown, config.gracefulShutdownTimeout) *>
ZIO.fail(e)
}
} yield binding

private def waitForClosedChannels(
Expand All @@ -126,20 +130,43 @@ case class NettyZioServer[R](routes: Vector[RIO[R, Route[RIO[R, *]]]], options:
isShuttingDown: AtomicBoolean,
gracefulShutdownTimeout: Option[FiniteDuration]
): RIO[R, Unit] = {
stopChannelGroup(channelGroup, isShuttingDown, gracefulShutdownTimeout) *>
ZIO.suspend {
nettyFutureToScala(ch.close()).flatMap { _ =>
stopEventLoopGroup(eventLoopGroup, eventExecutor)
}
}
}

private def stopRecovering(
eventLoopGroup: EventLoopGroup,
channelGroup: ChannelGroup,
eventExecutor: DefaultEventExecutor,
isShuttingDown: AtomicBoolean,
gracefulShutdownTimeout: Option[FiniteDuration]
): RIO[R, Unit] = {
stopChannelGroup(channelGroup, isShuttingDown, gracefulShutdownTimeout) *>
stopEventLoopGroup(eventLoopGroup, eventExecutor)
}

private def stopChannelGroup(
channelGroup: ChannelGroup,
isShuttingDown: AtomicBoolean,
gracefulShutdownTimeout: Option[FiniteDuration]
) = {
ZIO.attempt(isShuttingDown.set(true)) *>
waitForClosedChannels(
channelGroup,
startNanos = System.nanoTime(),
gracefulShutdownTimeoutNanos = gracefulShutdownTimeout.map(_.toNanos)
) *>
ZIO.suspend {
nettyFutureToScala(ch.close()).flatMap { _ =>
if (config.shutdownEventLoopGroupOnClose) {
nettyFutureToScala(eventLoopGroup.shutdownGracefully())
.flatMap(_ => nettyFutureToScala(eventExecutor.shutdownGracefully()).map(_ => ()))
} else ZIO.succeed(())
}
}
)
}

private def stopEventLoopGroup(eventLoopGroup: EventLoopGroup, eventExecutor: DefaultEventExecutor) = {
if (config.shutdownEventLoopGroupOnClose) {
nettyFutureToScala(eventLoopGroup.shutdownGracefully())
.flatMap(_ => nettyFutureToScala(eventExecutor.shutdownGracefully()).map(_ => ()))
} else ZIO.succeed(())
}
}

Expand Down
Loading