Skip to content

Commit

Permalink
feat(storage): add Writer.ChunkTransferTimeout (#11111)
Browse files Browse the repository at this point in the history
Expose the ChunkTransferTimeout MediaOption through the manual
client layer. This allows users to set a longer timeout for
chunk retries in case of stalls in resumable uploads if desired.

Added emulator based unit tests to cover all the scenarios.
  • Loading branch information
Tulsishah authored Dec 4, 2024
1 parent f82fffd commit fd1db20
Show file tree
Hide file tree
Showing 6 changed files with 209 additions and 56 deletions.
3 changes: 2 additions & 1 deletion storage/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,8 @@ type openWriterParams struct {
chunkSize int
// chunkRetryDeadline - see `Writer.ChunkRetryDeadline`.
// Optional.
chunkRetryDeadline time.Duration
chunkRetryDeadline time.Duration
chunkTransferTimeout time.Duration

// Object/request properties

Expand Down
139 changes: 139 additions & 0 deletions storage/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1506,6 +1506,145 @@ func TestRetryReadStallEmulated(t *testing.T) {
}
}

func TestWriterChunkTransferTimeoutEmulated(t *testing.T) {
transportClientTest(skipGRPC("service is not implemented"), t, func(t *testing.T, ctx context.Context, project, bucket string, client storageClient) {
_, err := client.CreateBucket(ctx, project, bucket, &BucketAttrs{}, nil)
if err != nil {
t.Fatalf("creating bucket: %v", err)
}

chunkSize := 2 * 1024 * 1024 // 2 MiB
fileSize := 5 * 1024 * 1024 // 5 MiB
tests := []struct {
name string
instructions map[string][]string
chunkTransferTimeout time.Duration
expectedSuccess bool
}{
{
name: "stall-on-first-chunk-with-chunk-transfer-timeout-zero",
instructions: map[string][]string{
"storage.objects.insert": {"stall-for-10s-after-1024K"},
},
chunkTransferTimeout: 0,
expectedSuccess: false,
},
{
name: "stall-on-first-chunk-with-chunk-transfer-timeout-nonzero",
instructions: map[string][]string{
"storage.objects.insert": {"stall-for-10s-after-1024K"},
},
chunkTransferTimeout: 100 * time.Millisecond,
expectedSuccess: true,
},
{
name: "stall-on-second-chunk-with-chunk-transfer-timeout-zero",
instructions: map[string][]string{
"storage.objects.insert": {"stall-for-10s-after-3072K"},
},
chunkTransferTimeout: 0,
expectedSuccess: false,
},
{
name: "stall-on-second-chunk-with-chunk-transfer-timeout-nonzero",
instructions: map[string][]string{
"storage.objects.insert": {"stall-for-10s-after-3072K"},
},
chunkTransferTimeout: 100 * time.Millisecond,
expectedSuccess: true,
},
{
name: "stall-on-first-chunk-twice-with-chunk-transfer-timeout-zero",
instructions: map[string][]string{
"storage.objects.insert": {"stall-for-10s-after-1024K", "stall-for-10s-after-1024K"},
},
chunkTransferTimeout: 0,
expectedSuccess: false,
},
{
name: "stall-on-first-chunk-twice-with-chunk-transfer-timeout-nonzero",
instructions: map[string][]string{
"storage.objects.insert": {"stall-for-10s-after-1024K", "stall-for-10s-after-1024K"},
},
chunkTransferTimeout: 100 * time.Millisecond,
expectedSuccess: true,
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
testID := createRetryTest(t, client, tc.instructions)
var cancel context.CancelFunc
rCtx := callctx.SetHeaders(ctx, "x-retry-test-id", testID)
rCtx, cancel = context.WithTimeout(rCtx, 1*time.Second)
defer cancel()

prefix := time.Now().Nanosecond()
want := &ObjectAttrs{
Bucket: bucket,
Name: fmt.Sprintf("%d-object-%d", prefix, time.Now().Nanosecond()),
Generation: defaultGen,
}

var gotAttrs *ObjectAttrs
params := &openWriterParams{
attrs: want,
bucket: bucket,
chunkSize: chunkSize,
chunkTransferTimeout: tc.chunkTransferTimeout,
ctx: rCtx,
donec: make(chan struct{}),
setError: func(_ error) {}, // no-op
progress: func(_ int64) {}, // no-op
setObj: func(o *ObjectAttrs) { gotAttrs = o },
}

pw, err := client.OpenWriter(params)
if err != nil {
t.Fatalf("failed to open writer: %v", err)
}
buffer := bytes.Repeat([]byte("A"), fileSize)
_, err = pw.Write(buffer)
if tc.expectedSuccess {
if err != nil {
t.Fatalf("failed to populate test data: %v", err)
}
if err := pw.Close(); err != nil {
t.Fatalf("closing object: %v", err)
}
select {
case <-params.donec:
}
if gotAttrs == nil {
t.Fatalf("Writer finished, but resulting object wasn't set")
}
if diff := cmp.Diff(gotAttrs.Name, want.Name); diff != "" {
t.Fatalf("Resulting object name: got(-),want(+):\n%s", diff)
}

r, err := veneerClient.Bucket(bucket).Object(want.Name).NewReader(ctx)
if err != nil {
t.Fatalf("opening reading: %v", err)
}
wantLen := len(buffer)
got := make([]byte, wantLen)
n, err := r.Read(got)
if n != wantLen {
t.Fatalf("expected to read %d bytes, but got %d", wantLen, n)
}
if diff := cmp.Diff(got, buffer); diff != "" {
t.Fatalf("checking written content: got(-),want(+):\n%s", diff)
}
} else {
if !errors.Is(err, context.DeadlineExceeded) {
t.Fatalf("expected context deadline exceeded found %v", err)
}
}
})
}
})
}

