Skip to content

Commit

Permalink
Fix concurrency ignore batchCount bug (alibaba#556)
Browse files Browse the repository at this point in the history
  • Loading branch information
吴宣辰 committed Jan 31, 2024
1 parent 0807185 commit a4d1539
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 8 deletions.
4 changes: 2 additions & 2 deletions core/base/stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ func (ws *nopWriteStat) AddCount(_ MetricEvent, _ int64) {
// ConcurrencyStat provides read/update operation for concurrency statistics.
type ConcurrencyStat interface {
CurrentConcurrency() int32
IncreaseConcurrency()
DecreaseConcurrency()
IncreaseConcurrency(int32)
DecreaseConcurrency(int32)
}

// StatNode holds real-time statistics for resources.
Expand Down
8 changes: 4 additions & 4 deletions core/stat/base_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,12 @@ func (n *BaseStatNode) CurrentConcurrency() int32 {
return atomic.LoadInt32(&(n.concurrency))
}

func (n *BaseStatNode) IncreaseConcurrency() {
n.UpdateConcurrency(atomic.AddInt32(&(n.concurrency), 1))
func (n *BaseStatNode) IncreaseConcurrency(count int32) {
n.UpdateConcurrency(atomic.AddInt32(&(n.concurrency), count))
}

func (n *BaseStatNode) DecreaseConcurrency() {
atomic.AddInt32(&(n.concurrency), -1)
func (n *BaseStatNode) DecreaseConcurrency(count int32) {
atomic.AddInt32(&(n.concurrency), -count)
}

func (n *BaseStatNode) GenerateReadStat(sampleCount uint32, intervalInMs uint32) (base.ReadStat, error) {
Expand Down
4 changes: 2 additions & 2 deletions core/stat/stat_slot.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (s *Slot) recordPassFor(sn base.StatNode, count uint32) {
if sn == nil {
return
}
sn.IncreaseConcurrency()
sn.IncreaseConcurrency(int32(count))
sn.AddCount(base.MetricEventPass, int64(count))
}

Expand All @@ -97,5 +97,5 @@ func (s *Slot) recordCompleteFor(sn base.StatNode, count uint32, rt uint64, err
}
sn.AddCount(base.MetricEventRt, int64(rt))
sn.AddCount(base.MetricEventComplete, int64(count))
sn.DecreaseConcurrency()
sn.DecreaseConcurrency(int32(count))
}
50 changes: 50 additions & 0 deletions tests/api/api_entry_integration_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package api

import (
"github.com/alibaba/sentinel-golang/core/isolation"
"log"
"os"
"runtime/debug"
"testing"
"time"
Expand Down Expand Up @@ -127,3 +129,51 @@ func TestAdaptiveFlowControl2(t *testing.T) {
_, blockError := api.Entry(rs, api.WithTrafficType(base.Inbound))
assert.Nil(t, blockError)
}

func assertIsPass(t *testing.T, b *base.BlockError) {
assert.True(t, b == nil)
}
func assertIsBlock(t *testing.T, b *base.BlockError) {
assert.True(t, b != nil)
}

func Test_Isolation(t *testing.T) {
initSentinel()

r1 := &isolation.Rule{
Resource: "abc",
MetricType: isolation.Concurrency,
Threshold: 12,
}
_, err := isolation.LoadRules([]*isolation.Rule{r1})
if err != nil {
logging.Error(err, "fail")
os.Exit(1)
}

entries := make([]*base.SentinelEntry, 0)

// Threshold = 12, BatchCount = 1, Should Pass 12 Entry
for i := 0; i < 12; i++ {
e, b := api.Entry("abc", api.WithBatchCount(1))
assertIsPass(t, b)
entries = append(entries, e)
}
_, b := api.Entry("abc", api.WithBatchCount(1))
assertIsBlock(t, b)
for _, e := range entries {
e.Exit()
}

// Threshold = 12, BatchCount = 2, Should Pass 6 Entry
for i := 0; i < 6; i++ {
e, b := api.Entry("abc", api.WithBatchCount(2))
assertIsPass(t, b)
entries = append(entries, e)
}
_, b = api.Entry("abc", api.WithBatchCount(2))
assertIsBlock(t, b)
for _, e := range entries {
e.Exit()
}
}

0 comments on commit a4d1539

Please sign in to comment.