compaction: remove all cache dirs at the end of each run (#1587)
* remove all cache dirs at the end of each run

Signed-off-by: Krasi Georgiev <[email protected]>

* changelog

Signed-off-by: Krasi Georgiev <[email protected]>

* .

Signed-off-by: Krasi Georgiev <[email protected]>

* changelog formatting and PR number.

Signed-off-by: Krasi Georgiev <[email protected]>

* added test for the compaction cleanup

Signed-off-by: Krasi Georgiev <[email protected]>

* added tests

Signed-off-by: Krasi Georgiev <[email protected]>

* check error

Signed-off-by: Krasi Georgiev <[email protected]>

* move GatherAndCompare to testutil

Signed-off-by: Krasi Georgiev <[email protected]>

* comment

Signed-off-by: Krasi Georgiev <[email protected]>

* nit

Signed-off-by: Krasi Georgiev <[email protected]>

* Added back deletion of the downsampling dir.

Signed-off-by: Bartek Plotka <[email protected]>

* Fix after pull master.

Signed-off-by: Bartek Plotka <[email protected]>
krasi-georgiev authored and bwplotka committed Oct 10, 2019
1 parent baf0b28 commit a09a4b9
Showing 7 changed files with 189 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -34,6 +34,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel

- [#1525](https://github.com/thanos-io/thanos/pull/1525) Thanos now deletes a block's files in the correct order, allowing partial blocks to be detected without problems.
- [#1505](https://github.com/thanos-io/thanos/pull/1505) Thanos store now removes invalid local cache blocks.
- [#1587](https://github.com/thanos-io/thanos/pull/1587) Clean up all cache dirs after each compaction run.
- [#1582](https://github.com/thanos-io/thanos/pull/1582) Thanos rule correctly parses the Alertmanager URL if it contains `+` characters.

## v0.7.0 - 2019.09.02
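The file changes below apply one pattern in both the compactor and the downsampler: the working (cache) directory is always removed when the run ends, and a failed removal is only logged so it cannot mask the run's own error. A minimal, self-contained sketch of that pattern follows; the helper name and signature are illustrative and not part of this PR.

```go
package main

import (
	"os"

	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/log/level"
)

// runWithCacheCleanup sketches the cleanup pattern used in this PR:
// defer removal of the cache directory so it runs on every exit path,
// and log (rather than return) any removal error.
func runWithCacheCleanup(logger log.Logger, cacheDir string, run func() error) error {
	defer func() {
		if err := os.RemoveAll(cacheDir); err != nil {
			level.Error(logger).Log("msg", "failed to remove cache directory", "path", cacheDir, "err", err)
		}
	}()
	return run()
}
```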
16 changes: 14 additions & 2 deletions cmd/thanos/compact.go
@@ -295,7 +295,7 @@ func runCompact(

// Generate index file.
if generateMissingIndexCacheFiles {
if err := genMissingIndexCacheFiles(ctx, logger, bkt, indexCacheDir); err != nil {
if err := genMissingIndexCacheFiles(ctx, logger, reg, bkt, indexCacheDir); err != nil {
return err
}
}
@@ -343,8 +343,19 @@
return nil
}

const (
MetricIndexGenerateName = "thanos_compact_generated_index_total"
MetricIndexGenerateHelp = "Total number of generated indexes."
)

// genMissingIndexCacheFiles scans over all blocks, generates missing index cache files and uploads them to object storage.
func genMissingIndexCacheFiles(ctx context.Context, logger log.Logger, bkt objstore.Bucket, dir string) error {
func genMissingIndexCacheFiles(ctx context.Context, logger log.Logger, reg *prometheus.Registry, bkt objstore.Bucket, dir string) error {
genIndex := prometheus.NewCounter(prometheus.CounterOpts{
Name: MetricIndexGenerateName,
Help: MetricIndexGenerateHelp,
})
reg.MustRegister(genIndex)

if err := os.RemoveAll(dir); err != nil {
return errors.Wrap(err, "clean index cache directory")
}
@@ -395,6 +406,7 @@ func genMissingIndexCacheFiles(ctx context.Context, logger log.Logger, bkt objst
if err := generateIndexCacheFile(ctx, bkt, logger, dir, meta); err != nil {
return err
}
genIndex.Inc()
}

level.Info(logger).Log("msg", "generating index cache files is done, you can remove startup argument `index.generate-missing-cache-file`")
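Threading the Prometheus registry into genMissingIndexCacheFiles is what lets the new test assert on the thanos_compact_generated_index_total counter. A hedged sketch of reading such a counter back with promtest.ToFloat64 is shown below; the helper function itself is illustrative, while the constants come from the diff above.

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	promtest "github.com/prometheus/client_golang/prometheus/testutil"
)

// inspectGeneratedIndexCounter is an illustrative helper (not part of this PR).
// It registers the same counter that genMissingIndexCacheFiles registers and
// shows how its value can be read back, e.g. in a test, via promtest.ToFloat64.
func inspectGeneratedIndexCounter() {
	reg := prometheus.NewRegistry()
	genIndex := prometheus.NewCounter(prometheus.CounterOpts{
		Name: MetricIndexGenerateName,
		Help: MetricIndexGenerateHelp,
	})
	reg.MustRegister(genIndex)

	genIndex.Inc() // one index cache file generated

	fmt.Printf("%s = %v\n", MetricIndexGenerateName, promtest.ToFloat64(genIndex))
}
```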
7 changes: 7 additions & 0 deletions cmd/thanos/downsample.go
@@ -144,6 +144,13 @@ func downsampleBucket(
if err := os.MkdirAll(dir, 0777); err != nil {
return errors.Wrap(err, "create dir")
}

defer func() {
if err := os.RemoveAll(dir); err != nil {
level.Error(logger).Log("msg", "failed to remove downsample cache directory", "path", dir, "err", err)
}
}()

var metas []*metadata.Meta

err := bkt.Iter(ctx, "", func(name string) error {
128 changes: 128 additions & 0 deletions cmd/thanos/main_test.go
@@ -0,0 +1,128 @@
package main

import (
"context"
"io/ioutil"
"os"
"path"

"testing"
"time"

"github.com/go-kit/kit/log"
"github.com/oklog/ulid"
"github.com/prometheus/client_golang/prometheus"
promtest "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/labels"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/compact"
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/objstore/inmem"
"github.com/thanos-io/thanos/pkg/testutil"
)

func TestCleanupCompactCacheFolder(t *testing.T) {
ctx, logger, dir, _, bkt, actReg := bootstrap(t)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

sy, err := compact.NewSyncer(logger, actReg, bkt, 0*time.Second, 1, false, nil)
testutil.Ok(t, err)

expReg := prometheus.NewRegistry()
syncExp := prometheus.NewCounter(prometheus.CounterOpts{
Name: compact.MetricSyncMetaName,
Help: compact.MetricSyncMetaHelp,
})
expReg.MustRegister(syncExp)

testutil.GatherAndCompare(t, expReg, actReg, compact.MetricSyncMetaName)

comp, err := tsdb.NewLeveledCompactor(ctx, nil, logger, []int64{1}, nil)
testutil.Ok(t, err)

bComp, err := compact.NewBucketCompactor(logger, sy, comp, dir, bkt, 1)
testutil.Ok(t, err)

// Even with a single uploaded block, the bucket compactor needs to
// download the meta file to plan the compaction groups.
testutil.Ok(t, bComp.Compact(ctx))

syncExp.Inc()

testutil.GatherAndCompare(t, expReg, actReg, compact.MetricSyncMetaName)

_, err = os.Stat(dir)
testutil.Assert(t, os.IsNotExist(err), "compaction cache dir should not exist at the end of execution")

}

func TestCleanupIndexCacheFolder(t *testing.T) {
ctx, logger, dir, _, bkt, actReg := bootstrap(t)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

expReg := prometheus.NewRegistry()
genIndexExp := prometheus.NewCounter(prometheus.CounterOpts{
Name: MetricIndexGenerateName,
Help: MetricIndexGenerateHelp,
})
expReg.MustRegister(genIndexExp)

testutil.GatherAndCompare(t, expReg, actReg, compact.MetricSyncMetaName)

testutil.Ok(t, genMissingIndexCacheFiles(ctx, logger, actReg, bkt, dir))

genIndexExp.Inc()
testutil.GatherAndCompare(t, expReg, actReg, MetricIndexGenerateName)

_, err := os.Stat(dir)
testutil.Assert(t, os.IsNotExist(err), "index cache dir should not exist at the end of execution")
}

func TestCleanupDownsampleCacheFolder(t *testing.T) {
ctx, logger, dir, blckID, bkt, reg := bootstrap(t)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

meta, err := block.DownloadMeta(ctx, logger, bkt, blckID)
testutil.Ok(t, err)

metrics := newDownsampleMetrics(reg)
testutil.Equals(t, 0.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(compact.GroupKey(meta))))
testutil.Ok(t, downsampleBucket(ctx, logger, metrics, bkt, dir))
testutil.Equals(t, 1.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(compact.GroupKey(meta))))

_, err = os.Stat(dir)
testutil.Assert(t, os.IsNotExist(err), "downsample cache dir should not exist at the end of execution")
}

func bootstrap(t *testing.T) (context.Context, log.Logger, string, ulid.ULID, objstore.Bucket, *prometheus.Registry) {
logger := log.NewLogfmtLogger(os.Stderr)
dir, err := ioutil.TempDir("", "test-compact-cleanup")
testutil.Ok(t, err)

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

bkt := inmem.NewBucket()
var blckID ulid.ULID

// Create and upload a single block to the bucket.
// The compaction will download the meta file of
// this block to plan the compaction groups.
{
blckID, err = testutil.CreateBlock(
ctx,
dir,
[]labels.Labels{
{{Name: "a", Value: "1"}},
},
1, 0, downsample.DownsampleRange0+1, // Pass the minimum DownsampleRange0 check.
labels.Labels{{Name: "e1", Value: "1"}},
downsample.ResLevel0)
testutil.Ok(t, err)
testutil.Ok(t, block.Upload(ctx, logger, bkt, path.Join(dir, blckID.String())))
}

return ctx, logger, dir, blckID, bkt, prometheus.NewRegistry()
}
1 change: 1 addition & 0 deletions go.mod
@@ -34,6 +34,7 @@ require (
github.com/opentracing/opentracing-go v1.1.0
github.com/pkg/errors v0.8.1
github.com/prometheus/client_golang v1.1.0
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4
github.com/prometheus/common v0.6.0
github.com/prometheus/prometheus v1.8.2-0.20190913102521-8ab628b35467 // v1.8.2 is misleading as Prometheus does not have v2 module.
github.com/uber-go/atomic v1.4.0 // indirect
15 changes: 13 additions & 2 deletions pkg/compact/compact.go
@@ -67,13 +67,19 @@ type syncerMetrics struct {
compactionFailures *prometheus.CounterVec
}

const (
MetricSyncMetaName = "thanos_compact_sync_meta_total"
MetricSyncMetaHelp = "Total number of sync meta operations."
)

func newSyncerMetrics(reg prometheus.Registerer) *syncerMetrics {
var m syncerMetrics

m.syncMetas = prometheus.NewCounter(prometheus.CounterOpts{
Name: "thanos_compact_sync_meta_total",
Help: "Total number of sync meta operations.",
Name: MetricSyncMetaName,
Help: MetricSyncMetaHelp,
})

m.syncMetaFailures = prometheus.NewCounter(prometheus.CounterOpts{
Name: "thanos_compact_sync_meta_failures_total",
Help: "Total number of failed sync meta operations.",
@@ -1001,6 +1007,11 @@ func NewBucketCompactor(

// Compact runs compaction over bucket.
func (c *BucketCompactor) Compact(ctx context.Context) error {
defer func() {
if err := os.RemoveAll(c.compactDir); err != nil {
level.Error(c.logger).Log("msg", "failed to remove compaction cache directory", "path", c.compactDir, "err", err)
}
}()
// Loop over bucket and compact until there's no work left.
for {
var (
25 changes: 25 additions & 0 deletions pkg/testutil/testutil.go
@@ -28,6 +28,9 @@ import (
"reflect"
"runtime"
"testing"

"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
)

// Assert fails the test if the condition is false.
@@ -71,3 +74,25 @@ func Equals(tb testing.TB, exp, act interface{}, v ...interface{}) {
tb.FailNow()
}
}

// GatherAndCompare gathers metrics from both Gatherers and compares the metric family selected by filter.
func GatherAndCompare(t *testing.T, g1 prometheus.Gatherer, g2 prometheus.Gatherer, filter string) {
g1m, err := g1.Gather()
Ok(t, err)
g2m, err := g2.Gather()
Ok(t, err)

var m1 *dto.MetricFamily
for _, m := range g1m {
if *m.Name == filter {
m1 = m
}
}
var m2 *dto.MetricFamily
for _, m := range g2m {
if *m.Name == filter {
m2 = m
}
}
Equals(t, m1.String(), m2.String())
}
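For reference, a minimal usage sketch of the helper above: build an expected registry that mirrors the increments the code under test should perform, then compare only the metric family named by filter. The test, metric name, and package here are hypothetical and not part of this PR.

```go
package testutil_example

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus"

	"github.com/thanos-io/thanos/pkg/testutil"
)

func TestGatherAndCompareUsage(t *testing.T) {
	const name = "example_operations_total" // hypothetical metric name

	// Registry observed by the (imaginary) code under test.
	actual := prometheus.NewRegistry()
	actualOps := prometheus.NewCounter(prometheus.CounterOpts{Name: name, Help: "Example."})
	actual.MustRegister(actualOps)

	// Registry holding the expected state.
	expected := prometheus.NewRegistry()
	expectedOps := prometheus.NewCounter(prometheus.CounterOpts{Name: name, Help: "Example."})
	expected.MustRegister(expectedOps)

	// The code under test would perform this increment; we do it directly here.
	actualOps.Inc()
	expectedOps.Inc()

	// Only the family named `name` is compared; other metrics are ignored.
	testutil.GatherAndCompare(t, expected, actual, name)
}
```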
