Skip to content

Commit

Permalink
refactor: Add "isolation" package and move out concurrency limiting f…
Browse files Browse the repository at this point in the history
…rom flow module (#247)
  • Loading branch information
louyuting authored Sep 23, 2020
1 parent b5bbaa1 commit 4162e79
Show file tree
Hide file tree
Showing 8 changed files with 340 additions and 0 deletions.
2 changes: 2 additions & 0 deletions api/slot_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/alibaba/sentinel-golang/core/circuitbreaker"
"github.com/alibaba/sentinel-golang/core/flow"
"github.com/alibaba/sentinel-golang/core/hotspot"
"github.com/alibaba/sentinel-golang/core/isolation"
"github.com/alibaba/sentinel-golang/core/log"
"github.com/alibaba/sentinel-golang/core/stat"
"github.com/alibaba/sentinel-golang/core/system"
Expand All @@ -30,6 +31,7 @@ func BuildDefaultSlotChain() *base.SlotChain {
sc.AddStatPrepareSlotLast(&stat.ResourceNodePrepareSlot{})
sc.AddRuleCheckSlotLast(&system.AdaptiveSlot{})
sc.AddRuleCheckSlotLast(&flow.Slot{})
sc.AddRuleCheckSlotLast(&isolation.Slot{})
sc.AddRuleCheckSlotLast(&circuitbreaker.Slot{})
sc.AddRuleCheckSlotLast(&hotspot.Slot{})
sc.AddStatSlotLast(&stat.Slot{})
Expand Down
3 changes: 3 additions & 0 deletions core/base/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ type BlockType uint8
const (
BlockTypeUnknown BlockType = iota
BlockTypeFlow
BlockTypeIsolation
BlockTypeCircuitBreaking
BlockTypeSystemFlow
BlockTypeHotSpotParamFlow
Expand All @@ -20,6 +21,8 @@ func (t BlockType) String() string {
return "Unknown"
case BlockTypeFlow:
return "FlowControl"
case BlockTypeIsolation:
return "BlockTypeIsolation"
case BlockTypeCircuitBreaking:
return "CircuitBreaking"
case BlockTypeSystemFlow:
Expand Down
3 changes: 3 additions & 0 deletions core/isolation/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
// Package isolation implements the concurrency traffic control.

package isolation
45 changes: 45 additions & 0 deletions core/isolation/rule.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package isolation

import (
"encoding/json"
"fmt"
)

// MetricType represents the target metric type.
type MetricType int32

const (
// Concurrency represents concurrency count.
Concurrency MetricType = iota
)

func (s MetricType) String() string {
switch s {
case Concurrency:
return "Concurrency"
default:
return "Undefined"
}
}

// Rule describes the concurrency num control, that is similar to semaphore
type Rule struct {
// ID represents the unique ID of the rule (optional).
ID string `json:"id,omitempty"`
Resource string `json:"resource"`
MetricType MetricType `json:"metricType"`
Threshold uint32 `json:"threshold"`
}

func (r *Rule) String() string {
b, err := json.Marshal(r)
if err != nil {
// Return the fallback string
return fmt.Sprintf("{Id=%s, Resource=%s, MetricType=%s, Threshold=%d}", r.ID, r.Resource, r.MetricType.String(), r.Threshold)
}
return string(b)
}

func (r *Rule) ResourceName() string {
return r.Resource
}
147 changes: 147 additions & 0 deletions core/isolation/rule_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package isolation

import (
"encoding/json"
"sync"

"github.com/alibaba/sentinel-golang/logging"
"github.com/alibaba/sentinel-golang/util"
"github.com/pkg/errors"
)

var (
ruleMap = make(map[string][]*Rule)
rwMux = &sync.RWMutex{}
)

// LoadRules loads the given isolation rules to the rule manager, while all previous rules will be replaced.
func LoadRules(rules []*Rule) (updated bool, err error) {
updated = true
err = nil

m := make(map[string][]*Rule)
for _, r := range rules {
if e := IsValid(r); e != nil {
logging.Error(e, "invalid isolation rule.", "rule", r)
continue
}
resRules, ok := m[r.Resource]
if !ok {
resRules = make([]*Rule, 0, 1)
}
m[r.Resource] = append(resRules, r)
}

start := util.CurrentTimeNano()
rwMux.Lock()
defer func() {
rwMux.Unlock()
logging.Debug("time statistic(ns) for updating isolation rule", "timeCost", util.CurrentTimeNano()-start)
logRuleUpdate(m)
}()
ruleMap = m
return
}

// ClearRules clears all the rules in isolation module.
func ClearRules() error {
_, err := LoadRules(nil)
return err
}

// GetRules returns all the rules based on copy.
// It doesn't take effect for isolation module if user changes the rule.
func GetRules() []Rule {
rules := getRules()
ret := make([]Rule, 0, len(rules))
for _, rule := range rules {
ret = append(ret, *rule)
}
return ret
}

// GetRulesOfResource returns specific resource's rules based on copy.
// It doesn't take effect for isolation module if user changes the rule.
func GetRulesOfResource(res string) []Rule {
rules := getRulesOfResource(res)
ret := make([]Rule, 0, len(rules))
for _, rule := range rules {
ret = append(ret, *rule)
}
return ret
}

// getRules returns all the rules。Any changes of rules take effect for isolation module
// getRules is an internal interface.
func getRules() []*Rule {
rwMux.RLock()
defer rwMux.RUnlock()

return rulesFrom(ruleMap)
}

// getRulesOfResource returns specific resource's rules。Any changes of rules take effect for isolation module
// getRulesOfResource is an internal interface.
func getRulesOfResource(res string) []*Rule {
rwMux.RLock()
defer rwMux.RUnlock()

resRules, exist := ruleMap[res]
if !exist {
return nil
}
ret := make([]*Rule, 0, len(resRules))
for _, r := range resRules {
ret = append(ret, r)
}
return ret
}

func rulesFrom(m map[string][]*Rule) []*Rule {
rules := make([]*Rule, 0)
if len(m) == 0 {
return rules
}
for _, rs := range m {
for _, r := range rs {
if r != nil {
rules = append(rules, r)
}
}
}
return rules
}

func logRuleUpdate(m map[string][]*Rule) {
bs, err := json.Marshal(rulesFrom(m))
if err != nil {
if len(m) == 0 {
logging.Info("[IsolationRuleManager] Isolation rules were cleared")
} else {
logging.Info("[IsolationRuleManager] Isolation rules were loaded")
}
} else {
if len(m) == 0 {
logging.Info("[IsolationRuleManager] Isolation rules were cleared")
} else {
logging.Info("[IsolationRuleManager] Isolation rules were loaded", "rules", string(bs))
}
}
}

// IsValidRule checks whether the given Rule is valid.
func IsValid(r *Rule) error {
if r == nil {
return errors.New("nil isolation rule")
}
if len(r.Resource) == 0 {
return errors.New("empty resource of isolation rule")
}
if r.MetricType != Concurrency {
return errors.Errorf("unsupported metric type: %d", r.MetricType)
}
if r.Threshold == 0 {
return errors.New("zero threshold")
}
return nil
}
39 changes: 39 additions & 0 deletions core/isolation/rule_manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package isolation

import (
"testing"

"github.com/alibaba/sentinel-golang/logging"
"github.com/stretchr/testify/assert"
)

func TestLoadRules(t *testing.T) {
t.Run("TestLoadRules_1", func(t *testing.T) {
logging.SetGlobalLoggerLevel(logging.DebugLevel)
r1 := &Rule{
Resource: "abc1",
MetricType: Concurrency,
Threshold: 100,
}
r2 := &Rule{
Resource: "abc2",
MetricType: Concurrency,
Threshold: 200,
}
r3 := &Rule{
Resource: "abc3",
MetricType: MetricType(1),
Threshold: 200,
}
_, err := LoadRules([]*Rule{r1, r2, r3})
assert.True(t, err == nil)
assert.True(t, len(ruleMap) == 2)
assert.True(t, len(ruleMap["abc1"]) == 1)
assert.True(t, ruleMap["abc1"][0] == r1)
assert.True(t, len(ruleMap["abc2"]) == 1)
assert.True(t, ruleMap["abc2"][0] == r2)

err = ClearRules()
assert.True(t, err == nil)
})
}
47 changes: 47 additions & 0 deletions core/isolation/slot.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package isolation

import (
"github.com/alibaba/sentinel-golang/core/base"
"github.com/alibaba/sentinel-golang/logging"
"github.com/pkg/errors"
)

type Slot struct {
}

func (s *Slot) Check(ctx *base.EntryContext) *base.TokenResult {
resource := ctx.Resource.Name()
result := ctx.RuleCheckResult
if len(resource) == 0 {
return result
}
if passed, rule, snapshot := checkPass(ctx); !passed {
if result == nil {
result = base.NewTokenResultBlockedWithCause(base.BlockTypeIsolation, "", rule, snapshot)
} else {
result.ResetToBlockedWithCause(base.BlockTypeIsolation, "", rule, snapshot)
}
}
return result
}

func checkPass(ctx *base.EntryContext) (bool, *Rule, uint32) {
statNode := ctx.StatNode
acquireCount := ctx.Input.AcquireCount
curCount := uint32(0)
for _, rule := range getRulesOfResource(ctx.Resource.Name()) {
threshold := rule.Threshold
if rule.MetricType == Concurrency {
if cur := statNode.CurrentGoroutineNum(); cur >= 0 {
curCount = uint32(cur)
} else {
curCount = 0
logging.Error(errors.New("negative concurrency"), "", "rule", rule)
}
if curCount+acquireCount > threshold {
return false, rule, curCount
}
}
}
return true, nil, curCount
}
54 changes: 54 additions & 0 deletions example/isolation/concurrency_limitation_example.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package main

import (
"math/rand"
"os"
"time"

sentinel "github.com/alibaba/sentinel-golang/api"
"github.com/alibaba/sentinel-golang/core/config"
"github.com/alibaba/sentinel-golang/core/isolation"
"github.com/alibaba/sentinel-golang/logging"
)

func main() {
cfg := config.NewDefaultConfig()
cfg.Sentinel.Log.Logger = logging.NewConsoleLogger()
cfg.Sentinel.Log.Metric.FlushIntervalSec = 0
cfg.Sentinel.Stat.System.CollectIntervalMs = 0
err := sentinel.InitWithConfig(cfg)
if err != nil {
logging.Error(err, "fail")
os.Exit(1)
}
logging.SetGlobalLoggerLevel(logging.DebugLevel)
ch := make(chan struct{})

r1 := &isolation.Rule{
Resource: "abc",
MetricType: isolation.Concurrency,
Threshold: 12,
}
_, err = isolation.LoadRules([]*isolation.Rule{r1})
if err != nil {
logging.Error(err, "fail")
os.Exit(1)
}

for i := 0; i < 15; i++ {
go func() {
for {
e, b := sentinel.Entry("abc", sentinel.WithAcquireCount(1))
if b != nil {
logging.Info("blocked", "reason", b.BlockType().String(), "rule", b.TriggeredRule(), "snapshot", b.TriggeredValue())
time.Sleep(time.Duration(rand.Uint64()%20) * time.Millisecond)
} else {
logging.Info("passed")
time.Sleep(time.Duration(rand.Uint64()%20) * time.Millisecond)
e.Exit()
}
}
}()
}
<-ch
}

0 comments on commit 4162e79

Please sign in to comment.