From 891d0b52b7de861d4a8d7969236383d4f2157a5e Mon Sep 17 00:00:00 2001 From: Olivier Biesmans Date: Wed, 23 Oct 2019 17:00:38 +0200 Subject: [PATCH] Upload blocks with the lowest minTime first Fixes #1670 Signed-off-by: Olivier Biesmans --- CHANGELOG.md | 1 + pkg/shipper/shipper.go | 13 ++++- pkg/shipper/shipper_test.go | 95 +++++++++++++++++++++++++++++++++++++ 3 files changed, 107 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4f74eca471..0cba332f20 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel - [#1656](https://github.com/thanos-io/thanos/pull/1656) Thanos Store now starts metric and status probe HTTP server earlier in its start-up sequence. `/-/healthy` endpoint now starts to respond with success earlier. `/metrics` endpoint starts serving metrics earlier as well. Make sure to point your readiness probes to the `/-/ready` endpoint rather than `/metrics`. - [#1669](https://github.com/thanos-io/thanos/pull/1669) Fixed store sharding. Now it does not load excluded meta.jsons and load/fetch index-cache.json files. +- [#1670](https://github.com/thanos-io/thanos/pull/1670) Fixed un-ordered blocks upload. Sidecar now uploads the oldest blocks first. ### Changed diff --git a/pkg/shipper/shipper.go b/pkg/shipper/shipper.go index 6631c274f7..3f4267e95d 100644 --- a/pkg/shipper/shipper.go +++ b/pkg/shipper/shipper.go @@ -380,10 +380,12 @@ func (s *Shipper) upload(ctx context.Context, meta *metadata.Meta) error { return block.Upload(ctx, s.logger, s.bucket, updir) } -// iterBlockMetas calls f with the block meta for each block found in dir. It logs -// an error and continues if it cannot access a meta.json file. +// iterBlockMetas calls f with the block meta for each block found in dir +// sorted by minTime asc. It logs an error and continues if it cannot access a +// meta.json file. // If f returns an error, the function returns with the same error. func (s *Shipper) iterBlockMetas(f func(m *metadata.Meta) error) error { + var metas []*metadata.Meta names, err := fileutil.ReadDir(s.dir) if err != nil { return errors.Wrap(err, "read dir") @@ -407,6 +409,13 @@ func (s *Shipper) iterBlockMetas(f func(m *metadata.Meta) error) error { level.Warn(s.logger).Log("msg", "reading meta file failed", "err", err) continue } + metas = append(metas, m) + } + sort.Slice(metas, func(i, j int) bool { + return metas[i].BlockMeta.MinTime < metas[j].BlockMeta.MinTime + }) + for _, m := range metas { + if err := f(m); err != nil { return err } diff --git a/pkg/shipper/shipper_test.go b/pkg/shipper/shipper_test.go index a0f02572e8..85a78b16dc 100644 --- a/pkg/shipper/shipper_test.go +++ b/pkg/shipper/shipper_test.go @@ -3,8 +3,10 @@ package shipper import ( "io/ioutil" "math" + "math/rand" "os" "path" + "sort" "testing" "github.com/go-kit/kit/log" @@ -77,3 +79,96 @@ func TestShipperTimestamps(t *testing.T) { testutil.Equals(t, int64(1000), mint) testutil.Equals(t, int64(2000), maxt) } + +func TestIterBlockMetas(t *testing.T) { + var metas []*metadata.Meta + dir, err := ioutil.TempDir("", "shipper-test") + testutil.Ok(t, err) + defer func() { + testutil.Ok(t, os.RemoveAll(dir)) + }() + + id1 := ulid.MustNew(1, nil) + testutil.Ok(t, os.Mkdir(path.Join(dir, id1.String()), os.ModePerm)) + testutil.Ok(t, metadata.Write(log.NewNopLogger(), path.Join(dir, id1.String()), &metadata.Meta{ + BlockMeta: tsdb.BlockMeta{ + ULID: id1, + MaxTime: 2000, + MinTime: 1000, + Version: 1, + }, + })) + + id2 := ulid.MustNew(2, nil) + testutil.Ok(t, os.Mkdir(path.Join(dir, id2.String()), os.ModePerm)) + testutil.Ok(t, metadata.Write(log.NewNopLogger(), path.Join(dir, id2.String()), &metadata.Meta{ + BlockMeta: tsdb.BlockMeta{ + ULID: id2, + MaxTime: 5000, + MinTime: 4000, + Version: 1, + }, + })) + + id3 := ulid.MustNew(3, nil) + testutil.Ok(t, os.Mkdir(path.Join(dir, id3.String()), os.ModePerm)) + testutil.Ok(t, metadata.Write(log.NewNopLogger(), path.Join(dir, id3.String()), &metadata.Meta{ + BlockMeta: tsdb.BlockMeta{ + ULID: id3, + MaxTime: 3000, + MinTime: 2000, + Version: 1, + }, + })) + + shipper := New(nil, nil, dir, nil, nil, metadata.TestSource) + if err := shipper.iterBlockMetas(func(m *metadata.Meta) error { + metas = append(metas, m) + return nil + }); err != nil { + testutil.Ok(t, err) + } + testutil.Equals(t, sort.SliceIsSorted(metas, func(i, j int) bool { + return metas[i].BlockMeta.MinTime < metas[j].BlockMeta.MinTime + }), true) +} + +func BenchmarkIterBlockMetas(b *testing.B) { + var metas []*metadata.Meta + dir, err := ioutil.TempDir("", "shipper-test") + testutil.Ok(b, err) + defer func() { + testutil.Ok(b, os.RemoveAll(dir)) + }() + + for i := 0; i < 100; i++ { + id := ulid.MustNew(uint64(i), nil) + testutil.Ok(b, os.Mkdir(path.Join(dir, id.String()), os.ModePerm)) + testutil.Ok(b, + metadata.Write( + log.NewNopLogger(), + path.Join(dir, id.String()), + &metadata.Meta{ + BlockMeta: tsdb.BlockMeta{ + ULID: id, + MaxTime: int64((i + 1) * 1000), + MinTime: int64(i * 1000), + Version: 1, + }, + }, + ), + ) + } + rand.Shuffle(len(metas), func(i, j int) { + metas[i], metas[j] = metas[j], metas[i] + }) + b.ResetTimer() + + shipper := New(nil, nil, dir, nil, nil, metadata.TestSource) + if err := shipper.iterBlockMetas(func(m *metadata.Meta) error { + metas = append(metas, m) + return nil + }); err != nil { + testutil.Ok(b, err) + } +}