Skip to content

Commit

Permalink
[OPIK-385] Attempt to fix rate limiter not initialized (#574)
Browse files Browse the repository at this point in the history
* [OPIK-385] Attemp to fix rate limiter not initialized

* Fix imports
  • Loading branch information
thiagohora authored Nov 6, 2024
1 parent 6b7ab8a commit df51ba3
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,8 @@ public Object invoke(MethodInvocation invocation) throws Throwable {
// Get the method being invoked
Method method = invocation.getMethod();

if (!rateLimitConfig.isEnabled()) {
return invocation.proceed();
}

// Check if the method is annotated with @RateLimit
if (!method.isAnnotationPresent(RateLimited.class)) {
// Check if the method is annotated with @RateLimited or if rate limiting is disabled
if (!rateLimitConfig.isEnabled() || !method.isAnnotationPresent(RateLimited.class)) {
return invocation.proceed();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,20 @@
import com.comet.opik.infrastructure.ratelimit.RateLimitService;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RRateLimiterReactive;
import org.redisson.api.RateType;
import org.redisson.api.RedissonReactiveClient;
import org.redisson.client.RedisException;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

import java.time.Duration;

import static com.comet.opik.infrastructure.RateLimitConfig.LimitConfig;

@RequiredArgsConstructor
@Slf4j
public class RedisRateLimitService implements RateLimitService {

private static final String KEY = "%s:%s";
Expand All @@ -26,10 +30,19 @@ public Mono<Boolean> isLimitExceeded(@NonNull String apiKey, long events, @NonNu
RRateLimiterReactive rateLimit = redisClient.getRateLimiter(KEY.formatted(bucketName, apiKey));

return setLimitIfNecessary(limitConfig.limit(), limitConfig.durationInSeconds(), rateLimit)
.then(Mono.defer(() -> rateLimit.tryAcquire(events)))
.then(Mono.defer(() -> rateLimit.tryAcquire(events).retryWhen(configureRetry(limitConfig, rateLimit))))
.map(Boolean.FALSE::equals);
}

private Retry configureRetry(LimitConfig limitConfig, RRateLimiterReactive rateLimit) {
return Retry.fixedDelay(2, Duration.ofMillis(5))
.filter(RedisException.class::isInstance)
.doBeforeRetryAsync(signal -> {
log.warn("Retrying due to error", signal.failure());
return setLimitIfNecessary(limitConfig.limit(), limitConfig.durationInSeconds(), rateLimit).then();
});
}

private Mono<Boolean> setLimitIfNecessary(long limit, long limitDurationInSeconds, RRateLimiterReactive rateLimit) {
return rateLimit.trySetRate(RateType.OVERALL, limit, Duration.ofSeconds(limitDurationInSeconds))
.flatMap(__ -> rateLimit.expireIfNotSet(Duration.ofSeconds(limitDurationInSeconds)));
Expand All @@ -40,15 +53,15 @@ public Mono<Long> availableEvents(@NonNull String apiKey, @NonNull String bucket
@NonNull LimitConfig limitConfig) {
RRateLimiterReactive rateLimit = redisClient.getRateLimiter(KEY.formatted(bucketName, apiKey));
return setLimitIfNecessary(limitConfig.limit(), limitConfig.durationInSeconds(), rateLimit)
.then(Mono.defer(rateLimit::availablePermits));
.then(Mono.defer(rateLimit::availablePermits).retryWhen(configureRetry(limitConfig, rateLimit)));
}

@Override
public Mono<Long> getRemainingTTL(@NonNull String apiKey, @NonNull String bucketName,
@NonNull LimitConfig limitConfig) {
RRateLimiterReactive rateLimit = redisClient.getRateLimiter(KEY.formatted(bucketName, apiKey));
return setLimitIfNecessary(limitConfig.limit(), limitConfig.durationInSeconds(), rateLimit)
.then(Mono.defer(rateLimit::remainTimeToLive));
.then(Mono.defer(rateLimit::remainTimeToLive).retryWhen(configureRetry(limitConfig, rateLimit)));
}

}

0 comments on commit df51ba3

Please sign in to comment.