Skip to content

Commit

Permalink
[bugfix] Close the DefaultEventExecutor on shutdown (#3460)
Browse files Browse the repository at this point in the history
  • Loading branch information
kciesielski authored Jan 17, 2024
1 parent 07e41ed commit bf40d12
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ case class NettyCatsServer[F[_]: Async](routes: Vector[Route[F]], options: Netty
val eventLoopGroup = config.eventLoopConfig.initEventLoopGroup()
implicit val monadError: MonadError[F] = new CatsMonadError[F]()
val route: Route[F] = Route.combine(routes)
val channelGroup = new DefaultChannelGroup(new DefaultEventExecutor()) // thread safe
val eventExecutor = new DefaultEventExecutor()
val channelGroup = new DefaultChannelGroup(eventExecutor) // thread safe
val isShuttingDown: AtomicBoolean = new AtomicBoolean(false)

val channelFuture =
Expand All @@ -81,7 +82,7 @@ case class NettyCatsServer[F[_]: Async](routes: Vector[Route[F]], options: Netty
nettyChannelFutureToScala(channelFuture).map(ch =>
(
ch.localAddress().asInstanceOf[SA],
() => stop(ch, eventLoopGroup, channelGroup, isShuttingDown, config.gracefulShutdownTimeout)
() => stop(ch, eventLoopGroup, channelGroup, eventExecutor, isShuttingDown, config.gracefulShutdownTimeout)
)
)
}
Expand All @@ -102,6 +103,7 @@ case class NettyCatsServer[F[_]: Async](routes: Vector[Route[F]], options: Netty
ch: Channel,
eventLoopGroup: EventLoopGroup,
channelGroup: ChannelGroup,
eventExecutor: DefaultEventExecutor,
isShuttingDown: AtomicBoolean,
gracefulShutdownTimeout: Option[FiniteDuration]
): F[Unit] = {
Expand All @@ -114,7 +116,8 @@ case class NettyCatsServer[F[_]: Async](routes: Vector[Route[F]], options: Netty
Async[F].defer {
nettyFutureToScala(ch.close()).flatMap { _ =>
if (config.shutdownEventLoopGroupOnClose) {
nettyFutureToScala(eventLoopGroup.shutdownGracefully()).map(_ => ())
nettyFutureToScala(eventLoopGroup.shutdownGracefully())
.flatMap(_ => nettyFutureToScala(eventExecutor.shutdownGracefully()).map(_ => ()))
} else Async[F].unit
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ case class NettyIdServer(routes: Vector[IdRoute], options: NettyIdServerOptions,
}
)
}
val channelGroup = new DefaultChannelGroup(new DefaultEventExecutor()) // thread safe
val eventExecutor = new DefaultEventExecutor()
val channelGroup = new DefaultChannelGroup(eventExecutor) // thread safe
val isShuttingDown: AtomicBoolean = new AtomicBoolean(false)

val channelIdFuture = NettyBootstrap(
Expand All @@ -106,7 +107,7 @@ case class NettyIdServer(routes: Vector[IdRoute], options: NettyIdServerOptions,

(
channelId.localAddress().asInstanceOf[SA],
() => stop(channelId, eventLoopGroup, channelGroup, isShuttingDown, config.gracefulShutdownTimeout)
() => stop(channelId, eventLoopGroup, channelGroup, eventExecutor, isShuttingDown, config.gracefulShutdownTimeout)
)
}

Expand All @@ -124,6 +125,7 @@ case class NettyIdServer(routes: Vector[IdRoute], options: NettyIdServerOptions,
ch: Channel,
eventLoopGroup: EventLoopGroup,
channelGroup: ChannelGroup,
eventExecutor: DefaultEventExecutor,
isShuttingDown: AtomicBoolean,
gracefulShutdownTimeout: Option[FiniteDuration]
): Unit = {
Expand All @@ -136,6 +138,7 @@ case class NettyIdServer(routes: Vector[IdRoute], options: NettyIdServerOptions,
ch.close().get()
if (config.shutdownEventLoopGroupOnClose) {
val _ = eventLoopGroup.shutdownGracefully().get()
val _ = eventExecutor.shutdownGracefully().get()
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ case class NettyFutureServer(routes: Vector[FutureRoute], options: NettyFutureSe
val eventLoopGroup = config.eventLoopConfig.initEventLoopGroup()
implicit val monadError: MonadError[Future] = new FutureMonad()
val route = Route.combine(routes)
val channelGroup = new DefaultChannelGroup(new DefaultEventExecutor()) // thread safe
val eventExecutor = new DefaultEventExecutor()
val channelGroup = new DefaultChannelGroup(eventExecutor) // thread safe
val isShuttingDown: AtomicBoolean = new AtomicBoolean(false)

val channelFuture =
Expand All @@ -76,7 +77,10 @@ case class NettyFutureServer(routes: Vector[FutureRoute], options: NettyFutureSe
)

nettyChannelFutureToScala(channelFuture).map(ch =>
(ch.localAddress().asInstanceOf[SA], () => stop(ch, eventLoopGroup, channelGroup, isShuttingDown, config.gracefulShutdownTimeout))
(
ch.localAddress().asInstanceOf[SA],
() => stop(ch, eventLoopGroup, channelGroup, eventExecutor, isShuttingDown, config.gracefulShutdownTimeout)
)
)
}

Expand All @@ -101,6 +105,7 @@ case class NettyFutureServer(routes: Vector[FutureRoute], options: NettyFutureSe
ch: Channel,
eventLoopGroup: EventLoopGroup,
channelGroup: ChannelGroup,
eventExecutor: DefaultEventExecutor,
isShuttingDown: AtomicBoolean,
gracefulShutdownTimeout: Option[FiniteDuration]
): Future[Unit] = {
Expand All @@ -112,7 +117,8 @@ case class NettyFutureServer(routes: Vector[FutureRoute], options: NettyFutureSe
).flatMap { _ =>
nettyFutureToScala(ch.close()).flatMap { _ =>
if (config.shutdownEventLoopGroupOnClose) {
nettyFutureToScala(eventLoopGroup.shutdownGracefully()).map(_ => ())
nettyFutureToScala(eventLoopGroup.shutdownGracefully())
.flatMap(_ => nettyFutureToScala(eventExecutor.shutdownGracefully()).map(_ => ()))
} else Future.successful(())
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ case class NettyZioServer[R](routes: Vector[RIO[R, Route[RIO[R, *]]]], options:
runtime <- ZIO.runtime[R]
routes <- ZIO.foreach(routes)(identity)
eventLoopGroup = config.eventLoopConfig.initEventLoopGroup()
channelGroup = new DefaultChannelGroup(new DefaultEventExecutor()) // thread safe
eventExecutor = new DefaultEventExecutor()
channelGroup = new DefaultChannelGroup(eventExecutor) // thread safe
isShuttingDown = new AtomicBoolean(false)
channelFuture = {
implicit val monadError: RIOMonadError[R] = new RIOMonadError[R]
Expand All @@ -99,7 +100,7 @@ case class NettyZioServer[R](routes: Vector[RIO[R, Route[RIO[R, *]]]], options:
binding <- nettyChannelFutureToScala(channelFuture).map(ch =>
(
ch.localAddress().asInstanceOf[SA],
() => stop(ch, eventLoopGroup, channelGroup, isShuttingDown, config.gracefulShutdownTimeout)
() => stop(ch, eventLoopGroup, channelGroup, eventExecutor, isShuttingDown, config.gracefulShutdownTimeout)
)
)
} yield binding
Expand All @@ -120,6 +121,7 @@ case class NettyZioServer[R](routes: Vector[RIO[R, Route[RIO[R, *]]]], options:
ch: Channel,
eventLoopGroup: EventLoopGroup,
channelGroup: ChannelGroup,
eventExecutor: DefaultEventExecutor,
isShuttingDown: AtomicBoolean,
gracefulShutdownTimeout: Option[FiniteDuration]
): RIO[R, Unit] = {
Expand All @@ -132,7 +134,8 @@ case class NettyZioServer[R](routes: Vector[RIO[R, Route[RIO[R, *]]]], options:
ZIO.suspend {
nettyFutureToScala(ch.close()).flatMap { _ =>
if (config.shutdownEventLoopGroupOnClose) {
nettyFutureToScala(eventLoopGroup.shutdownGracefully()).map(_ => ())
nettyFutureToScala(eventLoopGroup.shutdownGracefully())
.flatMap(_ => nettyFutureToScala(eventExecutor.shutdownGracefully()).map(_ => ()))
} else ZIO.succeed(())
}
}
Expand Down

0 comments on commit bf40d12

Please sign in to comment.