Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

configurable S3 PUT options #4526

Merged
merged 1 commit into from
Feb 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions changelog/unreleased/configurable-s3-put-options.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Enhancement: configurable s3 put options

The s3ng blobstore can now be configured with several options: `s3.disable_content_sha256`, `s3.disable_multipart`, `s3.send_content_md5`, `s3.concurrent_stream_parts`, `s3.num_threads` and `s3.part_size`. If unset, we default to `s3.send_content_md5: true`, which was hardcoded before. We also default to `s3.concurrent_stream_parts: true` and `s3.num_threads: 4` to allow concurrent uploads even when `s3.send_content_md5` is set to `true`. When tuning uploads, try setting `s3.send_content_md5: false` and `s3.concurrent_stream_parts: false` first: this streams an uploaded file directly to the S3 store without first cutting it into parts.

https://github.com/cs3org/reva/pull/4526
28 changes: 24 additions & 4 deletions pkg/storage/fs/s3ng/blobstore/blobstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,22 @@ import (
type Blobstore struct {
// client is the minio S3 client used for all bucket operations.
client *minio.Client

// defaultPutOptions is applied to every PutObject call issued by Upload.
defaultPutOptions Options

// bucket is the name of the S3 bucket all blobs are stored in.
bucket string
}

// Options bundles the tunable S3 PUT settings for the blobstore. Each field
// is passed through unchanged to the minio.PutObjectOptions field of the
// same name on every upload (see Upload).
type Options struct {
// DisableContentSha256 disables sending the content sha256 checksum.
DisableContentSha256 bool
// DisableMultipart disables multipart uploads.
DisableMultipart bool
// SendContentMd5 enables sending a content MD5 checksum with uploads.
SendContentMd5 bool
// ConcurrentStreamParts enables uploading parts concurrently.
ConcurrentStreamParts bool
// NumThreads is the number of concurrent upload threads.
NumThreads uint
// PartSize is the part size (in bytes) used for multipart uploads;
// 0 lets the client choose — TODO confirm against minio-go docs.
PartSize uint64
}

// New returns a new Blobstore
func New(endpoint, region, bucket, accessKey, secretKey string) (*Blobstore, error) {
func New(endpoint, region, bucket, accessKey, secretKey string, defaultPutOptions Options) (*Blobstore, error) {
u, err := url.Parse(endpoint)
if err != nil {
return nil, errors.Wrap(err, "failed to parse s3 endpoint")
Expand All @@ -58,8 +69,9 @@ func New(endpoint, region, bucket, accessKey, secretKey string) (*Blobstore, err
}

return &Blobstore{
client: client,
bucket: bucket,
client: client,
bucket: bucket,
defaultPutOptions: defaultPutOptions,
}, nil
}

Expand All @@ -71,7 +83,15 @@ func (bs *Blobstore) Upload(node *node.Node, source string) error {
}
defer reader.Close()

_, err = bs.client.PutObject(context.Background(), bs.bucket, bs.path(node), reader, node.Blobsize, minio.PutObjectOptions{ContentType: "application/octet-stream", SendContentMd5: true})
_, err = bs.client.PutObject(context.Background(), bs.bucket, bs.path(node), reader, node.Blobsize, minio.PutObjectOptions{
ContentType: "application/octet-stream",
SendContentMd5: bs.defaultPutOptions.SendContentMd5,
ConcurrentStreamParts: bs.defaultPutOptions.ConcurrentStreamParts,
NumThreads: bs.defaultPutOptions.NumThreads,
PartSize: bs.defaultPutOptions.PartSize,
DisableMultipart: bs.defaultPutOptions.DisableMultipart,
DisableContentSha256: bs.defaultPutOptions.DisableContentSha256,
})

if err != nil {
return errors.Wrapf(err, "could not store object '%s' into bucket '%s'", bs.path(node), bs.bucket)
Expand Down
29 changes: 29 additions & 0 deletions pkg/storage/fs/s3ng/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,24 @@ type Options struct {

// Secret key for the s3 blobstore
S3SecretKey string `mapstructure:"s3.secret_key"`

// disable sending content sha256
DisableContentSha256 bool `mapstructure:"s3.disable_content_sha254"`

// disable multipart uploads
DisableMultipart bool `mapstructure:"s3.disable_multipart"`

// enable sending content md5, defaults to true if unset
SendContentMd5 bool `mapstructure:"s3.send_content_md5"`

// use concurrent stream parts
ConcurrentStreamParts bool `mapstructure:"s3.concurrent_stream_parts"`

// number of concurrent uploads
NumThreads uint `mapstructure:"s3.num_threads"`

// part size for concurrent uploads
PartSize uint64 `mapstructure:"s3.part_size"`
}

// S3ConfigComplete return true if all required s3 fields are set
Expand All @@ -60,5 +78,16 @@ func parseConfig(m map[string]interface{}) (*Options, error) {
err = errors.Wrap(err, "error decoding conf")
return nil, err
}

// if unset we set these defaults
if m["s3.send_content_md5"] == nil {
o.SendContentMd5 = true
}
if m["s3.concurrent_stream_parts"] == nil {
o.ConcurrentStreamParts = true
}
if m["s3.num_threads"] == nil {
o.NumThreads = 4
}
return o, nil
}
11 changes: 10 additions & 1 deletion pkg/storage/fs/s3ng/s3ng.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,16 @@ func New(m map[string]interface{}, stream events.Stream) (storage.FS, error) {
return nil, fmt.Errorf("S3 configuration incomplete")
}

bs, err := blobstore.New(o.S3Endpoint, o.S3Region, o.S3Bucket, o.S3AccessKey, o.S3SecretKey)
defaultPutOptions := blobstore.Options{
DisableContentSha256: o.DisableContentSha256,
DisableMultipart: o.DisableMultipart,
SendContentMd5: o.SendContentMd5,
ConcurrentStreamParts: o.ConcurrentStreamParts,
NumThreads: o.NumThreads,
PartSize: o.PartSize,
}

bs, err := blobstore.New(o.S3Endpoint, o.S3Region, o.S3Bucket, o.S3AccessKey, o.S3SecretKey, defaultPutOptions)
if err != nil {
return nil, err
}
Expand Down
Loading