From 963192c30fea9b5114db20daa4711394282316bd Mon Sep 17 00:00:00 2001 From: Eric Zhao Date: Wed, 25 Nov 2020 23:32:53 +0800 Subject: [PATCH] Fix the bug that unsigned estimatedQueueingDuration in throttling checker may overflow * Use int64 instead of uint64 here * Update wait duration to nanos Signed-off-by: Eric Zhao --- core/flow/tc_throttling.go | 50 ++++++++++++++++----------------- core/flow/tc_throttling_test.go | 4 +-- 2 files changed, 27 insertions(+), 27 deletions(-) diff --git a/core/flow/tc_throttling.go b/core/flow/tc_throttling.go index 9196fdcff..5d5c8903f 100644 --- a/core/flow/tc_throttling.go +++ b/core/flow/tc_throttling.go @@ -3,34 +3,36 @@ package flow import ( "math" "sync/atomic" + "time" "github.com/alibaba/sentinel-golang/core/base" - "github.com/alibaba/sentinel-golang/core/config" "github.com/alibaba/sentinel-golang/util" ) +const ( + BlockMsgQueueing = "flow throttling check blocked, estimated queueing time exceeds max queueing time" + + MillisToNanosOffset = int64(time.Millisecond / time.Nanosecond) +) + // ThrottlingChecker limits the time interval between two requests. type ThrottlingChecker struct { owner *TrafficShapingController - maxQueueingTimeNs uint64 - statIntervalNs uint64 - lastPassedTime uint64 + maxQueueingTimeNs int64 + statIntervalNs int64 + lastPassedTime int64 } func NewThrottlingChecker(owner *TrafficShapingController, timeoutMs uint32, statIntervalMs uint32) *ThrottlingChecker { - var statIntervalNs uint64 + var statIntervalNs int64 if statIntervalMs == 0 { - defaultIntervalMs := config.MetricStatisticIntervalMs() - if defaultIntervalMs == 0 { - defaultIntervalMs = 1000 - } - statIntervalNs = uint64(defaultIntervalMs) * util.UnixTimeUnitOffset + statIntervalNs = 1000 * MillisToNanosOffset } else { - statIntervalNs = uint64(statIntervalMs) * util.UnixTimeUnitOffset + statIntervalNs = int64(statIntervalMs) * MillisToNanosOffset } return &ThrottlingChecker{ owner: owner, - maxQueueingTimeNs: uint64(timeoutMs) * util.UnixTimeUnitOffset, + maxQueueingTimeNs: int64(timeoutMs) * MillisToNanosOffset, statIntervalNs: statIntervalNs, lastPassedTime: 0, } @@ -58,35 +60,33 @@ func (c *ThrottlingChecker) DoCheck(_ base.StatNode, batchCount uint32, threshol return base.NewTokenResultBlocked(base.BlockTypeFlow) } // Here we use nanosecond so that we could control the queueing time more accurately. - curNano := util.CurrentTimeNano() + curNano := int64(util.CurrentTimeNano()) // The interval between two requests (in nanoseconds). - intervalNs := uint64(math.Ceil(float64(batchCount) / threshold * float64(c.statIntervalNs))) + intervalNs := int64(math.Ceil(float64(batchCount) / threshold * float64(c.statIntervalNs))) // Expected pass time of this request. - expectedTime := atomic.LoadUint64(&c.lastPassedTime) + intervalNs + expectedTime := atomic.LoadInt64(&c.lastPassedTime) + intervalNs if expectedTime <= curNano { // Contention may exist here, but it's okay. - atomic.StoreUint64(&c.lastPassedTime, curNano) + atomic.StoreInt64(&c.lastPassedTime, curNano) return nil } - estimatedQueueingDuration := atomic.LoadUint64(&c.lastPassedTime) + intervalNs - util.CurrentTimeNano() + estimatedQueueingDuration := atomic.LoadInt64(&c.lastPassedTime) + intervalNs - int64(util.CurrentTimeNano()) if estimatedQueueingDuration > c.maxQueueingTimeNs { - msg := "flow throttling check blocked, estimated queueing time exceeds max queueing time" - return base.NewTokenResultBlockedWithCause(base.BlockTypeFlow, msg, rule, nil) + return base.NewTokenResultBlockedWithCause(base.BlockTypeFlow, BlockMsgQueueing, rule, nil) } - oldTime := atomic.AddUint64(&c.lastPassedTime, intervalNs) - estimatedQueueingDuration = oldTime - util.CurrentTimeNano() + oldTime := atomic.AddInt64(&c.lastPassedTime, intervalNs) + estimatedQueueingDuration = oldTime - int64(util.CurrentTimeNano()) if estimatedQueueingDuration > c.maxQueueingTimeNs { // Subtract the interval. - atomic.AddUint64(&c.lastPassedTime, ^(intervalNs - 1)) - msg := "flow throttling check blocked, estimated queueing time exceeds max queueing time" - return base.NewTokenResultBlockedWithCause(base.BlockTypeFlow, msg, rule, nil) + atomic.AddInt64(&c.lastPassedTime, -intervalNs) + return base.NewTokenResultBlockedWithCause(base.BlockTypeFlow, BlockMsgQueueing, rule, nil) } if estimatedQueueingDuration > 0 { - return base.NewTokenResultShouldWait(estimatedQueueingDuration / util.UnixTimeUnitOffset) + return base.NewTokenResultShouldWait(time.Duration(estimatedQueueingDuration)) } else { return base.NewTokenResultShouldWait(0) } diff --git a/core/flow/tc_throttling_test.go b/core/flow/tc_throttling_test.go index 8867d54d7..b478b1cc6 100644 --- a/core/flow/tc_throttling_test.go +++ b/core/flow/tc_throttling_test.go @@ -61,8 +61,8 @@ func TestThrottlingChecker_DoCheckSingleThread(t *testing.T) { waitCount := int(float64(timeoutMs) / (float64(intervalMs) / threshold)) for i := 0; i < waitCount; i++ { assert.True(t, resultList[i].Status() == base.ResultStatusShouldWait) - wt := resultList[i].WaitMs() - assert.InEpsilon(t, (i+1)*1000/int(waitCount), wt, 10) + wt := resultList[i].NanosToWait() + assert.InEpsilon(t, (i+1)*(int)(time.Second/time.Nanosecond)/waitCount, wt, 10) } for i := waitCount; i < reqCount; i++ { assert.True(t, resultList[i].IsBlocked())