Skip to content
This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Don't write empty blocks #374

Merged
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
beb0d73
Dont write empty blocks
codesome Sep 8, 2018
bb76c82
Added unit test TestNoEmptyBlock
codesome Sep 12, 2018
c4edbcc
Fix infinite loop while compacting head.
codesome Sep 18, 2018
9de3926
Fix deletion of old blocks after no block is written.
codesome Sep 19, 2018
ab14c9c
Fix review comments
codesome Sep 19, 2018
268ae54
Merge remote-tracking branch 'upstream/master' into dont-write-empty-…
codesome Sep 21, 2018
281a493
Merge remote-tracking branch 'upstream/master' into dont-write-empty-…
codesome Sep 28, 2018
897dd33
Fixed infinite loop for head compaction
codesome Sep 28, 2018
9938162
Merge remote-tracking branch 'upstream/master' into dont-write-empty-…
codesome Oct 1, 2018
0faafec
Test fix attempt
codesome Oct 3, 2018
49631bb
Merge remote-tracking branch 'upstream/master' into dont-write-empty-…
codesome Nov 20, 2018
38a2c6b
Updated tests
codesome Nov 22, 2018
061971b
Fix review comments
codesome Nov 23, 2018
059dbd7
Merge remote-tracking branch 'upstream/master' into dont-write-empty-…
codesome Dec 1, 2018
2378d2b
Updated tests and added CHANGELOG entry
codesome Dec 1, 2018
98988fd
Merge remote-tracking branch 'upstream/master' into pull/374-review
Jan 16, 2019
e0bb757
rebased
Jan 16, 2019
efa23ce
Merge pull request #4 from krasi-georgiev/pull/374-review
codesome Jan 16, 2019
0eed036
Revert returning after empty block from head
codesome Jan 16, 2019
271b98f
Fix review comments
codesome Jan 16, 2019
a6a1779
Dont create empty blocks in tests
codesome Jan 16, 2019
2b509e7
nits
codesome Jan 17, 2019
ad9e3ed
simplified test
Jan 17, 2019
441ea45
Merge remote-tracking branch 'upstream/master' into pull/374-review
Jan 17, 2019
02dc732
solved the mistery for double compaction
Jan 17, 2019
162fa29
nit
Jan 17, 2019
9f24cde
less samples
Jan 17, 2019
2cb745f
Merge pull request #5 from krasi-georgiev/pull/374-review
codesome Jan 17, 2019
45bde4c
Nits
codesome Jan 17, 2019
6b87a56
revert some changes
Jan 18, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions block.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,9 @@ type BlockMetaCompaction struct {
Level int `json:"level"`
// ULIDs of all source head blocks that went into the block.
Sources []ulid.ULID `json:"sources,omitempty"`
// Indicates that during compaction it resulted in a block without any samples
// so it should be deleted on the next reload.
Deletable bool `json:"deletable,omitempty"`
// Short descriptions of the direct blocks that were used to create
// this block.
Parents []BlockDesc `json:"parents,omitempty"`
Expand Down
54 changes: 41 additions & 13 deletions compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ type Compactor interface {

// Compact runs compaction against the provided directories. Must
// only be called concurrently with results of Plan().
// Compaction resulting in an empty block are not written to the disk
// and marks all parents as deletable in its meta data.
Compact(dest string, dirs ...string) (ulid.ULID, error)
}

Expand Down Expand Up @@ -349,14 +351,29 @@ func (c *LeveledCompactor) Compact(dest string, dirs ...string) (uid ulid.ULID,
meta := compactBlockMetas(uid, metas...)
err = c.write(dest, meta, blocks...)
if err == nil {
level.Info(c.logger).Log(
"msg", "compact blocks",
"count", len(blocks),
"mint", meta.MinTime,
"maxt", meta.MaxTime,
"ulid", meta.ULID,
"sources", fmt.Sprintf("%v", uids),
)

if meta.Stats.NumSamples == 0 {
level.Info(c.logger).Log(
"msg", "compact blocks [resulted in empty block]",
gouthamve marked this conversation as resolved.
Show resolved Hide resolved
"count", len(blocks),
"sources", fmt.Sprintf("%v", uids),
)
for _, b := range bs {
b.meta.Compaction.Deletable = true
writeMetaFile(b.dir, &b.meta)
codesome marked this conversation as resolved.
Show resolved Hide resolved
}
uid = ulid.ULID{}
} else {
level.Info(c.logger).Log(
"msg", "compact blocks",
"count", len(blocks),
"mint", meta.MinTime,
"maxt", meta.MaxTime,
"ulid", meta.ULID,
"sources", fmt.Sprintf("%v", uids),
)
}

return uid, nil
}

Expand Down Expand Up @@ -395,6 +412,10 @@ func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, p
return uid, err
}

if meta.Stats.NumSamples == 0 {
return ulid.ULID{}, nil
codesome marked this conversation as resolved.
Show resolved Hide resolved
}

level.Info(c.logger).Log("msg", "write block", "mint", meta.MinTime, "maxt", meta.MaxTime, "ulid", meta.ULID)
return uid, nil
}
Expand Down Expand Up @@ -472,11 +493,6 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
if err := c.populateBlock(blocks, meta, indexw, chunkw); err != nil {
return errors.Wrap(err, "write compaction")
}

