Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize load rules for max size rule #176

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions core/circuitbreaker/rule_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync"

"github.com/alibaba/sentinel-golang/logging"
"github.com/alibaba/sentinel-golang/util"
"github.com/pkg/errors"
)

Expand Down Expand Up @@ -159,8 +160,8 @@ func onRuleUpdate(rules []Rule) (err error) {
}
}
}()
newBreakerRules := make(map[string][]Rule)

newBreakerRules := make(map[string][]Rule)
for _, rule := range rules {
if rule == nil {
continue
Expand All @@ -180,9 +181,21 @@ func onRuleUpdate(rules []Rule) (err error) {
}

newBreakers := make(map[string][]CircuitBreaker)
// in order to avoid growing, build newBreakers in advance
for res, rules := range newBreakerRules {
newBreakers[res] = make([]CircuitBreaker, 0, len(rules))
}

start := util.CurrentTimeNano()
updateMux.Lock()
defer updateMux.Unlock()
defer func() {
updateMux.Unlock()
if r := recover(); r != nil {
return
}
logger.Debugf("Updating circuit breaker rule spends %d ns.", util.CurrentTimeNano()-start)
logRuleUpdate(newBreakerRules)
}()

for res, resRules := range newBreakerRules {
emptyCircuitBreakerList := make([]CircuitBreaker, 0, 0)
Expand Down Expand Up @@ -229,8 +242,6 @@ func onRuleUpdate(rules []Rule) (err error) {

breakerRules = newBreakerRules
breakers = newBreakers

logRuleUpdate(newBreakerRules)
return nil
}

Expand Down
17 changes: 12 additions & 5 deletions core/flow/rule_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync"

"github.com/alibaba/sentinel-golang/logging"
"github.com/alibaba/sentinel-golang/util"
"github.com/pkg/errors"
)

Expand Down Expand Up @@ -56,14 +57,20 @@ func onRuleUpdate(rules []*FlowRule) (err error) {
}
}()

tcMux.Lock()
defer tcMux.Unlock()

m := buildFlowMap(rules)

tcMap = m
logRuleUpdate(m)
start := util.CurrentTimeNano()
tcMux.Lock()
defer func() {
tcMux.Unlock()
if r := recover(); r != nil {
return
}
logger.Debugf("Updating flow rule spends %d ns.", util.CurrentTimeNano()-start)
logRuleUpdate(m)
}()

tcMap = m
return nil
}

Expand Down
129 changes: 73 additions & 56 deletions core/hotspot/rule_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync"

"github.com/alibaba/sentinel-golang/logging"
"github.com/alibaba/sentinel-golang/util"
"github.com/pkg/errors"
)

Expand Down Expand Up @@ -98,13 +99,82 @@ func onRuleUpdate(rules []*Rule) (err error) {
}
}()

newRuleMap := make(map[string][]*Rule)
for _, r := range rules {
if err := IsValidRule(r); err != nil {
logger.Warnf("Ignoring invalid hotspot rule when loading new rules, rule: %s, reason: %s", r.String(), err.Error())
continue
}
res := r.ResourceName()
ruleSet, ok := newRuleMap[res]
if !ok {
ruleSet = make([]*Rule, 0, 1)
}
ruleSet = append(ruleSet, r)
newRuleMap[res] = ruleSet
}

m := make(trafficControllerMap)
for res, rules := range newRuleMap {
m[res] = make([]TrafficShapingController, 0, len(rules))
}

start := util.CurrentTimeNano()
tcMux.Lock()
defer tcMux.Unlock()
defer func() {
tcMux.Unlock()
if r := recover(); r != nil {
return
}
logger.Debugf("Updating hotspot rule spends %d ns.", util.CurrentTimeNano()-start)
logRuleUpdate(m)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we may need to handle cases regarding panic outside (where the log is not required).

}()

for res, resRules := range newRuleMap {
emptyTcList := make([]TrafficShapingController, 0, 0)
for _, r := range resRules {
oldResTcs := tcMap[res]
if oldResTcs == nil {
oldResTcs = emptyTcList
}

m := buildTcMap(rules)
equalIdx, reuseStatIdx := calculateReuseIndexFor(r, oldResTcs)
// there is equivalent rule in old traffic shaping controller slice
if equalIdx >= 0 {
equalOldTC := oldResTcs[equalIdx]
insertTcToTcMap(equalOldTC, res, m)
// remove old tc from old resTcs
tcMap[res] = append(oldResTcs[:equalIdx], oldResTcs[equalIdx+1:]...)
continue
}

// generate new traffic shaping controller
generator, supported := tcGenFuncMap[r.ControlBehavior]
if !supported {
logger.Warnf("Ignoring the frequent param flow rule due to unsupported control behavior: %v", r)
continue
}
var tc TrafficShapingController
if reuseStatIdx >= 0 {
// generate new traffic shaping controller with reusable statistic metric.
tc = generator(r, oldResTcs[reuseStatIdx].BoundMetric())
} else {
tc = generator(r, nil)
}
if tc == nil {
logger.Debugf("Ignoring the frequent param flow rule due to bad generated traffic controller: %v", r)
continue
}

// remove the reused traffic shaping controller old res tcs
if reuseStatIdx >= 0 {
tcMap[res] = append(oldResTcs[:reuseStatIdx], oldResTcs[reuseStatIdx+1:]...)
}
insertTcToTcMap(tc, res, m)
}
}
tcMap = m

