diff --git a/server/vertx-server/cats/src/main/scala/sttp/tapir/server/vertx/cats/VertxCatsServerInterpreter.scala b/server/vertx-server/cats/src/main/scala/sttp/tapir/server/vertx/cats/VertxCatsServerInterpreter.scala index b69925e87a..e9f0d8d6d2 100644 --- a/server/vertx-server/cats/src/main/scala/sttp/tapir/server/vertx/cats/VertxCatsServerInterpreter.scala +++ b/server/vertx-server/cats/src/main/scala/sttp/tapir/server/vertx/cats/VertxCatsServerInterpreter.scala @@ -3,7 +3,6 @@ package sttp.tapir.server.vertx.cats import cats.effect.std.Dispatcher import cats.effect.{Async, Sync} import cats.syntax.all._ -import io.vertx.core.logging.LoggerFactory import io.vertx.core.{Future, Handler} import io.vertx.ext.web.{Route, Router, RoutingContext} import sttp.capabilities.{Streams, WebSockets} @@ -12,8 +11,8 @@ import sttp.monad.MonadError import sttp.tapir.server.ServerEndpoint import sttp.tapir.server.interceptor.RequestResult import sttp.tapir.server.interpreter.{BodyListener, ServerInterpreter} -import sttp.tapir.server.vertx.VertxBodyListener -import sttp.tapir.server.vertx.cats.VertxCatsServerInterpreter.{CatsFFromVFuture, CatsRunAsync, monadError} +import sttp.tapir.server.vertx.{VertxBodyListener, VertxErrorHandler} +import sttp.tapir.server.vertx.cats.VertxCatsServerInterpreter.{CatsFFromVFuture, CatsRunAsync, VertxFutureToCatsF, monadError} import sttp.tapir.server.vertx.decoders.{VertxRequestBody, VertxServerRequest} import sttp.tapir.server.vertx.encoders.{VertxOutputEncoders, VertxToResponseBody} import sttp.tapir.server.vertx.interpreters.{CommonServerInterpreter, FromVFuture, RunAsync} @@ -23,9 +22,7 @@ import sttp.tapir.server.vertx.cats.streams.fs2.fs2ReadStreamCompatible import java.util.concurrent.atomic.AtomicReference -trait VertxCatsServerInterpreter[F[_]] extends CommonServerInterpreter { - - private val logger = LoggerFactory.getLogger(VertxCatsServerInterpreter.getClass) +trait VertxCatsServerInterpreter[F[_]] extends CommonServerInterpreter with VertxErrorHandler { implicit def fa: Async[F] @@ -69,11 +66,7 @@ trait VertxCatsServerInterpreter[F[_]] extends CommonServerInterpreter { case RequestResult.Failure(_) => Async[F].delay(rc.next()) case RequestResult.Response(response) => fFromVFuture(VertxOutputEncoders(response).apply(rc)).void } - .handleError { ex => - logger.error("Error while processing the request", ex) - if (rc.response().bytesWritten() > 0) rc.response().end() - rc.fail(ex) - } + .handleErrorWith(handleError(rc, _).asF.void) // we obtain the cancel token only after the effect is run, so we need to pass it to the exception handler // via a mutable ref; however, before this is done, it's possible an exception has already been reported; diff --git a/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/RichResponse.scala b/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/RichResponse.scala new file mode 100644 index 0000000000..4c46dbf946 --- /dev/null +++ b/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/RichResponse.scala @@ -0,0 +1,24 @@ +package sttp.tapir.server.vertx + +import io.vertx.core.Promise +import io.vertx.core.http.HttpServerResponse + +import scala.concurrent.Await +import scala.concurrent.Future + +object Helpers { + + /** Helper class that implements safer ending of a http server response + */ + implicit class RichResponse(response: HttpServerResponse) { + + /** Ends the response if it hasn't ended yet + * @return + * A future that is completed when the response has been ended + */ + def safeEnd(): io.vertx.core.Future[Void] = { + if (!response.ended()) response.end() + else io.vertx.core.Future.succeededFuture(null) + } + } +} diff --git a/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/VertxErrorHandler.scala b/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/VertxErrorHandler.scala new file mode 100644 index 0000000000..bd51ade98e --- /dev/null +++ b/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/VertxErrorHandler.scala @@ -0,0 +1,22 @@ +package sttp.tapir.server.vertx + +import io.vertx.ext.web.{RoutingContext} + +import sttp.tapir.server.vertx.Helpers.RichResponse + +/** Common error handler implementation for all Vertx interpreter classes. + * + * Ends the response of the current routing context safely. + * + * @param rc + * the routing context where the response shall be ended + * @param ex + * exception that occurred during the interpreter call + */ +trait VertxErrorHandler { + def handleError(rc: RoutingContext, ex: Throwable): io.vertx.core.Future[Void] = { + val r = if (rc.response().bytesWritten() > 0) rc.response().safeEnd() else io.vertx.core.Future.succeededFuture[Void](null) + rc.fail(ex) + r + } +} diff --git a/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/VertxFutureServerInterpreter.scala b/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/VertxFutureServerInterpreter.scala index acd2dccc71..14fdc4794d 100644 --- a/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/VertxFutureServerInterpreter.scala +++ b/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/VertxFutureServerInterpreter.scala @@ -1,6 +1,5 @@ package sttp.tapir.server.vertx -import io.vertx.core.logging.LoggerFactory import io.vertx.core.{Handler, Future => VFuture} import io.vertx.ext.web.{Route, Router, RoutingContext} import sttp.monad.FutureMonad @@ -8,7 +7,7 @@ import sttp.capabilities.WebSockets import sttp.tapir.server.ServerEndpoint import sttp.tapir.server.interceptor.RequestResult import sttp.tapir.server.interpreter.{BodyListener, ServerInterpreter} -import sttp.tapir.server.vertx.VertxFutureServerInterpreter.{FutureFromVFuture, FutureRunAsync} +import sttp.tapir.server.vertx.VertxFutureServerInterpreter.{FutureFromVFuture, FutureRunAsync, VertxFutureToScalaFuture} import sttp.tapir.server.vertx.decoders.{VertxRequestBody, VertxServerRequest} import sttp.tapir.server.vertx.encoders.{VertxOutputEncoders, VertxToResponseBody} import sttp.tapir.server.vertx.interpreters.{CommonServerInterpreter, FromVFuture, RunAsync} @@ -17,9 +16,7 @@ import sttp.tapir.server.vertx.streams.{ReadStreamCompatible, VertxStreams} import scala.concurrent.{ExecutionContext, Future, Promise} -trait VertxFutureServerInterpreter extends CommonServerInterpreter { - - private val logger = LoggerFactory.getLogger(VertxFutureServerInterpreter.getClass) +trait VertxFutureServerInterpreter extends CommonServerInterpreter with VertxErrorHandler { def vertxFutureServerOptions: VertxFutureServerOptions = VertxFutureServerOptions.default @@ -67,12 +64,7 @@ trait VertxFutureServerInterpreter extends CommonServerInterpreter { case RequestResult.Failure(_) => Future.successful(rc.next()) case RequestResult.Response(response) => FutureFromVFuture(VertxOutputEncoders(response).apply(rc)) } - .failed - .foreach { ex => - logger.error("Error while processing the request", ex) - if (rc.response().bytesWritten() > 0) rc.response().end() - rc.fail(ex) - } + .recoverWith { case t => handleError(rc, t).asScala } } } diff --git a/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/encoders/VertxOutputEncoders.scala b/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/encoders/VertxOutputEncoders.scala index 8dedbdff94..cbb879ba11 100644 --- a/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/encoders/VertxOutputEncoders.scala +++ b/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/encoders/VertxOutputEncoders.scala @@ -3,6 +3,7 @@ package sttp.tapir.server.vertx.encoders import io.vertx.core.Future import io.vertx.ext.web.RoutingContext import sttp.tapir.server.model.ServerResponse +import sttp.tapir.server.vertx.Helpers.RichResponse import scala.util.control.NonFatal @@ -14,7 +15,7 @@ object VertxOutputEncoders { serverResponse.headers.foreach { h => resp.headers.add(h.name, h.value) } serverResponse.body match { case Some(responseHandler) => responseHandler(rc) - case None => resp.end() + case None => resp.safeEnd() } } catch { case NonFatal(e) => Future.failedFuture(e) diff --git a/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/encoders/VertxToResponseBody.scala b/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/encoders/VertxToResponseBody.scala index 3272ed7551..e415c6c378 100644 --- a/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/encoders/VertxToResponseBody.scala +++ b/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/encoders/VertxToResponseBody.scala @@ -8,6 +8,7 @@ import sttp.capabilities.Streams import sttp.model.{HasHeaders, Part} import sttp.tapir.{CodecFormat, FileRange, RawBodyType, WebSocketBodyOutput} import sttp.tapir.server.interpreter.ToResponseBody +import sttp.tapir.server.vertx.Helpers.RichResponse import sttp.tapir.server.vertx.VertxServerOptions import sttp.tapir.server.vertx.streams.{Pipe, ReadStreamCompatible} @@ -88,7 +89,7 @@ class VertxToResponseBody[F[_], S <: Streams[S]](serverOptions: VertxServerOptio } }) .flatMap { _ => - resp.end() + resp.safeEnd() } } diff --git a/server/vertx-server/zio/src/main/scala/sttp/tapir/server/vertx/zio/VertxZioServerInterpreter.scala b/server/vertx-server/zio/src/main/scala/sttp/tapir/server/vertx/zio/VertxZioServerInterpreter.scala index 110fafed58..b4c4c42089 100644 --- a/server/vertx-server/zio/src/main/scala/sttp/tapir/server/vertx/zio/VertxZioServerInterpreter.scala +++ b/server/vertx-server/zio/src/main/scala/sttp/tapir/server/vertx/zio/VertxZioServerInterpreter.scala @@ -7,18 +7,19 @@ import sttp.capabilities.zio.ZioStreams import sttp.tapir.server.interceptor.RequestResult import sttp.tapir.server.interpreter.{BodyListener, ServerInterpreter} import sttp.tapir.server.vertx.VertxBodyListener +import sttp.tapir.server.vertx.VertxErrorHandler import sttp.tapir.server.vertx.decoders.{VertxRequestBody, VertxServerRequest} import sttp.tapir.server.vertx.encoders.{VertxOutputEncoders, VertxToResponseBody} import sttp.tapir.server.vertx.interpreters.{CommonServerInterpreter, FromVFuture, RunAsync} import sttp.tapir.server.vertx.routing.PathMapping.extractRouteDefinition -import sttp.tapir.server.vertx.zio.VertxZioServerInterpreter.{RioFromVFuture, ZioRunAsync} +import sttp.tapir.server.vertx.zio.VertxZioServerInterpreter.{RioFromVFuture, VertxFutureToRIO, ZioRunAsync} import sttp.tapir.server.vertx.zio.streams._ import sttp.tapir.ztapir._ import zio._ import java.util.concurrent.atomic.AtomicReference -trait VertxZioServerInterpreter[R] extends CommonServerInterpreter { +trait VertxZioServerInterpreter[R] extends CommonServerInterpreter with VertxErrorHandler { def vertxZioServerOptions[R2 <: R]: VertxZioServerOptions[R2] = VertxZioServerOptions.default[R].widen def route[R2](e: ZServerEndpoint[R2, ZioStreams with WebSockets])(implicit @@ -47,11 +48,6 @@ trait VertxZioServerInterpreter[R] extends CommonServerInterpreter { override def handle(rc: RoutingContext) = { val serverRequest = VertxServerRequest(rc) - def fail(t: Throwable): Unit = { - if (rc.response().bytesWritten() > 0) rc.response().end() - rc.fail(t) - } - val result: ZIO[R & R2, Throwable, Any] = interpreter(serverRequest) .flatMap { @@ -66,7 +62,7 @@ trait VertxZioServerInterpreter[R] extends CommonServerInterpreter { }) }) } - .catchAll { t => ZIO.attempt(fail(t)) } + .catchAll { t => handleError(rc, t).asRIO } // we obtain the cancel token only after the effect is run, so we need to pass it to the exception handler // via a mutable ref; however, before this is done, it's possible an exception has already been reported; diff --git a/server/vertx-server/zio1/src/main/scala/sttp/tapir/server/vertx/zio/VertxZioServerInterpreter.scala b/server/vertx-server/zio1/src/main/scala/sttp/tapir/server/vertx/zio/VertxZioServerInterpreter.scala index 4f2456655f..c799c8652d 100644 --- a/server/vertx-server/zio1/src/main/scala/sttp/tapir/server/vertx/zio/VertxZioServerInterpreter.scala +++ b/server/vertx-server/zio1/src/main/scala/sttp/tapir/server/vertx/zio/VertxZioServerInterpreter.scala @@ -6,7 +6,8 @@ import sttp.capabilities.WebSockets import sttp.capabilities.zio.ZioStreams import sttp.tapir.server.interceptor.RequestResult import sttp.tapir.server.interpreter.{BodyListener, ServerInterpreter} -import sttp.tapir.server.vertx.zio.VertxZioServerInterpreter.{RioFromVFuture, ZioRunAsync} +import sttp.tapir.server.vertx.VertxErrorHandler +import sttp.tapir.server.vertx.zio.VertxZioServerInterpreter.{RioFromVFuture, VertxFutureToRIO, ZioRunAsync} import sttp.tapir.server.vertx.decoders.{VertxRequestBody, VertxServerRequest} import sttp.tapir.server.vertx.encoders.{VertxOutputEncoders, VertxToResponseBody} import sttp.tapir.server.vertx.interpreters.{CommonServerInterpreter, FromVFuture, RunAsync} @@ -19,7 +20,7 @@ import _root_.zio.blocking.Blocking import java.util.concurrent.atomic.AtomicReference -trait VertxZioServerInterpreter[R <: Blocking] extends CommonServerInterpreter { +trait VertxZioServerInterpreter[R <: Blocking] extends CommonServerInterpreter with VertxErrorHandler { def vertxZioServerOptions: VertxZioServerOptions[R] = VertxZioServerOptions.default def route(e: ZServerEndpoint[R, ZioStreams with WebSockets])(implicit @@ -49,11 +50,6 @@ trait VertxZioServerInterpreter[R <: Blocking] extends CommonServerInterpreter { override def handle(rc: RoutingContext) = { val serverRequest = VertxServerRequest(rc) - def fail(t: Throwable): Unit = { - if (rc.response().bytesWritten() > 0) rc.response().end() - rc.fail(t) - } - val result: ZIO[R, Throwable, Any] = interpreter(serverRequest) .flatMap { @@ -68,7 +64,7 @@ trait VertxZioServerInterpreter[R <: Blocking] extends CommonServerInterpreter { }) }) } - .catchAll { t => RIO.effect(fail(t)) } + .catchAll { t => handleError(rc, t).asRIO } // we obtain the cancel token only after the effect is run, so we need to pass it to the exception handler // via a mutable ref; however, before this is done, it's possible an exception has already been reported; @@ -89,9 +85,9 @@ trait VertxZioServerInterpreter[R <: Blocking] extends CommonServerInterpreter { () } - val canceler = runtime.unsafeRunAsyncCancelable(result) { - case Exit.Failure(cause) => fail(cause.squash) - case Exit.Success(_) => () + val canceler = runtime.unsafeRunAsyncCancelable(result.catchAll { t => handleError(rc, t).asRIO }) { + case Exit.Failure(_) => () // should be handled + case Exit.Success(_) => () } cancelRef.getAndSet(Some(Right(canceler))).collect { case Left(_) => rc.vertx()