diff --git a/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/NettyCatsServer.scala b/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/NettyCatsServer.scala index 13a18f0d38..1b7a449326 100644 --- a/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/NettyCatsServer.scala +++ b/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/NettyCatsServer.scala @@ -79,12 +79,17 @@ case class NettyCatsServer[F[_]: Async](routes: Vector[Route[F]], options: Netty socketOverride ) - nettyChannelFutureToScala(channelFuture).map(ch => - ( - ch.localAddress().asInstanceOf[SA], - () => stop(ch, eventLoopGroup, channelGroup, eventExecutor, isShuttingDown, config.gracefulShutdownTimeout) + nettyChannelFutureToScala(channelFuture) + .map(ch => + ( + ch.localAddress().asInstanceOf[SA], + () => stop(ch, eventLoopGroup, channelGroup, eventExecutor, isShuttingDown, config.gracefulShutdownTimeout) + ) ) - ) + .recoverWith { case e => + stopRecovering(eventLoopGroup, channelGroup, eventExecutor, isShuttingDown, config.gracefulShutdownTimeout) + .flatMap(_ => Async[F].raiseError(e)) + } } private def waitForClosedChannels( @@ -107,20 +112,41 @@ case class NettyCatsServer[F[_]: Async](routes: Vector[Route[F]], options: Netty isShuttingDown: AtomicBoolean, gracefulShutdownTimeout: Option[FiniteDuration] ): F[Unit] = { + shutdownChannelGroup(channelGroup, isShuttingDown, gracefulShutdownTimeout) >> + Async[F].defer { + nettyFutureToScala(ch.close()).flatMap { _ => stopEventLoopGroup(eventLoopGroup, eventExecutor) } + } + } + + private def stopRecovering( + eventLoopGroup: EventLoopGroup, + channelGroup: ChannelGroup, + eventExecutor: DefaultEventExecutor, + isShuttingDown: AtomicBoolean, + gracefulShutdownTimeout: Option[FiniteDuration] + ): F[Unit] = { + shutdownChannelGroup(channelGroup, isShuttingDown, gracefulShutdownTimeout) >> + stopEventLoopGroup(eventLoopGroup, eventExecutor) + } + + private def shutdownChannelGroup( + channelGroup: ChannelGroup, + isShuttingDown: AtomicBoolean, + gracefulShutdownTimeout: Option[FiniteDuration] + ) = { Sync[F].delay(isShuttingDown.set(true)) >> waitForClosedChannels( channelGroup, startNanos = System.nanoTime(), gracefulShutdownTimeoutNanos = gracefulShutdownTimeout.map(_.toNanos) - ) >> - Async[F].defer { - nettyFutureToScala(ch.close()).flatMap { _ => - if (config.shutdownEventLoopGroupOnClose) { - nettyFutureToScala(eventLoopGroup.shutdownGracefully()) - .flatMap(_ => nettyFutureToScala(eventExecutor.shutdownGracefully()).map(_ => ())) - } else Async[F].unit - } - } + ) + } + + private def stopEventLoopGroup(eventLoopGroup: EventLoopGroup, eventExecutor: DefaultEventExecutor) = { + if (config.shutdownEventLoopGroupOnClose) { + nettyFutureToScala(eventLoopGroup.shutdownGracefully()) + .flatMap(_ => nettyFutureToScala(eventExecutor.shutdownGracefully()).map(_ => ())) + } else Async[F].unit } } diff --git a/server/netty-server/loom/src/main/scala/sttp/tapir/server/netty/loom/NettyIdServer.scala b/server/netty-server/loom/src/main/scala/sttp/tapir/server/netty/loom/NettyIdServer.scala index 0caa56ceac..46918fc8fd 100644 --- a/server/netty-server/loom/src/main/scala/sttp/tapir/server/netty/loom/NettyIdServer.scala +++ b/server/netty-server/loom/src/main/scala/sttp/tapir/server/netty/loom/NettyIdServer.scala @@ -103,13 +103,22 @@ case class NettyIdServer(routes: Vector[IdRoute], options: NettyIdServerOptions, eventLoopGroup, socketOverride ) - channelIdFuture.await() - val channelId = channelIdFuture.channel() - - ( - channelId.localAddress().asInstanceOf[SA], - () => stop(channelId, eventLoopGroup, channelGroup, eventExecutor, isShuttingDown, config.gracefulShutdownTimeout) - ) + try { + channelIdFuture.sync() + val channelId = channelIdFuture.channel() + ( + channelId.localAddress().asInstanceOf[SA], + () => stop(channelId, eventLoopGroup, channelGroup, eventExecutor, isShuttingDown, config.gracefulShutdownTimeout) + ) + } catch { + case NonFatal(startFailureCause) => + try { + stopRecovering(eventLoopGroup, channelGroup, eventExecutor, isShuttingDown, config.gracefulShutdownTimeout) + } catch { + case NonFatal(recoveryFailureCause) => startFailureCause.addSuppressed(recoveryFailureCause) + } + throw startFailureCause + } } private def waitForClosedChannels( @@ -122,6 +131,7 @@ case class NettyIdServer(routes: Vector[IdRoute], options: NettyIdServerOptions, } val _ = channelGroup.close().get() } + private def stop( ch: Channel, eventLoopGroup: EventLoopGroup, @@ -142,6 +152,25 @@ case class NettyIdServer(routes: Vector[IdRoute], options: NettyIdServerOptions, val _ = eventExecutor.shutdownGracefully().get() } } + + private def stopRecovering( + eventLoopGroup: EventLoopGroup, + channelGroup: ChannelGroup, + eventExecutor: DefaultEventExecutor, + isShuttingDown: AtomicBoolean, + gracefulShutdownTimeout: Option[FiniteDuration] + ): Unit = { + isShuttingDown.set(true) + waitForClosedChannels( + channelGroup, + startNanos = System.nanoTime(), + gracefulShutdownTimeoutNanos = gracefulShutdownTimeout.map(_.toNanos) + ) + if (config.shutdownEventLoopGroupOnClose) { + val _ = eventLoopGroup.shutdownGracefully().get() + val _ = eventExecutor.shutdownGracefully().get() + } + } } object NettyIdServer { diff --git a/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyFutureServer.scala b/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyFutureServer.scala index d280b4a64f..f510e8e85b 100644 --- a/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyFutureServer.scala +++ b/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyFutureServer.scala @@ -76,12 +76,17 @@ case class NettyFutureServer(routes: Vector[FutureRoute], options: NettyFutureSe socketOverride ) - nettyChannelFutureToScala(channelFuture).map(ch => - ( - ch.localAddress().asInstanceOf[SA], - () => stop(ch, eventLoopGroup, channelGroup, eventExecutor, isShuttingDown, config.gracefulShutdownTimeout) + nettyChannelFutureToScala(channelFuture) + .map(ch => + ( + ch.localAddress().asInstanceOf[SA], + () => stop(ch, eventLoopGroup, channelGroup, eventExecutor, isShuttingDown, config.gracefulShutdownTimeout) + ) ) - ) + .recoverWith { case e => + stopRecovering(eventLoopGroup, channelGroup, eventExecutor, isShuttingDown, config.gracefulShutdownTimeout) + .flatMap(_ => Future.failed(e)) + } } private def waitForClosedChannels( @@ -115,14 +120,32 @@ case class NettyFutureServer(routes: Vector[FutureRoute], options: NettyFutureSe startNanos = System.nanoTime(), gracefulShutdownTimeoutNanos = gracefulShutdownTimeout.map(_.toNanos) ).flatMap { _ => - nettyFutureToScala(ch.close()).flatMap { _ => - if (config.shutdownEventLoopGroupOnClose) { - nettyFutureToScala(eventLoopGroup.shutdownGracefully()) - .flatMap(_ => nettyFutureToScala(eventExecutor.shutdownGracefully()).map(_ => ())) - } else Future.successful(()) - } + nettyFutureToScala(ch.close()).flatMap { _ => stopEventLoopGroup(eventLoopGroup, eventExecutor) } } } + + private def stopRecovering( + eventLoopGroup: EventLoopGroup, + channelGroup: ChannelGroup, + eventExecutor: DefaultEventExecutor, + isShuttingDown: AtomicBoolean, + gracefulShutdownTimeout: Option[FiniteDuration] + ): Future[Unit] = { + isShuttingDown.set(true) + waitForClosedChannels( + channelGroup, + startNanos = System.nanoTime(), + gracefulShutdownTimeoutNanos = gracefulShutdownTimeout.map(_.toNanos) + ).flatMap { _ => stopEventLoopGroup(eventLoopGroup, eventExecutor) } + } + + private def stopEventLoopGroup(eventLoopGroup: EventLoopGroup, eventExecutor: DefaultEventExecutor) = { + if (config.shutdownEventLoopGroupOnClose) { + nettyFutureToScala(eventLoopGroup.shutdownGracefully()) + .flatMap(_ => nettyFutureToScala(eventExecutor.shutdownGracefully()).map(_ => ())) + } else Future.successful(()) + } + } object NettyFutureServer { diff --git a/server/netty-server/zio/src/main/scala/sttp/tapir/server/netty/zio/NettyZioServer.scala b/server/netty-server/zio/src/main/scala/sttp/tapir/server/netty/zio/NettyZioServer.scala index 10c3ffe664..8eb00c55bc 100644 --- a/server/netty-server/zio/src/main/scala/sttp/tapir/server/netty/zio/NettyZioServer.scala +++ b/server/netty-server/zio/src/main/scala/sttp/tapir/server/netty/zio/NettyZioServer.scala @@ -17,7 +17,6 @@ import java.net.{InetSocketAddress, SocketAddress} import java.nio.file.{Path, Paths} import java.util.UUID import java.util.concurrent.atomic.AtomicBoolean - import scala.concurrent.ExecutionContext.Implicits import scala.concurrent.Future import scala.concurrent.duration.FiniteDuration @@ -98,12 +97,17 @@ case class NettyZioServer[R](routes: Vector[RIO[R, Route[RIO[R, *]]]], options: socketOverride ) } - binding <- nettyChannelFutureToScala(channelFuture).map(ch => - ( - ch.localAddress().asInstanceOf[SA], - () => stop(ch, eventLoopGroup, channelGroup, eventExecutor, isShuttingDown, config.gracefulShutdownTimeout) + binding <- nettyChannelFutureToScala(channelFuture) + .map(ch => + ( + ch.localAddress().asInstanceOf[SA], + () => stop(ch, eventLoopGroup, channelGroup, eventExecutor, isShuttingDown, config.gracefulShutdownTimeout) + ) ) - ) + .catchAll { e => + stopRecovering(eventLoopGroup, channelGroup, eventExecutor, isShuttingDown, config.gracefulShutdownTimeout) *> + ZIO.fail(e) + } } yield binding private def waitForClosedChannels( @@ -126,20 +130,43 @@ case class NettyZioServer[R](routes: Vector[RIO[R, Route[RIO[R, *]]]], options: isShuttingDown: AtomicBoolean, gracefulShutdownTimeout: Option[FiniteDuration] ): RIO[R, Unit] = { + stopChannelGroup(channelGroup, isShuttingDown, gracefulShutdownTimeout) *> + ZIO.suspend { + nettyFutureToScala(ch.close()).flatMap { _ => + stopEventLoopGroup(eventLoopGroup, eventExecutor) + } + } + } + + private def stopRecovering( + eventLoopGroup: EventLoopGroup, + channelGroup: ChannelGroup, + eventExecutor: DefaultEventExecutor, + isShuttingDown: AtomicBoolean, + gracefulShutdownTimeout: Option[FiniteDuration] + ): RIO[R, Unit] = { + stopChannelGroup(channelGroup, isShuttingDown, gracefulShutdownTimeout) *> + stopEventLoopGroup(eventLoopGroup, eventExecutor) + } + + private def stopChannelGroup( + channelGroup: ChannelGroup, + isShuttingDown: AtomicBoolean, + gracefulShutdownTimeout: Option[FiniteDuration] + ) = { ZIO.attempt(isShuttingDown.set(true)) *> waitForClosedChannels( channelGroup, startNanos = System.nanoTime(), gracefulShutdownTimeoutNanos = gracefulShutdownTimeout.map(_.toNanos) - ) *> - ZIO.suspend { - nettyFutureToScala(ch.close()).flatMap { _ => - if (config.shutdownEventLoopGroupOnClose) { - nettyFutureToScala(eventLoopGroup.shutdownGracefully()) - .flatMap(_ => nettyFutureToScala(eventExecutor.shutdownGracefully()).map(_ => ())) - } else ZIO.succeed(()) - } - } + ) + } + + private def stopEventLoopGroup(eventLoopGroup: EventLoopGroup, eventExecutor: DefaultEventExecutor) = { + if (config.shutdownEventLoopGroupOnClose) { + nettyFutureToScala(eventLoopGroup.shutdownGracefully()) + .flatMap(_ => nettyFutureToScala(eventExecutor.shutdownGracefully()).map(_ => ())) + } else ZIO.succeed(()) } }