Skip to content

Commit

Permalink
Upload blocks with the lowest minTime first
Browse files Browse the repository at this point in the history
Fixes thanos-io#1670

Signed-off-by: Olivier Biesmans <[email protected]>
  • Loading branch information
Olivier Biesmans committed Oct 24, 2019
1 parent 64af185 commit 891d0b5
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel

- [#1656](https://github.com/thanos-io/thanos/pull/1656) Thanos Store now starts metric and status probe HTTP server earlier in its start-up sequence. `/-/healthy` endpoint now starts to respond with success earlier. `/metrics` endpoint starts serving metrics earlier as well. Make sure to point your readiness probes to the `/-/ready` endpoint rather than `/metrics`.
- [#1669](https://github.com/thanos-io/thanos/pull/1669) Fixed store sharding. Now it does not load excluded meta.jsons and load/fetch index-cache.json files.
- [#1670](https://github.com/thanos-io/thanos/pull/1670) Fixed un-ordered blocks upload. Sidecar now uploads the oldest blocks first.

### Changed

Expand Down
13 changes: 11 additions & 2 deletions pkg/shipper/shipper.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,10 +380,12 @@ func (s *Shipper) upload(ctx context.Context, meta *metadata.Meta) error {
return block.Upload(ctx, s.logger, s.bucket, updir)
}

// iterBlockMetas calls f with the block meta for each block found in dir. It logs
// an error and continues if it cannot access a meta.json file.
// iterBlockMetas calls f with the block meta for each block found in dir
// sorted by minTime asc. It logs an error and continues if it cannot access a
// meta.json file.
// If f returns an error, the function returns with the same error.
func (s *Shipper) iterBlockMetas(f func(m *metadata.Meta) error) error {
var metas []*metadata.Meta
names, err := fileutil.ReadDir(s.dir)
if err != nil {
return errors.Wrap(err, "read dir")
Expand All @@ -407,6 +409,13 @@ func (s *Shipper) iterBlockMetas(f func(m *metadata.Meta) error) error {
level.Warn(s.logger).Log("msg", "reading meta file failed", "err", err)
continue
}
metas = append(metas, m)
}
sort.Slice(metas, func(i, j int) bool {
return metas[i].BlockMeta.MinTime < metas[j].BlockMeta.MinTime
})
for _, m := range metas {

if err := f(m); err != nil {
return err
}
Expand Down
95 changes: 95 additions & 0 deletions pkg/shipper/shipper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package shipper
import (
"io/ioutil"
"math"
"math/rand"
"os"
"path"
"sort"
"testing"

"github.com/go-kit/kit/log"
Expand Down Expand Up @@ -77,3 +79,96 @@ func TestShipperTimestamps(t *testing.T) {
testutil.Equals(t, int64(1000), mint)
testutil.Equals(t, int64(2000), maxt)
}

func TestIterBlockMetas(t *testing.T) {
var metas []*metadata.Meta
dir, err := ioutil.TempDir("", "shipper-test")
testutil.Ok(t, err)
defer func() {
testutil.Ok(t, os.RemoveAll(dir))
}()

id1 := ulid.MustNew(1, nil)
testutil.Ok(t, os.Mkdir(path.Join(dir, id1.String()), os.ModePerm))
testutil.Ok(t, metadata.Write(log.NewNopLogger(), path.Join(dir, id1.String()), &metadata.Meta{
BlockMeta: tsdb.BlockMeta{
ULID: id1,
MaxTime: 2000,
MinTime: 1000,
Version: 1,
},
}))

id2 := ulid.MustNew(2, nil)
testutil.Ok(t, os.Mkdir(path.Join(dir, id2.String()), os.ModePerm))
testutil.Ok(t, metadata.Write(log.NewNopLogger(), path.Join(dir, id2.String()), &metadata.Meta{
BlockMeta: tsdb.BlockMeta{
ULID: id2,
MaxTime: 5000,
MinTime: 4000,
Version: 1,
},
}))

id3 := ulid.MustNew(3, nil)
testutil.Ok(t, os.Mkdir(path.Join(dir, id3.String()), os.ModePerm))
testutil.Ok(t, metadata.Write(log.NewNopLogger(), path.Join(dir, id3.String()), &metadata.Meta{
BlockMeta: tsdb.BlockMeta{
ULID: id3,
MaxTime: 3000,
MinTime: 2000,
Version: 1,
},
}))

shipper := New(nil, nil, dir, nil, nil, metadata.TestSource)
if err := shipper.iterBlockMetas(func(m *metadata.Meta) error {
metas = append(metas, m)
return nil
}); err != nil {
testutil.Ok(t, err)
}
testutil.Equals(t, sort.SliceIsSorted(metas, func(i, j int) bool {
return metas[i].BlockMeta.MinTime < metas[j].BlockMeta.MinTime
}), true)
}

func BenchmarkIterBlockMetas(b *testing.B) {
var metas []*metadata.Meta
dir, err := ioutil.TempDir("", "shipper-test")
testutil.Ok(b, err)
defer func() {
testutil.Ok(b, os.RemoveAll(dir))
}()

for i := 0; i < 100; i++ {
id := ulid.MustNew(uint64(i), nil)
testutil.Ok(b, os.Mkdir(path.Join(dir, id.String()), os.ModePerm))
testutil.Ok(b,
metadata.Write(
log.NewNopLogger(),
path.Join(dir, id.String()),
&metadata.Meta{
BlockMeta: tsdb.BlockMeta{
ULID: id,
MaxTime: int64((i + 1) * 1000),
MinTime: int64(i * 1000),
Version: 1,
},
},
),
)
}
rand.Shuffle(len(metas), func(i, j int) {
metas[i], metas[j] = metas[j], metas[i]
})
b.ResetTimer()

shipper := New(nil, nil, dir, nil, nil, metadata.TestSource)
if err := shipper.iterBlockMetas(func(m *metadata.Meta) error {
metas = append(metas, m)
return nil
}); err != nil {
testutil.Ok(b, err)
}
}

0 comments on commit 891d0b5

Please sign in to comment.