diff --git a/jvm/src/main/scala/com/avast/sst/jvm/execution/ExecutorModule.scala b/jvm/src/main/scala/com/avast/sst/jvm/execution/ExecutorModule.scala index 65129c90d..5a3985dd8 100644 --- a/jvm/src/main/scala/com/avast/sst/jvm/execution/ExecutorModule.scala +++ b/jvm/src/main/scala/com/avast/sst/jvm/execution/ExecutorModule.scala @@ -53,6 +53,8 @@ class ExecutorModule[F[_]: Sync]( object ExecutorModule { + private final val DefaultBlockingExecutorConfig = ThreadPoolExecutorConfig(0, Int.MaxValue, allowCoreThreadTimeout = true) + /** Makes [[com.avast.sst.jvm.execution.ExecutorModule]] with default callback executor and extra [[cats.effect.Blocker]] executor * for blocking operations. */ @@ -60,49 +62,62 @@ object ExecutorModule { for { numOfCpus <- Resource.eval(Sync[F].delay(Runtime.getRuntime.availableProcessors)) coreSize = numOfCpus * 2 - executor <- makeThreadPoolExecutor(ThreadPoolExecutorConfig(coreSize, coreSize), toolkitThreadFactory, new LinkedBlockingQueue) + executor <- makeThreadPoolExecutor( + ThreadPoolExecutorConfig(coreSize, coreSize, allowCoreThreadTimeout = true), + toolkitThreadFactory, + new LinkedBlockingQueue + ) .map(ExecutionContext.fromExecutorService) - blockingExecutor <- makeBlockingExecutor.map(ExecutionContext.fromExecutorService) + blockingExecutor <- makeBlockingExecutor(DefaultBlockingExecutorConfig).map(ExecutionContext.fromExecutorService) } yield new ExecutorModule[F](numOfCpus, executor, blockingExecutor) } /** Makes [[com.avast.sst.jvm.execution.ExecutorModule]] with the provided callback executor and extra [[cats.effect.Blocker]] * executor for blocking operations. */ - def makeFromExecutionContext[F[_]: Sync](executor: ExecutionContext): Resource[F, ExecutorModule[F]] = { + def makeFromExecutionContext[F[_]: Sync]( + executor: ExecutionContext, + blockingExecutorConfig: ThreadPoolExecutorConfig = DefaultBlockingExecutorConfig + ): Resource[F, ExecutorModule[F]] = { for { numOfCpus <- Resource.eval(Sync[F].delay(Runtime.getRuntime.availableProcessors)) - blockingExecutor <- makeBlockingExecutor.map(ExecutionContext.fromExecutorService) + blockingExecutor <- makeBlockingExecutor(blockingExecutorConfig).map(ExecutionContext.fromExecutorService) } yield new ExecutorModule[F](numOfCpus, executor, blockingExecutor) } /** Makes [[com.avast.sst.jvm.execution.ExecutorModule]] with executor and extra [[cats.effect.Blocker]] executor * for blocking operations. */ - def makeFromConfig[F[_]: Sync](executorConfig: ThreadPoolExecutorConfig): Resource[F, ExecutorModule[F]] = { + def makeFromConfig[F[_]: Sync]( + executorConfig: ThreadPoolExecutorConfig, + blockingExecutorConfig: ThreadPoolExecutorConfig = DefaultBlockingExecutorConfig + ): Resource[F, ExecutorModule[F]] = { for { numOfCpus <- Resource.eval(Sync[F].delay(Runtime.getRuntime.availableProcessors)) executor <- makeThreadPoolExecutor(executorConfig, toolkitThreadFactory, new LinkedBlockingQueue) .map(ExecutionContext.fromExecutorService) - blockingExecutor <- makeBlockingExecutor.map(ExecutionContext.fromExecutorService) + blockingExecutor <- makeBlockingExecutor(blockingExecutorConfig).map(ExecutionContext.fromExecutorService) } yield new ExecutorModule[F](numOfCpus, executor, blockingExecutor) } /** Makes [[com.avast.sst.jvm.execution.ExecutorModule]] with fork-join executor and extra [[cats.effect.Blocker]] executor * for blocking operations. */ - def makeForkJoinFromConfig[F[_]: Sync](executorConfig: ForkJoinPoolConfig): Resource[F, ExecutorModule[F]] = { + def makeForkJoinFromConfig[F[_]: Sync]( + executorConfig: ForkJoinPoolConfig, + blockingExecutorConfig: ThreadPoolExecutorConfig = DefaultBlockingExecutorConfig + ): Resource[F, ExecutorModule[F]] = { for { numOfCpus <- Resource.eval(Sync[F].delay(Runtime.getRuntime.availableProcessors)) executor <- makeForkJoinPool(executorConfig, numOfCpus, toolkitThreadFactory) .map(ExecutionContext.fromExecutorService) - blockingExecutor <- makeBlockingExecutor.map(ExecutionContext.fromExecutorService) + blockingExecutor <- makeBlockingExecutor(blockingExecutorConfig).map(ExecutionContext.fromExecutorService) } yield new ExecutorModule[F](numOfCpus, executor, blockingExecutor) } - private def makeBlockingExecutor[F[_]: Sync] = + private def makeBlockingExecutor[F[_]: Sync](config: ThreadPoolExecutorConfig) = makeThreadPoolExecutor[F]( - ThreadPoolExecutorConfig(0, Int.MaxValue), + config, new ConfigurableThreadFactory(Config(nameFormat = Some("default-blocking-%02d"), daemon = true)), new SynchronousQueue ) diff --git a/jvm/src/main/scala/com/avast/sst/jvm/execution/ThreadPoolExecutorConfig.scala b/jvm/src/main/scala/com/avast/sst/jvm/execution/ThreadPoolExecutorConfig.scala index 0eea3c503..9a27f441c 100644 --- a/jvm/src/main/scala/com/avast/sst/jvm/execution/ThreadPoolExecutorConfig.scala +++ b/jvm/src/main/scala/com/avast/sst/jvm/execution/ThreadPoolExecutorConfig.scala @@ -6,6 +6,6 @@ import scala.concurrent.duration.{Duration, FiniteDuration} final case class ThreadPoolExecutorConfig( coreSize: Int, maxSize: Int, - keepAlive: FiniteDuration = Duration(1000L, TimeUnit.MILLISECONDS), + keepAlive: FiniteDuration = Duration(60000L, TimeUnit.MILLISECONDS), allowCoreThreadTimeout: Boolean = false )