Skip to content

Commit

Permalink
compaction: remove redundant cleanup code
Browse files Browse the repository at this point in the history
The main compaction code deletes all created tables in case of error.
The calling code also deletes them by declaring them obsolete; this is
unnecessary. This change makes the copy compaction code also delete
the object on error, and removes the outer handling code (along with
all `pendingOutputs` returns).
  • Loading branch information
RaduBerinde committed May 8, 2024
1 parent 57caca1 commit 6a357e6
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 91 deletions.
128 changes: 39 additions & 89 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -1472,7 +1472,6 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
startTime := d.timeNow()

var ve *manifest.VersionEdit
var pendingOutputs []compactionOutput
var stats compactStats
// To determine the target level of the files in the ingestedFlushable, we
// need to acquire the logLock, and not release it for that duration. Since,
Expand All @@ -1481,7 +1480,7 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
// runCompaction. For all other flush cases, we construct the VersionEdit
// inside runCompaction.
if c.kind != compactionKindIngestedFlushable {
ve, pendingOutputs, stats, err = d.runCompaction(jobID, c)
ve, stats, err = d.runCompaction(jobID, c)
}

// Acquire logLock. This will be released either on an error, by way of
Expand Down Expand Up @@ -1572,23 +1571,6 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
func() []compactionInfo { return d.getInProgressCompactionInfoLocked(c) })
if err != nil {
info.Err = err
// TODO(peter): untested.
for _, f := range pendingOutputs {
// Note that the FileBacking for the file metadata might not have
// been set yet. So, we directly use the FileNum. Since these
// files were generated as compaction outputs, these must be
// physical files on disk. This property might not hold once
// https://github.com/cockroachdb/pebble/issues/389 is
// implemented if #389 creates virtual sstables as output files.
d.mu.versions.obsoleteTables = append(d.mu.versions.obsoleteTables, tableInfo{
fileInfo: fileInfo{
FileNum: base.PhysicalTableDiskFileNum(f.meta.FileNum),
FileSize: f.meta.Size,
},
isLocal: f.isLocal,
})
}
d.mu.versions.updateObsoleteTableMetricsLocked()
}
} else {
// We won't be performing the logAndApply step because of the error,
Expand Down Expand Up @@ -2169,7 +2151,7 @@ func (d *DB) compact1(c *compaction, errChannel chan error) (err error) {
d.opts.EventListener.CompactionBegin(info)
startTime := d.timeNow()

ve, pendingOutputs, stats, err := d.runCompaction(jobID, c)
ve, stats, err := d.runCompaction(jobID, c)

info.Duration = d.timeNow().Sub(startTime)
if err == nil {
Expand All @@ -2194,25 +2176,6 @@ func (d *DB) compact1(c *compaction, errChannel chan error) (err error) {
return d.getInProgressCompactionInfoLocked(c)
})
}()
if err != nil {
// TODO(peter): untested.
for _, f := range pendingOutputs {
// Note that the FileBacking for the file metadata might not have
// been set yet. So, we directly use the FileNum. Since these
// files were generated as compaction outputs, these must be
// physical files on disk. This property might not hold once
// https://github.com/cockroachdb/pebble/issues/389 is
// implemented if #389 creates virtual sstables as output files.
d.mu.versions.obsoleteTables = append(d.mu.versions.obsoleteTables, tableInfo{
fileInfo: fileInfo{
FileNum: base.PhysicalTableDiskFileNum(f.meta.FileNum),
FileSize: f.meta.Size,
},
isLocal: f.isLocal,
})
}
d.mu.versions.updateObsoleteTableMetricsLocked()
}
}

