Skip to content

Commit

Permalink
Adjust functions after minio-go/v7 upgrade
Browse files Browse the repository at this point in the history
  • Loading branch information
heimweh committed Aug 3, 2020
1 parent 8014a2f commit fb0f9c6
Showing 1 changed file with 54 additions and 42 deletions.
96 changes: 54 additions & 42 deletions pkg/objstore/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ import (

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/minio/minio-go/v6"
"github.com/minio/minio-go/v6/pkg/credentials"
"github.com/minio/minio-go/v6/pkg/encrypt"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/minio/minio-go/v7/pkg/encrypt"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/common/version"
Expand Down Expand Up @@ -136,36 +136,41 @@ func NewBucketWithConfig(logger log.Logger, config Config, component string) (*B
}
}

client, err := minio.NewWithCredentials(config.Endpoint, credentials.NewChainCredentials(chain), !config.Insecure, config.Region)
client, err := minio.New(config.Endpoint, &minio.Options{
Creds: credentials.NewChainCredentials(chain),
Secure: !config.Insecure,
Region: config.Region,
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
DualStack: true,
}).DialContext,

MaxIdleConns: 100,
MaxIdleConnsPerHost: 100,
IdleConnTimeout: time.Duration(config.HTTPConfig.IdleConnTimeout),
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
// The ResponseHeaderTimeout here is the only change
// from the default minio transport, it was introduced
// to cover cases where the tcp connection works but
// the server never answers. Defaults to 2 minutes.
ResponseHeaderTimeout: time.Duration(config.HTTPConfig.ResponseHeaderTimeout),
// Set this value so that the underlying transport round-tripper
// doesn't try to auto decode the body of objects with
// content-encoding set to `gzip`.
//
// Refer: https://golang.org/src/net/http/transport.go?h=roundTrip#L1843.
DisableCompression: true,
TLSClientConfig: &tls.Config{InsecureSkipVerify: config.HTTPConfig.InsecureSkipVerify},
},
})
if err != nil {
return nil, errors.Wrap(err, "initialize s3 client")
}
client.SetAppInfo(fmt.Sprintf("thanos-%s", component), fmt.Sprintf("%s (%s)", version.Version, runtime.Version()))
client.SetCustomTransport(&http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
DualStack: true,
}).DialContext,
MaxIdleConns: 100,
MaxIdleConnsPerHost: 100,
IdleConnTimeout: time.Duration(config.HTTPConfig.IdleConnTimeout),
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
// The ResponseHeaderTimeout here is the only change
// from the default minio transport, it was introduced
// to cover cases where the tcp connection works but
// the server never answers. Defaults to 2 minutes.
ResponseHeaderTimeout: time.Duration(config.HTTPConfig.ResponseHeaderTimeout),
// Set this value so that the underlying transport round-tripper
// doesn't try to auto decode the body of objects with
// content-encoding set to `gzip`.
//
// Refer: https://golang.org/src/net/http/transport.go?h=roundTrip#L1843.
DisableCompression: true,
TLSClientConfig: &tls.Config{InsecureSkipVerify: config.HTTPConfig.InsecureSkipVerify},
})

var sse encrypt.ServerSide
if config.SSEEncryption {
Expand Down Expand Up @@ -228,7 +233,12 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error) err
dir = strings.TrimSuffix(dir, DirDelim) + DirDelim
}

