Skip to content

Commit

Permalink
feat:support concurrency rate limit. (#1456)
Browse files Browse the repository at this point in the history
* feat:support concurrency rate limit.

* feat:support concurrency rate limit.
  • Loading branch information
SkyeBeFreeman authored Nov 4, 2024
1 parent 0ff874f commit e89bdae
Show file tree
Hide file tree
Showing 42 changed files with 372 additions and 1,078 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
- [fix: fix lossless deregister failed when no healthcheck configured](https://github.com/Tencent/spring-cloud-tencent/pull/1345)
- [feat:add zero protection.](https://github.com/Tencent/spring-cloud-tencent/pull/1346)
- [fix:fix no registry when lossless is disabled.](https://github.com/Tencent/spring-cloud-tencent/pull/1347)
- [fix:fix the ratelimit bug for 2022](https://github.com/Tencent/spring-cloud-tencent/pull/1348)
- [fix:fix the ratelimit bug for 2021](https://github.com/Tencent/spring-cloud-tencent/pull/1348)
- [feat:add Tencent Cloud TSF support.](https://github.com/Tencent/spring-cloud-tencent/pull/1350)
- [feat:support consul config.](https://github.com/Tencent/spring-cloud-tencent/pull/1352)
- [feat:add trace report support.](https://github.com/Tencent/spring-cloud-tencent/pull/1353)
Expand All @@ -35,3 +35,4 @@
- [feat:upgrade api circuit breaker.](https://github.com/Tencent/spring-cloud-tencent/pull/1440)
- [feat: support lossless config from console & support warmup.](https://github.com/Tencent/spring-cloud-tencent/pull/1446)
- [feat:add admin http handler.](https://github.com/Tencent/spring-cloud-tencent/pull/1449)
- [feat:support concurrency rate limit.](https://github.com/Tencent/spring-cloud-tencent/pull/1456)
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import com.tencent.cloud.metadata.core.EncodeTransferMedataRestTemplateEnhancedPlugin;
import com.tencent.cloud.metadata.core.EncodeTransferMedataScgEnhancedPlugin;
import com.tencent.cloud.metadata.core.EncodeTransferMedataWebClientEnhancedPlugin;
import com.tencent.cloud.polaris.context.config.PolarisContextProperties;

import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
Expand Down Expand Up @@ -66,8 +65,8 @@ public FilterRegistrationBean<DecodeTransferMetadataServletFilter> metadataServl
}

@Bean
public DecodeTransferMetadataServletFilter metadataServletFilter(PolarisContextProperties polarisContextProperties) {
return new DecodeTransferMetadataServletFilter(polarisContextProperties);
public DecodeTransferMetadataServletFilter metadataServletFilter() {
return new DecodeTransferMetadataServletFilter();
}
}

Expand All @@ -79,8 +78,8 @@ public DecodeTransferMetadataServletFilter metadataServletFilter(PolarisContextP
protected static class MetadataReactiveFilterConfig {

@Bean
public DecodeTransferMetadataReactiveFilter metadataReactiveFilter(PolarisContextProperties polarisContextProperties) {
return new DecodeTransferMetadataReactiveFilter(polarisContextProperties);
public DecodeTransferMetadataReactiveFilter metadataReactiveFilter() {
return new DecodeTransferMetadataReactiveFilter();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import com.tencent.cloud.common.util.JacksonUtils;
import com.tencent.cloud.common.util.UrlUtils;
import com.tencent.cloud.metadata.provider.ReactiveMetadataProvider;
import com.tencent.cloud.polaris.context.config.PolarisContextProperties;
import com.tencent.polaris.api.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -54,11 +53,6 @@
public class DecodeTransferMetadataReactiveFilter implements WebFilter, Ordered {

private static final Logger LOG = LoggerFactory.getLogger(DecodeTransferMetadataReactiveFilter.class);
private PolarisContextProperties polarisContextProperties;

public DecodeTransferMetadataReactiveFilter(PolarisContextProperties polarisContextProperties) {
this.polarisContextProperties = polarisContextProperties;
}

@Override
public int getOrder() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import com.tencent.cloud.common.util.JacksonUtils;
import com.tencent.cloud.common.util.UrlUtils;
import com.tencent.cloud.metadata.provider.ServletMetadataProvider;
import com.tencent.cloud.polaris.context.config.PolarisContextProperties;
import com.tencent.polaris.api.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -57,12 +56,6 @@ public class DecodeTransferMetadataServletFilter extends OncePerRequestFilter {

private static final Logger LOG = LoggerFactory.getLogger(DecodeTransferMetadataServletFilter.class);

private PolarisContextProperties polarisContextProperties;

public DecodeTransferMetadataServletFilter(PolarisContextProperties polarisContextProperties) {
this.polarisContextProperties = polarisContextProperties;
}

@Override
protected void doFilterInternal(@NonNull HttpServletRequest httpServletRequest,
@NonNull HttpServletResponse httpServletResponse, FilterChain filterChain)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.tencent.cloud.common.constant.MetadataConstant;
import com.tencent.cloud.common.constant.OrderConstant;
import com.tencent.cloud.common.metadata.config.MetadataLocalProperties;
import com.tencent.cloud.polaris.context.config.PolarisContextProperties;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand Down Expand Up @@ -55,7 +54,7 @@ public class DecodeTransferMetadataReactiveFilterTest {

@BeforeEach
public void setUp() {
this.metadataReactiveFilter = new DecodeTransferMetadataReactiveFilter(new PolarisContextProperties());
this.metadataReactiveFilter = new DecodeTransferMetadataReactiveFilter();
}

@Test
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,9 @@

import com.tencent.cloud.common.constant.OrderConstant;
import com.tencent.cloud.polaris.context.PolarisSDKContextManager;
import com.tencent.cloud.polaris.context.ServiceRuleManager;
import com.tencent.cloud.polaris.context.config.PolarisContextAutoConfiguration;
import com.tencent.cloud.polaris.ratelimit.filter.QuotaCheckReactiveFilter;
import com.tencent.cloud.polaris.ratelimit.filter.QuotaCheckServletFilter;
import com.tencent.cloud.polaris.ratelimit.resolver.RateLimitRuleArgumentReactiveResolver;
import com.tencent.cloud.polaris.ratelimit.resolver.RateLimitRuleArgumentServletResolver;
import com.tencent.cloud.polaris.ratelimit.spi.PolarisRateLimiterLabelReactiveResolver;
import com.tencent.cloud.polaris.ratelimit.spi.PolarisRateLimiterLabelServletResolver;
import com.tencent.cloud.polaris.ratelimit.spi.PolarisRateLimiterLimitedFallback;

import org.springframework.beans.factory.annotation.Autowired;
Expand Down Expand Up @@ -63,20 +58,13 @@ public class PolarisRateLimitAutoConfiguration {
@ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.SERVLET)
protected static class QuotaCheckFilterConfig {

@Bean
public RateLimitRuleArgumentServletResolver rateLimitRuleArgumentResolver(ServiceRuleManager serviceRuleManager,
@Autowired(required = false) PolarisRateLimiterLabelServletResolver labelResolver) {
return new RateLimitRuleArgumentServletResolver(serviceRuleManager, labelResolver);
}

@Bean
@ConditionalOnMissingBean
public QuotaCheckServletFilter quotaCheckFilter(PolarisSDKContextManager polarisSDKContextManager,
PolarisRateLimitProperties polarisRateLimitProperties,
RateLimitRuleArgumentServletResolver rateLimitRuleArgumentResolver,
@Autowired(required = false) PolarisRateLimiterLimitedFallback polarisRateLimiterLimitedFallback) {
return new QuotaCheckServletFilter(polarisSDKContextManager.getLimitAPI(), polarisRateLimitProperties,
rateLimitRuleArgumentResolver, polarisRateLimiterLimitedFallback);
return new QuotaCheckServletFilter(polarisSDKContextManager.getLimitAPI(), polarisSDKContextManager.getAssemblyAPI(),
polarisRateLimitProperties, polarisRateLimiterLimitedFallback);
}

@Bean
Expand All @@ -97,21 +85,14 @@ public FilterRegistrationBean<QuotaCheckServletFilter> quotaFilterRegistrationBe
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.REACTIVE)
protected static class MetadataReactiveFilterConfig {

@Bean
public RateLimitRuleArgumentReactiveResolver rateLimitRuleArgumentResolver(ServiceRuleManager serviceRuleManager,
@Autowired(required = false) PolarisRateLimiterLabelReactiveResolver labelResolver) {
return new RateLimitRuleArgumentReactiveResolver(serviceRuleManager, labelResolver);
}
protected static class QuotaCheckReactiveFilterConfig {

@Bean
public QuotaCheckReactiveFilter quotaCheckReactiveFilter(PolarisSDKContextManager polarisSDKContextManager,
PolarisRateLimitProperties polarisRateLimitProperties,
RateLimitRuleArgumentReactiveResolver rateLimitRuleArgumentResolver,
@Nullable PolarisRateLimiterLimitedFallback polarisRateLimiterLimitedFallback) {
return new QuotaCheckReactiveFilter(polarisSDKContextManager.getLimitAPI(), polarisRateLimitProperties,
rateLimitRuleArgumentResolver, polarisRateLimiterLimitedFallback);
return new QuotaCheckReactiveFilter(polarisSDKContextManager.getLimitAPI(), polarisSDKContextManager.getAssemblyAPI(),
polarisRateLimitProperties, polarisRateLimiterLimitedFallback);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,20 @@
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Objects;
import java.util.Set;

import javax.annotation.PostConstruct;

import com.tencent.cloud.common.constant.HeaderConstant;
import com.tencent.cloud.common.constant.OrderConstant;
import com.tencent.cloud.common.metadata.MetadataContext;
import com.tencent.cloud.polaris.ratelimit.config.PolarisRateLimitProperties;
import com.tencent.cloud.polaris.ratelimit.resolver.RateLimitRuleArgumentReactiveResolver;
import com.tencent.cloud.polaris.ratelimit.spi.PolarisRateLimiterLimitedFallback;
import com.tencent.cloud.polaris.ratelimit.utils.QuotaCheckUtils;
import com.tencent.cloud.polaris.ratelimit.utils.RateLimitUtils;
import com.tencent.polaris.api.pojo.RetStatus;
import com.tencent.polaris.api.utils.StringUtils;
import com.tencent.polaris.assembly.api.AssemblyAPI;
import com.tencent.polaris.ratelimit.api.core.LimitAPI;
import com.tencent.polaris.ratelimit.api.rpc.Argument;
import com.tencent.polaris.ratelimit.api.rpc.QuotaResponse;
import com.tencent.polaris.ratelimit.api.rpc.QuotaResultCode;
import org.slf4j.Logger;
Expand All @@ -54,6 +53,7 @@
import org.springframework.web.server.WebFilterChain;

import static com.tencent.cloud.common.constant.ContextConstant.UTF_8;
import static org.springframework.core.io.buffer.DefaultDataBufferFactory.DEFAULT_INITIAL_CAPACITY;

/**
* Reactive filter to check quota.
Expand All @@ -66,22 +66,20 @@ public class QuotaCheckReactiveFilter implements WebFilter, Ordered {

private final LimitAPI limitAPI;

private final PolarisRateLimitProperties polarisRateLimitProperties;
private final AssemblyAPI assemblyAPI;

private final RateLimitRuleArgumentReactiveResolver rateLimitRuleArgumentResolver;
private final PolarisRateLimitProperties polarisRateLimitProperties;

private final PolarisRateLimiterLimitedFallback polarisRateLimiterLimitedFallback;


private String rejectTips;

public QuotaCheckReactiveFilter(LimitAPI limitAPI,
public QuotaCheckReactiveFilter(LimitAPI limitAPI, AssemblyAPI assemblyAPI,
PolarisRateLimitProperties polarisRateLimitProperties,
RateLimitRuleArgumentReactiveResolver rateLimitRuleArgumentResolver,
@Nullable PolarisRateLimiterLimitedFallback polarisRateLimiterLimitedFallback) {
this.limitAPI = limitAPI;
this.assemblyAPI = assemblyAPI;
this.polarisRateLimitProperties = polarisRateLimitProperties;
this.rateLimitRuleArgumentResolver = rateLimitRuleArgumentResolver;
this.polarisRateLimiterLimitedFallback = polarisRateLimiterLimitedFallback;
}

Expand All @@ -100,31 +98,41 @@ public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
String localNamespace = MetadataContext.LOCAL_NAMESPACE;
String localService = MetadataContext.LOCAL_SERVICE;

Set<Argument> arguments = rateLimitRuleArgumentResolver.getArguments(exchange, localNamespace, localService);
long waitMs = -1;
QuotaResponse quotaResponse = null;
try {
String path = exchange.getRequest().getURI().getPath();
QuotaResponse quotaResponse = QuotaCheckUtils.getQuota(
limitAPI, localNamespace, localService, 1, arguments, path);
quotaResponse = QuotaCheckUtils.getQuota(limitAPI, localNamespace, localService, 1, path);

if (quotaResponse.getCode() == QuotaResultCode.QuotaResultLimited) {
ServerHttpResponse response = exchange.getResponse();
DataBuffer dataBuffer;
if (!Objects.isNull(polarisRateLimiterLimitedFallback)) {
if (Objects.nonNull(quotaResponse.getActiveRule())
&& StringUtils.isNotBlank(quotaResponse.getActiveRule().getCustomResponse().getBody())) {
response.setRawStatusCode(polarisRateLimitProperties.getRejectHttpCode());
response.getHeaders().setContentType(MediaType.TEXT_PLAIN);
dataBuffer = response.bufferFactory().allocateBuffer(DEFAULT_INITIAL_CAPACITY)
.write(quotaResponse.getActiveRule().getCustomResponse().getBody()
.getBytes(StandardCharsets.UTF_8));
}
else if (!Objects.isNull(polarisRateLimiterLimitedFallback)) {
response.setRawStatusCode(polarisRateLimiterLimitedFallback.rejectHttpCode());
response.getHeaders().setContentType(polarisRateLimiterLimitedFallback.mediaType());
dataBuffer = response.bufferFactory().allocateBuffer()
dataBuffer = response.bufferFactory().allocateBuffer(DEFAULT_INITIAL_CAPACITY)
.write(polarisRateLimiterLimitedFallback.rejectTips()
.getBytes(polarisRateLimiterLimitedFallback.charset()));
}
else {
response.setRawStatusCode(polarisRateLimitProperties.getRejectHttpCode());
response.getHeaders().setContentType(MediaType.TEXT_HTML);
dataBuffer = response.bufferFactory().allocateBuffer()
dataBuffer = response.bufferFactory().allocateBuffer(DEFAULT_INITIAL_CAPACITY)
.write(rejectTips.getBytes(StandardCharsets.UTF_8));
}
// set flow control to header
response.getHeaders()
.add(HeaderConstant.INTERNAL_CALLEE_RET_STATUS, RetStatus.RetFlowControl.getDesc());
// set trace span
RateLimitUtils.reportTrace(assemblyAPI, quotaResponse.getActiveRule().getId().getValue());
if (Objects.nonNull(quotaResponse.getActiveRule())) {
try {
String encodedActiveRuleName = URLEncoder.encode(
Expand All @@ -136,6 +144,7 @@ public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
quotaResponse.getActiveRuleName(), e);
}
}
RateLimitUtils.release(quotaResponse);
return response.writeWith(Mono.just(dataBuffer));
}
// Unirate
Expand All @@ -150,11 +159,13 @@ public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
LOG.error("fail to invoke getQuota, service is " + localService, t);
}

QuotaResponse finalQuotaResponse = quotaResponse;
if (waitMs > 0) {
return Mono.delay(Duration.ofMillis(waitMs)).flatMap(e -> chain.filter(exchange));
return Mono.delay(Duration.ofMillis(waitMs))
.flatMap(e -> chain.filter(exchange).doFinally((v) -> RateLimitUtils.release(finalQuotaResponse)));
}
else {
return chain.filter(exchange);
return chain.filter(exchange).doFinally((v) -> RateLimitUtils.release(finalQuotaResponse));
}
}

Expand Down
Loading

0 comments on commit e89bdae

Please sign in to comment.