From 83a2e805ed80e6328b4dea4c935befed373b894c Mon Sep 17 00:00:00 2001 From: kciesielski Date: Tue, 31 Oct 2023 20:45:37 +0100 Subject: [PATCH 01/16] wip --- .../examples/HelloWorldNettyCatsServer.scala | 2 + .../sttp/tapir/server/netty/cats/Main.scala | 43 +++++++++++++++++++ server/tests/src/main/resources/logback.xml | 14 ++++++ 3 files changed, 59 insertions(+) create mode 100644 server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/Main.scala create mode 100644 server/tests/src/main/resources/logback.xml diff --git a/examples/src/main/scala/sttp/tapir/examples/HelloWorldNettyCatsServer.scala b/examples/src/main/scala/sttp/tapir/examples/HelloWorldNettyCatsServer.scala index a08185bc60..3080346edd 100644 --- a/examples/src/main/scala/sttp/tapir/examples/HelloWorldNettyCatsServer.scala +++ b/examples/src/main/scala/sttp/tapir/examples/HelloWorldNettyCatsServer.scala @@ -1,6 +1,7 @@ package sttp.tapir.examples import cats.effect.IO +import cats.effect.syntax.all._ import sttp.client3.{HttpURLConnectionBackend, Identity, SttpBackend, UriContext, asStringAlways, basicRequest} import sttp.model.StatusCode import sttp.tapir.{PublicEndpoint, endpoint, query, stringBody} @@ -52,5 +53,6 @@ object HelloWorldNettyCatsServer extends App { assert(host == declaredHost, "Hosts don't match") } } + .guarantee() .unsafeRunSync() } diff --git a/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/Main.scala b/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/Main.scala new file mode 100644 index 0000000000..24b5249532 --- /dev/null +++ b/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/Main.scala @@ -0,0 +1,43 @@ +package sttp.tapir.server.netty.cats + +import cats.effect.IO +import sttp.tapir._ +import cats.effect.syntax.all._ +import cats.effect.unsafe.implicits.global +import sttp.tapir.server.netty.cats.{NettyCatsServer, NettyCatsServerBinding} +import cats.effect.IOApp +import scala.concurrent.duration._ + +object HelloWorldNettyCatsServer extends IOApp.Simple { + // One endpoint on GET /hello with query parameter `name` + val helloWorldEndpoint: PublicEndpoint[String, Unit, String, Any] = + endpoint.get.in("hello").in(query[String]("name")).out(stringBody) + + // Just returning passed name with `Hello, ` prepended + val helloWorldServerEndpoint = helloWorldEndpoint + .serverLogic(name => IO.pure[Either[Unit, String]](Right(s"Hello, $name!"))) + + private val declaredPort = 9090 + private val declaredHost = "localhost" + + override def run = + for { + runningServer <- NettyCatsServer + .io() + .use { server => + server + .port(declaredPort) + .host(declaredHost) + .addEndpoint(helloWorldServerEndpoint) + .start() + .bracket(binding => IO.println(">>>>>>>>>>>>>>>>>>> The app is now running... ") >> IO.never)(binding => + IO.println(">>>>>>>> Stopping binding ") >> binding.stop() + ) + .start + } + _ <- IO.sleep(3.seconds) + _ <- runningServer.cancel + _ <- IO.println("Server fiber cancellation requested") + } yield () + +} diff --git a/server/tests/src/main/resources/logback.xml b/server/tests/src/main/resources/logback.xml new file mode 100644 index 0000000000..ea07d0dfe3 --- /dev/null +++ b/server/tests/src/main/resources/logback.xml @@ -0,0 +1,14 @@ + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + + + From 3b01dcf42ba3d8a98c17260555949228d7d9834b Mon Sep 17 00:00:00 2001 From: kciesielski Date: Thu, 2 Nov 2023 17:54:43 +0100 Subject: [PATCH 02/16] wip --- build.sbt | 6 +- .../sttp/tapir/server/netty/cats/Main.scala | 34 +++++++++- .../server/netty/cats/NettyCatsServer.scala | 52 +++++++++++++--- .../scala/sttp/tapir/server/netty/Main.scala | 62 +++++++++++++++++++ .../sttp/tapir/server/netty/NettyConfig.scala | 10 ++- .../server/netty/NettyFutureServer.scala | 56 ++++++++++++++--- .../server/netty/internal/FutureUtil.scala | 6 +- .../netty/internal/NettyServerHandler.scala | 24 ++++--- 8 files changed, 214 insertions(+), 36 deletions(-) create mode 100644 server/netty-server/src/main/scala/sttp/tapir/server/netty/Main.scala diff --git a/build.sbt b/build.sbt index 7bcecb9a2b..059fc938b1 100644 --- a/build.sbt +++ b/build.sbt @@ -1434,7 +1434,8 @@ lazy val nettyServer: ProjectMatrix = (projectMatrix in file("server/netty-serve name := "tapir-netty-server", libraryDependencies ++= Seq( "io.netty" % "netty-all" % Versions.nettyAll, - "org.playframework.netty" % "netty-reactive-streams-http" % Versions.nettyReactiveStreams + "org.playframework.netty" % "netty-reactive-streams-http" % Versions.nettyReactiveStreams, + "com.softwaremill.sttp.client3" %%% "core" % Versions.sttp ) ++ loggerDependencies, // needed because of https://github.com/coursier/coursier/issues/2016 @@ -1447,7 +1448,8 @@ lazy val nettyServerCats: ProjectMatrix = nettyServerProject("cats", catsEffect) .settings( libraryDependencies ++= Seq( "com.softwaremill.sttp.shared" %% "fs2" % Versions.sttpShared, - "co.fs2" %% "fs2-reactive-streams" % Versions.fs2 + "co.fs2" %% "fs2-reactive-streams" % Versions.fs2, + "com.softwaremill.sttp.client4" %% "cats" % "4.0.0-M6" ) ) diff --git a/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/Main.scala b/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/Main.scala index 24b5249532..d4e8afbb8b 100644 --- a/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/Main.scala +++ b/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/Main.scala @@ -5,8 +5,10 @@ import sttp.tapir._ import cats.effect.syntax.all._ import cats.effect.unsafe.implicits.global import sttp.tapir.server.netty.cats.{NettyCatsServer, NettyCatsServerBinding} +import sttp.client4.httpclient.cats.HttpClientCatsBackend import cats.effect.IOApp import scala.concurrent.duration._ +import sttp.client4._ object HelloWorldNettyCatsServer extends IOApp.Simple { // One endpoint on GET /hello with query parameter `name` @@ -15,7 +17,9 @@ object HelloWorldNettyCatsServer extends IOApp.Simple { // Just returning passed name with `Hello, ` prepended val helloWorldServerEndpoint = helloWorldEndpoint - .serverLogic(name => IO.pure[Either[Unit, String]](Right(s"Hello, $name!"))) + .serverLogic(name => + IO.println(s"=== Received request: $name") >> IO.sleep(6.seconds) >> IO.pure[Either[Unit, String]](Right(s"Hello, $name!")) + ) private val declaredPort = 9090 private val declaredHost = "localhost" @@ -33,11 +37,35 @@ object HelloWorldNettyCatsServer extends IOApp.Simple { .bracket(binding => IO.println(">>>>>>>>>>>>>>>>>>> The app is now running... ") >> IO.never)(binding => IO.println(">>>>>>>> Stopping binding ") >> binding.stop() ) - .start } + .start _ <- IO.sleep(3.seconds) - _ <- runningServer.cancel + _ <- HttpClientCatsBackend + .resource[IO]() + .use { sttpBackend => + for { + response <- basicRequest + .get(uri"http://localhost:9090/hello?name=Bob") + .send(sttpBackend) + _ <- IO.println(response) + } yield () + } + .start // send http request in background + _ <- IO.sleep(2.seconds) + cancelation <- runningServer.cancel.start + _ <- HttpClientCatsBackend + .resource[IO]() + .use { sttpBackend => + for { + response <- basicRequest + .get(uri"http://localhost:9090/hello?name=Cynthia") + .send(sttpBackend) + _ <- IO.println(response) + } yield () + } + .start // send http request in background _ <- IO.println("Server fiber cancellation requested") + _ <- cancelation.join } yield () } diff --git a/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/NettyCatsServer.scala b/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/NettyCatsServer.scala index 7e7b3c8fea..56a592afa8 100644 --- a/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/NettyCatsServer.scala +++ b/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/NettyCatsServer.scala @@ -1,10 +1,13 @@ package sttp.tapir.server.netty.cats +import cats.effect.kernel.Sync import cats.effect.std.Dispatcher -import cats.effect.{Async, IO, Resource} +import cats.effect.{Async, IO, Resource, Temporal} import cats.syntax.all._ import io.netty.channel._ +import io.netty.channel.group.{ChannelGroup, DefaultChannelGroup} import io.netty.channel.unix.DomainSocketAddress +import io.netty.util.concurrent.DefaultEventExecutor import sttp.capabilities.fs2.Fs2Streams import sttp.monad.MonadError import sttp.tapir.integ.cats.effect.CatsMonadError @@ -17,7 +20,9 @@ import sttp.tapir.server.netty.{NettyConfig, NettyResponse, Route} import java.net.{InetSocketAddress, SocketAddress} import java.nio.file.{Path, Paths} import java.util.UUID +import java.util.concurrent.atomic.AtomicBoolean import scala.concurrent.Future +import scala.concurrent.duration._ case class NettyCatsServer[F[_]: Async](routes: Vector[Route[F]], options: NettyCatsServerOptions[F], config: NettyConfig) { def addEndpoint(se: ServerEndpoint[Fs2Streams[F], F]): NettyCatsServer[F] = addEndpoints(List(se)) @@ -62,26 +67,53 @@ case class NettyCatsServer[F[_]: Async](routes: Vector[Route[F]], options: Netty val eventLoopGroup = config.eventLoopConfig.initEventLoopGroup() implicit val monadError: MonadError[F] = new CatsMonadError[F]() val route: Route[F] = Route.combine(routes) + val channelGroup = new DefaultChannelGroup(new DefaultEventExecutor()) // thread safe + val isShuttingDown: AtomicBoolean = new AtomicBoolean(false) val channelFuture = NettyBootstrap( config, - new NettyServerHandler(route, unsafeRunAsync, config.maxContentLength), + new NettyServerHandler(route, unsafeRunAsync, config.maxContentLength, channelGroup, isShuttingDown), eventLoopGroup, socketOverride ) - nettyChannelFutureToScala(channelFuture).map(ch => (ch.localAddress().asInstanceOf[SA], () => stop(ch, eventLoopGroup))) + nettyChannelFutureToScala(channelFuture).map(ch => + (ch.localAddress().asInstanceOf[SA], () => stop(ch, eventLoopGroup, channelGroup, isShuttingDown, config.gracefulShutdownTimeout)) + ) } - private def stop(ch: Channel, eventLoopGroup: EventLoopGroup): F[Unit] = { - Async[F].defer { - nettyFutureToScala(ch.close()).flatMap { _ => - if (config.shutdownEventLoopGroupOnClose) { - nettyFutureToScala(eventLoopGroup.shutdownGracefully()).map(_ => ()) - } else Async[F].unit - } + private def waitForClosedChannels( + channelGroup: ChannelGroup, + startNanos: Long, + gracefulShutdownTimeoutNanos: Option[Long] + ): F[Unit] = + if (!channelGroup.isEmpty && gracefulShutdownTimeoutNanos.exists(_ >= System.nanoTime() - startNanos)) { + Temporal[F].sleep(100.millis) >> + waitForClosedChannels(channelGroup, startNanos, gracefulShutdownTimeoutNanos) + } else { + Sync[F].pure(()) } + + private def stop( + ch: Channel, + eventLoopGroup: EventLoopGroup, + channelGroup: ChannelGroup, + isShuttingDown: AtomicBoolean, + gracefulShutdownTimeout: Option[FiniteDuration] + ): F[Unit] = { + Async[F].delay(isShuttingDown.set(true)) >> Sync[F].delay(println(">>>> waiting for closed channels")) >> waitForClosedChannels( + channelGroup, + startNanos = System.nanoTime(), + gracefulShutdownTimeoutNanos = gracefulShutdownTimeout.map(_.toMillis * 1000000) + ) >> Sync[F].delay(println(">>>>>>>>> waiting complete, proceeding with shutdown")) >> + Async[F].defer { + nettyFutureToScala(ch.close()).flatMap { _ => + if (config.shutdownEventLoopGroupOnClose) { + nettyFutureToScala(eventLoopGroup.shutdownGracefully()).map(_ => ()) + } else Async[F].unit + } + } } } diff --git a/server/netty-server/src/main/scala/sttp/tapir/server/netty/Main.scala b/server/netty-server/src/main/scala/sttp/tapir/server/netty/Main.scala new file mode 100644 index 0000000000..e02c36c137 --- /dev/null +++ b/server/netty-server/src/main/scala/sttp/tapir/server/netty/Main.scala @@ -0,0 +1,62 @@ +package sttp.tapir.server.netty + +import sttp.client3._ +import sttp.model.Uri +import sttp.tapir._ +import sttp.tapir.server.netty.{NettyFutureServer, NettyFutureServerOptions} + +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.duration._ +import scala.concurrent.{Await, Future, blocking} +import scala.util.{Failure, Success} + +object Main { + + private val helloWorldEndpoint = endpoint.get + .in("hello") + .out(stringBody) + .serverLogicSuccess(_ => + Future { + println("[Server] received the request.") + blocking { + Thread.sleep(3000) + println("[Server] successfully processed the request.") + } + s"Hello World!" + } + ) + + def main(args: Array[String]): Unit = { + implicit val backend: SttpBackend[Future, Any] = HttpClientFutureBackend() + + // shutdown hook to print if a signal is received + Runtime.getRuntime.addShutdownHook(new Thread((() => { + println("[Application] shutdown signal received.") + }): Runnable)) + + // start server + val serverOptions = NettyFutureServerOptions.default + val bindingF = NettyFutureServer().port(8080).options(serverOptions).addEndpoint(helloWorldEndpoint).start() + val binding = Await.result(bindingF, 10.seconds) + println("[Server] started.") + + // call my endpoint and then kill me + println(s"[Client] Sending request.") + emptyRequest + .get(Uri.parse("http://localhost:8080/hello").getOrElse(???)) + .send(backend) + .onComplete { + case Success(r) => println(s"[Client] Response received: $r") + case Failure(exception) => exception.printStackTrace() + } + // wait until the service receives the request + Thread.sleep(1000L) + + // kill myself + println(s"[Client] Stopping server.") + Await.result(binding.stop(), 60.seconds) + + println(s"[Client] Stopped server.") + } + +} diff --git a/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyConfig.scala b/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyConfig.scala index ee37e6c7c8..5bab2d1549 100644 --- a/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyConfig.scala +++ b/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyConfig.scala @@ -47,6 +47,10 @@ import scala.concurrent.duration._ * @param lingerTimeout * Sets the delay for which the Netty waits, while data is being transmitted, before closing a socket after receiving a call to close the * socket + * + * @param gracefulShutdownTimeout + * If set, attempts to wait for a given time for all in-flight requests to complete, before proceeding with shutting down the server. If + * `None`, closes the channels and terminates the server without waiting. */ case class NettyConfig( host: String, @@ -64,7 +68,8 @@ case class NettyConfig( sslContext: Option[SslContext], eventLoopConfig: EventLoopConfig, socketConfig: NettySocketConfig, - initPipeline: NettyConfig => (ChannelPipeline, ChannelHandler) => Unit + initPipeline: NettyConfig => (ChannelPipeline, ChannelHandler) => Unit, + gracefulShutdownTimeout: Option[FiniteDuration] ) { def host(h: String): NettyConfig = copy(host = h) @@ -102,6 +107,8 @@ case class NettyConfig( def eventLoopGroup(elg: EventLoopGroup): NettyConfig = copy(eventLoopConfig = EventLoopConfig.useExisting(elg)) def initPipeline(f: NettyConfig => (ChannelPipeline, ChannelHandler) => Unit): NettyConfig = copy(initPipeline = f) + + def withGracefulShutdownTimeout(t: FiniteDuration) = copy(gracefulShutdownTimeout = Some(t)) } object NettyConfig { @@ -115,6 +122,7 @@ object NettyConfig { connectionTimeout = Some(10.seconds), socketTimeout = Some(60.seconds), lingerTimeout = Some(60.seconds), + gracefulShutdownTimeout = Some(10.seconds), maxContentLength = None, maxConnections = None, addLoggingHandler = false, diff --git a/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyFutureServer.scala b/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyFutureServer.scala index d92c704b02..a5cbb64e91 100644 --- a/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyFutureServer.scala +++ b/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyFutureServer.scala @@ -1,17 +1,21 @@ package sttp.tapir.server.netty import io.netty.channel._ +import io.netty.channel.group.{ChannelGroup, DefaultChannelGroup} import io.netty.channel.unix.DomainSocketAddress +import io.netty.util.concurrent.DefaultEventExecutor import sttp.monad.{FutureMonad, MonadError} import sttp.tapir.server.ServerEndpoint +import sttp.tapir.server.model.ServerResponse import sttp.tapir.server.netty.internal.FutureUtil._ import sttp.tapir.server.netty.internal.{NettyBootstrap, NettyServerHandler} import java.net.{InetSocketAddress, SocketAddress} import java.nio.file.{Path, Paths} import java.util.UUID -import scala.concurrent.{ExecutionContext, Future} -import sttp.tapir.server.model.ServerResponse +import java.util.concurrent.atomic.AtomicBoolean +import scala.concurrent.duration.FiniteDuration +import scala.concurrent.{ExecutionContext, Future, blocking} case class NettyFutureServer(routes: Vector[FutureRoute], options: NettyFutureServerOptions, config: NettyConfig)(implicit ec: ExecutionContext @@ -60,23 +64,57 @@ case class NettyFutureServer(routes: Vector[FutureRoute], options: NettyFutureSe val eventLoopGroup = config.eventLoopConfig.initEventLoopGroup() implicit val monadError: MonadError[Future] = new FutureMonad() val route = Route.combine(routes) + val channelGroup = new DefaultChannelGroup(new DefaultEventExecutor()) // thread safe + val isShuttingDown: AtomicBoolean = new AtomicBoolean(false) val channelFuture = NettyBootstrap( config, - new NettyServerHandler(route, unsafeRunAsync, config.maxContentLength), + new NettyServerHandler(route, unsafeRunAsync, config.maxContentLength, channelGroup, isShuttingDown), eventLoopGroup, socketOverride ) - nettyChannelFutureToScala(channelFuture).map(ch => (ch.localAddress().asInstanceOf[SA], () => stop(ch, eventLoopGroup))) + nettyChannelFutureToScala(channelFuture).map(ch => + (ch.localAddress().asInstanceOf[SA], () => stop(ch, eventLoopGroup, channelGroup, isShuttingDown, config.gracefulShutdownTimeout)) + ) } - private def stop(ch: Channel, eventLoopGroup: EventLoopGroup): Future[Unit] = { - nettyFutureToScala(ch.close()).flatMap { _ => - if (config.shutdownEventLoopGroupOnClose) { - nettyFutureToScala(eventLoopGroup.shutdownGracefully()).map(_ => ()) - } else Future.successful(()) + private def waitForClosedChannels( + channelGroup: ChannelGroup, + startNanos: Long, + gracefulShutdownTimeoutNanos: Option[Long] + ): Future[Unit] = + if (!channelGroup.isEmpty && gracefulShutdownTimeoutNanos.exists(_ >= System.nanoTime() - startNanos)) { + Future { + blocking { + Thread.sleep(100) + } + }.flatMap(_ => { + waitForClosedChannels(channelGroup, startNanos, gracefulShutdownTimeoutNanos) + }) + } else { + Future.successful(()) + } + + private def stop( + ch: Channel, + eventLoopGroup: EventLoopGroup, + channelGroup: ChannelGroup, + isShuttingDown: AtomicBoolean, + gracefulShutdownTimeout: Option[FiniteDuration] + ): Future[Unit] = { + isShuttingDown.set(true) + waitForClosedChannels( + channelGroup, + startNanos = System.nanoTime(), + gracefulShutdownTimeoutNanos = gracefulShutdownTimeout.map(_.toMillis * 1000000) + ).flatMap { _ => + nettyFutureToScala(ch.close()).flatMap { _ => + if (config.shutdownEventLoopGroupOnClose) { + nettyFutureToScala(eventLoopGroup.shutdownGracefully()).map(_ => ()) + } else Future.successful(()) + } } } } diff --git a/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/FutureUtil.scala b/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/FutureUtil.scala index 031347fef6..a2d481446b 100644 --- a/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/FutureUtil.scala +++ b/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/FutureUtil.scala @@ -10,8 +10,10 @@ object FutureUtil { val p = Promise[Channel]() nettyFuture.addListener((future: ChannelFuture) => p.complete( - if (future.isSuccess) Success(future.channel()) - else if (future.isCancelled) Failure(new CancellationException) + if (future.isSuccess) { + println(">>>>>>>>>>>>>>>>>>>>>>>>>>>> A channel is born") + Success(future.channel()) + } else if (future.isCancelled) Failure(new CancellationException) else Failure(future.cause()) ) ) diff --git a/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/NettyServerHandler.scala b/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/NettyServerHandler.scala index d5bf90f4c7..9820ab1559 100644 --- a/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/NettyServerHandler.scala +++ b/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/NettyServerHandler.scala @@ -26,6 +26,8 @@ import scala.util.Failure import scala.util.Success import scala.util.control.NonFatal import scala.concurrent.ExecutionContext +import io.netty.channel.group.ChannelGroup +import java.util.concurrent.atomic.AtomicBoolean /** @param unsafeRunAsync * Function which dispatches given effect to run asynchronously, returning its result as a Future, and function of type `() => @@ -35,7 +37,9 @@ import scala.concurrent.ExecutionContext class NettyServerHandler[F[_]]( route: Route[F], unsafeRunAsync: (() => F[ServerResponse[NettyResponse]]) => (Future[ServerResponse[NettyResponse]], () => Future[Unit]), - maxContentLength: Option[Int] + maxContentLength: Option[Int], + channelGroup: ChannelGroup, + isShuttingDown: AtomicBoolean )(implicit me: MonadError[F] ) extends SimpleChannelInboundHandler[HttpRequest] { @@ -79,7 +83,10 @@ class NettyServerHandler[F[_]]( if (ctx.channel.isActive) { initHandler(ctx) } - override def channelActive(ctx: ChannelHandlerContext): Unit = initHandler(ctx) + override def channelActive(ctx: ChannelHandlerContext): Unit = { + channelGroup.add(ctx.channel) + initHandler(ctx) + } private[this] def initHandler(ctx: ChannelHandlerContext): Unit = { if (eventLoopContext == null) { @@ -97,6 +104,7 @@ class NettyServerHandler[F[_]]( override def channelRead0(ctx: ChannelHandlerContext, request: HttpRequest): Unit = { + println(s">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> running new request, isShuttingDown = ${isShuttingDown.get()}") def writeError500(req: HttpRequest, reason: Throwable): Unit = { logger.error("Error while processing the request", reason) // send 500 @@ -105,7 +113,6 @@ class NettyServerHandler[F[_]]( res.handleCloseAndKeepAliveHeaders(req) ctx.writeAndFlush(res).closeIfNeeded(req) - } def runRoute(req: HttpRequest, releaseReq: () => Any = () => ()): Unit = { @@ -122,11 +129,11 @@ class NettyServerHandler[F[_]]( case Success(serverResponse) => pendingResponses.dequeue() try { - handleResponse(ctx, req, serverResponse) + handleResponse(ctx, req, serverResponse) Success(()) } catch { case NonFatal(ex) => - writeError500(req, ex) + writeError500(req, ex) Failure(ex) } finally { val _ = releaseReq() @@ -135,8 +142,7 @@ class NettyServerHandler[F[_]]( try { writeError500(req, ex) Failure(ex) - } - finally { + } finally { val _ = releaseReq() } case Failure(fatalException) => Failure(fatalException) @@ -291,7 +297,7 @@ class NettyServerHandler[F[_]]( } def handleCloseAndKeepAliveHeaders(request: HttpRequest): Unit = { - if (!HttpUtil.isKeepAlive(request)) + if (!HttpUtil.isKeepAlive(request) || isShuttingDown.get()) m.headers.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE) else if (request.protocolVersion.equals(HttpVersion.HTTP_1_0)) m.headers.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE) @@ -300,7 +306,7 @@ class NettyServerHandler[F[_]]( private implicit class RichChannelFuture(val cf: ChannelFuture) { def closeIfNeeded(request: HttpRequest): Unit = { - if (!HttpUtil.isKeepAlive(request)) { + if (!HttpUtil.isKeepAlive(request) || isShuttingDown.get()) { cf.addListener(ChannelFutureListener.CLOSE) } } From 7bc814fc7e1d8b2ce5496b2a36c7987d67cb05c2 Mon Sep 17 00:00:00 2001 From: kciesielski Date: Fri, 3 Nov 2023 08:56:51 +0100 Subject: [PATCH 03/16] Implement graceful shutdown for ZIO --- .../server/netty/zio/NettyZioServer.scala | 53 +++++++++++++++---- 1 file changed, 43 insertions(+), 10 deletions(-) diff --git a/server/netty-server/zio/src/main/scala/sttp/tapir/server/netty/zio/NettyZioServer.scala b/server/netty-server/zio/src/main/scala/sttp/tapir/server/netty/zio/NettyZioServer.scala index c642949bc6..7be5adc7d3 100644 --- a/server/netty-server/zio/src/main/scala/sttp/tapir/server/netty/zio/NettyZioServer.scala +++ b/server/netty-server/zio/src/main/scala/sttp/tapir/server/netty/zio/NettyZioServer.scala @@ -1,7 +1,9 @@ package sttp.tapir.server.netty.zio import io.netty.channel._ +import io.netty.channel.group.{ChannelGroup, DefaultChannelGroup} import io.netty.channel.unix.DomainSocketAddress +import io.netty.util.concurrent.DefaultEventExecutor import sttp.capabilities.zio.ZioStreams import sttp.tapir.server.ServerEndpoint import sttp.tapir.server.model.ServerResponse @@ -9,13 +11,16 @@ import sttp.tapir.server.netty.internal.{NettyBootstrap, NettyServerHandler} import sttp.tapir.server.netty.zio.internal.ZioUtil.{nettyChannelFutureToScala, nettyFutureToScala} import sttp.tapir.server.netty.{NettyConfig, NettyResponse, Route} import sttp.tapir.ztapir.{RIOMonadError, ZServerEndpoint} -import zio.{RIO, Unsafe, ZIO} +import zio.{RIO, Task, Unsafe, ZIO, durationInt} import java.net.{InetSocketAddress, SocketAddress} import java.nio.file.{Path, Paths} import java.util.UUID +import java.util.concurrent.atomic.AtomicBoolean + import scala.concurrent.ExecutionContext.Implicits import scala.concurrent.Future +import scala.concurrent.duration.FiniteDuration case class NettyZioServer[R](routes: Vector[RIO[R, Route[RIO[R, *]]]], options: NettyZioServerOptions[R], config: NettyConfig) { def addEndpoint(se: ZServerEndpoint[R, ZioStreams]): NettyZioServer[R] = addEndpoints(List(se)) @@ -73,6 +78,8 @@ case class NettyZioServer[R](routes: Vector[RIO[R, Route[RIO[R, *]]]], options: runtime <- ZIO.runtime[R] routes <- ZIO.foreach(routes)(identity) eventLoopGroup = config.eventLoopConfig.initEventLoopGroup() + channelGroup = new DefaultChannelGroup(new DefaultEventExecutor()) // thread safe + isShuttingDown = new AtomicBoolean(false) channelFuture = { implicit val monadError: RIOMonadError[R] = new RIOMonadError[R] val route: Route[RIO[R, *]] = Route.combine(routes) @@ -82,7 +89,9 @@ case class NettyZioServer[R](routes: Vector[RIO[R, Route[RIO[R, *]]]], options: new NettyServerHandler[RIO[R, *]]( route, unsafeRunAsync(runtime), - config.maxContentLength + config.maxContentLength, + channelGroup, + isShuttingDown ), eventLoopGroup, socketOverride @@ -91,19 +100,43 @@ case class NettyZioServer[R](routes: Vector[RIO[R, Route[RIO[R, *]]]], options: binding <- nettyChannelFutureToScala(channelFuture).map(ch => ( ch.localAddress().asInstanceOf[SA], - () => stop(ch, eventLoopGroup) + () => stop(ch, eventLoopGroup, channelGroup, isShuttingDown, config.gracefulShutdownTimeout) ) ) } yield binding - private def stop(ch: Channel, eventLoopGroup: EventLoopGroup): RIO[R, Unit] = { - ZIO.suspend { - nettyFutureToScala(ch.close()).flatMap { _ => - if (config.shutdownEventLoopGroupOnClose) { - nettyFutureToScala(eventLoopGroup.shutdownGracefully()).map(_ => ()) - } else ZIO.succeed(()) - } + private def waitForClosedChannels( + channelGroup: ChannelGroup, + startNanos: Long, + gracefulShutdownTimeoutNanos: Option[Long] + ): Task[Unit] = + if (!channelGroup.isEmpty && gracefulShutdownTimeoutNanos.exists(_ >= System.nanoTime() - startNanos)) { + ZIO.sleep(100.millis) *> + waitForClosedChannels(channelGroup, startNanos, gracefulShutdownTimeoutNanos) + } else { + ZIO.unit } + + private def stop( + ch: Channel, + eventLoopGroup: EventLoopGroup, + channelGroup: ChannelGroup, + isShuttingDown: AtomicBoolean, + gracefulShutdownTimeout: Option[FiniteDuration] + ): RIO[R, Unit] = { + ZIO.attempt(isShuttingDown.set(true)) *> + waitForClosedChannels( + channelGroup, + startNanos = System.nanoTime(), + gracefulShutdownTimeoutNanos = gracefulShutdownTimeout.map(_.toMillis * 1000000) + ) *> + ZIO.suspend { + nettyFutureToScala(ch.close()).flatMap { _ => + if (config.shutdownEventLoopGroupOnClose) { + nettyFutureToScala(eventLoopGroup.shutdownGracefully()).map(_ => ()) + } else ZIO.succeed(()) + } + } } } From e5ac16ffe647aefc0af5da756c42d44fec5187ad Mon Sep 17 00:00:00 2001 From: kciesielski Date: Fri, 3 Nov 2023 08:57:24 +0100 Subject: [PATCH 04/16] Reject requests during shutdown --- .../server/netty/internal/NettyServerHandler.scala | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/NettyServerHandler.scala b/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/NettyServerHandler.scala index 9820ab1559..bf08f372d6 100644 --- a/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/NettyServerHandler.scala +++ b/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/NettyServerHandler.scala @@ -104,10 +104,8 @@ class NettyServerHandler[F[_]]( override def channelRead0(ctx: ChannelHandlerContext, request: HttpRequest): Unit = { - println(s">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> running new request, isShuttingDown = ${isShuttingDown.get()}") def writeError500(req: HttpRequest, reason: Throwable): Unit = { logger.error("Error while processing the request", reason) - // send 500 val res = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR) res.headers().set(HttpHeaderNames.CONTENT_LENGTH, 0) res.handleCloseAndKeepAliveHeaders(req) @@ -115,6 +113,13 @@ class NettyServerHandler[F[_]]( ctx.writeAndFlush(res).closeIfNeeded(req) } + def writeError503(req: HttpRequest): Unit = { + val res = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.SERVICE_UNAVAILABLE) + res.headers().set(HttpHeaderNames.CONTENT_LENGTH, 0) + res.handleCloseAndKeepAliveHeaders(req) + ctx.writeAndFlush(res).closeIfNeeded(req) + } + def runRoute(req: HttpRequest, releaseReq: () => Any = () => ()): Unit = { val (runningFuture, cancellationSwitch) = unsafeRunAsync { () => route(NettyServerRequest(req)) @@ -150,7 +155,10 @@ class NettyServerHandler[F[_]]( }(eventLoopContext) } - if (HttpUtil.is100ContinueExpected(request)) { + if (isShuttingDown.get()) { + logger.info("Rejecting request, server is shutting down") + writeError503(request) + } else if (HttpUtil.is100ContinueExpected(request)) { ctx.writeAndFlush(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE)) () } else { From 03dc2b8ec3300b0707a6fc2452502b46517c4cda Mon Sep 17 00:00:00 2001 From: kciesielski Date: Fri, 3 Nov 2023 09:04:31 +0100 Subject: [PATCH 05/16] Cleanup --- .../tapir/server/netty/cats/NettyCatsServer.scala | 11 ++++++----- .../tapir/server/netty/NettyFutureServer.scala | 4 ++-- .../tapir/server/netty/internal/FutureUtil.scala | 6 ++---- .../server/netty/internal/NettyServerHandler.scala | 12 +++++------- .../tapir/server/netty/zio/NettyZioServer.scala | 2 +- server/tests/src/main/resources/logback.xml | 14 -------------- 6 files changed, 16 insertions(+), 33 deletions(-) delete mode 100644 server/tests/src/main/resources/logback.xml diff --git a/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/NettyCatsServer.scala b/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/NettyCatsServer.scala index 56a592afa8..df1355a78d 100644 --- a/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/NettyCatsServer.scala +++ b/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/NettyCatsServer.scala @@ -102,11 +102,12 @@ case class NettyCatsServer[F[_]: Async](routes: Vector[Route[F]], options: Netty isShuttingDown: AtomicBoolean, gracefulShutdownTimeout: Option[FiniteDuration] ): F[Unit] = { - Async[F].delay(isShuttingDown.set(true)) >> Sync[F].delay(println(">>>> waiting for closed channels")) >> waitForClosedChannels( - channelGroup, - startNanos = System.nanoTime(), - gracefulShutdownTimeoutNanos = gracefulShutdownTimeout.map(_.toMillis * 1000000) - ) >> Sync[F].delay(println(">>>>>>>>> waiting complete, proceeding with shutdown")) >> + Async[F].delay(isShuttingDown.set(true)) >> + waitForClosedChannels( + channelGroup, + startNanos = System.nanoTime(), + gracefulShutdownTimeoutNanos = gracefulShutdownTimeout.map(_.toNanos) + ) >> Async[F].defer { nettyFutureToScala(ch.close()).flatMap { _ => if (config.shutdownEventLoopGroupOnClose) { diff --git a/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyFutureServer.scala b/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyFutureServer.scala index a5cbb64e91..29697e002d 100644 --- a/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyFutureServer.scala +++ b/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyFutureServer.scala @@ -94,7 +94,7 @@ case class NettyFutureServer(routes: Vector[FutureRoute], options: NettyFutureSe waitForClosedChannels(channelGroup, startNanos, gracefulShutdownTimeoutNanos) }) } else { - Future.successful(()) + Future.unit } private def stop( @@ -108,7 +108,7 @@ case class NettyFutureServer(routes: Vector[FutureRoute], options: NettyFutureSe waitForClosedChannels( channelGroup, startNanos = System.nanoTime(), - gracefulShutdownTimeoutNanos = gracefulShutdownTimeout.map(_.toMillis * 1000000) + gracefulShutdownTimeoutNanos = gracefulShutdownTimeout.map(_.toNanos) ).flatMap { _ => nettyFutureToScala(ch.close()).flatMap { _ => if (config.shutdownEventLoopGroupOnClose) { diff --git a/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/FutureUtil.scala b/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/FutureUtil.scala index a2d481446b..031347fef6 100644 --- a/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/FutureUtil.scala +++ b/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/FutureUtil.scala @@ -10,10 +10,8 @@ object FutureUtil { val p = Promise[Channel]() nettyFuture.addListener((future: ChannelFuture) => p.complete( - if (future.isSuccess) { - println(">>>>>>>>>>>>>>>>>>>>>>>>>>>> A channel is born") - Success(future.channel()) - } else if (future.isCancelled) Failure(new CancellationException) + if (future.isSuccess) Success(future.channel()) + else if (future.isCancelled) Failure(new CancellationException) else Failure(future.cause()) ) ) diff --git a/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/NettyServerHandler.scala b/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/NettyServerHandler.scala index bf08f372d6..bef744142d 100644 --- a/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/NettyServerHandler.scala +++ b/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/NettyServerHandler.scala @@ -1,12 +1,13 @@ package sttp.tapir.server.netty.internal -import org.playframework.netty.http.{DefaultStreamedHttpResponse, StreamedHttpRequest} import com.typesafe.scalalogging.Logger import io.netty.buffer.{ByteBuf, Unpooled} import io.netty.channel._ +import io.netty.channel.group.ChannelGroup import io.netty.handler.codec.http.HttpHeaderNames.{CONNECTION, CONTENT_LENGTH} import io.netty.handler.codec.http._ import io.netty.handler.stream.{ChunkedFile, ChunkedStream} +import org.playframework.netty.http.{DefaultStreamedHttpResponse, StreamedHttpRequest} import org.reactivestreams.Publisher import sttp.monad.MonadError import sttp.monad.syntax._ @@ -19,15 +20,12 @@ import sttp.tapir.server.netty.NettyResponseContent.{ } import sttp.tapir.server.netty.{NettyResponse, NettyServerRequest, Route} +import java.util.concurrent.atomic.AtomicBoolean import scala.collection.JavaConverters._ import scala.collection.mutable.{Queue => MutableQueue} -import scala.concurrent.Future -import scala.util.Failure -import scala.util.Success +import scala.concurrent.{ExecutionContext, Future} import scala.util.control.NonFatal -import scala.concurrent.ExecutionContext -import io.netty.channel.group.ChannelGroup -import java.util.concurrent.atomic.AtomicBoolean +import scala.util.{Failure, Success} /** @param unsafeRunAsync * Function which dispatches given effect to run asynchronously, returning its result as a Future, and function of type `() => diff --git a/server/netty-server/zio/src/main/scala/sttp/tapir/server/netty/zio/NettyZioServer.scala b/server/netty-server/zio/src/main/scala/sttp/tapir/server/netty/zio/NettyZioServer.scala index 7be5adc7d3..cf26e91774 100644 --- a/server/netty-server/zio/src/main/scala/sttp/tapir/server/netty/zio/NettyZioServer.scala +++ b/server/netty-server/zio/src/main/scala/sttp/tapir/server/netty/zio/NettyZioServer.scala @@ -128,7 +128,7 @@ case class NettyZioServer[R](routes: Vector[RIO[R, Route[RIO[R, *]]]], options: waitForClosedChannels( channelGroup, startNanos = System.nanoTime(), - gracefulShutdownTimeoutNanos = gracefulShutdownTimeout.map(_.toMillis * 1000000) + gracefulShutdownTimeoutNanos = gracefulShutdownTimeout.map(_.toNanos) ) *> ZIO.suspend { nettyFutureToScala(ch.close()).flatMap { _ => diff --git a/server/tests/src/main/resources/logback.xml b/server/tests/src/main/resources/logback.xml deleted file mode 100644 index ea07d0dfe3..0000000000 --- a/server/tests/src/main/resources/logback.xml +++ /dev/null @@ -1,14 +0,0 @@ - - - - %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n - - - - - - - - - - From 04d71cb6407a724b4738fd25a2f7ed23b24b202d Mon Sep 17 00:00:00 2001 From: kciesielski Date: Fri, 3 Nov 2023 09:40:08 +0100 Subject: [PATCH 06/16] Update Cats Effect Netty example --- build.sbt | 3 +- .../examples/HelloWorldNettyCatsServer.scala | 68 +++++++++---------- 2 files changed, 35 insertions(+), 36 deletions(-) diff --git a/build.sbt b/build.sbt index 059fc938b1..15e92eab99 100644 --- a/build.sbt +++ b/build.sbt @@ -2046,7 +2046,8 @@ lazy val examples: ProjectMatrix = (projectMatrix in file("examples")) scalaTest.value ), libraryDependencies ++= loggerDependencies, - publishArtifact := false + publishArtifact := false, + Compile / run / fork := true ) .jvmPlatform(scalaVersions = examplesScalaVersions) .dependsOn( diff --git a/examples/src/main/scala/sttp/tapir/examples/HelloWorldNettyCatsServer.scala b/examples/src/main/scala/sttp/tapir/examples/HelloWorldNettyCatsServer.scala index 3080346edd..9ab3d19be5 100644 --- a/examples/src/main/scala/sttp/tapir/examples/HelloWorldNettyCatsServer.scala +++ b/examples/src/main/scala/sttp/tapir/examples/HelloWorldNettyCatsServer.scala @@ -1,14 +1,13 @@ package sttp.tapir.examples import cats.effect.IO -import cats.effect.syntax.all._ +import cats.effect.IOApp import sttp.client3.{HttpURLConnectionBackend, Identity, SttpBackend, UriContext, asStringAlways, basicRequest} import sttp.model.StatusCode import sttp.tapir.{PublicEndpoint, endpoint, query, stringBody} -import cats.effect.unsafe.implicits.global -import sttp.tapir.server.netty.cats.{NettyCatsServer, NettyCatsServerBinding} +import sttp.tapir.server.netty.cats.NettyCatsServer -object HelloWorldNettyCatsServer extends App { +object HelloWorldNettyCatsServer extends IOApp.Simple { // One endpoint on GET /hello with query parameter `name` val helloWorldEndpoint: PublicEndpoint[String, Unit, String, Any] = endpoint.get.in("hello").in(query[String]("name")).out(stringBody) @@ -21,38 +20,37 @@ object HelloWorldNettyCatsServer extends App { private val declaredHost = "localhost" // Creating handler for netty bootstrap - NettyCatsServer + override def run = NettyCatsServer .io() .use { server => - - val effect: IO[NettyCatsServerBinding[IO]] = server - .port(declaredPort) - .host(declaredHost) - .addEndpoint(helloWorldServerEndpoint) - .start() - - effect.map { binding => - - val port = binding.port - val host = binding.hostName - println(s"Server started at port = ${binding.port}") - - val backend: SttpBackend[Identity, Any] = HttpURLConnectionBackend() - val badUrl = uri"http://$host:$port/bad_url" - assert(basicRequest.response(asStringAlways).get(badUrl).send(backend).code == StatusCode(404)) - - val noQueryParameter = uri"http://$host:$port/hello" - assert(basicRequest.response(asStringAlways).get(noQueryParameter).send(backend).code == StatusCode(400)) - - val allGood = uri"http://$host:$port/hello?name=Netty" - val body = basicRequest.response(asStringAlways).get(allGood).send(backend).body - - println("Got result: " + body) - assert(body == "Hello, Netty!") - assert(port == declaredPort, "Ports don't match") - assert(host == declaredHost, "Hosts don't match") - } + for { + binding <- server + .port(declaredPort) + .host(declaredHost) + .addEndpoint(helloWorldServerEndpoint) + .start() + result <- IO + .blocking { + val port = binding.port + val host = binding.hostName + println(s"Server started at port = ${binding.port}") + + val backend: SttpBackend[Identity, Any] = HttpURLConnectionBackend() + val badUrl = uri"http://$host:$port/bad_url" + assert(basicRequest.response(asStringAlways).get(badUrl).send(backend).code == StatusCode(404)) + + val noQueryParameter = uri"http://$host:$port/hello" + assert(basicRequest.response(asStringAlways).get(noQueryParameter).send(backend).code == StatusCode(400)) + + val allGood = uri"http://$host:$port/hello?name=Netty" + val body = basicRequest.response(asStringAlways).get(allGood).send(backend).body + + println("Got result: " + body) + assert(body == "Hello, Netty!") + assert(port == declaredPort, "Ports don't match") + assert(host == declaredHost, "Hosts don't match") + } + .guarantee(binding.stop()) + } yield result } - .guarantee() - .unsafeRunSync() } From a1a4855dfb9014b1879c89ed81d2c59a784a34e3 Mon Sep 17 00:00:00 2001 From: kciesielski Date: Mon, 6 Nov 2023 09:17:55 +0100 Subject: [PATCH 07/16] Fix closing of ChannelGroup --- project/project/build.properties | 1 + .../sttp/tapir/server/netty/cats/NettyCatsServer.scala | 7 +++++-- .../main/scala/sttp/tapir/server/netty/NettyConfig.scala | 1 + .../scala/sttp/tapir/server/netty/NettyFutureServer.scala | 2 +- .../scala/sttp/tapir/server/netty/zio/NettyZioServer.scala | 2 +- 5 files changed, 9 insertions(+), 4 deletions(-) create mode 100644 project/project/build.properties diff --git a/project/project/build.properties b/project/project/build.properties new file mode 100644 index 0000000000..52413ab79a --- /dev/null +++ b/project/project/build.properties @@ -0,0 +1 @@ +sbt.version=1.9.3 diff --git a/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/NettyCatsServer.scala b/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/NettyCatsServer.scala index df1355a78d..c6b845ec92 100644 --- a/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/NettyCatsServer.scala +++ b/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/NettyCatsServer.scala @@ -79,7 +79,10 @@ case class NettyCatsServer[F[_]: Async](routes: Vector[Route[F]], options: Netty ) nettyChannelFutureToScala(channelFuture).map(ch => - (ch.localAddress().asInstanceOf[SA], () => stop(ch, eventLoopGroup, channelGroup, isShuttingDown, config.gracefulShutdownTimeout)) + ( + ch.localAddress().asInstanceOf[SA], + () => stop(ch, eventLoopGroup, channelGroup, isShuttingDown, config.gracefulShutdownTimeout) + ) ) } @@ -92,7 +95,7 @@ case class NettyCatsServer[F[_]: Async](routes: Vector[Route[F]], options: Netty Temporal[F].sleep(100.millis) >> waitForClosedChannels(channelGroup, startNanos, gracefulShutdownTimeoutNanos) } else { - Sync[F].pure(()) + Sync[F].delay(nettyFutureToScala(channelGroup.close())).void } private def stop( diff --git a/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyConfig.scala b/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyConfig.scala index 5bab2d1549..fbadd899fe 100644 --- a/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyConfig.scala +++ b/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyConfig.scala @@ -109,6 +109,7 @@ case class NettyConfig( def initPipeline(f: NettyConfig => (ChannelPipeline, ChannelHandler) => Unit): NettyConfig = copy(initPipeline = f) def withGracefulShutdownTimeout(t: FiniteDuration) = copy(gracefulShutdownTimeout = Some(t)) + def noGracefulShutdown = copy(gracefulShutdownTimeout = None) } object NettyConfig { diff --git a/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyFutureServer.scala b/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyFutureServer.scala index 29697e002d..7699e20d90 100644 --- a/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyFutureServer.scala +++ b/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyFutureServer.scala @@ -94,7 +94,7 @@ case class NettyFutureServer(routes: Vector[FutureRoute], options: NettyFutureSe waitForClosedChannels(channelGroup, startNanos, gracefulShutdownTimeoutNanos) }) } else { - Future.unit + nettyFutureToScala(channelGroup.close()).map(_ => ()) } private def stop( diff --git a/server/netty-server/zio/src/main/scala/sttp/tapir/server/netty/zio/NettyZioServer.scala b/server/netty-server/zio/src/main/scala/sttp/tapir/server/netty/zio/NettyZioServer.scala index cf26e91774..117917fbbf 100644 --- a/server/netty-server/zio/src/main/scala/sttp/tapir/server/netty/zio/NettyZioServer.scala +++ b/server/netty-server/zio/src/main/scala/sttp/tapir/server/netty/zio/NettyZioServer.scala @@ -114,7 +114,7 @@ case class NettyZioServer[R](routes: Vector[RIO[R, Route[RIO[R, *]]]], options: ZIO.sleep(100.millis) *> waitForClosedChannels(channelGroup, startNanos, gracefulShutdownTimeoutNanos) } else { - ZIO.unit + ZIO.succeed(channelGroup.close()).unit } private def stop( From d452001e78ec78ac26f8d39bee78aa42aaf43190 Mon Sep 17 00:00:00 2001 From: kciesielski Date: Mon, 6 Nov 2023 09:57:30 +0100 Subject: [PATCH 08/16] Remove redundant file --- project/project/build.properties | 1 - 1 file changed, 1 deletion(-) delete mode 100644 project/project/build.properties diff --git a/project/project/build.properties b/project/project/build.properties deleted file mode 100644 index 52413ab79a..0000000000 --- a/project/project/build.properties +++ /dev/null @@ -1 +0,0 @@ -sbt.version=1.9.3 From 5b61c6c1e620e47dc68278325d14b77246ca783c Mon Sep 17 00:00:00 2001 From: kciesielski Date: Mon, 6 Nov 2023 10:11:02 +0100 Subject: [PATCH 09/16] Disable graceful shutdown in current Netty tests --- .../cats/NettyCatsTestServerInterpreter.scala | 1 + .../NettyFutureTestServerInterpreter.scala | 27 ++++++++++++++----- .../zio/NettyZioTestServerInterpreter.scala | 7 ++++- .../tapir/server/tests/CreateServerTest.scala | 23 +++++++++++++++- .../server/tests/TestServerInterpreter.scala | 4 ++- 5 files changed, 52 insertions(+), 10 deletions(-) diff --git a/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsTestServerInterpreter.scala b/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsTestServerInterpreter.scala index b0847aedb4..8ac5391526 100644 --- a/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsTestServerInterpreter.scala +++ b/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsTestServerInterpreter.scala @@ -25,6 +25,7 @@ class NettyCatsTestServerInterpreter(eventLoopGroup: NioEventLoopGroup, dispatch .randomPort .withDontShutdownEventLoopGroupOnClose .maxContentLength(NettyCatsTestServerInterpreter.maxContentLength) + .noGracefulShutdown val options = NettyCatsServerOptions.default[IO](dispatcher) val bind: IO[NettyCatsServerBinding[IO]] = NettyCatsServer(options, config).addRoutes(routes.toList).start() diff --git a/server/netty-server/src/test/scala/sttp/tapir/server/netty/NettyFutureTestServerInterpreter.scala b/server/netty-server/src/test/scala/sttp/tapir/server/netty/NettyFutureTestServerInterpreter.scala index 37f2f2481a..87fd7dc1b9 100644 --- a/server/netty-server/src/test/scala/sttp/tapir/server/netty/NettyFutureTestServerInterpreter.scala +++ b/server/netty-server/src/test/scala/sttp/tapir/server/netty/NettyFutureTestServerInterpreter.scala @@ -9,21 +9,34 @@ import sttp.tapir.tests.Port import scala.concurrent.{ExecutionContext, Future} -class NettyFutureTestServerInterpreter(eventLoopGroup: NioEventLoopGroup)(implicit ec: ExecutionContext) - extends TestServerInterpreter[Future, Any, NettyFutureServerOptions, FutureRoute] { +class NettyFutureTestServerInterpreter(eventLoopGroup: NioEventLoopGroup, customConfig: Option[NettyConfig => NettyConfig] = None)(implicit + ec: ExecutionContext +) extends TestServerInterpreter[Future, Any, NettyFutureServerOptions, FutureRoute] { override def route(es: List[ServerEndpoint[Any, Future]], interceptors: Interceptors): FutureRoute = { val serverOptions = interceptors(NettyFutureServerOptions.customiseInterceptors).options NettyFutureServerInterpreter(serverOptions).toRoute(es) } - override def server(routes: NonEmptyList[FutureRoute]): Resource[IO, Port] = { - val config = NettyConfig.defaultNoStreaming.eventLoopGroup(eventLoopGroup).randomPort.withDontShutdownEventLoopGroupOnClose + override def serverWithStop(routes: NonEmptyList[FutureRoute]): Resource[IO, (Port, IO[Unit])] = { + val config = + NettyConfig.defaultNoStreaming + .eventLoopGroup(eventLoopGroup) + .randomPort + .withDontShutdownEventLoopGroupOnClose + .noGracefulShutdown + val customizedConfig = customConfig.map(transformation => transformation(config)).getOrElse(config) val options = NettyFutureServerOptions.default - val bind = IO.fromFuture(IO.delay(NettyFutureServer(options, config).addRoutes(routes.toList).start())) + val bind = IO.fromFuture(IO.delay(NettyFutureServer(options, customizedConfig).addRoutes(routes.toList).start())) Resource - .make(bind)(binding => IO.fromFuture(IO.delay(binding.stop()))) - .map(b => b.port) + .eval(bind) + .map(bind => (bind.port, IO.fromFuture(IO.delay(bind.stop())))) + } + + override def server(routes: NonEmptyList[FutureRoute]): Resource[IO, Port] = { + serverWithStop(routes).flatMap { case (port, stopServer) => + Resource.make(IO.pure(port))(_ => stopServer) + } } } diff --git a/server/netty-server/zio/src/test/scala/sttp/tapir/server/netty/zio/NettyZioTestServerInterpreter.scala b/server/netty-server/zio/src/test/scala/sttp/tapir/server/netty/zio/NettyZioTestServerInterpreter.scala index 5bc3bcf604..c6ac68635e 100644 --- a/server/netty-server/zio/src/test/scala/sttp/tapir/server/netty/zio/NettyZioTestServerInterpreter.scala +++ b/server/netty-server/zio/src/test/scala/sttp/tapir/server/netty/zio/NettyZioTestServerInterpreter.scala @@ -20,7 +20,12 @@ class NettyZioTestServerInterpreter[R](eventLoopGroup: NioEventLoopGroup) } override def server(routes: NonEmptyList[Task[Route[Task]]]): Resource[IO, Port] = { - val config = NettyConfig.defaultWithStreaming.eventLoopGroup(eventLoopGroup).randomPort.withDontShutdownEventLoopGroupOnClose + val config = NettyConfig + .defaultWithStreaming + .eventLoopGroup(eventLoopGroup) + .randomPort + .withDontShutdownEventLoopGroupOnClose + .noGracefulShutdown val options = NettyZioServerOptions.default[R] val runtime: Runtime[R] = Runtime.default.asInstanceOf[Runtime[R]] diff --git a/server/tests/src/main/scala/sttp/tapir/server/tests/CreateServerTest.scala b/server/tests/src/main/scala/sttp/tapir/server/tests/CreateServerTest.scala index 950d018d67..e103c93c09 100644 --- a/server/tests/src/main/scala/sttp/tapir/server/tests/CreateServerTest.scala +++ b/server/tests/src/main/scala/sttp/tapir/server/tests/CreateServerTest.scala @@ -32,6 +32,14 @@ trait CreateServerTest[F[_], +R, OPTIONS, ROUTE] { runTest: (SttpBackend[IO, Fs2Streams[IO] with WebSockets], Uri) => IO[Assertion] ): Test + def testServerLogicWithStop( + e: ServerEndpoint[R, F], + testNameSuffix: String = "", + interceptors: Interceptors = identity + )( + runTest: IO[Unit] => (SttpBackend[IO, Fs2Streams[IO] with WebSockets], Uri) => IO[Assertion] + ): Test + def testServer(name: String, rs: => NonEmptyList[ROUTE])( runTest: (SttpBackend[IO, Fs2Streams[IO] with WebSockets], Uri) => IO[Assertion] ): Test @@ -39,7 +47,8 @@ trait CreateServerTest[F[_], +R, OPTIONS, ROUTE] { class DefaultCreateServerTest[F[_], +R, OPTIONS, ROUTE]( backend: SttpBackend[IO, Fs2Streams[IO] with WebSockets], - interpreter: TestServerInterpreter[F, R, OPTIONS, ROUTE] + interpreter: TestServerInterpreter[F, R, OPTIONS, ROUTE], + stopServer: IO[Unit] = IO.unit ) extends CreateServerTest[F, R, OPTIONS, ROUTE] with StrictLogging { @@ -56,6 +65,18 @@ class DefaultCreateServerTest[F[_], +R, OPTIONS, ROUTE]( )(runTest) } + override def testServerLogicWithStop( + e: ServerEndpoint[R, F], + testNameSuffix: String = "", + interceptors: Interceptors = identity + )( + runTest: IO[Unit] => (SttpBackend[IO, Fs2Streams[IO] with WebSockets], Uri) => IO[Assertion] + ): Test = { + testServer( + e.showDetail + (if (testNameSuffix == "") "" else " " + testNameSuffix), + NonEmptyList.of(interpreter.route(e, interceptors)) + )(runTest(stopServer)) + } override def testServerLogic( e: ServerEndpoint[R, F], testNameSuffix: String = "", diff --git a/server/tests/src/main/scala/sttp/tapir/server/tests/TestServerInterpreter.scala b/server/tests/src/main/scala/sttp/tapir/server/tests/TestServerInterpreter.scala index 50751cb9cd..92c6522cf3 100644 --- a/server/tests/src/main/scala/sttp/tapir/server/tests/TestServerInterpreter.scala +++ b/server/tests/src/main/scala/sttp/tapir/server/tests/TestServerInterpreter.scala @@ -16,5 +16,7 @@ trait TestServerInterpreter[F[_], +R, OPTIONS, ROUTE] { def route(es: List[ServerEndpoint[R, F]], interceptors: Interceptors = identity): ROUTE def server(routes: NonEmptyList[ROUTE]): Resource[IO, Port] - + + def serverWithStop(routes: NonEmptyList[ROUTE]): Resource[IO, (Port, IO[Unit])] = + server(routes).map(port => (port, IO.unit)) } From b8a4c2bf009cbf987a16e5517bfadec19cff0f1e Mon Sep 17 00:00:00 2001 From: kciesielski Date: Mon, 6 Nov 2023 12:38:48 +0100 Subject: [PATCH 10/16] Test FutureServer for graceful shutdown --- .../server/netty/NettyFutureServer.scala | 2 +- .../server/netty/NettyFutureServerTest.scala | 5 +- .../NettyFutureTestServerInterpreter.scala | 7 +- .../tapir/server/tests/CreateServerTest.scala | 44 ++++++++++--- .../tests/ServerGracefulShutdownTests.scala | 65 +++++++++++++++++++ .../sttp/tapir/server/tests/Sleeper.scala | 20 ++++++ .../server/tests/TestServerInterpreter.scala | 3 +- 7 files changed, 131 insertions(+), 15 deletions(-) create mode 100644 server/tests/src/main/scala/sttp/tapir/server/tests/ServerGracefulShutdownTests.scala create mode 100644 server/tests/src/main/scala/sttp/tapir/server/tests/Sleeper.scala diff --git a/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyFutureServer.scala b/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyFutureServer.scala index 7699e20d90..eaa7a86fe8 100644 --- a/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyFutureServer.scala +++ b/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyFutureServer.scala @@ -84,7 +84,7 @@ case class NettyFutureServer(routes: Vector[FutureRoute], options: NettyFutureSe channelGroup: ChannelGroup, startNanos: Long, gracefulShutdownTimeoutNanos: Option[Long] - ): Future[Unit] = + ): Future[Unit] = if (!channelGroup.isEmpty && gracefulShutdownTimeoutNanos.exists(_ >= System.nanoTime() - startNanos)) { Future { blocking { diff --git a/server/netty-server/src/test/scala/sttp/tapir/server/netty/NettyFutureServerTest.scala b/server/netty-server/src/test/scala/sttp/tapir/server/netty/NettyFutureServerTest.scala index 9b24ca83ed..247c967d63 100644 --- a/server/netty-server/src/test/scala/sttp/tapir/server/netty/NettyFutureServerTest.scala +++ b/server/netty-server/src/test/scala/sttp/tapir/server/netty/NettyFutureServerTest.scala @@ -9,6 +9,8 @@ import sttp.tapir.server.tests._ import sttp.tapir.tests.{Test, TestSuite} import scala.concurrent.Future +import Sleeper._ +import scala.concurrent.duration._ class NettyFutureServerTest extends TestSuite with EitherValues { override def tests: Resource[IO, List[Test]] = @@ -21,7 +23,8 @@ class NettyFutureServerTest extends TestSuite with EitherValues { val interpreter = new NettyFutureTestServerInterpreter(eventLoopGroup) val createServerTest = new DefaultCreateServerTest(backend, interpreter) - val tests = new AllServerTests(createServerTest, interpreter, backend, multipart = false).tests() + val tests = new AllServerTests(createServerTest, interpreter, backend, multipart = false).tests() ++ + new ServerGracefulShutdownTests(createServerTest).tests() (tests, eventLoopGroup) }) { case (_, eventLoopGroup) => diff --git a/server/netty-server/src/test/scala/sttp/tapir/server/netty/NettyFutureTestServerInterpreter.scala b/server/netty-server/src/test/scala/sttp/tapir/server/netty/NettyFutureTestServerInterpreter.scala index 87fd7dc1b9..22e61bdc35 100644 --- a/server/netty-server/src/test/scala/sttp/tapir/server/netty/NettyFutureTestServerInterpreter.scala +++ b/server/netty-server/src/test/scala/sttp/tapir/server/netty/NettyFutureTestServerInterpreter.scala @@ -8,8 +8,9 @@ import sttp.tapir.server.tests.TestServerInterpreter import sttp.tapir.tests.Port import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.duration.FiniteDuration -class NettyFutureTestServerInterpreter(eventLoopGroup: NioEventLoopGroup, customConfig: Option[NettyConfig => NettyConfig] = None)(implicit +class NettyFutureTestServerInterpreter(eventLoopGroup: NioEventLoopGroup)(implicit ec: ExecutionContext ) extends TestServerInterpreter[Future, Any, NettyFutureServerOptions, FutureRoute] { @@ -18,14 +19,14 @@ class NettyFutureTestServerInterpreter(eventLoopGroup: NioEventLoopGroup, custom NettyFutureServerInterpreter(serverOptions).toRoute(es) } - override def serverWithStop(routes: NonEmptyList[FutureRoute]): Resource[IO, (Port, IO[Unit])] = { + override def serverWithStop(routes: NonEmptyList[FutureRoute], gracefulShutdownTimeout: Option[FiniteDuration] = None): Resource[IO, (Port, IO[Unit])] = { val config = NettyConfig.defaultNoStreaming .eventLoopGroup(eventLoopGroup) .randomPort .withDontShutdownEventLoopGroupOnClose .noGracefulShutdown - val customizedConfig = customConfig.map(transformation => transformation(config)).getOrElse(config) + val customizedConfig = gracefulShutdownTimeout.map(config.withGracefulShutdownTimeout).getOrElse(config) val options = NettyFutureServerOptions.default val bind = IO.fromFuture(IO.delay(NettyFutureServer(options, customizedConfig).addRoutes(routes.toList).start())) diff --git a/server/tests/src/main/scala/sttp/tapir/server/tests/CreateServerTest.scala b/server/tests/src/main/scala/sttp/tapir/server/tests/CreateServerTest.scala index e103c93c09..ed838a47c3 100644 --- a/server/tests/src/main/scala/sttp/tapir/server/tests/CreateServerTest.scala +++ b/server/tests/src/main/scala/sttp/tapir/server/tests/CreateServerTest.scala @@ -14,6 +14,8 @@ import sttp.tapir._ import sttp.tapir.server.ServerEndpoint import sttp.tapir.server.interceptor.CustomiseInterceptors import sttp.tapir.tests._ +import org.scalactic.anyvals.FiniteDouble +import scala.concurrent.duration.FiniteDuration trait CreateServerTest[F[_], +R, OPTIONS, ROUTE] { protected type Interceptors = CustomiseInterceptors[F, OPTIONS] => CustomiseInterceptors[F, OPTIONS] @@ -35,20 +37,24 @@ trait CreateServerTest[F[_], +R, OPTIONS, ROUTE] { def testServerLogicWithStop( e: ServerEndpoint[R, F], testNameSuffix: String = "", - interceptors: Interceptors = identity + interceptors: Interceptors = identity, + gracefulShutdownTimeout: Option[FiniteDuration] = None )( - runTest: IO[Unit] => (SttpBackend[IO, Fs2Streams[IO] with WebSockets], Uri) => IO[Assertion] + runTest: IO[Unit] => (SttpBackend[IO, Fs2Streams[IO] with WebSockets], Uri) => IO[Assertion] ): Test def testServer(name: String, rs: => NonEmptyList[ROUTE])( runTest: (SttpBackend[IO, Fs2Streams[IO] with WebSockets], Uri) => IO[Assertion] ): Test + + def testServerWithStop(name: String, rs: => NonEmptyList[ROUTE], gracefulShutdownTimeout: Option[FiniteDuration])( + runTest: IO[Unit] => (SttpBackend[IO, Fs2Streams[IO] with WebSockets], Uri) => IO[Assertion] + ): Test } class DefaultCreateServerTest[F[_], +R, OPTIONS, ROUTE]( backend: SttpBackend[IO, Fs2Streams[IO] with WebSockets], - interpreter: TestServerInterpreter[F, R, OPTIONS, ROUTE], - stopServer: IO[Unit] = IO.unit + interpreter: TestServerInterpreter[F, R, OPTIONS, ROUTE] ) extends CreateServerTest[F, R, OPTIONS, ROUTE] with StrictLogging { @@ -68,14 +74,16 @@ class DefaultCreateServerTest[F[_], +R, OPTIONS, ROUTE]( override def testServerLogicWithStop( e: ServerEndpoint[R, F], testNameSuffix: String = "", - interceptors: Interceptors = identity + interceptors: Interceptors = identity, + gracefulShutdownTimeout: Option[FiniteDuration] = None )( - runTest: IO[Unit] => (SttpBackend[IO, Fs2Streams[IO] with WebSockets], Uri) => IO[Assertion] + runTest: IO[Unit] => (SttpBackend[IO, Fs2Streams[IO] with WebSockets], Uri) => IO[Assertion] ): Test = { - testServer( + testServerWithStop( e.showDetail + (if (testNameSuffix == "") "" else " " + testNameSuffix), - NonEmptyList.of(interpreter.route(e, interceptors)) - )(runTest(stopServer)) + NonEmptyList.of(interpreter.route(e, interceptors)), + gracefulShutdownTimeout, + )(runTest) } override def testServerLogic( e: ServerEndpoint[R, F], @@ -90,6 +98,24 @@ class DefaultCreateServerTest[F[_], +R, OPTIONS, ROUTE]( )(runTest) } + override def testServerWithStop(name: String, rs: => NonEmptyList[ROUTE], gracefulShutdownTimeout: Option[FiniteDuration])( + runTest: IO[Unit] => (SttpBackend[IO, Fs2Streams[IO] with WebSockets], Uri) => IO[Assertion] + ): Test = { + val resources = for { + portAndStop <- interpreter.serverWithStop(rs, gracefulShutdownTimeout).onError { case e: Exception => + Resource.eval(IO(logger.error(s"Starting server failed because of ${e.getMessage}"))) + } + _ <- Resource.eval(IO(logger.info(s"Bound server on port: ${portAndStop._1}"))) + } yield portAndStop + + Test(name)( + resources + .use { case (port, stopServer) => + runTest(stopServer)(backend, uri"http://localhost:$port").guarantee(IO(logger.info(s"Tests completed on port $port"))) + } + .unsafeToFuture() + ) + } override def testServer(name: String, rs: => NonEmptyList[ROUTE])( runTest: (SttpBackend[IO, Fs2Streams[IO] with WebSockets], Uri) => IO[Assertion] ): Test = { diff --git a/server/tests/src/main/scala/sttp/tapir/server/tests/ServerGracefulShutdownTests.scala b/server/tests/src/main/scala/sttp/tapir/server/tests/ServerGracefulShutdownTests.scala new file mode 100644 index 0000000000..dbdce454b3 --- /dev/null +++ b/server/tests/src/main/scala/sttp/tapir/server/tests/ServerGracefulShutdownTests.scala @@ -0,0 +1,65 @@ +package sttp.tapir.server.tests + +import cats.effect.IO +import cats.syntax.all._ +import org.scalatest.EitherValues +import org.scalatest.matchers.should.Matchers._ +import sttp.client3._ +import sttp.model.StatusCode +import sttp.monad.MonadError +import sttp.monad.syntax._ +import sttp.tapir._ +import sttp.tapir.tests._ + +import scala.concurrent.duration._ + +class ServerGracefulShutdownTests[F[_], OPTIONS, ROUTE](createServerTest: CreateServerTest[F, Any, OPTIONS, ROUTE])(implicit + m: MonadError[F], + sleeper: Sleeper[F] +) extends EitherValues { + import createServerTest._ + + def tests(): List[Test] = List( + testServerLogicWithStop( + endpoint + .out(plainBody[String]) + .serverLogic { _ => + sleeper.sleep(3.seconds).flatMap(_ => pureResult("processing finished".asRight[Unit])) + }, + "Server waits for long-running request to complete within timeout", + gracefulShutdownTimeout = Some(4.seconds) + ) { (stopServer) => (backend, baseUri) => + (for { + runningRequest <- basicRequest.get(uri"$baseUri").send(backend).start + _ <- IO.sleep(1.second) + runningStop <- stopServer.start + result <- runningRequest.join.attempt + _ <- runningStop.join + } yield { + result.value.isSuccess shouldBe true + }).guarantee(stopServer) + }, + testServerLogicWithStop( + endpoint + .out(plainBody[String]) + .serverLogic { _ => + sleeper.sleep(4.seconds).flatMap(_ => pureResult("processing finished".asRight[Unit])) + }, + "Server rejects requests with 503 during shutdown", + gracefulShutdownTimeout = Some(6.seconds) + ) { (stopServer) => (backend, baseUri) => + (for { + runningRequest <- basicRequest.get(uri"$baseUri").send(backend).start + _ <- IO.sleep(1.second) + runningStop <- stopServer.start + _ <- IO.sleep(1.seconds) + rejected <- basicRequest.get(uri"$baseUri").send(backend).attempt + firstResult <- runningRequest.join.attempt + _ <- runningStop.join + } yield { + (rejected.value.code shouldBe StatusCode.ServiceUnavailable): Unit + firstResult.value.isSuccess shouldBe true + }).guarantee(stopServer) + } + ) +} diff --git a/server/tests/src/main/scala/sttp/tapir/server/tests/Sleeper.scala b/server/tests/src/main/scala/sttp/tapir/server/tests/Sleeper.scala new file mode 100644 index 0000000000..e5f874cb54 --- /dev/null +++ b/server/tests/src/main/scala/sttp/tapir/server/tests/Sleeper.scala @@ -0,0 +1,20 @@ +package sttp.tapir.server.tests + +import scala.concurrent.duration._ +import scala.concurrent.blocking +import scala.concurrent.Future +import scala.concurrent.ExecutionContext + +trait Sleeper[F[_]] { + def sleep(duration: FiniteDuration): F[Unit] +} + +object Sleeper { + implicit def futureSleeper(implicit ec: ExecutionContext): Sleeper[Future] = new Sleeper[Future] { + override def sleep(duration: FiniteDuration): Future[Unit] = Future { + blocking { + Thread.sleep(duration.toMillis) + } + } + } +} diff --git a/server/tests/src/main/scala/sttp/tapir/server/tests/TestServerInterpreter.scala b/server/tests/src/main/scala/sttp/tapir/server/tests/TestServerInterpreter.scala index 92c6522cf3..2e056782f8 100644 --- a/server/tests/src/main/scala/sttp/tapir/server/tests/TestServerInterpreter.scala +++ b/server/tests/src/main/scala/sttp/tapir/server/tests/TestServerInterpreter.scala @@ -5,6 +5,7 @@ import cats.effect.{IO, Resource} import sttp.tapir.server.ServerEndpoint import sttp.tapir.server.interceptor.CustomiseInterceptors import sttp.tapir.tests.Port +import scala.concurrent.duration.FiniteDuration trait TestServerInterpreter[F[_], +R, OPTIONS, ROUTE] { protected type Interceptors = CustomiseInterceptors[F, OPTIONS] => CustomiseInterceptors[F, OPTIONS] @@ -17,6 +18,6 @@ trait TestServerInterpreter[F[_], +R, OPTIONS, ROUTE] { def server(routes: NonEmptyList[ROUTE]): Resource[IO, Port] - def serverWithStop(routes: NonEmptyList[ROUTE]): Resource[IO, (Port, IO[Unit])] = + def serverWithStop(routes: NonEmptyList[ROUTE], gracefulShutdownTimeout: Option[FiniteDuration] = None): Resource[IO, (Port, IO[Unit])] = server(routes).map(port => (port, IO.unit)) } From 63412648b4e1462cabc137a8d972750be82d6750 Mon Sep 17 00:00:00 2001 From: kciesielski Date: Mon, 6 Nov 2023 16:01:25 +0100 Subject: [PATCH 11/16] Adjust tests --- .../server/netty/cats/NettyCatsServer.scala | 2 +- .../netty/cats/NettyCatsServerTest.scala | 7 +++- .../cats/NettyCatsTestServerInterpreter.scala | 18 +++++++--- .../NettyFutureTestServerInterpreter.scala | 15 ++++---- .../server/netty/zio/NettyZioServer.scala | 2 +- .../server/netty/zio/NettyZioServerTest.scala | 9 +++-- .../zio/NettyZioTestServerInterpreter.scala | 36 +++++++++++++------ .../tests/ServerGracefulShutdownTests.scala | 4 +-- .../server/tests/TestServerInterpreter.scala | 5 ++- 9 files changed, 69 insertions(+), 29 deletions(-) diff --git a/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/NettyCatsServer.scala b/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/NettyCatsServer.scala index c6b845ec92..340ebd76dc 100644 --- a/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/NettyCatsServer.scala +++ b/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/NettyCatsServer.scala @@ -105,7 +105,7 @@ case class NettyCatsServer[F[_]: Async](routes: Vector[Route[F]], options: Netty isShuttingDown: AtomicBoolean, gracefulShutdownTimeout: Option[FiniteDuration] ): F[Unit] = { - Async[F].delay(isShuttingDown.set(true)) >> + Sync[F].delay(isShuttingDown.set(true)) >> waitForClosedChannels( channelGroup, startNanos = System.nanoTime(), diff --git a/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsServerTest.scala b/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsServerTest.scala index 7b3f2a1303..073a9965f9 100644 --- a/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsServerTest.scala +++ b/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsServerTest.scala @@ -11,6 +11,7 @@ import sttp.tapir.server.tests._ import sttp.tapir.tests.{Test, TestSuite} import scala.concurrent.Future +import scala.concurrent.duration.FiniteDuration class NettyCatsServerTest extends TestSuite with EitherValues { @@ -23,6 +24,9 @@ class NettyCatsServerTest extends TestSuite with EitherValues { val interpreter = new NettyCatsTestServerInterpreter(eventLoopGroup, dispatcher) val createServerTest = new DefaultCreateServerTest(backend, interpreter) + implicit val ioSleeper: Sleeper[IO] = new Sleeper[IO] { + override def sleep(duration: FiniteDuration): IO[Unit] = IO.sleep(duration) + } val tests = new AllServerTests( createServerTest, @@ -34,7 +38,8 @@ class NettyCatsServerTest extends TestSuite with EitherValues { .tests() ++ new ServerStreamingTests(createServerTest, Fs2Streams[IO]).tests() ++ new ServerCancellationTests(createServerTest)(m, IO.asyncForIO).tests() ++ - new NettyFs2StreamingCancellationTest(createServerTest).tests() + new NettyFs2StreamingCancellationTest(createServerTest).tests() ++ + new ServerGracefulShutdownTests(createServerTest).tests() IO.pure((tests, eventLoopGroup)) } { case (_, eventLoopGroup) => diff --git a/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsTestServerInterpreter.scala b/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsTestServerInterpreter.scala index 8ac5391526..68e74d197b 100644 --- a/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsTestServerInterpreter.scala +++ b/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsTestServerInterpreter.scala @@ -9,6 +9,7 @@ import sttp.tapir.server.netty.{NettyConfig, Route} import sttp.tapir.server.tests.TestServerInterpreter import sttp.tapir.tests.Port import sttp.capabilities.fs2.Fs2Streams +import scala.concurrent.duration.FiniteDuration class NettyCatsTestServerInterpreter(eventLoopGroup: NioEventLoopGroup, dispatcher: Dispatcher[IO]) extends TestServerInterpreter[IO, Fs2Streams[IO], NettyCatsServerOptions[IO], Route[IO]] { @@ -19,19 +20,28 @@ class NettyCatsTestServerInterpreter(eventLoopGroup: NioEventLoopGroup, dispatch NettyCatsServerInterpreter(serverOptions).toRoute(es) } - override def server(routes: NonEmptyList[Route[IO]]): Resource[IO, Port] = { + override def serverWithStop( + routes: NonEmptyList[Route[IO]], + gracefulShutdownTimeout: Option[FiniteDuration] = None + ): Resource[IO, (Port, IO[Unit])] = { val config = NettyConfig.defaultWithStreaming .eventLoopGroup(eventLoopGroup) .randomPort .withDontShutdownEventLoopGroupOnClose .maxContentLength(NettyCatsTestServerInterpreter.maxContentLength) .noGracefulShutdown + + val customizedConfig = gracefulShutdownTimeout.map(config.withGracefulShutdownTimeout).getOrElse(config) val options = NettyCatsServerOptions.default[IO](dispatcher) - val bind: IO[NettyCatsServerBinding[IO]] = NettyCatsServer(options, config).addRoutes(routes.toList).start() + val bind: IO[NettyCatsServerBinding[IO]] = NettyCatsServer(options, customizedConfig).addRoutes(routes.toList).start() Resource - .make(bind)(_.stop()) - .map(_.port) + .make(bind.map(b => (b, b.stop()))) { case (_, stop) => stop } + .map { case (b, stop) => (b.port, stop) } + } + + override def server(routes: NonEmptyList[Route[IO]]): Resource[IO, Port] = { + serverWithStop(routes).map(_._1) } } diff --git a/server/netty-server/src/test/scala/sttp/tapir/server/netty/NettyFutureTestServerInterpreter.scala b/server/netty-server/src/test/scala/sttp/tapir/server/netty/NettyFutureTestServerInterpreter.scala index 22e61bdc35..174bedd65a 100644 --- a/server/netty-server/src/test/scala/sttp/tapir/server/netty/NettyFutureTestServerInterpreter.scala +++ b/server/netty-server/src/test/scala/sttp/tapir/server/netty/NettyFutureTestServerInterpreter.scala @@ -19,7 +19,10 @@ class NettyFutureTestServerInterpreter(eventLoopGroup: NioEventLoopGroup)(implic NettyFutureServerInterpreter(serverOptions).toRoute(es) } - override def serverWithStop(routes: NonEmptyList[FutureRoute], gracefulShutdownTimeout: Option[FiniteDuration] = None): Resource[IO, (Port, IO[Unit])] = { + override def serverWithStop( + routes: NonEmptyList[FutureRoute], + gracefulShutdownTimeout: Option[FiniteDuration] = None + ): Resource[IO, (Port, IO[Unit])] = { val config = NettyConfig.defaultNoStreaming .eventLoopGroup(eventLoopGroup) @@ -31,13 +34,11 @@ class NettyFutureTestServerInterpreter(eventLoopGroup: NioEventLoopGroup)(implic val bind = IO.fromFuture(IO.delay(NettyFutureServer(options, customizedConfig).addRoutes(routes.toList).start())) Resource - .eval(bind) - .map(bind => (bind.port, IO.fromFuture(IO.delay(bind.stop())))) + .make(bind.map(b => (b, IO.fromFuture(IO.delay(b.stop()))))) { case (_, stop) => stop } + .map { case (b, stop) => (b.port, stop) } } - + override def server(routes: NonEmptyList[FutureRoute]): Resource[IO, Port] = { - serverWithStop(routes).flatMap { case (port, stopServer) => - Resource.make(IO.pure(port))(_ => stopServer) - } + serverWithStop(routes).map(_._1) } } diff --git a/server/netty-server/zio/src/main/scala/sttp/tapir/server/netty/zio/NettyZioServer.scala b/server/netty-server/zio/src/main/scala/sttp/tapir/server/netty/zio/NettyZioServer.scala index 117917fbbf..968e0d1013 100644 --- a/server/netty-server/zio/src/main/scala/sttp/tapir/server/netty/zio/NettyZioServer.scala +++ b/server/netty-server/zio/src/main/scala/sttp/tapir/server/netty/zio/NettyZioServer.scala @@ -114,7 +114,7 @@ case class NettyZioServer[R](routes: Vector[RIO[R, Route[RIO[R, *]]]], options: ZIO.sleep(100.millis) *> waitForClosedChannels(channelGroup, startNanos, gracefulShutdownTimeoutNanos) } else { - ZIO.succeed(channelGroup.close()).unit + ZIO.attempt(channelGroup.close()).unit } private def stop( diff --git a/server/netty-server/zio/src/test/scala/sttp/tapir/server/netty/zio/NettyZioServerTest.scala b/server/netty-server/zio/src/test/scala/sttp/tapir/server/netty/zio/NettyZioServerTest.scala index 227d9223db..0264ef4a25 100644 --- a/server/netty-server/zio/src/test/scala/sttp/tapir/server/netty/zio/NettyZioServerTest.scala +++ b/server/netty-server/zio/src/test/scala/sttp/tapir/server/netty/zio/NettyZioServerTest.scala @@ -9,10 +9,11 @@ import sttp.tapir.server.netty.internal.FutureUtil import sttp.tapir.server.tests._ import sttp.tapir.tests.{Test, TestSuite} import sttp.tapir.ztapir.RIOMonadError -import zio.Task import zio.interop.catz._ +import zio.{Task, ZIO} import scala.concurrent.Future +import scala.concurrent.duration.FiniteDuration class NettyZioServerTest extends TestSuite with EitherValues { override def tests: Resource[IO, List[Test]] = @@ -24,11 +25,15 @@ class NettyZioServerTest extends TestSuite with EitherValues { val interpreter = new NettyZioTestServerInterpreter(eventLoopGroup) val createServerTest = new DefaultCreateServerTest(backend, interpreter) + implicit val zioSleeper: Sleeper[Task] = new Sleeper[Task] { + override def sleep(duration: FiniteDuration): Task[Unit] = ZIO.sleep(zio.Duration.fromScala(duration)) + } val tests = new AllServerTests(createServerTest, interpreter, backend, staticContent = false, multipart = false).tests() ++ new ServerStreamingTests(createServerTest, ZioStreams).tests() ++ - new ServerCancellationTests(createServerTest)(monadError, asyncInstance).tests() + new ServerCancellationTests(createServerTest)(monadError, asyncInstance).tests() ++ + new ServerGracefulShutdownTests(createServerTest).tests() IO.pure((tests, eventLoopGroup)) } { case (_, eventLoopGroup) => diff --git a/server/netty-server/zio/src/test/scala/sttp/tapir/server/netty/zio/NettyZioTestServerInterpreter.scala b/server/netty-server/zio/src/test/scala/sttp/tapir/server/netty/zio/NettyZioTestServerInterpreter.scala index c6ac68635e..9f00d05386 100644 --- a/server/netty-server/zio/src/test/scala/sttp/tapir/server/netty/zio/NettyZioTestServerInterpreter.scala +++ b/server/netty-server/zio/src/test/scala/sttp/tapir/server/netty/zio/NettyZioTestServerInterpreter.scala @@ -8,7 +8,9 @@ import sttp.tapir.server.ServerEndpoint import sttp.tapir.server.netty.{NettyConfig, Route} import sttp.tapir.server.tests.TestServerInterpreter import sttp.tapir.tests.Port -import zio.{CancelableFuture, Runtime, Task, Unsafe} +import zio.{Runtime, Task, Unsafe} + +import scala.concurrent.duration.FiniteDuration class NettyZioTestServerInterpreter[R](eventLoopGroup: NioEventLoopGroup) extends TestServerInterpreter[Task, ZioStreams, NettyZioServerOptions[Any], Task[Route[Task]]] { @@ -19,24 +21,38 @@ class NettyZioTestServerInterpreter[R](eventLoopGroup: NioEventLoopGroup) NettyZioServerInterpreter(serverOptions).toRoute(es) } - override def server(routes: NonEmptyList[Task[Route[Task]]]): Resource[IO, Port] = { - val config = NettyConfig - .defaultWithStreaming + override def serverWithStop( + routes: NonEmptyList[Task[Route[Task]]], + gracefulShutdownTimeout: Option[FiniteDuration] = None + ): Resource[IO, (Port, IO[Unit])] = { + val config = NettyConfig.defaultWithStreaming .eventLoopGroup(eventLoopGroup) .randomPort .withDontShutdownEventLoopGroupOnClose .noGracefulShutdown + + val customizedConfig = gracefulShutdownTimeout.map(config.withGracefulShutdownTimeout).getOrElse(config) val options = NettyZioServerOptions.default[R] val runtime: Runtime[R] = Runtime.default.asInstanceOf[Runtime[R]] - val server: CancelableFuture[NettyZioServerBinding[R]] = - Unsafe.unsafe(implicit u => runtime.unsafe.runToFuture(NettyZioServer(options, config).addRoutes(routes.toList).start())) + val bind: IO[NettyZioServerBinding[R]] = + IO.fromFuture( + IO.delay( + Unsafe.unsafe(implicit u => + runtime.unsafe.runToFuture(NettyZioServer(options, customizedConfig).addRoutes(routes.toList).start()) + ) + ) + ) Resource - .make(IO.fromFuture(IO(server)))(binding => - IO.fromFuture(IO(Unsafe.unsafe(implicit u => runtime.unsafe.runToFuture(binding.stop())))) - ) - .map(b => b.port) + .make(bind.map(b => (b, IO.fromFuture[Unit](IO(Unsafe.unsafe(implicit u => runtime.unsafe.runToFuture(b.stop()))))))) { + case (_, stop) => stop + } + .map { case (b, stop) => (b.port, stop) } + } + + override def server(routes: NonEmptyList[Task[Route[Task]]]): Resource[IO, Port] = { + serverWithStop(routes).map(_._1) } } diff --git a/server/tests/src/main/scala/sttp/tapir/server/tests/ServerGracefulShutdownTests.scala b/server/tests/src/main/scala/sttp/tapir/server/tests/ServerGracefulShutdownTests.scala index dbdce454b3..b7aae6c1cb 100644 --- a/server/tests/src/main/scala/sttp/tapir/server/tests/ServerGracefulShutdownTests.scala +++ b/server/tests/src/main/scala/sttp/tapir/server/tests/ServerGracefulShutdownTests.scala @@ -37,7 +37,7 @@ class ServerGracefulShutdownTests[F[_], OPTIONS, ROUTE](createServerTest: Create _ <- runningStop.join } yield { result.value.isSuccess shouldBe true - }).guarantee(stopServer) + }) }, testServerLogicWithStop( endpoint @@ -59,7 +59,7 @@ class ServerGracefulShutdownTests[F[_], OPTIONS, ROUTE](createServerTest: Create } yield { (rejected.value.code shouldBe StatusCode.ServiceUnavailable): Unit firstResult.value.isSuccess shouldBe true - }).guarantee(stopServer) + }) } ) } diff --git a/server/tests/src/main/scala/sttp/tapir/server/tests/TestServerInterpreter.scala b/server/tests/src/main/scala/sttp/tapir/server/tests/TestServerInterpreter.scala index 2e056782f8..9b848b9364 100644 --- a/server/tests/src/main/scala/sttp/tapir/server/tests/TestServerInterpreter.scala +++ b/server/tests/src/main/scala/sttp/tapir/server/tests/TestServerInterpreter.scala @@ -17,7 +17,10 @@ trait TestServerInterpreter[F[_], +R, OPTIONS, ROUTE] { def route(es: List[ServerEndpoint[R, F]], interceptors: Interceptors = identity): ROUTE def server(routes: NonEmptyList[ROUTE]): Resource[IO, Port] - + + /** Exposes additional `stop` effect, which allows stopping the server inside your test. It will be called after the test anyway (assuming + * idempotency), but may be useful for some cases where tests need to check specific behavior like returning 503s during shutdown. + */ def serverWithStop(routes: NonEmptyList[ROUTE], gracefulShutdownTimeout: Option[FiniteDuration] = None): Resource[IO, (Port, IO[Unit])] = server(routes).map(port => (port, IO.unit)) } From 610e1282ef51c33d4675a62fa29241db7f6a309f Mon Sep 17 00:00:00 2001 From: kciesielski Date: Mon, 6 Nov 2023 16:13:36 +0100 Subject: [PATCH 12/16] Document graceful shutdown --- doc/server/netty.md | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/doc/server/netty.md b/doc/server/netty.md index abb6eb25aa..cabebd9102 100644 --- a/doc/server/netty.md +++ b/doc/server/netty.md @@ -63,6 +63,21 @@ NettyFutureServer(NettyFutureServerOptions.customiseInterceptors.serverLog(None) NettyFutureServer(NettyConfig.defaultNoStreaming.socketBacklog(256)) ``` +## Graceful shutdown + +A Netty server has to be properly closed using function `NettyFutureServerBinding.stop()` (and analogous functions available in Cats and ZIO bindings). This function ensures that the server will wait at most 10 seconds for in-flight requests to complete, while rejecting all new requests with 503 during this period. Afterwards, it closes all server resources. +You can customize this behavior in `NettyConfig`: + +```scala mdoc:compile-only +import sttp.tapir.server.netty.NettyConfig +import scala.concurrent.duration._ + +// adjust the waiting time to your needs +val config = NettyConfig.defaultNoStreaming.withGracefulShutdownTimeout(5.seconds) +// or if you don't want the server to wait for in-flight requests +val config2 = NettyConfig.defaultNoStreaming.noGracefulShutdown +``` + ## Domain socket support There is possibility to use Domain socket instead of TCP for handling traffic. From e74341a6ab8a32b82df2cefe2b8c27cf7fb8caad Mon Sep 17 00:00:00 2001 From: kciesielski Date: Mon, 6 Nov 2023 16:19:44 +0100 Subject: [PATCH 13/16] Remove debug code --- .../sttp/tapir/server/netty/cats/Main.scala | 71 ------------------- .../scala/sttp/tapir/server/netty/Main.scala | 62 ---------------- 2 files changed, 133 deletions(-) delete mode 100644 server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/Main.scala delete mode 100644 server/netty-server/src/main/scala/sttp/tapir/server/netty/Main.scala diff --git a/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/Main.scala b/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/Main.scala deleted file mode 100644 index d4e8afbb8b..0000000000 --- a/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/Main.scala +++ /dev/null @@ -1,71 +0,0 @@ -package sttp.tapir.server.netty.cats - -import cats.effect.IO -import sttp.tapir._ -import cats.effect.syntax.all._ -import cats.effect.unsafe.implicits.global -import sttp.tapir.server.netty.cats.{NettyCatsServer, NettyCatsServerBinding} -import sttp.client4.httpclient.cats.HttpClientCatsBackend -import cats.effect.IOApp -import scala.concurrent.duration._ -import sttp.client4._ - -object HelloWorldNettyCatsServer extends IOApp.Simple { - // One endpoint on GET /hello with query parameter `name` - val helloWorldEndpoint: PublicEndpoint[String, Unit, String, Any] = - endpoint.get.in("hello").in(query[String]("name")).out(stringBody) - - // Just returning passed name with `Hello, ` prepended - val helloWorldServerEndpoint = helloWorldEndpoint - .serverLogic(name => - IO.println(s"=== Received request: $name") >> IO.sleep(6.seconds) >> IO.pure[Either[Unit, String]](Right(s"Hello, $name!")) - ) - - private val declaredPort = 9090 - private val declaredHost = "localhost" - - override def run = - for { - runningServer <- NettyCatsServer - .io() - .use { server => - server - .port(declaredPort) - .host(declaredHost) - .addEndpoint(helloWorldServerEndpoint) - .start() - .bracket(binding => IO.println(">>>>>>>>>>>>>>>>>>> The app is now running... ") >> IO.never)(binding => - IO.println(">>>>>>>> Stopping binding ") >> binding.stop() - ) - } - .start - _ <- IO.sleep(3.seconds) - _ <- HttpClientCatsBackend - .resource[IO]() - .use { sttpBackend => - for { - response <- basicRequest - .get(uri"http://localhost:9090/hello?name=Bob") - .send(sttpBackend) - _ <- IO.println(response) - } yield () - } - .start // send http request in background - _ <- IO.sleep(2.seconds) - cancelation <- runningServer.cancel.start - _ <- HttpClientCatsBackend - .resource[IO]() - .use { sttpBackend => - for { - response <- basicRequest - .get(uri"http://localhost:9090/hello?name=Cynthia") - .send(sttpBackend) - _ <- IO.println(response) - } yield () - } - .start // send http request in background - _ <- IO.println("Server fiber cancellation requested") - _ <- cancelation.join - } yield () - -} diff --git a/server/netty-server/src/main/scala/sttp/tapir/server/netty/Main.scala b/server/netty-server/src/main/scala/sttp/tapir/server/netty/Main.scala deleted file mode 100644 index e02c36c137..0000000000 --- a/server/netty-server/src/main/scala/sttp/tapir/server/netty/Main.scala +++ /dev/null @@ -1,62 +0,0 @@ -package sttp.tapir.server.netty - -import sttp.client3._ -import sttp.model.Uri -import sttp.tapir._ -import sttp.tapir.server.netty.{NettyFutureServer, NettyFutureServerOptions} - -import scala.concurrent.ExecutionContext.Implicits.global -import scala.concurrent.duration._ -import scala.concurrent.{Await, Future, blocking} -import scala.util.{Failure, Success} - -object Main { - - private val helloWorldEndpoint = endpoint.get - .in("hello") - .out(stringBody) - .serverLogicSuccess(_ => - Future { - println("[Server] received the request.") - blocking { - Thread.sleep(3000) - println("[Server] successfully processed the request.") - } - s"Hello World!" - } - ) - - def main(args: Array[String]): Unit = { - implicit val backend: SttpBackend[Future, Any] = HttpClientFutureBackend() - - // shutdown hook to print if a signal is received - Runtime.getRuntime.addShutdownHook(new Thread((() => { - println("[Application] shutdown signal received.") - }): Runnable)) - - // start server - val serverOptions = NettyFutureServerOptions.default - val bindingF = NettyFutureServer().port(8080).options(serverOptions).addEndpoint(helloWorldEndpoint).start() - val binding = Await.result(bindingF, 10.seconds) - println("[Server] started.") - - // call my endpoint and then kill me - println(s"[Client] Sending request.") - emptyRequest - .get(Uri.parse("http://localhost:8080/hello").getOrElse(???)) - .send(backend) - .onComplete { - case Success(r) => println(s"[Client] Response received: $r") - case Failure(exception) => exception.printStackTrace() - } - // wait until the service receives the request - Thread.sleep(1000L) - - // kill myself - println(s"[Client] Stopping server.") - Await.result(binding.stop(), 60.seconds) - - println(s"[Client] Stopped server.") - } - -} From f33b86140290da89111b04f929f8e41d84a95692 Mon Sep 17 00:00:00 2001 From: kciesielski Date: Mon, 6 Nov 2023 16:31:56 +0100 Subject: [PATCH 14/16] Remove unneeded dependencies --- build.sbt | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/build.sbt b/build.sbt index 15e92eab99..66571022f6 100644 --- a/build.sbt +++ b/build.sbt @@ -1434,8 +1434,7 @@ lazy val nettyServer: ProjectMatrix = (projectMatrix in file("server/netty-serve name := "tapir-netty-server", libraryDependencies ++= Seq( "io.netty" % "netty-all" % Versions.nettyAll, - "org.playframework.netty" % "netty-reactive-streams-http" % Versions.nettyReactiveStreams, - "com.softwaremill.sttp.client3" %%% "core" % Versions.sttp + "org.playframework.netty" % "netty-reactive-streams-http" % Versions.nettyReactiveStreams ) ++ loggerDependencies, // needed because of https://github.com/coursier/coursier/issues/2016 @@ -1448,8 +1447,7 @@ lazy val nettyServerCats: ProjectMatrix = nettyServerProject("cats", catsEffect) .settings( libraryDependencies ++= Seq( "com.softwaremill.sttp.shared" %% "fs2" % Versions.sttpShared, - "co.fs2" %% "fs2-reactive-streams" % Versions.fs2, - "com.softwaremill.sttp.client4" %% "cats" % "4.0.0-M6" + "co.fs2" %% "fs2-reactive-streams" % Versions.fs2 ) ) From 4b5cb55ea025761e5f64a09e6a44a0fcf0b3f671 Mon Sep 17 00:00:00 2001 From: kciesielski Date: Mon, 6 Nov 2023 17:03:21 +0100 Subject: [PATCH 15/16] Add stub implementations --- .../scala/sttp/tapir/server/tests/CreateServerTest.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/tests/src/main/scala/sttp/tapir/server/tests/CreateServerTest.scala b/server/tests/src/main/scala/sttp/tapir/server/tests/CreateServerTest.scala index ed838a47c3..81f67d250d 100644 --- a/server/tests/src/main/scala/sttp/tapir/server/tests/CreateServerTest.scala +++ b/server/tests/src/main/scala/sttp/tapir/server/tests/CreateServerTest.scala @@ -41,7 +41,7 @@ trait CreateServerTest[F[_], +R, OPTIONS, ROUTE] { gracefulShutdownTimeout: Option[FiniteDuration] = None )( runTest: IO[Unit] => (SttpBackend[IO, Fs2Streams[IO] with WebSockets], Uri) => IO[Assertion] - ): Test + ): Test = testServerLogic(e, testNameSuffix, interceptors)(runTest(IO.unit)) def testServer(name: String, rs: => NonEmptyList[ROUTE])( runTest: (SttpBackend[IO, Fs2Streams[IO] with WebSockets], Uri) => IO[Assertion] @@ -49,7 +49,7 @@ trait CreateServerTest[F[_], +R, OPTIONS, ROUTE] { def testServerWithStop(name: String, rs: => NonEmptyList[ROUTE], gracefulShutdownTimeout: Option[FiniteDuration])( runTest: IO[Unit] => (SttpBackend[IO, Fs2Streams[IO] with WebSockets], Uri) => IO[Assertion] - ): Test + ): Test = testServer(name, rs)(runTest(IO.unit)) } class DefaultCreateServerTest[F[_], +R, OPTIONS, ROUTE]( @@ -82,7 +82,7 @@ class DefaultCreateServerTest[F[_], +R, OPTIONS, ROUTE]( testServerWithStop( e.showDetail + (if (testNameSuffix == "") "" else " " + testNameSuffix), NonEmptyList.of(interpreter.route(e, interceptors)), - gracefulShutdownTimeout, + gracefulShutdownTimeout )(runTest) } override def testServerLogic( From f2eacdcecbc096c245cca008f4df954a09bb6d8d Mon Sep 17 00:00:00 2001 From: kciesielski Date: Tue, 7 Nov 2023 07:30:31 +0100 Subject: [PATCH 16/16] Review fixes --- doc/server/netty.md | 2 +- .../sttp/tapir/server/netty/cats/NettyCatsServerTest.scala | 4 ++-- .../sttp/tapir/server/netty/NettyFutureServerTest.scala | 4 +--- .../sttp/tapir/server/netty/zio/NettyZioServerTest.scala | 4 ++-- .../scala/sttp/tapir/server/tests/CreateServerTest.scala | 3 +++ .../tapir/server/tests/ServerGracefulShutdownTests.scala | 5 ++--- .../src/main/scala/sttp/tapir/server/tests/Sleeper.scala | 2 +- 7 files changed, 12 insertions(+), 12 deletions(-) diff --git a/doc/server/netty.md b/doc/server/netty.md index cabebd9102..f53228d530 100644 --- a/doc/server/netty.md +++ b/doc/server/netty.md @@ -65,7 +65,7 @@ NettyFutureServer(NettyConfig.defaultNoStreaming.socketBacklog(256)) ## Graceful shutdown -A Netty server has to be properly closed using function `NettyFutureServerBinding.stop()` (and analogous functions available in Cats and ZIO bindings). This function ensures that the server will wait at most 10 seconds for in-flight requests to complete, while rejecting all new requests with 503 during this period. Afterwards, it closes all server resources. +A Netty should can be gracefully closed using function `NettyFutureServerBinding.stop()` (and analogous functions available in Cats and ZIO bindings). This function ensures that the server will wait at most 10 seconds for in-flight requests to complete, while rejecting all new requests with 503 during this period. Afterwards, it closes all server resources. You can customize this behavior in `NettyConfig`: ```scala mdoc:compile-only diff --git a/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsServerTest.scala b/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsServerTest.scala index 073a9965f9..cde65b3d38 100644 --- a/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsServerTest.scala +++ b/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsServerTest.scala @@ -24,7 +24,7 @@ class NettyCatsServerTest extends TestSuite with EitherValues { val interpreter = new NettyCatsTestServerInterpreter(eventLoopGroup, dispatcher) val createServerTest = new DefaultCreateServerTest(backend, interpreter) - implicit val ioSleeper: Sleeper[IO] = new Sleeper[IO] { + val ioSleeper: Sleeper[IO] = new Sleeper[IO] { override def sleep(duration: FiniteDuration): IO[Unit] = IO.sleep(duration) } @@ -39,7 +39,7 @@ class NettyCatsServerTest extends TestSuite with EitherValues { new ServerStreamingTests(createServerTest, Fs2Streams[IO]).tests() ++ new ServerCancellationTests(createServerTest)(m, IO.asyncForIO).tests() ++ new NettyFs2StreamingCancellationTest(createServerTest).tests() ++ - new ServerGracefulShutdownTests(createServerTest).tests() + new ServerGracefulShutdownTests(createServerTest, ioSleeper).tests() IO.pure((tests, eventLoopGroup)) } { case (_, eventLoopGroup) => diff --git a/server/netty-server/src/test/scala/sttp/tapir/server/netty/NettyFutureServerTest.scala b/server/netty-server/src/test/scala/sttp/tapir/server/netty/NettyFutureServerTest.scala index 247c967d63..b7d86c5e4a 100644 --- a/server/netty-server/src/test/scala/sttp/tapir/server/netty/NettyFutureServerTest.scala +++ b/server/netty-server/src/test/scala/sttp/tapir/server/netty/NettyFutureServerTest.scala @@ -9,8 +9,6 @@ import sttp.tapir.server.tests._ import sttp.tapir.tests.{Test, TestSuite} import scala.concurrent.Future -import Sleeper._ -import scala.concurrent.duration._ class NettyFutureServerTest extends TestSuite with EitherValues { override def tests: Resource[IO, List[Test]] = @@ -24,7 +22,7 @@ class NettyFutureServerTest extends TestSuite with EitherValues { val createServerTest = new DefaultCreateServerTest(backend, interpreter) val tests = new AllServerTests(createServerTest, interpreter, backend, multipart = false).tests() ++ - new ServerGracefulShutdownTests(createServerTest).tests() + new ServerGracefulShutdownTests(createServerTest, Sleeper.futureSleeper).tests() (tests, eventLoopGroup) }) { case (_, eventLoopGroup) => diff --git a/server/netty-server/zio/src/test/scala/sttp/tapir/server/netty/zio/NettyZioServerTest.scala b/server/netty-server/zio/src/test/scala/sttp/tapir/server/netty/zio/NettyZioServerTest.scala index 0264ef4a25..1486c0bb6c 100644 --- a/server/netty-server/zio/src/test/scala/sttp/tapir/server/netty/zio/NettyZioServerTest.scala +++ b/server/netty-server/zio/src/test/scala/sttp/tapir/server/netty/zio/NettyZioServerTest.scala @@ -25,7 +25,7 @@ class NettyZioServerTest extends TestSuite with EitherValues { val interpreter = new NettyZioTestServerInterpreter(eventLoopGroup) val createServerTest = new DefaultCreateServerTest(backend, interpreter) - implicit val zioSleeper: Sleeper[Task] = new Sleeper[Task] { + val zioSleeper: Sleeper[Task] = new Sleeper[Task] { override def sleep(duration: FiniteDuration): Task[Unit] = ZIO.sleep(zio.Duration.fromScala(duration)) } @@ -33,7 +33,7 @@ class NettyZioServerTest extends TestSuite with EitherValues { new AllServerTests(createServerTest, interpreter, backend, staticContent = false, multipart = false).tests() ++ new ServerStreamingTests(createServerTest, ZioStreams).tests() ++ new ServerCancellationTests(createServerTest)(monadError, asyncInstance).tests() ++ - new ServerGracefulShutdownTests(createServerTest).tests() + new ServerGracefulShutdownTests(createServerTest, zioSleeper).tests() IO.pure((tests, eventLoopGroup)) } { case (_, eventLoopGroup) => diff --git a/server/tests/src/main/scala/sttp/tapir/server/tests/CreateServerTest.scala b/server/tests/src/main/scala/sttp/tapir/server/tests/CreateServerTest.scala index 81f67d250d..2195898125 100644 --- a/server/tests/src/main/scala/sttp/tapir/server/tests/CreateServerTest.scala +++ b/server/tests/src/main/scala/sttp/tapir/server/tests/CreateServerTest.scala @@ -47,6 +47,9 @@ trait CreateServerTest[F[_], +R, OPTIONS, ROUTE] { runTest: (SttpBackend[IO, Fs2Streams[IO] with WebSockets], Uri) => IO[Assertion] ): Test + /** Override for a server to allow running tests which have access to a stop() effect, allowing shutting down the server within the test. + * By default, this method just uses a no-op IO.unit. + */ def testServerWithStop(name: String, rs: => NonEmptyList[ROUTE], gracefulShutdownTimeout: Option[FiniteDuration])( runTest: IO[Unit] => (SttpBackend[IO, Fs2Streams[IO] with WebSockets], Uri) => IO[Assertion] ): Test = testServer(name, rs)(runTest(IO.unit)) diff --git a/server/tests/src/main/scala/sttp/tapir/server/tests/ServerGracefulShutdownTests.scala b/server/tests/src/main/scala/sttp/tapir/server/tests/ServerGracefulShutdownTests.scala index b7aae6c1cb..e950b81b23 100644 --- a/server/tests/src/main/scala/sttp/tapir/server/tests/ServerGracefulShutdownTests.scala +++ b/server/tests/src/main/scala/sttp/tapir/server/tests/ServerGracefulShutdownTests.scala @@ -13,9 +13,8 @@ import sttp.tapir.tests._ import scala.concurrent.duration._ -class ServerGracefulShutdownTests[F[_], OPTIONS, ROUTE](createServerTest: CreateServerTest[F, Any, OPTIONS, ROUTE])(implicit - m: MonadError[F], - sleeper: Sleeper[F] +class ServerGracefulShutdownTests[F[_], OPTIONS, ROUTE](createServerTest: CreateServerTest[F, Any, OPTIONS, ROUTE], sleeper: Sleeper[F])(implicit + m: MonadError[F] ) extends EitherValues { import createServerTest._ diff --git a/server/tests/src/main/scala/sttp/tapir/server/tests/Sleeper.scala b/server/tests/src/main/scala/sttp/tapir/server/tests/Sleeper.scala index e5f874cb54..1e29cefe94 100644 --- a/server/tests/src/main/scala/sttp/tapir/server/tests/Sleeper.scala +++ b/server/tests/src/main/scala/sttp/tapir/server/tests/Sleeper.scala @@ -10,7 +10,7 @@ trait Sleeper[F[_]] { } object Sleeper { - implicit def futureSleeper(implicit ec: ExecutionContext): Sleeper[Future] = new Sleeper[Future] { + def futureSleeper(implicit ec: ExecutionContext): Sleeper[Future] = new Sleeper[Future] { override def sleep(duration: FiniteDuration): Future[Unit] = Future { blocking { Thread.sleep(duration.toMillis)