From a8c219880f155433a7e2f962e0cf5856d3aaf511 Mon Sep 17 00:00:00 2001 From: Eric Zhao Date: Tue, 23 Aug 2022 17:44:11 +0800 Subject: [PATCH 1/3] Add data-source extension for OpenSergo spec Signed-off-by: Eric Zhao --- sentinel-extension/pom.xml | 1 + .../sentinel-datasource-opensergo/README.md | 26 ++ .../sentinel-datasource-opensergo/pom.xml | 40 +++ .../datasource/OpenSergoDataSourceGroup.java | 299 +++++++++++++++++ .../datasource/OpenSergoRuleAggregator.java | 303 ++++++++++++++++++ .../OpenSergoSentinelConstants.java | 29 ++ 6 files changed, 698 insertions(+) create mode 100644 sentinel-extension/sentinel-datasource-opensergo/README.md create mode 100644 sentinel-extension/sentinel-datasource-opensergo/pom.xml create mode 100644 sentinel-extension/sentinel-datasource-opensergo/src/main/java/com/alibaba/csp/sentinel/datasource/OpenSergoDataSourceGroup.java create mode 100644 sentinel-extension/sentinel-datasource-opensergo/src/main/java/com/alibaba/csp/sentinel/datasource/OpenSergoRuleAggregator.java create mode 100644 sentinel-extension/sentinel-datasource-opensergo/src/main/java/com/alibaba/csp/sentinel/datasource/OpenSergoSentinelConstants.java diff --git a/sentinel-extension/pom.xml b/sentinel-extension/pom.xml index 3587881949..be63fc2404 100755 --- a/sentinel-extension/pom.xml +++ b/sentinel-extension/pom.xml @@ -25,6 +25,7 @@ sentinel-datasource-eureka sentinel-annotation-cdi-interceptor sentinel-metric-exporter + sentinel-datasource-opensergo diff --git a/sentinel-extension/sentinel-datasource-opensergo/README.md b/sentinel-extension/sentinel-datasource-opensergo/README.md new file mode 100644 index 0000000000..f28f287c80 --- /dev/null +++ b/sentinel-extension/sentinel-datasource-opensergo/README.md @@ -0,0 +1,26 @@ +# Sentinel OpenSergo data-source + +Sentinel OpenSergo data-source provides integration with OpenSergo. +The data source leverages [OpenSergo Java SDK](https://github.com/opensergo/opensergo-java-sdk) to implement subscribe (push) model. + +## Usage + +To use Sentinel OpenSergo data-source, you'll need to add the following Maven dependency: + +```xml + + com.alibaba.csp + sentinel-datasource-opensergo + x.y.z + +``` + +Then you can create an `OpenSergoDataSourceGroup` and subscribe Sentinel rules. For example: + +```java +OpenSergoDataSourceGroup openSergo = new OpenSergoDataSourceGroup(host, port, namespace, appName); +openSergo.start(); + +// Subscribe flow rules from OpenSergo control plane, and propagate to Sentinel rule manager. +FlowRuleManager.register2Property(openSergo.subscribeFlowRules()); +``` \ No newline at end of file diff --git a/sentinel-extension/sentinel-datasource-opensergo/pom.xml b/sentinel-extension/sentinel-datasource-opensergo/pom.xml new file mode 100644 index 0000000000..867a043f5a --- /dev/null +++ b/sentinel-extension/sentinel-datasource-opensergo/pom.xml @@ -0,0 +1,40 @@ + + + + sentinel-extension + com.alibaba.csp + 1.8.5 + + 4.0.0 + + sentinel-datasource-opensergo + + + 1.8 + 1.8 + + 0.1.0-SNAPSHOT + + + + + com.alibaba.csp + sentinel-datasource-extension + + + + io.opensergo + opensergo-java-sdk + ${opensergo.sdk.version} + + + + junit + junit + test + + + + \ No newline at end of file diff --git a/sentinel-extension/sentinel-datasource-opensergo/src/main/java/com/alibaba/csp/sentinel/datasource/OpenSergoDataSourceGroup.java b/sentinel-extension/sentinel-datasource-opensergo/src/main/java/com/alibaba/csp/sentinel/datasource/OpenSergoDataSourceGroup.java new file mode 100644 index 0000000000..c07c6a5efa --- /dev/null +++ b/sentinel-extension/sentinel-datasource-opensergo/src/main/java/com/alibaba/csp/sentinel/datasource/OpenSergoDataSourceGroup.java @@ -0,0 +1,299 @@ +/* + * Copyright 1999-2022 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.datasource; + +import java.util.List; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; + +import com.alibaba.csp.sentinel.log.RecordLog; +import com.alibaba.csp.sentinel.property.DynamicSentinelProperty; +import com.alibaba.csp.sentinel.property.SentinelProperty; +import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule; +import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; +import com.alibaba.csp.sentinel.util.AssertUtil; + +import io.opensergo.ConfigKind; +import io.opensergo.OpenSergoClient; +import io.opensergo.proto.fault_tolerance.v1.CircuitBreakerStrategy; +import io.opensergo.proto.fault_tolerance.v1.FaultToleranceRule; +import io.opensergo.proto.fault_tolerance.v1.RateLimitStrategy; +import io.opensergo.proto.fault_tolerance.v1.ThrottlingStrategy; +import io.opensergo.subscribe.OpenSergoConfigSubscriber; +import io.opensergo.subscribe.SubscribeKey; + +/** + *