for object := range b.client.ListObjects(b.name, dir, false, ctx.Done()) {
opts := minio.ListObjectsOptions{
Prefix: dir,
Recursive: false,
}

for object := range b.client.ListObjects(ctx, b.name, opts) {
// Catch the error when failed to list objects.
if object.Err != nil {
return object.Err
Expand Down Expand Up @@ -260,7 +270,7 @@ func (b *Bucket) getRange(ctx context.Context, name string, off, length int64) (
return nil, err
}
}
r, err := b.client.GetObjectWithContext(ctx, b.name, name, *opts)
r, err := b.client.GetObject(ctx, b.name, name, *opts)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -288,8 +298,8 @@ func (b *Bucket) GetRange(ctx context.Context, name string, off, length int64) (
}

// Exists checks if the given object exists.
func (b *Bucket) Exists(_ context.Context, name string) (bool, error) {
_, err := b.client.StatObject(b.name, name, minio.StatObjectOptions{})
func (b *Bucket) Exists(ctx context.Context, name string) (bool, error) {
_, err := b.client.StatObject(ctx, b.name, name, minio.StatObjectOptions{})
if err != nil {
if b.IsObjNotFoundErr(err) {
return false, nil
Expand All @@ -314,7 +324,7 @@ func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {
if size < int64(partSize) {
partSize = 0
}
if _, err := b.client.PutObjectWithContext(
if _, err := b.client.PutObject(
ctx,
b.name,
name,
Expand All @@ -333,8 +343,8 @@ func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {
}

// Attributes returns information about the specified object.
func (b *Bucket) Attributes(_ context.Context, name string) (objstore.ObjectAttributes, error) {
objInfo, err := b.client.StatObject(b.name, name, minio.StatObjectOptions{})
func (b *Bucket) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) {
objInfo, err := b.client.StatObject(ctx, b.name, name, minio.StatObjectOptions{})
if err != nil {
return objstore.ObjectAttributes{}, err
}
Expand All @@ -346,8 +356,8 @@ func (b *Bucket) Attributes(_ context.Context, name string) (objstore.ObjectAttr
}

// Delete removes the object with the given name.
func (b *Bucket) Delete(_ context.Context, name string) error {
return b.client.RemoveObject(b.name, name)
func (b *Bucket) Delete(ctx context.Context, name string) error {
return b.client.RemoveObject(ctx, b.name, name, minio.RemoveObjectOptions{})
}

// IsObjNotFoundErr returns true if error means that object is not found. Relevant to Get operations.
Expand Down Expand Up @@ -391,6 +401,8 @@ func NewTestBucket(t testing.TB, location string) (objstore.Bucket, func(), erro
}

func NewTestBucketFromConfig(t testing.TB, location string, c Config, reuseBucket bool) (objstore.Bucket, func(), error) {
ctx := context.Background()

bc, err := yaml.Marshal(c)
if err != nil {
return nil, nil, err
Expand All @@ -402,7 +414,7 @@ func NewTestBucketFromConfig(t testing.TB, location string, c Config, reuseBucke

bktToCreate := c.Bucket
if c.Bucket != "" && reuseBucket {
if err := b.Iter(context.Background(), "", func(f string) error {
if err := b.Iter(ctx, "", func(f string) error {
return errors.Errorf("bucket %s is not empty", c.Bucket)
}); err != nil {
return nil, nil, errors.Wrapf(err, "s3 check bucket %s", c.Bucket)
Expand All @@ -416,15 +428,15 @@ func NewTestBucketFromConfig(t testing.TB, location string, c Config, reuseBucke
bktToCreate = objstore.CreateTemporaryTestBucketName(t)
}

if err := b.client.MakeBucket(bktToCreate, location); err != nil {
if err := b.client.MakeBucket(ctx, bktToCreate, minio.MakeBucketOptions{Region: location}); err != nil {
return nil, nil, err
}
b.name = bktToCreate
t.Log("created temporary AWS bucket for AWS tests with name", bktToCreate, "in", location)

return b, func() {
objstore.EmptyBucket(t, context.Background(), b)
if err := b.client.RemoveBucket(bktToCreate); err != nil {
objstore.EmptyBucket(t, ctx, b)
if err := b.client.RemoveBucket(ctx, bktToCreate); err != nil {
t.Logf("deleting bucket %s failed: %s", bktToCreate, err)
}
}, nil
Expand Down

0 comments on commit fb0f9c6

Please sign in to comment.