Skip to content

Commit

Permalink
Merge pull request #3165 from michael72/master
Browse files Browse the repository at this point in the history
vertx: end responses safely
  • Loading branch information
adamw authored Sep 23, 2023
2 parents 4069447 + e195e45 commit 977f331
Show file tree
Hide file tree
Showing 8 changed files with 68 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package sttp.tapir.server.vertx.cats
import cats.effect.std.Dispatcher
import cats.effect.{Async, Sync}
import cats.syntax.all._
import io.vertx.core.logging.LoggerFactory
import io.vertx.core.{Future, Handler}
import io.vertx.ext.web.{Route, Router, RoutingContext}
import sttp.capabilities.{Streams, WebSockets}
Expand All @@ -12,8 +11,8 @@ import sttp.monad.MonadError
import sttp.tapir.server.ServerEndpoint
import sttp.tapir.server.interceptor.RequestResult
import sttp.tapir.server.interpreter.{BodyListener, ServerInterpreter}
import sttp.tapir.server.vertx.VertxBodyListener
import sttp.tapir.server.vertx.cats.VertxCatsServerInterpreter.{CatsFFromVFuture, CatsRunAsync, monadError}
import sttp.tapir.server.vertx.{VertxBodyListener, VertxErrorHandler}
import sttp.tapir.server.vertx.cats.VertxCatsServerInterpreter.{CatsFFromVFuture, CatsRunAsync, VertxFutureToCatsF, monadError}
import sttp.tapir.server.vertx.decoders.{VertxRequestBody, VertxServerRequest}
import sttp.tapir.server.vertx.encoders.{VertxOutputEncoders, VertxToResponseBody}
import sttp.tapir.server.vertx.interpreters.{CommonServerInterpreter, FromVFuture, RunAsync}
Expand All @@ -23,9 +22,7 @@ import sttp.tapir.server.vertx.cats.streams.fs2.fs2ReadStreamCompatible

import java.util.concurrent.atomic.AtomicReference

