Skip to content

Commit

Permalink
flow module supports independent statistic
Browse files Browse the repository at this point in the history
  • Loading branch information
louyuting committed Sep 8, 2020
1 parent bd07b4b commit 854ed30
Show file tree
Hide file tree
Showing 25 changed files with 541 additions and 179 deletions.
4 changes: 4 additions & 0 deletions adapter/echo/middleware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,17 @@ func initSentinel(t *testing.T) {
Count: 1,
TokenCalculateStrategy: flow.Direct,
ControlBehavior: flow.Reject,
StatSampleCount: 2,
StatIntervalInMs: 1000,
},
{
Resource: "/api/:uid",
MetricType: flow.QPS,
Count: 0,
TokenCalculateStrategy: flow.Direct,
ControlBehavior: flow.Reject,
StatSampleCount: 2,
StatIntervalInMs: 1000,
},
})
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions adapter/gin/middleware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,17 @@ func initSentinel(t *testing.T) {
Count: 1,
TokenCalculateStrategy: flow.Direct,
ControlBehavior: flow.Reject,
StatSampleCount: 2,
StatIntervalInMs: 1000,
},
{
Resource: "/api/users/:id",
MetricType: flow.QPS,
Count: 0,
TokenCalculateStrategy: flow.Direct,
ControlBehavior: flow.Reject,
StatSampleCount: 2,
StatIntervalInMs: 1000,
},
})
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions api/slot_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,6 @@ func BuildDefaultSlotChain() *base.SlotChain {
sc.AddStatSlotLast(&log.Slot{})
sc.AddStatSlotLast(&circuitbreaker.MetricStatSlot{})
sc.AddStatSlotLast(&hotspot.ConcurrencyStatSlot{})
sc.AddStatSlotLast(&flow.StandaloneStatSlot{})
return sc
}
49 changes: 48 additions & 1 deletion core/base/stat.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package base

import (
"github.com/pkg/errors"
)

type TimePredicate func(uint64) bool

type MetricEvent int8
Expand Down Expand Up @@ -31,7 +35,7 @@ type ReadStat interface {
}

type WriteStat interface {
AddMetric(event MetricEvent, count uint64)
AddCount(event MetricEvent, count int64)
}

// StatNode holds real-time statistics for resources.
Expand All @@ -46,4 +50,47 @@ type StatNode interface {
DecreaseGoroutineNum()

Reset()

// GenerateReadonlyReadStat generates the readonly metric statistic based on resource level global statistic
// If parameters, sampleCount and intervalInMs, are not suitable for resource level global statistic, return (nil, error)
GenerateReadonlyReadStat(sampleCount uint32, intervalInMs uint32) (ReadStat, error)
}

var (
IllegalGlobalStatisticParamsError = errors.New("Invalid parameters, sampleCount or interval, for resource's global statistic")
IllegalStatisticParamsError = errors.New("Invalid parameters, sampleCount or interval, for metric statistic")
GlobalStatisticNonReusableError = errors.New("The parameters, sampleCount and interval, mismatch for reusing between resource's global statistic and readonly metric statistic.")
)

func CheckValidityForStatistic(sampleCount, intervalInMs uint32) error {
if intervalInMs <= 0 || sampleCount <= 0 || intervalInMs%sampleCount != 0 {
return IllegalStatisticParamsError
}
return nil
}

// CheckValidityForReuseStatistic check the compliance whether readonly metric statistic can be built based on resource's global statistic
// The parameters, sampleCount and intervalInMs, are the parameters of the metric statistic you want to build
// The parameters, parentSampleCount and parentIntervalInMs, are the parameters of the resource's global statistic
// If compliance passes, return nil, if not returns specific error
func CheckValidityForReuseStatistic(sampleCount, intervalInMs uint32, parentSampleCount, parentIntervalInMs uint32) error {
if intervalInMs <= 0 || sampleCount <= 0 || intervalInMs%sampleCount != 0 {
return IllegalStatisticParamsError
}
bucketLengthInMs := intervalInMs / sampleCount

if parentIntervalInMs <= 0 || parentSampleCount <= 0 || parentIntervalInMs%parentSampleCount != 0 {
return IllegalGlobalStatisticParamsError
}
parentBucketLengthInMs := parentIntervalInMs / parentSampleCount

//SlidingWindowMetric's intervalInMs is not divisible by BucketLeapArray's intervalInMs
if parentIntervalInMs%intervalInMs != 0 {
return GlobalStatisticNonReusableError
}
// BucketLeapArray's BucketLengthInMs is not divisible by SlidingWindowMetric's BucketLengthInMs
if bucketLengthInMs%parentBucketLengthInMs != 0 {
return GlobalStatisticNonReusableError
}
return nil
}
25 changes: 23 additions & 2 deletions core/base/stat_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
package base

