Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pull/374 review #5

Merged
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -7,6 +7,7 @@
- `OpenBlock` signature changed to take a logger.
- [REMOVED] `PrefixMatcher` is considered unused so was removed.
- [CLEANUP] `Options.WALFlushInterval` is removed as it wasn't used anywhere.
- [FEATURE] Add new `LiveReader` to WAL pacakge. Added to allow live tailing of a WAL segment, used by Prometheus Remote Write after refactor. The main difference between the new reader and the existing `Reader` is that for `LiveReader` a call to `Next()` that returns false does not mean that there will never be more data to read.

## 0.3.1
- [BUGFIX] Fixed most windows test and some actual bugs for unclosed file readers.
9 changes: 4 additions & 5 deletions compact.go
Original file line number Diff line number Diff line change
@@ -191,19 +191,18 @@ func (c *LeveledCompactor) plan(dms []dirMeta) ([]string, error) {
return res, nil
}

// Compact any blocks that have >5% tombstones.
// Compact any blocks with big enough time range that have >5% tombstones.
for i := len(dms) - 1; i >= 0; i-- {
meta := dms[i].meta
if meta.MaxTime-meta.MinTime < c.ranges[len(c.ranges)/2] {
break
continue
}

if float64(meta.Stats.NumTombstones)/float64(meta.Stats.NumSeries+1) > 0.05 {
return []string{dms[i].dir}, nil
res = append(res, dms[i].dir)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

avoid running plan multiple times.

}
}

return nil, nil
return res, nil
}

// selectDirs returns the dir metas that should be compacted into a single new block.
155 changes: 83 additions & 72 deletions db_test.go
Original file line number Diff line number Diff line change
@@ -1365,93 +1365,104 @@ func TestInitializeHeadTimestamp(t *testing.T) {
}

func TestNoEmptyBlocks(t *testing.T) {
db, close := openTestDB(t, nil)
db, close := openTestDB(t, &Options{
BlockRanges: []int64{100},
})
defer close()
defer db.Close()
db.DisableCompactions()

// Test no blocks after compact with empty head.
testutil.Ok(t, db.compact())
bb, err := blockDirs(db.Dir())
testutil.Ok(t, err)
testutil.Equals(t, len(db.Blocks()), len(bb))
testutil.Equals(t, 0, len(bb))

// Test no blocks after deleting all samples from head.
rangeToTriggercompaction := DefaultOptions.BlockRanges[0]/2*3 + 1
rangeToTriggercompaction := db.opts.BlockRanges[0] / 2 * 3
defaultLabel := labels.FromStrings("foo", "bar")
defaultMatcher := labels.NewEqualMatcher(defaultLabel[0].Name, defaultLabel[0].Value)
defaultMatcher := labels.NewMustRegexpMatcher("", ".*")

app := db.Appender()
for i := int64(0); i < 6; i++ {
_, err := app.Add(defaultLabel, i*rangeToTriggercompaction, 0)
t.Run("Test no blocks after compact with empty head.", func(t *testing.T) {
testutil.Ok(t, db.compact())
actBlocks, err := blockDirs(db.Dir())
testutil.Ok(t, err)
testutil.Equals(t, len(db.Blocks()), len(actBlocks))
testutil.Equals(t, 0, len(actBlocks))
})

t.Run("Test no blocks after deleting all samples from head.", func(t *testing.T) {
app := db.Appender()
_, err := app.Add(defaultLabel, 1, 0)
testutil.Ok(t, err)
_, err = app.Add(defaultLabel, i*rangeToTriggercompaction+1000, 0)
_, err = app.Add(defaultLabel, 2, 0)
testutil.Ok(t, err)
}
testutil.Ok(t, app.Commit())
_, err = app.Add(defaultLabel, 3+rangeToTriggercompaction, 0)
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
testutil.Ok(t, db.Delete(math.MinInt64, math.MaxInt64, defaultMatcher))
testutil.Equals(t, 0, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "initial compaction count should be zero")
testutil.Ok(t, db.compact())
testutil.Equals(t, 1, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "compaction should have been triggered here")

testutil.Ok(t, db.Delete(math.MinInt64, math.MaxInt64, defaultMatcher))
testutil.Ok(t, db.compact())
actBlocks, err := blockDirs(db.Dir())
testutil.Ok(t, err)
testutil.Equals(t, len(db.Blocks()), len(actBlocks))
testutil.Equals(t, 0, len(actBlocks))

app = db.Appender()
_, err = app.Add(defaultLabel, 1, 0)
testutil.Assert(t, err == ErrOutOfBounds, "the head should be truncated so no samples in the past should be allowed")
currentTime := db.Head().MaxTime()
_, err = app.Add(defaultLabel, currentTime, 0)
testutil.Ok(t, err)
_, err = app.Add(defaultLabel, currentTime+1, 0)
testutil.Ok(t, err)
_, err = app.Add(defaultLabel, currentTime+rangeToTriggercompaction, 0)
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())

// No blocks created.
bb, err = blockDirs(db.Dir())
testutil.Ok(t, err)
testutil.Equals(t, len(db.Blocks()), len(bb))
testutil.Equals(t, 0, len(bb))
testutil.Ok(t, db.compact())
testutil.Equals(t, 2, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "compaction should have been triggered here")
actBlocks, err = blockDirs(db.Dir())
testutil.Ok(t, err)
testutil.Equals(t, len(db.Blocks()), len(actBlocks))
testutil.Assert(t, len(actBlocks) == 1, "No blocks created when compacting with >0 samples")
})

