Skip to content

Commit

Permalink
storage: constrained span of rangedel in ClearRange to keys in range
Browse files Browse the repository at this point in the history
Constrains the width of the range deletion tombstone to the span of keys
actually present within the range. If the range has no kv-entries, then skip the
rangedel completely.

Before this change, when receiving a snapshot, the original file
would have a range deletion tombstone that spanned the entire range written to
it regardless of the actual keys contained in the range or if the range was
empty. This resulted in the creation of excessively wide tombstones, which has
significant performance implications since the wide tombstones impede
compaction.

Fixes cockroachdb#44048.

Release note: None.
  • Loading branch information
Owen Qian committed Feb 5, 2020
1 parent 7a38d44 commit d291f5c
Show file tree
Hide file tree
Showing 8 changed files with 126 additions and 23 deletions.
1 change: 1 addition & 0 deletions pkg/storage/client_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2917,6 +2917,7 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
if err != nil {
return err
}
// TODO(owenqian): update expectedSSTs to use rditer.ConstrainToKeys
expectedSSTs = append(expectedSSTs, sstFile.Data())
}

Expand Down
24 changes: 14 additions & 10 deletions pkg/storage/engine/sst_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
// SSTWriter writes SSTables.
type SSTWriter struct {
fw *sstable.Writer
f writeCloseSyncer
// DataSize tracks the total key and value bytes added so far.
DataSize int64
scratch []byte
Expand All @@ -44,7 +43,7 @@ func MakeBackupSSTWriter(f writeCloseSyncer) SSTWriter {
opts.FilterPolicy = nil
opts.MergerName = "nullptr"
sst := sstable.NewWriter(f, opts)
return SSTWriter{fw: sst, f: f}
return SSTWriter{fw: sst}
}

// MakeIngestionSSTWriter creates a new SSTWriter tailored for ingestion SSTs.
Expand All @@ -55,13 +54,13 @@ func MakeIngestionSSTWriter(f writeCloseSyncer) SSTWriter {
opts.TableFormat = sstable.TableFormatRocksDBv2
opts.MergerName = "nullptr"
sst := sstable.NewWriter(f, opts)
return SSTWriter{fw: sst, f: f}
return SSTWriter{fw: sst}
}

// Finish finalizes the writer and returns the constructed file's contents,
// since the last call to Truncate (if any). At least one kv entry must have been added.
func (fw *SSTWriter) Finish() error {
if fw.fw == nil {
if fw.Closed() {
return errors.New("cannot call Finish on a closed writer")
}
if err := fw.fw.Close(); err != nil {
Expand All @@ -73,7 +72,7 @@ func (fw *SSTWriter) Finish() error {

// ClearRange implements the Writer interface.
func (fw *SSTWriter) ClearRange(start, end MVCCKey) error {
if fw.fw == nil {
if fw.Closed() {
return errors.New("cannot call ClearRange on a closed writer")
}
fw.DataSize += int64(len(start.Key)) + int64(len(end.Key))
Expand All @@ -85,7 +84,7 @@ func (fw *SSTWriter) ClearRange(start, end MVCCKey) error {
// is not greater than any previously added entry (according to the comparator
// configured during writer creation). `Close` cannot have been called.
func (fw *SSTWriter) Put(key MVCCKey, value []byte) error {
if fw.fw == nil {
if fw.Closed() {
return errors.New("cannot call Put on a closed writer")
}
fw.DataSize += int64(len(key.Key)) + int64(len(value))
Expand All @@ -100,7 +99,7 @@ func (fw *SSTWriter) ApplyBatchRepr(repr []byte, sync bool) error {

// Clear implements the Writer interface.
func (fw *SSTWriter) Clear(key MVCCKey) error {
if fw.fw == nil {
if fw.Closed() {
return errors.New("cannot call Clear on a closed writer")
}
fw.scratch = EncodeKeyToBuf(fw.scratch[:0], key)
Expand All @@ -115,7 +114,7 @@ func (fw *SSTWriter) SingleClear(key MVCCKey) error {

// ClearIterRange implements the Writer interface.
func (fw *SSTWriter) ClearIterRange(iter Iterator, start, end roachpb.Key) error {
if fw.fw == nil {
if fw.Closed() {
return errors.New("cannot call ClearIterRange on a closed writer")
}

Expand All @@ -142,7 +141,7 @@ func (fw *SSTWriter) ClearIterRange(iter Iterator, start, end roachpb.Key) error

// Merge implements the Writer interface.
func (fw *SSTWriter) Merge(key MVCCKey, value []byte) error {
if fw.fw == nil {
if fw.Closed() {
return errors.New("cannot call Merge on a closed writer")
}
fw.DataSize += int64(len(key.Key)) + int64(len(value))
Expand All @@ -161,9 +160,14 @@ func (fw *SSTWriter) LogLogicalOp(op MVCCLogicalOpType, details MVCCLogicalOpDet
// No-op.
}

// Closed checks to see if the underlying sstable.writer is present.
func (fw *SSTWriter) Closed() bool {
return fw.fw == nil
}

// Close finishes and frees memory and other resources. Close is idempotent.
func (fw *SSTWriter) Close() {
if fw.fw == nil {
if fw.Closed() {
return
}
// pebble.Writer *does* return interesting errors from Close... but normally
Expand Down
20 changes: 20 additions & 0 deletions pkg/storage/rditer/replica_data_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,26 @@ func MakeAllKeyRanges(d *roachpb.RangeDescriptor) []KeyRange {
}
}

// ConstrainToKeys returns a Span constrained to the keys present in the given Range.
// Returns (span, true) if the Range is contains no keys, otherwise (span, false).
func ConstrainToKeys(reader engine.Reader, span roachpb.Span) (_ roachpb.Span, empty bool) {
it := reader.NewIterator(engine.IterOptions{LowerBound: span.Key, UpperBound: span.EndKey})
defer it.Close()
it.SeekGE(engine.MakeMVCCMetadataKey(span.Key))
if valid, _ := it.Valid(); !valid {
return roachpb.Span{}, true
}
startKey := it.Key().Key

it.SeekLT(engine.MakeMVCCMetadataKey(span.EndKey))
if valid, _ := it.Valid(); !valid {
return roachpb.Span{}, true
}

endKey := it.Key().Key.Next()
return roachpb.Span{Key: startKey, EndKey: endKey}, false
}

// MakeReplicatedKeyRanges returns all key ranges that are fully Raft
// replicated for the given Range.
//
Expand Down
47 changes: 47 additions & 0 deletions pkg/storage/rditer/replica_data_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,3 +269,50 @@ func TestReplicaDataIterator(t *testing.T) {
})
}
}

func TestConstrainToKeysEmptyRange(t *testing.T) {
defer leaktest.AfterTest(t)()

eng := engine.NewDefaultInMem()
defer eng.Close()

span := roachpb.Span{
Key: roachpb.Key("a"),
EndKey: roachpb.Key("z"),
}
if _, empty := ConstrainToKeys(eng, span); !empty {
t.Errorf("Expected empty range")
}
}

func TestConstrainToKeysNonEmptyRange(t *testing.T) {
defer leaktest.AfterTest(t)()

eng := engine.NewDefaultInMem()
defer eng.Close()

startKey := roachpb.RKey("a")
endKey := roachpb.RKey("z")
span := roachpb.Span{
Key: startKey.AsRawKey(),
EndKey: endKey.AsRawKey(),
}
desc := roachpb.RangeDescriptor{
RangeID: 2,
StartKey: startKey,
EndKey: endKey,
}
createRangeData(t, eng, desc)
span, empty := ConstrainToKeys(eng, span)
if empty {
t.Errorf("Expected non-empty range")
}

key := keys.TransactionKey(roachpb.Key(desc.StartKey), uuid.MakeV4())
ts := hlc.Timestamp{}
if err := engine.MVCCPut(context.Background(), eng, nil, key, ts, roachpb.MakeValueFromString("value"), nil); err != nil {
if !span.Key.Equal(key) || !span.EndKey.Equal(key){
t.Errorf("Expected span.Key and span.EndKey to equal %d. Instead Key: %d, EndKey: %d", key, span.Key, span.EndKey);
}
}
}
1 change: 1 addition & 0 deletions pkg/storage/replica_raftstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -1075,6 +1075,7 @@ func (r *Replica) clearSubsumedReplicaDiskData(
subsumedReplSSTFile := &engine.MemFile{}
subsumedReplSST := engine.MakeIngestionSSTWriter(subsumedReplSSTFile)
defer subsumedReplSST.Close()
// TODO(owenqian): skip ClearRange if rditer.ConstrainToKeys returns an empty range
if err := engine.ClearRangeWithHeuristic(
r.store.Engine(),
&subsumedReplSST,
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/replica_sst_snapshot_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ func (s *SSTSnapshotStorageScratch) NewFile(
) (*SSTSnapshotStorageFile, error) {
id := len(s.ssts)
filename := s.filename(id)
s.ssts = append(s.ssts, filename)
f := &SSTSnapshotStorageFile{
scratch: s,
filename: filename,
Expand Down Expand Up @@ -164,6 +163,7 @@ func (f *SSTSnapshotStorageFile) openFile() error {
}
f.file = file
f.created = true
f.scratch.ssts = append(f.scratch.ssts, f.filename)
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/replica_sst_snapshot_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestSSTSnapshotStorage(t *testing.T) {
}()

// Check that even though the files aren't created, they are still recorded in SSTs().
require.Equal(t, len(scratch.SSTs()), 1)
require.Equal(t, len(scratch.SSTs()), 0)

// Check that the storage lazily creates the files on write.
for _, fileName := range scratch.SSTs() {
Expand Down
52 changes: 41 additions & 11 deletions pkg/storage/store_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ type kvBatchSnapshotStrategy struct {
sstChunkSize int64
// Only used on the receiver side.
scratch *SSTSnapshotStorageScratch
// Used to restrict the span of the rangedel done in ClearRange when receiving a snapshot
reader engine.Reader
}

// multiSSTWriter is a wrapper around RocksDBSstFileWriter and
Expand All @@ -120,18 +122,22 @@ type multiSSTWriter struct {
// The approximate size of the SST chunk to buffer in memory on the receiver
// before flushing to disk.
sstChunkSize int64
// Used to scan for the Upper and LowerBound of ranges to restrict tombstone width.
reader engine.Reader
}

func newMultiSSTWriter(
ctx context.Context,
scratch *SSTSnapshotStorageScratch,
keyRanges []rditer.KeyRange,
sstChunkSize int64,
reader engine.Reader,
) (multiSSTWriter, error) {
msstw := multiSSTWriter{
scratch: scratch,
keyRanges: keyRanges,
sstChunkSize: sstChunkSize,
reader: reader,
}
if err := msstw.initSST(ctx); err != nil {
return msstw, err
Expand All @@ -140,26 +146,45 @@ func newMultiSSTWriter(
}

func (msstw *multiSSTWriter) initSST(ctx context.Context) error {
newSSTFile, err := msstw.scratch.NewFile(ctx, msstw.sstChunkSize)
if err != nil {
return errors.Wrap(err, "failed to create new sst file")
span := roachpb.Span{Key: msstw.keyRanges[msstw.currRange].Start.Key, EndKey: msstw.keyRanges[msstw.currRange].End.Key}
span, empty := rditer.ConstrainToKeys(msstw.reader, span)
if empty {
return nil
}
// Only create file if the range is non-empty to avoid ingesting an empty SST later on since range dels are skipped for empty ranges.
if err := msstw.maybeCreateNewFile(ctx); err != nil {
return err
}
newSST := engine.MakeIngestionSSTWriter(newSSTFile)
msstw.currSST = newSST
if err := msstw.currSST.ClearRange(msstw.keyRanges[msstw.currRange].Start, msstw.keyRanges[msstw.currRange].End); err != nil {
if err := msstw.currSST.ClearRange(engine.MakeMVCCMetadataKey(span.Key), engine.MakeMVCCMetadataKey(span.EndKey)); err != nil {
msstw.currSST.Close()
return errors.Wrap(err, "failed to clear range on sst file writer")
}
return nil
}

func (msstw *multiSSTWriter) finalizeSST(ctx context.Context) error {
err := msstw.currSST.Finish()
if err != nil {
return errors.Wrap(err, "failed to finish sst")
if !msstw.currSST.Closed() {
err := msstw.currSST.Finish()
if err != nil {
return errors.Wrap(err, "failed to finish sst")
}
msstw.currSST.Close()
}
msstw.currRange++
msstw.currSST.Close()
return nil
}

// maybeCreateNewFile creates a new file if currSST is closed. Otherwise it is a no-op.
// maybeCreateNewFile is idempotent.
func (msstw *multiSSTWriter) maybeCreateNewFile(ctx context.Context) error {
if !msstw.currSST.Closed() {
return nil
}
newSSTFile, err := msstw.scratch.NewFile(ctx, msstw.sstChunkSize)
if err != nil {
return errors.Wrap(err, "failed to create new sst file")
}
msstw.currSST = engine.MakeIngestionSSTWriter(newSSTFile)
return nil
}

Expand All @@ -174,6 +199,9 @@ func (msstw *multiSSTWriter) Put(ctx context.Context, key engine.MVCCKey, value
return err
}
}
if err := msstw.maybeCreateNewFile(ctx); err != nil {
return err
}
if msstw.keyRanges[msstw.currRange].Start.Key.Compare(key.Key) > 0 {
return crdberrors.AssertionFailedf("client error: expected %s to fall in one of %s", key.Key, msstw.keyRanges)
}
Expand All @@ -200,6 +228,7 @@ func (msstw *multiSSTWriter) Finish(ctx context.Context) error {
return nil
}

// Close is idempotent.
func (msstw *multiSSTWriter) Close() {
msstw.currSST.Close()
}
Expand All @@ -220,7 +249,7 @@ func (kvSS *kvBatchSnapshotStrategy) Receive(
// At the moment we'll write at most three SSTs.
// TODO(jeffreyxiao): Re-evaluate as the default range size grows.
keyRanges := rditer.MakeReplicatedKeyRanges(header.State.Desc)
msstw, err := newMultiSSTWriter(ctx, kvSS.scratch, keyRanges, kvSS.sstChunkSize)
msstw, err := newMultiSSTWriter(ctx, kvSS.scratch, keyRanges, kvSS.sstChunkSize, kvSS.reader)
if err != nil {
return noSnap, err
}
Expand Down Expand Up @@ -821,6 +850,7 @@ func (s *Store) receiveSnapshot(
raftCfg: &s.cfg.RaftConfig,
scratch: s.sstSnapshotStorage.NewScratchSpace(header.State.Desc.RangeID, snapUUID),
sstChunkSize: snapshotSSTWriteSyncRate.Get(&s.cfg.Settings.SV),
reader: s.Engine(),
}
defer ss.Close(ctx)
default:
Expand Down

0 comments on commit d291f5c

Please sign in to comment.