Skip to content
This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Commit

Permalink
Fix deletion of old blocks after no block is written.
Browse files Browse the repository at this point in the history
With the previous design, if no block is created from the parent blocks during compaction, the parent blocks are left undeleted. This adds a `Deletable` field to `BlockMetaCompaction` which helps identify such parent blocks.

Signed-off-by: Ganesh Vernekar <[email protected]>
  • Loading branch information
codesome committed Sep 19, 2018
1 parent c4edbcc commit 9de3926
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 17 deletions.
4 changes: 4 additions & 0 deletions block.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,10 @@ type BlockMetaCompaction struct {
Level int `json:"level"`
// ULIDs of all source head blocks that went into the block.
Sources []ulid.ULID `json:"sources,omitempty"`
// As we don't write empty blocks, we need this to mark
// the block as deletable if compacting this block
// resulted in an empty block.
Deletable bool `json:"deletable,omitempty"`
// Short descriptions of the direct blocks that were used to create
// this block.
Parents []BlockDesc `json:"parents,omitempty"`
Expand Down
31 changes: 23 additions & 8 deletions compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,14 +349,29 @@ func (c *LeveledCompactor) Compact(dest string, dirs ...string) (uid ulid.ULID,
meta := compactBlockMetas(uid, metas...)
err = c.write(dest, meta, blocks...)
if err == nil {
level.Info(c.logger).Log(
"msg", "compact blocks",
"count", len(blocks),
"mint", meta.MinTime,
"maxt", meta.MaxTime,
"ulid", meta.ULID,
"sources", fmt.Sprintf("%v", uids),
)

if meta.Stats.NumSamples == 0 {
level.Info(c.logger).Log(
"msg", "compact blocks [resulted in empty block]",
"count", len(blocks),
"sources", fmt.Sprintf("%v", uids),
)
for _, b := range bs {
b.meta.Compaction.Deletable = true
writeMetaFile(b.dir, &b.meta)
}
uid = ulid.ULID{}
} else {
level.Info(c.logger).Log(
"msg", "compact blocks",
"count", len(blocks),
"mint", meta.MinTime,
"maxt", meta.MaxTime,
"ulid", meta.ULID,
"sources", fmt.Sprintf("%v", uids),
)
}

return uid, nil
}

Expand Down
3 changes: 3 additions & 0 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,9 @@ func (db *DB) reload() (err error) {
corrupted[ulid] = err
continue
}
if meta.Compaction.Deletable {
deleteable[meta.ULID] = struct{}{}
}
if db.beyondRetention(meta) {
deleteable[meta.ULID] = struct{}{}
continue
Expand Down
27 changes: 18 additions & 9 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1312,6 +1312,7 @@ func TestNoEmptyBlock(t *testing.T) {
testutil.Ok(t, err)
testutil.Equals(t, 0, len(db.blocks))

// Test no blocks after deleting all samples from head.
blockRange := DefaultOptions.BlockRanges[0]
label := labels.FromStrings("foo", "bar")

Expand All @@ -1325,29 +1326,37 @@ func TestNoEmptyBlock(t *testing.T) {
err = app.Commit()
testutil.Ok(t, err)

// Test no blocks after deleting all samples from head.
testutil.Ok(t, db.Delete(math.MinInt64, math.MaxInt64, labels.NewEqualMatcher("foo", "bar")))
_, err = db.compact()
testutil.Ok(t, err)
// No blocks created.
testutil.Equals(t, 0, len(db.blocks))

// Test no blocks remaining after small samples are deleted from disk.
app = db.Appender()
for i := int64(3); i < 6; i++ {
_, err := app.Add(label, i*blockRange, 0)
testutil.Ok(t, err)
_, err = app.Add(label, i*blockRange+1000, 0)
testutil.Ok(t, err)
for i := int64(3); i < 20; i++ {
for j := int64(0); j < 10; j++ {
_, err := app.Add(label, i*blockRange+j, 0)
testutil.Ok(t, err)
}
}
err = app.Commit()
testutil.Ok(t, err)

_, err = db.compact()
testutil.Ok(t, err)
testutil.Equals(t, 1, len(db.blocks))
testutil.Assert(t, len(db.blocks) > 0, "No blocks created")

// No blocks after deleting all samples from disk.
testutil.Ok(t, db.Delete(math.MinInt64, math.MaxInt64, labels.NewEqualMatcher("foo", "bar")))
_, err = db.compact()

// Mimicking small part of compaction.
plan := []string{}
for _, b := range db.blocks {
plan = append(plan, b.Dir())
}
_, err = db.compactor.Compact(db.dir, plan...)
testutil.Ok(t, err)
testutil.Ok(t, db.reload())
// All samples are deleted. No blocks should be remaining after compaction.
testutil.Equals(t, 0, len(db.blocks))
}

0 comments on commit 9de3926

Please sign in to comment.