app = db.Appender()
for i := int64(7); i < 25; i++ {
for j := int64(0); j < 10; j++ {
_, err := app.Add(defaultLabel, i*rangeToTriggercompaction+j, 0)
testutil.Ok(t, err)
t.Run(`When no new block is created from head, and there are some blocks on disk
compaction should not run into infinite loop (was seen during development).`, func(t *testing.T) {
oldBlocks := db.Blocks()
app := db.Appender()
currentTime := db.Head().MaxTime()
_, err := app.Add(defaultLabel, currentTime, 0)
testutil.Ok(t, err)
_, err = app.Add(defaultLabel, currentTime+1, 0)
testutil.Ok(t, err)
_, err = app.Add(defaultLabel, currentTime+rangeToTriggercompaction-3, 0) // ?????????????
krasi-georgiev marked this conversation as resolved.
Show resolved Hide resolved
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
testutil.Ok(t, db.head.Delete(math.MinInt64, math.MaxInt64, defaultMatcher))
testutil.Ok(t, db.compact())
testutil.Equals(t, 3, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "compaction should have been triggered here")
testutil.Equals(t, oldBlocks, db.Blocks())
})

t.Run("Test no blocks remaining after deleting all samples from disk.", func(t *testing.T) {
currentTime := db.Head().MaxTime()
blocks := []*BlockMeta{
{MinTime: currentTime, MaxTime: currentTime + db.opts.BlockRanges[0]}, //
{MinTime: currentTime + 100, MaxTime: currentTime + 100 + db.opts.BlockRanges[0]},
}
for _, m := range blocks {
createBlock(t, db.Dir(), 10, m.MinTime, m.MaxTime)
}
}
testutil.Ok(t, app.Commit())

testutil.Ok(t, db.compact())
bb, err = blockDirs(db.Dir())
testutil.Ok(t, err)
testutil.Equals(t, len(db.Blocks()), len(bb))
testutil.Assert(t, len(bb) > 0, "No blocks created when compacting with >0 samples")
oldBlocks := db.Blocks()
testutil.Ok(t, db.reload()) // Reload the db to register the new blocks.
testutil.Equals(t, len(blocks)+len(oldBlocks), len(db.Blocks())) // Ensure all blocks are registered.
testutil.Ok(t, db.Delete(math.MinInt64, math.MaxInt64, defaultMatcher))
testutil.Ok(t, db.compact())
testutil.Equals(t, 4, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "compaction should have been triggered here as all blocks have tombstones")

// When no new block is created from head, and there are some blocks on disk,
// compaction should not run into infinite loop (was seen during development).
oldBlocks := db.Blocks()
app = db.Appender()
for i := int64(26); i < 30; i++ {
_, err := app.Add(defaultLabel, i*rangeToTriggercompaction, 0)
actBlocks, err := blockDirs(db.Dir())
testutil.Ok(t, err)
}
testutil.Ok(t, app.Commit())
testutil.Ok(t, db.head.Delete(math.MinInt64, math.MaxInt64, defaultMatcher))
testutil.Ok(t, db.compact())
testutil.Equals(t, oldBlocks, db.Blocks())

// Test no blocks remaining after deleting all samples from disk.
testutil.Ok(t, db.Delete(math.MinInt64, math.MaxInt64, defaultMatcher))

// Mimicking Plan() of compactor and getting list
// of all block directories to pass for compaction.
plan := []string{}
for _, b := range db.Blocks() {
plan = append(plan, b.Dir())
}

// No new blocks are created by Compact, and marks all old blocks as deletable.
oldBlocks = db.Blocks()
_, err = db.compactor.Compact(db.dir, plan, db.Blocks())
testutil.Ok(t, err)
// Blocks are the same.
testutil.Equals(t, oldBlocks, db.Blocks())

// Deletes the deletable blocks.
testutil.Ok(t, db.reload())
// All samples are deleted. No blocks should be remaining after compact.
bb, err = blockDirs(db.Dir())
testutil.Ok(t, err)
testutil.Equals(t, len(db.Blocks()), len(bb))
testutil.Equals(t, 0, len(bb))
testutil.Equals(t, len(db.Blocks()), len(actBlocks))
testutil.Equals(t, 1, len(actBlocks), "All samples are deleted. Only the most recent block should remain after compaction.")
})
}

func TestDB_LabelNames(t *testing.T) {
Loading