diff --git a/compaction.go b/compaction.go index 0e804e3fe0..bccd3bdf12 100644 --- a/compaction.go +++ b/compaction.go @@ -40,12 +40,6 @@ var compactLabels = pprof.Labels("pebble", "compact") var flushLabels = pprof.Labels("pebble", "flush") var gcLabels = pprof.Labels("pebble", "gc") -// getInternalWriterProperties accesses a private variable (in the -// internal/private package) initialized by the sstable Writer. This indirection -// is necessary to ensure non-Pebble users constructing sstables for ingestion -// are unable to set internal-only properties. -var getInternalWriterProperties = private.SSTableInternalProperties.(func(*sstable.Writer) *sstable.Properties) - // expandedCompactionByteSizeLimit is the maximum number of bytes in all // compacted files. We avoid expanding the lower level file set of a compaction // if it would make the total compaction cover more than this many bytes. @@ -1418,7 +1412,7 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) { startTime := d.timeNow() var ve *manifest.VersionEdit - var stats compactStats + var stats compact.Stats // To determine the target level of the files in the ingestedFlushable, we // need to acquire the logLock, and not release it for that duration. Since, // we need to acquire the logLock below to perform the logAndApply step @@ -1527,9 +1521,9 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) { // If err != nil, then the flush will be retried, and we will recalculate // these metrics. if err == nil { - d.mu.snapshots.cumulativePinnedCount += stats.cumulativePinnedKeys - d.mu.snapshots.cumulativePinnedSize += stats.cumulativePinnedSize - d.mu.versions.metrics.Keys.MissizedTombstonesCount += stats.countMissizedDels + d.mu.snapshots.cumulativePinnedCount += stats.CumulativePinnedKeys + d.mu.snapshots.cumulativePinnedSize += stats.CumulativePinnedSize + d.mu.versions.metrics.Keys.MissizedTombstonesCount += stats.CountMissizedDels } d.clearCompactingState(c, err != nil) @@ -2131,9 +2125,9 @@ func (d *DB) compact1(c *compaction, errChannel chan error) (err error) { e := &ve.NewFiles[i] info.Output.Tables = append(info.Output.Tables, e.Meta.TableInfo()) } - d.mu.snapshots.cumulativePinnedCount += stats.cumulativePinnedKeys - d.mu.snapshots.cumulativePinnedSize += stats.cumulativePinnedSize - d.mu.versions.metrics.Keys.MissizedTombstonesCount += stats.countMissizedDels + d.mu.snapshots.cumulativePinnedCount += stats.CumulativePinnedKeys + d.mu.snapshots.cumulativePinnedSize += stats.CumulativePinnedSize + d.mu.versions.metrics.Keys.MissizedTombstonesCount += stats.CountMissizedDels } // NB: clearing compacting state must occur before updating the read state; @@ -2158,12 +2152,6 @@ func (d *DB) compact1(c *compaction, errChannel chan error) (err error) { return err } -type compactStats struct { - cumulativePinnedKeys uint64 - cumulativePinnedSize uint64 - countMissizedDels uint64 -} - // runCopyCompaction runs a copy compaction where a new FileNum is created that // is a byte-for-byte copy of the input file or span thereof in some cases. 
This // is used in lieu of a move compaction when a file is being moved across the @@ -2327,7 +2315,7 @@ func (d *DB) runCopyCompaction( func (d *DB) runDeleteOnlyCompaction( jobID JobID, c *compaction, -) (ve *versionEdit, stats compactStats, retErr error) { +) (ve *versionEdit, stats compact.Stats, retErr error) { c.metrics = make(map[int]*LevelMetrics, len(c.inputs)) ve = &versionEdit{ DeletedFiles: map[deletedFileEntry]*fileMetadata{}, @@ -2348,7 +2336,7 @@ func (d *DB) runDeleteOnlyCompaction( func (d *DB) runMoveOrCopyCompaction( jobID JobID, c *compaction, -) (ve *versionEdit, stats compactStats, _ error) { +) (ve *versionEdit, stats compact.Stats, _ error) { iter := c.startLevel.files.Iter() meta := iter.First() if invariants.Enabled { @@ -2394,7 +2382,7 @@ func (d *DB) runMoveOrCopyCompaction( // re-acquired during the course of this method. func (d *DB) runCompaction( jobID JobID, c *compaction, -) (ve *versionEdit, stats compactStats, retErr error) { +) (ve *versionEdit, stats compact.Stats, retErr error) { switch c.kind { case compactionKindDeleteOnly: return d.runDeleteOnlyCompaction(jobID, c) @@ -2405,7 +2393,6 @@ func (d *DB) runCompaction( } snapshots := d.mu.snapshots.toSlice() - formatVers := d.FormatMajorVersion() if c.flushing == nil { // Before dropping the db mutex, grab a ref to the current version. This @@ -2423,11 +2410,44 @@ func (d *DB) runCompaction( return ve, stats, ErrCancelledCompaction } + // The table is typically written at the maximum allowable format implied by + // the current format major version of the DB. + tableFormat := d.FormatMajorVersion().MaxTableFormat() + // In format major versions with maximum table formats of Pebblev3, value + // blocks were conditional on an experimental setting. In format major + // versions with maximum table formats of Pebblev4 and higher, value blocks + // are always enabled. + if tableFormat == sstable.TableFormatPebblev3 && + (d.opts.Experimental.EnableValueBlocks == nil || !d.opts.Experimental.EnableValueBlocks()) { + tableFormat = sstable.TableFormatPebblev2 + } + // Release the d.mu lock while doing I/O. // Note the unusual order: Unlock and then Lock. d.mu.Unlock() defer d.mu.Lock() + result := d.compactAndWrite(jobID, c, snapshots, tableFormat) + if result.Err == nil { + ve, result.Err = c.makeVersionEdit(result) + } + if result.Err != nil { + // Delete any created tables. + for i := range result.Tables { + _ = d.objProvider.Remove(fileTypeTable, result.Tables[i].ObjMeta.DiskFileNum) + } + } + // Refresh the disk available statistic whenever a compaction/flush + // completes, before re-acquiring the mutex. + d.calculateDiskAvailableBytes() + return ve, result.Stats, result.Err +} + +// compactAndWrite runs the data part of a compaction, where we set up a +// compaction iterator and use it to write output tables. +func (d *DB) compactAndWrite( + jobID JobID, c *compaction, snapshots compact.Snapshots, tableFormat sstable.TableFormat, +) (result compact.Result) { // Compactions use a pool of buffers to read blocks, avoiding polluting the // block cache with blocks that will not be read again. We initialize the // buffer pool with a size 12. 
This initial size does not need to be @@ -2455,8 +2475,13 @@ func (d *DB) runCompaction( defer c.bufferPool.Release() pointIter, rangeDelIter, rangeKeyIter, err := c.newInputIters(d.newIters, d.tableNewRangeKeyIter) + defer func() { + for _, closer := range c.closers { + result.Err = firstError(result.Err, closer.Close()) + } + }() if err != nil { - return nil, stats, err + return compact.Result{Err: err} } c.allowedZeroSeqNum = c.allowZeroSeqNum() cfg := compact.IterConfig{ @@ -2470,35 +2495,50 @@ func (d *DB) runCompaction( SingleDeleteInvariantViolationCallback: d.opts.Experimental.SingleDeleteInvariantViolationCallback, } iter := compact.NewIter(cfg, pointIter, rangeDelIter, rangeKeyIter) - var lastRangeDelSpan, lastRangeKeySpan keyspan.Span - - var ( - createdFiles []base.DiskFileNum - tw *sstable.Writer - pinnedKeySize uint64 - pinnedValueSize uint64 - pinnedCount uint64 - ) - defer func() { - if iter != nil { - retErr = firstError(retErr, iter.Close()) - } - if tw != nil { - retErr = firstError(retErr, tw.Close()) - } - if retErr != nil { - for _, fileNum := range createdFiles { - _ = d.objProvider.Remove(fileTypeTable, fileNum) - } + + runnerCfg := compact.RunnerConfig{ + CompactionBounds: base.UserKeyBoundsFromInternal(c.smallest, c.largest), + L0SplitKeys: c.l0Limits, + Grandparents: c.grandparents, + MaxGrandparentOverlapBytes: c.maxOverlapBytes, + TargetOutputFileSize: c.maxOutputFileSize, + } + runner := compact.NewRunner(runnerCfg, iter) + for runner.MoreDataToWrite() { + if c.cancel.Load() { + return runner.Finish().WithError(ErrCancelledCompaction) } - for _, closer := range c.closers { - retErr = firstError(retErr, closer.Close()) + // Create a new table. + writerOpts := d.opts.MakeWriterOptions(c.outputLevel.level, tableFormat) + objMeta, tw, cpuWorkHandle, err := d.newCompactionOutput(jobID, c, writerOpts) + if err != nil { + return runner.Finish().WithError(err) } - }() + runner.WriteTable(objMeta, tw) + d.opts.Experimental.CPUWorkPermissionGranter.CPUWorkDone(cpuWorkHandle) + } + result = runner.Finish() + if result.Err == nil { + result.Err = d.objProvider.Sync() + } + return result +} - ve = &versionEdit{ +// makeVersionEdit creates the version edit for a compaction, based on the +// tables in compact.Result. +func (c *compaction) makeVersionEdit(result compact.Result) (*versionEdit, error) { + ve := &versionEdit{ DeletedFiles: map[deletedFileEntry]*fileMetadata{}, } + for _, cl := range c.inputs { + iter := cl.files.Iter() + for f := iter.First(); f != nil; f = iter.Next() { + ve.DeletedFiles[deletedFileEntry{ + Level: cl.level, + FileNum: f.FileNum, + }] = f + } + } startLevelBytes := c.startLevel.files.SizeSum() outputMetrics := &LevelMetrics{ @@ -2523,312 +2563,75 @@ func (d *DB) runCompaction( outputMetrics.MultiLevel.BytesRead = outputMetrics.BytesRead } - // The table is typically written at the maximum allowable format implied by - // the current format major version of the DB. - tableFormat := formatVers.MaxTableFormat() inputLargestSeqNumAbsolute := c.inputLargestSeqNumAbsolute() + ve.NewFiles = make([]newFileEntry, len(result.Tables)) + for i := range result.Tables { + t := &result.Tables[i] - // In format major versions with maximum table formats of Pebblev3, value - // blocks were conditional on an experimental setting. In format major - // versions with maximum table formats of Pebblev4 and higher, value blocks - // are always enabled. 
- if tableFormat == sstable.TableFormatPebblev3 && - (d.opts.Experimental.EnableValueBlocks == nil || !d.opts.Experimental.EnableValueBlocks()) { - tableFormat = sstable.TableFormatPebblev2 - } - - writerOpts := d.opts.MakeWriterOptions(c.outputLevel.level, tableFormat) - - // prevPointKey is a sstable.WriterOption that provides access to - // the last point key written to a writer's sstable. When a new - // output begins in newOutput, prevPointKey is updated to point to - // the new output's sstable.Writer. This allows the compaction loop - // to access the last written point key without requiring the - // compaction loop to make a copy of each key ahead of time. Users - // must be careful, because the byte slice returned by UnsafeKey - // points directly into the Writer's block buffer. - var cpuWorkHandle CPUWorkHandle - defer func() { - if cpuWorkHandle != nil { - d.opts.Experimental.CPUWorkPermissionGranter.CPUWorkDone(cpuWorkHandle) - } - }() - - newOutput := func() error { - // Check if we've been cancelled by a concurrent operation. - if c.cancel.Load() { - return ErrCancelledCompaction - } - var objMeta objstorage.ObjectMetadata - var err error - objMeta, tw, cpuWorkHandle, err = d.newCompactionOutput(jobID, c, writerOpts) - if err != nil { - return err - } fileMeta := &fileMetadata{ - FileNum: base.PhysicalTableFileNum(objMeta.DiskFileNum), - CreationTime: time.Now().Unix(), - } - createdFiles = append(createdFiles, objMeta.DiskFileNum) - - ve.NewFiles = append(ve.NewFiles, newFileEntry{ - Level: c.outputLevel.level, - Meta: fileMeta, - }) - return nil - } - - // finishOutput is called with the a user key up to which all tombstones - // should be flushed. Typically, this is the first key of the next - // sstable or an empty key if this output is the final sstable. - finishOutput := func(splitKey []byte) error { - if err := compact.SplitAndEncodeSpan(c.cmp, &lastRangeDelSpan, splitKey, tw); err != nil { - return err - } - if err := compact.SplitAndEncodeSpan(c.cmp, &lastRangeKeySpan, splitKey, tw); err != nil { - return err - } - { - // Set internal sstable properties. - p := getInternalWriterProperties(tw) - // Set the snapshot pinned totals. - p.SnapshotPinnedKeys = pinnedCount - p.SnapshotPinnedKeySize = pinnedKeySize - p.SnapshotPinnedValueSize = pinnedValueSize - stats.cumulativePinnedKeys += pinnedCount - stats.cumulativePinnedSize += pinnedKeySize + pinnedValueSize - pinnedCount = 0 - pinnedKeySize = 0 - pinnedValueSize = 0 - } - if err := tw.Close(); err != nil { - tw = nil - return err + FileNum: base.PhysicalTableFileNum(t.ObjMeta.DiskFileNum), + CreationTime: t.CreationTime.Unix(), + Size: t.WriterMeta.Size, + SmallestSeqNum: t.WriterMeta.SmallestSeqNum, + LargestSeqNum: t.WriterMeta.LargestSeqNum, } - d.opts.Experimental.CPUWorkPermissionGranter.CPUWorkDone(cpuWorkHandle) - cpuWorkHandle = nil - writerMeta, err := tw.Metadata() - if err != nil { - tw = nil - return err - } - tw = nil - meta := ve.NewFiles[len(ve.NewFiles)-1].Meta - meta.Size = writerMeta.Size - meta.SmallestSeqNum = writerMeta.SmallestSeqNum - meta.LargestSeqNum = writerMeta.LargestSeqNum if c.flushing == nil { // Set the file's LargestSeqNumAbsolute to be the maximum value of any // of the compaction's input sstables. // TODO(jackson): This could be narrowed to be the maximum of input // sstables that overlap the output sstable's key range. 
- meta.LargestSeqNumAbsolute = inputLargestSeqNumAbsolute + fileMeta.LargestSeqNumAbsolute = inputLargestSeqNumAbsolute } else { - meta.LargestSeqNumAbsolute = writerMeta.LargestSeqNum + fileMeta.LargestSeqNumAbsolute = t.WriterMeta.LargestSeqNum } - meta.InitPhysicalBacking() + fileMeta.InitPhysicalBacking() // If the file didn't contain any range deletions, we can fill its // table stats now, avoiding unnecessarily loading the table later. maybeSetStatsFromProperties( - meta.PhysicalMeta(), &writerMeta.Properties, + fileMeta.PhysicalMeta(), &t.WriterMeta.Properties, ) - if c.flushing == nil { - outputMetrics.TablesCompacted++ - outputMetrics.BytesCompacted += meta.Size - } else { - outputMetrics.TablesFlushed++ - outputMetrics.BytesFlushed += meta.Size - } - outputMetrics.Size += int64(meta.Size) - outputMetrics.NumFiles++ - outputMetrics.Additional.BytesWrittenDataBlocks += writerMeta.Properties.DataSize - outputMetrics.Additional.BytesWrittenValueBlocks += writerMeta.Properties.ValueBlocksSize - - if n := len(ve.NewFiles); n > 1 { - // This is not the first output file. Ensure the sstable boundaries - // are nonoverlapping. - prevMeta := ve.NewFiles[n-2].Meta - if writerMeta.SmallestRangeDel.UserKey != nil { - c := d.cmp(writerMeta.SmallestRangeDel.UserKey, prevMeta.Largest.UserKey) - if c < 0 { - return base.AssertionFailedf( - "pebble: smallest range tombstone start key is less than previous sstable largest key: %s < %s", - writerMeta.SmallestRangeDel.Pretty(d.opts.Comparer.FormatKey), - prevMeta.Largest.Pretty(d.opts.Comparer.FormatKey)) - } else if c == 0 && !prevMeta.Largest.IsExclusiveSentinel() { - // The user key portion of the range boundary start key is - // equal to the previous table's largest key user key, and - // the previous table's largest key is not exclusive. This - // violates the invariant that tables are key-space - // partitioned. - return base.AssertionFailedf( - "pebble: invariant violation: previous sstable largest key %s, current sstable smallest rangedel: %s", - prevMeta.Largest.Pretty(d.opts.Comparer.FormatKey), - writerMeta.SmallestRangeDel.Pretty(d.opts.Comparer.FormatKey), - ) - } - } - } - - // Verify that all range deletions outputted to the sstable are - // truncated to split key. - if splitKey != nil && writerMeta.LargestRangeDel.UserKey != nil && - d.cmp(writerMeta.LargestRangeDel.UserKey, splitKey) > 0 { - return errors.Errorf( - "pebble: invariant violation: rangedel largest key %q extends beyond split key %q", - writerMeta.LargestRangeDel.Pretty(d.opts.Comparer.FormatKey), - d.opts.Comparer.FormatKey(splitKey), - ) + if t.WriterMeta.HasPointKeys { + fileMeta.ExtendPointKeyBounds(c.cmp, t.WriterMeta.SmallestPoint, t.WriterMeta.LargestPoint) } - - if writerMeta.HasPointKeys { - meta.ExtendPointKeyBounds(d.cmp, writerMeta.SmallestPoint, writerMeta.LargestPoint) + if t.WriterMeta.HasRangeDelKeys { + fileMeta.ExtendPointKeyBounds(c.cmp, t.WriterMeta.SmallestRangeDel, t.WriterMeta.LargestRangeDel) } - if writerMeta.HasRangeDelKeys { - meta.ExtendPointKeyBounds(d.cmp, writerMeta.SmallestRangeDel, writerMeta.LargestRangeDel) + if t.WriterMeta.HasRangeKeys { + fileMeta.ExtendRangeKeyBounds(c.cmp, t.WriterMeta.SmallestRangeKey, t.WriterMeta.LargestRangeKey) } - if writerMeta.HasRangeKeys { - meta.ExtendRangeKeyBounds(d.cmp, writerMeta.SmallestRangeKey, writerMeta.LargestRangeKey) - } - - // Verify that the sstable bounds fall within the compaction input - // bounds. 
This is a sanity check that we don't have a logic error - // elsewhere that causes the sstable bounds to accidentally expand past the - // compaction input bounds as doing so could lead to various badness such - // as keys being deleted by a range tombstone incorrectly. - if c.smallest.UserKey != nil { - switch v := d.cmp(meta.Smallest.UserKey, c.smallest.UserKey); { - case v >= 0: - // Nothing to do. - case v < 0: - return errors.Errorf("pebble: compaction output grew beyond bounds of input: %s < %s", - meta.Smallest.Pretty(d.opts.Comparer.FormatKey), - c.smallest.Pretty(d.opts.Comparer.FormatKey)) - } - } - if c.largest.UserKey != nil { - switch v := d.cmp(meta.Largest.UserKey, c.largest.UserKey); { - case v <= 0: - // Nothing to do. - case v > 0: - return errors.Errorf("pebble: compaction output grew beyond bounds of input: %s > %s", - meta.Largest.Pretty(d.opts.Comparer.FormatKey), - c.largest.Pretty(d.opts.Comparer.FormatKey)) - } - } - // Verify that we never split different revisions of the same user key - // across two different sstables. - if err := c.errorOnUserKeyOverlap(ve); err != nil { - return err - } - if err := meta.Validate(d.cmp, d.opts.Comparer.FormatKey); err != nil { - return err - } - return nil - } - - runnerCfg := compact.RunnerConfig{ - Grandparents: c.grandparents, - MaxGrandparentOverlapBytes: c.maxOverlapBytes, - TargetOutputFileSize: c.maxOutputFileSize, - L0SplitKeys: c.l0Limits, - } - runner := compact.NewRunner(runnerCfg, iter) - - lastUserKeyFn := func() []byte { - return tw.UnsafeLastPointUserKey() - } - // Each outer loop iteration produces one output file. An iteration that - // produces a file containing point keys (and optionally range tombstones) - // guarantees that the input iterator advanced. An iteration that produces - // a file containing only range tombstones guarantees the limit passed to - // `finishOutput()` advanced to a strictly greater user key corresponding - // to a grandparent file largest key, or nil. Taken together, these - // progress guarantees ensure that eventually the input iterator will be - // exhausted and the range tombstone fragments will all be flushed. - for key, val := iter.First(); key != nil || !lastRangeDelSpan.Empty() || !lastRangeKeySpan.Empty(); { - firstKey := base.MinUserKey(c.cmp, spanStartOrNil(&lastRangeDelSpan), spanStartOrNil(&lastRangeKeySpan)) - if key != nil && firstKey == nil { - firstKey = key.UserKey - } - if invariants.Enabled && firstKey == nil { - panic("nil first key") - } - splitter := compact.NewOutputSplitter( - c.cmp, firstKey, runner.TableSplitLimit(firstKey), c.maxOutputFileSize, c.grandparents.Iter(), iter.Frontiers(), - ) - if err := newOutput(); err != nil { - return nil, stats, err + ve.NewFiles[i] = newFileEntry{ + Level: c.outputLevel.level, + Meta: fileMeta, } - // Each inner loop iteration processes one key from the input iterator. - for ; key != nil; key, val = iter.Next() { - if splitter.ShouldSplitBefore(key.UserKey, tw.EstimatedSize(), lastUserKeyFn) { - break - } - - switch key.Kind() { - case InternalKeyKindRangeDelete: - // The previous span (if any) must end at or before this key, since the - // spans we receive are non-overlapping. 
- if err := tw.EncodeSpan(&lastRangeDelSpan); err != nil { - return nil, stats, err - } - lastRangeDelSpan.CopyFrom(iter.Span()) - continue - case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete: - // The previous span (if any) must end at or before this key, since the - // spans we receive are non-overlapping. - if err := tw.EncodeSpan(&lastRangeKeySpan); err != nil { - return nil, stats, err - } - lastRangeKeySpan.CopyFrom(iter.Span()) - continue - } - if err := tw.AddWithForceObsolete(*key, val, iter.ForceObsoleteDueToRangeDel()); err != nil { - return nil, stats, err - } - if iter.SnapshotPinned() { - // The kv pair we just added to the sstable was only surfaced by - // the compaction iterator because an open snapshot prevented - // its elision. Increment the stats. - pinnedCount++ - pinnedKeySize += uint64(len(key.UserKey)) + base.InternalTrailerLen - pinnedValueSize += uint64(len(val)) - } - } - if err := finishOutput(splitter.SplitKey()); err != nil { - return nil, stats, err + // Update metrics. + if c.flushing == nil { + outputMetrics.TablesCompacted++ + outputMetrics.BytesCompacted += fileMeta.Size + } else { + outputMetrics.TablesFlushed++ + outputMetrics.BytesFlushed += fileMeta.Size } + outputMetrics.Size += int64(fileMeta.Size) + outputMetrics.NumFiles++ + outputMetrics.Additional.BytesWrittenDataBlocks += t.WriterMeta.Properties.DataSize + outputMetrics.Additional.BytesWrittenValueBlocks += t.WriterMeta.Properties.ValueBlocksSize } - for _, cl := range c.inputs { - iter := cl.files.Iter() - for f := iter.First(); f != nil; f = iter.Next() { - ve.DeletedFiles[deletedFileEntry{ - Level: cl.level, - FileNum: f.FileNum, - }] = f + // Sanity check that the tables are ordered and don't overlap. + for i := 1; i < len(ve.NewFiles); i++ { + if ve.NewFiles[i-1].Meta.UserKeyBounds().End.IsUpperBoundFor(c.cmp, ve.NewFiles[i].Meta.Smallest.UserKey) { + return nil, base.AssertionFailedf("pebble: compaction output tables overlap: %s and %s", + ve.NewFiles[i-1].Meta.DebugString(c.formatKey, true), + ve.NewFiles[i].Meta.DebugString(c.formatKey, true), + ) } } - // The compaction iterator keeps track of a count of the number of DELSIZED - // keys that encoded an incorrect size. Propagate it up as a part of - // compactStats. - stats.countMissizedDels = iter.Stats().CountMissizedDels - - if err := d.objProvider.Sync(); err != nil { - return nil, stats, err - } - - // Refresh the disk available statistic whenever a compaction/flush - // completes, before re-acquiring the mutex. 
- _ = d.calculateDiskAvailableBytes() - - return ve, stats, nil + return ve, nil } // newCompactionOutput creates an object for a new table produced by a @@ -2928,10 +2731,3 @@ func validateVersionEdit( validateKey(m, m.Largest.UserKey) } } - -func spanStartOrNil(s *keyspan.Span) []byte { - if s.Empty() { - return nil - } - return s.Start -} diff --git a/internal/compact/run.go b/internal/compact/run.go index 3ee3cac80b..f7591e9dad 100644 --- a/internal/compact/run.go +++ b/internal/compact/run.go @@ -8,8 +8,11 @@ import ( "sort" "time" + "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/internal/base" + "github.com/cockroachdb/pebble/internal/keyspan" "github.com/cockroachdb/pebble/internal/manifest" + "github.com/cockroachdb/pebble/internal/private" "github.com/cockroachdb/pebble/objstorage" "github.com/cockroachdb/pebble/sstable" ) @@ -17,15 +20,21 @@ import ( // Result stores the result of a compaction - more specifically, the "data" part // where we use the compaction iterator to write output tables. type Result struct { - // Err is the result of the compaction. - // - // On success, Err is nil and Tables stores the output tables. - // - // On failure, Err is set and Tables stores the tables created so far (and - // which need to be cleaned up). - Err error - + // Err is the result of the compaction. On success, Err is nil and Tables + // stores the output tables. On failure, Err is set and Tables stores the + // tables created so far (and which need to be cleaned up). + Err error Tables []OutputTable + Stats Stats +} + +// WithError returns a modified Result which has the Err field set. +func (r Result) WithError(err error) Result { + return Result{ + Err: errors.CombineErrors(r.Err, err), + Tables: r.Tables, + Stats: r.Stats, + } } // OutputTable contains metadata about a table that was created during a compaction. @@ -38,8 +47,19 @@ type OutputTable struct { WriterMeta sstable.WriterMetadata } +// Stats describes stats collected during the compaction. +type Stats struct { + CumulativePinnedKeys uint64 + CumulativePinnedSize uint64 + CountMissizedDels uint64 +} + // RunnerConfig contains the parameters needed for the Runner. type RunnerConfig struct { + // CompactionBounds are the bounds containing all the input tables. All output + // tables must fall within these bounds as well. + CompactionBounds base.UserKeyBounds + // L0SplitKeys is only set for flushes and it contains the flush split keys // (see L0Sublevels.FlushSplitKeys). These are split points enforced for the // output tables. @@ -63,23 +83,169 @@ type RunnerConfig struct { // Runner is a helper for running the "data" part of a compaction (where we use // the compaction iterator to write output tables). +// +// Sample usage: +// +// r := NewRunner(cfg, iter) +// for r.MoreDataToWrite() { +// objMeta, tw := ... // Create object and table writer. +// r.WriteTable(objMeta, tw) +// } +// result := r.Finish() type Runner struct { cmp base.Compare cfg RunnerConfig iter *Iter + + tables []OutputTable + // Stores any error encountered. + err error + // Last key/value returned by the compaction iterator. + key *base.InternalKey + value []byte + // Last RANGEDEL span (or portion of it) that was not yet written to a table. + lastRangeDelSpan keyspan.Span + // Last range key span (or portion of it) that was not yet written to a table. + lastRangeKeySpan keyspan.Span + stats Stats } // NewRunner creates a new Runner. 
-// TODO(radu): document usage once we have more functionality. func NewRunner(cfg RunnerConfig, iter *Iter) *Runner { r := &Runner{ cmp: iter.cmp, cfg: cfg, iter: iter, } + r.key, r.value = r.iter.First() return r } +// MoreDataToWrite returns true if there is more data to be written. +func (r *Runner) MoreDataToWrite() bool { + if r.err != nil { + return false + } + return r.key != nil || !r.lastRangeDelSpan.Empty() || !r.lastRangeKeySpan.Empty() +} + +// WriteTable writes a new output table. This table will be part of +// Result.Tables. It should only be called if MoreDataToWrite() returned true. +// +// WriteTable always closes the Writer. +func (r *Runner) WriteTable(objMeta objstorage.ObjectMetadata, tw *sstable.Writer) { + if r.err != nil { + panic("error already encountered") + } + r.tables = append(r.tables, OutputTable{ + CreationTime: time.Now(), + ObjMeta: objMeta, + }) + splitKey, err := r.writeKeysToTable(tw) + err = errors.CombineErrors(err, tw.Close()) + if err != nil { + r.err = err + r.key, r.value = nil, nil + return + } + writerMeta, err := tw.Metadata() + if err != nil { + r.err = err + return + } + if err := r.validateWriterMeta(writerMeta, splitKey); err != nil { + r.err = err + return + } + r.tables[len(r.tables)-1].WriterMeta = *writerMeta +} + +func (r *Runner) writeKeysToTable(tw *sstable.Writer) (splitKey []byte, _ error) { + firstKey := base.MinUserKey(r.cmp, spanStartOrNil(&r.lastRangeDelSpan), spanStartOrNil(&r.lastRangeKeySpan)) + if r.key != nil && firstKey == nil { + firstKey = r.key.UserKey + } + if firstKey == nil { + return nil, base.AssertionFailedf("no data to write") + } + splitter := NewOutputSplitter( + r.cmp, firstKey, r.TableSplitLimit(firstKey), + r.cfg.TargetOutputFileSize, r.cfg.Grandparents.Iter(), r.iter.Frontiers(), + ) + lastUserKeyFn := func() []byte { + return tw.UnsafeLastPointUserKey() + } + var pinnedKeySize, pinnedValueSize, pinnedCount uint64 + key, value := r.key, r.value + for ; key != nil; key, value = r.iter.Next() { + if splitter.ShouldSplitBefore(key.UserKey, tw.EstimatedSize(), lastUserKeyFn) { + break + } + + switch key.Kind() { + case base.InternalKeyKindRangeDelete: + // The previous span (if any) must end at or before this key, since the + // spans we receive are non-overlapping. + if err := tw.EncodeSpan(&r.lastRangeDelSpan); err != nil { + return nil, err + } + r.lastRangeDelSpan.CopyFrom(r.iter.Span()) + continue + + case base.InternalKeyKindRangeKeySet, base.InternalKeyKindRangeKeyUnset, base.InternalKeyKindRangeKeyDelete: + // The previous span (if any) must end at or before this key, since the + // spans we receive are non-overlapping. + if err := tw.EncodeSpan(&r.lastRangeKeySpan); err != nil { + return nil, err + } + r.lastRangeKeySpan.CopyFrom(r.iter.Span()) + continue + } + if err := tw.AddWithForceObsolete(*key, value, r.iter.ForceObsoleteDueToRangeDel()); err != nil { + return nil, err + } + if r.iter.SnapshotPinned() { + // The kv pair we just added to the sstable was only surfaced by + // the compaction iterator because an open snapshot prevented + // its elision. Increment the stats. 
+ pinnedCount++ + pinnedKeySize += uint64(len(key.UserKey)) + base.InternalTrailerLen + pinnedValueSize += uint64(len(value)) + } + } + r.key, r.value = key, value + splitKey = splitter.SplitKey() + if err := SplitAndEncodeSpan(r.cmp, &r.lastRangeDelSpan, splitKey, tw); err != nil { + return nil, err + } + if err := SplitAndEncodeSpan(r.cmp, &r.lastRangeKeySpan, splitKey, tw); err != nil { + return nil, err + } + // Set internal sstable properties. + p := getInternalWriterProperties(tw) + // Set the snapshot pinned totals. + p.SnapshotPinnedKeys = pinnedCount + p.SnapshotPinnedKeySize = pinnedKeySize + p.SnapshotPinnedValueSize = pinnedValueSize + r.stats.CumulativePinnedKeys += pinnedCount + r.stats.CumulativePinnedSize += pinnedKeySize + pinnedValueSize + return splitKey, nil +} + +// Finish closes the compaction iterator and returns the result of the +// compaction. +func (r *Runner) Finish() Result { + r.err = errors.CombineErrors(r.err, r.iter.Close()) + // The compaction iterator keeps track of a count of the number of DELSIZED + // keys that encoded an incorrect size. + r.stats.CountMissizedDels = r.iter.Stats().CountMissizedDels + return Result{ + Err: r.err, + Tables: r.tables, + Stats: r.stats, + } +} + // TableSplitLimit returns a hard split limit for an output table that starts at // startKey (which must be strictly greater than startKey), or nil if there is // no limit. @@ -125,3 +291,53 @@ func (r *Runner) TableSplitLimit(startKey []byte) []byte { return limitKey } + +// validateWriterMeta runs some sanity checks on the WriterMetadata of an output +// table that was just finished. splitKey, if non-nil, is the key at which the +// table must have ended. +func (r *Runner) validateWriterMeta(meta *sstable.WriterMetadata, splitKey []byte) error { + if !meta.HasPointKeys && !meta.HasRangeDelKeys && !meta.HasRangeKeys { + return base.AssertionFailedf("output table has no keys") + } + + var err error + checkBounds := func(smallest, largest base.InternalKey, description string) { + bounds := base.UserKeyBoundsFromInternal(smallest, largest) + if !r.cfg.CompactionBounds.ContainsBounds(r.cmp, &bounds) { + err = errors.CombineErrors(err, base.AssertionFailedf( + "output table %s bounds %s extend beyond compaction bounds %s", + description, bounds, r.cfg.CompactionBounds, + )) + } + if splitKey != nil && bounds.End.IsUpperBoundFor(r.cmp, splitKey) { + err = errors.CombineErrors(err, base.AssertionFailedf( + "output table %s bounds %s extend beyond split key %s", + description, bounds, splitKey, + )) + } + } + + if meta.HasPointKeys { + checkBounds(meta.SmallestPoint, meta.LargestPoint, "point key") + } + if meta.HasRangeDelKeys { + checkBounds(meta.SmallestRangeDel, meta.LargestRangeDel, "range del") + } + if meta.HasRangeKeys { + checkBounds(meta.SmallestRangeKey, meta.LargestRangeKey, "range key") + } + return err +} + +// getInternalWriterProperties accesses a private variable (in the +// internal/private package) initialized by the sstable Writer. This indirection +// is necessary to ensure non-Pebble users constructing sstables for ingestion +// are unable to set internal-only properties. 
+var getInternalWriterProperties = private.SSTableInternalProperties.(func(*sstable.Writer) *sstable.Properties) + +func spanStartOrNil(s *keyspan.Span) []byte { + if s.Empty() { + return nil + } + return s.Start +} diff --git a/internal/compact/run_test.go b/internal/compact/run_test.go index 4b042f6b61..260d664386 100644 --- a/internal/compact/run_test.go +++ b/internal/compact/run_test.go @@ -35,12 +35,14 @@ func TestTableSplitLimit(t *testing.T) { case "split-limit": var maxOverlap uint64 d.MaybeScanArgs(t, "max-overlap", &maxOverlap) - cfg := RunnerConfig{ - L0SplitKeys: v.L0Sublevels.FlushSplitKeys(), - Grandparents: v.Levels[1].Slice(), - MaxGrandparentOverlapBytes: maxOverlap, + r := &Runner{ + cmp: base.DefaultComparer.Compare, + cfg: RunnerConfig{ + L0SplitKeys: v.L0Sublevels.FlushSplitKeys(), + Grandparents: v.Levels[1].Slice(), + MaxGrandparentOverlapBytes: maxOverlap, + }, } - r := NewRunner(cfg, &Iter{cmp: base.DefaultComparer.Compare}) for _, k := range strings.Fields(d.Input) { res := r.TableSplitLimit([]byte(k)) if res == nil {
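A note on the error-plumbing pattern above: `Result.WithError` and `Runner.Finish` both fold new failures into any existing error via `errors.CombineErrors`, so a cancellation or a failed `iter.Close()` augments rather than masks an earlier write error. A minimal standalone sketch of that aggregation behavior, using only the `cockroachdb/errors` API (illustrative, not part of the patch):

package main

import (
	"fmt"

	"github.com/cockroachdb/errors"
)

func main() {
	// CombineErrors(nil, e) returns e and CombineErrors(e, nil) returns e,
	// so errors can be folded in unconditionally, the way Finish folds in
	// iter.Close() and WithError folds in ErrCancelledCompaction.
	var err error
	err = errors.CombineErrors(err, nil) // still nil
	err = errors.CombineErrors(err, errors.New("write failed"))
	err = errors.CombineErrors(err, errors.New("cancelled"))
	// The first non-nil error stays primary; later ones are attached as
	// secondary errors (visible in verbose formatting).
	fmt.Println(err) // write failed
}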
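The `lastRangeDelSpan`/`lastRangeKeySpan` fields carry over between `WriteTable` calls because `SplitAndEncodeSpan` cuts a straddling span at the table's split key, emitting the prefix with the finished table and leaving the remainder for the next one. A simplified, self-contained analogue of that split contract, with plain byte-slice ranges standing in for `keyspan.Span` (the real helper also encodes the emitted fragment into the sstable writer):

package main

import (
	"bytes"
	"fmt"
)

// span is a simplified stand-in for keyspan.Span: a key range [start, end).
type span struct{ start, end []byte }

// splitAt sketches the split contract used by writeKeysToTable: the portion
// of s below splitKey is emitted with the finished table, and the remainder
// is carried into the next table.
func splitAt(s span, splitKey []byte) (emit, carry span) {
	switch {
	case splitKey == nil || bytes.Compare(s.end, splitKey) <= 0:
		return s, span{} // entire span ends at or before the split point
	case bytes.Compare(s.start, splitKey) >= 0:
		return span{}, s // entire span begins at or after the split point
	default:
		return span{s.start, splitKey}, span{splitKey, s.end}
	}
}

func main() {
	emit, carry := splitAt(span{[]byte("b"), []byte("m")}, []byte("f"))
	fmt.Printf("emit [%s, %s), carry [%s, %s)\n",
		emit.start, emit.end, carry.start, carry.end)
	// emit [b, f), carry [f, m)
}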
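For the new ordering check in `makeVersionEdit`: adjacent output tables must be key-space partitioned, so an assertion fires if the previous table's end bound reaches the next table's smallest key. A simplified standalone analogue using inclusive `[start, end]` bounds (the real check goes through `base.UserKeyBounds` and `End.IsUpperBoundFor`, which also model exclusive ends):

package main

import (
	"bytes"
	"fmt"
)

// bounds is a simplified stand-in for base.UserKeyBounds with an inclusive end.
type bounds struct{ start, end []byte }

// overlapsNext reports whether prev's range reaches next's smallest key,
// mirroring the ve.NewFiles sanity check: output tables must be sorted and
// non-overlapping, so prev.end must be strictly below next.start.
func overlapsNext(prev, next bounds) bool {
	return bytes.Compare(prev.end, next.start) >= 0
}

func main() {
	a := bounds{[]byte("a"), []byte("c")}
	b := bounds{[]byte("c"), []byte("f")}
	fmt.Println(overlapsNext(a, b)) // true: inclusive end "c" covers "c"
	c := bounds{[]byte("d"), []byte("f")}
	fmt.Println(overlapsNext(a, c)) // false
}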