From 0a938e311ec863508531355c5d6cb265564114e1 Mon Sep 17 00:00:00 2001 From: adamw Date: Tue, 14 Sep 2021 16:44:38 +0200 Subject: [PATCH 1/5] Rename netty server to netty future server --- .../examples/HelloWorldNettyServer.scala | 2 +- ...tyServer.scala => NettyFutureServer.scala} | 14 +++++------ ...ala => NettyFutureServerInterpreter.scala} | 12 +++++----- ...s.scala => NettyFutureServerOptions.scala} | 24 +++++++++---------- .../netty/internal/NettyRequestBody.scala | 4 ++-- .../netty/NettyTestServerInterpreter.scala | 10 ++++---- 6 files changed, 33 insertions(+), 33 deletions(-) rename server/netty-server/src/main/scala/sttp/tapir/server/netty/{NettyServer.scala => NettyFutureServer.scala} (81%) rename server/netty-server/src/main/scala/sttp/tapir/server/netty/{NettyServerInterpreter.scala => NettyFutureServerInterpreter.scala} (75%) rename server/netty-server/src/main/scala/sttp/tapir/server/netty/{NettyServerOptions.scala => NettyFutureServerOptions.scala} (67%) diff --git a/examples/src/main/scala/sttp/tapir/examples/HelloWorldNettyServer.scala b/examples/src/main/scala/sttp/tapir/examples/HelloWorldNettyServer.scala index 5443f41942..66b1600a02 100644 --- a/examples/src/main/scala/sttp/tapir/examples/HelloWorldNettyServer.scala +++ b/examples/src/main/scala/sttp/tapir/examples/HelloWorldNettyServer.scala @@ -2,7 +2,7 @@ package sttp.tapir.examples import sttp.client3.{HttpURLConnectionBackend, Identity, SttpBackend, UriContext, asStringAlways, basicRequest} import sttp.model.StatusCode -import sttp.tapir.server.netty.NettyServer +import sttp.tapir.server.netty.{NettyServer, NettyServerBinding} import sttp.tapir.{Endpoint, endpoint, query, stringBody} import scala.concurrent.ExecutionContext.Implicits.global diff --git a/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyServer.scala b/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyFutureServer.scala similarity index 81% rename from server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyServer.scala rename to server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyFutureServer.scala index 784972198a..82ac67c03e 100644 --- a/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyServer.scala +++ b/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyFutureServer.scala @@ -9,21 +9,21 @@ import sttp.tapir.server.netty.internal.{NettyServerHandler, nettyChannelFutureT import java.net.InetSocketAddress import scala.concurrent.{ExecutionContext, Future} -case class NettyServer(routes: Vector[Route], options: NettyServerOptions)(implicit ec: ExecutionContext) { +case class NettyServer(routes: Vector[Route], options: NettyFutureServerOptions)(implicit ec: ExecutionContext) { def addEndpoint(se: ServerEndpoint[_, _, _, Any, Future]): NettyServer = addEndpoints(List(se)) - def addEndpoint(se: ServerEndpoint[_, _, _, Any, Future], overrideOptions: NettyServerOptions): NettyServer = + def addEndpoint(se: ServerEndpoint[_, _, _, Any, Future], overrideOptions: NettyFutureServerOptions): NettyServer = addEndpoints(List(se), overrideOptions) def addEndpoints(ses: List[ServerEndpoint[_, _, _, Any, Future]]): NettyServer = addRoute( - NettyServerInterpreter(options).toRoute(ses) + NettyFutureServerInterpreter(options).toRoute(ses) ) - def addEndpoints(ses: List[ServerEndpoint[_, _, _, Any, Future]], overrideOptions: NettyServerOptions): NettyServer = addRoute( - NettyServerInterpreter(overrideOptions).toRoute(ses) + def addEndpoints(ses: List[ServerEndpoint[_, _, _, Any, Future]], overrideOptions: NettyFutureServerOptions): NettyServer = addRoute( + NettyFutureServerInterpreter(overrideOptions).toRoute(ses) ) def addRoute(r: Route): NettyServer = copy(routes = routes :+ r) def addRoutes(r: Iterable[Route]): NettyServer = copy(routes = routes ++ r) - def options(o: NettyServerOptions): NettyServer = copy(options = o) + def options(o: NettyFutureServerOptions): NettyServer = copy(options = o) def host(s: String): NettyServer = copy(options = options.host(s)) def port(p: Int): NettyServer = copy(options = options.port(p)) @@ -60,7 +60,7 @@ case class NettyServer(routes: Vector[Route], options: NettyServerOptions)(impli } object NettyServer { - def apply(serverOptions: NettyServerOptions = NettyServerOptions.default)(implicit ec: ExecutionContext): NettyServer = + def apply(serverOptions: NettyFutureServerOptions = NettyFutureServerOptions.default)(implicit ec: ExecutionContext): NettyServer = NettyServer(Vector.empty, serverOptions) } diff --git a/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyServerInterpreter.scala b/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyFutureServerInterpreter.scala similarity index 75% rename from server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyServerInterpreter.scala rename to server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyFutureServerInterpreter.scala index 35106d4c25..c1cc35d152 100644 --- a/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyServerInterpreter.scala +++ b/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyFutureServerInterpreter.scala @@ -9,8 +9,8 @@ import sttp.tapir.server.interceptor.RequestResult import sttp.tapir.server.interpreter.{BodyListener, ServerInterpreter} import sttp.tapir.server.netty.internal.{NettyBodyListener, NettyRequestBody, NettyToResponseBody} -trait NettyServerInterpreter { - def nettyServerOptions: NettyServerOptions = NettyServerOptions.default +trait NettyFutureServerInterpreter { + def nettyServerOptions: NettyFutureServerOptions = NettyFutureServerOptions.default def toRoute( ses: List[ServerEndpoint[_, _, _, Any, Future]] @@ -36,10 +36,10 @@ trait NettyServerInterpreter { } } -object NettyServerInterpreter { - def apply(serverOptions: NettyServerOptions = NettyServerOptions.default): NettyServerInterpreter = { - new NettyServerInterpreter { - override def nettyServerOptions: NettyServerOptions = serverOptions +object NettyFutureServerInterpreter { + def apply(serverOptions: NettyFutureServerOptions = NettyFutureServerOptions.default): NettyFutureServerInterpreter = { + new NettyFutureServerInterpreter { + override def nettyServerOptions: NettyFutureServerOptions = serverOptions } } } diff --git a/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyServerOptions.scala b/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyFutureServerOptions.scala similarity index 67% rename from server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyServerOptions.scala rename to server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyFutureServerOptions.scala index 375d5d6772..0e03daec28 100644 --- a/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyServerOptions.scala +++ b/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyFutureServerOptions.scala @@ -8,7 +8,7 @@ import sttp.tapir.{Defaults, TapirFile} import scala.concurrent.{Future, blocking} -case class NettyServerOptions( +case class NettyFutureServerOptions( host: String, port: Int, interceptors: List[Interceptor[Future]], @@ -16,16 +16,16 @@ case class NettyServerOptions( deleteFile: TapirFile => Future[Unit], nettyOptions: NettyOptions ) { - def host(s: String): NettyServerOptions = copy(host = s) - def port(p: Int): NettyServerOptions = copy(port = p) - def randomPort: NettyServerOptions = port(0) - def prependInterceptor(i: Interceptor[Future]): NettyServerOptions = copy(interceptors = i :: interceptors) - def appendInterceptor(i: Interceptor[Future]): NettyServerOptions = copy(interceptors = interceptors :+ i) - def nettyOptions(o: NettyOptions): NettyServerOptions = copy(nettyOptions = o) + def host(s: String): NettyFutureServerOptions = copy(host = s) + def port(p: Int): NettyFutureServerOptions = copy(port = p) + def randomPort: NettyFutureServerOptions = port(0) + def prependInterceptor(i: Interceptor[Future]): NettyFutureServerOptions = copy(interceptors = i :: interceptors) + def appendInterceptor(i: Interceptor[Future]): NettyFutureServerOptions = copy(interceptors = interceptors :+ i) + def nettyOptions(o: NettyOptions): NettyFutureServerOptions = copy(nettyOptions = o) } -object NettyServerOptions { - def default(interceptors: List[Interceptor[Future]]): NettyServerOptions = NettyServerOptions( +object NettyFutureServerOptions { + def default(interceptors: List[Interceptor[Future]]): NettyFutureServerOptions = NettyFutureServerOptions( "localhost", 8080, interceptors, @@ -47,11 +47,11 @@ object NettyServerOptions { noLog = _ => Future.unit ) - def customInterceptors: CustomInterceptors[Future, Logger => Future[Unit], NettyServerOptions] = { + def customInterceptors: CustomInterceptors[Future, Logger => Future[Unit], NettyFutureServerOptions] = { CustomInterceptors( createLogInterceptor = (sl: ServerLog[Logger => Future[Unit]]) => new ServerLogInterceptor[Logger => Future[Unit], Future](sl, (_, _) => Future.unit), - createOptions = (ci: CustomInterceptors[Future, Logger => Future[Unit], NettyServerOptions]) => default(ci.interceptors) + createOptions = (ci: CustomInterceptors[Future, Logger => Future[Unit], NettyFutureServerOptions]) => default(ci.interceptors) ).serverLog(defaultServerLog) } @@ -63,5 +63,5 @@ object NettyServerOptions { } } - val default: NettyServerOptions = customInterceptors.options + val default: NettyFutureServerOptions = customInterceptors.options } diff --git a/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/NettyRequestBody.scala b/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/NettyRequestBody.scala index 6019c06b0a..8b57c95043 100644 --- a/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/NettyRequestBody.scala +++ b/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/NettyRequestBody.scala @@ -6,13 +6,13 @@ import sttp.tapir.RawBodyType import sttp.tapir.internal.NoStreams import sttp.tapir.model.ServerRequest import sttp.tapir.server.interpreter.{RawValue, RequestBody} -import sttp.tapir.server.netty.{NettyServerOptions, NettyServerRequest} +import sttp.tapir.server.netty.{NettyFutureServerOptions, NettyServerRequest} import java.nio.ByteBuffer import java.nio.file.Files import scala.concurrent.{ExecutionContext, Future} -class NettyRequestBody(req: NettyServerRequest, serverRequest: ServerRequest, serverOptions: NettyServerOptions)(implicit +class NettyRequestBody(req: NettyServerRequest, serverRequest: ServerRequest, serverOptions: NettyFutureServerOptions)(implicit val ec: ExecutionContext ) extends RequestBody[Future, NoStreams] { diff --git a/server/netty-server/src/test/scala/sttp/tapir/server/netty/NettyTestServerInterpreter.scala b/server/netty-server/src/test/scala/sttp/tapir/server/netty/NettyTestServerInterpreter.scala index 1d45f1acd6..16affe9957 100644 --- a/server/netty-server/src/test/scala/sttp/tapir/server/netty/NettyTestServerInterpreter.scala +++ b/server/netty-server/src/test/scala/sttp/tapir/server/netty/NettyTestServerInterpreter.scala @@ -21,26 +21,26 @@ class NettyTestServerInterpreter(eventLoopGroup: NioEventLoopGroup)(implicit ec: decodeFailureHandler: Option[DecodeFailureHandler] = None, metricsInterceptor: Option[MetricsRequestInterceptor[Future]] = None ): Route = { - val serverOptions: NettyServerOptions = NettyServerOptions.customInterceptors + val serverOptions: NettyFutureServerOptions = NettyFutureServerOptions.customInterceptors .metricsInterceptor(metricsInterceptor) .decodeFailureHandler(decodeFailureHandler.getOrElse(DefaultDecodeFailureHandler.handler)) .options - NettyServerInterpreter(serverOptions).toRoute(List(e)) + NettyFutureServerInterpreter(serverOptions).toRoute(List(e)) } override def route[I, E, O](es: List[ServerEndpoint[I, E, O, Any, Future]]): Route = { - NettyServerInterpreter().toRoute(es) + NettyFutureServerInterpreter().toRoute(es) } override def routeRecoverErrors[I, E <: Throwable, O](e: Endpoint[I, E, O, Any], fn: I => Future[O])(implicit eClassTag: ClassTag[E] ): Route = { - NettyServerInterpreter().toRoute(List(e.serverLogicRecoverErrors(fn))) + NettyFutureServerInterpreter().toRoute(List(e.serverLogicRecoverErrors(fn))) } override def server(routes: NonEmptyList[Route]): Resource[IO, Port] = { - val options = NettyServerOptions.default.nettyOptions(NettyOptions.default.eventLoopGroup(eventLoopGroup)).randomPort + val options = NettyFutureServerOptions.default.nettyOptions(NettyOptions.default.eventLoopGroup(eventLoopGroup)).randomPort val bind = IO.fromFuture(IO.delay(NettyServer(options).addRoutes(routes.toList).start())) Resource From fcbf3b3a7e43da6f03bada505d8ec334a1667096 Mon Sep 17 00:00:00 2001 From: adamw Date: Wed, 15 Sep 2021 16:00:09 +0200 Subject: [PATCH 2/5] Add cats version of the netty server interpreter --- build.sbt | 5 +- doc/server/netty.md | 16 ++-- .../examples/HelloWorldNettyServer.scala | 4 +- generated-doc/out/server/netty.md | 4 +- .../tapir/server/netty/NettyCatsServer.scala | 84 +++++++++++++++++++ .../netty/NettyCatsServerInterpreter.scala | 54 ++++++++++++ .../server/netty/NettyCatsServerOptions.scala | 63 ++++++++++++++ .../server/netty/NettyFutureServer.scala | 45 +++++----- .../netty/NettyFutureServerInterpreter.scala | 8 +- .../server/netty/internal/CatsUtil.scala | 46 ++++++++++ .../server/netty/internal/FutureUtil.scala | 30 +++++++ .../netty/internal/NettyBodyListener.scala | 4 +- .../netty/internal/NettyRequestBody.scala | 26 +++--- .../netty/internal/NettyServerHandler.scala | 32 +++---- .../tapir/server/netty/internal/package.scala | 22 ----- .../sttp/tapir/server/netty/package.scala | 15 ++-- .../server/netty/NettyCatsServerTest.scala | 33 ++++++++ .../NettyCatsTestServerInterpreter.scala | 50 +++++++++++ ...Test.scala => NettyFutureServerTest.scala} | 6 +- ...=> NettyFutureTestServerInterpreter.scala} | 14 ++-- .../server/netty/NettyServerRequestSpec.scala | 4 +- 21 files changed, 461 insertions(+), 104 deletions(-) create mode 100644 server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyCatsServer.scala create mode 100644 server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyCatsServerInterpreter.scala create mode 100644 server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyCatsServerOptions.scala create mode 100644 server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/CatsUtil.scala create mode 100644 server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/FutureUtil.scala create mode 100644 server/netty-server/src/test/scala/sttp/tapir/server/netty/NettyCatsServerTest.scala create mode 100644 server/netty-server/src/test/scala/sttp/tapir/server/netty/NettyCatsTestServerInterpreter.scala rename server/netty-server/src/test/scala/sttp/tapir/server/netty/{NettyServerTest.scala => NettyFutureServerTest.scala} (83%) rename server/netty-server/src/test/scala/sttp/tapir/server/netty/{NettyTestServerInterpreter.scala => NettyFutureTestServerInterpreter.scala} (79%) diff --git a/build.sbt b/build.sbt index df9d9f9371..fa6185d4e0 100644 --- a/build.sbt +++ b/build.sbt @@ -832,7 +832,10 @@ lazy val nettyServer: ProjectMatrix = (projectMatrix in file("server/netty-serve .settings(commonJvmSettings) .settings( name := "tapir-netty-server", - libraryDependencies ++= Seq("io.netty" % "netty-all" % "4.1.66.Final") ++ loggerDependencies + libraryDependencies ++= Seq( + "io.netty" % "netty-all" % "4.1.66.Final", + "com.softwaremill.sttp.shared" %% "fs2" % Versions.sttpShared % Optional + ) ++ loggerDependencies ) .jvmPlatform(scalaVersions = scala2And3Versions) .dependsOn(core, serverTests % Test) diff --git a/doc/server/netty.md b/doc/server/netty.md index 2e42fb0aab..f018ac49c8 100644 --- a/doc/server/netty.md +++ b/doc/server/netty.md @@ -8,13 +8,16 @@ To expose an endpoint using a [Netty](https://netty.io)-based server, first add "com.softwaremill.sttp.tapir" %% "tapir-netty-server" % "@VERSION@" ``` -Then, use `NettyServer().addEndpoints` to expose `Future`-based server endpoints. +Then, use: + +* `NettyFutureServer().addEndpoints` to expose `Future`-based server endpoints. +* `NettyCatsServer().addEndpoints` to expose `F`-based server endpoints, where `F` is any cats-effect supported effect. For example: ```scala mdoc:compile-only import sttp.tapir._ -import sttp.tapir.server.netty.{NettyServer, NettyServerBinding} +import sttp.tapir.server.netty.{NettyFutureServer, NettyFutureServerBinding} import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.Future @@ -25,16 +28,17 @@ val helloWorld = endpoint .out(stringBody) .serverLogic(name => Future.successful[Either[Unit, String]](Right(s"Hello, $name!"))) -val binding: Future[NettyServerBinding] = NettyServer().addEndpoint(helloWorld).start() +val binding: Future[NettyFutureServerBinding] = + NettyFutureServer().addEndpoint(helloWorld).start() ``` ## Configuration -The interpreter can be configured by providing an `NettyServerOptions` value, see [server options](options.md) for +The interpreter can be configured by providing an `NettyFutureServerOptions` value, see [server options](options.md) for details. -Some of the options can be configured directly using a `NettyServer` instance, such as the host and port. Others -can be passed using the `NettyServer(options)` methods. Options may also be overriden when adding endpoints. +Some options can be configured directly using a `NettyFutureServer` instance, such as the host and port. Others +can be passed using the `NettyFutureServer(options)` methods. Options may also be overridden when adding endpoints. ## Defining an endpoint together with the server logic diff --git a/examples/src/main/scala/sttp/tapir/examples/HelloWorldNettyServer.scala b/examples/src/main/scala/sttp/tapir/examples/HelloWorldNettyServer.scala index 66b1600a02..e405fd0b3c 100644 --- a/examples/src/main/scala/sttp/tapir/examples/HelloWorldNettyServer.scala +++ b/examples/src/main/scala/sttp/tapir/examples/HelloWorldNettyServer.scala @@ -2,7 +2,7 @@ package sttp.tapir.examples import sttp.client3.{HttpURLConnectionBackend, Identity, SttpBackend, UriContext, asStringAlways, basicRequest} import sttp.model.StatusCode -import sttp.tapir.server.netty.{NettyServer, NettyServerBinding} +import sttp.tapir.server.netty.{NettyFutureServer, NettyFutureServerBinding} import sttp.tapir.{Endpoint, endpoint, query, stringBody} import scala.concurrent.ExecutionContext.Implicits.global @@ -20,7 +20,7 @@ object HelloWorldNettyServer extends App { // Creating handler for netty bootstrap val serverBinding = - Await.result(NettyServer().port(8888).addEndpoint(helloWorldServerEndpoint).start(), Duration.Inf) + Await.result(NettyFutureServer().port(8888).addEndpoint(helloWorldServerEndpoint).start(), Duration.Inf) // Bind and start to accept incoming connections. val port = serverBinding.port diff --git a/generated-doc/out/server/netty.md b/generated-doc/out/server/netty.md index 15dce06ac5..3a7b3853e8 100644 --- a/generated-doc/out/server/netty.md +++ b/generated-doc/out/server/netty.md @@ -14,7 +14,7 @@ For example: ```scala import sttp.tapir._ -import sttp.tapir.server.netty.{NettyServer, NettyServerBinding} +import sttp.tapir.server.netty.{NettyFutureServer, NettyFutureServerBinding} import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.Future @@ -25,7 +25,7 @@ val helloWorld = endpoint .out(stringBody) .serverLogic(name => Future.successful[Either[Unit, String]](Right(s"Hello, $name!"))) -val binding: Future[NettyServerBinding] = NettyServer().addEndpoint(helloWorld).start() +val binding: Future[NettyFutureServerBinding] = NettyFutureServer().addEndpoint(helloWorld).start() ``` ## Configuration diff --git a/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyCatsServer.scala b/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyCatsServer.scala new file mode 100644 index 0000000000..2dd97ceb15 --- /dev/null +++ b/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyCatsServer.scala @@ -0,0 +1,84 @@ +package sttp.tapir.server.netty + +import cats.effect.{Async, IO, Resource} +import cats.effect.std.Dispatcher +import cats.syntax.all._ +import io.netty.bootstrap.ServerBootstrap +import io.netty.channel._ +import io.netty.channel.socket.nio.NioServerSocketChannel +import sttp.monad.MonadError +import sttp.tapir.server.ServerEndpoint +import sttp.tapir.server.netty.internal.CatsUtil._ +import sttp.tapir.server.netty.internal.NettyServerHandler + +import java.net.InetSocketAddress + +case class NettyCatsServer[F[_]: Async](routes: Vector[Route[F]], options: NettyCatsServerOptions[F]) { + def addEndpoint(se: ServerEndpoint[_, _, _, Any, F]): NettyCatsServer[F] = addEndpoints(List(se)) + def addEndpoint(se: ServerEndpoint[_, _, _, Any, F], overrideOptions: NettyCatsServerOptions[F]): NettyCatsServer[F] = + addEndpoints(List(se), overrideOptions) + def addEndpoints(ses: List[ServerEndpoint[_, _, _, Any, F]]): NettyCatsServer[F] = addRoute( + NettyCatsServerInterpreter(options).toRoute(ses) + ) + def addEndpoints(ses: List[ServerEndpoint[_, _, _, Any, F]], overrideOptions: NettyCatsServerOptions[F]): NettyCatsServer[F] = addRoute( + NettyCatsServerInterpreter(overrideOptions).toRoute(ses) + ) + + def addRoute(r: Route[F]): NettyCatsServer[F] = copy(routes = routes :+ r) + def addRoutes(r: Iterable[Route[F]]): NettyCatsServer[F] = copy(routes = routes ++ r) + + def options(o: NettyCatsServerOptions[F]): NettyCatsServer[F] = copy(options = o) + def host(s: String): NettyCatsServer[F] = copy(options = options.host(s)) + def port(p: Int): NettyCatsServer[F] = copy(options = options.port(p)) + + def start(): F[NettyCatsServerBinding[F]] = Async[F].defer { + val httpBootstrap = new ServerBootstrap() + val eventLoopGroup = options.nettyOptions.eventLoopGroup() + implicit val monadError: MonadError[F] = new CatsMonadError[F]() + val route: Route[F] = Route.combine(routes) + + httpBootstrap + .group(eventLoopGroup) + .channel(classOf[NioServerSocketChannel]) + .childHandler(new ChannelInitializer[Channel] { + override def initChannel(ch: Channel): Unit = + options.nettyOptions + .initPipeline(ch.pipeline(), new NettyServerHandler(route, (f: F[Unit]) => options.dispatcher.unsafeToFuture(f))) + }) + .option[java.lang.Integer](ChannelOption.SO_BACKLOG, 128) //https://github.com/netty/netty/issues/1692 + .childOption[java.lang.Boolean](ChannelOption.SO_KEEPALIVE, true) // https://github.com/netty/netty/issues/1692 + + val channelFuture = httpBootstrap.bind(options.host, options.port) + nettyChannelFutureToScala(channelFuture).map(ch => + NettyCatsServerBinding( + ch.localAddress().asInstanceOf[InetSocketAddress], + () => stop(ch, eventLoopGroup) + ) + ) + } + + private def stop(ch: Channel, eventLoopGroup: EventLoopGroup): F[Unit] = { + Async[F].defer { + nettyFutureToScala(ch.close()).flatMap { _ => + if (options.nettyOptions.shutdownEventLoopGroupOnClose) { + nettyFutureToScala(eventLoopGroup.shutdownGracefully()).map(_ => ()) + } else Async[F].unit + } + } + } +} + +object NettyCatsServer { + def apply[F[_]: Async](dispatcher: Dispatcher[F]): NettyCatsServer[F] = + NettyCatsServer(Vector.empty, NettyCatsServerOptions.default[F](dispatcher)) + + def apply[F[_]: Async](options: NettyCatsServerOptions[F]): NettyCatsServer[F] = + NettyCatsServer(Vector.empty, options) + + def io(): Resource[IO, NettyCatsServer[IO]] = Dispatcher[IO].map(apply[IO](_)) +} + +case class NettyCatsServerBinding[F[_]](localSocket: InetSocketAddress, stop: () => F[Unit]) { + def host: String = localSocket.getHostString + def port: Int = localSocket.getPort +} diff --git a/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyCatsServerInterpreter.scala b/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyCatsServerInterpreter.scala new file mode 100644 index 0000000000..6c7321d23a --- /dev/null +++ b/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyCatsServerInterpreter.scala @@ -0,0 +1,54 @@ +package sttp.tapir.server.netty + +import cats.effect.Async +import cats.effect.std.Dispatcher +import io.netty.buffer.ByteBuf +import sttp.monad.syntax._ +import sttp.monad.MonadError +import sttp.tapir.internal.NoStreams +import sttp.tapir.server.ServerEndpoint +import sttp.tapir.server.interceptor.RequestResult +import sttp.tapir.server.interpreter.{BodyListener, ServerInterpreter} +import sttp.tapir.server.netty.internal.CatsUtil.CatsMonadError +import sttp.tapir.server.netty.internal.{NettyBodyListener, NettyRequestBody, NettyToResponseBody} + +trait NettyCatsServerInterpreter[F[_]] { + implicit def async: Async[F] + def nettyServerOptions: NettyCatsServerOptions[F] + + def toRoute(ses: List[ServerEndpoint[_, _, _, Any, F]]): Route[F] = { + val handler: Route[F] = { (request: NettyServerRequest) => + implicit val monad: MonadError[F] = new CatsMonadError[F] + implicit val bodyListener: BodyListener[F, ByteBuf] = new NettyBodyListener + val serverInterpreter = new ServerInterpreter[Any, F, ByteBuf, NoStreams]( + new NettyRequestBody(request, request, nettyServerOptions.createFile), + new NettyToResponseBody, + nettyServerOptions.interceptors, + nettyServerOptions.deleteFile + ) + + serverInterpreter(request, ses) + .map { + case RequestResult.Response(response) => Some(response) + case RequestResult.Failure(_) => None + } + } + + handler + } +} + +object NettyCatsServerInterpreter { + def apply[F[_]](dispatcher: Dispatcher[F])(implicit _fa: Async[F]): NettyCatsServerInterpreter[F] = { + new NettyCatsServerInterpreter[F] { + override implicit def async: Async[F] = _fa + override def nettyServerOptions: NettyCatsServerOptions[F] = NettyCatsServerOptions.default[F](dispatcher)(async) + } + } + def apply[F[_]](options: NettyCatsServerOptions[F])(implicit _fa: Async[F]): NettyCatsServerInterpreter[F] = { + new NettyCatsServerInterpreter[F] { + override implicit def async: Async[F] = _fa + override def nettyServerOptions: NettyCatsServerOptions[F] = options + } + } +} diff --git a/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyCatsServerOptions.scala b/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyCatsServerOptions.scala new file mode 100644 index 0000000000..7c815e6420 --- /dev/null +++ b/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyCatsServerOptions.scala @@ -0,0 +1,63 @@ +package sttp.tapir.server.netty + +import cats.effect.std.Dispatcher +import cats.effect.{Async, Sync} +import com.typesafe.scalalogging.Logger +import sttp.tapir.{Defaults, TapirFile} +import sttp.tapir.model.ServerRequest +import sttp.tapir.server.interceptor.log.{DefaultServerLog, ServerLog, ServerLogInterceptor} +import sttp.tapir.server.interceptor.{CustomInterceptors, Interceptor} + +case class NettyCatsServerOptions[F[_]]( + host: String, + port: Int, + interceptors: List[Interceptor[F]], + createFile: ServerRequest => F[TapirFile], + deleteFile: TapirFile => F[Unit], + dispatcher: Dispatcher[F], + nettyOptions: NettyOptions +) { + def host(s: String): NettyCatsServerOptions[F] = copy(host = s) + def port(p: Int): NettyCatsServerOptions[F] = copy(port = p) + def randomPort: NettyCatsServerOptions[F] = port(0) + def prependInterceptor(i: Interceptor[F]): NettyCatsServerOptions[F] = copy(interceptors = i :: interceptors) + def appendInterceptor(i: Interceptor[F]): NettyCatsServerOptions[F] = copy(interceptors = interceptors :+ i) + def nettyOptions(o: NettyOptions): NettyCatsServerOptions[F] = copy(nettyOptions = o) +} + +object NettyCatsServerOptions { + def default[F[_]: Async](interceptors: List[Interceptor[F]], dispatcher: Dispatcher[F]): NettyCatsServerOptions[F] = + NettyCatsServerOptions( + "localhost", + 8080, + interceptors, + _ => Sync[F].delay(Defaults.createTempFile()), + file => Sync[F].delay(Defaults.deleteFile()(file)), + dispatcher, + NettyOptions.default + ) + + def defaultServerLog[F[_]: Async]: ServerLog[Logger => F[Unit]] = DefaultServerLog( + doLogWhenHandled = debugLog[F], + doLogAllDecodeFailures = debugLog[F], + doLogExceptions = (msg: String, ex: Throwable) => log => Sync[F].delay(log.error(msg, ex)), + noLog = _ => Sync[F].unit + ) + + def customInterceptors[F[_]: Async](dispatcher: Dispatcher[F]): CustomInterceptors[F, Logger => F[Unit], NettyCatsServerOptions[F]] = + CustomInterceptors( + createLogInterceptor = + (sl: ServerLog[Logger => F[Unit]]) => new ServerLogInterceptor[Logger => F[Unit], F](sl, (_, _) => Sync[F].unit), + createOptions = (ci: CustomInterceptors[F, Logger => F[Unit], NettyCatsServerOptions[F]]) => default(ci.interceptors, dispatcher) + ).serverLog(defaultServerLog) + + private def debugLog[F[_]: Async](msg: String, exOpt: Option[Throwable]): Logger => F[Unit] = log => + Sync[F].delay { + exOpt match { + case None => log.debug(msg) + case Some(ex) => log.debug(s"$msg; exception: {}", ex) + } + } + + def default[F[_]: Async](dispatcher: Dispatcher[F]): NettyCatsServerOptions[F] = customInterceptors(dispatcher).options +} diff --git a/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyFutureServer.scala b/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyFutureServer.scala index 82ac67c03e..30bf39b383 100644 --- a/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyFutureServer.scala +++ b/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyFutureServer.scala @@ -3,47 +3,52 @@ package sttp.tapir.server.netty import io.netty.bootstrap.ServerBootstrap import io.netty.channel._ import io.netty.channel.socket.nio.NioServerSocketChannel +import sttp.monad.{FutureMonad, MonadError} import sttp.tapir.server.ServerEndpoint -import sttp.tapir.server.netty.internal.{NettyServerHandler, nettyChannelFutureToScala, nettyFutureToScala} +import sttp.tapir.server.netty.internal.FutureUtil._ +import sttp.tapir.server.netty.internal.NettyServerHandler import java.net.InetSocketAddress import scala.concurrent.{ExecutionContext, Future} -case class NettyServer(routes: Vector[Route], options: NettyFutureServerOptions)(implicit ec: ExecutionContext) { - def addEndpoint(se: ServerEndpoint[_, _, _, Any, Future]): NettyServer = addEndpoints(List(se)) - def addEndpoint(se: ServerEndpoint[_, _, _, Any, Future], overrideOptions: NettyFutureServerOptions): NettyServer = +case class NettyFutureServer(routes: Vector[FutureRoute], options: NettyFutureServerOptions)(implicit ec: ExecutionContext) { + def addEndpoint(se: ServerEndpoint[_, _, _, Any, Future]): NettyFutureServer = addEndpoints(List(se)) + def addEndpoint(se: ServerEndpoint[_, _, _, Any, Future], overrideOptions: NettyFutureServerOptions): NettyFutureServer = addEndpoints(List(se), overrideOptions) - def addEndpoints(ses: List[ServerEndpoint[_, _, _, Any, Future]]): NettyServer = addRoute( + def addEndpoints(ses: List[ServerEndpoint[_, _, _, Any, Future]]): NettyFutureServer = addRoute( NettyFutureServerInterpreter(options).toRoute(ses) ) - def addEndpoints(ses: List[ServerEndpoint[_, _, _, Any, Future]], overrideOptions: NettyFutureServerOptions): NettyServer = addRoute( - NettyFutureServerInterpreter(overrideOptions).toRoute(ses) - ) + def addEndpoints(ses: List[ServerEndpoint[_, _, _, Any, Future]], overrideOptions: NettyFutureServerOptions): NettyFutureServer = + addRoute( + NettyFutureServerInterpreter(overrideOptions).toRoute(ses) + ) - def addRoute(r: Route): NettyServer = copy(routes = routes :+ r) - def addRoutes(r: Iterable[Route]): NettyServer = copy(routes = routes ++ r) + def addRoute(r: FutureRoute): NettyFutureServer = copy(routes = routes :+ r) + def addRoutes(r: Iterable[FutureRoute]): NettyFutureServer = copy(routes = routes ++ r) - def options(o: NettyFutureServerOptions): NettyServer = copy(options = o) - def host(s: String): NettyServer = copy(options = options.host(s)) - def port(p: Int): NettyServer = copy(options = options.port(p)) + def options(o: NettyFutureServerOptions): NettyFutureServer = copy(options = o) + def host(s: String): NettyFutureServer = copy(options = options.host(s)) + def port(p: Int): NettyFutureServer = copy(options = options.port(p)) - def start(): Future[NettyServerBinding] = { + def start(): Future[NettyFutureServerBinding] = { val httpBootstrap = new ServerBootstrap() val eventLoopGroup = options.nettyOptions.eventLoopGroup() + implicit val monadError: MonadError[Future] = new FutureMonad() + val route = Route.combine(routes) httpBootstrap .group(eventLoopGroup) .channel(classOf[NioServerSocketChannel]) .childHandler(new ChannelInitializer[Channel] { override def initChannel(ch: Channel): Unit = - options.nettyOptions.initPipeline(ch.pipeline(), new NettyServerHandler(Route.combine(routes))) + options.nettyOptions.initPipeline(ch.pipeline(), new NettyServerHandler(route, (f: Future[Unit]) => f)) }) .option[java.lang.Integer](ChannelOption.SO_BACKLOG, 128) //https://github.com/netty/netty/issues/1692 .childOption[java.lang.Boolean](ChannelOption.SO_KEEPALIVE, true) // https://github.com/netty/netty/issues/1692 val channelFuture = httpBootstrap.bind(options.host, options.port) nettyChannelFutureToScala(channelFuture).map(ch => - NettyServerBinding( + NettyFutureServerBinding( ch.localAddress().asInstanceOf[InetSocketAddress], () => stop(ch, eventLoopGroup) ) @@ -59,12 +64,12 @@ case class NettyServer(routes: Vector[Route], options: NettyFutureServerOptions) } } -object NettyServer { - def apply(serverOptions: NettyFutureServerOptions = NettyFutureServerOptions.default)(implicit ec: ExecutionContext): NettyServer = - NettyServer(Vector.empty, serverOptions) +object NettyFutureServer { + def apply(serverOptions: NettyFutureServerOptions = NettyFutureServerOptions.default)(implicit ec: ExecutionContext): NettyFutureServer = + NettyFutureServer(Vector.empty, serverOptions) } -case class NettyServerBinding(localSocket: InetSocketAddress, stop: () => Future[Unit]) { +case class NettyFutureServerBinding(localSocket: InetSocketAddress, stop: () => Future[Unit]) { def host: String = localSocket.getHostString def port: Int = localSocket.getPort } diff --git a/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyFutureServerInterpreter.scala b/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyFutureServerInterpreter.scala index c1cc35d152..ace3c1dee9 100644 --- a/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyFutureServerInterpreter.scala +++ b/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyFutureServerInterpreter.scala @@ -10,16 +10,16 @@ import sttp.tapir.server.interpreter.{BodyListener, ServerInterpreter} import sttp.tapir.server.netty.internal.{NettyBodyListener, NettyRequestBody, NettyToResponseBody} trait NettyFutureServerInterpreter { - def nettyServerOptions: NettyFutureServerOptions = NettyFutureServerOptions.default + def nettyServerOptions: NettyFutureServerOptions def toRoute( ses: List[ServerEndpoint[_, _, _, Any, Future]] - )(implicit ec: ExecutionContext): Route = { - val handler: Route = { (request: NettyServerRequest) => + )(implicit ec: ExecutionContext): FutureRoute = { + val handler: FutureRoute = { (request: NettyServerRequest) => implicit val monad: FutureMonad = new FutureMonad() implicit val bodyListener: BodyListener[Future, ByteBuf] = new NettyBodyListener val serverInterpreter = new ServerInterpreter[Any, Future, ByteBuf, NoStreams]( - new NettyRequestBody(request, request, nettyServerOptions), + new NettyRequestBody(request, request, nettyServerOptions.createFile), new NettyToResponseBody, nettyServerOptions.interceptors, nettyServerOptions.deleteFile diff --git a/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/CatsUtil.scala b/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/CatsUtil.scala new file mode 100644 index 0000000000..d3d8c11c91 --- /dev/null +++ b/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/CatsUtil.scala @@ -0,0 +1,46 @@ +package sttp.tapir.server.netty.internal + +import cats.effect.{Async, Sync} +import io.netty.channel.{Channel, ChannelFuture} +import sttp.monad.MonadError + +import scala.concurrent.CancellationException + +object CatsUtil { + class CatsMonadError[F[_]](implicit F: Sync[F]) extends MonadError[F] { + override def unit[T](t: T): F[T] = F.pure(t) + override def map[T, T2](fa: F[T])(f: T => T2): F[T2] = F.map(fa)(f) + override def flatMap[T, T2](fa: F[T])(f: T => F[T2]): F[T2] = F.flatMap(fa)(f) + override def error[T](t: Throwable): F[T] = F.raiseError(t) + override protected def handleWrappedError[T](rt: F[T])(h: PartialFunction[Throwable, F[T]]): F[T] = + F.recoverWith(rt)(h) + override def eval[T](t: => T): F[T] = F.delay(t) + override def suspend[T](t: => F[T]): F[T] = F.defer(t) + override def flatten[T](ffa: F[F[T]]): F[T] = F.flatten(ffa) + override def ensure[T](f: F[T], e: => F[Unit]): F[T] = F.guaranteeCase(f)(_ => e) + } + + def nettyChannelFutureToScala[F[_]: Async](nettyFuture: ChannelFuture): F[Channel] = { + Async[F].async { k => + nettyFuture.addListener((future: ChannelFuture) => + if (future.isSuccess) k(Right(future.channel())) + else if (future.isCancelled) k(Left(new CancellationException)) + else k(Left(future.cause())) + ) + + Async[F].pure(None) + } + } + + def nettyFutureToScala[F[_]: Async, T](f: io.netty.util.concurrent.Future[T]): F[T] = { + Async[F].async { k => + f.addListener((future: io.netty.util.concurrent.Future[T]) => { + if (future.isSuccess) k(Right(future.getNow)) + else if (future.isCancelled) k(Left(new CancellationException)) + else k(Left(f.cause())) + }) + + Async[F].pure(None) + } + } +} diff --git a/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/FutureUtil.scala b/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/FutureUtil.scala new file mode 100644 index 0000000000..031347fef6 --- /dev/null +++ b/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/FutureUtil.scala @@ -0,0 +1,30 @@ +package sttp.tapir.server.netty.internal + +import io.netty.channel.{Channel, ChannelFuture} + +import scala.concurrent.{CancellationException, Future, Promise} +import scala.util.{Failure, Success} + +object FutureUtil { + def nettyChannelFutureToScala(nettyFuture: ChannelFuture): Future[Channel] = { + val p = Promise[Channel]() + nettyFuture.addListener((future: ChannelFuture) => + p.complete( + if (future.isSuccess) Success(future.channel()) + else if (future.isCancelled) Failure(new CancellationException) + else Failure(future.cause()) + ) + ) + p.future + } + + def nettyFutureToScala[T](f: io.netty.util.concurrent.Future[T]): Future[T] = { + val p = Promise[T]() + f.addListener((future: io.netty.util.concurrent.Future[T]) => { + if (future.isSuccess) p.complete(Success(future.getNow)) + else if (future.isCancelled) p.complete(Failure(new CancellationException)) + else p.complete(Failure(f.cause())) + }) + p.future + } +} diff --git a/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/NettyBodyListener.scala b/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/NettyBodyListener.scala index c21d477845..0642185e1a 100644 --- a/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/NettyBodyListener.scala +++ b/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/NettyBodyListener.scala @@ -7,8 +7,8 @@ import sttp.tapir.server.interpreter.BodyListener import scala.util.{Success, Try} -class NettyBodyListener[Future[_]](implicit m: MonadError[Future]) extends BodyListener[Future, ByteBuf] { - override def onComplete(body: ByteBuf)(cb: Try[Unit] => Future[Unit]): Future[ByteBuf] = { +class NettyBodyListener[F[_]](implicit m: MonadError[F]) extends BodyListener[F, ByteBuf] { + override def onComplete(body: ByteBuf)(cb: Try[Unit] => F[Unit]): F[ByteBuf] = { cb(Success(())).map(_ => body) } } diff --git a/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/NettyRequestBody.scala b/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/NettyRequestBody.scala index 8b57c95043..d86da4bb74 100644 --- a/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/NettyRequestBody.scala +++ b/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/NettyRequestBody.scala @@ -2,31 +2,31 @@ package sttp.tapir.server.netty.internal import io.netty.buffer.{ByteBufInputStream, ByteBufUtil} import sttp.capabilities -import sttp.tapir.RawBodyType +import sttp.monad.MonadError +import sttp.tapir.{RawBodyType, TapirFile} import sttp.tapir.internal.NoStreams import sttp.tapir.model.ServerRequest +import sttp.monad.syntax._ import sttp.tapir.server.interpreter.{RawValue, RequestBody} -import sttp.tapir.server.netty.{NettyFutureServerOptions, NettyServerRequest} +import sttp.tapir.server.netty.NettyServerRequest import java.nio.ByteBuffer import java.nio.file.Files -import scala.concurrent.{ExecutionContext, Future} -class NettyRequestBody(req: NettyServerRequest, serverRequest: ServerRequest, serverOptions: NettyFutureServerOptions)(implicit - val ec: ExecutionContext -) extends RequestBody[Future, NoStreams] { +class NettyRequestBody[F[_]](req: NettyServerRequest, serverRequest: ServerRequest, createFile: ServerRequest => F[TapirFile])(implicit + monadError: MonadError[F] +) extends RequestBody[F, NoStreams] { override val streams: capabilities.Streams[NoStreams] = NoStreams - override def toRaw[RAW](bodyType: RawBodyType[RAW]): Future[RawValue[RAW]] = { + override def toRaw[RAW](bodyType: RawBodyType[RAW]): F[RawValue[RAW]] = { bodyType match { - case RawBodyType.StringBody(charset) => Future.successful(RawValue(req.req.content().toString(charset))) - case RawBodyType.ByteArrayBody => Future.successful(RawValue(requestContent)) - case RawBodyType.ByteBufferBody => Future.successful(RawValue(ByteBuffer.wrap(requestContent))) - case RawBodyType.InputStreamBody => Future.successful(RawValue(new ByteBufInputStream(req.req.content()))) + case RawBodyType.StringBody(charset) => monadError.unit(RawValue(req.req.content().toString(charset))) + case RawBodyType.ByteArrayBody => monadError.unit(RawValue(requestContent)) + case RawBodyType.ByteBufferBody => monadError.unit(RawValue(ByteBuffer.wrap(requestContent))) + case RawBodyType.InputStreamBody => monadError.unit(RawValue(new ByteBufInputStream(req.req.content()))) case RawBodyType.FileBody => - serverOptions - .createFile(serverRequest) + createFile(serverRequest) .map(file => { Files.write(file.toPath, requestContent) RawValue(file, Seq(file)) diff --git a/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/NettyServerHandler.scala b/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/NettyServerHandler.scala index 9f4c721800..58db0d51d6 100644 --- a/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/NettyServerHandler.scala +++ b/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/NettyServerHandler.scala @@ -3,14 +3,16 @@ package sttp.tapir.server.netty.internal import io.netty.buffer.{ByteBuf, Unpooled} import io.netty.channel.{ChannelFutureListener, ChannelHandlerContext, SimpleChannelInboundHandler} import io.netty.handler.codec.http._ +import sttp.monad.MonadError +import sttp.monad.syntax._ import sttp.tapir.model.ServerResponse import sttp.tapir.server.netty.{NettyServerRequest, Route} import scala.collection.JavaConverters._ -import scala.concurrent.ExecutionContext -import scala.util.{Failure, Success} +import scala.concurrent.Future -class NettyServerHandler(route: Route)(implicit val ec: ExecutionContext) extends SimpleChannelInboundHandler[FullHttpRequest] { +class NettyServerHandler[F[_]](route: Route[F], unsafeToFuture: F[Unit] => Future[Unit])(implicit me: MonadError[F]) + extends SimpleChannelInboundHandler[FullHttpRequest] { private def toHttpResponse(interpreterResponse: ServerResponse[ByteBuf], req: FullHttpRequest): FullHttpResponse = { val res = new DefaultFullHttpResponse( @@ -37,17 +39,19 @@ class NettyServerHandler(route: Route)(implicit val ec: ExecutionContext) extend } else { val req = request.retainedDuplicate() - route(NettyServerRequest(req)) - .map { - case Some(response) => response - case None => ServerResponse(sttp.model.StatusCode.NotFound, Nil, None) - } - .map(toHttpResponse(_, request)) - .map(flushResponse(ctx, request, _)) - .onComplete { - case Failure(exception) => ctx.fireExceptionCaught(exception) - case Success(_) => () - } + unsafeToFuture { + route(NettyServerRequest(req)) + .map { + case Some(response) => response + case None => ServerResponse(sttp.model.StatusCode.NotFound, Nil, None) + } + .map(toHttpResponse(_, request)) + .map(flushResponse(ctx, request, _)) + .handleError { case e: Exception => + ctx.fireExceptionCaught(e) + me.unit(()) + } + } // ignoring the result, exceptions should already be handled () } diff --git a/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/package.scala b/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/package.scala index 09fb3d8253..160844cd41 100644 --- a/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/package.scala +++ b/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/package.scala @@ -14,26 +14,4 @@ package object internal { def toHeaderSeq: Seq[Header] = underlying.asScala.map(e => Header(e.getKey, e.getValue)).toList } - - def nettyChannelFutureToScala(nettyFuture: ChannelFuture): Future[Channel] = { - val p = Promise[Channel]() - nettyFuture.addListener((future: ChannelFuture) => - p.complete( - if (future.isSuccess) Success(future.channel()) - else if (future.isCancelled) Failure(new CancellationException) - else Failure(future.cause()) - ) - ) - p.future - } - - def nettyFutureToScala[T](f: io.netty.util.concurrent.Future[T]): Future[T] = { - val p = Promise[T]() - f.addListener((future: io.netty.util.concurrent.Future[T]) => { - if (future.isSuccess) p.complete(Success(future.getNow)) - else if (future.isCancelled) p.complete(Failure(new CancellationException)) - else p.complete(Failure(f.cause())) - }) - p.future - } } diff --git a/server/netty-server/src/main/scala/sttp/tapir/server/netty/package.scala b/server/netty-server/src/main/scala/sttp/tapir/server/netty/package.scala index 5bfa44d6c6..5c341ed227 100644 --- a/server/netty-server/src/main/scala/sttp/tapir/server/netty/package.scala +++ b/server/netty-server/src/main/scala/sttp/tapir/server/netty/package.scala @@ -1,22 +1,25 @@ package sttp.tapir.server import io.netty.buffer.ByteBuf +import sttp.monad.MonadError +import sttp.monad.syntax._ import sttp.tapir.model.ServerResponse -import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.Future package object netty { - type Route = NettyServerRequest => Future[Option[ServerResponse[ByteBuf]]] + type Route[F[_]] = NettyServerRequest => F[Option[ServerResponse[ByteBuf]]] + type FutureRoute = Route[Future] object Route { - def combine(routes: Iterable[Route])(implicit ec: ExecutionContext): Route = { req => - def run(rs: List[Route]): Future[Option[ServerResponse[ByteBuf]]] = rs match { + def combine[F[_]](routes: Iterable[Route[F]])(implicit me: MonadError[F]): Route[F] = { req => + def run(rs: List[Route[F]]): F[Option[ServerResponse[ByteBuf]]] = rs match { case head :: tail => head(req).flatMap { - case Some(response) => Future.successful(Some(response)) + case Some(response) => me.unit(Some(response)) case None => run(tail) } - case Nil => Future.successful(None) + case Nil => me.unit(None) } run(routes.toList) diff --git a/server/netty-server/src/test/scala/sttp/tapir/server/netty/NettyCatsServerTest.scala b/server/netty-server/src/test/scala/sttp/tapir/server/netty/NettyCatsServerTest.scala new file mode 100644 index 0000000000..2ddab12ca5 --- /dev/null +++ b/server/netty-server/src/test/scala/sttp/tapir/server/netty/NettyCatsServerTest.scala @@ -0,0 +1,33 @@ +package sttp.tapir.server.netty + +import cats.effect.{IO, Resource} +import io.netty.channel.nio.NioEventLoopGroup +import org.scalatest.EitherValues +import sttp.monad.MonadError +import sttp.tapir.server.netty.internal.{CatsUtil, FutureUtil} +import sttp.tapir.server.tests._ +import sttp.tapir.tests.{Test, TestSuite} + +class NettyCatsServerTest extends TestSuite with EitherValues { + override def tests: Resource[IO, List[Test]] = + backendResource.flatMap { backend => + Resource + .make { + implicit val m: MonadError[IO] = new CatsUtil.CatsMonadError[IO]() + val eventLoopGroup = new NioEventLoopGroup() + + val interpreter = new NettyCatsTestServerInterpreter(eventLoopGroup, dispatcher) + val createServerTest = new DefaultCreateServerTest(backend, interpreter) + + val tests = new ServerBasicTests(createServerTest, interpreter).tests() ++ + new ServerAuthenticationTests(createServerTest).tests() ++ + new ServerMetricsTest(createServerTest).tests() ++ + new ServerRejectTests(createServerTest, interpreter).tests() + + IO.pure((tests, eventLoopGroup)) + } { case (_, eventLoopGroup) => + IO.fromFuture(IO.delay(FutureUtil.nettyFutureToScala(eventLoopGroup.shutdownGracefully()))).void + } + .map { case (tests, _) => tests } + } +} diff --git a/server/netty-server/src/test/scala/sttp/tapir/server/netty/NettyCatsTestServerInterpreter.scala b/server/netty-server/src/test/scala/sttp/tapir/server/netty/NettyCatsTestServerInterpreter.scala new file mode 100644 index 0000000000..2ce81c8371 --- /dev/null +++ b/server/netty-server/src/test/scala/sttp/tapir/server/netty/NettyCatsTestServerInterpreter.scala @@ -0,0 +1,50 @@ +package sttp.tapir.server.netty + +import cats.data.NonEmptyList +import cats.effect.std.Dispatcher +import cats.effect.{IO, Resource} +import io.netty.channel.nio.NioEventLoopGroup +import sttp.tapir.Endpoint +import sttp.tapir.server.ServerEndpoint +import sttp.tapir.server.interceptor.decodefailure.{DecodeFailureHandler, DefaultDecodeFailureHandler} +import sttp.tapir.server.interceptor.metrics.MetricsRequestInterceptor +import sttp.tapir.server.tests.TestServerInterpreter +import sttp.tapir.tests.Port + +import scala.reflect.ClassTag + +class NettyCatsTestServerInterpreter(eventLoopGroup: NioEventLoopGroup, dispatcher: Dispatcher[IO]) + extends TestServerInterpreter[IO, Any, Route[IO]] { + + override def route[I, E, O]( + e: ServerEndpoint[I, E, O, Any, IO], + decodeFailureHandler: Option[DecodeFailureHandler] = None, + metricsInterceptor: Option[MetricsRequestInterceptor[IO]] = None + ): Route[IO] = { + val serverOptions: NettyCatsServerOptions[IO] = NettyCatsServerOptions + .customInterceptors[IO](dispatcher) + .metricsInterceptor(metricsInterceptor) + .decodeFailureHandler(decodeFailureHandler.getOrElse(DefaultDecodeFailureHandler.handler)) + .options + + NettyCatsServerInterpreter(serverOptions).toRoute(List(e)) + } + + override def route[I, E, O](es: List[ServerEndpoint[I, E, O, Any, IO]]): Route[IO] = + NettyCatsServerInterpreter[IO](dispatcher).toRoute(es) + + override def routeRecoverErrors[I, E <: Throwable, O](e: Endpoint[I, E, O, Any], fn: I => IO[O])(implicit + eClassTag: ClassTag[E] + ): Route[IO] = + NettyCatsServerInterpreter[IO](dispatcher).toRoute(List(e.serverLogicRecoverErrors(fn))) + + override def server(routes: NonEmptyList[Route[IO]]): Resource[IO, Port] = { + val options = + NettyCatsServerOptions.default[IO](dispatcher).nettyOptions(NettyOptions.default.eventLoopGroup(eventLoopGroup)).randomPort + val bind = NettyCatsServer(options).addRoutes(routes.toList).start() + + Resource + .make(bind)(_.stop()) + .map(_.port) + } +} diff --git a/server/netty-server/src/test/scala/sttp/tapir/server/netty/NettyServerTest.scala b/server/netty-server/src/test/scala/sttp/tapir/server/netty/NettyFutureServerTest.scala similarity index 83% rename from server/netty-server/src/test/scala/sttp/tapir/server/netty/NettyServerTest.scala rename to server/netty-server/src/test/scala/sttp/tapir/server/netty/NettyFutureServerTest.scala index 0cf0923382..83fa5709bb 100644 --- a/server/netty-server/src/test/scala/sttp/tapir/server/netty/NettyServerTest.scala +++ b/server/netty-server/src/test/scala/sttp/tapir/server/netty/NettyFutureServerTest.scala @@ -4,11 +4,11 @@ import cats.effect.{IO, Resource} import io.netty.channel.nio.NioEventLoopGroup import org.scalatest.EitherValues import sttp.monad.FutureMonad +import sttp.tapir.server.netty.internal.FutureUtil.nettyFutureToScala import sttp.tapir.server.tests._ import sttp.tapir.tests.{Test, TestSuite} -import sttp.tapir.server.netty.internal.nettyFutureToScala -class NettyServerTest extends TestSuite with EitherValues { +class NettyFutureServerTest extends TestSuite with EitherValues { override def tests: Resource[IO, List[Test]] = backendResource.flatMap { backend => Resource @@ -16,7 +16,7 @@ class NettyServerTest extends TestSuite with EitherValues { implicit val m: FutureMonad = new FutureMonad() val eventLoopGroup = new NioEventLoopGroup() - val interpreter = new NettyTestServerInterpreter(eventLoopGroup) + val interpreter = new NettyFutureTestServerInterpreter(eventLoopGroup) val createServerTest = new DefaultCreateServerTest(backend, interpreter) val tests = new ServerBasicTests(createServerTest, interpreter).tests() ++ diff --git a/server/netty-server/src/test/scala/sttp/tapir/server/netty/NettyTestServerInterpreter.scala b/server/netty-server/src/test/scala/sttp/tapir/server/netty/NettyFutureTestServerInterpreter.scala similarity index 79% rename from server/netty-server/src/test/scala/sttp/tapir/server/netty/NettyTestServerInterpreter.scala rename to server/netty-server/src/test/scala/sttp/tapir/server/netty/NettyFutureTestServerInterpreter.scala index 16affe9957..86cae26584 100644 --- a/server/netty-server/src/test/scala/sttp/tapir/server/netty/NettyTestServerInterpreter.scala +++ b/server/netty-server/src/test/scala/sttp/tapir/server/netty/NettyFutureTestServerInterpreter.scala @@ -13,14 +13,14 @@ import sttp.tapir.tests.Port import scala.concurrent.{ExecutionContext, Future} import scala.reflect.ClassTag -class NettyTestServerInterpreter(eventLoopGroup: NioEventLoopGroup)(implicit ec: ExecutionContext) - extends TestServerInterpreter[Future, Any, Route] { +class NettyFutureTestServerInterpreter(eventLoopGroup: NioEventLoopGroup)(implicit ec: ExecutionContext) + extends TestServerInterpreter[Future, Any, FutureRoute] { override def route[I, E, O]( e: ServerEndpoint[I, E, O, Any, Future], decodeFailureHandler: Option[DecodeFailureHandler] = None, metricsInterceptor: Option[MetricsRequestInterceptor[Future]] = None - ): Route = { + ): FutureRoute = { val serverOptions: NettyFutureServerOptions = NettyFutureServerOptions.customInterceptors .metricsInterceptor(metricsInterceptor) .decodeFailureHandler(decodeFailureHandler.getOrElse(DefaultDecodeFailureHandler.handler)) @@ -29,19 +29,19 @@ class NettyTestServerInterpreter(eventLoopGroup: NioEventLoopGroup)(implicit ec: NettyFutureServerInterpreter(serverOptions).toRoute(List(e)) } - override def route[I, E, O](es: List[ServerEndpoint[I, E, O, Any, Future]]): Route = { + override def route[I, E, O](es: List[ServerEndpoint[I, E, O, Any, Future]]): FutureRoute = { NettyFutureServerInterpreter().toRoute(es) } override def routeRecoverErrors[I, E <: Throwable, O](e: Endpoint[I, E, O, Any], fn: I => Future[O])(implicit eClassTag: ClassTag[E] - ): Route = { + ): FutureRoute = { NettyFutureServerInterpreter().toRoute(List(e.serverLogicRecoverErrors(fn))) } - override def server(routes: NonEmptyList[Route]): Resource[IO, Port] = { + override def server(routes: NonEmptyList[FutureRoute]): Resource[IO, Port] = { val options = NettyFutureServerOptions.default.nettyOptions(NettyOptions.default.eventLoopGroup(eventLoopGroup)).randomPort - val bind = IO.fromFuture(IO.delay(NettyServer(options).addRoutes(routes.toList).start())) + val bind = IO.fromFuture(IO.delay(NettyFutureServer(options).addRoutes(routes.toList).start())) Resource .make(bind)(binding => IO.fromFuture(IO.delay(binding.stop()))) diff --git a/server/netty-server/src/test/scala/sttp/tapir/server/netty/NettyServerRequestSpec.scala b/server/netty-server/src/test/scala/sttp/tapir/server/netty/NettyServerRequestSpec.scala index 96d7dd747b..6ef1e00dfe 100644 --- a/server/netty-server/src/test/scala/sttp/tapir/server/netty/NettyServerRequestSpec.scala +++ b/server/netty-server/src/test/scala/sttp/tapir/server/netty/NettyServerRequestSpec.scala @@ -10,7 +10,7 @@ import sttp.model.Method import sttp.tapir.server.netty.internal.RichNettyHttpHeaders class NettyServerRequestSpec extends AnyFreeSpec with Matchers { - val uri = JavaUri.create("/with%20space/another/last?param=value1¶m=value2") + val uri: JavaUri = JavaUri.create("/with%20space/another/last?param=value1¶m=value2") val headers = new DefaultHttpHeaders() headers.add(HttpHeaderNames.CONTENT_TYPE, "text/plain") @@ -28,7 +28,7 @@ class NettyServerRequestSpec extends AnyFreeSpec with Matchers { trailingHeaders ) - val nettyServerRequest = new NettyServerRequest(emptyPostRequest) + val nettyServerRequest: NettyServerRequest = NettyServerRequest(emptyPostRequest) "uri is the same as in request" in { nettyServerRequest.uri.toString should equal(uri.toString) From 6a55375c255db5a25e1c36bdbe006abd5380bfad Mon Sep 17 00:00:00 2001 From: adamw Date: Wed, 15 Sep 2021 16:19:38 +0200 Subject: [PATCH 3/5] Remove some code duplication --- .../netty/NettyCatsServerInterpreter.scala | 27 ++----------- .../netty/NettyFutureServerInterpreter.scala | 29 +++----------- .../internal/NettyServerInterpreter.scala | 39 +++++++++++++++++++ 3 files changed, 47 insertions(+), 48 deletions(-) create mode 100644 server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/NettyServerInterpreter.scala diff --git a/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyCatsServerInterpreter.scala b/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyCatsServerInterpreter.scala index 6c7321d23a..f4af73ba9e 100644 --- a/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyCatsServerInterpreter.scala +++ b/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyCatsServerInterpreter.scala @@ -2,39 +2,18 @@ package sttp.tapir.server.netty import cats.effect.Async import cats.effect.std.Dispatcher -import io.netty.buffer.ByteBuf -import sttp.monad.syntax._ import sttp.monad.MonadError -import sttp.tapir.internal.NoStreams import sttp.tapir.server.ServerEndpoint -import sttp.tapir.server.interceptor.RequestResult -import sttp.tapir.server.interpreter.{BodyListener, ServerInterpreter} import sttp.tapir.server.netty.internal.CatsUtil.CatsMonadError -import sttp.tapir.server.netty.internal.{NettyBodyListener, NettyRequestBody, NettyToResponseBody} +import sttp.tapir.server.netty.internal.NettyServerInterpreter trait NettyCatsServerInterpreter[F[_]] { implicit def async: Async[F] def nettyServerOptions: NettyCatsServerOptions[F] def toRoute(ses: List[ServerEndpoint[_, _, _, Any, F]]): Route[F] = { - val handler: Route[F] = { (request: NettyServerRequest) => - implicit val monad: MonadError[F] = new CatsMonadError[F] - implicit val bodyListener: BodyListener[F, ByteBuf] = new NettyBodyListener - val serverInterpreter = new ServerInterpreter[Any, F, ByteBuf, NoStreams]( - new NettyRequestBody(request, request, nettyServerOptions.createFile), - new NettyToResponseBody, - nettyServerOptions.interceptors, - nettyServerOptions.deleteFile - ) - - serverInterpreter(request, ses) - .map { - case RequestResult.Response(response) => Some(response) - case RequestResult.Failure(_) => None - } - } - - handler + implicit val monad: MonadError[F] = new CatsMonadError[F] + NettyServerInterpreter.toRoute(ses, nettyServerOptions.interceptors, nettyServerOptions.createFile, nettyServerOptions.deleteFile) } } diff --git a/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyFutureServerInterpreter.scala b/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyFutureServerInterpreter.scala index ace3c1dee9..400f08caa4 100644 --- a/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyFutureServerInterpreter.scala +++ b/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyFutureServerInterpreter.scala @@ -1,13 +1,10 @@ package sttp.tapir.server.netty -import scala.concurrent.{ExecutionContext, Future} -import io.netty.buffer.ByteBuf import sttp.monad.FutureMonad -import sttp.tapir.internal.NoStreams import sttp.tapir.server.ServerEndpoint -import sttp.tapir.server.interceptor.RequestResult -import sttp.tapir.server.interpreter.{BodyListener, ServerInterpreter} -import sttp.tapir.server.netty.internal.{NettyBodyListener, NettyRequestBody, NettyToResponseBody} +import sttp.tapir.server.netty.internal.NettyServerInterpreter + +import scala.concurrent.{ExecutionContext, Future} trait NettyFutureServerInterpreter { def nettyServerOptions: NettyFutureServerOptions @@ -15,24 +12,8 @@ trait NettyFutureServerInterpreter { def toRoute( ses: List[ServerEndpoint[_, _, _, Any, Future]] )(implicit ec: ExecutionContext): FutureRoute = { - val handler: FutureRoute = { (request: NettyServerRequest) => - implicit val monad: FutureMonad = new FutureMonad() - implicit val bodyListener: BodyListener[Future, ByteBuf] = new NettyBodyListener - val serverInterpreter = new ServerInterpreter[Any, Future, ByteBuf, NoStreams]( - new NettyRequestBody(request, request, nettyServerOptions.createFile), - new NettyToResponseBody, - nettyServerOptions.interceptors, - nettyServerOptions.deleteFile - ) - - serverInterpreter(request, ses) - .map { - case RequestResult.Response(response) => Some(response) - case RequestResult.Failure(_) => None - } - } - - handler + implicit val monad: FutureMonad = new FutureMonad() + NettyServerInterpreter.toRoute(ses, nettyServerOptions.interceptors, nettyServerOptions.createFile, nettyServerOptions.deleteFile) } } diff --git a/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/NettyServerInterpreter.scala b/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/NettyServerInterpreter.scala new file mode 100644 index 0000000000..6ccd4b1052 --- /dev/null +++ b/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/NettyServerInterpreter.scala @@ -0,0 +1,39 @@ +package sttp.tapir.server.netty.internal + +import io.netty.buffer.ByteBuf +import sttp.monad.syntax._ +import sttp.monad.MonadError +import sttp.tapir.TapirFile +import sttp.tapir.internal.NoStreams +import sttp.tapir.model.ServerRequest +import sttp.tapir.server.ServerEndpoint +import sttp.tapir.server.interceptor.{Interceptor, RequestResult} +import sttp.tapir.server.interpreter.{BodyListener, ServerInterpreter} +import sttp.tapir.server.netty.{NettyServerRequest, Route} + +object NettyServerInterpreter { + def toRoute[F[_]: MonadError]( + ses: List[ServerEndpoint[_, _, _, Any, F]], + interceptors: List[Interceptor[F]], + createFile: ServerRequest => F[TapirFile], + deleteFile: TapirFile => F[Unit] + ): Route[F] = { + val handler: Route[F] = { (request: NettyServerRequest) => + implicit val bodyListener: BodyListener[F, ByteBuf] = new NettyBodyListener + val serverInterpreter = new ServerInterpreter[Any, F, ByteBuf, NoStreams]( + new NettyRequestBody(request, request, createFile), + new NettyToResponseBody, + interceptors, + deleteFile + ) + + serverInterpreter(request, ses) + .map { + case RequestResult.Response(response) => Some(response) + case RequestResult.Failure(_) => None + } + } + + handler + } +} From 6656ee73a1db83a77f6534651eb236c4028c8140 Mon Sep 17 00:00:00 2001 From: adamw Date: Wed, 15 Sep 2021 16:33:13 +0200 Subject: [PATCH 4/5] More deduplication --- .../tapir/server/netty/NettyCatsServer.scala | 23 +++++---------- .../server/netty/NettyFutureServer.scala | 22 +++++--------- .../netty/internal/NettyBootstrap.scala | 29 +++++++++++++++++++ 3 files changed, 45 insertions(+), 29 deletions(-) create mode 100644 server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/NettyBootstrap.scala diff --git a/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyCatsServer.scala b/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyCatsServer.scala index 2dd97ceb15..eafffbca95 100644 --- a/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyCatsServer.scala +++ b/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyCatsServer.scala @@ -3,13 +3,11 @@ package sttp.tapir.server.netty import cats.effect.{Async, IO, Resource} import cats.effect.std.Dispatcher import cats.syntax.all._ -import io.netty.bootstrap.ServerBootstrap import io.netty.channel._ -import io.netty.channel.socket.nio.NioServerSocketChannel import sttp.monad.MonadError import sttp.tapir.server.ServerEndpoint import sttp.tapir.server.netty.internal.CatsUtil._ -import sttp.tapir.server.netty.internal.NettyServerHandler +import sttp.tapir.server.netty.internal.{NettyBootstrap, NettyServerHandler} import java.net.InetSocketAddress @@ -32,23 +30,18 @@ case class NettyCatsServer[F[_]: Async](routes: Vector[Route[F]], options: Netty def port(p: Int): NettyCatsServer[F] = copy(options = options.port(p)) def start(): F[NettyCatsServerBinding[F]] = Async[F].defer { - val httpBootstrap = new ServerBootstrap() val eventLoopGroup = options.nettyOptions.eventLoopGroup() implicit val monadError: MonadError[F] = new CatsMonadError[F]() val route: Route[F] = Route.combine(routes) - httpBootstrap - .group(eventLoopGroup) - .channel(classOf[NioServerSocketChannel]) - .childHandler(new ChannelInitializer[Channel] { - override def initChannel(ch: Channel): Unit = - options.nettyOptions - .initPipeline(ch.pipeline(), new NettyServerHandler(route, (f: F[Unit]) => options.dispatcher.unsafeToFuture(f))) - }) - .option[java.lang.Integer](ChannelOption.SO_BACKLOG, 128) //https://github.com/netty/netty/issues/1692 - .childOption[java.lang.Boolean](ChannelOption.SO_KEEPALIVE, true) // https://github.com/netty/netty/issues/1692 + val channelFuture = NettyBootstrap( + options.nettyOptions, + new NettyServerHandler(route, (f: F[Unit]) => options.dispatcher.unsafeToFuture(f)), + eventLoopGroup, + options.host, + options.port + ) - val channelFuture = httpBootstrap.bind(options.host, options.port) nettyChannelFutureToScala(channelFuture).map(ch => NettyCatsServerBinding( ch.localAddress().asInstanceOf[InetSocketAddress], diff --git a/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyFutureServer.scala b/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyFutureServer.scala index 30bf39b383..56b60d76c6 100644 --- a/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyFutureServer.scala +++ b/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyFutureServer.scala @@ -1,12 +1,10 @@ package sttp.tapir.server.netty -import io.netty.bootstrap.ServerBootstrap import io.netty.channel._ -import io.netty.channel.socket.nio.NioServerSocketChannel import sttp.monad.{FutureMonad, MonadError} import sttp.tapir.server.ServerEndpoint import sttp.tapir.server.netty.internal.FutureUtil._ -import sttp.tapir.server.netty.internal.NettyServerHandler +import sttp.tapir.server.netty.internal.{NettyBootstrap, NettyServerHandler} import java.net.InetSocketAddress import scala.concurrent.{ExecutionContext, Future} @@ -31,22 +29,18 @@ case class NettyFutureServer(routes: Vector[FutureRoute], options: NettyFutureSe def port(p: Int): NettyFutureServer = copy(options = options.port(p)) def start(): Future[NettyFutureServerBinding] = { - val httpBootstrap = new ServerBootstrap() val eventLoopGroup = options.nettyOptions.eventLoopGroup() implicit val monadError: MonadError[Future] = new FutureMonad() val route = Route.combine(routes) - httpBootstrap - .group(eventLoopGroup) - .channel(classOf[NioServerSocketChannel]) - .childHandler(new ChannelInitializer[Channel] { - override def initChannel(ch: Channel): Unit = - options.nettyOptions.initPipeline(ch.pipeline(), new NettyServerHandler(route, (f: Future[Unit]) => f)) - }) - .option[java.lang.Integer](ChannelOption.SO_BACKLOG, 128) //https://github.com/netty/netty/issues/1692 - .childOption[java.lang.Boolean](ChannelOption.SO_KEEPALIVE, true) // https://github.com/netty/netty/issues/1692 + val channelFuture = NettyBootstrap( + options.nettyOptions, + new NettyServerHandler(route, (f: Future[Unit]) => f), + eventLoopGroup, + options.host, + options.port + ) - val channelFuture = httpBootstrap.bind(options.host, options.port) nettyChannelFutureToScala(channelFuture).map(ch => NettyFutureServerBinding( ch.localAddress().asInstanceOf[InetSocketAddress], diff --git a/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/NettyBootstrap.scala b/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/NettyBootstrap.scala new file mode 100644 index 0000000000..3f80f2c146 --- /dev/null +++ b/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/NettyBootstrap.scala @@ -0,0 +1,29 @@ +package sttp.tapir.server.netty.internal + +import io.netty.bootstrap.ServerBootstrap +import io.netty.channel.{Channel, ChannelFuture, ChannelInitializer, ChannelOption, EventLoopGroup} +import io.netty.channel.socket.nio.NioServerSocketChannel +import sttp.tapir.server.netty.NettyOptions + +object NettyBootstrap { + def apply[F[_]]( + nettyOptions: NettyOptions, + handler: => NettyServerHandler[F], + eventLoopGroup: EventLoopGroup, + host: String, + port: Int + ): ChannelFuture = { + val httpBootstrap = new ServerBootstrap() + + httpBootstrap + .group(eventLoopGroup) + .channel(classOf[NioServerSocketChannel]) + .childHandler(new ChannelInitializer[Channel] { + override def initChannel(ch: Channel): Unit = nettyOptions.initPipeline(ch.pipeline(), handler) + }) + .option[java.lang.Integer](ChannelOption.SO_BACKLOG, 128) //https://github.com/netty/netty/issues/1692 + .childOption[java.lang.Boolean](ChannelOption.SO_KEEPALIVE, true) // https://github.com/netty/netty/issues/1692 + + httpBootstrap.bind(host, port) + } +} From b76f5b005e39426bf62b2803c71b952c4068ad0e Mon Sep 17 00:00:00 2001 From: adamw Date: Wed, 15 Sep 2021 17:31:27 +0200 Subject: [PATCH 5/5] More deduplication --- .../server/netty/NettyCatsServerOptions.scala | 29 ++++++++----------- .../tapir/server/netty/NettyDefaults.scala | 14 +++++++++ .../netty/NettyFutureServerOptions.scala | 29 ++++++++----------- 3 files changed, 38 insertions(+), 34 deletions(-) create mode 100644 server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyDefaults.scala diff --git a/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyCatsServerOptions.scala b/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyCatsServerOptions.scala index 7c815e6420..96c285954d 100644 --- a/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyCatsServerOptions.scala +++ b/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyCatsServerOptions.scala @@ -26,10 +26,12 @@ case class NettyCatsServerOptions[F[_]]( } object NettyCatsServerOptions { + def default[F[_]: Async](dispatcher: Dispatcher[F]): NettyCatsServerOptions[F] = customInterceptors(dispatcher).options + def default[F[_]: Async](interceptors: List[Interceptor[F]], dispatcher: Dispatcher[F]): NettyCatsServerOptions[F] = NettyCatsServerOptions( - "localhost", - 8080, + NettyDefaults.DefaultHost, + NettyDefaults.DefaultPort, interceptors, _ => Sync[F].delay(Defaults.createTempFile()), file => Sync[F].delay(Defaults.deleteFile()(file)), @@ -37,13 +39,6 @@ object NettyCatsServerOptions { NettyOptions.default ) - def defaultServerLog[F[_]: Async]: ServerLog[Logger => F[Unit]] = DefaultServerLog( - doLogWhenHandled = debugLog[F], - doLogAllDecodeFailures = debugLog[F], - doLogExceptions = (msg: String, ex: Throwable) => log => Sync[F].delay(log.error(msg, ex)), - noLog = _ => Sync[F].unit - ) - def customInterceptors[F[_]: Async](dispatcher: Dispatcher[F]): CustomInterceptors[F, Logger => F[Unit], NettyCatsServerOptions[F]] = CustomInterceptors( createLogInterceptor = @@ -51,13 +46,13 @@ object NettyCatsServerOptions { createOptions = (ci: CustomInterceptors[F, Logger => F[Unit], NettyCatsServerOptions[F]]) => default(ci.interceptors, dispatcher) ).serverLog(defaultServerLog) - private def debugLog[F[_]: Async](msg: String, exOpt: Option[Throwable]): Logger => F[Unit] = log => - Sync[F].delay { - exOpt match { - case None => log.debug(msg) - case Some(ex) => log.debug(s"$msg; exception: {}", ex) - } - } + def defaultServerLog[F[_]: Async]: ServerLog[Logger => F[Unit]] = DefaultServerLog( + doLogWhenHandled = debugLog[F], + doLogAllDecodeFailures = debugLog[F], + doLogExceptions = (msg: String, ex: Throwable) => log => Sync[F].delay(log.error(msg, ex)), + noLog = _ => Sync[F].unit + ) - def default[F[_]: Async](dispatcher: Dispatcher[F]): NettyCatsServerOptions[F] = customInterceptors(dispatcher).options + private def debugLog[F[_]: Async](msg: String, exOpt: Option[Throwable]): Logger => F[Unit] = log => + Sync[F].delay(NettyDefaults.debugLog(log, msg, exOpt)) } diff --git a/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyDefaults.scala b/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyDefaults.scala new file mode 100644 index 0000000000..a702ad91fe --- /dev/null +++ b/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyDefaults.scala @@ -0,0 +1,14 @@ +package sttp.tapir.server.netty + +import com.typesafe.scalalogging.Logger + +object NettyDefaults { + val DefaultHost = "localhost" + val DefaultPort = 8080 + + def debugLog(log: Logger, msg: String, exOpt: Option[Throwable]): Unit = + exOpt match { + case None => log.debug(msg) + case Some(ex) => log.debug(s"$msg; exception: {}", ex) + } +} diff --git a/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyFutureServerOptions.scala b/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyFutureServerOptions.scala index 0e03daec28..ca7ef4094d 100644 --- a/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyFutureServerOptions.scala +++ b/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyFutureServerOptions.scala @@ -25,9 +25,11 @@ case class NettyFutureServerOptions( } object NettyFutureServerOptions { + val default: NettyFutureServerOptions = customInterceptors.options + def default(interceptors: List[Interceptor[Future]]): NettyFutureServerOptions = NettyFutureServerOptions( - "localhost", - 8080, + NettyDefaults.DefaultHost, + NettyDefaults.DefaultPort, interceptors, _ => { import scala.concurrent.ExecutionContext.Implicits.global @@ -40,13 +42,6 @@ object NettyFutureServerOptions { NettyOptions.default ) - lazy val defaultServerLog: ServerLog[Logger => Future[Unit]] = DefaultServerLog( - doLogWhenHandled = debugLog, - doLogAllDecodeFailures = debugLog, - doLogExceptions = (msg: String, ex: Throwable) => log => Future.successful(log.error(msg, ex)), - noLog = _ => Future.unit - ) - def customInterceptors: CustomInterceptors[Future, Logger => Future[Unit], NettyFutureServerOptions] = { CustomInterceptors( createLogInterceptor = @@ -55,13 +50,13 @@ object NettyFutureServerOptions { ).serverLog(defaultServerLog) } - private def debugLog(msg: String, exOpt: Option[Throwable]): Logger => Future[Unit] = log => - Future.successful { - exOpt match { - case None => log.debug(msg) - case Some(ex) => log.debug(s"$msg; exception: {}", ex) - } - } + lazy val defaultServerLog: ServerLog[Logger => Future[Unit]] = DefaultServerLog( + doLogWhenHandled = debugLog, + doLogAllDecodeFailures = debugLog, + doLogExceptions = (msg: String, ex: Throwable) => log => Future.successful(log.error(msg, ex)), + noLog = _ => Future.unit + ) - val default: NettyFutureServerOptions = customInterceptors.options + private def debugLog(msg: String, exOpt: Option[Throwable]): Logger => Future[Unit] = log => + Future.successful(NettyDefaults.debugLog(log, msg, exOpt)) }