Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
jbowens committed Feb 13, 2024
1 parent d729cd1 commit 0c88050
Showing 1 changed file with 176 additions and 23 deletions.
199 changes: 176 additions & 23 deletions flushable.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@
package pebble

import (
"bytes"
"context"
"fmt"
"io"
"slices"
"sync/atomic"
"time"

"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/keyspan"
"github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl"
Expand Down Expand Up @@ -319,32 +322,99 @@ func (s *ingestedFlushable) computePossibleOverlaps(
}
}

// bufferedSSTables holds a set of in-memory sstables produced by a flush.
// Buffering flushed state reduces write amplification by making it more likely
// that we're able to drop KVs before they reach disk.
type bufferedSSTables struct {
metas []*fileMetadata
readers []*sstable.Reader
// flushableBufferedSSTables holds a set of in-memory sstables produced by a
// flush. Buffering flushed state reduces write amplification by making it more
// likely that we're able to drop KVs before they reach disk.
type flushableBufferedSSTables struct {
// comparer is passed to newLevelIter when newIter builds its iterator.
comparer *base.Comparer
// ls is the slice of buffered files that newIter iterates over (via
// ls.Iter()).
ls manifest.LevelSlice
// metas and readers are parallel slices: newIters locates a file by
// scanning metas for a matching FileNum and then uses the reader stored at
// the same index.
metas []*fileMetadata
readers []*sstable.Reader
}

var (
// Assert that *bufferedSSTables implements the flushable interface.
_ flushable = (*bufferedSSTables)(nil)
// Assert that *bufferedSSTables implements the objectCreator interface.
_ objectCreator = (*bufferedSSTables)(nil)
// Assert that *flushableBufferedSSTables implements the flushable
// interface.
_ flushable = (*flushableBufferedSSTables)(nil)
)

// newIters satisfies the tableNewIters function signature for sstables that
// exist only in memory. The table cache normally supplies this function, but
// buffered sstables are not present on the real filesystem and need no file
// descriptors, so they bypass it: flushableBufferedSSTables keeps an open
// sstable.Reader for every buffered table and builds iterators from those.
//
// An error is returned when file is not one of the buffered sstables, or when
// constructing a requested range-key or range-deletion iterator fails; in the
// latter case CloseAll is invoked on the partially built set before
// returning. Requesting point iterators is not implemented yet and panics
// (provided no earlier error occurred).
func (b *flushableBufferedSSTables) newIters(
	ctx context.Context,
	file *manifest.FileMetadata,
	opts *IterOptions,
	internalOpts internalIterOpts,
	kinds iterKinds,
) (iterSet, error) {
	// metas and readers are index-aligned, so the position of the matching
	// metadata identifies the reader to use.
	var reader *sstable.Reader
	for i, m := range b.metas {
		if m.FileNum != file.FileNum {
			continue
		}
		reader = b.readers[i]
		break
	}
	if reader == nil {
		return iterSet{}, errors.Newf("file %s not found among flushable buffered sstables", file.FileNum)
	}

	var (
		set iterSet
		err error
	)
	if kinds.RangeKey() && file.HasRangeKeys {
		set.rangeKey, err = reader.NewRawRangeKeyIter()
	}
	if kinds.RangeDeletion() && file.HasPointKeys && err == nil {
		set.rangeDeletion, err = reader.NewRawRangeDelIter()
	}
	if kinds.Point() && err == nil {
		// TODO: construct the point iterator. The intended shape is to call
		// reader.NewCompactionIter when internalOpts.bytesIterated is set
		// (passing internalOpts.bufferPool), and otherwise
		// NewIterWithBlockPropertyFiltersAndContextEtc with ctx, the
		// lower/upper bounds from opts and internalOpts.stats.
		panic("TODO")
	}
	if err == nil {
		return set, nil
	}
	set.CloseAll()
	return iterSet{}, err
}

// newIter is part of the flushable interface.
func (b *bufferedSSTables) newIter(o *IterOptions) internalIterator {
panic("TODO")
// newIter is part of the flushable interface. It returns a level iterator over
// the files in b.ls, opening each of them through b.newIters. A nil o is
// treated as the zero-valued IterOptions.
func (b *flushableBufferedSSTables) newIter(o *IterOptions) internalIterator {
	iterOpts := IterOptions{}
	if o != nil {
		iterOpts = *o
	}
	files := b.ls.Iter()
	// TODO(jackson): The manifest.Level in newLevelIter is only used for
	// logging. Update the manifest.Level encoding to account for levels which
	// aren't truly levels in the lsm. Right now, the encoding only supports
	// L0 sublevels, and the rest of the levels in the lsm.
	level := manifest.Level(0)
	return newLevelIter(
		context.Background(), iterOpts, b.comparer, b.newIters, files, level,
		internalIterOpts{},
	)
}

// newFlushIter is part of the flushable interface.
func (b *bufferedSSTables) newFlushIter(o *IterOptions, bytesFlushed *uint64) internalIterator {
// NOTE: newFlushIter is not implemented yet. Every call panics with "TODO",
// and neither o nor bytesFlushed is consulted.
func (b *flushableBufferedSSTables) newFlushIter(
o *IterOptions, bytesFlushed *uint64,
) internalIterator {
panic("TODO")
}

func (b *bufferedSSTables) constructRangeDelIter(
func (b *flushableBufferedSSTables) constructRangeDelIter(

Check failure on line 417 in flushable.go

View workflow job for this annotation

GitHub Actions / go-macos

func (*flushableBufferedSSTables).constructRangeDelIter is unused (U1000)

Check failure on line 417 in flushable.go

View workflow job for this annotation

GitHub Actions / go-linux

func (*flushableBufferedSSTables).constructRangeDelIter is unused (U1000)

Check failure on line 417 in flushable.go

View workflow job for this annotation

GitHub Actions / go-linux-32bit

func (*flushableBufferedSSTables).constructRangeDelIter is unused (U1000)

Check failure on line 417 in flushable.go

View workflow job for this annotation

GitHub Actions / go-linux-no-cgo

func (*flushableBufferedSSTables).constructRangeDelIter is unused (U1000)

Check failure on line 417 in flushable.go

View workflow job for this annotation

GitHub Actions / go-linux-no-invariants

func (*flushableBufferedSSTables).constructRangeDelIter is unused (U1000)
file *manifest.FileMetadata, _ keyspan.SpanIterOptions,
) (keyspan.FragmentIterator, error) {
panic("TODO")
Expand All @@ -354,54 +424,103 @@ func (b *bufferedSSTables) constructRangeDelIter(
//
// TODO(sumeer): *IterOptions are being ignored, so the index block load for
// the point iterator in constructRangeDelIter is not tracked.
func (b *bufferedSSTables) newRangeDelIter(_ *IterOptions) keyspan.FragmentIterator {
// NOTE: newRangeDelIter is not implemented yet. Every call panics with "TODO";
// the *IterOptions argument is ignored.
func (b *flushableBufferedSSTables) newRangeDelIter(_ *IterOptions) keyspan.FragmentIterator {
panic("TODO")
}

// newRangeKeyIter is part of the flushable interface.
func (b *bufferedSSTables) newRangeKeyIter(o *IterOptions) keyspan.FragmentIterator {
// newRangeKeyIter is part of the flushable interface. It yields a nil iterator
// when containsRangeKeys reports that the buffered sstables hold no range
// keys. Building an actual range-key iterator is not implemented yet and
// panics.
func (b *flushableBufferedSSTables) newRangeKeyIter(o *IterOptions) keyspan.FragmentIterator {
	if b.containsRangeKeys() {
		// TODO: construct a range-key iterator over the buffered sstables.
		panic("TODO")
	}
	return nil
}

// containsRangeKeys is part of the flushable interface.
func (b *bufferedSSTables) containsRangeKeys() bool {
// NOTE: containsRangeKeys is not implemented yet and panics with "TODO".
// newRangeKeyIter calls it unconditionally, so that method panics too until
// this one is filled in.
func (b *flushableBufferedSSTables) containsRangeKeys() bool {
panic("TODO")
}

// inuseBytes is part of the flushable interface.
func (b *bufferedSSTables) inuseBytes() uint64 {
// NOTE: inuseBytes is not implemented yet; every call panics with "TODO".
func (b *flushableBufferedSSTables) inuseBytes() uint64 {
panic("TODO")
}

// totalBytes is part of the flushable interface.
func (b *bufferedSSTables) totalBytes() uint64 {
// NOTE: totalBytes is not implemented yet; every call panics with "TODO".
func (b *flushableBufferedSSTables) totalBytes() uint64 {
panic("TODO")
}

// readyForFlush is part of the flushable interface.
func (b *bufferedSSTables) readyForFlush() bool {
// readyForFlush unconditionally reports true; no state of b is inspected.
func (b *flushableBufferedSSTables) readyForFlush() bool {
// Buffered sstables are always ready for flush; they're immutable.
return true
}

// computePossibleOverlaps is part of the flushable interface.
func (b *bufferedSSTables) computePossibleOverlaps(
// NOTE: computePossibleOverlaps is not implemented yet. Every call panics with
// "TODO"; fn is never invoked and bounded is not examined.
func (b *flushableBufferedSSTables) computePossibleOverlaps(
fn func(bounded) shouldContinue, bounded ...bounded,
) {
panic("TODO")
}

// bufferedSSTables implements the objectCreator interface and is used by a
// flush to buffer sstables into memory. When the flush is complete, the
// buffered sstables are either flushed to durable storage or moved into a
// flushableBufferedSSTables that's linked into the flushable queue.
//
// The bufferedSSTables implementation of objectCreator requires that only one
// created object may be open at a time. If violated, Create will panic.
type bufferedSSTables struct {
// curr is a byte buffer used to accumulate the writes of the current
// sstable when *bufferedSSTables is used as a writable. Finish and Abort
// both reset it so it can be reused for the next sstable.
curr bytes.Buffer
// currFileNum holds the file number assigned to the sstable being
// constructed in curr. It is set by Create.
currFileNum base.DiskFileNum
// finished holds the set of previously written and finished sstables.
// Finish appends one entry for each completed sstable.
finished []bufferedSSTable
// objectIsOpen is true if the bufferedSSTables is currently being used as a
// Writable. Create sets it; Finish and Abort clear it.
objectIsOpen bool
}

// A bufferedSSTable holds a single, serialized sstable and a corresponding file
// number.
type bufferedSSTable struct {
// fileNum is the file number that was current when the sstable was
// finished (bufferedSSTables.currFileNum at the time of Finish).
fileNum base.DiskFileNum
// buf holds the serialized sstable. Finish stores a clone of the write
// buffer's contents here, so buf does not alias the reusable buffer.
buf []byte
}

// init initializes the bufferedSSTables by growing the write buffer's capacity
// so that targetFileSize more bytes can be written without another
// allocation. targetFileSize must be non-negative: bytes.Buffer.Grow panics
// when given a negative count.
func (b *bufferedSSTables) init(targetFileSize int) {
b.curr.Grow(targetFileSize)
}

// Assert that *bufferedSSTables implements the objectCreator interface.
var _ objectCreator = (*bufferedSSTables)(nil)

// Create implements the objectCreator interface. It marks b as having an open
// object, records fileNum as the file number of the sstable about to be
// written, and returns b itself as the Writable together with metadata
// carrying fileNum and fileType. ctx and opts are not consulted.
//
// Create panics if a previously created object has not yet been closed with
// Finish or Abort.
func (b *bufferedSSTables) Create(
	ctx context.Context,
	fileType base.FileType,
	fileNum base.DiskFileNum,
	opts objstorage.CreateOptions,
) (w objstorage.Writable, meta objstorage.ObjectMetadata, err error) {
	// The bufferedSSTables implementation depends on only one writable being
	// open at a time. The *bufferedSSTables itself is used as the
	// implementation of both the objectCreator interface and the
	// objstorage.Writable interface. We guard against misuse by verifying that
	// there is no object currently open.
	if b.objectIsOpen {
		panic("bufferedSSTables used with concurrent open files")
	}
	b.objectIsOpen = true
	b.currFileNum = fileNum
	return b, objstorage.ObjectMetadata{
		DiskFileNum: fileNum,
		FileType:    fileType,
	}, nil
}

// Path implements the objectCreator interface.
Expand All @@ -419,6 +538,40 @@ func (b *bufferedSSTables) Sync() error {
panic("TODO")
}

// Assert that bufferedSSTables implements objstorage.Writable.
//
// A flush writes files sequentially, so the bufferedSSTables type implements
// Writable directly, serving as the destination for writes across all sstables
// written by the flush.
var _ objstorage.Writable = (*bufferedSSTables)(nil)

// Finish implements objstorage.Writable.
func (o *bufferedSSTables) Write(p []byte) error {

Check failure on line 549 in flushable.go

View workflow job for this annotation

GitHub Actions / go-macos

receiver name o should be consistent with previous receiver name b for bufferedSSTables

Check failure on line 549 in flushable.go

View workflow job for this annotation

GitHub Actions / go-linux

receiver name o should be consistent with previous receiver name b for bufferedSSTables

Check failure on line 549 in flushable.go

View workflow job for this annotation

GitHub Actions / go-linux-32bit

receiver name o should be consistent with previous receiver name b for bufferedSSTables

Check failure on line 549 in flushable.go

View workflow job for this annotation

GitHub Actions / go-linux-no-cgo

receiver name o should be consistent with previous receiver name b for bufferedSSTables

Check failure on line 549 in flushable.go

View workflow job for this annotation

GitHub Actions / go-linux-no-invariants

receiver name o should be consistent with previous receiver name b for bufferedSSTables
_, err := o.curr.Write(p)
o.curr.Reset()
return err
}

// Finish implements objstorage.Writable.
func (o *bufferedSSTables) Finish() error {

Check failure on line 556 in flushable.go

View workflow job for this annotation

GitHub Actions / go-macos

receiver name o should be consistent with previous receiver name b for bufferedSSTables

Check failure on line 556 in flushable.go

View workflow job for this annotation

GitHub Actions / go-linux

receiver name o should be consistent with previous receiver name b for bufferedSSTables

Check failure on line 556 in flushable.go

View workflow job for this annotation

GitHub Actions / go-linux-32bit

receiver name o should be consistent with previous receiver name b for bufferedSSTables

Check failure on line 556 in flushable.go

View workflow job for this annotation

GitHub Actions / go-linux-no-cgo

receiver name o should be consistent with previous receiver name b for bufferedSSTables

Check failure on line 556 in flushable.go

View workflow job for this annotation

GitHub Actions / go-linux-no-invariants

receiver name o should be consistent with previous receiver name b for bufferedSSTables
if !o.objectIsOpen {
panic("bufferedSSTables.Finish() invoked when no object is open")
}
o.finished = append(o.finished, bufferedSSTable{
fileNum: o.currFileNum,
buf: slices.Clone(o.curr.Bytes()),
})
o.curr.Reset()
o.objectIsOpen = false
return nil
}

// Abort implements objstorage.Writable.
func (o *bufferedSSTables) Abort() {

Check failure on line 570 in flushable.go

View workflow job for this annotation

GitHub Actions / go-macos

receiver name o should be consistent with previous receiver name b for bufferedSSTables

Check failure on line 570 in flushable.go

View workflow job for this annotation

GitHub Actions / go-linux

receiver name o should be consistent with previous receiver name b for bufferedSSTables

Check failure on line 570 in flushable.go

View workflow job for this annotation

GitHub Actions / go-linux-32bit

receiver name o should be consistent with previous receiver name b for bufferedSSTables

Check failure on line 570 in flushable.go

View workflow job for this annotation

GitHub Actions / go-linux-no-cgo

receiver name o should be consistent with previous receiver name b for bufferedSSTables

Check failure on line 570 in flushable.go

View workflow job for this annotation

GitHub Actions / go-linux-no-invariants

receiver name o should be consistent with previous receiver name b for bufferedSSTables
o.curr.Reset()
o.objectIsOpen = false
}

// computePossibleOverlapsGenericImpl is an implementation of the flushable
// interface's computePossibleOverlaps function for flushable implementations
// with only in-memory state that do not have special requirements and should
Expand Down

0 comments on commit 0c88050

Please sign in to comment.