From 4c7d511c65d4fca7d745ec1284df62bdc6da0bee Mon Sep 17 00:00:00 2001 From: Michael Schulte Date: Wed, 13 Sep 2023 08:12:47 +0200 Subject: [PATCH 1/4] vertx: end responses safely --- build.sbt | 6 ++- .../cats/VertxCatsServerInterpreter.scala | 13 ++--- .../tapir/server/vertx/RichResponse.scala | 51 +++++++++++++++++++ .../server/vertx/VertxErrorHandler.scala | 28 ++++++++++ .../vertx/VertxFutureServerInterpreter.scala | 11 +--- .../vertx/encoders/VertxOutputEncoders.scala | 3 +- .../vertx/encoders/VertxToResponseBody.scala | 3 +- .../vertx/zio/VertxZioServerInterpreter.scala | 6 +-- .../vertx/zio/VertxZioServerInterpreter.scala | 6 +-- 9 files changed, 99 insertions(+), 28 deletions(-) create mode 100644 server/vertx-server/src/main/scala/sttp/tapir/server/vertx/RichResponse.scala create mode 100644 server/vertx-server/src/main/scala/sttp/tapir/server/vertx/VertxErrorHandler.scala diff --git a/build.sbt b/build.sbt index 81d5b3632c..c4772738ae 100644 --- a/build.sbt +++ b/build.sbt @@ -158,6 +158,10 @@ lazy val loggerDependencies = Seq( "com.typesafe.scala-logging" %% "scala-logging" % "3.9.5" ) +lazy val slf4jDependencies = Seq( + "org.slf4j" % "slf4j-api" % "2.0.9" +) + lazy val rawAllAggregates = core.projectRefs ++ testing.projectRefs ++ cats.projectRefs ++ @@ -1427,7 +1431,7 @@ lazy val vertxServer: ProjectMatrix = (projectMatrix in file("server/vertx-serve .settings(commonJvmSettings) .settings( name := "tapir-vertx-server", - libraryDependencies ++= Seq( + libraryDependencies ++= slf4jDependencies ++ Seq( "io.vertx" % "vertx-web" % Versions.vertx ) ) diff --git a/server/vertx-server/cats/src/main/scala/sttp/tapir/server/vertx/cats/VertxCatsServerInterpreter.scala b/server/vertx-server/cats/src/main/scala/sttp/tapir/server/vertx/cats/VertxCatsServerInterpreter.scala index b69925e87a..fbab6ef528 100644 --- a/server/vertx-server/cats/src/main/scala/sttp/tapir/server/vertx/cats/VertxCatsServerInterpreter.scala +++ b/server/vertx-server/cats/src/main/scala/sttp/tapir/server/vertx/cats/VertxCatsServerInterpreter.scala @@ -3,7 +3,6 @@ package sttp.tapir.server.vertx.cats import cats.effect.std.Dispatcher import cats.effect.{Async, Sync} import cats.syntax.all._ -import io.vertx.core.logging.LoggerFactory import io.vertx.core.{Future, Handler} import io.vertx.ext.web.{Route, Router, RoutingContext} import sttp.capabilities.{Streams, WebSockets} @@ -12,7 +11,7 @@ import sttp.monad.MonadError import sttp.tapir.server.ServerEndpoint import sttp.tapir.server.interceptor.RequestResult import sttp.tapir.server.interpreter.{BodyListener, ServerInterpreter} -import sttp.tapir.server.vertx.VertxBodyListener +import sttp.tapir.server.vertx.{VertxBodyListener, VertxErrorHandler} import sttp.tapir.server.vertx.cats.VertxCatsServerInterpreter.{CatsFFromVFuture, CatsRunAsync, monadError} import sttp.tapir.server.vertx.decoders.{VertxRequestBody, VertxServerRequest} import sttp.tapir.server.vertx.encoders.{VertxOutputEncoders, VertxToResponseBody} @@ -23,9 +22,7 @@ import sttp.tapir.server.vertx.cats.streams.fs2.fs2ReadStreamCompatible import java.util.concurrent.atomic.AtomicReference -trait VertxCatsServerInterpreter[F[_]] extends CommonServerInterpreter { - - private val logger = LoggerFactory.getLogger(VertxCatsServerInterpreter.getClass) +trait VertxCatsServerInterpreter[F[_]] extends CommonServerInterpreter with VertxErrorHandler { implicit def fa: Async[F] @@ -69,11 +66,7 @@ trait VertxCatsServerInterpreter[F[_]] extends CommonServerInterpreter { case RequestResult.Failure(_) => Async[F].delay(rc.next()) case RequestResult.Response(response) => fFromVFuture(VertxOutputEncoders(response).apply(rc)).void } - .handleError { ex => - logger.error("Error while processing the request", ex) - if (rc.response().bytesWritten() > 0) rc.response().end() - rc.fail(ex) - } + .handleError(handleError(rc, _)) // we obtain the cancel token only after the effect is run, so we need to pass it to the exception handler // via a mutable ref; however, before this is done, it's possible an exception has already been reported; diff --git a/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/RichResponse.scala b/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/RichResponse.scala new file mode 100644 index 0000000000..5c81a60de1 --- /dev/null +++ b/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/RichResponse.scala @@ -0,0 +1,51 @@ +package sttp.tapir.server.vertx + +import io.vertx.core.Promise +import io.vertx.core.http.HttpServerResponse + +import org.slf4j.{LoggerFactory} + +import scala.concurrent.Await +import scala.concurrent.duration._ +import scala.concurrent.{Future, ExecutionContext} + +object Helpers { + + /** + * Helper class that implements safer ending of a http server response + */ + implicit class RichResponse(response: HttpServerResponse) { + + /** + * Ends the response if it hasn't ended yet + * @return + * A future that is completed when the response has been ended + */ + def safeEnd(): io.vertx.core.Future[Void] = { + if (!response.ended()) response.end() + else Promise.promise().future() + } + + /** Ends the response if it hasn't ended yet and waits for it to be executed + * + * @param duration + * maximum waiting time + */ + def safeEndWait(duration: FiniteDuration = 2.seconds): Unit = { + import ExecutionContext.Implicits.global + if (!response.ended()) { + try { + Await.result( + Future { + response.end().toCompletionStage().toCompletableFuture().get() + }, + duration + ): Unit + } catch { + case t: Throwable => + LoggerFactory.getLogger(getClass.getName).error("Caught exception while processing end", t) + } + } + } + } +} diff --git a/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/VertxErrorHandler.scala b/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/VertxErrorHandler.scala new file mode 100644 index 0000000000..c9067bde33 --- /dev/null +++ b/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/VertxErrorHandler.scala @@ -0,0 +1,28 @@ +package sttp.tapir.server.vertx + +import io.vertx.ext.web.{RoutingContext} +import org.slf4j.{LoggerFactory} + +import sttp.tapir.server.vertx.Helpers.RichResponse + +/** + * Common error handler implementation for all Vertx interpreter classes. + * + * Ends the response of the current routing context safely. + * + * @param rc + * the routing context where the response shall be ended + * @param ex + * exception that occurred during the interpreter call + * @param performLogging + * whether to log an additional message with the exception or not + */ +trait VertxErrorHandler { + def handleError(rc: RoutingContext, ex: Throwable, performLogging: Boolean = true): Unit = { + if (performLogging) { + LoggerFactory.getLogger(getClass.getName).error("Error while processing the request", ex) + } + if (rc.response().bytesWritten() > 0) rc.response().safeEndWait() + rc.fail(ex) + } +} diff --git a/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/VertxFutureServerInterpreter.scala b/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/VertxFutureServerInterpreter.scala index acd2dccc71..ac1b25ff08 100644 --- a/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/VertxFutureServerInterpreter.scala +++ b/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/VertxFutureServerInterpreter.scala @@ -1,6 +1,5 @@ package sttp.tapir.server.vertx -import io.vertx.core.logging.LoggerFactory import io.vertx.core.{Handler, Future => VFuture} import io.vertx.ext.web.{Route, Router, RoutingContext} import sttp.monad.FutureMonad @@ -17,9 +16,7 @@ import sttp.tapir.server.vertx.streams.{ReadStreamCompatible, VertxStreams} import scala.concurrent.{ExecutionContext, Future, Promise} -trait VertxFutureServerInterpreter extends CommonServerInterpreter { - - private val logger = LoggerFactory.getLogger(VertxFutureServerInterpreter.getClass) +trait VertxFutureServerInterpreter extends CommonServerInterpreter with VertxErrorHandler { def vertxFutureServerOptions: VertxFutureServerOptions = VertxFutureServerOptions.default @@ -68,11 +65,7 @@ trait VertxFutureServerInterpreter extends CommonServerInterpreter { case RequestResult.Response(response) => FutureFromVFuture(VertxOutputEncoders(response).apply(rc)) } .failed - .foreach { ex => - logger.error("Error while processing the request", ex) - if (rc.response().bytesWritten() > 0) rc.response().end() - rc.fail(ex) - } + .foreach {handleError(rc, _)} } } diff --git a/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/encoders/VertxOutputEncoders.scala b/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/encoders/VertxOutputEncoders.scala index 8dedbdff94..cbb879ba11 100644 --- a/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/encoders/VertxOutputEncoders.scala +++ b/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/encoders/VertxOutputEncoders.scala @@ -3,6 +3,7 @@ package sttp.tapir.server.vertx.encoders import io.vertx.core.Future import io.vertx.ext.web.RoutingContext import sttp.tapir.server.model.ServerResponse +import sttp.tapir.server.vertx.Helpers.RichResponse import scala.util.control.NonFatal @@ -14,7 +15,7 @@ object VertxOutputEncoders { serverResponse.headers.foreach { h => resp.headers.add(h.name, h.value) } serverResponse.body match { case Some(responseHandler) => responseHandler(rc) - case None => resp.end() + case None => resp.safeEnd() } } catch { case NonFatal(e) => Future.failedFuture(e) diff --git a/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/encoders/VertxToResponseBody.scala b/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/encoders/VertxToResponseBody.scala index 3272ed7551..e415c6c378 100644 --- a/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/encoders/VertxToResponseBody.scala +++ b/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/encoders/VertxToResponseBody.scala @@ -8,6 +8,7 @@ import sttp.capabilities.Streams import sttp.model.{HasHeaders, Part} import sttp.tapir.{CodecFormat, FileRange, RawBodyType, WebSocketBodyOutput} import sttp.tapir.server.interpreter.ToResponseBody +import sttp.tapir.server.vertx.Helpers.RichResponse import sttp.tapir.server.vertx.VertxServerOptions import sttp.tapir.server.vertx.streams.{Pipe, ReadStreamCompatible} @@ -88,7 +89,7 @@ class VertxToResponseBody[F[_], S <: Streams[S]](serverOptions: VertxServerOptio } }) .flatMap { _ => - resp.end() + resp.safeEnd() } } diff --git a/server/vertx-server/zio/src/main/scala/sttp/tapir/server/vertx/zio/VertxZioServerInterpreter.scala b/server/vertx-server/zio/src/main/scala/sttp/tapir/server/vertx/zio/VertxZioServerInterpreter.scala index 110fafed58..84c36f1f14 100644 --- a/server/vertx-server/zio/src/main/scala/sttp/tapir/server/vertx/zio/VertxZioServerInterpreter.scala +++ b/server/vertx-server/zio/src/main/scala/sttp/tapir/server/vertx/zio/VertxZioServerInterpreter.scala @@ -7,6 +7,7 @@ import sttp.capabilities.zio.ZioStreams import sttp.tapir.server.interceptor.RequestResult import sttp.tapir.server.interpreter.{BodyListener, ServerInterpreter} import sttp.tapir.server.vertx.VertxBodyListener +import sttp.tapir.server.vertx.VertxErrorHandler import sttp.tapir.server.vertx.decoders.{VertxRequestBody, VertxServerRequest} import sttp.tapir.server.vertx.encoders.{VertxOutputEncoders, VertxToResponseBody} import sttp.tapir.server.vertx.interpreters.{CommonServerInterpreter, FromVFuture, RunAsync} @@ -18,7 +19,7 @@ import zio._ import java.util.concurrent.atomic.AtomicReference -trait VertxZioServerInterpreter[R] extends CommonServerInterpreter { +trait VertxZioServerInterpreter[R] extends CommonServerInterpreter with VertxErrorHandler { def vertxZioServerOptions[R2 <: R]: VertxZioServerOptions[R2] = VertxZioServerOptions.default[R].widen def route[R2](e: ZServerEndpoint[R2, ZioStreams with WebSockets])(implicit @@ -48,8 +49,7 @@ trait VertxZioServerInterpreter[R] extends CommonServerInterpreter { val serverRequest = VertxServerRequest(rc) def fail(t: Throwable): Unit = { - if (rc.response().bytesWritten() > 0) rc.response().end() - rc.fail(t) + handleError(rc, t, performLogging = false) } val result: ZIO[R & R2, Throwable, Any] = diff --git a/server/vertx-server/zio1/src/main/scala/sttp/tapir/server/vertx/zio/VertxZioServerInterpreter.scala b/server/vertx-server/zio1/src/main/scala/sttp/tapir/server/vertx/zio/VertxZioServerInterpreter.scala index 4f2456655f..41237ba8fc 100644 --- a/server/vertx-server/zio1/src/main/scala/sttp/tapir/server/vertx/zio/VertxZioServerInterpreter.scala +++ b/server/vertx-server/zio1/src/main/scala/sttp/tapir/server/vertx/zio/VertxZioServerInterpreter.scala @@ -6,6 +6,7 @@ import sttp.capabilities.WebSockets import sttp.capabilities.zio.ZioStreams import sttp.tapir.server.interceptor.RequestResult import sttp.tapir.server.interpreter.{BodyListener, ServerInterpreter} +import sttp.tapir.server.vertx.VertxErrorHandler import sttp.tapir.server.vertx.zio.VertxZioServerInterpreter.{RioFromVFuture, ZioRunAsync} import sttp.tapir.server.vertx.decoders.{VertxRequestBody, VertxServerRequest} import sttp.tapir.server.vertx.encoders.{VertxOutputEncoders, VertxToResponseBody} @@ -19,7 +20,7 @@ import _root_.zio.blocking.Blocking import java.util.concurrent.atomic.AtomicReference -trait VertxZioServerInterpreter[R <: Blocking] extends CommonServerInterpreter { +trait VertxZioServerInterpreter[R <: Blocking] extends CommonServerInterpreter with VertxErrorHandler { def vertxZioServerOptions: VertxZioServerOptions[R] = VertxZioServerOptions.default def route(e: ZServerEndpoint[R, ZioStreams with WebSockets])(implicit @@ -50,8 +51,7 @@ trait VertxZioServerInterpreter[R <: Blocking] extends CommonServerInterpreter { val serverRequest = VertxServerRequest(rc) def fail(t: Throwable): Unit = { - if (rc.response().bytesWritten() > 0) rc.response().end() - rc.fail(t) + handleError(rc, t, performLogging = false) } val result: ZIO[R, Throwable, Any] = From a863f49617e3653de7dfa34ff8a378f54ca6d03e Mon Sep 17 00:00:00 2001 From: Michael Schulte Date: Fri, 15 Sep 2023 14:23:20 +0200 Subject: [PATCH 2/4] check if response has ended - address reviewer comments: remove slf4j dependency - add 2 seconds max wait time to VertxServerOptions --- build.sbt | 6 +-- .../tapir/server/vertx/RichResponse.scala | 38 +++++-------------- .../server/vertx/VertxErrorHandler.scala | 8 +--- .../server/vertx/VertxServerOptions.scala | 4 ++ .../vertx/zio/VertxZioServerInterpreter.scala | 2 +- .../vertx/zio/VertxZioServerInterpreter.scala | 2 +- 6 files changed, 18 insertions(+), 42 deletions(-) diff --git a/build.sbt b/build.sbt index c4772738ae..81d5b3632c 100644 --- a/build.sbt +++ b/build.sbt @@ -158,10 +158,6 @@ lazy val loggerDependencies = Seq( "com.typesafe.scala-logging" %% "scala-logging" % "3.9.5" ) -lazy val slf4jDependencies = Seq( - "org.slf4j" % "slf4j-api" % "2.0.9" -) - lazy val rawAllAggregates = core.projectRefs ++ testing.projectRefs ++ cats.projectRefs ++ @@ -1431,7 +1427,7 @@ lazy val vertxServer: ProjectMatrix = (projectMatrix in file("server/vertx-serve .settings(commonJvmSettings) .settings( name := "tapir-vertx-server", - libraryDependencies ++= slf4jDependencies ++ Seq( + libraryDependencies ++= Seq( "io.vertx" % "vertx-web" % Versions.vertx ) ) diff --git a/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/RichResponse.scala b/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/RichResponse.scala index 5c81a60de1..84314e00df 100644 --- a/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/RichResponse.scala +++ b/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/RichResponse.scala @@ -3,48 +3,30 @@ package sttp.tapir.server.vertx import io.vertx.core.Promise import io.vertx.core.http.HttpServerResponse -import org.slf4j.{LoggerFactory} - import scala.concurrent.Await -import scala.concurrent.duration._ -import scala.concurrent.{Future, ExecutionContext} +import scala.concurrent.Future object Helpers { - /** - * Helper class that implements safer ending of a http server response - */ + /** Helper class that implements safer ending of a http server response + */ implicit class RichResponse(response: HttpServerResponse) { - /** - * Ends the response if it hasn't ended yet - * @return - * A future that is completed when the response has been ended - */ + /** Ends the response if it hasn't ended yet + * @return + * A future that is completed when the response has been ended + */ def safeEnd(): io.vertx.core.Future[Void] = { if (!response.ended()) response.end() else Promise.promise().future() } /** Ends the response if it hasn't ended yet and waits for it to be executed - * - * @param duration - * maximum waiting time */ - def safeEndWait(duration: FiniteDuration = 2.seconds): Unit = { - import ExecutionContext.Implicits.global + def safeEndWait(): Unit = { + import VertxFutureServerInterpreter.VertxFutureToScalaFuture if (!response.ended()) { - try { - Await.result( - Future { - response.end().toCompletionStage().toCompletableFuture().get() - }, - duration - ): Unit - } catch { - case t: Throwable => - LoggerFactory.getLogger(getClass.getName).error("Caught exception while processing end", t) - } + Await.result(response.end().asScala, VertxServerOptions.safeEndWaitTime): Unit } } } diff --git a/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/VertxErrorHandler.scala b/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/VertxErrorHandler.scala index c9067bde33..d7afbd3d65 100644 --- a/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/VertxErrorHandler.scala +++ b/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/VertxErrorHandler.scala @@ -1,7 +1,6 @@ package sttp.tapir.server.vertx import io.vertx.ext.web.{RoutingContext} -import org.slf4j.{LoggerFactory} import sttp.tapir.server.vertx.Helpers.RichResponse @@ -14,14 +13,9 @@ import sttp.tapir.server.vertx.Helpers.RichResponse * the routing context where the response shall be ended * @param ex * exception that occurred during the interpreter call - * @param performLogging - * whether to log an additional message with the exception or not */ trait VertxErrorHandler { - def handleError(rc: RoutingContext, ex: Throwable, performLogging: Boolean = true): Unit = { - if (performLogging) { - LoggerFactory.getLogger(getClass.getName).error("Error while processing the request", ex) - } + def handleError(rc: RoutingContext, ex: Throwable): Unit = { if (rc.response().bytesWritten() > 0) rc.response().safeEndWait() rc.fail(ex) } diff --git a/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/VertxServerOptions.scala b/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/VertxServerOptions.scala index 18653fda4b..f61ae5ce75 100644 --- a/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/VertxServerOptions.scala +++ b/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/VertxServerOptions.scala @@ -4,6 +4,8 @@ import io.vertx.core.logging.Logger import sttp.tapir.TapirFile import sttp.tapir.server.interceptor.Interceptor +import scala.concurrent.duration._ + trait VertxServerOptions[F[_]] { def uploadDirectory: TapirFile def deleteFile: TapirFile => F[Unit] @@ -25,4 +27,6 @@ object VertxServerOptions { private[vertx] def uploadDirectory(): TapirFile = new java.io.File(System.getProperty("java.io.tmpdir")).getAbsoluteFile + + private[vertx] val safeEndWaitTime = 2.seconds } diff --git a/server/vertx-server/zio/src/main/scala/sttp/tapir/server/vertx/zio/VertxZioServerInterpreter.scala b/server/vertx-server/zio/src/main/scala/sttp/tapir/server/vertx/zio/VertxZioServerInterpreter.scala index 84c36f1f14..30b078a5d7 100644 --- a/server/vertx-server/zio/src/main/scala/sttp/tapir/server/vertx/zio/VertxZioServerInterpreter.scala +++ b/server/vertx-server/zio/src/main/scala/sttp/tapir/server/vertx/zio/VertxZioServerInterpreter.scala @@ -49,7 +49,7 @@ trait VertxZioServerInterpreter[R] extends CommonServerInterpreter with VertxErr val serverRequest = VertxServerRequest(rc) def fail(t: Throwable): Unit = { - handleError(rc, t, performLogging = false) + handleError(rc, t) } val result: ZIO[R & R2, Throwable, Any] = diff --git a/server/vertx-server/zio1/src/main/scala/sttp/tapir/server/vertx/zio/VertxZioServerInterpreter.scala b/server/vertx-server/zio1/src/main/scala/sttp/tapir/server/vertx/zio/VertxZioServerInterpreter.scala index 41237ba8fc..74db5b0bfc 100644 --- a/server/vertx-server/zio1/src/main/scala/sttp/tapir/server/vertx/zio/VertxZioServerInterpreter.scala +++ b/server/vertx-server/zio1/src/main/scala/sttp/tapir/server/vertx/zio/VertxZioServerInterpreter.scala @@ -51,7 +51,7 @@ trait VertxZioServerInterpreter[R <: Blocking] extends CommonServerInterpreter w val serverRequest = VertxServerRequest(rc) def fail(t: Throwable): Unit = { - handleError(rc, t, performLogging = false) + handleError(rc, t) } val result: ZIO[R, Throwable, Any] = From 0db6c442a8d954d341eaf8ed8ff84f7244cc6548 Mon Sep 17 00:00:00 2001 From: adamw Date: Wed, 20 Sep 2023 01:42:30 +0200 Subject: [PATCH 3/4] Make safeEnd() return a future, wait for the future to complete in integrations --- .../vertx/cats/VertxCatsServerInterpreter.scala | 4 ++-- .../sttp/tapir/server/vertx/RichResponse.scala | 11 +---------- .../tapir/server/vertx/VertxErrorHandler.scala | 14 +++++++------- .../vertx/VertxFutureServerInterpreter.scala | 5 ++--- .../tapir/server/vertx/VertxServerOptions.scala | 4 ---- .../vertx/zio/VertxZioServerInterpreter.scala | 8 ++------ .../vertx/zio/VertxZioServerInterpreter.scala | 14 +++++--------- 7 files changed, 19 insertions(+), 41 deletions(-) diff --git a/server/vertx-server/cats/src/main/scala/sttp/tapir/server/vertx/cats/VertxCatsServerInterpreter.scala b/server/vertx-server/cats/src/main/scala/sttp/tapir/server/vertx/cats/VertxCatsServerInterpreter.scala index fbab6ef528..e9f0d8d6d2 100644 --- a/server/vertx-server/cats/src/main/scala/sttp/tapir/server/vertx/cats/VertxCatsServerInterpreter.scala +++ b/server/vertx-server/cats/src/main/scala/sttp/tapir/server/vertx/cats/VertxCatsServerInterpreter.scala @@ -12,7 +12,7 @@ import sttp.tapir.server.ServerEndpoint import sttp.tapir.server.interceptor.RequestResult import sttp.tapir.server.interpreter.{BodyListener, ServerInterpreter} import sttp.tapir.server.vertx.{VertxBodyListener, VertxErrorHandler} -import sttp.tapir.server.vertx.cats.VertxCatsServerInterpreter.{CatsFFromVFuture, CatsRunAsync, monadError} +import sttp.tapir.server.vertx.cats.VertxCatsServerInterpreter.{CatsFFromVFuture, CatsRunAsync, VertxFutureToCatsF, monadError} import sttp.tapir.server.vertx.decoders.{VertxRequestBody, VertxServerRequest} import sttp.tapir.server.vertx.encoders.{VertxOutputEncoders, VertxToResponseBody} import sttp.tapir.server.vertx.interpreters.{CommonServerInterpreter, FromVFuture, RunAsync} @@ -66,7 +66,7 @@ trait VertxCatsServerInterpreter[F[_]] extends CommonServerInterpreter with Vert case RequestResult.Failure(_) => Async[F].delay(rc.next()) case RequestResult.Response(response) => fFromVFuture(VertxOutputEncoders(response).apply(rc)).void } - .handleError(handleError(rc, _)) + .handleErrorWith(handleError(rc, _).asF.void) // we obtain the cancel token only after the effect is run, so we need to pass it to the exception handler // via a mutable ref; however, before this is done, it's possible an exception has already been reported; diff --git a/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/RichResponse.scala b/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/RichResponse.scala index 84314e00df..4c46dbf946 100644 --- a/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/RichResponse.scala +++ b/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/RichResponse.scala @@ -18,16 +18,7 @@ object Helpers { */ def safeEnd(): io.vertx.core.Future[Void] = { if (!response.ended()) response.end() - else Promise.promise().future() - } - - /** Ends the response if it hasn't ended yet and waits for it to be executed - */ - def safeEndWait(): Unit = { - import VertxFutureServerInterpreter.VertxFutureToScalaFuture - if (!response.ended()) { - Await.result(response.end().asScala, VertxServerOptions.safeEndWaitTime): Unit - } + else io.vertx.core.Future.succeededFuture(null) } } } diff --git a/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/VertxErrorHandler.scala b/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/VertxErrorHandler.scala index d7afbd3d65..bd51ade98e 100644 --- a/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/VertxErrorHandler.scala +++ b/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/VertxErrorHandler.scala @@ -4,19 +4,19 @@ import io.vertx.ext.web.{RoutingContext} import sttp.tapir.server.vertx.Helpers.RichResponse -/** - * Common error handler implementation for all Vertx interpreter classes. - * +/** Common error handler implementation for all Vertx interpreter classes. + * * Ends the response of the current routing context safely. - * - * @param rc + * + * @param rc * the routing context where the response shall be ended * @param ex * exception that occurred during the interpreter call */ trait VertxErrorHandler { - def handleError(rc: RoutingContext, ex: Throwable): Unit = { - if (rc.response().bytesWritten() > 0) rc.response().safeEndWait() + def handleError(rc: RoutingContext, ex: Throwable): io.vertx.core.Future[Void] = { + val r = if (rc.response().bytesWritten() > 0) rc.response().safeEnd() else io.vertx.core.Future.succeededFuture[Void](null) rc.fail(ex) + r } } diff --git a/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/VertxFutureServerInterpreter.scala b/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/VertxFutureServerInterpreter.scala index ac1b25ff08..3093595524 100644 --- a/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/VertxFutureServerInterpreter.scala +++ b/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/VertxFutureServerInterpreter.scala @@ -7,7 +7,7 @@ import sttp.capabilities.WebSockets import sttp.tapir.server.ServerEndpoint import sttp.tapir.server.interceptor.RequestResult import sttp.tapir.server.interpreter.{BodyListener, ServerInterpreter} -import sttp.tapir.server.vertx.VertxFutureServerInterpreter.{FutureFromVFuture, FutureRunAsync} +import sttp.tapir.server.vertx.VertxFutureServerInterpreter.{FutureFromVFuture, FutureRunAsync, VertxFutureToScalaFuture} import sttp.tapir.server.vertx.decoders.{VertxRequestBody, VertxServerRequest} import sttp.tapir.server.vertx.encoders.{VertxOutputEncoders, VertxToResponseBody} import sttp.tapir.server.vertx.interpreters.{CommonServerInterpreter, FromVFuture, RunAsync} @@ -64,8 +64,7 @@ trait VertxFutureServerInterpreter extends CommonServerInterpreter with VertxErr case RequestResult.Failure(_) => Future.successful(rc.next()) case RequestResult.Response(response) => FutureFromVFuture(VertxOutputEncoders(response).apply(rc)) } - .failed - .foreach {handleError(rc, _)} + .recoverWith(t => handleError(rc, t).asScala) } } diff --git a/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/VertxServerOptions.scala b/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/VertxServerOptions.scala index f61ae5ce75..18653fda4b 100644 --- a/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/VertxServerOptions.scala +++ b/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/VertxServerOptions.scala @@ -4,8 +4,6 @@ import io.vertx.core.logging.Logger import sttp.tapir.TapirFile import sttp.tapir.server.interceptor.Interceptor -import scala.concurrent.duration._ - trait VertxServerOptions[F[_]] { def uploadDirectory: TapirFile def deleteFile: TapirFile => F[Unit] @@ -27,6 +25,4 @@ object VertxServerOptions { private[vertx] def uploadDirectory(): TapirFile = new java.io.File(System.getProperty("java.io.tmpdir")).getAbsoluteFile - - private[vertx] val safeEndWaitTime = 2.seconds } diff --git a/server/vertx-server/zio/src/main/scala/sttp/tapir/server/vertx/zio/VertxZioServerInterpreter.scala b/server/vertx-server/zio/src/main/scala/sttp/tapir/server/vertx/zio/VertxZioServerInterpreter.scala index 30b078a5d7..b4c4c42089 100644 --- a/server/vertx-server/zio/src/main/scala/sttp/tapir/server/vertx/zio/VertxZioServerInterpreter.scala +++ b/server/vertx-server/zio/src/main/scala/sttp/tapir/server/vertx/zio/VertxZioServerInterpreter.scala @@ -12,7 +12,7 @@ import sttp.tapir.server.vertx.decoders.{VertxRequestBody, VertxServerRequest} import sttp.tapir.server.vertx.encoders.{VertxOutputEncoders, VertxToResponseBody} import sttp.tapir.server.vertx.interpreters.{CommonServerInterpreter, FromVFuture, RunAsync} import sttp.tapir.server.vertx.routing.PathMapping.extractRouteDefinition -import sttp.tapir.server.vertx.zio.VertxZioServerInterpreter.{RioFromVFuture, ZioRunAsync} +import sttp.tapir.server.vertx.zio.VertxZioServerInterpreter.{RioFromVFuture, VertxFutureToRIO, ZioRunAsync} import sttp.tapir.server.vertx.zio.streams._ import sttp.tapir.ztapir._ import zio._ @@ -48,10 +48,6 @@ trait VertxZioServerInterpreter[R] extends CommonServerInterpreter with VertxErr override def handle(rc: RoutingContext) = { val serverRequest = VertxServerRequest(rc) - def fail(t: Throwable): Unit = { - handleError(rc, t) - } - val result: ZIO[R & R2, Throwable, Any] = interpreter(serverRequest) .flatMap { @@ -66,7 +62,7 @@ trait VertxZioServerInterpreter[R] extends CommonServerInterpreter with VertxErr }) }) } - .catchAll { t => ZIO.attempt(fail(t)) } + .catchAll { t => handleError(rc, t).asRIO } // we obtain the cancel token only after the effect is run, so we need to pass it to the exception handler // via a mutable ref; however, before this is done, it's possible an exception has already been reported; diff --git a/server/vertx-server/zio1/src/main/scala/sttp/tapir/server/vertx/zio/VertxZioServerInterpreter.scala b/server/vertx-server/zio1/src/main/scala/sttp/tapir/server/vertx/zio/VertxZioServerInterpreter.scala index 74db5b0bfc..c799c8652d 100644 --- a/server/vertx-server/zio1/src/main/scala/sttp/tapir/server/vertx/zio/VertxZioServerInterpreter.scala +++ b/server/vertx-server/zio1/src/main/scala/sttp/tapir/server/vertx/zio/VertxZioServerInterpreter.scala @@ -7,7 +7,7 @@ import sttp.capabilities.zio.ZioStreams import sttp.tapir.server.interceptor.RequestResult import sttp.tapir.server.interpreter.{BodyListener, ServerInterpreter} import sttp.tapir.server.vertx.VertxErrorHandler -import sttp.tapir.server.vertx.zio.VertxZioServerInterpreter.{RioFromVFuture, ZioRunAsync} +import sttp.tapir.server.vertx.zio.VertxZioServerInterpreter.{RioFromVFuture, VertxFutureToRIO, ZioRunAsync} import sttp.tapir.server.vertx.decoders.{VertxRequestBody, VertxServerRequest} import sttp.tapir.server.vertx.encoders.{VertxOutputEncoders, VertxToResponseBody} import sttp.tapir.server.vertx.interpreters.{CommonServerInterpreter, FromVFuture, RunAsync} @@ -50,10 +50,6 @@ trait VertxZioServerInterpreter[R <: Blocking] extends CommonServerInterpreter w override def handle(rc: RoutingContext) = { val serverRequest = VertxServerRequest(rc) - def fail(t: Throwable): Unit = { - handleError(rc, t) - } - val result: ZIO[R, Throwable, Any] = interpreter(serverRequest) .flatMap { @@ -68,7 +64,7 @@ trait VertxZioServerInterpreter[R <: Blocking] extends CommonServerInterpreter w }) }) } - .catchAll { t => RIO.effect(fail(t)) } + .catchAll { t => handleError(rc, t).asRIO } // we obtain the cancel token only after the effect is run, so we need to pass it to the exception handler // via a mutable ref; however, before this is done, it's possible an exception has already been reported; @@ -89,9 +85,9 @@ trait VertxZioServerInterpreter[R <: Blocking] extends CommonServerInterpreter w () } - val canceler = runtime.unsafeRunAsyncCancelable(result) { - case Exit.Failure(cause) => fail(cause.squash) - case Exit.Success(_) => () + val canceler = runtime.unsafeRunAsyncCancelable(result.catchAll { t => handleError(rc, t).asRIO }) { + case Exit.Failure(_) => () // should be handled + case Exit.Success(_) => () } cancelRef.getAndSet(Some(Right(canceler))).collect { case Left(_) => rc.vertx() From e195e45a56edd8aac5249bdd733df3c21e2eb234 Mon Sep 17 00:00:00 2001 From: adamw Date: Wed, 20 Sep 2023 02:38:50 +0200 Subject: [PATCH 4/4] Fix compile --- .../sttp/tapir/server/vertx/VertxFutureServerInterpreter.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/VertxFutureServerInterpreter.scala b/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/VertxFutureServerInterpreter.scala index 3093595524..14fdc4794d 100644 --- a/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/VertxFutureServerInterpreter.scala +++ b/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/VertxFutureServerInterpreter.scala @@ -64,7 +64,7 @@ trait VertxFutureServerInterpreter extends CommonServerInterpreter with VertxErr case RequestResult.Failure(_) => Future.successful(rc.next()) case RequestResult.Response(response) => FutureFromVFuture(VertxOutputEncoders(response).apply(rc)) } - .recoverWith(t => handleError(rc, t).asScala) + .recoverWith { case t => handleError(rc, t).asScala } } }