-
Notifications
You must be signed in to change notification settings - Fork 423
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #1485 from softwaremill/netty-cats
Implement the netty interpreter for cats-effect
- Loading branch information
Showing
27 changed files
with
603 additions
and
227 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
77 changes: 77 additions & 0 deletions
77
server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyCatsServer.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
package sttp.tapir.server.netty | ||
|
||
import cats.effect.{Async, IO, Resource} | ||
import cats.effect.std.Dispatcher | ||
import cats.syntax.all._ | ||
import io.netty.channel._ | ||
import sttp.monad.MonadError | ||
import sttp.tapir.server.ServerEndpoint | ||
import sttp.tapir.server.netty.internal.CatsUtil._ | ||
import sttp.tapir.server.netty.internal.{NettyBootstrap, NettyServerHandler} | ||
|
||
import java.net.InetSocketAddress | ||
|
||
case class NettyCatsServer[F[_]: Async](routes: Vector[Route[F]], options: NettyCatsServerOptions[F]) { | ||
def addEndpoint(se: ServerEndpoint[_, _, _, Any, F]): NettyCatsServer[F] = addEndpoints(List(se)) | ||
def addEndpoint(se: ServerEndpoint[_, _, _, Any, F], overrideOptions: NettyCatsServerOptions[F]): NettyCatsServer[F] = | ||
addEndpoints(List(se), overrideOptions) | ||
def addEndpoints(ses: List[ServerEndpoint[_, _, _, Any, F]]): NettyCatsServer[F] = addRoute( | ||
NettyCatsServerInterpreter(options).toRoute(ses) | ||
) | ||
def addEndpoints(ses: List[ServerEndpoint[_, _, _, Any, F]], overrideOptions: NettyCatsServerOptions[F]): NettyCatsServer[F] = addRoute( | ||
NettyCatsServerInterpreter(overrideOptions).toRoute(ses) | ||
) | ||
|
||
def addRoute(r: Route[F]): NettyCatsServer[F] = copy(routes = routes :+ r) | ||
def addRoutes(r: Iterable[Route[F]]): NettyCatsServer[F] = copy(routes = routes ++ r) | ||
|
||
def options(o: NettyCatsServerOptions[F]): NettyCatsServer[F] = copy(options = o) | ||
def host(s: String): NettyCatsServer[F] = copy(options = options.host(s)) | ||
def port(p: Int): NettyCatsServer[F] = copy(options = options.port(p)) | ||
|
||
def start(): F[NettyCatsServerBinding[F]] = Async[F].defer { | ||
val eventLoopGroup = options.nettyOptions.eventLoopGroup() | ||
implicit val monadError: MonadError[F] = new CatsMonadError[F]() | ||
val route: Route[F] = Route.combine(routes) | ||
|
||
val channelFuture = NettyBootstrap( | ||
options.nettyOptions, | ||
new NettyServerHandler(route, (f: F[Unit]) => options.dispatcher.unsafeToFuture(f)), | ||
eventLoopGroup, | ||
options.host, | ||
options.port | ||
) | ||
|
||
nettyChannelFutureToScala(channelFuture).map(ch => | ||
NettyCatsServerBinding( | ||
ch.localAddress().asInstanceOf[InetSocketAddress], | ||
() => stop(ch, eventLoopGroup) | ||
) | ||
) | ||
} | ||
|
||
private def stop(ch: Channel, eventLoopGroup: EventLoopGroup): F[Unit] = { | ||
Async[F].defer { | ||
nettyFutureToScala(ch.close()).flatMap { _ => | ||
if (options.nettyOptions.shutdownEventLoopGroupOnClose) { | ||
nettyFutureToScala(eventLoopGroup.shutdownGracefully()).map(_ => ()) | ||
} else Async[F].unit | ||
} | ||
} | ||
} | ||
} | ||
|
||
object NettyCatsServer { | ||
def apply[F[_]: Async](dispatcher: Dispatcher[F]): NettyCatsServer[F] = | ||
NettyCatsServer(Vector.empty, NettyCatsServerOptions.default[F](dispatcher)) | ||
|
||
def apply[F[_]: Async](options: NettyCatsServerOptions[F]): NettyCatsServer[F] = | ||
NettyCatsServer(Vector.empty, options) | ||
|
||
def io(): Resource[IO, NettyCatsServer[IO]] = Dispatcher[IO].map(apply[IO](_)) | ||
} | ||
|
||
case class NettyCatsServerBinding[F[_]](localSocket: InetSocketAddress, stop: () => F[Unit]) { | ||
def host: String = localSocket.getHostString | ||
def port: Int = localSocket.getPort | ||
} |
33 changes: 33 additions & 0 deletions
33
server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyCatsServerInterpreter.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
package sttp.tapir.server.netty | ||
|
||
import cats.effect.Async | ||
import cats.effect.std.Dispatcher | ||
import sttp.monad.MonadError | ||
import sttp.tapir.server.ServerEndpoint | ||
import sttp.tapir.server.netty.internal.CatsUtil.CatsMonadError | ||
import sttp.tapir.server.netty.internal.NettyServerInterpreter | ||
|
||
trait NettyCatsServerInterpreter[F[_]] { | ||
implicit def async: Async[F] | ||
def nettyServerOptions: NettyCatsServerOptions[F] | ||
|
||
def toRoute(ses: List[ServerEndpoint[_, _, _, Any, F]]): Route[F] = { | ||
implicit val monad: MonadError[F] = new CatsMonadError[F] | ||
NettyServerInterpreter.toRoute(ses, nettyServerOptions.interceptors, nettyServerOptions.createFile, nettyServerOptions.deleteFile) | ||
} | ||
} | ||
|
||
object NettyCatsServerInterpreter { | ||
def apply[F[_]](dispatcher: Dispatcher[F])(implicit _fa: Async[F]): NettyCatsServerInterpreter[F] = { | ||
new NettyCatsServerInterpreter[F] { | ||
override implicit def async: Async[F] = _fa | ||
override def nettyServerOptions: NettyCatsServerOptions[F] = NettyCatsServerOptions.default[F](dispatcher)(async) | ||
} | ||
} | ||
def apply[F[_]](options: NettyCatsServerOptions[F])(implicit _fa: Async[F]): NettyCatsServerInterpreter[F] = { | ||
new NettyCatsServerInterpreter[F] { | ||
override implicit def async: Async[F] = _fa | ||
override def nettyServerOptions: NettyCatsServerOptions[F] = options | ||
} | ||
} | ||
} |
58 changes: 58 additions & 0 deletions
58
server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyCatsServerOptions.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
package sttp.tapir.server.netty | ||
|
||
import cats.effect.std.Dispatcher | ||
import cats.effect.{Async, Sync} | ||
import com.typesafe.scalalogging.Logger | ||
import sttp.tapir.{Defaults, TapirFile} | ||
import sttp.tapir.model.ServerRequest | ||
import sttp.tapir.server.interceptor.log.{DefaultServerLog, ServerLog, ServerLogInterceptor} | ||
import sttp.tapir.server.interceptor.{CustomInterceptors, Interceptor} | ||
|
||
case class NettyCatsServerOptions[F[_]]( | ||
host: String, | ||
port: Int, | ||
interceptors: List[Interceptor[F]], | ||
createFile: ServerRequest => F[TapirFile], | ||
deleteFile: TapirFile => F[Unit], | ||
dispatcher: Dispatcher[F], | ||
nettyOptions: NettyOptions | ||
) { | ||
def host(s: String): NettyCatsServerOptions[F] = copy(host = s) | ||
def port(p: Int): NettyCatsServerOptions[F] = copy(port = p) | ||
def randomPort: NettyCatsServerOptions[F] = port(0) | ||
def prependInterceptor(i: Interceptor[F]): NettyCatsServerOptions[F] = copy(interceptors = i :: interceptors) | ||
def appendInterceptor(i: Interceptor[F]): NettyCatsServerOptions[F] = copy(interceptors = interceptors :+ i) | ||
def nettyOptions(o: NettyOptions): NettyCatsServerOptions[F] = copy(nettyOptions = o) | ||
} | ||
|
||
object NettyCatsServerOptions { | ||
def default[F[_]: Async](dispatcher: Dispatcher[F]): NettyCatsServerOptions[F] = customInterceptors(dispatcher).options | ||
|
||
def default[F[_]: Async](interceptors: List[Interceptor[F]], dispatcher: Dispatcher[F]): NettyCatsServerOptions[F] = | ||
NettyCatsServerOptions( | ||
NettyDefaults.DefaultHost, | ||
NettyDefaults.DefaultPort, | ||
interceptors, | ||
_ => Sync[F].delay(Defaults.createTempFile()), | ||
file => Sync[F].delay(Defaults.deleteFile()(file)), | ||
dispatcher, | ||
NettyOptions.default | ||
) | ||
|
||
def customInterceptors[F[_]: Async](dispatcher: Dispatcher[F]): CustomInterceptors[F, Logger => F[Unit], NettyCatsServerOptions[F]] = | ||
CustomInterceptors( | ||
createLogInterceptor = | ||
(sl: ServerLog[Logger => F[Unit]]) => new ServerLogInterceptor[Logger => F[Unit], F](sl, (_, _) => Sync[F].unit), | ||
createOptions = (ci: CustomInterceptors[F, Logger => F[Unit], NettyCatsServerOptions[F]]) => default(ci.interceptors, dispatcher) | ||
).serverLog(defaultServerLog) | ||
|
||
def defaultServerLog[F[_]: Async]: ServerLog[Logger => F[Unit]] = DefaultServerLog( | ||
doLogWhenHandled = debugLog[F], | ||
doLogAllDecodeFailures = debugLog[F], | ||
doLogExceptions = (msg: String, ex: Throwable) => log => Sync[F].delay(log.error(msg, ex)), | ||
noLog = _ => Sync[F].unit | ||
) | ||
|
||
private def debugLog[F[_]: Async](msg: String, exOpt: Option[Throwable]): Logger => F[Unit] = log => | ||
Sync[F].delay(NettyDefaults.debugLog(log, msg, exOpt)) | ||
} |
14 changes: 14 additions & 0 deletions
14
server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyDefaults.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
package sttp.tapir.server.netty | ||
|
||
import com.typesafe.scalalogging.Logger | ||
|
||
object NettyDefaults { | ||
val DefaultHost = "localhost" | ||
val DefaultPort = 8080 | ||
|
||
def debugLog(log: Logger, msg: String, exOpt: Option[Throwable]): Unit = | ||
exOpt match { | ||
case None => log.debug(msg) | ||
case Some(ex) => log.debug(s"$msg; exception: {}", ex) | ||
} | ||
} |
69 changes: 69 additions & 0 deletions
69
server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyFutureServer.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,69 @@ | ||
package sttp.tapir.server.netty | ||
|
||
import io.netty.channel._ | ||
import sttp.monad.{FutureMonad, MonadError} | ||
import sttp.tapir.server.ServerEndpoint | ||
import sttp.tapir.server.netty.internal.FutureUtil._ | ||
import sttp.tapir.server.netty.internal.{NettyBootstrap, NettyServerHandler} | ||
|
||
import java.net.InetSocketAddress | ||
import scala.concurrent.{ExecutionContext, Future} | ||
|
||
case class NettyFutureServer(routes: Vector[FutureRoute], options: NettyFutureServerOptions)(implicit ec: ExecutionContext) { | ||
def addEndpoint(se: ServerEndpoint[_, _, _, Any, Future]): NettyFutureServer = addEndpoints(List(se)) | ||
def addEndpoint(se: ServerEndpoint[_, _, _, Any, Future], overrideOptions: NettyFutureServerOptions): NettyFutureServer = | ||
addEndpoints(List(se), overrideOptions) | ||
def addEndpoints(ses: List[ServerEndpoint[_, _, _, Any, Future]]): NettyFutureServer = addRoute( | ||
NettyFutureServerInterpreter(options).toRoute(ses) | ||
) | ||
def addEndpoints(ses: List[ServerEndpoint[_, _, _, Any, Future]], overrideOptions: NettyFutureServerOptions): NettyFutureServer = | ||
addRoute( | ||
NettyFutureServerInterpreter(overrideOptions).toRoute(ses) | ||
) | ||
|
||
def addRoute(r: FutureRoute): NettyFutureServer = copy(routes = routes :+ r) | ||
def addRoutes(r: Iterable[FutureRoute]): NettyFutureServer = copy(routes = routes ++ r) | ||
|
||
def options(o: NettyFutureServerOptions): NettyFutureServer = copy(options = o) | ||
def host(s: String): NettyFutureServer = copy(options = options.host(s)) | ||
def port(p: Int): NettyFutureServer = copy(options = options.port(p)) | ||
|
||
def start(): Future[NettyFutureServerBinding] = { | ||
val eventLoopGroup = options.nettyOptions.eventLoopGroup() | ||
implicit val monadError: MonadError[Future] = new FutureMonad() | ||
val route = Route.combine(routes) | ||
|
||
val channelFuture = NettyBootstrap( | ||
options.nettyOptions, | ||
new NettyServerHandler(route, (f: Future[Unit]) => f), | ||
eventLoopGroup, | ||
options.host, | ||
options.port | ||
) | ||
|
||
nettyChannelFutureToScala(channelFuture).map(ch => | ||
NettyFutureServerBinding( | ||
ch.localAddress().asInstanceOf[InetSocketAddress], | ||
() => stop(ch, eventLoopGroup) | ||
) | ||
) | ||
} | ||
|
||
private def stop(ch: Channel, eventLoopGroup: EventLoopGroup): Future[Unit] = { | ||
nettyFutureToScala(ch.close()).flatMap { _ => | ||
if (options.nettyOptions.shutdownEventLoopGroupOnClose) { | ||
nettyFutureToScala(eventLoopGroup.shutdownGracefully()).map(_ => ()) | ||
} else Future.successful(()) | ||
} | ||
} | ||
} | ||
|
||
object NettyFutureServer { | ||
def apply(serverOptions: NettyFutureServerOptions = NettyFutureServerOptions.default)(implicit ec: ExecutionContext): NettyFutureServer = | ||
NettyFutureServer(Vector.empty, serverOptions) | ||
} | ||
|
||
case class NettyFutureServerBinding(localSocket: InetSocketAddress, stop: () => Future[Unit]) { | ||
def host: String = localSocket.getHostString | ||
def port: Int = localSocket.getPort | ||
} |
26 changes: 26 additions & 0 deletions
26
...er/netty-server/src/main/scala/sttp/tapir/server/netty/NettyFutureServerInterpreter.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
package sttp.tapir.server.netty | ||
|
||
import sttp.monad.FutureMonad | ||
import sttp.tapir.server.ServerEndpoint | ||
import sttp.tapir.server.netty.internal.NettyServerInterpreter | ||
|
||
import scala.concurrent.{ExecutionContext, Future} | ||
|
||
trait NettyFutureServerInterpreter { | ||
def nettyServerOptions: NettyFutureServerOptions | ||
|
||
def toRoute( | ||
ses: List[ServerEndpoint[_, _, _, Any, Future]] | ||
)(implicit ec: ExecutionContext): FutureRoute = { | ||
implicit val monad: FutureMonad = new FutureMonad() | ||
NettyServerInterpreter.toRoute(ses, nettyServerOptions.interceptors, nettyServerOptions.createFile, nettyServerOptions.deleteFile) | ||
} | ||
} | ||
|
||
object NettyFutureServerInterpreter { | ||
def apply(serverOptions: NettyFutureServerOptions = NettyFutureServerOptions.default): NettyFutureServerInterpreter = { | ||
new NettyFutureServerInterpreter { | ||
override def nettyServerOptions: NettyFutureServerOptions = serverOptions | ||
} | ||
} | ||
} |
Oops, something went wrong.