From b3240c8418254d5f41950a18a369a4e8e47ade87 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Fri, 30 Nov 2018 14:50:34 +0200 Subject: [PATCH 01/12] no overlapping on compaction when an existing block is not within default boundaries. Signed-off-by: Krasi Georgiev --- block_test.go | 7 ++- db.go | 3 +- db_test.go | 111 ++++++++++++++++++++++++++++++++++++++++++++++++ head.go | 9 +++- querier_test.go | 2 +- 5 files changed, 125 insertions(+), 7 deletions(-) diff --git a/block_test.go b/block_test.go index 61666fe3..460b40dd 100644 --- a/block_test.go +++ b/block_test.go @@ -78,7 +78,7 @@ func createEmptyBlock(t *testing.T, dir string, meta *BlockMeta) *Block { } // createPopulatedBlock creates a block with nSeries series, and nSamples samples. -func createPopulatedBlock(tb testing.TB, dir string, nSeries, nSamples int) *Block { +func createPopulatedBlock(tb testing.TB, dir string, nSeries int, mint, maxt int64) *Block { head, err := NewHead(nil, nil, nil, 2*60*60*1000) testutil.Ok(tb, err) defer head.Close() @@ -87,12 +87,11 @@ func createPopulatedBlock(tb testing.TB, dir string, nSeries, nSamples int) *Blo testutil.Ok(tb, err) refs := make([]uint64, nSeries) - for n := 0; n < nSamples; n++ { + for ts := mint; ts <= maxt; ts++ { app := head.Appender() - ts := n * 1000 for i, lbl := range lbls { if refs[i] != 0 { - err := app.AddFast(refs[i], int64(ts), rand.Float64()) + err := app.AddFast(refs[i], ts, rand.Float64()) if err == nil { continue } diff --git a/db.go b/db.go index bbd46c30..d40764e3 100644 --- a/db.go +++ b/db.go @@ -395,7 +395,8 @@ func (db *DB) compact() (err error) { if db.head.MaxTime()-db.head.MinTime() <= db.opts.BlockRanges[0]/2*3 { break } - mint, maxt := rangeForTimestamp(db.head.MinTime(), db.opts.BlockRanges[0]) + mint := db.head.MinTime() + _, maxt := rangeForTimestamp(mint, db.opts.BlockRanges[0]) // Wrap head into a range that bounds all reads to it. head := &rangeHead{ diff --git a/db_test.go b/db_test.go index 6fd59f34..d6c48344 100644 --- a/db_test.go +++ b/db_test.go @@ -14,6 +14,7 @@ package tsdb import ( + "flag" "fmt" "io/ioutil" "math" @@ -25,6 +26,7 @@ import ( "testing" "time" + "github.com/go-kit/kit/log" "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -1441,3 +1443,112 @@ func TestCorrectNumTombstones(t *testing.T) { testutil.Ok(t, db.Delete(9, 11, labels.NewEqualMatcher("foo", "bar"))) testutil.Equals(t, uint64(3), db.blocks[0].meta.Stats.NumTombstones) } + +// h.minTime in different scenarios: +// - no blocks no WAL: set to the time of the first appended sample +// - no blocks with WAL: set to the smallest sample from the WAL +// - with blocks no WAL: set to the last block maxT +// - with blocks with WAL: same as above +// TestBlockRanges checks the following use cases: +// - No samples can be added with timestamps lower than the last block maxt. +// - The compactor doesn't create overlaping blocks +// even when the last blocks is not within the default boundaries. +// - Lower bondary is based on the smallest sample in the head and +// upper boundary is rounded to the configured block range. +// +// This ensures that a snapshot that includes the head and creates a block with a custom time range +// will not overlap with the first block created by the next compaction. +func TestBlockRanges(t *testing.T) { + logger := log.NewNopLogger() + flag.Visit(func(f *flag.Flag) { + if f.Name == "test.v" && f.Value.String() == "true" { + logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)) + } + }) + + dir, err := ioutil.TempDir("", "test_storage") + if err != nil { + t.Fatalf("Opening test dir failed: %s", err) + } + + rangeToTriggercompaction := DefaultOptions.BlockRanges[0]/2*3 + 1 + + // Test that the compactor doesn't create overlapping blocks + // when a non standard block already exists. + firstBlockMaxT := int64(3) + createPopulatedBlock(t, dir, 1, 0, firstBlockMaxT) + db, err := Open(dir, logger, nil, DefaultOptions) + if err != nil { + t.Fatalf("Opening test storage failed: %s", err) + } + defer func() { + os.RemoveAll(dir) + }() + app := db.Appender() + lbl := labels.Labels{{"a", "b"}} + _, err = app.Add(lbl, firstBlockMaxT-1, rand.Float64()) + if err == nil { + t.Fatalf("appending a sample with a timestamp covered by a previous block shouldn't be possible") + } + _, err = app.Add(lbl, firstBlockMaxT+1, rand.Float64()) + testutil.Ok(t, err) + _, err = app.Add(lbl, firstBlockMaxT+2, rand.Float64()) + testutil.Ok(t, err) + secondBlockMaxt := firstBlockMaxT + rangeToTriggercompaction + _, err = app.Add(lbl, secondBlockMaxt, rand.Float64()) // Add samples to trigger a new compaction + + testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) + for x := 1; x < 10; x++ { + if len(db.Blocks()) == 2 { + break + } + time.Sleep(100 * time.Millisecond) + } + testutil.Equals(t, 2, len(db.Blocks()), "no new block created after the set timeout") + + if db.Blocks()[0].Meta().MaxTime > db.Blocks()[1].Meta().MinTime { + t.Fatalf("new block overlaps old:%v,new:%v", db.Blocks()[0].Meta(), db.Blocks()[1].Meta()) + } + + // Test that wal records are skipped when an existing block covers the same time ranges + // and compaction doesn't create an overlapping block. + db.DisableCompactions() + _, err = app.Add(lbl, secondBlockMaxt+1, rand.Float64()) + testutil.Ok(t, err) + _, err = app.Add(lbl, secondBlockMaxt+2, rand.Float64()) + testutil.Ok(t, err) + _, err = app.Add(lbl, secondBlockMaxt+3, rand.Float64()) + testutil.Ok(t, err) + _, err = app.Add(lbl, secondBlockMaxt+4, rand.Float64()) + testutil.Ok(t, app.Commit()) + testutil.Ok(t, db.Close()) + + thirdBlockMaxt := secondBlockMaxt + 2 + createPopulatedBlock(t, dir, 1, secondBlockMaxt+1, thirdBlockMaxt) + + db, err = Open(dir, logger, nil, DefaultOptions) + if err != nil { + t.Fatalf("Opening test storage failed: %s", err) + } + defer db.Close() + testutil.Equals(t, 3, len(db.Blocks()), "db doesn't include expected number of blocks") + testutil.Equals(t, db.Blocks()[2].Meta().MaxTime, thirdBlockMaxt, "unexpected maxt of the last block") + + app = db.Appender() + _, err = app.Add(lbl, thirdBlockMaxt+rangeToTriggercompaction, rand.Float64()) // Trigger a compaction + testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) + for x := 1; x < 10; x++ { + if len(db.Blocks()) == 4 { + break + } + time.Sleep(100 * time.Millisecond) + } + + testutil.Equals(t, 4, len(db.Blocks()), "no new block created after the set timeout") + + if db.Blocks()[2].Meta().MaxTime > db.Blocks()[3].Meta().MinTime { + t.Fatalf("new block overlaps old:%v,new:%v", db.Blocks()[2].Meta(), db.Blocks()[3].Meta()) + } +} diff --git a/head.go b/head.go index 90aba0b9..f20669b0 100644 --- a/head.go +++ b/head.go @@ -655,13 +655,20 @@ func (h *Head) Appender() Appender { func (h *Head) appender() *headAppender { return &headAppender{ head: h, - minValidTime: h.MaxTime() - h.chunkRange/2, + minValidTime: max(h.MinTime(), h.MaxTime()-h.chunkRange/2), mint: math.MaxInt64, maxt: math.MinInt64, samples: h.getAppendBuffer(), } } +func max(a, b int64) int64 { + if a > b { + return a + } + return b +} + func (h *Head) getAppendBuffer() []RefSample { b := h.appendPool.Get() if b == nil { diff --git a/querier_test.go b/querier_test.go index 2eb5b037..e31072ce 100644 --- a/querier_test.go +++ b/querier_test.go @@ -1292,7 +1292,7 @@ func BenchmarkPersistedQueries(b *testing.B) { dir, err := ioutil.TempDir("", "bench_persisted") testutil.Ok(b, err) defer os.RemoveAll(dir) - block := createPopulatedBlock(b, dir, nSeries, nSamples) + block := createPopulatedBlock(b, dir, nSeries, 1, int64(nSamples)) defer block.Close() q, err := NewBlockQuerier(block, block.Meta().MinTime, block.Meta().MaxTime) From c772180e72a763c53741f1d6b0e804bc900a897a Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Fri, 30 Nov 2018 16:15:10 +0200 Subject: [PATCH 02/12] fix broken tests that rely on the old behaviour Signed-off-by: Krasi Georgiev --- db_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/db_test.go b/db_test.go index d6c48344..b02da229 100644 --- a/db_test.go +++ b/db_test.go @@ -182,7 +182,7 @@ func TestDBAppenderAddRef(t *testing.T) { err = app2.AddFast(ref2, 143, 2) testutil.Ok(t, err) - err = app2.AddFast(9999999, 1, 1) + err = app2.AddFast(9999999, 124, 1) testutil.Equals(t, ErrNotFound, errors.Cause(err)) testutil.Ok(t, app2.Commit()) @@ -571,20 +571,20 @@ func TestDB_e2e(t *testing.T) { app := db.Appender() + ts := rand.Int63n(300) for _, l := range lbls { lset := labels.New(l...) series := []sample{} - ts := rand.Int63n(300) + next := ts for i := 0; i < numDatapoints; i++ { v := rand.Float64() - series = append(series, sample{ts, v}) - - _, err := app.Add(lset, ts, v) + series = append(series, sample{next, v}) + _, err := app.Add(lset, next, v) testutil.Ok(t, err) - ts += rand.Int63n(timeInterval) + 1 + next += rand.Int63n(timeInterval) + 1 } seriesMap[lset.String()] = series From afe03715cd983005014a2014d55e3e886130bf50 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Fri, 30 Nov 2018 16:19:13 +0200 Subject: [PATCH 03/12] comment Signed-off-by: Krasi Georgiev --- db_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/db_test.go b/db_test.go index b02da229..feacd13b 100644 --- a/db_test.go +++ b/db_test.go @@ -1460,7 +1460,7 @@ func TestCorrectNumTombstones(t *testing.T) { // will not overlap with the first block created by the next compaction. func TestBlockRanges(t *testing.T) { logger := log.NewNopLogger() - flag.Visit(func(f *flag.Flag) { + flag.Visit(func(f *flag.Flag) { // Use a real logger when we the test runs in verbose mode. if f.Name == "test.v" && f.Value.String() == "true" { logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)) } From 48747cef50334c9fd1f806c4e87d513af19fafdb Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Fri, 30 Nov 2018 16:40:50 +0200 Subject: [PATCH 04/12] no need to check for verbose mode Signed-off-by: Krasi Georgiev --- db_test.go | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/db_test.go b/db_test.go index feacd13b..412102f6 100644 --- a/db_test.go +++ b/db_test.go @@ -14,7 +14,6 @@ package tsdb import ( - "flag" "fmt" "io/ioutil" "math" @@ -1459,12 +1458,7 @@ func TestCorrectNumTombstones(t *testing.T) { // This ensures that a snapshot that includes the head and creates a block with a custom time range // will not overlap with the first block created by the next compaction. func TestBlockRanges(t *testing.T) { - logger := log.NewNopLogger() - flag.Visit(func(f *flag.Flag) { // Use a real logger when we the test runs in verbose mode. - if f.Name == "test.v" && f.Value.String() == "true" { - logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)) - } - }) + logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)) dir, err := ioutil.TempDir("", "test_storage") if err != nil { From 7952f14d5e665a24dd36fc474a804c643f7355ca Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Fri, 30 Nov 2018 16:51:20 +0200 Subject: [PATCH 05/12] createPopulatedBlock comment Signed-off-by: Krasi Georgiev --- block_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/block_test.go b/block_test.go index 460b40dd..03ac006a 100644 --- a/block_test.go +++ b/block_test.go @@ -77,7 +77,8 @@ func createEmptyBlock(t *testing.T, dir string, meta *BlockMeta) *Block { return b } -// createPopulatedBlock creates a block with nSeries series, and nSamples samples. +// createPopulatedBlock creates a block with nSeries series, filled with +// samples of the given mint,maxt time range. func createPopulatedBlock(tb testing.TB, dir string, nSeries int, mint, maxt int64) *Block { head, err := NewHead(nil, nil, nil, 2*60*60*1000) testutil.Ok(tb, err) From 30ecfa1ed4460644455d05ae0676e7f958314d18 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Fri, 30 Nov 2018 16:51:33 +0200 Subject: [PATCH 06/12] simplified rangeForTimestamp Signed-off-by: Krasi Georgiev --- db.go | 7 +++---- head.go | 2 +- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/db.go b/db.go index d40764e3..a65b75f0 100644 --- a/db.go +++ b/db.go @@ -396,7 +396,7 @@ func (db *DB) compact() (err error) { break } mint := db.head.MinTime() - _, maxt := rangeForTimestamp(mint, db.opts.BlockRanges[0]) + maxt := rangeForTimestamp(mint, db.opts.BlockRanges[0]) // Wrap head into a range that bounds all reads to it. head := &rangeHead{ @@ -827,9 +827,8 @@ func (db *DB) Querier(mint, maxt int64) (Querier, error) { return sq, nil } -func rangeForTimestamp(t int64, width int64) (mint, maxt int64) { - mint = (t / width) * width - return mint, mint + width +func rangeForTimestamp(t int64, width int64) (maxt int64) { + return (t/width)*width + width } // Delete implements deletion of metrics. It only has atomicity guarantees on a per-block basis. diff --git a/head.go b/head.go index f20669b0..e1929d81 100644 --- a/head.go +++ b/head.go @@ -1418,7 +1418,7 @@ func (s *memSeries) cut(mint int64) *memChunk { // Set upper bound on when the next chunk must be started. An earlier timestamp // may be chosen dynamically at a later point. - _, s.nextAt = rangeForTimestamp(mint, s.chunkRange) + s.nextAt = rangeForTimestamp(mint, s.chunkRange) app, err := c.chunk.Appender() if err != nil { From 831daf77149fb5b58c0c0153bb77ba3f676fddf4 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Sat, 1 Dec 2018 13:16:30 +0200 Subject: [PATCH 07/12] Revert "fix broken tests that rely on the old behaviour" This reverts commit c772180e72a763c53741f1d6b0e804bc900a897a. Signed-off-by: Krasi Georgiev --- db_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/db_test.go b/db_test.go index 412102f6..d33fcd2e 100644 --- a/db_test.go +++ b/db_test.go @@ -181,7 +181,7 @@ func TestDBAppenderAddRef(t *testing.T) { err = app2.AddFast(ref2, 143, 2) testutil.Ok(t, err) - err = app2.AddFast(9999999, 124, 1) + err = app2.AddFast(9999999, 1, 1) testutil.Equals(t, ErrNotFound, errors.Cause(err)) testutil.Ok(t, app2.Commit()) @@ -570,20 +570,20 @@ func TestDB_e2e(t *testing.T) { app := db.Appender() - ts := rand.Int63n(300) for _, l := range lbls { lset := labels.New(l...) series := []sample{} - next := ts + ts := rand.Int63n(300) for i := 0; i < numDatapoints; i++ { v := rand.Float64() - series = append(series, sample{next, v}) - _, err := app.Add(lset, next, v) + series = append(series, sample{ts, v}) + + _, err := app.Add(lset, ts, v) testutil.Ok(t, err) - next += rand.Int63n(timeInterval) + 1 + ts += rand.Int63n(timeInterval) + 1 } seriesMap[lset.String()] = series From 527ba6f077c56b5e9cca941a5d232e08aebb18d8 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Sat, 1 Dec 2018 14:24:09 +0200 Subject: [PATCH 08/12] refactor the logic Signed-off-by: Krasi Georgiev --- db.go | 15 ++++++++++++--- db_test.go | 10 +++++----- head.go | 27 ++++++++++++++------------- head_test.go | 7 ++++--- 4 files changed, 35 insertions(+), 24 deletions(-) diff --git a/db.go b/db.go index a65b75f0..3a47f0bf 100644 --- a/db.go +++ b/db.go @@ -271,12 +271,21 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db if err != nil { return nil, err } - if err := db.head.Init(); err != nil { - return nil, errors.Wrap(err, "read WAL") - } + if err := db.reload(); err != nil { return nil, err } + // Set the min valid time for the ingested samples + // to be no lower than the maxt of the last block. + blocks := db.Blocks() + minValidTime := int64(math.MinInt64) + if len(blocks) > 0 { + minValidTime = blocks[len(blocks)-1].Meta().MaxTime + } + + if err := db.head.Init(minValidTime); err != nil { + return nil, errors.Wrap(err, "read WAL") + } go db.run() diff --git a/db_test.go b/db_test.go index d33fcd2e..60b26076 100644 --- a/db_test.go +++ b/db_test.go @@ -1200,6 +1200,11 @@ func TestQuerierWithBoundaryChunks(t *testing.T) { testutil.Assert(t, count == 2, "expected 2 blocks in querier, got %d", count) } +// TestInitializeHeadTimestamp ensures that the h.minTime is set properly. +// - no blocks no WAL: set to the time of the first appended sample +// - no blocks with WAL: set to the smallest sample from the WAL +// - with blocks no WAL: set to the last block maxT +// - with blocks with WAL: same as above func TestInitializeHeadTimestamp(t *testing.T) { t.Run("clean", func(t *testing.T) { dir, err := ioutil.TempDir("", "test_head_init") @@ -1443,11 +1448,6 @@ func TestCorrectNumTombstones(t *testing.T) { testutil.Equals(t, uint64(3), db.blocks[0].meta.Stats.NumTombstones) } -// h.minTime in different scenarios: -// - no blocks no WAL: set to the time of the first appended sample -// - no blocks with WAL: set to the smallest sample from the WAL -// - with blocks no WAL: set to the last block maxT -// - with blocks with WAL: same as above // TestBlockRanges checks the following use cases: // - No samples can be added with timestamps lower than the last block maxt. // - The compactor doesn't create overlaping blocks diff --git a/head.go b/head.go index e1929d81..8028a77d 100644 --- a/head.go +++ b/head.go @@ -59,7 +59,8 @@ type Head struct { appendPool sync.Pool bytesPool sync.Pool - minTime, maxTime int64 + minTime, maxTime int64 // Current min and max of the samples included in the head. + minValidTime int64 // Mint allowed to be added to te head. It shouldn't be lower than the last persisted block. lastSeriesID uint64 // All series addressable by their ID or hash. @@ -300,13 +301,6 @@ func (h *Head) updateMinMaxTime(mint, maxt int64) { } func (h *Head) loadWAL(r *wal.Reader) error { - minValidTime := h.MinTime() - // If the min time is still uninitialized (no persisted blocks yet), - // we accept all sample timestamps from the WAL. - if minValidTime == math.MaxInt64 { - minValidTime = math.MinInt64 - } - // Track number of samples that referenced a series we don't know about // for error reporting. var unknownRefs uint64 @@ -327,7 +321,7 @@ func (h *Head) loadWAL(r *wal.Reader) error { inputs[i] = make(chan []RefSample, 300) go func(input <-chan []RefSample, output chan<- []RefSample) { - unknown := h.processWALSamples(minValidTime, input, output) + unknown := h.processWALSamples(h.minValidTime, input, output) atomic.AddUint64(&unknownRefs, unknown) wg.Done() }(inputs[i], outputs[i]) @@ -410,7 +404,7 @@ func (h *Head) loadWAL(r *wal.Reader) error { } for _, s := range tstones { for _, itv := range s.intervals { - if itv.Maxt < minValidTime { + if itv.Maxt < h.minValidTime { continue } h.tombstones.addInterval(s.ref, itv) @@ -443,8 +437,12 @@ func (h *Head) loadWAL(r *wal.Reader) error { } // Init loads data from the write ahead log and prepares the head for writes. -func (h *Head) Init() error { +// It should be called before using an appender so that +// limits the ingested samples to the head min valid time. +func (h *Head) Init(minValidTime int64) error { + h.minValidTime = minValidTime defer h.postings.EnsureOrder() + defer h.gc() // After loading the wal remove the obsolete data from the head. if h.wal == nil { return nil @@ -486,6 +484,7 @@ func (h *Head) Init() error { if err := h.wal.Repair(err); err != nil { return errors.Wrap(err, "repair corrupted WAL") } + return nil } @@ -654,8 +653,10 @@ func (h *Head) Appender() Appender { func (h *Head) appender() *headAppender { return &headAppender{ - head: h, - minValidTime: max(h.MinTime(), h.MaxTime()-h.chunkRange/2), + head: h, + // Set the minimum valid time to whichever is greater the head min valid time or the compaciton window. + // This ensures that no samples will be added within the compaction window to avoid races. + minValidTime: max(h.minValidTime, h.MaxTime()-h.chunkRange/2), mint: math.MaxInt64, maxt: math.MinInt64, samples: h.getAppendBuffer(), diff --git a/head_test.go b/head_test.go index 93eccecd..c77ecaad 100644 --- a/head_test.go +++ b/head_test.go @@ -15,6 +15,7 @@ package tsdb import ( "io/ioutil" + "math" "math/rand" "os" "path/filepath" @@ -123,7 +124,7 @@ func TestHead_ReadWAL(t *testing.T) { testutil.Ok(t, err) defer head.Close() - testutil.Ok(t, head.Init()) + testutil.Ok(t, head.Init(math.MinInt64)) testutil.Equals(t, uint64(100), head.lastSeriesID) s10 := head.series.getByID(10) @@ -288,7 +289,7 @@ func TestHeadDeleteSeriesWithoutSamples(t *testing.T) { testutil.Ok(t, err) defer head.Close() - testutil.Ok(t, head.Init()) + testutil.Ok(t, head.Init(math.MinInt64)) testutil.Ok(t, head.Delete(0, 100, labels.NewEqualMatcher("a", "1"))) } @@ -923,7 +924,7 @@ func TestWalRepair(t *testing.T) { h, err := NewHead(nil, nil, w, 1) testutil.Ok(t, err) - testutil.Ok(t, h.Init()) + testutil.Ok(t, h.Init(math.MinInt64)) sr, err := wal.NewSegmentsReader(dir) testutil.Ok(t, err) From f192250444d21dedebe3830ee514009a1dca1633 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Sat, 1 Dec 2018 16:58:08 +0200 Subject: [PATCH 09/12] update the failing test. Signed-off-by: Krasi Georgiev --- head_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/head_test.go b/head_test.go index c77ecaad..70873c7a 100644 --- a/head_test.go +++ b/head_test.go @@ -133,7 +133,7 @@ func TestHead_ReadWAL(t *testing.T) { s100 := head.series.getByID(100) testutil.Equals(t, labels.FromStrings("a", "1"), s10.lset) - testutil.Equals(t, labels.FromStrings("a", "2"), s11.lset) + testutil.Equals(t, (*memSeries)(nil), s11) // Series without samples should be garbage colected at head.Init(). testutil.Equals(t, labels.FromStrings("a", "4"), s50.lset) testutil.Equals(t, labels.FromStrings("a", "3"), s100.lset) @@ -147,7 +147,6 @@ func TestHead_ReadWAL(t *testing.T) { } testutil.Equals(t, []sample{{100, 2}, {101, 5}}, expandChunk(s10.iterator(0))) - testutil.Equals(t, 0, len(s11.chunks)) testutil.Equals(t, []sample{{101, 6}}, expandChunk(s50.iterator(0))) testutil.Equals(t, []sample{{100, 3}}, expandChunk(s100.iterator(0))) } From 0cc27efac043e209d9cafecd1922e0549472f5ae Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Sat, 1 Dec 2018 17:15:50 +0200 Subject: [PATCH 10/12] update the changelog Signed-off-by: Krasi Georgiev --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 11f704b2..6836f674 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,3 +3,5 @@ - `LastCheckpoint` used to return just the segment name and now it returns the full relative path. - `NewSegmentsRangeReader` can now read over miltiple wal ranges by using the new `SegmentRange` struct. - `CorruptionErr` now also exposes the Segment `Dir` which is added when displaying any errors. + - `Head.Init()` is changed to `Head.Init(minValidTime int64)` where `minValidTime` is taken from the maxt of the last persisted block and any samples below `minValidTime` will not be loaded from the wal in the head. The same value is used when using the `Heah.Appender()` to disallow adding samples below the `minValidTime` timestamp. This change was nececary to fix a bug where a `Snapshot()` with the head included would create a block with custom time range(not bound to the default time ranges) and the next block population from the head would create an overlapping block. + - https://github.com/prometheus/tsdb/issues/446 \ No newline at end of file From 967ebfe37b31de2b3c11258616a04752ea59ba75 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Tue, 4 Dec 2018 12:15:37 +0200 Subject: [PATCH 11/12] nit and set the minValidTime at head truncating. Signed-off-by: Krasi Georgiev --- head.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/head.go b/head.go index 8028a77d..52a3dfe4 100644 --- a/head.go +++ b/head.go @@ -60,7 +60,7 @@ type Head struct { bytesPool sync.Pool minTime, maxTime int64 // Current min and max of the samples included in the head. - minValidTime int64 // Mint allowed to be added to te head. It shouldn't be lower than the last persisted block. + minValidTime int64 // Mint allowed to be added to the head. It shouldn't be lower than the maxt of the last persisted block. lastSeriesID uint64 // All series addressable by their ID or hash. @@ -501,6 +501,7 @@ func (h *Head) Truncate(mint int64) (err error) { return nil } atomic.StoreInt64(&h.minTime, mint) + h.minValidTime = mint // Ensure that max time is at least as high as min time. for h.MaxTime() < mint { From 698e97271de3602542aee176cb2d5f11b8d9a0c4 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Tue, 4 Dec 2018 12:22:17 +0200 Subject: [PATCH 12/12] flaky test Signed-off-by: Krasi Georgiev --- compact_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/compact_test.go b/compact_test.go index 4d99ce05..2489a21e 100644 --- a/compact_test.go +++ b/compact_test.go @@ -733,7 +733,7 @@ func TestDisableAutoCompactions(t *testing.T) { case db.compactc <- struct{}{}: default: } - for x := 0; x < 10; x++ { + for x := 0; x < 20; x++ { if len(db.Blocks()) > 0 { break }