From ce281a771aa42898e35522ef95e883b262ebd2b9 Mon Sep 17 00:00:00 2001
From: Eric Zhao
Date: Wed, 25 Nov 2020 23:31:26 +0800
Subject: [PATCH 1/2] Change semantic of waitMs to nanosToWait in TokenResult and polish related stat slots

Signed-off-by: Eric Zhao
---
 core/base/result.go             | 53 +++++++++++++++++----------------
 core/flow/slot.go               |  4 +--
 core/hotspot/slot.go            |  4 +--
 core/hotspot/traffic_shaping.go |  3 +-
 4 files changed, 33 insertions(+), 31 deletions(-)

diff --git a/core/base/result.go b/core/base/result.go
index ce5d1da25..b979357e3 100644
--- a/core/base/result.go
+++ b/core/base/result.go
@@ -2,6 +2,7 @@ package base

 import (
     "fmt"
+    "time"
 )

 type BlockType uint8
@@ -58,13 +59,13 @@ func (s TokenResultStatus) String() string {
 type TokenResult struct {
     status TokenResultStatus

-    blockErr *BlockError
-    waitMs   uint64
+    blockErr    *BlockError
+    nanosToWait time.Duration
 }

 func (r *TokenResult) DeepCopyFrom(newResult *TokenResult) {
     r.status = newResult.status
-    r.waitMs = newResult.waitMs
+    r.nanosToWait = newResult.nanosToWait
     if r.blockErr == nil {
         r.blockErr = &BlockError{
             blockType:     newResult.blockErr.blockType,
@@ -84,7 +85,7 @@ func (r *TokenResult) DeepCopyFrom(newResult *TokenResult) {
 func (r *TokenResult) ResetToPass() {
     r.status = ResultStatusPass
     r.blockErr = nil
-    r.waitMs = 0
+    r.nanosToWait = 0
 }

 func (r *TokenResult) ResetToBlocked(blockType BlockType) {
@@ -97,7 +98,7 @@
         r.blockErr.rule = nil
         r.blockErr.snapshotValue = nil
     }
-    r.waitMs = 0
+    r.nanosToWait = 0
 }

 func (r *TokenResult) ResetToBlockedWithMessage(blockType BlockType, blockMsg string) {
@@ -110,7 +111,7 @@
         r.blockErr.rule = nil
         r.blockErr.snapshotValue = nil
     }
-    r.waitMs = 0
+    r.nanosToWait = 0
 }

 func (r *TokenResult) ResetToBlockedWithCause(blockType BlockType, blockMsg string, rule SentinelRule, snapshot interface{}) {
@@ -123,7 +124,7 @@
         r.blockErr.rule = rule
         r.blockErr.snapshotValue = snapshot
     }
-    r.waitMs = 0
+    r.nanosToWait = 0
 }

 func (r *TokenResult) IsPass() bool {
@@ -142,8 +143,8 @@ func (r *TokenResult) BlockError() *BlockError {
     return r.blockErr
 }

-func (r *TokenResult) WaitMs() uint64 {
-    return r.waitMs
+func (r *TokenResult) NanosToWait() time.Duration {
+    return r.nanosToWait
 }

 func (r *TokenResult) String() string {
@@ -153,45 +154,45 @@
     } else {
         blockMsg = r.blockErr.Error()
     }
-    return fmt.Sprintf("TokenResult{status=%s, blockErr=%s, waitMs=%d}", r.status.String(), blockMsg, r.waitMs)
+    return fmt.Sprintf("TokenResult{status=%s, blockErr=%s, nanosToWait=%d}", r.status.String(), blockMsg, r.nanosToWait)
 }

 func NewTokenResultPass() *TokenResult {
     return &TokenResult{
-        status:   ResultStatusPass,
-        blockErr: nil,
-        waitMs:   0,
+        status:      ResultStatusPass,
+        blockErr:    nil,
+        nanosToWait: 0,
     }
 }

 func NewTokenResultBlocked(blockType BlockType) *TokenResult {
     return &TokenResult{
-        status:   ResultStatusBlocked,
-        blockErr: NewBlockError(blockType),
-        waitMs:   0,
+        status:      ResultStatusBlocked,
+        blockErr:    NewBlockError(blockType),
+        nanosToWait: 0,
     }
 }

 func NewTokenResultBlockedWithMessage(blockType BlockType, blockMsg string) *TokenResult {
     return &TokenResult{
-        status:   ResultStatusBlocked,
-        blockErr: NewBlockErrorWithMessage(blockType, blockMsg),
-        waitMs:   0,
+        status:      ResultStatusBlocked,
+        blockErr:    NewBlockErrorWithMessage(blockType, blockMsg),
+        nanosToWait: 0,
     }
 }

 func NewTokenResultBlockedWithCause(blockType BlockType, blockMsg string, rule SentinelRule, snapshot interface{}) *TokenResult {
     return &TokenResult{
-        status:   ResultStatusBlocked,
-        blockErr: NewBlockErrorWithCause(blockType, blockMsg, rule, snapshot),
-        waitMs:   0,
+        status:      ResultStatusBlocked,
+        blockErr:    NewBlockErrorWithCause(blockType, blockMsg, rule, snapshot),
+        nanosToWait: 0,
     }
 }

-func NewTokenResultShouldWait(waitMs uint64) *TokenResult {
+func NewTokenResultShouldWait(waitNs time.Duration) *TokenResult {
     return &TokenResult{
-        status:   ResultStatusShouldWait,
-        blockErr: nil,
-        waitMs:   waitMs,
+        status:      ResultStatusShouldWait,
+        blockErr:    nil,
+        nanosToWait: waitNs,
     }
 }
diff --git a/core/flow/slot.go b/core/flow/slot.go
index fd81495c5..c56aea24c 100644
--- a/core/flow/slot.go
+++ b/core/flow/slot.go
@@ -49,9 +49,9 @@ func (s *Slot) Check(ctx *base.EntryContext) *base.TokenResult {
             return r
         }
         if r.Status() == base.ResultStatusShouldWait {
-            if waitMs := r.WaitMs(); waitMs > 0 {
+            if nanosToWait := r.NanosToWait(); nanosToWait > 0 {
                 // Handle waiting action.
-                time.Sleep(time.Duration(waitMs) * time.Millisecond)
+                time.Sleep(nanosToWait)
             }
             continue
         }
diff --git a/core/hotspot/slot.go b/core/hotspot/slot.go
index f3e53a189..6843982bc 100644
--- a/core/hotspot/slot.go
+++ b/core/hotspot/slot.go
@@ -72,9 +72,9 @@ func (s *Slot) Check(ctx *base.EntryContext) *base.TokenResult {
             return r
         }
         if r.Status() == base.ResultStatusShouldWait {
-            if waitMs := r.WaitMs(); waitMs > 0 {
+            if nanosToWait := r.NanosToWait(); nanosToWait > 0 {
                 // Handle waiting action.
-                time.Sleep(time.Duration(waitMs) * time.Millisecond)
+                time.Sleep(nanosToWait)
             }
             continue
         }
diff --git a/core/hotspot/traffic_shaping.go b/core/hotspot/traffic_shaping.go
index fd4a5f656..68aa6cc92 100644
--- a/core/hotspot/traffic_shaping.go
+++ b/core/hotspot/traffic_shaping.go
@@ -5,6 +5,7 @@ import (
     "math"
     "runtime"
     "sync/atomic"
+    "time"

     "github.com/alibaba/sentinel-golang/core/base"
     "github.com/alibaba/sentinel-golang/core/hotspot/cache"
@@ -281,7 +282,7 @@ func (c *throttlingTrafficShapingController) PerformChecking(arg interface{}, ba
         awaitTime := expectedTime - currentTimeInMs
         if awaitTime > 0 {
             atomic.StoreInt64(lastPassTimePtr, expectedTime)
-            return base.NewTokenResultShouldWait(uint64(awaitTime))
+            return base.NewTokenResultShouldWait(time.Duration(awaitTime) * time.Millisecond)
         }
         return nil
     } else {
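
Note on the effect of [PATCH 1/2]: TokenResult now carries the wait time as a time.Duration (nanoseconds) rather than a millisecond count, so consumers hand the value straight to time.Sleep and producers that still compute waits in milliseconds convert explicitly. An illustrative sketch mirroring the slot changes above (waitIfNeeded is a hypothetical helper, assuming a *base.TokenResult from a rule check):

    import (
        "time"

        "github.com/alibaba/sentinel-golang/core/base"
    )

    // waitIfNeeded sketches how a stat slot consumes the new wait value.
    func waitIfNeeded(r *base.TokenResult) {
        if r.Status() == base.ResultStatusShouldWait {
            if d := r.NanosToWait(); d > 0 {
                // d is already a time.Duration, so no ms-to-ns conversion is needed here;
                // producers convert instead, e.g. time.Duration(awaitMs) * time.Millisecond.
                time.Sleep(d)
            }
        }
    }
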
From d443d80142df31a433f321f5801c8ebb56be4314 Mon Sep 17 00:00:00 2001
From: Eric Zhao
Date: Wed, 25 Nov 2020 23:32:53 +0800
Subject: [PATCH 2/2] Fix the bug that unsigned estimatedQueueingDuration in throttling checker may overflow

* Use int64 instead of uint64 here

* Update wait duration to nanos

Signed-off-by: Eric Zhao
---
 core/flow/tc_throttling.go      | 50 ++++++++++++++++-----------------
 core/flow/tc_throttling_test.go |  4 +--
 2 files changed, 27 insertions(+), 27 deletions(-)

diff --git a/core/flow/tc_throttling.go b/core/flow/tc_throttling.go
index 9196fdcff..5d5c8903f 100644
--- a/core/flow/tc_throttling.go
+++ b/core/flow/tc_throttling.go
@@ -3,34 +3,36 @@ package flow
 import (
     "math"
     "sync/atomic"
+    "time"

     "github.com/alibaba/sentinel-golang/core/base"
-    "github.com/alibaba/sentinel-golang/core/config"
     "github.com/alibaba/sentinel-golang/util"
 )

+const (
+    BlockMsgQueueing = "flow throttling check blocked, estimated queueing time exceeds max queueing time"
+
+    MillisToNanosOffset = int64(time.Millisecond / time.Nanosecond)
+)
+
 // ThrottlingChecker limits the time interval between two requests.
 type ThrottlingChecker struct {
     owner             *TrafficShapingController
-    maxQueueingTimeNs uint64
-    statIntervalNs    uint64
-    lastPassedTime    uint64
+    maxQueueingTimeNs int64
+    statIntervalNs    int64
+    lastPassedTime    int64
 }

 func NewThrottlingChecker(owner *TrafficShapingController, timeoutMs uint32, statIntervalMs uint32) *ThrottlingChecker {
-    var statIntervalNs uint64
+    var statIntervalNs int64
     if statIntervalMs == 0 {
-        defaultIntervalMs := config.MetricStatisticIntervalMs()
-        if defaultIntervalMs == 0 {
-            defaultIntervalMs = 1000
-        }
-        statIntervalNs = uint64(defaultIntervalMs) * util.UnixTimeUnitOffset
+        statIntervalNs = 1000 * MillisToNanosOffset
     } else {
-        statIntervalNs = uint64(statIntervalMs) * util.UnixTimeUnitOffset
+        statIntervalNs = int64(statIntervalMs) * MillisToNanosOffset
     }
     return &ThrottlingChecker{
         owner:             owner,
-        maxQueueingTimeNs: uint64(timeoutMs) * util.UnixTimeUnitOffset,
+        maxQueueingTimeNs: int64(timeoutMs) * MillisToNanosOffset,
         statIntervalNs:    statIntervalNs,
         lastPassedTime:    0,
     }
@@ -58,35 +60,33 @@ func (c *ThrottlingChecker) DoCheck(_ base.StatNode, batchCount uint32, threshol
         return base.NewTokenResultBlocked(base.BlockTypeFlow)
     }
     // Here we use nanosecond so that we could control the queueing time more accurately.
-    curNano := util.CurrentTimeNano()
+    curNano := int64(util.CurrentTimeNano())
     // The interval between two requests (in nanoseconds).
-    intervalNs := uint64(math.Ceil(float64(batchCount) / threshold * float64(c.statIntervalNs)))
+    intervalNs := int64(math.Ceil(float64(batchCount) / threshold * float64(c.statIntervalNs)))
     // Expected pass time of this request.
-    expectedTime := atomic.LoadUint64(&c.lastPassedTime) + intervalNs
+    expectedTime := atomic.LoadInt64(&c.lastPassedTime) + intervalNs
     if expectedTime <= curNano {
         // Contention may exist here, but it's okay.
-        atomic.StoreUint64(&c.lastPassedTime, curNano)
+        atomic.StoreInt64(&c.lastPassedTime, curNano)
         return nil
     }

-    estimatedQueueingDuration := atomic.LoadUint64(&c.lastPassedTime) + intervalNs - util.CurrentTimeNano()
+    estimatedQueueingDuration := atomic.LoadInt64(&c.lastPassedTime) + intervalNs - int64(util.CurrentTimeNano())
     if estimatedQueueingDuration > c.maxQueueingTimeNs {
-        msg := "flow throttling check blocked, estimated queueing time exceeds max queueing time"
-        return base.NewTokenResultBlockedWithCause(base.BlockTypeFlow, msg, rule, nil)
+        return base.NewTokenResultBlockedWithCause(base.BlockTypeFlow, BlockMsgQueueing, rule, nil)
     }

-    oldTime := atomic.AddUint64(&c.lastPassedTime, intervalNs)
-    estimatedQueueingDuration = oldTime - util.CurrentTimeNano()
+    oldTime := atomic.AddInt64(&c.lastPassedTime, intervalNs)
+    estimatedQueueingDuration = oldTime - int64(util.CurrentTimeNano())
     if estimatedQueueingDuration > c.maxQueueingTimeNs {
         // Subtract the interval.
-        atomic.AddUint64(&c.lastPassedTime, ^(intervalNs - 1))
-        msg := "flow throttling check blocked, estimated queueing time exceeds max queueing time"
-        return base.NewTokenResultBlockedWithCause(base.BlockTypeFlow, msg, rule, nil)
+        atomic.AddInt64(&c.lastPassedTime, -intervalNs)
+        return base.NewTokenResultBlockedWithCause(base.BlockTypeFlow, BlockMsgQueueing, rule, nil)
     }
     if estimatedQueueingDuration > 0 {
-        return base.NewTokenResultShouldWait(estimatedQueueingDuration / util.UnixTimeUnitOffset)
+        return base.NewTokenResultShouldWait(time.Duration(estimatedQueueingDuration))
     } else {
         return base.NewTokenResultShouldWait(0)
     }
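
The DoCheck pacing above spaces requests by intervalNs = ceil(batchCount / threshold * statIntervalNs). An illustrative calculation with assumed numbers (threshold = 100 requests over a 1000 ms stat interval, batchCount = 1; millisToNanos stands in for the MillisToNanosOffset constant):

    package main

    import (
        "fmt"
        "math"
        "time"
    )

    func main() {
        const millisToNanos = int64(time.Millisecond / time.Nanosecond) // 1e6
        statIntervalNs := 1000 * millisToNanos
        intervalNs := int64(math.Ceil(1.0 / 100.0 * float64(statIntervalNs)))
        fmt.Println(time.Duration(intervalNs)) // 10ms: at most one request is admitted every 10ms
    }

A request whose expected pass time is at most maxQueueingTimeNs in the future gets ResultStatusShouldWait with the remaining duration; anything further out is blocked with BlockMsgQueueing.
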
diff --git a/core/flow/tc_throttling_test.go b/core/flow/tc_throttling_test.go
index 8867d54d7..b478b1cc6 100644
--- a/core/flow/tc_throttling_test.go
+++ b/core/flow/tc_throttling_test.go
@@ -61,8 +61,8 @@ func TestThrottlingChecker_DoCheckSingleThread(t *testing.T) {
     waitCount := int(float64(timeoutMs) / (float64(intervalMs) / threshold))
     for i := 0; i < waitCount; i++ {
         assert.True(t, resultList[i].Status() == base.ResultStatusShouldWait)
-        wt := resultList[i].WaitMs()
-        assert.InEpsilon(t, (i+1)*1000/int(waitCount), wt, 10)
+        wt := resultList[i].NanosToWait()
+        assert.InEpsilon(t, (i+1)*(int)(time.Second/time.Nanosecond)/waitCount, wt, 10)
     }
     for i := waitCount; i < reqCount; i++ {
         assert.True(t, resultList[i].IsBlocked())
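
Why the switch from uint64 to int64 matters: lastPassedTime + intervalNs - now is legitimately negative once the expected pass time lies in the past, and with unsigned arithmetic that difference wraps around to a huge positive value, spuriously tripping the maxQueueingTimeNs comparison. The compensation step also becomes plain subtraction (AddInt64 with -intervalNs) instead of the ^(intervalNs - 1) two's-complement trick. A small illustrative demo with made-up values:

    package main

    import "fmt"

    func main() {
        lastPassed, interval, now := uint64(1000), uint64(10), uint64(2000)

        // Unsigned arithmetic (the old code): the result wraps to roughly 1.8e19
        // instead of being negative, so "> maxQueueingTimeNs" always fires.
        fmt.Println(lastPassed + interval - now)

        // Signed arithmetic (this patch): simply -990, as intended.
        fmt.Println(int64(lastPassed) + int64(interval) - int64(now))
    }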