Skip to content

Commit

Permalink
chore: add ObjectExistsWithSize to respect the interface
Browse files Browse the repository at this point in the history
new method was added in grafana#14268
  • Loading branch information
JoaoBraveCoding committed Sep 30, 2024
1 parent 3fef0bf commit 6e98e22
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 24 deletions.
10 changes: 10 additions & 0 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -1818,6 +1818,10 @@ storage:
# CLI flag: -common.storage.thanos.s3.insecure
[insecure: <boolean> | default = false]
# Disable forcing S3 dualstack endpoint usage.
# CLI flag: -common.storage.thanos.s3.disable-dualstack
[disable_dualstack: <boolean> | default = false]
# The signature version to use for authenticating against S3. Supported
# values are: v4.
# CLI flag: -common.storage.thanos.s3.signature-version
Expand Down Expand Up @@ -4916,6 +4920,8 @@ storage:
[insecure: <boolean>]
[disable_dualstack: <boolean>]
[signature_version: <string> | default = ""]
[storage_class: <string> | default = ""]
Expand Down Expand Up @@ -6211,6 +6217,10 @@ objstore_config:
# CLI flag: -thanos.s3.insecure
[insecure: <boolean> | default = false]

# Disable forcing S3 dualstack endpoint usage.
# CLI flag: -thanos.s3.disable-dualstack
[disable_dualstack: <boolean> | default = false]

# The signature version to use for authenticating against S3. Supported
# values are: v4.
# CLI flag: -thanos.s3.signature-version
Expand Down
2 changes: 1 addition & 1 deletion pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -1506,7 +1506,7 @@ func (t *Loki) initIndexGateway() (services.Service, error) {
}
tableRange := period.GetIndexTableNumberRange(periodEndTime)

