From 2be43a10e1f3b206fc624f0e4e3578a27292cd4f Mon Sep 17 00:00:00 2001 From: Krzysiek Ciesielski Date: Wed, 21 Feb 2024 11:30:29 +0100 Subject: [PATCH] Perf test: WebSockets (pekko-http) (#3526) --- build.sbt | 9 +- .../sttp/tapir/perf/apis/Endpoints.scala | 2 + .../sttp/tapir/perf/apis/ServerRunner.scala | 1 - .../scala/sttp/tapir/perf/http4s/Http4s.scala | 7 +- .../sttp/tapir/perf/pekko/PekkoHttp.scala | 104 ++++++++++++------ 5 files changed, 82 insertions(+), 41 deletions(-) diff --git a/build.sbt b/build.sbt index b76089906e..989885e2e3 100644 --- a/build.sbt +++ b/build.sbt @@ -499,6 +499,11 @@ lazy val tests: ProjectMatrix = (projectMatrix in file("tests")) ) .dependsOn(core, files, circeJson, cats) +lazy val perfServerJavaOptions = List( + "-Xms16g", + "-Xmx16g", + "-XX:+AlwaysPreTouch" +) lazy val flightRecordingJavaOpts = "-XX:StartFlightRecording=filename=recording.jfr,dumponexit=true,duration=120s" lazy val perfTests: ProjectMatrix = (projectMatrix in file("perf-tests")) @@ -530,8 +535,8 @@ lazy val perfTests: ProjectMatrix = (projectMatrix in file("perf-tests")) .settings( fork := true, connectInput := true, - Compile / run / javaOptions += flightRecordingJavaOpts, - Test / run / javaOptions -= flightRecordingJavaOpts + Compile / run / javaOptions ++= flightRecordingJavaOpts :: perfServerJavaOptions, + Test / run / javaOptions --= flightRecordingJavaOpts :: perfServerJavaOptions ) .jvmPlatform(scalaVersions = List(scala2_13)) .dependsOn(core, pekkoHttpServer, http4sServer, nettyServer, nettyServerCats, playServer, vertxServer, vertxServerCats) diff --git a/perf-tests/src/main/scala/sttp/tapir/perf/apis/Endpoints.scala b/perf-tests/src/main/scala/sttp/tapir/perf/apis/Endpoints.scala index 58f3667ab6..9359682160 100644 --- a/perf-tests/src/main/scala/sttp/tapir/perf/apis/Endpoints.scala +++ b/perf-tests/src/main/scala/sttp/tapir/perf/apis/Endpoints.scala @@ -59,6 +59,8 @@ trait Endpoints { ) } + val wsBaseEndpoint = endpoint.get.in("ws" / "ts") + def genServerEndpoints[F[_]](routeCount: Int)(reply: String => F[String]): List[ServerEndpoint[Any, F]] = serverEndpoints[F](reply).flatMap(gen => (0 to routeCount).map(i => gen(i))) diff --git a/perf-tests/src/main/scala/sttp/tapir/perf/apis/ServerRunner.scala b/perf-tests/src/main/scala/sttp/tapir/perf/apis/ServerRunner.scala index 4ed9ae1738..679aacccc2 100644 --- a/perf-tests/src/main/scala/sttp/tapir/perf/apis/ServerRunner.scala +++ b/perf-tests/src/main/scala/sttp/tapir/perf/apis/ServerRunner.scala @@ -1,7 +1,6 @@ package sttp.tapir.perf.apis import cats.effect.{ExitCode, IO, IOApp} -import sttp.tapir.perf.Common._ import scala.reflect.runtime.universe diff --git a/perf-tests/src/main/scala/sttp/tapir/perf/http4s/Http4s.scala b/perf-tests/src/main/scala/sttp/tapir/perf/http4s/Http4s.scala index 92592c269a..12db65e1e9 100644 --- a/perf-tests/src/main/scala/sttp/tapir/perf/http4s/Http4s.scala +++ b/perf-tests/src/main/scala/sttp/tapir/perf/http4s/Http4s.scala @@ -16,7 +16,7 @@ import sttp.tapir.integ.cats.effect.CatsMonadError import sttp.tapir.perf.Common._ import sttp.tapir.perf.apis._ import sttp.tapir.server.http4s.{Http4sServerInterpreter, Http4sServerOptions} -import sttp.tapir.{CodecFormat, endpoint, webSocketBody} +import sttp.tapir.{CodecFormat, webSocketBody} object Http4sCommon { // Websocket response is returned with a lag, so that we can have more concurrent users talking to the server. @@ -74,8 +74,7 @@ object Tapir extends Endpoints { implicit val mErr: MonadError[IO] = new CatsMonadError[IO] - private val wsEndpoint = endpoint.get - .in("ts") + private val wsEndpoint = wsBaseEndpoint .out( webSocketBody[Long, CodecFormat.TextPlain, Long, CodecFormat.TextPlain](Fs2Streams[IO]) .concatenateFragmentedFrames(false) @@ -95,7 +94,7 @@ object Tapir extends Endpoints { def wsApp(withServerLog: Boolean = false): WebSocketBuilder2[IO] => HttpApp[IO] = { wsb => val serverOptions = buildOptions(Http4sServerOptions.customiseInterceptors[IO], withServerLog) - Router("/ws" -> { + Router("/" -> { Http4sServerInterpreter[IO](serverOptions) .toWebSocketRoutes( wsEndpoint.serverLogicSuccess(_ => diff --git a/perf-tests/src/main/scala/sttp/tapir/perf/pekko/PekkoHttp.scala b/perf-tests/src/main/scala/sttp/tapir/perf/pekko/PekkoHttp.scala index 94f1feb862..0abad119a8 100644 --- a/perf-tests/src/main/scala/sttp/tapir/perf/pekko/PekkoHttp.scala +++ b/perf-tests/src/main/scala/sttp/tapir/perf/pekko/PekkoHttp.scala @@ -4,64 +4,100 @@ import cats.effect.IO import org.apache.pekko.actor.ActorSystem import org.apache.pekko.http.scaladsl.Http import org.apache.pekko.http.scaladsl.model.HttpEntity +import org.apache.pekko.http.scaladsl.model.ws.{Message, TextMessage} import org.apache.pekko.http.scaladsl.server.Directives._ import org.apache.pekko.http.scaladsl.server.Route -import org.apache.pekko.stream.scaladsl.FileIO +import org.apache.pekko.stream.scaladsl.{FileIO, Flow, Sink, Source} +import sttp.capabilities.pekko.PekkoStreams import sttp.tapir.perf.Common._ import sttp.tapir.perf.apis._ import sttp.tapir.server.pekkohttp.{PekkoHttpServerInterpreter, PekkoHttpServerOptions} -import scala.concurrent.{ExecutionContext, ExecutionContextExecutor} +import scala.concurrent.duration._ +import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future} + +object PekkoCommon { + // Define a source that emits the current timestamp every 100 milliseconds + // We have to .take exact number of items, which will guarantee returning a proper final frame. Otherwise the stream would never end, + // causing Gatling to hang for long and then fail. + val timestampSource = Source.tick(0.seconds, 100.milliseconds, ()).take(WebSocketRequestsPerUser.toLong).map { _ => + System.currentTimeMillis() + } +} object Vanilla { + val wsRoute: Route = path("ws" / "ts") { + handleWebSocketMessages(timeStampWebSocketFlow.map(ts => TextMessage(ts.toString))) + } + + def timeStampWebSocketFlow: Flow[Message, Long, Any] = { + // Incoming messages are ignored, but we need to define a sink for them + val sink = Flow[Message].to(Sink.ignore) + Flow.fromSinkAndSource(sink, PekkoCommon.timestampSource) + } + val router: Int => ActorSystem => Route = (nRoutes: Int) => (_: ActorSystem) => concat( - (0 to nRoutes).flatMap { (n: Int) => - List( - get { - path(("path" + n.toString) / IntNumber) { id => - complete((n + id).toString) - } - }, - post { - path(("path" + n.toString)) { - entity(as[String]) { _ => - complete((n).toString) + wsRoute :: + (0 to nRoutes).flatMap { (n: Int) => + List( + get { + path(("path" + n.toString) / IntNumber) { id => + complete((n + id).toString) } - } - }, - post { - path(("pathBytes" + n.toString)) { - entity(as[Array[Byte]]) { bytes => - complete(s"Received ${bytes.length} bytes") + }, + post { + path(("path" + n.toString)) { + entity(as[String]) { _ => + complete((n).toString) + } } - } - }, - post { - path(("pathFile" + n.toString)) { - extractRequestContext { ctx => - entity(as[HttpEntity]) { httpEntity => - val path = newTempFilePath() - val sink = FileIO.toPath(path) - val finishedWriting = httpEntity.dataBytes.runWith(sink)(ctx.materializer) - onSuccess(finishedWriting) { _ => - complete(s"File saved to $path") + }, + post { + path(("pathBytes" + n.toString)) { + entity(as[Array[Byte]]) { bytes => + complete(s"Received ${bytes.length} bytes") + } + } + }, + post { + path(("pathFile" + n.toString)) { + extractRequestContext { ctx => + entity(as[HttpEntity]) { httpEntity => + val path = newTempFilePath() + val sink = FileIO.toPath(path) + val finishedWriting = httpEntity.dataBytes.runWith(sink)(ctx.materializer) + onSuccess(finishedWriting) { _ => + complete(s"File saved to $path") + } } } } } - } - ) - }: _* + ) + }.toList: _* ) } object Tapir extends Endpoints { + import sttp.tapir._ + val wsSink = Flow[Long].to(Sink.ignore) + val wsEndpoint = wsBaseEndpoint + .out( + webSocketBody[Long, CodecFormat.TextPlain, Long, CodecFormat.TextPlain](PekkoStreams) + .concatenateFragmentedFrames(false) + ) + val wsServerEndpoint = wsEndpoint.serverLogicSuccess[Future] { _ => + Future.successful { + Flow.fromSinkAndSource(wsSink, PekkoCommon.timestampSource) + } + } + def router(nRoutes: Int, withServerLog: Boolean = false): ActorSystem => Route = { (actorSystem: ActorSystem) => val serverOptions = buildOptions(PekkoHttpServerOptions.customiseInterceptors(ExecutionContext.Implicits.global), withServerLog) PekkoHttpServerInterpreter(serverOptions)(actorSystem.dispatcher).toRoute( - genEndpointsFuture(nRoutes) + wsServerEndpoint :: genEndpointsFuture(nRoutes) ) } }