From 89c17ff5760697d3072b60b8cb13bf9f98e3408f Mon Sep 17 00:00:00 2001
From: Marc Lopez Rubio
Date: Wed, 2 Nov 2022 10:27:58 +0100
Subject: [PATCH] modelindexer: disable scale up when 429 > 1% (#9463)

Disables the scale up actions when the 429 response rate exceeds 1% of
the total response rate. Additionally, scale down, respecting the scale
down parameters, when the rate is breached.

Signed-off-by: Marc Lopez Rubio
---
 changelogs/head.asciidoc                    |  1 +
 internal/model/modelindexer/indexer.go      | 36 ++++++---
 internal/model/modelindexer/indexer_test.go | 87 +++++++++++++++++++--
 3 files changed, 109 insertions(+), 15 deletions(-)

diff --git a/changelogs/head.asciidoc b/changelogs/head.asciidoc
index 80d5723a8e0..32effb314ee 100644
--- a/changelogs/head.asciidoc
+++ b/changelogs/head.asciidoc
@@ -32,3 +32,4 @@ Set error.id for OpenTelemetry exception span events {pull}9372[9372]
 - Improve Elasticsearch output performance, particularly when compression is enabled (default) {pull}9318[9318]
 - Java attacher support for macOS {pull}9413[9413]
 - Improve Elasticsearch output performance in instances with more than 6 cores {pull}9393[9393]
+- Disallow auto-scaling of active indexers when Elasticsearch 429 response rate exceeds 1% of total requests issued {pull}9463[9463]
diff --git a/internal/model/modelindexer/indexer.go b/internal/model/modelindexer/indexer.go
index 4c64e80d38f..c16d02c5f34 100644
--- a/internal/model/modelindexer/indexer.go
+++ b/internal/model/modelindexer/indexer.go
@@ -594,15 +594,27 @@ func (i *Indexer) maybeScaleDown(now time.Time, info scalingInfo, timedFlush *ui
 		}
 		info = i.scalingInformation() // refresh scaling info if CAS failed.
 	}
+	if info.withinCoolDown(i.config.Scaling.ScaleDown.CoolDown, now) {
+		return false
+	}
+	// If more than 1% of the requests result in 429, scale down the current
+	// active indexer.
+	if i.indexFailureRate() >= 0.01 {
+		if new := info.ScaleDown(now); i.scalingInfo.CompareAndSwap(info, new) {
+			i.logger.Infof(
+				"elasticsearch 429 response rate exceeded 1%%, scaling down to: %d",
+				new.activeIndexers,
+			)
+			return true
+		}
+		return false
+	}
 	if *timedFlush < i.config.Scaling.ScaleDown.Threshold {
 		return false
 	}
 	// Reset timedFlush after it has exceeded the threshold
 	// it avoids unnecessary precociousness to scale down.
 	*timedFlush = 0
-	if info.withinCoolDown(i.config.Scaling.ScaleDown.CoolDown, now) {
-		return false
-	}
 	if new := info.ScaleDown(now); i.scalingInfo.CompareAndSwap(info, new) {
 		i.logger.Infof("timed flush threshold exceeded, scaling down to: %d", new)
 		return true
@@ -624,6 +636,10 @@ func (i *Indexer) maybeScaleUp(now time.Time, info scalingInfo, fullFlush *uint)
 	// Reset fullFlush after it has exceeded the threshold
 	// it avoids unnecessary precociousness to scale up.
 	*fullFlush = 0
+	// If more than 1% of the requests result in 429, do not scale up.
+	if i.indexFailureRate() >= 0.01 {
+		return false
+	}
 	if info.withinCoolDown(i.config.Scaling.ScaleUp.CoolDown, now) {
 		return false
 	}
@@ -642,6 +658,12 @@ func (i *Indexer) scalingInformation() scalingInfo {
 	return i.scalingInfo.Load().(scalingInfo)
 }
 
+// indexFailureRate returns the decimal percentage of 429 / total events.
+func (i *Indexer) indexFailureRate() float64 {
+	return float64(atomic.LoadInt64(&i.tooManyRequests)) /
+		float64(atomic.LoadInt64(&i.eventsAdded))
+}
+
 // activeLimit returns the value of GOMAXPROCS / 4. Which should limit the
 // maximum number of active indexers to 25% of GOMAXPROCS.
 // NOTE: There is also a sweet spot between Config.MaxRequests and the number
@@ -695,15 +717,11 @@ type scalingInfo struct {
 }
 
 func (s scalingInfo) ScaleDown(t time.Time) scalingInfo {
-	return scalingInfo{
-		lastAction: t, activeIndexers: s.activeIndexers - 1,
-	}
+	return scalingInfo{lastAction: t, activeIndexers: s.activeIndexers - 1}
 }
 
 func (s scalingInfo) ScaleUp(t time.Time) scalingInfo {
-	return scalingInfo{
-		lastAction: t, activeIndexers: s.activeIndexers + 1,
-	}
+	return scalingInfo{lastAction: t, activeIndexers: s.activeIndexers + 1}
 }
 
 func (s scalingInfo) withinCoolDown(cooldown time.Duration, now time.Time) bool {
diff --git a/internal/model/modelindexer/indexer_test.go b/internal/model/modelindexer/indexer_test.go
index f8d0e40a11c..5cda8653452 100644
--- a/internal/model/modelindexer/indexer_test.go
+++ b/internal/model/modelindexer/indexer_test.go
@@ -27,6 +27,7 @@ import (
 	"net/http"
 	"runtime"
 	"strings"
+	"sync"
 	"sync/atomic"
 	"testing"
 	"time"
@@ -747,6 +748,16 @@ func TestModelIndexerScaling(t *testing.T) {
 		assert.Greater(t, stats.IndexersDestroyed, int64(0), "No downscales took place: %+v", stats)
 		assert.Equal(t, stats.IndexersActive, int64(n), "%+v", stats)
 	}
+	waitForBulkRequests := func(t *testing.T, indexer *modelindexer.Indexer, n int64) {
+		timeout := time.After(time.Second)
+		for indexer.Stats().BulkRequests < n {
+			select {
+			case <-time.After(time.Millisecond):
+			case <-timeout:
+				t.Fatalf("timed out while waiting for events to be indexed: %+v", indexer.Stats())
+			}
+		}
+	}
 	t.Run("DownscaleIdle", func(t *testing.T) {
 		// Override the default GOMAXPROCS, ensuring the active indexers can scale up.
 		setGOMAXPROCS(t, 12)
@@ -806,9 +817,8 @@ func TestModelIndexerScaling(t *testing.T) {
 		// the gomaxprocs change.
 		waitForScaleDown(t, indexer, 1)
 		// Wait for all the events to be indexed.
-		for indexer.Stats().Indexed < events {
-			<-time.After(time.Millisecond)
-		}
+		waitForBulkRequests(t, indexer, events)
+
 		stats := indexer.Stats()
 		stats.BytesTotal = 0
 		assert.Equal(t, modelindexer.Stats{
@@ -844,9 +854,8 @@ func TestModelIndexerScaling(t *testing.T) {
 		sendEvents(t, indexer, int(events))
 		waitForScaleUp(t, indexer, 2)
 		// Wait for all the events to be indexed.
-		for indexer.Stats().Indexed < events {
-			<-time.After(time.Millisecond)
-		}
+		waitForBulkRequests(t, indexer, events)
+
 		assert.Equal(t, int64(2), indexer.Stats().IndexersActive)
 		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 		defer cancel()
@@ -864,6 +873,72 @@ func TestModelIndexerScaling(t *testing.T) {
 			IndexersDestroyed: 0,
 		}, stats)
 	})
+	t.Run("Downscale429Rate", func(t *testing.T) {
+		// Override the default GOMAXPROCS, ensuring the active indexers can scale up.
+		setGOMAXPROCS(t, 12)
+		var mu sync.RWMutex
+		var tooMany bool // must be accessed with the mutex held.
+		client := modelindexertest.NewMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) {
+			_, result := modelindexertest.DecodeBulkRequest(r)
+			mu.RLock()
+			tooManyResp := tooMany
+			mu.RUnlock()
+			if tooManyResp {
+				result.HasErrors = true
+				for i := 0; i < len(result.Items); i++ {
+					item := result.Items[i]
+					resp := item["create"]
+					resp.Status = http.StatusTooManyRequests
+					item["create"] = resp
+				}
+			}
+			json.NewEncoder(w).Encode(result)
+		})
+		indexer, err := modelindexer.New(client, modelindexer.Config{
+			FlushInterval: time.Millisecond,
+			FlushBytes:    1,
+			Scaling: modelindexer.ScalingConfig{
+				ScaleUp: modelindexer.ScaleActionConfig{
+					Threshold: 5, CoolDown: 1,
+				},
+				ScaleDown: modelindexer.ScaleActionConfig{
+					Threshold: 100, CoolDown: 100 * time.Millisecond,
+				},
+				IdleInterval: 100 * time.Millisecond,
+			},
+		})
+		require.NoError(t, err)
+		t.Cleanup(func() { indexer.Close(context.Background()) })
+		events := int64(20)
+		sendEvents(t, indexer, int(events))
+		waitForScaleUp(t, indexer, 3)
+		waitForBulkRequests(t, indexer, events)
+
+		// Make the mocked elasticsearch return 429 responses and wait for the
+		// active indexers to be scaled down to the minimum.
+		mu.Lock()
+		tooMany = true
+		mu.Unlock()
+		events += 5
+		sendEvents(t, indexer, 5)
+		waitForScaleDown(t, indexer, 1)
+		waitForBulkRequests(t, indexer, events)
+
+		// index 600 events and ensure that scale ups happen to the maximum after
+		// the threshold is exceeded.
+		mu.Lock()
+		tooMany = false
+		mu.Unlock()
+		events += 600
+		sendEvents(t, indexer, 600)
+		waitForScaleUp(t, indexer, 3)
+		waitForBulkRequests(t, indexer, events)
+
+		stats := indexer.Stats()
+		assert.Equal(t, int64(3), stats.IndexersActive)
+		assert.Equal(t, int64(4), stats.IndexersCreated)
+		assert.Equal(t, int64(2), stats.IndexersDestroyed)
+	})
 }
 
 func setGOMAXPROCS(t *testing.T, new int) {
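
As a rough, standalone illustration of the rule this patch introduces (not part of the patch; the type and function names below are hypothetical, only the 0.01 threshold and the two counters mirror the diff), the gating logic amounts to tracking total events and 429 rejections with atomic counters and refusing to add active indexers once the ratio reaches 1%:

package main

import (
	"fmt"
	"sync/atomic"
)

// indexerScaler is a hypothetical, trimmed-down stand-in for the modelindexer
// bookkeeping touched by this patch; only the 429-rate gating is sketched.
type indexerScaler struct {
	eventsAdded     int64 // total events submitted for indexing
	tooManyRequests int64 // events rejected by Elasticsearch with HTTP 429
}

// failureRate returns the fraction of submitted events rejected with 429,
// analogous to indexFailureRate in the diff (this sketch also guards the
// zero-events case, which the patch does not need to).
func (s *indexerScaler) failureRate() float64 {
	added := atomic.LoadInt64(&s.eventsAdded)
	if added == 0 {
		return 0
	}
	return float64(atomic.LoadInt64(&s.tooManyRequests)) / float64(added)
}

// canScaleUp applies the rule the patch adds to maybeScaleUp: never start
// another active indexer while 1% or more of events are being rejected.
func (s *indexerScaler) canScaleUp() bool {
	return s.failureRate() < 0.01
}

func main() {
	s := &indexerScaler{}
	atomic.AddInt64(&s.eventsAdded, 1000)
	atomic.AddInt64(&s.tooManyRequests, 25) // 2.5% of events rejected
	fmt.Printf("failure rate %.3f, scale up allowed: %v\n", s.failureRate(), s.canScaleUp())
}

In the patch itself the same threshold also triggers a scale-down in maybeScaleDown, subject to the scale-down cool-down, which is what the Downscale429Rate test above exercises.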