Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Perf test: WebSockets (Vert.X) #3527

Merged
merged 2 commits into from
Feb 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 101 additions & 12 deletions perf-tests/src/main/scala/sttp/tapir/perf/vertx/Vertx.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,115 @@ package sttp.tapir.perf.vertx

import _root_.cats.effect.IO
import _root_.cats.effect.kernel.Resource
import io.vertx.core.http.HttpServerOptions
import io.vertx.core.{Future => VFuture, Vertx}
import io.vertx.core.http.{HttpServerOptions, ServerWebSocket}
import io.vertx.core.streams.ReadStream
import io.vertx.core.{Future => VFuture, Handler, Vertx}
import io.vertx.ext.web.handler.BodyHandler
import io.vertx.ext.web.{Route, Router, RoutingContext}
import sttp.tapir.perf.Common._
import sttp.tapir.perf.apis.{Endpoints, ServerRunner}
import sttp.tapir.server.vertx.VertxFutureServerInterpreter
import sttp.tapir.server.vertx.VertxFutureServerOptions
import sttp.tapir.server.vertx.streams.VertxStreams
import sttp.tapir.server.vertx.{VertxFutureServerInterpreter, VertxFutureServerOptions}

import scala.concurrent.Future

object Tapir extends Endpoints {
def route(nRoutes: Int, withServerLog: Boolean = false): Router => Route = { router =>
val serverOptions = buildOptions(VertxFutureServerOptions.customiseInterceptors, withServerLog)
val interpreter = VertxFutureServerInterpreter(serverOptions)
genEndpointsFuture(nRoutes).map(interpreter.route(_)(router)).last
import sttp.tapir._
def route(nRoutes: Int, withServerLog: Boolean = false): Vertx => Router => Route = { vertx =>
router =>
val serverOptions = buildOptions(VertxFutureServerOptions.customiseInterceptors, withServerLog)
val interpreter = VertxFutureServerInterpreter(serverOptions)
val wsEndpoint = wsBaseEndpoint
.out(
webSocketBody[Long, CodecFormat.TextPlain, Long, CodecFormat.TextPlain](VertxStreams)
.concatenateFragmentedFrames(false)
)

val laggedTimestampPipe: ReadStream[Long] => ReadStream[Long] = { inputStream =>
new ReadStream[Long] {

override def fetch(amount: Long): ReadStream[Long] = this

private var dataHandler: Handler[Long] = _
private var endHandler: Handler[Void] = _
private var exceptionHandler: Handler[Throwable] = _

inputStream.handler(new Handler[Long] {
override def handle(event: Long): Unit = {
vertx.setTimer(
WebSocketSingleResponseLag.toMillis,
_ => {
if (dataHandler != null) dataHandler.handle(System.currentTimeMillis())
}
): Unit
}
})

inputStream.endHandler(new Handler[Void] {
override def handle(e: Void): Unit = {
if (endHandler != null) endHandler.handle(e)
}
})

inputStream.exceptionHandler(new Handler[Throwable] {
override def handle(e: Throwable): Unit = {
if (exceptionHandler != null) exceptionHandler.handle(e)
}
})

override def handler(handler: Handler[Long]): ReadStream[Long] = {
this.dataHandler = handler
this
}

override def pause(): ReadStream[Long] = this
override def resume(): ReadStream[Long] = this

override def endHandler(endHandler: Handler[Void]): ReadStream[Long] = {
this.endHandler = endHandler
this
}

override def exceptionHandler(exceptionHandler: Handler[Throwable]): ReadStream[Long] = {
this.exceptionHandler = exceptionHandler
this
}
}

}

val wsServerEndpoint = wsEndpoint.serverLogicSuccess[Future] { _ =>
Future.successful {
laggedTimestampPipe
}
}
(wsServerEndpoint :: genEndpointsFuture(nRoutes)).map(interpreter.route(_)(router)).last
}
}
object Vanilla extends Endpoints {

def bodyHandler = BodyHandler.create(false).setBodyLimit(LargeInputSize + 100L)
def webSocketHandler(vertx: Vertx): Router => Route = { router =>
router.get("/ws/ts").handler { ctx =>
val wss = ctx.request().toWebSocket()
wss.map {
ws: ServerWebSocket =>
ws.textMessageHandler(_ => ())

// Set a periodic timer to send timestamps every 100 milliseconds
val timerId = vertx.setPeriodic(
WebSocketSingleResponseLag.toMillis,
{ _ =>
ws.writeTextMessage(System.currentTimeMillis().toString): Unit
}
)

def route: Int => Router => Route = { (nRoutes: Int) => router =>
// Close the timer when the WebSocket is closed
ws.closeHandler(_ => vertx.cancelTimer(timerId): Unit)
}: Unit
}
}
def route: Int => Vertx => Router => Route = { (nRoutes: Int) => _ => router =>
(0 until nRoutes).map { n =>
router.get(s"/path$n/:id").handler {
ctx: RoutingContext =>
Expand Down Expand Up @@ -70,14 +158,15 @@ object Vanilla extends Endpoints {
}

object VertxRunner {
def runServer(route: Router => Route): IO[ServerRunner.KillSwitch] = {
def runServer(route: Vertx => Router => Route, wsRoute: Option[Vertx => Router => Route] = None): IO[ServerRunner.KillSwitch] = {
Resource
.make(IO.delay(Vertx.vertx()))(vertx => IO.delay(vertx.close()).void)
.flatMap { vertx =>
val router = Router.router(vertx)
val server = vertx.createHttpServer(new HttpServerOptions().setPort(Port)).requestHandler(router)
val listenIO = vertxFutureToIo(server.listen(Port))
route.apply(router): Unit
wsRoute.foreach(r => r(vertx).apply(router))
route(vertx).apply(router): Unit
Resource.make(listenIO)(s => vertxFutureToIo(s.close()).void)
}
.allocated
Expand All @@ -100,5 +189,5 @@ object TapirMultiServer extends ServerRunner { override def start = VertxRunner.
object TapirInterceptorMultiServer extends ServerRunner {
override def start = VertxRunner.runServer(Tapir.route(128, withServerLog = true))
}
object VanillaServer extends ServerRunner { override def start = VertxRunner.runServer(Vanilla.route(1)) }
object VanillaServer extends ServerRunner { override def start = VertxRunner.runServer(Vanilla.route(1), Some(Vanilla.webSocketHandler)) }
object VanillaMultiServer extends ServerRunner { override def start = VertxRunner.runServer(Vanilla.route(128)) }
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,36 @@ package sttp.tapir.perf.vertx.cats

import cats.effect.IO
import cats.effect.std.Dispatcher
import io.vertx.ext.web.Route
import io.vertx.ext.web.Router
import fs2.Stream
import io.vertx.core.Vertx
import io.vertx.ext.web.{Route, Router}
import sttp.capabilities.fs2.Fs2Streams
import sttp.tapir.perf.Common._
import sttp.tapir.perf.apis.{Endpoints, ServerRunner}
import sttp.tapir.perf.vertx.VertxRunner
import sttp.tapir.server.vertx.cats.VertxCatsServerInterpreter
import sttp.tapir.server.vertx.cats.VertxCatsServerOptions
import sttp.tapir.server.vertx.cats.{VertxCatsServerInterpreter, VertxCatsServerOptions}

object Tapir extends Endpoints {
def route(dispatcher: Dispatcher[IO], withServerLog: Boolean): Int => Router => Route = { (nRoutes: Int) => (router: Router) =>
val serverOptions = buildOptions(VertxCatsServerOptions.customiseInterceptors[IO](dispatcher), withServerLog)
val interpreter = VertxCatsServerInterpreter(serverOptions)
genEndpointsIO(nRoutes).map(interpreter.route(_)(router)).last
import sttp.tapir._
// Websocket response is returned with a lag, so that we can have more concurrent users talking to the server.
// This lag is not relevant for measurements, because the server returns a timestamp after having a response ready to send back,
// so the client can measure only the latency of the server stack handling the response.
val wsResponseStream = Stream.fixedRate[IO](WebSocketSingleResponseLag, dampen = false)

def wsLaggedPipe: fs2.Pipe[IO, Long, Long] = { requestStream =>
wsResponseStream.evalMap(_ => IO.realTime.map(_.toMillis)).concurrently(requestStream.as(()))
}
def route(dispatcher: Dispatcher[IO], withServerLog: Boolean): Int => Vertx => Router => Route = {
(nRoutes: Int) => _ => (router: Router) =>
val serverOptions = buildOptions(VertxCatsServerOptions.customiseInterceptors[IO](dispatcher), withServerLog)
val interpreter = VertxCatsServerInterpreter(serverOptions)
val wsServerEndpoint = wsBaseEndpoint
.out(
webSocketBody[Long, CodecFormat.TextPlain, Long, CodecFormat.TextPlain](Fs2Streams[IO])
.concatenateFragmentedFrames(false)
)
.serverLogicSuccess(_ => IO.pure(wsLaggedPipe))
(wsServerEndpoint :: genEndpointsIO(nRoutes)).map(interpreter.route(_)(router)).last
}
}

Expand Down
Loading