modelindexer: disable scale up when 429 > 1% #9463

Merged
1 change: 1 addition & 0 deletions changelogs/head.asciidoc
@@ -32,3 +32,4 @@ Set error.id for OpenTelemetry exception span events {pull}9372[9372]
 - Improve Elasticsearch output performance, particularly when compression is enabled (default) {pull}9318[9318]
 - Java attacher support for macOS {pull}9413[9413]
 - Improve Elasticsearch output performance in instances with more than 6 cores {pull}9393[9393]
+- Disallow auto-scaling of active indexers when Elasticsearch 429 response rate exceeds 1% of total requests issued {pull}9463[9463]
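
As context for the changelog entry above, here is a minimal, self-contained sketch of the gating rule it describes. This is illustrative only: shouldScaleUp is a hypothetical name, and the PR's real implementation is the indexer.go diff below. (The explicit zero guard is added here for clarity; the PR itself relies on 0/0 producing NaN, and NaN >= 0.01 evaluating to false.)

package main

import "fmt"

// shouldScaleUp mirrors the rule this PR adds to maybeScaleUp:
// scale-ups are suppressed while 429s account for 1% or more of
// the events added to the indexer.
func shouldScaleUp(tooManyRequests, eventsAdded int64) bool {
	if eventsAdded == 0 {
		return true // no traffic yet, nothing to gate on
	}
	return float64(tooManyRequests)/float64(eventsAdded) < 0.01
}

func main() {
	fmt.Println(shouldScaleUp(5, 1000))  // true: 0.5% failure rate, scale-up allowed
	fmt.Println(shouldScaleUp(15, 1000)) // false: 1.5% failure rate, scale-up blocked
}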
36 changes: 27 additions & 9 deletions internal/model/modelindexer/indexer.go
@@ -594,15 +594,27 @@ func (i *Indexer) maybeScaleDown(now time.Time, info scalingInfo, timedFlush *uint)
 		}
 		info = i.scalingInformation() // refresh scaling info if CAS failed.
 	}
-	if info.withinCoolDown(i.config.Scaling.ScaleDown.CoolDown, now) {
-		return false
-	}
+	// If more than 1% of the requests result in 429, scale down the current
+	// active indexer.
+	if i.indexFailureRate() >= 0.01 {
+		if new := info.ScaleDown(now); i.scalingInfo.CompareAndSwap(info, new) {
+			i.logger.Infof(
+				"elasticsearch 429 response rate exceeded 1%%, scaling down to: %d",
+				new.activeIndexers,
+			)
+			return true
+		}
+		return false
+	}
 	if *timedFlush < i.config.Scaling.ScaleDown.Threshold {
 		return false
 	}
 	// Reset timedFlush after it has exceeded the threshold
 	// it avoids unnecessary precociousness to scale down.
 	*timedFlush = 0
+	if info.withinCoolDown(i.config.Scaling.ScaleDown.CoolDown, now) {
+		return false
+	}
 	if new := info.ScaleDown(now); i.scalingInfo.CompareAndSwap(info, new) {
 		i.logger.Infof("timed flush threshold exceeded, scaling down to: %d", new)
 		return true
@@ -624,6 +636,10 @@ func (i *Indexer) maybeScaleUp(now time.Time, info scalingInfo, fullFlush *uint)
 	// Reset fullFlush after it has exceeded the threshold
 	// it avoids unnecessary precociousness to scale up.
 	*fullFlush = 0
+	// If more than 1% of the requests result in 429, do not scale up.
+	if i.indexFailureRate() >= 0.01 {
+		return false
+	}
 	if info.withinCoolDown(i.config.Scaling.ScaleUp.CoolDown, now) {
 		return false
 	}
@@ -642,6 +658,12 @@ func (i *Indexer) scalingInformation() scalingInfo {
 	return i.scalingInfo.Load().(scalingInfo)
 }
 
+// indexFailureRate returns the decimal percentage of 429 / total events.
+func (i *Indexer) indexFailureRate() float64 {
+	return float64(atomic.LoadInt64(&i.tooManyRequests)) /
+		float64(atomic.LoadInt64(&i.eventsAdded))
+}
+
 // activeLimit returns the value of GOMAXPROCS / 4. Which should limit the
 // maximum number of active indexers to 25% of GOMAXPROCS.
 // NOTE: There is also a sweet spot between Config.MaxRequests and the number
@@ -695,15 +717,11 @@ type scalingInfo struct {
 }
 
 func (s scalingInfo) ScaleDown(t time.Time) scalingInfo {
-	return scalingInfo{
-		lastAction: t, activeIndexers: s.activeIndexers - 1,
-	}
+	return scalingInfo{lastAction: t, activeIndexers: s.activeIndexers - 1}
 }
 
 func (s scalingInfo) ScaleUp(t time.Time) scalingInfo {
-	return scalingInfo{
-		lastAction: t, activeIndexers: s.activeIndexers + 1,
-	}
+	return scalingInfo{lastAction: t, activeIndexers: s.activeIndexers + 1}
 }
 
 func (s scalingInfo) withinCoolDown(cooldown time.Duration, now time.Time) bool {
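The indexFailureRate method added above reads two counters, i.tooManyRequests and i.eventsAdded, whose increment sites fall outside this diff. Below is a self-contained sketch of how such counters are typically maintained with sync/atomic; the counters type and its methods are assumptions for illustration, not code from this PR.

package main

import (
	"fmt"
	"sync/atomic"
)

// counters mirrors the two Indexer fields read by indexFailureRate.
type counters struct {
	eventsAdded     int64 // assumed: incremented once per event buffered for indexing
	tooManyRequests int64 // assumed: incremented once per bulk item rejected with HTTP 429
}

func (c *counters) addEvent() { atomic.AddInt64(&c.eventsAdded, 1) }
func (c *counters) add429()   { atomic.AddInt64(&c.tooManyRequests, 1) }

// failureRate matches the arithmetic of indexFailureRate in the diff above:
// the decimal fraction of added events that were rejected with 429.
func (c *counters) failureRate() float64 {
	return float64(atomic.LoadInt64(&c.tooManyRequests)) /
		float64(atomic.LoadInt64(&c.eventsAdded))
}

func main() {
	var c counters
	for i := 0; i < 200; i++ {
		c.addEvent()
	}
	for i := 0; i < 3; i++ {
		c.add429()
	}
	// 3/200 = 0.015 >= 0.01, so at this rate maybeScaleUp would refuse to scale up.
	fmt.Printf("failure rate: %.3f\n", c.failureRate())
}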
87 changes: 81 additions & 6 deletions internal/model/modelindexer/indexer_test.go
@@ -27,6 +27,7 @@ import (
"net/http"
"runtime"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
@@ -747,6 +748,16 @@ func TestModelIndexerScaling(t *testing.T) {
 		assert.Greater(t, stats.IndexersDestroyed, int64(0), "No downscales took place: %+v", stats)
 		assert.Equal(t, stats.IndexersActive, int64(n), "%+v", stats)
 	}
+	waitForBulkRequests := func(t *testing.T, indexer *modelindexer.Indexer, n int64) {
+		timeout := time.After(time.Second)
+		for indexer.Stats().BulkRequests < n {
+			select {
+			case <-time.After(time.Millisecond):
+			case <-timeout:
+				t.Fatalf("timed out while waiting for events to be indexed: %+v", indexer.Stats())
+			}
+		}
+	}
 	t.Run("DownscaleIdle", func(t *testing.T) {
 		// Override the default GOMAXPROCS, ensuring the active indexers can scale up.
 		setGOMAXPROCS(t, 12)
@@ -806,9 +817,8 @@
 		// the gomaxprocs change.
 		waitForScaleDown(t, indexer, 1)
 		// Wait for all the events to be indexed.
-		for indexer.Stats().Indexed < events {
-			<-time.After(time.Millisecond)
-		}
+		waitForBulkRequests(t, indexer, events)
+
 		stats := indexer.Stats()
 		stats.BytesTotal = 0
 		assert.Equal(t, modelindexer.Stats{
@@ -844,9 +854,8 @@ func TestModelIndexerScaling(t *testing.T) {
 		sendEvents(t, indexer, int(events))
 		waitForScaleUp(t, indexer, 2)
 		// Wait for all the events to be indexed.
-		for indexer.Stats().Indexed < events {
-			<-time.After(time.Millisecond)
-		}
+		waitForBulkRequests(t, indexer, events)
+
 		assert.Equal(t, int64(2), indexer.Stats().IndexersActive)
 		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 		defer cancel()
@@ -864,6 +873,72 @@
 			IndexersDestroyed: 0,
 		}, stats)
 	})
+	t.Run("Downscale429Rate", func(t *testing.T) {
+		// Override the default GOMAXPROCS, ensuring the active indexers can scale up.
+		setGOMAXPROCS(t, 12)
+		var mu sync.RWMutex
+		var tooMany bool // must be accessed with the mutex held.
+		client := modelindexertest.NewMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) {
+			_, result := modelindexertest.DecodeBulkRequest(r)
+			mu.RLock()
+			tooManyResp := tooMany
+			mu.RUnlock()
+			if tooManyResp {
+				result.HasErrors = true
+				for i := 0; i < len(result.Items); i++ {
+					item := result.Items[i]
+					resp := item["create"]
+					resp.Status = http.StatusTooManyRequests
+					item["create"] = resp
+				}
+			}
+			json.NewEncoder(w).Encode(result)
+		})
+		indexer, err := modelindexer.New(client, modelindexer.Config{
+			FlushInterval: time.Millisecond,
+			FlushBytes:    1,
+			Scaling: modelindexer.ScalingConfig{
+				ScaleUp: modelindexer.ScaleActionConfig{
+					Threshold: 5, CoolDown: 1,
+				},
+				ScaleDown: modelindexer.ScaleActionConfig{
+					Threshold: 100, CoolDown: 100 * time.Millisecond,
+				},
+				IdleInterval: 100 * time.Millisecond,
+			},
+		})
+		require.NoError(t, err)
+		t.Cleanup(func() { indexer.Close(context.Background()) })
+		events := int64(20)
+		sendEvents(t, indexer, int(events))
+		waitForScaleUp(t, indexer, 3)
+		waitForBulkRequests(t, indexer, events)
+
+		// Make the mocked elasticsearch return 429 responses and wait for the
+		// active indexers to be scaled down to the minimum.
+		mu.Lock()
+		tooMany = true
+		mu.Unlock()
+		events += 5
+		sendEvents(t, indexer, 5)
+		waitForScaleDown(t, indexer, 1)
+		waitForBulkRequests(t, indexer, events)
+
+		// Index 600 events and ensure that scale-ups happen to the maximum after
+		// the threshold is exceeded.
+		mu.Lock()
+		tooMany = false
+		mu.Unlock()
+		events += 600
+		sendEvents(t, indexer, 600)
+		waitForScaleUp(t, indexer, 3)
+		waitForBulkRequests(t, indexer, events)
+
+		stats := indexer.Stats()
+		assert.Equal(t, int64(3), stats.IndexersActive)
+		assert.Equal(t, int64(4), stats.IndexersCreated)
+		assert.Equal(t, int64(2), stats.IndexersDestroyed)
+	})
 }
 
 func setGOMAXPROCS(t *testing.T, new int) {