Skip to content

Commit

Permalink
Add bucket tool retention command (#4406)
Browse files Browse the repository at this point in the history
* add bucket tool retention command

Signed-off-by: ben.ye <[email protected]>

* update changelog

Signed-off-by: ben.ye <[email protected]>

* add sync meta

Signed-off-by: ben.ye <[email protected]>
  • Loading branch information
yeya24 authored Jul 6, 2021
1 parent e7d3ec7 commit 8d3ea52
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#4292](https://github.com/thanos-io/thanos/pull/4292) Receive: Enable exemplars ingestion and querying.
- [#4392](https://github.com/thanos-io/thanos/pull/4392) Tools: Added `--delete-blocks` to bucket rewrite tool to mark the original blocks for deletion after rewriting is done.
- [#3970](https://github.com/thanos-io/thanos/pull/3970) Azure: Adds more configuration options for Azure blob storage. This allows for pipeline and reader specific configuration. Implements HTTP transport configuration options. These options allows for more fine-grained control on timeouts and retries. Implements MSI authentication as second method of authentication via a service principal token.
- [#4406](https://github.com/thanos-io/thanos/pull/4406) Tools: Add retention command for applying retention policy on the bucket.

### Fixed

Expand Down
116 changes: 116 additions & 0 deletions cmd/thanos/tools_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
prommodel "github.com/prometheus/common/model"
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/relabel"
Expand Down Expand Up @@ -85,6 +86,7 @@ func registerBucket(app extkingpin.AppClause) {
registerBucketCleanup(cmd, objStoreConfig)
registerBucketMarkBlock(cmd, objStoreConfig)
registerBucketRewrite(cmd, objStoreConfig)
registerBucketRetention(cmd, objStoreConfig)
}

func registerBucketVerify(app extkingpin.AppClause, objStoreConfig *extflag.PathOrContent) {
Expand Down Expand Up @@ -988,3 +990,117 @@ func registerBucketRewrite(app extkingpin.AppClause, objStoreConfig *extflag.Pat
return nil
})
}

func registerBucketRetention(app extkingpin.AppClause, objStoreConfig *extflag.PathOrContent) {
var (
retentionRaw, retentionFiveMin, retentionOneHr prommodel.Duration
)

cmd := app.Command("retention", "Retention applies retention policies on the given bucket. Please make sure no compactor is running on the same bucket at the same time.")
deleteDelay := cmd.Flag("delete-delay", "Time before a block marked for deletion is deleted from bucket.").Default("48h").Duration()
consistencyDelay := cmd.Flag("consistency-delay", fmt.Sprintf("Minimum age of fresh (non-compacted) blocks before they are being processed. Malformed blocks older than the maximum of consistency-delay and %v will be removed.", compact.PartialUploadThresholdAge)).
Default("30m").Duration()
blockSyncConcurrency := cmd.Flag("block-sync-concurrency", "Number of goroutines to use when syncing block metadata from object storage.").
Default("20").Int()
selectorRelabelConf := extkingpin.RegisterSelectorRelabelFlags(cmd)
cmd.Flag("retention.resolution-raw",
"How long to retain raw samples in bucket. Setting this to 0d will retain samples of this resolution forever").
Default("0d").SetValue(&retentionRaw)
cmd.Flag("retention.resolution-5m", "How long to retain samples of resolution 1 (5 minutes) in bucket. Setting this to 0d will retain samples of this resolution forever").
Default("0d").SetValue(&retentionFiveMin)
cmd.Flag("retention.resolution-1h", "How long to retain samples of resolution 2 (1 hour) in bucket. Setting this to 0d will retain samples of this resolution forever").
Default("0d").SetValue(&retentionOneHr)
cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ <-chan struct{}, _ bool) error {
retentionByResolution := map[compact.ResolutionLevel]time.Duration{
compact.ResolutionLevelRaw: time.Duration(retentionRaw),
compact.ResolutionLevel5m: time.Duration(retentionFiveMin),
compact.ResolutionLevel1h: time.Duration(retentionOneHr),
}

if retentionByResolution[compact.ResolutionLevelRaw].Seconds() != 0 {
level.Info(logger).Log("msg", "retention policy of raw samples is enabled", "duration", retentionByResolution[compact.ResolutionLevelRaw])
}
if retentionByResolution[compact.ResolutionLevel5m].Seconds() != 0 {
level.Info(logger).Log("msg", "retention policy of 5 min aggregated samples is enabled", "duration", retentionByResolution[compact.ResolutionLevel5m])
}
if retentionByResolution[compact.ResolutionLevel1h].Seconds() != 0 {
level.Info(logger).Log("msg", "retention policy of 1 hour aggregated samples is enabled", "duration", retentionByResolution[compact.ResolutionLevel1h])
}

confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
}

relabelContentYaml, err := selectorRelabelConf.Content()
if err != nil {
return errors.Wrap(err, "get content of relabel configuration")
}

relabelConfig, err := block.ParseRelabelConfig(relabelContentYaml, block.SelectorSupportedRelabelActions)
if err != nil {
return err
}

bkt, err := client.NewBucket(logger, confContentYaml, reg, component.Rewrite.String())
if err != nil {
return err
}

// Dummy actor to immediately kill the group after the run function returns.
g.Add(func() error { return nil }, func(error) {})

defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")

// While fetching blocks, we filter out blocks that were marked for deletion by using IgnoreDeletionMarkFilter.
// The delay of deleteDelay/2 is added to ensure we fetch blocks that are meant to be deleted but do not have a replacement yet.
// This is to make sure compactor will not accidentally perform compactions with gap instead.
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, *deleteDelay/2, block.FetcherConcurrency)
duplicateBlocksFilter := block.NewDeduplicateFilter()
stubCounter := promauto.With(nil).NewCounter(prometheus.CounterOpts{})

var sy *compact.Syncer
{
baseMetaFetcher, err := block.NewBaseFetcher(logger, block.FetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg))
if err != nil {
return errors.Wrap(err, "create meta fetcher")
}
cf := baseMetaFetcher.NewMetaFetcher(
extprom.WrapRegistererWithPrefix(extpromPrefix, reg), []block.MetadataFilter{
block.NewLabelShardedMetaFilter(relabelConfig),
block.NewConsistencyDelayMetaFilter(logger, *consistencyDelay, extprom.WrapRegistererWithPrefix(extpromPrefix, reg)),
duplicateBlocksFilter,
ignoreDeletionMarkFilter,
}, []block.MetadataModifier{block.NewReplicaLabelRemover(logger, make([]string, 0))},
)
sy, err = compact.NewMetaSyncer(
logger,
reg,
bkt,
cf,
duplicateBlocksFilter,
ignoreDeletionMarkFilter,
stubCounter,
stubCounter,
*blockSyncConcurrency)
if err != nil {
return errors.Wrap(err, "create syncer")
}
}

ctx := context.Background()
level.Info(logger).Log("msg", "syncing blocks metadata")
if err := sy.SyncMetas(ctx); err != nil {
return errors.Wrap(err, "sync blocks")
}

level.Info(logger).Log("msg", "synced blocks done")

level.Warn(logger).Log("msg", "GLOBAL COMPACTOR SHOULD __NOT__ BE RUNNING ON THE SAME BUCKET")

if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, bkt, sy.Metas(), retentionByResolution, stubCounter); err != nil {
return errors.Wrap(err, "retention failed")
}
return nil
})
}
8 changes: 8 additions & 0 deletions docs/components/tools.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ Subcommands:
*IRREVERSIBLE* after certain time (delete delay), so do backup your blocks
first.
tools bucket retention [<flags>]
Retention applies retention policies on the given bucket. Please make sure
no compactor is running on the same bucket at the same time.
tools rules-check --rules=RULES
Check if the rule files are valid or not.
Expand Down Expand Up @@ -184,6 +188,10 @@ Subcommands:
*IRREVERSIBLE* after certain time (delete delay), so do backup your blocks
first.
tools bucket retention [<flags>]
Retention applies retention policies on the given bucket. Please make sure
no compactor is running on the same bucket at the same time.
```

Expand Down

0 comments on commit 8d3ea52

Please sign in to comment.