Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix exposing both WS and REST endpoints in http4s perf tests #3531

Merged
merged 2 commits into from
Feb 23, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 65 additions & 68 deletions perf-tests/src/main/scala/sttp/tapir/perf/http4s/Http4s.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package sttp.tapir.perf.http4s

import cats.effect._
import cats.syntax.all._
import fs2._
import fs2.io.file.{Files, Path => Fs2Path}
import org.http4s._
Expand All @@ -26,48 +27,47 @@ object Http4sCommon {
}

object Vanilla {
val router: Int => HttpRoutes[IO] = (nRoutes: Int) =>
Router(
(0 to nRoutes).map((n: Int) =>
("/") -> {
val dsl = Http4sDsl[IO]
import dsl._
HttpRoutes.of[IO] {
case GET -> Root / s"path$n" / IntVar(id) =>
Ok((id + n.toInt).toString)
case req @ POST -> Root / s"path$n" =>
req.as[String].flatMap { str =>
Ok(s"Ok [$n], string length = ${str.length}")
}
case req @ POST -> Root / s"pathBytes$n" =>
req.as[Array[Byte]].flatMap { bytes =>
Ok(s"Ok [$n], bytes length = ${bytes.length}")
}
case req @ POST -> Root / s"pathFile$n" =>
val filePath = newTempFilePath()
val sink = Files[IO].writeAll(Fs2Path.fromNioPath(filePath))
req.body
.through(sink)
.compile
.drain
.flatMap(_ => Ok(s"Ok [$n], file saved to ${filePath.toAbsolutePath.toString}"))
}
}
): _*
)
val dsl = Http4sDsl[IO]
import dsl._
val receive: Pipe[IO, WebSocketFrame, Unit] = _.as(())
val wsResponseStream = Http4sCommon.wsResponseStream.evalMap(_ => IO.realTime.map(ts => WebSocketFrame.Text(s"${ts.toMillis}")))

def webSocketApp(wsb: WebSocketBuilder2[IO]): HttpApp[IO] = {
val dsl = new Http4sDsl[IO] {}
import dsl._
val receive: Pipe[IO, WebSocketFrame, Unit] = _.as(())
val responseStream = Http4sCommon.wsResponseStream.evalMap(_ => IO.realTime.map(ts => WebSocketFrame.Text(s"${ts.toMillis}")))

HttpRoutes
.of[IO] { case GET -> Root / "ws" / "ts" =>
wsb.withFilterPingPongs(true).build(responseStream, receive)
}
.orNotFound
}
val router: Int => WebSocketBuilder2[IO] => HttpRoutes[IO] = (nRoutes: Int) =>
wsb =>
Router(
(("/ws") -> {
HttpRoutes
.of[IO] { case GET -> Root / "ts" =>
wsb.withFilterPingPongs(true).build(wsResponseStream, receive)
}
}) ::
(0 to nRoutes)
.map((n: Int) =>
("/") -> {
HttpRoutes.of[IO] {
case GET -> Root / s"path$n" / IntVar(id) =>
Ok((id + n.toInt).toString)
case req @ POST -> Root / s"path$n" =>
req.as[String].flatMap { str =>
Ok(s"Ok [$n], string length = ${str.length}")
}
case req @ POST -> Root / s"pathBytes$n" =>
req.as[Array[Byte]].flatMap { bytes =>
Ok(s"Ok [$n], bytes length = ${bytes.length}")
}
case req @ POST -> Root / s"pathFile$n" =>
val filePath = newTempFilePath()
val sink = Files[IO].writeAll(Fs2Path.fromNioPath(filePath))
req.body
.through(sink)
.compile
.drain
.flatMap(_ => Ok(s"Ok [$n], file saved to ${filePath.toAbsolutePath.toString}"))
}
}
)
.toList: _*
)
}

object Tapir extends Endpoints {
Expand All @@ -83,38 +83,35 @@ object Tapir extends Endpoints {
.autoPing(None)
)

def router(nRoutes: Int, withServerLog: Boolean = false): HttpRoutes[IO] = {
def router(nRoutes: Int, withServerLog: Boolean = false)(wsb: WebSocketBuilder2[IO]): HttpRoutes[IO] = {
val serverOptions = buildOptions(Http4sServerOptions.customiseInterceptors[IO], withServerLog)
Router("/" -> {
Http4sServerInterpreter[IO](serverOptions).toRoutes(
genEndpointsIO(nRoutes)
val interpreter = Http4sServerInterpreter[IO](serverOptions)
Router(
(
(("/") -> {
interpreter
.toWebSocketRoutes(
wsEndpoint.serverLogicSuccess(_ =>
IO.pure { (in: Stream[IO, Long]) =>
Http4sCommon.wsResponseStream.evalMap(_ => IO.realTime.map(_.toMillis)).concurrently(in.as(()))
}
)
)(wsb) <+> interpreter.toRoutes(genEndpointsIO(nRoutes))
})
)
})
}

def wsApp(withServerLog: Boolean = false): WebSocketBuilder2[IO] => HttpApp[IO] = { wsb =>
val serverOptions = buildOptions(Http4sServerOptions.customiseInterceptors[IO], withServerLog)
Router("/" -> {
Http4sServerInterpreter[IO](serverOptions)
.toWebSocketRoutes(
wsEndpoint.serverLogicSuccess(_ =>
IO.pure { (in: Stream[IO, Long]) =>
Http4sCommon.wsResponseStream.evalMap(_ => IO.realTime.map(_.toMillis)).concurrently(in.as(()))
}
)
)(wsb)
}).orNotFound
)
}
}

object server {
val maxConnections = 65536
val connectorPoolSize: Int = Math.max(2, Runtime.getRuntime.availableProcessors() / 4)
def runServer(router: HttpRoutes[IO], webSocketApp: WebSocketBuilder2[IO] => HttpApp[IO]): IO[ServerRunner.KillSwitch] =
def runServer(
router: WebSocketBuilder2[IO] => HttpRoutes[IO]
): IO[ServerRunner.KillSwitch] =
BlazeServerBuilder[IO]
.bindHttp(Port, "localhost")
.withHttpApp(router.orNotFound)
.withHttpWebSocketApp(webSocketApp)
.withHttpWebSocketApp(wsb => router(wsb).orNotFound)
.withMaxConnections(maxConnections)
.withConnectorPoolSize(connectorPoolSize)
.resource
Expand All @@ -125,10 +122,10 @@ object server {
})
}

object TapirServer extends ServerRunner { override def start = server.runServer(Tapir.router(1), Tapir.wsApp()) }
object TapirMultiServer extends ServerRunner { override def start = server.runServer(Tapir.router(128), Tapir.wsApp()) }
object TapirServer extends ServerRunner { override def start = server.runServer(Tapir.router(1)) }
object TapirMultiServer extends ServerRunner { override def start = server.runServer(Tapir.router(128)) }
object TapirInterceptorMultiServer extends ServerRunner {
override def start = server.runServer(Tapir.router(128, withServerLog = true), Tapir.wsApp(withServerLog = true))
override def start = server.runServer(Tapir.router(128, withServerLog = true))
}
object VanillaServer extends ServerRunner { override def start = server.runServer(Vanilla.router(1), Vanilla.webSocketApp) }
object VanillaMultiServer extends ServerRunner { override def start = server.runServer(Vanilla.router(128), Vanilla.webSocketApp) }
object VanillaServer extends ServerRunner { override def start = server.runServer(Vanilla.router(1)) }
object VanillaMultiServer extends ServerRunner { override def start = server.runServer(Vanilla.router(128)) }
Loading