Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

db: add SharedLowerUserKeyPrefix and WriteSharedWithStrictObsolete op… #3624

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -2518,14 +2518,23 @@ func (d *DB) compactAndWrite(
MaxGrandparentOverlapBytes: c.maxOverlapBytes,
TargetOutputFileSize: c.maxOutputFileSize,
}
considerCreateShared := remote.ShouldCreateShared(d.opts.Experimental.CreateOnShared, c.outputLevel.level)
if considerCreateShared {
runnerCfg.ConsiderCreateShared = true
runnerCfg.SharedLowerUserKeyPrefix = d.opts.Experimental.SharedLowerUserKeyPrefix
}
runner := compact.NewRunner(runnerCfg, iter)
for runner.MoreDataToWrite() {
for {
moreData, keyShouldBeWrittenToShared := runner.MoreDataToWrite()
if !moreData {
break
}
if c.cancel.Load() {
return runner.Finish().WithError(ErrCancelledCompaction)
}
// Create a new table.
writerOpts := d.opts.MakeWriterOptions(c.outputLevel.level, tableFormat)
objMeta, tw, cpuWorkHandle, err := d.newCompactionOutput(jobID, c, writerOpts)
objMeta, tw, cpuWorkHandle, err := d.newCompactionOutput(jobID, c, writerOpts, keyShouldBeWrittenToShared)
if err != nil {
return runner.Finish().WithError(err)
}
Expand Down Expand Up @@ -2652,7 +2661,7 @@ func (c *compaction) makeVersionEdit(result compact.Result) (*versionEdit, error
// newCompactionOutput creates an object for a new table produced by a
// compaction or flush.
func (d *DB) newCompactionOutput(
jobID JobID, c *compaction, writerOpts sstable.WriterOptions,
jobID JobID, c *compaction, writerOpts sstable.WriterOptions, preferSharedStorage bool,
) (objstorage.ObjectMetadata, *sstable.Writer, CPUWorkHandle, error) {
d.mu.Lock()
diskFileNum := d.mu.versions.getNextDiskFileNum()
Expand Down Expand Up @@ -2689,7 +2698,7 @@ func (d *DB) newCompactionOutput(

// Prefer shared storage if present.
createOpts := objstorage.CreateOptions{
PreferSharedStorage: remote.ShouldCreateShared(d.opts.Experimental.CreateOnShared, c.outputLevel.level),
PreferSharedStorage: preferSharedStorage,
WriteCategory: writeCategory,
}
writable, objMeta, err := d.objProvider.Create(ctx, fileTypeTable, diskFileNum, createOpts)
Expand Down Expand Up @@ -2721,6 +2730,9 @@ func (d *DB) newCompactionOutput(
d.opts.Experimental.MaxWriterConcurrency > 0 &&
(cpuWorkHandle.Permitted() || d.opts.Experimental.ForceWriterParallelism)

if d.opts.Experimental.WriteSharedWithStrictObsolete && objMeta.IsShared() {
writerOpts.IsStrictObsolete = true
}
tw := sstable.NewWriter(writable, writerOpts, cacheOpts)
return objMeta, tw, cpuWorkHandle, nil
}
Expand Down
11 changes: 11 additions & 0 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -1250,6 +1250,17 @@ func finishInitializingIter(ctx context.Context, buf *iterAlloc) *Iterator {
// creator ID was set (as creator IDs are necessary to enable shared storage)
// resulting in some lower level SSTs being on non-shared storage. Skip-shared
// iteration is invalid in those cases.
//
// The above error handling implies that ScanInternal with a non-nil
// VisitSharedFile can only be called without error if both the following
// conditions are true:
//
// - All files in the LSM conform to remote.CreateOnSharedLower strategy (they
// can of course conform to the stronger remote.CreateOnSharedAll).
//
// - If Options.Experimental.SharedLowerUserKeyPrefix is non-nil, the lower
// parameter must have a prefix that is greater than or equal to this lower
// bound.
func (d *DB) ScanInternal(
ctx context.Context,
categoryAndQoS sstable.CategoryAndQoS,
Expand Down
79 changes: 72 additions & 7 deletions internal/compact/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,14 @@ type RunnerConfig struct {
// during compaction. In practice, the sizes can vary between 50%-200% of this
// value.
TargetOutputFileSize uint64

// ConsiderCreateShared is true if this compaction can write shared files if
// SharedLowerUserKeyPrefix allows.
ConsiderCreateShared bool

// Set equal to Options.Experimental.SharedLowerUserKeyPrefix, when
// ConsiderCreateShared is true, else nil.
SharedLowerUserKeyPrefix []byte
}

// Runner is a helper for running the "data" part of a compaction (where we use
Expand All @@ -97,6 +105,11 @@ type Runner struct {
cfg RunnerConfig
iter *Iter

split base.Split
// At most one state transition from false => true, when the next key should
// be written to shared storage.
keyShouldBeWrittenToShared bool

tables []OutputTable
// Stores any error encountered.
err error
Expand All @@ -113,20 +126,67 @@ type Runner struct {
// NewRunner creates a new Runner.
func NewRunner(cfg RunnerConfig, iter *Iter) *Runner {
r := &Runner{
cmp: iter.cmp,
cfg: cfg,
iter: iter,
cmp: iter.cmp,
cfg: cfg,
iter: iter,
split: iter.cfg.Comparer.Split,
}
if cfg.SharedLowerUserKeyPrefix == nil {
r.keyShouldBeWrittenToShared = cfg.ConsiderCreateShared
} else if r.cmp(cfg.CompactionBounds.Start[:r.split(cfg.CompactionBounds.Start)], cfg.SharedLowerUserKeyPrefix) >= 0 {
// All keys in the compaction are in the shared key space.
r.keyShouldBeWrittenToShared = true
// No more need to do key comparisons.
r.cfg.SharedLowerUserKeyPrefix = nil
} else {
// Check if the compaction will only write non-shared keys.
endKeyPrefix := base.UserKeyBoundary{
Key: cfg.CompactionBounds.End.Key[:r.split(cfg.CompactionBounds.End.Key)],
Kind: cfg.CompactionBounds.End.Kind,
}
// By taking the prefix, we turn an exclusive user-key bound into an inclusive user-key prefix bound.
if endKeyPrefix.Kind == base.Exclusive && len(endKeyPrefix.Key) < len(cfg.CompactionBounds.End.Key) {
endKeyPrefix.Kind = base.Inclusive
}
c := r.cmp(endKeyPrefix.Key, cfg.SharedLowerUserKeyPrefix)
if c < 0 || c == 0 && endKeyPrefix.Kind == base.Exclusive {
r.keyShouldBeWrittenToShared = false
// No more need to do key comparisons, since compaction only writes
// non-shared keys.
r.cfg.SharedLowerUserKeyPrefix = nil
}
}
r.key, r.value = r.iter.First()
return r
}

// MoreDataToWrite returns true if there is more data to be written.
func (r *Runner) MoreDataToWrite() bool {
func (r *Runner) MoreDataToWrite() (moreData bool, keyShouldBeWrittenToShared bool) {
if r.err != nil {
return false
return false, false
}
moreData = r.key != nil || !r.lastRangeDelSpan.Empty() || !r.lastRangeKeySpan.Empty()
if moreData && !r.keyShouldBeWrittenToShared && r.cfg.SharedLowerUserKeyPrefix != nil {
// May be stepping to a shared key.
firstKey := base.MinUserKey(r.cmp, spanStartOrNil(&r.lastRangeDelSpan), spanStartOrNil(&r.lastRangeKeySpan))
if r.key != nil && firstKey == nil {
firstKey = r.key.UserKey
}
if firstKey == nil {
panic(base.AssertionFailedf("no data to write"))
}
firstKeyPrefix := firstKey[:r.split(firstKey)]
cmp := r.cmp(firstKeyPrefix, r.cfg.SharedLowerUserKeyPrefix)
if cmp >= 0 {
r.keyShouldBeWrittenToShared = true
// No more need to do key comparisons.
r.cfg.SharedLowerUserKeyPrefix = nil
}
// Else cmp < 0, i.e., firstKeyPrefix < SharedLowerUserKeyPrefix. So
// firstKey < SharedLowerUserKeyPrefix, and we can safely use the latter
// as the split-limit in writeKeysToTable below.
}
return r.key != nil || !r.lastRangeDelSpan.Empty() || !r.lastRangeKeySpan.Empty()
return moreData, r.keyShouldBeWrittenToShared
}

// WriteTable writes a new output table. This table will be part of
Expand Down Expand Up @@ -168,8 +228,13 @@ func (r *Runner) writeKeysToTable(tw *sstable.Writer) (splitKey []byte, _ error)
if firstKey == nil {
return nil, base.AssertionFailedf("no data to write")
}
tableSplitLimit := r.TableSplitLimit(firstKey)
if r.cfg.SharedLowerUserKeyPrefix != nil &&
(tableSplitLimit == nil || r.cmp(r.cfg.SharedLowerUserKeyPrefix, tableSplitLimit) < 0) {
tableSplitLimit = r.cfg.SharedLowerUserKeyPrefix
}
splitter := NewOutputSplitter(
r.cmp, firstKey, r.TableSplitLimit(firstKey),
r.cmp, firstKey, tableSplitLimit,
r.cfg.TargetOutputFileSize, r.cfg.Grandparents.Iter(), r.iter.Frontiers(),
)
lastUserKeyFn := func() []byte {
Expand Down
19 changes: 16 additions & 3 deletions internal/compact/splitting.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ type OutputSplitter struct {

shouldSplitCalled bool

pastStartKey bool

nextBoundary splitterBoundary
// reachedBoundary is set when the frontier reaches a boundary and is cleared
// in the first ShouldSplitBefore call after that.
Expand Down Expand Up @@ -212,8 +214,18 @@ func (s *OutputSplitter) ShouldSplitBefore(
panic("ShouldSplitBefore called after it returned SplitNow")
}
if !s.shouldSplitCalled {
// The boundary could have been advanced to nextUserKey before the splitter
// was created. So one single time, we advance the boundary manually.
// The boundary could have been advanced to nextUserKey before the
// splitter was created (the compact.Iter was at nextUserKey when a
// previous OutputSplitter decided to split-before). So one single time,
// we advance the boundary manually.
//
// Note that this first nextUserKey can be ahead of
// OutputSplitter.startKey, since the startKey is decided by the previous
// split key. For example, the preceding file was split at c, resulting in
// splitting of a rangedel [a,f) into [a,c) and [c,f) where [a,c) is
// included in the preceding file. The compact.Iter is at key e (which
// happens to be a point key). The startKey will be c, and nextUserKey
// will be e. We have the opportunity here to split at d.
s.shouldSplitCalled = true
for s.nextBoundary.key != nil && s.cmp(s.nextBoundary.key, nextUserKey) <= 0 {
s.boundaryReached(nextUserKey)
Expand Down Expand Up @@ -250,9 +262,10 @@ func (s *OutputSplitter) ShouldSplitBefore(

// When the target file size limit is very small (in tests), we could end up
// splitting at the first key, which is not allowed.
if s.cmp(nextUserKey, s.startKey) <= 0 {
if !s.pastStartKey && s.cmp(nextUserKey, s.startKey) <= 0 {
return NoSplit
}
s.pastStartKey = true

// TODO(radu): it would make for a cleaner interface if we didn't rely on a
// lastUserKeyFn. We could make a copy of the key here and split at the next
Expand Down
12 changes: 12 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -697,6 +697,18 @@ type Options struct {
// CreateOnSharedLocator).
CreateOnShared remote.CreateOnSharedStrategy
CreateOnSharedLocator remote.Locator
// SharedLowerUserKeyPrefix, if specified, is an additional lower bound
// constraint on key prefixes that should be written to shared files.
SharedLowerUserKeyPrefix []byte
// WriteSharedWithStrictObsolete specifies that shared sstables are
// written with WriterOptions.IsStrictObsolete set to true. Strict
// obsolete tables do not permit merge keys.
WriteSharedWithStrictObsolete bool
// TODO(sumeer): add ReadSharedRequiresStrictObsolete to require that
// shared files visited via ScanInternal parameter func(sst
// *SharedSSTMeta) must be strict obsolete. That visit only has access to
// FileMetadata, so we will need to encode the StrictObsolete bit in
// there.

// CacheSizeBytesBytes is the size of the on-disk block cache for objects
// on shared storage in bytes. If it is 0, no cache is used.
Expand Down
Loading