indexClient, err := storage.NewIndexClient("index-gateway", period, tableRange, t.Cfg.StorageConfig, t.Cfg.SchemaConfig, t.Overrides, t.ClientMetrics, shardingStrategy,
indexClient, err := storage.NewIndexClient("index-store", period, tableRange, t.Cfg.StorageConfig, t.Cfg.SchemaConfig, t.Overrides, t.ClientMetrics, shardingStrategy,
prometheus.DefaultRegisterer, log.With(util_log.Logger, "index-store", fmt.Sprintf("%s-%s", period.IndexType, period.From.String())), t.Cfg.MetricsNamespace,
)
if err != nil {
Expand Down
29 changes: 11 additions & 18 deletions pkg/storage/bucket/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,16 +183,7 @@ func NewClient(ctx context.Context, cfg Config, name string, logger log.Logger,
client = NewPrefixedBucketClient(client, cfg.StoragePrefix)
}

if metrics.BucketMetrics != nil {
client = bucketWrapWith(client, metrics.BucketMetrics)
} else {
bucketMetrics := bucketMetrics(name, metrics.Registerer)
client = bucketWrapWith(client, bucketMetrics)
// Save metrics to be assigned to other buckets created with the same component name
metrics.BucketMetrics = bucketMetrics
}

instrumentedClient := objstoretracing.WrapWithTraces(client)
instrumentedClient := objstoretracing.WrapWithTraces(bucketWithMetrics(client, name, metrics))

// Wrap the client with any provided middleware
for _, wrap := range cfg.Middlewares {
Expand All @@ -205,16 +196,18 @@ func NewClient(ctx context.Context, cfg Config, name string, logger log.Logger,
return instrumentedClient, nil
}

func bucketMetrics(name string, reg prometheus.Registerer) *objstore.Metrics {
reg = prometheus.WrapRegistererWithPrefix("loki_", reg)
reg = prometheus.WrapRegistererWith(prometheus.Labels{"component": name}, reg)
return objstore.BucketMetrics(reg, "")
}

func bucketWrapWith(bucketClient objstore.Bucket, metrics *objstore.Metrics) objstore.Bucket {
func bucketWithMetrics(bucketClient objstore.Bucket, name string, metrics *Metrics) objstore.Bucket {
if metrics == nil {
return bucketClient
}

return objstore.WrapWith(bucketClient, metrics)
if metrics.BucketMetrics == nil {
reg := metrics.Registerer
reg = prometheus.WrapRegistererWithPrefix("loki_", reg)
reg = prometheus.WrapRegistererWith(prometheus.Labels{"component": name}, reg)
// Save metrics to be assigned to other buckets created with the same component name
metrics.BucketMetrics = objstore.BucketMetrics(reg, "")
}

return objstore.WrapWith(bucketClient, metrics.BucketMetrics)
}
5 changes: 3 additions & 2 deletions pkg/storage/chunk/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ import (
"context"
"errors"

"github.com/grafana/loki/v3/pkg/storage/chunk"
"github.com/grafana/loki/v3/pkg/storage/stores/series/index"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/objstore"

"github.com/grafana/loki/v3/pkg/storage/chunk"
"github.com/grafana/loki/v3/pkg/storage/stores/series/index"
)

var (
Expand Down
15 changes: 15 additions & 0 deletions pkg/storage/chunk/client/gcp/gcs_thanos_object_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,21 @@ func (s *GCSThanosObjectClient) ObjectExists(ctx context.Context, objectKey stri
return s.client.Exists(ctx, objectKey)
}

// ObjectExistsWithSize checks if a given objectKey exists and it's size in the GCS bucket
func (s *GCSThanosObjectClient) ObjectExistsWithSize(ctx context.Context, objectKey string) (bool, int64, error) {
_, err := s.client.Get(ctx, objectKey)
if err != nil {
return false, 0, err
}

attr, err := s.client.Attributes(ctx, objectKey)
if err != nil {
return true, 0, nil
}

return true, attr.Size, nil
}

// PutObject puts the specified bytes into the configured GCS bucket at the provided key
func (s *GCSThanosObjectClient) PutObject(ctx context.Context, objectKey string, object io.Reader) error {
return s.client.Upload(ctx, objectKey, object)
Expand Down
6 changes: 3 additions & 3 deletions pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ func NewStore(cfg Config, storeCfg config.ChunkStoreConfig, schemaCfg config.Sch
}

func (s *LokiStore) init() error {
metrics := &client.Metrics{Registerer: prometheus.DefaultRegisterer}
for i, p := range s.schemaCfg.Configs {
p := p
chunkClient, err := s.chunkClientForPeriod(p)
Expand All @@ -208,7 +209,7 @@ func (s *LokiStore) init() error {
if i < len(s.schemaCfg.Configs)-1 {
periodEndTime = config.DayTime{Time: s.schemaCfg.Configs[i+1].From.Time.Add(-time.Millisecond)}
}
w, idx, stop, err := s.storeForPeriod(p, p.GetIndexTableNumberRange(periodEndTime), chunkClient, f)
w, idx, stop, err := s.storeForPeriod(p, p.GetIndexTableNumberRange(periodEndTime), chunkClient, f, metrics)
if err != nil {
return err
}
Expand Down Expand Up @@ -264,7 +265,7 @@ func shouldUseIndexGatewayClient(cfg indexshipper.Config) bool {
return true
}

func (s *LokiStore) storeForPeriod(p config.PeriodConfig, tableRange config.TableRange, chunkClient client.Client, f *fetcher.Fetcher) (stores.ChunkWriter, index.ReaderWriter, func(), error) {
func (s *LokiStore) storeForPeriod(p config.PeriodConfig, tableRange config.TableRange, chunkClient client.Client, f *fetcher.Fetcher, metrics *client.Metrics) (stores.ChunkWriter, index.ReaderWriter, func(), error) {
component := fmt.Sprintf("index-store-%s-%s", p.IndexType, p.From.String())
indexClientReg := prometheus.WrapRegistererWith(prometheus.Labels{"component": component}, s.registerer)
indexClientLogger := log.With(s.logger, "index-store", fmt.Sprintf("%s-%s", p.IndexType, p.From.String()))
Expand All @@ -287,7 +288,6 @@ func (s *LokiStore) storeForPeriod(p config.PeriodConfig, tableRange config.Tabl
var objectClient client.ObjectClient
var err error
if s.cfg.ThanosObjStore {
metrics := &client.Metrics{Registerer: prometheus.DefaultRegisterer}
objectClient, err = NewObjectClientV2(component, p.ObjectType, s.cfg, metrics)
} else {
objectClient, err = NewObjectClient(p.ObjectType, s.cfg, s.clientMetrics)
Expand Down

0 comments on commit 6e98e22

Please sign in to comment.