From 7a14ce7219f005ee940f2738020e6151c94f0b69 Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Mon, 29 Jul 2024 11:22:27 -0500 Subject: [PATCH] Add BatchPut and EmptyBucket functions to S3Service --- s3x/s3.go | 89 ----------- s3x/service.go | 219 ++++++++++++++++++++++++++++ s3x/{s3_test.go => service_test.go} | 33 ++++- 3 files changed, 251 insertions(+), 90 deletions(-) delete mode 100644 s3x/s3.go create mode 100644 s3x/service.go rename s3x/{s3_test.go => service_test.go} (64%) diff --git a/s3x/s3.go b/s3x/s3.go deleted file mode 100644 index cb1447b..0000000 --- a/s3x/s3.go +++ /dev/null @@ -1,89 +0,0 @@ -package s3x - -import ( - "bytes" - "context" - "fmt" - "io" - - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/credentials" - "github.com/aws/aws-sdk-go/aws/session" - "github.com/aws/aws-sdk-go/service/s3" -) - -// Service is simple abstraction layer to work with a S3-compatible storage service -type Service struct { - Client *s3.S3 - urler ObjectURLer -} - -// NewService creates a new S3 service with the given credentials and configuration -func NewService(accessKey, secretKey, region, endpoint string, minio bool) (*Service, error) { - cfg := &aws.Config{ - Region: aws.String(region), - Endpoint: aws.String(endpoint), - S3ForcePathStyle: aws.Bool(minio), // urls as endpoint/bucket/key instead of bucket.endpoint/key - MaxRetries: aws.Int(3), - } - if accessKey != "" || secretKey != "" { - cfg.Credentials = credentials.NewStaticCredentials(accessKey, secretKey, "") - } - s, err := session.NewSession(cfg) - if err != nil { - return nil, err - } - - var urler ObjectURLer - if minio { - urler = MinioURLer(endpoint) - } else { - urler = AWSURLer(region) - } - - return &Service{Client: s3.New(s), urler: urler}, nil -} - -// ObjectURL returns the publicly accessible URL for the given object -func (s *Service) ObjectURL(bucket, key string) string { - return s.urler(bucket, key) -} - -// Test is a convenience method to HEAD a bucket to test if it exists and we can access it -func (s *Service) Test(ctx context.Context, bucket string) error { - _, err := s.Client.HeadBucket(&s3.HeadBucketInput{Bucket: aws.String(bucket)}) - return err -} - -// GetObject is a convenience method to get an object from S3 and read its contents into a byte slice -func (s *Service) GetObject(ctx context.Context, bucket, key string) (string, []byte, error) { - out, err := s.Client.GetObjectWithContext(ctx, &s3.GetObjectInput{ - Bucket: aws.String(bucket), - Key: aws.String(key), - }) - if err != nil { - return "", nil, fmt.Errorf("error getting S3 object: %w", err) - } - - body, err := io.ReadAll(out.Body) - if err != nil { - return "", nil, fmt.Errorf("error reading S3 object: %w", err) - } - - return aws.StringValue(out.ContentType), body, nil -} - -// PutObject is a convenience method to put the given object and return its publicly accessible URL -func (s *Service) PutObject(ctx context.Context, bucket, key string, contentType string, body []byte, acl string) (string, error) { - _, err := s.Client.PutObjectWithContext(ctx, &s3.PutObjectInput{ - Bucket: aws.String(bucket), - Body: bytes.NewReader(body), - Key: aws.String(key), - ContentType: aws.String(contentType), - ACL: aws.String(acl), - }) - if err != nil { - return "", fmt.Errorf("error putting S3 object: %w", err) - } - return s.urler(bucket, key), nil -} diff --git a/s3x/service.go b/s3x/service.go new file mode 100644 index 0000000..69bd106 --- /dev/null +++ b/s3x/service.go @@ -0,0 +1,219 @@ +package s3x + +import ( + "bytes" + "context" + "fmt" + "io" + "sync" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/s3" +) + +const ( + workersPerBatch = 32 +) + +// Service is simple abstraction layer to work with a S3-compatible storage service +type Service struct { + Client *s3.S3 + urler ObjectURLer +} + +// NewService creates a new S3 service with the given credentials and configuration +func NewService(accessKey, secretKey, region, endpoint string, minio bool) (*Service, error) { + cfg := &aws.Config{ + Region: aws.String(region), + Endpoint: aws.String(endpoint), + S3ForcePathStyle: aws.Bool(minio), // urls as endpoint/bucket/key instead of bucket.endpoint/key + MaxRetries: aws.Int(3), + } + if accessKey != "" || secretKey != "" { + cfg.Credentials = credentials.NewStaticCredentials(accessKey, secretKey, "") + } + s, err := session.NewSession(cfg) + if err != nil { + return nil, err + } + + var urler ObjectURLer + if minio { + urler = MinioURLer(endpoint) + } else { + urler = AWSURLer(region) + } + + return &Service{Client: s3.New(s), urler: urler}, nil +} + +// ObjectURL returns the publicly accessible URL for the given object +func (s *Service) ObjectURL(bucket, key string) string { + return s.urler(bucket, key) +} + +// Test is a convenience method to HEAD a bucket to test if it exists and we can access it +func (s *Service) Test(ctx context.Context, bucket string) error { + _, err := s.Client.HeadBucket(&s3.HeadBucketInput{Bucket: aws.String(bucket)}) + return err +} + +// GetObject is a convenience method to get an object from S3 and read its contents into a byte slice +func (s *Service) GetObject(ctx context.Context, bucket, key string) (string, []byte, error) { + out, err := s.Client.GetObjectWithContext(ctx, &s3.GetObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + }) + if err != nil { + return "", nil, fmt.Errorf("error getting S3 object: %w", err) + } + + body, err := io.ReadAll(out.Body) + if err != nil { + return "", nil, fmt.Errorf("error reading S3 object: %w", err) + } + + return aws.StringValue(out.ContentType), body, nil +} + +// PutObject is a convenience method to put the given object and return its publicly accessible URL +func (s *Service) PutObject(ctx context.Context, bucket, key string, contentType string, body []byte, acl string) (string, error) { + _, err := s.Client.PutObjectWithContext(ctx, &s3.PutObjectInput{ + Bucket: aws.String(bucket), + Body: bytes.NewReader(body), + Key: aws.String(key), + ContentType: aws.String(contentType), + ACL: aws.String(acl), + }) + if err != nil { + return "", fmt.Errorf("error putting S3 object: %w", err) + } + return s.urler(bucket, key), nil +} + +// Upload is our type for a file in a batch upload +type Upload struct { + Bucket string + Key string + ContentType string + Body []byte + ACL string + + // set by BatchPut + URL string + Error error +} + +// BatchPut writes the entire batch of items to the passed in URLs, returning a map of errors if any. +// Writes will be retried up to three times automatically. +func (s *Service) BatchPut(ctx context.Context, us []*Upload) error { + uploads := make(chan *Upload, len(us)) + errors := make(chan error, len(us)) + stop := make(chan bool) + wg := &sync.WaitGroup{} + + // start our workers + for w := 0; w < workersPerBatch; w++ { + wg.Add(1) + go s.batchWorker(ctx, uploads, errors, stop, wg) + } + + // add all our uploads to our work queue + for _, u := range us { + uploads <- u + } + + // read all our errors out, we'll stop everything if we encounter one + var err error + for i := 0; i < len(us); i++ { + e := <-errors + if e != nil { + err = e + break + } + } + + // stop everyone + close(stop) + + // wait for everything to finish up + wg.Wait() + + return err +} + +func (s *Service) batchWorker(ctx context.Context, uploads chan *Upload, errors chan error, stop chan bool, wg *sync.WaitGroup) { + defer wg.Done() + + for { + select { + case u := <-uploads: + var err error + for tries := 0; tries < 3; tries++ { + // we use a short timeout per request, better to retry than wait on a stalled connection and waste all our time + // TODO: validate choice of 15 seconds against real world performance + uctx, cancel := context.WithTimeout(ctx, time.Second*15) + defer cancel() + + _, err = s.Client.PutObjectWithContext(uctx, &s3.PutObjectInput{ + Bucket: aws.String(u.Bucket), + Body: bytes.NewReader(u.Body), + Key: aws.String(u.Key), + ContentType: aws.String(u.ContentType), + ACL: aws.String(u.ACL), + }) + + if err == nil { + break + } + } + + if err == nil { + u.URL = s.urler(u.Bucket, u.Key) + } else { + u.Error = err + } + + errors <- err + + case <-stop: + return + } + } +} + +// EmptyBucket is a convenience method to delete all the objects in a bucket +func (s *Service) EmptyBucket(ctx context.Context, bucket string) error { + request := &s3.ListObjectsV2Input{Bucket: aws.String(bucket)} + + for { + response, err := s.Client.ListObjectsV2WithContext(ctx, request) + if err != nil { + return fmt.Errorf("error listing S3 objects: %w", err) + } + + if len(response.Contents) > 0 { + del := &s3.Delete{} + + for _, obj := range response.Contents { + del.Objects = append(del.Objects, &s3.ObjectIdentifier{Key: obj.Key}) + } + + _, err = s.Client.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{Bucket: aws.String(bucket), Delete: del}) + if err != nil { + return fmt.Errorf("error deleting S3 objects: %w", err) + } + } + + request.ContinuationToken = response.NextContinuationToken + + if !aws.BoolValue(response.IsTruncated) { + break + } + } + + return nil +} diff --git a/s3x/s3_test.go b/s3x/service_test.go similarity index 64% rename from s3x/s3_test.go rename to s3x/service_test.go index 7719922..0ec50a3 100644 --- a/s3x/s3_test.go +++ b/s3x/service_test.go @@ -34,9 +34,40 @@ func TestService(t *testing.T) { assert.Equal(t, "text/plain", contentType) assert.Equal(t, []byte("hello world"), body) - _, err = svc.Client.DeleteObject(&s3.DeleteObjectInput{Bucket: aws.String("gocommon-tests"), Key: aws.String("1/hello world.txt")}) + // test batch put + uploads := []*s3x.Upload{ + { + Bucket: "gocommon-tests", + Key: "foo/thing1", + Body: []byte(`HELLOWORLD`), + ContentType: "text/plain", + ACL: s3.BucketCannedACLPublicRead, + }, + { + Bucket: "gocommon-tests", + Key: "foo/thing2", + Body: []byte(`HELLOWORLD2`), + ContentType: "text/plain", + ACL: s3.BucketCannedACLPublicRead, + }, + } + + err = svc.BatchPut(ctx, uploads) + assert.NoError(t, err) + + assert.Equal(t, "http://localhost:9000/gocommon-tests/foo/thing1", uploads[0].URL) + assert.Nil(t, uploads[0].Error) + assert.Equal(t, "http://localhost:9000/gocommon-tests/foo/thing2", uploads[1].URL) + assert.Nil(t, uploads[1].Error) + + // test emptying a bucket + err = svc.EmptyBucket(ctx, "gocommon-tests") + assert.NoError(t, err) + + err = svc.EmptyBucket(ctx, "gocommon-tests") assert.NoError(t, err) + // test deleting a bucket _, err = svc.Client.DeleteBucket(&s3.DeleteBucketInput{Bucket: aws.String("gocommon-tests")}) assert.NoError(t, err)