Skip to content

Commit

Permalink
Move task cancelation on blocking pool
Browse files Browse the repository at this point in the history
  • Loading branch information
danilbykov committed Feb 15, 2022
1 parent c77e546 commit 9e8f68a
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ object VertxCatsServerOptions {
createOptions = (ci: CustomInterceptors[F, VertxCatsServerOptions[F]]) =>
VertxCatsServerOptions(
dispatcher,
Defaults.createTempFile().getParentFile.getAbsoluteFile,
VertxServerOptions.uploadDirectory(),
file => Sync[F].delay(Defaults.deleteFile()(file)),
maxQueueSizeForReadStream = 16,
ci.interceptors
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ object VertxFutureServerOptions {
CustomInterceptors(
createOptions = (ci: CustomInterceptors[Future, VertxFutureServerOptions]) =>
VertxFutureServerOptions(
Defaults.createTempFile().getParentFile.getAbsoluteFile,
VertxServerOptions.uploadDirectory(),
defaultDeleteFile,
ci.interceptors,
None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,7 @@ object VertxServerOptions {
case None => log.info(msg, Nil: _*)
case Some(ex) => log.info(s"$msg; exception: {}", ex)
}

private[vertx] def uploadDirectory(): TapirFile =
new java.io.File(System.getProperty("java.io.tmpdir")).getAbsoluteFile
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package sttp.tapir.server.vertx

import io.vertx.core.logging.LoggerFactory
import io.vertx.core.{Future, Handler}
import io.vertx.core.{Future, Handler, Promise}
import io.vertx.ext.web.{Route, Router, RoutingContext}
import sttp.capabilities.zio.ZioStreams
import sttp.monad.MonadError
Expand Down Expand Up @@ -75,14 +75,20 @@ trait VertxZioServerInterpreter[R] extends CommonServerInterpreter {

rc.response.exceptionHandler { (t: Throwable) =>
cancelRef.getAndSet(Some(Left(t))).collect { case Right(c) =>
c(FiberId.None)
rc.vertx().executeBlocking[Unit]((promise: Promise[Unit]) => {
c(FiberId.None)
promise.complete(())
}, false)
}
()
}

val canceler = runtime.unsafeRunAsyncCancelable(result) { _ => () }
cancelRef.getAndSet(Some(Right(canceler))).collect { case Left(_) =>
canceler(FiberId.None)
rc.vertx().executeBlocking[Unit]((promise: Promise[Unit]) => {
canceler(FiberId.None)
promise.complete(())
}, false)
}

()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ object VertxZioServerOptions {
CustomInterceptors(
createOptions = (ci: CustomInterceptors[RIO[R, *], VertxZioServerOptions[RIO[R, *]]]) =>
VertxZioServerOptions(
Defaults.createTempFile().getParentFile.getAbsoluteFile,
VertxServerOptions.uploadDirectory(),
file => Task[Unit](Defaults.deleteFile()(file)),
maxQueueSizeForReadStream = 16,
ci.interceptors
Expand Down

0 comments on commit 9e8f68a

Please sign in to comment.