Skip to content
This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

no overlapping on compaction when an existing block is not within default boundaries. #461

Merged
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,5 @@
- `LastCheckpoint` used to return just the segment name and now it returns the full relative path.
- `NewSegmentsRangeReader` can now read over miltiple wal ranges by using the new `SegmentRange` struct.
- `CorruptionErr` now also exposes the Segment `Dir` which is added when displaying any errors.
- `Head.Init()` is changed to `Head.Init(minValidTime int64)` where `minValidTime` is taken from the maxt of the last persisted block and any samples below `minValidTime` will not be loaded from the wal in the head. The same value is used when using the `Heah.Appender()` to disallow adding samples below the `minValidTime` timestamp. This change was nececary to fix a bug where a `Snapshot()` with the head included would create a block with custom time range(not bound to the default time ranges) and the next block population from the head would create an overlapping block.
- https://github.com/prometheus/tsdb/issues/446
10 changes: 5 additions & 5 deletions block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,9 @@ func createEmptyBlock(t *testing.T, dir string, meta *BlockMeta) *Block {
return b
}

// createPopulatedBlock creates a block with nSeries series, and nSamples samples.
func createPopulatedBlock(tb testing.TB, dir string, nSeries, nSamples int) *Block {
// createPopulatedBlock creates a block with nSeries series, filled with
// samples of the given mint,maxt time range.
func createPopulatedBlock(tb testing.TB, dir string, nSeries int, mint, maxt int64) *Block {
head, err := NewHead(nil, nil, nil, 2*60*60*1000)
testutil.Ok(tb, err)
defer head.Close()
Expand All @@ -87,12 +88,11 @@ func createPopulatedBlock(tb testing.TB, dir string, nSeries, nSamples int) *Blo
testutil.Ok(tb, err)
refs := make([]uint64, nSeries)

for n := 0; n < nSamples; n++ {
for ts := mint; ts <= maxt; ts++ {
app := head.Appender()
ts := n * 1000
for i, lbl := range lbls {
if refs[i] != 0 {
err := app.AddFast(refs[i], int64(ts), rand.Float64())
err := app.AddFast(refs[i], ts, rand.Float64())
if err == nil {
continue
}
Expand Down
23 changes: 16 additions & 7 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,12 +271,21 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
if err != nil {
return nil, err
}
if err := db.head.Init(); err != nil {
return nil, errors.Wrap(err, "read WAL")
}

if err := db.reload(); err != nil {
return nil, err
}
// Set the min valid time for the ingested samples
// to be no lower than the maxt of the last block.
blocks := db.Blocks()
minValidTime := int64(math.MinInt64)
if len(blocks) > 0 {
minValidTime = blocks[len(blocks)-1].Meta().MaxTime
}

if err := db.head.Init(minValidTime); err != nil {
return nil, errors.Wrap(err, "read WAL")
}

go db.run()

Expand Down Expand Up @@ -395,7 +404,8 @@ func (db *DB) compact() (err error) {
if db.head.MaxTime()-db.head.MinTime() <= db.opts.BlockRanges[0]/2*3 {
break
}
mint, maxt := rangeForTimestamp(db.head.MinTime(), db.opts.BlockRanges[0])
mint := db.head.MinTime()
maxt := rangeForTimestamp(mint, db.opts.BlockRanges[0])

// Wrap head into a range that bounds all reads to it.
head := &rangeHead{
Expand Down Expand Up @@ -826,9 +836,8 @@ func (db *DB) Querier(mint, maxt int64) (Querier, error) {
return sq, nil
}

func rangeForTimestamp(t int64, width int64) (mint, maxt int64) {
mint = (t / width) * width
return mint, mint + width
func rangeForTimestamp(t int64, width int64) (maxt int64) {
return (t/width)*width + width
}

// Delete implements deletion of metrics. It only has atomicity guarantees on a per-block basis.
Expand Down
105 changes: 105 additions & 0 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"testing"
"time"

"github.com/go-kit/kit/log"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -1199,6 +1200,11 @@ func TestQuerierWithBoundaryChunks(t *testing.T) {
testutil.Assert(t, count == 2, "expected 2 blocks in querier, got %d", count)
}

// TestInitializeHeadTimestamp ensures that the h.minTime is set properly.
// - no blocks no WAL: set to the time of the first appended sample
// - no blocks with WAL: set to the smallest sample from the WAL
// - with blocks no WAL: set to the last block maxT
// - with blocks with WAL: same as above
func TestInitializeHeadTimestamp(t *testing.T) {
t.Run("clean", func(t *testing.T) {
dir, err := ioutil.TempDir("", "test_head_init")
Expand Down Expand Up @@ -1441,3 +1447,102 @@ func TestCorrectNumTombstones(t *testing.T) {
testutil.Ok(t, db.Delete(9, 11, labels.NewEqualMatcher("foo", "bar")))
testutil.Equals(t, uint64(3), db.blocks[0].meta.Stats.NumTombstones)
}

// TestBlockRanges checks the following use cases:
// - No samples can be added with timestamps lower than the last block maxt.
// - The compactor doesn't create overlaping blocks
// even when the last blocks is not within the default boundaries.
// - Lower bondary is based on the smallest sample in the head and
// upper boundary is rounded to the configured block range.
//
// This ensures that a snapshot that includes the head and creates a block with a custom time range
// will not overlap with the first block created by the next compaction.
func TestBlockRanges(t *testing.T) {
logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))

dir, err := ioutil.TempDir("", "test_storage")
if err != nil {
t.Fatalf("Opening test dir failed: %s", err)
}

rangeToTriggercompaction := DefaultOptions.BlockRanges[0]/2*3 + 1

// Test that the compactor doesn't create overlapping blocks
// when a non standard block already exists.
firstBlockMaxT := int64(3)
createPopulatedBlock(t, dir, 1, 0, firstBlockMaxT)
db, err := Open(dir, logger, nil, DefaultOptions)
if err != nil {
t.Fatalf("Opening test storage failed: %s", err)
}
defer func() {
os.RemoveAll(dir)
}()
app := db.Appender()
lbl := labels.Labels{{"a", "b"}}
_, err = app.Add(lbl, firstBlockMaxT-1, rand.Float64())
if err == nil {
t.Fatalf("appending a sample with a timestamp covered by a previous block shouldn't be possible")
}
_, err = app.Add(lbl, firstBlockMaxT+1, rand.Float64())
testutil.Ok(t, err)
_, err = app.Add(lbl, firstBlockMaxT+2, rand.Float64())
testutil.Ok(t, err)
secondBlockMaxt := firstBlockMaxT + rangeToTriggercompaction
_, err = app.Add(lbl, secondBlockMaxt, rand.Float64()) // Add samples to trigger a new compaction

testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
for x := 1; x < 10; x++ {
if len(db.Blocks()) == 2 {
break
}
time.Sleep(100 * time.Millisecond)
}
testutil.Equals(t, 2, len(db.Blocks()), "no new block created after the set timeout")

if db.Blocks()[0].Meta().MaxTime > db.Blocks()[1].Meta().MinTime {
t.Fatalf("new block overlaps old:%v,new:%v", db.Blocks()[0].Meta(), db.Blocks()[1].Meta())
}

// Test that wal records are skipped when an existing block covers the same time ranges
// and compaction doesn't create an overlapping block.
db.DisableCompactions()
_, err = app.Add(lbl, secondBlockMaxt+1, rand.Float64())
testutil.Ok(t, err)
_, err = app.Add(lbl, secondBlockMaxt+2, rand.Float64())
testutil.Ok(t, err)
_, err = app.Add(lbl, secondBlockMaxt+3, rand.Float64())
testutil.Ok(t, err)
_, err = app.Add(lbl, secondBlockMaxt+4, rand.Float64())
testutil.Ok(t, app.Commit())
testutil.Ok(t, db.Close())

thirdBlockMaxt := secondBlockMaxt + 2
createPopulatedBlock(t, dir, 1, secondBlockMaxt+1, thirdBlockMaxt)

db, err = Open(dir, logger, nil, DefaultOptions)
if err != nil {
t.Fatalf("Opening test storage failed: %s", err)
}
defer db.Close()
testutil.Equals(t, 3, len(db.Blocks()), "db doesn't include expected number of blocks")
testutil.Equals(t, db.Blocks()[2].Meta().MaxTime, thirdBlockMaxt, "unexpected maxt of the last block")

app = db.Appender()
_, err = app.Add(lbl, thirdBlockMaxt+rangeToTriggercompaction, rand.Float64()) // Trigger a compaction
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
for x := 1; x < 10; x++ {
if len(db.Blocks()) == 4 {
break
}
time.Sleep(100 * time.Millisecond)
}

testutil.Equals(t, 4, len(db.Blocks()), "no new block created after the set timeout")

if db.Blocks()[2].Meta().MaxTime > db.Blocks()[3].Meta().MinTime {
t.Fatalf("new block overlaps old:%v,new:%v", db.Blocks()[2].Meta(), db.Blocks()[3].Meta())
}
}
36 changes: 22 additions & 14 deletions head.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ type Head struct {
appendPool sync.Pool
bytesPool sync.Pool

minTime, maxTime int64
minTime, maxTime int64 // Current min and max of the samples included in the head.
minValidTime int64 // Mint allowed to be added to te head. It shouldn't be lower than the last persisted block.
krasi-georgiev marked this conversation as resolved.
Show resolved Hide resolved
lastSeriesID uint64

// All series addressable by their ID or hash.
Expand Down Expand Up @@ -300,13 +301,6 @@ func (h *Head) updateMinMaxTime(mint, maxt int64) {
}

func (h *Head) loadWAL(r *wal.Reader) error {
minValidTime := h.MinTime()
// If the min time is still uninitialized (no persisted blocks yet),
// we accept all sample timestamps from the WAL.
if minValidTime == math.MaxInt64 {
minValidTime = math.MinInt64
}

// Track number of samples that referenced a series we don't know about
// for error reporting.
var unknownRefs uint64
Expand All @@ -327,7 +321,7 @@ func (h *Head) loadWAL(r *wal.Reader) error {
inputs[i] = make(chan []RefSample, 300)

go func(input <-chan []RefSample, output chan<- []RefSample) {
unknown := h.processWALSamples(minValidTime, input, output)
unknown := h.processWALSamples(h.minValidTime, input, output)
atomic.AddUint64(&unknownRefs, unknown)
wg.Done()
}(inputs[i], outputs[i])
Expand Down Expand Up @@ -410,7 +404,7 @@ func (h *Head) loadWAL(r *wal.Reader) error {
}
for _, s := range tstones {
for _, itv := range s.intervals {
if itv.Maxt < minValidTime {
if itv.Maxt < h.minValidTime {
continue
}
h.tombstones.addInterval(s.ref, itv)
Expand Down Expand Up @@ -443,8 +437,12 @@ func (h *Head) loadWAL(r *wal.Reader) error {
}

// Init loads data from the write ahead log and prepares the head for writes.
func (h *Head) Init() error {
// It should be called before using an appender so that
// limits the ingested samples to the head min valid time.
func (h *Head) Init(minValidTime int64) error {
h.minValidTime = minValidTime
defer h.postings.EnsureOrder()
defer h.gc() // After loading the wal remove the obsolete data from the head.

if h.wal == nil {
return nil
Expand Down Expand Up @@ -486,6 +484,7 @@ func (h *Head) Init() error {
if err := h.wal.Repair(err); err != nil {
return errors.Wrap(err, "repair corrupted WAL")
}

return nil
}

Expand Down Expand Up @@ -654,14 +653,23 @@ func (h *Head) Appender() Appender {

func (h *Head) appender() *headAppender {
return &headAppender{
head: h,
minValidTime: h.MaxTime() - h.chunkRange/2,
head: h,
// Set the minimum valid time to whichever is greater the head min valid time or the compaciton window.
// This ensures that no samples will be added within the compaction window to avoid races.
minValidTime: max(h.minValidTime, h.MaxTime()-h.chunkRange/2),
mint: math.MaxInt64,
maxt: math.MinInt64,
samples: h.getAppendBuffer(),
}
}

func max(a, b int64) int64 {
if a > b {
return a
}
return b
}

func (h *Head) getAppendBuffer() []RefSample {
b := h.appendPool.Get()
if b == nil {
Expand Down Expand Up @@ -1411,7 +1419,7 @@ func (s *memSeries) cut(mint int64) *memChunk {

// Set upper bound on when the next chunk must be started. An earlier timestamp
// may be chosen dynamically at a later point.
_, s.nextAt = rangeForTimestamp(mint, s.chunkRange)
s.nextAt = rangeForTimestamp(mint, s.chunkRange)

app, err := c.chunk.Appender()
if err != nil {
Expand Down
10 changes: 5 additions & 5 deletions head_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package tsdb

import (
"io/ioutil"
"math"
"math/rand"
"os"
"path/filepath"
Expand Down Expand Up @@ -123,7 +124,7 @@ func TestHead_ReadWAL(t *testing.T) {
testutil.Ok(t, err)
defer head.Close()

testutil.Ok(t, head.Init())
testutil.Ok(t, head.Init(math.MinInt64))
testutil.Equals(t, uint64(100), head.lastSeriesID)

s10 := head.series.getByID(10)
Expand All @@ -132,7 +133,7 @@ func TestHead_ReadWAL(t *testing.T) {
s100 := head.series.getByID(100)

testutil.Equals(t, labels.FromStrings("a", "1"), s10.lset)
testutil.Equals(t, labels.FromStrings("a", "2"), s11.lset)
testutil.Equals(t, (*memSeries)(nil), s11) // Series without samples should be garbage colected at head.Init().
testutil.Equals(t, labels.FromStrings("a", "4"), s50.lset)
testutil.Equals(t, labels.FromStrings("a", "3"), s100.lset)

Expand All @@ -146,7 +147,6 @@ func TestHead_ReadWAL(t *testing.T) {
}

testutil.Equals(t, []sample{{100, 2}, {101, 5}}, expandChunk(s10.iterator(0)))
testutil.Equals(t, 0, len(s11.chunks))
testutil.Equals(t, []sample{{101, 6}}, expandChunk(s50.iterator(0)))
testutil.Equals(t, []sample{{100, 3}}, expandChunk(s100.iterator(0)))
}
Expand Down Expand Up @@ -288,7 +288,7 @@ func TestHeadDeleteSeriesWithoutSamples(t *testing.T) {
testutil.Ok(t, err)
defer head.Close()

testutil.Ok(t, head.Init())
testutil.Ok(t, head.Init(math.MinInt64))

testutil.Ok(t, head.Delete(0, 100, labels.NewEqualMatcher("a", "1")))
}
Expand Down Expand Up @@ -923,7 +923,7 @@ func TestWalRepair(t *testing.T) {

h, err := NewHead(nil, nil, w, 1)
testutil.Ok(t, err)
testutil.Ok(t, h.Init())
testutil.Ok(t, h.Init(math.MinInt64))

sr, err := wal.NewSegmentsReader(dir)
testutil.Ok(t, err)
Expand Down
2 changes: 1 addition & 1 deletion querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1292,7 +1292,7 @@ func BenchmarkPersistedQueries(b *testing.B) {
dir, err := ioutil.TempDir("", "bench_persisted")
testutil.Ok(b, err)
defer os.RemoveAll(dir)
block := createPopulatedBlock(b, dir, nSeries, nSamples)
block := createPopulatedBlock(b, dir, nSeries, 1, int64(nSamples))
defer block.Close()

q, err := NewBlockQuerier(block, block.Meta().MinTime, block.Meta().MaxTime)
Expand Down