Skip to content

Commit

Permalink
feat tools: bucket replicate update flag defaults and input format of…
Browse files Browse the repository at this point in the history
… resolution

Signed-off-by: Martin Chodur <[email protected]>
  • Loading branch information
FUSAKLA committed Jun 7, 2020
1 parent 55689a7 commit 1f4ce5e
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 30 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,12 @@ We use *breaking* word for marking changes that are not backward compatible (rel
- [#2667](https://github.com/thanos-io/thanos/pull/2667) Store: removed support to the legacy `index.cache.json`. The hidden flag `--store.disable-index-header` was removed.
- [#2667](https://github.com/thanos-io/thanos/pull/2667) Compact: the deprecated flag `--index.generate-missing-cache-file` and the metric `thanos_compact_generated_index_total` were removed.
- [2603](https://github.com/thanos-io/thanos/pull/2603) Store/Querier: Significantly optimize cases where StoreAPIs or blocks returns exact overlapping chunks (e.g Store GW and sidecar or brute force Store Gateway HA).
- [#2671](https://github.com/thanos-io/thanos/pull/2671) *breaking* Tools: bucket replicate flag `--resolution` is now in Go duration format.
- [#2671](https://github.com/thanos-io/thanos/pull/2671) Tools: bucket replicate now replicates by default all blocks.


### Added

- [#2671](https://github.com/thanos-io/thanos/pull/2671) Tools: bucket replicate now allows passing repeated `--compaction` and `--resolution` flags.

## [v0.13.0](https://github.com/thanos-io/thanos/releases) - IN PROGRESS
Expand Down
20 changes: 10 additions & 10 deletions cmd/thanos/tools_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,31 +414,31 @@ func registerBucketWeb(m map[string]setupFunc, root *kingpin.CmdClause, name str
// Provide a list of resolution, can not use Enum directly, since string does not implement int64 function.
func listResLevel() []string {
return []string{
strconv.FormatInt(downsample.ResLevel0, 10),
strconv.FormatInt(downsample.ResLevel1, 10),
strconv.FormatInt(downsample.ResLevel2, 10)}
time.Duration(downsample.ResLevel0).String(),
time.Duration(downsample.ResLevel1).String(),
time.Duration(downsample.ResLevel2).String()}
}

func registerBucketReplicate(m map[string]setupFunc, root *kingpin.CmdClause, name string, objStoreConfig *extflag.PathOrContent) {
cmd := root.Command("replicate", fmt.Sprintf("Replicate data from one object storage to another. NOTE: Currently it works only with Thanos blocks (%v has to have Thanos metadata).", block.MetaFilename))
httpBindAddr, httpGracePeriod := regHTTPFlags(cmd)
toObjStoreConfig := regCommonObjStoreFlags(cmd, "-to", false, "The object storage which replicate data to.")
resolutions := cmd.Flag("resolution", "Only blocks with those resolutions will be replicated. (Resolution in ms). Repeated flag.").Default(strconv.FormatInt(downsample.ResLevel0, 10)).HintAction(listResLevel).Int64List()
compactions := cmd.Flag("compaction", "Only blocks with those compaction levels will be replicated. Repeated flag.").Default("1").Ints()
resolutions := cmd.Flag("resolution", "Only blocks with these resolutions will be replicated. Repeated flag.").Default("0s", "5m", "1h").HintAction(listResLevel).DurationList()
compactions := cmd.Flag("compaction", "Only blocks with these compaction levels will be replicated. Repeated flag.").Default("1", "2", "3", "4").Ints()
matcherStrs := cmd.Flag("matcher", "Only blocks whose external labels exactly match this matcher will be replicated.").PlaceHolder("key=\"value\"").Strings()
singleRun := cmd.Flag("single-run", "Run replication only one time, then exit.").Default("false").Bool()

var resolutionLevels []compact.ResolutionLevel
for _, lvl := range *resolutions {
resolutionLevels = append(resolutionLevels, compact.ResolutionLevel(lvl))
}

m[name+" replicate"] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error {
matchers, err := replicate.ParseFlagMatchers(*matcherStrs)
if err != nil {
return errors.Wrap(err, "parse block label matchers")
}

var resolutionLevels []compact.ResolutionLevel
for _, lvl := range *resolutions {
resolutionLevels = append(resolutionLevels, compact.ResolutionLevel(lvl.Milliseconds()))
}

return replicate.RunReplicate(
g,
logger,
Expand Down
6 changes: 3 additions & 3 deletions docs/components/tools.md
Original file line number Diff line number Diff line change
Expand Up @@ -451,9 +451,9 @@ Flags:
format details:
https://thanos.io/storage.md/#configuration The
object storage which replicate data to.
--resolution=0 ... Only blocks with those resolutions will be
replicated. (Resolution in ms). Repeated flag.
--compaction=1 ... Only blocks with those compaction levels will
--resolution=0s... ... Only blocks with these resolutions will be
replicated. Repeated flag.
--compaction=1... ... Only blocks with these compaction levels will
be replicated. Repeated flag.
--matcher=key="value" ... Only blocks whose external labels exactly match
this matcher will be replicated.
Expand Down
27 changes: 10 additions & 17 deletions pkg/replicate/scheme.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package replicate
import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
"path"
Expand Down Expand Up @@ -86,13 +87,13 @@ func (bf *BlockFilter) Filter(b *metadata.Meta) bool {

gotResolution := compact.ResolutionLevel(b.Thanos.Downsample.Resolution)
if _, ok := bf.resolutionLevels[gotResolution]; !ok {
level.Debug(bf.logger).Log("msg", "filtering block", "reason", "resolution doesn't match allowed resolutions", "got_resolution", gotResolution, "allowed_resolutions", bf.resolutionLevels)
level.Info(bf.logger).Log("msg", "filtering block", "reason", "resolution doesn't match allowed resolutions", "got_resolution", gotResolution, "allowed_resolutions", fmt.Sprintf("%v", bf.resolutionLevels))
return false
}

gotCompactionLevel := b.BlockMeta.Compaction.Level
if _, ok := bf.compactionLevels[gotCompactionLevel]; !ok {
level.Debug(bf.logger).Log("msg", "filtering block", "reason", "compaction level doesn't match allowed levels", "got_compaction_level", gotCompactionLevel, "allowed_compaction_levels", bf.compactionLevels)
level.Info(bf.logger).Log("msg", "filtering block", "reason", "compaction level doesn't match allowed levels", "got_compaction_level", gotCompactionLevel, "allowed_compaction_levels", fmt.Sprintf("%v", bf.compactionLevels))
return false
}

Expand Down Expand Up @@ -213,31 +214,23 @@ func (rs *replicationScheme) execute(ctx context.Context) error {
return nil
}

level.Debug(rs.logger).Log("msg", "adding block to available blocks", "block_uuid", id.String())

availableBlocks = append(availableBlocks, meta)
if rs.blockFilter(meta) {
level.Info(rs.logger).Log("msg", "adding block to be replicated", "block_uuid", id.String())
availableBlocks = append(availableBlocks, meta)
}

return nil
}); err != nil {
return errors.Wrap(err, "iterate over origin bucket")
}

candidateBlocks := []*metadata.Meta{}

for _, b := range availableBlocks {
if rs.blockFilter(b) {
level.Debug(rs.logger).Log("msg", "adding block to candidate blocks", "block_uuid", b.BlockMeta.ULID.String())
candidateBlocks = append(candidateBlocks, b)
}
}

// In order to prevent races in compactions by the target environment, we
// need to replicate oldest start timestamp first.
sort.Slice(candidateBlocks, func(i, j int) bool {
return candidateBlocks[i].BlockMeta.MinTime < candidateBlocks[j].BlockMeta.MinTime
sort.Slice(availableBlocks, func(i, j int) bool {
return availableBlocks[i].BlockMeta.MinTime < availableBlocks[j].BlockMeta.MinTime
})

for _, b := range candidateBlocks {
for _, b := range availableBlocks {
if err := rs.ensureBlockIsReplicated(ctx, b.BlockMeta.ULID); err != nil {
return errors.Wrapf(err, "ensure block %v is replicated", b.BlockMeta.ULID.String())
}
Expand Down

0 comments on commit 1f4ce5e

Please sign in to comment.