diff --git a/server/akka-http-server/src/test/scala/sttp/tapir/server/akkahttp/AkkaHttpTestServerInterpreter.scala b/server/akka-http-server/src/test/scala/sttp/tapir/server/akkahttp/AkkaHttpTestServerInterpreter.scala index 6f8d13d39d..19fd6f5e46 100644 --- a/server/akka-http-server/src/test/scala/sttp/tapir/server/akkahttp/AkkaHttpTestServerInterpreter.scala +++ b/server/akka-http-server/src/test/scala/sttp/tapir/server/akkahttp/AkkaHttpTestServerInterpreter.scala @@ -10,9 +10,10 @@ import sttp.capabilities.WebSockets import sttp.capabilities.akka.AkkaStreams import sttp.tapir.server.ServerEndpoint import sttp.tapir.server.tests.TestServerInterpreter -import sttp.tapir.tests.Port +import sttp.tapir.tests._ import scala.concurrent.Future +import scala.concurrent.duration._ class AkkaHttpTestServerInterpreter(implicit actorSystem: ActorSystem) extends TestServerInterpreter[Future, AkkaStreams with WebSockets, AkkaHttpServerOptions, Route] { @@ -22,8 +23,22 @@ class AkkaHttpTestServerInterpreter(implicit actorSystem: ActorSystem) AkkaHttpServerInterpreter(serverOptions).toRoute(es) } - override def server(routes: NonEmptyList[Route]): Resource[IO, Port] = { + override def serverWithStop( + routes: NonEmptyList[Route], + gracefulShutdownTimeout: Option[FiniteDuration] + ): Resource[IO, (Port, KillSwitch)] = { val bind = IO.fromFuture(IO(Http().newServerAt("localhost", 0).bind(concat(routes.toList: _*)))) - Resource.make(bind)(binding => IO.fromFuture(IO(binding.unbind())).void).map(_.localAddress.getPort) + + Resource + .make( + bind.map(b => + ( + b.localAddress.getPort, + IO.fromFuture(IO(b.terminate(gracefulShutdownTimeout.getOrElse(50.millis)))).void + ) + ) + ) { case (_, release) => + release + } } } diff --git a/server/armeria-server/src/test/scala/sttp/tapir/server/armeria/ArmeriaTestServerInterpreter.scala b/server/armeria-server/src/test/scala/sttp/tapir/server/armeria/ArmeriaTestServerInterpreter.scala index 6a159baade..4fab031ca9 100644 --- a/server/armeria-server/src/test/scala/sttp/tapir/server/armeria/ArmeriaTestServerInterpreter.scala +++ b/server/armeria-server/src/test/scala/sttp/tapir/server/armeria/ArmeriaTestServerInterpreter.scala @@ -5,32 +5,45 @@ import cats.effect.{IO, Resource} import com.linecorp.armeria.server.Server import sttp.capabilities.Streams import sttp.tapir.server.tests.TestServerInterpreter -import sttp.tapir.tests.Port +import sttp.tapir.tests._ + +import scala.concurrent.duration._ trait ArmeriaTestServerInterpreter[S <: Streams[S], F[_], OPTIONS] extends TestServerInterpreter[F, S, OPTIONS, TapirService[S, F]] { - override def server(routes: NonEmptyList[TapirService[S, F]]): Resource[IO, Port] = { + override def serverWithStop( + routes: NonEmptyList[TapirService[S, F]], + gracefulShutdownTimeout: Option[FiniteDuration] + ): Resource[IO, (Port, KillSwitch)] = { + val (quietPeriodMs, totalDeadlineMs) = gracefulShutdownTimeout + .map(t => (t.toMillis, t.toMillis + 50)) + .getOrElse((0L, 0L)) + val bind = IO.fromCompletableFuture( IO { val serverBuilder = Server .builder() .maxRequestLength(0) .connectionDrainDurationMicros(0) + .gracefulShutdownTimeoutMillis(quietPeriodMs, totalDeadlineMs) routes.foldLeft(serverBuilder)((sb, route) => sb.service(route)) val server = serverBuilder.build() server.start().thenApply[Server](_ => server) } ) Resource - .make(bind)(binding => - IO { - // Ignore returned future for fast test iterations. - // Armeria server wait for 2 seconds by default to let the boss group gracefully finish all remaining - // tasks in the queue. - val _ = binding.stop() - () - } - ) - .map(_.activeLocalPort()) + .make( + bind.map(b => + ( + b.activeLocalPort(), + // Ignore returned future for fast test iterations. + // Armeria server wait for 2 seconds by default to let the boss group gracefully finish all remaining + // tasks in the queue. Even if graceful shutdown timeouts are set to 0. + IO { val _ = b.stop() } + ) + ) + ) { case (_, release) => + release + } } } diff --git a/server/finatra-server/cats/src/test/scala/sttp/tapir/server/finatra/cats/FinatraCatsTestServerInterpreter.scala b/server/finatra-server/cats/src/test/scala/sttp/tapir/server/finatra/cats/FinatraCatsTestServerInterpreter.scala index 923f352438..4ae238ea29 100644 --- a/server/finatra-server/cats/src/test/scala/sttp/tapir/server/finatra/cats/FinatraCatsTestServerInterpreter.scala +++ b/server/finatra-server/cats/src/test/scala/sttp/tapir/server/finatra/cats/FinatraCatsTestServerInterpreter.scala @@ -6,9 +6,10 @@ import cats.effect.{IO, Resource} import sttp.tapir.server.ServerEndpoint import sttp.tapir.server.finatra.{FinatraRoute, FinatraTestServerInterpreter} import sttp.tapir.server.tests.TestServerInterpreter -import sttp.tapir.tests.Port +import sttp.tapir.tests._ import scala.concurrent.ExecutionContext +import scala.concurrent.duration.FiniteDuration class FinatraCatsTestServerInterpreter(dispatcher: Dispatcher[IO]) extends TestServerInterpreter[IO, Any, FinatraCatsServerOptions[IO], FinatraRoute] { @@ -20,5 +21,8 @@ class FinatraCatsTestServerInterpreter(dispatcher: Dispatcher[IO]) es.map(interpreter.toRoute).last } - override def server(routes: NonEmptyList[FinatraRoute]): Resource[IO, Port] = FinatraTestServerInterpreter.server(routes) + override def serverWithStop( + routes: NonEmptyList[FinatraRoute], + gracefulShutdownTimeout: Option[FiniteDuration] + ): Resource[IO, (Port, KillSwitch)] = FinatraTestServerInterpreter.serverWithStop(routes, gracefulShutdownTimeout) } diff --git a/server/finatra-server/src/test/scala/sttp/tapir/server/finatra/FinatraTestServerInterpreter.scala b/server/finatra-server/src/test/scala/sttp/tapir/server/finatra/FinatraTestServerInterpreter.scala index 004446d2b6..1cae8a25db 100644 --- a/server/finatra-server/src/test/scala/sttp/tapir/server/finatra/FinatraTestServerInterpreter.scala +++ b/server/finatra-server/src/test/scala/sttp/tapir/server/finatra/FinatraTestServerInterpreter.scala @@ -4,12 +4,14 @@ import cats.data.NonEmptyList import cats.effect.{IO, Resource} import com.twitter.finatra.http.routing.HttpRouter import com.twitter.finatra.http.{Controller, EmbeddedHttpServer, HttpServer} +import com.twitter.util.Duration import com.twitter.util.Future import sttp.tapir.server.ServerEndpoint import sttp.tapir.server.tests.TestServerInterpreter -import sttp.tapir.tests.Port +import sttp.tapir.tests._ import scala.concurrent.duration.DurationInt +import scala.concurrent.duration.FiniteDuration class FinatraTestServerInterpreter extends TestServerInterpreter[Future, Any, FinatraServerOptions, FinatraRoute] { override def route(es: List[ServerEndpoint[Any, Future]], interceptors: Interceptors): FinatraRoute = { @@ -18,11 +20,17 @@ class FinatraTestServerInterpreter extends TestServerInterpreter[Future, Any, Fi es.map(interpreter.toRoute).last } - override def server(routes: NonEmptyList[FinatraRoute]): Resource[IO, Port] = FinatraTestServerInterpreter.server(routes) + override def serverWithStop( + routes: NonEmptyList[FinatraRoute], + gracefulShutdownTimeout: Option[FiniteDuration] = None + ): Resource[IO, (Port, KillSwitch)] = FinatraTestServerInterpreter.serverWithStop(routes, gracefulShutdownTimeout) } object FinatraTestServerInterpreter { - def server(routes: NonEmptyList[FinatraRoute]): Resource[IO, Port] = { + def serverWithStop( + routes: NonEmptyList[FinatraRoute], + gracefulShutdownTimeout: Option[FiniteDuration] + ): Resource[IO, (Port, KillSwitch)] = { def waitUntilHealthy(s: EmbeddedHttpServer, count: Int): IO[EmbeddedHttpServer] = if (s.isHealthy) IO.pure(s) else if (count > 1000) IO.raiseError(new IllegalStateException("Server unhealthy")) @@ -57,7 +65,15 @@ object FinatraTestServerInterpreter { }.flatMap(waitUntilHealthy(_, 0)) Resource - .make(bind)(httpServer => IO(httpServer.close())) - .map(_.httpExternalPort()) + .make( + bind.map(server => + ( + server.httpExternalPort(), + IO { server.close(Duration.fromMilliseconds(gracefulShutdownTimeout.map(_.toMillis).getOrElse(50))) } + ) + ) + ) { case (_, release) => + release + } } } diff --git a/server/http4s-server/src/test/scala/sttp/tapir/server/http4s/Http4sTestServerInterpreter.scala b/server/http4s-server/src/test/scala/sttp/tapir/server/http4s/Http4sTestServerInterpreter.scala index 41aa9dc868..304f143515 100644 --- a/server/http4s-server/src/test/scala/sttp/tapir/server/http4s/Http4sTestServerInterpreter.scala +++ b/server/http4s-server/src/test/scala/sttp/tapir/server/http4s/Http4sTestServerInterpreter.scala @@ -11,9 +11,10 @@ import sttp.capabilities.fs2.Fs2Streams import sttp.tapir.server.ServerEndpoint import sttp.tapir.server.http4s.Http4sTestServerInterpreter._ import sttp.tapir.server.tests.TestServerInterpreter -import sttp.tapir.tests.Port +import sttp.tapir.tests._ import scala.concurrent.ExecutionContext +import scala.concurrent.duration._ object Http4sTestServerInterpreter { type Routes = WebSocketBuilder2[IO] => HttpRoutes[IO] @@ -27,15 +28,23 @@ class Http4sTestServerInterpreter extends TestServerInterpreter[IO, Fs2Streams[I Http4sServerInterpreter(serverOptions).toWebSocketRoutes(es) } - override def server(routes: NonEmptyList[Routes]): Resource[IO, Port] = { + override def serverWithStop( + routes: NonEmptyList[Routes], + gracefulShutdownTimeout: Option[FiniteDuration] + ): Resource[IO, (Port, KillSwitch)] = { val service: WebSocketBuilder2[IO] => HttpApp[IO] = wsb => routes.map(_.apply(wsb)).reduceK.orNotFound - BlazeServerBuilder[IO] - .withExecutionContext(ExecutionContext.global) - .bindHttp(0, "localhost") - .withHttpWebSocketApp(service) - .resource - .map(_.address.getPort) + Resource.make( + BlazeServerBuilder[IO] + .withExecutionContext(ExecutionContext.global) + .bindHttp(0, "localhost") + .withHttpWebSocketApp(service) + .resource + .allocated + .map { case (server, release) => // Blaze has no graceful shutdown support https://github.com/http4s/blaze/issues/676 + (server.address.getPort(), release) + } + ) { case (_, release) => release } } } diff --git a/server/http4s-server/zio/src/test/scala/sttp/tapir/server/http4s/ztapir/ZHttp4sTestServerInterpreter.scala b/server/http4s-server/zio/src/test/scala/sttp/tapir/server/http4s/ztapir/ZHttp4sTestServerInterpreter.scala index 321aff1dfc..23c847f7a5 100644 --- a/server/http4s-server/zio/src/test/scala/sttp/tapir/server/http4s/ztapir/ZHttp4sTestServerInterpreter.scala +++ b/server/http4s-server/zio/src/test/scala/sttp/tapir/server/http4s/ztapir/ZHttp4sTestServerInterpreter.scala @@ -11,13 +11,14 @@ import sttp.capabilities.zio.ZioStreams import sttp.tapir.server.http4s.Http4sServerOptions import sttp.tapir.server.http4s.ztapir.ZHttp4sTestServerInterpreter._ import sttp.tapir.server.tests.TestServerInterpreter -import sttp.tapir.tests.Port +import sttp.tapir.tests._ import sttp.tapir.ztapir.ZServerEndpoint import zio.{Runtime, Task, Unsafe} import zio.interop.catz._ import zio.interop.catz.implicits._ import scala.concurrent.ExecutionContext +import scala.concurrent.duration.FiniteDuration object ZHttp4sTestServerInterpreter { type F[A] = Task[A] @@ -33,7 +34,10 @@ class ZHttp4sTestServerInterpreter extends TestServerInterpreter[Task, ZioStream ZHttp4sServerInterpreter(serverOptions).fromWebSocket(es).toRoutes } - override def server(routes: NonEmptyList[Routes]): Resource[IO, Port] = { + override def serverWithStop( + routes: NonEmptyList[Routes], + gracefulShutdownTimeout: Option[FiniteDuration] + ): Resource[IO, (Port, KillSwitch)] = { val service: WebSocketBuilder2[Task] => HttpApp[Task] = wsb => routes.map(_.apply(wsb)).reduceK.orNotFound @@ -48,8 +52,10 @@ class ZHttp4sTestServerInterpreter extends TestServerInterpreter[Task, ZioStream val runtime = implicitly[zio.Runtime[Any]] Resource .eval(IO.fromFuture(IO(Unsafe.unsafe(implicit u => Runtime.default.unsafe.runToFuture(serverResource.allocated))))) - .flatMap { case (port, release) => - Resource.make(IO.pure(port))(_ => IO.fromFuture(IO(Unsafe.unsafe(implicit u => Runtime.default.unsafe.runToFuture(release))))) + .flatMap { case (port, release) => // Blaze has no graceful shutdown support https://github.com/http4s/blaze/issues/676 + Resource.make(IO.pure((port, IO.fromFuture(IO(Unsafe.unsafe(implicit u => Runtime.default.unsafe.runToFuture(release))))))) { + case (_, release) => release + } } } } diff --git a/server/http4s-server/zio1/src/test/scala/sttp/tapir/server/http4s/ztapir/ZHttp4sTestServerInterpreter.scala b/server/http4s-server/zio1/src/test/scala/sttp/tapir/server/http4s/ztapir/ZHttp4sTestServerInterpreter.scala index 3300a4295a..b97c19c154 100644 --- a/server/http4s-server/zio1/src/test/scala/sttp/tapir/server/http4s/ztapir/ZHttp4sTestServerInterpreter.scala +++ b/server/http4s-server/zio1/src/test/scala/sttp/tapir/server/http4s/ztapir/ZHttp4sTestServerInterpreter.scala @@ -11,7 +11,7 @@ import sttp.capabilities.zio.ZioStreams import sttp.tapir.server.http4s.Http4sServerOptions import sttp.tapir.server.http4s.ztapir.ZHttp4sTestServerInterpreter._ import sttp.tapir.server.tests.TestServerInterpreter -import sttp.tapir.tests.Port +import sttp.tapir.tests._ import sttp.tapir.ztapir.ZServerEndpoint import zio.RIO import zio.blocking.Blocking @@ -20,6 +20,7 @@ import zio.interop.catz._ import zio.interop.catz.implicits._ import scala.concurrent.ExecutionContext +import scala.concurrent.duration.FiniteDuration object ZHttp4sTestServerInterpreter { type F[A] = RIO[Clock with Blocking, A] @@ -35,7 +36,10 @@ class ZHttp4sTestServerInterpreter extends TestServerInterpreter[F, ZioStreams w ZHttp4sServerInterpreter(serverOptions).fromWebSocket(es).toRoutes } - override def server(routes: NonEmptyList[Routes]): Resource[IO, Port] = { + override def serverWithStop( + routes: NonEmptyList[Routes], + gracefulShutdownTimeout: Option[FiniteDuration] + ): Resource[IO, (Port, KillSwitch)] = { val service: WebSocketBuilder2[RIO[Clock with Blocking, *]] => HttpApp[RIO[Clock with Blocking, *]] = wsb => routes.map(_.apply(wsb)).reduceK.orNotFound @@ -50,8 +54,10 @@ class ZHttp4sTestServerInterpreter extends TestServerInterpreter[F, ZioStreams w val runtime = implicitly[zio.Runtime[Clock with Blocking]] Resource .eval(IO.fromFuture(IO(runtime.unsafeRunToFuture(serverResource.allocated)))) - .flatMap { case (port, release) => - Resource.make(IO.pure(port))(_ => IO.fromFuture(IO(runtime.unsafeRunToFuture(release)))) + .flatMap { case (port, release) => // Blaze has no graceful shutdown support https://github.com/http4s/blaze/issues/676 + Resource.make(IO.pure((port, IO.fromFuture(IO(runtime.unsafeRunToFuture(release)))))) { case (_, release) => + release + } } } } diff --git a/server/jdkhttp-server/src/test/scala/sttp/tapir/server/jdkhttp/JdkHttpTestServerInterpreter.scala b/server/jdkhttp-server/src/test/scala/sttp/tapir/server/jdkhttp/JdkHttpTestServerInterpreter.scala index 8387bec19b..783250fd3e 100644 --- a/server/jdkhttp-server/src/test/scala/sttp/tapir/server/jdkhttp/JdkHttpTestServerInterpreter.scala +++ b/server/jdkhttp-server/src/test/scala/sttp/tapir/server/jdkhttp/JdkHttpTestServerInterpreter.scala @@ -4,10 +4,11 @@ import cats.effect.{IO, Resource} import com.sun.net.httpserver.{HttpExchange, HttpHandler, HttpServer} import sttp.tapir.server.ServerEndpoint import sttp.tapir.server.tests.TestServerInterpreter -import sttp.tapir.tests.Port +import sttp.tapir.tests._ import java.net.InetSocketAddress import scala.annotation.tailrec +import scala.concurrent.duration.FiniteDuration class JdkHttpTestServerInterpreter() extends TestServerInterpreter[Id, Any, JdkHttpServerOptions, HttpHandler] { override def route(es: List[ServerEndpoint[Any, Id]], interceptors: Interceptors): HttpHandler = { @@ -16,7 +17,7 @@ class JdkHttpTestServerInterpreter() extends TestServerInterpreter[Id, Any, JdkH JdkHttpServerInterpreter(serverOptions).toHandler(es) } - override def server(routes: NonEmptyList[HttpHandler]): Resource[IO, Port] = { + override def serverWithStop(routes: NonEmptyList[HttpHandler], gracefulShutdownTimeout: Option[FiniteDuration]): Resource[IO, (Port, KillSwitch)] = { val server = IO.blocking { val server = HttpServer.create(new InetSocketAddress(0), 0) @@ -43,7 +44,6 @@ class JdkHttpTestServerInterpreter() extends TestServerInterpreter[Id, Any, JdkH } Resource - .make(server)(server => IO.blocking(server.stop(0))) - .map(server => server.getAddress.getPort) + .make(server.map(s => (s.getAddress.getPort, IO.blocking(s.stop(gracefulShutdownTimeout.map(_.toSeconds.toInt).getOrElse(0)))))) { case (_, release) => release } } } diff --git a/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsTestServerInterpreter.scala b/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsTestServerInterpreter.scala index 68e74d197b..521e6a342d 100644 --- a/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsTestServerInterpreter.scala +++ b/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsTestServerInterpreter.scala @@ -7,7 +7,7 @@ import io.netty.channel.nio.NioEventLoopGroup import sttp.tapir.server.ServerEndpoint import sttp.tapir.server.netty.{NettyConfig, Route} import sttp.tapir.server.tests.TestServerInterpreter -import sttp.tapir.tests.Port +import sttp.tapir.tests._ import sttp.capabilities.fs2.Fs2Streams import scala.concurrent.duration.FiniteDuration @@ -23,7 +23,7 @@ class NettyCatsTestServerInterpreter(eventLoopGroup: NioEventLoopGroup, dispatch override def serverWithStop( routes: NonEmptyList[Route[IO]], gracefulShutdownTimeout: Option[FiniteDuration] = None - ): Resource[IO, (Port, IO[Unit])] = { + ): Resource[IO, (Port, KillSwitch)] = { val config = NettyConfig.defaultWithStreaming .eventLoopGroup(eventLoopGroup) .randomPort @@ -36,12 +36,7 @@ class NettyCatsTestServerInterpreter(eventLoopGroup: NioEventLoopGroup, dispatch val bind: IO[NettyCatsServerBinding[IO]] = NettyCatsServer(options, customizedConfig).addRoutes(routes.toList).start() Resource - .make(bind.map(b => (b, b.stop()))) { case (_, stop) => stop } - .map { case (b, stop) => (b.port, stop) } - } - - override def server(routes: NonEmptyList[Route[IO]]): Resource[IO, Port] = { - serverWithStop(routes).map(_._1) + .make(bind.map(b => (b.port, b.stop()))) { case (_, release) => release } } } diff --git a/server/netty-server/loom/src/test/scala/sttp/tapir/server/netty/loom/NettyIdTestServerInterpreter.scala b/server/netty-server/loom/src/test/scala/sttp/tapir/server/netty/loom/NettyIdTestServerInterpreter.scala index 517f891312..8d6e940a68 100644 --- a/server/netty-server/loom/src/test/scala/sttp/tapir/server/netty/loom/NettyIdTestServerInterpreter.scala +++ b/server/netty-server/loom/src/test/scala/sttp/tapir/server/netty/loom/NettyIdTestServerInterpreter.scala @@ -17,7 +17,10 @@ class NettyIdTestServerInterpreter(eventLoopGroup: NioEventLoopGroup) NettyIdServerInterpreter(serverOptions).toRoute(es) } - override def serverWithStop(routes: NonEmptyList[IdRoute], gracefulShutdownTimeout: Option[FiniteDuration] = None): Resource[IO, (Port, IO[Unit])] = { + override def serverWithStop( + routes: NonEmptyList[IdRoute], + gracefulShutdownTimeout: Option[FiniteDuration] = None + ): Resource[IO, (Port, IO[Unit])] = { val config = NettyConfig.defaultNoStreaming.eventLoopGroup(eventLoopGroup).randomPort.withDontShutdownEventLoopGroupOnClose.noGracefulShutdown val customizedConfig = gracefulShutdownTimeout.map(config.withGracefulShutdownTimeout).getOrElse(config) @@ -25,11 +28,6 @@ class NettyIdTestServerInterpreter(eventLoopGroup: NioEventLoopGroup) val bind = IO.blocking(NettyIdServer(options, customizedConfig).addRoutes(routes.toList).start()) Resource - .make(bind.map(b => (b, IO.blocking(b.stop())))) { case (_, stop) => stop } - .map { case (b, stop) => (b.port, stop) } - } - - override def server(routes: NonEmptyList[IdRoute]): Resource[IO, Port] = { - serverWithStop(routes).map(_._1) + .make(bind.map(b => (b.port, IO.blocking(b.stop())))) { case (_, stop) => stop } } } diff --git a/server/netty-server/src/test/scala/sttp/tapir/server/netty/NettyFutureTestServerInterpreter.scala b/server/netty-server/src/test/scala/sttp/tapir/server/netty/NettyFutureTestServerInterpreter.scala index 174bedd65a..0073136a5e 100644 --- a/server/netty-server/src/test/scala/sttp/tapir/server/netty/NettyFutureTestServerInterpreter.scala +++ b/server/netty-server/src/test/scala/sttp/tapir/server/netty/NettyFutureTestServerInterpreter.scala @@ -5,7 +5,7 @@ import cats.effect.{IO, Resource} import io.netty.channel.nio.NioEventLoopGroup import sttp.tapir.server.ServerEndpoint import sttp.tapir.server.tests.TestServerInterpreter -import sttp.tapir.tests.Port +import sttp.tapir.tests._ import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration.FiniteDuration @@ -22,7 +22,7 @@ class NettyFutureTestServerInterpreter(eventLoopGroup: NioEventLoopGroup)(implic override def serverWithStop( routes: NonEmptyList[FutureRoute], gracefulShutdownTimeout: Option[FiniteDuration] = None - ): Resource[IO, (Port, IO[Unit])] = { + ): Resource[IO, (Port, KillSwitch)] = { val config = NettyConfig.defaultNoStreaming .eventLoopGroup(eventLoopGroup) @@ -34,11 +34,6 @@ class NettyFutureTestServerInterpreter(eventLoopGroup: NioEventLoopGroup)(implic val bind = IO.fromFuture(IO.delay(NettyFutureServer(options, customizedConfig).addRoutes(routes.toList).start())) Resource - .make(bind.map(b => (b, IO.fromFuture(IO.delay(b.stop()))))) { case (_, stop) => stop } - .map { case (b, stop) => (b.port, stop) } - } - - override def server(routes: NonEmptyList[FutureRoute]): Resource[IO, Port] = { - serverWithStop(routes).map(_._1) + .make(bind.map(b => (b.port, IO.fromFuture(IO.delay(b.stop()))))) { case (_, release) => release } } } diff --git a/server/netty-server/zio/src/test/scala/sttp/tapir/server/netty/zio/NettyZioTestServerInterpreter.scala b/server/netty-server/zio/src/test/scala/sttp/tapir/server/netty/zio/NettyZioTestServerInterpreter.scala index 9f00d05386..2f17fdefaf 100644 --- a/server/netty-server/zio/src/test/scala/sttp/tapir/server/netty/zio/NettyZioTestServerInterpreter.scala +++ b/server/netty-server/zio/src/test/scala/sttp/tapir/server/netty/zio/NettyZioTestServerInterpreter.scala @@ -7,7 +7,7 @@ import sttp.capabilities.zio.ZioStreams import sttp.tapir.server.ServerEndpoint import sttp.tapir.server.netty.{NettyConfig, Route} import sttp.tapir.server.tests.TestServerInterpreter -import sttp.tapir.tests.Port +import sttp.tapir.tests._ import zio.{Runtime, Task, Unsafe} import scala.concurrent.duration.FiniteDuration @@ -24,7 +24,7 @@ class NettyZioTestServerInterpreter[R](eventLoopGroup: NioEventLoopGroup) override def serverWithStop( routes: NonEmptyList[Task[Route[Task]]], gracefulShutdownTimeout: Option[FiniteDuration] = None - ): Resource[IO, (Port, IO[Unit])] = { + ): Resource[IO, (Port, KillSwitch)] = { val config = NettyConfig.defaultWithStreaming .eventLoopGroup(eventLoopGroup) .randomPort @@ -46,13 +46,8 @@ class NettyZioTestServerInterpreter[R](eventLoopGroup: NioEventLoopGroup) ) Resource - .make(bind.map(b => (b, IO.fromFuture[Unit](IO(Unsafe.unsafe(implicit u => runtime.unsafe.runToFuture(b.stop()))))))) { - case (_, stop) => stop + .make(bind.map(b => (b.port, IO.fromFuture[Unit](IO(Unsafe.unsafe(implicit u => runtime.unsafe.runToFuture(b.stop()))))))) { + case (_, release) => release } - .map { case (b, stop) => (b.port, stop) } - } - - override def server(routes: NonEmptyList[Task[Route[Task]]]): Resource[IO, Port] = { - serverWithStop(routes).map(_._1) } } diff --git a/server/nima-server/src/test/scala/sttp/tapir/server/nima/NimaTestServerInterpreter.scala b/server/nima-server/src/test/scala/sttp/tapir/server/nima/NimaTestServerInterpreter.scala index 1af6110888..64b3498d6c 100644 --- a/server/nima-server/src/test/scala/sttp/tapir/server/nima/NimaTestServerInterpreter.scala +++ b/server/nima-server/src/test/scala/sttp/tapir/server/nima/NimaTestServerInterpreter.scala @@ -6,7 +6,10 @@ import io.helidon.webserver.WebServer import io.helidon.webserver.http.{Handler, HttpRouting} import sttp.tapir.server.ServerEndpoint import sttp.tapir.server.tests.TestServerInterpreter -import sttp.tapir.tests.Port +import sttp.tapir.tests._ + +import scala.concurrent.duration.FiniteDuration +import java.time.Duration class NimaTestServerInterpreter() extends TestServerInterpreter[Id, Any, NimaServerOptions, Handler] { override def route(es: List[ServerEndpoint[Any, Id]], interceptors: Interceptors): Handler = { @@ -14,7 +17,10 @@ class NimaTestServerInterpreter() extends TestServerInterpreter[Id, Any, NimaSer NimaServerInterpreter(serverOptions).toHandler(es) } - override def server(nimaRoutes: NonEmptyList[Handler]): Resource[IO, Port] = { + override def serverWithStop( + nimaRoutes: NonEmptyList[Handler], + gracefulShutdownTimeout: Option[FiniteDuration] + ): Resource[IO, (Port, KillSwitch)] = { val bind = IO.blocking { WebServer .builder() @@ -22,14 +28,14 @@ class NimaTestServerInterpreter() extends TestServerInterpreter[Id, Any, NimaSer nimaRoutes.iterator .foreach(nimaHandler => builder.any(nimaHandler)) } + .shutdownGracePeriod(Duration.ofMillis(gracefulShutdownTimeout.map(_.toMillis).getOrElse(0L))) .build() .start() } Resource - .make(bind) { binding => - IO.blocking(binding.stop()).map(_ => ()) + .make(bind.map(b => (b.port, IO.blocking(b.stop()).map(_ => ())))) { case (_, release) => + release } - .map(b => b.port) } } diff --git a/server/pekko-http-server/src/test/scala/sttp/tapir/server/pekkohttp/PekkoHttpTestServerInterpreter.scala b/server/pekko-http-server/src/test/scala/sttp/tapir/server/pekkohttp/PekkoHttpTestServerInterpreter.scala index ed60ecddf0..1f63934d43 100644 --- a/server/pekko-http-server/src/test/scala/sttp/tapir/server/pekkohttp/PekkoHttpTestServerInterpreter.scala +++ b/server/pekko-http-server/src/test/scala/sttp/tapir/server/pekkohttp/PekkoHttpTestServerInterpreter.scala @@ -10,8 +10,9 @@ import sttp.capabilities.WebSockets import sttp.capabilities.pekko.PekkoStreams import sttp.tapir.server.ServerEndpoint import sttp.tapir.server.tests.TestServerInterpreter -import sttp.tapir.tests.Port +import sttp.tapir.tests._ +import scala.concurrent.duration._ import scala.concurrent.Future class PekkoHttpTestServerInterpreter(implicit actorSystem: ActorSystem) @@ -22,8 +23,22 @@ class PekkoHttpTestServerInterpreter(implicit actorSystem: ActorSystem) PekkoHttpServerInterpreter(serverOptions).toRoute(es) } - override def server(routes: NonEmptyList[Route]): Resource[IO, Port] = { + override def serverWithStop( + routes: NonEmptyList[Route], + gracefulShutdownTimeout: Option[FiniteDuration] + ): Resource[IO, (Port, KillSwitch)] = { val bind = IO.fromFuture(IO(Http().newServerAt("localhost", 0).bind(concat(routes.toList: _*)))) - Resource.make(bind)(binding => IO.fromFuture(IO(binding.unbind())).void).map(_.localAddress.getPort) + + Resource + .make( + bind.map(b => + ( + b.localAddress.getPort(), + IO.fromFuture(IO(b.terminate(gracefulShutdownTimeout.getOrElse(50.millis)))).void + ) + ) + ) { case (_, release) => + release + } } } diff --git a/server/play-server/src/test/scala/sttp/tapir/server/play/PlayTestServerInterpreter.scala b/server/play-server/src/test/scala/sttp/tapir/server/play/PlayTestServerInterpreter.scala index 0c6c8142e7..17d4db055c 100644 --- a/server/play-server/src/test/scala/sttp/tapir/server/play/PlayTestServerInterpreter.scala +++ b/server/play-server/src/test/scala/sttp/tapir/server/play/PlayTestServerInterpreter.scala @@ -2,7 +2,9 @@ package sttp.tapir.server.play import cats.data.NonEmptyList import cats.effect.{IO, Resource} +import com.typesafe.config.ConfigFactory import org.apache.pekko.actor.ActorSystem +import play.api.Configuration import play.api.Mode import play.api.http.ParserConfiguration import play.api.mvc.{Handler, PlayBodyParsers, RequestHeader} @@ -13,9 +15,10 @@ import sttp.capabilities.WebSockets import sttp.capabilities.pekko.PekkoStreams import sttp.tapir.server.ServerEndpoint import sttp.tapir.server.tests.TestServerInterpreter -import sttp.tapir.tests.Port +import sttp.tapir.tests._ import scala.concurrent.Future +import scala.concurrent.duration._ class PlayTestServerInterpreter(implicit actorSystem: ActorSystem) extends TestServerInterpreter[Future, PekkoStreams with WebSockets, PlayServerOptions, Routes] { @@ -28,9 +31,21 @@ class PlayTestServerInterpreter(implicit actorSystem: ActorSystem) PlayServerInterpreter(serverOptions).toRoutes(es) } - override def server(routes: NonEmptyList[Routes]): Resource[IO, Port] = { + import play.core.server.PekkoHttpServer + + override def serverWithStop( + routes: NonEmptyList[Routes], + gracefulShutdownTimeout: Option[FiniteDuration] + ): Resource[IO, (Port, KillSwitch)] = { val components = new DefaultPekkoHttpServerComponents { - override lazy val serverConfig: ServerConfig = ServerConfig(port = Some(0), address = "127.0.0.1", mode = Mode.Test) + val initialServerConfig = ServerConfig(port = Some(0), address = "127.0.0.1", mode = Mode.Test) + + val customConf = + Configuration( + ConfigFactory.parseString(s"play { server.terminationTimeout=${gracefulShutdownTimeout.getOrElse(50.millis).toString} }") + ) + override lazy val serverConfig: ServerConfig = + initialServerConfig.copy(configuration = customConf.withFallback(initialServerConfig.configuration)) override lazy val actorSystem: ActorSystem = ActorSystem("tapir", defaultExecutionContext = Some(PlayTestServerInterpreter.this.actorSystem.dispatcher)) override def router: Router = @@ -47,6 +62,6 @@ class PlayTestServerInterpreter(implicit actorSystem: ActorSystem) val bind = IO { components.server } - Resource.make(bind)(s => IO(s.stop())).map(_.mainAddress.getPort) + Resource.make(bind.map(s => (s.mainAddress.getPort, IO(s.stop())))) { case (_, release) => release } } } diff --git a/server/tests/src/main/scala/sttp/tapir/server/tests/CreateServerTest.scala b/server/tests/src/main/scala/sttp/tapir/server/tests/CreateServerTest.scala index 2195898125..b717469d51 100644 --- a/server/tests/src/main/scala/sttp/tapir/server/tests/CreateServerTest.scala +++ b/server/tests/src/main/scala/sttp/tapir/server/tests/CreateServerTest.scala @@ -40,8 +40,8 @@ trait CreateServerTest[F[_], +R, OPTIONS, ROUTE] { interceptors: Interceptors = identity, gracefulShutdownTimeout: Option[FiniteDuration] = None )( - runTest: IO[Unit] => (SttpBackend[IO, Fs2Streams[IO] with WebSockets], Uri) => IO[Assertion] - ): Test = testServerLogic(e, testNameSuffix, interceptors)(runTest(IO.unit)) + runTest: KillSwitch => (SttpBackend[IO, Fs2Streams[IO] with WebSockets], Uri) => IO[Assertion] + ): Test def testServer(name: String, rs: => NonEmptyList[ROUTE])( runTest: (SttpBackend[IO, Fs2Streams[IO] with WebSockets], Uri) => IO[Assertion] @@ -51,7 +51,7 @@ trait CreateServerTest[F[_], +R, OPTIONS, ROUTE] { * By default, this method just uses a no-op IO.unit. */ def testServerWithStop(name: String, rs: => NonEmptyList[ROUTE], gracefulShutdownTimeout: Option[FiniteDuration])( - runTest: IO[Unit] => (SttpBackend[IO, Fs2Streams[IO] with WebSockets], Uri) => IO[Assertion] + runTest: KillSwitch => (SttpBackend[IO, Fs2Streams[IO] with WebSockets], Uri) => IO[Assertion] ): Test = testServer(name, rs)(runTest(IO.unit)) } diff --git a/server/tests/src/main/scala/sttp/tapir/server/tests/TestServerInterpreter.scala b/server/tests/src/main/scala/sttp/tapir/server/tests/TestServerInterpreter.scala index 9b848b9364..b4c655dbc6 100644 --- a/server/tests/src/main/scala/sttp/tapir/server/tests/TestServerInterpreter.scala +++ b/server/tests/src/main/scala/sttp/tapir/server/tests/TestServerInterpreter.scala @@ -4,10 +4,12 @@ import cats.data.NonEmptyList import cats.effect.{IO, Resource} import sttp.tapir.server.ServerEndpoint import sttp.tapir.server.interceptor.CustomiseInterceptors -import sttp.tapir.tests.Port +import sttp.tapir.tests._ + import scala.concurrent.duration.FiniteDuration trait TestServerInterpreter[F[_], +R, OPTIONS, ROUTE] { + protected type Interceptors = CustomiseInterceptors[F, OPTIONS] => CustomiseInterceptors[F, OPTIONS] def route(e: ServerEndpoint[R, F]): ROUTE = route(List(e), (ci: CustomiseInterceptors[F, OPTIONS]) => ci) @@ -16,11 +18,8 @@ trait TestServerInterpreter[F[_], +R, OPTIONS, ROUTE] { def route(es: List[ServerEndpoint[R, F]], interceptors: Interceptors = identity): ROUTE - def server(routes: NonEmptyList[ROUTE]): Resource[IO, Port] + def serverWithStop(routes: NonEmptyList[ROUTE], gracefulShutdownTimeout: Option[FiniteDuration] = None): Resource[IO, (Port, KillSwitch)] - /** Exposes additional `stop` effect, which allows stopping the server inside your test. It will be called after the test anyway (assuming - * idempotency), but may be useful for some cases where tests need to check specific behavior like returning 503s during shutdown. - */ - def serverWithStop(routes: NonEmptyList[ROUTE], gracefulShutdownTimeout: Option[FiniteDuration] = None): Resource[IO, (Port, IO[Unit])] = - server(routes).map(port => (port, IO.unit)) + def server(routes: NonEmptyList[ROUTE]): Resource[IO, Port] = + serverWithStop(routes, gracefulShutdownTimeout = None).map(_._1) } diff --git a/server/vertx-server/cats/src/test/scala/sttp/tapir/server/vertx/cats/CatsVertxTestServerInterpreter.scala b/server/vertx-server/cats/src/test/scala/sttp/tapir/server/vertx/cats/CatsVertxTestServerInterpreter.scala index 6e8e7cabf5..9bfe0fafc3 100644 --- a/server/vertx-server/cats/src/test/scala/sttp/tapir/server/vertx/cats/CatsVertxTestServerInterpreter.scala +++ b/server/vertx-server/cats/src/test/scala/sttp/tapir/server/vertx/cats/CatsVertxTestServerInterpreter.scala @@ -11,7 +11,9 @@ import sttp.capabilities.WebSockets import sttp.tapir.server.ServerEndpoint import sttp.tapir.server.tests.TestServerInterpreter import sttp.tapir.server.vertx.cats.VertxCatsServerInterpreter.CatsFFromVFuture -import sttp.tapir.tests.Port +import sttp.tapir.tests._ + +import scala.concurrent.duration.FiniteDuration class CatsVertxTestServerInterpreter(vertx: Vertx, dispatcher: Dispatcher[IO]) extends TestServerInterpreter[IO, Fs2Streams[IO] with WebSockets, VertxCatsServerOptions[IO], Router => Route] { @@ -25,11 +27,15 @@ class CatsVertxTestServerInterpreter(vertx: Vertx, dispatcher: Dispatcher[IO]) es.map(interpreter.route(_)(router)).last } - override def server(routes: NonEmptyList[Router => Route]): Resource[IO, Port] = { + override def serverWithStop( + routes: NonEmptyList[Router => Route], + gracefulShutdownTimeout: Option[FiniteDuration] + ): Resource[IO, (Port, KillSwitch)] = { val router = Router.router(vertx) routes.toList.foreach(_.apply(router)) val server = vertx.createHttpServer(new HttpServerOptions().setPort(0)).requestHandler(router) val listenIO = ioFromVFuture(server.listen(0)) - Resource.make(listenIO)(s => ioFromVFuture(s.close).void).map(_.actualPort()) + // Vertx doesn't offer graceful shutdown with timeout OOTB + Resource.make(listenIO.map(s => (s.actualPort(), ioFromVFuture(s.close).void))) { case (_, release) => release } } } diff --git a/server/vertx-server/src/test/scala/sttp/tapir/server/vertx/VertxTestServerInterpreter.scala b/server/vertx-server/src/test/scala/sttp/tapir/server/vertx/VertxTestServerInterpreter.scala index c98255bb9e..988eb9a212 100644 --- a/server/vertx-server/src/test/scala/sttp/tapir/server/vertx/VertxTestServerInterpreter.scala +++ b/server/vertx-server/src/test/scala/sttp/tapir/server/vertx/VertxTestServerInterpreter.scala @@ -9,9 +9,10 @@ import sttp.capabilities.WebSockets import sttp.tapir.server.ServerEndpoint import sttp.tapir.server.tests.TestServerInterpreter import sttp.tapir.server.vertx.streams.VertxStreams -import sttp.tapir.tests.Port +import sttp.tapir.tests._ import scala.concurrent.Future +import scala.concurrent.duration.FiniteDuration class VertxTestServerInterpreter(vertx: Vertx) extends TestServerInterpreter[Future, VertxStreams with WebSockets, VertxFutureServerOptions, Router => Route] { @@ -24,12 +25,16 @@ class VertxTestServerInterpreter(vertx: Vertx) es.map(interpreter.route(_)(router)).last } - override def server(routes: NonEmptyList[Router => Route]): Resource[IO, Port] = { + override def serverWithStop( + routes: NonEmptyList[Router => Route], + gracefulShutdownTimeout: Option[FiniteDuration] + ): Resource[IO, (Port, KillSwitch)] = { val router = Router.router(vertx) val server = vertx.createHttpServer(new HttpServerOptions().setPort(0)).requestHandler(router) val listenIO = vertxFutureToIo(server.listen(0)) routes.toList.foreach(_.apply(router)) - Resource.make(listenIO)(s => vertxFutureToIo(s.close()).void).map(_.actualPort()) + // Vertx doesn't offer graceful shutdown with timeout OOTB + Resource.make(listenIO.map(s => (s.actualPort(), vertxFutureToIo(s.close()).void))) { case (_, release) => release } } } diff --git a/server/vertx-server/zio/src/test/scala/sttp/tapir/server/vertx/zio/ZioVertxTestServerInterpreter.scala b/server/vertx-server/zio/src/test/scala/sttp/tapir/server/vertx/zio/ZioVertxTestServerInterpreter.scala index 1a223e571f..3a0e97613c 100644 --- a/server/vertx-server/zio/src/test/scala/sttp/tapir/server/vertx/zio/ZioVertxTestServerInterpreter.scala +++ b/server/vertx-server/zio/src/test/scala/sttp/tapir/server/vertx/zio/ZioVertxTestServerInterpreter.scala @@ -9,9 +9,10 @@ import sttp.capabilities.zio.ZioStreams import sttp.capabilities.WebSockets import sttp.tapir.server.ServerEndpoint import sttp.tapir.server.tests.TestServerInterpreter -import sttp.tapir.tests.Port +import sttp.tapir.tests._ import _root_.zio.{Runtime, Task} import sttp.tapir.server.vertx.VertxTestServerInterpreter +import scala.concurrent.duration.FiniteDuration class ZioVertxTestServerInterpreter(vertx: Vertx) extends TestServerInterpreter[Task, ZioStreams with WebSockets, VertxZioServerOptions[Any], Router => Route] { @@ -23,11 +24,19 @@ class ZioVertxTestServerInterpreter(vertx: Vertx) es.map(interpreter.route(_)(runtime)(router)).last } - override def server(routes: NonEmptyList[Router => Route]): Resource[IO, Port] = { + override def serverWithStop( + routes: NonEmptyList[Router => Route], + gracefulShutdownTimeout: Option[FiniteDuration] + ): Resource[IO, (Port, KillSwitch)] = { val router = Router.router(vertx) val server = vertx.createHttpServer(new HttpServerOptions().setPort(0)).requestHandler(router) routes.toList.foreach(_.apply(router)) - Resource.eval(VertxTestServerInterpreter.vertxFutureToIo(server.listen(0)).map(_.actualPort())) + + val listenIO = VertxTestServerInterpreter.vertxFutureToIo(server.listen(0)) + // Vertx doesn't offer graceful shutdown with timeout OOTB + Resource.make(listenIO.map(s => (s.actualPort(), VertxTestServerInterpreter.vertxFutureToIo(s.close).void))) { case (_, release) => + release + } } } diff --git a/server/vertx-server/zio1/src/test/scala/sttp/tapir/server/vertx/zio/ZioVertxTestServerInterpreter.scala b/server/vertx-server/zio1/src/test/scala/sttp/tapir/server/vertx/zio/ZioVertxTestServerInterpreter.scala index 85dcd8d388..3c59e6eae0 100644 --- a/server/vertx-server/zio1/src/test/scala/sttp/tapir/server/vertx/zio/ZioVertxTestServerInterpreter.scala +++ b/server/vertx-server/zio1/src/test/scala/sttp/tapir/server/vertx/zio/ZioVertxTestServerInterpreter.scala @@ -9,10 +9,11 @@ import sttp.capabilities.zio.ZioStreams import sttp.capabilities.WebSockets import sttp.tapir.server.ServerEndpoint import sttp.tapir.server.tests.TestServerInterpreter -import sttp.tapir.tests.Port +import sttp.tapir.tests._ import _root_.zio.{Has, RIO, Runtime} import _root_.zio.blocking.Blocking import sttp.tapir.server.vertx.VertxTestServerInterpreter +import scala.concurrent.duration.FiniteDuration class ZioVertxTestServerInterpreter(vertx: Vertx) extends TestServerInterpreter[RIO[Blocking, *], ZioStreams with WebSockets, VertxZioServerOptions[Blocking], Router => Route] { @@ -27,11 +28,18 @@ class ZioVertxTestServerInterpreter(vertx: Vertx) es.map(interpreter.route(_)(runtime)(router)).last } - override def server(routes: NonEmptyList[Router => Route]): Resource[IO, Port] = { + override def serverWithStop( + routes: NonEmptyList[Router => Route], + gracefulShutdownTimeout: Option[FiniteDuration] + ): Resource[IO, (Port, KillSwitch)] = { val router = Router.router(vertx) val server = vertx.createHttpServer(new HttpServerOptions().setPort(0)).requestHandler(router) routes.toList.foreach(_.apply(router)) - Resource.eval(VertxTestServerInterpreter.vertxFutureToIo(server.listen(0)).map(_.actualPort())) + val listenIO = VertxTestServerInterpreter.vertxFutureToIo(server.listen(0)) + // Vertx doesn't offer graceful shutdown with timeout OOTB + Resource.make(listenIO.map(s => (s.actualPort(), VertxTestServerInterpreter.vertxFutureToIo(s.close).void))) { case (_, release) => + release + } } } diff --git a/server/zio-http-server/src/test/scala/sttp/tapir/server/ziohttp/ZioHttpTestServerInterpreter.scala b/server/zio-http-server/src/test/scala/sttp/tapir/server/ziohttp/ZioHttpTestServerInterpreter.scala index c6abb761ef..abe306f3e9 100644 --- a/server/zio-http-server/src/test/scala/sttp/tapir/server/ziohttp/ZioHttpTestServerInterpreter.scala +++ b/server/zio-http-server/src/test/scala/sttp/tapir/server/ziohttp/ZioHttpTestServerInterpreter.scala @@ -7,10 +7,11 @@ import sttp.capabilities.WebSockets import sttp.capabilities.zio.ZioStreams import sttp.tapir.server.ServerEndpoint import sttp.tapir.server.tests.TestServerInterpreter -import sttp.tapir.tests.Port +import sttp.tapir.tests._ import zio._ import zio.http._ import zio.interop.catz._ +import scala.concurrent.duration.FiniteDuration class ZioHttpTestServerInterpreter( eventLoopGroup: ZLayer[Any, Nothing, EventLoopGroup], @@ -27,10 +28,13 @@ class ZioHttpTestServerInterpreter( ZioHttpInterpreter(serverOptions).toHttp(es) } - override def server(routes: NonEmptyList[HttpApp[Any]]): Resource[IO, Port] = { + override def serverWithStop( + routes: NonEmptyList[HttpApp[Any]], + gracefulShutdownTimeout: Option[FiniteDuration] + ): Resource[IO, (Port, KillSwitch)] = { implicit val r: Runtime[Any] = Runtime.default - val effect: ZIO[Scope, Throwable, Int] = + val effect: ZIO[Scope, Throwable, Port] = (for { driver <- ZIO.service[Driver] result <- driver.start(trace) @@ -40,9 +44,14 @@ class ZioHttpTestServerInterpreter( zio.test.driver, eventLoopGroup, channelFactory, - ZLayer.succeed(Server.Config.default.port(0).enableRequestStreaming) + ZLayer.succeed( + Server.Config.default + .port(0) + .enableRequestStreaming + .gracefulShutdownTimeout(gracefulShutdownTimeout.map(Duration.fromScala).getOrElse(50.millis)) + ) ) - Resource.scoped[IO, Any, Int](effect) + Resource.make(Resource.scoped[IO, Any, Port](effect).allocated) { case (_, release) => release } } } diff --git a/server/zio1-http-server/src/test/scala/sttp/tapir/server/ziohttp/ZioHttpTestServerInterpreter.scala b/server/zio1-http-server/src/test/scala/sttp/tapir/server/ziohttp/ZioHttpTestServerInterpreter.scala index 775bce0cd7..b6a51d2951 100644 --- a/server/zio1-http-server/src/test/scala/sttp/tapir/server/ziohttp/ZioHttpTestServerInterpreter.scala +++ b/server/zio1-http-server/src/test/scala/sttp/tapir/server/ziohttp/ZioHttpTestServerInterpreter.scala @@ -5,11 +5,12 @@ import cats.effect.{IO, Resource} import sttp.capabilities.zio.ZioStreams import sttp.tapir.server.ServerEndpoint import sttp.tapir.server.tests.TestServerInterpreter -import sttp.tapir.tests.Port +import sttp.tapir.tests._ import zhttp.http._ import zhttp.service.{EventLoopGroup, Server, ServerChannelFactory} import zio._ import zio.interop.catz._ +import scala.concurrent.duration.FiniteDuration class ZioHttpTestServerInterpreter(nettyDeps: EventLoopGroup with ServerChannelFactory) extends TestServerInterpreter[Task, ZioStreams, ZioHttpServerOptions[Any], Http[Any, Throwable, Request, Response]] { @@ -19,13 +20,20 @@ class ZioHttpTestServerInterpreter(nettyDeps: EventLoopGroup with ServerChannelF ZioHttpInterpreter(serverOptions).toHttp(es) } - override def server(routes: NonEmptyList[Http[Any, Throwable, Request, Response]]): Resource[IO, Port] = { + override def serverWithStop( + routes: NonEmptyList[Http[Any, Throwable, Request, Response]], + gracefulShutdownTimeout: Option[FiniteDuration] + ): Resource[IO, (Port, KillSwitch)] = { implicit val r: Runtime[Any] = Runtime.default val server: Server[Any, Throwable] = Server.app(routes.toList.reduce(_ ++ _)) - Server - .make(server ++ Server.port(0)) - .provide(nettyDeps) - .map(_.port) - .toResource[IO] + // ZIO HTTP 1.x doesn't offer graceful shutdown with timeout OOTB + Resource.make( + Server + .make(server ++ Server.port(0)) + .provide(nettyDeps) + .map(_.port) + .toResource[IO] + .allocated + ) { case (_, release) => release } } } diff --git a/serverless/aws/lambda-cats-effect-tests/src/test/scala/sttp/tapir/serverless/aws/lambda/tests/AwsLambdaCreateServerStubTest.scala b/serverless/aws/lambda-cats-effect-tests/src/test/scala/sttp/tapir/serverless/aws/lambda/tests/AwsLambdaCreateServerStubTest.scala index a66edc907b..318b96c05b 100644 --- a/serverless/aws/lambda-cats-effect-tests/src/test/scala/sttp/tapir/serverless/aws/lambda/tests/AwsLambdaCreateServerStubTest.scala +++ b/serverless/aws/lambda-cats-effect-tests/src/test/scala/sttp/tapir/serverless/aws/lambda/tests/AwsLambdaCreateServerStubTest.scala @@ -16,7 +16,9 @@ import sttp.tapir.server.ServerEndpoint import sttp.tapir.server.tests.CreateServerTest import sttp.tapir.serverless.aws.lambda._ import sttp.tapir.serverless.aws.lambda.tests.AwsLambdaCreateServerStubTest._ -import sttp.tapir.tests.Test +import sttp.tapir.tests._ + +import scala.concurrent.duration._ class AwsLambdaCreateServerStubTest extends CreateServerTest[IO, Any, AwsServerOptions[IO], Route[IO]] { @@ -41,6 +43,15 @@ class AwsLambdaCreateServerStubTest extends CreateServerTest[IO, Any, AwsServerO Test(name)(runTest(stubBackend(route), uri"http://localhost:3001").unsafeToFuture()) } + override def testServerLogicWithStop( + e: ServerEndpoint[Any, IO], + testNameSuffix: String = "", + interceptors: Interceptors = identity, + gracefulShutdownTimeout: Option[FiniteDuration] = None + )( + runTest: KillSwitch => (SttpBackend[IO, Fs2Streams[IO] with WebSockets], Uri) => IO[Assertion] + ): Test = throw new java.lang.UnsupportedOperationException + override def testServer(name: String, rs: => NonEmptyList[Route[IO]])( runTest: (SttpBackend[IO, Fs2Streams[IO] with WebSockets], Uri) => IO[Assertion] ): Test = { diff --git a/serverless/aws/lambda-cats-effect-tests/src/test/scala/sttp/tapir/serverless/aws/lambda/tests/AwsLambdaStubHttpTest.scala b/serverless/aws/lambda-cats-effect-tests/src/test/scala/sttp/tapir/serverless/aws/lambda/tests/AwsLambdaStubHttpTest.scala index 1c553af6e7..8d5f159420 100644 --- a/serverless/aws/lambda-cats-effect-tests/src/test/scala/sttp/tapir/serverless/aws/lambda/tests/AwsLambdaStubHttpTest.scala +++ b/serverless/aws/lambda-cats-effect-tests/src/test/scala/sttp/tapir/serverless/aws/lambda/tests/AwsLambdaStubHttpTest.scala @@ -6,7 +6,9 @@ import sttp.tapir.server.ServerEndpoint import sttp.tapir.server.tests.{ServerBasicTests, ServerMetricsTest, TestServerInterpreter} import sttp.tapir.serverless.aws.lambda._ import sttp.tapir.serverless.aws.lambda.tests.AwsLambdaCreateServerStubTest.catsMonadIO -import sttp.tapir.tests.{Port, Test, TestSuite} +import sttp.tapir.tests._ + +import scala.concurrent.duration._ class AwsLambdaStubHttpTest extends TestSuite { override def tests: Resource[IO, List[Test]] = Resource.eval( @@ -27,6 +29,9 @@ object AwsLambdaStubHttpTest { AwsCatsEffectServerInterpreter(serverOptions).toRoute(es) } - override def server(routes: NonEmptyList[Route[IO]]): Resource[IO, Port] = ??? + override def serverWithStop( + routes: NonEmptyList[Route[IO]], + gracefulShutdownTimeout: Option[FiniteDuration] + ): Resource[IO, (Port, KillSwitch)] = ??? } } diff --git a/serverless/aws/lambda-zio-tests/src/test/scala/sttp/tapir/serverless/aws/ziolambda/tests/AwsLambdaCreateServerStubTest.scala b/serverless/aws/lambda-zio-tests/src/test/scala/sttp/tapir/serverless/aws/ziolambda/tests/AwsLambdaCreateServerStubTest.scala index 80693513bc..399987650c 100644 --- a/serverless/aws/lambda-zio-tests/src/test/scala/sttp/tapir/serverless/aws/ziolambda/tests/AwsLambdaCreateServerStubTest.scala +++ b/serverless/aws/lambda-zio-tests/src/test/scala/sttp/tapir/serverless/aws/ziolambda/tests/AwsLambdaCreateServerStubTest.scala @@ -18,9 +18,11 @@ import sttp.tapir.server.tests.CreateServerTest import sttp.tapir.serverless.aws.lambda._ import sttp.tapir.serverless.aws.ziolambda.{AwsZioServerInterpreter, AwsZioServerOptions} import sttp.tapir.serverless.aws.ziolambda.tests.AwsLambdaCreateServerStubTest.{awsToSttpResponse, sttpToAwsRequest} -import sttp.tapir.tests.Test +import sttp.tapir.tests._ import sttp.tapir.ztapir.RIOMonadError +import scala.concurrent.duration._ + class AwsLambdaCreateServerStubTest extends CreateServerTest[Task, Any, AwsServerOptions[Task], Route[Task]] { private implicit val m: RIOMonadError[Any] = new RIOMonadError[Any] @@ -49,6 +51,15 @@ class AwsLambdaCreateServerStubTest extends CreateServerTest[Task, Any, AwsServe Test(name)(runTest(stubBackend(transformMonad(route)), uri"http://localhost:3002").unsafeToFuture()) } + override def testServerLogicWithStop( + e: ServerEndpoint[Any, Task], + testNameSuffix: String = "", + interceptors: Interceptors = identity, + gracefulShutdownTimeout: Option[FiniteDuration] = None + )( + runTest: KillSwitch => (SttpBackend[IO, Fs2Streams[IO] with WebSockets], Uri) => IO[Assertion] + ): Test = throw new java.lang.UnsupportedOperationException + override def testServer(name: String, rs: => NonEmptyList[Route[Task]])( runTest: (SttpBackend[IO, Fs2Streams[IO] with WebSockets], Uri) => IO[Assertion] ): Test = { diff --git a/serverless/aws/lambda-zio-tests/src/test/scala/sttp/tapir/serverless/aws/ziolambda/tests/AwsLambdaStubHttpTest.scala b/serverless/aws/lambda-zio-tests/src/test/scala/sttp/tapir/serverless/aws/ziolambda/tests/AwsLambdaStubHttpTest.scala index 32c154ed55..e69b76cecb 100644 --- a/serverless/aws/lambda-zio-tests/src/test/scala/sttp/tapir/serverless/aws/ziolambda/tests/AwsLambdaStubHttpTest.scala +++ b/serverless/aws/lambda-zio-tests/src/test/scala/sttp/tapir/serverless/aws/ziolambda/tests/AwsLambdaStubHttpTest.scala @@ -6,10 +6,12 @@ import sttp.tapir.server.ServerEndpoint import sttp.tapir.server.tests.{ServerBasicTests, ServerMetricsTest, TestServerInterpreter} import sttp.tapir.serverless.aws.ziolambda.{AwsZioServerInterpreter, AwsZioServerOptions} import sttp.tapir.serverless.aws.lambda.{AwsServerOptions, Route} -import sttp.tapir.tests.{Port, Test, TestSuite} +import sttp.tapir.tests._ import sttp.tapir.ztapir.RIOMonadError import zio.Task +import scala.concurrent.duration._ + class AwsLambdaStubHttpTest extends TestSuite { override def tests: Resource[IO, List[Test]] = Resource.eval( IO.pure { @@ -33,6 +35,9 @@ object AwsLambdaStubHttpTest { AwsZioServerInterpreter(serverOptions).toRoute(es) } - override def server(routes: NonEmptyList[Route[Task]]): Resource[IO, Port] = ??? + override def serverWithStop( + routes: NonEmptyList[Route[Task]], + gracefulShutdownTimeout: Option[FiniteDuration] + ): Resource[IO, (Port, KillSwitch)] = ??? } } diff --git a/tests/src/main/scala/sttp/tapir/tests/package.scala b/tests/src/main/scala/sttp/tapir/tests/package.scala index 62ee5b8b71..dbb32ff963 100644 --- a/tests/src/main/scala/sttp/tapir/tests/package.scala +++ b/tests/src/main/scala/sttp/tapir/tests/package.scala @@ -1,5 +1,8 @@ package sttp.tapir +import cats.effect.IO + package object tests { type Port = Int + type KillSwitch = IO[Unit] }