Skip to content

Commit

Permalink
Removed legacy index.cache.json support
Browse files Browse the repository at this point in the history
Signed-off-by: Marco Pracucci <[email protected]>
  • Loading branch information
pracucci committed May 28, 2020
1 parent e7d431d commit 81603b1
Show file tree
Hide file tree
Showing 12 changed files with 81 additions and 686 deletions.
116 changes: 0 additions & 116 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,13 @@ import (
"fmt"
"os"
"path"
"path/filepath"
"strconv"
"strings"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/oklog/run"
"github.com/oklog/ulid"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -25,15 +23,12 @@ import (
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/tsdb"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/indexheader"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact"
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/extflag"
"github.com/thanos-io/thanos/pkg/extprom"
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/objstore/client"
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/runutil"
Expand All @@ -42,11 +37,6 @@ import (
"gopkg.in/alecthomas/kingpin.v2"
)

const (
metricIndexGenerateName = "thanos_compact_generated_index_total"
metricIndexGenerateHelp = "Total number of generated indexes."
)

var (
compactions = compactionSet{
1 * time.Hour,
Expand Down Expand Up @@ -265,7 +255,6 @@ func runCompact(
var (
compactDir = path.Join(conf.dataDir, "compact")
downsamplingDir = path.Join(conf.dataDir, "downsample")
indexCacheDir = path.Join(conf.dataDir, "index_cache")
)

if err := os.RemoveAll(downsamplingDir); err != nil {
Expand Down Expand Up @@ -345,17 +334,6 @@ func runCompact(
g.Add(func() error {
defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")

// Generate index files.
// TODO(bwplotka): Remove this in next release.
if conf.generateMissingIndexCacheFiles {
if err := sy.SyncMetas(ctx); err != nil {
return err
}
if err := genMissingIndexCacheFiles(ctx, logger, reg, bkt, sy.Metas(), indexCacheDir); err != nil {
return err
}
}

if !conf.wait {
return compactMainFn()
}
Expand Down Expand Up @@ -439,96 +417,6 @@ func runCompact(
return nil
}

// genMissingIndexCacheFiles scans over all blocks, generates missing index cache files and uploads them to object storage.
func genMissingIndexCacheFiles(ctx context.Context, logger log.Logger, reg *prometheus.Registry, bkt objstore.Bucket, metas map[ulid.ULID]*metadata.Meta, dir string) error {
genIndex := promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: metricIndexGenerateName,
Help: metricIndexGenerateHelp,
})

if err := os.RemoveAll(dir); err != nil {
return errors.Wrap(err, "clean index cache directory")
}
if err := os.MkdirAll(dir, 0777); err != nil {
return errors.Wrap(err, "create dir")
}

defer func() {
if err := os.RemoveAll(dir); err != nil {
level.Error(logger).Log("msg", "failed to remove index cache directory", "path", dir, "err", err)
}
}()

level.Info(logger).Log("msg", "start index cache processing")

for _, meta := range metas {
// New version of compactor pushes index cache along with data block.
// Skip uncompacted blocks.
if meta.Compaction.Level == 1 {
continue
}

if err := generateIndexCacheFile(ctx, bkt, logger, dir, meta); err != nil {
return err
}
genIndex.Inc()
}

level.Info(logger).Log("msg", "generating index cache files is done, you can remove startup argument `index.generate-missing-cache-file`")
return nil
}

func generateIndexCacheFile(
ctx context.Context,
bkt objstore.Bucket,
logger log.Logger,
indexCacheDir string,
meta *metadata.Meta,
) error {
id := meta.ULID

bdir := filepath.Join(indexCacheDir, id.String())
if err := os.MkdirAll(bdir, 0777); err != nil {
return errors.Wrap(err, "create block dir")
}

defer func() {
if err := os.RemoveAll(bdir); err != nil {
level.Error(logger).Log("msg", "failed to remove index cache directory", "path", bdir, "err", err)
}
}()

cachePath := filepath.Join(bdir, block.IndexCacheFilename)
cache := path.Join(meta.ULID.String(), block.IndexCacheFilename)

ok, err := bkt.Exists(ctx, cache)
if ok {
return nil
}
if err != nil {
return errors.Wrapf(err, "attempt to check if a cached index file exists")
}

level.Debug(logger).Log("msg", "make index cache", "block", id)

// Try to download index file from obj store.
indexPath := filepath.Join(bdir, block.IndexFilename)
index := path.Join(id.String(), block.IndexFilename)

if err := objstore.DownloadFile(ctx, logger, bkt, index, indexPath); err != nil {
return errors.Wrap(err, "download index file")
}

if err := indexheader.WriteJSON(logger, indexPath, cachePath); err != nil {
return errors.Wrap(err, "write index cache")
}

if err := objstore.UploadFile(ctx, logger, bkt, cachePath, cache); err != nil {
return errors.Wrap(err, "upload index cache")
}
return nil
}

type compactConfig struct {
haltOnError bool
acceptMalformedIndex bool
Expand All @@ -540,7 +428,6 @@ type compactConfig struct {
retentionRaw, retentionFiveMin, retentionOneHr model.Duration
wait bool
waitInterval time.Duration
generateMissingIndexCacheFiles bool
disableDownsampling bool
blockSyncConcurrency int
compactionConcurrency int
Expand Down Expand Up @@ -584,9 +471,6 @@ func (cc *compactConfig) registerFlag(cmd *kingpin.CmdClause) *compactConfig {
cmd.Flag("wait-interval", "Wait interval between consecutive compaction runs and bucket refreshes. Only works when --wait flag specified.").
Default("5m").DurationVar(&cc.waitInterval)

cmd.Flag("index.generate-missing-cache-file", "DEPRECATED flag. Will be removed in next release. If enabled, on startup compactor runs an on-off job that scans all the blocks to find all blocks with missing index cache file. It generates those if needed and upload.").
Hidden().Default("false").BoolVar(&cc.generateMissingIndexCacheFiles)

cmd.Flag("downsampling.disable", "Disables downsampling. This is not recommended "+
"as querying long time ranges without non-downsampled data is not efficient and useful e.g it is not possible to render all samples for a human eye anyway").
Default("false").BoolVar(&cc.disableDownsampling)
Expand Down
66 changes: 0 additions & 66 deletions cmd/thanos/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,88 +8,22 @@ import (
"io/ioutil"
"os"
"path"
"path/filepath"
"testing"
"time"

"github.com/go-kit/kit/log"
"github.com/oklog/ulid"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
promtest "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact"
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/testutil"
"github.com/thanos-io/thanos/pkg/testutil/e2eutil"
)

func TestCleanupIndexCacheFolder(t *testing.T) {
logger := log.NewLogfmtLogger(os.Stderr)
dir, err := ioutil.TempDir("", "test-compact-cleanup")
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

bkt := objstore.WithNoopInstr(objstore.NewInMemBucket())

// Upload one compaction lvl = 2 block, one compaction lvl = 1.
// We generate index cache files only for lvl > 1 blocks.
{
id, err := e2eutil.CreateBlock(
ctx,
dir,
[]labels.Labels{{{Name: "a", Value: "1"}}},
1, 0, downsample.DownsampleRange0+1, // Pass the minimum DownsampleRange0 check.
labels.Labels{{Name: "e1", Value: "1"}},
downsample.ResLevel0)
testutil.Ok(t, err)

meta, err := metadata.Read(filepath.Join(dir, id.String()))
testutil.Ok(t, err)

meta.Compaction.Level = 2

testutil.Ok(t, metadata.Write(logger, filepath.Join(dir, id.String()), meta))
testutil.Ok(t, block.Upload(ctx, logger, bkt, path.Join(dir, id.String())))
}
{
id, err := e2eutil.CreateBlock(
ctx,
dir,
[]labels.Labels{{{Name: "a", Value: "1"}}},
1, 0, downsample.DownsampleRange0+1, // Pass the minimum DownsampleRange0 check.
labels.Labels{{Name: "e1", Value: "1"}},
downsample.ResLevel0)
testutil.Ok(t, err)
testutil.Ok(t, block.Upload(ctx, logger, bkt, path.Join(dir, id.String())))
}

reg := prometheus.NewRegistry()
expReg := prometheus.NewRegistry()
genIndexExp := promauto.With(expReg).NewCounter(prometheus.CounterOpts{
Name: metricIndexGenerateName,
Help: metricIndexGenerateHelp,
})
metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil, nil, nil)
testutil.Ok(t, err)

metas, _, err := metaFetcher.Fetch(ctx)
testutil.Ok(t, err)
testutil.Ok(t, genMissingIndexCacheFiles(ctx, logger, reg, bkt, metas, dir))

genIndexExp.Inc()
testutil.GatherAndCompare(t, expReg, reg, metricIndexGenerateName)

_, err = os.Stat(dir)
testutil.Assert(t, os.IsNotExist(err), "index cache dir should not exist at the end of execution")
}

func TestCleanupDownsampleCacheFolder(t *testing.T) {
logger := log.NewLogfmtLogger(os.Stderr)
dir, err := ioutil.TempDir("", "test-compact-cleanup")
Expand Down
13 changes: 2 additions & 11 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,9 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {

selectorRelabelConf := regSelectorRelabelFlags(cmd)

// TODO(bwplotka): Remove in v0.13.0 if no issues.
disableIndexHeader := cmd.Flag("store.disable-index-header", "If specified, Store Gateway will use index-cache.json for each block instead of recreating binary index-header").
Hidden().Default("false").Bool()

postingOffsetsInMemSampling := cmd.Flag("store.index-header-posting-offsets-in-mem-sampling", "Controls what is the ratio of postings offsets store will hold in memory. "+
"Larger value will keep less offsets, which will increase CPU cycles needed for query touching those postings. It's meant for setups that want low baseline memory pressure and where less traffic is expected. "+
"On the contrary, smaller value will increase baseline memory usage, but improve latency slightly. 1 will keep all in memory. Default value is the same as in Prometheus which gives a good balance. This works only when --store.disable-index-header is NOT specified.").
"On the contrary, smaller value will increase baseline memory usage, but improve latency slightly. 1 will keep all in memory. Default value is the same as in Prometheus which gives a good balance.").
Hidden().Default(fmt.Sprintf("%v", store.DefaultPostingOffsetInMemorySampling)).Int()

enablePostingsCompression := cmd.Flag("experimental.enable-index-cache-postings-compression", "If true, Store Gateway will reencode and compress postings before storing them into cache. Compressed postings take about 10% of the original size.").
Expand Down Expand Up @@ -146,7 +142,6 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {
},
selectorRelabelConf,
*advertiseCompatibilityLabel,
*disableIndexHeader,
*enablePostingsCompression,
time.Duration(*consistencyDelay),
time.Duration(*ignoreDeletionMarksDelay),
Expand Down Expand Up @@ -179,7 +174,7 @@ func runStore(
blockSyncConcurrency int,
filterConf *store.FilterConfig,
selectorRelabelConf *extflag.PathOrContent,
advertiseCompatibilityLabel, disableIndexHeader, enablePostingsCompression bool,
advertiseCompatibilityLabel, enablePostingsCompression bool,
consistencyDelay time.Duration,
ignoreDeletionMarksDelay time.Duration,
externalPrefix, prefixHeader string,
Expand Down Expand Up @@ -281,9 +276,6 @@ func runStore(
return errors.Wrap(err, "meta fetcher")
}

if !disableIndexHeader {
level.Info(logger).Log("msg", "index-header instead of index-cache.json enabled")
}
bs, err := store.NewBucketStore(
logger,
reg,
Expand All @@ -298,7 +290,6 @@ func runStore(
blockSyncConcurrency,
filterConf,
advertiseCompatibilityLabel,
!disableIndexHeader,
enablePostingsCompression,
postingOffsetsInMemSampling,
false,
Expand Down
8 changes: 0 additions & 8 deletions pkg/block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ const (
MetaFilename = "meta.json"
// IndexFilename is the known index file for block index.
IndexFilename = "index"
// IndexCacheFilename is the canonical name for json index cache file that stores essential information.
IndexCacheFilename = "index.cache.json"
// IndexHeaderFilename is the canonical name for binary index header file that stores essential information.
IndexHeaderFilename = "index-header"
// ChunksDirname is the known dir name for chunks with compressed samples.
Expand Down Expand Up @@ -105,12 +103,6 @@ func Upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir st
return cleanUp(logger, bkt, id, errors.Wrap(err, "upload index"))
}

if meta.Thanos.Source == metadata.CompactorSource {
if err := objstore.UploadFile(ctx, logger, bkt, path.Join(bdir, IndexCacheFilename), path.Join(id.String(), IndexCacheFilename)); err != nil {
return cleanUp(logger, bkt, id, errors.Wrap(err, "upload index cache"))
}
}

// Meta.json always need to be uploaded as a last item. This will allow to assume block directories without meta file
// to be pending uploads.
if err := objstore.UploadFile(ctx, logger, bkt, path.Join(bdir, MetaFilename), path.Join(id.String(), MetaFilename)); err != nil {
Expand Down
14 changes: 14 additions & 0 deletions pkg/block/indexheader/binary_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -886,3 +886,17 @@ func (r BinaryReader) LabelNames() []string {
}

func (r *BinaryReader) Close() error { return r.c.Close() }

type realByteSlice []byte

func (b realByteSlice) Len() int {
return len(b)
}

func (b realByteSlice) Range(start, end int) []byte {
return b[start:end]
}

func (b realByteSlice) Sub(start, end int) index.ByteSlice {
return b[start:end]
}
Loading

0 comments on commit 81603b1

Please sign in to comment.