Skip to content

Commit

Permalink
chore(storage/transfermanager): non-positive partSize will turn off s…
Browse files Browse the repository at this point in the history
…harding (#10509)
  • Loading branch information
BrennaEpp authored Jul 9, 2024
1 parent 3b15f9d commit bd94084
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 22 deletions.
12 changes: 11 additions & 1 deletion storage/transfermanager/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,8 +706,13 @@ func numShards(attrs *storage.ReaderObjectAttrs, r *DownloadRange, partSize int6
return 1
}

// Sharding turned off with partSize < 1.
if partSize < 1 {
return 1
}

// Divide entire object into shards if no range given.
if r == nil {
// Divide entire object into shards.
return int(math.Ceil(float64(objectSize) / float64(partSize)))
}
// Negative offset reads the whole object in one go.
Expand Down Expand Up @@ -742,6 +747,11 @@ func shardRange(r *DownloadRange, partSize int64, shard int) DownloadRange {
}
}

// No sharding if partSize is less than 1.
if partSize < 1 {
return *r
}

// Negative offset reads the whole object in one go.
if r.Offset < 0 {
return *r
Expand Down
13 changes: 13 additions & 0 deletions storage/transfermanager/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,19 @@ func TestCalculateRange(t *testing.T) {
Length: 1000,
},
},
{
desc: "sharding turned off",
objRange: &DownloadRange{
Offset: 1024 * 1024 * 1024 * 1024 / 2,
Length: 1024 * 1024 * 1024 * 1024,
},
partSize: 0,
shard: 0,
want: DownloadRange{
Offset: 1024 * 1024 * 1024 * 1024 / 2,
Length: 1024 * 1024 * 1024 * 1024,
},
},
{
desc: "large object",
objRange: &DownloadRange{
Expand Down
11 changes: 8 additions & 3 deletions storage/transfermanager/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,14 @@ func (wpt withPerOpTimeout) apply(tm *transferManagerConfig) {
tm.perOperationTimeout = wpt.timeout
}

// WithPartSize returns a TransferManagerOption that specifies the size of the
// shards to transfer; that is, if the object is larger than this size, it will
// be uploaded or downloaded in concurrent pieces.
// WithPartSize returns a TransferManagerOption that specifies the size in bytes
// of the shards to transfer; that is, if the object is larger than partSize,
// it will be uploaded or downloaded in concurrent pieces of size partSize.
//
// The default is 32 MiB for downloads.
//
// To turn off sharding, set partSize to 0 (any value less than 1 has the same effect).
//
// Note that files that support decompressive transcoding will be downloaded in
// a single piece regardless of the partSize set here.
func WithPartSize(partSize int64) Option {
Expand All @@ -93,6 +97,7 @@ type transferManagerConfig struct {

// Size of shards to transfer; Python found 32 MiB to be a good default for
// JSON downloads, but gRPC may benefit from a larger size.
// A partSize smaller than 1 turns off sharding.
partSize int64

// Timeout for a single operation (including all retries). Zero value means
Expand Down
57 changes: 39 additions & 18 deletions storage/transfermanager/option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,46 @@ import (
)

func TestApply(t *testing.T) {
opts := []Option{
WithWorkers(3),
WithPerOpTimeout(time.Hour),
WithCallbacks(),
WithPartSize(30),
}
var got transferManagerConfig
for _, opt := range opts {
opt.apply(&got)
}
want := transferManagerConfig{
numWorkers: 3,
perOperationTimeout: time.Hour,
asynchronous: true,
partSize: 30,
}
for _, test := range []struct {
desc string
opts []Option
want transferManagerConfig
}{
{
desc: "all options",
opts: []Option{
WithWorkers(3),
WithPerOpTimeout(time.Hour),
WithCallbacks(),
WithPartSize(300000),
},
want: transferManagerConfig{
numWorkers: 3,
perOperationTimeout: time.Hour,
asynchronous: true,
partSize: 300000,
},
},
{
desc: "small partSize",
opts: []Option{
WithPartSize(30),
},
want: transferManagerConfig{
partSize: 30,
},
},
} {
t.Run(test.desc, func(t *testing.T) {
var got transferManagerConfig
for _, opt := range test.opts {
opt.apply(&got)
}

if got != want {
t.Errorf("got: %+v, want: %+v", got, want)
if got != test.want {
t.Errorf("got: %+v, want: %+v", got, test.want)
}
})
}
}

Expand Down

0 comments on commit bd94084

Please sign in to comment.