Skip to content

Commit

Permalink
address review comments
Browse files Browse the repository at this point in the history
Release note (<category, see below>): <what> <show> <why>
  • Loading branch information
Owen Qian committed Feb 6, 2020
1 parent f9cc63e commit f05f56a
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 33 deletions.
14 changes: 8 additions & 6 deletions pkg/storage/rditer/replica_data_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,22 +52,24 @@ func MakeAllKeyRanges(d *roachpb.RangeDescriptor) []KeyRange {

// ConstrainToKeys returns a Span constrained to the keys present in the given Range.
// Returns (span, true) if the Range is contains no keys, otherwise (span, false).
func ConstrainToKeys(reader engine.Reader, span roachpb.Span) (_ roachpb.Span, empty bool) {
func ConstrainToKeys(
reader engine.Reader, span roachpb.Span,
) (_ roachpb.Span, empty bool, _ error) {
it := reader.NewIterator(engine.IterOptions{LowerBound: span.Key, UpperBound: span.EndKey})
defer it.Close()
it.SeekGE(engine.MakeMVCCMetadataKey(span.Key))
if valid, _ := it.Valid(); !valid {
return roachpb.Span{}, true
if valid, err := it.Valid(); !valid {
return roachpb.Span{}, true, err
}
startKey := it.Key().Key

it.SeekLT(engine.MakeMVCCMetadataKey(span.EndKey))
if valid, _ := it.Valid(); !valid {
return roachpb.Span{}, true
if valid, err := it.Valid(); !valid {
return roachpb.Span{}, true, err
}

endKey := it.Key().Key.Next()
return roachpb.Span{Key: startKey, EndKey: endKey}, false
return roachpb.Span{Key: startKey, EndKey: endKey}, false, nil
}

// MakeReplicatedKeyRanges returns all key ranges that are fully Raft
Expand Down
26 changes: 8 additions & 18 deletions pkg/storage/rditer/replica_data_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/stretchr/testify/require"
)

func fakePrevKey(k []byte) roachpb.Key {
Expand Down Expand Up @@ -272,13 +273,10 @@ func TestConstrainToKeysEmptyRange(t *testing.T) {
eng := engine.NewDefaultInMem()
defer eng.Close()

span := roachpb.Span{
Key: roachpb.Key("a"),
EndKey: roachpb.Key("z"),
}
if _, empty := ConstrainToKeys(eng, span); !empty {
t.Errorf("Expected empty range")
}
span, empty, err := ConstrainToKeys(eng, roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("z")})
require.NoError(t, err)
require.True(t, empty)
require.Equal(t, roachpb.Span{}, span)
}

func TestConstrainToKeysNonEmptyRange(t *testing.T) {
Expand All @@ -299,16 +297,8 @@ func TestConstrainToKeysNonEmptyRange(t *testing.T) {
EndKey: endKey,
}
createRangeData(t, eng, desc)
span, empty := ConstrainToKeys(eng, span)
if empty {
t.Errorf("Expected non-empty range")
}

key := keys.TransactionKey(roachpb.Key(desc.StartKey), uuid.MakeV4())
ts := hlc.Timestamp{}
if err := engine.MVCCPut(context.Background(), eng, nil, key, ts, roachpb.MakeValueFromString("value"), nil); err != nil {
if !span.Key.Equal(key) || !span.EndKey.Equal(key){
t.Errorf("Expected span.Key and span.EndKey to equal %d. Instead Key: %d, EndKey: %d", key, span.Key, span.EndKey);
}
}
_, empty, err := ConstrainToKeys(eng, span)
require.NoError(t, err)
require.False(t, empty)
}
4 changes: 3 additions & 1 deletion pkg/storage/replica_sst_snapshot_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestSSTSnapshotStorage(t *testing.T) {
require.NoError(t, f.Close())
}()

// Check that even though the files aren't created, they are still recorded in SSTs().
// The files are lazily created and also lazily added to the SSTs() slice.
require.Equal(t, len(scratch.SSTs()), 0)

// Check that the storage lazily creates the files on write.
Expand All @@ -62,6 +62,8 @@ func TestSSTSnapshotStorage(t *testing.T) {

_, err = f.Write([]byte("foo"))
require.NoError(t, err)
// After the SST file has been written to, it should be recorded in SSTs().
require.Equal(t, len(scratch.SSTs()), 1)

// After writing to files, check that they have been flushed to disk.
for _, fileName := range scratch.SSTs() {
Expand Down
21 changes: 13 additions & 8 deletions pkg/storage/store_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ type kvBatchSnapshotStrategy struct {
sstChunkSize int64
// Only used on the receiver side.
scratch *SSTSnapshotStorageScratch
// Used to restrict the span of the rangedel done in ClearRange when receiving a snapshot
// Used to restrict the span of the rangedel done in ClearRange when receiving a snapshot.
reader engine.Reader
}

Expand Down Expand Up @@ -147,11 +147,16 @@ func newMultiSSTWriter(

func (msstw *multiSSTWriter) initSST(ctx context.Context) error {
span := roachpb.Span{Key: msstw.keyRanges[msstw.currRange].Start.Key, EndKey: msstw.keyRanges[msstw.currRange].End.Key}
span, empty := rditer.ConstrainToKeys(msstw.reader, span)
// Check if the span contains any kv data and only perform the rangedel if the range is non-empty.
span, empty, err := rditer.ConstrainToKeys(msstw.reader, span)
if err != nil {
return errors.Wrap(err, "failed to constrain span to keys in range")
}
if empty {
return nil
}
// Only create file if the range is non-empty to avoid ingesting an empty SST later on since range dels are skipped for empty ranges.
// Only create file if the range is non-empty to avoid ingesting an empty
// SST later on since range dels are skipped for empty ranges.
if err := msstw.maybeCreateNewFile(ctx); err != nil {
return err
}
Expand Down Expand Up @@ -199,12 +204,12 @@ func (msstw *multiSSTWriter) Put(ctx context.Context, key engine.MVCCKey, value
return err
}
}
if err := msstw.maybeCreateNewFile(ctx); err != nil {
return err
}
if msstw.keyRanges[msstw.currRange].Start.Key.Compare(key.Key) > 0 {
return crdberrors.AssertionFailedf("client error: expected %s to fall in one of %s", key.Key, msstw.keyRanges)
}
if err := msstw.maybeCreateNewFile(ctx); err != nil {
return errors.Wrap(err, "failed to create SST file for Put")
}
if err := msstw.currSST.Put(key, value); err != nil {
return errors.Wrap(err, "failed to put in sst")
}
Expand Down Expand Up @@ -291,8 +296,8 @@ func (kvSS *kvBatchSnapshotStrategy) Receive(
if req.Final {
// We finished receiving all batches and log entries. It's possible that
// we did not receive any key-value pairs for some of the key ranges, but
// we must still construct SSTs with range deletion tombstones to remove
// the data.
// we must still construct SSTs with range deletion tombstones for
// non-empty ranges in order to remove the data.
if err := msstw.Finish(ctx); err != nil {
return noSnap, err
}
Expand Down

0 comments on commit f05f56a

Please sign in to comment.