
Commit 5824e3d
feat(storage): Azure backend using thanos.io/objstore (#11315)
Co-authored-by: Ashwanth Goli <[email protected]>
JoaoBraveCoding and ashwanthgoli authored Oct 25, 2024
1 parent 5034d34 commit 5824e3d
Showing 10 changed files with 261 additions and 327 deletions.
48 changes: 23 additions & 25 deletions pkg/storage/bucket/azure/bucket_client.go
@@ -1,39 +1,37 @@
 package azure
 
 import (
+	"net/http"
+
 	"github.com/go-kit/log"
-	"github.com/prometheus/common/model"
 	"github.com/thanos-io/objstore"
 	"github.com/thanos-io/objstore/providers/azure"
-	yaml "gopkg.in/yaml.v2"
 )
 
 func NewBucketClient(cfg Config, name string, logger log.Logger) (objstore.Bucket, error) {
-	bucketConfig := azure.Config{
-		StorageAccountName:      cfg.StorageAccountName,
-		StorageAccountKey:       cfg.StorageAccountKey.String(),
-		StorageConnectionString: cfg.ConnectionString.String(),
-		ContainerName:           cfg.ContainerName,
-		Endpoint:                cfg.EndpointSuffix,
-		MaxRetries:              cfg.MaxRetries,
-		HTTPConfig: azure.HTTPConfig{
-			IdleConnTimeout:       model.Duration(cfg.IdleConnTimeout),
-			ResponseHeaderTimeout: model.Duration(cfg.ResponseHeaderTimeout),
-			InsecureSkipVerify:    cfg.InsecureSkipVerify,
-			TLSHandshakeTimeout:   model.Duration(cfg.TLSHandshakeTimeout),
-			ExpectContinueTimeout: model.Duration(cfg.ExpectContinueTimeout),
-			MaxIdleConns:          cfg.MaxIdleConns,
-			MaxIdleConnsPerHost:   cfg.MaxIdleConnsPerHost,
-			MaxConnsPerHost:       cfg.MaxConnsPerHost,
-		},
+	return newBucketClient(cfg, name, logger, azure.NewBucketWithConfig)
+}
+
+func newBucketClient(cfg Config, name string, logger log.Logger, factory func(log.Logger, azure.Config, string, http.RoundTripper) (*azure.Bucket, error)) (objstore.Bucket, error) {
+	// Start with default config to make sure that all parameters are set to sensible values, especially
+	// HTTP Config field.
+	bucketConfig := azure.DefaultConfig
+	bucketConfig.StorageAccountName = cfg.StorageAccountName
+	bucketConfig.StorageAccountKey = cfg.StorageAccountKey.String()
+	bucketConfig.StorageConnectionString = cfg.StorageConnectionString.String()
+	bucketConfig.ContainerName = cfg.ContainerName
+	bucketConfig.MaxRetries = cfg.MaxRetries
+	bucketConfig.UserAssignedID = cfg.UserAssignedID
+
+	if cfg.Endpoint != "" {
+		// azure.DefaultConfig has the default Endpoint, overwrite it only if a different one was explicitly provided.
+		bucketConfig.Endpoint = cfg.Endpoint
 	}
 
-	// Thanos currently doesn't support passing the config as is, but expects a YAML,
-	// so we're going to serialize it.
-	serialized, err := yaml.Marshal(bucketConfig)
-	if err != nil {
-		return nil, err
+	var rt http.RoundTripper
+	if cfg.Transport != nil {
+		rt = cfg.Transport
 	}
 
-	return azure.NewBucket(logger, serialized, name, nil)
+	return factory(logger, bucketConfig, name, rt)
 }
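The `factory` parameter exists so the Azure-specific construction can be exercised without a real storage account. As an illustration only (the test below is not part of the diff shown above, and the test name and field values are made up), a unit test could inject a fake factory and assert on the `azure.Config` that `newBucketClient` assembles:

```go
package azure

import (
	"net/http"
	"testing"

	"github.com/go-kit/log"
	"github.com/stretchr/testify/require"
	"github.com/thanos-io/objstore/providers/azure"
)

func TestNewBucketClient_AppliesConfig(t *testing.T) {
	cfg := Config{
		StorageAccountName: "dev-account",           // hypothetical account
		ContainerName:      "loki",                  // hypothetical container
		Endpoint:           "blob.core.windows.net", // endpoint suffix without scheme
		MaxRetries:         5,
	}

	var captured azure.Config
	fake := func(_ log.Logger, c azure.Config, _ string, _ http.RoundTripper) (*azure.Bucket, error) {
		captured = c
		return nil, nil // never talks to Azure
	}

	_, err := newBucketClient(cfg, "test", log.NewNopLogger(), fake)
	require.NoError(t, err)
	require.Equal(t, "dev-account", captured.StorageAccountName)
	require.Equal(t, "blob.core.windows.net", captured.Endpoint)
	require.Equal(t, 5, captured.MaxRetries)
}
```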
29 changes: 15 additions & 14 deletions pkg/storage/bucket/azure/config.go
@@ -2,22 +2,23 @@ package azure
 
 import (
 	"flag"
+	"net/http"
 
 	"github.com/grafana/dskit/flagext"
-
-	"github.com/grafana/loki/v3/pkg/storage/bucket/http"
 )
 
 // Config holds the config options for an Azure backend
 type Config struct {
-	StorageAccountName string         `yaml:"account_name"`
-	StorageAccountKey  flagext.Secret `yaml:"account_key"`
-	ConnectionString   flagext.Secret `yaml:"connection_string"`
-	ContainerName      string         `yaml:"container_name"`
-	EndpointSuffix     string         `yaml:"endpoint_suffix"`
-	MaxRetries         int            `yaml:"max_retries"`
+	StorageAccountName      string         `yaml:"account_name"`
+	StorageAccountKey       flagext.Secret `yaml:"account_key"`
+	StorageConnectionString flagext.Secret `yaml:"connection_string"`
+	ContainerName           string         `yaml:"container_name"`
+	Endpoint                string         `yaml:"endpoint_suffix"`
+	MaxRetries              int            `yaml:"max_retries"`
+	UserAssignedID          string         `yaml:"user_assigned_id"`
 
-	http.Config `yaml:"http"`
+	// Allow upstream callers to inject a round tripper
+	Transport http.RoundTripper `yaml:"-"`
 }
 
 // RegisterFlags registers the flags for Azure storage
@@ -28,10 +29,10 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 // RegisterFlagsWithPrefix registers the flags for Azure storage
 func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
 	f.StringVar(&cfg.StorageAccountName, prefix+"azure.account-name", "", "Azure storage account name")
-	f.Var(&cfg.StorageAccountKey, prefix+"azure.account-key", "Azure storage account key")
-	f.Var(&cfg.ConnectionString, prefix+"azure.connection-string", "If `connection-string` is set, the values of `account-name` and `endpoint-suffix` values will not be used. Use this method over `account-key` if you need to authenticate via a SAS token. Or if you use the Azurite emulator.")
-	f.StringVar(&cfg.ContainerName, prefix+"azure.container-name", "loki", "Azure storage container name")
-	f.StringVar(&cfg.EndpointSuffix, prefix+"azure.endpoint-suffix", "", "Azure storage endpoint suffix without schema. The account name will be prefixed to this value to create the FQDN")
+	f.Var(&cfg.StorageAccountKey, prefix+"azure.account-key", "Azure storage account key. If unset, Azure managed identities will be used for authentication instead.")
+	f.Var(&cfg.StorageConnectionString, prefix+"azure.connection-string", "If `connection-string` is set, the value of `endpoint-suffix` will not be used. Use this method over `account-key` if you need to authenticate via a SAS token. Or if you use the Azurite emulator.")
+	f.StringVar(&cfg.ContainerName, prefix+"azure.container-name", "", "Azure storage container name")
+	f.StringVar(&cfg.Endpoint, prefix+"azure.endpoint-suffix", "", "Azure storage endpoint suffix without schema. The account name will be prefixed to this value to create the FQDN. If set to empty string, default endpoint suffix is used.")
 	f.IntVar(&cfg.MaxRetries, prefix+"azure.max-retries", 20, "Number of retries for recoverable errors")
-	cfg.Config.RegisterFlagsWithPrefix(prefix+"azure.", f)
+	f.StringVar(&cfg.UserAssignedID, prefix+"azure.user-assigned-id", "", "User assigned managed identity. If empty, then System assigned identity is used.")
 }
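For orientation, here is a hedged sketch (not part of this commit) of how a caller might fill in the reworked `Config` and build a bucket client. The account and container values are placeholders, and leaving the key and connection string empty assumes the managed-identity path introduced above:

```go
package main

import (
	"fmt"
	"os"

	"github.com/go-kit/log"

	"github.com/grafana/loki/v3/pkg/storage/bucket/azure"
)

func main() {
	// Field names come from the new Config above; the values are placeholders.
	cfg := azure.Config{
		StorageAccountName: "myaccount",   // hypothetical storage account
		ContainerName:      "loki-chunks", // hypothetical container
		MaxRetries:         20,
		// StorageAccountKey / StorageConnectionString left empty: with managed
		// identities (optionally via UserAssignedID) no shared key is required.
	}

	client, err := azure.NewBucketClient(cfg, "example", log.NewNopLogger())
	if err != nil {
		fmt.Fprintln(os.Stderr, "creating Azure bucket client:", err)
		os.Exit(1)
	}
	_ = client // satisfies objstore.Bucket; typically wrapped by ObjectClientAdapter
}
```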
98 changes: 0 additions & 98 deletions pkg/storage/bucket/azure/config_test.go

This file was deleted.

150 changes: 150 additions & 0 deletions pkg/storage/bucket/object_client_adapter.go
@@ -0,0 +1,150 @@
package bucket

import (
	"context"
	"io"
	"strings"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/pkg/errors"
	"github.com/thanos-io/objstore"

	"github.com/grafana/loki/v3/pkg/storage/chunk/client"
)

type ObjectClientAdapter struct {
	bucket, hedgedBucket objstore.Bucket
	logger               log.Logger
	isRetryableErr       func(err error) bool
}

// NewObjectClientAdapter wraps the given bucket, plus an optional hedged bucket used for
// GetObject, GetObjectRange and GetAttributes. If hedgedBucket is nil, the primary bucket
// is used for those calls as well.
func NewObjectClientAdapter(bucket, hedgedBucket objstore.Bucket, logger log.Logger, opts ...ClientOptions) *ObjectClientAdapter {
	if hedgedBucket == nil {
		hedgedBucket = bucket
	}

	o := &ObjectClientAdapter{
		bucket:       bucket,
		hedgedBucket: hedgedBucket,
		logger:       log.With(logger, "component", "bucket_to_object_client_adapter"),
		// default to no retryable errors. Override with WithRetryableErrFunc
		isRetryableErr: func(_ error) bool {
			return false
		},
	}

	for _, opt := range opts {
		opt(o)
	}

	return o
}

type ClientOptions func(*ObjectClientAdapter)

func WithRetryableErrFunc(f func(err error) bool) ClientOptions {
	return func(o *ObjectClientAdapter) {
		o.isRetryableErr = f
	}
}
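Construction is illustrated below with a hedged sketch (not from this commit): the in-memory thanos bucket stands in for a real provider bucket, and the retry classifier is a placeholder for provider-specific logic supplied via `WithRetryableErrFunc`:

```go
package main

import (
	"bytes"
	"context"
	"errors"

	"github.com/go-kit/log"
	"github.com/thanos-io/objstore"

	"github.com/grafana/loki/v3/pkg/storage/bucket"
)

func main() {
	// An in-memory thanos bucket stands in for a real Azure/GCS/S3 bucket.
	b := objstore.NewInMemBucket()

	// Placeholder retry classifier; a real backend would inspect provider-specific errors.
	isRetryable := func(err error) bool {
		return errors.Is(err, context.DeadlineExceeded)
	}

	// Passing nil for the hedged bucket makes the adapter fall back to the primary bucket.
	adapter := bucket.NewObjectClientAdapter(b, nil, log.NewNopLogger(), bucket.WithRetryableErrFunc(isRetryable))

	_ = adapter.PutObject(context.Background(), "index/1", bytes.NewReader([]byte("hello")))
}
```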

// Stop is a no-op.
func (o *ObjectClientAdapter) Stop() {
}

// ObjectExists checks if a given objectKey exists in the bucket
func (o *ObjectClientAdapter) ObjectExists(ctx context.Context, objectKey string) (bool, error) {
	return o.bucket.Exists(ctx, objectKey)
}

// GetAttributes returns the attributes of the specified object key from the configured bucket.
func (o *ObjectClientAdapter) GetAttributes(ctx context.Context, objectKey string) (client.ObjectAttributes, error) {
	attr := client.ObjectAttributes{}
	thanosAttr, err := o.hedgedBucket.Attributes(ctx, objectKey)
	if err != nil {
		return attr, err
	}

	attr.Size = thanosAttr.Size
	return attr, nil
}

// PutObject puts the specified bytes into the configured bucket at the provided key
func (o *ObjectClientAdapter) PutObject(ctx context.Context, objectKey string, object io.Reader) error {
	return o.bucket.Upload(ctx, objectKey, object)
}

// GetObject returns a reader and the size for the specified object key from the configured bucket.
// size is set to -1 if it cannot be successfully determined; it is up to the caller to check this value before using it.
func (o *ObjectClientAdapter) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, int64, error) {
	reader, err := o.hedgedBucket.Get(ctx, objectKey)
	if err != nil {
		return nil, 0, err
	}

	size, err := objstore.TryToGetSize(reader)
	if err != nil {
		size = -1
		level.Warn(o.logger).Log("msg", "failed to get size of object", "err", err)
	}

	return reader, size, err
}

// GetObjectRange returns a reader for the requested byte range of the specified object key.
func (o *ObjectClientAdapter) GetObjectRange(ctx context.Context, objectKey string, offset, length int64) (io.ReadCloser, error) {
	return o.hedgedBucket.GetRange(ctx, objectKey, offset, length)
}

// List objects with given prefix.
func (o *ObjectClientAdapter) List(ctx context.Context, prefix, delimiter string) ([]client.StorageObject, []client.StorageCommonPrefix, error) {
	var storageObjects []client.StorageObject
	var commonPrefixes []client.StorageCommonPrefix
	var iterParams []objstore.IterOption

	// If delimiter is empty we want to list all files
	if delimiter == "" {
		iterParams = append(iterParams, objstore.WithRecursiveIter)
	}

	err := o.bucket.Iter(ctx, prefix, func(objectKey string) error {
		// CommonPrefixes are keys that have the prefix and have the delimiter
		// as a suffix
		if delimiter != "" && strings.HasSuffix(objectKey, delimiter) {
			commonPrefixes = append(commonPrefixes, client.StorageCommonPrefix(objectKey))
			return nil
		}

		// TODO: remove this once thanos supports IterWithAttributes
		attr, err := o.bucket.Attributes(ctx, objectKey)
		if err != nil {
			return errors.Wrapf(err, "failed to get attributes for %s", objectKey)
		}

		storageObjects = append(storageObjects, client.StorageObject{
			Key:        objectKey,
			ModifiedAt: attr.LastModified,
		})

		return nil
	}, iterParams...)
	if err != nil {
		return nil, nil, err
	}

	return storageObjects, commonPrefixes, nil
}
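To make the delimiter handling above concrete, here is an illustrative sketch (not part of the commit) that lists an in-memory bucket both with a `/` delimiter and recursively; the object keys are made up:

```go
package main

import (
	"bytes"
	"context"
	"fmt"

	"github.com/go-kit/log"
	"github.com/thanos-io/objstore"

	"github.com/grafana/loki/v3/pkg/storage/bucket"
)

func main() {
	ctx := context.Background()
	b := objstore.NewInMemBucket()
	for _, key := range []string{"chunks/a", "chunks/b", "index/1"} {
		_ = b.Upload(ctx, key, bytes.NewReader([]byte("x")))
	}

	adapter := bucket.NewObjectClientAdapter(b, nil, log.NewNopLogger())

	// With a "/" delimiter the top-level "directories" come back as common prefixes.
	_, prefixes, err := adapter.List(ctx, "", "/")
	fmt.Println(prefixes, err) // expected: [chunks/ index/] <nil>

	// With an empty delimiter iteration is recursive and every key is returned as an object.
	objects, _, err := adapter.List(ctx, "", "")
	fmt.Println(len(objects), err) // expected: 3 <nil>
}
```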

// DeleteObject deletes the specified object key from the configured bucket.
func (o *ObjectClientAdapter) DeleteObject(ctx context.Context, objectKey string) error {
	return o.bucket.Delete(ctx, objectKey)
}

// IsObjectNotFoundErr returns true if error means that object is not found. Relevant to GetObject and DeleteObject operations.
func (o *ObjectClientAdapter) IsObjectNotFoundErr(err error) bool {
	return o.bucket.IsObjNotFoundErr(err)
}

// IsRetryableErr returns true if the request failed due to some retryable server-side scenario
func (o *ObjectClientAdapter) IsRetryableErr(err error) bool {
	return o.isRetryableErr(err)
}
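A caller might combine the two classifiers above roughly as follows; the helper, its package name, and `maxAttempts` are hypothetical, and there is deliberately no backoff for brevity:

```go
// Package storageutil is a hypothetical consumer; only the retry shape matters here.
package storageutil

import (
	"context"
	"io"

	"github.com/grafana/loki/v3/pkg/storage/bucket"
)

// getWithRetries re-issues GetObject while the adapter reports the failure as retryable.
func getWithRetries(ctx context.Context, oc *bucket.ObjectClientAdapter, key string, maxAttempts int) (io.ReadCloser, error) {
	var lastErr error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		rc, _, err := oc.GetObject(ctx, key)
		if err == nil {
			return rc, nil
		}
		if oc.IsObjectNotFoundErr(err) || !oc.IsRetryableErr(err) {
			// Missing objects and non-retryable failures are returned immediately.
			return nil, err
		}
		lastErr = err
	}
	return nil, lastErr
}
```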
@@ -1,4 +1,4 @@
-package gcp
+package bucket
 
 import (
 	"bytes"
@@ -12,7 +12,7 @@ import (
 	"github.com/grafana/loki/v3/pkg/storage/chunk/client"
 )
 
-func TestGCSThanosObjStore_List(t *testing.T) {
+func TestObjectClientAdapter_List(t *testing.T) {
 	tests := []struct {
 		name string
 		prefix string
@@ -95,10 +95,10 @@ func TestGCSThanosObjStore_List(t *testing.T) {
 		require.NoError(t, newBucket.Upload(context.Background(), "depply/nested/folder/b", buff))
 		require.NoError(t, newBucket.Upload(context.Background(), "depply/nested/folder/c", buff))
 
-		gcpClient := &GCSThanosObjectClient{}
-		gcpClient.client = newBucket
+		client := NewObjectClientAdapter(newBucket, nil, nil)
+		client.bucket = newBucket
 
-		storageObj, storageCommonPref, err := gcpClient.List(context.Background(), tt.prefix, tt.delimiter)
+		storageObj, storageCommonPref, err := client.List(context.Background(), tt.prefix, tt.delimiter)
 		if tt.wantErr != nil {
 			require.Equal(t, tt.wantErr.Error(), err.Error())
 			continue