From aee8d6820833b7c253f3baa569b09c9d1a6e4e5e Mon Sep 17 00:00:00 2001 From: louyuting <1849491904@qq.com> Date: Tue, 8 Sep 2020 13:20:10 +0800 Subject: [PATCH 1/6] Break down flow.ControlBehavior to flow.TokenCalculateStrategy and flow.ControlBehavior --- adapter/echo/middleware_test.go | 22 ++- adapter/gin/middleware_test.go | 22 ++- adapter/grpc/client_test.go | 44 ++++-- adapter/grpc/server_test.go | 44 ++++-- adapter/micro/client_test.go | 11 +- adapter/micro/server_test.go | 22 ++- core/flow/rule.go | 89 +++++++++-- core/flow/rule_manager.go | 123 +++++++++++---- core/flow/rule_manager_test.go | 144 ++++++++++++++++-- core/flow/tc_default.go | 8 +- example/qps/qps_limit_example.go | 11 +- example/qps/qps_limit_example_test.go | 11 +- example/warm_up/qps_warm_up_example.go | 11 +- example/warm_up/qps_warm_up_example_test.go | 11 +- ext/datasource/helper_test.go | 69 +++++---- tests/testdata/extension/helper/FlowRule.json | 15 +- 16 files changed, 483 insertions(+), 174 deletions(-) diff --git a/adapter/echo/middleware_test.go b/adapter/echo/middleware_test.go index 686f0bf83..27e815f30 100644 --- a/adapter/echo/middleware_test.go +++ b/adapter/echo/middleware_test.go @@ -20,16 +20,22 @@ func initSentinel(t *testing.T) { _, err = flow.LoadRules([]*flow.Rule{ { - Resource: "GET:/ping", - MetricType: flow.QPS, - Count: 1, - ControlBehavior: flow.Reject, + Resource: "GET:/ping", + MetricType: flow.QPS, + Count: 1, + ControlStrategy: flow.ControlStrategy{ + TokenCalculateStrategy: flow.Direct, + ControlBehavior: flow.Reject, + }, }, { - Resource: "/api/:uid", - MetricType: flow.QPS, - Count: 0, - ControlBehavior: flow.Reject, + Resource: "/api/:uid", + MetricType: flow.QPS, + Count: 0, + ControlStrategy: flow.ControlStrategy{ + TokenCalculateStrategy: flow.Direct, + ControlBehavior: flow.Reject, + }, }, }) if err != nil { diff --git a/adapter/gin/middleware_test.go b/adapter/gin/middleware_test.go index a459331cb..7f95b6bba 100644 --- a/adapter/gin/middleware_test.go +++ b/adapter/gin/middleware_test.go @@ -20,16 +20,22 @@ func initSentinel(t *testing.T) { _, err = flow.LoadRules([]*flow.Rule{ { - Resource: "GET:/ping", - MetricType: flow.QPS, - Count: 1, - ControlBehavior: flow.Reject, + Resource: "GET:/ping", + MetricType: flow.QPS, + Count: 1, + ControlStrategy: flow.ControlStrategy{ + TokenCalculateStrategy: flow.Direct, + ControlBehavior: flow.Reject, + }, }, { - Resource: "/api/users/:id", - MetricType: flow.QPS, - Count: 0, - ControlBehavior: flow.Reject, + Resource: "/api/users/:id", + MetricType: flow.QPS, + Count: 0, + ControlStrategy: flow.ControlStrategy{ + TokenCalculateStrategy: flow.Direct, + ControlBehavior: flow.Reject, + }, }, }) if err != nil { diff --git a/adapter/grpc/client_test.go b/adapter/grpc/client_test.go index 3b4d4f4c9..bfdfbef87 100644 --- a/adapter/grpc/client_test.go +++ b/adapter/grpc/client_test.go @@ -25,10 +25,13 @@ func TestUnaryClientIntercept(t *testing.T) { t.Run("success", func(t *testing.T) { var _, err = flow.LoadRules([]*flow.Rule{ { - Resource: "client:" + method, - MetricType: flow.QPS, - Count: 1, - ControlBehavior: flow.Reject, + Resource: "client:" + method, + MetricType: flow.QPS, + Count: 1, + ControlStrategy: flow.ControlStrategy{ + TokenCalculateStrategy: flow.Direct, + ControlBehavior: flow.Reject, + }, }, }) assert.Nil(t, err) @@ -43,10 +46,13 @@ func TestUnaryClientIntercept(t *testing.T) { t.Run("fail", func(t *testing.T) { var _, err = flow.LoadRules([]*flow.Rule{ { - Resource: "client:" + method, - MetricType: flow.QPS, - Count: 0, - ControlBehavior: flow.Reject, + Resource: "client:" + method, + MetricType: flow.QPS, + Count: 0, + ControlStrategy: flow.ControlStrategy{ + TokenCalculateStrategy: flow.Direct, + ControlBehavior: flow.Reject, + }, }, }) assert.Nil(t, err) @@ -69,10 +75,13 @@ func TestStreamClientIntercept(t *testing.T) { t.Run("success", func(t *testing.T) { var _, err = flow.LoadRules([]*flow.Rule{ { - Resource: "client:/grpc.testing.TestService/StreamingOutputCall", - MetricType: flow.QPS, - Count: 1, - ControlBehavior: flow.Reject, + Resource: "client:/grpc.testing.TestService/StreamingOutputCall", + MetricType: flow.QPS, + Count: 1, + ControlStrategy: flow.ControlStrategy{ + TokenCalculateStrategy: flow.Direct, + ControlBehavior: flow.Reject, + }, }, }) assert.Nil(t, err) @@ -89,10 +98,13 @@ func TestStreamClientIntercept(t *testing.T) { t.Run("fail", func(t *testing.T) { var _, err = flow.LoadRules([]*flow.Rule{ { - Resource: "client:/grpc.testing.TestService/StreamingOutputCall", - MetricType: flow.QPS, - Count: 0, - ControlBehavior: flow.Reject, + Resource: "client:/grpc.testing.TestService/StreamingOutputCall", + MetricType: flow.QPS, + Count: 0, + ControlStrategy: flow.ControlStrategy{ + TokenCalculateStrategy: flow.Direct, + ControlBehavior: flow.Reject, + }, }, }) assert.Nil(t, err) diff --git a/adapter/grpc/server_test.go b/adapter/grpc/server_test.go index 8292921b7..0859c4c4a 100644 --- a/adapter/grpc/server_test.go +++ b/adapter/grpc/server_test.go @@ -31,10 +31,13 @@ func TestStreamServerIntercept(t *testing.T) { t.Run("success", func(t *testing.T) { var _, err = flow.LoadRules([]*flow.Rule{ { - Resource: "/grpc.testing.TestService/StreamingInputCall", - MetricType: flow.QPS, - Count: 1, - ControlBehavior: flow.Reject, + Resource: "/grpc.testing.TestService/StreamingInputCall", + MetricType: flow.QPS, + Count: 1, + ControlStrategy: flow.ControlStrategy{ + TokenCalculateStrategy: flow.Direct, + ControlBehavior: flow.Reject, + }, }, }) assert.Nil(t, err) @@ -49,10 +52,13 @@ func TestStreamServerIntercept(t *testing.T) { t.Run("fail", func(t *testing.T) { var _, err = flow.LoadRules([]*flow.Rule{ { - Resource: "/grpc.testing.TestService/StreamingInputCall", - MetricType: flow.QPS, - Count: 0, - ControlBehavior: flow.Reject, + Resource: "/grpc.testing.TestService/StreamingInputCall", + MetricType: flow.QPS, + Count: 0, + ControlStrategy: flow.ControlStrategy{ + TokenCalculateStrategy: flow.Direct, + ControlBehavior: flow.Reject, + }, }, }) assert.Nil(t, err) @@ -73,10 +79,13 @@ func TestUnaryServerIntercept(t *testing.T) { t.Run("success", func(t *testing.T) { var _, err = flow.LoadRules([]*flow.Rule{ { - Resource: "/grpc.testing.TestService/UnaryCall", - MetricType: flow.QPS, - Count: 1, - ControlBehavior: flow.Reject, + Resource: "/grpc.testing.TestService/UnaryCall", + MetricType: flow.QPS, + Count: 1, + ControlStrategy: flow.ControlStrategy{ + TokenCalculateStrategy: flow.Direct, + ControlBehavior: flow.Reject, + }, }, }) assert.Nil(t, err) @@ -101,10 +110,13 @@ func TestUnaryServerIntercept(t *testing.T) { t.Run("fail", func(t *testing.T) { var _, err = flow.LoadRules([]*flow.Rule{ { - Resource: "/grpc.testing.TestService/UnaryCall", - MetricType: flow.QPS, - Count: 0, - ControlBehavior: flow.Reject, + Resource: "/grpc.testing.TestService/UnaryCall", + MetricType: flow.QPS, + Count: 0, + ControlStrategy: flow.ControlStrategy{ + TokenCalculateStrategy: flow.Direct, + ControlBehavior: flow.Reject, + }, }, }) assert.Nil(t, err) diff --git a/adapter/micro/client_test.go b/adapter/micro/client_test.go index 14b588eb3..e11cbcc5c 100644 --- a/adapter/micro/client_test.go +++ b/adapter/micro/client_test.go @@ -47,10 +47,13 @@ func TestClientLimiter(t *testing.T) { t.Run("success", func(t *testing.T) { var _, err = flow.LoadRules([]*flow.Rule{ { - Resource: req.Method(), - MetricType: flow.QPS, - Count: 1, - ControlBehavior: flow.Reject, + Resource: req.Method(), + MetricType: flow.QPS, + Count: 1, + ControlStrategy: flow.ControlStrategy{ + TokenCalculateStrategy: flow.Direct, + ControlBehavior: flow.Reject, + }, }, }) assert.Nil(t, err) diff --git a/adapter/micro/server_test.go b/adapter/micro/server_test.go index 9a3912db7..9fbb20eec 100644 --- a/adapter/micro/server_test.go +++ b/adapter/micro/server_test.go @@ -57,10 +57,13 @@ func TestServerLimiter(t *testing.T) { _, err = flow.LoadRules([]*flow.Rule{ { - Resource: req.Method(), - MetricType: flow.QPS, - Count: 1, - ControlBehavior: flow.Reject, + Resource: req.Method(), + MetricType: flow.QPS, + Count: 1, + ControlStrategy: flow.ControlStrategy{ + TokenCalculateStrategy: flow.Direct, + ControlBehavior: flow.Reject, + }, }, }) @@ -71,10 +74,13 @@ func TestServerLimiter(t *testing.T) { t.Run("success", func(t *testing.T) { var _, err = flow.LoadRules([]*flow.Rule{ { - Resource: req.Method(), - MetricType: flow.QPS, - Count: 1, - ControlBehavior: flow.Reject, + Resource: req.Method(), + MetricType: flow.QPS, + Count: 1, + ControlStrategy: flow.ControlStrategy{ + TokenCalculateStrategy: flow.Direct, + ControlBehavior: flow.Reject, + }, }, }) assert.Nil(t, err) diff --git a/core/flow/rule.go b/core/flow/rule.go index 632c65998..84873ada1 100644 --- a/core/flow/rule.go +++ b/core/flow/rule.go @@ -15,38 +15,94 @@ const ( QPS ) +func (s MetricType) String() string { + switch s { + case Concurrency: + return "Concurrency" + case QPS: + return "QPS" + default: + return "Undefined" + } +} + // RelationStrategy indicates the flow control strategy based on the relation of invocations. type RelationStrategy int32 const ( - // Direct means flow control by current resource directly. - Direct RelationStrategy = iota + // DirectResource means flow control by current resource directly. + DirectResource RelationStrategy = iota // AssociatedResource means flow control by the associated resource rather than current resource. AssociatedResource ) -// ControlBehavior indicates the traffic shaping behaviour. +func (s RelationStrategy) String() string { + switch s { + case DirectResource: + return "DirectResource" + case AssociatedResource: + return "AssociatedResource" + default: + return "Undefined" + } +} + +type TokenCalculateStrategy int32 + +const ( + Direct TokenCalculateStrategy = iota + WarmUp +) + +func (s TokenCalculateStrategy) String() string { + switch s { + case Direct: + return "Direct" + case WarmUp: + return "WarmUp" + default: + return "Undefined" + } +} + type ControlBehavior int32 const ( Reject ControlBehavior = iota - WarmUp Throttling - WarmUpThrottling ) +func (s ControlBehavior) String() string { + switch s { + case Reject: + return "Reject" + case Throttling: + return "Throttling" + default: + return "Undefined" + } +} + +type ControlStrategy struct { + TokenCalculateStrategy TokenCalculateStrategy `json:"tokenCalculateStrategy"` + ControlBehavior ControlBehavior `json:"controlBehavior"` +} + +func (s ControlStrategy) String() string { + return fmt.Sprintf("{TokenCalculateStrategy: %s, ControlBehavior: %s}", s.TokenCalculateStrategy, s.ControlBehavior) +} + // Rule describes the strategy of flow control. type Rule struct { // ID represents the unique ID of the rule (optional). ID uint64 `json:"id,omitempty"` // Resource represents the resource name. - Resource string `json:"resource"` - MetricType MetricType `json:"metricType"` + Resource string `json:"resource"` + MetricType MetricType `json:"metricType"` + ControlStrategy ControlStrategy `json:"controlStrategy"` // Count represents the threshold. - Count float64 `json:"count"` - ControlBehavior ControlBehavior `json:"controlBehavior"` - + Count float64 `json:"count"` RelationStrategy RelationStrategy `json:"relationStrategy"` RefResource string `json:"refResource"` MaxQueueingTimeMs uint32 `json:"maxQueueingTimeMs"` @@ -54,16 +110,17 @@ type Rule struct { WarmUpColdFactor uint32 `json:"warmUpColdFactor"` } -func (f *Rule) String() string { - b, err := json.Marshal(f) +func (r *Rule) String() string { + b, err := json.Marshal(r) if err != nil { // Return the fallback string - return fmt.Sprintf("Rule{resource=%s, id=%d, metricType=%d, threshold=%.2f}", - f.Resource, f.ID, f.MetricType, f.Count) + return fmt.Sprintf("Rule{Resource=%s, MetricType=%s, ControlStrategy=%s, "+ + "Count=%.2f, RelationStrategy=%s, WarmUpPeriodSec=%d, WarmUpColdFactor=%d, MaxQueueingTimeMs=%d}", + r.Resource, r.MetricType, r.ControlStrategy, r.Count, r.RelationStrategy, r.WarmUpPeriodSec, r.WarmUpColdFactor, r.MaxQueueingTimeMs) } return string(b) } -func (f *Rule) ResourceName() string { - return f.Resource +func (r *Rule) ResourceName() string { + return r.Resource } diff --git a/core/flow/rule_manager.go b/core/flow/rule_manager.go index 7549192d3..5400bbf74 100644 --- a/core/flow/rule_manager.go +++ b/core/flow/rule_manager.go @@ -17,30 +17,53 @@ type TrafficControllerGenFunc func(*Rule) *TrafficShapingController type TrafficControllerMap map[string][]*TrafficShapingController var ( - tcGenFuncMap = make(map[ControlBehavior]TrafficControllerGenFunc) + tcGenFuncMap = make(map[ControlStrategy]TrafficControllerGenFunc) tcMap = make(TrafficControllerMap) tcMux = new(sync.RWMutex) ) func init() { // Initialize the traffic shaping controller generator map for existing control behaviors. - tcGenFuncMap[Reject] = func(rule *Rule) *TrafficShapingController { - return NewTrafficShapingController(NewDefaultTrafficShapingCalculator(rule.Count), NewDefaultTrafficShapingChecker(rule), rule) + tcGenFuncMap[ControlStrategy{ + TokenCalculateStrategy: Direct, + ControlBehavior: Reject, + }] = func(rule *Rule) *TrafficShapingController { + return NewTrafficShapingController(NewDirectTrafficShapingCalculator(rule.Count), NewDefaultTrafficShapingChecker(rule), rule) } - tcGenFuncMap[Throttling] = func(rule *Rule) *TrafficShapingController { - return NewTrafficShapingController(NewDefaultTrafficShapingCalculator(rule.Count), NewThrottlingChecker(rule.MaxQueueingTimeMs), rule) + tcGenFuncMap[ControlStrategy{ + TokenCalculateStrategy: Direct, + ControlBehavior: Throttling, + }] = func(rule *Rule) *TrafficShapingController { + return NewTrafficShapingController(NewDirectTrafficShapingCalculator(rule.Count), NewThrottlingChecker(rule.MaxQueueingTimeMs), rule) } - tcGenFuncMap[WarmUp] = func(rule *Rule) *TrafficShapingController { + tcGenFuncMap[ControlStrategy{ + TokenCalculateStrategy: WarmUp, + ControlBehavior: Reject, + }] = func(rule *Rule) *TrafficShapingController { return NewTrafficShapingController(NewWarmUpTrafficShapingCalculator(rule), NewDefaultTrafficShapingChecker(rule), rule) } + tcGenFuncMap[ControlStrategy{ + TokenCalculateStrategy: WarmUp, + ControlBehavior: Throttling, + }] = func(rule *Rule) *TrafficShapingController { + return NewTrafficShapingController(NewWarmUpTrafficShapingCalculator(rule), NewThrottlingChecker(rule.MaxQueueingTimeMs), rule) + } } func logRuleUpdate(m TrafficControllerMap) { bs, err := json.Marshal(rulesFrom(m)) if err != nil { - logging.Info("[FlowRuleManager] Flow rules loaded") + if len(m) == 0 { + logging.Info("[FlowRuleManager] Flow rules were cleared") + } else { + logging.Info("[FlowRuleManager] Flow rules were loaded") + } } else { - logging.Infof("[FlowRuleManager] Flow rules loaded: %s", bs) + if len(m) == 0 { + logging.Info("[FlowRuleManager] Flow rules were cleared") + } else { + logging.Infof("[FlowRuleManager] Flow rules were loaded: %s", bs) + } } } @@ -79,13 +102,46 @@ func LoadRules(rules []*Rule) (bool, error) { return true, err } -func GetRules() []*Rule { +func getRules() []*Rule { tcMux.RLock() defer tcMux.RUnlock() return rulesFrom(tcMap) } +func getResRules(res string) []*Rule { + tcMux.RLock() + defer tcMux.RUnlock() + + resTcs, exist := tcMap[res] + if !exist { + return nil + } + ret := make([]*Rule, 0, len(resTcs)) + for _, tc := range resTcs { + ret = append(ret, tc.Rule()) + } + return ret +} + +func GetRules() []Rule { + rules := getRules() + ret := make([]Rule, 0, len(rules)) + for _, rule := range rules { + ret = append(ret, *rule) + } + return ret +} + +func GetResRules(res string) []Rule { + rules := getResRules(res) + ret := make([]Rule, 0, len(rules)) + for _, rule := range rules { + ret = append(ret, *rule) + } + return ret +} + func ClearRules() error { _, err := LoadRules(nil) return err @@ -109,30 +165,36 @@ func rulesFrom(m TrafficControllerMap) []*Rule { return rules } -// SetTrafficShapingGenerator sets the traffic controller generator for the given control behavior. -// Note that modifying the generator of default control behaviors is not allowed. -func SetTrafficShapingGenerator(cb ControlBehavior, generator TrafficControllerGenFunc) error { +// SetTrafficShapingGenerator sets the traffic controller generator for the given control strategy. +// Note that modifying the generator of default control strategy is not allowed. +func SetTrafficShapingGenerator(cs ControlStrategy, generator TrafficControllerGenFunc) error { if generator == nil { return errors.New("nil generator") } - if cb >= Reject && cb <= WarmUpThrottling { - return errors.New("not allowed to replace the generator for default control behaviors") + if cs.TokenCalculateStrategy >= Direct && cs.TokenCalculateStrategy <= WarmUp { + return errors.New("not allowed to replace the generator for default control strategy") + } + if cs.ControlBehavior >= Reject && cs.ControlBehavior <= Throttling { + return errors.New("not allowed to replace the generator for default control strategy") } tcMux.Lock() defer tcMux.Unlock() - tcGenFuncMap[cb] = generator + tcGenFuncMap[cs] = generator return nil } -func RemoveTrafficShapingGenerator(cb ControlBehavior) error { - if cb >= Reject && cb <= WarmUpThrottling { - return errors.New("not allowed to replace the generator for default control behaviors") +func RemoveTrafficShapingGenerator(cs ControlStrategy) error { + if cs.TokenCalculateStrategy >= Direct && cs.TokenCalculateStrategy <= WarmUp { + return errors.New("not allowed to replace the generator for default control strategy") + } + if cs.ControlBehavior >= Reject && cs.ControlBehavior <= Throttling { + return errors.New("not allowed to replace the generator for default control strategy") } tcMux.Lock() defer tcMux.Unlock() - delete(tcGenFuncMap, cb) + delete(tcGenFuncMap, cs) return nil } @@ -151,11 +213,11 @@ func buildFlowMap(rules []*Rule) TrafficControllerMap { } for _, rule := range rules { - if err := IsValidFlowRule(rule); err != nil { + if err := IsValidRule(rule); err != nil { logging.Warnf("Ignoring invalid flow rule: %v, reason: %s", rule, err.Error()) continue } - generator, supported := tcGenFuncMap[rule.ControlBehavior] + generator, supported := tcGenFuncMap[rule.ControlStrategy] if !supported { logging.Warnf("Ignoring the rule due to unsupported control behavior: %v", rule) continue @@ -176,8 +238,8 @@ func buildFlowMap(rules []*Rule) TrafficControllerMap { return m } -// IsValidFlowRule checks whether the given Rule is valid. -func IsValidFlowRule(rule *Rule) error { +// IsValidRule checks whether the given Rule is valid. +func IsValidRule(rule *Rule) error { if rule == nil { return errors.New("nil Rule") } @@ -193,19 +255,19 @@ func IsValidFlowRule(rule *Rule) error { if rule.RelationStrategy < 0 { return errors.New("invalid relation strategy") } - if rule.ControlBehavior < 0 { - return errors.New("invalid control behavior") + if rule.ControlStrategy.TokenCalculateStrategy < 0 || rule.ControlStrategy.ControlBehavior < 0 { + return errors.New("invalid control strategy") } if rule.RelationStrategy == AssociatedResource && rule.RefResource == "" { return errors.New("Bad flow rule: invalid control behavior") } - return checkControlBehaviorField(rule) + return checkControlStrategyField(rule) } -func checkControlBehaviorField(rule *Rule) error { - switch rule.ControlBehavior { +func checkControlStrategyField(rule *Rule) error { + switch rule.ControlStrategy.TokenCalculateStrategy { case WarmUp: if rule.WarmUpPeriodSec <= 0 { return errors.New("invalid warmUpPeriodSec") @@ -214,11 +276,6 @@ func checkControlBehaviorField(rule *Rule) error { return errors.New("WarmUpColdFactor must be great than 1") } return nil - case WarmUpThrottling: - if rule.WarmUpPeriodSec <= 0 { - return errors.New("invalid warmUpPeriodSec") - } - return nil default: } return nil diff --git a/core/flow/rule_manager_test.go b/core/flow/rule_manager_test.go index b308e0dc1..03b51e656 100644 --- a/core/flow/rule_manager_test.go +++ b/core/flow/rule_manager_test.go @@ -1,23 +1,33 @@ package flow import ( + "reflect" "testing" "github.com/stretchr/testify/assert" ) func TestSetAndRemoveTrafficShapingGenerator(t *testing.T) { - tsc := NewTrafficShapingController(nil, nil, nil) + tsc := &TrafficShapingController{} - err := SetTrafficShapingGenerator(Reject, func(_ *Rule) *TrafficShapingController { + err := SetTrafficShapingGenerator(ControlStrategy{ + TokenCalculateStrategy: Direct, + ControlBehavior: Reject, + }, func(_ *Rule) *TrafficShapingController { return tsc }) assert.Error(t, err, "default control behaviors are not allowed to be modified") - err = RemoveTrafficShapingGenerator(Reject) + err = RemoveTrafficShapingGenerator(ControlStrategy{ + TokenCalculateStrategy: Direct, + ControlBehavior: Reject, + }) assert.Error(t, err, "default control behaviors are not allowed to be removed") - cb := ControlBehavior(9999) - err = SetTrafficShapingGenerator(cb, func(_ *Rule) *TrafficShapingController { + cs := ControlStrategy{ + TokenCalculateStrategy: TokenCalculateStrategy(111), + ControlBehavior: ControlBehavior(112), + } + err = SetTrafficShapingGenerator(cs, func(_ *Rule) *TrafficShapingController { return tsc }) assert.NoError(t, err) @@ -29,17 +39,17 @@ func TestSetAndRemoveTrafficShapingGenerator(t *testing.T) { Count: 20, MetricType: QPS, Resource: resource, - ControlBehavior: cb, + ControlStrategy: cs, }, }) assert.NoError(t, err) - assert.Contains(t, tcGenFuncMap, cb) + assert.Contains(t, tcGenFuncMap, cs) assert.NotZero(t, len(tcMap[resource])) assert.Equal(t, tsc, tcMap[resource][0]) - err = RemoveTrafficShapingGenerator(cb) + err = RemoveTrafficShapingGenerator(cs) assert.NoError(t, err) - assert.NotContains(t, tcGenFuncMap, cb) + assert.NotContains(t, tcGenFuncMap, cs) _, _ = LoadRules([]*Rule{}) } @@ -47,11 +57,115 @@ func TestSetAndRemoveTrafficShapingGenerator(t *testing.T) { func TestIsValidFlowRule(t *testing.T) { badRule1 := &Rule{ID: 1, Count: 1, MetricType: QPS, Resource: ""} badRule2 := &Rule{ID: 1, Count: -1.9, MetricType: QPS, Resource: "test"} - badRule3 := &Rule{Count: 5, MetricType: QPS, Resource: "test", ControlBehavior: WarmUp} - goodRule1 := &Rule{Count: 10, MetricType: QPS, Resource: "test", ControlBehavior: Throttling} + badRule3 := &Rule{Count: 5, MetricType: QPS, Resource: "test", ControlStrategy: ControlStrategy{TokenCalculateStrategy: WarmUp, ControlBehavior: Reject}} + goodRule1 := &Rule{Count: 10, MetricType: QPS, Resource: "test", ControlStrategy: ControlStrategy{TokenCalculateStrategy: WarmUp, ControlBehavior: Throttling}, WarmUpPeriodSec: 10} + + assert.Error(t, IsValidRule(badRule1)) + assert.Error(t, IsValidRule(badRule2)) + assert.Error(t, IsValidRule(badRule3)) + assert.NoError(t, IsValidRule(goodRule1)) +} + +func TestGetRules(t *testing.T) { + t.Run("GetRules", func(t *testing.T) { + if err := ClearRules(); err != nil { + t.Fatal(err) + } + r1 := &Rule{ + ID: 0, + Resource: "abc1", + MetricType: 0, + Count: 0, + RelationStrategy: 0, + ControlStrategy: ControlStrategy{ + TokenCalculateStrategy: Direct, + ControlBehavior: Reject, + }, + RefResource: "", + WarmUpPeriodSec: 0, + MaxQueueingTimeMs: 0, + } + r2 := &Rule{ + ID: 1, + Resource: "abc2", + MetricType: 0, + Count: 0, + RelationStrategy: 0, + ControlStrategy: ControlStrategy{ + TokenCalculateStrategy: Direct, + ControlBehavior: Throttling, + }, + RefResource: "", + WarmUpPeriodSec: 0, + MaxQueueingTimeMs: 0, + } + if _, err := LoadRules([]*Rule{r1, r2}); err != nil { + t.Fatal(err) + } - assert.Error(t, IsValidFlowRule(badRule1)) - assert.Error(t, IsValidFlowRule(badRule2)) - assert.Error(t, IsValidFlowRule(badRule3)) - assert.NoError(t, IsValidFlowRule(goodRule1)) + rs1 := GetRules() + if rs1[0].Resource == "abc1" { + assert.True(t, &rs1[0] != r1) + assert.True(t, &rs1[1] != r2) + assert.True(t, reflect.DeepEqual(&rs1[0], r1)) + assert.True(t, reflect.DeepEqual(&rs1[1], r2)) + } else { + assert.True(t, &rs1[0] != r2) + assert.True(t, &rs1[1] != r1) + assert.True(t, reflect.DeepEqual(&rs1[0], r2)) + assert.True(t, reflect.DeepEqual(&rs1[1], r1)) + } + if err := ClearRules(); err != nil { + t.Fatal(err) + } + }) + + t.Run("getRules", func(t *testing.T) { + r1 := &Rule{ + ID: 0, + Resource: "abc1", + MetricType: 0, + Count: 0, + RelationStrategy: 0, + ControlStrategy: ControlStrategy{ + TokenCalculateStrategy: Direct, + ControlBehavior: Reject, + }, + RefResource: "", + WarmUpPeriodSec: 0, + MaxQueueingTimeMs: 0, + } + r2 := &Rule{ + ID: 1, + Resource: "abc2", + MetricType: 0, + Count: 0, + RelationStrategy: 0, + ControlStrategy: ControlStrategy{ + TokenCalculateStrategy: Direct, + ControlBehavior: Throttling, + }, + RefResource: "", + WarmUpPeriodSec: 0, + MaxQueueingTimeMs: 0, + } + if _, err := LoadRules([]*Rule{r1, r2}); err != nil { + t.Fatal(err) + } + rs2 := getRules() + if rs2[0].Resource == "abc1" { + assert.True(t, rs2[0] == r1) + assert.True(t, rs2[1] == r2) + assert.True(t, reflect.DeepEqual(rs2[0], r1)) + assert.True(t, reflect.DeepEqual(rs2[1], r2)) + } else { + assert.True(t, rs2[0] == r2) + assert.True(t, rs2[1] == r1) + assert.True(t, reflect.DeepEqual(rs2[0], r2)) + assert.True(t, reflect.DeepEqual(rs2[1], r1)) + } + if err := ClearRules(); err != nil { + t.Fatal(err) + } + }) } diff --git a/core/flow/tc_default.go b/core/flow/tc_default.go index c767e694f..e8ac91fc6 100644 --- a/core/flow/tc_default.go +++ b/core/flow/tc_default.go @@ -4,15 +4,15 @@ import ( "github.com/alibaba/sentinel-golang/core/base" ) -type DefaultTrafficShapingCalculator struct { +type DirectTrafficShapingCalculator struct { threshold float64 } -func NewDefaultTrafficShapingCalculator(threshold float64) *DefaultTrafficShapingCalculator { - return &DefaultTrafficShapingCalculator{threshold: threshold} +func NewDirectTrafficShapingCalculator(threshold float64) *DirectTrafficShapingCalculator { + return &DirectTrafficShapingCalculator{threshold: threshold} } -func (d *DefaultTrafficShapingCalculator) CalculateAllowedTokens(base.StatNode, uint32, int32) float64 { +func (d *DirectTrafficShapingCalculator) CalculateAllowedTokens(base.StatNode, uint32, int32) float64 { return d.threshold } diff --git a/example/qps/qps_limit_example.go b/example/qps/qps_limit_example.go index e7d527078..7eabb78ef 100644 --- a/example/qps/qps_limit_example.go +++ b/example/qps/qps_limit_example.go @@ -21,10 +21,13 @@ func main() { _, err = flow.LoadRules([]*flow.Rule{ { - Resource: "some-test", - MetricType: flow.QPS, - Count: 10, - ControlBehavior: flow.Reject, + Resource: "some-test", + MetricType: flow.QPS, + Count: 10, + ControlStrategy: flow.ControlStrategy{ + TokenCalculateStrategy: flow.Direct, + ControlBehavior: flow.Reject, + }, }, }) if err != nil { diff --git a/example/qps/qps_limit_example_test.go b/example/qps/qps_limit_example_test.go index f821e4a27..68c78b4ca 100644 --- a/example/qps/qps_limit_example_test.go +++ b/example/qps/qps_limit_example_test.go @@ -28,10 +28,13 @@ func doTest() { _, err = flow.LoadRules([]*flow.Rule{ { - Resource: "some-test", - MetricType: flow.QPS, - Count: 100, - ControlBehavior: flow.Reject, + Resource: "some-test", + MetricType: flow.QPS, + Count: 100, + ControlStrategy: flow.ControlStrategy{ + TokenCalculateStrategy: flow.Direct, + ControlBehavior: flow.Reject, + }, }, }) if err != nil { diff --git a/example/warm_up/qps_warm_up_example.go b/example/warm_up/qps_warm_up_example.go index 0f5563acb..9f1d5c7ed 100644 --- a/example/warm_up/qps_warm_up_example.go +++ b/example/warm_up/qps_warm_up_example.go @@ -33,10 +33,13 @@ func main() { _, err = flow.LoadRules([]*flow.Rule{ { - Resource: "some-test", - MetricType: flow.QPS, - Count: 100, - ControlBehavior: flow.WarmUp, + Resource: "some-test", + MetricType: flow.QPS, + Count: 100, + ControlStrategy: flow.ControlStrategy{ + TokenCalculateStrategy: flow.WarmUp, + ControlBehavior: flow.Reject, + }, WarmUpPeriodSec: 10, WarmUpColdFactor: 3, }, diff --git a/example/warm_up/qps_warm_up_example_test.go b/example/warm_up/qps_warm_up_example_test.go index 075502b30..dac06cc50 100644 --- a/example/warm_up/qps_warm_up_example_test.go +++ b/example/warm_up/qps_warm_up_example_test.go @@ -30,10 +30,13 @@ func doTest() { _, err = flow.LoadRules([]*flow.Rule{ { - Resource: "some-test", - MetricType: flow.QPS, - Count: 100, - ControlBehavior: flow.WarmUp, + Resource: "some-test", + MetricType: flow.QPS, + Count: 100, + ControlStrategy: flow.ControlStrategy{ + TokenCalculateStrategy: flow.WarmUp, + ControlBehavior: flow.Reject, + }, WarmUpPeriodSec: 10, }, }) diff --git a/ext/datasource/helper_test.go b/ext/datasource/helper_test.go index 5984bf9cb..3cf3fd794 100644 --- a/ext/datasource/helper_test.go +++ b/ext/datasource/helper_test.go @@ -47,11 +47,14 @@ func TestFlowRulesJsonConverter(t *testing.T) { flowRules := got.([]*flow.Rule) assert.True(t, len(flowRules) == 3) r1 := &flow.Rule{ - Resource: "abc", - MetricType: flow.Concurrency, - Count: 100, - RelationStrategy: flow.Direct, - ControlBehavior: flow.Reject, + Resource: "abc", + MetricType: flow.Concurrency, + Count: 100, + RelationStrategy: flow.DirectResource, + ControlStrategy: flow.ControlStrategy{ + TokenCalculateStrategy: flow.Direct, + ControlBehavior: flow.Reject, + }, RefResource: "refDefault", WarmUpPeriodSec: 10, MaxQueueingTimeMs: 1000, @@ -59,11 +62,14 @@ func TestFlowRulesJsonConverter(t *testing.T) { assert.True(t, reflect.DeepEqual(flowRules[0], r1)) r2 := &flow.Rule{ - Resource: "abc", - MetricType: flow.QPS, - Count: 200, - RelationStrategy: flow.AssociatedResource, - ControlBehavior: flow.WarmUp, + Resource: "abc", + MetricType: flow.QPS, + Count: 200, + RelationStrategy: flow.AssociatedResource, + ControlStrategy: flow.ControlStrategy{ + TokenCalculateStrategy: flow.Direct, + ControlBehavior: flow.Throttling, + }, RefResource: "refDefault", WarmUpPeriodSec: 20, MaxQueueingTimeMs: 2000, @@ -71,11 +77,14 @@ func TestFlowRulesJsonConverter(t *testing.T) { assert.True(t, reflect.DeepEqual(flowRules[1], r2)) r3 := &flow.Rule{ - Resource: "abc", - MetricType: flow.QPS, - Count: 300, - RelationStrategy: flow.Direct, - ControlBehavior: flow.WarmUp, + Resource: "abc", + MetricType: flow.QPS, + Count: 300, + RelationStrategy: flow.DirectResource, + ControlStrategy: flow.ControlStrategy{ + TokenCalculateStrategy: flow.Direct, + ControlBehavior: flow.Throttling, + }, RefResource: "refDefault", WarmUpPeriodSec: 30, MaxQueueingTimeMs: 3000, @@ -89,12 +98,15 @@ func TestFlowRulesUpdater(t *testing.T) { flow.ClearRules() flow.LoadRules([]*flow.Rule{ { - ID: 0, - Resource: "abc", - MetricType: 0, - Count: 0, - RelationStrategy: 0, - ControlBehavior: 0, + ID: 0, + Resource: "abc", + MetricType: 0, + Count: 0, + RelationStrategy: 0, + ControlStrategy: flow.ControlStrategy{ + TokenCalculateStrategy: flow.Direct, + ControlBehavior: flow.Reject, + }, RefResource: "", WarmUpPeriodSec: 0, MaxQueueingTimeMs: 0, @@ -121,12 +133,15 @@ func TestFlowRulesUpdater(t *testing.T) { flow.ClearRules() p := make([]flow.Rule, 0) fw := flow.Rule{ - ID: 0, - Resource: "aaaa", - MetricType: 0, - Count: 0, - RelationStrategy: 0, - ControlBehavior: 0, + ID: 0, + Resource: "aaaa", + MetricType: 0, + Count: 0, + RelationStrategy: 0, + ControlStrategy: flow.ControlStrategy{ + TokenCalculateStrategy: flow.Direct, + ControlBehavior: flow.Reject, + }, RefResource: "", WarmUpPeriodSec: 0, MaxQueueingTimeMs: 0, diff --git a/tests/testdata/extension/helper/FlowRule.json b/tests/testdata/extension/helper/FlowRule.json index ad373230f..0266d93ec 100644 --- a/tests/testdata/extension/helper/FlowRule.json +++ b/tests/testdata/extension/helper/FlowRule.json @@ -4,7 +4,10 @@ "metricType": 0, "count": 100.0, "relationStrategy": 0, - "controlBehavior": 0, + "controlStrategy": { + "tokenCalculateStrategy": 0, + "controlBehavior": 0 + }, "refResource": "refDefault", "warmUpPeriodSec": 10, "maxQueueingTimeMs": 1000 @@ -14,7 +17,10 @@ "metricType": 1, "count": 200.0, "relationStrategy": 1, - "controlBehavior": 1, + "controlStrategy": { + "tokenCalculateStrategy": 0, + "controlBehavior": 1 + }, "refResource": "refDefault", "warmUpPeriodSec": 20, "maxQueueingTimeMs": 2000 @@ -24,7 +30,10 @@ "metricType": 1, "count": 300.0, "relationStrategy": 0, - "controlBehavior": 1, + "controlStrategy": { + "tokenCalculateStrategy": 0, + "controlBehavior": 1 + }, "refResource": "refDefault", "warmUpPeriodSec": 30, "maxQueueingTimeMs": 3000 From 397dc4397aa0837fa8197edacbff90fdc28102f8 Mon Sep 17 00:00:00 2001 From: louyuting <1849491904@qq.com> Date: Tue, 8 Sep 2020 20:30:29 +0800 Subject: [PATCH 2/6] inprove naming --- core/flow/rule.go | 8 ++++---- ext/datasource/helper_test.go | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/core/flow/rule.go b/core/flow/rule.go index 84873ada1..260bf09f4 100644 --- a/core/flow/rule.go +++ b/core/flow/rule.go @@ -30,16 +30,16 @@ func (s MetricType) String() string { type RelationStrategy int32 const ( - // DirectResource means flow control by current resource directly. - DirectResource RelationStrategy = iota + // CurrentResource means flow control by current resource directly. + CurrentResource RelationStrategy = iota // AssociatedResource means flow control by the associated resource rather than current resource. AssociatedResource ) func (s RelationStrategy) String() string { switch s { - case DirectResource: - return "DirectResource" + case CurrentResource: + return "CurrentResource" case AssociatedResource: return "AssociatedResource" default: diff --git a/ext/datasource/helper_test.go b/ext/datasource/helper_test.go index 3cf3fd794..2c50c22ea 100644 --- a/ext/datasource/helper_test.go +++ b/ext/datasource/helper_test.go @@ -50,7 +50,7 @@ func TestFlowRulesJsonConverter(t *testing.T) { Resource: "abc", MetricType: flow.Concurrency, Count: 100, - RelationStrategy: flow.DirectResource, + RelationStrategy: flow.CurrentResource, ControlStrategy: flow.ControlStrategy{ TokenCalculateStrategy: flow.Direct, ControlBehavior: flow.Reject, @@ -80,7 +80,7 @@ func TestFlowRulesJsonConverter(t *testing.T) { Resource: "abc", MetricType: flow.QPS, Count: 300, - RelationStrategy: flow.DirectResource, + RelationStrategy: flow.CurrentResource, ControlStrategy: flow.ControlStrategy{ TokenCalculateStrategy: flow.Direct, ControlBehavior: flow.Throttling, From c9cf1f437bc83aa04a81d164b1bc80f133688479 Mon Sep 17 00:00:00 2001 From: louyuting <1849491904@qq.com> Date: Tue, 8 Sep 2020 21:40:11 +0800 Subject: [PATCH 3/6] flatten flow.Rule --- adapter/echo/middleware_test.go | 24 ++-- adapter/gin/middleware_test.go | 24 ++-- adapter/grpc/client_test.go | 48 +++---- adapter/grpc/server_test.go | 48 +++---- adapter/micro/client_test.go | 12 +- adapter/micro/server_test.go | 24 ++-- core/flow/rule.go | 33 ++--- core/flow/rule_manager.go | 63 +++++---- core/flow/rule_manager_test.go | 126 ++++++++---------- example/qps/qps_limit_example.go | 12 +- example/qps/qps_limit_example_test.go | 12 +- example/warm_up/qps_warm_up_example.go | 16 +-- example/warm_up/qps_warm_up_example_test.go | 14 +- ext/datasource/helper_test.go | 104 +++++++-------- tests/testdata/extension/helper/FlowRule.json | 18 +-- 15 files changed, 259 insertions(+), 319 deletions(-) diff --git a/adapter/echo/middleware_test.go b/adapter/echo/middleware_test.go index 27e815f30..9e4cbebbe 100644 --- a/adapter/echo/middleware_test.go +++ b/adapter/echo/middleware_test.go @@ -20,22 +20,18 @@ func initSentinel(t *testing.T) { _, err = flow.LoadRules([]*flow.Rule{ { - Resource: "GET:/ping", - MetricType: flow.QPS, - Count: 1, - ControlStrategy: flow.ControlStrategy{ - TokenCalculateStrategy: flow.Direct, - ControlBehavior: flow.Reject, - }, + Resource: "GET:/ping", + MetricType: flow.QPS, + Count: 1, + TokenCalculateStrategy: flow.Direct, + ControlBehavior: flow.Reject, }, { - Resource: "/api/:uid", - MetricType: flow.QPS, - Count: 0, - ControlStrategy: flow.ControlStrategy{ - TokenCalculateStrategy: flow.Direct, - ControlBehavior: flow.Reject, - }, + Resource: "/api/:uid", + MetricType: flow.QPS, + Count: 0, + TokenCalculateStrategy: flow.Direct, + ControlBehavior: flow.Reject, }, }) if err != nil { diff --git a/adapter/gin/middleware_test.go b/adapter/gin/middleware_test.go index 7f95b6bba..9f31217cf 100644 --- a/adapter/gin/middleware_test.go +++ b/adapter/gin/middleware_test.go @@ -20,22 +20,18 @@ func initSentinel(t *testing.T) { _, err = flow.LoadRules([]*flow.Rule{ { - Resource: "GET:/ping", - MetricType: flow.QPS, - Count: 1, - ControlStrategy: flow.ControlStrategy{ - TokenCalculateStrategy: flow.Direct, - ControlBehavior: flow.Reject, - }, + Resource: "GET:/ping", + MetricType: flow.QPS, + Count: 1, + TokenCalculateStrategy: flow.Direct, + ControlBehavior: flow.Reject, }, { - Resource: "/api/users/:id", - MetricType: flow.QPS, - Count: 0, - ControlStrategy: flow.ControlStrategy{ - TokenCalculateStrategy: flow.Direct, - ControlBehavior: flow.Reject, - }, + Resource: "/api/users/:id", + MetricType: flow.QPS, + Count: 0, + TokenCalculateStrategy: flow.Direct, + ControlBehavior: flow.Reject, }, }) if err != nil { diff --git a/adapter/grpc/client_test.go b/adapter/grpc/client_test.go index bfdfbef87..26368f69c 100644 --- a/adapter/grpc/client_test.go +++ b/adapter/grpc/client_test.go @@ -25,13 +25,11 @@ func TestUnaryClientIntercept(t *testing.T) { t.Run("success", func(t *testing.T) { var _, err = flow.LoadRules([]*flow.Rule{ { - Resource: "client:" + method, - MetricType: flow.QPS, - Count: 1, - ControlStrategy: flow.ControlStrategy{ - TokenCalculateStrategy: flow.Direct, - ControlBehavior: flow.Reject, - }, + Resource: "client:" + method, + MetricType: flow.QPS, + Count: 1, + TokenCalculateStrategy: flow.Direct, + ControlBehavior: flow.Reject, }, }) assert.Nil(t, err) @@ -46,13 +44,11 @@ func TestUnaryClientIntercept(t *testing.T) { t.Run("fail", func(t *testing.T) { var _, err = flow.LoadRules([]*flow.Rule{ { - Resource: "client:" + method, - MetricType: flow.QPS, - Count: 0, - ControlStrategy: flow.ControlStrategy{ - TokenCalculateStrategy: flow.Direct, - ControlBehavior: flow.Reject, - }, + Resource: "client:" + method, + MetricType: flow.QPS, + Count: 0, + TokenCalculateStrategy: flow.Direct, + ControlBehavior: flow.Reject, }, }) assert.Nil(t, err) @@ -75,13 +71,11 @@ func TestStreamClientIntercept(t *testing.T) { t.Run("success", func(t *testing.T) { var _, err = flow.LoadRules([]*flow.Rule{ { - Resource: "client:/grpc.testing.TestService/StreamingOutputCall", - MetricType: flow.QPS, - Count: 1, - ControlStrategy: flow.ControlStrategy{ - TokenCalculateStrategy: flow.Direct, - ControlBehavior: flow.Reject, - }, + Resource: "client:/grpc.testing.TestService/StreamingOutputCall", + MetricType: flow.QPS, + Count: 1, + TokenCalculateStrategy: flow.Direct, + ControlBehavior: flow.Reject, }, }) assert.Nil(t, err) @@ -98,13 +92,11 @@ func TestStreamClientIntercept(t *testing.T) { t.Run("fail", func(t *testing.T) { var _, err = flow.LoadRules([]*flow.Rule{ { - Resource: "client:/grpc.testing.TestService/StreamingOutputCall", - MetricType: flow.QPS, - Count: 0, - ControlStrategy: flow.ControlStrategy{ - TokenCalculateStrategy: flow.Direct, - ControlBehavior: flow.Reject, - }, + Resource: "client:/grpc.testing.TestService/StreamingOutputCall", + MetricType: flow.QPS, + Count: 0, + TokenCalculateStrategy: flow.Direct, + ControlBehavior: flow.Reject, }, }) assert.Nil(t, err) diff --git a/adapter/grpc/server_test.go b/adapter/grpc/server_test.go index 0859c4c4a..d9ec2bcae 100644 --- a/adapter/grpc/server_test.go +++ b/adapter/grpc/server_test.go @@ -31,13 +31,11 @@ func TestStreamServerIntercept(t *testing.T) { t.Run("success", func(t *testing.T) { var _, err = flow.LoadRules([]*flow.Rule{ { - Resource: "/grpc.testing.TestService/StreamingInputCall", - MetricType: flow.QPS, - Count: 1, - ControlStrategy: flow.ControlStrategy{ - TokenCalculateStrategy: flow.Direct, - ControlBehavior: flow.Reject, - }, + Resource: "/grpc.testing.TestService/StreamingInputCall", + MetricType: flow.QPS, + Count: 1, + TokenCalculateStrategy: flow.Direct, + ControlBehavior: flow.Reject, }, }) assert.Nil(t, err) @@ -52,13 +50,11 @@ func TestStreamServerIntercept(t *testing.T) { t.Run("fail", func(t *testing.T) { var _, err = flow.LoadRules([]*flow.Rule{ { - Resource: "/grpc.testing.TestService/StreamingInputCall", - MetricType: flow.QPS, - Count: 0, - ControlStrategy: flow.ControlStrategy{ - TokenCalculateStrategy: flow.Direct, - ControlBehavior: flow.Reject, - }, + Resource: "/grpc.testing.TestService/StreamingInputCall", + MetricType: flow.QPS, + Count: 0, + TokenCalculateStrategy: flow.Direct, + ControlBehavior: flow.Reject, }, }) assert.Nil(t, err) @@ -79,13 +75,11 @@ func TestUnaryServerIntercept(t *testing.T) { t.Run("success", func(t *testing.T) { var _, err = flow.LoadRules([]*flow.Rule{ { - Resource: "/grpc.testing.TestService/UnaryCall", - MetricType: flow.QPS, - Count: 1, - ControlStrategy: flow.ControlStrategy{ - TokenCalculateStrategy: flow.Direct, - ControlBehavior: flow.Reject, - }, + Resource: "/grpc.testing.TestService/UnaryCall", + MetricType: flow.QPS, + Count: 1, + TokenCalculateStrategy: flow.Direct, + ControlBehavior: flow.Reject, }, }) assert.Nil(t, err) @@ -110,13 +104,11 @@ func TestUnaryServerIntercept(t *testing.T) { t.Run("fail", func(t *testing.T) { var _, err = flow.LoadRules([]*flow.Rule{ { - Resource: "/grpc.testing.TestService/UnaryCall", - MetricType: flow.QPS, - Count: 0, - ControlStrategy: flow.ControlStrategy{ - TokenCalculateStrategy: flow.Direct, - ControlBehavior: flow.Reject, - }, + Resource: "/grpc.testing.TestService/UnaryCall", + MetricType: flow.QPS, + Count: 0, + TokenCalculateStrategy: flow.Direct, + ControlBehavior: flow.Reject, }, }) assert.Nil(t, err) diff --git a/adapter/micro/client_test.go b/adapter/micro/client_test.go index e11cbcc5c..b8cb78183 100644 --- a/adapter/micro/client_test.go +++ b/adapter/micro/client_test.go @@ -47,13 +47,11 @@ func TestClientLimiter(t *testing.T) { t.Run("success", func(t *testing.T) { var _, err = flow.LoadRules([]*flow.Rule{ { - Resource: req.Method(), - MetricType: flow.QPS, - Count: 1, - ControlStrategy: flow.ControlStrategy{ - TokenCalculateStrategy: flow.Direct, - ControlBehavior: flow.Reject, - }, + Resource: req.Method(), + MetricType: flow.QPS, + Count: 1, + TokenCalculateStrategy: flow.Direct, + ControlBehavior: flow.Reject, }, }) assert.Nil(t, err) diff --git a/adapter/micro/server_test.go b/adapter/micro/server_test.go index 9fbb20eec..b0a0c7bce 100644 --- a/adapter/micro/server_test.go +++ b/adapter/micro/server_test.go @@ -57,13 +57,11 @@ func TestServerLimiter(t *testing.T) { _, err = flow.LoadRules([]*flow.Rule{ { - Resource: req.Method(), - MetricType: flow.QPS, - Count: 1, - ControlStrategy: flow.ControlStrategy{ - TokenCalculateStrategy: flow.Direct, - ControlBehavior: flow.Reject, - }, + Resource: req.Method(), + MetricType: flow.QPS, + Count: 1, + TokenCalculateStrategy: flow.Direct, + ControlBehavior: flow.Reject, }, }) @@ -74,13 +72,11 @@ func TestServerLimiter(t *testing.T) { t.Run("success", func(t *testing.T) { var _, err = flow.LoadRules([]*flow.Rule{ { - Resource: req.Method(), - MetricType: flow.QPS, - Count: 1, - ControlStrategy: flow.ControlStrategy{ - TokenCalculateStrategy: flow.Direct, - ControlBehavior: flow.Reject, - }, + Resource: req.Method(), + MetricType: flow.QPS, + Count: 1, + TokenCalculateStrategy: flow.Direct, + ControlBehavior: flow.Reject, }, }) assert.Nil(t, err) diff --git a/core/flow/rule.go b/core/flow/rule.go index 260bf09f4..5932945d9 100644 --- a/core/flow/rule.go +++ b/core/flow/rule.go @@ -83,40 +83,31 @@ func (s ControlBehavior) String() string { } } -type ControlStrategy struct { - TokenCalculateStrategy TokenCalculateStrategy `json:"tokenCalculateStrategy"` - ControlBehavior ControlBehavior `json:"controlBehavior"` -} - -func (s ControlStrategy) String() string { - return fmt.Sprintf("{TokenCalculateStrategy: %s, ControlBehavior: %s}", s.TokenCalculateStrategy, s.ControlBehavior) -} - // Rule describes the strategy of flow control. type Rule struct { // ID represents the unique ID of the rule (optional). ID uint64 `json:"id,omitempty"` // Resource represents the resource name. - Resource string `json:"resource"` - MetricType MetricType `json:"metricType"` - ControlStrategy ControlStrategy `json:"controlStrategy"` - // Count represents the threshold. - Count float64 `json:"count"` - RelationStrategy RelationStrategy `json:"relationStrategy"` - RefResource string `json:"refResource"` - MaxQueueingTimeMs uint32 `json:"maxQueueingTimeMs"` - WarmUpPeriodSec uint32 `json:"warmUpPeriodSec"` - WarmUpColdFactor uint32 `json:"warmUpColdFactor"` + Resource string `json:"resource"` + MetricType MetricType `json:"metricType"` + TokenCalculateStrategy TokenCalculateStrategy `json:"tokenCalculateStrategy"` + ControlBehavior ControlBehavior `json:"controlBehavior"` + Count float64 `json:"count"` + RelationStrategy RelationStrategy `json:"relationStrategy"` + RefResource string `json:"refResource"` + MaxQueueingTimeMs uint32 `json:"maxQueueingTimeMs"` + WarmUpPeriodSec uint32 `json:"warmUpPeriodSec"` + WarmUpColdFactor uint32 `json:"warmUpColdFactor"` } func (r *Rule) String() string { b, err := json.Marshal(r) if err != nil { // Return the fallback string - return fmt.Sprintf("Rule{Resource=%s, MetricType=%s, ControlStrategy=%s, "+ + return fmt.Sprintf("Rule{Resource=%s, MetricType=%s, TokenCalculateStrategy=%s, ControlBehavior=%s, "+ "Count=%.2f, RelationStrategy=%s, WarmUpPeriodSec=%d, WarmUpColdFactor=%d, MaxQueueingTimeMs=%d}", - r.Resource, r.MetricType, r.ControlStrategy, r.Count, r.RelationStrategy, r.WarmUpPeriodSec, r.WarmUpColdFactor, r.MaxQueueingTimeMs) + r.Resource, r.MetricType, r.TokenCalculateStrategy, r.ControlBehavior, r.Count, r.RelationStrategy, r.WarmUpPeriodSec, r.WarmUpColdFactor, r.MaxQueueingTimeMs) } return string(b) } diff --git a/core/flow/rule_manager.go b/core/flow/rule_manager.go index 5400bbf74..7e2ac4e12 100644 --- a/core/flow/rule_manager.go +++ b/core/flow/rule_manager.go @@ -13,38 +13,43 @@ import ( // TrafficControllerGenFunc represents the TrafficShapingController generator function of a specific control behavior. type TrafficControllerGenFunc func(*Rule) *TrafficShapingController +type trafficControllerGenKey struct { + tokenCalculateStrategy TokenCalculateStrategy + controlBehavior ControlBehavior +} + // TrafficControllerMap represents the map storage for TrafficShapingController. type TrafficControllerMap map[string][]*TrafficShapingController var ( - tcGenFuncMap = make(map[ControlStrategy]TrafficControllerGenFunc) + tcGenFuncMap = make(map[trafficControllerGenKey]TrafficControllerGenFunc) tcMap = make(TrafficControllerMap) tcMux = new(sync.RWMutex) ) func init() { // Initialize the traffic shaping controller generator map for existing control behaviors. - tcGenFuncMap[ControlStrategy{ - TokenCalculateStrategy: Direct, - ControlBehavior: Reject, + tcGenFuncMap[trafficControllerGenKey{ + tokenCalculateStrategy: Direct, + controlBehavior: Reject, }] = func(rule *Rule) *TrafficShapingController { return NewTrafficShapingController(NewDirectTrafficShapingCalculator(rule.Count), NewDefaultTrafficShapingChecker(rule), rule) } - tcGenFuncMap[ControlStrategy{ - TokenCalculateStrategy: Direct, - ControlBehavior: Throttling, + tcGenFuncMap[trafficControllerGenKey{ + tokenCalculateStrategy: Direct, + controlBehavior: Throttling, }] = func(rule *Rule) *TrafficShapingController { return NewTrafficShapingController(NewDirectTrafficShapingCalculator(rule.Count), NewThrottlingChecker(rule.MaxQueueingTimeMs), rule) } - tcGenFuncMap[ControlStrategy{ - TokenCalculateStrategy: WarmUp, - ControlBehavior: Reject, + tcGenFuncMap[trafficControllerGenKey{ + tokenCalculateStrategy: WarmUp, + controlBehavior: Reject, }] = func(rule *Rule) *TrafficShapingController { return NewTrafficShapingController(NewWarmUpTrafficShapingCalculator(rule), NewDefaultTrafficShapingChecker(rule), rule) } - tcGenFuncMap[ControlStrategy{ - TokenCalculateStrategy: WarmUp, - ControlBehavior: Throttling, + tcGenFuncMap[trafficControllerGenKey{ + tokenCalculateStrategy: WarmUp, + controlBehavior: Throttling, }] = func(rule *Rule) *TrafficShapingController { return NewTrafficShapingController(NewWarmUpTrafficShapingCalculator(rule), NewThrottlingChecker(rule.MaxQueueingTimeMs), rule) } @@ -167,34 +172,41 @@ func rulesFrom(m TrafficControllerMap) []*Rule { // SetTrafficShapingGenerator sets the traffic controller generator for the given control strategy. // Note that modifying the generator of default control strategy is not allowed. -func SetTrafficShapingGenerator(cs ControlStrategy, generator TrafficControllerGenFunc) error { +func SetTrafficShapingGenerator(tokenCalculateStrategy TokenCalculateStrategy, controlBehavior ControlBehavior, generator TrafficControllerGenFunc) error { if generator == nil { return errors.New("nil generator") } - if cs.TokenCalculateStrategy >= Direct && cs.TokenCalculateStrategy <= WarmUp { + + if tokenCalculateStrategy >= Direct && tokenCalculateStrategy <= WarmUp { return errors.New("not allowed to replace the generator for default control strategy") } - if cs.ControlBehavior >= Reject && cs.ControlBehavior <= Throttling { + if controlBehavior >= Reject && controlBehavior <= Throttling { return errors.New("not allowed to replace the generator for default control strategy") } tcMux.Lock() defer tcMux.Unlock() - tcGenFuncMap[cs] = generator + tcGenFuncMap[trafficControllerGenKey{ + tokenCalculateStrategy: tokenCalculateStrategy, + controlBehavior: controlBehavior, + }] = generator return nil } -func RemoveTrafficShapingGenerator(cs ControlStrategy) error { - if cs.TokenCalculateStrategy >= Direct && cs.TokenCalculateStrategy <= WarmUp { +func RemoveTrafficShapingGenerator(tokenCalculateStrategy TokenCalculateStrategy, controlBehavior ControlBehavior) error { + if tokenCalculateStrategy >= Direct && tokenCalculateStrategy <= WarmUp { return errors.New("not allowed to replace the generator for default control strategy") } - if cs.ControlBehavior >= Reject && cs.ControlBehavior <= Throttling { + if controlBehavior >= Reject && controlBehavior <= Throttling { return errors.New("not allowed to replace the generator for default control strategy") } tcMux.Lock() defer tcMux.Unlock() - delete(tcGenFuncMap, cs) + delete(tcGenFuncMap, trafficControllerGenKey{ + tokenCalculateStrategy: tokenCalculateStrategy, + controlBehavior: controlBehavior, + }) return nil } @@ -217,7 +229,10 @@ func buildFlowMap(rules []*Rule) TrafficControllerMap { logging.Warnf("Ignoring invalid flow rule: %v, reason: %s", rule, err.Error()) continue } - generator, supported := tcGenFuncMap[rule.ControlStrategy] + generator, supported := tcGenFuncMap[trafficControllerGenKey{ + tokenCalculateStrategy: rule.TokenCalculateStrategy, + controlBehavior: rule.ControlBehavior, + }] if !supported { logging.Warnf("Ignoring the rule due to unsupported control behavior: %v", rule) continue @@ -255,7 +270,7 @@ func IsValidRule(rule *Rule) error { if rule.RelationStrategy < 0 { return errors.New("invalid relation strategy") } - if rule.ControlStrategy.TokenCalculateStrategy < 0 || rule.ControlStrategy.ControlBehavior < 0 { + if rule.TokenCalculateStrategy < 0 || rule.ControlBehavior < 0 { return errors.New("invalid control strategy") } @@ -267,7 +282,7 @@ func IsValidRule(rule *Rule) error { } func checkControlStrategyField(rule *Rule) error { - switch rule.ControlStrategy.TokenCalculateStrategy { + switch rule.TokenCalculateStrategy { case WarmUp: if rule.WarmUpPeriodSec <= 0 { return errors.New("invalid warmUpPeriodSec") diff --git a/core/flow/rule_manager_test.go b/core/flow/rule_manager_test.go index 03b51e656..f11c4a949 100644 --- a/core/flow/rule_manager_test.go +++ b/core/flow/rule_manager_test.go @@ -10,24 +10,14 @@ import ( func TestSetAndRemoveTrafficShapingGenerator(t *testing.T) { tsc := &TrafficShapingController{} - err := SetTrafficShapingGenerator(ControlStrategy{ - TokenCalculateStrategy: Direct, - ControlBehavior: Reject, - }, func(_ *Rule) *TrafficShapingController { + err := SetTrafficShapingGenerator(Direct, Reject, func(_ *Rule) *TrafficShapingController { return tsc }) assert.Error(t, err, "default control behaviors are not allowed to be modified") - err = RemoveTrafficShapingGenerator(ControlStrategy{ - TokenCalculateStrategy: Direct, - ControlBehavior: Reject, - }) + err = RemoveTrafficShapingGenerator(Direct, Reject) assert.Error(t, err, "default control behaviors are not allowed to be removed") - cs := ControlStrategy{ - TokenCalculateStrategy: TokenCalculateStrategy(111), - ControlBehavior: ControlBehavior(112), - } - err = SetTrafficShapingGenerator(cs, func(_ *Rule) *TrafficShapingController { + err = SetTrafficShapingGenerator(TokenCalculateStrategy(111), ControlBehavior(112), func(_ *Rule) *TrafficShapingController { return tsc }) assert.NoError(t, err) @@ -35,19 +25,25 @@ func TestSetAndRemoveTrafficShapingGenerator(t *testing.T) { resource := "test-customized-tc" _, err = LoadRules([]*Rule{ { - ID: 10, - Count: 20, - MetricType: QPS, - Resource: resource, - ControlStrategy: cs, + ID: 10, + Count: 20, + MetricType: QPS, + Resource: resource, + TokenCalculateStrategy: TokenCalculateStrategy(111), + ControlBehavior: ControlBehavior(112), }, }) + + cs := trafficControllerGenKey{ + tokenCalculateStrategy: TokenCalculateStrategy(111), + controlBehavior: ControlBehavior(112), + } assert.NoError(t, err) assert.Contains(t, tcGenFuncMap, cs) assert.NotZero(t, len(tcMap[resource])) assert.Equal(t, tsc, tcMap[resource][0]) - err = RemoveTrafficShapingGenerator(cs) + err = RemoveTrafficShapingGenerator(TokenCalculateStrategy(111), ControlBehavior(112)) assert.NoError(t, err) assert.NotContains(t, tcGenFuncMap, cs) @@ -57,8 +53,8 @@ func TestSetAndRemoveTrafficShapingGenerator(t *testing.T) { func TestIsValidFlowRule(t *testing.T) { badRule1 := &Rule{ID: 1, Count: 1, MetricType: QPS, Resource: ""} badRule2 := &Rule{ID: 1, Count: -1.9, MetricType: QPS, Resource: "test"} - badRule3 := &Rule{Count: 5, MetricType: QPS, Resource: "test", ControlStrategy: ControlStrategy{TokenCalculateStrategy: WarmUp, ControlBehavior: Reject}} - goodRule1 := &Rule{Count: 10, MetricType: QPS, Resource: "test", ControlStrategy: ControlStrategy{TokenCalculateStrategy: WarmUp, ControlBehavior: Throttling}, WarmUpPeriodSec: 10} + badRule3 := &Rule{Count: 5, MetricType: QPS, Resource: "test", TokenCalculateStrategy: WarmUp, ControlBehavior: Reject} + goodRule1 := &Rule{Count: 10, MetricType: QPS, Resource: "test", TokenCalculateStrategy: WarmUp, ControlBehavior: Throttling, WarmUpPeriodSec: 10} assert.Error(t, IsValidRule(badRule1)) assert.Error(t, IsValidRule(badRule2)) @@ -72,32 +68,28 @@ func TestGetRules(t *testing.T) { t.Fatal(err) } r1 := &Rule{ - ID: 0, - Resource: "abc1", - MetricType: 0, - Count: 0, - RelationStrategy: 0, - ControlStrategy: ControlStrategy{ - TokenCalculateStrategy: Direct, - ControlBehavior: Reject, - }, - RefResource: "", - WarmUpPeriodSec: 0, - MaxQueueingTimeMs: 0, + ID: 0, + Resource: "abc1", + MetricType: 0, + Count: 0, + RelationStrategy: 0, + TokenCalculateStrategy: Direct, + ControlBehavior: Reject, + RefResource: "", + WarmUpPeriodSec: 0, + MaxQueueingTimeMs: 0, } r2 := &Rule{ - ID: 1, - Resource: "abc2", - MetricType: 0, - Count: 0, - RelationStrategy: 0, - ControlStrategy: ControlStrategy{ - TokenCalculateStrategy: Direct, - ControlBehavior: Throttling, - }, - RefResource: "", - WarmUpPeriodSec: 0, - MaxQueueingTimeMs: 0, + ID: 1, + Resource: "abc2", + MetricType: 0, + Count: 0, + RelationStrategy: 0, + TokenCalculateStrategy: Direct, + ControlBehavior: Throttling, + RefResource: "", + WarmUpPeriodSec: 0, + MaxQueueingTimeMs: 0, } if _, err := LoadRules([]*Rule{r1, r2}); err != nil { t.Fatal(err) @@ -122,32 +114,28 @@ func TestGetRules(t *testing.T) { t.Run("getRules", func(t *testing.T) { r1 := &Rule{ - ID: 0, - Resource: "abc1", - MetricType: 0, - Count: 0, - RelationStrategy: 0, - ControlStrategy: ControlStrategy{ - TokenCalculateStrategy: Direct, - ControlBehavior: Reject, - }, - RefResource: "", - WarmUpPeriodSec: 0, - MaxQueueingTimeMs: 0, + ID: 0, + Resource: "abc1", + MetricType: 0, + Count: 0, + RelationStrategy: 0, + TokenCalculateStrategy: Direct, + ControlBehavior: Reject, + RefResource: "", + WarmUpPeriodSec: 0, + MaxQueueingTimeMs: 0, } r2 := &Rule{ - ID: 1, - Resource: "abc2", - MetricType: 0, - Count: 0, - RelationStrategy: 0, - ControlStrategy: ControlStrategy{ - TokenCalculateStrategy: Direct, - ControlBehavior: Throttling, - }, - RefResource: "", - WarmUpPeriodSec: 0, - MaxQueueingTimeMs: 0, + ID: 1, + Resource: "abc2", + MetricType: 0, + Count: 0, + RelationStrategy: 0, + TokenCalculateStrategy: Direct, + ControlBehavior: Throttling, + RefResource: "", + WarmUpPeriodSec: 0, + MaxQueueingTimeMs: 0, } if _, err := LoadRules([]*Rule{r1, r2}); err != nil { t.Fatal(err) diff --git a/example/qps/qps_limit_example.go b/example/qps/qps_limit_example.go index 7eabb78ef..9ccd96edb 100644 --- a/example/qps/qps_limit_example.go +++ b/example/qps/qps_limit_example.go @@ -21,13 +21,11 @@ func main() { _, err = flow.LoadRules([]*flow.Rule{ { - Resource: "some-test", - MetricType: flow.QPS, - Count: 10, - ControlStrategy: flow.ControlStrategy{ - TokenCalculateStrategy: flow.Direct, - ControlBehavior: flow.Reject, - }, + Resource: "some-test", + MetricType: flow.QPS, + Count: 10, + TokenCalculateStrategy: flow.Direct, + ControlBehavior: flow.Reject, }, }) if err != nil { diff --git a/example/qps/qps_limit_example_test.go b/example/qps/qps_limit_example_test.go index 68c78b4ca..98ea497ed 100644 --- a/example/qps/qps_limit_example_test.go +++ b/example/qps/qps_limit_example_test.go @@ -28,13 +28,11 @@ func doTest() { _, err = flow.LoadRules([]*flow.Rule{ { - Resource: "some-test", - MetricType: flow.QPS, - Count: 100, - ControlStrategy: flow.ControlStrategy{ - TokenCalculateStrategy: flow.Direct, - ControlBehavior: flow.Reject, - }, + Resource: "some-test", + MetricType: flow.QPS, + Count: 100, + TokenCalculateStrategy: flow.Direct, + ControlBehavior: flow.Reject, }, }) if err != nil { diff --git a/example/warm_up/qps_warm_up_example.go b/example/warm_up/qps_warm_up_example.go index 9f1d5c7ed..9dddc899b 100644 --- a/example/warm_up/qps_warm_up_example.go +++ b/example/warm_up/qps_warm_up_example.go @@ -33,15 +33,13 @@ func main() { _, err = flow.LoadRules([]*flow.Rule{ { - Resource: "some-test", - MetricType: flow.QPS, - Count: 100, - ControlStrategy: flow.ControlStrategy{ - TokenCalculateStrategy: flow.WarmUp, - ControlBehavior: flow.Reject, - }, - WarmUpPeriodSec: 10, - WarmUpColdFactor: 3, + Resource: "some-test", + MetricType: flow.QPS, + Count: 100, + TokenCalculateStrategy: flow.WarmUp, + ControlBehavior: flow.Reject, + WarmUpPeriodSec: 10, + WarmUpColdFactor: 3, }, }) if err != nil { diff --git a/example/warm_up/qps_warm_up_example_test.go b/example/warm_up/qps_warm_up_example_test.go index dac06cc50..0f2c9f482 100644 --- a/example/warm_up/qps_warm_up_example_test.go +++ b/example/warm_up/qps_warm_up_example_test.go @@ -30,14 +30,12 @@ func doTest() { _, err = flow.LoadRules([]*flow.Rule{ { - Resource: "some-test", - MetricType: flow.QPS, - Count: 100, - ControlStrategy: flow.ControlStrategy{ - TokenCalculateStrategy: flow.WarmUp, - ControlBehavior: flow.Reject, - }, - WarmUpPeriodSec: 10, + Resource: "some-test", + MetricType: flow.QPS, + Count: 100, + TokenCalculateStrategy: flow.WarmUp, + ControlBehavior: flow.Reject, + WarmUpPeriodSec: 10, }, }) if err != nil { diff --git a/ext/datasource/helper_test.go b/ext/datasource/helper_test.go index 2c50c22ea..e36ba1e36 100644 --- a/ext/datasource/helper_test.go +++ b/ext/datasource/helper_test.go @@ -47,47 +47,41 @@ func TestFlowRulesJsonConverter(t *testing.T) { flowRules := got.([]*flow.Rule) assert.True(t, len(flowRules) == 3) r1 := &flow.Rule{ - Resource: "abc", - MetricType: flow.Concurrency, - Count: 100, - RelationStrategy: flow.CurrentResource, - ControlStrategy: flow.ControlStrategy{ - TokenCalculateStrategy: flow.Direct, - ControlBehavior: flow.Reject, - }, - RefResource: "refDefault", - WarmUpPeriodSec: 10, - MaxQueueingTimeMs: 1000, + Resource: "abc", + MetricType: flow.Concurrency, + Count: 100, + RelationStrategy: flow.CurrentResource, + TokenCalculateStrategy: flow.Direct, + ControlBehavior: flow.Reject, + RefResource: "refDefault", + WarmUpPeriodSec: 10, + MaxQueueingTimeMs: 1000, } assert.True(t, reflect.DeepEqual(flowRules[0], r1)) r2 := &flow.Rule{ - Resource: "abc", - MetricType: flow.QPS, - Count: 200, - RelationStrategy: flow.AssociatedResource, - ControlStrategy: flow.ControlStrategy{ - TokenCalculateStrategy: flow.Direct, - ControlBehavior: flow.Throttling, - }, - RefResource: "refDefault", - WarmUpPeriodSec: 20, - MaxQueueingTimeMs: 2000, + Resource: "abc", + MetricType: flow.QPS, + Count: 200, + RelationStrategy: flow.AssociatedResource, + TokenCalculateStrategy: flow.Direct, + ControlBehavior: flow.Throttling, + RefResource: "refDefault", + WarmUpPeriodSec: 20, + MaxQueueingTimeMs: 2000, } assert.True(t, reflect.DeepEqual(flowRules[1], r2)) r3 := &flow.Rule{ - Resource: "abc", - MetricType: flow.QPS, - Count: 300, - RelationStrategy: flow.CurrentResource, - ControlStrategy: flow.ControlStrategy{ - TokenCalculateStrategy: flow.Direct, - ControlBehavior: flow.Throttling, - }, - RefResource: "refDefault", - WarmUpPeriodSec: 30, - MaxQueueingTimeMs: 3000, + Resource: "abc", + MetricType: flow.QPS, + Count: 300, + RelationStrategy: flow.CurrentResource, + TokenCalculateStrategy: flow.Direct, + ControlBehavior: flow.Throttling, + RefResource: "refDefault", + WarmUpPeriodSec: 30, + MaxQueueingTimeMs: 3000, } assert.True(t, reflect.DeepEqual(flowRules[2], r3)) }) @@ -98,18 +92,16 @@ func TestFlowRulesUpdater(t *testing.T) { flow.ClearRules() flow.LoadRules([]*flow.Rule{ { - ID: 0, - Resource: "abc", - MetricType: 0, - Count: 0, - RelationStrategy: 0, - ControlStrategy: flow.ControlStrategy{ - TokenCalculateStrategy: flow.Direct, - ControlBehavior: flow.Reject, - }, - RefResource: "", - WarmUpPeriodSec: 0, - MaxQueueingTimeMs: 0, + ID: 0, + Resource: "abc", + MetricType: 0, + Count: 0, + RelationStrategy: 0, + TokenCalculateStrategy: flow.Direct, + ControlBehavior: flow.Reject, + RefResource: "", + WarmUpPeriodSec: 0, + MaxQueueingTimeMs: 0, }}) assert.True(t, len(flow.GetRules()) == 1, "Fail to prepare test data.") err := FlowRulesUpdater(nil) @@ -133,18 +125,16 @@ func TestFlowRulesUpdater(t *testing.T) { flow.ClearRules() p := make([]flow.Rule, 0) fw := flow.Rule{ - ID: 0, - Resource: "aaaa", - MetricType: 0, - Count: 0, - RelationStrategy: 0, - ControlStrategy: flow.ControlStrategy{ - TokenCalculateStrategy: flow.Direct, - ControlBehavior: flow.Reject, - }, - RefResource: "", - WarmUpPeriodSec: 0, - MaxQueueingTimeMs: 0, + ID: 0, + Resource: "aaaa", + MetricType: 0, + Count: 0, + RelationStrategy: 0, + TokenCalculateStrategy: flow.Direct, + ControlBehavior: flow.Reject, + RefResource: "", + WarmUpPeriodSec: 0, + MaxQueueingTimeMs: 0, } p = append(p, fw) err := FlowRulesUpdater(p) diff --git a/tests/testdata/extension/helper/FlowRule.json b/tests/testdata/extension/helper/FlowRule.json index 0266d93ec..d4f2b7302 100644 --- a/tests/testdata/extension/helper/FlowRule.json +++ b/tests/testdata/extension/helper/FlowRule.json @@ -4,10 +4,8 @@ "metricType": 0, "count": 100.0, "relationStrategy": 0, - "controlStrategy": { - "tokenCalculateStrategy": 0, - "controlBehavior": 0 - }, + "tokenCalculateStrategy": 0, + "controlBehavior": 0, "refResource": "refDefault", "warmUpPeriodSec": 10, "maxQueueingTimeMs": 1000 @@ -17,10 +15,8 @@ "metricType": 1, "count": 200.0, "relationStrategy": 1, - "controlStrategy": { - "tokenCalculateStrategy": 0, - "controlBehavior": 1 - }, + "tokenCalculateStrategy": 0, + "controlBehavior": 1, "refResource": "refDefault", "warmUpPeriodSec": 20, "maxQueueingTimeMs": 2000 @@ -30,10 +26,8 @@ "metricType": 1, "count": 300.0, "relationStrategy": 0, - "controlStrategy": { - "tokenCalculateStrategy": 0, - "controlBehavior": 1 - }, + "tokenCalculateStrategy": 0, + "controlBehavior": 1, "refResource": "refDefault", "warmUpPeriodSec": 30, "maxQueueingTimeMs": 3000 From f717f78d4940026bee047af6994b75eba2769aec Mon Sep 17 00:00:00 2001 From: louyuting <1849491904@qq.com> Date: Tue, 8 Sep 2020 21:50:31 +0800 Subject: [PATCH 4/6] refine code --- core/flow/rule_manager.go | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/core/flow/rule_manager.go b/core/flow/rule_manager.go index 7e2ac4e12..e395e7ef4 100644 --- a/core/flow/rule_manager.go +++ b/core/flow/rule_manager.go @@ -170,7 +170,7 @@ func rulesFrom(m TrafficControllerMap) []*Rule { return rules } -// SetTrafficShapingGenerator sets the traffic controller generator for the given control strategy. +// SetTrafficShapingGenerator sets the traffic controller generator for the given TokenCalculateStrategy and ControlBehavior. // Note that modifying the generator of default control strategy is not allowed. func SetTrafficShapingGenerator(tokenCalculateStrategy TokenCalculateStrategy, controlBehavior ControlBehavior, generator TrafficControllerGenFunc) error { if generator == nil { @@ -278,20 +278,13 @@ func IsValidRule(rule *Rule) error { return errors.New("Bad flow rule: invalid control behavior") } - return checkControlStrategyField(rule) -} - -func checkControlStrategyField(rule *Rule) error { - switch rule.TokenCalculateStrategy { - case WarmUp: + if rule.TokenCalculateStrategy == WarmUp { if rule.WarmUpPeriodSec <= 0 { return errors.New("invalid warmUpPeriodSec") } if rule.WarmUpColdFactor == 1 { return errors.New("WarmUpColdFactor must be great than 1") } - return nil - default: } return nil } From a014421d4d7591144afbbb6e21f878418f53da1d Mon Sep 17 00:00:00 2001 From: louyuting <1849491904@qq.com> Date: Tue, 8 Sep 2020 22:12:55 +0800 Subject: [PATCH 5/6] add comments for exported func --- core/flow/rule_manager.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/core/flow/rule_manager.go b/core/flow/rule_manager.go index e395e7ef4..edb5bbc0b 100644 --- a/core/flow/rule_manager.go +++ b/core/flow/rule_manager.go @@ -129,6 +129,8 @@ func getResRules(res string) []*Rule { return ret } +// GetRules returns all the rules based on copy. +// It doesn't take effect for flow module if user changes the rule. func GetRules() []Rule { rules := getRules() ret := make([]Rule, 0, len(rules)) @@ -138,6 +140,8 @@ func GetRules() []Rule { return ret } +// GetResRules returns specific resource's rules based on copy. +// It doesn't take effect for flow module if user changes the rule. func GetResRules(res string) []Rule { rules := getResRules(res) ret := make([]Rule, 0, len(rules)) @@ -147,6 +151,7 @@ func GetResRules(res string) []Rule { return ret } +// ClearRules clears all the rules in flow module. func ClearRules() error { _, err := LoadRules(nil) return err From 9aa38d6afc86ccf741eb6dd4d42b5f136f05c952 Mon Sep 17 00:00:00 2001 From: louyuting <1849491904@qq.com> Date: Tue, 8 Sep 2020 22:30:21 +0800 Subject: [PATCH 6/6] improve comments --- core/flow/rule_manager.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/core/flow/rule_manager.go b/core/flow/rule_manager.go index edb5bbc0b..dde695729 100644 --- a/core/flow/rule_manager.go +++ b/core/flow/rule_manager.go @@ -107,6 +107,8 @@ func LoadRules(rules []*Rule) (bool, error) { return true, err } +// getRules returns all the rules。Any changes of rules take effect for flow module +// getRules is an internal interface. func getRules() []*Rule { tcMux.RLock() defer tcMux.RUnlock() @@ -114,6 +116,8 @@ func getRules() []*Rule { return rulesFrom(tcMap) } +// getResRules returns specific resource's rules。Any changes of rules take effect for flow module +// getResRules is an internal interface. func getResRules(res string) []*Rule { tcMux.RLock() defer tcMux.RUnlock()