From 4301b8f511d92fb1ecbd9cec0a01b9a4d9ed9193 Mon Sep 17 00:00:00 2001 From: Radu Berinde Date: Tue, 7 May 2024 19:29:10 -0700 Subject: [PATCH] compaction: clean up runCompaction In this change we separate the code that deals with the "data" portion of the compaction from the code that deals with the "metadata" portion. These were intertwined in the old code, leading to a lot of clutter. We move the "inner" loop of the compaction (writing one output table) to `compact.Runner`. We then use the tables in `compact.Result` to create the version edit. --- compaction.go | 460 ++++++++++------------------------- internal/compact/run.go | 234 +++++++++++++++++- internal/compact/run_test.go | 12 +- 3 files changed, 360 insertions(+), 346 deletions(-) diff --git a/compaction.go b/compaction.go index 0e804e3fe0..bccd3bdf12 100644 --- a/compaction.go +++ b/compaction.go @@ -40,12 +40,6 @@ var compactLabels = pprof.Labels("pebble", "compact") var flushLabels = pprof.Labels("pebble", "flush") var gcLabels = pprof.Labels("pebble", "gc") -// getInternalWriterProperties accesses a private variable (in the -// internal/private package) initialized by the sstable Writer. This indirection -// is necessary to ensure non-Pebble users constructing sstables for ingestion -// are unable to set internal-only properties. -var getInternalWriterProperties = private.SSTableInternalProperties.(func(*sstable.Writer) *sstable.Properties) - // expandedCompactionByteSizeLimit is the maximum number of bytes in all // compacted files. We avoid expanding the lower level file set of a compaction // if it would make the total compaction cover more than this many bytes. @@ -1418,7 +1412,7 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) { startTime := d.timeNow() var ve *manifest.VersionEdit - var stats compactStats + var stats compact.Stats // To determine the target level of the files in the ingestedFlushable, we // need to acquire the logLock, and not release it for that duration. Since, // we need to acquire the logLock below to perform the logAndApply step @@ -1527,9 +1521,9 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) { // If err != nil, then the flush will be retried, and we will recalculate // these metrics. 
if err == nil { - d.mu.snapshots.cumulativePinnedCount += stats.cumulativePinnedKeys - d.mu.snapshots.cumulativePinnedSize += stats.cumulativePinnedSize - d.mu.versions.metrics.Keys.MissizedTombstonesCount += stats.countMissizedDels + d.mu.snapshots.cumulativePinnedCount += stats.CumulativePinnedKeys + d.mu.snapshots.cumulativePinnedSize += stats.CumulativePinnedSize + d.mu.versions.metrics.Keys.MissizedTombstonesCount += stats.CountMissizedDels } d.clearCompactingState(c, err != nil) @@ -2131,9 +2125,9 @@ func (d *DB) compact1(c *compaction, errChannel chan error) (err error) { e := &ve.NewFiles[i] info.Output.Tables = append(info.Output.Tables, e.Meta.TableInfo()) } - d.mu.snapshots.cumulativePinnedCount += stats.cumulativePinnedKeys - d.mu.snapshots.cumulativePinnedSize += stats.cumulativePinnedSize - d.mu.versions.metrics.Keys.MissizedTombstonesCount += stats.countMissizedDels + d.mu.snapshots.cumulativePinnedCount += stats.CumulativePinnedKeys + d.mu.snapshots.cumulativePinnedSize += stats.CumulativePinnedSize + d.mu.versions.metrics.Keys.MissizedTombstonesCount += stats.CountMissizedDels } // NB: clearing compacting state must occur before updating the read state; @@ -2158,12 +2152,6 @@ func (d *DB) compact1(c *compaction, errChannel chan error) (err error) { return err } -type compactStats struct { - cumulativePinnedKeys uint64 - cumulativePinnedSize uint64 - countMissizedDels uint64 -} - // runCopyCompaction runs a copy compaction where a new FileNum is created that // is a byte-for-byte copy of the input file or span thereof in some cases. This // is used in lieu of a move compaction when a file is being moved across the @@ -2327,7 +2315,7 @@ func (d *DB) runCopyCompaction( func (d *DB) runDeleteOnlyCompaction( jobID JobID, c *compaction, -) (ve *versionEdit, stats compactStats, retErr error) { +) (ve *versionEdit, stats compact.Stats, retErr error) { c.metrics = make(map[int]*LevelMetrics, len(c.inputs)) ve = &versionEdit{ DeletedFiles: map[deletedFileEntry]*fileMetadata{}, @@ -2348,7 +2336,7 @@ func (d *DB) runDeleteOnlyCompaction( func (d *DB) runMoveOrCopyCompaction( jobID JobID, c *compaction, -) (ve *versionEdit, stats compactStats, _ error) { +) (ve *versionEdit, stats compact.Stats, _ error) { iter := c.startLevel.files.Iter() meta := iter.First() if invariants.Enabled { @@ -2394,7 +2382,7 @@ func (d *DB) runMoveOrCopyCompaction( // re-acquired during the course of this method. func (d *DB) runCompaction( jobID JobID, c *compaction, -) (ve *versionEdit, stats compactStats, retErr error) { +) (ve *versionEdit, stats compact.Stats, retErr error) { switch c.kind { case compactionKindDeleteOnly: return d.runDeleteOnlyCompaction(jobID, c) @@ -2405,7 +2393,6 @@ func (d *DB) runCompaction( } snapshots := d.mu.snapshots.toSlice() - formatVers := d.FormatMajorVersion() if c.flushing == nil { // Before dropping the db mutex, grab a ref to the current version. This @@ -2423,11 +2410,44 @@ func (d *DB) runCompaction( return ve, stats, ErrCancelledCompaction } + // The table is typically written at the maximum allowable format implied by + // the current format major version of the DB. + tableFormat := d.FormatMajorVersion().MaxTableFormat() + // In format major versions with maximum table formats of Pebblev3, value + // blocks were conditional on an experimental setting. In format major + // versions with maximum table formats of Pebblev4 and higher, value blocks + // are always enabled. 
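+	// When value blocks are not (or cannot be) enabled, fall back to Pebblev2.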
+ if tableFormat == sstable.TableFormatPebblev3 && + (d.opts.Experimental.EnableValueBlocks == nil || !d.opts.Experimental.EnableValueBlocks()) { + tableFormat = sstable.TableFormatPebblev2 + } + // Release the d.mu lock while doing I/O. // Note the unusual order: Unlock and then Lock. d.mu.Unlock() defer d.mu.Lock() + result := d.compactAndWrite(jobID, c, snapshots, tableFormat) + if result.Err == nil { + ve, result.Err = c.makeVersionEdit(result) + } + if result.Err != nil { + // Delete any created tables. + for i := range result.Tables { + _ = d.objProvider.Remove(fileTypeTable, result.Tables[i].ObjMeta.DiskFileNum) + } + } + // Refresh the disk available statistic whenever a compaction/flush + // completes, before re-acquiring the mutex. + d.calculateDiskAvailableBytes() + return ve, result.Stats, result.Err +} + +// compactAndWrite runs the data part of a compaction, where we set up a +// compaction iterator and use it to write output tables. +func (d *DB) compactAndWrite( + jobID JobID, c *compaction, snapshots compact.Snapshots, tableFormat sstable.TableFormat, +) (result compact.Result) { // Compactions use a pool of buffers to read blocks, avoiding polluting the // block cache with blocks that will not be read again. We initialize the // buffer pool with a size 12. This initial size does not need to be @@ -2455,8 +2475,13 @@ func (d *DB) runCompaction( defer c.bufferPool.Release() pointIter, rangeDelIter, rangeKeyIter, err := c.newInputIters(d.newIters, d.tableNewRangeKeyIter) + defer func() { + for _, closer := range c.closers { + result.Err = firstError(result.Err, closer.Close()) + } + }() if err != nil { - return nil, stats, err + return compact.Result{Err: err} } c.allowedZeroSeqNum = c.allowZeroSeqNum() cfg := compact.IterConfig{ @@ -2470,35 +2495,50 @@ func (d *DB) runCompaction( SingleDeleteInvariantViolationCallback: d.opts.Experimental.SingleDeleteInvariantViolationCallback, } iter := compact.NewIter(cfg, pointIter, rangeDelIter, rangeKeyIter) - var lastRangeDelSpan, lastRangeKeySpan keyspan.Span - - var ( - createdFiles []base.DiskFileNum - tw *sstable.Writer - pinnedKeySize uint64 - pinnedValueSize uint64 - pinnedCount uint64 - ) - defer func() { - if iter != nil { - retErr = firstError(retErr, iter.Close()) - } - if tw != nil { - retErr = firstError(retErr, tw.Close()) - } - if retErr != nil { - for _, fileNum := range createdFiles { - _ = d.objProvider.Remove(fileTypeTable, fileNum) - } + + runnerCfg := compact.RunnerConfig{ + CompactionBounds: base.UserKeyBoundsFromInternal(c.smallest, c.largest), + L0SplitKeys: c.l0Limits, + Grandparents: c.grandparents, + MaxGrandparentOverlapBytes: c.maxOverlapBytes, + TargetOutputFileSize: c.maxOutputFileSize, + } + runner := compact.NewRunner(runnerCfg, iter) + for runner.MoreDataToWrite() { + if c.cancel.Load() { + return runner.Finish().WithError(ErrCancelledCompaction) } - for _, closer := range c.closers { - retErr = firstError(retErr, closer.Close()) + // Create a new table. 
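+		// newCompactionOutput creates the output object and its sstable writer;
+		// WriteTable then consumes keys from the compaction iterator until the
+		// table is full or a split point is reached, and always closes the
+		// writer.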
+ writerOpts := d.opts.MakeWriterOptions(c.outputLevel.level, tableFormat) + objMeta, tw, cpuWorkHandle, err := d.newCompactionOutput(jobID, c, writerOpts) + if err != nil { + return runner.Finish().WithError(err) } - }() + runner.WriteTable(objMeta, tw) + d.opts.Experimental.CPUWorkPermissionGranter.CPUWorkDone(cpuWorkHandle) + } + result = runner.Finish() + if result.Err == nil { + result.Err = d.objProvider.Sync() + } + return result +} - ve = &versionEdit{ +// makeVersionEdit creates the version edit for a compaction, based on the +// tables in compact.Result. +func (c *compaction) makeVersionEdit(result compact.Result) (*versionEdit, error) { + ve := &versionEdit{ DeletedFiles: map[deletedFileEntry]*fileMetadata{}, } + for _, cl := range c.inputs { + iter := cl.files.Iter() + for f := iter.First(); f != nil; f = iter.Next() { + ve.DeletedFiles[deletedFileEntry{ + Level: cl.level, + FileNum: f.FileNum, + }] = f + } + } startLevelBytes := c.startLevel.files.SizeSum() outputMetrics := &LevelMetrics{ @@ -2523,312 +2563,75 @@ func (d *DB) runCompaction( outputMetrics.MultiLevel.BytesRead = outputMetrics.BytesRead } - // The table is typically written at the maximum allowable format implied by - // the current format major version of the DB. - tableFormat := formatVers.MaxTableFormat() inputLargestSeqNumAbsolute := c.inputLargestSeqNumAbsolute() + ve.NewFiles = make([]newFileEntry, len(result.Tables)) + for i := range result.Tables { + t := &result.Tables[i] - // In format major versions with maximum table formats of Pebblev3, value - // blocks were conditional on an experimental setting. In format major - // versions with maximum table formats of Pebblev4 and higher, value blocks - // are always enabled. - if tableFormat == sstable.TableFormatPebblev3 && - (d.opts.Experimental.EnableValueBlocks == nil || !d.opts.Experimental.EnableValueBlocks()) { - tableFormat = sstable.TableFormatPebblev2 - } - - writerOpts := d.opts.MakeWriterOptions(c.outputLevel.level, tableFormat) - - // prevPointKey is a sstable.WriterOption that provides access to - // the last point key written to a writer's sstable. When a new - // output begins in newOutput, prevPointKey is updated to point to - // the new output's sstable.Writer. This allows the compaction loop - // to access the last written point key without requiring the - // compaction loop to make a copy of each key ahead of time. Users - // must be careful, because the byte slice returned by UnsafeKey - // points directly into the Writer's block buffer. - var cpuWorkHandle CPUWorkHandle - defer func() { - if cpuWorkHandle != nil { - d.opts.Experimental.CPUWorkPermissionGranter.CPUWorkDone(cpuWorkHandle) - } - }() - - newOutput := func() error { - // Check if we've been cancelled by a concurrent operation. - if c.cancel.Load() { - return ErrCancelledCompaction - } - var objMeta objstorage.ObjectMetadata - var err error - objMeta, tw, cpuWorkHandle, err = d.newCompactionOutput(jobID, c, writerOpts) - if err != nil { - return err - } fileMeta := &fileMetadata{ - FileNum: base.PhysicalTableFileNum(objMeta.DiskFileNum), - CreationTime: time.Now().Unix(), - } - createdFiles = append(createdFiles, objMeta.DiskFileNum) - - ve.NewFiles = append(ve.NewFiles, newFileEntry{ - Level: c.outputLevel.level, - Meta: fileMeta, - }) - return nil - } - - // finishOutput is called with the a user key up to which all tombstones - // should be flushed. Typically, this is the first key of the next - // sstable or an empty key if this output is the final sstable. 
- finishOutput := func(splitKey []byte) error { - if err := compact.SplitAndEncodeSpan(c.cmp, &lastRangeDelSpan, splitKey, tw); err != nil { - return err - } - if err := compact.SplitAndEncodeSpan(c.cmp, &lastRangeKeySpan, splitKey, tw); err != nil { - return err - } - { - // Set internal sstable properties. - p := getInternalWriterProperties(tw) - // Set the snapshot pinned totals. - p.SnapshotPinnedKeys = pinnedCount - p.SnapshotPinnedKeySize = pinnedKeySize - p.SnapshotPinnedValueSize = pinnedValueSize - stats.cumulativePinnedKeys += pinnedCount - stats.cumulativePinnedSize += pinnedKeySize + pinnedValueSize - pinnedCount = 0 - pinnedKeySize = 0 - pinnedValueSize = 0 - } - if err := tw.Close(); err != nil { - tw = nil - return err + FileNum: base.PhysicalTableFileNum(t.ObjMeta.DiskFileNum), + CreationTime: t.CreationTime.Unix(), + Size: t.WriterMeta.Size, + SmallestSeqNum: t.WriterMeta.SmallestSeqNum, + LargestSeqNum: t.WriterMeta.LargestSeqNum, } - d.opts.Experimental.CPUWorkPermissionGranter.CPUWorkDone(cpuWorkHandle) - cpuWorkHandle = nil - writerMeta, err := tw.Metadata() - if err != nil { - tw = nil - return err - } - tw = nil - meta := ve.NewFiles[len(ve.NewFiles)-1].Meta - meta.Size = writerMeta.Size - meta.SmallestSeqNum = writerMeta.SmallestSeqNum - meta.LargestSeqNum = writerMeta.LargestSeqNum if c.flushing == nil { // Set the file's LargestSeqNumAbsolute to be the maximum value of any // of the compaction's input sstables. // TODO(jackson): This could be narrowed to be the maximum of input // sstables that overlap the output sstable's key range. - meta.LargestSeqNumAbsolute = inputLargestSeqNumAbsolute + fileMeta.LargestSeqNumAbsolute = inputLargestSeqNumAbsolute } else { - meta.LargestSeqNumAbsolute = writerMeta.LargestSeqNum + fileMeta.LargestSeqNumAbsolute = t.WriterMeta.LargestSeqNum } - meta.InitPhysicalBacking() + fileMeta.InitPhysicalBacking() // If the file didn't contain any range deletions, we can fill its // table stats now, avoiding unnecessarily loading the table later. maybeSetStatsFromProperties( - meta.PhysicalMeta(), &writerMeta.Properties, + fileMeta.PhysicalMeta(), &t.WriterMeta.Properties, ) - if c.flushing == nil { - outputMetrics.TablesCompacted++ - outputMetrics.BytesCompacted += meta.Size - } else { - outputMetrics.TablesFlushed++ - outputMetrics.BytesFlushed += meta.Size - } - outputMetrics.Size += int64(meta.Size) - outputMetrics.NumFiles++ - outputMetrics.Additional.BytesWrittenDataBlocks += writerMeta.Properties.DataSize - outputMetrics.Additional.BytesWrittenValueBlocks += writerMeta.Properties.ValueBlocksSize - - if n := len(ve.NewFiles); n > 1 { - // This is not the first output file. Ensure the sstable boundaries - // are nonoverlapping. - prevMeta := ve.NewFiles[n-2].Meta - if writerMeta.SmallestRangeDel.UserKey != nil { - c := d.cmp(writerMeta.SmallestRangeDel.UserKey, prevMeta.Largest.UserKey) - if c < 0 { - return base.AssertionFailedf( - "pebble: smallest range tombstone start key is less than previous sstable largest key: %s < %s", - writerMeta.SmallestRangeDel.Pretty(d.opts.Comparer.FormatKey), - prevMeta.Largest.Pretty(d.opts.Comparer.FormatKey)) - } else if c == 0 && !prevMeta.Largest.IsExclusiveSentinel() { - // The user key portion of the range boundary start key is - // equal to the previous table's largest key user key, and - // the previous table's largest key is not exclusive. This - // violates the invariant that tables are key-space - // partitioned. 
- return base.AssertionFailedf( - "pebble: invariant violation: previous sstable largest key %s, current sstable smallest rangedel: %s", - prevMeta.Largest.Pretty(d.opts.Comparer.FormatKey), - writerMeta.SmallestRangeDel.Pretty(d.opts.Comparer.FormatKey), - ) - } - } - } - - // Verify that all range deletions outputted to the sstable are - // truncated to split key. - if splitKey != nil && writerMeta.LargestRangeDel.UserKey != nil && - d.cmp(writerMeta.LargestRangeDel.UserKey, splitKey) > 0 { - return errors.Errorf( - "pebble: invariant violation: rangedel largest key %q extends beyond split key %q", - writerMeta.LargestRangeDel.Pretty(d.opts.Comparer.FormatKey), - d.opts.Comparer.FormatKey(splitKey), - ) + if t.WriterMeta.HasPointKeys { + fileMeta.ExtendPointKeyBounds(c.cmp, t.WriterMeta.SmallestPoint, t.WriterMeta.LargestPoint) } - - if writerMeta.HasPointKeys { - meta.ExtendPointKeyBounds(d.cmp, writerMeta.SmallestPoint, writerMeta.LargestPoint) + if t.WriterMeta.HasRangeDelKeys { + fileMeta.ExtendPointKeyBounds(c.cmp, t.WriterMeta.SmallestRangeDel, t.WriterMeta.LargestRangeDel) } - if writerMeta.HasRangeDelKeys { - meta.ExtendPointKeyBounds(d.cmp, writerMeta.SmallestRangeDel, writerMeta.LargestRangeDel) + if t.WriterMeta.HasRangeKeys { + fileMeta.ExtendRangeKeyBounds(c.cmp, t.WriterMeta.SmallestRangeKey, t.WriterMeta.LargestRangeKey) } - if writerMeta.HasRangeKeys { - meta.ExtendRangeKeyBounds(d.cmp, writerMeta.SmallestRangeKey, writerMeta.LargestRangeKey) - } - - // Verify that the sstable bounds fall within the compaction input - // bounds. This is a sanity check that we don't have a logic error - // elsewhere that causes the sstable bounds to accidentally expand past the - // compaction input bounds as doing so could lead to various badness such - // as keys being deleted by a range tombstone incorrectly. - if c.smallest.UserKey != nil { - switch v := d.cmp(meta.Smallest.UserKey, c.smallest.UserKey); { - case v >= 0: - // Nothing to do. - case v < 0: - return errors.Errorf("pebble: compaction output grew beyond bounds of input: %s < %s", - meta.Smallest.Pretty(d.opts.Comparer.FormatKey), - c.smallest.Pretty(d.opts.Comparer.FormatKey)) - } - } - if c.largest.UserKey != nil { - switch v := d.cmp(meta.Largest.UserKey, c.largest.UserKey); { - case v <= 0: - // Nothing to do. - case v > 0: - return errors.Errorf("pebble: compaction output grew beyond bounds of input: %s > %s", - meta.Largest.Pretty(d.opts.Comparer.FormatKey), - c.largest.Pretty(d.opts.Comparer.FormatKey)) - } - } - // Verify that we never split different revisions of the same user key - // across two different sstables. - if err := c.errorOnUserKeyOverlap(ve); err != nil { - return err - } - if err := meta.Validate(d.cmp, d.opts.Comparer.FormatKey); err != nil { - return err - } - return nil - } - - runnerCfg := compact.RunnerConfig{ - Grandparents: c.grandparents, - MaxGrandparentOverlapBytes: c.maxOverlapBytes, - TargetOutputFileSize: c.maxOutputFileSize, - L0SplitKeys: c.l0Limits, - } - runner := compact.NewRunner(runnerCfg, iter) - - lastUserKeyFn := func() []byte { - return tw.UnsafeLastPointUserKey() - } - // Each outer loop iteration produces one output file. An iteration that - // produces a file containing point keys (and optionally range tombstones) - // guarantees that the input iterator advanced. 
An iteration that produces - // a file containing only range tombstones guarantees the limit passed to - // `finishOutput()` advanced to a strictly greater user key corresponding - // to a grandparent file largest key, or nil. Taken together, these - // progress guarantees ensure that eventually the input iterator will be - // exhausted and the range tombstone fragments will all be flushed. - for key, val := iter.First(); key != nil || !lastRangeDelSpan.Empty() || !lastRangeKeySpan.Empty(); { - firstKey := base.MinUserKey(c.cmp, spanStartOrNil(&lastRangeDelSpan), spanStartOrNil(&lastRangeKeySpan)) - if key != nil && firstKey == nil { - firstKey = key.UserKey - } - if invariants.Enabled && firstKey == nil { - panic("nil first key") - } - splitter := compact.NewOutputSplitter( - c.cmp, firstKey, runner.TableSplitLimit(firstKey), c.maxOutputFileSize, c.grandparents.Iter(), iter.Frontiers(), - ) - if err := newOutput(); err != nil { - return nil, stats, err + ve.NewFiles[i] = newFileEntry{ + Level: c.outputLevel.level, + Meta: fileMeta, } - // Each inner loop iteration processes one key from the input iterator. - for ; key != nil; key, val = iter.Next() { - if splitter.ShouldSplitBefore(key.UserKey, tw.EstimatedSize(), lastUserKeyFn) { - break - } - - switch key.Kind() { - case InternalKeyKindRangeDelete: - // The previous span (if any) must end at or before this key, since the - // spans we receive are non-overlapping. - if err := tw.EncodeSpan(&lastRangeDelSpan); err != nil { - return nil, stats, err - } - lastRangeDelSpan.CopyFrom(iter.Span()) - continue - case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete: - // The previous span (if any) must end at or before this key, since the - // spans we receive are non-overlapping. - if err := tw.EncodeSpan(&lastRangeKeySpan); err != nil { - return nil, stats, err - } - lastRangeKeySpan.CopyFrom(iter.Span()) - continue - } - if err := tw.AddWithForceObsolete(*key, val, iter.ForceObsoleteDueToRangeDel()); err != nil { - return nil, stats, err - } - if iter.SnapshotPinned() { - // The kv pair we just added to the sstable was only surfaced by - // the compaction iterator because an open snapshot prevented - // its elision. Increment the stats. - pinnedCount++ - pinnedKeySize += uint64(len(key.UserKey)) + base.InternalTrailerLen - pinnedValueSize += uint64(len(val)) - } - } - if err := finishOutput(splitter.SplitKey()); err != nil { - return nil, stats, err + // Update metrics. + if c.flushing == nil { + outputMetrics.TablesCompacted++ + outputMetrics.BytesCompacted += fileMeta.Size + } else { + outputMetrics.TablesFlushed++ + outputMetrics.BytesFlushed += fileMeta.Size } + outputMetrics.Size += int64(fileMeta.Size) + outputMetrics.NumFiles++ + outputMetrics.Additional.BytesWrittenDataBlocks += t.WriterMeta.Properties.DataSize + outputMetrics.Additional.BytesWrittenValueBlocks += t.WriterMeta.Properties.ValueBlocksSize } - for _, cl := range c.inputs { - iter := cl.files.Iter() - for f := iter.First(); f != nil; f = iter.Next() { - ve.DeletedFiles[deletedFileEntry{ - Level: cl.level, - FileNum: f.FileNum, - }] = f + // Sanity check that the tables are ordered and don't overlap. 
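+	// Two adjacent tables overlap exactly when the previous table's end bound
+	// is an upper bound for the next table's smallest key.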
+ for i := 1; i < len(ve.NewFiles); i++ { + if ve.NewFiles[i-1].Meta.UserKeyBounds().End.IsUpperBoundFor(c.cmp, ve.NewFiles[i].Meta.Smallest.UserKey) { + return nil, base.AssertionFailedf("pebble: compaction output tables overlap: %s and %s", + ve.NewFiles[i-1].Meta.DebugString(c.formatKey, true), + ve.NewFiles[i].Meta.DebugString(c.formatKey, true), + ) } } - // The compaction iterator keeps track of a count of the number of DELSIZED - // keys that encoded an incorrect size. Propagate it up as a part of - // compactStats. - stats.countMissizedDels = iter.Stats().CountMissizedDels - - if err := d.objProvider.Sync(); err != nil { - return nil, stats, err - } - - // Refresh the disk available statistic whenever a compaction/flush - // completes, before re-acquiring the mutex. - _ = d.calculateDiskAvailableBytes() - - return ve, stats, nil + return ve, nil } // newCompactionOutput creates an object for a new table produced by a @@ -2928,10 +2731,3 @@ func validateVersionEdit( validateKey(m, m.Largest.UserKey) } } - -func spanStartOrNil(s *keyspan.Span) []byte { - if s.Empty() { - return nil - } - return s.Start -} diff --git a/internal/compact/run.go b/internal/compact/run.go index 3ee3cac80b..f7591e9dad 100644 --- a/internal/compact/run.go +++ b/internal/compact/run.go @@ -8,8 +8,11 @@ import ( "sort" "time" + "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/internal/base" + "github.com/cockroachdb/pebble/internal/keyspan" "github.com/cockroachdb/pebble/internal/manifest" + "github.com/cockroachdb/pebble/internal/private" "github.com/cockroachdb/pebble/objstorage" "github.com/cockroachdb/pebble/sstable" ) @@ -17,15 +20,21 @@ import ( // Result stores the result of a compaction - more specifically, the "data" part // where we use the compaction iterator to write output tables. type Result struct { - // Err is the result of the compaction. - // - // On success, Err is nil and Tables stores the output tables. - // - // On failure, Err is set and Tables stores the tables created so far (and - // which need to be cleaned up). - Err error - + // Err is the result of the compaction. On success, Err is nil and Tables + // stores the output tables. On failure, Err is set and Tables stores the + // tables created so far (and which need to be cleaned up). + Err error Tables []OutputTable + Stats Stats +} + +// WithError returns a modified Result which has the Err field set. +func (r Result) WithError(err error) Result { + return Result{ + Err: errors.CombineErrors(r.Err, err), + Tables: r.Tables, + Stats: r.Stats, + } } // OutputTable contains metadata about a table that was created during a compaction. @@ -38,8 +47,19 @@ type OutputTable struct { WriterMeta sstable.WriterMetadata } +// Stats describes stats collected during the compaction. +type Stats struct { + CumulativePinnedKeys uint64 + CumulativePinnedSize uint64 + CountMissizedDels uint64 +} + // RunnerConfig contains the parameters needed for the Runner. type RunnerConfig struct { + // CompactionBounds are the bounds containing all the input tables. All output + // tables must fall within these bounds as well. + CompactionBounds base.UserKeyBounds + // L0SplitKeys is only set for flushes and it contains the flush split keys // (see L0Sublevels.FlushSplitKeys). These are split points enforced for the // output tables. 
@@ -63,23 +83,169 @@ type RunnerConfig struct {
 
 // Runner is a helper for running the "data" part of a compaction (where we use
 // the compaction iterator to write output tables).
+//
+// Sample usage:
+//
+//	r := NewRunner(cfg, iter)
+//	for r.MoreDataToWrite() {
+//	  objMeta, tw := ... // Create object and table writer.
+//	  r.WriteTable(objMeta, tw)
+//	}
+//	result := r.Finish()
 type Runner struct {
 	cmp  base.Compare
 	cfg  RunnerConfig
 	iter *Iter
+
+	tables []OutputTable
+	// Stores any error encountered.
+	err error
+	// Last key/value returned by the compaction iterator.
+	key   *base.InternalKey
+	value []byte
+	// Last RANGEDEL span (or portion of it) that was not yet written to a table.
+	lastRangeDelSpan keyspan.Span
+	// Last range key span (or portion of it) that was not yet written to a table.
+	lastRangeKeySpan keyspan.Span
+	stats Stats
 }
 
 // NewRunner creates a new Runner.
-// TODO(radu): document usage once we have more functionality.
 func NewRunner(cfg RunnerConfig, iter *Iter) *Runner {
 	r := &Runner{
 		cmp:  iter.cmp,
 		cfg:  cfg,
 		iter: iter,
 	}
+	r.key, r.value = r.iter.First()
 	return r
 }
 
+// MoreDataToWrite returns true if there is more data to be written.
+func (r *Runner) MoreDataToWrite() bool {
+	if r.err != nil {
+		return false
+	}
+	return r.key != nil || !r.lastRangeDelSpan.Empty() || !r.lastRangeKeySpan.Empty()
+}
+
+// WriteTable writes a new output table. This table will be part of
+// Result.Tables. Should only be called if MoreDataToWrite() returned true.
+//
+// WriteTable always closes the Writer.
+func (r *Runner) WriteTable(objMeta objstorage.ObjectMetadata, tw *sstable.Writer) {
+	if r.err != nil {
+		panic("error already encountered")
+	}
+	r.tables = append(r.tables, OutputTable{
+		CreationTime: time.Now(),
+		ObjMeta:      objMeta,
+	})
+	splitKey, err := r.writeKeysToTable(tw)
+	err = errors.CombineErrors(err, tw.Close())
+	if err != nil {
+		r.err = err
+		r.key, r.value = nil, nil
+		return
+	}
+	writerMeta, err := tw.Metadata()
+	if err != nil {
+		r.err = err
+		return
+	}
+	if err := r.validateWriterMeta(writerMeta, splitKey); err != nil {
+		r.err = err
+		return
+	}
+	r.tables[len(r.tables)-1].WriterMeta = *writerMeta
+}
+
+func (r *Runner) writeKeysToTable(tw *sstable.Writer) (splitKey []byte, _ error) {
+	firstKey := base.MinUserKey(r.cmp, spanStartOrNil(&r.lastRangeDelSpan), spanStartOrNil(&r.lastRangeKeySpan))
+	if r.key != nil && firstKey == nil {
+		firstKey = r.key.UserKey
+	}
+	if firstKey == nil {
+		return nil, base.AssertionFailedf("no data to write")
+	}
+	splitter := NewOutputSplitter(
+		r.cmp, firstKey, r.TableSplitLimit(firstKey),
+		r.cfg.TargetOutputFileSize, r.cfg.Grandparents.Iter(), r.iter.Frontiers(),
+	)
+	lastUserKeyFn := func() []byte {
+		return tw.UnsafeLastPointUserKey()
+	}
+	var pinnedKeySize, pinnedValueSize, pinnedCount uint64
+	key, value := r.key, r.value
+	for ; key != nil; key, value = r.iter.Next() {
+		if splitter.ShouldSplitBefore(key.UserKey, tw.EstimatedSize(), lastUserKeyFn) {
+			break
+		}
+
+		switch key.Kind() {
+		case base.InternalKeyKindRangeDelete:
+			// The previous span (if any) must end at or before this key, since the
+			// spans we receive are non-overlapping.
+			if err := tw.EncodeSpan(&r.lastRangeDelSpan); err != nil {
+				return nil, err
+			}
+			r.lastRangeDelSpan.CopyFrom(r.iter.Span())
+			continue
+
+		case base.InternalKeyKindRangeKeySet, base.InternalKeyKindRangeKeyUnset, base.InternalKeyKindRangeKeyDelete:
+			// The previous span (if any) must end at or before this key, since the
+			// spans we receive are non-overlapping.
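+			// Flush the pending range key span to this table before buffering
+			// the new one.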
+			if err := tw.EncodeSpan(&r.lastRangeKeySpan); err != nil {
+				return nil, err
+			}
+			r.lastRangeKeySpan.CopyFrom(r.iter.Span())
+			continue
+		}
+		if err := tw.AddWithForceObsolete(*key, value, r.iter.ForceObsoleteDueToRangeDel()); err != nil {
+			return nil, err
+		}
+		if r.iter.SnapshotPinned() {
+			// The kv pair we just added to the sstable was only surfaced by
+			// the compaction iterator because an open snapshot prevented
+			// its elision. Increment the stats.
+			pinnedCount++
+			pinnedKeySize += uint64(len(key.UserKey)) + base.InternalTrailerLen
+			pinnedValueSize += uint64(len(value))
+		}
+	}
+	r.key, r.value = key, value
+	splitKey = splitter.SplitKey()
+	if err := SplitAndEncodeSpan(r.cmp, &r.lastRangeDelSpan, splitKey, tw); err != nil {
+		return nil, err
+	}
+	if err := SplitAndEncodeSpan(r.cmp, &r.lastRangeKeySpan, splitKey, tw); err != nil {
+		return nil, err
+	}
+	// Set internal sstable properties.
+	p := getInternalWriterProperties(tw)
+	// Set the snapshot pinned totals.
+	p.SnapshotPinnedKeys = pinnedCount
+	p.SnapshotPinnedKeySize = pinnedKeySize
+	p.SnapshotPinnedValueSize = pinnedValueSize
+	r.stats.CumulativePinnedKeys += pinnedCount
+	r.stats.CumulativePinnedSize += pinnedKeySize + pinnedValueSize
+	return splitKey, nil
+}
+
+// Finish closes the compaction iterator and returns the result of the
+// compaction.
+func (r *Runner) Finish() Result {
+	r.err = errors.CombineErrors(r.err, r.iter.Close())
+	// The compaction iterator keeps track of a count of the number of DELSIZED
+	// keys that encoded an incorrect size.
+	r.stats.CountMissizedDels = r.iter.Stats().CountMissizedDels
+	return Result{
+		Err:    r.err,
+		Tables: r.tables,
+		Stats:  r.stats,
+	}
+}
+
 // TableSplitLimit returns a hard split limit for an output table that starts at
 // startKey (which must be strictly greater than startKey), or nil if there is
 // no limit.
@@ -125,3 +291,53 @@ func (r *Runner) TableSplitLimit(startKey []byte) []byte {
 
 	return limitKey
 }
+
+// validateWriterMeta runs some sanity checks on the WriterMetadata of an output
+// table that was just finished. splitKey is the key where the table must have
+// ended (or nil).
+func (r *Runner) validateWriterMeta(meta *sstable.WriterMetadata, splitKey []byte) error {
+	if !meta.HasPointKeys && !meta.HasRangeDelKeys && !meta.HasRangeKeys {
+		return base.AssertionFailedf("output table has no keys")
+	}
+
+	var err error
+	checkBounds := func(smallest, largest base.InternalKey, description string) {
+		bounds := base.UserKeyBoundsFromInternal(smallest, largest)
+		if !r.cfg.CompactionBounds.ContainsBounds(r.cmp, &bounds) {
+			err = errors.CombineErrors(err, base.AssertionFailedf(
+				"output table %s bounds %s extend beyond compaction bounds %s",
+				description, bounds, r.cfg.CompactionBounds,
+			))
+		}
+		if splitKey != nil && bounds.End.IsUpperBoundFor(r.cmp, splitKey) {
+			err = errors.CombineErrors(err, base.AssertionFailedf(
+				"output table %s bounds %s extend beyond split key %s",
+				description, bounds, splitKey,
+			))
+		}
+	}
+
+	if meta.HasPointKeys {
+		checkBounds(meta.SmallestPoint, meta.LargestPoint, "point key")
+	}
+	if meta.HasRangeDelKeys {
+		checkBounds(meta.SmallestRangeDel, meta.LargestRangeDel, "range del")
+	}
+	if meta.HasRangeKeys {
+		checkBounds(meta.SmallestRangeKey, meta.LargestRangeKey, "range key")
+	}
+	return err
+}
+
+// getInternalWriterProperties accesses a private variable (in the
+// internal/private package) initialized by the sstable Writer. This indirection
+// is necessary to ensure non-Pebble users constructing sstables for ingestion
+// are unable to set internal-only properties.
+var getInternalWriterProperties = private.SSTableInternalProperties.(func(*sstable.Writer) *sstable.Properties)
+
+func spanStartOrNil(s *keyspan.Span) []byte {
+	if s.Empty() {
+		return nil
+	}
+	return s.Start
+}
diff --git a/internal/compact/run_test.go b/internal/compact/run_test.go
index 4b042f6b61..260d664386 100644
--- a/internal/compact/run_test.go
+++ b/internal/compact/run_test.go
@@ -35,12 +35,14 @@ func TestTableSplitLimit(t *testing.T) {
 		case "split-limit":
 			var maxOverlap uint64
 			d.MaybeScanArgs(t, "max-overlap", &maxOverlap)
-			cfg := RunnerConfig{
-				L0SplitKeys:                v.L0Sublevels.FlushSplitKeys(),
-				Grandparents:               v.Levels[1].Slice(),
-				MaxGrandparentOverlapBytes: maxOverlap,
+			r := &Runner{
+				cmp: base.DefaultComparer.Compare,
+				cfg: RunnerConfig{
+					L0SplitKeys:                v.L0Sublevels.FlushSplitKeys(),
+					Grandparents:               v.Levels[1].Slice(),
+					MaxGrandparentOverlapBytes: maxOverlap,
+				},
 			}
-			r := NewRunner(cfg, &Iter{cmp: base.DefaultComparer.Compare})
 			for _, k := range strings.Fields(d.Input) {
 				res := r.TableSplitLimit([]byte(k))
 				if res == nil {