Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Set default value for thread keep-alive to 1 minute #594

Merged
merged 1 commit into from
Jun 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 25 additions & 10 deletions jvm/src/main/scala/com/avast/sst/jvm/execution/ExecutorModule.scala
Original file line number Diff line number Diff line change
Expand Up @@ -53,56 +53,71 @@ class ExecutorModule[F[_]: Sync](

object ExecutorModule {

private final val DefaultBlockingExecutorConfig = ThreadPoolExecutorConfig(0, Int.MaxValue, allowCoreThreadTimeout = true)

/** Makes [[com.avast.sst.jvm.execution.ExecutorModule]] with default callback executor and extra [[cats.effect.Blocker]] executor
* for blocking operations.
*/
def makeDefault[F[_]: Sync]: Resource[F, ExecutorModule[F]] = {
for {
numOfCpus <- Resource.eval(Sync[F].delay(Runtime.getRuntime.availableProcessors))
coreSize = numOfCpus * 2
Copy link
Contributor

@sideeffffect sideeffffect Jun 23, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Btw, I think that this is a bit questionable, but that is for a different debate.

I think you've already seen this before 😸
https://twitter.com/impurepics/status/987758585722621957?lang=en
and
https://github.com/ChristopherDavenport/linebacker

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The theory is that num of CPUs is the right number but in practice (and I've seen this discussed somewhere long in the past) is that a bit higher number might work better. And I think I also got inspired in ZIO runtime. Anyway, I don't think there's much difference so if you show me at least some examples or documents (not just one image ;) ) I think we could change it.

executor <- makeThreadPoolExecutor(ThreadPoolExecutorConfig(coreSize, coreSize), toolkitThreadFactory, new LinkedBlockingQueue)
executor <- makeThreadPoolExecutor(
ThreadPoolExecutorConfig(coreSize, coreSize, allowCoreThreadTimeout = true),
toolkitThreadFactory,
new LinkedBlockingQueue
)
.map(ExecutionContext.fromExecutorService)
blockingExecutor <- makeBlockingExecutor.map(ExecutionContext.fromExecutorService)
blockingExecutor <- makeBlockingExecutor(DefaultBlockingExecutorConfig).map(ExecutionContext.fromExecutorService)
} yield new ExecutorModule[F](numOfCpus, executor, blockingExecutor)
}

/** Makes [[com.avast.sst.jvm.execution.ExecutorModule]] with the provided callback executor and extra [[cats.effect.Blocker]]
* executor for blocking operations.
*/
def makeFromExecutionContext[F[_]: Sync](executor: ExecutionContext): Resource[F, ExecutorModule[F]] = {
def makeFromExecutionContext[F[_]: Sync](
executor: ExecutionContext,
blockingExecutorConfig: ThreadPoolExecutorConfig = DefaultBlockingExecutorConfig
): Resource[F, ExecutorModule[F]] = {
for {
numOfCpus <- Resource.eval(Sync[F].delay(Runtime.getRuntime.availableProcessors))
blockingExecutor <- makeBlockingExecutor.map(ExecutionContext.fromExecutorService)
blockingExecutor <- makeBlockingExecutor(blockingExecutorConfig).map(ExecutionContext.fromExecutorService)
} yield new ExecutorModule[F](numOfCpus, executor, blockingExecutor)
}

/** Makes [[com.avast.sst.jvm.execution.ExecutorModule]] with executor and extra [[cats.effect.Blocker]] executor
* for blocking operations.
*/
def makeFromConfig[F[_]: Sync](executorConfig: ThreadPoolExecutorConfig): Resource[F, ExecutorModule[F]] = {
def makeFromConfig[F[_]: Sync](
executorConfig: ThreadPoolExecutorConfig,
blockingExecutorConfig: ThreadPoolExecutorConfig = DefaultBlockingExecutorConfig
): Resource[F, ExecutorModule[F]] = {
for {
numOfCpus <- Resource.eval(Sync[F].delay(Runtime.getRuntime.availableProcessors))
executor <- makeThreadPoolExecutor(executorConfig, toolkitThreadFactory, new LinkedBlockingQueue)
.map(ExecutionContext.fromExecutorService)
blockingExecutor <- makeBlockingExecutor.map(ExecutionContext.fromExecutorService)
blockingExecutor <- makeBlockingExecutor(blockingExecutorConfig).map(ExecutionContext.fromExecutorService)
} yield new ExecutorModule[F](numOfCpus, executor, blockingExecutor)
}

/** Makes [[com.avast.sst.jvm.execution.ExecutorModule]] with fork-join executor and extra [[cats.effect.Blocker]] executor
* for blocking operations.
*/
def makeForkJoinFromConfig[F[_]: Sync](executorConfig: ForkJoinPoolConfig): Resource[F, ExecutorModule[F]] = {
def makeForkJoinFromConfig[F[_]: Sync](
executorConfig: ForkJoinPoolConfig,
blockingExecutorConfig: ThreadPoolExecutorConfig = DefaultBlockingExecutorConfig
): Resource[F, ExecutorModule[F]] = {
for {
numOfCpus <- Resource.eval(Sync[F].delay(Runtime.getRuntime.availableProcessors))
executor <- makeForkJoinPool(executorConfig, numOfCpus, toolkitThreadFactory)
.map(ExecutionContext.fromExecutorService)
blockingExecutor <- makeBlockingExecutor.map(ExecutionContext.fromExecutorService)
blockingExecutor <- makeBlockingExecutor(blockingExecutorConfig).map(ExecutionContext.fromExecutorService)
} yield new ExecutorModule[F](numOfCpus, executor, blockingExecutor)
}

private def makeBlockingExecutor[F[_]: Sync] =
private def makeBlockingExecutor[F[_]: Sync](config: ThreadPoolExecutorConfig) =
makeThreadPoolExecutor[F](
ThreadPoolExecutorConfig(0, Int.MaxValue),
config,
new ConfigurableThreadFactory(Config(nameFormat = Some("default-blocking-%02d"), daemon = true)),
new SynchronousQueue
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ import scala.concurrent.duration.{Duration, FiniteDuration}
final case class ThreadPoolExecutorConfig(
coreSize: Int,
maxSize: Int,
keepAlive: FiniteDuration = Duration(1000L, TimeUnit.MILLISECONDS),
keepAlive: FiniteDuration = Duration(60000L, TimeUnit.MILLISECONDS),
allowCoreThreadTimeout: Boolean = false
)