Skip to content

Commit

Permalink
Refactor replicate execution
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Ye <[email protected]>
  • Loading branch information
yeya24 committed Sep 9, 2020
1 parent f8c5815 commit 8d2e154
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 87 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

## Unreleased

### Changed

- [#2906](https://github.com/thanos-io/thanos/pull/2906) Tools: Refactor Bucket replicate execution. Removed all `thanos_replicate_origin_.*` metrics.
- `thanos_replicate_origin_meta_loads_total` can be replaced by `blocks_meta_synced{state="loaded"}`.
- `thanos_replicate_origin_partial_meta_reads_total` can be replaced by `blocks_meta_synced{state="failed"}`.

## [v0.15.0](https://github.com/thanos-io/thanos/releases) - 2020.09.07

Highlights:
Expand Down
12 changes: 2 additions & 10 deletions examples/dashboards/bucket_replicate.json
Original file line number Diff line number Diff line change
Expand Up @@ -305,23 +305,15 @@
"steppedLine": false,
"targets": [
{
"expr": "sum(rate(thanos_replicate_origin_iterations_total{namespace=\"$namespace\",job=~\"thanos-bucket-replicate.*\"}[$interval]))",
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "iterations",
"legendLink": null,
"step": 10
},
{
"expr": "sum(rate(thanos_replicate_origin_meta_loads_total{namespace=\"$namespace\",job=~\"thanos-bucket-replicate.*\"}[$interval]))",
"expr": "sum(rate(blocks_meta_synced{state=\"loaded\",namespace=\"$namespace\",job=~\"thanos-bucket-replicate.*\"}[$interval]))",
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "meta loads",
"legendLink": null,
"step": 10
},
{
"expr": "sum(rate(thanos_replicate_origin_partial_meta_reads_total{namespace=\"$namespace\",job=~\"thanos-bucket-replicate.*\"}[$interval]))",
"expr": "sum(rate(blocks_meta_synced{state=\"failed\",namespace=\"$namespace\",job=~\"thanos-bucket-replicate.*\"}[$interval]))",
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "partial meta reads",
Expand Down
7 changes: 3 additions & 4 deletions mixin/dashboards/bucket_replicate.libsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,13 @@ local g = import '../lib/thanos-grafana-builder/builder.libsonnet';
g.panel('Metrics') +
g.queryPanel(
[
'sum(rate(thanos_replicate_origin_iterations_total{namespace="$namespace",%(selector)s}[$interval]))' % thanos.bucket_replicate,
'sum(rate(thanos_replicate_origin_meta_loads_total{namespace="$namespace",%(selector)s}[$interval]))' % thanos.bucket_replicate,
'sum(rate(thanos_replicate_origin_partial_meta_reads_total{namespace="$namespace",%(selector)s}[$interval]))' % thanos.bucket_replicate,
'sum(rate(blocks_meta_synced{state="loaded",namespace="$namespace",%(selector)s}[$interval]))' % thanos.bucket_replicate,
'sum(rate(blocks_meta_synced{state="failed",namespace="$namespace",%(selector)s}[$interval]))' % thanos.bucket_replicate,
'sum(rate(thanos_replicate_blocks_already_replicated_total{namespace="$namespace",%(selector)s}[$interval]))' % thanos.bucket_replicate,
'sum(rate(thanos_replicate_blocks_replicated_total{namespace="$namespace",%(selector)s}[$interval]))' % thanos.bucket_replicate,
'sum(rate(thanos_replicate_objects_replicated_total{namespace="$namespace",%(selector)s}[$interval]))' % thanos.bucket_replicate,
],
['iterations', 'meta loads', 'partial meta reads', 'already replicated blocks', 'replicated blocks', 'replicated objects']
['meta loads', 'partial meta reads', 'already replicated blocks', 'replicated blocks', 'replicated objects']
)
)
)
Expand Down
82 changes: 9 additions & 73 deletions pkg/replicate/scheme.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,29 +117,13 @@ type replicationScheme struct {
}

type replicationMetrics struct {
originIterations prometheus.Counter
originMetaLoads prometheus.Counter
originPartialMeta prometheus.Counter

blocksAlreadyReplicated prometheus.Counter
blocksReplicated prometheus.Counter
objectsReplicated prometheus.Counter
}

func newReplicationMetrics(reg prometheus.Registerer) *replicationMetrics {
m := &replicationMetrics{
originIterations: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_replicate_origin_iterations_total",
Help: "Total number of objects iterated over in the origin bucket.",
}),
originMetaLoads: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_replicate_origin_meta_loads_total",
Help: "Total number of meta.json reads in the origin bucket.",
}),
originPartialMeta: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_replicate_origin_partial_meta_reads_total",
Help: "Total number of partial meta reads encountered.",
}),
blocksAlreadyReplicated: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_replicate_blocks_already_replicated_total",
Help: "Total number of blocks skipped due to already being replicated.",
Expand Down Expand Up @@ -183,45 +167,20 @@ func newReplicationScheme(
func (rs *replicationScheme) execute(ctx context.Context) error {
availableBlocks := []*metadata.Meta{}

level.Debug(rs.logger).Log("msg", "scanning blocks available blocks for replication")

if err := rs.fromBkt.Iter(ctx, "", func(name string) error {
rs.metrics.originIterations.Inc()

id, ok := thanosblock.IsBlockDir(name)
if !ok {
return nil
}

rs.metrics.originMetaLoads.Inc()

meta, metaNonExistentOrPartial, err := loadMeta(ctx, rs, id)
if metaNonExistentOrPartial {
// meta.json is the last file uploaded by a Thanos shipper,
// therefore a block may be partially present, but no meta.json
// file yet. If this is the case we skip that block for now.
rs.metrics.originPartialMeta.Inc()
level.Info(rs.logger).Log("msg", "block meta not uploaded yet. Skipping.", "block_uuid", id.String())
return nil
}
if err != nil {
return errors.Wrapf(err, "load meta for block %v from origin bucket", id.String())
}
metas, partials, err := rs.fetcher.Fetch(ctx)
if err != nil && metas == nil {
return err
}

if len(meta.Thanos.Labels) == 0 {
// TODO(bwplotka): Allow injecting custom labels as shipper does.
level.Info(rs.logger).Log("msg", "block meta without Thanos external labels set. This is not allowed. Skipping.", "block_uuid", id.String())
return nil
}
for id, partialError := range partials {
level.Info(rs.logger).Log("msg", "failed to fetch block meta. Skipping.", "block_uuid", id.String(), "err", partialError)
}

for id, meta := range metas {
if rs.blockFilter(meta) {
level.Info(rs.logger).Log("msg", "adding block to be replicated", "block_uuid", id.String())
availableBlocks = append(availableBlocks, meta)
}

return nil
}); err != nil {
return errors.Wrap(err, "iterate over origin bucket")
}

// In order to prevent races in compactions by the target environment, we
Expand Down Expand Up @@ -266,6 +225,7 @@ func (rs *replicationScheme) ensureBlockIsReplicated(ctx context.Context, id uli
return errors.Wrap(err, "get meta file from target bucket")
}

// TODO(bwplotka): Allow injecting custom labels as shipper does.
originMetaFileContent, err := ioutil.ReadAll(originMetaFile)
if err != nil {
return errors.Wrap(err, "read origin meta file")
Expand Down Expand Up @@ -348,27 +308,3 @@ func (rs *replicationScheme) ensureObjectReplicated(ctx context.Context, objectN

return nil
}

// loadMeta loads the meta.json from the origin bucket and returns the meta
// struct as well as if failed, whether the failure was due to the meta.json
// not being present or partial. The distinction is important, as if missing or
// partial, this is just a temporary failure, as the block is still being
// uploaded to the origin bucket.
func loadMeta(ctx context.Context, rs *replicationScheme, id ulid.ULID) (*metadata.Meta, bool, error) {
metas, _, err := rs.fetcher.Fetch(ctx)
if err != nil {
switch errors.Cause(err) {
default:
return nil, false, errors.Wrap(err, "fetch meta")
case thanosblock.ErrorSyncMetaNotFound:
return nil, true, errors.Wrap(err, "fetch meta")
}
}

m, ok := metas[id]
if !ok {
return nil, true, errors.Wrap(err, "fetch meta")
}

return m, false, nil
}

0 comments on commit 8d2e154

Please sign in to comment.