From 9e8f68ae02c7e76f7923a6fbc0dab157f7a7d6f4 Mon Sep 17 00:00:00 2001 From: Danil Bykov Date: Fri, 11 Feb 2022 10:48:46 +0500 Subject: [PATCH] Move task cancelation on blocking pool --- .../tapir/server/vertx/VertxCatsServerOptions.scala | 2 +- .../server/vertx/VertxFutureServerOptions.scala | 2 +- .../sttp/tapir/server/vertx/VertxServerOptions.scala | 3 +++ .../server/vertx/VertxZioServerInterpreter.scala | 12 +++++++++--- .../tapir/server/vertx/VertxZioServerOptions.scala | 2 +- 5 files changed, 15 insertions(+), 6 deletions(-) diff --git a/server/vertx/src/main/scala/sttp/tapir/server/vertx/VertxCatsServerOptions.scala b/server/vertx/src/main/scala/sttp/tapir/server/vertx/VertxCatsServerOptions.scala index 22452b4139..6a33c6adc4 100644 --- a/server/vertx/src/main/scala/sttp/tapir/server/vertx/VertxCatsServerOptions.scala +++ b/server/vertx/src/main/scala/sttp/tapir/server/vertx/VertxCatsServerOptions.scala @@ -32,7 +32,7 @@ object VertxCatsServerOptions { createOptions = (ci: CustomInterceptors[F, VertxCatsServerOptions[F]]) => VertxCatsServerOptions( dispatcher, - Defaults.createTempFile().getParentFile.getAbsoluteFile, + VertxServerOptions.uploadDirectory(), file => Sync[F].delay(Defaults.deleteFile()(file)), maxQueueSizeForReadStream = 16, ci.interceptors diff --git a/server/vertx/src/main/scala/sttp/tapir/server/vertx/VertxFutureServerOptions.scala b/server/vertx/src/main/scala/sttp/tapir/server/vertx/VertxFutureServerOptions.scala index 1e4d8a922a..3c8279ff70 100644 --- a/server/vertx/src/main/scala/sttp/tapir/server/vertx/VertxFutureServerOptions.scala +++ b/server/vertx/src/main/scala/sttp/tapir/server/vertx/VertxFutureServerOptions.scala @@ -36,7 +36,7 @@ object VertxFutureServerOptions { CustomInterceptors( createOptions = (ci: CustomInterceptors[Future, VertxFutureServerOptions]) => VertxFutureServerOptions( - Defaults.createTempFile().getParentFile.getAbsoluteFile, + VertxServerOptions.uploadDirectory(), defaultDeleteFile, ci.interceptors, None diff --git a/server/vertx/src/main/scala/sttp/tapir/server/vertx/VertxServerOptions.scala b/server/vertx/src/main/scala/sttp/tapir/server/vertx/VertxServerOptions.scala index ceed8bfea1..18653fda4b 100644 --- a/server/vertx/src/main/scala/sttp/tapir/server/vertx/VertxServerOptions.scala +++ b/server/vertx/src/main/scala/sttp/tapir/server/vertx/VertxServerOptions.scala @@ -22,4 +22,7 @@ object VertxServerOptions { case None => log.info(msg, Nil: _*) case Some(ex) => log.info(s"$msg; exception: {}", ex) } + + private[vertx] def uploadDirectory(): TapirFile = + new java.io.File(System.getProperty("java.io.tmpdir")).getAbsoluteFile } diff --git a/server/vertx/src/main/scala/sttp/tapir/server/vertx/VertxZioServerInterpreter.scala b/server/vertx/src/main/scala/sttp/tapir/server/vertx/VertxZioServerInterpreter.scala index 512bf9cfb4..4071a2808a 100644 --- a/server/vertx/src/main/scala/sttp/tapir/server/vertx/VertxZioServerInterpreter.scala +++ b/server/vertx/src/main/scala/sttp/tapir/server/vertx/VertxZioServerInterpreter.scala @@ -1,7 +1,7 @@ package sttp.tapir.server.vertx import io.vertx.core.logging.LoggerFactory -import io.vertx.core.{Future, Handler} +import io.vertx.core.{Future, Handler, Promise} import io.vertx.ext.web.{Route, Router, RoutingContext} import sttp.capabilities.zio.ZioStreams import sttp.monad.MonadError @@ -75,14 +75,20 @@ trait VertxZioServerInterpreter[R] extends CommonServerInterpreter { rc.response.exceptionHandler { (t: Throwable) => cancelRef.getAndSet(Some(Left(t))).collect { case Right(c) => - c(FiberId.None) + rc.vertx().executeBlocking[Unit]((promise: Promise[Unit]) => { + c(FiberId.None) + promise.complete(()) + }, false) } () } val canceler = runtime.unsafeRunAsyncCancelable(result) { _ => () } cancelRef.getAndSet(Some(Right(canceler))).collect { case Left(_) => - canceler(FiberId.None) + rc.vertx().executeBlocking[Unit]((promise: Promise[Unit]) => { + canceler(FiberId.None) + promise.complete(()) + }, false) } () diff --git a/server/vertx/src/main/scala/sttp/tapir/server/vertx/VertxZioServerOptions.scala b/server/vertx/src/main/scala/sttp/tapir/server/vertx/VertxZioServerOptions.scala index 0acb8e713c..55ac4fe1bc 100644 --- a/server/vertx/src/main/scala/sttp/tapir/server/vertx/VertxZioServerOptions.scala +++ b/server/vertx/src/main/scala/sttp/tapir/server/vertx/VertxZioServerOptions.scala @@ -26,7 +26,7 @@ object VertxZioServerOptions { CustomInterceptors( createOptions = (ci: CustomInterceptors[RIO[R, *], VertxZioServerOptions[RIO[R, *]]]) => VertxZioServerOptions( - Defaults.createTempFile().getParentFile.getAbsoluteFile, + VertxServerOptions.uploadDirectory(), file => Task[Unit](Defaults.deleteFile()(file)), maxQueueSizeForReadStream = 16, ci.interceptors