diff --git a/CHANGELOG.md b/CHANGELOG.md index 11f704b2..6836f674 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,3 +3,5 @@ - `LastCheckpoint` used to return just the segment name and now it returns the full relative path. - `NewSegmentsRangeReader` can now read over miltiple wal ranges by using the new `SegmentRange` struct. - `CorruptionErr` now also exposes the Segment `Dir` which is added when displaying any errors. + - `Head.Init()` is changed to `Head.Init(minValidTime int64)` where `minValidTime` is taken from the maxt of the last persisted block and any samples below `minValidTime` will not be loaded from the wal in the head. The same value is used when using the `Heah.Appender()` to disallow adding samples below the `minValidTime` timestamp. This change was nececary to fix a bug where a `Snapshot()` with the head included would create a block with custom time range(not bound to the default time ranges) and the next block population from the head would create an overlapping block. + - https://github.com/prometheus/tsdb/issues/446 \ No newline at end of file diff --git a/block_test.go b/block_test.go index 61666fe3..03ac006a 100644 --- a/block_test.go +++ b/block_test.go @@ -77,8 +77,9 @@ func createEmptyBlock(t *testing.T, dir string, meta *BlockMeta) *Block { return b } -// createPopulatedBlock creates a block with nSeries series, and nSamples samples. -func createPopulatedBlock(tb testing.TB, dir string, nSeries, nSamples int) *Block { +// createPopulatedBlock creates a block with nSeries series, filled with +// samples of the given mint,maxt time range. +func createPopulatedBlock(tb testing.TB, dir string, nSeries int, mint, maxt int64) *Block { head, err := NewHead(nil, nil, nil, 2*60*60*1000) testutil.Ok(tb, err) defer head.Close() @@ -87,12 +88,11 @@ func createPopulatedBlock(tb testing.TB, dir string, nSeries, nSamples int) *Blo testutil.Ok(tb, err) refs := make([]uint64, nSeries) - for n := 0; n < nSamples; n++ { + for ts := mint; ts <= maxt; ts++ { app := head.Appender() - ts := n * 1000 for i, lbl := range lbls { if refs[i] != 0 { - err := app.AddFast(refs[i], int64(ts), rand.Float64()) + err := app.AddFast(refs[i], ts, rand.Float64()) if err == nil { continue } diff --git a/compact_test.go b/compact_test.go index 4d99ce05..2489a21e 100644 --- a/compact_test.go +++ b/compact_test.go @@ -733,7 +733,7 @@ func TestDisableAutoCompactions(t *testing.T) { case db.compactc <- struct{}{}: default: } - for x := 0; x < 10; x++ { + for x := 0; x < 20; x++ { if len(db.Blocks()) > 0 { break } diff --git a/db.go b/db.go index bbd46c30..3a47f0bf 100644 --- a/db.go +++ b/db.go @@ -271,12 +271,21 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db if err != nil { return nil, err } - if err := db.head.Init(); err != nil { - return nil, errors.Wrap(err, "read WAL") - } + if err := db.reload(); err != nil { return nil, err } + // Set the min valid time for the ingested samples + // to be no lower than the maxt of the last block. + blocks := db.Blocks() + minValidTime := int64(math.MinInt64) + if len(blocks) > 0 { + minValidTime = blocks[len(blocks)-1].Meta().MaxTime + } + + if err := db.head.Init(minValidTime); err != nil { + return nil, errors.Wrap(err, "read WAL") + } go db.run() @@ -395,7 +404,8 @@ func (db *DB) compact() (err error) { if db.head.MaxTime()-db.head.MinTime() <= db.opts.BlockRanges[0]/2*3 { break } - mint, maxt := rangeForTimestamp(db.head.MinTime(), db.opts.BlockRanges[0]) + mint := db.head.MinTime() + maxt := rangeForTimestamp(mint, db.opts.BlockRanges[0]) // Wrap head into a range that bounds all reads to it. head := &rangeHead{ @@ -826,9 +836,8 @@ func (db *DB) Querier(mint, maxt int64) (Querier, error) { return sq, nil } -func rangeForTimestamp(t int64, width int64) (mint, maxt int64) { - mint = (t / width) * width - return mint, mint + width +func rangeForTimestamp(t int64, width int64) (maxt int64) { + return (t/width)*width + width } // Delete implements deletion of metrics. It only has atomicity guarantees on a per-block basis. diff --git a/db_test.go b/db_test.go index 6fd59f34..60b26076 100644 --- a/db_test.go +++ b/db_test.go @@ -25,6 +25,7 @@ import ( "testing" "time" + "github.com/go-kit/kit/log" "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -1199,6 +1200,11 @@ func TestQuerierWithBoundaryChunks(t *testing.T) { testutil.Assert(t, count == 2, "expected 2 blocks in querier, got %d", count) } +// TestInitializeHeadTimestamp ensures that the h.minTime is set properly. +// - no blocks no WAL: set to the time of the first appended sample +// - no blocks with WAL: set to the smallest sample from the WAL +// - with blocks no WAL: set to the last block maxT +// - with blocks with WAL: same as above func TestInitializeHeadTimestamp(t *testing.T) { t.Run("clean", func(t *testing.T) { dir, err := ioutil.TempDir("", "test_head_init") @@ -1441,3 +1447,102 @@ func TestCorrectNumTombstones(t *testing.T) { testutil.Ok(t, db.Delete(9, 11, labels.NewEqualMatcher("foo", "bar"))) testutil.Equals(t, uint64(3), db.blocks[0].meta.Stats.NumTombstones) } + +// TestBlockRanges checks the following use cases: +// - No samples can be added with timestamps lower than the last block maxt. +// - The compactor doesn't create overlaping blocks +// even when the last blocks is not within the default boundaries. +// - Lower bondary is based on the smallest sample in the head and +// upper boundary is rounded to the configured block range. +// +// This ensures that a snapshot that includes the head and creates a block with a custom time range +// will not overlap with the first block created by the next compaction. +func TestBlockRanges(t *testing.T) { + logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)) + + dir, err := ioutil.TempDir("", "test_storage") + if err != nil { + t.Fatalf("Opening test dir failed: %s", err) + } + + rangeToTriggercompaction := DefaultOptions.BlockRanges[0]/2*3 + 1 + + // Test that the compactor doesn't create overlapping blocks + // when a non standard block already exists. + firstBlockMaxT := int64(3) + createPopulatedBlock(t, dir, 1, 0, firstBlockMaxT) + db, err := Open(dir, logger, nil, DefaultOptions) + if err != nil { + t.Fatalf("Opening test storage failed: %s", err) + } + defer func() { + os.RemoveAll(dir) + }() + app := db.Appender() + lbl := labels.Labels{{"a", "b"}} + _, err = app.Add(lbl, firstBlockMaxT-1, rand.Float64()) + if err == nil { + t.Fatalf("appending a sample with a timestamp covered by a previous block shouldn't be possible") + } + _, err = app.Add(lbl, firstBlockMaxT+1, rand.Float64()) + testutil.Ok(t, err) + _, err = app.Add(lbl, firstBlockMaxT+2, rand.Float64()) + testutil.Ok(t, err) + secondBlockMaxt := firstBlockMaxT + rangeToTriggercompaction + _, err = app.Add(lbl, secondBlockMaxt, rand.Float64()) // Add samples to trigger a new compaction + + testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) + for x := 1; x < 10; x++ { + if len(db.Blocks()) == 2 { + break + } + time.Sleep(100 * time.Millisecond) + } + testutil.Equals(t, 2, len(db.Blocks()), "no new block created after the set timeout") + + if db.Blocks()[0].Meta().MaxTime > db.Blocks()[1].Meta().MinTime { + t.Fatalf("new block overlaps old:%v,new:%v", db.Blocks()[0].Meta(), db.Blocks()[1].Meta()) + } + + // Test that wal records are skipped when an existing block covers the same time ranges + // and compaction doesn't create an overlapping block. + db.DisableCompactions() + _, err = app.Add(lbl, secondBlockMaxt+1, rand.Float64()) + testutil.Ok(t, err) + _, err = app.Add(lbl, secondBlockMaxt+2, rand.Float64()) + testutil.Ok(t, err) + _, err = app.Add(lbl, secondBlockMaxt+3, rand.Float64()) + testutil.Ok(t, err) + _, err = app.Add(lbl, secondBlockMaxt+4, rand.Float64()) + testutil.Ok(t, app.Commit()) + testutil.Ok(t, db.Close()) + + thirdBlockMaxt := secondBlockMaxt + 2 + createPopulatedBlock(t, dir, 1, secondBlockMaxt+1, thirdBlockMaxt) + + db, err = Open(dir, logger, nil, DefaultOptions) + if err != nil { + t.Fatalf("Opening test storage failed: %s", err) + } + defer db.Close() + testutil.Equals(t, 3, len(db.Blocks()), "db doesn't include expected number of blocks") + testutil.Equals(t, db.Blocks()[2].Meta().MaxTime, thirdBlockMaxt, "unexpected maxt of the last block") + + app = db.Appender() + _, err = app.Add(lbl, thirdBlockMaxt+rangeToTriggercompaction, rand.Float64()) // Trigger a compaction + testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) + for x := 1; x < 10; x++ { + if len(db.Blocks()) == 4 { + break + } + time.Sleep(100 * time.Millisecond) + } + + testutil.Equals(t, 4, len(db.Blocks()), "no new block created after the set timeout") + + if db.Blocks()[2].Meta().MaxTime > db.Blocks()[3].Meta().MinTime { + t.Fatalf("new block overlaps old:%v,new:%v", db.Blocks()[2].Meta(), db.Blocks()[3].Meta()) + } +} diff --git a/head.go b/head.go index 90aba0b9..52a3dfe4 100644 --- a/head.go +++ b/head.go @@ -59,7 +59,8 @@ type Head struct { appendPool sync.Pool bytesPool sync.Pool - minTime, maxTime int64 + minTime, maxTime int64 // Current min and max of the samples included in the head. + minValidTime int64 // Mint allowed to be added to the head. It shouldn't be lower than the maxt of the last persisted block. lastSeriesID uint64 // All series addressable by their ID or hash. @@ -300,13 +301,6 @@ func (h *Head) updateMinMaxTime(mint, maxt int64) { } func (h *Head) loadWAL(r *wal.Reader) error { - minValidTime := h.MinTime() - // If the min time is still uninitialized (no persisted blocks yet), - // we accept all sample timestamps from the WAL. - if minValidTime == math.MaxInt64 { - minValidTime = math.MinInt64 - } - // Track number of samples that referenced a series we don't know about // for error reporting. var unknownRefs uint64 @@ -327,7 +321,7 @@ func (h *Head) loadWAL(r *wal.Reader) error { inputs[i] = make(chan []RefSample, 300) go func(input <-chan []RefSample, output chan<- []RefSample) { - unknown := h.processWALSamples(minValidTime, input, output) + unknown := h.processWALSamples(h.minValidTime, input, output) atomic.AddUint64(&unknownRefs, unknown) wg.Done() }(inputs[i], outputs[i]) @@ -410,7 +404,7 @@ func (h *Head) loadWAL(r *wal.Reader) error { } for _, s := range tstones { for _, itv := range s.intervals { - if itv.Maxt < minValidTime { + if itv.Maxt < h.minValidTime { continue } h.tombstones.addInterval(s.ref, itv) @@ -443,8 +437,12 @@ func (h *Head) loadWAL(r *wal.Reader) error { } // Init loads data from the write ahead log and prepares the head for writes. -func (h *Head) Init() error { +// It should be called before using an appender so that +// limits the ingested samples to the head min valid time. +func (h *Head) Init(minValidTime int64) error { + h.minValidTime = minValidTime defer h.postings.EnsureOrder() + defer h.gc() // After loading the wal remove the obsolete data from the head. if h.wal == nil { return nil @@ -486,6 +484,7 @@ func (h *Head) Init() error { if err := h.wal.Repair(err); err != nil { return errors.Wrap(err, "repair corrupted WAL") } + return nil } @@ -502,6 +501,7 @@ func (h *Head) Truncate(mint int64) (err error) { return nil } atomic.StoreInt64(&h.minTime, mint) + h.minValidTime = mint // Ensure that max time is at least as high as min time. for h.MaxTime() < mint { @@ -654,14 +654,23 @@ func (h *Head) Appender() Appender { func (h *Head) appender() *headAppender { return &headAppender{ - head: h, - minValidTime: h.MaxTime() - h.chunkRange/2, + head: h, + // Set the minimum valid time to whichever is greater the head min valid time or the compaciton window. + // This ensures that no samples will be added within the compaction window to avoid races. + minValidTime: max(h.minValidTime, h.MaxTime()-h.chunkRange/2), mint: math.MaxInt64, maxt: math.MinInt64, samples: h.getAppendBuffer(), } } +func max(a, b int64) int64 { + if a > b { + return a + } + return b +} + func (h *Head) getAppendBuffer() []RefSample { b := h.appendPool.Get() if b == nil { @@ -1411,7 +1420,7 @@ func (s *memSeries) cut(mint int64) *memChunk { // Set upper bound on when the next chunk must be started. An earlier timestamp // may be chosen dynamically at a later point. - _, s.nextAt = rangeForTimestamp(mint, s.chunkRange) + s.nextAt = rangeForTimestamp(mint, s.chunkRange) app, err := c.chunk.Appender() if err != nil { diff --git a/head_test.go b/head_test.go index 93eccecd..70873c7a 100644 --- a/head_test.go +++ b/head_test.go @@ -15,6 +15,7 @@ package tsdb import ( "io/ioutil" + "math" "math/rand" "os" "path/filepath" @@ -123,7 +124,7 @@ func TestHead_ReadWAL(t *testing.T) { testutil.Ok(t, err) defer head.Close() - testutil.Ok(t, head.Init()) + testutil.Ok(t, head.Init(math.MinInt64)) testutil.Equals(t, uint64(100), head.lastSeriesID) s10 := head.series.getByID(10) @@ -132,7 +133,7 @@ func TestHead_ReadWAL(t *testing.T) { s100 := head.series.getByID(100) testutil.Equals(t, labels.FromStrings("a", "1"), s10.lset) - testutil.Equals(t, labels.FromStrings("a", "2"), s11.lset) + testutil.Equals(t, (*memSeries)(nil), s11) // Series without samples should be garbage colected at head.Init(). testutil.Equals(t, labels.FromStrings("a", "4"), s50.lset) testutil.Equals(t, labels.FromStrings("a", "3"), s100.lset) @@ -146,7 +147,6 @@ func TestHead_ReadWAL(t *testing.T) { } testutil.Equals(t, []sample{{100, 2}, {101, 5}}, expandChunk(s10.iterator(0))) - testutil.Equals(t, 0, len(s11.chunks)) testutil.Equals(t, []sample{{101, 6}}, expandChunk(s50.iterator(0))) testutil.Equals(t, []sample{{100, 3}}, expandChunk(s100.iterator(0))) } @@ -288,7 +288,7 @@ func TestHeadDeleteSeriesWithoutSamples(t *testing.T) { testutil.Ok(t, err) defer head.Close() - testutil.Ok(t, head.Init()) + testutil.Ok(t, head.Init(math.MinInt64)) testutil.Ok(t, head.Delete(0, 100, labels.NewEqualMatcher("a", "1"))) } @@ -923,7 +923,7 @@ func TestWalRepair(t *testing.T) { h, err := NewHead(nil, nil, w, 1) testutil.Ok(t, err) - testutil.Ok(t, h.Init()) + testutil.Ok(t, h.Init(math.MinInt64)) sr, err := wal.NewSegmentsReader(dir) testutil.Ok(t, err) diff --git a/querier_test.go b/querier_test.go index 2eb5b037..e31072ce 100644 --- a/querier_test.go +++ b/querier_test.go @@ -1292,7 +1292,7 @@ func BenchmarkPersistedQueries(b *testing.B) { dir, err := ioutil.TempDir("", "bench_persisted") testutil.Ok(b, err) defer os.RemoveAll(dir) - block := createPopulatedBlock(b, dir, nSeries, nSamples) + block := createPopulatedBlock(b, dir, nSeries, 1, int64(nSamples)) defer block.Close() q, err := NewBlockQuerier(block, block.Meta().MinTime, block.Meta().MaxTime)