Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): dynamic read request stall timeout #10958

Merged
merged 12 commits into from
Oct 14, 2024
50 changes: 30 additions & 20 deletions storage/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1336,10 +1336,14 @@ func TestObjectConditionsEmulated(t *testing.T) {
// Test that RetryNever prevents any retries from happening in both transports.
func TestRetryNeverEmulated(t *testing.T) {
transportClientTest(context.Background(), t, func(t *testing.T, ctx context.Context, project, bucket string, client storageClient) {
_, err := client.CreateBucket(ctx, project, bucket, &BucketAttrs{}, nil)
if err != nil {
t.Fatalf("creating bucket: %v", err)
}
instructions := map[string][]string{"storage.buckets.get": {"return-503"}}
testID := createRetryTest(t, project, bucket, client, instructions)
testID := createRetryTest(t, client, instructions)
ctx = callctx.SetHeaders(ctx, "x-retry-test-id", testID)
_, err := client.GetBucket(ctx, bucket, nil, withRetryConfig(&retryConfig{policy: RetryNever}))
_, err = client.GetBucket(ctx, bucket, nil, withRetryConfig(&retryConfig{policy: RetryNever}))

var ae *apierror.APIError
if errors.As(err, &ae) {
Expand All @@ -1355,12 +1359,16 @@ func TestRetryNeverEmulated(t *testing.T) {
// Test that errors are wrapped correctly if retry happens until a timeout.
func TestRetryTimeoutEmulated(t *testing.T) {
transportClientTest(context.Background(), t, func(t *testing.T, ctx context.Context, project, bucket string, client storageClient) {
_, err := client.CreateBucket(ctx, project, bucket, &BucketAttrs{}, nil)
if err != nil {
t.Fatalf("creating bucket: %v", err)
}
instructions := map[string][]string{"storage.buckets.get": {"return-503", "return-503", "return-503", "return-503", "return-503"}}
testID := createRetryTest(t, project, bucket, client, instructions)
testID := createRetryTest(t, client, instructions)
ctx = callctx.SetHeaders(ctx, "x-retry-test-id", testID)
ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
defer cancel()
_, err := client.GetBucket(ctx, bucket, nil, idempotent(true))
_, err = client.GetBucket(ctx, bucket, nil, idempotent(true))

var ae *apierror.APIError
if errors.As(err, &ae) {
Expand All @@ -1380,11 +1388,15 @@ func TestRetryTimeoutEmulated(t *testing.T) {
// Test that errors are wrapped correctly if retry happens until max attempts.
func TestRetryMaxAttemptsEmulated(t *testing.T) {
transportClientTest(context.Background(), t, func(t *testing.T, ctx context.Context, project, bucket string, client storageClient) {
_, err := client.CreateBucket(ctx, project, bucket, &BucketAttrs{}, nil)
if err != nil {
t.Fatalf("creating bucket: %v", err)
}
instructions := map[string][]string{"storage.buckets.get": {"return-503", "return-503", "return-503", "return-503", "return-503"}}
testID := createRetryTest(t, project, bucket, client, instructions)
testID := createRetryTest(t, client, instructions)
ctx = callctx.SetHeaders(ctx, "x-retry-test-id", testID)
config := &retryConfig{maxAttempts: expectedAttempts(3), backoff: &gax.Backoff{Initial: 10 * time.Millisecond}}
_, err := client.GetBucket(ctx, bucket, nil, idempotent(true), withRetryConfig(config))
_, err = client.GetBucket(ctx, bucket, nil, idempotent(true), withRetryConfig(config))

var ae *apierror.APIError
if errors.As(err, &ae) {
Expand Down Expand Up @@ -1427,8 +1439,12 @@ func TestTimeoutErrorEmulated(t *testing.T) {
// Test that server-side DEADLINE_EXCEEDED errors are retried as expected with gRPC.
func TestRetryDeadlineExceedeEmulated(t *testing.T) {
transportClientTest(context.Background(), t, func(t *testing.T, ctx context.Context, project, bucket string, client storageClient) {
_, err := client.CreateBucket(ctx, project, bucket, &BucketAttrs{}, nil)
if err != nil {
t.Fatalf("creating bucket: %v", err)
}
instructions := map[string][]string{"storage.buckets.get": {"return-504", "return-504"}}
testID := createRetryTest(t, project, bucket, client, instructions)
testID := createRetryTest(t, client, instructions)
ctx = callctx.SetHeaders(ctx, "x-retry-test-id", testID)
config := &retryConfig{maxAttempts: expectedAttempts(4), backoff: &gax.Backoff{Initial: 10 * time.Millisecond}}
if _, err := client.GetBucket(ctx, bucket, nil, idempotent(true), withRetryConfig(config)); err != nil {
Expand All @@ -1454,7 +1470,7 @@ func TestRetryReadReqStallEmulated(t *testing.T) {

// Plant stall at start for 2s.
instructions := map[string][]string{"storage.objects.get": {"stall-for-2s-after-0K"}}
testID := plantRetryInstructions(t, client.tc, instructions)
testID := createRetryTest(t, client.tc, instructions)
ctx = callctx.SetHeaders(ctx, "x-retry-test-id", testID)

ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
Expand All @@ -1480,25 +1496,19 @@ func TestRetryReadReqStallEmulated(t *testing.T) {
t.Errorf("content does not match, got len %v, want len %v", buf.Len(), len(randomBytes3MiB))
}

}, WithDynamicReadReqStallTimeout(0.99, 15, time.Second, time.Second, 2*time.Second))
}, WithDynamicReadReqStallTimeout(
&DynamicReadReqStallTimeoutConfig{
TargetPercentile: 0.99,
Min: time.Second,
}))
}

// createRetryTest creates a bucket in the emulator and sets up a test using the
// Retry Test API for the given instructions. This is intended for emulator tests
// of retry behavior that are not covered by conformance tests.
func createRetryTest(t *testing.T, project, bucket string, client storageClient, instructions map[string][]string) string {
func createRetryTest(t *testing.T, client storageClient, instructions map[string][]string) string {
t.Helper()
ctx := context.Background()

_, err := client.CreateBucket(ctx, project, bucket, &BucketAttrs{}, nil)
if err != nil {
t.Fatalf("creating bucket: %v", err)
}

return plantRetryInstructions(t, client, instructions)
}

func plantRetryInstructions(t *testing.T, client storageClient, instructions map[string][]string) string {
// Need the HTTP hostname to set up a retry test, as well as knowledge of
raj-prince marked this conversation as resolved.
Show resolved Hide resolved
// underlying transport to specify instructions.
host := os.Getenv("STORAGE_EMULATOR_HOST")
Expand Down
14 changes: 7 additions & 7 deletions storage/http_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,14 +131,14 @@ func newHTTPStorageClient(ctx context.Context, opts ...storageOption) (storageCl
}

var dd *dynamicDelay
if config.dynamicReadReqStallTimeout != nil {
drrstConfig := config.dynamicReadReqStallTimeout
if config.DynamicReadReqStallTimeoutConfig != nil {
drrstConfig := config.DynamicReadReqStallTimeoutConfig
dd, err = newDynamicDelay(
drrstConfig.targetPercentile,
drrstConfig.increaseRate,
drrstConfig.initial,
drrstConfig.min,
drrstConfig.max)
drrstConfig.TargetPercentile,
getDynamicReadReqIncreaseRateFromEnv(),
getDynamicReadReqInitialTimeoutSecFromEnv(drrstConfig.Min),
drrstConfig.Min,
defaultDynamicReqdReqMaxTimeout)
if err != nil {
return nil, fmt.Errorf("creating dynamic-delay: %w", err)
}
Expand Down
101 changes: 61 additions & 40 deletions storage/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,58 @@
package storage

import (
"os"
"strconv"
"time"

"google.golang.org/api/option"
"google.golang.org/api/option/internaloption"
)

const (
dynamicReadReqIncreaseRateEnv = "DYNAMIC_READ_REQ_INCREASE_RATE"
dynamicReadReqInitialTimeoutEnv = "DYNAMIC_READ_REQ_INITIAL_TIMEOUT"
defaultDynamicReadReqIncreaseRate = 15.0
defaultDynamicReqdReqMaxTimeout = 1 * time.Hour
)

// getDynamicReadReqIncreaseRateFromEnv returns the value set in the env variable.
// It returns defaultDynamicReadReqIncreaseRate if env is not set or the set value is invalid.
func getDynamicReadReqIncreaseRateFromEnv() float64 {
increaseRate := os.Getenv(dynamicReadReqIncreaseRateEnv)
if increaseRate == "" {
return defaultDynamicReadReqIncreaseRate
}

val, err := strconv.ParseFloat(increaseRate, 64)
if err != nil {
return defaultDynamicReadReqIncreaseRate
}
return val
}

// getDynamicReadReqInitialTimeoutSecFromEnv returns the value set in the env variable.
// It returns the passed defaultVal if env is not set or the set value is invalid.
func getDynamicReadReqInitialTimeoutSecFromEnv(defaultVal time.Duration) time.Duration {
initialTimeout := os.Getenv(dynamicReadReqInitialTimeoutEnv)
if initialTimeout == "" {
return defaultVal
}

val, err := time.ParseDuration(initialTimeout)
if err != nil {
return defaultVal
}
return val
}

// storageConfig contains the Storage client option configuration that can be
// set through storageClientOptions.
type storageConfig struct {
useJSONforReads bool
readAPIWasSet bool
disableClientMetrics bool
dynamicReadReqStallTimeout *dynamicReadReqStallTimeout
useJSONforReads bool
readAPIWasSet bool
disableClientMetrics bool
DynamicReadReqStallTimeoutConfig *DynamicReadReqStallTimeoutConfig
}

// newStorageConfig generates a new storageConfig with all the given
Expand Down Expand Up @@ -119,51 +158,33 @@ func (w *withDisabledClientMetrics) ApplyStorageOpt(c *storageConfig) {
//
// This is only supported for the read operation and that too for http(XML) client.
// Grpc read-operation will be supported soon.
//
// Here, the input parameter decides the value of dynamic-timeout.
// targetPercentile is the desired percentile of the observed latencies.
// increaseRate determines the rate, timeout is adjusted. High means slow increase in
// timeout and low means rapid increase.
// initialTimeout decides the initial timeout.
// minTimeout, maxTimeout is the lower & upper bound of the timeout.
//
// TODO (raj-prince): To keep separate dynamicDelay instance for different BucketHandle.
// Currently, dynamicTimeout is kept at the client and hence shared across all the
// BucketHandle, which is not not the ideal state. As latency depends on location of VM
// and Bucket, and read latency of different buckets may lie in different range.
// Hence hence having a separate dynamicTimeout instance at BucketHandle level will
// be better
func WithDynamicReadReqStallTimeout(
targetPercentile float64,
increaseRate float64,
initialTimeout time.Duration,
minTimeout time.Duration,
maxTimeout time.Duration) option.ClientOption {
func WithDynamicReadReqStallTimeout(drrst *DynamicReadReqStallTimeoutConfig) option.ClientOption {
raj-prince marked this conversation as resolved.
Show resolved Hide resolved
// TODO (raj-prince): To keep separate dynamicDelay instance for different BucketHandle.
// Currently, dynamicTimeout is kept at the client and hence shared across all the
// BucketHandle, which is not not the ideal state. As latency depends on location of VM
// and Bucket, and read latency of different buckets may lie in different range.
// Hence hence having a separate dynamicTimeout instance at BucketHandle level will
// be better
return &withDynamicReadReqStallTimeout{
dynamicReadReqStallTimeout: &dynamicReadReqStallTimeout{
targetPercentile: targetPercentile,
increaseRate: increaseRate,
initial: initialTimeout,
min: minTimeout,
max: maxTimeout,
},
dynamicReadReqStallTimeoutConfig: drrst,
}
}

type dynamicReadReqStallTimeout struct {
min time.Duration
max time.Duration
initial time.Duration

targetPercentile float64
increaseRate float64
// DynamicReadReqStallTimeoutConfig defines the timeout which is adjusted based on
// past observed latencies. Here,
// TargetPercentile tells the timeout should be the desired percentile of the past
raj-prince marked this conversation as resolved.
Show resolved Hide resolved
// observed latencies.
// Min is the lower bound of the timeout.
type DynamicReadReqStallTimeoutConfig struct {
Min time.Duration
TargetPercentile float64
}

type withDynamicReadReqStallTimeout struct {
internaloption.EmbeddableAdapter
dynamicReadReqStallTimeout *dynamicReadReqStallTimeout
dynamicReadReqStallTimeoutConfig *DynamicReadReqStallTimeoutConfig
}

func (wdrrst *withDynamicReadReqStallTimeout) ApplyStorageOpt(config *storageConfig) {
config.dynamicReadReqStallTimeout = wdrrst.dynamicReadReqStallTimeout
config.DynamicReadReqStallTimeoutConfig = wdrrst.dynamicReadReqStallTimeoutConfig
}
61 changes: 52 additions & 9 deletions storage/option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package storage

import (
"os"
"testing"
"time"

Expand Down Expand Up @@ -84,17 +85,17 @@ func TestApplyStorageOpt(t *testing.T) {
},
{
desc: "set dynamic read req stall timeout option",
opts: []option.ClientOption{WithDynamicReadReqStallTimeout(0.99, 15, time.Second, time.Second, 2*time.Second)},
opts: []option.ClientOption{WithDynamicReadReqStallTimeout(&DynamicReadReqStallTimeoutConfig{
TargetPercentile: 0.99,
Min: time.Second,
})},
want: storageConfig{
useJSONforReads: false,
readAPIWasSet: false,
disableClientMetrics: false,
dynamicReadReqStallTimeout: &dynamicReadReqStallTimeout{
targetPercentile: 0.99,
increaseRate: 15,
initial: time.Second,
min: time.Second,
max: 2 * time.Second,
DynamicReadReqStallTimeoutConfig: &DynamicReadReqStallTimeoutConfig{
TargetPercentile: 0.99,
Min: time.Second,
},
},
},
Expand All @@ -106,8 +107,50 @@ func TestApplyStorageOpt(t *testing.T) {
storageOpt.ApplyStorageOpt(&got)
}
}
if !cmp.Equal(got, test.want, cmp.AllowUnexported(storageConfig{}, dynamicReadReqStallTimeout{})) {
t.Errorf(cmp.Diff(got, test.want, cmp.AllowUnexported(storageConfig{}, dynamicReadReqStallTimeout{})))
if !cmp.Equal(got, test.want, cmp.AllowUnexported(storageConfig{}, DynamicReadReqStallTimeoutConfig{})) {
t.Errorf(cmp.Diff(got, test.want, cmp.AllowUnexported(storageConfig{}, DynamicReadReqStallTimeoutConfig{})))
}
})
}
}

func TestGetDynamicReadReqInitialTimeoutSecFromEnv(t *testing.T) {
defaultValue := 10 * time.Second

tests := []struct {
name string
envValue string
want time.Duration
}{
{"env variable not set", "", 10 * time.Second},
{"valid duration string", "5s", 5 * time.Second},
{"invalid duration string", "invalid", 10 * time.Second},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
os.Setenv(dynamicReadReqInitialTimeoutEnv, tt.envValue)
if got := getDynamicReadReqInitialTimeoutSecFromEnv(defaultValue); got != tt.want {
t.Errorf("getDynamicReadReqInitialTimeoutSecFromEnv(defaultValue) = %v, want %v", got, tt.want)
}
})
}
}

func TestGetDynamicReadReqIncreaseRateFromEnv(t *testing.T) {
tests := []struct {
name string
envValue string
want float64
}{
{"env variable not set", "", defaultDynamicReadReqIncreaseRate},
{"valid float string", "1.5", 1.5},
{"invalid float string", "abc", defaultDynamicReadReqIncreaseRate},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
os.Setenv(dynamicReadReqIncreaseRateEnv, tt.envValue)
if got := getDynamicReadReqIncreaseRateFromEnv(); got != tt.want {
t.Errorf("getDynamicReadReqIncreaseRateFromEnv() = %v, want %v", got, tt.want)
}
})
}
Expand Down
Loading