diff --git a/levels.go b/levels.go index 2db260fb9..764fcd944 100644 --- a/levels.go +++ b/levels.go @@ -269,7 +269,8 @@ func (s *levelsController) dropTree() (int, error) { // dropPrefix runs a L0->L1 compaction, and then runs same level compaction on the rest of the // levels. For L0->L1 compaction, it runs compactions normally, but skips over all the keys with the -// provided prefix. For Li->Li compactions, it picks up the tables which would have the prefix. The +// provided prefix and also the internal move keys for the same prefix. +// For Li->Li compactions, it picks up the tables which would have the prefix. The // tables who only have keys with this prefix are quickly dropped. The ones which have other keys // are run through MergeIterator and compacted to create new tables. All the mechanisms of // compactions apply, i.e. level sizes and MANIFEST are updated as in the normal flow. @@ -298,13 +299,15 @@ func (s *levelsController) dropPrefix(prefix []byte) error { } var tables []*table.Table + // Internal move keys related to the given prefix should also be skipped. + moveKeyForPrefix := append(badgerMove, prefix...) + prefixesToSkip := [][]byte{prefix, moveKeyForPrefix} for _, table := range l.tables { var absent bool switch { - case bytes.HasPrefix(table.Smallest(), prefix): - case bytes.HasPrefix(table.Biggest(), prefix): - case bytes.Compare(prefix, table.Smallest()) > 0 && - bytes.Compare(prefix, table.Biggest()) < 0: + case hasAnyPrefixes(table.Smallest(), prefixesToSkip): + case hasAnyPrefixes(table.Biggest(), prefixesToSkip): + case containsAnyPrefixes(table.Smallest(), table.Biggest(), prefixesToSkip): default: absent = true } @@ -532,9 +535,12 @@ func (s *levelsController) compactBuildTables( bopts.BfCache = s.kv.bfCache builder := table.NewTableBuilder(bopts) var numKeys, numSkips uint64 + // Internal move keys related to the given prefix should also be skipped. + moveKeyForPrefix := append(badgerMove, cd.dropPrefix...) + prefixesToSkip := [][]byte{cd.dropPrefix, moveKeyForPrefix} for ; it.Valid(); it.Next() { // See if we need to skip the prefix. - if len(cd.dropPrefix) > 0 && bytes.HasPrefix(it.Key(), cd.dropPrefix) { + if len(cd.dropPrefix) > 0 && hasAnyPrefixes(it.Key(), prefixesToSkip) { numSkips++ updateStats(it.Value()) continue @@ -699,6 +705,27 @@ func buildChangeSet(cd *compactDef, newTables []*table.Table) pb.ManifestChangeS return pb.ManifestChangeSet{Changes: changes} } +func hasAnyPrefixes(s []byte, listOfPrefixes [][]byte) bool { + for _, prefix := range listOfPrefixes { + if bytes.HasPrefix(s, prefix) { + return true + } + } + + return false +} + +func containsAnyPrefixes(smallValue, largeValue []byte, listOfPrefixes [][]byte) bool { + for _, prefix := range listOfPrefixes { + if bytes.Compare(prefix, smallValue) > 0 && + bytes.Compare(prefix, largeValue) < 0 { + return true + } + } + + return false +} + type compactDef struct { elog trace.Trace