diff --git a/compaction.go b/compaction.go index 350c731874..a03ba1281a 100644 --- a/compaction.go +++ b/compaction.go @@ -1472,7 +1472,6 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) { startTime := d.timeNow() var ve *manifest.VersionEdit - var pendingOutputs []compactionOutput var stats compactStats // To determine the target level of the files in the ingestedFlushable, we // need to acquire the logLock, and not release it for that duration. Since, @@ -1481,7 +1480,7 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) { // runCompaction. For all other flush cases, we construct the VersionEdit // inside runCompaction. if c.kind != compactionKindIngestedFlushable { - ve, pendingOutputs, stats, err = d.runCompaction(jobID, c) + ve, stats, err = d.runCompaction(jobID, c) } // Acquire logLock. This will be released either on an error, by way of @@ -1572,23 +1571,6 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) { func() []compactionInfo { return d.getInProgressCompactionInfoLocked(c) }) if err != nil { info.Err = err - // TODO(peter): untested. - for _, f := range pendingOutputs { - // Note that the FileBacking for the file metadata might not have - // been set yet. So, we directly use the FileNum. Since these - // files were generated as compaction outputs, these must be - // physical files on disk. This property might not hold once - // https://github.com/cockroachdb/pebble/issues/389 is - // implemented if #389 creates virtual sstables as output files. - d.mu.versions.obsoleteTables = append(d.mu.versions.obsoleteTables, tableInfo{ - fileInfo: fileInfo{ - FileNum: base.PhysicalTableDiskFileNum(f.meta.FileNum), - FileSize: f.meta.Size, - }, - isLocal: f.isLocal, - }) - } - d.mu.versions.updateObsoleteTableMetricsLocked() } } else { // We won't be performing the logAndApply step because of the error, @@ -2169,7 +2151,7 @@ func (d *DB) compact1(c *compaction, errChannel chan error) (err error) { d.opts.EventListener.CompactionBegin(info) startTime := d.timeNow() - ve, pendingOutputs, stats, err := d.runCompaction(jobID, c) + ve, stats, err := d.runCompaction(jobID, c) info.Duration = d.timeNow().Sub(startTime) if err == nil { @@ -2194,25 +2176,6 @@ func (d *DB) compact1(c *compaction, errChannel chan error) (err error) { return d.getInProgressCompactionInfoLocked(c) }) }() - if err != nil { - // TODO(peter): untested. - for _, f := range pendingOutputs { - // Note that the FileBacking for the file metadata might not have - // been set yet. So, we directly use the FileNum. Since these - // files were generated as compaction outputs, these must be - // physical files on disk. This property might not hold once - // https://github.com/cockroachdb/pebble/issues/389 is - // implemented if #389 creates virtual sstables as output files. - d.mu.versions.obsoleteTables = append(d.mu.versions.obsoleteTables, tableInfo{ - fileInfo: fileInfo{ - FileNum: base.PhysicalTableDiskFileNum(f.meta.FileNum), - FileSize: f.meta.Size, - }, - isLocal: f.isLocal, - }) - } - d.mu.versions.updateObsoleteTableMetricsLocked() - } } info.Done = true @@ -2271,7 +2234,7 @@ func (d *DB) runCopyCompaction( inputMeta *fileMetadata, objMeta objstorage.ObjectMetadata, ve *versionEdit, -) (pendingOutputs []compactionOutput, retErr error) { +) error { ctx := context.TODO() if !objMeta.IsExternal() { @@ -2338,13 +2301,20 @@ func (d *DB) runCopyCompaction( d.mu.Unlock() defer d.mu.Lock() + deleteOnExit := false + defer func() { + if deleteOnExit { + _ = d.objProvider.Remove(fileTypeTable, newMeta.FileBacking.DiskFileNum) + } + }() + // If the src obj is external, we're doing an external to local/shared copy. if objMeta.IsExternal() { src, err := d.objProvider.OpenForReading( ctx, fileTypeTable, inputMeta.FileBacking.DiskFileNum, objstorage.OpenOptions{}, ) if err != nil { - return pendingOutputs, err + return err } defer func() { if src != nil { @@ -2352,19 +2322,16 @@ func (d *DB) runCopyCompaction( } }() - w, outObjMeta, err := d.objProvider.Create( - ctx, fileTypeTable, base.PhysicalTableDiskFileNum(newMeta.FileNum), + w, _, err := d.objProvider.Create( + ctx, fileTypeTable, newMeta.FileBacking.DiskFileNum, objstorage.CreateOptions{ PreferSharedStorage: remote.ShouldCreateShared(d.opts.Experimental.CreateOnShared, c.outputLevel.level), }, ) if err != nil { - return pendingOutputs, err + return err } - pendingOutputs = append(pendingOutputs, compactionOutput{ - meta: newMeta, - isLocal: !outObjMeta.IsRemote(), - }) + deleteOnExit = true start, end := newMeta.Smallest, newMeta.Largest if newMeta.SyntheticPrefix.IsSet() { @@ -2386,22 +2353,19 @@ func (d *DB) runCopyCompaction( ) src = nil // We passed src to CopySpan; it's responsible for closing it. if err != nil { - return pendingOutputs, err + return err } newMeta.FileBacking.Size = wrote newMeta.Size = wrote } else { - pendingOutputs = append(pendingOutputs, compactionOutput{ - meta: newMeta.PhysicalMeta().FileMetadata, - isLocal: true, - }) _, err := d.objProvider.LinkOrCopyFromLocal(context.TODO(), d.opts.FS, d.objProvider.Path(objMeta), fileTypeTable, newMeta.FileBacking.DiskFileNum, objstorage.CreateOptions{PreferSharedStorage: true}) if err != nil { - return pendingOutputs, err + return err } + deleteOnExit = true } ve.NewFiles[0].Meta = newMeta if newMeta.Virtual { @@ -2409,19 +2373,15 @@ func (d *DB) runCopyCompaction( } if err := d.objProvider.Sync(); err != nil { - return pendingOutputs, err + return err } - return pendingOutputs, nil -} - -type compactionOutput struct { - meta *fileMetadata - isLocal bool + deleteOnExit = false + return nil } func (d *DB) runDeleteOnlyCompaction( jobID JobID, c *compaction, -) (ve *versionEdit, pendingOutputs []compactionOutput, stats compactStats, retErr error) { +) (ve *versionEdit, stats compactStats, retErr error) { c.metrics = make(map[int]*LevelMetrics, len(c.inputs)) ve = &versionEdit{ DeletedFiles: map[deletedFileEntry]*fileMetadata{}, @@ -2437,12 +2397,12 @@ func (d *DB) runDeleteOnlyCompaction( } c.metrics[cl.level] = levelMetrics } - return ve, nil, stats, nil + return ve, stats, nil } func (d *DB) runMoveOrCopyCompaction( jobID JobID, c *compaction, -) (ve *versionEdit, pendingOutputs []compactionOutput, stats compactStats, _ error) { +) (ve *versionEdit, stats compactStats, _ error) { iter := c.startLevel.files.Iter() meta := iter.First() if invariants.Enabled { @@ -2451,11 +2411,11 @@ func (d *DB) runMoveOrCopyCompaction( } } if c.cancel.Load() { - return ve, nil, stats, ErrCancelledCompaction + return ve, stats, ErrCancelledCompaction } objMeta, err := d.objProvider.Lookup(fileTypeTable, meta.FileBacking.DiskFileNum) if err != nil { - return ve, pendingOutputs, stats, err + return ve, stats, err } c.metrics = map[int]*LevelMetrics{ c.outputLevel.level: { @@ -2472,11 +2432,11 @@ func (d *DB) runMoveOrCopyCompaction( }, } if c.kind == compactionKindMove { - return ve, nil, stats, nil + return ve, stats, nil } - pendingOutputs, err = d.runCopyCompaction(jobID, c, meta, objMeta, ve) - return ve, pendingOutputs, stats, err + err = d.runCopyCompaction(jobID, c, meta, objMeta, ve) + return ve, stats, err } // runCompaction runs a compaction that produces new on-disk tables from @@ -2488,7 +2448,7 @@ func (d *DB) runMoveOrCopyCompaction( // re-acquired during the course of this method. func (d *DB) runCompaction( jobID JobID, c *compaction, -) (ve *versionEdit, pendingOutputs []compactionOutput, stats compactStats, retErr error) { +) (ve *versionEdit, stats compactStats, retErr error) { switch c.kind { case compactionKindDeleteOnly: return d.runDeleteOnlyCompaction(jobID, c) @@ -2498,12 +2458,6 @@ func (d *DB) runCompaction( panic("pebble: runCompaction cannot handle compactionKindIngestedFlushable.") } - defer func() { - if retErr != nil { - pendingOutputs = nil - } - }() - snapshots := d.mu.snapshots.toSlice() formatVers := d.FormatMajorVersion() @@ -2520,7 +2474,7 @@ func (d *DB) runCompaction( } if c.cancel.Load() { - return ve, nil, stats, ErrCancelledCompaction + return ve, stats, ErrCancelledCompaction } // Release the d.mu lock while doing I/O. @@ -2556,7 +2510,7 @@ func (d *DB) runCompaction( pointIter, rangeDelIter, rangeKeyIter, err := c.newInputIters(d.newIters, d.tableNewRangeKeyIter) if err != nil { - return nil, pendingOutputs, stats, err + return nil, stats, err } c.allowedZeroSeqNum = c.allowZeroSeqNum() cfg := compact.IterConfig{ @@ -2669,10 +2623,6 @@ func (d *DB) runCompaction( FileNum: base.PhysicalTableFileNum(objMeta.DiskFileNum), CreationTime: time.Now().Unix(), } - pendingOutputs = append(pendingOutputs, compactionOutput{ - meta: fileMeta.PhysicalMeta().FileMetadata, - isLocal: !objMeta.IsRemote(), - }) createdFiles = append(createdFiles, objMeta.DiskFileNum) ve.NewFiles = append(ve.NewFiles, newFileEntry{ @@ -2866,7 +2816,7 @@ func (d *DB) runCompaction( } splitter := compact.NewOutputSplitter(c.cmp, firstKey, splitLimitFunc(firstKey), c.maxOutputFileSize, c.grandparents.Iter(), iter.Frontiers()) if err := newOutput(); err != nil { - return nil, pendingOutputs, stats, err + return nil, stats, err } // Each inner loop iteration processes one key from the input iterator. @@ -2880,7 +2830,7 @@ func (d *DB) runCompaction( // The previous span (if any) must end at or before this key, since the // spans we receive are non-overlapping. if err := tw.EncodeSpan(&lastRangeDelSpan); err != nil { - return nil, pendingOutputs, stats, err + return nil, stats, err } lastRangeDelSpan.CopyFrom(iter.Span()) continue @@ -2888,13 +2838,13 @@ func (d *DB) runCompaction( // The previous span (if any) must end at or before this key, since the // spans we receive are non-overlapping. if err := tw.EncodeSpan(&lastRangeKeySpan); err != nil { - return nil, pendingOutputs, stats, err + return nil, stats, err } lastRangeKeySpan.CopyFrom(iter.Span()) continue } if err := tw.AddWithForceObsolete(*key, val, iter.ForceObsoleteDueToRangeDel()); err != nil { - return nil, pendingOutputs, stats, err + return nil, stats, err } if iter.SnapshotPinned() { // The kv pair we just added to the sstable was only surfaced by @@ -2906,7 +2856,7 @@ func (d *DB) runCompaction( } } if err := finishOutput(splitter.SplitKey()); err != nil { - return nil, pendingOutputs, stats, err + return nil, stats, err } } @@ -2926,14 +2876,14 @@ func (d *DB) runCompaction( stats.countMissizedDels = iter.Stats().CountMissizedDels if err := d.objProvider.Sync(); err != nil { - return nil, pendingOutputs, stats, err + return nil, stats, err } // Refresh the disk available statistic whenever a compaction/flush // completes, before re-acquiring the mutex. _ = d.calculateDiskAvailableBytes() - return ve, pendingOutputs, stats, nil + return ve, stats, nil } // newCompactionOutput creates an object for a new table produced by a diff --git a/data_test.go b/data_test.go index 41ed5d05bf..3ee315c148 100644 --- a/data_test.go +++ b/data_test.go @@ -914,7 +914,7 @@ func runDBDefineCmdReuseFS(td *datadriven.TestData, opts *Options) (*DB, error) // to the user-defined boundaries. c.maxOutputFileSize = math.MaxUint64 - newVE, _, _, err := d.runCompaction(0, c) + newVE, _, err := d.runCompaction(0, c) if err != nil { return err } diff --git a/open.go b/open.go index 4c6139dbe9..21336ac90f 100644 --- a/open.go +++ b/open.go @@ -826,7 +826,7 @@ func (d *DB) replayWAL( if err != nil { return err } - newVE, _, _, err := d.runCompaction(jobID, c) + newVE, _, err := d.runCompaction(jobID, c) if err != nil { return errors.Wrapf(err, "running compaction during WAL replay") }