Skip to content

Commit

Permalink
Add DB.CheckLevels() that compares sequence number across all levels for
Browse files Browse the repository at this point in the history
consistency.
  • Loading branch information
sumeerbhola committed Dec 11, 2019
1 parent 382a403 commit 67899d5
Show file tree
Hide file tree
Showing 16 changed files with 930 additions and 28 deletions.
12 changes: 10 additions & 2 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -899,7 +899,11 @@ func (d *DB) flush1() error {
if err == nil {
flushed = d.mu.mem.queue[:n]
d.mu.mem.queue = d.mu.mem.queue[n:]
d.updateReadStateLocked()
var checker func() error
if d.opts.DebugCheck {
checker = func() error { return d.CheckLevels(nil) }
}
d.updateReadStateLocked(checker)
}

d.deleteObsoleteFiles(jobID)
Expand Down Expand Up @@ -1032,7 +1036,11 @@ func (d *DB) compact1() (err error) {
// there are no references obsolete tables will be added to the obsolete
// table list.
if err == nil {
d.updateReadStateLocked()
var checker func() error
if d.opts.DebugCheck {
checker = func() error { return d.CheckLevels(nil) }
}
d.updateReadStateLocked(checker)
}
d.deleteObsoleteFiles(jobID)

Expand Down
5 changes: 4 additions & 1 deletion compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,7 @@ func TestCompaction(t *testing.T) {
d, err := Open("", &Options{
FS: mem,
MemTableSize: memTableSize,
DebugCheck: true,
})
if err != nil {
t.Fatalf("Open: %v", err)
Expand Down Expand Up @@ -719,7 +720,8 @@ func TestManualCompaction(t *testing.T) {
}

d, err := Open("", &Options{
FS: mem,
FS: mem,
DebugCheck: true,
})
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -1310,6 +1312,7 @@ func TestFlushInvariant(t *testing.T) {
runtime.Goexit() // ensure we don't try to reschedule the flush
},
},
DebugCheck: true,
})
if err != nil {
t.Fatal(err)
Expand Down
4 changes: 2 additions & 2 deletions data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ func runDBDefineCmd(td *datadriven.TestData, opts *Options) (*DB, error) {
if !d.mu.mem.mutable.empty() {
d.mu.mem.mutable = newMemTable(d.opts, 0 /* size */, nil /* reservation */)
d.mu.mem.queue = append(d.mu.mem.queue, d.mu.mem.mutable)
d.updateReadStateLocked()
d.updateReadStateLocked(nil)
}
mem = d.mu.mem.mutable
fields = fields[1:]
Expand Down Expand Up @@ -366,7 +366,7 @@ func runDBDefineCmd(td *datadriven.TestData, opts *Options) (*DB, error) {
if err := d.mu.versions.logAndApply(jobID, ve, nil, d.dataDir); err != nil {
return nil, err
}
d.updateReadStateLocked()
d.updateReadStateLocked(nil)
for i := range ve.NewFiles {
meta := &ve.NewFiles[i].Meta
delete(d.mu.compact.pendingOutputs, meta.FileNum)
Expand Down
2 changes: 1 addition & 1 deletion db.go
Original file line number Diff line number Diff line change
Expand Up @@ -1258,7 +1258,7 @@ func (d *DB) makeRoomForWrite(b *Batch) error {
// that was not flushed).
d.mu.mem.mutable.logNum = newLogNum
d.mu.mem.queue = append(d.mu.mem.queue, d.mu.mem.mutable)
d.updateReadStateLocked()
d.updateReadStateLocked(nil)
if imm.writerUnref() {
d.maybeScheduleFlush()
}
Expand Down
6 changes: 5 additions & 1 deletion ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,11 @@ func (d *DB) ingestApply(jobID int, meta []*fileMetadata) (*versionEdit, error)
if err := d.mu.versions.logAndApply(jobID, ve, metrics, d.dataDir); err != nil {
return nil, err
}
d.updateReadStateLocked()
var checker func() error
if d.opts.DebugCheck {
checker = func() error { return d.CheckLevels(nil) }
}
d.updateReadStateLocked(checker)
d.deleteObsoleteFiles(jobID)
// The ingestion may have pushed a level over the threshold for compaction,
// so check to see if one is necessary and schedule it.
Expand Down
1 change: 1 addition & 0 deletions internal/metamorphic/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func (t *test) init(h *history, dir string, opts *pebble.Options) error {
t.opts = opts.EnsureDefaults()
t.opts.Logger = h.Logger()
t.opts.EventListener = pebble.MakeLoggingEventListener(t.opts.Logger)
t.opts.DebugCheck = true

// If an error occurs and we were using an in-memory FS, attempt to clone to
// on-disk in order to allow post-mortem debugging. Note that always using
Expand Down
Loading

0 comments on commit 67899d5

Please sign in to comment.