Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bugs of throttling control behavior in flow control module #332

Merged
merged 2 commits into from
Nov 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 27 additions & 26 deletions core/base/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package base

import (
"fmt"
"time"
)

type BlockType uint8
Expand Down Expand Up @@ -58,13 +59,13 @@ func (s TokenResultStatus) String() string {
type TokenResult struct {
status TokenResultStatus

blockErr *BlockError
waitMs uint64
blockErr *BlockError
nanosToWait time.Duration
}

func (r *TokenResult) DeepCopyFrom(newResult *TokenResult) {
r.status = newResult.status
r.waitMs = newResult.waitMs
r.nanosToWait = newResult.nanosToWait
if r.blockErr == nil {
r.blockErr = &BlockError{
blockType: newResult.blockErr.blockType,
Expand All @@ -84,7 +85,7 @@ func (r *TokenResult) DeepCopyFrom(newResult *TokenResult) {
func (r *TokenResult) ResetToPass() {
r.status = ResultStatusPass
r.blockErr = nil
r.waitMs = 0
r.nanosToWait = 0
}

func (r *TokenResult) ResetToBlocked(blockType BlockType) {
Expand All @@ -97,7 +98,7 @@ func (r *TokenResult) ResetToBlocked(blockType BlockType) {
r.blockErr.rule = nil
r.blockErr.snapshotValue = nil
}
r.waitMs = 0
r.nanosToWait = 0
}

func (r *TokenResult) ResetToBlockedWithMessage(blockType BlockType, blockMsg string) {
Expand All @@ -110,7 +111,7 @@ func (r *TokenResult) ResetToBlockedWithMessage(blockType BlockType, blockMsg st
r.blockErr.rule = nil
r.blockErr.snapshotValue = nil
}
r.waitMs = 0
r.nanosToWait = 0
}

func (r *TokenResult) ResetToBlockedWithCause(blockType BlockType, blockMsg string, rule SentinelRule, snapshot interface{}) {
Expand All @@ -123,7 +124,7 @@ func (r *TokenResult) ResetToBlockedWithCause(blockType BlockType, blockMsg stri
r.blockErr.rule = rule
r.blockErr.snapshotValue = snapshot
}
r.waitMs = 0
r.nanosToWait = 0
}

func (r *TokenResult) IsPass() bool {
Expand All @@ -142,8 +143,8 @@ func (r *TokenResult) BlockError() *BlockError {
return r.blockErr
}

func (r *TokenResult) WaitMs() uint64 {
return r.waitMs
func (r *TokenResult) NanosToWait() time.Duration {
return r.nanosToWait
}

func (r *TokenResult) String() string {
Expand All @@ -153,45 +154,45 @@ func (r *TokenResult) String() string {
} else {
blockMsg = r.blockErr.Error()
}
return fmt.Sprintf("TokenResult{status=%s, blockErr=%s, waitMs=%d}", r.status.String(), blockMsg, r.waitMs)
return fmt.Sprintf("TokenResult{status=%s, blockErr=%s, nanosToWait=%d}", r.status.String(), blockMsg, r.nanosToWait)
}

func NewTokenResultPass() *TokenResult {
return &TokenResult{
status: ResultStatusPass,
blockErr: nil,
waitMs: 0,
status: ResultStatusPass,
blockErr: nil,
nanosToWait: 0,
}
}

func NewTokenResultBlocked(blockType BlockType) *TokenResult {
return &TokenResult{
status: ResultStatusBlocked,
blockErr: NewBlockError(blockType),
waitMs: 0,
status: ResultStatusBlocked,
blockErr: NewBlockError(blockType),
nanosToWait: 0,
}
}

func NewTokenResultBlockedWithMessage(blockType BlockType, blockMsg string) *TokenResult {
return &TokenResult{
status: ResultStatusBlocked,
blockErr: NewBlockErrorWithMessage(blockType, blockMsg),
waitMs: 0,
status: ResultStatusBlocked,
blockErr: NewBlockErrorWithMessage(blockType, blockMsg),
nanosToWait: 0,
}
}

func NewTokenResultBlockedWithCause(blockType BlockType, blockMsg string, rule SentinelRule, snapshot interface{}) *TokenResult {
return &TokenResult{
status: ResultStatusBlocked,
blockErr: NewBlockErrorWithCause(blockType, blockMsg, rule, snapshot),
waitMs: 0,
status: ResultStatusBlocked,
blockErr: NewBlockErrorWithCause(blockType, blockMsg, rule, snapshot),
nanosToWait: 0,
}
}

func NewTokenResultShouldWait(waitMs uint64) *TokenResult {
func NewTokenResultShouldWait(waitNs time.Duration) *TokenResult {
return &TokenResult{
status: ResultStatusShouldWait,
blockErr: nil,
waitMs: waitMs,
status: ResultStatusShouldWait,
blockErr: nil,
nanosToWait: waitNs,
}
}
4 changes: 2 additions & 2 deletions core/flow/slot.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ func (s *Slot) Check(ctx *base.EntryContext) *base.TokenResult {
return r
}
if r.Status() == base.ResultStatusShouldWait {
if waitMs := r.WaitMs(); waitMs > 0 {
if nanosToWait := r.NanosToWait(); nanosToWait > 0 {
// Handle waiting action.
time.Sleep(time.Duration(waitMs) * time.Millisecond)
time.Sleep(nanosToWait)
}
continue
}
Expand Down
50 changes: 25 additions & 25 deletions core/flow/tc_throttling.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,34 +3,36 @@ package flow
import (
"math"
"sync/atomic"
"time"

"github.com/alibaba/sentinel-golang/core/base"
"github.com/alibaba/sentinel-golang/core/config"
"github.com/alibaba/sentinel-golang/util"
)

const (
BlockMsgQueueing = "flow throttling check blocked, estimated queueing time exceeds max queueing time"

MillisToNanosOffset = int64(time.Millisecond / time.Nanosecond)
)

// ThrottlingChecker limits the time interval between two requests.
type ThrottlingChecker struct {
owner *TrafficShapingController
maxQueueingTimeNs uint64
statIntervalNs uint64
lastPassedTime uint64
maxQueueingTimeNs int64
statIntervalNs int64
lastPassedTime int64
}

func NewThrottlingChecker(owner *TrafficShapingController, timeoutMs uint32, statIntervalMs uint32) *ThrottlingChecker {
var statIntervalNs uint64
var statIntervalNs int64
if statIntervalMs == 0 {
defaultIntervalMs := config.MetricStatisticIntervalMs()
if defaultIntervalMs == 0 {
defaultIntervalMs = 1000
}
statIntervalNs = uint64(defaultIntervalMs) * util.UnixTimeUnitOffset
statIntervalNs = 1000 * MillisToNanosOffset
} else {
statIntervalNs = uint64(statIntervalMs) * util.UnixTimeUnitOffset
statIntervalNs = int64(statIntervalMs) * MillisToNanosOffset
}
return &ThrottlingChecker{
owner: owner,
maxQueueingTimeNs: uint64(timeoutMs) * util.UnixTimeUnitOffset,
maxQueueingTimeNs: int64(timeoutMs) * MillisToNanosOffset,
statIntervalNs: statIntervalNs,
lastPassedTime: 0,
}
Expand Down Expand Up @@ -58,35 +60,33 @@ func (c *ThrottlingChecker) DoCheck(_ base.StatNode, batchCount uint32, threshol
return base.NewTokenResultBlocked(base.BlockTypeFlow)
}
// Here we use nanosecond so that we could control the queueing time more accurately.
curNano := util.CurrentTimeNano()
curNano := int64(util.CurrentTimeNano())

// The interval between two requests (in nanoseconds).
intervalNs := uint64(math.Ceil(float64(batchCount) / threshold * float64(c.statIntervalNs)))
intervalNs := int64(math.Ceil(float64(batchCount) / threshold * float64(c.statIntervalNs)))

// Expected pass time of this request.
expectedTime := atomic.LoadUint64(&c.lastPassedTime) + intervalNs
expectedTime := atomic.LoadInt64(&c.lastPassedTime) + intervalNs
if expectedTime <= curNano {
// Contention may exist here, but it's okay.
atomic.StoreUint64(&c.lastPassedTime, curNano)
atomic.StoreInt64(&c.lastPassedTime, curNano)
return nil
}

estimatedQueueingDuration := atomic.LoadUint64(&c.lastPassedTime) + intervalNs - util.CurrentTimeNano()
estimatedQueueingDuration := atomic.LoadInt64(&c.lastPassedTime) + intervalNs - int64(util.CurrentTimeNano())
if estimatedQueueingDuration > c.maxQueueingTimeNs {
msg := "flow throttling check blocked, estimated queueing time exceeds max queueing time"
return base.NewTokenResultBlockedWithCause(base.BlockTypeFlow, msg, rule, nil)
return base.NewTokenResultBlockedWithCause(base.BlockTypeFlow, BlockMsgQueueing, rule, nil)
}

oldTime := atomic.AddUint64(&c.lastPassedTime, intervalNs)
estimatedQueueingDuration = oldTime - util.CurrentTimeNano()
oldTime := atomic.AddInt64(&c.lastPassedTime, intervalNs)
estimatedQueueingDuration = oldTime - int64(util.CurrentTimeNano())
if estimatedQueueingDuration > c.maxQueueingTimeNs {
// Subtract the interval.
atomic.AddUint64(&c.lastPassedTime, ^(intervalNs - 1))
msg := "flow throttling check blocked, estimated queueing time exceeds max queueing time"
return base.NewTokenResultBlockedWithCause(base.BlockTypeFlow, msg, rule, nil)
atomic.AddInt64(&c.lastPassedTime, -intervalNs)
return base.NewTokenResultBlockedWithCause(base.BlockTypeFlow, BlockMsgQueueing, rule, nil)
}
if estimatedQueueingDuration > 0 {
return base.NewTokenResultShouldWait(estimatedQueueingDuration / util.UnixTimeUnitOffset)
return base.NewTokenResultShouldWait(time.Duration(estimatedQueueingDuration))
} else {
return base.NewTokenResultShouldWait(0)
}
Expand Down
4 changes: 2 additions & 2 deletions core/flow/tc_throttling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ func TestThrottlingChecker_DoCheckSingleThread(t *testing.T) {
waitCount := int(float64(timeoutMs) / (float64(intervalMs) / threshold))
for i := 0; i < waitCount; i++ {
assert.True(t, resultList[i].Status() == base.ResultStatusShouldWait)
wt := resultList[i].WaitMs()
assert.InEpsilon(t, (i+1)*1000/int(waitCount), wt, 10)
wt := resultList[i].NanosToWait()
assert.InEpsilon(t, (i+1)*(int)(time.Second/time.Nanosecond)/waitCount, wt, 10)
}
for i := waitCount; i < reqCount; i++ {
assert.True(t, resultList[i].IsBlocked())
Expand Down
4 changes: 2 additions & 2 deletions core/hotspot/slot.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ func (s *Slot) Check(ctx *base.EntryContext) *base.TokenResult {
return r
}
if r.Status() == base.ResultStatusShouldWait {
if waitMs := r.WaitMs(); waitMs > 0 {
if nanosToWait := r.NanosToWait(); nanosToWait > 0 {
// Handle waiting action.
time.Sleep(time.Duration(waitMs) * time.Millisecond)
time.Sleep(nanosToWait)
}
continue
}
Expand Down
3 changes: 2 additions & 1 deletion core/hotspot/traffic_shaping.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"math"
"runtime"
"sync/atomic"
"time"

"github.com/alibaba/sentinel-golang/core/base"
"github.com/alibaba/sentinel-golang/core/hotspot/cache"
Expand Down Expand Up @@ -281,7 +282,7 @@ func (c *throttlingTrafficShapingController) PerformChecking(arg interface{}, ba
awaitTime := expectedTime - currentTimeInMs
if awaitTime > 0 {
atomic.StoreInt64(lastPassTimePtr, expectedTime)
return base.NewTokenResultShouldWait(uint64(awaitTime))
return base.NewTokenResultShouldWait(time.Duration(awaitTime) * time.Millisecond)
}
return nil
} else {
Expand Down