Skip to content

Commit

Permalink
chore(storage): adding dynamic delay algorithm (#10838)
Browse files Browse the repository at this point in the history
Adding go-routine safe dynamic delay algorithm that calculates the delay at a fixed percentile, based on delay samples.

This will be used by retry options to be added later.
  • Loading branch information
raj-prince authored Sep 16, 2024
1 parent 6997991 commit 4265acc
Show file tree
Hide file tree
Showing 2 changed files with 330 additions and 0 deletions.
154 changes: 154 additions & 0 deletions storage/dynamic_delay.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package storage

import (
"fmt"
"math"
"sync"
"time"
)

// dynamicDelay dynamically calculates the delay at a fixed percentile, based on
// delay samples.
//
// dynamicDelay is goroutine-safe.
type dynamicDelay struct {
increaseFactor float64
decreaseFactor float64
minDelay time.Duration
maxDelay time.Duration
value time.Duration

// Guards the value
mu *sync.RWMutex
}

// NewDynamicDelay returns a dynamicDelay.
//
// targetPercentile is the desired percentile to be computed. For example, a
// targetPercentile of 0.99 computes the delay at the 99th percentile. Must be
// in the range [0, 1].
//
// increaseRate (must be > 0) determines how many increase calls it takes for
// Value to double.
//
// initialDelay is the start value of the delay.
//
// decrease can never lower the delay past minDelay, increase can never raise
// the delay past maxDelay.
func newDynamicDelay(targetPercentile float64, increaseRate float64, initialDelay, minDelay, maxDelay time.Duration) (*dynamicDelay, error) {
if targetPercentile < 0 || targetPercentile > 1 {
return nil, fmt.Errorf("invalid targetPercentile (%v): must be within [0, 1]", targetPercentile)
}
if increaseRate <= 0 {
return nil, fmt.Errorf("invalid increaseRate (%v): must be > 0", increaseRate)
}
if minDelay >= maxDelay {
return nil, fmt.Errorf("invalid minDelay (%v) and maxDelay (%v) combination: minDelay must be smaller than maxDelay", minDelay, maxDelay)
}
if initialDelay < minDelay {
initialDelay = minDelay
}
if initialDelay > maxDelay {
initialDelay = maxDelay
}

// Compute increaseFactor and decreaseFactor such that:
// (increaseFactor ^ (1 - targetPercentile)) * (decreaseFactor ^ targetPercentile) = 1
increaseFactor := math.Exp(math.Log(2) / increaseRate)
if increaseFactor < 1.001 {
increaseFactor = 1.001
}
decreaseFactor := math.Exp(-math.Log(increaseFactor) * (1 - targetPercentile) / targetPercentile)
if decreaseFactor > 0.9999 {
decreaseFactor = 0.9999
}

return &dynamicDelay{
increaseFactor: increaseFactor,
decreaseFactor: decreaseFactor,
minDelay: minDelay,
maxDelay: maxDelay,
value: initialDelay,
mu: &sync.RWMutex{},
}, nil
}

func (d *dynamicDelay) unsafeIncrease() {
v := time.Duration(float64(d.value) * d.increaseFactor)
if v > d.maxDelay {
d.value = d.maxDelay
} else {
d.value = v
}
}

// increase notes that the operation took longer than the delay returned by Value.
func (d *dynamicDelay) increase() {
d.mu.Lock()
defer d.mu.Unlock()

d.unsafeIncrease()
}

func (d *dynamicDelay) unsafeDecrease() {
v := time.Duration(float64(d.value) * d.decreaseFactor)
if v < d.minDelay {
d.value = d.minDelay
} else {
d.value = v
}
}

// decrease notes that the operation completed before the delay returned by getValue.
func (d *dynamicDelay) decrease() {
d.mu.Lock()
defer d.mu.Unlock()

d.unsafeDecrease()
}

// update updates the delay value depending on the specified latency.
func (d *dynamicDelay) update(latency time.Duration) {
d.mu.Lock()
defer d.mu.Unlock()

if latency > d.value {
d.unsafeIncrease()
} else {
d.unsafeDecrease()
}
}

// getValue returns the desired delay to wait before retry the operation.
func (d *dynamicDelay) getValue() time.Duration {
d.mu.RLock()
defer d.mu.RUnlock()

return d.value
}

// PrintDelay prints the state of delay, helpful in debugging.
func (d *dynamicDelay) printDelay() {
d.mu.RLock()
defer d.mu.RUnlock()

fmt.Println("IncreaseFactor: ", d.increaseFactor)
fmt.Println("DecreaseFactor: ", d.decreaseFactor)
fmt.Println("MinDelay: ", d.minDelay)
fmt.Println("MaxDelay: ", d.maxDelay)
fmt.Println("Value: ", d.value)
}
176 changes: 176 additions & 0 deletions storage/dynamic_delay_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF

package storage

import (
"math"
"math/rand"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
)

func applySamples(numSamples int, expectedValue float64, rnd *rand.Rand, d *dynamicDelay) int {
var samplesOverThreshold int
for i := 0; i < numSamples; i++ {
randomDelay := time.Duration(-math.Log(rnd.Float64()) * expectedValue * float64(time.Second))
if randomDelay > d.getValue() {
samplesOverThreshold++
d.increase()
} else {
d.decrease()
}
}
return samplesOverThreshold
}

func applySamplesWithUpdate(numSamples int, expectedValue float64, rnd *rand.Rand, d *dynamicDelay) {
for i := 0; i < numSamples; i++ {
randomDelay := time.Duration(-math.Log(rnd.Float64()) * expectedValue * float64(time.Second))
d.update(randomDelay)
}
}

func TestNewDelay(t *testing.T) {
d, err := newDynamicDelay(1-0.01, 15, 1*time.Millisecond, 1*time.Millisecond, 1*time.Hour)
if err != nil {
t.Fatal(err)
}

want := &dynamicDelay{
increaseFactor: 1.047294,
decreaseFactor: 0.999533,
minDelay: 1 * time.Millisecond,
maxDelay: 1 * time.Hour,
value: 1 * time.Millisecond,
}

if diff := cmp.Diff(d.increaseFactor, want.increaseFactor, cmpopts.EquateApprox(0, 0.000001)); diff != "" {
t.Fatalf("unexpected diff (-got +want):\n%s", diff)
}

if diff := cmp.Diff(d.decreaseFactor, want.decreaseFactor, cmpopts.EquateApprox(0, 0.000001)); diff != "" {
t.Fatalf("unexpected diff (-got +want):\n%s", diff)
}

if diff := cmp.Diff(d.minDelay, want.minDelay, cmpopts.EquateApprox(0, 0.000001)); diff != "" {
t.Fatalf("unexpected diff (-got +want):\n%s", diff)
}

if diff := cmp.Diff(d.maxDelay, want.maxDelay, cmpopts.EquateApprox(0, 0.000001)); diff != "" {
t.Fatalf("unexpected diff (-got +want):\n%s", diff)
}

if diff := cmp.Diff(d.value, want.value, cmpopts.EquateApprox(0, 0.000001)); diff != "" {
t.Fatalf("unexpected diff (-got +want):\n%s", diff)
}

if d.mu == nil {
t.Fatalf("unexpted mutex value")
}
}

func TestConvergence99(t *testing.T) {
// d should converge to the 99-percentile value.
d, err := newDynamicDelay(1-0.01, 15, 1*time.Millisecond, 1*time.Millisecond, 1*time.Hour)
if err != nil {
t.Fatal(err)
}

rnd := rand.New(rand.NewSource(1))

// Warm up.
applySamplesWithUpdate(1000, 0.005, rnd, d)

// We would end up sending hedged calls at ~1% (between 0.2% and 5%).
{
samplesOverThreshold := applySamples(1000, 0.005, rnd, d)
if samplesOverThreshold < (1000 * 0.002) {
t.Errorf("samplesOverThreshold = %d < 1000*0.002", samplesOverThreshold)
}
if samplesOverThreshold > (1000 * 0.05) {
t.Errorf("samplesOverThreshold = %d > 1000*0.05", samplesOverThreshold)
}
}

// Apply samples from a different distribution.
applySamplesWithUpdate(1000, 1, rnd, d)

// delay.value should have now converged to the new distribution.
{
samplesOverThreshold := applySamples(1000, 1, rnd, d)
if samplesOverThreshold < (1000 * 0.002) {
t.Errorf("samplesOverThreshold = %d < 1000*0.002", samplesOverThreshold)
}
if samplesOverThreshold > (1000 * 0.05) {
t.Errorf("samplesOverThreshold = %d > 1000*0.05", samplesOverThreshold)
}
}
}

func TestConvergence90(t *testing.T) {
// d should converge to the 90-percentile value.
d, err := newDynamicDelay(1-0.1, 15, 1*time.Millisecond, 1*time.Millisecond, 1*time.Hour)
if err != nil {
t.Fatal(err)
}

rnd := rand.New(rand.NewSource(1))

// Warm up.
applySamplesWithUpdate(1000, 0.005, rnd, d)

// We would end up sending hedged calls at ~10% (between 5% and 20%).
{
samplesOverThreshold := applySamples(1000, 0.005, rnd, d)
if samplesOverThreshold < (1000 * 0.05) {
t.Errorf("samplesOverThreshold = %d < 1000*0.05", samplesOverThreshold)
}
if samplesOverThreshold > (1000 * 0.2) {
t.Errorf("samplesOverThreshold = %d > 1000*0.2", samplesOverThreshold)
}
}
}

func TestOverflow(t *testing.T) {
d, err := newDynamicDelay(1-0.1, 15, 1*time.Millisecond, 1*time.Millisecond, 1*time.Hour)
if err != nil {
t.Fatal(err)
}

n := 10000
for i := 0; i < n; i++ {
d.increase()
}
t.Log(d.getValue())
for i := 0; i < 100*n; i++ {
d.decrease()
}
if got, want := d.getValue(), 1*time.Millisecond; got != want {
t.Fatalf("unexpected d.Value: got %v, want %v", got, want)
}
}

func TestInvalidArgument(t *testing.T) {
_, err := newDynamicDelay(1-0.1, 15, 1*time.Millisecond, 2*time.Hour, 1*time.Hour)
if err == nil {
t.Fatal("unexpected, should throw error as minDelay is greater than maxDelay")
}

_, err = newDynamicDelay(1-0.1, 0, 1*time.Millisecond, 2*time.Hour, 1*time.Hour)
if err == nil {
t.Fatal("unexpected, should throw error as increaseRate can't be zero")
}
}

0 comments on commit 4265acc

Please sign in to comment.