Skip to content

Commit

Permalink
modelindexer: disable scale up when 429 > 1%
Browse files Browse the repository at this point in the history
Disables the scale-up actions when the 429 response rate exceeds 1% of
the total response rate. Additionally, scales down — respecting the
scale-down parameters — when that rate threshold is breached.

Signed-off-by: Marc Lopez Rubio <[email protected]>
  • Loading branch information
marclop committed Oct 28, 2022
1 parent bd3f058 commit b6e9004
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 15 deletions.
36 changes: 27 additions & 9 deletions internal/model/modelindexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,15 +594,27 @@ func (i *Indexer) maybeScaleDown(now time.Time, info scalingInfo, timedFlush *ui
}
info = i.scalingInformation() // refresh scaling info if CAS failed.
}
if info.withinCoolDown(i.config.Scaling.ScaleDown.CoolDown, now) {
return false
}
// If more than 1% of the requests result in 429, scale down the current
// active indexer.
if i.indexFailureRate() >= 0.01 {
if new := info.ScaleDown(now); i.scalingInfo.CompareAndSwap(info, new) {
i.logger.Infof(
"elasticsearch 429 response rate exceeded 1%%, scaling down to: %d",
new.activeIndexers,
)
return true
}
return false
}
if *timedFlush < i.config.Scaling.ScaleDown.Threshold {
return false
}
// Reset timedFlush after it has exceeded the threshold
// it avoids unnecessary precociousness to scale down.
*timedFlush = 0
if info.withinCoolDown(i.config.Scaling.ScaleDown.CoolDown, now) {
return false
}
if new := info.ScaleDown(now); i.scalingInfo.CompareAndSwap(info, new) {
i.logger.Infof("timed flush threshold exceeded, scaling down to: %d", new)
return true
Expand All @@ -624,6 +636,10 @@ func (i *Indexer) maybeScaleUp(now time.Time, info scalingInfo, fullFlush *uint)
// Reset fullFlush after it has exceeded the threshold
// it avoids unnecessary precociousness to scale up.
*fullFlush = 0
// If more than 1% of the requests result in 429, do not scale up.
if i.indexFailureRate() >= 0.01 {
return false
}
if info.withinCoolDown(i.config.Scaling.ScaleUp.CoolDown, now) {
return false
}
Expand All @@ -642,6 +658,12 @@ func (i *Indexer) scalingInformation() scalingInfo {
return i.scalingInfo.Load().(scalingInfo)
}

// indexFailureRate returns the ratio of events rejected with HTTP 429
// (Too Many Requests) to the total number of events added, as a decimal
// fraction in [0, 1].
func (i *Indexer) indexFailureRate() float64 {
	// Load the denominator first and guard against zero: 0/0 in float64
	// yields NaN, and NaN compared with any threshold is always false,
	// which would silently disable the 429-rate checks in the callers.
	added := atomic.LoadInt64(&i.eventsAdded)
	if added == 0 {
		return 0
	}
	return float64(atomic.LoadInt64(&i.tooManyRequests)) / float64(added)
}

// activeLimit returns the value of GOMAXPROCS / 4. Which should limit the
// maximum number of active indexers to 25% of GOMAXPROCS.
// NOTE: There is also a sweet spot between Config.MaxRequests and the number
Expand Down Expand Up @@ -695,15 +717,11 @@ type scalingInfo struct {
}

// ScaleDown returns a copy of the scaling info with one fewer active
// indexer and the last-action timestamp set to t. The receiver is a
// value type, so the stored scalingInfo is never mutated in place.
func (s scalingInfo) ScaleDown(t time.Time) scalingInfo {
	return scalingInfo{lastAction: t, activeIndexers: s.activeIndexers - 1}
}

// ScaleUp returns a copy of the scaling info with one more active
// indexer and the last-action timestamp set to t. The receiver is a
// value type, so the stored scalingInfo is never mutated in place.
func (s scalingInfo) ScaleUp(t time.Time) scalingInfo {
	return scalingInfo{lastAction: t, activeIndexers: s.activeIndexers + 1}
}

func (s scalingInfo) withinCoolDown(cooldown time.Duration, now time.Time) bool {
Expand Down
88 changes: 82 additions & 6 deletions internal/model/modelindexer/indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"net/http"
"runtime"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -806,9 +807,8 @@ func TestModelIndexerScaling(t *testing.T) {
// the gomaxprocs change.
waitForScaleDown(t, indexer, 1)
// Wait for all the events to be indexed.
for indexer.Stats().Indexed < events {
<-time.After(time.Millisecond)
}
waitForBulkRequests(t, indexer, events)

stats := indexer.Stats()
stats.BytesTotal = 0
assert.Equal(t, modelindexer.Stats{
Expand Down Expand Up @@ -844,9 +844,8 @@ func TestModelIndexerScaling(t *testing.T) {
sendEvents(t, indexer, int(events))
waitForScaleUp(t, indexer, 2)
// Wait for all the events to be indexed.
for indexer.Stats().Indexed < events {
<-time.After(time.Millisecond)
}
waitForBulkRequests(t, indexer, events)

assert.Equal(t, int64(2), indexer.Stats().IndexersActive)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
Expand All @@ -864,6 +863,83 @@ func TestModelIndexerScaling(t *testing.T) {
IndexersDestroyed: 0,
}, stats)
})
t.Run("Downscale429Rate", func(t *testing.T) {
// Override the default GOMAXPROCS, ensuring the active indexers can scale up.
setGOMAXPROCS(t, 12)
var mu sync.RWMutex
var tooMany bool // must be accessed with the mutex held.
client := modelindexertest.NewMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) {
_, result := modelindexertest.DecodeBulkRequest(r)
mu.RLock()
tooManyResp := tooMany
mu.RUnlock()
if tooManyResp {
result.HasErrors = true
for i := 0; i < len(result.Items); i++ {
item := result.Items[i]
resp := item["create"]
resp.Status = http.StatusTooManyRequests
item["create"] = resp
}
}
json.NewEncoder(w).Encode(result)
})
indexer, err := modelindexer.New(client, modelindexer.Config{
FlushInterval: time.Millisecond,
FlushBytes: 1,
Scaling: modelindexer.ScalingConfig{
ScaleUp: modelindexer.ScaleActionConfig{
Threshold: 5, CoolDown: 1,
},
ScaleDown: modelindexer.ScaleActionConfig{
Threshold: 100, CoolDown: 100 * time.Millisecond,
},
IdleInterval: 100 * time.Millisecond,
},
})
require.NoError(t, err)
t.Cleanup(func() { indexer.Close(context.Background()) })
events := int64(20)
sendEvents(t, indexer, int(events))
waitForScaleUp(t, indexer, 3)
waitForBulkRequests(t, indexer, events)

		// Make the mocked elasticsearch return 429 responses and wait for the
		// active indexers to be scaled down to the minimum.
mu.Lock()
tooMany = true
mu.Unlock()
events += 5
sendEvents(t, indexer, 5)
waitForScaleDown(t, indexer, 1)
waitForBulkRequests(t, indexer, events)

// index 600 events and ensure that scale ups happen to the maximum after
// the threshold is exceeded.
mu.Lock()
tooMany = false
mu.Unlock()
events += 600
sendEvents(t, indexer, 600)
waitForScaleUp(t, indexer, 3)
waitForBulkRequests(t, indexer, events)

stats := indexer.Stats()
assert.Equal(t, int64(3), stats.IndexersActive)
assert.Equal(t, int64(4), stats.IndexersCreated)
assert.Equal(t, int64(2), stats.IndexersDestroyed)
})
}

// waitForBulkRequests blocks until the indexer has performed at least n
// bulk requests, failing the test if that does not happen within one
// second. It polls Stats() every millisecond.
func waitForBulkRequests(t *testing.T, indexer *modelindexer.Indexer, n int64) {
	t.Helper() // attribute failures to the caller, not this helper.
	timeout := time.After(time.Second)
	for indexer.Stats().BulkRequests < n {
		select {
		case <-time.After(time.Millisecond):
		case <-timeout:
			t.Fatalf("timed out while waiting for events to be indexed: %+v", indexer.Stats())
		}
	}
}

func setGOMAXPROCS(t *testing.T, new int) {
Expand Down

0 comments on commit b6e9004

Please sign in to comment.