From 812eaf26814acf38ee63414e7214f6b7992a5415 Mon Sep 17 00:00:00 2001 From: louyuting <1849491904@qq.com> Date: Mon, 17 Aug 2020 22:21:11 +0800 Subject: [PATCH] Using circuitbreaker.Rule to define slowRequest/errorRatio/errorCount Rule --- core/circuitbreaker/circuit_breaker.go | 59 ++-- core/circuitbreaker/circuit_breaker_test.go | 4 +- core/circuitbreaker/rule.go | 318 +++--------------- core/circuitbreaker/rule_manager.go | 71 ++-- core/circuitbreaker/rule_manager_test.go | 209 ++++++++---- core/circuitbreaker/rule_test.go | 43 --- core/circuitbreaker/slot.go | 2 +- core/hotspot/rule.go | 2 +- .../circuitbreaker/circuit_breaker_example.go | 32 +- ext/datasource/helper.go | 46 +-- ext/datasource/helper_test.go | 83 +++-- go.mod | 1 - go.sum | 6 - tests/maxsize_rule_list_benchmark_test.go | 60 ++-- .../extension/helper/CircuitBreakerRule.json | 4 +- util/math.go | 9 + util/math_test.go | 12 + 17 files changed, 413 insertions(+), 548 deletions(-) delete mode 100644 core/circuitbreaker/rule_test.go create mode 100644 util/math.go create mode 100644 util/math_test.go diff --git a/core/circuitbreaker/circuit_breaker.go b/core/circuitbreaker/circuit_breaker.go index 418bf5441..4bcc29a57 100644 --- a/core/circuitbreaker/circuit_breaker.go +++ b/core/circuitbreaker/circuit_breaker.go @@ -10,21 +10,20 @@ import ( "github.com/alibaba/sentinel-golang/util" ) -/** - Circuit Breaker State Machine: - - switch to open based on rule - +-----------------------------------------------------------------------+ - | | - | v -+----------------+ +----------------+ Probe +----------------+ -| | | |<----------------| | -| | Probe succeed | | | | -| Closed |<------------------| HalfOpen | | Open | -| | | | Probe failed | | -| | | +---------------->| | -+----------------+ +----------------+ +----------------+ -*/ +// +// Circuit Breaker State Machine: +// +// switch to open based on rule +// +-----------------------------------------------------------------------+ +// | | +// | v +// +----------------+ +----------------+ Probe +----------------+ +// | | | |<----------------| | +// | | Probe succeed | | | | +// | Closed |<------------------| HalfOpen | | Open | +// | | | | Probe failed | | +// | | | +---------------->| | +// +----------------+ +----------------+ +----------------+ type State int32 const ( @@ -80,7 +79,7 @@ type StateChangeListener interface { // CircuitBreaker is the basic interface of circuit breaker type CircuitBreaker interface { // BoundRule returns the associated circuit breaking rule. - BoundRule() Rule + BoundRule() *Rule // BoundStat returns the associated statistic data structure. BoundStat() interface{} // TryPass acquires permission of an invocation only if it is available at the time of invocation. @@ -95,7 +94,7 @@ type CircuitBreaker interface { //================================= circuitBreakerBase ==================================== // circuitBreakerBase encompasses the common fields of circuit breaker. type circuitBreakerBase struct { - rule Rule + rule *Rule // retryTimeoutMs represents recovery timeout (in milliseconds) before the circuit breaker opens. // During the open period, no requests are permitted until the timeout has elapsed. // After that, the circuit breaker will transform to half-open state for trying a few "trial" requests. @@ -106,7 +105,7 @@ type circuitBreakerBase struct { state *State } -func (b *circuitBreakerBase) BoundRule() Rule { +func (b *circuitBreakerBase) BoundRule() *Rule { return b.rule } @@ -128,7 +127,7 @@ func (b *circuitBreakerBase) fromClosedToOpen(snapshot interface{}) bool { if b.state.casState(Closed, Open) { b.updateNextRetryTimestamp() for _, listener := range stateChangeListeners { - listener.OnTransformToOpen(Closed, b.rule, snapshot) + listener.OnTransformToOpen(Closed, *b.rule, snapshot) } return true } @@ -140,7 +139,7 @@ func (b *circuitBreakerBase) fromClosedToOpen(snapshot interface{}) bool { func (b *circuitBreakerBase) fromOpenToHalfOpen() bool { if b.state.casState(Open, HalfOpen) { for _, listener := range stateChangeListeners { - listener.OnTransformToHalfOpen(Open, b.rule) + listener.OnTransformToHalfOpen(Open, *b.rule) } return true } @@ -153,7 +152,7 @@ func (b *circuitBreakerBase) fromHalfOpenToOpen(snapshot interface{}) bool { if b.state.casState(HalfOpen, Open) { b.updateNextRetryTimestamp() for _, listener := range stateChangeListeners { - listener.OnTransformToOpen(HalfOpen, b.rule, snapshot) + listener.OnTransformToOpen(HalfOpen, *b.rule, snapshot) } return true } @@ -165,7 +164,7 @@ func (b *circuitBreakerBase) fromHalfOpenToOpen(snapshot interface{}) bool { func (b *circuitBreakerBase) fromHalfOpenToClosed() bool { if b.state.casState(HalfOpen, Closed) { for _, listener := range stateChangeListeners { - listener.OnTransformToClosed(HalfOpen, b.rule) + listener.OnTransformToClosed(HalfOpen, *b.rule) } return true } @@ -181,7 +180,7 @@ type slowRtCircuitBreaker struct { minRequestAmount uint64 } -func newSlowRtCircuitBreakerWithStat(r *slowRtRule, stat *slowRequestLeapArray) *slowRtCircuitBreaker { +func newSlowRtCircuitBreakerWithStat(r *Rule, stat *slowRequestLeapArray) *slowRtCircuitBreaker { status := new(State) status.set(Closed) return &slowRtCircuitBreaker{ @@ -193,12 +192,12 @@ func newSlowRtCircuitBreakerWithStat(r *slowRtRule, stat *slowRequestLeapArray) }, stat: stat, maxAllowedRt: r.MaxAllowedRtMs, - maxSlowRequestRatio: r.MaxSlowRequestRatio, + maxSlowRequestRatio: r.Threshold, minRequestAmount: r.MinRequestAmount, } } -func newSlowRtCircuitBreaker(r *slowRtRule) (*slowRtCircuitBreaker, error) { +func newSlowRtCircuitBreaker(r *Rule) (*slowRtCircuitBreaker, error) { interval := r.StatIntervalMs stat := &slowRequestLeapArray{} leapArray, err := sbase.NewLeapArray(1, interval, stat) @@ -367,7 +366,7 @@ type errorRatioCircuitBreaker struct { stat *errorCounterLeapArray } -func newErrorRatioCircuitBreakerWithStat(r *errorRatioRule, stat *errorCounterLeapArray) *errorRatioCircuitBreaker { +func newErrorRatioCircuitBreakerWithStat(r *Rule, stat *errorCounterLeapArray) *errorRatioCircuitBreaker { status := new(State) status.set(Closed) @@ -384,7 +383,7 @@ func newErrorRatioCircuitBreakerWithStat(r *errorRatioRule, stat *errorCounterLe } } -func newErrorRatioCircuitBreaker(r *errorRatioRule) (*errorRatioCircuitBreaker, error) { +func newErrorRatioCircuitBreaker(r *Rule) (*errorRatioCircuitBreaker, error) { interval := r.StatIntervalMs stat := &errorCounterLeapArray{} leapArray, err := sbase.NewLeapArray(1, interval, stat) @@ -547,7 +546,7 @@ type errorCountCircuitBreaker struct { stat *errorCounterLeapArray } -func newErrorCountCircuitBreakerWithStat(r *errorCountRule, stat *errorCounterLeapArray) *errorCountCircuitBreaker { +func newErrorCountCircuitBreakerWithStat(r *Rule, stat *errorCounterLeapArray) *errorCountCircuitBreaker { status := new(State) status.set(Closed) @@ -559,12 +558,12 @@ func newErrorCountCircuitBreakerWithStat(r *errorCountRule, stat *errorCounterLe state: status, }, minRequestAmount: r.MinRequestAmount, - errorCountThreshold: r.Threshold, + errorCountThreshold: uint64(r.Threshold), stat: stat, } } -func newErrorCountCircuitBreaker(r *errorCountRule) (*errorCountCircuitBreaker, error) { +func newErrorCountCircuitBreaker(r *Rule) (*errorCountCircuitBreaker, error) { interval := r.StatIntervalMs stat := &errorCounterLeapArray{} leapArray, err := sbase.NewLeapArray(1, interval, stat) diff --git a/core/circuitbreaker/circuit_breaker_test.go b/core/circuitbreaker/circuit_breaker_test.go index a75d9c57f..9d4cee83d 100644 --- a/core/circuitbreaker/circuit_breaker_test.go +++ b/core/circuitbreaker/circuit_breaker_test.go @@ -12,9 +12,9 @@ type CircuitBreakerMock struct { mock.Mock } -func (m *CircuitBreakerMock) BoundRule() Rule { +func (m *CircuitBreakerMock) BoundRule() *Rule { args := m.Called() - return args.Get(0).(Rule) + return args.Get(0).(*Rule) } func (m *CircuitBreakerMock) BoundStat() interface{} { diff --git a/core/circuitbreaker/rule.go b/core/circuitbreaker/rule.go index dd519245f..85f79321a 100644 --- a/core/circuitbreaker/rule.go +++ b/core/circuitbreaker/rule.go @@ -3,8 +3,6 @@ package circuitbreaker import ( "fmt" - "github.com/alibaba/sentinel-golang/core/base" - "github.com/alibaba/sentinel-golang/logging" "github.com/alibaba/sentinel-golang/util" "github.com/pkg/errors" ) @@ -35,307 +33,93 @@ func (s Strategy) String() string { } } -// Rule is the base interface of the circuit breaker rule. -type Rule interface { - base.SentinelRule - // BreakerStrategy returns the circuit breaker strategy. - BreakerStrategy() Strategy - // IsApplicable checks whether the rule is valid and could be converted to a corresponding circuit breaker. - IsApplicable() error - // BreakerStatIntervalMs returns the statistic interval of circuit breaker (in milliseconds). - BreakerStatIntervalMs() uint32 - // IsEqualsTo checks whether current rule is equal to the given rule. - IsEqualsTo(r Rule) bool - // IsStatReusable checks whether current rule is "statistically" equal to the given rule. - IsStatReusable(r Rule) bool -} - -// RuleBase encompasses the common fields of circuit breaking rule. -type RuleBase struct { +// Rule encompasses the fields of circuit breaking rule. +type Rule struct { // unique id - Id string + Id string `json:"id,omitempty"` // resource name - Resource string - Strategy Strategy + Resource string `json:"resource"` + Strategy Strategy `json:"strategy"` // RetryTimeoutMs represents recovery timeout (in milliseconds) before the circuit breaker opens. // During the open period, no requests are permitted until the timeout has elapsed. // After that, the circuit breaker will transform to half-open state for trying a few "trial" requests. - RetryTimeoutMs uint32 + RetryTimeoutMs uint32 `json:"retryTimeoutMs"` // MinRequestAmount represents the minimum number of requests (in an active statistic time span) // that can trigger circuit breaking. - MinRequestAmount uint64 + MinRequestAmount uint64 `json:"minRequestAmount"` // StatIntervalMs represents statistic time interval of the internal circuit breaker (in ms). - StatIntervalMs uint32 + StatIntervalMs uint32 `json:"statIntervalMs"` + // MaxAllowedRtMs indicates that any invocation whose response time exceeds this value (in ms) + // will be recorded as a slow request. + // MaxAllowedRtMs only takes effect for SlowRequestRatio strategy + MaxAllowedRtMs uint64 `json:"maxAllowedRtMs"` + // Threshold represents the threshold of circuit breaker. + // for SlowRequestRatio, it represents the max slow request ratio + // for ErrorRatio, it represents the max error request ratio + // for ErrorCount, it represents the max error request count + Threshold float64 `json:"threshold"` } -func (b *RuleBase) BreakerStatIntervalMs() uint32 { - return b.StatIntervalMs +func (r *Rule) String() string { + // fallback string + return fmt.Sprintf("{id=%s,resource=%s, strategy=%s, RetryTimeoutMs=%d, MinRequestAmount=%d, StatIntervalMs=%d, MaxAllowedRtMs=%d, Threshold=%f}", + r.Id, r.Resource, r.Strategy, r.RetryTimeoutMs, r.MinRequestAmount, r.StatIntervalMs, r.MaxAllowedRtMs, r.Threshold) } -func (b *RuleBase) IsApplicable() error { - if len(b.Resource) == 0 { +func (r *Rule) isApplicable() error { + if len(r.Resource) == 0 { return errors.New("empty resource name") } - if b.RetryTimeoutMs <= 0 { - return errors.New("invalid RetryTimeoutMs") - } - if b.MinRequestAmount <= 0 { - return errors.New("invalid MinRequestAmount") + if int(r.Strategy) < int(SlowRequestRatio) || int(r.Strategy) > int(ErrorCount) { + return errors.New("invalid Strategy") } - if b.StatIntervalMs <= 0 { + if r.StatIntervalMs <= 0 { return errors.New("invalid StatIntervalMs") } - return nil -} - -func (b *RuleBase) IsStatReusable(r Rule) bool { - return b.Resource == r.ResourceName() && b.Strategy == r.BreakerStrategy() && b.StatIntervalMs == r.BreakerStatIntervalMs() -} - -func (b *RuleBase) String() string { - // fallback string - return fmt.Sprintf("{id=%s,resource=%s, strategy=%+v, RetryTimeoutMs=%d, MinRequestAmount=%d, StatIntervalMs=%d}", - b.Id, b.Resource, b.Strategy, b.RetryTimeoutMs, b.MinRequestAmount, b.StatIntervalMs) -} - -func (b *RuleBase) BreakerStrategy() Strategy { - return b.Strategy -} - -func (b *RuleBase) ResourceName() string { - return b.Resource -} - -type RuleOptions struct { - retryTimeoutMs uint32 - minRequestAmount uint64 - statIntervalMs uint32 - - //The following two fields apply only to slowRtRule - maxAllowedRtMs uint64 - maxSlowRequestRatio float64 - - //The following one field apply only to errorRatioRule - errorRatioThreshold float64 - - //The following one field apply only to errorCountRule - errorCountThreshold uint64 -} - -// TODO: make default option configurable? -func newDefaultRuleOptions() *RuleOptions { - return &RuleOptions{ - retryTimeoutMs: 0, - minRequestAmount: 0, - statIntervalMs: 0, - maxAllowedRtMs: 0, - maxSlowRequestRatio: 0, - errorRatioThreshold: 0, - errorCountThreshold: 0, - } -} - -type RuleOption func(opts *RuleOptions) - -// WithRetryTimeoutMs sets the retryTimeoutMs -// This function takes effect for all circuit breaker rule -func WithRetryTimeoutMs(retryTimeoutMs uint32) RuleOption { - return func(opts *RuleOptions) { - opts.retryTimeoutMs = retryTimeoutMs - } -} - -// WithMinRequestAmount sets the minRequestAmount -// This function takes effect for all circuit breaker rule -func WithMinRequestAmount(minRequestAmount uint64) RuleOption { - return func(opts *RuleOptions) { - opts.minRequestAmount = minRequestAmount + if r.Threshold < 0 { + return errors.New("invalid Threshold") } -} - -// WithStatIntervalMs sets the statIntervalMs -// This function takes effect for all circuit breaker rule -func WithStatIntervalMs(statIntervalMs uint32) RuleOption { - return func(opts *RuleOptions) { - opts.statIntervalMs = statIntervalMs + if r.Strategy == SlowRequestRatio && (r.Threshold < 0.0 || r.Threshold > 1.0) { + return errors.New("invalid slow request ratio threshold (valid range: [0.0, 1.0])") } -} - -// WithMaxAllowedRtMs sets the maxAllowedRtMs -// This function only takes effect for slowRtRule -func WithMaxAllowedRtMs(maxAllowedRtMs uint64) RuleOption { - return func(opts *RuleOptions) { - opts.maxAllowedRtMs = maxAllowedRtMs + if r.Strategy == ErrorRatio && (r.Threshold < 0.0 || r.Threshold > 1.0) { + return errors.New("invalid error ratio threshold (valid range: [0.0, 1.0])") } + return nil } -// WithMaxSlowRequestRatio sets the maxSlowRequestRatio -// This function only takes effect for slowRtRule -func WithMaxSlowRequestRatio(maxSlowRequestRatio float64) RuleOption { - return func(opts *RuleOptions) { - opts.maxSlowRequestRatio = maxSlowRequestRatio +func (r *Rule) isStatReusable(newRule *Rule) bool { + if newRule == nil { + return false } + return r.Resource == newRule.Resource && r.Strategy == newRule.Strategy && r.StatIntervalMs == newRule.StatIntervalMs } -// WithErrorRatioThreshold sets the errorRatioThreshold -// This function only takes effect for errorRatioRule -func WithErrorRatioThreshold(errorRatioThreshold float64) RuleOption { - return func(opts *RuleOptions) { - opts.errorRatioThreshold = errorRatioThreshold - } +func (r *Rule) ResourceName() string { + return r.Resource } -// WithErrorCountThreshold sets the errorCountThreshold -// This function only takes effect for errorCountRule -func WithErrorCountThreshold(errorCountThreshold uint64) RuleOption { - return func(opts *RuleOptions) { - opts.errorCountThreshold = errorCountThreshold +func (r *Rule) isEqualsToBase(newRule *Rule) bool { + if newRule == nil { + return false } + return r.Resource == newRule.Resource && r.Strategy == newRule.Strategy && r.RetryTimeoutMs == newRule.RetryTimeoutMs && + r.MinRequestAmount == newRule.MinRequestAmount && r.StatIntervalMs == newRule.StatIntervalMs } -// SlowRequestRatio circuit breaker rule -type slowRtRule struct { - RuleBase - // MaxAllowedRtMs indicates that any invocation whose response time exceeds this value (in ms) - // will be recorded as a slow request. - MaxAllowedRtMs uint64 - // MaxSlowRequestRatio represents the threshold of slow rt ratio circuit breaker. - MaxSlowRequestRatio float64 -} - -func NewRule(resource string, strategy Strategy, opts ...RuleOption) Rule { - ruleOpts := newDefaultRuleOptions() - for _, opt := range opts { - opt(ruleOpts) +func (r *Rule) isEqualsTo(newRule *Rule) bool { + if !r.isEqualsToBase(newRule) { + return false } - switch strategy { + switch newRule.Strategy { case SlowRequestRatio: - return &slowRtRule{ - RuleBase: RuleBase{ - Id: util.NewUuid(), - Resource: resource, - Strategy: SlowRequestRatio, - RetryTimeoutMs: ruleOpts.retryTimeoutMs, - MinRequestAmount: ruleOpts.minRequestAmount, - StatIntervalMs: ruleOpts.statIntervalMs, - }, - MaxAllowedRtMs: ruleOpts.maxAllowedRtMs, - MaxSlowRequestRatio: ruleOpts.maxSlowRequestRatio, - } + return r.MaxAllowedRtMs == newRule.MaxAllowedRtMs && util.Float64Equals(r.Threshold, newRule.Threshold) case ErrorRatio: - return &errorRatioRule{ - RuleBase: RuleBase{ - Id: util.NewUuid(), - Resource: resource, - Strategy: ErrorRatio, - RetryTimeoutMs: ruleOpts.retryTimeoutMs, - MinRequestAmount: ruleOpts.minRequestAmount, - StatIntervalMs: ruleOpts.statIntervalMs, - }, - Threshold: ruleOpts.errorRatioThreshold, - } + return util.Float64Equals(r.Threshold, newRule.Threshold) case ErrorCount: - return &errorCountRule{ - RuleBase: RuleBase{ - Id: util.NewUuid(), - Resource: resource, - Strategy: ErrorCount, - RetryTimeoutMs: ruleOpts.retryTimeoutMs, - MinRequestAmount: ruleOpts.minRequestAmount, - StatIntervalMs: ruleOpts.statIntervalMs, - }, - Threshold: ruleOpts.errorCountThreshold, - } + return util.Float64Equals(r.Threshold, newRule.Threshold) default: - logging.Errorf("unsupported circuit breaker rule, strategy: %d", strategy) - return nil - } -} - -func (r *slowRtRule) IsEqualsTo(newRule Rule) bool { - newSlowRtRule, ok := newRule.(*slowRtRule) - if !ok { - return false - } - return r.Resource == newSlowRtRule.Resource && r.Strategy == newSlowRtRule.Strategy && r.RetryTimeoutMs == newSlowRtRule.RetryTimeoutMs && - r.MinRequestAmount == newSlowRtRule.MinRequestAmount && r.StatIntervalMs == newSlowRtRule.StatIntervalMs && - r.MaxAllowedRtMs == newSlowRtRule.MaxAllowedRtMs && r.MaxSlowRequestRatio == newSlowRtRule.MaxSlowRequestRatio -} - -func (r *slowRtRule) IsApplicable() error { - baseCheckErr := r.RuleBase.IsApplicable() - if baseCheckErr != nil { - return baseCheckErr - } - if r.MaxSlowRequestRatio < 0 || r.MaxSlowRequestRatio > 1 { - return errors.New("invalid slow request ratio threshold (valid range: [0.0, 1.0])") - } - return nil -} - -func (r *slowRtRule) String() string { - return fmt.Sprintf("{slowRtRule{RuleBase:%s, MaxAllowedRtMs=%d, MaxSlowRequestRatio=%f}", r.RuleBase.String(), r.MaxAllowedRtMs, r.MaxSlowRequestRatio) -} - -// Error ratio circuit breaker rule -type errorRatioRule struct { - RuleBase - Threshold float64 -} - -func (r *errorRatioRule) String() string { - return fmt.Sprintf("{errorRatioRule{RuleBase:%s, Threshold=%f}", r.RuleBase.String(), r.Threshold) -} - -func (r *errorRatioRule) IsEqualsTo(newRule Rule) bool { - newErrorRatioRule, ok := newRule.(*errorRatioRule) - if !ok { return false } - return r.Resource == newErrorRatioRule.Resource && r.Strategy == newErrorRatioRule.Strategy && r.RetryTimeoutMs == newErrorRatioRule.RetryTimeoutMs && - r.MinRequestAmount == newErrorRatioRule.MinRequestAmount && r.StatIntervalMs == newErrorRatioRule.StatIntervalMs && - r.Threshold == newErrorRatioRule.Threshold -} - -func (r *errorRatioRule) IsApplicable() error { - baseCheckErr := r.RuleBase.IsApplicable() - if baseCheckErr != nil { - return baseCheckErr - } - if r.Threshold < 0 || r.Threshold > 1 { - return errors.New("invalid error ratio threshold (valid range: [0.0, 1.0])") - } - return nil -} - -// Error count circuit breaker rule -type errorCountRule struct { - RuleBase - Threshold uint64 -} - -func (r *errorCountRule) String() string { - return fmt.Sprintf("{errorCountRule{RuleBase:%s, Threshold=%d}", r.RuleBase.String(), r.Threshold) -} - -func (r *errorCountRule) IsEqualsTo(newRule Rule) bool { - newErrorCountRule, ok := newRule.(*errorCountRule) - if !ok { - return false - } - return r.Resource == newErrorCountRule.Resource && r.Strategy == newErrorCountRule.Strategy && r.RetryTimeoutMs == newErrorCountRule.RetryTimeoutMs && - r.MinRequestAmount == newErrorCountRule.MinRequestAmount && r.StatIntervalMs == newErrorCountRule.StatIntervalMs && - r.Threshold == newErrorCountRule.Threshold -} - -func (r *errorCountRule) IsApplicable() error { - baseCheckErr := r.RuleBase.IsApplicable() - if baseCheckErr != nil { - return baseCheckErr - } - if r.Threshold < 0 { - return errors.New("negative error count threshold") - } - return nil } diff --git a/core/circuitbreaker/rule_manager.go b/core/circuitbreaker/rule_manager.go index aa860db6e..3428503d6 100644 --- a/core/circuitbreaker/rule_manager.go +++ b/core/circuitbreaker/rule_manager.go @@ -10,12 +10,12 @@ import ( "github.com/pkg/errors" ) -type CircuitBreakerGenFunc func(r Rule, reuseStat interface{}) (CircuitBreaker, error) +type CircuitBreakerGenFunc func(r *Rule, reuseStat interface{}) (CircuitBreaker, error) var ( cbGenFuncMap = make(map[Strategy]CircuitBreakerGenFunc) - breakerRules = make(map[string][]Rule) + breakerRules = make(map[string][]*Rule) breakers = make(map[string][]CircuitBreaker) updateMux = &sync.RWMutex{} @@ -23,61 +23,58 @@ var ( ) func init() { - cbGenFuncMap[SlowRequestRatio] = func(r Rule, reuseStat interface{}) (CircuitBreaker, error) { - rtRule, ok := r.(*slowRtRule) - if !ok || rtRule == nil { - return nil, errors.Errorf("rule don't match the SlowRequestRatio strategy, rule: %s", r.String()) + cbGenFuncMap[SlowRequestRatio] = func(r *Rule, reuseStat interface{}) (CircuitBreaker, error) { + if r == nil { + return nil, errors.New("nil rule") } if reuseStat == nil { - return newSlowRtCircuitBreaker(rtRule) + return newSlowRtCircuitBreaker(r) } stat, ok := reuseStat.(*slowRequestLeapArray) if !ok || stat == nil { logging.Warnf("Expect to generate circuit breaker with reuse statistic, but fail to do type assertion, expect:*slowRequestLeapArray, in fact: %+v", stat) - return newSlowRtCircuitBreaker(rtRule) + return newSlowRtCircuitBreaker(r) } - return newSlowRtCircuitBreakerWithStat(rtRule, stat), nil + return newSlowRtCircuitBreakerWithStat(r, stat), nil } - cbGenFuncMap[ErrorRatio] = func(r Rule, reuseStat interface{}) (CircuitBreaker, error) { - errRatioRule, ok := r.(*errorRatioRule) - if !ok || errRatioRule == nil { - return nil, errors.Errorf("rule don't match the ErrorRatio strategy, rule: %s", r.String()) + cbGenFuncMap[ErrorRatio] = func(r *Rule, reuseStat interface{}) (CircuitBreaker, error) { + if r == nil { + return nil, errors.New("nil rule") } if reuseStat == nil { - return newErrorRatioCircuitBreaker(errRatioRule) + return newErrorRatioCircuitBreaker(r) } stat, ok := reuseStat.(*errorCounterLeapArray) if !ok || stat == nil { logging.Warnf("Expect to generate circuit breaker with reuse statistic, but fail to do type assertion, expect:*errorCounterLeapArray, in fact: %+v", stat) - return newErrorRatioCircuitBreaker(errRatioRule) + return newErrorRatioCircuitBreaker(r) } - return newErrorRatioCircuitBreakerWithStat(errRatioRule, stat), nil + return newErrorRatioCircuitBreakerWithStat(r, stat), nil } - cbGenFuncMap[ErrorCount] = func(r Rule, reuseStat interface{}) (CircuitBreaker, error) { - errCountRule, ok := r.(*errorCountRule) - if !ok || errCountRule == nil { - return nil, errors.Errorf("rule don't match the ErrorCount strategy, rule: %s", r.String()) + cbGenFuncMap[ErrorCount] = func(r *Rule, reuseStat interface{}) (CircuitBreaker, error) { + if r == nil { + return nil, errors.New("nil rule") } if reuseStat == nil { - return newErrorCountCircuitBreaker(errCountRule) + return newErrorCountCircuitBreaker(r) } stat, ok := reuseStat.(*errorCounterLeapArray) if !ok || stat == nil { logging.Warnf("Expect to generate circuit breaker with reuse statistic, but fail to do type assertion, expect:*errorCounterLeapArray, in fact: %+v", stat) - return newErrorCountCircuitBreaker(errCountRule) + return newErrorCountCircuitBreaker(r) } - return newErrorCountCircuitBreakerWithStat(errCountRule, stat), nil + return newErrorCountCircuitBreakerWithStat(r, stat), nil } } -func GetResRules(resource string) []Rule { +func GetResRules(resource string) []*Rule { updateMux.RLock() ret, ok := breakerRules[resource] updateMux.RUnlock() if !ok { - ret = make([]Rule, 0) + ret = make([]*Rule, 0) } return ret } @@ -94,7 +91,7 @@ func ClearRules() error { // // bool: was designed to indicate whether the internal map has been changed // error: was designed to indicate whether occurs the error. -func LoadRules(rules []Rule) (bool, error) { +func LoadRules(rules []*Rule) (bool, error) { // TODO in order to avoid invalid update, should check consistent with last update rules err := onRuleUpdate(rules) return true, err @@ -112,7 +109,7 @@ func getResBreakers(resource string) []CircuitBreaker { return ret } -func calculateReuseIndexFor(r Rule, oldResCbs []CircuitBreaker) (equalIdx, reuseStatIdx int) { +func calculateReuseIndexFor(r *Rule, oldResCbs []CircuitBreaker) (equalIdx, reuseStatIdx int) { // the index of equivalent rule in old circuit breaker slice equalIdx = -1 // the index of statistic reusable rule in old circuit breaker slice @@ -120,13 +117,13 @@ func calculateReuseIndexFor(r Rule, oldResCbs []CircuitBreaker) (equalIdx, reuse for idx, oldTc := range oldResCbs { oldRule := oldTc.BoundRule() - if oldRule.IsEqualsTo(r) { + if oldRule.isEqualsTo(r) { // break if there is equivalent rule equalIdx = idx break } // find the index of first StatReusable rule - if !oldRule.IsStatReusable(r) { + if !oldRule.isStatReusable(r) { continue } if reuseStatIdx >= 0 { @@ -149,7 +146,7 @@ func insertCbToCbMap(cb CircuitBreaker, res string, m map[string][]CircuitBreake } // Concurrent safe to update rules -func onRuleUpdate(rules []Rule) (err error) { +func onRuleUpdate(rules []*Rule) (err error) { defer func() { if r := recover(); r != nil { var ok bool @@ -160,12 +157,12 @@ func onRuleUpdate(rules []Rule) (err error) { } }() - newBreakerRules := make(map[string][]Rule) + newBreakerRules := make(map[string][]*Rule) for _, rule := range rules { if rule == nil { continue } - if err := rule.IsApplicable(); err != nil { + if err := rule.isApplicable(); err != nil { logging.Warnf("Ignoring invalid circuit breaking rule when loading new rules, rule: %+v, reason: %s", rule, err.Error()) continue } @@ -173,7 +170,7 @@ func onRuleUpdate(rules []Rule) (err error) { classification := rule.ResourceName() ruleSet, ok := newBreakerRules[classification] if !ok { - ruleSet = make([]Rule, 0, 1) + ruleSet = make([]*Rule, 0, 1) } ruleSet = append(ruleSet, rule) newBreakerRules[classification] = ruleSet @@ -215,7 +212,7 @@ func onRuleUpdate(rules []Rule) (err error) { continue } - generator := cbGenFuncMap[r.BreakerStrategy()] + generator := cbGenFuncMap[r.Strategy] if generator == nil { logging.Warnf("Ignoring the rule due to unsupported circuit breaking strategy: %v", r) continue @@ -245,8 +242,8 @@ func onRuleUpdate(rules []Rule) (err error) { return nil } -func rulesFrom(rm map[string][]Rule) []Rule { - rules := make([]Rule, 0) +func rulesFrom(rm map[string][]*Rule) []*Rule { + rules := make([]*Rule, 0) if len(rm) == 0 { return rules } @@ -263,7 +260,7 @@ func rulesFrom(rm map[string][]Rule) []Rule { return rules } -func logRuleUpdate(rules map[string][]Rule) { +func logRuleUpdate(rules map[string][]*Rule) { sb := strings.Builder{} sb.WriteString("Circuit breaking rules loaded: [") diff --git a/core/circuitbreaker/rule_manager_test.go b/core/circuitbreaker/rule_manager_test.go index be6de6897..8f3f564b4 100644 --- a/core/circuitbreaker/rule_manager_test.go +++ b/core/circuitbreaker/rule_manager_test.go @@ -9,7 +9,7 @@ import ( func Test_isApplicableRule_valid(t *testing.T) { type args struct { - rule Rule + rule *Rule } tests := []struct { name string @@ -19,35 +19,51 @@ func Test_isApplicableRule_valid(t *testing.T) { { name: "rtRule_isApplicable", args: args{ - rule: NewRule("abc01", SlowRequestRatio, WithStatIntervalMs(1000), - WithRetryTimeoutMs(1000), WithMaxAllowedRtMs(20), - WithMinRequestAmount(5), WithMaxSlowRequestRatio(0.1)), + rule: &Rule{ + Resource: "abc01", + Strategy: SlowRequestRatio, + RetryTimeoutMs: 1000, + MinRequestAmount: 5, + StatIntervalMs: 1000, + MaxAllowedRtMs: 20, + Threshold: 0.1, + }, }, want: nil, }, { name: "errorRatioRule_isApplicable", args: args{ - rule: NewRule("abc02", ErrorRatio, WithStatIntervalMs(1000), - WithRetryTimeoutMs(1000), WithMinRequestAmount(5), - WithMaxSlowRequestRatio(0.3)), + rule: &Rule{ + Resource: "abc02", + Strategy: ErrorRatio, + RetryTimeoutMs: 1000, + MinRequestAmount: 5, + StatIntervalMs: 1000, + Threshold: 0.3, + }, }, want: nil, }, { name: "errorCountRule_isApplicable", args: args{ - rule: NewRule("abc02", ErrorCount, WithStatIntervalMs(1000), - WithRetryTimeoutMs(1000), WithMinRequestAmount(5), - WithMaxSlowRequestRatio(10)), + rule: &Rule{ + Resource: "abc02", + Strategy: ErrorCount, + RetryTimeoutMs: 1000, + MinRequestAmount: 5, + StatIntervalMs: 1000, + Threshold: 10, + }, }, want: nil, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if got := tt.args.rule.IsApplicable(); got != tt.want { - t.Errorf("RuleManager.IsApplicable() = %v, want %v", got, tt.want) + if got := tt.args.rule.isApplicable(); got != tt.want { + t.Errorf("RuleManager.isApplicable() = %v, want %v", got, tt.want) } }) } @@ -55,43 +71,75 @@ func Test_isApplicableRule_valid(t *testing.T) { func Test_isApplicableRule_invalid(t *testing.T) { t.Run("rtBreakerRule_isApplicable_false", func(t *testing.T) { - rule := NewRule("abc01", SlowRequestRatio, WithStatIntervalMs(1000), - WithRetryTimeoutMs(1000), WithMaxAllowedRtMs(5), - WithMinRequestAmount(10050), WithMaxSlowRequestRatio(-1.0)) - if got := rule.IsApplicable(); got == nil { - t.Errorf("RuleManager.IsApplicable() = %v", got) + rule := &Rule{ + Resource: "abc01", + Strategy: SlowRequestRatio, + RetryTimeoutMs: 1000, + MinRequestAmount: 10050, + StatIntervalMs: 1000, + MaxAllowedRtMs: 5, + Threshold: -1.0, + } + if got := rule.isApplicable(); got == nil { + t.Errorf("RuleManager.isApplicable() = %v", got) } }) t.Run("errorRatioRule_isApplicable_false", func(t *testing.T) { - rule := NewRule("abc02", ErrorRatio, WithStatIntervalMs(1000), - WithRetryTimeoutMs(1000), WithMinRequestAmount(5), - WithErrorRatioThreshold(-0.3)) - if got := rule.IsApplicable(); got == nil { - t.Errorf("RuleManager.IsApplicable() = %v", got) + rule := &Rule{ + Resource: "abc02", + Strategy: ErrorRatio, + RetryTimeoutMs: 1000, + MinRequestAmount: 5, + StatIntervalMs: 1000, + Threshold: -0.3, + } + if got := rule.isApplicable(); got == nil { + t.Errorf("RuleManager.isApplicable() = %v", got) } }) t.Run("errorCountRule_isApplicable_false", func(t *testing.T) { - rule := NewRule("", ErrorCount, WithStatIntervalMs(1000), - WithRetryTimeoutMs(1000), WithMinRequestAmount(5), - WithMaxSlowRequestRatio(0)) - if got := rule.IsApplicable(); got == nil { - t.Errorf("RuleManager.IsApplicable() = %v", got) + rule := &Rule{ + Resource: "", + Strategy: ErrorRatio, + RetryTimeoutMs: 1000, + MinRequestAmount: 5, + StatIntervalMs: 1000, + Threshold: 0, + } + if got := rule.isApplicable(); got == nil { + t.Errorf("RuleManager.isApplicable() = %v", got) } }) } func Test_onUpdateRules(t *testing.T) { t.Run("Test_onUpdateRules", func(t *testing.T) { - rules := make([]Rule, 0) - r1 := NewRule("abc01", SlowRequestRatio, WithStatIntervalMs(1000), - WithRetryTimeoutMs(1000), WithMaxAllowedRtMs(20), - WithMinRequestAmount(5), WithMaxSlowRequestRatio(0.1)) - r2 := NewRule("abc01", ErrorRatio, WithStatIntervalMs(1000), - WithRetryTimeoutMs(1000), WithMinRequestAmount(5), - WithMaxSlowRequestRatio(0.3)) - r3 := NewRule("abc01", ErrorCount, WithStatIntervalMs(1000), - WithRetryTimeoutMs(1000), WithMinRequestAmount(5), - WithMaxSlowRequestRatio(10)) + rules := make([]*Rule, 0) + r1 := &Rule{ + Resource: "abc01", + Strategy: SlowRequestRatio, + RetryTimeoutMs: 1000, + MinRequestAmount: 5, + StatIntervalMs: 1000, + MaxAllowedRtMs: 20, + Threshold: 0.1, + } + r2 := &Rule{ + Resource: "abc01", + Strategy: ErrorRatio, + RetryTimeoutMs: 1000, + MinRequestAmount: 5, + StatIntervalMs: 1000, + Threshold: 0.3, + } + r3 := &Rule{ + Resource: "abc01", + Strategy: ErrorCount, + RetryTimeoutMs: 1000, + MinRequestAmount: 5, + StatIntervalMs: 1000, + Threshold: 10, + } rules = append(rules, r1, r2, r3) err := onRuleUpdate(rules) if err != nil { @@ -100,22 +148,39 @@ func Test_onUpdateRules(t *testing.T) { assert.True(t, len(breakers["abc01"]) == 3) assert.True(t, len(breakerRules["abc01"]) == 3) breakers = make(map[string][]CircuitBreaker) - breakerRules = make(map[string][]Rule) + breakerRules = make(map[string][]*Rule) }) } func Test_onRuleUpdate(t *testing.T) { t.Run("Test_onRuleUpdate", func(t *testing.T) { - r1 := NewRule("abc", SlowRequestRatio, WithStatIntervalMs(1000), - WithRetryTimeoutMs(1000), WithMaxAllowedRtMs(20), - WithMinRequestAmount(5), WithMaxSlowRequestRatio(0.1)) - r2 := NewRule("abc", ErrorRatio, WithStatIntervalMs(1000), - WithRetryTimeoutMs(1000), WithMinRequestAmount(5), - WithMaxSlowRequestRatio(0.3)) - r3 := NewRule("abc", ErrorCount, WithStatIntervalMs(1000), - WithRetryTimeoutMs(1000), WithMinRequestAmount(5), - WithMaxSlowRequestRatio(10)) - _, _ = LoadRules([]Rule{r1, r2, r3}) + r1 := &Rule{ + Resource: "abc", + Strategy: SlowRequestRatio, + RetryTimeoutMs: 1000, + MinRequestAmount: 5, + StatIntervalMs: 1000, + MaxAllowedRtMs: 20, + Threshold: 0.1, + } + r2 := &Rule{ + Resource: "abc", + Strategy: ErrorRatio, + RetryTimeoutMs: 1000, + MinRequestAmount: 5, + StatIntervalMs: 1000, + Threshold: 0.3, + } + r3 := &Rule{ + Resource: "abc", + Strategy: ErrorCount, + RetryTimeoutMs: 1000, + MinRequestAmount: 5, + StatIntervalMs: 1000, + Threshold: 10, + } + + _, _ = LoadRules([]*Rule{r1, r2, r3}) b2 := breakers["abc"][1] assert.True(t, len(breakers) == 1) @@ -124,20 +189,40 @@ func Test_onRuleUpdate(t *testing.T) { assert.True(t, reflect.DeepEqual(breakers["abc"][1].BoundRule(), r2)) assert.True(t, reflect.DeepEqual(breakers["abc"][2].BoundRule(), r3)) - r4 := NewRule("abc", SlowRequestRatio, WithStatIntervalMs(1000), - WithRetryTimeoutMs(1000), WithMaxAllowedRtMs(20), - WithMinRequestAmount(5), WithMaxSlowRequestRatio(0.1)) - r5 := NewRule("abc", ErrorRatio, WithStatIntervalMs(1000), - WithRetryTimeoutMs(100), WithMinRequestAmount(25), - WithMaxSlowRequestRatio(0.5)) - r6 := NewRule("abc", ErrorCount, WithStatIntervalMs(100), - WithRetryTimeoutMs(1000), WithMinRequestAmount(5), - WithMaxSlowRequestRatio(10)) - r7 := NewRule("abc", ErrorCount, WithStatIntervalMs(1100), - WithRetryTimeoutMs(1000), WithMinRequestAmount(5), - WithMaxSlowRequestRatio(10)) - - _, _ = LoadRules([]Rule{r4, r5, r6, r7}) + r4 := &Rule{ + Resource: "abc", + Strategy: SlowRequestRatio, + RetryTimeoutMs: 1000, + MinRequestAmount: 5, + StatIntervalMs: 1000, + MaxAllowedRtMs: 20, + Threshold: 0.1, + } + r5 := &Rule{ + Resource: "abc", + Strategy: ErrorRatio, + RetryTimeoutMs: 100, + MinRequestAmount: 25, + StatIntervalMs: 1000, + Threshold: 0.5, + } + r6 := &Rule{ + Resource: "abc", + Strategy: ErrorCount, + RetryTimeoutMs: 1000, + MinRequestAmount: 5, + StatIntervalMs: 100, + Threshold: 10, + } + r7 := &Rule{ + Resource: "abc", + Strategy: ErrorCount, + RetryTimeoutMs: 1000, + MinRequestAmount: 5, + StatIntervalMs: 1100, + Threshold: 10, + } + _, _ = LoadRules([]*Rule{r4, r5, r6, r7}) assert.True(t, len(breakers) == 1) newCbs := breakers["abc"] assert.True(t, len(newCbs) == 4, "Expect:4, in fact:", len(newCbs)) diff --git a/core/circuitbreaker/rule_test.go b/core/circuitbreaker/rule_test.go deleted file mode 100644 index d9c31822d..000000000 --- a/core/circuitbreaker/rule_test.go +++ /dev/null @@ -1,43 +0,0 @@ -package circuitbreaker - -import ( - "github.com/stretchr/testify/mock" -) - -type RuleMock struct { - mock.Mock -} - -func (m *RuleMock) String() string { - args := m.Called() - return args.String(0) -} - -func (m *RuleMock) ResourceName() string { - args := m.Called() - return args.String(0) -} - -func (m *RuleMock) BreakerStrategy() Strategy { - args := m.Called() - return args.Get(0).(Strategy) -} -func (m *RuleMock) BreakerStatIntervalMs() uint32 { - args := m.Called() - return uint32(args.Int(0)) -} - -func (m *RuleMock) IsEqualsTo(r Rule) bool { - args := m.Called(r) - return args.Bool(0) -} - -func (m *RuleMock) IsStatReusable(r Rule) bool { - args := m.Called(r) - return args.Bool(0) -} - -func (m *RuleMock) IsApplicable() error { - args := m.Called() - return args.Get(0).(error) -} diff --git a/core/circuitbreaker/slot.go b/core/circuitbreaker/slot.go index 364b7ab13..2f3bc692d 100644 --- a/core/circuitbreaker/slot.go +++ b/core/circuitbreaker/slot.go @@ -23,7 +23,7 @@ func (b *Slot) Check(ctx *base.EntryContext) *base.TokenResult { return result } -func checkPass(ctx *base.EntryContext) (bool, Rule) { +func checkPass(ctx *base.EntryContext) (bool, *Rule) { breakers := getResBreakers(ctx.Resource.Name()) for _, breaker := range breakers { passed := breaker.TryPass(ctx) diff --git a/core/hotspot/rule.go b/core/hotspot/rule.go index 2c0cbfa33..a607af4a7 100644 --- a/core/hotspot/rule.go +++ b/core/hotspot/rule.go @@ -123,7 +123,7 @@ func (r *Rule) IsStatReusable(newRule *Rule) bool { return r.Resource == newRule.Resource && r.ControlBehavior == newRule.ControlBehavior && r.ParamsMaxCapacity == newRule.ParamsMaxCapacity && r.DurationInSec == newRule.DurationInSec } -// IsEqualsTo checks whether current rule is consistent with the given rule. +// Equals checks whether current rule is consistent with the given rule. func (r *Rule) Equals(newRule *Rule) bool { baseCheck := r.Resource == newRule.Resource && r.MetricType == newRule.MetricType && r.ControlBehavior == newRule.ControlBehavior && r.ParamsMaxCapacity == newRule.ParamsMaxCapacity && r.ParamIndex == newRule.ParamIndex && r.Threshold == newRule.Threshold && r.DurationInSec == newRule.DurationInSec && reflect.DeepEqual(r.SpecificItems, newRule.SpecificItems) if !baseCheck { diff --git a/example/circuitbreaker/circuit_breaker_example.go b/example/circuitbreaker/circuit_breaker_example.go index 0e1dc7904..1392212f5 100644 --- a/example/circuitbreaker/circuit_breaker_example.go +++ b/example/circuitbreaker/circuit_breaker_example.go @@ -16,15 +16,15 @@ type stateChangeTestListener struct { } func (s *stateChangeTestListener) OnTransformToClosed(prev circuitbreaker.State, rule circuitbreaker.Rule) { - fmt.Printf("rule.steategy: %+v, From %s to Closed, time: %d\n", rule.BreakerStrategy(), prev.String(), util.CurrentTimeMillis()) + fmt.Printf("rule.steategy: %+v, From %s to Closed, time: %d\n", rule.Strategy, prev.String(), util.CurrentTimeMillis()) } func (s *stateChangeTestListener) OnTransformToOpen(prev circuitbreaker.State, rule circuitbreaker.Rule, snapshot interface{}) { - fmt.Printf("rule.steategy: %+v, From %s to Open, snapshot: %.2f, time: %d\n", rule.BreakerStrategy(), prev.String(), snapshot, util.CurrentTimeMillis()) + fmt.Printf("rule.steategy: %+v, From %s to Open, snapshot: %.2f, time: %d\n", rule.Strategy, prev.String(), snapshot, util.CurrentTimeMillis()) } func (s *stateChangeTestListener) OnTransformToHalfOpen(prev circuitbreaker.State, rule circuitbreaker.Rule) { - fmt.Printf("rule.steategy: %+v, From %s to Half-Open, time: %d\n", rule.BreakerStrategy(), prev.String(), util.CurrentTimeMillis()) + fmt.Printf("rule.steategy: %+v, From %s to Half-Open, time: %d\n", rule.Strategy, prev.String(), util.CurrentTimeMillis()) } func main() { @@ -36,16 +36,26 @@ func main() { // Register a state change listener so that we could observer the state change of the internal circuit breaker. circuitbreaker.RegisterStateChangeListeners(&stateChangeTestListener{}) - _, err = circuitbreaker.LoadRules([]circuitbreaker.Rule{ + _, err = circuitbreaker.LoadRules([]*circuitbreaker.Rule{ // Statistic time span=10s, recoveryTimeout=3s, slowRtUpperBound=50ms, maxSlowRequestRatio=50% - circuitbreaker.NewRule("abc", circuitbreaker.SlowRequestRatio, - circuitbreaker.WithStatIntervalMs(10000), circuitbreaker.WithRetryTimeoutMs(3000), - circuitbreaker.WithMinRequestAmount(10), circuitbreaker.WithMaxAllowedRtMs(50), - circuitbreaker.WithMaxSlowRequestRatio(0.5)), + { + Resource: "abc", + Strategy: circuitbreaker.SlowRequestRatio, + RetryTimeoutMs: 3000, + MinRequestAmount: 10, + StatIntervalMs: 10000, + MaxAllowedRtMs: 50, + Threshold: 0.5, + }, // Statistic time span=10s, recoveryTimeout=3s, maxErrorRatio=50% - circuitbreaker.NewRule("abc", circuitbreaker.ErrorRatio, - circuitbreaker.WithStatIntervalMs(10000), circuitbreaker.WithRetryTimeoutMs(3000), - circuitbreaker.WithMinRequestAmount(10), circuitbreaker.WithErrorRatioThreshold(0.5)), + { + Resource: "abc", + Strategy: circuitbreaker.ErrorRatio, + RetryTimeoutMs: 3000, + MinRequestAmount: 10, + StatIntervalMs: 10000, + Threshold: 0.5, + }, }) if err != nil { log.Fatal(err) diff --git a/ext/datasource/helper.go b/ext/datasource/helper.go index 2d9885605..397ebadc1 100644 --- a/ext/datasource/helper.go +++ b/ext/datasource/helper.go @@ -8,20 +8,12 @@ import ( "github.com/alibaba/sentinel-golang/core/flow" "github.com/alibaba/sentinel-golang/core/hotspot" "github.com/alibaba/sentinel-golang/core/system" - "github.com/alibaba/sentinel-golang/logging" - "github.com/tidwall/gjson" ) func checkSrcComplianceJson(src []byte) (bool, error) { if len(src) == 0 { return false, nil } - if !gjson.ValidBytes(src) { - return false, Error{ - code: ConvertSourceError, - desc: fmt.Sprintf("The source is invalid json: %s", src), - } - } return true, nil } @@ -118,37 +110,9 @@ func CircuitBreakerRuleJsonArrayParser(src []byte) (interface{}, error) { return nil, err } - rules := make([]cb.Rule, 0) - result := gjson.ParseBytes(src) - for _, r := range result.Array() { - if uint64(cb.SlowRequestRatio) == r.Get("strategy").Uint() { - rules = append(rules, cb.NewRule(r.Get("resource").String(), cb.SlowRequestRatio, - cb.WithStatIntervalMs(uint32(r.Get("statIntervalMs").Uint())), - cb.WithRetryTimeoutMs(uint32(r.Get("retryTimeoutMs").Uint())), - cb.WithMinRequestAmount(r.Get("minRequestAmount").Uint()), - cb.WithMaxAllowedRtMs(r.Get("maxAllowedRt").Uint()), - cb.WithMaxSlowRequestRatio(r.Get("maxSlowRequestRatio").Float()))) - continue - } - if uint64(cb.ErrorRatio) == r.Get("strategy").Uint() { - rules = append(rules, cb.NewRule(r.Get("resource").String(), cb.ErrorRatio, - cb.WithStatIntervalMs(uint32(r.Get("statIntervalMs").Uint())), - cb.WithRetryTimeoutMs(uint32(r.Get("retryTimeoutMs").Uint())), - cb.WithMinRequestAmount(r.Get("minRequestAmount").Uint()), - cb.WithErrorRatioThreshold(r.Get("threshold").Float()))) - continue - } - if uint64(cb.ErrorCount) == r.Get("strategy").Uint() { - rules = append(rules, cb.NewRule(r.Get("resource").String(), cb.ErrorCount, - cb.WithStatIntervalMs(uint32(r.Get("statIntervalMs").Uint())), - cb.WithRetryTimeoutMs(uint32(r.Get("retryTimeoutMs").Uint())), - cb.WithMinRequestAmount(r.Get("minRequestAmount").Uint()), - cb.WithErrorCountThreshold(r.Get("threshold").Uint()))) - continue - } - logging.Errorf("Unknown rule message: %s", r.Str) - } - return rules, nil + rules := make([]*cb.Rule, 0) + err := json.Unmarshal(src, &rules) + return rules, err } // CircuitBreakerRulesUpdater load the newest []cb.Rule to downstream circuit breaker component. @@ -157,8 +121,8 @@ func CircuitBreakerRulesUpdater(data interface{}) error { return cb.ClearRules() } - var rules []cb.Rule - if val, ok := data.([]cb.Rule); ok { + var rules []*cb.Rule + if val, ok := data.([]*cb.Rule); ok { rules = val } else { return Error{ diff --git a/ext/datasource/helper_test.go b/ext/datasource/helper_test.go index c24885d77..f3411d6e2 100644 --- a/ext/datasource/helper_test.go +++ b/ext/datasource/helper_test.go @@ -37,10 +37,8 @@ func TestFlowRulesJsonConverter(t *testing.T) { }) t.Run("TestFlowRulesJsonConverter_error", func(t *testing.T) { - got, err := FlowRuleJsonArrayParser([]byte{'x', 'i', 'm', 'u'}) - assert.True(t, got == nil) - realErr, succ := err.(Error) - assert.True(t, succ && realErr.code == ConvertSourceError) + _, err := FlowRuleJsonArrayParser([]byte{'x', 'i', 'm', 'u'}) + assert.True(t, err != nil) }) t.Run("TestFlowRulesJsonConverter_normal", func(t *testing.T) { @@ -249,8 +247,7 @@ func TestSystemRulesUpdater(t *testing.T) { func TestCircuitBreakerRulesJsonConverter(t *testing.T) { t.Run("TestCircuitBreakerRulesJsonConverter_failed", func(t *testing.T) { - properties, err := CircuitBreakerRuleJsonArrayParser([]byte{'s', 'r', 'c'}) - assert.True(t, properties == nil) + _, err := CircuitBreakerRuleJsonArrayParser([]byte{'s', 'r', 'c'}) assert.True(t, err != nil) }) @@ -267,34 +264,72 @@ func TestCircuitBreakerRulesJsonConverter(t *testing.T) { } properties, err := CircuitBreakerRuleJsonArrayParser(src) - rules := properties.([]cb.Rule) + rules := properties.([]*cb.Rule) assert.True(t, err == nil) assert.True(t, len(rules) == 3) - assert.True(t, strings.Contains(rules[0].String(), "resource=abc, strategy=SlowRequestRatio, RetryTimeoutMs=10, MinRequestAmount=10, StatIntervalMs=1000}, MaxAllowedRtMs=100, MaxSlowRequestRatio=0.100000")) - assert.True(t, strings.Contains(rules[1].String(), "resource=abc, strategy=ErrorRatio, RetryTimeoutMs=20, MinRequestAmount=20, StatIntervalMs=2000}, Threshold=0.200000")) - assert.True(t, strings.Contains(rules[2].String(), "resource=abc, strategy=ErrorCount, RetryTimeoutMs=30, MinRequestAmount=30, StatIntervalMs=3000}, Threshold=30")) + assert.True(t, reflect.DeepEqual(rules[0], &cb.Rule{ + Resource: "abc", + Strategy: cb.SlowRequestRatio, + RetryTimeoutMs: 10, + MinRequestAmount: 10, + StatIntervalMs: 1000, + MaxAllowedRtMs: 100, + Threshold: 0.1, + })) + assert.True(t, reflect.DeepEqual(rules[1], &cb.Rule{ + Resource: "abc", + Strategy: cb.ErrorRatio, + RetryTimeoutMs: 20, + MinRequestAmount: 20, + StatIntervalMs: 2000, + Threshold: 0.2, + })) + assert.True(t, reflect.DeepEqual(rules[2], &cb.Rule{ + Resource: "abc", + Strategy: cb.ErrorCount, + RetryTimeoutMs: 30, + MinRequestAmount: 30, + StatIntervalMs: 3000, + Threshold: 30, + })) }) } func TestCircuitBreakerRulesUpdater(t *testing.T) { // Prepare test data - r1 := cb.NewRule("abc", cb.SlowRequestRatio, cb.WithStatIntervalMs(1000), - cb.WithRetryTimeoutMs(1000), cb.WithMaxAllowedRtMs(20), - cb.WithMinRequestAmount(5), cb.WithMaxSlowRequestRatio(0.1)) - r2 := cb.NewRule("abc", cb.ErrorRatio, cb.WithStatIntervalMs(1000), - cb.WithRetryTimeoutMs(1000), cb.WithMinRequestAmount(5), - cb.WithMaxSlowRequestRatio(0.3)) - r3 := cb.NewRule("abc", cb.ErrorCount, cb.WithStatIntervalMs(1000), - cb.WithRetryTimeoutMs(1000), cb.WithMinRequestAmount(5), - cb.WithMaxSlowRequestRatio(10)) - - err := CircuitBreakerRulesUpdater([]cb.Rule{r1, r2, r3}) + r1 := &cb.Rule{ + Resource: "abc", + Strategy: cb.SlowRequestRatio, + RetryTimeoutMs: 1000, + MinRequestAmount: 5, + StatIntervalMs: 1000, + MaxAllowedRtMs: 20, + Threshold: 0.1, + } + r2 := &cb.Rule{ + Resource: "abc", + Strategy: cb.ErrorRatio, + RetryTimeoutMs: 1000, + MinRequestAmount: 5, + StatIntervalMs: 1000, + Threshold: 0.3, + } + r3 := &cb.Rule{ + Resource: "abc", + Strategy: cb.ErrorCount, + RetryTimeoutMs: 1000, + MinRequestAmount: 5, + StatIntervalMs: 1000, + Threshold: 10, + } + + err := CircuitBreakerRulesUpdater([]*cb.Rule{r1, r2, r3}) assert.True(t, err == nil) rules := cb.GetResRules("abc") - assert.True(t, rules[0].IsEqualsTo(r1)) - assert.True(t, rules[1].IsEqualsTo(r2)) - assert.True(t, rules[2].IsEqualsTo(r3)) + assert.True(t, reflect.DeepEqual(rules[0], r1)) + assert.True(t, reflect.DeepEqual(rules[1], r2)) + assert.True(t, reflect.DeepEqual(rules[2], r3)) } func TestHotSpotParamRuleListJsonConverter(t *testing.T) { diff --git a/go.mod b/go.mod index 3eb0e907c..5ed91c9e6 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,6 @@ require ( github.com/pkg/errors v0.9.1 github.com/shirou/gopsutil v2.19.12+incompatible github.com/stretchr/testify v1.5.1 - github.com/tidwall/gjson v1.6.0 go.uber.org/multierr v1.5.0 golang.org/x/tools v0.0.0-20200426102838-f3a5411a4c3b // indirect google.golang.org/grpc v1.22.1 diff --git a/go.sum b/go.sum index c4b881625..4bcb1876d 100644 --- a/go.sum +++ b/go.sum @@ -428,12 +428,6 @@ github.com/tebeka/strftime v0.1.3 h1:5HQXOqWKYRFfNyBMNVc9z5+QzuBtIXy03psIhtdJYto github.com/tebeka/strftime v0.1.3/go.mod h1:7wJm3dZlpr4l/oVK0t1HYIc4rMzQ2XJlOMIUJUJH6XQ= github.com/tent/http-link-go v0.0.0-20130702225549-ac974c61c2f9/go.mod h1:RHkNRtSLfOK7qBTHaeSX1D6BNpI3qw7NTxsmNr4RvN8= github.com/tevid/gohamcrest v1.1.1/go.mod h1:3UvtWlqm8j5JbwYZh80D/PVBt0mJ1eJiYgZMibh0H/k= -github.com/tidwall/gjson v1.6.0 h1:9VEQWz6LLMUsUl6PueE49ir4Ka6CzLymOAZDxpFsTDc= -github.com/tidwall/gjson v1.6.0/go.mod h1:P256ACg0Mn+j1RXIDXoss50DeIABTYK1PULOJHhxOls= -github.com/tidwall/match v1.0.1 h1:PnKP62LPNxHKTwvHHZZzdOAOCtsJTjo6dZLCwpKm5xc= -github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0+pL9E= -github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4= -github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 h1:LnC5Kc/wtumK+WB441p7ynQJzVuNRJiqddSIE3IlSEQ= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/toolkits/concurrent v0.0.0-20150624120057-a4371d70e3e3/go.mod h1:QDlpd3qS71vYtakd2hmdpqhJ9nwv6mD6A30bQ1BPBFE= diff --git a/tests/maxsize_rule_list_benchmark_test.go b/tests/maxsize_rule_list_benchmark_test.go index 908a99551..3dda75c74 100644 --- a/tests/maxsize_rule_list_benchmark_test.go +++ b/tests/maxsize_rule_list_benchmark_test.go @@ -10,15 +10,20 @@ import ( ) func Test_Size_1000_Circuit_Breaker_Rules_Update(t *testing.T) { - rs := make([]cb.Rule, 0, 1000) + rs := make([]*cb.Rule, 0, 1000) rand.Seed(int64(util.CurrentTimeMillis())) intervals := []uint32{10000, 15000, 20000, 25000, 30000} for i := 0; i < 1000; i++ { retryTimeout := intervals[rand.Int()%5] - rs = append(rs, cb.NewRule("github.com/alibaba/sentinel/test"+strconv.Itoa(rand.Int()%100), cb.SlowRequestRatio, - cb.WithStatIntervalMs(10000), cb.WithRetryTimeoutMs(retryTimeout), - cb.WithMinRequestAmount(rand.Uint64()%100), cb.WithMaxAllowedRtMs(100), - cb.WithMaxSlowRequestRatio(0.1))) + rs = append(rs, &cb.Rule{ + Resource: "github.com/alibaba/sentinel/test", + Strategy: cb.SlowRequestRatio, + RetryTimeoutMs: retryTimeout, + MinRequestAmount: rand.Uint64() % 100, + StatIntervalMs: 10000, + MaxAllowedRtMs: 100, + Threshold: 0.1, + }) } _, err := cb.LoadRules(rs) @@ -28,15 +33,20 @@ func Test_Size_1000_Circuit_Breaker_Rules_Update(t *testing.T) { } func Benchmark_Size_1000_Circuit_Breaker_Rules_Update(b *testing.B) { - rs := make([]cb.Rule, 0, 1000) + rs := make([]*cb.Rule, 0, 1000) rand.Seed(int64(util.CurrentTimeMillis())) intervals := []uint32{10000, 15000, 20000, 25000, 30000} for i := 0; i < 1000; i++ { retryTimeout := intervals[rand.Int()%5] - rs = append(rs, cb.NewRule("github.com/alibaba/sentinel/test"+strconv.Itoa(rand.Int()%100), cb.SlowRequestRatio, - cb.WithStatIntervalMs(10000), cb.WithRetryTimeoutMs(retryTimeout), - cb.WithMinRequestAmount(rand.Uint64()%100), cb.WithMaxAllowedRtMs(100), - cb.WithMaxSlowRequestRatio(0.1))) + rs = append(rs, &cb.Rule{ + Resource: "github.com/alibaba/sentinel/test" + strconv.Itoa(rand.Int()%100), + Strategy: cb.SlowRequestRatio, + RetryTimeoutMs: retryTimeout, + MinRequestAmount: rand.Uint64() % 100, + StatIntervalMs: 10000, + MaxAllowedRtMs: 100, + Threshold: 0.1, + }) } b.ReportAllocs() @@ -51,14 +61,19 @@ func Benchmark_Size_1000_Circuit_Breaker_Rules_Update(b *testing.B) { } func Test_Size_10000_Circuit_Breaker_Rules_Update(t *testing.T) { - rs := make([]cb.Rule, 0, 10000) + rs := make([]*cb.Rule, 0, 10000) intervals := []uint32{10000, 15000, 20000, 25000, 30000} for i := 0; i < 10000; i++ { retryTimeout := intervals[rand.Int()%5] - rs = append(rs, cb.NewRule("github.com/alibaba/sentinel/test"+strconv.Itoa(rand.Int()%100), cb.SlowRequestRatio, - cb.WithStatIntervalMs(10000), cb.WithRetryTimeoutMs(retryTimeout), - cb.WithMinRequestAmount(rand.Uint64()%100), cb.WithMaxAllowedRtMs(100), - cb.WithMaxSlowRequestRatio(0.1))) + rs = append(rs, &cb.Rule{ + Resource: "github.com/alibaba/sentinel/test" + strconv.Itoa(rand.Int()%100), + Strategy: cb.SlowRequestRatio, + RetryTimeoutMs: retryTimeout, + MinRequestAmount: rand.Uint64() % 100, + StatIntervalMs: 10000, + MaxAllowedRtMs: 100, + Threshold: 0.1, + }) } _, err := cb.LoadRules(rs) @@ -68,14 +83,19 @@ func Test_Size_10000_Circuit_Breaker_Rules_Update(t *testing.T) { } func Benchmark_Size_10000_Circuit_Breaker_Rules_Update(b *testing.B) { - rs := make([]cb.Rule, 0, 10000) + rs := make([]*cb.Rule, 0, 10000) intervals := []uint32{10000, 15000, 20000, 25000, 30000} for i := 0; i < 10000; i++ { retryTimeout := intervals[rand.Int()%5] - rs = append(rs, cb.NewRule("github.com/alibaba/sentinel/test"+strconv.Itoa(rand.Int()%100), cb.SlowRequestRatio, - cb.WithStatIntervalMs(10000), cb.WithRetryTimeoutMs(retryTimeout), - cb.WithMinRequestAmount(rand.Uint64()%100), cb.WithMaxAllowedRtMs(100), - cb.WithMaxSlowRequestRatio(0.1))) + rs = append(rs, &cb.Rule{ + Resource: "github.com/alibaba/sentinel/test" + strconv.Itoa(rand.Int()%100), + Strategy: cb.SlowRequestRatio, + RetryTimeoutMs: retryTimeout, + MinRequestAmount: rand.Uint64() % 100, + StatIntervalMs: 10000, + MaxAllowedRtMs: 100, + Threshold: 0.1, + }) } b.ReportAllocs() diff --git a/tests/testdata/extension/helper/CircuitBreakerRule.json b/tests/testdata/extension/helper/CircuitBreakerRule.json index 208f11a64..4860d15fc 100644 --- a/tests/testdata/extension/helper/CircuitBreakerRule.json +++ b/tests/testdata/extension/helper/CircuitBreakerRule.json @@ -5,8 +5,8 @@ "retryTimeoutMs": 10, "minRequestAmount": 10, "statIntervalMs": 1000, - "maxAllowedRt": 100, - "maxSlowRequestRatio": 0.1 + "maxAllowedRtMs": 100, + "threshold": 0.1 }, { "resource": "abc", diff --git a/util/math.go b/util/math.go new file mode 100644 index 000000000..f9d480b8a --- /dev/null +++ b/util/math.go @@ -0,0 +1,9 @@ +package util + +import "math" + +const precision = 0.00000001 + +func Float64Equals(x, y float64) bool { + return math.Abs(x-y) < precision +} diff --git a/util/math_test.go b/util/math_test.go new file mode 100644 index 000000000..485b9d4fd --- /dev/null +++ b/util/math_test.go @@ -0,0 +1,12 @@ +package util + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestFloat64Equals(t *testing.T) { + assert.True(t, Float64Equals(0.1, 0.099999999)) + assert.False(t, Float64Equals(0.1, 0.09999999)) +}