Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
jbowens committed Feb 12, 2024
1 parent 38e3430 commit 2d0549b
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 9 deletions.
49 changes: 42 additions & 7 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -658,6 +658,33 @@ type compaction struct {
pickerMetrics compactionPickerMetrics
}

// objectCreator provides the subset of the objstorage.Provider interface
// necessary for compactions and flushes. It's typically satisfied by
// d.objProvider but may be satisfied by bufferedSSTables during flushes.
type objectCreator interface {
// Create creates a new object and opens it for writing.
//
// The object is not guaranteed to be durable (accessible in case of crashes)
// until Sync is called.
Create(
ctx context.Context,
fileType base.FileType,
FileNum base.DiskFileNum,
opts objstorage.CreateOptions,
) (w objstorage.Writable, meta objstorage.ObjectMetadata, err error)
// Path returns an internal, implementation-dependent path for the object. It is
// meant to be used for informational purposes (like logging).
Path(meta objstorage.ObjectMetadata) string
// Remove removes an object.
//
// The object is not guaranteed to be durably removed until Sync is called.
Remove(fileType base.FileType, FileNum base.DiskFileNum) error
// Sync flushes the metadata from creation or removal of objects since the
// last Sync. This includes objects that have been Created but for which
// Writable.Finish() has not yet been called.
Sync() error
}

func (c *compaction) makeInfo(jobID int) CompactionInfo {
info := CompactionInfo{
JobID: jobID,
Expand Down Expand Up @@ -1916,6 +1943,9 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
})
startTime := d.timeNow()

// Compactions always write directly to the database's object provider.
// Flushes may write to an in-memory object provider first.
var objCreator objectCreator = d.objProvider
var ve *manifest.VersionEdit
var pendingOutputs []physicalMeta
var stats compactStats
Expand All @@ -1926,9 +1956,13 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
// runCompaction. For all other flush cases, we construct the VersionEdit
// inside runCompaction.
if c.kind != compactionKindIngestedFlushable {
ve, pendingOutputs, stats, err = d.runCompaction(jobID, c)
ve, pendingOutputs, stats, err = d.runCompaction(jobID, c, objCreator)
}

// TODO(aadityas,jackson): If the buffered output sstables are too small,
// avoid linking them into the version and just update the flushable queue
// appropriately.

