diff --git a/changelog/unreleased/configurable-s3-put-options.md b/changelog/unreleased/configurable-s3-put-options.md new file mode 100644 index 0000000000..aeb57ab203 --- /dev/null +++ b/changelog/unreleased/configurable-s3-put-options.md @@ -0,0 +1,5 @@ +Enhancement: configurable s3 put options + +The s3ng blobstore can now be configured with several options: `s3.disable_content_sha254`, `s3.disable_multipart`, `s3.send_content_md5`, `s3.concurrent_stream_parts`, `s3.num_threads` and `s3.part_size`. If unset we default to `s3.send_content_md5: true`, which was hardcoded before. We also default to `s3.concurrent_stream_parts: true` and `s3.num_threads: 4` to allow concurrent uploads even when `s3.send_content_md5` is set to `true`. When tweaking the uploads try setting `s3.send_content_md5: false` and `s3.concurrent_stream_parts: false` first, as this will try to concurrently stream an uploaded file to the s3 store without cutting it into parts first. + +https://github.com/cs3org/reva/pull/4526 diff --git a/pkg/storage/fs/s3ng/blobstore/blobstore.go b/pkg/storage/fs/s3ng/blobstore/blobstore.go index 9c744e7540..48a40b764f 100644 --- a/pkg/storage/fs/s3ng/blobstore/blobstore.go +++ b/pkg/storage/fs/s3ng/blobstore/blobstore.go @@ -37,11 +37,22 @@ import ( type Blobstore struct { client *minio.Client + defaultPutOptions Options + bucket string } +type Options struct { + DisableContentSha256 bool + DisableMultipart bool + SendContentMd5 bool + ConcurrentStreamParts bool + NumThreads uint + PartSize uint64 +} + // New returns a new Blobstore -func New(endpoint, region, bucket, accessKey, secretKey string) (*Blobstore, error) { +func New(endpoint, region, bucket, accessKey, secretKey string, defaultPutOptions Options) (*Blobstore, error) { u, err := url.Parse(endpoint) if err != nil { return nil, errors.Wrap(err, "failed to parse s3 endpoint") @@ -58,8 +69,9 @@ func New(endpoint, region, bucket, accessKey, secretKey string) (*Blobstore, err } return &Blobstore{ - client: client, - bucket: bucket, + client: client, + bucket: bucket, + defaultPutOptions: defaultPutOptions, }, nil } @@ -71,7 +83,15 @@ func (bs *Blobstore) Upload(node *node.Node, source string) error { } defer reader.Close() - _, err = bs.client.PutObject(context.Background(), bs.bucket, bs.path(node), reader, node.Blobsize, minio.PutObjectOptions{ContentType: "application/octet-stream", SendContentMd5: true}) + _, err = bs.client.PutObject(context.Background(), bs.bucket, bs.path(node), reader, node.Blobsize, minio.PutObjectOptions{ + ContentType: "application/octet-stream", + SendContentMd5: bs.defaultPutOptions.SendContentMd5, + ConcurrentStreamParts: bs.defaultPutOptions.ConcurrentStreamParts, + NumThreads: bs.defaultPutOptions.NumThreads, + PartSize: bs.defaultPutOptions.PartSize, + DisableMultipart: bs.defaultPutOptions.DisableMultipart, + DisableContentSha256: bs.defaultPutOptions.DisableContentSha256, + }) if err != nil { return errors.Wrapf(err, "could not store object '%s' into bucket '%s'", bs.path(node), bs.bucket) diff --git a/pkg/storage/fs/s3ng/option.go b/pkg/storage/fs/s3ng/option.go index 877a7d7189..54c038af79 100644 --- a/pkg/storage/fs/s3ng/option.go +++ b/pkg/storage/fs/s3ng/option.go @@ -43,6 +43,24 @@ type Options struct { // Secret key for the s3 blobstore S3SecretKey string `mapstructure:"s3.secret_key"` + + // disable sending content sha256 + DisableContentSha256 bool `mapstructure:"s3.disable_content_sha254"` + + // disable multipart uploads + DisableMultipart bool `mapstructure:"s3.disable_multipart"` + + // enable sending content md5, defaults to true if unset + SendContentMd5 bool `mapstructure:"s3.send_content_md5"` + + // use concurrent stream parts + ConcurrentStreamParts bool `mapstructure:"s3.concurrent_stream_parts"` + + // number of concurrent uploads + NumThreads uint `mapstructure:"s3.num_threads"` + + // part size for concurrent uploads + PartSize uint64 `mapstructure:"s3.part_size"` } // S3ConfigComplete return true if all required s3 fields are set @@ -60,5 +78,16 @@ func parseConfig(m map[string]interface{}) (*Options, error) { err = errors.Wrap(err, "error decoding conf") return nil, err } + + // if unset we set these defaults + if m["s3.send_content_md5"] == nil { + o.SendContentMd5 = true + } + if m["s3.concurrent_stream_parts"] == nil { + o.ConcurrentStreamParts = true + } + if m["s3.num_threads"] == nil { + o.NumThreads = 4 + } return o, nil } diff --git a/pkg/storage/fs/s3ng/s3ng.go b/pkg/storage/fs/s3ng/s3ng.go index 5cc7f8873b..16360c898b 100644 --- a/pkg/storage/fs/s3ng/s3ng.go +++ b/pkg/storage/fs/s3ng/s3ng.go @@ -44,7 +44,16 @@ func New(m map[string]interface{}, stream events.Stream) (storage.FS, error) { return nil, fmt.Errorf("S3 configuration incomplete") } - bs, err := blobstore.New(o.S3Endpoint, o.S3Region, o.S3Bucket, o.S3AccessKey, o.S3SecretKey) + defaultPutOptions := blobstore.Options{ + DisableContentSha256: o.DisableContentSha256, + DisableMultipart: o.DisableMultipart, + SendContentMd5: o.SendContentMd5, + ConcurrentStreamParts: o.ConcurrentStreamParts, + NumThreads: o.NumThreads, + PartSize: o.PartSize, + } + + bs, err := blobstore.New(o.S3Endpoint, o.S3Region, o.S3Bucket, o.S3AccessKey, o.S3SecretKey, defaultPutOptions) if err != nil { return nil, err }