Skip to content

Commit

Permalink
Merge branch 'master' into virtual-threads-phase2
Browse files Browse the repository at this point in the history
  • Loading branch information
pditommaso authored Nov 22, 2024
2 parents b2626b1 + bf63599 commit f0ffe2b
Show file tree
Hide file tree
Showing 24 changed files with 316 additions and 93 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.14.1
1.15.1
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ dependencies {
implementation 'io.micronaut:micronaut-websocket'
implementation 'org.apache.groovy:groovy-json'
implementation 'org.apache.groovy:groovy-nio'
implementation 'com.google.guava:guava:32.1.2-jre'
implementation 'com.google.guava:guava:33.3.1-jre'
implementation 'dev.failsafe:failsafe:3.1.0'
implementation 'io.micronaut.reactor:micronaut-reactor'
implementation 'io.micronaut.reactor:micronaut-reactor-http-client'
Expand Down
15 changes: 15 additions & 0 deletions changelog.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,19 @@
# Wave changelog
1.15.1 - 20 Nov 2024
- Check block existence with object operation (#750) [86ef526c]
- Add /v1alpha2/validate-creds (#752) [e24ec62c]
- Switched env vars around (#748) [080d5cce]

1.15.0 - 18 Nov 2024
- Migration to virtual threads - phase 1 (#746) [aaf0420c]
- Use runAsync instead supplyAsync [ffd0dacd]
- Remove deprecated ThreadPoolBuilder [7af3046f]
- Replace Guava cache with Caffeine (#745) [cf813e0a]
- Update project deps [f24b684d]
- Bump guava to version 33.3.1-jre [328e9ea3]
- Bump Netty version 4.1.115.Final [9ba433ce]
- Bump gradle 8.10.2 [52272fe1]

1.14.1 - 14 Nov 2024
- Fix creds validation endpoint (#740) [8c0f3a4c]

Expand Down
10 changes: 10 additions & 0 deletions configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,16 @@ Rate limit configuration controls the limits of anonymous and authenticated user

- **`redis.pool.enabled`**: whether to enable the Redis pool. It is set to `true` by default, enabling the use of a connection pool for efficient management of connections to the Redis server. *Optional*.

- **`redis.pool.minIdle`**: Specifies the minimum number of idle connections to maintain in the Redis connection pool. The default value is `0`. This ensures that connections are readily available for use.  *Optional*.

- **`redis.pool.maxIdle`**: Specifies the maximum number of idle connections to maintain in the Redis connection pool. The default value is `10`.  *Optional*.

- **`redis.pool.maxTotal`**: Specifies the maximum number of connections that can be maintained in the Redis connection pool. The default value is `50`. This helps to manage resource usage efficiently while supporting high demand.  *Optional*.

- **`redis.client.timeout`**: Defines the timeout duration (in milliseconds) for Redis client operations. The default value is `5000` (5 seconds).  *Optional*.

- **`redis.password`**: Specifies the password used to authenticate with the Redis server. This is needed when redis authentication is enabled.  *Optional*.

- **`surreal.default.ns`**: the namespace for the Surreal database. It can be set using `${SURREALDB_NS}` environment variable. *Mandatory*.

- **`surreal.default.db`**: the name of the Surreal database. It can be set using`${SURREALDB_DB}` environment variable. This setting defines the target database within the Surreal database system that Wave should interact with. *Mandatory*.
Expand Down
4 changes: 2 additions & 2 deletions docs/cli/index.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ The following CLI arguments are available for Seqera Platform integration:

The following environment variables are available for Seqera Platform integration:

- `TOWER_API_ENDPOINT`: A Seqera Platform auth token so that Wave can access your private registry credentials.
- `TOWER_ACCESS_TOKEN`: For Enterprise customers, the URL endpoint for your instance, such as `https://api.cloud.seqera.io`.
- `TOWER_ACCESS_TOKEN`: A Seqera Platform auth token so that Wave can access your private registry credentials.
- `TOWER_API_ENDPOINT`: For Enterprise customers, the URL endpoint for your instance, such as `https://api.cloud.seqera.io`.
- `TOWER_WORKSPACE_ID`: A Seqera Platform workspace ID, such as `1234567890`, where credentials may be stored.

## Usage limits
Expand Down
22 changes: 12 additions & 10 deletions src/main/groovy/io/seqera/wave/auth/RegistryAuthServiceImpl.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,12 @@ package io.seqera.wave.auth
import java.net.http.HttpRequest
import java.net.http.HttpResponse
import java.time.Duration
import java.util.concurrent.ExecutionException
import java.util.concurrent.CompletionException
import java.util.concurrent.TimeUnit

import com.google.common.cache.CacheBuilder
import com.google.common.cache.CacheLoader
import com.google.common.cache.LoadingCache
import com.google.common.util.concurrent.UncheckedExecutionException
import com.github.benmanes.caffeine.cache.AsyncLoadingCache
import com.github.benmanes.caffeine.cache.CacheLoader
import com.github.benmanes.caffeine.cache.Caffeine
import groovy.json.JsonSlurper
import groovy.transform.Canonical
import groovy.transform.CompileStatic
Expand Down Expand Up @@ -101,11 +100,12 @@ class RegistryAuthServiceImpl implements RegistryAuthService {
return result
}

private LoadingCache<CacheKey, String> cacheTokens = CacheBuilder<CacheKey, String>
// FIXME https://github.com/seqeralabs/wave/issues/747
private AsyncLoadingCache<CacheKey, String> cacheTokens = Caffeine.newBuilder()
.newBuilder()
.maximumSize(10_000)
.expireAfterAccess(_1_HOUR.toMillis(), TimeUnit.MILLISECONDS)
.build(loader)
.buildAsync(loader)

@Inject
private RegistryLookupService lookupService
Expand Down Expand Up @@ -269,9 +269,10 @@ class RegistryAuthServiceImpl implements RegistryAuthService {
protected String getAuthToken(String image, RegistryAuth auth, RegistryCredentials creds) {
final key = new CacheKey(image, auth, creds)
try {
return cacheTokens.get(key)
// FIXME https://github.com/seqeralabs/wave/issues/747
return cacheTokens.synchronous().get(key)
}
catch (UncheckedExecutionException | ExecutionException e) {
catch (CompletionException e) {
// this catches the exception thrown in the cache loader lookup
// and throws the causing exception that should be `RegistryUnauthorizedAccessException`
throw e.cause
Expand All @@ -287,7 +288,8 @@ class RegistryAuthServiceImpl implements RegistryAuthService {
*/
void invalidateAuthorization(String image, RegistryAuth auth, RegistryCredentials creds) {
final key = new CacheKey(image, auth, creds)
cacheTokens.invalidate(key)
// FIXME https://github.com/seqeralabs/wave/issues/747
cacheTokens.synchronous().invalidate(key)
tokenStore.remove(getStableKey(key))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,12 @@ package io.seqera.wave.auth

import java.net.http.HttpRequest
import java.net.http.HttpResponse
import java.util.concurrent.ExecutionException
import java.util.concurrent.CompletionException
import java.util.concurrent.TimeUnit

import com.google.common.cache.CacheBuilder
import com.google.common.cache.CacheLoader
import com.google.common.cache.LoadingCache
import com.google.common.util.concurrent.UncheckedExecutionException
import com.github.benmanes.caffeine.cache.AsyncLoadingCache
import com.github.benmanes.caffeine.cache.CacheLoader
import com.github.benmanes.caffeine.cache.Caffeine
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import io.seqera.wave.configuration.HttpClientConfig
Expand Down Expand Up @@ -74,11 +73,12 @@ class RegistryLookupServiceImpl implements RegistryLookupService {
}
}

private LoadingCache<URI, RegistryAuth> cache = CacheBuilder<URI, RegistryAuth>
// FIXME https://github.com/seqeralabs/wave/issues/747
private AsyncLoadingCache<URI, RegistryAuth> cache = Caffeine.newBuilder()
.newBuilder()
.maximumSize(10_000)
.expireAfterAccess(1, TimeUnit.HOURS)
.build(loader)
.buildAsync(loader)

protected RegistryAuth lookup0(URI endpoint) {
final httpClient = HttpClientFactory.followRedirectsHttpClient()
Expand Down Expand Up @@ -117,10 +117,11 @@ class RegistryLookupServiceImpl implements RegistryLookupService {
RegistryInfo lookup(String registry) {
try {
final endpoint = registryEndpoint(registry)
final auth = cache.get(endpoint)
// FIXME https://github.com/seqeralabs/wave/issues/747
final auth = cache.synchronous().get(endpoint)
return new RegistryInfo(registry, endpoint, auth)
}
catch (UncheckedExecutionException | ExecutionException e) {
catch (CompletionException e) {
// this catches the exception thrown in the cache loader lookup
// and throws the causing exception that should be `RegistryUnauthorizedAccessException`
throw e.cause
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package io.seqera.wave.controller

import io.micronaut.http.annotation.Body
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Post
import io.micronaut.scheduling.TaskExecutors
Expand All @@ -26,15 +27,21 @@ import io.seqera.wave.auth.RegistryAuthService
import jakarta.inject.Inject
import jakarta.validation.Valid

@Controller("/validate-creds")
@Controller("/")
@ExecuteOn(TaskExecutors.BLOCKING)
class ValidateController {

@Inject RegistryAuthService loginService

@Post
@Deprecated
@Post("/validate-creds")
Boolean validateCreds(@Valid ValidateRegistryCredsRequest request){
loginService.validateUser(request.registry, request.userName, request.password)
}

@Post("/v1alpha2/validate-creds")
Boolean validateCredsV2(@Valid @Body ValidateRegistryCredsRequest request){
loginService.validateUser(request.registry, request.userName, request.password)
}

}
13 changes: 11 additions & 2 deletions src/main/groovy/io/seqera/wave/core/RegistryProxyService.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package io.seqera.wave.core

import java.util.concurrent.CompletableFuture

import groovy.transform.CompileStatic
import groovy.transform.ToString
import groovy.util.logging.Slf4j
Expand Down Expand Up @@ -193,7 +195,7 @@ class RegistryProxyService {

String getImageDigest(String containerImage, PlatformId identity, boolean retryOnNotFound=false) {
try {
return getImageDigest0(containerImage, identity, retryOnNotFound)
return getImageDigest0(containerImage, identity, retryOnNotFound).get()
}
catch(Exception e) {
log.warn "Unable to retrieve digest for image '${containerImage}' -- cause: ${e.message}"
Expand All @@ -203,8 +205,15 @@ class RegistryProxyService {

static private List<Integer> RETRY_ON_NOT_FOUND = HTTP_RETRYABLE_ERRORS + 404

// note: return a CompletableFuture to force micronaut to use caffeine AsyncCache
// that provides a workaround about the use of virtual threads with SyncCache
// see https://github.com/ben-manes/caffeine/issues/1468#issuecomment-1906733926
@Cacheable(value = 'cache-registry-proxy', atomic = true, parameters = ['image'])
protected String getImageDigest0(String image, PlatformId identity, boolean retryOnNotFound) {
protected CompletableFuture<String> getImageDigest0(String image, PlatformId identity, boolean retryOnNotFound) {
CompletableFuture.completedFuture(getImageDigest1(image, identity, retryOnNotFound))
}

protected String getImageDigest1(String image, PlatformId identity, boolean retryOnNotFound) {
final coords = ContainerCoordinates.parse(image)
final route = RoutePath.v2manifestPath(coords, identity)
final proxyClient = client(route)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,10 @@ class PullMetricsRequestsFilter implements HttpServerFilter {
final contentType = response.headers.get(HttpHeaders.CONTENT_TYPE)
if( contentType && contentType in MANIFEST_TYPES ) {
final route = routeHelper.parse(request.path)
CompletableFuture.supplyAsync(() -> metricsService.incrementPullsCounter(route.identity), executor)
CompletableFuture.runAsync(() -> metricsService.incrementPullsCounter(route.identity), executor)
final version = route.request?.containerConfig?.fusionVersion()
if (version) {
CompletableFuture.supplyAsync(() -> metricsService.incrementFusionPullsCounter(route.identity), executor)
CompletableFuture.runAsync(() -> metricsService.incrementFusionPullsCounter(route.identity), executor)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import groovy.util.logging.Slf4j
import io.micronaut.context.annotation.Factory
import io.micronaut.context.annotation.Requires
import io.seqera.wave.configuration.RedisConfig
import jakarta.inject.Inject
import jakarta.inject.Singleton
import jakarta.validation.constraints.NotNull
import redis.clients.jedis.JedisPool
Expand All @@ -51,9 +52,7 @@ class SpillWayStorageFactory {

@Singleton
@Requires(property = 'redis.uri')
LimitUsageStorage redisStorage(@NotNull RedisConfig redisConfig){
log.info "Using redis $redisConfig.uri as storage for rate limit"
def jedisPool = new JedisPool(redisConfig.uri)
return RedisStorage.builder().withJedisPool(jedisPool).build()
LimitUsageStorage redisStorage(JedisPool pool){
return RedisStorage.builder().withJedisPool(pool).build()
}
}
37 changes: 33 additions & 4 deletions src/main/groovy/io/seqera/wave/redis/RedisFactory.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,14 @@ import groovy.util.logging.Slf4j
import io.micronaut.context.annotation.Factory
import io.micronaut.context.annotation.Requires
import io.micronaut.context.annotation.Value
import io.micronaut.core.annotation.Nullable
import jakarta.inject.Singleton
import redis.clients.jedis.DefaultJedisClientConfig
import redis.clients.jedis.JedisClientConfig
import redis.clients.jedis.JedisPool
import redis.clients.jedis.JedisPoolConfig
import redis.clients.jedis.exceptions.InvalidURIException
import redis.clients.jedis.util.JedisURIHelper
/**
* Redis connection pool factory
*
Expand All @@ -39,17 +44,41 @@ class RedisFactory {

@Singleton
JedisPool createRedisPool(
@Value('${redis.uri}') String uri,
@Value('${redis.uri}') String connection,
@Value('${redis.pool.minIdle:0}') int minIdle,
@Value('${redis.pool.maxIdle:10}') int maxIdle,
@Value('${redis.pool.maxTotal:50}') int maxTotal
@Value('${redis.pool.maxTotal:50}') int maxTotal,
@Value('${redis.client.timeout:5000}') int timeout,
@Nullable @Value('${redis.password}') String password
) {
log.info "Using redis $uri as storage for rate limit - pool minIdle: ${minIdle}; maxIdle: ${maxIdle}; maxTotal: ${maxTotal}"
log.info "Using redis ${connection} as storage for rate limit - pool minIdle: ${minIdle}; maxIdle: ${maxIdle}; maxTotal: ${maxTotal}; timeout: ${timeout}"

final uri = URI.create(connection)
// pool config
final config = new JedisPoolConfig()
config.setMinIdle(minIdle)
config.setMaxIdle(maxIdle)
config.setMaxTotal(maxTotal)
return new JedisPool(config, URI.create(uri))
// client config
final clientConfig = clientConfig(uri, password, timeout)
// create the jedis pool
return new JedisPool(config, JedisURIHelper.getHostAndPort(uri), clientConfig)
}

protected JedisClientConfig clientConfig(URI uri, String password, int timeout) {
if (!JedisURIHelper.isValid(uri)) {
throw new InvalidURIException("Invalid Redis connection URI: ${uri}")
}

return DefaultJedisClientConfig.builder().connectionTimeoutMillis(timeout)
.socketTimeoutMillis(timeout)
.blockingSocketTimeoutMillis(timeout)
.user(JedisURIHelper.getUser(uri))
.password(password?:JedisURIHelper.getPassword(uri))
.database(JedisURIHelper.getDBIndex(uri))
.protocol(JedisURIHelper.getRedisProtocol(uri))
.ssl(JedisURIHelper.isRedisSSLScheme(uri))
.build()
}

}
14 changes: 8 additions & 6 deletions src/main/groovy/io/seqera/wave/service/aws/AwsEcrService.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ package io.seqera.wave.service.aws
import java.util.concurrent.TimeUnit
import java.util.regex.Pattern

import com.google.common.cache.CacheBuilder
import com.google.common.cache.CacheLoader
import com.google.common.cache.LoadingCache
import com.github.benmanes.caffeine.cache.AsyncLoadingCache
import com.github.benmanes.caffeine.cache.CacheLoader
import com.github.benmanes.caffeine.cache.Caffeine
import groovy.transform.Canonical
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
Expand Down Expand Up @@ -73,11 +73,12 @@ class AwsEcrService {
}
}

private LoadingCache<AwsCreds, String> cache = CacheBuilder<AwsCreds, String>
// FIXME https://github.com/seqeralabs/wave/issues/747
private AsyncLoadingCache<AwsCreds, String> cache = Caffeine.newBuilder()
.newBuilder()
.maximumSize(10_000)
.expireAfterWrite(3, TimeUnit.HOURS)
.build(loader)
.buildAsync(loader)


private EcrClient ecrClient(String accessKey, String secretKey, String region) {
Expand Down Expand Up @@ -126,7 +127,8 @@ class AwsEcrService {
try {
// get the token from the cache, if missing the it's automatically
// fetch using the AWS ECR client
return cache.get(new AwsCreds(accessKey,secretKey,region,isPublic))
// FIXME https://github.com/seqeralabs/wave/issues/747
return cache.synchronous().get(new AwsCreds(accessKey,secretKey,region,isPublic))
}
catch (Exception e) {
final type = isPublic ? "ECR public" : "ECR"
Expand Down
Loading

0 comments on commit f0ffe2b

Please sign in to comment.