Skip to content

Commit

Permalink
chore(k228): recut k228 to include thanos changes (#14870)
Browse files Browse the repository at this point in the history
Co-authored-by: Joao Marcal <[email protected]>
  • Loading branch information
ashwanthgoli and JoaoBraveCoding authored Nov 12, 2024
1 parent f2da621 commit 7d0a5b8
Show file tree
Hide file tree
Showing 32 changed files with 535 additions and 237 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ require (
github.com/richardartoul/molecule v1.0.0
github.com/schollz/progressbar/v3 v3.17.0
github.com/shirou/gopsutil/v4 v4.24.10
github.com/thanos-io/objstore v0.0.0-20241028150459-cfdd0e50390d
github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97
github.com/twmb/franz-go v1.17.1
github.com/twmb/franz-go/pkg/kadm v1.13.0
github.com/twmb/franz-go/pkg/kfake v0.0.0-20241015013301-cea7aa5d8037
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2585,8 +2585,8 @@ github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common v1.0.480/go.mod
github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/cvm v1.0.480/go.mod h1:zaBIuDDs+rC74X8Aog+LSu91GFtHYRYDC196RGTm2jk=
github.com/tencentyun/cos-go-sdk-v5 v0.7.40 h1:W6vDGKCHe4wBACI1d2UgE6+50sJFhRWU4O8IB2ozzxM=
github.com/tencentyun/cos-go-sdk-v5 v0.7.40/go.mod h1:4dCEtLHGh8QPxHEkgq+nFaky7yZxQuYwgSJM87icDaw=
github.com/thanos-io/objstore v0.0.0-20241028150459-cfdd0e50390d h1:k+SLTP1mjNqXxsCiq4UYeKCe07le0ieffyuHm/YfmH8=
github.com/thanos-io/objstore v0.0.0-20241028150459-cfdd0e50390d/go.mod h1:/ZMUxFcp/nT6oYV5WslH9k07NU/+86+aibgZRmMMr/4=
github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97 h1:VjG0mwhN1DkncwDHFvrpd12/2TLfgYNRmEQA48ikp+0=
github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97/go.mod h1:vyzFrBXgP+fGNG2FopEGWOO/zrIuoy7zt3LpLeezRsw=
github.com/tidwall/gjson v1.6.0/go.mod h1:P256ACg0Mn+j1RXIDXoss50DeIABTYK1PULOJHhxOls=
github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0+pL9E=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
Expand Down
4 changes: 2 additions & 2 deletions pkg/ruler/rulestore/bucketclient/bucket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (b *BucketRuleStore) ListAllRuleGroups(ctx context.Context) (map[string]rul
Name: group,
})
return nil
}, objstore.WithRecursiveIter)
}, objstore.WithRecursiveIter())

