Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migration to virtual threads - phase 4 (2nd try) #758

Closed
wants to merge 8 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class PullMetricsRequestsFilter implements HttpServerFilter {
private RouteHandler routeHelper

@Inject
@Named(TaskExecutors.IO)
@Named(TaskExecutors.BLOCKING)
private ExecutorService executor

@Override
Expand Down
11 changes: 6 additions & 5 deletions src/main/groovy/io/seqera/wave/http/HttpClientFactory.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,12 @@ package io.seqera.wave.http

import java.net.http.HttpClient
import java.time.Duration
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.ThreadFactory
import java.util.concurrent.locks.ReentrantLock

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import io.seqera.wave.util.CustomThreadFactory
/**
* Java HttpClient factory
*
Expand All @@ -36,7 +35,9 @@ import io.seqera.wave.util.CustomThreadFactory
@CompileStatic
class HttpClientFactory {

static private ExecutorService threadPool = Executors.newCachedThreadPool(new CustomThreadFactory("HttpClientThread"))
static final private ThreadFactory customThreadFactory = Thread.ofVirtual()
.name("httpclient-virtual-thread-", 1)
.factory();

static private Duration timeout = Duration.ofSeconds(20)

Expand Down Expand Up @@ -84,7 +85,7 @@ class HttpClientFactory {
.version(HttpClient.Version.HTTP_1_1)
.followRedirects(HttpClient.Redirect.NORMAL)
.connectTimeout(timeout)
.executor(threadPool)
.executor(Executors.newThreadPerTaskExecutor(customThreadFactory))
.build()
log.debug "Creating new followRedirectsHttpClient: $result"
return result
Expand All @@ -95,7 +96,7 @@ class HttpClientFactory {
.version(HttpClient.Version.HTTP_1_1)
.followRedirects(HttpClient.Redirect.NEVER)
.connectTimeout(timeout)
.executor(threadPool)
.executor(Executors.newThreadPerTaskExecutor(customThreadFactory))
.build()
log.debug "Creating new neverRedirectsHttpClient: $result"
return result
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class BuildStateStoreImpl extends AbstractStateStore<BuildEntry> implements Buil

private ExecutorService ioExecutor

BuildStateStoreImpl(StateProvider<String, String> provider, BuildConfig buildConfig, @Named(TaskExecutors.IO) ExecutorService ioExecutor) {
BuildStateStoreImpl(StateProvider<String, String> provider, BuildConfig buildConfig, @Named(TaskExecutors.BLOCKING) ExecutorService ioExecutor) {
super(provider, new MoshiEncodeStrategy<BuildEntry>() {})
this.buildConfig = buildConfig
this.ioExecutor = ioExecutor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class ContainerBuildServiceImpl implements ContainerBuildService, JobHandler<Bui
private BuildStateStore buildStore

@Inject
@Named(TaskExecutors.IO)
@Named(TaskExecutors.BLOCKING)
private ExecutorService executor

@Inject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import java.util.concurrent.TimeoutException
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import io.micronaut.context.annotation.Value
import io.micronaut.scheduling.TaskExecutors
import io.seqera.wave.encoder.EncodingStrategy
import jakarta.inject.Inject
import jakarta.inject.Named
Expand Down Expand Up @@ -55,7 +56,7 @@ abstract class AbstractFutureStore<V> implements FutureStore<String,V> {
private volatile Duration pollInterval

@Inject
@Named('future-store-executor')
@Named(TaskExecutors.BLOCKING)
private ExecutorService executor

AbstractFutureStore(FutureHash<String> store, EncodingStrategy<V> encodingStrategy) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class BuildLogServiceImpl implements BuildLogService {
private String condaLockPrefix

@Inject
@Named(TaskExecutors.IO)
@Named(TaskExecutors.BLOCKING)
private volatile ExecutorService ioExecutor

@PostConstruct
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class ContainerMirrorServiceImpl implements ContainerMirrorService, JobHandler<M
private JobService jobService

@Inject
@Named(TaskExecutors.IO)
@Named(TaskExecutors.BLOCKING)
private ExecutorService ioExecutor

@Inject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class ContainerScanServiceImpl implements ContainerScanService, JobHandler<ScanE
private ScanConfig config

@Inject
@Named(TaskExecutors.IO)
@Named(TaskExecutors.BLOCKING)
private ExecutorService executor

@Inject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ abstract class TowerConnector {
return ioExecutor
}

protected ExecutorService getIoExecutor() {
return ioExecutor
}

/**
* Generic async get with authorization
* that converts to the provided json model T
Expand Down
3 changes: 0 additions & 3 deletions src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,6 @@ micronaut:
stream-executor:
type: FIXED
number-of-threads: 16
future-store-executor:
type : FIXED
number-of-threads : 32
netty:
event-loops:
stream-pool:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class BuildStoreLocalTest extends Specification {
private BuildConfig buildConfig

@Inject
@Named(TaskExecutors.IO)
@Named(TaskExecutors.BLOCKING)
ExecutorService ioExecutor

BuildResult zeroResult = BuildResult.create('0')
Expand Down
Loading