// Acquire logLock. This will be released either on an error, by way of
// logUnlock, or through a call to logAndApply if there is no error.
d.mu.versions.logLock()
Expand Down Expand Up @@ -2626,7 +2660,7 @@ func (d *DB) compact1(c *compaction, errChannel chan error) (err error) {
d.opts.EventListener.CompactionBegin(info)
startTime := d.timeNow()

ve, pendingOutputs, stats, err := d.runCompaction(jobID, c)
ve, pendingOutputs, stats, err := d.runCompaction(jobID, c, d.objProvider)

info.Duration = d.timeNow().Sub(startTime)
if err == nil {
Expand Down Expand Up @@ -2792,7 +2826,7 @@ func (d *DB) runCopyCompaction(
// d.mu must be held when calling this, but the mutex may be dropped and
// re-acquired during the course of this method.
func (d *DB) runCompaction(
jobID int, c *compaction,
jobID int, c *compaction, objCreator objectCreator,
) (ve *versionEdit, pendingOutputs []physicalMeta, stats compactStats, retErr error) {
// As a sanity check, confirm that the smallest / largest keys for new and
// deleted files in the new versionEdit pass a validation function before
Expand Down Expand Up @@ -2960,7 +2994,7 @@ func (d *DB) runCompaction(
}
if retErr != nil {
for _, fileNum := range createdFiles {
_ = d.objProvider.Remove(fileTypeTable, fileNum)
_ = objCreator.Remove(fileTypeTable, fileNum)
}
}
for _, closer := range c.closers {
Expand Down Expand Up @@ -3055,7 +3089,8 @@ func (d *DB) runCompaction(
PreferSharedStorage: remote.ShouldCreateShared(d.opts.Experimental.CreateOnShared, c.outputLevel.level),
}
diskFileNum := base.PhysicalTableDiskFileNum(fileNum)
writable, objMeta, err := d.objProvider.Create(ctx, fileTypeTable, diskFileNum, createOpts)

writable, objMeta, err := objCreator.Create(ctx, fileTypeTable, diskFileNum, createOpts)
if err != nil {
return err
}
Expand All @@ -3067,7 +3102,7 @@ func (d *DB) runCompaction(
d.opts.EventListener.TableCreated(TableCreateInfo{
JobID: jobID,
Reason: reason,
Path: d.objProvider.Path(objMeta),
Path: objCreator.Path(objMeta),
FileNum: diskFileNum,
})
if c.kind != compactionKindFlush {
Expand Down Expand Up @@ -3510,7 +3545,7 @@ func (d *DB) runCompaction(
// compactStats.
stats.countMissizedDels = iter.stats.countMissizedDels

if err := d.objProvider.Sync(); err != nil {
if err := objCreator.Sync(); err != nil {
return nil, pendingOutputs, stats, err
}

Expand Down
2 changes: 1 addition & 1 deletion data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -869,7 +869,7 @@ func runDBDefineCmd(td *datadriven.TestData, opts *Options) (*DB, error) {
// to the user-defined boundaries.
c.maxOutputFileSize = math.MaxUint64

newVE, _, _, err := d.runCompaction(0, c)
newVE, _, _, err := d.runCompaction(0, c, d.objProvider)
if err != nil {
return err
}
Expand Down
106 changes: 106 additions & 0 deletions flushable.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/keyspan"
"github.com/cockroachdb/pebble/internal/manifest"
"github.com/cockroachdb/pebble/objstorage"
"github.com/cockroachdb/pebble/sstable"
)

// flushable defines the interface for immutable memtables.
Expand Down Expand Up @@ -77,6 +79,10 @@ type flushableEntry struct {
delayedFlushForcedAt time.Time
// logNum corresponds to the WAL that contains the records present in the
// receiver.
//
// TODO(aadityas,jackson): We'll need to do something about this (and
// logSize) for entries corresponding to bufferedSSTables since there may be
// multiple associated log nums.
logNum base.DiskFileNum
// logSize is the size in bytes of the associated WAL. Protected by DB.mu.
logSize uint64
Expand Down Expand Up @@ -312,6 +318,106 @@ func (s *ingestedFlushable) computePossibleOverlaps(
}
}

// bufferedSSTables holds a set of in-memory sstables produced by a flush.
// Buffering flushed state reduces write amplification by making it more likely
// that we're able to drop KVs before they reach disk.
type bufferedSSTables struct {
metas []*fileMetadata

Check failure on line 325 in flushable.go

View workflow job for this annotation

GitHub Actions / go-linux-32bit

field metas is unused (U1000)

Check failure on line 325 in flushable.go

View workflow job for this annotation

GitHub Actions / go-linux

field metas is unused (U1000)

Check failure on line 325 in flushable.go

View workflow job for this annotation

GitHub Actions / go-macos

field metas is unused (U1000)

Check failure on line 325 in flushable.go

View workflow job for this annotation

GitHub Actions / go-linux-no-cgo

field metas is unused (U1000)

Check failure on line 325 in flushable.go

View workflow job for this annotation

GitHub Actions / go-linux-no-invariants

field metas is unused (U1000)
readers []*sstable.Reader

Check failure on line 326 in flushable.go

View workflow job for this annotation

GitHub Actions / go-linux-32bit

field readers is unused (U1000)

Check failure on line 326 in flushable.go

View workflow job for this annotation

GitHub Actions / go-linux

field readers is unused (U1000)

Check failure on line 326 in flushable.go

View workflow job for this annotation

GitHub Actions / go-macos

field readers is unused (U1000)

Check failure on line 326 in flushable.go

View workflow job for this annotation

GitHub Actions / go-linux-no-cgo

field readers is unused (U1000)

Check failure on line 326 in flushable.go

View workflow job for this annotation

GitHub Actions / go-linux-no-invariants

field readers is unused (U1000)
}

var (
// Assert that *bufferedSSTables implements the flushable interface.
_ flushable = (*bufferedSSTables)(nil)
// Assert that *bufferedSSTables implements the objectCreator interface.
_ objectCreator = (*bufferedSSTables)(nil)
)

// newIter is part of the flushable interface.
func (b *bufferedSSTables) newIter(o *IterOptions) internalIterator {
panic("TODO")
}

// newFlushIter is part of the flushable interface.
func (b *bufferedSSTables) newFlushIter(o *IterOptions, bytesFlushed *uint64) internalIterator {
panic("TODO")
}

func (b *bufferedSSTables) constructRangeDelIter(

Check failure on line 346 in flushable.go

View workflow job for this annotation

GitHub Actions / go-linux-32bit

func (*bufferedSSTables).constructRangeDelIter is unused (U1000)

Check failure on line 346 in flushable.go

View workflow job for this annotation

GitHub Actions / go-linux

func (*bufferedSSTables).constructRangeDelIter is unused (U1000)

Check failure on line 346 in flushable.go

View workflow job for this annotation

GitHub Actions / go-macos

func (*bufferedSSTables).constructRangeDelIter is unused (U1000)

Check failure on line 346 in flushable.go

View workflow job for this annotation

GitHub Actions / go-linux-no-cgo

func (*bufferedSSTables).constructRangeDelIter is unused (U1000)

Check failure on line 346 in flushable.go

View workflow job for this annotation

GitHub Actions / go-linux-no-invariants

func (*bufferedSSTables).constructRangeDelIter is unused (U1000)
file *manifest.FileMetadata, _ keyspan.SpanIterOptions,
) (keyspan.FragmentIterator, error) {
panic("TODO")
}

// newRangeDelIter is part of the flushable interface.
//
// TODO(sumeer): *IterOptions are being ignored, so the index block load for
// the point iterator in constructRangeDeIter is not tracked.
func (b *bufferedSSTables) newRangeDelIter(_ *IterOptions) keyspan.FragmentIterator {
panic("TODO")
}

// newRangeKeyIter is part of the flushable interface.
func (b *bufferedSSTables) newRangeKeyIter(o *IterOptions) keyspan.FragmentIterator {
if !b.containsRangeKeys() {
return nil
}
panic("TODO")
}

// containsRangeKeys is part of the flushable interface.
func (b *bufferedSSTables) containsRangeKeys() bool {
panic("TODO")
}

// inuseBytes is part of the flushable interface.
func (b *bufferedSSTables) inuseBytes() uint64 {
panic("TODO")
}

// totalBytes is part of the flushable interface.
func (b *bufferedSSTables) totalBytes() uint64 {
panic("TODO")
}

// readyForFlush is part of the flushable interface.
func (b *bufferedSSTables) readyForFlush() bool {
// Buffered sstables are always ready for flush; they're immutable.
return true
}

// computePossibleOverlaps is part of the flushable interface.
func (b *bufferedSSTables) computePossibleOverlaps(
fn func(bounded) shouldContinue, bounded ...bounded,
) {
panic("TODO")
}

// Create implements the objectCreator interface.
func (b *bufferedSSTables) Create(
ctx context.Context,
fileType base.FileType,
FileNum base.DiskFileNum,
opts objstorage.CreateOptions,
) (w objstorage.Writable, meta objstorage.ObjectMetadata, err error) {
panic("TODO")
}

// Path implements the objectCreator interface.
func (b *bufferedSSTables) Path(meta objstorage.ObjectMetadata) string {
panic("TODO")
}

// Remove implements the objectCreator interface.
func (b *bufferedSSTables) Remove(fileType base.FileType, FileNum base.DiskFileNum) error {
panic("TODO")
}

// Sync implements the objectCreator interface.
func (b *bufferedSSTables) Sync() error {
panic("TODO")
}

// computePossibleOverlapsGenericImpl is an implemention of the flushable
// interface's computePossibleOverlaps function for flushable implementations
// with only in-memory state that do not have special requirements and should
Expand Down
2 changes: 1 addition & 1 deletion open.go
Original file line number Diff line number Diff line change
Expand Up @@ -803,7 +803,7 @@ func (d *DB) replayWAL(
if err != nil {
return err
}
newVE, _, _, err := d.runCompaction(jobID, c)
newVE, _, _, err := d.runCompaction(jobID, c, d.objProvider)
if err != nil {
return errors.Wrapf(err, "running compaction during WAL replay")
}
Expand Down

0 comments on commit 2d0549b

Please sign in to comment.