Skip to content

Commit

Permalink
[tests] Implement serverWithStop in all test interpreters (#3306)
Browse files Browse the repository at this point in the history
  • Loading branch information
kciesielski authored Nov 9, 2023
1 parent ad213aa commit f6f8bfc
Show file tree
Hide file tree
Showing 28 changed files with 293 additions and 126 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ import sttp.capabilities.WebSockets
import sttp.capabilities.akka.AkkaStreams
import sttp.tapir.server.ServerEndpoint
import sttp.tapir.server.tests.TestServerInterpreter
import sttp.tapir.tests.Port
import sttp.tapir.tests._

import scala.concurrent.Future
import scala.concurrent.duration._

class AkkaHttpTestServerInterpreter(implicit actorSystem: ActorSystem)
extends TestServerInterpreter[Future, AkkaStreams with WebSockets, AkkaHttpServerOptions, Route] {
Expand All @@ -22,8 +23,22 @@ class AkkaHttpTestServerInterpreter(implicit actorSystem: ActorSystem)
AkkaHttpServerInterpreter(serverOptions).toRoute(es)
}

override def server(routes: NonEmptyList[Route]): Resource[IO, Port] = {
override def serverWithStop(
routes: NonEmptyList[Route],
gracefulShutdownTimeout: Option[FiniteDuration]
): Resource[IO, (Port, KillSwitch)] = {
val bind = IO.fromFuture(IO(Http().newServerAt("localhost", 0).bind(concat(routes.toList: _*))))
Resource.make(bind)(binding => IO.fromFuture(IO(binding.unbind())).void).map(_.localAddress.getPort)

Resource
.make(
bind.map(b =>
(
b.localAddress.getPort,
IO.fromFuture(IO(b.terminate(gracefulShutdownTimeout.getOrElse(50.millis)))).void
)
)
) { case (_, release) =>
release
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,32 +5,45 @@ import cats.effect.{IO, Resource}
import com.linecorp.armeria.server.Server
import sttp.capabilities.Streams
import sttp.tapir.server.tests.TestServerInterpreter
import sttp.tapir.tests.Port
import sttp.tapir.tests._

import scala.concurrent.duration._

trait ArmeriaTestServerInterpreter[S <: Streams[S], F[_], OPTIONS] extends TestServerInterpreter[F, S, OPTIONS, TapirService[S, F]] {

override def server(routes: NonEmptyList[TapirService[S, F]]): Resource[IO, Port] = {
override def serverWithStop(
routes: NonEmptyList[TapirService[S, F]],
gracefulShutdownTimeout: Option[FiniteDuration]
): Resource[IO, (Port, KillSwitch)] = {
val (quietPeriodMs, totalDeadlineMs) = gracefulShutdownTimeout
.map(t => (t.toMillis, t.toMillis + 50))
.getOrElse((0L, 0L))

val bind = IO.fromCompletableFuture(
IO {
val serverBuilder = Server
.builder()
.maxRequestLength(0)
.connectionDrainDurationMicros(0)
.gracefulShutdownTimeoutMillis(quietPeriodMs, totalDeadlineMs)
routes.foldLeft(serverBuilder)((sb, route) => sb.service(route))
val server = serverBuilder.build()
server.start().thenApply[Server](_ => server)
}
)
Resource
.make(bind)(binding =>
IO {
// Ignore returned future for fast test iterations.
// Armeria server wait for 2 seconds by default to let the boss group gracefully finish all remaining
// tasks in the queue.
val _ = binding.stop()
()
}
)
.map(_.activeLocalPort())
.make(
bind.map(b =>
(
b.activeLocalPort(),
// Ignore returned future for fast test iterations.
// Armeria server wait for 2 seconds by default to let the boss group gracefully finish all remaining
// tasks in the queue. Even if graceful shutdown timeouts are set to 0.
IO { val _ = b.stop() }
)
)
) { case (_, release) =>
release
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ import cats.effect.{IO, Resource}
import sttp.tapir.server.ServerEndpoint
import sttp.tapir.server.finatra.{FinatraRoute, FinatraTestServerInterpreter}
import sttp.tapir.server.tests.TestServerInterpreter
import sttp.tapir.tests.Port
import sttp.tapir.tests._

import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration

class FinatraCatsTestServerInterpreter(dispatcher: Dispatcher[IO])
extends TestServerInterpreter[IO, Any, FinatraCatsServerOptions[IO], FinatraRoute] {
Expand All @@ -20,5 +21,8 @@ class FinatraCatsTestServerInterpreter(dispatcher: Dispatcher[IO])
es.map(interpreter.toRoute).last
}

override def server(routes: NonEmptyList[FinatraRoute]): Resource[IO, Port] = FinatraTestServerInterpreter.server(routes)
override def serverWithStop(
routes: NonEmptyList[FinatraRoute],
gracefulShutdownTimeout: Option[FiniteDuration]
): Resource[IO, (Port, KillSwitch)] = FinatraTestServerInterpreter.serverWithStop(routes, gracefulShutdownTimeout)
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ import cats.data.NonEmptyList
import cats.effect.{IO, Resource}
import com.twitter.finatra.http.routing.HttpRouter
import com.twitter.finatra.http.{Controller, EmbeddedHttpServer, HttpServer}
import com.twitter.util.Duration
import com.twitter.util.Future
import sttp.tapir.server.ServerEndpoint
import sttp.tapir.server.tests.TestServerInterpreter
import sttp.tapir.tests.Port
import sttp.tapir.tests._

import scala.concurrent.duration.DurationInt
import scala.concurrent.duration.FiniteDuration

class FinatraTestServerInterpreter extends TestServerInterpreter[Future, Any, FinatraServerOptions, FinatraRoute] {
override def route(es: List[ServerEndpoint[Any, Future]], interceptors: Interceptors): FinatraRoute = {
Expand All @@ -18,11 +20,17 @@ class FinatraTestServerInterpreter extends TestServerInterpreter[Future, Any, Fi
es.map(interpreter.toRoute).last
}

override def server(routes: NonEmptyList[FinatraRoute]): Resource[IO, Port] = FinatraTestServerInterpreter.server(routes)
override def serverWithStop(
routes: NonEmptyList[FinatraRoute],
gracefulShutdownTimeout: Option[FiniteDuration] = None
): Resource[IO, (Port, KillSwitch)] = FinatraTestServerInterpreter.serverWithStop(routes, gracefulShutdownTimeout)
}

object FinatraTestServerInterpreter {
def server(routes: NonEmptyList[FinatraRoute]): Resource[IO, Port] = {
def serverWithStop(
routes: NonEmptyList[FinatraRoute],
gracefulShutdownTimeout: Option[FiniteDuration]
): Resource[IO, (Port, KillSwitch)] = {
def waitUntilHealthy(s: EmbeddedHttpServer, count: Int): IO[EmbeddedHttpServer] =
if (s.isHealthy) IO.pure(s)
else if (count > 1000) IO.raiseError(new IllegalStateException("Server unhealthy"))
Expand Down Expand Up @@ -57,7 +65,15 @@ object FinatraTestServerInterpreter {
}.flatMap(waitUntilHealthy(_, 0))

Resource
.make(bind)(httpServer => IO(httpServer.close()))
.map(_.httpExternalPort())
.make(
bind.map(server =>
(
server.httpExternalPort(),
IO { server.close(Duration.fromMilliseconds(gracefulShutdownTimeout.map(_.toMillis).getOrElse(50))) }
)
)
) { case (_, release) =>
release
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ import sttp.capabilities.fs2.Fs2Streams
import sttp.tapir.server.ServerEndpoint
import sttp.tapir.server.http4s.Http4sTestServerInterpreter._
import sttp.tapir.server.tests.TestServerInterpreter
import sttp.tapir.tests.Port
import sttp.tapir.tests._

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

object Http4sTestServerInterpreter {
type Routes = WebSocketBuilder2[IO] => HttpRoutes[IO]
Expand All @@ -27,15 +28,23 @@ class Http4sTestServerInterpreter extends TestServerInterpreter[IO, Fs2Streams[I
Http4sServerInterpreter(serverOptions).toWebSocketRoutes(es)
}

override def server(routes: NonEmptyList[Routes]): Resource[IO, Port] = {
override def serverWithStop(
routes: NonEmptyList[Routes],
gracefulShutdownTimeout: Option[FiniteDuration]
): Resource[IO, (Port, KillSwitch)] = {
val service: WebSocketBuilder2[IO] => HttpApp[IO] =
wsb => routes.map(_.apply(wsb)).reduceK.orNotFound

BlazeServerBuilder[IO]
.withExecutionContext(ExecutionContext.global)
.bindHttp(0, "localhost")
.withHttpWebSocketApp(service)
.resource
.map(_.address.getPort)
Resource.make(
BlazeServerBuilder[IO]
.withExecutionContext(ExecutionContext.global)
.bindHttp(0, "localhost")
.withHttpWebSocketApp(service)
.resource
.allocated
.map { case (server, release) => // Blaze has no graceful shutdown support https://github.com/http4s/blaze/issues/676
(server.address.getPort(), release)
}
) { case (_, release) => release }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@ import sttp.capabilities.zio.ZioStreams
import sttp.tapir.server.http4s.Http4sServerOptions
import sttp.tapir.server.http4s.ztapir.ZHttp4sTestServerInterpreter._
import sttp.tapir.server.tests.TestServerInterpreter
import sttp.tapir.tests.Port
import sttp.tapir.tests._
import sttp.tapir.ztapir.ZServerEndpoint
import zio.{Runtime, Task, Unsafe}
import zio.interop.catz._
import zio.interop.catz.implicits._

import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration

object ZHttp4sTestServerInterpreter {
type F[A] = Task[A]
Expand All @@ -33,7 +34,10 @@ class ZHttp4sTestServerInterpreter extends TestServerInterpreter[Task, ZioStream
ZHttp4sServerInterpreter(serverOptions).fromWebSocket(es).toRoutes
}

override def server(routes: NonEmptyList[Routes]): Resource[IO, Port] = {
override def serverWithStop(
routes: NonEmptyList[Routes],
gracefulShutdownTimeout: Option[FiniteDuration]
): Resource[IO, (Port, KillSwitch)] = {
val service: WebSocketBuilder2[Task] => HttpApp[Task] =
wsb => routes.map(_.apply(wsb)).reduceK.orNotFound

Expand All @@ -48,8 +52,10 @@ class ZHttp4sTestServerInterpreter extends TestServerInterpreter[Task, ZioStream
val runtime = implicitly[zio.Runtime[Any]]
Resource
.eval(IO.fromFuture(IO(Unsafe.unsafe(implicit u => Runtime.default.unsafe.runToFuture(serverResource.allocated)))))
.flatMap { case (port, release) =>
Resource.make(IO.pure(port))(_ => IO.fromFuture(IO(Unsafe.unsafe(implicit u => Runtime.default.unsafe.runToFuture(release)))))
.flatMap { case (port, release) => // Blaze has no graceful shutdown support https://github.com/http4s/blaze/issues/676
Resource.make(IO.pure((port, IO.fromFuture(IO(Unsafe.unsafe(implicit u => Runtime.default.unsafe.runToFuture(release))))))) {
case (_, release) => release
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import sttp.capabilities.zio.ZioStreams
import sttp.tapir.server.http4s.Http4sServerOptions
import sttp.tapir.server.http4s.ztapir.ZHttp4sTestServerInterpreter._
import sttp.tapir.server.tests.TestServerInterpreter
import sttp.tapir.tests.Port
import sttp.tapir.tests._
import sttp.tapir.ztapir.ZServerEndpoint
import zio.RIO
import zio.blocking.Blocking
Expand All @@ -20,6 +20,7 @@ import zio.interop.catz._
import zio.interop.catz.implicits._

import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration

object ZHttp4sTestServerInterpreter {
type F[A] = RIO[Clock with Blocking, A]
Expand All @@ -35,7 +36,10 @@ class ZHttp4sTestServerInterpreter extends TestServerInterpreter[F, ZioStreams w
ZHttp4sServerInterpreter(serverOptions).fromWebSocket(es).toRoutes
}

override def server(routes: NonEmptyList[Routes]): Resource[IO, Port] = {
override def serverWithStop(
routes: NonEmptyList[Routes],
gracefulShutdownTimeout: Option[FiniteDuration]
): Resource[IO, (Port, KillSwitch)] = {
val service: WebSocketBuilder2[RIO[Clock with Blocking, *]] => HttpApp[RIO[Clock with Blocking, *]] =
wsb => routes.map(_.apply(wsb)).reduceK.orNotFound

Expand All @@ -50,8 +54,10 @@ class ZHttp4sTestServerInterpreter extends TestServerInterpreter[F, ZioStreams w
val runtime = implicitly[zio.Runtime[Clock with Blocking]]
Resource
.eval(IO.fromFuture(IO(runtime.unsafeRunToFuture(serverResource.allocated))))
.flatMap { case (port, release) =>
Resource.make(IO.pure(port))(_ => IO.fromFuture(IO(runtime.unsafeRunToFuture(release))))
.flatMap { case (port, release) => // Blaze has no graceful shutdown support https://github.com/http4s/blaze/issues/676
Resource.make(IO.pure((port, IO.fromFuture(IO(runtime.unsafeRunToFuture(release)))))) { case (_, release) =>
release
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ import cats.effect.{IO, Resource}
import com.sun.net.httpserver.{HttpExchange, HttpHandler, HttpServer}
import sttp.tapir.server.ServerEndpoint
import sttp.tapir.server.tests.TestServerInterpreter
import sttp.tapir.tests.Port
import sttp.tapir.tests._

import java.net.InetSocketAddress
import scala.annotation.tailrec
import scala.concurrent.duration.FiniteDuration

class JdkHttpTestServerInterpreter() extends TestServerInterpreter[Id, Any, JdkHttpServerOptions, HttpHandler] {
override def route(es: List[ServerEndpoint[Any, Id]], interceptors: Interceptors): HttpHandler = {
Expand All @@ -16,7 +17,7 @@ class JdkHttpTestServerInterpreter() extends TestServerInterpreter[Id, Any, JdkH
JdkHttpServerInterpreter(serverOptions).toHandler(es)
}

override def server(routes: NonEmptyList[HttpHandler]): Resource[IO, Port] = {
override def serverWithStop(routes: NonEmptyList[HttpHandler], gracefulShutdownTimeout: Option[FiniteDuration]): Resource[IO, (Port, KillSwitch)] = {
val server = IO.blocking {
val server = HttpServer.create(new InetSocketAddress(0), 0)

Expand All @@ -43,7 +44,6 @@ class JdkHttpTestServerInterpreter() extends TestServerInterpreter[Id, Any, JdkH
}

Resource
.make(server)(server => IO.blocking(server.stop(0)))
.map(server => server.getAddress.getPort)
.make(server.map(s => (s.getAddress.getPort, IO.blocking(s.stop(gracefulShutdownTimeout.map(_.toSeconds.toInt).getOrElse(0)))))) { case (_, release) => release }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import io.netty.channel.nio.NioEventLoopGroup
import sttp.tapir.server.ServerEndpoint
import sttp.tapir.server.netty.{NettyConfig, Route}
import sttp.tapir.server.tests.TestServerInterpreter
import sttp.tapir.tests.Port
import sttp.tapir.tests._
import sttp.capabilities.fs2.Fs2Streams
import scala.concurrent.duration.FiniteDuration

Expand All @@ -23,7 +23,7 @@ class NettyCatsTestServerInterpreter(eventLoopGroup: NioEventLoopGroup, dispatch
override def serverWithStop(
routes: NonEmptyList[Route[IO]],
gracefulShutdownTimeout: Option[FiniteDuration] = None
): Resource[IO, (Port, IO[Unit])] = {
): Resource[IO, (Port, KillSwitch)] = {
val config = NettyConfig.defaultWithStreaming
.eventLoopGroup(eventLoopGroup)
.randomPort
Expand All @@ -36,12 +36,7 @@ class NettyCatsTestServerInterpreter(eventLoopGroup: NioEventLoopGroup, dispatch
val bind: IO[NettyCatsServerBinding[IO]] = NettyCatsServer(options, customizedConfig).addRoutes(routes.toList).start()

Resource
.make(bind.map(b => (b, b.stop()))) { case (_, stop) => stop }
.map { case (b, stop) => (b.port, stop) }
}

override def server(routes: NonEmptyList[Route[IO]]): Resource[IO, Port] = {
serverWithStop(routes).map(_._1)
.make(bind.map(b => (b.port, b.stop()))) { case (_, release) => release }
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,17 @@ class NettyIdTestServerInterpreter(eventLoopGroup: NioEventLoopGroup)
NettyIdServerInterpreter(serverOptions).toRoute(es)
}

override def serverWithStop(routes: NonEmptyList[IdRoute], gracefulShutdownTimeout: Option[FiniteDuration] = None): Resource[IO, (Port, IO[Unit])] = {
override def serverWithStop(
routes: NonEmptyList[IdRoute],
gracefulShutdownTimeout: Option[FiniteDuration] = None
): Resource[IO, (Port, IO[Unit])] = {
val config =
NettyConfig.defaultNoStreaming.eventLoopGroup(eventLoopGroup).randomPort.withDontShutdownEventLoopGroupOnClose.noGracefulShutdown
val customizedConfig = gracefulShutdownTimeout.map(config.withGracefulShutdownTimeout).getOrElse(config)
val options = NettyIdServerOptions.default
val bind = IO.blocking(NettyIdServer(options, customizedConfig).addRoutes(routes.toList).start())

Resource
.make(bind.map(b => (b, IO.blocking(b.stop())))) { case (_, stop) => stop }
.map { case (b, stop) => (b.port, stop) }
}

override def server(routes: NonEmptyList[IdRoute]): Resource[IO, Port] = {
serverWithStop(routes).map(_._1)
.make(bind.map(b => (b.port, IO.blocking(b.stop())))) { case (_, stop) => stop }
}
}
Loading

0 comments on commit f6f8bfc

Please sign in to comment.