if err = writeMetaFile(tmp, meta); err != nil {
return errors.Wrap(err, "write merged meta")
}

// We are explicitly closing them here to check for error even
// though these are covered under defer. This is because in Windows,
// you cannot delete these unless they are closed and the defer is to
Expand All @@ -488,6 +504,18 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
return errors.Wrap(err, "close index writer")
}

// Populated block is empty, so cleanup and exit.
if meta.Stats.NumSamples == 0 {
krasi-georgiev marked this conversation as resolved.
Show resolved Hide resolved
if err := os.RemoveAll(tmp); err != nil {
return errors.Wrap(err, "remove tmp folder after empty block failed")
}
return nil
}

if err = writeMetaFile(tmp, meta); err != nil {
return errors.Wrap(err, "write merged meta")
}

// Create an empty tombstones file.
if err := writeTombstoneFile(tmp, NewMemTombstones()); err != nil {
return errors.Wrap(err, "write new tombstones file")
Expand Down
17 changes: 16 additions & 1 deletion db.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,8 @@ func (db *DB) compact() (err error) {
// from the block interval here.
maxt: maxt - 1,
}
if _, err = db.compactor.Write(db.dir, head, mint, maxt, nil); err != nil {
uid, err := db.compactor.Write(db.dir, head, mint, maxt, nil)
if err != nil {
return errors.Wrap(err, "persist head block")
}
krasi-georgiev marked this conversation as resolved.
Show resolved Hide resolved

Expand All @@ -410,6 +411,17 @@ func (db *DB) compact() (err error) {
if err := db.reload(); err != nil {
return errors.Wrap(err, "reload blocks")
}
if (uid == ulid.ULID{}) { // No block created.
// After this was fixed https://github.com/prometheus/tsdb/issues/309,
// Case 1: It is possible to have 0 blocks after compaction.
// db.reload() doesn't truncate the head when the block count is zero.
// Case 2: If there were blocks on disk, head will be truncated to a
// wrong value based on old blocks.
// Hence we need to truncate manually.
if err = db.head.Truncate(maxt); err != nil {
return errors.Wrap(err, "head truncate failed (in compact)")
}
}
gouthamve marked this conversation as resolved.
Show resolved Hide resolved
runtime.GC()
}

Expand Down Expand Up @@ -494,6 +506,9 @@ func (db *DB) reload() (err error) {
corrupted[ulid] = err
continue
}
if meta.Compaction.Deletable {
deleteable[meta.ULID] = struct{}{}
}
if db.beyondRetention(meta) {
deleteable[meta.ULID] = struct{}{}
continue
Expand Down
91 changes: 91 additions & 0 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1303,6 +1303,97 @@ func TestInitializeHeadTimestamp(t *testing.T) {
})
}

func TestNoEmptyBlocks(t *testing.T) {
db, close := openTestDB(t, nil)
defer close()
defer db.Close()

// Test no blocks after compact with empty head.
testutil.Ok(t, db.compact())
testutil.Equals(t, 0, len(db.blocks))
krasi-georgiev marked this conversation as resolved.
Show resolved Hide resolved

// Test no blocks after deleting all samples from head.
blockRange := DefaultOptions.BlockRanges[0]
label := labels.FromStrings("foo", "bar")
codesome marked this conversation as resolved.
Show resolved Hide resolved

app := db.Appender()
for i := int64(0); i < 6; i++ {
_, err := app.Add(label, i*blockRange, 0)
testutil.Ok(t, err)
_, err = app.Add(label, i*blockRange+1000, 0)
testutil.Ok(t, err)
}
testutil.Ok(t, app.Commit())

testutil.Ok(t, db.Delete(math.MinInt64, math.MaxInt64, labels.NewEqualMatcher("foo", "bar")))
uid, err := db.compactor.Write(db.dir, db.head, db.head.MinTime(), db.head.MaxTime(), nil)
testutil.Ok(t, err)
testutil.Equals(t, ulid.ULID{}, uid)
testutil.Ok(t, db.reload())
testutil.Ok(t, db.head.Truncate(db.head.MaxTime()))
codesome marked this conversation as resolved.
Show resolved Hide resolved
// No blocks created.
testutil.Equals(t, 0, len(db.blocks))
krasi-georgiev marked this conversation as resolved.
Show resolved Hide resolved

app = db.Appender()
for i := int64(7); i < 25; i++ {
for j := int64(0); j < 10; j++ {
_, err := app.Add(label, i*blockRange+j, 0)
testutil.Ok(t, err)
}
}
testutil.Ok(t, app.Commit())

testutil.Ok(t, db.compact())
codesome marked this conversation as resolved.
Show resolved Hide resolved
testutil.Assert(t, len(db.blocks) > 0, "No blocks created when compacting with >0 samples")

// When no new block is created from head, and there are some blocks on disk,
// compaction should not run into infinite loop (was seen during development).
app = db.Appender()
for i := int64(26); i < 30; i++ {
_, err := app.Add(label, i*blockRange, 0)
testutil.Ok(t, err)
}
testutil.Ok(t, app.Commit())
testutil.Ok(t, db.head.Delete(math.MinInt64, math.MaxInt64, labels.NewEqualMatcher("foo", "bar")))
testutil.Ok(t, db.compact())

// Test no blocks remaining after deleting all samples from disk.
testutil.Ok(t, db.Delete(math.MinInt64, math.MaxInt64, labels.NewEqualMatcher("foo", "bar")))

// Mimicking Plan() of compactor and getting list
// of all block directories to pass for compaction.
codesome marked this conversation as resolved.
Show resolved Hide resolved
plan := []string{}
for _, b := range db.blocks {
plan = append(plan, b.Dir())
}

// Blocks are not set deletable before compaction.
codesome marked this conversation as resolved.
Show resolved Hide resolved
for _, b := range db.blocks {
meta, err := readMetaFile(b.dir)
testutil.Ok(t, err)
testutil.Assert(t, !meta.Compaction.Deletable, "Block was marked deletable before compaction")
}

// No new blocks are created by Compact, and marks all old blocks as deletable.
oldLen := len(db.blocks)
_, err = db.compactor.Compact(db.dir, plan...)
testutil.Ok(t, err)
// Number of blocks are the same.
testutil.Equals(t, oldLen, len(db.blocks))

// Marked as deletable.
for _, b := range db.blocks {
meta, err := readMetaFile(b.dir)
testutil.Ok(t, err)
testutil.Assert(t, meta.Compaction.Deletable, "Block was not marked deletable after compaction")
}

// Deletes the deletable blocks.
testutil.Ok(t, db.reload())
// All samples are deleted. No blocks should be remeianing after compact.
testutil.Equals(t, 0, len(db.blocks))
}

func TestCorrectNumTombstones(t *testing.T) {
db, close := openTestDB(t, nil)
defer close()
Expand Down