From bd940843d06ae93ec464e31fb6da0ca1c7ed3347 Mon Sep 17 00:00:00 2001 From: Brenna N Epp Date: Tue, 9 Jul 2024 08:46:04 -0700 Subject: [PATCH] chore(storage/transfermanager): non-positive partSize will turn off sharding (#10509) --- storage/transfermanager/downloader.go | 12 ++++- storage/transfermanager/downloader_test.go | 13 +++++ storage/transfermanager/option.go | 11 +++-- storage/transfermanager/option_test.go | 57 +++++++++++++++------- 4 files changed, 71 insertions(+), 22 deletions(-) diff --git a/storage/transfermanager/downloader.go b/storage/transfermanager/downloader.go index 5145f8a1579a..3f6107627dda 100644 --- a/storage/transfermanager/downloader.go +++ b/storage/transfermanager/downloader.go @@ -706,8 +706,13 @@ func numShards(attrs *storage.ReaderObjectAttrs, r *DownloadRange, partSize int6 return 1 } + // Sharding turned off with partSize < 1. + if partSize < 1 { + return 1 + } + + // Divide entire object into shards if no range given. if r == nil { - // Divide entire object into shards. return int(math.Ceil(float64(objectSize) / float64(partSize))) } // Negative offset reads the whole object in one go. @@ -742,6 +747,11 @@ func shardRange(r *DownloadRange, partSize int64, shard int) DownloadRange { } } + // No sharding if partSize is less than 1. + if partSize < 1 { + return *r + } + // Negative offset reads the whole object in one go. 
if r.Offset < 0 { return *r diff --git a/storage/transfermanager/downloader_test.go b/storage/transfermanager/downloader_test.go index 2045ecece428..2c469e5ef99b 100644 --- a/storage/transfermanager/downloader_test.go +++ b/storage/transfermanager/downloader_test.go @@ -356,6 +356,19 @@ func TestCalculateRange(t *testing.T) { Length: 1000, }, }, + { + desc: "sharding turned off", + objRange: &DownloadRange{ + Offset: 1024 * 1024 * 1024 * 1024 / 2, + Length: 1024 * 1024 * 1024 * 1024, + }, + partSize: 0, + shard: 0, + want: DownloadRange{ + Offset: 1024 * 1024 * 1024 * 1024 / 2, + Length: 1024 * 1024 * 1024 * 1024, + }, + }, { desc: "large object", objRange: &DownloadRange{ diff --git a/storage/transfermanager/option.go b/storage/transfermanager/option.go index 1f81672dded4..ac521000db76 100644 --- a/storage/transfermanager/option.go +++ b/storage/transfermanager/option.go @@ -69,10 +69,14 @@ func (wpt withPerOpTimeout) apply(tm *transferManagerConfig) { tm.perOperationTimeout = wpt.timeout } -// WithPartSize returns a TransferManagerOption that specifies the size of the -// shards to transfer; that is, if the object is larger than this size, it will -// be uploaded or downloaded in concurrent pieces. +// WithPartSize returns a TransferManagerOption that specifies the size in bytes +// of the shards to transfer; that is, if the object is larger than partSize, +// it will be uploaded or downloaded in concurrent pieces of size partSize. +// // The default is 32 MiB for downloads. +// +// To turn off sharding, set partSize to 0. +// // Note that files that support decompressive transcoding will be downloaded in // a single piece regardless of the partSize set here. func WithPartSize(partSize int64) Option { @@ -93,6 +97,7 @@ type transferManagerConfig struct { // Size of shards to transfer; Python found 32 MiB to be good default for // JSON downloads but gRPC may benefit from larger. + // A partSize smaller than 1 turns off sharding. 
partSize int64 // Timeout for a single operation (including all retries). Zero value means diff --git a/storage/transfermanager/option_test.go b/storage/transfermanager/option_test.go index d97cf831ec31..2fc9ca51ca7d 100644 --- a/storage/transfermanager/option_test.go +++ b/storage/transfermanager/option_test.go @@ -22,25 +22,46 @@ import ( ) func TestApply(t *testing.T) { - opts := []Option{ - WithWorkers(3), - WithPerOpTimeout(time.Hour), - WithCallbacks(), - WithPartSize(30), - } - var got transferManagerConfig - for _, opt := range opts { - opt.apply(&got) - } - want := transferManagerConfig{ - numWorkers: 3, - perOperationTimeout: time.Hour, - asynchronous: true, - partSize: 30, - } + for _, test := range []struct { + desc string + opts []Option + want transferManagerConfig + }{ + { + desc: "all options", + opts: []Option{ + WithWorkers(3), + WithPerOpTimeout(time.Hour), + WithCallbacks(), + WithPartSize(300000), + }, + want: transferManagerConfig{ + numWorkers: 3, + perOperationTimeout: time.Hour, + asynchronous: true, + partSize: 300000, + }, + }, + { + desc: "small partSize", + opts: []Option{ + WithPartSize(30), + }, + want: transferManagerConfig{ + partSize: 30, + }, + }, + } { + t.Run(test.desc, func(t *testing.T) { + var got transferManagerConfig + for _, opt := range test.opts { + opt.apply(&got) + } - if got != want { - t.Errorf("got: %+v, want: %+v", got, want) + if got != test.want { + t.Errorf("got: %+v, want: %+v", got, test.want) + } + }) } }