logRuleUpdate(m)
return nil
}

Expand Down Expand Up @@ -173,59 +243,6 @@ func insertTcToTcMap(tc TrafficShapingController, res string, m trafficControlle
}
}

// buildTcMap be called on the condition that the mutex is locked
func buildTcMap(rules []*Rule) trafficControllerMap {
m := make(trafficControllerMap)
if len(rules) == 0 {
return m
}

for _, r := range rules {
if err := IsValidRule(r); err != nil {
logger.Warnf("Ignoring invalid hotspot param flow rule: %v, reason: %s", r.String(), err.Error())
continue
}

res := r.Resource
oldResTcs := tcMap[res]
equalIdx, reuseStatIdx := calculateReuseIndexFor(r, oldResTcs)

// there is equivalent rule in old traffic shaping controller slice
if equalIdx >= 0 {
equalOldTC := oldResTcs[equalIdx]
insertTcToTcMap(equalOldTC, res, m)
// remove old tc from old resTcs
tcMap[res] = append(oldResTcs[:equalIdx], oldResTcs[equalIdx+1:]...)
continue
}

// generate new traffic shaping controller
generator, supported := tcGenFuncMap[r.ControlBehavior]
if !supported {
logger.Warnf("Ignoring the hotspot param flow rule due to unsupported control behavior: %v", r)
continue
}
var tc TrafficShapingController
if reuseStatIdx >= 0 {
// generate new traffic shaping controller with reusable statistic metric.
tc = generator(r, oldResTcs[reuseStatIdx].BoundMetric())
} else {
tc = generator(r, nil)
}
if tc == nil {
logger.Debugf("Ignoring the hotspot param flow rule due to bad generated traffic controller: %v", r)
continue
}

// remove the reused traffic shaping controller old res tcs
if reuseStatIdx >= 0 {
tcMap[res] = append(oldResTcs[:reuseStatIdx], oldResTcs[reuseStatIdx+1:]...)
}
insertTcToTcMap(tc, res, m)
}
return m
}

func IsValidRule(rule *Rule) error {
if rule == nil {
return errors.New("nil hotspot Rule")
Expand Down
11 changes: 7 additions & 4 deletions core/hotspot/rule_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,9 @@ func Test_IsValidRule(t *testing.T) {
})
}

