Skip to content

Commit

Permalink
Make the current standard time align to the associated BucketWrap's s…
Browse files Browse the repository at this point in the history
…tartTime
  • Loading branch information
louyuting committed Aug 4, 2020
1 parent c93bfd8 commit 0489375
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 191 deletions.
23 changes: 18 additions & 5 deletions core/circuitbreaker/circuit_breaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,12 @@ func newSlowRtCircuitBreakerWithStat(r *slowRtRule, stat *slowRequestLeapArray)
func newSlowRtCircuitBreaker(r *slowRtRule) *slowRtCircuitBreaker {
interval := r.StatIntervalMs
stat := &slowRequestLeapArray{}
stat.data = sbase.NewLeapArray(1, interval, stat)
leapArray, err := sbase.NewLeapArray(1, interval, stat)
if err != nil {
logger.Errorf("Fail to convert slowRtRule to slowRtCircuitBreaker, rule: %+v", r)
return nil
}
stat.data = leapArray

return newSlowRtCircuitBreakerWithStat(r, stat)
}
Expand Down Expand Up @@ -382,8 +387,12 @@ func newErrorRatioCircuitBreakerWithStat(r *errorRatioRule, stat *errorCounterLe
func newErrorRatioCircuitBreaker(r *errorRatioRule) *errorRatioCircuitBreaker {
interval := r.StatIntervalMs
stat := &errorCounterLeapArray{}
stat.data = sbase.NewLeapArray(1, interval, stat)

leapArray, err := sbase.NewLeapArray(1, interval, stat)
if err != nil {
logger.Errorf("Fail to convert errorRatioRule to errorRatioCircuitBreaker, rule: %+v", r)
return nil
}
stat.data = leapArray
return newErrorRatioCircuitBreakerWithStat(r, stat)
}

Expand Down Expand Up @@ -559,8 +568,12 @@ func newErrorCountCircuitBreakerWithStat(r *errorCountRule, stat *errorCounterLe
func newErrorCountCircuitBreaker(r *errorCountRule) *errorCountCircuitBreaker {
interval := r.StatIntervalMs
stat := &errorCounterLeapArray{}
stat.data = sbase.NewLeapArray(1, interval, stat)

leapArray, err := sbase.NewLeapArray(1, interval, stat)
if err != nil {
logger.Errorf("Fail to convert errorCountRule to errorCountCircuitBreaker, rule: %+v", r)
return nil
}
stat.data = leapArray
return newErrorCountCircuitBreakerWithStat(r, stat)
}

Expand Down
100 changes: 51 additions & 49 deletions core/stat/base/atomic_window_wrap_array_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,52 +9,9 @@ import (
"time"

"github.com/alibaba/sentinel-golang/util"
"github.com/stretchr/testify/assert"
)

func Test_newAtomicBucketWrapArray_normal(t *testing.T) {
type args struct {
len int
bucketLengthInMs uint32
bg BucketGenerator
}
tests := []struct {
name string
args args
want *AtomicBucketWrapArray
}{
{
name: "Test_newAtomicBucketWrapArray_normal",
args: args{
len: int(SampleCount),
bucketLengthInMs: BucketLengthInMs,
bg: &leapArrayMock{},
},
want: nil,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ret := NewAtomicBucketWrapArray(tt.args.len, tt.args.bucketLengthInMs, tt.args.bg)
if ret == nil || uintptr(ret.base) == uintptr(0) || ret.length != tt.args.len || ret.data == nil || len(ret.data) == 0 {
t.Errorf("NewAtomicBucketWrapArray() %+v is illegal.\n", ret)
return
}
dataNil := false
for _, v := range ret.data {
if v == nil {
dataNil = true
break
}
}
if dataNil {
t.Error("NewAtomicBucketWrapArray exists nil BucketWrap.")
}

})
}
}

func Test_atomicBucketWrapArray_elementOffset(t *testing.T) {
type args struct {
len int
Expand All @@ -80,7 +37,8 @@ func Test_atomicBucketWrapArray_elementOffset(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
aa := NewAtomicBucketWrapArray(tt.args.len, tt.args.bucketLengthInMs, tt.args.bg)
now := uint64(1596199310000)
aa := NewAtomicBucketWrapArrayWithTime(tt.args.len, tt.args.bucketLengthInMs, now, tt.args.bg)
if got := uintptr(aa.elementOffset(tt.args.idx)) - uintptr(aa.base); got != tt.want {
t.Errorf("AtomicBucketWrapArray.elementOffset() = %v, want %v \n", got, tt.want)
}
Expand Down Expand Up @@ -113,7 +71,8 @@ func Test_atomicBucketWrapArray_get(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
aa := NewAtomicBucketWrapArray(tt.args.len, tt.args.bucketLengthInMs, tt.args.bg)
now := uint64(1596199310000)
aa := NewAtomicBucketWrapArrayWithTime(tt.args.len, tt.args.bucketLengthInMs, now, tt.args.bg)
tt.want = aa.data[9]
if got := aa.get(tt.args.idx); !reflect.DeepEqual(got, tt.want) {
t.Errorf("AtomicBucketWrapArray.get() = %v, want %v", got, tt.want)
Expand Down Expand Up @@ -147,7 +106,8 @@ func Test_atomicBucketWrapArray_compareAndSet(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
aa := NewAtomicBucketWrapArray(tt.args.len, tt.args.bucketLengthInMs, tt.args.bg)
now := uint64(1596199310000)
aa := NewAtomicBucketWrapArrayWithTime(tt.args.len, tt.args.bucketLengthInMs, now, tt.args.bg)
update := &BucketWrap{
BucketStart: 8888888888888,
Value: atomic.Value{},
Expand Down Expand Up @@ -183,7 +143,8 @@ func taskGet(wg *sync.WaitGroup, at *AtomicBucketWrapArray, t *testing.T) {
}

func Test_atomicBucketWrapArray_Concurrency_Get(t *testing.T) {
ret := NewAtomicBucketWrapArray(int(SampleCount), BucketLengthInMs, &leapArrayMock{})
now := uint64(1596199310000)
ret := NewAtomicBucketWrapArrayWithTime(int(SampleCount), BucketLengthInMs, now, &leapArrayMock{})
for _, ww := range ret.data {
c := new(int64)
*c = 0
Expand Down Expand Up @@ -230,7 +191,8 @@ func taskSet(wg *sync.WaitGroup, at *AtomicBucketWrapArray, t *testing.T) {
}

func Test_atomicBucketWrapArray_Concurrency_Set(t *testing.T) {
ret := NewAtomicBucketWrapArray(int(SampleCount), BucketLengthInMs, &leapArrayMock{})
now := uint64(1596199310000)
ret := NewAtomicBucketWrapArrayWithTime(int(SampleCount), BucketLengthInMs, now, &leapArrayMock{})
for _, ww := range ret.data {
c := new(int64)
*c = 0
Expand All @@ -253,3 +215,43 @@ func Test_atomicBucketWrapArray_Concurrency_Set(t *testing.T) {
}
t.Log("all done")
}

func TestNewAtomicBucketWrapArrayWithTime(t *testing.T) {
t.Run("TestNewAtomicBucketWrapArrayWithTime_normal", func(t *testing.T) {
now := uint64(1596199317001)
arrayStartTime := uint64(1596199316000)
idx := int((now - arrayStartTime) / 200)
a := NewAtomicBucketWrapArrayWithTime(10, 200, now, &leapArrayMock{})

targetTime := arrayStartTime + uint64(idx*200)
for i := idx; i < 10; i++ {
b := a.get(i)
assert.True(t, b.BucketStart == targetTime, "Check start failed")
targetTime += 200
}
for i := 0; i < idx; i++ {
b := a.get(i)
assert.True(t, b.BucketStart == targetTime, "Check start failed")
targetTime += 200
}
})

t.Run("TestNewAtomicBucketWrapArrayWithTime_edge1", func(t *testing.T) {
now := uint64(1596199316000)
arrayStartTime := uint64(1596199316000)
idx := int((now - arrayStartTime) / 200)
a := NewAtomicBucketWrapArrayWithTime(10, 200, now, &leapArrayMock{})

targetTime := arrayStartTime + uint64(idx*200)
for i := idx; i < 10; i++ {
b := a.get(i)
assert.True(t, b.BucketStart == targetTime, "Check start failed")
targetTime += 200
}
for i := 0; i < idx; i++ {
b := a.get(i)
assert.True(t, b.BucketStart == targetTime, "Check start failed")
targetTime += 200
}
})
}
2 changes: 1 addition & 1 deletion core/stat/base/bucket_leap_array_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func coroutineTask(wg *sync.WaitGroup, slidingWindow *BucketLeapArray, now uint6

func TestBucketLeapArray_resetBucketTo(t *testing.T) {
bla := NewBucketLeapArray(SampleCount, IntervalInMs)
idx := 6
idx := 19
oldBucketWrap := bla.data.array.get(idx)
oldBucket := oldBucketWrap.Value.Load()
if oldBucket == nil {
Expand Down
55 changes: 38 additions & 17 deletions core/stat/base/leap_array.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,28 +49,33 @@ type AtomicBucketWrapArray struct {
data []*BucketWrap
}

// New AtomicBucketWrapArray with initializing field data
// Default, automatically initialize each BucketWrap
// len: length of array
// bucketLengthInMs: bucket length of BucketWrap
// generator: generator to generate bucket
func NewAtomicBucketWrapArray(len int, bucketLengthInMs uint32, generator BucketGenerator) *AtomicBucketWrapArray {
func NewAtomicBucketWrapArrayWithTime(len int, bucketLengthInMs uint32, now uint64, generator BucketGenerator) *AtomicBucketWrapArray {
ret := &AtomicBucketWrapArray{
length: len,
data: make([]*BucketWrap, len),
}

// automatically initialize each BucketWrap
// tail BucketWrap of data is initialized with current time
startTime := calculateStartTime(util.CurrentTimeMillis(), bucketLengthInMs)
for i := len - 1; i >= 0; i-- {
timeId := now / uint64(bucketLengthInMs)
idx := int(timeId) % len
startTime := calculateStartTime(now, bucketLengthInMs)

for i := idx; i <= len-1; i++ {
ww := &BucketWrap{
BucketStart: startTime,
Value: atomic.Value{},
}
ww.Value.Store(generator.NewEmptyBucket())
ret.data[i] = ww
startTime -= uint64(bucketLengthInMs)
startTime += uint64(bucketLengthInMs)
}
for i := 0; i < idx; i++ {
ww := &BucketWrap{
BucketStart: startTime,
Value: atomic.Value{},
}
ww.Value.Store(generator.NewEmptyBucket())
ret.data[i] = ww
startTime += uint64(bucketLengthInMs)
}

// calculate base address for real data array
Expand All @@ -79,6 +84,15 @@ func NewAtomicBucketWrapArray(len int, bucketLengthInMs uint32, generator Bucket
return ret
}

// New AtomicBucketWrapArray with initializing field data
// Default, automatically initialize each BucketWrap
// len: length of array
// bucketLengthInMs: bucket length of BucketWrap
// generator: generator to generate bucket
func NewAtomicBucketWrapArray(len int, bucketLengthInMs uint32, generator BucketGenerator) *AtomicBucketWrapArray {
return NewAtomicBucketWrapArrayWithTime(len, bucketLengthInMs, util.CurrentTimeMillis(), generator)
}

func (aa *AtomicBucketWrapArray) elementOffset(idx int) unsafe.Pointer {
if idx >= aa.length || idx < 0 {
panic(fmt.Sprintf("The index (%d) is out of bounds, length is %d.", idx, aa.length))
Expand All @@ -103,7 +117,14 @@ func (aa *AtomicBucketWrapArray) compareAndSet(idx int, except, update *BucketWr
// The BucketWrap leap array,
// sampleCount represent the number of BucketWrap
// intervalInMs represent the interval of LeapArray.
// For example, bucketLengthInMs is 500ms, intervalInMs is 1min, so sampleCount is 120.
// For example, bucketLengthInMs is 200ms, intervalInMs is 1000ms, so sampleCount is 5.
// Give a diagram to illustrate
// Suppose current time is 888, bucketLengthInMs is 200ms, intervalInMs is 1000ms, LeapArray will build the below windows
// B0 B1 B2 B3 B4
// |_______|_______|_______|_______|_______|
// 800 1000
// ^
// time=888
type LeapArray struct {
bucketLengthInMs uint32
sampleCount uint32
Expand All @@ -113,20 +134,20 @@ type LeapArray struct {
updateLock mutex
}

func NewLeapArray(sampleCount uint32, intervalInMs uint32, generator BucketGenerator) *LeapArray {
func NewLeapArray(sampleCount uint32, intervalInMs uint32, generator BucketGenerator) (*LeapArray, error) {
if intervalInMs%sampleCount != 0 {
panic(fmt.Sprintf("Invalid parameters, intervalInMs is %d, sampleCount is %d.", intervalInMs, sampleCount))
return nil, errors.Errorf("Invalid parameters, intervalInMs is %d, sampleCount is %d", intervalInMs, sampleCount)
}
if generator == nil {
panic("Invalid parameters, generator is nil.")
return nil, errors.Errorf("Invalid parameters, generator is nil")
}
bucketLengthInMs := intervalInMs / sampleCount
return &LeapArray{
bucketLengthInMs: bucketLengthInMs,
sampleCount: sampleCount,
intervalInMs: intervalInMs,
array: NewAtomicBucketWrapArray(int(sampleCount), intervalInMs, generator),
}
array: NewAtomicBucketWrapArray(int(sampleCount), bucketLengthInMs, generator),
}, nil
}

func (la *LeapArray) CurrentBucket(bg BucketGenerator) (*BucketWrap, error) {
Expand Down
Loading

0 comments on commit 0489375

Please sign in to comment.