diff --git a/levels.go b/levels.go index b823dc346..0801a8d4d 100644 --- a/levels.go +++ b/levels.go @@ -442,11 +442,8 @@ func (s *levelsController) compactBuildTables( botTables := cd.bot // Check overlap of the top level with the levels which are not being - // compacted in this compaction. We don't need to check overlap of the bottom - // tables with other levels because if the top tables overlap with any of the lower - // levels, it implies bottom level also overlaps because top and bottom tables - // overlap with each other. - hasOverlap := s.checkOverlap(cd.top, cd.nextLevel.level+1) + // compacted in this compaction. + hasOverlap := s.checkOverlap(cd.allTables(), cd.nextLevel.level+1) // Try to collect stats so that we can inform value log about GC. That would help us find which // value log file should be GCed. @@ -710,6 +707,13 @@ func (cd *compactDef) unlockLevels() { cd.thisLevel.RUnlock() } +func (cd *compactDef) allTables() []*table.Table { + ret := make([]*table.Table, 0, len(cd.top)+len(cd.bot)) + ret = append(ret, cd.top...) + ret = append(ret, cd.bot...) + return ret +} + func (s *levelsController) fillTablesL0(cd *compactDef) bool { cd.lockLevels() defer cd.unlockLevels() diff --git a/levels_test.go b/levels_test.go index 3891dc36d..027f9db60 100644 --- a/levels_test.go +++ b/levels_test.go @@ -296,6 +296,40 @@ func TestCompaction(t *testing.T) { getAllAndCheck(t, db, []keyValVersion{}) }) }) + t.Run("with bottom overlap", func(t *testing.T) { + runBadgerTest(t, &opt, func(t *testing.T, db *DB) { + l1 := []keyValVersion{{"foo", "bar", 3, bitDelete}} + l2 := []keyValVersion{{"foo", "bar", 2, 0}, {"fooz", "baz", 2, bitDelete}} + l3 := []keyValVersion{{"fooz", "baz", 1, 0}} + createAndOpen(db, l1, 1) + createAndOpen(db, l2, 2) + createAndOpen(db, l3, 3) + + // Set a high discard timestamp so that all the keys are below the discard timestamp. + db.SetDiscardTs(10) + + getAllAndCheck(t, db, []keyValVersion{ + {"foo", "bar", 3, bitDelete}, + {"foo", "bar", 2, 0}, + {"fooz", "baz", 2, bitDelete}, + {"fooz", "baz", 1, 0}, + }) + cdef := compactDef{ + thisLevel: db.lc.levels[1], + nextLevel: db.lc.levels[2], + top: db.lc.levels[1].tables, + bot: db.lc.levels[2].tables, + } + require.NoError(t, db.lc.runCompactDef(1, cdef)) + // the top table at L1 doesn't overlap L3, but the bottom table at L2 + // does, delete keys should not be removed. + getAllAndCheck(t, db, []keyValVersion{ + {"foo", "bar", 3, bitDelete}, + {"fooz", "baz", 2, bitDelete}, + {"fooz", "baz", 1, 0}, + }) + }) + }) t.Run("without overlap", func(t *testing.T) { runBadgerTest(t, &opt, func(t *testing.T, db *DB) { l1 := []keyValVersion{{"foo", "bar", 3, bitDelete}, {"fooz", "baz", 1, bitDelete}}