diff --git a/storage/dynamic_delay.go b/storage/dynamic_delay.go new file mode 100644 index 000000000000..5d4c42fb82bf --- /dev/null +++ b/storage/dynamic_delay.go @@ -0,0 +1,154 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "fmt" + "math" + "sync" + "time" +) + +// dynamicDelay dynamically calculates the delay at a fixed percentile, based on +// delay samples. +// +// dynamicDelay is goroutine-safe. +type dynamicDelay struct { + increaseFactor float64 + decreaseFactor float64 + minDelay time.Duration + maxDelay time.Duration + value time.Duration + + // Guards the value + mu *sync.RWMutex +} + +// NewDynamicDelay returns a dynamicDelay. +// +// targetPercentile is the desired percentile to be computed. For example, a +// targetPercentile of 0.99 computes the delay at the 99th percentile. Must be +// in the range [0, 1]. +// +// increaseRate (must be > 0) determines how many increase calls it takes for +// Value to double. +// +// initialDelay is the start value of the delay. +// +// decrease can never lower the delay past minDelay, increase can never raise +// the delay past maxDelay. +func newDynamicDelay(targetPercentile float64, increaseRate float64, initialDelay, minDelay, maxDelay time.Duration) (*dynamicDelay, error) { + if targetPercentile < 0 || targetPercentile > 1 { + return nil, fmt.Errorf("invalid targetPercentile (%v): must be within [0, 1]", targetPercentile) + } + if increaseRate <= 0 { + return nil, fmt.Errorf("invalid increaseRate (%v): must be > 0", increaseRate) + } + if minDelay >= maxDelay { + return nil, fmt.Errorf("invalid minDelay (%v) and maxDelay (%v) combination: minDelay must be smaller than maxDelay", minDelay, maxDelay) + } + if initialDelay < minDelay { + initialDelay = minDelay + } + if initialDelay > maxDelay { + initialDelay = maxDelay + } + + // Compute increaseFactor and decreaseFactor such that: + // (increaseFactor ^ (1 - targetPercentile)) * (decreaseFactor ^ targetPercentile) = 1 + increaseFactor := math.Exp(math.Log(2) / increaseRate) + if increaseFactor < 1.001 { + increaseFactor = 1.001 + } + decreaseFactor := math.Exp(-math.Log(increaseFactor) * (1 - targetPercentile) / targetPercentile) + if decreaseFactor > 0.9999 { + decreaseFactor = 0.9999 + } + + return &dynamicDelay{ + increaseFactor: increaseFactor, + decreaseFactor: decreaseFactor, + minDelay: minDelay, + maxDelay: maxDelay, + value: initialDelay, + mu: &sync.RWMutex{}, + }, nil +} + +func (d *dynamicDelay) unsafeIncrease() { + v := time.Duration(float64(d.value) * d.increaseFactor) + if v > d.maxDelay { + d.value = d.maxDelay + } else { + d.value = v + } +} + +// increase notes that the operation took longer than the delay returned by Value. +func (d *dynamicDelay) increase() { + d.mu.Lock() + defer d.mu.Unlock() + + d.unsafeIncrease() +} + +func (d *dynamicDelay) unsafeDecrease() { + v := time.Duration(float64(d.value) * d.decreaseFactor) + if v < d.minDelay { + d.value = d.minDelay + } else { + d.value = v + } +} + +// decrease notes that the operation completed before the delay returned by getValue. +func (d *dynamicDelay) decrease() { + d.mu.Lock() + defer d.mu.Unlock() + + d.unsafeDecrease() +} + +// update updates the delay value depending on the specified latency. +func (d *dynamicDelay) update(latency time.Duration) { + d.mu.Lock() + defer d.mu.Unlock() + + if latency > d.value { + d.unsafeIncrease() + } else { + d.unsafeDecrease() + } +} + +// getValue returns the desired delay to wait before retry the operation. +func (d *dynamicDelay) getValue() time.Duration { + d.mu.RLock() + defer d.mu.RUnlock() + + return d.value +} + +// PrintDelay prints the state of delay, helpful in debugging. +func (d *dynamicDelay) printDelay() { + d.mu.RLock() + defer d.mu.RUnlock() + + fmt.Println("IncreaseFactor: ", d.increaseFactor) + fmt.Println("DecreaseFactor: ", d.decreaseFactor) + fmt.Println("MinDelay: ", d.minDelay) + fmt.Println("MaxDelay: ", d.maxDelay) + fmt.Println("Value: ", d.value) +} diff --git a/storage/dynamic_delay_test.go b/storage/dynamic_delay_test.go new file mode 100644 index 000000000000..8247d7527496 --- /dev/null +++ b/storage/dynamic_delay_test.go @@ -0,0 +1,176 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF + +package storage + +import ( + "math" + "math/rand" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" +) + +func applySamples(numSamples int, expectedValue float64, rnd *rand.Rand, d *dynamicDelay) int { + var samplesOverThreshold int + for i := 0; i < numSamples; i++ { + randomDelay := time.Duration(-math.Log(rnd.Float64()) * expectedValue * float64(time.Second)) + if randomDelay > d.getValue() { + samplesOverThreshold++ + d.increase() + } else { + d.decrease() + } + } + return samplesOverThreshold +} + +func applySamplesWithUpdate(numSamples int, expectedValue float64, rnd *rand.Rand, d *dynamicDelay) { + for i := 0; i < numSamples; i++ { + randomDelay := time.Duration(-math.Log(rnd.Float64()) * expectedValue * float64(time.Second)) + d.update(randomDelay) + } +} + +func TestNewDelay(t *testing.T) { + d, err := newDynamicDelay(1-0.01, 15, 1*time.Millisecond, 1*time.Millisecond, 1*time.Hour) + if err != nil { + t.Fatal(err) + } + + want := &dynamicDelay{ + increaseFactor: 1.047294, + decreaseFactor: 0.999533, + minDelay: 1 * time.Millisecond, + maxDelay: 1 * time.Hour, + value: 1 * time.Millisecond, + } + + if diff := cmp.Diff(d.increaseFactor, want.increaseFactor, cmpopts.EquateApprox(0, 0.000001)); diff != "" { + t.Fatalf("unexpected diff (-got +want):\n%s", diff) + } + + if diff := cmp.Diff(d.decreaseFactor, want.decreaseFactor, cmpopts.EquateApprox(0, 0.000001)); diff != "" { + t.Fatalf("unexpected diff (-got +want):\n%s", diff) + } + + if diff := cmp.Diff(d.minDelay, want.minDelay, cmpopts.EquateApprox(0, 0.000001)); diff != "" { + t.Fatalf("unexpected diff (-got +want):\n%s", diff) + } + + if diff := cmp.Diff(d.maxDelay, want.maxDelay, cmpopts.EquateApprox(0, 0.000001)); diff != "" { + t.Fatalf("unexpected diff (-got +want):\n%s", diff) + } + + if diff := cmp.Diff(d.value, want.value, cmpopts.EquateApprox(0, 0.000001)); diff != "" { + t.Fatalf("unexpected diff (-got +want):\n%s", diff) + } + + if d.mu == nil { + t.Fatalf("unexpted mutex value") + } +} + +func TestConvergence99(t *testing.T) { + // d should converge to the 99-percentile value. + d, err := newDynamicDelay(1-0.01, 15, 1*time.Millisecond, 1*time.Millisecond, 1*time.Hour) + if err != nil { + t.Fatal(err) + } + + rnd := rand.New(rand.NewSource(1)) + + // Warm up. + applySamplesWithUpdate(1000, 0.005, rnd, d) + + // We would end up sending hedged calls at ~1% (between 0.2% and 5%). + { + samplesOverThreshold := applySamples(1000, 0.005, rnd, d) + if samplesOverThreshold < (1000 * 0.002) { + t.Errorf("samplesOverThreshold = %d < 1000*0.002", samplesOverThreshold) + } + if samplesOverThreshold > (1000 * 0.05) { + t.Errorf("samplesOverThreshold = %d > 1000*0.05", samplesOverThreshold) + } + } + + // Apply samples from a different distribution. + applySamplesWithUpdate(1000, 1, rnd, d) + + // delay.value should have now converged to the new distribution. + { + samplesOverThreshold := applySamples(1000, 1, rnd, d) + if samplesOverThreshold < (1000 * 0.002) { + t.Errorf("samplesOverThreshold = %d < 1000*0.002", samplesOverThreshold) + } + if samplesOverThreshold > (1000 * 0.05) { + t.Errorf("samplesOverThreshold = %d > 1000*0.05", samplesOverThreshold) + } + } +} + +func TestConvergence90(t *testing.T) { + // d should converge to the 90-percentile value. + d, err := newDynamicDelay(1-0.1, 15, 1*time.Millisecond, 1*time.Millisecond, 1*time.Hour) + if err != nil { + t.Fatal(err) + } + + rnd := rand.New(rand.NewSource(1)) + + // Warm up. + applySamplesWithUpdate(1000, 0.005, rnd, d) + + // We would end up sending hedged calls at ~10% (between 5% and 20%). + { + samplesOverThreshold := applySamples(1000, 0.005, rnd, d) + if samplesOverThreshold < (1000 * 0.05) { + t.Errorf("samplesOverThreshold = %d < 1000*0.05", samplesOverThreshold) + } + if samplesOverThreshold > (1000 * 0.2) { + t.Errorf("samplesOverThreshold = %d > 1000*0.2", samplesOverThreshold) + } + } +} + +func TestOverflow(t *testing.T) { + d, err := newDynamicDelay(1-0.1, 15, 1*time.Millisecond, 1*time.Millisecond, 1*time.Hour) + if err != nil { + t.Fatal(err) + } + + n := 10000 + for i := 0; i < n; i++ { + d.increase() + } + t.Log(d.getValue()) + for i := 0; i < 100*n; i++ { + d.decrease() + } + if got, want := d.getValue(), 1*time.Millisecond; got != want { + t.Fatalf("unexpected d.Value: got %v, want %v", got, want) + } +} + +func TestInvalidArgument(t *testing.T) { + _, err := newDynamicDelay(1-0.1, 15, 1*time.Millisecond, 2*time.Hour, 1*time.Hour) + if err == nil { + t.Fatal("unexpected, should throw error as minDelay is greater than maxDelay") + } + + _, err = newDynamicDelay(1-0.1, 0, 1*time.Millisecond, 2*time.Hour, 1*time.Hour) + if err == nil { + t.Fatal("unexpected, should throw error as increaseRate can't be zero") + } +}