if err != nil {
return nil, err
Expand Down Expand Up @@ -156,7 +156,7 @@ func (b *BucketRuleStore) ListRuleGroupsForUserAndNamespace(ctx context.Context,
Name: group,
})
return nil
}, objstore.WithRecursiveIter)
}, objstore.WithRecursiveIter())
if err != nil {
return nil, err
}
Expand Down
8 changes: 2 additions & 6 deletions pkg/storage/bucket/azure/bucket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,12 @@ func newBucketClient(cfg Config, name string, logger log.Logger, factory func(lo
bucketConfig.ContainerName = cfg.ContainerName
bucketConfig.MaxRetries = cfg.MaxRetries
bucketConfig.UserAssignedID = cfg.UserAssignedID
bucketConfig.HTTPConfig.Transport = cfg.Transport

if cfg.Endpoint != "" {
// azure.DefaultConfig has the default Endpoint, overwrite it only if a different one was explicitly provided.
bucketConfig.Endpoint = cfg.Endpoint
}

return factory(logger, bucketConfig, name, func(rt http.RoundTripper) http.RoundTripper {
if cfg.Transport != nil {
rt = cfg.Transport
}
return rt
})
return factory(logger, bucketConfig, name, nil)
}
40 changes: 40 additions & 0 deletions pkg/storage/bucket/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"errors"
"flag"
"fmt"
"net/http"
"regexp"

"github.com/go-kit/log"
Expand Down Expand Up @@ -126,6 +128,44 @@ func (cfg *Config) Validate() error {
return cfg.StorageBackendConfig.Validate()
}

func (cfg *Config) disableRetries(backend string) error {
switch backend {
case S3:
cfg.S3.MaxRetries = 1
case GCS:
cfg.GCS.MaxRetries = 1
case Azure:
cfg.Azure.MaxRetries = 1
case Swift:
cfg.Swift.MaxRetries = 1
case Filesystem:
// do nothing
default:
return fmt.Errorf("cannot disable retries for backend: %s", backend)
}

return nil
}

func (cfg *Config) configureTransport(backend string, rt http.RoundTripper) error {
switch backend {
case S3:
cfg.S3.HTTP.Transport = rt
case GCS:
cfg.GCS.Transport = rt
case Azure:
cfg.Azure.Transport = rt
case Swift:
cfg.Swift.Transport = rt
case Filesystem:
// do nothing
default:
return fmt.Errorf("cannot configure transport for backend: %s", backend)
}

return nil
}

// NewClient creates a new bucket client based on the configured backend
func NewClient(ctx context.Context, backend string, cfg Config, name string, logger log.Logger) (objstore.InstrumentedBucket, error) {
var (
Expand Down
1 change: 1 addition & 0 deletions pkg/storage/bucket/gcs/bucket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ func NewBucketClient(ctx context.Context, cfg Config, name string, logger log.Lo
bucketConfig.Bucket = cfg.BucketName
bucketConfig.ServiceAccount = cfg.ServiceAccount.String()
bucketConfig.ChunkSizeBytes = cfg.ChunkBufferSize
bucketConfig.MaxRetries = cfg.MaxRetries
bucketConfig.HTTPConfig.Transport = cfg.Transport

return gcs.NewBucketWithConfig(ctx, logger, bucketConfig, name, nil)
Expand Down
2 changes: 2 additions & 0 deletions pkg/storage/bucket/gcs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ type Config struct {
BucketName string `yaml:"bucket_name"`
ServiceAccount flagext.Secret `yaml:"service_account" doc:"description_method=GCSServiceAccountLongDescription"`
ChunkBufferSize int `yaml:"chunk_buffer_size"`
MaxRetries int `yaml:"max_retries"`

// Allow upstream callers to inject a round tripper
Transport http.RoundTripper `yaml:"-"`
Expand All @@ -27,6 +28,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.StringVar(&cfg.BucketName, prefix+"gcs.bucket-name", "", "GCS bucket name")
f.Var(&cfg.ServiceAccount, prefix+"gcs.service-account", cfg.GCSServiceAccountShortDescription())
f.IntVar(&cfg.ChunkBufferSize, prefix+"gcs.chunk-buffer-size", 0, "The maximum size of the buffer that GCS client for a single PUT request. 0 to disable buffering.")
f.IntVar(&cfg.MaxRetries, prefix+"gcs.max-retries", 10, "The maximum number of retries for idempotent operations. Overrides the default gcs storage client behavior if this value is greater than 0. Set this to 1 to disable retries.")
}

func (cfg *Config) GCSServiceAccountShortDescription() string {
Expand Down
88 changes: 64 additions & 24 deletions pkg/storage/bucket/object_client_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,51 +2,78 @@ package bucket

import (
"context"
"fmt"
"io"
"slices"
"strings"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/objstore"

"github.com/grafana/loki/v3/pkg/storage/chunk/client"
"github.com/grafana/loki/v3/pkg/storage/chunk/client/aws"
"github.com/grafana/loki/v3/pkg/storage/chunk/client/gcp"
"github.com/grafana/loki/v3/pkg/storage/chunk/client/hedging"
)

type ObjectClientAdapter struct {
bucket, hedgedBucket objstore.Bucket
logger log.Logger
supportsUpdatedAt bool
isRetryableErr func(err error) bool
}

func NewObjectClientAdapter(bucket, hedgedBucket objstore.Bucket, logger log.Logger, opts ...ClientOptions) *ObjectClientAdapter {
if hedgedBucket == nil {
hedgedBucket = bucket
func NewObjectClient(ctx context.Context, backend string, cfg Config, component string, hedgingCfg hedging.Config, disableRetries bool, logger log.Logger) (*ObjectClientAdapter, error) {
if disableRetries {
if err := cfg.disableRetries(backend); err != nil {
return nil, fmt.Errorf("create bucket: %w", err)
}
}

bucket, err := NewClient(ctx, backend, cfg, component, logger)
if err != nil {
return nil, fmt.Errorf("create bucket: %w", err)
}

hedgedBucket := bucket
if hedgingCfg.At != 0 {
hedgedTrasport, err := hedgingCfg.RoundTripperWithRegisterer(nil, prometheus.WrapRegistererWithPrefix("loki_", prometheus.DefaultRegisterer))
if err != nil {
return nil, fmt.Errorf("create hedged transport: %w", err)
}

if err := cfg.configureTransport(backend, hedgedTrasport); err != nil {
return nil, fmt.Errorf("create hedged bucket: %w", err)
}

hedgedBucket, err = NewClient(ctx, backend, cfg, component, logger)
if err != nil {
return nil, fmt.Errorf("create hedged bucket: %w", err)
}
}

o := &ObjectClientAdapter{
bucket: bucket,
hedgedBucket: hedgedBucket,
logger: log.With(logger, "component", "bucket_to_object_client_adapter"),
bucket: bucket,
hedgedBucket: hedgedBucket,
logger: log.With(logger, "component", "bucket_to_object_client_adapter"),
supportsUpdatedAt: slices.Contains(bucket.SupportedIterOptions(), objstore.UpdatedAt),
// default to no retryable errors. Override with WithRetryableErrFunc
isRetryableErr: func(_ error) bool {
return false
},
}

for _, opt := range opts {
opt(o)
switch backend {
case GCS:
o.isRetryableErr = gcp.IsRetryableErr
case S3:
o.isRetryableErr = aws.IsRetryableErr
}

return o
}

type ClientOptions func(*ObjectClientAdapter)

func WithRetryableErrFunc(f func(err error) bool) ClientOptions {
return func(o *ObjectClientAdapter) {
o.isRetryableErr = f
}
return o, nil
}

func (o *ObjectClientAdapter) Stop() {
Expand Down Expand Up @@ -103,26 +130,39 @@ func (o *ObjectClientAdapter) List(ctx context.Context, prefix, delimiter string

// If delimiter is empty we want to list all files
if delimiter == "" {
iterParams = append(iterParams, objstore.WithRecursiveIter)
iterParams = append(iterParams, objstore.WithRecursiveIter())
}

if o.supportsUpdatedAt {
iterParams = append(iterParams, objstore.WithUpdatedAt())
}

err := o.bucket.Iter(ctx, prefix, func(objectKey string) error {
err := o.bucket.IterWithAttributes(ctx, prefix, func(attrs objstore.IterObjectAttributes) error {
// CommonPrefixes are keys that have the prefix and have the delimiter
// as a suffix
objectKey := attrs.Name
if delimiter != "" && strings.HasSuffix(objectKey, delimiter) {
commonPrefixes = append(commonPrefixes, client.StorageCommonPrefix(objectKey))
return nil
}

// TODO: remove this once thanos support IterWithAttributes
attr, err := o.bucket.Attributes(ctx, objectKey)
if err != nil {
return errors.Wrapf(err, "failed to get attributes for %s", objectKey)
lastModified, ok := attrs.LastModified()
if o.supportsUpdatedAt && !ok {
return errors.Errorf("failed to get lastModified for %s", objectKey)
}
// Some providers do not support supports UpdatedAt option. For those we need
// to make an additional request to get the last modified time.
if !o.supportsUpdatedAt {
attr, err := o.bucket.Attributes(ctx, objectKey)
if err != nil {
return errors.Wrapf(err, "failed to get attributes for %s", objectKey)
}
lastModified = attr.LastModified
}

storageObjects = append(storageObjects, client.StorageObject{
Key: objectKey,
ModifiedAt: attr.LastModified,
ModifiedAt: lastModified,
})

return nil
Expand Down
10 changes: 8 additions & 2 deletions pkg/storage/bucket/object_client_adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ import (
"sort"
"testing"

"github.com/go-kit/log"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/v3/pkg/storage/bucket/filesystem"
"github.com/grafana/loki/v3/pkg/storage/chunk/client"
"github.com/grafana/loki/v3/pkg/storage/chunk/client/hedging"
)

func TestObjectClientAdapter_List(t *testing.T) {
Expand Down Expand Up @@ -95,8 +97,12 @@ func TestObjectClientAdapter_List(t *testing.T) {
require.NoError(t, newBucket.Upload(context.Background(), "depply/nested/folder/b", buff))
require.NoError(t, newBucket.Upload(context.Background(), "depply/nested/folder/c", buff))

client := NewObjectClientAdapter(newBucket, nil, nil)
client.bucket = newBucket
client, err := NewObjectClient(context.Background(), "filesystem", Config{
StorageBackendConfig: StorageBackendConfig{
Filesystem: config,
},
}, "test", hedging.Config{}, false, log.NewNopLogger())
require.NoError(t, err)

storageObj, storageCommonPref, err := client.List(context.Background(), tt.prefix, tt.delimiter)
if tt.wantErr != nil {
Expand Down
17 changes: 17 additions & 0 deletions pkg/storage/bucket/prefixed_bucket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ func (b *PrefixedBucketClient) Delete(ctx context.Context, name string) error {
// Name returns the bucket name for the provider.
func (b *PrefixedBucketClient) Name() string { return b.bucket.Name() }

// SupportedIterOptions returns a list of supported IterOptions by the underlying provider.
func (b *PrefixedBucketClient) SupportedIterOptions() []objstore.IterOptionType {
return b.bucket.SupportedIterOptions()
}

// Iter calls f for each entry in the given directory (not recursive.). The argument to f is the full
// object name including the prefix of the inspected directory. The configured prefix will be stripped
// before supplied function is applied.
Expand All @@ -53,6 +58,18 @@ func (b *PrefixedBucketClient) Iter(ctx context.Context, dir string, f func(stri
}, options...)
}

// IterWithAttributes calls f for each entry in the given directory similar to Iter.
// In addition to Name, it also includes requested object attributes in the argument to f.
//
// Attributes can be requested using IterOption.
// Not all IterOptions are supported by all providers, requesting for an unsupported option will fail with ErrOptionNotSupported.
func (b *PrefixedBucketClient) IterWithAttributes(ctx context.Context, dir string, f func(attrs objstore.IterObjectAttributes) error, options ...objstore.IterOption) error {
return b.bucket.IterWithAttributes(ctx, b.fullName(dir), func(attrs objstore.IterObjectAttributes) error {
attrs.Name = strings.TrimPrefix(attrs.Name, b.prefix+objstore.DirDelim)
return f(attrs)
}, options...)
}

// Get returns a reader for the given object name.
func (b *PrefixedBucketClient) Get(ctx context.Context, name string) (io.ReadCloser, error) {
return b.bucket.Get(ctx, b.fullName(name))
Expand Down
1 change: 1 addition & 0 deletions pkg/storage/bucket/s3/bucket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,5 +82,6 @@ func newS3Config(cfg Config) (s3.Config, error) {
Enable: cfg.TraceConfig.Enabled,
},
STSEndpoint: cfg.STSEndpoint,
MaxRetries: cfg.MaxRetries,
}, nil
}
2 changes: 2 additions & 0 deletions pkg/storage/bucket/s3/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ type Config struct {
PartSize uint64 `yaml:"part_size" category:"experimental"`
SendContentMd5 bool `yaml:"send_content_md5" category:"experimental"`
STSEndpoint string `yaml:"sts_endpoint"`
MaxRetries int `yaml:"max_retries"`

SSE SSEConfig `yaml:"sse"`
HTTP HTTPConfig `yaml:"http"`
Expand Down Expand Up @@ -146,6 +147,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.Var(newBucketLookupTypeValue(s3.AutoLookup, &cfg.BucketLookupType), prefix+"s3.bucket-lookup-type", fmt.Sprintf("Bucket lookup style type, used to access bucket in S3-compatible service. Default is auto. Supported values are: %s.", strings.Join(supportedBucketLookupTypes, ", ")))
f.BoolVar(&cfg.DualstackEnabled, prefix+"s3.dualstack-enabled", true, "When enabled, direct all AWS S3 requests to the dual-stack IPv4/IPv6 endpoint for the configured region.")
f.StringVar(&cfg.STSEndpoint, prefix+"s3.sts-endpoint", "", "Accessing S3 resources using temporary, secure credentials provided by AWS Security Token Service.")
f.IntVar(&cfg.MaxRetries, prefix+"s3.max-retries", 10, "The maximum number of retries for S3 requests that are retryable. Default is 10, set this to 1 to disable retries.")
cfg.SSE.RegisterFlagsWithPrefix(prefix+"s3.sse.", f)
cfg.HTTP.RegisterFlagsWithPrefix(prefix, f)
cfg.TraceConfig.RegisterFlagsWithPrefix(prefix+"s3.trace.", f)
Expand Down
14 changes: 14 additions & 0 deletions pkg/storage/bucket/sse_bucket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,25 @@ func (b *SSEBucketClient) getCustomS3SSEConfig() (encrypt.ServerSide, error) {
return sse, nil
}

// SupportedIterOptions returns a list of supported IterOptions by the underlying provider.
func (b *SSEBucketClient) SupportedIterOptions() []objstore.IterOptionType {
return b.bucket.SupportedIterOptions()
}

// Iter implements objstore.Bucket.
func (b *SSEBucketClient) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error {
return b.bucket.Iter(ctx, dir, f, options...)
}

// IterWithAttributes calls f for each entry in the given directory similar to Iter.
// In addition to Name, it also includes requested object attributes in the argument to f.
//
// Attributes can be requested using IterOption.
// Not all IterOptions are supported by all providers, requesting for an unsupported option will fail with ErrOptionNotSupported.
func (b *SSEBucketClient) IterWithAttributes(ctx context.Context, dir string, f func(attrs objstore.IterObjectAttributes) error, options ...objstore.IterOption) error {
return b.bucket.IterWithAttributes(ctx, dir, f, options...)
}

// Get implements objstore.Bucket.
func (b *SSEBucketClient) Get(ctx context.Context, name string) (io.ReadCloser, error) {
return b.bucket.Get(ctx, name)
Expand Down
Loading

0 comments on commit 7d0a5b8

Please sign in to comment.