info.Done = true
Expand Down Expand Up @@ -2271,7 +2234,7 @@ func (d *DB) runCopyCompaction(
inputMeta *fileMetadata,
objMeta objstorage.ObjectMetadata,
ve *versionEdit,
) (pendingOutputs []compactionOutput, retErr error) {
) error {
ctx := context.TODO()

if !objMeta.IsExternal() {
Expand Down Expand Up @@ -2338,33 +2301,37 @@ func (d *DB) runCopyCompaction(
d.mu.Unlock()
defer d.mu.Lock()

deleteOnExit := false
defer func() {
if deleteOnExit {
_ = d.objProvider.Remove(fileTypeTable, newMeta.FileBacking.DiskFileNum)
}
}()

// If the src obj is external, we're doing an external to local/shared copy.
if objMeta.IsExternal() {
src, err := d.objProvider.OpenForReading(
ctx, fileTypeTable, inputMeta.FileBacking.DiskFileNum, objstorage.OpenOptions{},
)
if err != nil {
return pendingOutputs, err
return err
}
defer func() {
if src != nil {
src.Close()
}
}()

w, outObjMeta, err := d.objProvider.Create(
ctx, fileTypeTable, base.PhysicalTableDiskFileNum(newMeta.FileNum),
w, _, err := d.objProvider.Create(
ctx, fileTypeTable, newMeta.FileBacking.DiskFileNum,
objstorage.CreateOptions{
PreferSharedStorage: remote.ShouldCreateShared(d.opts.Experimental.CreateOnShared, c.outputLevel.level),
},
)
if err != nil {
return pendingOutputs, err
return err
}
pendingOutputs = append(pendingOutputs, compactionOutput{
meta: newMeta,
isLocal: !outObjMeta.IsRemote(),
})
deleteOnExit = true

start, end := newMeta.Smallest, newMeta.Largest
if newMeta.SyntheticPrefix.IsSet() {
Expand All @@ -2386,42 +2353,35 @@ func (d *DB) runCopyCompaction(
)
src = nil // We passed src to CopySpan; it's responsible for closing it.
if err != nil {
return pendingOutputs, err
return err
}
newMeta.FileBacking.Size = wrote
newMeta.Size = wrote
} else {
pendingOutputs = append(pendingOutputs, compactionOutput{
meta: newMeta.PhysicalMeta().FileMetadata,
isLocal: true,
})
_, err := d.objProvider.LinkOrCopyFromLocal(context.TODO(), d.opts.FS,
d.objProvider.Path(objMeta), fileTypeTable, newMeta.FileBacking.DiskFileNum,
objstorage.CreateOptions{PreferSharedStorage: true})

if err != nil {
return pendingOutputs, err
return err
}
deleteOnExit = true
}
ve.NewFiles[0].Meta = newMeta
if newMeta.Virtual {
ve.CreatedBackingTables = []*fileBacking{newMeta.FileBacking}
}

if err := d.objProvider.Sync(); err != nil {
return pendingOutputs, err
return err
}
return pendingOutputs, nil
}

type compactionOutput struct {
meta *fileMetadata
isLocal bool
deleteOnExit = false
return nil
}

func (d *DB) runDeleteOnlyCompaction(
jobID JobID, c *compaction,
) (ve *versionEdit, pendingOutputs []compactionOutput, stats compactStats, retErr error) {
) (ve *versionEdit, stats compactStats, retErr error) {
c.metrics = make(map[int]*LevelMetrics, len(c.inputs))
ve = &versionEdit{
DeletedFiles: map[deletedFileEntry]*fileMetadata{},
Expand All @@ -2437,12 +2397,12 @@ func (d *DB) runDeleteOnlyCompaction(
}
c.metrics[cl.level] = levelMetrics
}
return ve, nil, stats, nil
return ve, stats, nil
}

func (d *DB) runMoveOrCopyCompaction(
jobID JobID, c *compaction,
) (ve *versionEdit, pendingOutputs []compactionOutput, stats compactStats, _ error) {
) (ve *versionEdit, stats compactStats, _ error) {
iter := c.startLevel.files.Iter()
meta := iter.First()
if invariants.Enabled {
Expand All @@ -2451,11 +2411,11 @@ func (d *DB) runMoveOrCopyCompaction(
}
}
if c.cancel.Load() {
return ve, nil, stats, ErrCancelledCompaction
return ve, stats, ErrCancelledCompaction
}
objMeta, err := d.objProvider.Lookup(fileTypeTable, meta.FileBacking.DiskFileNum)
if err != nil {
return ve, pendingOutputs, stats, err
return ve, stats, err
}
c.metrics = map[int]*LevelMetrics{
c.outputLevel.level: {
Expand All @@ -2472,11 +2432,11 @@ func (d *DB) runMoveOrCopyCompaction(
},
}
if c.kind == compactionKindMove {
return ve, nil, stats, nil
return ve, stats, nil
}

pendingOutputs, err = d.runCopyCompaction(jobID, c, meta, objMeta, ve)
return ve, pendingOutputs, stats, err
err = d.runCopyCompaction(jobID, c, meta, objMeta, ve)
return ve, stats, err
}

// runCompaction runs a compaction that produces new on-disk tables from
Expand All @@ -2488,7 +2448,7 @@ func (d *DB) runMoveOrCopyCompaction(
// re-acquired during the course of this method.
func (d *DB) runCompaction(
jobID JobID, c *compaction,
) (ve *versionEdit, pendingOutputs []compactionOutput, stats compactStats, retErr error) {
) (ve *versionEdit, stats compactStats, retErr error) {
switch c.kind {
case compactionKindDeleteOnly:
return d.runDeleteOnlyCompaction(jobID, c)
Expand All @@ -2498,12 +2458,6 @@ func (d *DB) runCompaction(
panic("pebble: runCompaction cannot handle compactionKindIngestedFlushable.")
}

defer func() {
if retErr != nil {
pendingOutputs = nil
}
}()

snapshots := d.mu.snapshots.toSlice()
formatVers := d.FormatMajorVersion()

Expand All @@ -2520,7 +2474,7 @@ func (d *DB) runCompaction(
}

if c.cancel.Load() {
return ve, nil, stats, ErrCancelledCompaction
return ve, stats, ErrCancelledCompaction
}

// Release the d.mu lock while doing I/O.
Expand Down Expand Up @@ -2556,7 +2510,7 @@ func (d *DB) runCompaction(

pointIter, rangeDelIter, rangeKeyIter, err := c.newInputIters(d.newIters, d.tableNewRangeKeyIter)
if err != nil {
return nil, pendingOutputs, stats, err
return nil, stats, err
}
c.allowedZeroSeqNum = c.allowZeroSeqNum()
cfg := compact.IterConfig{
Expand Down Expand Up @@ -2669,10 +2623,6 @@ func (d *DB) runCompaction(
FileNum: base.PhysicalTableFileNum(objMeta.DiskFileNum),
CreationTime: time.Now().Unix(),
}
pendingOutputs = append(pendingOutputs, compactionOutput{
meta: fileMeta.PhysicalMeta().FileMetadata,
isLocal: !objMeta.IsRemote(),
})
createdFiles = append(createdFiles, objMeta.DiskFileNum)

ve.NewFiles = append(ve.NewFiles, newFileEntry{
Expand Down Expand Up @@ -2866,7 +2816,7 @@ func (d *DB) runCompaction(
}
splitter := compact.NewOutputSplitter(c.cmp, firstKey, splitLimitFunc(firstKey), c.maxOutputFileSize, c.grandparents.Iter(), iter.Frontiers())
if err := newOutput(); err != nil {
return nil, pendingOutputs, stats, err
return nil, stats, err
}

// Each inner loop iteration processes one key from the input iterator.
Expand All @@ -2880,21 +2830,21 @@ func (d *DB) runCompaction(
// The previous span (if any) must end at or before this key, since the
// spans we receive are non-overlapping.
if err := tw.EncodeSpan(&lastRangeDelSpan); err != nil {
return nil, pendingOutputs, stats, err
return nil, stats, err
}
lastRangeDelSpan.CopyFrom(iter.Span())
continue
case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
// The previous span (if any) must end at or before this key, since the
// spans we receive are non-overlapping.
if err := tw.EncodeSpan(&lastRangeKeySpan); err != nil {
return nil, pendingOutputs, stats, err
return nil, stats, err
}
lastRangeKeySpan.CopyFrom(iter.Span())
continue
}
if err := tw.AddWithForceObsolete(*key, val, iter.ForceObsoleteDueToRangeDel()); err != nil {
return nil, pendingOutputs, stats, err
return nil, stats, err
}
if iter.SnapshotPinned() {
// The kv pair we just added to the sstable was only surfaced by
Expand All @@ -2906,7 +2856,7 @@ func (d *DB) runCompaction(
}
}
if err := finishOutput(splitter.SplitKey()); err != nil {
return nil, pendingOutputs, stats, err
return nil, stats, err
}
}

Expand All @@ -2926,14 +2876,14 @@ func (d *DB) runCompaction(
stats.countMissizedDels = iter.Stats().CountMissizedDels

if err := d.objProvider.Sync(); err != nil {
return nil, pendingOutputs, stats, err
return nil, stats, err
}

// Refresh the disk available statistic whenever a compaction/flush
// completes, before re-acquiring the mutex.
_ = d.calculateDiskAvailableBytes()

return ve, pendingOutputs, stats, nil
return ve, stats, nil
}

// newCompactionOutput creates an object for a new table produced by a
Expand Down
2 changes: 1 addition & 1 deletion data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -914,7 +914,7 @@ func runDBDefineCmdReuseFS(td *datadriven.TestData, opts *Options) (*DB, error)
// to the user-defined boundaries.
c.maxOutputFileSize = math.MaxUint64

newVE, _, _, err := d.runCompaction(0, c)
newVE, _, err := d.runCompaction(0, c)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion open.go
Original file line number Diff line number Diff line change
Expand Up @@ -826,7 +826,7 @@ func (d *DB) replayWAL(
if err != nil {
return err
}
newVE, _, _, err := d.runCompaction(jobID, c)
newVE, _, err := d.runCompaction(jobID, c)
if err != nil {
return errors.Wrapf(err, "running compaction during WAL replay")
}
Expand Down

0 comments on commit 6a357e6

Please sign in to comment.