trait VertxCatsServerInterpreter[F[_]] extends CommonServerInterpreter {

private val logger = LoggerFactory.getLogger(VertxCatsServerInterpreter.getClass)
trait VertxCatsServerInterpreter[F[_]] extends CommonServerInterpreter with VertxErrorHandler {

implicit def fa: Async[F]

Expand Down Expand Up @@ -69,11 +66,7 @@ trait VertxCatsServerInterpreter[F[_]] extends CommonServerInterpreter {
case RequestResult.Failure(_) => Async[F].delay(rc.next())
case RequestResult.Response(response) => fFromVFuture(VertxOutputEncoders(response).apply(rc)).void
}
.handleError { ex =>
logger.error("Error while processing the request", ex)
if (rc.response().bytesWritten() > 0) rc.response().end()
rc.fail(ex)
}
.handleErrorWith(handleError(rc, _).asF.void)

// we obtain the cancel token only after the effect is run, so we need to pass it to the exception handler
// via a mutable ref; however, before this is done, it's possible an exception has already been reported;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package sttp.tapir.server.vertx

import io.vertx.core.Promise
import io.vertx.core.http.HttpServerResponse

import scala.concurrent.Await
import scala.concurrent.Future

object Helpers {

/** Helper class that implements safer ending of a http server response
*/
implicit class RichResponse(response: HttpServerResponse) {

/** Ends the response if it hasn't ended yet
* @return
* A future that is completed when the response has been ended
*/
def safeEnd(): io.vertx.core.Future[Void] = {
if (!response.ended()) response.end()
else io.vertx.core.Future.succeededFuture(null)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package sttp.tapir.server.vertx

import io.vertx.ext.web.{RoutingContext}

import sttp.tapir.server.vertx.Helpers.RichResponse

/** Common error handler implementation for all Vertx interpreter classes.
*
* Ends the response of the current routing context safely.
*
* @param rc
* the routing context where the response shall be ended
* @param ex
* exception that occurred during the interpreter call
*/
trait VertxErrorHandler {
def handleError(rc: RoutingContext, ex: Throwable): io.vertx.core.Future[Void] = {
val r = if (rc.response().bytesWritten() > 0) rc.response().safeEnd() else io.vertx.core.Future.succeededFuture[Void](null)
rc.fail(ex)
r
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
package sttp.tapir.server.vertx

import io.vertx.core.logging.LoggerFactory
import io.vertx.core.{Handler, Future => VFuture}
import io.vertx.ext.web.{Route, Router, RoutingContext}
import sttp.monad.FutureMonad
import sttp.capabilities.WebSockets
import sttp.tapir.server.ServerEndpoint
import sttp.tapir.server.interceptor.RequestResult
import sttp.tapir.server.interpreter.{BodyListener, ServerInterpreter}
import sttp.tapir.server.vertx.VertxFutureServerInterpreter.{FutureFromVFuture, FutureRunAsync}
import sttp.tapir.server.vertx.VertxFutureServerInterpreter.{FutureFromVFuture, FutureRunAsync, VertxFutureToScalaFuture}
import sttp.tapir.server.vertx.decoders.{VertxRequestBody, VertxServerRequest}
import sttp.tapir.server.vertx.encoders.{VertxOutputEncoders, VertxToResponseBody}
import sttp.tapir.server.vertx.interpreters.{CommonServerInterpreter, FromVFuture, RunAsync}
Expand All @@ -17,9 +16,7 @@ import sttp.tapir.server.vertx.streams.{ReadStreamCompatible, VertxStreams}

import scala.concurrent.{ExecutionContext, Future, Promise}

trait VertxFutureServerInterpreter extends CommonServerInterpreter {

private val logger = LoggerFactory.getLogger(VertxFutureServerInterpreter.getClass)
trait VertxFutureServerInterpreter extends CommonServerInterpreter with VertxErrorHandler {

def vertxFutureServerOptions: VertxFutureServerOptions = VertxFutureServerOptions.default

Expand Down Expand Up @@ -67,12 +64,7 @@ trait VertxFutureServerInterpreter extends CommonServerInterpreter {
case RequestResult.Failure(_) => Future.successful(rc.next())
case RequestResult.Response(response) => FutureFromVFuture(VertxOutputEncoders(response).apply(rc))
}
.failed
.foreach { ex =>
logger.error("Error while processing the request", ex)
if (rc.response().bytesWritten() > 0) rc.response().end()
rc.fail(ex)
}
.recoverWith { case t => handleError(rc, t).asScala }
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sttp.tapir.server.vertx.encoders
import io.vertx.core.Future
import io.vertx.ext.web.RoutingContext
import sttp.tapir.server.model.ServerResponse
import sttp.tapir.server.vertx.Helpers.RichResponse

import scala.util.control.NonFatal

Expand All @@ -14,7 +15,7 @@ object VertxOutputEncoders {
serverResponse.headers.foreach { h => resp.headers.add(h.name, h.value) }
serverResponse.body match {
case Some(responseHandler) => responseHandler(rc)
case None => resp.end()
case None => resp.safeEnd()
}
} catch {
case NonFatal(e) => Future.failedFuture(e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import sttp.capabilities.Streams
import sttp.model.{HasHeaders, Part}
import sttp.tapir.{CodecFormat, FileRange, RawBodyType, WebSocketBodyOutput}
import sttp.tapir.server.interpreter.ToResponseBody
import sttp.tapir.server.vertx.Helpers.RichResponse
import sttp.tapir.server.vertx.VertxServerOptions
import sttp.tapir.server.vertx.streams.{Pipe, ReadStreamCompatible}

Expand Down Expand Up @@ -88,7 +89,7 @@ class VertxToResponseBody[F[_], S <: Streams[S]](serverOptions: VertxServerOptio
}
})
.flatMap { _ =>
resp.end()
resp.safeEnd()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,19 @@ import sttp.capabilities.zio.ZioStreams
import sttp.tapir.server.interceptor.RequestResult
import sttp.tapir.server.interpreter.{BodyListener, ServerInterpreter}
import sttp.tapir.server.vertx.VertxBodyListener
import sttp.tapir.server.vertx.VertxErrorHandler
import sttp.tapir.server.vertx.decoders.{VertxRequestBody, VertxServerRequest}
import sttp.tapir.server.vertx.encoders.{VertxOutputEncoders, VertxToResponseBody}
import sttp.tapir.server.vertx.interpreters.{CommonServerInterpreter, FromVFuture, RunAsync}
import sttp.tapir.server.vertx.routing.PathMapping.extractRouteDefinition
import sttp.tapir.server.vertx.zio.VertxZioServerInterpreter.{RioFromVFuture, ZioRunAsync}
import sttp.tapir.server.vertx.zio.VertxZioServerInterpreter.{RioFromVFuture, VertxFutureToRIO, ZioRunAsync}
import sttp.tapir.server.vertx.zio.streams._
import sttp.tapir.ztapir._
import zio._

import java.util.concurrent.atomic.AtomicReference

trait VertxZioServerInterpreter[R] extends CommonServerInterpreter {
trait VertxZioServerInterpreter[R] extends CommonServerInterpreter with VertxErrorHandler {
def vertxZioServerOptions[R2 <: R]: VertxZioServerOptions[R2] = VertxZioServerOptions.default[R].widen

def route[R2](e: ZServerEndpoint[R2, ZioStreams with WebSockets])(implicit
Expand Down Expand Up @@ -47,11 +48,6 @@ trait VertxZioServerInterpreter[R] extends CommonServerInterpreter {
override def handle(rc: RoutingContext) = {
val serverRequest = VertxServerRequest(rc)

def fail(t: Throwable): Unit = {
if (rc.response().bytesWritten() > 0) rc.response().end()
rc.fail(t)
}

val result: ZIO[R & R2, Throwable, Any] =
interpreter(serverRequest)
.flatMap {
Expand All @@ -66,7 +62,7 @@ trait VertxZioServerInterpreter[R] extends CommonServerInterpreter {
})
})
}
.catchAll { t => ZIO.attempt(fail(t)) }
.catchAll { t => handleError(rc, t).asRIO }

// we obtain the cancel token only after the effect is run, so we need to pass it to the exception handler
// via a mutable ref; however, before this is done, it's possible an exception has already been reported;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import sttp.capabilities.WebSockets
import sttp.capabilities.zio.ZioStreams
import sttp.tapir.server.interceptor.RequestResult
import sttp.tapir.server.interpreter.{BodyListener, ServerInterpreter}
import sttp.tapir.server.vertx.zio.VertxZioServerInterpreter.{RioFromVFuture, ZioRunAsync}
import sttp.tapir.server.vertx.VertxErrorHandler
import sttp.tapir.server.vertx.zio.VertxZioServerInterpreter.{RioFromVFuture, VertxFutureToRIO, ZioRunAsync}
import sttp.tapir.server.vertx.decoders.{VertxRequestBody, VertxServerRequest}
import sttp.tapir.server.vertx.encoders.{VertxOutputEncoders, VertxToResponseBody}
import sttp.tapir.server.vertx.interpreters.{CommonServerInterpreter, FromVFuture, RunAsync}
Expand All @@ -19,7 +20,7 @@ import _root_.zio.blocking.Blocking

import java.util.concurrent.atomic.AtomicReference

trait VertxZioServerInterpreter[R <: Blocking] extends CommonServerInterpreter {
trait VertxZioServerInterpreter[R <: Blocking] extends CommonServerInterpreter with VertxErrorHandler {
def vertxZioServerOptions: VertxZioServerOptions[R] = VertxZioServerOptions.default

def route(e: ZServerEndpoint[R, ZioStreams with WebSockets])(implicit
Expand Down Expand Up @@ -49,11 +50,6 @@ trait VertxZioServerInterpreter[R <: Blocking] extends CommonServerInterpreter {
override def handle(rc: RoutingContext) = {
val serverRequest = VertxServerRequest(rc)

def fail(t: Throwable): Unit = {
if (rc.response().bytesWritten() > 0) rc.response().end()
rc.fail(t)
}

val result: ZIO[R, Throwable, Any] =
interpreter(serverRequest)
.flatMap {
Expand All @@ -68,7 +64,7 @@ trait VertxZioServerInterpreter[R <: Blocking] extends CommonServerInterpreter {
})
})
}
.catchAll { t => RIO.effect(fail(t)) }
.catchAll { t => handleError(rc, t).asRIO }

// we obtain the cancel token only after the effect is run, so we need to pass it to the exception handler
// via a mutable ref; however, before this is done, it's possible an exception has already been reported;
Expand All @@ -89,9 +85,9 @@ trait VertxZioServerInterpreter[R <: Blocking] extends CommonServerInterpreter {
()
}

val canceler = runtime.unsafeRunAsyncCancelable(result) {
case Exit.Failure(cause) => fail(cause.squash)
case Exit.Success(_) => ()
val canceler = runtime.unsafeRunAsyncCancelable(result.catchAll { t => handleError(rc, t).asRIO }) {
case Exit.Failure(_) => () // should be handled
case Exit.Success(_) => ()
}
cancelRef.getAndSet(Some(Right(canceler))).collect { case Left(_) =>
rc.vertx()
Expand Down

0 comments on commit 977f331

Please sign in to comment.