diff --git a/compact.go b/compact.go index 82737eb0..d4a2dd30 100644 --- a/compact.go +++ b/compact.go @@ -59,6 +59,8 @@ type Compactor interface { // Compact runs compaction against the provided directories. Must // only be called concurrently with results of Plan(). + // Compaction resulting in an empty block are not written to the disk + // and marks all parents as deletable in its meta data. Compact(dest string, dirs ...string) (ulid.ULID, error) } @@ -493,6 +495,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe return errors.Wrap(err, "close index writer") } + // Populated block is empty, so cleanup and exit. if meta.Stats.NumSamples == 0 { if err := os.RemoveAll(tmp); err != nil { return errors.Wrap(err, "remove tmp folder after empty block failed") diff --git a/db_test.go b/db_test.go index f3c3cef6..83e30dcf 100644 --- a/db_test.go +++ b/db_test.go @@ -1302,7 +1302,7 @@ func TestInitializeHeadTimestamp(t *testing.T) { }) } -func TestNoEmptyBlock(t *testing.T) { +func TestNoEmptyBlocks(t *testing.T) { db, close := openTestDB(t, nil) defer close() defer db.Close() @@ -1317,7 +1317,7 @@ func TestNoEmptyBlock(t *testing.T) { label := labels.FromStrings("foo", "bar") app := db.Appender() - for i := int64(0); i < 3; i++ { + for i := int64(0); i < 6; i++ { _, err := app.Add(label, i*blockRange, 0) testutil.Ok(t, err) _, err = app.Add(label, i*blockRange+1000, 0) @@ -1326,15 +1326,18 @@ func TestNoEmptyBlock(t *testing.T) { err = app.Commit() testutil.Ok(t, err) + oldHeadMinT := db.head.MinTime() testutil.Ok(t, db.Delete(math.MinInt64, math.MaxInt64, labels.NewEqualMatcher("foo", "bar"))) _, err = db.compact() testutil.Ok(t, err) + // Making sure that head was modified. + testutil.Assert(t, oldHeadMinT < db.head.MinTime(), "Head was not changed after compaction.") // No blocks created. testutil.Equals(t, 0, len(db.blocks)) - // Test no blocks remaining after small samples are deleted from disk. + // Test no blocks remaining after deleting all samples from disk. app = db.Appender() - for i := int64(3); i < 20; i++ { + for i := int64(7); i < 25; i++ { for j := int64(0); j < 10; j++ { _, err := app.Add(label, i*blockRange+j, 0) testutil.Ok(t, err) @@ -1345,17 +1348,39 @@ func TestNoEmptyBlock(t *testing.T) { _, err = db.compact() testutil.Ok(t, err) - testutil.Assert(t, len(db.blocks) > 0, "No blocks created") + testutil.Assert(t, len(db.blocks) > 0, "No blocks created when compacting with >0 samples") testutil.Ok(t, db.Delete(math.MinInt64, math.MaxInt64, labels.NewEqualMatcher("foo", "bar"))) - // Mimicking small part of compaction. + // Mimicking Plan() of compactor and getting list + // of all block directories to pass for compaction. plan := []string{} for _, b := range db.blocks { plan = append(plan, b.Dir()) } + + // Blocks are not set deletable before compaction. + for _, b := range db.blocks { + meta, err := readMetaFile(b.dir) + testutil.Ok(t, err) + testutil.Assert(t, !meta.Compaction.Deletable, "Block was marked deletable before compaction") + } + + // No new blocks are created by Compact, and marks all old blocks as deletable. + oldLen := len(db.blocks) _, err = db.compactor.Compact(db.dir, plan...) testutil.Ok(t, err) + // Number of blocks are the same. + testutil.Equals(t, oldLen, len(db.blocks)) + + // Marked as deletable. + for _, b := range db.blocks { + meta, err := readMetaFile(b.dir) + testutil.Ok(t, err) + testutil.Assert(t, meta.Compaction.Deletable, "Block was not marked deletable after compaction") + } + + // Deletes the deletable blocks. testutil.Ok(t, db.reload()) // All samples are deleted. No blocks should be remeianing after compact. testutil.Equals(t, 0, len(db.blocks))