Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement the netty interpreter for cats-effect #1485

Merged
merged 6 commits into from
Sep 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -832,7 +832,10 @@ lazy val nettyServer: ProjectMatrix = (projectMatrix in file("server/netty-serve
.settings(commonJvmSettings)
.settings(
name := "tapir-netty-server",
libraryDependencies ++= Seq("io.netty" % "netty-all" % "4.1.68.Final") ++ loggerDependencies
libraryDependencies ++= Seq(
"io.netty" % "netty-all" % "4.1.68.Final",
"com.softwaremill.sttp.shared" %% "fs2" % Versions.sttpShared % Optional
) ++ loggerDependencies
)
.jvmPlatform(scalaVersions = scala2And3Versions)
.dependsOn(core, serverTests % Test)
Expand Down
16 changes: 10 additions & 6 deletions doc/server/netty.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@ To expose an endpoint using a [Netty](https://netty.io)-based server, first add
"com.softwaremill.sttp.tapir" %% "tapir-netty-server" % "@VERSION@"
```

Then, use `NettyServer().addEndpoints` to expose `Future`-based server endpoints.
Then, use:

* `NettyFutureServer().addEndpoints` to expose `Future`-based server endpoints.
* `NettyCatsServer().addEndpoints` to expose `F`-based server endpoints, where `F` is any cats-effect supported effect.

For example:

```scala mdoc:compile-only
import sttp.tapir._
import sttp.tapir.server.netty.{NettyServer, NettyServerBinding}
import sttp.tapir.server.netty.{NettyFutureServer, NettyFutureServerBinding}

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
Expand All @@ -25,16 +28,17 @@ val helloWorld = endpoint
.out(stringBody)
.serverLogic(name => Future.successful[Either[Unit, String]](Right(s"Hello, $name!")))

val binding: Future[NettyServerBinding] = NettyServer().addEndpoint(helloWorld).start()
val binding: Future[NettyFutureServerBinding] =
NettyFutureServer().addEndpoint(helloWorld).start()
```

## Configuration

The interpreter can be configured by providing an `NettyServerOptions` value, see [server options](options.md) for
The interpreter can be configured by providing an `NettyFutureServerOptions` value, see [server options](options.md) for
details.

Some of the options can be configured directly using a `NettyServer` instance, such as the host and port. Others
can be passed using the `NettyServer(options)` methods. Options may also be overriden when adding endpoints.
Some options can be configured directly using a `NettyFutureServer` instance, such as the host and port. Others
can be passed using the `NettyFutureServer(options)` methods. Options may also be overridden when adding endpoints.

## Defining an endpoint together with the server logic

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package sttp.tapir.examples

import sttp.client3.{HttpURLConnectionBackend, Identity, SttpBackend, UriContext, asStringAlways, basicRequest}
import sttp.model.StatusCode
import sttp.tapir.server.netty.NettyServer
import sttp.tapir.server.netty.{NettyFutureServer, NettyFutureServerBinding}
import sttp.tapir.{Endpoint, endpoint, query, stringBody}

import scala.concurrent.ExecutionContext.Implicits.global
Expand All @@ -20,7 +20,7 @@ object HelloWorldNettyServer extends App {

// Creating handler for netty bootstrap
val serverBinding =
Await.result(NettyServer().port(8888).addEndpoint(helloWorldServerEndpoint).start(), Duration.Inf)
Await.result(NettyFutureServer().port(8888).addEndpoint(helloWorldServerEndpoint).start(), Duration.Inf)

// Bind and start to accept incoming connections.
val port = serverBinding.port
Expand Down
4 changes: 2 additions & 2 deletions generated-doc/out/server/netty.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ For example:

```scala
import sttp.tapir._
import sttp.tapir.server.netty.{NettyServer, NettyServerBinding}
import sttp.tapir.server.netty.{NettyFutureServer, NettyFutureServerBinding}

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
Expand All @@ -25,7 +25,7 @@ val helloWorld = endpoint
.out(stringBody)
.serverLogic(name => Future.successful[Either[Unit, String]](Right(s"Hello, $name!")))

val binding: Future[NettyServerBinding] = NettyServer().addEndpoint(helloWorld).start()
val binding: Future[NettyFutureServerBinding] = NettyFutureServer().addEndpoint(helloWorld).start()
```

## Configuration
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package sttp.tapir.server.netty

import cats.effect.{Async, IO, Resource}
import cats.effect.std.Dispatcher
import cats.syntax.all._
import io.netty.channel._
import sttp.monad.MonadError
import sttp.tapir.server.ServerEndpoint
import sttp.tapir.server.netty.internal.CatsUtil._
import sttp.tapir.server.netty.internal.{NettyBootstrap, NettyServerHandler}

import java.net.InetSocketAddress

case class NettyCatsServer[F[_]: Async](routes: Vector[Route[F]], options: NettyCatsServerOptions[F]) {
def addEndpoint(se: ServerEndpoint[_, _, _, Any, F]): NettyCatsServer[F] = addEndpoints(List(se))
def addEndpoint(se: ServerEndpoint[_, _, _, Any, F], overrideOptions: NettyCatsServerOptions[F]): NettyCatsServer[F] =
addEndpoints(List(se), overrideOptions)
def addEndpoints(ses: List[ServerEndpoint[_, _, _, Any, F]]): NettyCatsServer[F] = addRoute(
NettyCatsServerInterpreter(options).toRoute(ses)
)
def addEndpoints(ses: List[ServerEndpoint[_, _, _, Any, F]], overrideOptions: NettyCatsServerOptions[F]): NettyCatsServer[F] = addRoute(
NettyCatsServerInterpreter(overrideOptions).toRoute(ses)
)

def addRoute(r: Route[F]): NettyCatsServer[F] = copy(routes = routes :+ r)
def addRoutes(r: Iterable[Route[F]]): NettyCatsServer[F] = copy(routes = routes ++ r)

def options(o: NettyCatsServerOptions[F]): NettyCatsServer[F] = copy(options = o)
def host(s: String): NettyCatsServer[F] = copy(options = options.host(s))
def port(p: Int): NettyCatsServer[F] = copy(options = options.port(p))

def start(): F[NettyCatsServerBinding[F]] = Async[F].defer {
val eventLoopGroup = options.nettyOptions.eventLoopGroup()
implicit val monadError: MonadError[F] = new CatsMonadError[F]()
val route: Route[F] = Route.combine(routes)

val channelFuture = NettyBootstrap(
options.nettyOptions,
new NettyServerHandler(route, (f: F[Unit]) => options.dispatcher.unsafeToFuture(f)),
eventLoopGroup,
options.host,
options.port
)

nettyChannelFutureToScala(channelFuture).map(ch =>
NettyCatsServerBinding(
ch.localAddress().asInstanceOf[InetSocketAddress],
() => stop(ch, eventLoopGroup)
)
)
}

private def stop(ch: Channel, eventLoopGroup: EventLoopGroup): F[Unit] = {
Async[F].defer {
nettyFutureToScala(ch.close()).flatMap { _ =>
if (options.nettyOptions.shutdownEventLoopGroupOnClose) {
nettyFutureToScala(eventLoopGroup.shutdownGracefully()).map(_ => ())
} else Async[F].unit
}
}
}
}

object NettyCatsServer {
def apply[F[_]: Async](dispatcher: Dispatcher[F]): NettyCatsServer[F] =
NettyCatsServer(Vector.empty, NettyCatsServerOptions.default[F](dispatcher))

def apply[F[_]: Async](options: NettyCatsServerOptions[F]): NettyCatsServer[F] =
NettyCatsServer(Vector.empty, options)

def io(): Resource[IO, NettyCatsServer[IO]] = Dispatcher[IO].map(apply[IO](_))
}

case class NettyCatsServerBinding[F[_]](localSocket: InetSocketAddress, stop: () => F[Unit]) {
def host: String = localSocket.getHostString
def port: Int = localSocket.getPort
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package sttp.tapir.server.netty

import cats.effect.Async
import cats.effect.std.Dispatcher
import sttp.monad.MonadError
import sttp.tapir.server.ServerEndpoint
import sttp.tapir.server.netty.internal.CatsUtil.CatsMonadError
import sttp.tapir.server.netty.internal.NettyServerInterpreter

trait NettyCatsServerInterpreter[F[_]] {
implicit def async: Async[F]
def nettyServerOptions: NettyCatsServerOptions[F]

def toRoute(ses: List[ServerEndpoint[_, _, _, Any, F]]): Route[F] = {
implicit val monad: MonadError[F] = new CatsMonadError[F]
NettyServerInterpreter.toRoute(ses, nettyServerOptions.interceptors, nettyServerOptions.createFile, nettyServerOptions.deleteFile)
}
}

object NettyCatsServerInterpreter {
def apply[F[_]](dispatcher: Dispatcher[F])(implicit _fa: Async[F]): NettyCatsServerInterpreter[F] = {
new NettyCatsServerInterpreter[F] {
override implicit def async: Async[F] = _fa
override def nettyServerOptions: NettyCatsServerOptions[F] = NettyCatsServerOptions.default[F](dispatcher)(async)
}
}
def apply[F[_]](options: NettyCatsServerOptions[F])(implicit _fa: Async[F]): NettyCatsServerInterpreter[F] = {
new NettyCatsServerInterpreter[F] {
override implicit def async: Async[F] = _fa
override def nettyServerOptions: NettyCatsServerOptions[F] = options
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package sttp.tapir.server.netty

import cats.effect.std.Dispatcher
import cats.effect.{Async, Sync}
import com.typesafe.scalalogging.Logger
import sttp.tapir.{Defaults, TapirFile}
import sttp.tapir.model.ServerRequest
import sttp.tapir.server.interceptor.log.{DefaultServerLog, ServerLog, ServerLogInterceptor}
import sttp.tapir.server.interceptor.{CustomInterceptors, Interceptor}

case class NettyCatsServerOptions[F[_]](
host: String,
port: Int,
interceptors: List[Interceptor[F]],
createFile: ServerRequest => F[TapirFile],
deleteFile: TapirFile => F[Unit],
dispatcher: Dispatcher[F],
nettyOptions: NettyOptions
) {
def host(s: String): NettyCatsServerOptions[F] = copy(host = s)
def port(p: Int): NettyCatsServerOptions[F] = copy(port = p)
def randomPort: NettyCatsServerOptions[F] = port(0)
def prependInterceptor(i: Interceptor[F]): NettyCatsServerOptions[F] = copy(interceptors = i :: interceptors)
def appendInterceptor(i: Interceptor[F]): NettyCatsServerOptions[F] = copy(interceptors = interceptors :+ i)
def nettyOptions(o: NettyOptions): NettyCatsServerOptions[F] = copy(nettyOptions = o)
}

object NettyCatsServerOptions {
def default[F[_]: Async](dispatcher: Dispatcher[F]): NettyCatsServerOptions[F] = customInterceptors(dispatcher).options

def default[F[_]: Async](interceptors: List[Interceptor[F]], dispatcher: Dispatcher[F]): NettyCatsServerOptions[F] =
NettyCatsServerOptions(
NettyDefaults.DefaultHost,
NettyDefaults.DefaultPort,
interceptors,
_ => Sync[F].delay(Defaults.createTempFile()),
file => Sync[F].delay(Defaults.deleteFile()(file)),
dispatcher,
NettyOptions.default
)

def customInterceptors[F[_]: Async](dispatcher: Dispatcher[F]): CustomInterceptors[F, Logger => F[Unit], NettyCatsServerOptions[F]] =
CustomInterceptors(
createLogInterceptor =
(sl: ServerLog[Logger => F[Unit]]) => new ServerLogInterceptor[Logger => F[Unit], F](sl, (_, _) => Sync[F].unit),
createOptions = (ci: CustomInterceptors[F, Logger => F[Unit], NettyCatsServerOptions[F]]) => default(ci.interceptors, dispatcher)
).serverLog(defaultServerLog)

def defaultServerLog[F[_]: Async]: ServerLog[Logger => F[Unit]] = DefaultServerLog(
doLogWhenHandled = debugLog[F],
doLogAllDecodeFailures = debugLog[F],
doLogExceptions = (msg: String, ex: Throwable) => log => Sync[F].delay(log.error(msg, ex)),
noLog = _ => Sync[F].unit
)

private def debugLog[F[_]: Async](msg: String, exOpt: Option[Throwable]): Logger => F[Unit] = log =>
Sync[F].delay(NettyDefaults.debugLog(log, msg, exOpt))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package sttp.tapir.server.netty

import com.typesafe.scalalogging.Logger

object NettyDefaults {
val DefaultHost = "localhost"
val DefaultPort = 8080

def debugLog(log: Logger, msg: String, exOpt: Option[Throwable]): Unit =
exOpt match {
case None => log.debug(msg)
case Some(ex) => log.debug(s"$msg; exception: {}", ex)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package sttp.tapir.server.netty

import io.netty.channel._
import sttp.monad.{FutureMonad, MonadError}
import sttp.tapir.server.ServerEndpoint
import sttp.tapir.server.netty.internal.FutureUtil._
import sttp.tapir.server.netty.internal.{NettyBootstrap, NettyServerHandler}

import java.net.InetSocketAddress
import scala.concurrent.{ExecutionContext, Future}

case class NettyFutureServer(routes: Vector[FutureRoute], options: NettyFutureServerOptions)(implicit ec: ExecutionContext) {
def addEndpoint(se: ServerEndpoint[_, _, _, Any, Future]): NettyFutureServer = addEndpoints(List(se))
def addEndpoint(se: ServerEndpoint[_, _, _, Any, Future], overrideOptions: NettyFutureServerOptions): NettyFutureServer =
addEndpoints(List(se), overrideOptions)
def addEndpoints(ses: List[ServerEndpoint[_, _, _, Any, Future]]): NettyFutureServer = addRoute(
NettyFutureServerInterpreter(options).toRoute(ses)
)
def addEndpoints(ses: List[ServerEndpoint[_, _, _, Any, Future]], overrideOptions: NettyFutureServerOptions): NettyFutureServer =
addRoute(
NettyFutureServerInterpreter(overrideOptions).toRoute(ses)
)

def addRoute(r: FutureRoute): NettyFutureServer = copy(routes = routes :+ r)
def addRoutes(r: Iterable[FutureRoute]): NettyFutureServer = copy(routes = routes ++ r)

def options(o: NettyFutureServerOptions): NettyFutureServer = copy(options = o)
def host(s: String): NettyFutureServer = copy(options = options.host(s))
def port(p: Int): NettyFutureServer = copy(options = options.port(p))

def start(): Future[NettyFutureServerBinding] = {
val eventLoopGroup = options.nettyOptions.eventLoopGroup()
implicit val monadError: MonadError[Future] = new FutureMonad()
val route = Route.combine(routes)

val channelFuture = NettyBootstrap(
options.nettyOptions,
new NettyServerHandler(route, (f: Future[Unit]) => f),
eventLoopGroup,
options.host,
options.port
)

nettyChannelFutureToScala(channelFuture).map(ch =>
NettyFutureServerBinding(
ch.localAddress().asInstanceOf[InetSocketAddress],
() => stop(ch, eventLoopGroup)
)
)
}

private def stop(ch: Channel, eventLoopGroup: EventLoopGroup): Future[Unit] = {
nettyFutureToScala(ch.close()).flatMap { _ =>
if (options.nettyOptions.shutdownEventLoopGroupOnClose) {
nettyFutureToScala(eventLoopGroup.shutdownGracefully()).map(_ => ())
} else Future.successful(())
}
}
}

object NettyFutureServer {
def apply(serverOptions: NettyFutureServerOptions = NettyFutureServerOptions.default)(implicit ec: ExecutionContext): NettyFutureServer =
NettyFutureServer(Vector.empty, serverOptions)
}

case class NettyFutureServerBinding(localSocket: InetSocketAddress, stop: () => Future[Unit]) {
def host: String = localSocket.getHostString
def port: Int = localSocket.getPort
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package sttp.tapir.server.netty

import sttp.monad.FutureMonad
import sttp.tapir.server.ServerEndpoint
import sttp.tapir.server.netty.internal.NettyServerInterpreter

import scala.concurrent.{ExecutionContext, Future}

trait NettyFutureServerInterpreter {
def nettyServerOptions: NettyFutureServerOptions

def toRoute(
ses: List[ServerEndpoint[_, _, _, Any, Future]]
)(implicit ec: ExecutionContext): FutureRoute = {
implicit val monad: FutureMonad = new FutureMonad()
NettyServerInterpreter.toRoute(ses, nettyServerOptions.interceptors, nettyServerOptions.createFile, nettyServerOptions.deleteFile)
}
}

object NettyFutureServerInterpreter {
def apply(serverOptions: NettyFutureServerOptions = NettyFutureServerOptions.default): NettyFutureServerInterpreter = {
new NettyFutureServerInterpreter {
override def nettyServerOptions: NettyFutureServerOptions = serverOptions
}
}
}
Loading