func Test_buildTcMap(t *testing.T) {
func Test_onRuleUpdate(t *testing.T) {
tcMap = make(trafficControllerMap)

m := make(map[SpecificValue]int64)
m[SpecificValue{
ValKind: KindString,
Expand Down Expand Up @@ -274,9 +276,10 @@ func Test_buildTcMap(t *testing.T) {
oldTc2MetricPtrAddr := fmt.Sprintf("%p", tcMap["abc"][1].BoundMetric())
fmt.Println("oldTc2MetricPtr:", oldTc2MetricPtrAddr)

newTcMap := buildTcMap([]*Rule{r21, r22, r23})
assert.True(t, len(newTcMap) == 1)
abcTcs := newTcMap["abc"]
err = onRuleUpdate([]*Rule{r21, r22, r23})
assert.True(t, err == nil)
assert.True(t, len(tcMap) == 1)
abcTcs := tcMap["abc"]
assert.True(t, len(abcTcs) == 3)
newTc1Ptr := abcTcs[0]
newTc2Ptr := abcTcs[1]
Expand Down
19 changes: 11 additions & 8 deletions core/system/rule_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"sync"

"github.com/alibaba/sentinel-golang/logging"
"github.com/alibaba/sentinel-golang/util"
"github.com/pkg/errors"
)

Expand Down Expand Up @@ -48,16 +49,18 @@ func ClearRules() error {
}

func onRuleUpdate(r RuleMap) error {
start := util.CurrentTimeNano()
ruleMapMux.Lock()
defer ruleMapMux.Unlock()

defer func() {
ruleMapMux.Unlock()
logger.Debugf("Updating system rule spends %d ns.", util.CurrentTimeNano()-start)
if len(r) > 0 {
logger.Infof("[SystemRuleManager] System rules loaded: %v", r)
} else {
logger.Info("[SystemRuleManager] System rules were cleared")
}
}()
ruleMap = r
if len(r) > 0 {
logger.Infof("[SystemRuleManager] System rules loaded: %v", r)
} else {
logger.Info("[SystemRuleManager] System rules were cleared")
}

return nil
}

Expand Down
90 changes: 90 additions & 0 deletions tests/maxsize_rule_list_benchmark_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package tests

import (
"math/rand"
"strconv"
"testing"

cb "github.com/alibaba/sentinel-golang/core/circuitbreaker"
"github.com/alibaba/sentinel-golang/util"
)

func Test_Size_1000_Circuit_Breaker_Rules_Update(t *testing.T) {
rs := make([]cb.Rule, 0, 1000)
rand.Seed(int64(util.CurrentTimeMillis()))
intervals := []uint32{10000, 15000, 20000, 25000, 30000}
for i := 0; i < 1000; i++ {
retryTimeout := intervals[rand.Int()%5]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rand.Int() will generate the same pseudo-random sequence if you don't call rand.Seed() first.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated.

rs = append(rs, cb.NewRule("github.com/alibaba/sentinel/test"+strconv.Itoa(rand.Int()%100), cb.SlowRequestRatio,
cb.WithStatIntervalMs(10000), cb.WithRetryTimeoutMs(retryTimeout),
cb.WithMinRequestAmount(rand.Uint64()%100), cb.WithMaxAllowedRtMs(100),
cb.WithMaxSlowRequestRatio(0.1)))
}

_, err := cb.LoadRules(rs)
if err != nil {
t.Errorf("error")
}
}

func Benchmark_Size_1000_Circuit_Breaker_Rules_Update(b *testing.B) {
rs := make([]cb.Rule, 0, 1000)
rand.Seed(int64(util.CurrentTimeMillis()))
intervals := []uint32{10000, 15000, 20000, 25000, 30000}
for i := 0; i < 1000; i++ {
retryTimeout := intervals[rand.Int()%5]
rs = append(rs, cb.NewRule("github.com/alibaba/sentinel/test"+strconv.Itoa(rand.Int()%100), cb.SlowRequestRatio,
cb.WithStatIntervalMs(10000), cb.WithRetryTimeoutMs(retryTimeout),
cb.WithMinRequestAmount(rand.Uint64()%100), cb.WithMaxAllowedRtMs(100),
cb.WithMaxSlowRequestRatio(0.1)))
}

b.ReportAllocs()
b.ResetTimer()

for i := 0; i < b.N; i++ {
_, err := cb.LoadRules(rs)
if err != nil {
b.Errorf("error")
}
}
}

func Test_Size_10000_Circuit_Breaker_Rules_Update(t *testing.T) {
rs := make([]cb.Rule, 0, 10000)
intervals := []uint32{10000, 15000, 20000, 25000, 30000}
for i := 0; i < 10000; i++ {
retryTimeout := intervals[rand.Int()%5]
rs = append(rs, cb.NewRule("github.com/alibaba/sentinel/test"+strconv.Itoa(rand.Int()%100), cb.SlowRequestRatio,
cb.WithStatIntervalMs(10000), cb.WithRetryTimeoutMs(retryTimeout),
cb.WithMinRequestAmount(rand.Uint64()%100), cb.WithMaxAllowedRtMs(100),
cb.WithMaxSlowRequestRatio(0.1)))
}

_, err := cb.LoadRules(rs)
if err != nil {
t.Errorf("error")
}
}

func Benchmark_Size_10000_Circuit_Breaker_Rules_Update(b *testing.B) {
rs := make([]cb.Rule, 0, 10000)
intervals := []uint32{10000, 15000, 20000, 25000, 30000}
for i := 0; i < 10000; i++ {
retryTimeout := intervals[rand.Int()%5]
rs = append(rs, cb.NewRule("github.com/alibaba/sentinel/test"+strconv.Itoa(rand.Int()%100), cb.SlowRequestRatio,
cb.WithStatIntervalMs(10000), cb.WithRetryTimeoutMs(retryTimeout),
cb.WithMinRequestAmount(rand.Uint64()%100), cb.WithMaxAllowedRtMs(100),
cb.WithMaxSlowRequestRatio(0.1)))
}

b.ReportAllocs()
b.ResetTimer()

for i := 0; i < b.N; i++ {
_, err := cb.LoadRules(rs)
if err != nil {
b.Errorf("error")
}
}
}