From 328e9ea36238af533c316c1868fb8ab7960bca3d Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Sun, 17 Nov 2024 21:07:53 +0100 Subject: [PATCH 01/12] Bump guava to version 33.3.1-jre Signed-off-by: Paolo Di Tommaso --- build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index 7de40d237..289bb5c01 100644 --- a/build.gradle +++ b/build.gradle @@ -49,7 +49,7 @@ dependencies { implementation 'io.micronaut:micronaut-websocket' implementation 'org.apache.groovy:groovy-json' implementation 'org.apache.groovy:groovy-nio' - implementation 'com.google.guava:guava:32.1.2-jre' + implementation 'com.google.guava:guava:33.3.1-jre' implementation 'dev.failsafe:failsafe:3.1.0' implementation 'io.micronaut.reactor:micronaut-reactor' implementation 'io.micronaut.reactor:micronaut-reactor-http-client' From ffd0dacd14dc365cabb23a35a9cac4a0f06d37ae Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Sun, 17 Nov 2024 18:55:27 +0100 Subject: [PATCH 02/12] Use runAsync instead supplyAsync Signed-off-by: Paolo Di Tommaso --- .../io/seqera/wave/filter/PullMetricsRequestsFilter.groovy | 4 ++-- .../service/builder/impl/ContainerBuildServiceImpl.groovy | 3 ++- .../wave/service/mirror/ContainerMirrorServiceImpl.groovy | 2 +- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/main/groovy/io/seqera/wave/filter/PullMetricsRequestsFilter.groovy b/src/main/groovy/io/seqera/wave/filter/PullMetricsRequestsFilter.groovy index da19fc7c6..bb70548ac 100644 --- a/src/main/groovy/io/seqera/wave/filter/PullMetricsRequestsFilter.groovy +++ b/src/main/groovy/io/seqera/wave/filter/PullMetricsRequestsFilter.groovy @@ -83,10 +83,10 @@ class PullMetricsRequestsFilter implements HttpServerFilter { final contentType = response.headers.get(HttpHeaders.CONTENT_TYPE) if( contentType && contentType in MANIFEST_TYPES ) { final route = routeHelper.parse(request.path) - CompletableFuture.supplyAsync(() -> metricsService.incrementPullsCounter(route.identity), executor) + CompletableFuture.runAsync(() -> metricsService.incrementPullsCounter(route.identity), executor) final version = route.request?.containerConfig?.fusionVersion() if (version) { - CompletableFuture.supplyAsync(() -> metricsService.incrementFusionPullsCounter(route.identity), executor) + CompletableFuture.runAsync(() -> metricsService.incrementFusionPullsCounter(route.identity), executor) } } } diff --git a/src/main/groovy/io/seqera/wave/service/builder/impl/ContainerBuildServiceImpl.groovy b/src/main/groovy/io/seqera/wave/service/builder/impl/ContainerBuildServiceImpl.groovy index 6dbe9f787..895a90210 100644 --- a/src/main/groovy/io/seqera/wave/service/builder/impl/ContainerBuildServiceImpl.groovy +++ b/src/main/groovy/io/seqera/wave/service/builder/impl/ContainerBuildServiceImpl.groovy @@ -206,7 +206,8 @@ class ContainerBuildServiceImpl implements ContainerBuildService, JobHandler metricsService.incrementBuildsCounter(request.identity), executor) + CompletableFuture + .runAsync(() -> metricsService.incrementBuildsCounter(request.identity), executor) // launch the build async CompletableFuture diff --git a/src/main/groovy/io/seqera/wave/service/mirror/ContainerMirrorServiceImpl.groovy b/src/main/groovy/io/seqera/wave/service/mirror/ContainerMirrorServiceImpl.groovy index 42c4bafce..1126fe87d 100644 --- a/src/main/groovy/io/seqera/wave/service/mirror/ContainerMirrorServiceImpl.groovy +++ b/src/main/groovy/io/seqera/wave/service/mirror/ContainerMirrorServiceImpl.groovy @@ -75,7 +75,7 @@ class ContainerMirrorServiceImpl implements ContainerMirrorService, JobHandler metricsService.incrementMirrorsCounter(request.identity), ioExecutor) + CompletableFuture.runAsync(() -> metricsService.incrementMirrorsCounter(request.identity), ioExecutor) jobService.launchMirror(request) return new BuildTrack(request.mirrorId, request.targetImage, false, null) } From 7af3046fef4ce5880a8cf9454dab7bc5f4d0344b Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Sun, 17 Nov 2024 17:35:39 +0100 Subject: [PATCH 03/12] Remove deprecated ThreadPoolBuilder Signed-off-by: Paolo Di Tommaso --- .../seqera/wave/util/ThreadPoolBuilder.groovy | 176 ------------------ 1 file changed, 176 deletions(-) delete mode 100644 src/main/groovy/io/seqera/wave/util/ThreadPoolBuilder.groovy diff --git a/src/main/groovy/io/seqera/wave/util/ThreadPoolBuilder.groovy b/src/main/groovy/io/seqera/wave/util/ThreadPoolBuilder.groovy deleted file mode 100644 index 7afdf0e06..000000000 --- a/src/main/groovy/io/seqera/wave/util/ThreadPoolBuilder.groovy +++ /dev/null @@ -1,176 +0,0 @@ -/* - * Wave, containers provisioning service - * Copyright (c) 2023-2024, Seqera Labs - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -package io.seqera.wave.util - - - -import java.util.concurrent.BlockingQueue -import java.util.concurrent.LinkedBlockingQueue -import java.util.concurrent.RejectedExecutionHandler -import java.util.concurrent.ThreadFactory -import java.util.concurrent.ThreadPoolExecutor -import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic.AtomicInteger - -import groovy.transform.CompileStatic -import groovy.util.logging.Slf4j -/** - * Builder class to create instance of {@link ThreadPoolExecutor} - * - * @author Paolo Di Tommaso - */ -@Slf4j -@CompileStatic -@Deprecated -class ThreadPoolBuilder { - - static AtomicInteger poolCount = new AtomicInteger() - - private String name - - private int minSize - - private int maxSize - - private BlockingQueue workQueue - - private int queueSize = -1 - - private Long keepAliveTime - - private RejectedExecutionHandler rejectionPolicy - - private ThreadFactory threadFactory - - private boolean allowCoreThreadTimeout - - String getName() { name } - - int getMinSize() { minSize } - - int getMaxSize() { maxSize } - - int getQueueSize() { queueSize } - - BlockingQueue getWorkQueue() { workQueue } - - Long getKeepAliveTime() { keepAliveTime } - - RejectedExecutionHandler getRejectionPolicy() { rejectionPolicy } - - ThreadFactory getThreadFactory() { threadFactory } - - boolean getAllowCoreThreadTimeout() { allowCoreThreadTimeout } - - ThreadPoolBuilder withName(String name) { - if( name ) { - this.name = name - this.threadFactory = new CustomThreadFactory(name) - } - return this - } - - ThreadPoolBuilder withThreadFactory(ThreadFactory threadFactory) { - assert !name || !threadFactory, "Property 'threadFactory' or 'name' was already set" - this.threadFactory = threadFactory - return this - } - - ThreadPoolBuilder withRejectionPolicy(RejectedExecutionHandler rejectionPolicy) { - this.rejectionPolicy = rejectionPolicy - return this - } - - ThreadPoolBuilder withMinSize(int min) { - this.minSize = min - return this - } - - ThreadPoolBuilder withMaxSize(int max) { - this.maxSize = max - return this - } - - ThreadPoolBuilder withQueueSize(int size) { - this.queueSize = size - this.workQueue = new LinkedBlockingQueue(size) - return this - } - - ThreadPoolBuilder withQueue(BlockingQueue workQueue) { - this.workQueue = workQueue - return this - } - - ThreadPoolBuilder withKeepAliveTime( long millis ) { - keepAliveTime = millis - return this - } - - ThreadPoolBuilder withAllowCoreThreadTimeout(boolean flag) { - this.allowCoreThreadTimeout = flag - return this - } - - ThreadPoolExecutor build() { - assert minSize <= maxSize - - if( !name ) - name = "nf-thread-pool-${poolCount.getAndIncrement()}" - - if(keepAliveTime==null) - keepAliveTime = 60_000 - if( workQueue==null ) - workQueue = new LinkedBlockingQueue<>() - if( rejectionPolicy==null ) - rejectionPolicy = new ThreadPoolExecutor.CallerRunsPolicy() - if( threadFactory==null ) - threadFactory = new CustomThreadFactory(name) - - log.debug "Creating thread pool '$name' minSize=$minSize; maxSize=$maxSize; workQueue=${workQueue.getClass().getSimpleName()}[${queueSize}]; allowCoreThreadTimeout=$allowCoreThreadTimeout" - - final result = new ThreadPoolExecutor( - minSize, - maxSize, - keepAliveTime, TimeUnit.MILLISECONDS, - workQueue, - threadFactory, - rejectionPolicy) - - result.allowCoreThreadTimeOut(allowCoreThreadTimeout) - - return result - } - - - static ThreadPoolExecutor io(String name=null) { - io(10, 100, 10_000, name) - } - - - static ThreadPoolExecutor io(int min, int max, int queue, String name=null) { - new ThreadPoolBuilder() - .withMinSize(min) - .withMaxSize(max) - .withQueueSize(queue) - .withName(name) - .build() - } - -} From cf813e0a8e0f6a0273edc0a6f3f5d051b0bc911f Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Sun, 17 Nov 2024 22:14:36 +0100 Subject: [PATCH 04/12] Replace Guava cache with Caffeine (#745) Signed-off-by: Paolo Di Tommaso --- VERSION | 2 +- .../seqera/wave/auth/RegistryAuthServiceImpl.groovy | 13 ++++++------- .../wave/auth/RegistryLookupServiceImpl.groovy | 13 ++++++------- .../io/seqera/wave/service/aws/AwsEcrService.groovy | 8 ++++---- .../service/data/queue/AbstractMessageQueue.groovy | 6 +++--- .../tower/client/connector/TowerConnector.groovy | 10 +++++----- 6 files changed, 25 insertions(+), 27 deletions(-) diff --git a/VERSION b/VERSION index 63e799cf4..0b963338f 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.14.1 +1.14.2-B3 diff --git a/src/main/groovy/io/seqera/wave/auth/RegistryAuthServiceImpl.groovy b/src/main/groovy/io/seqera/wave/auth/RegistryAuthServiceImpl.groovy index d8ea12d26..00dc5e9fa 100644 --- a/src/main/groovy/io/seqera/wave/auth/RegistryAuthServiceImpl.groovy +++ b/src/main/groovy/io/seqera/wave/auth/RegistryAuthServiceImpl.groovy @@ -21,13 +21,12 @@ package io.seqera.wave.auth import java.net.http.HttpRequest import java.net.http.HttpResponse import java.time.Duration -import java.util.concurrent.ExecutionException +import java.util.concurrent.CompletionException import java.util.concurrent.TimeUnit -import com.google.common.cache.CacheBuilder -import com.google.common.cache.CacheLoader -import com.google.common.cache.LoadingCache -import com.google.common.util.concurrent.UncheckedExecutionException +import com.github.benmanes.caffeine.cache.CacheLoader +import com.github.benmanes.caffeine.cache.Caffeine +import com.github.benmanes.caffeine.cache.LoadingCache import groovy.json.JsonSlurper import groovy.transform.Canonical import groovy.transform.CompileStatic @@ -101,7 +100,7 @@ class RegistryAuthServiceImpl implements RegistryAuthService { return result } - private LoadingCache cacheTokens = CacheBuilder + private LoadingCache cacheTokens = Caffeine.newBuilder() .newBuilder() .maximumSize(10_000) .expireAfterAccess(_1_HOUR.toMillis(), TimeUnit.MILLISECONDS) @@ -271,7 +270,7 @@ class RegistryAuthServiceImpl implements RegistryAuthService { try { return cacheTokens.get(key) } - catch (UncheckedExecutionException | ExecutionException e) { + catch (CompletionException e) { // this catches the exception thrown in the cache loader lookup // and throws the causing exception that should be `RegistryUnauthorizedAccessException` throw e.cause diff --git a/src/main/groovy/io/seqera/wave/auth/RegistryLookupServiceImpl.groovy b/src/main/groovy/io/seqera/wave/auth/RegistryLookupServiceImpl.groovy index 0dc641879..58c41f617 100644 --- a/src/main/groovy/io/seqera/wave/auth/RegistryLookupServiceImpl.groovy +++ b/src/main/groovy/io/seqera/wave/auth/RegistryLookupServiceImpl.groovy @@ -20,13 +20,12 @@ package io.seqera.wave.auth import java.net.http.HttpRequest import java.net.http.HttpResponse -import java.util.concurrent.ExecutionException +import java.util.concurrent.CompletionException import java.util.concurrent.TimeUnit -import com.google.common.cache.CacheBuilder -import com.google.common.cache.CacheLoader -import com.google.common.cache.LoadingCache -import com.google.common.util.concurrent.UncheckedExecutionException +import com.github.benmanes.caffeine.cache.CacheLoader +import com.github.benmanes.caffeine.cache.Caffeine +import com.github.benmanes.caffeine.cache.LoadingCache import groovy.transform.CompileStatic import groovy.util.logging.Slf4j import io.seqera.wave.configuration.HttpClientConfig @@ -74,7 +73,7 @@ class RegistryLookupServiceImpl implements RegistryLookupService { } } - private LoadingCache cache = CacheBuilder + private LoadingCache cache = Caffeine.newBuilder() .newBuilder() .maximumSize(10_000) .expireAfterAccess(1, TimeUnit.HOURS) @@ -120,7 +119,7 @@ class RegistryLookupServiceImpl implements RegistryLookupService { final auth = cache.get(endpoint) return new RegistryInfo(registry, endpoint, auth) } - catch (UncheckedExecutionException | ExecutionException e) { + catch (CompletionException e) { // this catches the exception thrown in the cache loader lookup // and throws the causing exception that should be `RegistryUnauthorizedAccessException` throw e.cause diff --git a/src/main/groovy/io/seqera/wave/service/aws/AwsEcrService.groovy b/src/main/groovy/io/seqera/wave/service/aws/AwsEcrService.groovy index 17fdb3720..a85b1b354 100644 --- a/src/main/groovy/io/seqera/wave/service/aws/AwsEcrService.groovy +++ b/src/main/groovy/io/seqera/wave/service/aws/AwsEcrService.groovy @@ -21,9 +21,9 @@ package io.seqera.wave.service.aws import java.util.concurrent.TimeUnit import java.util.regex.Pattern -import com.google.common.cache.CacheBuilder -import com.google.common.cache.CacheLoader -import com.google.common.cache.LoadingCache +import com.github.benmanes.caffeine.cache.CacheLoader +import com.github.benmanes.caffeine.cache.Caffeine +import com.github.benmanes.caffeine.cache.LoadingCache import groovy.transform.Canonical import groovy.transform.CompileStatic import groovy.util.logging.Slf4j @@ -73,7 +73,7 @@ class AwsEcrService { } } - private LoadingCache cache = CacheBuilder + private LoadingCache cache = Caffeine.newBuilder() .newBuilder() .maximumSize(10_000) .expireAfterWrite(3, TimeUnit.HOURS) diff --git a/src/main/groovy/io/seqera/wave/service/data/queue/AbstractMessageQueue.groovy b/src/main/groovy/io/seqera/wave/service/data/queue/AbstractMessageQueue.groovy index 0f6ddc9a0..3a90e3108 100644 --- a/src/main/groovy/io/seqera/wave/service/data/queue/AbstractMessageQueue.groovy +++ b/src/main/groovy/io/seqera/wave/service/data/queue/AbstractMessageQueue.groovy @@ -23,8 +23,8 @@ import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicInteger -import com.google.common.cache.Cache -import com.google.common.cache.CacheBuilder +import com.github.benmanes.caffeine.cache.Cache +import com.github.benmanes.caffeine.cache.Caffeine import groovy.transform.CompileStatic import groovy.util.logging.Slf4j import io.micronaut.websocket.exceptions.WebSocketSessionException @@ -60,7 +60,7 @@ abstract class AbstractMessageQueue implements Runnable { final private String name0 - final private Cache closedClients = CacheBuilder + final private Cache closedClients = Caffeine.newBuilder() .newBuilder() .expireAfterWrite(10, TimeUnit.MINUTES) .build() diff --git a/src/main/groovy/io/seqera/wave/tower/client/connector/TowerConnector.groovy b/src/main/groovy/io/seqera/wave/tower/client/connector/TowerConnector.groovy index ec213628a..e4fa503ec 100644 --- a/src/main/groovy/io/seqera/wave/tower/client/connector/TowerConnector.groovy +++ b/src/main/groovy/io/seqera/wave/tower/client/connector/TowerConnector.groovy @@ -26,10 +26,10 @@ import java.util.concurrent.TimeUnit import java.util.concurrent.TimeoutException import java.util.function.Function -import com.google.common.cache.Cache -import com.google.common.cache.CacheBuilder -import com.google.common.cache.CacheLoader -import com.google.common.cache.LoadingCache +import com.github.benmanes.caffeine.cache.Cache +import com.github.benmanes.caffeine.cache.CacheLoader +import com.github.benmanes.caffeine.cache.Caffeine +import com.github.benmanes.caffeine.cache.LoadingCache import groovy.transform.CompileDynamic import groovy.transform.CompileStatic import groovy.util.logging.Slf4j @@ -92,7 +92,7 @@ abstract class TowerConnector { } } - private LoadingCache> refreshCache = CacheBuilder> + private LoadingCache> refreshCache = Caffeine.newBuilder() .newBuilder() .expireAfterWrite(1, TimeUnit.MINUTES) .build(loader) From aaf0420cdc1c224cbcacc1283757a8aa5f2d856b Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Mon, 18 Nov 2024 07:46:09 +0100 Subject: [PATCH 05/12] Migration to virtual threads - phase 1 (#746) Signed-off-by: Paolo Di Tommaso --- build.gradle | 3 +-- .../wave/auth/RegistryAuthServiceImpl.groovy | 13 ++++++++----- .../wave/auth/RegistryLookupServiceImpl.groovy | 10 ++++++---- .../wave/controller/BuildController.groovy | 2 +- .../wave/controller/ContainerController.groovy | 6 +++--- .../wave/controller/InspectController.groovy | 2 +- .../wave/controller/MetricsController.groovy | 2 +- .../wave/controller/MirrorController.groovy | 2 +- .../controller/RegistryProxyController.groovy | 2 +- .../seqera/wave/controller/ScanController.groovy | 2 +- .../wave/controller/ServiceInfoController.groovy | 2 +- .../wave/controller/ValidateController.groovy | 2 +- .../seqera/wave/controller/ViewController.groovy | 2 +- .../seqera/wave/core/ContainerAugmenter.groovy | 3 +-- .../seqera/wave/core/RegistryProxyService.groovy | 13 +++++++++++-- .../io/seqera/wave/http/HttpClientFactory.groovy | 15 +++++++++++---- .../seqera/wave/service/aws/AwsEcrService.groovy | 10 ++++++---- .../data/queue/AbstractMessageQueue.groovy | 13 ++++++++----- .../data/stream/AbstractMessageStream.groovy | 4 ++++ .../io/seqera/wave/service/job/JobManager.groovy | 13 +++++++++---- .../pairing/socket/PairingWebSocket.groovy | 2 +- .../tower/client/connector/TowerConnector.groovy | 16 ++++++++++------ .../connector/WebSocketTowerConnector.groovy | 9 +-------- .../wave/service/job/JobManagerTest.groovy | 10 +++++----- 24 files changed, 94 insertions(+), 64 deletions(-) diff --git a/build.gradle b/build.gradle index 289bb5c01..56c9eb999 100644 --- a/build.gradle +++ b/build.gradle @@ -156,8 +156,7 @@ jib { run{ def envs = findProperty('micronautEnvs') - // note: "--enable-preview" is required to use virtual threads on Java 19 and 20 - def args = ["-Dmicronaut.environments=$envs","--enable-preview"] + def args = ["-Dmicronaut.environments=$envs","-Djdk.tracePinnedThreads=short"] if( environment['JVM_OPTS'] ) args.add(environment['JVM_OPTS']) jvmArgs args systemProperties 'DOCKER_USER': project.findProperty('DOCKER_USER') ?: environment['DOCKER_USER'], diff --git a/src/main/groovy/io/seqera/wave/auth/RegistryAuthServiceImpl.groovy b/src/main/groovy/io/seqera/wave/auth/RegistryAuthServiceImpl.groovy index 00dc5e9fa..7ae087db7 100644 --- a/src/main/groovy/io/seqera/wave/auth/RegistryAuthServiceImpl.groovy +++ b/src/main/groovy/io/seqera/wave/auth/RegistryAuthServiceImpl.groovy @@ -24,9 +24,9 @@ import java.time.Duration import java.util.concurrent.CompletionException import java.util.concurrent.TimeUnit +import com.github.benmanes.caffeine.cache.AsyncLoadingCache import com.github.benmanes.caffeine.cache.CacheLoader import com.github.benmanes.caffeine.cache.Caffeine -import com.github.benmanes.caffeine.cache.LoadingCache import groovy.json.JsonSlurper import groovy.transform.Canonical import groovy.transform.CompileStatic @@ -100,11 +100,12 @@ class RegistryAuthServiceImpl implements RegistryAuthService { return result } - private LoadingCache cacheTokens = Caffeine.newBuilder() + // FIXME https://github.com/seqeralabs/wave/issues/747 + private AsyncLoadingCache cacheTokens = Caffeine.newBuilder() .newBuilder() .maximumSize(10_000) .expireAfterAccess(_1_HOUR.toMillis(), TimeUnit.MILLISECONDS) - .build(loader) + .buildAsync(loader) @Inject private RegistryLookupService lookupService @@ -268,7 +269,8 @@ class RegistryAuthServiceImpl implements RegistryAuthService { protected String getAuthToken(String image, RegistryAuth auth, RegistryCredentials creds) { final key = new CacheKey(image, auth, creds) try { - return cacheTokens.get(key) + // FIXME https://github.com/seqeralabs/wave/issues/747 + return cacheTokens.synchronous().get(key) } catch (CompletionException e) { // this catches the exception thrown in the cache loader lookup @@ -286,7 +288,8 @@ class RegistryAuthServiceImpl implements RegistryAuthService { */ void invalidateAuthorization(String image, RegistryAuth auth, RegistryCredentials creds) { final key = new CacheKey(image, auth, creds) - cacheTokens.invalidate(key) + // FIXME https://github.com/seqeralabs/wave/issues/747 + cacheTokens.synchronous().invalidate(key) tokenStore.remove(getStableKey(key)) } diff --git a/src/main/groovy/io/seqera/wave/auth/RegistryLookupServiceImpl.groovy b/src/main/groovy/io/seqera/wave/auth/RegistryLookupServiceImpl.groovy index 58c41f617..2b15b8efa 100644 --- a/src/main/groovy/io/seqera/wave/auth/RegistryLookupServiceImpl.groovy +++ b/src/main/groovy/io/seqera/wave/auth/RegistryLookupServiceImpl.groovy @@ -23,9 +23,9 @@ import java.net.http.HttpResponse import java.util.concurrent.CompletionException import java.util.concurrent.TimeUnit +import com.github.benmanes.caffeine.cache.AsyncLoadingCache import com.github.benmanes.caffeine.cache.CacheLoader import com.github.benmanes.caffeine.cache.Caffeine -import com.github.benmanes.caffeine.cache.LoadingCache import groovy.transform.CompileStatic import groovy.util.logging.Slf4j import io.seqera.wave.configuration.HttpClientConfig @@ -73,11 +73,12 @@ class RegistryLookupServiceImpl implements RegistryLookupService { } } - private LoadingCache cache = Caffeine.newBuilder() + // FIXME https://github.com/seqeralabs/wave/issues/747 + private AsyncLoadingCache cache = Caffeine.newBuilder() .newBuilder() .maximumSize(10_000) .expireAfterAccess(1, TimeUnit.HOURS) - .build(loader) + .buildAsync(loader) protected RegistryAuth lookup0(URI endpoint) { final httpClient = HttpClientFactory.followRedirectsHttpClient() @@ -116,7 +117,8 @@ class RegistryLookupServiceImpl implements RegistryLookupService { RegistryInfo lookup(String registry) { try { final endpoint = registryEndpoint(registry) - final auth = cache.get(endpoint) + // FIXME https://github.com/seqeralabs/wave/issues/747 + final auth = cache.synchronous().get(endpoint) return new RegistryInfo(registry, endpoint, auth) } catch (CompletionException e) { diff --git a/src/main/groovy/io/seqera/wave/controller/BuildController.groovy b/src/main/groovy/io/seqera/wave/controller/BuildController.groovy index 06679fa4a..f99549087 100644 --- a/src/main/groovy/io/seqera/wave/controller/BuildController.groovy +++ b/src/main/groovy/io/seqera/wave/controller/BuildController.groovy @@ -42,7 +42,7 @@ import jakarta.inject.Inject @Slf4j @CompileStatic @Controller("/") -@ExecuteOn(TaskExecutors.IO) +@ExecuteOn(TaskExecutors.BLOCKING) class BuildController { @Inject diff --git a/src/main/groovy/io/seqera/wave/controller/ContainerController.groovy b/src/main/groovy/io/seqera/wave/controller/ContainerController.groovy index 10f71ddb4..a65568c40 100644 --- a/src/main/groovy/io/seqera/wave/controller/ContainerController.groovy +++ b/src/main/groovy/io/seqera/wave/controller/ContainerController.groovy @@ -103,7 +103,7 @@ import static java.util.concurrent.CompletableFuture.completedFuture @Slf4j @CompileStatic @Controller("/") -@ExecuteOn(TaskExecutors.IO) +@ExecuteOn(TaskExecutors.BLOCKING) class ContainerController { @Inject @@ -181,13 +181,13 @@ class ContainerController { @Deprecated @Post('/container-token') - @ExecuteOn(TaskExecutors.IO) + @ExecuteOn(TaskExecutors.BLOCKING) CompletableFuture> getToken(HttpRequest httpRequest, @Body SubmitContainerTokenRequest req) { return getContainerImpl(httpRequest, req, false) } @Post('/v1alpha2/container') - @ExecuteOn(TaskExecutors.IO) + @ExecuteOn(TaskExecutors.BLOCKING) CompletableFuture> getTokenV2(HttpRequest httpRequest, @Body SubmitContainerTokenRequest req) { return getContainerImpl(httpRequest, req, true) } diff --git a/src/main/groovy/io/seqera/wave/controller/InspectController.groovy b/src/main/groovy/io/seqera/wave/controller/InspectController.groovy index 35e50bc58..6093d4dfb 100644 --- a/src/main/groovy/io/seqera/wave/controller/InspectController.groovy +++ b/src/main/groovy/io/seqera/wave/controller/InspectController.groovy @@ -50,7 +50,7 @@ import static io.seqera.wave.util.ContainerHelper.patchPlatformEndpoint @Slf4j @CompileStatic @Controller("/") -@ExecuteOn(TaskExecutors.IO) +@ExecuteOn(TaskExecutors.BLOCKING) class InspectController { @Inject diff --git a/src/main/groovy/io/seqera/wave/controller/MetricsController.groovy b/src/main/groovy/io/seqera/wave/controller/MetricsController.groovy index a9ce6f89a..f8008674e 100644 --- a/src/main/groovy/io/seqera/wave/controller/MetricsController.groovy +++ b/src/main/groovy/io/seqera/wave/controller/MetricsController.groovy @@ -52,7 +52,7 @@ import static io.micronaut.http.HttpHeaders.WWW_AUTHENTICATE @Requires(property = 'wave.metrics.enabled', value = 'true') @Secured(SecurityRule.IS_AUTHENTICATED) @Controller -@ExecuteOn(TaskExecutors.IO) +@ExecuteOn(TaskExecutors.BLOCKING) class MetricsController { @Inject diff --git a/src/main/groovy/io/seqera/wave/controller/MirrorController.groovy b/src/main/groovy/io/seqera/wave/controller/MirrorController.groovy index 8aaf87a51..c9c3bc8f7 100644 --- a/src/main/groovy/io/seqera/wave/controller/MirrorController.groovy +++ b/src/main/groovy/io/seqera/wave/controller/MirrorController.groovy @@ -36,7 +36,7 @@ import jakarta.inject.Inject @Slf4j @CompileStatic @Controller("/") -@ExecuteOn(TaskExecutors.IO) +@ExecuteOn(TaskExecutors.BLOCKING) class MirrorController { @Inject diff --git a/src/main/groovy/io/seqera/wave/controller/RegistryProxyController.groovy b/src/main/groovy/io/seqera/wave/controller/RegistryProxyController.groovy index ec6416537..263763299 100644 --- a/src/main/groovy/io/seqera/wave/controller/RegistryProxyController.groovy +++ b/src/main/groovy/io/seqera/wave/controller/RegistryProxyController.groovy @@ -67,7 +67,7 @@ import reactor.core.publisher.Mono @Slf4j @CompileStatic @Controller("/v2") -@ExecuteOn(TaskExecutors.IO) +@ExecuteOn(TaskExecutors.BLOCKING) class RegistryProxyController { @Inject diff --git a/src/main/groovy/io/seqera/wave/controller/ScanController.groovy b/src/main/groovy/io/seqera/wave/controller/ScanController.groovy index 230a8b4a9..43fc38a5d 100644 --- a/src/main/groovy/io/seqera/wave/controller/ScanController.groovy +++ b/src/main/groovy/io/seqera/wave/controller/ScanController.groovy @@ -39,7 +39,7 @@ import jakarta.inject.Inject @CompileStatic @Requires(bean = ContainerScanService) @Controller("/") -@ExecuteOn(TaskExecutors.IO) +@ExecuteOn(TaskExecutors.BLOCKING) class ScanController { @Inject diff --git a/src/main/groovy/io/seqera/wave/controller/ServiceInfoController.groovy b/src/main/groovy/io/seqera/wave/controller/ServiceInfoController.groovy index 207c6e8d5..7f0a895e4 100644 --- a/src/main/groovy/io/seqera/wave/controller/ServiceInfoController.groovy +++ b/src/main/groovy/io/seqera/wave/controller/ServiceInfoController.groovy @@ -39,7 +39,7 @@ import io.seqera.wave.util.BuildInfo @Slf4j @Controller("/") @CompileStatic -@ExecuteOn(TaskExecutors.IO) +@ExecuteOn(TaskExecutors.BLOCKING) class ServiceInfoController { @Value('${wave.landing.url}') diff --git a/src/main/groovy/io/seqera/wave/controller/ValidateController.groovy b/src/main/groovy/io/seqera/wave/controller/ValidateController.groovy index f7136f572..fd23fbdc6 100644 --- a/src/main/groovy/io/seqera/wave/controller/ValidateController.groovy +++ b/src/main/groovy/io/seqera/wave/controller/ValidateController.groovy @@ -26,8 +26,8 @@ import io.seqera.wave.auth.RegistryAuthService import jakarta.inject.Inject import jakarta.validation.Valid -@ExecuteOn(TaskExecutors.IO) @Controller("/validate-creds") +@ExecuteOn(TaskExecutors.BLOCKING) class ValidateController { @Inject RegistryAuthService loginService diff --git a/src/main/groovy/io/seqera/wave/controller/ViewController.groovy b/src/main/groovy/io/seqera/wave/controller/ViewController.groovy index d30db91d8..68328eb36 100644 --- a/src/main/groovy/io/seqera/wave/controller/ViewController.groovy +++ b/src/main/groovy/io/seqera/wave/controller/ViewController.groovy @@ -59,7 +59,7 @@ import static io.seqera.wave.util.DataTimeUtils.formatTimestamp @Slf4j @CompileStatic @Controller("/view") -@ExecuteOn(TaskExecutors.IO) +@ExecuteOn(TaskExecutors.BLOCKING) class ViewController { @Inject diff --git a/src/main/groovy/io/seqera/wave/core/ContainerAugmenter.groovy b/src/main/groovy/io/seqera/wave/core/ContainerAugmenter.groovy index 909cc8d5d..bd3837e87 100644 --- a/src/main/groovy/io/seqera/wave/core/ContainerAugmenter.groovy +++ b/src/main/groovy/io/seqera/wave/core/ContainerAugmenter.groovy @@ -280,7 +280,7 @@ class ContainerAugmenter { return result } - synchronized protected Map layerBlob(String image, ContainerLayer layer) { + protected Map layerBlob(String image, ContainerLayer layer) { log.debug "Adding layer: $layer to image: $client.registry.name/$image" // store the layer blob in the cache final String path = "$client.registry.name/v2/$image/blobs/$layer.gzipDigest" @@ -295,7 +295,6 @@ class ContainerAugmenter { protected Tuple2 updateImageManifest(String imageName, String imageManifest, String newImageConfigDigest, newImageConfigSize, boolean oci) { - // turn the json string into a json map // and append the new layer final manifest = (Map) new JsonSlurper().parseText(imageManifest) diff --git a/src/main/groovy/io/seqera/wave/core/RegistryProxyService.groovy b/src/main/groovy/io/seqera/wave/core/RegistryProxyService.groovy index 32cf49604..01ed6bd94 100644 --- a/src/main/groovy/io/seqera/wave/core/RegistryProxyService.groovy +++ b/src/main/groovy/io/seqera/wave/core/RegistryProxyService.groovy @@ -18,6 +18,8 @@ package io.seqera.wave.core +import java.util.concurrent.CompletableFuture + import groovy.transform.CompileStatic import groovy.transform.ToString import groovy.util.logging.Slf4j @@ -193,7 +195,7 @@ class RegistryProxyService { String getImageDigest(String containerImage, PlatformId identity, boolean retryOnNotFound=false) { try { - return getImageDigest0(containerImage, identity, retryOnNotFound) + return getImageDigest0(containerImage, identity, retryOnNotFound).get() } catch(Exception e) { log.warn "Unable to retrieve digest for image '${containerImage}' -- cause: ${e.message}" @@ -203,8 +205,15 @@ class RegistryProxyService { static private List RETRY_ON_NOT_FOUND = HTTP_RETRYABLE_ERRORS + 404 + // note: return a CompletableFuture to force micronaut to use caffeine AsyncCache + // that provides a workaround about the use of virtual threads with SyncCache + // see https://github.com/ben-manes/caffeine/issues/1468#issuecomment-1906733926 @Cacheable(value = 'cache-registry-proxy', atomic = true, parameters = ['image']) - protected String getImageDigest0(String image, PlatformId identity, boolean retryOnNotFound) { + protected CompletableFuture getImageDigest0(String image, PlatformId identity, boolean retryOnNotFound) { + CompletableFuture.completedFuture(getImageDigest1(image, identity, retryOnNotFound)) + } + + protected String getImageDigest1(String image, PlatformId identity, boolean retryOnNotFound) { final coords = ContainerCoordinates.parse(image) final route = RoutePath.v2manifestPath(coords, identity) final proxyClient = client(route) diff --git a/src/main/groovy/io/seqera/wave/http/HttpClientFactory.groovy b/src/main/groovy/io/seqera/wave/http/HttpClientFactory.groovy index f1301037e..46c9730d2 100644 --- a/src/main/groovy/io/seqera/wave/http/HttpClientFactory.groovy +++ b/src/main/groovy/io/seqera/wave/http/HttpClientFactory.groovy @@ -22,6 +22,7 @@ import java.net.http.HttpClient import java.time.Duration import java.util.concurrent.ExecutorService import java.util.concurrent.Executors +import java.util.concurrent.locks.ReentrantLock import groovy.transform.CompileStatic import groovy.util.logging.Slf4j @@ -39,9 +40,9 @@ class HttpClientFactory { static private Duration timeout = Duration.ofSeconds(20) - static private final Object l1 = new Object() + static private final ReentrantLock l1 = new ReentrantLock() - static private final Object l2 = new Object() + static private final ReentrantLock l2 = new ReentrantLock() private static HttpClient client1 @@ -51,20 +52,26 @@ class HttpClientFactory { static HttpClient followRedirectsHttpClient() { if( client1!=null ) return client1 - synchronized (l1) { + l1.lock() + try { if( client1!=null ) return client1 return client1=followRedirectsHttpClient0() + } finally { + l1.unlock() } } static HttpClient neverRedirectsHttpClient() { if( client2!=null ) return client2 - synchronized (l2) { + l2.lock() + try { if( client2!=null ) return client2 return client2=neverRedirectsHttpClient0() + } finally { + l2.unlock() } } diff --git a/src/main/groovy/io/seqera/wave/service/aws/AwsEcrService.groovy b/src/main/groovy/io/seqera/wave/service/aws/AwsEcrService.groovy index a85b1b354..30f9d0e04 100644 --- a/src/main/groovy/io/seqera/wave/service/aws/AwsEcrService.groovy +++ b/src/main/groovy/io/seqera/wave/service/aws/AwsEcrService.groovy @@ -21,9 +21,9 @@ package io.seqera.wave.service.aws import java.util.concurrent.TimeUnit import java.util.regex.Pattern +import com.github.benmanes.caffeine.cache.AsyncLoadingCache import com.github.benmanes.caffeine.cache.CacheLoader import com.github.benmanes.caffeine.cache.Caffeine -import com.github.benmanes.caffeine.cache.LoadingCache import groovy.transform.Canonical import groovy.transform.CompileStatic import groovy.util.logging.Slf4j @@ -73,11 +73,12 @@ class AwsEcrService { } } - private LoadingCache cache = Caffeine.newBuilder() + // FIXME https://github.com/seqeralabs/wave/issues/747 + private AsyncLoadingCache cache = Caffeine.newBuilder() .newBuilder() .maximumSize(10_000) .expireAfterWrite(3, TimeUnit.HOURS) - .build(loader) + .buildAsync(loader) private EcrClient ecrClient(String accessKey, String secretKey, String region) { @@ -126,7 +127,8 @@ class AwsEcrService { try { // get the token from the cache, if missing the it's automatically // fetch using the AWS ECR client - return cache.get(new AwsCreds(accessKey,secretKey,region,isPublic)) + // FIXME https://github.com/seqeralabs/wave/issues/747 + return cache.synchronous().get(new AwsCreds(accessKey,secretKey,region,isPublic)) } catch (Exception e) { final type = isPublic ? "ECR public" : "ECR" diff --git a/src/main/groovy/io/seqera/wave/service/data/queue/AbstractMessageQueue.groovy b/src/main/groovy/io/seqera/wave/service/data/queue/AbstractMessageQueue.groovy index 3a90e3108..8c2d3fa2c 100644 --- a/src/main/groovy/io/seqera/wave/service/data/queue/AbstractMessageQueue.groovy +++ b/src/main/groovy/io/seqera/wave/service/data/queue/AbstractMessageQueue.groovy @@ -23,7 +23,7 @@ import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicInteger -import com.github.benmanes.caffeine.cache.Cache +import com.github.benmanes.caffeine.cache.AsyncCache import com.github.benmanes.caffeine.cache.Caffeine import groovy.transform.CompileStatic import groovy.util.logging.Slf4j @@ -60,10 +60,11 @@ abstract class AbstractMessageQueue implements Runnable { final private String name0 - final private Cache closedClients = Caffeine.newBuilder() + // FIXME https://github.com/seqeralabs/wave/issues/747 + final private AsyncCache closedClients = Caffeine.newBuilder() .newBuilder() .expireAfterWrite(10, TimeUnit.MINUTES) - .build() + .buildAsync() AbstractMessageQueue(MessageQueue broker) { final type = TypeHelper.getGenericType(this, 0) @@ -149,13 +150,15 @@ abstract class AbstractMessageQueue implements Runnable { @Override void run() { + // FIXME https://github.com/seqeralabs/wave/issues/747 + final clientsCache = closedClients.synchronous() while( !thread.isInterrupted() ) { try { int sent=0 final clients = new HashMap>(this.clients) for( Map.Entry> entry : clients ) { // ignore clients marked as closed - if( closedClients.getIfPresent(entry.key)) + if( clientsCache.getIfPresent(entry.key)) continue // infer the target queue from the client key final target = targetFromClientKey(entry.key) @@ -173,7 +176,7 @@ abstract class AbstractMessageQueue implements Runnable { // offer back the value to be processed again broker.offer(target, value) if( e.message?.contains('close') ) { - closedClients.put(entry.key, true) + clientsCache.put(entry.key, true) } } } diff --git a/src/main/groovy/io/seqera/wave/service/data/stream/AbstractMessageStream.groovy b/src/main/groovy/io/seqera/wave/service/data/stream/AbstractMessageStream.groovy index 560a81306..ae16d7140 100644 --- a/src/main/groovy/io/seqera/wave/service/data/stream/AbstractMessageStream.groovy +++ b/src/main/groovy/io/seqera/wave/service/data/stream/AbstractMessageStream.groovy @@ -98,6 +98,10 @@ abstract class AbstractMessageStream implements Closeable { * The {@link Predicate} to be invoked when a stream message is consumed (read from) the stream. */ void addConsumer(String streamId, MessageConsumer consumer) { + // the use of synchronized block is meant to prevent a race condition while + // updating the 'listeners' from concurrent invocations. + // however, considering the addConsumer is invoked during the initialization phase + // (and therefore in the same thread) in should not be really needed. synchronized (listeners) { if( listeners.containsKey(streamId)) throw new IllegalStateException("Only one consumer can be defined for each stream - offending streamId=$streamId; consumer=$consumer") diff --git a/src/main/groovy/io/seqera/wave/service/job/JobManager.groovy b/src/main/groovy/io/seqera/wave/service/job/JobManager.groovy index 1c4ec6c03..e10cde8d4 100644 --- a/src/main/groovy/io/seqera/wave/service/job/JobManager.groovy +++ b/src/main/groovy/io/seqera/wave/service/job/JobManager.groovy @@ -21,6 +21,7 @@ package io.seqera.wave.service.job import java.time.Duration import java.time.Instant +import com.github.benmanes.caffeine.cache.AsyncCache import com.github.benmanes.caffeine.cache.Cache import com.github.benmanes.caffeine.cache.Caffeine import groovy.transform.CompileStatic @@ -50,16 +51,19 @@ class JobManager { @Inject private JobConfig config - private Cache debounceCache + // FIXME https://github.com/seqeralabs/wave/issues/747 + private AsyncCache debounceCache @PostConstruct void init() { log.info "Creating job manager - config=$config" - debounceCache = Caffeine.newBuilder().expireAfterWrite(config.graceInterval.multipliedBy(2)).build() + debounceCache = Caffeine + .newBuilder() + .expireAfterWrite(config.graceInterval.multipliedBy(2)) + .buildAsync() queue.addConsumer((job)-> processJob(job)) } - protected boolean processJob(JobSpec jobSpec) { try { return processJob0(jobSpec) @@ -73,7 +77,8 @@ class JobManager { } protected JobState state(JobSpec job) { - return state0(job, config.graceInterval, debounceCache) + // FIXME https://github.com/seqeralabs/wave/issues/747 + return state0(job, config.graceInterval, debounceCache.synchronous()) } protected JobState state0(final JobSpec job, final Duration graceInterval, final Cache cache) { diff --git a/src/main/groovy/io/seqera/wave/service/pairing/socket/PairingWebSocket.groovy b/src/main/groovy/io/seqera/wave/service/pairing/socket/PairingWebSocket.groovy index a4f7b0785..3e6682fa5 100644 --- a/src/main/groovy/io/seqera/wave/service/pairing/socket/PairingWebSocket.groovy +++ b/src/main/groovy/io/seqera/wave/service/pairing/socket/PairingWebSocket.groovy @@ -49,7 +49,7 @@ import static io.seqera.wave.util.LongRndKey.rndHex @Slf4j @CompileStatic @Singleton -@ExecuteOn(TaskExecutors.IO) +@ExecuteOn(TaskExecutors.BLOCKING) @ServerWebSocket("/pairing/{service}/token/{token}{?endpoint}") class PairingWebSocket { diff --git a/src/main/groovy/io/seqera/wave/tower/client/connector/TowerConnector.groovy b/src/main/groovy/io/seqera/wave/tower/client/connector/TowerConnector.groovy index e4fa503ec..27e12d87f 100644 --- a/src/main/groovy/io/seqera/wave/tower/client/connector/TowerConnector.groovy +++ b/src/main/groovy/io/seqera/wave/tower/client/connector/TowerConnector.groovy @@ -26,10 +26,10 @@ import java.util.concurrent.TimeUnit import java.util.concurrent.TimeoutException import java.util.function.Function +import com.github.benmanes.caffeine.cache.AsyncLoadingCache import com.github.benmanes.caffeine.cache.Cache import com.github.benmanes.caffeine.cache.CacheLoader import com.github.benmanes.caffeine.cache.Caffeine -import com.github.benmanes.caffeine.cache.LoadingCache import groovy.transform.CompileDynamic import groovy.transform.CompileStatic import groovy.util.logging.Slf4j @@ -82,7 +82,7 @@ abstract class TowerConnector { private SpillwayRateLimiter limiter @Inject - @Named(TaskExecutors.IO) + @Named(TaskExecutors.BLOCKING) private volatile ExecutorService ioExecutor private CacheLoader> loader = new CacheLoader>() { @@ -92,14 +92,18 @@ abstract class TowerConnector { } } - private LoadingCache> refreshCache = Caffeine.newBuilder() + private AsyncLoadingCache> refreshCache = Caffeine .newBuilder() .expireAfterWrite(1, TimeUnit.MINUTES) - .build(loader) + .buildAsync(loader) /** Only for testing - do not use */ Cache> refreshCache0() { - return refreshCache + return refreshCache.synchronous() + } + + protected ExecutorService getIoExecutor() { + return ioExecutor } /** @@ -242,7 +246,7 @@ abstract class TowerConnector { * @return The refreshed {@link JwtAuth} object */ protected CompletableFuture refreshJwtToken(String endpoint, JwtAuth auth) { - return refreshCache.get(new JwtRefreshParams(endpoint,auth)) + return refreshCache.synchronous().get(new JwtRefreshParams(endpoint,auth)) } protected CompletableFuture refreshJwtToken0(String endpoint, JwtAuth auth) { diff --git a/src/main/groovy/io/seqera/wave/tower/client/connector/WebSocketTowerConnector.groovy b/src/main/groovy/io/seqera/wave/tower/client/connector/WebSocketTowerConnector.groovy index c52646638..04c591a09 100644 --- a/src/main/groovy/io/seqera/wave/tower/client/connector/WebSocketTowerConnector.groovy +++ b/src/main/groovy/io/seqera/wave/tower/client/connector/WebSocketTowerConnector.groovy @@ -19,19 +19,16 @@ package io.seqera.wave.tower.client.connector import java.util.concurrent.CompletableFuture -import java.util.concurrent.ExecutorService import java.util.function.Function import groovy.transform.CompileStatic import groovy.util.logging.Slf4j import io.micronaut.context.annotation.Requires -import io.micronaut.scheduling.TaskExecutors import io.seqera.wave.service.pairing.socket.PairingChannel import io.seqera.wave.service.pairing.socket.msg.PairingMessage import io.seqera.wave.service.pairing.socket.msg.ProxyHttpRequest import io.seqera.wave.service.pairing.socket.msg.ProxyHttpResponse import jakarta.inject.Inject -import jakarta.inject.Named import jakarta.inject.Singleton import static io.seqera.wave.service.pairing.PairingService.TOWER_SERVICE /** @@ -49,15 +46,11 @@ class WebSocketTowerConnector extends TowerConnector { @Inject private PairingChannel channel - @Inject - @Named(TaskExecutors.IO) - private ExecutorService ioExecutor - @Override CompletableFuture sendAsync(String endpoint, ProxyHttpRequest request) { return channel .sendRequest(TOWER_SERVICE, endpoint, request) - .thenApplyAsync(Function.identity() as Function, ioExecutor) + .thenApplyAsync(Function.identity() as Function, getIoExecutor()) } } diff --git a/src/test/groovy/io/seqera/wave/service/job/JobManagerTest.groovy b/src/test/groovy/io/seqera/wave/service/job/JobManagerTest.groovy index f58513a13..04ea45d71 100644 --- a/src/test/groovy/io/seqera/wave/service/job/JobManagerTest.groovy +++ b/src/test/groovy/io/seqera/wave/service/job/JobManagerTest.groovy @@ -26,7 +26,6 @@ import java.time.Instant import com.github.benmanes.caffeine.cache.Cache import com.github.benmanes.caffeine.cache.Caffeine - /** * * @author Munish Chouhan @@ -38,7 +37,7 @@ class JobManagerTest extends Specification { def jobService = Mock(JobService) def jobDispatcher = Mock(JobDispatcher) def config = new JobConfig(graceInterval: Duration.ofMillis(1)) - def cache = Caffeine.newBuilder().build() + def cache = Caffeine.newBuilder().buildAsync() def manager = new JobManager(jobService: jobService, dispatcher: jobDispatcher, config: config, debounceCache: cache) and: def jobSpec = JobSpec.transfer('foo', 'scheduler-1', Instant.now(), Duration.ofMinutes(10)) @@ -57,7 +56,8 @@ class JobManagerTest extends Specification { def jobService = Mock(JobService) def jobDispatcher = Mock(JobDispatcher) def config = new JobConfig(graceInterval: Duration.ofMillis(1)) - def manager = new JobManager(jobService: jobService, dispatcher: jobDispatcher, config: config) + def cache = Caffeine.newBuilder().buildAsync() + def manager = new JobManager(jobService: jobService, dispatcher: jobDispatcher, config: config, debounceCache: cache) and: def jobSpec = JobSpec.transfer('foo', 'scheduler-1', Instant.now(), Duration.ofMinutes(10)) @@ -75,7 +75,7 @@ class JobManagerTest extends Specification { def jobService = Mock(JobService) def jobDispatcher = Mock(JobDispatcher) def config = new JobConfig(graceInterval: Duration.ofMillis(1)) - def cache = Caffeine.newBuilder().build() + def cache = Caffeine.newBuilder().buildAsync() def manager = new JobManager(jobService: jobService, dispatcher: jobDispatcher, config:config, debounceCache: cache) and: def jobSpec = JobSpec.transfer('foo', 'scheduler-1', Instant.now() - Duration.ofMinutes(5), Duration.ofMinutes(2)) @@ -94,7 +94,7 @@ class JobManagerTest extends Specification { def jobService = Mock(JobService) def jobDispatcher = Mock(JobDispatcher) def config = new JobConfig(graceInterval: Duration.ofMillis(1)) - def cache = Caffeine.newBuilder().build() + def cache = Caffeine.newBuilder().buildAsync() def manager = new JobManager(jobService: jobService, dispatcher: jobDispatcher, config: config, debounceCache: cache) and: def jobSpec = JobSpec.transfer('foo', 'scheduler-1', Instant.now().minus(Duration.ofMillis(500)), Duration.ofMinutes(10)) From ec9ae8334d19528ca068b2c13c725ec86e5319b9 Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Mon, 18 Nov 2024 08:03:54 +0100 Subject: [PATCH 06/12] [release] bump version 1.15.0 Signed-off-by: Paolo Di Tommaso --- VERSION | 2 +- changelog.txt | 10 ++++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/VERSION b/VERSION index 0b963338f..141f2e805 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.14.2-B3 +1.15.0 diff --git a/changelog.txt b/changelog.txt index 7190df430..5e5b7fdad 100644 --- a/changelog.txt +++ b/changelog.txt @@ -1,4 +1,14 @@ # Wave changelog +1.15.0 - 18 Nov 2024 +- Migration to virtual threads - phase 1 (#746) [aaf0420c] +- Use runAsync instead supplyAsync [ffd0dacd] +- Remove deprecated ThreadPoolBuilder [7af3046f] +- Replace Guava cache with Caffeine (#745) [cf813e0a] +- Update project deps [f24b684d] +- Bump guava to version 33.3.1-jre [328e9ea3] +- Bump Netty version 4.1.115.Final [9ba433ce] +- Bump gradle 8.10.2 [52272fe1] + 1.14.1 - 14 Nov 2024 - Fix creds validation endpoint (#740) [8c0f3a4c] From 080d5cce435ba3b6986ea175545b5b165b370add Mon Sep 17 00:00:00 2001 From: Justine Geffen Date: Mon, 18 Nov 2024 15:30:19 +0200 Subject: [PATCH 07/12] Switched env vars around (#748) Switched env vars around for accuracy. --- docs/cli/index.mdx | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/cli/index.mdx b/docs/cli/index.mdx index 6581f628d..45f1239e2 100644 --- a/docs/cli/index.mdx +++ b/docs/cli/index.mdx @@ -27,8 +27,8 @@ The following CLI arguments are available for Seqera Platform integration: The following environment variables are available for Seqera Platform integration: -- `TOWER_API_ENDPOINT`: A Seqera Platform auth token so that Wave can access your private registry credentials. -- `TOWER_ACCESS_TOKEN`: For Enterprise customers, the URL endpoint for your instance, such as `https://api.cloud.seqera.io`. +- `TOWER_ACCESS_TOKEN`: A Seqera Platform auth token so that Wave can access your private registry credentials. +- `TOWER_API_ENDPOINT`: For Enterprise customers, the URL endpoint for your instance, such as `https://api.cloud.seqera.io`. - `TOWER_WORKSPACE_ID`: A Seqera Platform workspace ID, such as `1234567890`, where credentials may be stored. ## Usage limits From e24ec62c6379928590ddefdefe5bb0a883d652d2 Mon Sep 17 00:00:00 2001 From: Munish Chouhan Date: Wed, 20 Nov 2024 16:51:18 +0100 Subject: [PATCH 08/12] Add /v1alpha2/validate-creds (#752) Signed-off-by: munishchouhan --- .../wave/controller/ValidateController.groovy | 11 +++- .../ValidateCredsControllerTest.groovy | 63 +++++++++++++++++++ 2 files changed, 72 insertions(+), 2 deletions(-) diff --git a/src/main/groovy/io/seqera/wave/controller/ValidateController.groovy b/src/main/groovy/io/seqera/wave/controller/ValidateController.groovy index fd23fbdc6..cebc541b5 100644 --- a/src/main/groovy/io/seqera/wave/controller/ValidateController.groovy +++ b/src/main/groovy/io/seqera/wave/controller/ValidateController.groovy @@ -18,6 +18,7 @@ package io.seqera.wave.controller +import io.micronaut.http.annotation.Body import io.micronaut.http.annotation.Controller import io.micronaut.http.annotation.Post import io.micronaut.scheduling.TaskExecutors @@ -26,15 +27,21 @@ import io.seqera.wave.auth.RegistryAuthService import jakarta.inject.Inject import jakarta.validation.Valid -@Controller("/validate-creds") +@Controller("/") @ExecuteOn(TaskExecutors.BLOCKING) class ValidateController { @Inject RegistryAuthService loginService - @Post + @Deprecated + @Post("/validate-creds") Boolean validateCreds(@Valid ValidateRegistryCredsRequest request){ loginService.validateUser(request.registry, request.userName, request.password) } + @Post("/v1alpha2/validate-creds") + Boolean validateCredsV2(@Valid @Body ValidateRegistryCredsRequest request){ + loginService.validateUser(request.registry, request.userName, request.password) + } + } diff --git a/src/test/groovy/io/seqera/wave/controller/ValidateCredsControllerTest.groovy b/src/test/groovy/io/seqera/wave/controller/ValidateCredsControllerTest.groovy index 586d48616..884728787 100644 --- a/src/test/groovy/io/seqera/wave/controller/ValidateCredsControllerTest.groovy +++ b/src/test/groovy/io/seqera/wave/controller/ValidateCredsControllerTest.groovy @@ -127,4 +127,67 @@ class ValidateCredsControllerTest extends Specification implements SecureDockerR 'nope' | 'yepes' | "https://quay.io" | false 'test' | 'test' | 'test' | true } + + void 'should validate username required'() { + when: + HttpRequest request = HttpRequest.POST("/v1alpha2/validate-creds", [ + password: 'test', + ]) + client.toBlocking().exchange(request, Boolean) + then: + def e = thrown(HttpClientResponseException) + } + + void 'should validate pwd required'() { + when: + HttpRequest request = HttpRequest.POST("/v1alpha2/validate-creds", [ + userName: 'test', + ]) + client.toBlocking().exchange(request, Boolean) + then: + def e = thrown(HttpClientResponseException) + } + + void 'should validate the test user'() { + given: + def req = [ + userName:'test', + password:'test', + registry: getTestRegistryUrl('test') ] + and: + HttpRequest request = HttpRequest.POST("/v1alpha2/validate-creds", req) + when: + HttpResponse response = client.toBlocking().exchange(request, Boolean) + then: + response.status() == HttpStatus.OK + and: + response.body() + } + + void 'test validateController valid login'() { + given: + def req = [ + userName: USER, + password: PWD, + registry: getTestRegistryUrl(REGISTRY_URL) + ] + HttpRequest request = HttpRequest.POST("/v1alpha2/validate-creds", req) + when: + HttpResponse response = client.toBlocking().exchange(request, Boolean) + + then: + response.status() == HttpStatus.OK + and: + response.body() == VALID + + where: + USER | PWD | REGISTRY_URL | VALID + 'test' | 'test' | 'test' | true + 'nope' | 'yepes' | 'test' | false + dockerUsername | dockerPassword | "https://registry-1.docker.io" | true + 'nope' | 'yepes' | "https://registry-1.docker.io" | false + quayUsername | quayPassword | "https://quay.io" | true + 'nope' | 'yepes' | "https://quay.io" | false + 'test' | 'test' | 'test' | true + } } From 86ef526c6b1a86693d5d7293e64ec09716ffe1b7 Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Wed, 20 Nov 2024 16:53:20 +0100 Subject: [PATCH 09/12] Check block existence with object operation (#750) Signed-off-by: Paolo Di Tommaso Co-authored-by: Munish Chouhan --- .../blob/impl/BlobCacheServiceImpl.groovy | 50 ++++++++++--------- .../blob/BlobStateStoreImplTest.groovy | 8 +++ 2 files changed, 34 insertions(+), 24 deletions(-) diff --git a/src/main/groovy/io/seqera/wave/service/blob/impl/BlobCacheServiceImpl.groovy b/src/main/groovy/io/seqera/wave/service/blob/impl/BlobCacheServiceImpl.groovy index 606a4ecd3..c6c3b61d7 100644 --- a/src/main/groovy/io/seqera/wave/service/blob/impl/BlobCacheServiceImpl.groovy +++ b/src/main/groovy/io/seqera/wave/service/blob/impl/BlobCacheServiceImpl.groovy @@ -17,9 +17,6 @@ */ package io.seqera.wave.service.blob.impl -import java.net.http.HttpClient -import java.net.http.HttpRequest -import java.net.http.HttpResponse import groovy.transform.CompileStatic import groovy.util.logging.Slf4j @@ -29,7 +26,6 @@ import io.seqera.wave.configuration.BlobCacheConfig import io.seqera.wave.configuration.HttpClientConfig import io.seqera.wave.core.RegistryProxyService import io.seqera.wave.core.RoutePath -import io.seqera.wave.http.HttpClientFactory import io.seqera.wave.service.blob.BlobCacheService import io.seqera.wave.service.blob.BlobEntry import io.seqera.wave.service.blob.BlobSigningService @@ -38,14 +34,16 @@ import io.seqera.wave.service.job.JobHandler import io.seqera.wave.service.job.JobService import io.seqera.wave.service.job.JobSpec import io.seqera.wave.service.job.JobState +import io.seqera.wave.util.BucketTokenizer import io.seqera.wave.util.Escape -import io.seqera.wave.util.Retryable import io.seqera.wave.util.StringUtils import jakarta.annotation.PostConstruct import jakarta.inject.Inject import jakarta.inject.Named import jakarta.inject.Singleton -import static io.seqera.wave.WaveDefault.HTTP_SERVER_ERRORS +import software.amazon.awssdk.services.s3.S3Client +import software.amazon.awssdk.services.s3.model.HeadObjectRequest +import software.amazon.awssdk.services.s3.model.S3Exception /** * Implements cache for container image layer blobs * @@ -79,11 +77,12 @@ class BlobCacheServiceImpl implements BlobCacheService, JobHandler { @Inject private HttpClientConfig httpConfig - private HttpClient httpClient + @Inject + @Named('BlobS3Client') + private S3Client s3Client @PostConstruct private void init() { - httpClient = HttpClientFactory.followRedirectsHttpClient() log.info "Creating Blob cache service - $blobConfig" } @@ -98,7 +97,7 @@ class BlobCacheServiceImpl implements BlobCacheService, JobHandler { // therefore it's safe to check and return directly // if it exists (no risk of returning a partial upload) // https://developers.cloudflare.com/r2/reference/consistency/ - if( blobExists(info.locationUri) && !debug ) { + if( blobExists(info.objectUri) && !debug ) { log.debug "== Blob cache exists for object '${info.locationUri}'" return info.cached() } @@ -113,21 +112,24 @@ class BlobCacheServiceImpl implements BlobCacheService, JobHandler { return result?.withLocation(locationUri) } - protected boolean blobExists(String uri) { - final request = HttpRequest - .newBuilder(new URI(uri)) - .method("HEAD", HttpRequest.BodyPublishers.noBody()) - .build() - - // retry strategy - final retryable = Retryable - .>of(httpConfig) - .retryIf((response) -> response.statusCode() in HTTP_SERVER_ERRORS) - .onRetry((event) -> log.warn("Unable to connect '$uri' - event: $event")) - - // submit the request - final resp = retryable.apply(()-> httpClient.send(request, HttpResponse.BodyHandlers.ofString())) - return resp.statusCode() == 200 + protected boolean blobExists(String blobLocation) { + try { + final object = BucketTokenizer.from(blobLocation) + final request = HeadObjectRequest + .builder() + .bucket(object.bucket) + .key(object.key) + .build() as HeadObjectRequest + // Execute the request + s3Client.headObject(request) + return true + } + catch (S3Exception e) { + if (e.statusCode() != 404) { + log.error "Unexpected response=${e.statusCode()} checking existence for object=${blobLocation} - cause: ${e.message}" + } + return false + } } /** diff --git a/src/test/groovy/io/seqera/wave/service/blob/BlobStateStoreImplTest.groovy b/src/test/groovy/io/seqera/wave/service/blob/BlobStateStoreImplTest.groovy index 61ad3465f..2898dfd29 100644 --- a/src/test/groovy/io/seqera/wave/service/blob/BlobStateStoreImplTest.groovy +++ b/src/test/groovy/io/seqera/wave/service/blob/BlobStateStoreImplTest.groovy @@ -23,10 +23,14 @@ import spock.lang.Specification import java.time.Duration import io.micronaut.context.annotation.Property +import io.micronaut.test.annotation.MockBean import io.micronaut.test.extensions.spock.annotation.MicronautTest import io.seqera.wave.configuration.BlobCacheConfig import io.seqera.wave.store.state.impl.StateProvider import jakarta.inject.Inject +import jakarta.inject.Named +import software.amazon.awssdk.services.s3.S3Client + /** * * @author Paolo Di Tommaso @@ -37,6 +41,10 @@ import jakarta.inject.Inject @MicronautTest class BlobStateStoreImplTest extends Specification { + @MockBean(S3Client) + @Named('BlobS3Client') + S3Client mockS3Client() { Mock(S3Client) } + @Inject BlobStoreImpl store From 84b13df279e79ab7f6e54d86e3cdc6607c108439 Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Wed, 20 Nov 2024 18:44:05 +0100 Subject: [PATCH 10/12] [release] Bump version 1.15.1 Signed-off-by: Paolo Di Tommaso --- VERSION | 2 +- changelog.txt | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/VERSION b/VERSION index 141f2e805..ace44233b 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.15.0 +1.15.1 diff --git a/changelog.txt b/changelog.txt index 5e5b7fdad..3aa45f8dd 100644 --- a/changelog.txt +++ b/changelog.txt @@ -1,4 +1,9 @@ # Wave changelog +1.15.1 - 20 Nov 2024 +- Check block existence with object operation (#750) [86ef526c] +- Add /v1alpha2/validate-creds (#752) [e24ec62c] +- Switched env vars around (#748) [080d5cce] + 1.15.0 - 18 Nov 2024 - Migration to virtual threads - phase 1 (#746) [aaf0420c] - Use runAsync instead supplyAsync [ffd0dacd] From 63ff591465870797d2930672bcddbb26f5022c16 Mon Sep 17 00:00:00 2001 From: Munish Chouhan Date: Thu, 21 Nov 2024 06:58:15 +0100 Subject: [PATCH 11/12] Add /v1alpha2/validate-creds in Typespec (#754) [ci skip] Signed-off-by: munishchouhan --- typespec/routes.tsp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/typespec/routes.tsp b/typespec/routes.tsp index 8e6f8d957..feb9d2045 100644 --- a/typespec/routes.tsp +++ b/typespec/routes.tsp @@ -107,8 +107,8 @@ namespace wave { }; } - @route("validate-creds") - @post op validateCreds(@body request: ValidateRegistryCredsRequest): boolean; + @route("/v1alpha2/validate-creds") + @post op validateCredsV2(@body request: ValidateRegistryCredsRequest): boolean; @route("/v1alpha1/mirrors") interface getMirrorRecord { From bf63599d77af23c6ceab58ea80b9ca65c3e17679 Mon Sep 17 00:00:00 2001 From: Munish Chouhan Date: Fri, 22 Nov 2024 14:19:58 +0100 Subject: [PATCH 12/12] Support Redis support for SSL and password (#717) Signed-off-by: munishchouhan Signed-off-by: Paolo Di Tommaso Co-authored-by: Paolo Di Tommaso --- configuration.md | 10 +++ .../impl/SpillWayStorageFactory.groovy | 7 +-- .../io/seqera/wave/redis/RedisFactory.groovy | 37 +++++++++-- .../seqera/wave/redis/RedisFactoryTest.groovy | 62 +++++++++++++++++++ 4 files changed, 108 insertions(+), 8 deletions(-) create mode 100644 src/test/groovy/io/seqera/wave/redis/RedisFactoryTest.groovy diff --git a/configuration.md b/configuration.md index d162a98b5..d119cae16 100644 --- a/configuration.md +++ b/configuration.md @@ -184,6 +184,16 @@ Rate limit configuration controls the limits of anonymous and authenticated user - **`redis.pool.enabled`**: whether to enable the Redis pool. It is set to `true` by default, enabling the use of a connection pool for efficient management of connections to the Redis server. *Optional*. +- **`redis.pool.minIdle`**: Specifies the minimum number of idle connections to maintain in the Redis connection pool. The default value is `0`. This ensures that connections are readily available for use.  *Optional*. + +- **`redis.pool.maxIdle`**: Specifies the maximum number of idle connections to maintain in the Redis connection pool. The default value is `10`.  *Optional*. + +- **`redis.pool.maxTotal`**: Specifies the maximum number of connections that can be maintained in the Redis connection pool. The default value is `50`. This helps to manage resource usage efficiently while supporting high demand.  *Optional*. + +- **`redis.client.timeout`**: Defines the timeout duration (in milliseconds) for Redis client operations. The default value is `5000` (5 seconds).  *Optional*. + +- **`redis.password`**: Specifies the password used to authenticate with the Redis server. This is needed when redis authentication is enabled.  *Optional*. + - **`surreal.default.ns`**: the namespace for the Surreal database. It can be set using `${SURREALDB_NS}` environment variable. *Mandatory*. - **`surreal.default.db`**: the name of the Surreal database. It can be set using`${SURREALDB_DB}` environment variable. This setting defines the target database within the Surreal database system that Wave should interact with. *Mandatory*. diff --git a/src/main/groovy/io/seqera/wave/ratelimit/impl/SpillWayStorageFactory.groovy b/src/main/groovy/io/seqera/wave/ratelimit/impl/SpillWayStorageFactory.groovy index cc6f9d036..337e5656b 100644 --- a/src/main/groovy/io/seqera/wave/ratelimit/impl/SpillWayStorageFactory.groovy +++ b/src/main/groovy/io/seqera/wave/ratelimit/impl/SpillWayStorageFactory.groovy @@ -26,6 +26,7 @@ import groovy.util.logging.Slf4j import io.micronaut.context.annotation.Factory import io.micronaut.context.annotation.Requires import io.seqera.wave.configuration.RedisConfig +import jakarta.inject.Inject import jakarta.inject.Singleton import jakarta.validation.constraints.NotNull import redis.clients.jedis.JedisPool @@ -51,9 +52,7 @@ class SpillWayStorageFactory { @Singleton @Requires(property = 'redis.uri') - LimitUsageStorage redisStorage(@NotNull RedisConfig redisConfig){ - log.info "Using redis $redisConfig.uri as storage for rate limit" - def jedisPool = new JedisPool(redisConfig.uri) - return RedisStorage.builder().withJedisPool(jedisPool).build() + LimitUsageStorage redisStorage(JedisPool pool){ + return RedisStorage.builder().withJedisPool(pool).build() } } diff --git a/src/main/groovy/io/seqera/wave/redis/RedisFactory.groovy b/src/main/groovy/io/seqera/wave/redis/RedisFactory.groovy index c11ff2e85..8729f1336 100644 --- a/src/main/groovy/io/seqera/wave/redis/RedisFactory.groovy +++ b/src/main/groovy/io/seqera/wave/redis/RedisFactory.groovy @@ -23,9 +23,14 @@ import groovy.util.logging.Slf4j import io.micronaut.context.annotation.Factory import io.micronaut.context.annotation.Requires import io.micronaut.context.annotation.Value +import io.micronaut.core.annotation.Nullable import jakarta.inject.Singleton +import redis.clients.jedis.DefaultJedisClientConfig +import redis.clients.jedis.JedisClientConfig import redis.clients.jedis.JedisPool import redis.clients.jedis.JedisPoolConfig +import redis.clients.jedis.exceptions.InvalidURIException +import redis.clients.jedis.util.JedisURIHelper /** * Redis connection pool factory * @@ -39,17 +44,41 @@ class RedisFactory { @Singleton JedisPool createRedisPool( - @Value('${redis.uri}') String uri, + @Value('${redis.uri}') String connection, @Value('${redis.pool.minIdle:0}') int minIdle, @Value('${redis.pool.maxIdle:10}') int maxIdle, - @Value('${redis.pool.maxTotal:50}') int maxTotal + @Value('${redis.pool.maxTotal:50}') int maxTotal, + @Value('${redis.client.timeout:5000}') int timeout, + @Nullable @Value('${redis.password}') String password ) { - log.info "Using redis $uri as storage for rate limit - pool minIdle: ${minIdle}; maxIdle: ${maxIdle}; maxTotal: ${maxTotal}" + log.info "Using redis ${connection} as storage for rate limit - pool minIdle: ${minIdle}; maxIdle: ${maxIdle}; maxTotal: ${maxTotal}; timeout: ${timeout}" + + final uri = URI.create(connection) + // pool config final config = new JedisPoolConfig() config.setMinIdle(minIdle) config.setMaxIdle(maxIdle) config.setMaxTotal(maxTotal) - return new JedisPool(config, URI.create(uri)) + // client config + final clientConfig = clientConfig(uri, password, timeout) + // create the jedis pool + return new JedisPool(config, JedisURIHelper.getHostAndPort(uri), clientConfig) + } + + protected JedisClientConfig clientConfig(URI uri, String password, int timeout) { + if (!JedisURIHelper.isValid(uri)) { + throw new InvalidURIException("Invalid Redis connection URI: ${uri}") + } + + return DefaultJedisClientConfig.builder().connectionTimeoutMillis(timeout) + .socketTimeoutMillis(timeout) + .blockingSocketTimeoutMillis(timeout) + .user(JedisURIHelper.getUser(uri)) + .password(password?:JedisURIHelper.getPassword(uri)) + .database(JedisURIHelper.getDBIndex(uri)) + .protocol(JedisURIHelper.getRedisProtocol(uri)) + .ssl(JedisURIHelper.isRedisSSLScheme(uri)) + .build() } } diff --git a/src/test/groovy/io/seqera/wave/redis/RedisFactoryTest.groovy b/src/test/groovy/io/seqera/wave/redis/RedisFactoryTest.groovy new file mode 100644 index 000000000..cbed6fb6f --- /dev/null +++ b/src/test/groovy/io/seqera/wave/redis/RedisFactoryTest.groovy @@ -0,0 +1,62 @@ +/* + * Wave, containers provisioning service + * Copyright (c) 2023-2024, Seqera Labs + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package io.seqera.wave.redis + +import spock.lang.Specification + +import redis.clients.jedis.exceptions.InvalidURIException +/** + * + * @author Munish Chouhan + */ +class RedisFactoryTest extends Specification { + def 'should create redis pool with valid URI'() { + given: + def factory = new RedisFactory() + + when: + def pool = factory.createRedisPool(URI_STRING, MIN_IDLE, MAX_IDLE, MAX_TOTAL, TIMEOUT, 'password') + + then: + pool != null + + where: + URI_STRING | MIN_IDLE | MAX_IDLE | MAX_TOTAL | TIMEOUT + 'redis://localhost:6379' | 0 | 10 | 50 | 5000 + 'rediss://localhost:6379'| 1 | 5 | 20 | 3000 + } + + def 'should throw exception for invalid URI'() { + given: + def factory = new RedisFactory() + + when: + factory.createRedisPool(URI_STRING, MIN_IDLE, MAX_IDLE, MAX_TOTAL, TIMEOUT, null) + + then: + def e = thrown(InvalidURIException) + e.message.contains("Invalid Redis connection URI: $URI_STRING") + + where: + URI_STRING | MIN_IDLE | MAX_IDLE | MAX_TOTAL | TIMEOUT + 'redis://localhost' | 0 | 10 | 50 | 5000 + 'localhost:6379' | 1 | 5 | 20 | 3000 + } + +}