Data-source group for subscribing Sentinel rules from OpenSergo control plane.

+ * + * @author Eric Zhao + */ +public class OpenSergoDataSourceGroup { + + private final OpenSergoClient openSergoClient; + + private final String namespace; + private final String app; + + /** + * (SentinelRuleKind, SentinelProperty) + */ + private final ConcurrentMap dataSourceMap = new ConcurrentHashMap<>(); + + private final ConcurrentMap subscribeKeyMap = new ConcurrentHashMap<>(); + private final ConcurrentMap sentinelRuleSubscribeMap = new ConcurrentHashMap<>(); + + private final OpenSergoRuleAggregator ruleAggregator; + + private final AtomicBoolean started = new AtomicBoolean(false); + + /** + * @param host host of OpenSergo Control Plane + * @param port port of OpenSergo Control Plane + * @param namespace namespace to subscribe + * @param app appName to subscribe + */ + public OpenSergoDataSourceGroup(String host, int port, String namespace, String app) { + AssertUtil.notEmpty(namespace, "namespace cannot be empty"); + AssertUtil.notEmpty(app, "app cannot be empty"); + this.openSergoClient = new OpenSergoClient(host, port); + this.namespace = namespace; + this.app = app; + this.ruleAggregator = new OpenSergoRuleAggregator(dataSourceMap); + + initializeDataSourceMap(); + } + + private void initializeDataSourceMap() { + dataSourceMap.put(OpenSergoSentinelConstants.KIND_FLOW_RULE, new DynamicSentinelProperty>()); + dataSourceMap.put(OpenSergoSentinelConstants.KIND_CIRCUIT_BREAKER_RULE, + new DynamicSentinelProperty>()); + } + + public void start() throws Exception { + if (started.compareAndSet(false, true)) { + openSergoClient.start(); + + prepareForBaseSubscription(); + } + } + + public void close() throws Exception { + if (started.compareAndSet(true, false)) { + for (Entry e : subscribeKeyMap.entrySet()) { + openSergoClient.unsubscribeConfig(e.getValue()); + RecordLog.info("Unsubscribing OpenSergo config for target: {}", e.getValue()); + } + + openSergoClient.close(); + } + } + + private void prepareForBaseSubscription() { + SubscribeKey key = new SubscribeKey(namespace, app, ConfigKind.FAULT_TOLERANCE_RULE); + + openSergoClient.subscribeConfig(key, new OpenSergoFaultToleranceRuleSubscriber(ruleAggregator)); + subscribeKeyMap.put(ConfigKind.FAULT_TOLERANCE_RULE, key); + + RecordLog.info("Subscribing OpenSergo base fault-tolerance rules for target <{}, {}>", namespace, app); + } + + public boolean hasSubscribedFor(String sentinelRuleKind) { + if (sentinelRuleKind == null) { + return false; + } + return sentinelRuleSubscribeMap.getOrDefault(sentinelRuleKind, false); + } + + /** + *

Subscribe Sentinel flow rules from OpenSergo control plane.

+ * + * @return {@link SentinelProperty} of Sentinel flow rules + */ + public synchronized SentinelProperty> subscribeFlowRules() { + if (!started.get()) { + throw new IllegalStateException("OpenSergo data-source not started"); + } + + boolean subscribed = hasSubscribedFor(OpenSergoSentinelConstants.KIND_FLOW_RULE); + SentinelProperty> property = dataSourceMap.get(OpenSergoSentinelConstants.KIND_FLOW_RULE); + if (subscribed) { + return property; + } + + OpenSergoSentinelFlowRuleSubscriber subscriber = new OpenSergoSentinelFlowRuleSubscriber(ruleAggregator); + + SubscribeKey rlsKey = new SubscribeKey(namespace, app, ConfigKind.RATE_LIMIT_STRATEGY); + openSergoClient.subscribeConfig(rlsKey, subscriber); + subscribeKeyMap.put(ConfigKind.RATE_LIMIT_STRATEGY, rlsKey); + RecordLog.info("Subscribing OpenSergo config for target: {}", rlsKey); + + SubscribeKey tsKey = new SubscribeKey(namespace, app, ConfigKind.THROTTLING_STRATEGY); + openSergoClient.subscribeConfig(tsKey, subscriber); + subscribeKeyMap.put(ConfigKind.THROTTLING_STRATEGY, tsKey); + RecordLog.info("Subscribing OpenSergo config for target: {}", tsKey); + + SubscribeKey concurrencyLimitKey = new SubscribeKey(namespace, app, ConfigKind.CONCURRENCY_LIMIT_STRATEGY); + openSergoClient.subscribeConfig(concurrencyLimitKey, subscriber); + subscribeKeyMap.put(ConfigKind.CONCURRENCY_LIMIT_STRATEGY, concurrencyLimitKey); + RecordLog.info("Subscribing OpenSergo config for target: {}", concurrencyLimitKey); + + sentinelRuleSubscribeMap.put(OpenSergoSentinelConstants.KIND_FLOW_RULE, true); + + return property; + } + + /** + *

Subscribe Sentinel degrade rules from OpenSergo control plane.

+ * + * @return {@link SentinelProperty} of Sentinel degrade rules + */ + public synchronized SentinelProperty> subscribeDegradeRules() { + if (!started.get()) { + throw new IllegalStateException("OpenSergo data-source not started"); + } + + boolean subscribed = hasSubscribedFor(OpenSergoSentinelConstants.KIND_CIRCUIT_BREAKER_RULE); + SentinelProperty> property = dataSourceMap.get( + OpenSergoSentinelConstants.KIND_CIRCUIT_BREAKER_RULE); + if (subscribed) { + return property; + } + + SubscribeKey subscribeKey = new SubscribeKey(namespace, app, ConfigKind.CIRCUIT_BREAKER_STRATEGY); + openSergoClient.subscribeConfig(subscribeKey, + new OpenSergoSentinelCircuitBreakerRuleSubscriber(ruleAggregator)); + subscribeKeyMap.put(ConfigKind.CIRCUIT_BREAKER_STRATEGY, subscribeKey); + RecordLog.info("Subscribing OpenSergo config for target: {}", subscribeKey); + + sentinelRuleSubscribeMap.put(OpenSergoSentinelConstants.KIND_CIRCUIT_BREAKER_RULE, true); + + return property; + } + + /** + *

Subscribe Sentinel flow rules from OpenSergo control plane.

+ */ + public synchronized void unsubscribeFlowRules() { + boolean subscribed = hasSubscribedFor(OpenSergoSentinelConstants.KIND_FLOW_RULE); + if (!subscribed) { + return; + } + + SubscribeKey rlsKey = subscribeKeyMap.remove(ConfigKind.RATE_LIMIT_STRATEGY); + boolean rlRemoved = openSergoClient.unsubscribeConfig(rlsKey); + SubscribeKey tsKey = subscribeKeyMap.remove(ConfigKind.THROTTLING_STRATEGY); + boolean tsRemoved = openSergoClient.unsubscribeConfig(tsKey); + SubscribeKey concurrencyLimitKey = subscribeKeyMap.remove(ConfigKind.CONCURRENCY_LIMIT_STRATEGY); + boolean clRemoved = openSergoClient.unsubscribeConfig(concurrencyLimitKey); + + sentinelRuleSubscribeMap.remove(OpenSergoSentinelConstants.KIND_FLOW_RULE); + + // NOTE: unsubscribe operation does not affect existing rules in SentinelProperty. + } + + public synchronized void unsubscribeDegradeRules() { + boolean subscribed = hasSubscribedFor(OpenSergoSentinelConstants.KIND_CIRCUIT_BREAKER_RULE); + if (!subscribed) { + return; + } + + SubscribeKey cbKey = subscribeKeyMap.remove(ConfigKind.CIRCUIT_BREAKER_STRATEGY); + boolean cbRemoved = openSergoClient.unsubscribeConfig(cbKey); + + sentinelRuleSubscribeMap.remove(OpenSergoSentinelConstants.KIND_CIRCUIT_BREAKER_RULE); + + // NOTE: unsubscribe operation does not affect existing rules in SentinelProperty. + } + + public String getNamespace() { + return namespace; + } + + public String getApp() { + return app; + } + + static class OpenSergoFaultToleranceRuleSubscriber implements OpenSergoConfigSubscriber { + + private final OpenSergoRuleAggregator ruleAggregator; + + public OpenSergoFaultToleranceRuleSubscriber(OpenSergoRuleAggregator ruleAggregator) { + this.ruleAggregator = ruleAggregator; + } + + @Override + public boolean onConfigUpdate(SubscribeKey subscribeKey, Object data) { + if (subscribeKey.getKind() != ConfigKind.FAULT_TOLERANCE_RULE || !(data instanceof List)) { + // type or data mismatch + return false; + } + return ruleAggregator.updateFaultToleranceRuleList((List) data); + } + } + + /** + * Subscriber for OpenSergo strategies related to Sentinel flow rules. + */ + static class OpenSergoSentinelFlowRuleSubscriber implements OpenSergoConfigSubscriber { + + private final OpenSergoRuleAggregator ruleAggregator; + + public OpenSergoSentinelFlowRuleSubscriber(OpenSergoRuleAggregator ruleAggregator) { + this.ruleAggregator = ruleAggregator; + } + + @Override + public boolean onConfigUpdate(SubscribeKey subscribeKey, Object data) { + if (!(data instanceof List)) { + // data mismatch + return false; + } + + switch (subscribeKey.getKind()) { + case RATE_LIMIT_STRATEGY: + return ruleAggregator.updateRateLimitStrategy((List) data); + case THROTTLING_STRATEGY: + return ruleAggregator.updateThrottlingStrategy((List) data); + case CONCURRENCY_LIMIT_STRATEGY: + // TODO + return false; + default: + // Type mismatch + return false; + } + } + } + + static class OpenSergoSentinelCircuitBreakerRuleSubscriber implements OpenSergoConfigSubscriber { + + private final OpenSergoRuleAggregator ruleAggregator; + + public OpenSergoSentinelCircuitBreakerRuleSubscriber(OpenSergoRuleAggregator ruleAggregator) { + this.ruleAggregator = ruleAggregator; + } + + @Override + public boolean onConfigUpdate(SubscribeKey subscribeKey, Object data) { + if (subscribeKey.getKind() != ConfigKind.CIRCUIT_BREAKER_STRATEGY || !(data instanceof List)) { + // type or data mismatch + return false; + } + return ruleAggregator.updateCircuitBreakerStrategy((List) data); + } + } +} diff --git a/sentinel-extension/sentinel-datasource-opensergo/src/main/java/com/alibaba/csp/sentinel/datasource/OpenSergoRuleAggregator.java b/sentinel-extension/sentinel-datasource-opensergo/src/main/java/com/alibaba/csp/sentinel/datasource/OpenSergoRuleAggregator.java new file mode 100644 index 0000000000..cc0390eea4 --- /dev/null +++ b/sentinel-extension/sentinel-datasource-opensergo/src/main/java/com/alibaba/csp/sentinel/datasource/OpenSergoRuleAggregator.java @@ -0,0 +1,303 @@ +/* + * Copyright 1999-2022 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.datasource; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +import com.alibaba.csp.sentinel.log.RecordLog; +import com.alibaba.csp.sentinel.property.SentinelProperty; +import com.alibaba.csp.sentinel.slots.block.RuleConstant; +import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule; +import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; + +import io.opensergo.ConfigKind; +import io.opensergo.proto.fault_tolerance.v1.CircuitBreakerStrategy; +import io.opensergo.proto.fault_tolerance.v1.ConcurrencyLimitStrategy; +import io.opensergo.proto.fault_tolerance.v1.FaultToleranceRule; +import io.opensergo.proto.fault_tolerance.v1.FaultToleranceRule.FaultToleranceRuleTargetRef; +import io.opensergo.proto.fault_tolerance.v1.RateLimitStrategy; +import io.opensergo.proto.fault_tolerance.v1.ThrottlingStrategy; +import io.opensergo.util.TimeUnitUtils; + +/** + * @author Eric Zhao + */ +public class OpenSergoRuleAggregator { + + /** + * (SentinelRuleKind, SentinelProperty) + */ + private final Map dataSourceMap; + + public OpenSergoRuleAggregator(Map dataSourceMap) { + this.dataSourceMap = Collections.unmodifiableMap(dataSourceMap); + } + + /** + * (strategyKindSimpleName, [rules that contain this kind of strategy]) + */ + private volatile Map> ftRuleMapByStrategyKind = new HashMap<>(); + /** + * (name, rateLimitStrategy) + */ + private volatile Map rateLimitStrategyMap = new HashMap<>(); + private volatile Map throttlingStrategyMap = new HashMap<>(); + private volatile Map concurrencyLimitStrategyMap = new HashMap<>(); + private volatile Map circuitBreakerStrategyMap = new HashMap<>(); + + public synchronized boolean updateFaultToleranceRuleList(List rules) { + Map> map = new HashMap<>(4); + + if (rules != null && !rules.isEmpty()) { + for (FaultToleranceRule rule : rules) { + Set kinds = rule.getStrategiesList().stream() + .map(e -> e.getKind()).collect(Collectors.toSet()); + kinds.forEach(kindName -> map.computeIfAbsent(kindName, v -> new ArrayList<>()).add(rule)); + } + } + this.ftRuleMapByStrategyKind = map; + + // TODO: check whether the rules have been changed + handleFlowRuleUpdate(); + handleCircuitBreakerRuleUpdate(); + + return true; + } + + public synchronized boolean updateRateLimitStrategy(List strategies) { + Map map = new HashMap<>(4); + if (strategies != null && !strategies.isEmpty()) { + strategies.forEach(s -> map.put(s.getName(), s)); + } + this.rateLimitStrategyMap = map; + + return handleFlowRuleUpdate(); + } + + public synchronized boolean updateThrottlingStrategy(List strategies) { + Map map = new HashMap<>(4); + if (strategies != null && !strategies.isEmpty()) { + strategies.forEach(s -> map.put(s.getName(), s)); + } + this.throttlingStrategyMap = map; + + return handleFlowRuleUpdate(); + } + + public synchronized boolean updateCircuitBreakerStrategy(List strategies) { + Map map = new HashMap<>(4); + if (strategies != null && !strategies.isEmpty()) { + strategies.forEach(s -> map.put(s.getName(), s)); + } + this.circuitBreakerStrategyMap = map; + + return handleCircuitBreakerRuleUpdate(); + } + + private boolean handleFlowRuleUpdate() { + List rules = new ArrayList<>(); + + List rulesFromRateLimitStrategies = assembleFlowRulesFromRateLimitStrategies( + ftRuleMapByStrategyKind.get(ConfigKind.RATE_LIMIT_STRATEGY.getSimpleKindName()), rateLimitStrategyMap); + List rulesFromThrottlingStrategies = assembleFlowRulesFromThrottlingStrategies( + ftRuleMapByStrategyKind.get(ConfigKind.THROTTLING_STRATEGY.getSimpleKindName()), throttlingStrategyMap); + + rules.addAll(rulesFromRateLimitStrategies); + rules.addAll(rulesFromThrottlingStrategies); + + // Update rules to upstream data-source. + return dataSourceMap.get(OpenSergoSentinelConstants.KIND_FLOW_RULE).updateValue(rules); + } + + private boolean handleCircuitBreakerRuleUpdate() { + List rules = assembleDegradeRulesFromCbStrategies( + ftRuleMapByStrategyKind.get(ConfigKind.CIRCUIT_BREAKER_STRATEGY.getSimpleKindName()), + circuitBreakerStrategyMap); + + // Update rules to upstream data-source. + return dataSourceMap.get(OpenSergoSentinelConstants.KIND_CIRCUIT_BREAKER_RULE).updateValue(rules); + } + + private List assembleFlowRulesFromRateLimitStrategies(List ftRules, + Map rateLimitStrategyMap) { + List rules = new ArrayList<>(); + if (ftRules == null || ftRules.isEmpty()) { + return rules; + } + for (FaultToleranceRule ftRule : ftRules) { + List strategies = ftRule.getStrategiesList().stream() + .filter(e -> e.getKind().equals(ConfigKind.RATE_LIMIT_STRATEGY.getSimpleKindName())) + .map(e -> rateLimitStrategyMap.get(e.getName())) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + if (strategies.isEmpty()) { + continue; + } + + for (FaultToleranceRuleTargetRef targetRef : ftRule.getTargetsList()) { + String resourceName = targetRef.getTargetResourceName(); + + for (RateLimitStrategy strategy : strategies) { + FlowRule flowRule = new FlowRule(resourceName); + try { + flowRule = fillFlowRuleWithRateLimitStrategy(flowRule, strategy); + if (flowRule != null) { + rules.add(flowRule); + } + } catch (Exception ex) { + RecordLog.warn("Ignoring OpenSergo RateLimitStrategy due to covert failure, " + + "resourceName={}, strategy={}", resourceName, strategy); + } + } + } + } + return rules; + } + + private List assembleFlowRulesFromThrottlingStrategies(List ftRules, + Map throttlingStrategyMap) { + List rules = new ArrayList<>(); + if (ftRules == null || ftRules.isEmpty()) { + return rules; + } + for (FaultToleranceRule ftRule : ftRules) { + List strategies = ftRule.getStrategiesList().stream() + .filter(e -> e.getKind().equals(ConfigKind.THROTTLING_STRATEGY.getSimpleKindName())) + .map(e -> throttlingStrategyMap.get(e.getName())) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + if (strategies.isEmpty()) { + continue; + } + + for (FaultToleranceRuleTargetRef targetRef : ftRule.getTargetsList()) { + String resourceName = targetRef.getTargetResourceName(); + + for (ThrottlingStrategy strategy : strategies) { + FlowRule flowRule = new FlowRule(resourceName); + fillFlowRuleWithThrottlingStrategy(flowRule, strategy); + rules.add(flowRule); + } + } + } + return rules; + } + + private List assembleDegradeRulesFromCbStrategies(List ftRules, + Map strategyMap) { + List rules = new ArrayList<>(); + if (ftRules == null || ftRules.isEmpty()) { + return rules; + } + for (FaultToleranceRule ftRule : ftRules) { + List strategies = ftRule.getStrategiesList().stream() + .filter(e -> e.getKind().equals(ConfigKind.CIRCUIT_BREAKER_STRATEGY.getSimpleKindName())) + .map(e -> strategyMap.get(e.getName())) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + if (strategies.isEmpty()) { + continue; + } + + for (FaultToleranceRuleTargetRef targetRef : ftRule.getTargetsList()) { + String resourceName = targetRef.getTargetResourceName(); + + for (CircuitBreakerStrategy strategy : strategies) { + DegradeRule degradeRule = new DegradeRule(resourceName); + try { + degradeRule = fillDegradeRuleWithCbStrategy(degradeRule, strategy); + if (degradeRule != null) { + rules.add(degradeRule); + } + } catch (Exception ex) { + RecordLog.warn("Ignoring OpenSergo CircuitBreakerStrategy due to covert failure, " + + "resourceName={}, strategy={}", resourceName, strategy); + } + } + } + } + return rules; + } + + private FlowRule fillFlowRuleWithRateLimitStrategy(FlowRule rule, RateLimitStrategy strategy) { + if (rule == null || strategy == null) { + return rule; + } + rule.setCount(strategy.getThreshold()); + rule.setGrade(RuleConstant.FLOW_GRADE_QPS); + // TODO: support global rate limiting (limitMode=Global). + rule.setClusterMode(false); + // Relation strategy. + rule.setStrategy(RuleConstant.STRATEGY_DIRECT); + // Control behavior. + rule.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_DEFAULT); + + return rule; + } + + private FlowRule fillFlowRuleWithThrottlingStrategy(FlowRule rule, ThrottlingStrategy strategy) { + if (rule == null || strategy == null) { + return rule; + } + + // round-up + double countPerSec = Math.ceil(1000.0 / strategy.getMinIntervalMillisOfRequests()); + rule.setCount(countPerSec); + rule.setGrade(RuleConstant.FLOW_GRADE_QPS); + rule.setClusterMode(false); + rule.setStrategy(RuleConstant.STRATEGY_DIRECT); + rule.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER); + rule.setMaxQueueingTimeMs((int) strategy.getQueueTimeoutMillis()); + + return rule; + } + + private DegradeRule fillDegradeRuleWithCbStrategy(DegradeRule rule, CircuitBreakerStrategy strategy) { + if (rule == null || strategy == null) { + return rule; + } + switch (strategy.getStrategy()) { + case STRATEGY_SLOW_REQUEST_RATIO: + rule.setGrade(RuleConstant.DEGRADE_GRADE_RT); + rule.setSlowRatioThreshold(strategy.getTriggerRatio()); + // maxAllowedRt + rule.setCount(strategy.getSlowCondition().getMaxAllowedRtMillis()); + break; + case STRATEGY_ERROR_REQUEST_RATIO: + rule.setGrade(RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO); + rule.setCount(strategy.getTriggerRatio()); + break; + default: + throw new IllegalArgumentException("unknown strategy type: " + strategy.getStrategy()); + } + int recoveryTimeoutSec = (int) (TimeUnitUtils.convertToMillis(strategy.getRecoveryTimeout(), + strategy.getRecoveryTimeoutTimeUnit()) / 1000); + rule.setTimeWindow(recoveryTimeoutSec); + int statIntervalMs = (int) TimeUnitUtils.convertToMillis(strategy.getStatDuration(), + strategy.getStatDurationTimeUnit()); + rule.setStatIntervalMs(statIntervalMs); + rule.setMinRequestAmount(strategy.getMinRequestAmount()); + return rule; + } + +} diff --git a/sentinel-extension/sentinel-datasource-opensergo/src/main/java/com/alibaba/csp/sentinel/datasource/OpenSergoSentinelConstants.java b/sentinel-extension/sentinel-datasource-opensergo/src/main/java/com/alibaba/csp/sentinel/datasource/OpenSergoSentinelConstants.java new file mode 100644 index 0000000000..eb14704072 --- /dev/null +++ b/sentinel-extension/sentinel-datasource-opensergo/src/main/java/com/alibaba/csp/sentinel/datasource/OpenSergoSentinelConstants.java @@ -0,0 +1,29 @@ +/* + * Copyright 1999-2022 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.datasource; + +/** + * @author Eric Zhao + */ +public final class OpenSergoSentinelConstants { + + public static final String KIND_FLOW_RULE = "FlowRule"; + public static final String KIND_CIRCUIT_BREAKER_RULE = "DegradeRule"; + public static final String KIND_SYSTEM_ADAPTIVE_RULE = "SystemRule"; + public static final String KIND_PARAM_FLOW_RULE = "ParamFlowRule"; + + private OpenSergoSentinelConstants() {} +} From e551cbfce35793ac802df401377283c35d7c5116 Mon Sep 17 00:00:00 2001 From: Eric Zhao Date: Tue, 25 Oct 2022 10:59:12 +0800 Subject: [PATCH 2/3] build(deps): Upgrade opensergo.sdk.version to 0.1.0-alpha Signed-off-by: Eric Zhao --- sentinel-extension/sentinel-datasource-opensergo/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sentinel-extension/sentinel-datasource-opensergo/pom.xml b/sentinel-extension/sentinel-datasource-opensergo/pom.xml index 867a043f5a..0ba68162a5 100644 --- a/sentinel-extension/sentinel-datasource-opensergo/pom.xml +++ b/sentinel-extension/sentinel-datasource-opensergo/pom.xml @@ -15,7 +15,7 @@ 1.8 1.8 - 0.1.0-SNAPSHOT + 0.1.0-beta1 From 41b65627005335f85b78d9a9b2b7ee2e3ecf0925 Mon Sep 17 00:00:00 2001 From: Eric Zhao Date: Tue, 8 Nov 2022 09:39:46 +0800 Subject: [PATCH 3/3] Update pom of sentinel-datasource-opensergo Signed-off-by: Eric Zhao --- sentinel-extension/sentinel-datasource-opensergo/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sentinel-extension/sentinel-datasource-opensergo/pom.xml b/sentinel-extension/sentinel-datasource-opensergo/pom.xml index 0ba68162a5..a074d8e06f 100644 --- a/sentinel-extension/sentinel-datasource-opensergo/pom.xml +++ b/sentinel-extension/sentinel-datasource-opensergo/pom.xml @@ -5,7 +5,7 @@ sentinel-extension com.alibaba.csp - 1.8.5 + 2.0.0-SNAPSHOT 4.0.0