Skip to content

Commit

Permalink
no overlapping on compaction when an existing block is not within def…
Browse files Browse the repository at this point in the history
…ault boundaries.

Signed-off-by: Krasi Georgiev <[email protected]>
  • Loading branch information
Krasi Georgiev committed Dec 1, 2018
1 parent 01e8296 commit b3240c8
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 7 deletions.
7 changes: 3 additions & 4 deletions block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func createEmptyBlock(t *testing.T, dir string, meta *BlockMeta) *Block {
}

// createPopulatedBlock creates a block with nSeries series, and nSamples samples.
func createPopulatedBlock(tb testing.TB, dir string, nSeries, nSamples int) *Block {
func createPopulatedBlock(tb testing.TB, dir string, nSeries int, mint, maxt int64) *Block {
head, err := NewHead(nil, nil, nil, 2*60*60*1000)
testutil.Ok(tb, err)
defer head.Close()
Expand All @@ -87,12 +87,11 @@ func createPopulatedBlock(tb testing.TB, dir string, nSeries, nSamples int) *Blo
testutil.Ok(tb, err)
refs := make([]uint64, nSeries)

for n := 0; n < nSamples; n++ {
for ts := mint; ts <= maxt; ts++ {
app := head.Appender()
ts := n * 1000
for i, lbl := range lbls {
if refs[i] != 0 {
err := app.AddFast(refs[i], int64(ts), rand.Float64())
err := app.AddFast(refs[i], ts, rand.Float64())
if err == nil {
continue
}
Expand Down
3 changes: 2 additions & 1 deletion db.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,8 @@ func (db *DB) compact() (err error) {
if db.head.MaxTime()-db.head.MinTime() <= db.opts.BlockRanges[0]/2*3 {
break
}
mint, maxt := rangeForTimestamp(db.head.MinTime(), db.opts.BlockRanges[0])
mint := db.head.MinTime()
_, maxt := rangeForTimestamp(mint, db.opts.BlockRanges[0])

// Wrap head into a range that bounds all reads to it.
head := &rangeHead{
Expand Down
111 changes: 111 additions & 0 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package tsdb

import (
"flag"
"fmt"
"io/ioutil"
"math"
Expand All @@ -25,6 +26,7 @@ import (
"testing"
"time"

"github.com/go-kit/kit/log"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -1441,3 +1443,112 @@ func TestCorrectNumTombstones(t *testing.T) {
testutil.Ok(t, db.Delete(9, 11, labels.NewEqualMatcher("foo", "bar")))
testutil.Equals(t, uint64(3), db.blocks[0].meta.Stats.NumTombstones)
}

// h.minTime in different scenarios:
// - no blocks no WAL: set to the time of the first appended sample
// - no blocks with WAL: set to the smallest sample from the WAL
// - with blocks no WAL: set to the last block maxT
// - with blocks with WAL: same as above
// TestBlockRanges checks the following use cases:
// - No samples can be added with timestamps lower than the last block maxt.
// - The compactor doesn't create overlaping blocks
// even when the last blocks is not within the default boundaries.
// - Lower bondary is based on the smallest sample in the head and
// upper boundary is rounded to the configured block range.
//
// This ensures that a snapshot that includes the head and creates a block with a custom time range
// will not overlap with the first block created by the next compaction.
func TestBlockRanges(t *testing.T) {
logger := log.NewNopLogger()
flag.Visit(func(f *flag.Flag) {
if f.Name == "test.v" && f.Value.String() == "true" {
logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
}
})

dir, err := ioutil.TempDir("", "test_storage")
if err != nil {
t.Fatalf("Opening test dir failed: %s", err)
}

rangeToTriggercompaction := DefaultOptions.BlockRanges[0]/2*3 + 1

// Test that the compactor doesn't create overlapping blocks
// when a non standard block already exists.
firstBlockMaxT := int64(3)
createPopulatedBlock(t, dir, 1, 0, firstBlockMaxT)
db, err := Open(dir, logger, nil, DefaultOptions)
if err != nil {
t.Fatalf("Opening test storage failed: %s", err)
}
defer func() {
os.RemoveAll(dir)
}()
app := db.Appender()
lbl := labels.Labels{{"a", "b"}}
_, err = app.Add(lbl, firstBlockMaxT-1, rand.Float64())
if err == nil {
t.Fatalf("appending a sample with a timestamp covered by a previous block shouldn't be possible")
}
_, err = app.Add(lbl, firstBlockMaxT+1, rand.Float64())
testutil.Ok(t, err)
_, err = app.Add(lbl, firstBlockMaxT+2, rand.Float64())
testutil.Ok(t, err)
secondBlockMaxt := firstBlockMaxT + rangeToTriggercompaction
_, err = app.Add(lbl, secondBlockMaxt, rand.Float64()) // Add samples to trigger a new compaction

testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
for x := 1; x < 10; x++ {
if len(db.Blocks()) == 2 {
break
}
time.Sleep(100 * time.Millisecond)
}
testutil.Equals(t, 2, len(db.Blocks()), "no new block created after the set timeout")

if db.Blocks()[0].Meta().MaxTime > db.Blocks()[1].Meta().MinTime {
t.Fatalf("new block overlaps old:%v,new:%v", db.Blocks()[0].Meta(), db.Blocks()[1].Meta())
}

// Test that wal records are skipped when an existing block covers the same time ranges
// and compaction doesn't create an overlapping block.
db.DisableCompactions()
_, err = app.Add(lbl, secondBlockMaxt+1, rand.Float64())
testutil.Ok(t, err)
_, err = app.Add(lbl, secondBlockMaxt+2, rand.Float64())
testutil.Ok(t, err)
_, err = app.Add(lbl, secondBlockMaxt+3, rand.Float64())
testutil.Ok(t, err)
_, err = app.Add(lbl, secondBlockMaxt+4, rand.Float64())
testutil.Ok(t, app.Commit())
testutil.Ok(t, db.Close())

thirdBlockMaxt := secondBlockMaxt + 2
createPopulatedBlock(t, dir, 1, secondBlockMaxt+1, thirdBlockMaxt)

db, err = Open(dir, logger, nil, DefaultOptions)
if err != nil {
t.Fatalf("Opening test storage failed: %s", err)
}
defer db.Close()
testutil.Equals(t, 3, len(db.Blocks()), "db doesn't include expected number of blocks")
testutil.Equals(t, db.Blocks()[2].Meta().MaxTime, thirdBlockMaxt, "unexpected maxt of the last block")

app = db.Appender()
_, err = app.Add(lbl, thirdBlockMaxt+rangeToTriggercompaction, rand.Float64()) // Trigger a compaction
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
for x := 1; x < 10; x++ {
if len(db.Blocks()) == 4 {
break
}
time.Sleep(100 * time.Millisecond)
}

testutil.Equals(t, 4, len(db.Blocks()), "no new block created after the set timeout")

if db.Blocks()[2].Meta().MaxTime > db.Blocks()[3].Meta().MinTime {
t.Fatalf("new block overlaps old:%v,new:%v", db.Blocks()[2].Meta(), db.Blocks()[3].Meta())
}
}
9 changes: 8 additions & 1 deletion head.go
Original file line number Diff line number Diff line change
Expand Up @@ -655,13 +655,20 @@ func (h *Head) Appender() Appender {
func (h *Head) appender() *headAppender {
return &headAppender{
head: h,
minValidTime: h.MaxTime() - h.chunkRange/2,
minValidTime: max(h.MinTime(), h.MaxTime()-h.chunkRange/2),
mint: math.MaxInt64,
maxt: math.MinInt64,
samples: h.getAppendBuffer(),
}
}

func max(a, b int64) int64 {
if a > b {
return a
}
return b
}

func (h *Head) getAppendBuffer() []RefSample {
b := h.appendPool.Get()
if b == nil {
Expand Down
2 changes: 1 addition & 1 deletion querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1292,7 +1292,7 @@ func BenchmarkPersistedQueries(b *testing.B) {
dir, err := ioutil.TempDir("", "bench_persisted")
testutil.Ok(b, err)
defer os.RemoveAll(dir)
block := createPopulatedBlock(b, dir, nSeries, nSamples)
block := createPopulatedBlock(b, dir, nSeries, 1, int64(nSamples))
defer block.Close()

q, err := NewBlockQuerier(block, block.Meta().MinTime, block.Meta().MaxTime)
Expand Down

0 comments on commit b3240c8

Please sign in to comment.