diff --git a/cleaner.go b/cleaner.go
index f9fa43b32c..64b6f3157f 100644
--- a/cleaner.go
+++ b/cleaner.go
@@ -27,10 +27,11 @@ type DeleteCleaner = base.DeleteCleaner
 type ArchiveCleaner = base.ArchiveCleaner
 
 type cleanupManager struct {
-	opts            *Options
-	objProvider     objstorage.Provider
-	onTableDeleteFn func(fileSize uint64)
-	deletePacer     *deletionPacer
+	opts                       *Options
+	objProvider                objstorage.Provider
+	onTableDeleteFn            func(fileSize uint64)
+	deletePacer                *deletionPacer
+	deletedFileSizePendingSync uint64
 
 	// jobsCh is used as the cleanup job queue.
 	jobsCh chan *cleanupJob
@@ -160,7 +161,7 @@ func (cm *cleanupManager) mainLoop() {
 			} else {
 				cm.maybePace(&tb, of.fileType, of.fileNum, of.fileSize)
 				cm.onTableDeleteFn(of.fileSize)
-				cm.deleteObsoleteObject(fileTypeTable, job.jobID, of.fileNum)
+				cm.deleteObsoleteObjectAndSyncIfRequired(job.jobID, of.fileNum, of.fileSize)
 			}
 		}
 		cm.mu.Lock()
@@ -171,6 +172,32 @@ func (cm *cleanupManager) mainLoop() {
 	}
 }
 
+// deleteObsoleteObjectAndSyncIfRequired deletes the obsolete table object
+// and adds fileSize to the running count of deleted bytes that have not been
+// followed by an object provider sync. Once that count reaches
+// deleteThresholdForSync the provider is synced and the count is reset.
+func (cm *cleanupManager) deleteObsoleteObjectAndSyncIfRequired(
+	jobID int, fileNum base.DiskFileNum, fileSize uint64,
+) {
+	// deleteThresholdForSync is the number of deleted bytes that triggers a sync.
+	const deleteThresholdForSync = 64 << 20 // 64 MB
+
+	if !cm.deleteObsoleteObject(fileTypeTable, jobID, fileNum) {
+		return
+	}
+
+	cm.deletedFileSizePendingSync += fileSize
+	if cm.deletedFileSizePendingSync < deleteThresholdForSync {
+		return
+	}
+
+	// On a failed sync the pending count is deliberately left untouched so
+	// that the next successful deletion attempts the sync again.
+	if err := cm.objProvider.Sync(); err == nil {
+		cm.deletedFileSizePendingSync = 0
+	}
+}
+
 func (cm *cleanupManager) needsPacing(fileType base.FileType, fileNum base.DiskFileNum) bool {
 	if fileType != fileTypeTable {
 		return false
@@ -245,7 +272,7 @@ func (cm *cleanupManager) deleteObsoleteFile(
 
 func (cm *cleanupManager) deleteObsoleteObject(
 	fileType fileType, jobID int, fileNum base.DiskFileNum,
-) {
+) bool {
 	if fileType != fileTypeTable {
 		panic("not an object")
 	}
@@ -258,8 +285,9 @@ func (cm *cleanupManager) deleteObsoleteObject(
 		path = cm.objProvider.Path(meta)
 		err = cm.objProvider.Remove(fileType, fileNum)
 	}
+
 	if cm.objProvider.IsNotExistError(err) {
-		return
+		return false
 	}
 
 	switch fileType {
@@ -271,6 +299,7 @@ func (cm *cleanupManager) deleteObsoleteObject(
 			Err:     err,
 		})
 	}
+	return err == nil
 }
 
 // maybeLogLocked issues a log if the job queue gets 75% full and issues a log
diff --git a/cleaner_test.go b/cleaner_test.go
index 11d9ab9d4a..132c46138b 100644
--- a/cleaner_test.go
+++ b/cleaner_test.go
@@ -7,6 +7,7 @@ package pebble
 import (
 	"fmt"
 	"sort"
+	"strconv"
 	"strings"
 	"testing"
 
@@ -119,12 +120,21 @@ func TestCleaner(t *testing.T) {
 			return memLog.String()
 
 		case "create-bogus-file":
-			if len(td.CmdArgs) != 1 {
-				return "create-bogus-file "
+			if len(td.CmdArgs) < 1 || len(td.CmdArgs) > 2 {
+				return "create-bogus-file [size]"
 			}
 			dst, err := fs.Create(td.CmdArgs[0].String())
 			require.NoError(t, err)
-			_, err = dst.Write([]byte("bogus data"))
+			var byteStream []byte
+			if len(td.CmdArgs) == 2 {
+				num, err := strconv.Atoi(td.CmdArgs[1].String())
+				require.NoError(t, err)
+				byteStream = make([]byte, num)
+			} else {
+				byteStream = []byte("bogus data")
+			}
+
+			_, err = dst.Write(byteStream)
 			require.NoError(t, err)
 			require.NoError(t, dst.Sync())
 			require.NoError(t, dst.Close())
diff --git a/testdata/cleaner b/testdata/cleaner
index cd96e7d2fb..8c3501fa04 100644
--- a/testdata/cleaner
+++ b/testdata/cleaner
@@ -187,7 +187,7 @@ create: db1/000123.sst
 sync: db1/000123.sst
 close: db1/000123.sst
 
-create-bogus-file db1/000456.sst
+create-bogus-file db1/000456.sst 100000000
 ----
 create: db1/000456.sst
 sync: db1/000456.sst
@@ -231,6 +231,7 @@ remove: db1_wal/000002.log
 remove: db1_wal/000004.log
 remove: db1/000123.sst
 remove: db1/000456.sst
+sync: db1
 remove: db1/OPTIONS-000003
 
 list db1