import "github.com/stretchr/testify/mock"
import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)

type StatNodeMock struct {
mock.Mock
}

func (m *StatNodeMock) AddMetric(event MetricEvent, count uint64) {
func (m *StatNodeMock) AddCount(event MetricEvent, count int64) {
m.Called(event, count)
}

Expand Down Expand Up @@ -64,3 +69,19 @@ func (m *StatNodeMock) Reset() {
m.Called()
return
}

func (m *StatNodeMock) GenerateReadonlyReadStat(sampleCount uint32, intervalInMs uint32) (ReadStat, error) {
args := m.Called(sampleCount, intervalInMs)
return args.Get(0).(ReadStat), args.Error(1)
}

func TestCheckValidityForReuseStatistic(t *testing.T) {
assert.Equal(t, CheckValidityForReuseStatistic(3, 1000, 20, 10000), IllegalStatisticParamsError)
assert.Equal(t, CheckValidityForReuseStatistic(0, 1000, 20, 10000), IllegalStatisticParamsError)
assert.Equal(t, CheckValidityForReuseStatistic(2, 1000, 21, 10000), IllegalGlobalStatisticParamsError)
assert.Equal(t, CheckValidityForReuseStatistic(2, 1000, 0, 10000), IllegalGlobalStatisticParamsError)
assert.Equal(t, CheckValidityForReuseStatistic(2, 8000, 20, 10000), GlobalStatisticNonReusableError)
assert.Equal(t, CheckValidityForReuseStatistic(2, 1000, 10, 10000), GlobalStatisticNonReusableError)
assert.Equal(t, CheckValidityForReuseStatistic(1, 1000, 100, 10000), nil)
assert.Equal(t, CheckValidityForReuseStatistic(2, 1000, 20, 10000), nil)
}
15 changes: 15 additions & 0 deletions core/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,3 +202,18 @@ func SystemStatCollectIntervalMs() uint32 {
func UseCacheTime() bool {
return globalCfg.UseCacheTime()
}

func GlobalStatisticIntervalMsTotal() uint32 {
return globalCfg.GlobalStatisticIntervalMsTotal()
}

func GlobalStatisticSampleCountTotal() uint32 {
return globalCfg.GlobalStatisticSampleCountTotal()
}

func MetricStatisticIntervalMs() uint32 {
return globalCfg.MetricStatisticIntervalMs()
}
func MetricStatisticSampleCount() uint32 {
return globalCfg.MetricStatisticSampleCount()
}
33 changes: 33 additions & 0 deletions core/config/entity.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package config

import (
"github.com/alibaba/sentinel-golang/core/base"
"github.com/alibaba/sentinel-golang/logging"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -49,6 +50,15 @@ type MetricLogConfig struct {

// StatConfig represents the configuration items of statistics.
type StatConfig struct {
// GlobalStatisticSampleCountTotal and GlobalStatisticIntervalMsTotal is the per resource's global default statistic sliding window config
GlobalStatisticSampleCountTotal uint32 `yaml:"globalStatisticSampleCountTotal"`
GlobalStatisticIntervalMsTotal uint32 `yaml:"globalStatisticIntervalMsTotal"`

// MetricStatisticSampleCount and MetricStatisticIntervalMs is the per resource's default readonly metric statistic
// This default readonly metric statistic must be reusable based on global statistic.
MetricStatisticSampleCount uint32 `yaml:"metricStatisticSampleCount"`
MetricStatisticIntervalMs uint32 `yaml:"metricStatisticIntervalMs"`

System SystemStatConfig `yaml:"system"`
}

Expand Down Expand Up @@ -81,6 +91,10 @@ func NewDefaultConfig() *Entity {
},
},
Stat: StatConfig{
GlobalStatisticSampleCountTotal: base.DefaultSampleCountTotal,
GlobalStatisticIntervalMsTotal: base.DefaultIntervalMsTotal,
MetricStatisticSampleCount: base.DefaultSampleCount,
MetricStatisticIntervalMs: base.DefaultIntervalMs,
System: SystemStatConfig{
CollectIntervalMs: DefaultSystemStatCollectIntervalMs,
},
Expand Down Expand Up @@ -117,6 +131,10 @@ func checkConfValid(conf *SentinelConfig) error {
if conf.Stat.System.CollectIntervalMs == 0 {
return errors.New("Bad system stat globalCfg: collectIntervalMs = 0")
}
if err := base.CheckValidityForReuseStatistic(conf.Stat.MetricStatisticSampleCount, conf.Stat.MetricStatisticIntervalMs,
conf.Stat.GlobalStatisticSampleCountTotal, conf.Stat.GlobalStatisticIntervalMsTotal); err != nil {
return err
}
return nil
}

Expand Down Expand Up @@ -160,3 +178,18 @@ func (entity *Entity) SystemStatCollectIntervalMs() uint32 {
func (entity *Entity) UseCacheTime() bool {
return entity.Sentinel.UseCacheTime
}

func (entity *Entity) GlobalStatisticIntervalMsTotal() uint32 {
return entity.Sentinel.Stat.GlobalStatisticIntervalMsTotal
}

func (entity *Entity) GlobalStatisticSampleCountTotal() uint32 {
return entity.Sentinel.Stat.GlobalStatisticSampleCountTotal
}

func (entity *Entity) MetricStatisticIntervalMs() uint32 {
return entity.Sentinel.Stat.MetricStatisticIntervalMs
}
func (entity *Entity) MetricStatisticSampleCount() uint32 {
return entity.Sentinel.Stat.MetricStatisticSampleCount
}
15 changes: 12 additions & 3 deletions core/flow/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ func (s ControlBehavior) String() string {
type Rule struct {
// ID represents the unique ID of the rule (optional).
ID uint64 `json:"id,omitempty"`

// Resource represents the resource name.
Resource string `json:"resource"`
MetricType MetricType `json:"metricType"`
Expand All @@ -99,15 +98,25 @@ type Rule struct {
MaxQueueingTimeMs uint32 `json:"maxQueueingTimeMs"`
WarmUpPeriodSec uint32 `json:"warmUpPeriodSec"`
WarmUpColdFactor uint32 `json:"warmUpColdFactor"`

// StatSampleCount and StatIntervalInMs are the optional setting for flow control.
// If user doesn't set StatSampleCount and StatIntervalInMs, that means using default metric statistic.
// If user specifies StatSampleCount and StatIntervalInMs and meets the compliance of CheckValidityForReuseStatistic,
// will build readonly metric statistic based on resource's global statistic.
// If user specifies StatSampleCount and StatIntervalInMs and doesn't meet the compliance of CheckValidityForReuseStatistic,
// will generate independent statistic strcture for this rule.
StatSampleCount uint32 `json:"statSampleCount"`
StatIntervalInMs uint32 `json:"statIntervalInMs"`
}

func (r *Rule) String() string {
b, err := json.Marshal(r)
if err != nil {
// Return the fallback string
return fmt.Sprintf("Rule{Resource=%s, MetricType=%s, TokenCalculateStrategy=%s, ControlBehavior=%s, "+
"Count=%.2f, RelationStrategy=%s, WarmUpPeriodSec=%d, WarmUpColdFactor=%d, MaxQueueingTimeMs=%d}",
r.Resource, r.MetricType, r.TokenCalculateStrategy, r.ControlBehavior, r.Count, r.RelationStrategy, r.WarmUpPeriodSec, r.WarmUpColdFactor, r.MaxQueueingTimeMs)
"Count=%.2f, RelationStrategy=%s, RefResource=%s, MaxQueueingTimeMs=%d, WarmUpPeriodSec=%d, WarmUpColdFactor=%d, StatSampleCount=%d, StatIntervalInMs=%d}",
r.Resource, r.MetricType, r.TokenCalculateStrategy, r.ControlBehavior, r.Count, r.RelationStrategy, r.RefResource,
r.MaxQueueingTimeMs, r.WarmUpPeriodSec, r.WarmUpColdFactor, r.StatSampleCount, r.StatIntervalInMs)
}
return string(b)
}
Expand Down
56 changes: 40 additions & 16 deletions core/flow/rule_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
)

// TrafficControllerGenFunc represents the TrafficShapingController generator function of a specific control behavior.
type TrafficControllerGenFunc func(*Rule) *TrafficShapingController
type TrafficControllerGenFunc func(*Rule) (*TrafficShapingController, error)

type trafficControllerGenKey struct {
tokenCalculateStrategy TokenCalculateStrategy
Expand All @@ -32,26 +32,50 @@ func init() {
tcGenFuncMap[trafficControllerGenKey{
tokenCalculateStrategy: Direct,
controlBehavior: Reject,
}] = func(rule *Rule) *TrafficShapingController {
return NewTrafficShapingController(NewDirectTrafficShapingCalculator(rule.Count), NewDefaultTrafficShapingChecker(rule), rule)
}] = func(rule *Rule) (*TrafficShapingController, error) {
tsc, err := NewTrafficShapingController(rule)
if err != nil || tsc == nil {
return nil, err
}
tsc.flowCalculator = NewDirectTrafficShapingCalculator(tsc, rule.Count)
tsc.flowChecker = NewDefaultTrafficShapingChecker(tsc, rule)
return tsc, nil
}
tcGenFuncMap[trafficControllerGenKey{
tokenCalculateStrategy: Direct,
controlBehavior: Throttling,
}] = func(rule *Rule) *TrafficShapingController {
return NewTrafficShapingController(NewDirectTrafficShapingCalculator(rule.Count), NewThrottlingChecker(rule.MaxQueueingTimeMs), rule)
}] = func(rule *Rule) (*TrafficShapingController, error) {
tsc, err := NewTrafficShapingController(rule)
if err != nil || tsc == nil {
return nil, err
}
tsc.flowCalculator = NewDirectTrafficShapingCalculator(tsc, rule.Count)
tsc.flowChecker = NewThrottlingChecker(tsc, rule.MaxQueueingTimeMs)
return tsc, nil
}
tcGenFuncMap[trafficControllerGenKey{
tokenCalculateStrategy: WarmUp,
controlBehavior: Reject,
}] = func(rule *Rule) *TrafficShapingController {
return NewTrafficShapingController(NewWarmUpTrafficShapingCalculator(rule), NewDefaultTrafficShapingChecker(rule), rule)
}] = func(rule *Rule) (*TrafficShapingController, error) {
tsc, err := NewTrafficShapingController(rule)
if err != nil || tsc == nil {
return nil, err
}
tsc.flowCalculator = NewWarmUpTrafficShapingCalculator(tsc, rule)
tsc.flowChecker = NewDefaultTrafficShapingChecker(tsc, rule)
return tsc, nil
}
tcGenFuncMap[trafficControllerGenKey{
tokenCalculateStrategy: WarmUp,
controlBehavior: Throttling,
}] = func(rule *Rule) *TrafficShapingController {
return NewTrafficShapingController(NewWarmUpTrafficShapingCalculator(rule), NewThrottlingChecker(rule.MaxQueueingTimeMs), rule)
}] = func(rule *Rule) (*TrafficShapingController, error) {
tsc, err := NewTrafficShapingController(rule)
if err != nil || tsc == nil {
return nil, err
}
tsc.flowCalculator = NewWarmUpTrafficShapingCalculator(tsc, rule)
tsc.flowChecker = NewDefaultTrafficShapingChecker(tsc, rule)
return tsc, nil
}
}

Expand Down Expand Up @@ -128,7 +152,7 @@ func getResRules(res string) []*Rule {
}
ret := make([]*Rule, 0, len(resTcs))
for _, tc := range resTcs {
ret = append(ret, tc.Rule())
ret = append(ret, tc.BoundRule())
}
return ret
}
Expand Down Expand Up @@ -171,8 +195,8 @@ func rulesFrom(m TrafficControllerMap) []*Rule {
continue
}
for _, r := range rs {
if r != nil && r.Rule() != nil {
rules = append(rules, r.Rule())
if r != nil && r.BoundRule() != nil {
rules = append(rules, r.BoundRule())
}
}
}
Expand Down Expand Up @@ -242,13 +266,13 @@ func buildFlowMap(rules []*Rule) TrafficControllerMap {
tokenCalculateStrategy: rule.TokenCalculateStrategy,
controlBehavior: rule.ControlBehavior,
}]
if !supported {
if !supported || generator == nil {
logging.Warnf("Ignoring the rule due to unsupported control behavior: %v", rule)
continue
}
tsc := generator(rule)
if tsc == nil {
logging.Warnf("Ignoring the rule due to bad generated traffic controller: %v", rule)
tsc, e := generator(rule)
if e != nil || tsc == nil {
logging.Warnf("Ignoring the rule due to bad generated traffic controller, rule: %+v, err: %+v", rule, e)
continue
}

Expand Down
8 changes: 4 additions & 4 deletions core/flow/rule_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ import (
func TestSetAndRemoveTrafficShapingGenerator(t *testing.T) {
tsc := &TrafficShapingController{}

err := SetTrafficShapingGenerator(Direct, Reject, func(_ *Rule) *TrafficShapingController {
return tsc
err := SetTrafficShapingGenerator(Direct, Reject, func(_ *Rule) (*TrafficShapingController, error) {
return tsc, nil
})
assert.Error(t, err, "default control behaviors are not allowed to be modified")
err = RemoveTrafficShapingGenerator(Direct, Reject)
assert.Error(t, err, "default control behaviors are not allowed to be removed")

err = SetTrafficShapingGenerator(TokenCalculateStrategy(111), ControlBehavior(112), func(_ *Rule) *TrafficShapingController {
return tsc
err = SetTrafficShapingGenerator(TokenCalculateStrategy(111), ControlBehavior(112), func(_ *Rule) (*TrafficShapingController, error) {
return tsc, nil
})
assert.NoError(t, err)

Expand Down
Loading

0 comments on commit 854ed30

Please sign in to comment.