// createRetryTest creates a bucket in the emulator and sets up a test using the
// Retry Test API for the given instructions. This is intended for emulator tests
// of retry behavior that are not covered by conformance tests.
Expand Down
34 changes: 17 additions & 17 deletions storage/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,32 +7,32 @@ retract [v1.25.0, v1.27.0] // due to https://github.com/googleapis/google-cloud-
require (
cloud.google.com/go v0.116.0
cloud.google.com/go/compute/metadata v0.5.2
cloud.google.com/go/iam v1.2.1
cloud.google.com/go/longrunning v0.6.1
cloud.google.com/go/iam v1.2.2
cloud.google.com/go/longrunning v0.6.2
github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.48.1
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
github.com/googleapis/gax-go/v2 v2.13.0
github.com/googleapis/gax-go/v2 v2.14.0
go.opentelemetry.io/contrib/detectors/gcp v1.29.0
go.opentelemetry.io/otel v1.29.0
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.29.0
go.opentelemetry.io/otel/sdk v1.29.0
go.opentelemetry.io/otel/sdk/metric v1.29.0
golang.org/x/oauth2 v0.23.0
golang.org/x/sync v0.8.0
google.golang.org/api v0.203.0
google.golang.org/genproto v0.0.0-20241015192408-796eee8c2d53
google.golang.org/genproto/googleapis/api v0.0.0-20241007155032-5fefd90f89a9
golang.org/x/oauth2 v0.24.0
golang.org/x/sync v0.9.0
google.golang.org/api v0.210.0
google.golang.org/genproto v0.0.0-20241118233622-e639e219e697
google.golang.org/genproto/googleapis/api v0.0.0-20241113202542-65e8d215514f
google.golang.org/grpc v1.67.1
google.golang.org/grpc/stats/opentelemetry v0.0.0-20240907200651-3ffb98b2c93a
google.golang.org/protobuf v1.35.1
google.golang.org/protobuf v1.35.2
)

require (
cel.dev/expr v0.16.1 // indirect
cloud.google.com/go/auth v0.10.2 // indirect
cloud.google.com/go/auth/oauth2adapt v0.2.5 // indirect
cloud.google.com/go/monitoring v1.21.1 // indirect
cloud.google.com/go/auth v0.11.0 // indirect
cloud.google.com/go/auth/oauth2adapt v0.2.6 // indirect
cloud.google.com/go/monitoring v1.21.2 // indirect
github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.24.1 // indirect
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.48.1 // indirect
github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect
Expand All @@ -53,10 +53,10 @@ require (
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0 // indirect
go.opentelemetry.io/otel/metric v1.29.0 // indirect
go.opentelemetry.io/otel/trace v1.29.0 // indirect
golang.org/x/crypto v0.28.0 // indirect
golang.org/x/net v0.30.0 // indirect
golang.org/x/crypto v0.29.0 // indirect
golang.org/x/net v0.31.0 // indirect
golang.org/x/sys v0.27.0 // indirect
golang.org/x/text v0.19.0 // indirect
golang.org/x/time v0.7.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241015192408-796eee8c2d53 // indirect
golang.org/x/text v0.20.0 // indirect
golang.org/x/time v0.8.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241118233622-e639e219e697 // indirect
)
Loading

0 comments on commit fd1db20

Please sign in to comment.