Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Introduce new ObjectExistsWithSize API to #14268

Merged
merged 3 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions pkg/ingester-rf1/objstore/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,14 @@ func (m *Multi) GetStoreFor(ts model.Time) (client.ObjectClient, error) {
return nil, fmt.Errorf("no store found for timestamp %s", ts)
}

func (m *Multi) ObjectExistsWithSize(ctx context.Context, objectKey string) (bool, int64, error) {
s, err := m.GetStoreFor(model.Now())
if err != nil {
return false, 0, err
}
return s.ObjectExistsWithSize(ctx, objectKey)
}

func (m *Multi) ObjectExists(ctx context.Context, objectKey string) (bool, error) {
s, err := m.GetStoreFor(model.Now())
if err != nil {
Expand Down
19 changes: 15 additions & 4 deletions pkg/storage/chunk/client/alibaba/oss_object_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,16 +73,27 @@ func (s *OssObjectClient) Stop() {
}

func (s *OssObjectClient) ObjectExists(ctx context.Context, objectKey string) (bool, error) {
exists, _, err := s.ObjectExistsWithSize(ctx, objectKey)
return exists, err
}

func (s *OssObjectClient) ObjectExistsWithSize(ctx context.Context, objectKey string) (bool, int64, error) {
var options []oss.Option
var objectSize int64
err := instrument.CollectedRequest(ctx, "OSS.ObjectExists", ossRequestDuration, instrument.ErrorCode, func(_ context.Context) error {
_, requestErr := s.defaultBucket.GetObjectMeta(objectKey, options...)
return requestErr
headers, requestErr := s.defaultBucket.GetObjectMeta(objectKey, options...)
if requestErr != nil {
return requestErr
}

objectSize, _ = strconv.ParseInt(headers.Get(oss.HTTPHeaderContentLength), 10, 64)
return nil
})
if err != nil {
return false, err
return false, 0, err
}

return true, nil
return true, objectSize, nil
}

// GetObject returns a reader and the size for the specified object key from the configured OSS bucket.
Expand Down
26 changes: 19 additions & 7 deletions pkg/storage/chunk/client/aws/s3_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,37 +310,49 @@ func buckets(cfg S3Config) ([]string, error) {
func (a *S3ObjectClient) Stop() {}

func (a *S3ObjectClient) ObjectExists(ctx context.Context, objectKey string) (bool, error) {
exists, _, err := a.ObjectExistsWithSize(ctx, objectKey)
return exists, err
}

func (a *S3ObjectClient) ObjectExistsWithSize(ctx context.Context, objectKey string) (bool, int64, error) {
var lastErr error
var objectSize int64

retries := backoff.New(ctx, a.cfg.BackoffConfig)
for retries.Ongoing() {
if ctx.Err() != nil {
return false, errors.Wrap(ctx.Err(), "ctx related error during s3 objectExists")
return false, 0, errors.Wrap(ctx.Err(), "ctx related error during s3 objectExists")
}
lastErr = instrument.CollectedRequest(ctx, "S3.ObjectExists", s3RequestDuration, instrument.ErrorCode, func(_ context.Context) error {
headObjectInput := &s3.HeadObjectInput{
Bucket: aws.String(a.bucketFromKey(objectKey)),
Key: aws.String(objectKey),
}
_, requestErr := a.S3.HeadObject(headObjectInput)
return requestErr
headOutput, requestErr := a.S3.HeadObject(headObjectInput)
if requestErr != nil {
return requestErr
}
if headOutput != nil && headOutput.ContentLength != nil {
objectSize = *headOutput.ContentLength
}
return nil
})
if lastErr == nil {
return true, nil
return true, 0, nil
}

if a.IsObjectNotFoundErr(lastErr) {
return false, lastErr
return false, 0, lastErr
}

retries.Wait()
}

if lastErr != nil {
return false, lastErr
return false, 0, lastErr
}

return true, nil
return true, objectSize, nil
}

// DeleteObject deletes the specified objectKey from the appropriate S3 bucket
Expand Down
18 changes: 18 additions & 0 deletions pkg/storage/chunk/client/aws/s3_storage_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,15 @@ func Test_RetryLogic(t *testing.T) {
return err
},
},
{
"object exists with size with retries",
3,
true,
func(c *S3ObjectClient) error {
_, _, err := c.ObjectExistsWithSize(context.Background(), "foo")
return err
},
},
{
"object doesn't exist with retries",
3,
Expand All @@ -343,6 +352,15 @@ func Test_RetryLogic(t *testing.T) {
return err
},
},
{
"object doesn't exist (with size) with retries",
3,
false,
func(c *S3ObjectClient) error {
_, _, err := c.ObjectExistsWithSize(context.Background(), "foo")
return err
},
},
} {
t.Run(tc.name, func(t *testing.T) {
callCount := atomic.NewInt32(0)
Expand Down
23 changes: 19 additions & 4 deletions pkg/storage/chunk/client/azure/blob_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,20 +220,35 @@ func NewBlobStorage(cfg *BlobStorageConfig, metrics BlobStorageMetrics, hedgingC
func (b *BlobStorage) Stop() {}

func (b *BlobStorage) ObjectExists(ctx context.Context, objectKey string) (bool, error) {
exists, _, err := b.ObjectExistsWithSize(ctx, objectKey)
return exists, err
}

func (b *BlobStorage) ObjectExistsWithSize(ctx context.Context, objectKey string) (bool, int64, error) {
var objectSize int64
err := loki_instrument.TimeRequest(ctx, "azure.ObjectExists", instrument.NewHistogramCollector(b.metrics.requestDuration), instrument.ErrorCode, func(ctx context.Context) error {
blockBlobURL, err := b.getBlobURL(objectKey, false)
if err != nil {
return err
}

_, err = blockBlobURL.GetProperties(ctx, azblob.BlobAccessConditions{}, azblob.ClientProvidedKeyOptions{})
return err
response, err := blockBlobURL.GetProperties(ctx, azblob.BlobAccessConditions{}, azblob.ClientProvidedKeyOptions{})
if err != nil {
return err
}
if response != nil {
rawResponse := response.Response()
if rawResponse != nil {
objectSize = rawResponse.ContentLength
}
}
return nil
})
if err != nil {
return false, err
return false, 0, err
}

return true, nil
return true, objectSize, nil
}

// GetObject returns a reader and the size for the specified object key.
Expand Down
21 changes: 16 additions & 5 deletions pkg/storage/chunk/client/baidubce/bos_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,16 +91,27 @@ func (b *BOSObjectStorage) PutObject(ctx context.Context, objectKey string, obje
}

func (b *BOSObjectStorage) ObjectExists(ctx context.Context, objectKey string) (bool, error) {
exists, _, err := b.ObjectExistsWithSize(ctx, objectKey)
return exists, err
}

func (b *BOSObjectStorage) ObjectExistsWithSize(ctx context.Context, objectKey string) (bool, int64, error) {
var objectSize int64
err := instrument.CollectedRequest(ctx, "BOS.ObjectExists", bosRequestDuration, instrument.ErrorCode, func(_ context.Context) error {
var requestErr error
_, requestErr = b.client.GetObjectMeta(b.cfg.BucketName, objectKey)
return requestErr
metaResult, requestErr := b.client.GetObjectMeta(b.cfg.BucketName, objectKey)
if requestErr != nil {
return requestErr
}
if metaResult != nil {
objectSize = metaResult.ContentLength
}
return nil
})
if err != nil {
return false, err
return false, 0, err
}

return true, nil
return true, objectSize, nil
}

func (b *BOSObjectStorage) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, int64, error) {
Expand Down
7 changes: 7 additions & 0 deletions pkg/storage/chunk/client/congestion/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,10 @@ func (a *AIMDController) ObjectExists(ctx context.Context, objectKey string) (bo
return a.inner.ObjectExists(ctx, objectKey)
}

func (a *AIMDController) ObjectExistsWithSize(ctx context.Context, objectKey string) (bool, int64, error) {
return a.inner.ObjectExistsWithSize(ctx, objectKey)
}

func (a *AIMDController) DeleteObject(ctx context.Context, objectKey string) error {
return a.inner.DeleteObject(ctx, objectKey)
}
Expand Down Expand Up @@ -212,6 +216,9 @@ func NewNoopController(Config) *NoopController {
return &NoopController{}
}

func (n *NoopController) ObjectExistsWithSize(context.Context, string) (bool, int64, error) {
return true, 0, nil
}
func (n *NoopController) ObjectExists(context.Context, string) (bool, error) { return true, nil }
func (n *NoopController) PutObject(context.Context, string, io.Reader) error { return nil }
func (n *NoopController) GetObject(context.Context, string) (io.ReadCloser, int64, error) {
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/chunk/client/congestion/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,10 @@ func (m *mockObjectClient) ObjectExists(context.Context, string) (bool, error) {
panic("not implemented")
}

func (m *mockObjectClient) ObjectExistsWithSize(context.Context, string) (bool, int64, error) {
panic("not implemented")
}

func (m *mockObjectClient) List(context.Context, string, string) ([]client.StorageObject, []client.StorageCommonPrefix, error) {
panic("not implemented")
}
Expand Down
14 changes: 11 additions & 3 deletions pkg/storage/chunk/client/gcp/gcs_object_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,20 @@ func (s *GCSObjectClient) Stop() {
}

func (s *GCSObjectClient) ObjectExists(ctx context.Context, objectKey string) (bool, error) {
_, err := s.getsBuckets.Object(objectKey).Attrs(ctx)
exists, _, err := s.ObjectExistsWithSize(ctx, objectKey)
return exists, err
}

func (s *GCSObjectClient) ObjectExistsWithSize(ctx context.Context, objectKey string) (bool, int64, error) {
attrs, err := s.getsBuckets.Object(objectKey).Attrs(ctx)
if err != nil {
return false, err
return false, 0, err
}

return true, nil
if attrs != nil {
return true, attrs.Size, nil
}
return true, 0, nil
}

// GetObject returns a reader and the size for the specified object key from the configured GCS bucket.
Expand Down
21 changes: 16 additions & 5 deletions pkg/storage/chunk/client/ibmcloud/cos_object_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,20 +320,31 @@ func (c *COSObjectClient) DeleteObject(ctx context.Context, objectKey string) er
}

func (c *COSObjectClient) ObjectExists(ctx context.Context, objectKey string) (bool, error) {
exists, _, err := c.ObjectExistsWithSize(ctx, objectKey)
return exists, err
}

func (c *COSObjectClient) ObjectExistsWithSize(ctx context.Context, objectKey string) (bool, int64, error) {
bucket := c.bucketFromKey(objectKey)
var objectSize int64
err := instrument.CollectedRequest(ctx, "COS.GetObject", cosRequestDuration, instrument.ErrorCode, func(_ context.Context) error {
var requestErr error
_, requestErr = c.hedgedCOS.HeadObject(&cos.HeadObjectInput{
headOutput, requestErr := c.hedgedCOS.HeadObject(&cos.HeadObjectInput{
Bucket: ibm.String(bucket),
Key: ibm.String(objectKey),
})
return requestErr
if requestErr != nil {
return requestErr
}
if headOutput != nil && headOutput.ContentLength != nil {
objectSize = *headOutput.ContentLength
}
return nil
})
if err != nil {
return false, err
return false, 0, err
}

return true, nil
return true, objectSize, nil
}

// GetObject returns a reader and the size for the specified object key from the configured S3 bucket.
Expand Down
13 changes: 9 additions & 4 deletions pkg/storage/chunk/client/local/fs_object_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,19 @@ func NewFSObjectClient(cfg FSConfig) (*FSObjectClient, error) {
// Stop implements ObjectClient
func (FSObjectClient) Stop() {}

func (f *FSObjectClient) ObjectExists(_ context.Context, objectKey string) (bool, error) {
func (f *FSObjectClient) ObjectExists(ctx context.Context, objectKey string) (bool, error) {
exists, _, err := f.ObjectExistsWithSize(ctx, objectKey)
return exists, err
}

func (f *FSObjectClient) ObjectExistsWithSize(_ context.Context, objectKey string) (bool, int64, error) {
fullPath := filepath.Join(f.cfg.Directory, filepath.FromSlash(objectKey))
_, err := os.Lstat(fullPath)
fi, err := os.Lstat(fullPath)
if err != nil {
return false, err
return false, 0, err
}

return true, nil
return true, fi.Size(), nil
}

// GetObject from the store
Expand Down
5 changes: 5 additions & 0 deletions pkg/storage/chunk/client/local/fs_object_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,11 @@ func TestFSObjectClient_List_and_ObjectExists(t *testing.T) {
ok, err := bucketClient.ObjectExists(context.Background(), "outer-file2")
require.NoError(t, err)
require.True(t, ok)

ok, objectSize, err := bucketClient.ObjectExistsWithSize(context.Background(), "outer-file2")
require.NoError(t, err)
require.True(t, ok)
require.EqualValues(t, len("outer-file2"), objectSize)
}

func TestFSObjectClient_DeleteObject(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions pkg/storage/chunk/client/object_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
// ObjectClient is used to store arbitrary data in Object Store (S3/GCS/Azure/...)
type ObjectClient interface {
ObjectExists(ctx context.Context, objectKey string) (bool, error)
ObjectExistsWithSize(ctx context.Context, objectKey string) (bool, int64, error)

PutObject(ctx context.Context, objectKey string, object io.Reader) error
// NOTE: The consumer of GetObject should always call the Close method when it is done reading which otherwise could cause a resource leak.
Expand Down
11 changes: 8 additions & 3 deletions pkg/storage/chunk/client/openstack/swift_object_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,17 @@ func (s *SwiftObjectClient) Stop() {
}

func (s *SwiftObjectClient) ObjectExists(ctx context.Context, objectKey string) (bool, error) {
_, _, err := s.hedgingConn.Object(ctx, s.cfg.Config.ContainerName, objectKey)
exists, _, err := s.ObjectExistsWithSize(ctx, objectKey)
return exists, err
}

func (s *SwiftObjectClient) ObjectExistsWithSize(ctx context.Context, objectKey string) (bool, int64, error) {
info, _, err := s.hedgingConn.Object(ctx, s.cfg.Config.ContainerName, objectKey)
if err != nil {
return false, err
return false, 0, err
}

return true, nil
return true, info.Bytes, nil
}

// GetObject returns a reader and the size for the specified object key from the configured swift container.
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/chunk/client/prefixed_object_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ func (p PrefixedObjectClient) ObjectExists(ctx context.Context, objectKey string
return p.downstreamClient.ObjectExists(ctx, p.prefix+objectKey)
}

func (p PrefixedObjectClient) ObjectExistsWithSize(ctx context.Context, objectKey string) (bool, int64, error) {
return p.downstreamClient.ObjectExistsWithSize(ctx, p.prefix+objectKey)
}

func (p PrefixedObjectClient) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, int64, error) {
return p.downstreamClient.GetObject(ctx, p.prefix+objectKey)
}
Expand Down
Loading
Loading