From 048937540831915867a1da6b00bd78d9f31e80ee Mon Sep 17 00:00:00 2001 From: louyuting <1849491904@qq.com> Date: Tue, 4 Aug 2020 23:05:27 +0800 Subject: [PATCH] Make the current standard time align to the associated BucketWrap's startTime --- core/circuitbreaker/circuit_breaker.go | 23 ++- .../base/atomic_window_wrap_array_test.go | 100 ++++++------- core/stat/base/bucket_leap_array_test.go | 2 +- core/stat/base/leap_array.go | 55 ++++--- core/stat/base/leap_array_test.go | 136 +++--------------- 5 files changed, 125 insertions(+), 191 deletions(-) diff --git a/core/circuitbreaker/circuit_breaker.go b/core/circuitbreaker/circuit_breaker.go index 09b3ac2d4..e05c855fe 100644 --- a/core/circuitbreaker/circuit_breaker.go +++ b/core/circuitbreaker/circuit_breaker.go @@ -200,7 +200,12 @@ func newSlowRtCircuitBreakerWithStat(r *slowRtRule, stat *slowRequestLeapArray) func newSlowRtCircuitBreaker(r *slowRtRule) *slowRtCircuitBreaker { interval := r.StatIntervalMs stat := &slowRequestLeapArray{} - stat.data = sbase.NewLeapArray(1, interval, stat) + leapArray, err := sbase.NewLeapArray(1, interval, stat) + if err != nil { + logger.Errorf("Fail to convert slowRtRule to slowRtCircuitBreaker, rule: %+v", r) + return nil + } + stat.data = leapArray return newSlowRtCircuitBreakerWithStat(r, stat) } @@ -382,8 +387,12 @@ func newErrorRatioCircuitBreakerWithStat(r *errorRatioRule, stat *errorCounterLe func newErrorRatioCircuitBreaker(r *errorRatioRule) *errorRatioCircuitBreaker { interval := r.StatIntervalMs stat := &errorCounterLeapArray{} - stat.data = sbase.NewLeapArray(1, interval, stat) - + leapArray, err := sbase.NewLeapArray(1, interval, stat) + if err != nil { + logger.Errorf("Fail to convert errorRatioRule to errorRatioCircuitBreaker, rule: %+v", r) + return nil + } + stat.data = leapArray return newErrorRatioCircuitBreakerWithStat(r, stat) } @@ -559,8 +568,12 @@ func newErrorCountCircuitBreakerWithStat(r *errorCountRule, stat *errorCounterLe func newErrorCountCircuitBreaker(r *errorCountRule) *errorCountCircuitBreaker { interval := r.StatIntervalMs stat := &errorCounterLeapArray{} - stat.data = sbase.NewLeapArray(1, interval, stat) - + leapArray, err := sbase.NewLeapArray(1, interval, stat) + if err != nil { + logger.Errorf("Fail to convert errorCountRule to errorCountCircuitBreaker, rule: %+v", r) + return nil + } + stat.data = leapArray return newErrorCountCircuitBreakerWithStat(r, stat) } diff --git a/core/stat/base/atomic_window_wrap_array_test.go b/core/stat/base/atomic_window_wrap_array_test.go index 4921ef6c8..7e7b0e702 100644 --- a/core/stat/base/atomic_window_wrap_array_test.go +++ b/core/stat/base/atomic_window_wrap_array_test.go @@ -9,52 +9,9 @@ import ( "time" "github.com/alibaba/sentinel-golang/util" + "github.com/stretchr/testify/assert" ) -func Test_newAtomicBucketWrapArray_normal(t *testing.T) { - type args struct { - len int - bucketLengthInMs uint32 - bg BucketGenerator - } - tests := []struct { - name string - args args - want *AtomicBucketWrapArray - }{ - { - name: "Test_newAtomicBucketWrapArray_normal", - args: args{ - len: int(SampleCount), - bucketLengthInMs: BucketLengthInMs, - bg: &leapArrayMock{}, - }, - want: nil, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - ret := NewAtomicBucketWrapArray(tt.args.len, tt.args.bucketLengthInMs, tt.args.bg) - if ret == nil || uintptr(ret.base) == uintptr(0) || ret.length != tt.args.len || ret.data == nil || len(ret.data) == 0 { - t.Errorf("NewAtomicBucketWrapArray() %+v is illegal.\n", ret) - return - } - dataNil := false - for _, v := range ret.data { - if v == nil { - dataNil = true - break - } - } - if dataNil { - t.Error("NewAtomicBucketWrapArray exists nil BucketWrap.") - } - - }) - } -} - func Test_atomicBucketWrapArray_elementOffset(t *testing.T) { type args struct { len int @@ -80,7 +37,8 @@ func Test_atomicBucketWrapArray_elementOffset(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - aa := NewAtomicBucketWrapArray(tt.args.len, tt.args.bucketLengthInMs, tt.args.bg) + now := uint64(1596199310000) + aa := NewAtomicBucketWrapArrayWithTime(tt.args.len, tt.args.bucketLengthInMs, now, tt.args.bg) if got := uintptr(aa.elementOffset(tt.args.idx)) - uintptr(aa.base); got != tt.want { t.Errorf("AtomicBucketWrapArray.elementOffset() = %v, want %v \n", got, tt.want) } @@ -113,7 +71,8 @@ func Test_atomicBucketWrapArray_get(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - aa := NewAtomicBucketWrapArray(tt.args.len, tt.args.bucketLengthInMs, tt.args.bg) + now := uint64(1596199310000) + aa := NewAtomicBucketWrapArrayWithTime(tt.args.len, tt.args.bucketLengthInMs, now, tt.args.bg) tt.want = aa.data[9] if got := aa.get(tt.args.idx); !reflect.DeepEqual(got, tt.want) { t.Errorf("AtomicBucketWrapArray.get() = %v, want %v", got, tt.want) @@ -147,7 +106,8 @@ func Test_atomicBucketWrapArray_compareAndSet(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - aa := NewAtomicBucketWrapArray(tt.args.len, tt.args.bucketLengthInMs, tt.args.bg) + now := uint64(1596199310000) + aa := NewAtomicBucketWrapArrayWithTime(tt.args.len, tt.args.bucketLengthInMs, now, tt.args.bg) update := &BucketWrap{ BucketStart: 8888888888888, Value: atomic.Value{}, @@ -183,7 +143,8 @@ func taskGet(wg *sync.WaitGroup, at *AtomicBucketWrapArray, t *testing.T) { } func Test_atomicBucketWrapArray_Concurrency_Get(t *testing.T) { - ret := NewAtomicBucketWrapArray(int(SampleCount), BucketLengthInMs, &leapArrayMock{}) + now := uint64(1596199310000) + ret := NewAtomicBucketWrapArrayWithTime(int(SampleCount), BucketLengthInMs, now, &leapArrayMock{}) for _, ww := range ret.data { c := new(int64) *c = 0 @@ -230,7 +191,8 @@ func taskSet(wg *sync.WaitGroup, at *AtomicBucketWrapArray, t *testing.T) { } func Test_atomicBucketWrapArray_Concurrency_Set(t *testing.T) { - ret := NewAtomicBucketWrapArray(int(SampleCount), BucketLengthInMs, &leapArrayMock{}) + now := uint64(1596199310000) + ret := NewAtomicBucketWrapArrayWithTime(int(SampleCount), BucketLengthInMs, now, &leapArrayMock{}) for _, ww := range ret.data { c := new(int64) *c = 0 @@ -253,3 +215,43 @@ func Test_atomicBucketWrapArray_Concurrency_Set(t *testing.T) { } t.Log("all done") } + +func TestNewAtomicBucketWrapArrayWithTime(t *testing.T) { + t.Run("TestNewAtomicBucketWrapArrayWithTime_normal", func(t *testing.T) { + now := uint64(1596199317001) + arrayStartTime := uint64(1596199316000) + idx := int((now - arrayStartTime) / 200) + a := NewAtomicBucketWrapArrayWithTime(10, 200, now, &leapArrayMock{}) + + targetTime := arrayStartTime + uint64(idx*200) + for i := idx; i < 10; i++ { + b := a.get(i) + assert.True(t, b.BucketStart == targetTime, "Check start failed") + targetTime += 200 + } + for i := 0; i < idx; i++ { + b := a.get(i) + assert.True(t, b.BucketStart == targetTime, "Check start failed") + targetTime += 200 + } + }) + + t.Run("TestNewAtomicBucketWrapArrayWithTime_edge1", func(t *testing.T) { + now := uint64(1596199316000) + arrayStartTime := uint64(1596199316000) + idx := int((now - arrayStartTime) / 200) + a := NewAtomicBucketWrapArrayWithTime(10, 200, now, &leapArrayMock{}) + + targetTime := arrayStartTime + uint64(idx*200) + for i := idx; i < 10; i++ { + b := a.get(i) + assert.True(t, b.BucketStart == targetTime, "Check start failed") + targetTime += 200 + } + for i := 0; i < idx; i++ { + b := a.get(i) + assert.True(t, b.BucketStart == targetTime, "Check start failed") + targetTime += 200 + } + }) +} diff --git a/core/stat/base/bucket_leap_array_test.go b/core/stat/base/bucket_leap_array_test.go index bc37dc4d1..b4cea6952 100644 --- a/core/stat/base/bucket_leap_array_test.go +++ b/core/stat/base/bucket_leap_array_test.go @@ -95,7 +95,7 @@ func coroutineTask(wg *sync.WaitGroup, slidingWindow *BucketLeapArray, now uint6 func TestBucketLeapArray_resetBucketTo(t *testing.T) { bla := NewBucketLeapArray(SampleCount, IntervalInMs) - idx := 6 + idx := 19 oldBucketWrap := bla.data.array.get(idx) oldBucket := oldBucketWrap.Value.Load() if oldBucket == nil { diff --git a/core/stat/base/leap_array.go b/core/stat/base/leap_array.go index fc22804fb..67eac6780 100644 --- a/core/stat/base/leap_array.go +++ b/core/stat/base/leap_array.go @@ -49,28 +49,33 @@ type AtomicBucketWrapArray struct { data []*BucketWrap } -// New AtomicBucketWrapArray with initializing field data -// Default, automatically initialize each BucketWrap -// len: length of array -// bucketLengthInMs: bucket length of BucketWrap -// generator: generator to generate bucket -func NewAtomicBucketWrapArray(len int, bucketLengthInMs uint32, generator BucketGenerator) *AtomicBucketWrapArray { +func NewAtomicBucketWrapArrayWithTime(len int, bucketLengthInMs uint32, now uint64, generator BucketGenerator) *AtomicBucketWrapArray { ret := &AtomicBucketWrapArray{ length: len, data: make([]*BucketWrap, len), } - // automatically initialize each BucketWrap - // tail BucketWrap of data is initialized with current time - startTime := calculateStartTime(util.CurrentTimeMillis(), bucketLengthInMs) - for i := len - 1; i >= 0; i-- { + timeId := now / uint64(bucketLengthInMs) + idx := int(timeId) % len + startTime := calculateStartTime(now, bucketLengthInMs) + + for i := idx; i <= len-1; i++ { ww := &BucketWrap{ BucketStart: startTime, Value: atomic.Value{}, } ww.Value.Store(generator.NewEmptyBucket()) ret.data[i] = ww - startTime -= uint64(bucketLengthInMs) + startTime += uint64(bucketLengthInMs) + } + for i := 0; i < idx; i++ { + ww := &BucketWrap{ + BucketStart: startTime, + Value: atomic.Value{}, + } + ww.Value.Store(generator.NewEmptyBucket()) + ret.data[i] = ww + startTime += uint64(bucketLengthInMs) } // calculate base address for real data array @@ -79,6 +84,15 @@ func NewAtomicBucketWrapArray(len int, bucketLengthInMs uint32, generator Bucket return ret } +// New AtomicBucketWrapArray with initializing field data +// Default, automatically initialize each BucketWrap +// len: length of array +// bucketLengthInMs: bucket length of BucketWrap +// generator: generator to generate bucket +func NewAtomicBucketWrapArray(len int, bucketLengthInMs uint32, generator BucketGenerator) *AtomicBucketWrapArray { + return NewAtomicBucketWrapArrayWithTime(len, bucketLengthInMs, util.CurrentTimeMillis(), generator) +} + func (aa *AtomicBucketWrapArray) elementOffset(idx int) unsafe.Pointer { if idx >= aa.length || idx < 0 { panic(fmt.Sprintf("The index (%d) is out of bounds, length is %d.", idx, aa.length)) @@ -103,7 +117,14 @@ func (aa *AtomicBucketWrapArray) compareAndSet(idx int, except, update *BucketWr // The BucketWrap leap array, // sampleCount represent the number of BucketWrap // intervalInMs represent the interval of LeapArray. -// For example, bucketLengthInMs is 500ms, intervalInMs is 1min, so sampleCount is 120. +// For example, bucketLengthInMs is 200ms, intervalInMs is 1000ms, so sampleCount is 5. +// Give a diagram to illustrate +// Suppose current time is 888, bucketLengthInMs is 200ms, intervalInMs is 1000ms, LeapArray will build the below windows +// B0 B1 B2 B3 B4 +// |_______|_______|_______|_______|_______| +// 800 1000 +// ^ +// time=888 type LeapArray struct { bucketLengthInMs uint32 sampleCount uint32 @@ -113,20 +134,20 @@ type LeapArray struct { updateLock mutex } -func NewLeapArray(sampleCount uint32, intervalInMs uint32, generator BucketGenerator) *LeapArray { +func NewLeapArray(sampleCount uint32, intervalInMs uint32, generator BucketGenerator) (*LeapArray, error) { if intervalInMs%sampleCount != 0 { - panic(fmt.Sprintf("Invalid parameters, intervalInMs is %d, sampleCount is %d.", intervalInMs, sampleCount)) + return nil, errors.Errorf("Invalid parameters, intervalInMs is %d, sampleCount is %d", intervalInMs, sampleCount) } if generator == nil { - panic("Invalid parameters, generator is nil.") + return nil, errors.Errorf("Invalid parameters, generator is nil") } bucketLengthInMs := intervalInMs / sampleCount return &LeapArray{ bucketLengthInMs: bucketLengthInMs, sampleCount: sampleCount, intervalInMs: intervalInMs, - array: NewAtomicBucketWrapArray(int(sampleCount), intervalInMs, generator), - } + array: NewAtomicBucketWrapArray(int(sampleCount), bucketLengthInMs, generator), + }, nil } func (la *LeapArray) CurrentBucket(bg BucketGenerator) (*BucketWrap, error) { diff --git a/core/stat/base/leap_array_test.go b/core/stat/base/leap_array_test.go index 4cd02b1f1..db663c5b6 100644 --- a/core/stat/base/leap_array_test.go +++ b/core/stat/base/leap_array_test.go @@ -32,10 +32,6 @@ func Test_bucketWrapper_Size(t *testing.T) { } } -//type metricBucketMock struct { -// mock.Mock -//} - // mock ArrayMock and implement BucketGenerator type leapArrayMock struct { mock.Mock @@ -132,122 +128,24 @@ func Test_calculateStartTime_normal(t *testing.T) { } func Test_leapArray_BucketStartCheck_normal(t *testing.T) { - type fields struct { - BucketLengthInMs uint32 - sampleCount uint32 - intervalInMs uint32 - array *AtomicBucketWrapArray - mux mutex - } - type args struct { - bg BucketGenerator - timeMillis uint64 - } - tests := []struct { - name string - fields fields - args args - want uint64 //start time of bucket - }{ - { - name: "Test_leapArray_BucketStartCheck_normal", - fields: fields{ - BucketLengthInMs: BucketLengthInMs, - sampleCount: SampleCount, - intervalInMs: IntervalInMs, - array: NewAtomicBucketWrapArray(int(SampleCount), BucketLengthInMs, &leapArrayMock{}), - mux: mutex{}, - }, - args: args{ - bg: new(leapArrayMock), - timeMillis: 1576296044907, - }, - want: 1576296044500, - }, - } - wwPtr := tests[0].fields.array.get(9) - wwPtr.BucketStart = 1576296044500 //start time of cycle 1576296040000 - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - la := &LeapArray{ - bucketLengthInMs: tt.fields.BucketLengthInMs, - sampleCount: tt.fields.sampleCount, - intervalInMs: tt.fields.intervalInMs, - array: tt.fields.array, - updateLock: tt.fields.mux, - } - got, err := la.currentBucketOfTime(tt.args.timeMillis, tt.args.bg) - if err != nil { - t.Errorf("LeapArray.currentBucketOfTime() error = %v\n", err) - return - } - if got.BucketStart != tt.want { - t.Errorf("BucketStart = %v, want %v", got.BucketStart, tt.want) - } - }) - } -} - -func Test_leapArray_currentBucketWithTime_normal(t *testing.T) { - type fields struct { - bucketLengthInMs uint32 - sampleCount uint32 - intervalInMs uint32 - array *AtomicBucketWrapArray - mux mutex + now := uint64(1596199310000) + la := &LeapArray{ + bucketLengthInMs: BucketLengthInMs, + sampleCount: SampleCount, + intervalInMs: IntervalInMs, + array: NewAtomicBucketWrapArrayWithTime(int(SampleCount), BucketLengthInMs, now, &leapArrayMock{}), + updateLock: mutex{}, } - type args struct { - bg BucketGenerator - timeMillis uint64 + got, err := la.currentBucketOfTime(now+801, new(leapArrayMock)) + if err != nil { + t.Errorf("LeapArray.currentBucketOfTime() error = %v\n", err) + return } - tests := []struct { - name string - fields fields - args args - want *BucketWrap - wantErr bool - }{ - { - name: "Test_leapArray_currentBucketWithTime_normal", - fields: fields{ - bucketLengthInMs: BucketLengthInMs, - sampleCount: SampleCount, - intervalInMs: IntervalInMs, - array: NewAtomicBucketWrapArray(int(SampleCount), BucketLengthInMs, &leapArrayMock{}), - mux: mutex{}, - }, - args: args{ - bg: new(leapArrayMock), - timeMillis: 1576296044907, - }, - want: nil, - wantErr: false, - }, + if got.BucketStart != now+500 { + t.Errorf("BucketStart = %v, want %v", got.BucketStart, now+500) } - - wwPtr := tests[0].fields.array.get(9) - wwPtr.BucketStart = 1576296044500 //start time of cycle 1576296040000 - tests[0].want = tests[0].fields.array.get(9) - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - la := &LeapArray{ - bucketLengthInMs: tt.fields.bucketLengthInMs, - sampleCount: tt.fields.sampleCount, - intervalInMs: tt.fields.intervalInMs, - array: tt.fields.array, - updateLock: tt.fields.mux, - } - got, err := la.currentBucketOfTime(tt.args.timeMillis, tt.args.bg) - if (err != nil) != tt.wantErr { - t.Errorf("LeapArray.currentBucketOfTime() error = %v, wantErr %v", err, tt.wantErr) - return - } - if !reflect.DeepEqual(got, tt.want) { - t.Errorf("LeapArray.currentBucketOfTime() = %v, want %v", got, tt.want) - } - }) + if !reflect.DeepEqual(got, la.array.get(1)) { + t.Errorf("LeapArray.currentBucketOfTime() = %v, want %v", got, la.array.get(1)) } } @@ -275,7 +173,7 @@ func Test_leapArray_valuesWithTime_normal(t *testing.T) { bucketLengthInMs: BucketLengthInMs, sampleCount: SampleCount, intervalInMs: IntervalInMs, - array: NewAtomicBucketWrapArray(int(SampleCount), BucketLengthInMs, &leapArrayMock{}), + array: NewAtomicBucketWrapArrayWithTime(int(SampleCount), BucketLengthInMs, uint64(1596199310000), &leapArrayMock{}), mux: mutex{}, }, args: args{ @@ -344,7 +242,7 @@ func Test_leapArray_isBucketDeprecated_normal(t *testing.T) { bucketLengthInMs: BucketLengthInMs, sampleCount: SampleCount, intervalInMs: IntervalInMs, - array: NewAtomicBucketWrapArray(int(SampleCount), BucketLengthInMs, &leapArrayMock{}), + array: NewAtomicBucketWrapArrayWithTime(int(SampleCount), BucketLengthInMs, uint64(1596199310000), &leapArrayMock{}), mux: mutex{}, }, args: args{