Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
56104: storage: changes to the Writer and Batch interfaces r=sumeerbhola a=sumeerbhola

These are in preparation for the separated lock table.
The Clear, ClearRange, Put methods are split into
many methods so we can differentiate the purpose of
the caller. A SingleClearEngine is added to Batch,
which will replace the Writer.SingleClear method -
this is a low-level method that we don't want anyone
outside storage to call.

There are many TODOs which will get fixed in future
PRs when we introduce new functionality.

Informs cockroachdb#41720

Release note: None

Co-authored-by: sumeerbhola <[email protected]>
  • Loading branch information
craig[bot] and sumeerbhola committed Oct 31, 2020
2 parents e82749a + 6c86b99 commit 70b028d
Show file tree
Hide file tree
Showing 37 changed files with 674 additions and 287 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/storageccl/engineccl/encrypted_fs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ func TestPebbleEncryption(t *testing.T) {
t.Logf("EnvStats:\n%+v\n\n", *stats)

batch := db.NewWriteOnlyBatch()
require.NoError(t, batch.Put(storage.MVCCKey{Key: roachpb.Key("a")}, []byte("a")))
require.NoError(t, batch.PutUnversioned(roachpb.Key("a"), []byte("a")))
require.NoError(t, batch.Commit(true))
require.NoError(t, db.Flush())
val, err := db.MVCCGet(storage.MVCCKey{Key: roachpb.Key("a")})
Expand Down
13 changes: 11 additions & 2 deletions pkg/ccl/storageccl/import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,17 @@ func slurpSSTablesLatestKey(
v := roachpb.Value{RawBytes: newKv.Value}
v.ClearChecksum()
v.InitChecksum(newKv.Key.Key)
if err := batch.Put(newKv.Key, v.RawBytes); err != nil {
t.Fatal(err)
// TODO(sumeer): this will not be correct with the separated
// lock table. We should iterate using EngineKey on the sst,
// and expose a PutEngine method to write directly.
if newKv.Key.Timestamp.IsEmpty() {
if err := batch.PutUnversioned(newKv.Key.Key, v.RawBytes); err != nil {
t.Fatal(err)
}
} else {
if err := batch.PutMVCC(newKv.Key, v.RawBytes); err != nil {
t.Fatal(err)
}
}
sst.Next()
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/storageccl/writebatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func TestWriteBatchMVCCStats(t *testing.T) {
// adjusted accordingly.
const numInitialEntries = 100
for i := 0; i < numInitialEntries; i++ {
if err := e.Put(storage.MVCCKey{Key: append([]byte("b"), byte(i))}, nil); err != nil {
if err := e.PutUnversioned(append([]byte("b"), byte(i)), nil); err != nil {
t.Fatalf("%+v", err)
}
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/cli/debug_synctest.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,10 @@ func runSyncer(
}
}()

k, v := storage.MakeMVCCMetadataKey(key()), []byte("payload")
k, v := key(), []byte("payload")
switch seq % 2 {
case 0:
if err := db.Put(k, v); err != nil {
if err := db.PutUnversioned(k, v); err != nil {
seq--
return seq, err
}
Expand All @@ -197,7 +197,7 @@ func runSyncer(
}
default:
b := db.NewBatch()
if err := b.Put(k, v); err != nil {
if err := b.PutUnversioned(k, v); err != nil {
seq--
return seq, err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/cli/debug_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,9 @@ func TestOpenReadOnlyStore(t *testing.T) {
t.Fatal(err)
}

key := storage.MakeMVCCMetadataKey(roachpb.Key("key"))
key := roachpb.Key("key")
val := []byte("value")
err = db.Put(key, val)
err = db.PutUnversioned(key, val)
if !testutils.IsError(err, test.expErr) {
t.Fatalf("wanted %s but got %v", test.expErr, err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/cli/syncbench/syncbench.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (w *worker) run(wg *sync.WaitGroup) {
for j := 0; j < 5; j++ {
block := randBlock(60, 80)
key := encoding.EncodeUint32Ascending(buf, rand.Uint32())
if err := b.Put(storage.MakeMVCCMetadataKey(key), block); err != nil {
if err := b.PutUnversioned(key, block); err != nil {
log.Fatalf(ctx, "%v", err)
}
buf = key[:0]
Expand Down
38 changes: 19 additions & 19 deletions pkg/kv/kvserver/batch_spanset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,21 +44,21 @@ func TestSpanSetBatchBoundaries(t *testing.T) {
insideKey3 := storage.MakeMVCCMetadataKey(roachpb.Key("f"))

// Write values outside the range that we can try to read later.
if err := eng.Put(outsideKey, []byte("value")); err != nil {
if err := eng.PutUnversioned(outsideKey.Key, []byte("value")); err != nil {
t.Fatalf("direct write failed: %+v", err)
}
if err := eng.Put(outsideKey3, []byte("value")); err != nil {
if err := eng.PutUnversioned(outsideKey3.Key, []byte("value")); err != nil {
t.Fatalf("direct write failed: %+v", err)
}

batch := spanset.NewBatch(eng.NewBatch(), &ss)
defer batch.Close()

// Writes inside the range work. Write twice for later read testing.
if err := batch.Put(insideKey, []byte("value")); err != nil {
if err := batch.PutUnversioned(insideKey.Key, []byte("value")); err != nil {
t.Fatalf("failed to write inside the range: %+v", err)
}
if err := batch.Put(insideKey2, []byte("value2")); err != nil {
if err := batch.PutUnversioned(insideKey2.Key, []byte("value2")); err != nil {
t.Fatalf("failed to write inside the range: %+v", err)
}

Expand All @@ -71,10 +71,10 @@ func TestSpanSetBatchBoundaries(t *testing.T) {
}

t.Run("writes before range", func(t *testing.T) {
if err := batch.Clear(outsideKey); !isWriteSpanErr(err) {
if err := batch.ClearUnversioned(outsideKey.Key); !isWriteSpanErr(err) {
t.Errorf("Clear: unexpected error %v", err)
}
if err := batch.ClearRange(outsideKey, outsideKey2); !isWriteSpanErr(err) {
if err := batch.ClearRawRange(outsideKey.Key, outsideKey2.Key); !isWriteSpanErr(err) {
t.Errorf("ClearRange: unexpected error %v", err)
}
{
Expand All @@ -88,16 +88,16 @@ func TestSpanSetBatchBoundaries(t *testing.T) {
if err := batch.Merge(outsideKey, nil); !isWriteSpanErr(err) {
t.Errorf("Merge: unexpected error %v", err)
}
if err := batch.Put(outsideKey, nil); !isWriteSpanErr(err) {
if err := batch.PutUnversioned(outsideKey.Key, nil); !isWriteSpanErr(err) {
t.Errorf("Put: unexpected error %v", err)
}
})

t.Run("writes after range", func(t *testing.T) {
if err := batch.Clear(outsideKey3); !isWriteSpanErr(err) {
if err := batch.ClearUnversioned(outsideKey3.Key); !isWriteSpanErr(err) {
t.Errorf("Clear: unexpected error %v", err)
}
if err := batch.ClearRange(insideKey2, outsideKey4); !isWriteSpanErr(err) {
if err := batch.ClearRawRange(insideKey2.Key, outsideKey4.Key); !isWriteSpanErr(err) {
t.Errorf("ClearRange: unexpected error %v", err)
}
{
Expand All @@ -111,7 +111,7 @@ func TestSpanSetBatchBoundaries(t *testing.T) {
if err := batch.Merge(outsideKey3, nil); !isWriteSpanErr(err) {
t.Errorf("Merge: unexpected error %v", err)
}
if err := batch.Put(outsideKey3, nil); !isWriteSpanErr(err) {
if err := batch.PutUnversioned(outsideKey3.Key, nil); !isWriteSpanErr(err) {
t.Errorf("Put: unexpected error %v", err)
}
})
Expand Down Expand Up @@ -284,7 +284,7 @@ func TestSpanSetBatchTimestamps(t *testing.T) {
value := []byte("value")

// Write value that we can try to read later.
if err := eng.Put(rkey, value); err != nil {
if err := eng.PutUnversioned(rkey.Key, value); err != nil {
t.Fatalf("direct write failed: %+v", err)
}

Expand All @@ -302,13 +302,13 @@ func TestSpanSetBatchTimestamps(t *testing.T) {

// Writes.
for _, batch := range []storage.Batch{batchAfter, batchDuring} {
if err := batch.Put(wkey, value); err != nil {
if err := batch.PutUnversioned(wkey.Key, value); err != nil {
t.Fatalf("failed to write inside the range at same or greater ts than latch declaration: %+v", err)
}
}

for _, batch := range []storage.Batch{batchBefore, batchNonMVCC} {
if err := batch.Put(wkey, value); err == nil {
if err := batch.PutUnversioned(wkey.Key, value); err == nil {
t.Fatalf("was able to write inside the range at ts less than latch declaration: %+v", err)
}
}
Expand All @@ -321,7 +321,7 @@ func TestSpanSetBatchTimestamps(t *testing.T) {
}

for _, batch := range []storage.Batch{batchBefore, batchNonMVCC} {
if err := batch.Clear(wkey); !isWriteSpanErr(err) {
if err := batch.ClearUnversioned(wkey.Key); !isWriteSpanErr(err) {
t.Errorf("Clear: unexpected error %v", err)
}
{
Expand All @@ -335,7 +335,7 @@ func TestSpanSetBatchTimestamps(t *testing.T) {
if err := batch.Merge(wkey, nil); !isWriteSpanErr(err) {
t.Errorf("Merge: unexpected error %v", err)
}
if err := batch.Put(wkey, nil); !isWriteSpanErr(err) {
if err := batch.PutUnversioned(wkey.Key, nil); !isWriteSpanErr(err) {
t.Errorf("Put: unexpected error %v", err)
}
}
Expand Down Expand Up @@ -388,10 +388,10 @@ func TestSpanSetIteratorTimestamps(t *testing.T) {
k2, v2 := storage.MakeMVCCMetadataKey(roachpb.Key("d")), []byte("d-value")

// Write values that we can try to read later.
if err := eng.Put(k1, v1); err != nil {
if err := eng.PutUnversioned(k1.Key, v1); err != nil {
t.Fatalf("direct write failed: %+v", err)
}
if err := eng.Put(k2, v2); err != nil {
if err := eng.PutUnversioned(k2.Key, v2); err != nil {
t.Fatalf("direct write failed: %+v", err)
}

Expand Down Expand Up @@ -482,7 +482,7 @@ func TestSpanSetNonMVCCBatch(t *testing.T) {
value := []byte("value")

// Write value that we can try to read later.
if err := eng.Put(rkey, value); err != nil {
if err := eng.PutUnversioned(rkey.Key, value); err != nil {
t.Fatalf("direct write failed: %+v", err)
}

Expand All @@ -494,7 +494,7 @@ func TestSpanSetNonMVCCBatch(t *testing.T) {

// Writes.
for _, batch := range []storage.Batch{batchNonMVCC, batchMVCC} {
if err := batch.Put(wkey, value); err != nil {
if err := batch.PutUnversioned(wkey.Key, value); err != nil {
t.Fatalf("write disallowed through non-MVCC latch: %+v", err)
}
}
Expand Down
12 changes: 10 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_add_sstable.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ func EvalAddSSTable(

ms.Add(stats)

// TODO(sumeer): use EngineIterator and replace the Put hack below.
if args.IngestAsWrites {
log.VEventf(ctx, 2, "ingesting SST (%d keys/%d bytes) via regular write batch", stats.KeyCount, len(args.Data))
dataIter.SeekGE(storage.MVCCKey{Key: keys.MinKey})
Expand All @@ -192,8 +193,15 @@ func EvalAddSSTable(
// NB: This is *not* a general transformation of any arbitrary SST to a
// WriteBatch: it assumes every key in the SST is a simple Set. This is
// already assumed elsewhere in this RPC though, so that's OK here.
if err := readWriter.Put(dataIter.UnsafeKey(), dataIter.UnsafeValue()); err != nil {
return result.Result{}, err
k := dataIter.UnsafeKey()
if k.Timestamp.IsEmpty() {
if err := readWriter.PutUnversioned(k.Key, dataIter.UnsafeValue()); err != nil {
return result.Result{}, err
}
} else {
if err := readWriter.PutMVCC(dataIter.UnsafeKey(), dataIter.UnsafeValue()); err != nil {
return result.Result{}, err
}
}
dataIter.Next()
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ func TestAddSSTableMVCCStats(t *testing.T) {
{"e", 1, "e"},
{"z", 2, "zzzzzz"},
}) {
if err := e.Put(kv.Key, kv.Value); err != nil {
if err := e.PutMVCC(kv.Key, kv.Value); err != nil {
t.Fatalf("%+v", err)
}
}
Expand Down Expand Up @@ -499,7 +499,7 @@ func TestAddSSTableDisallowShadowing(t *testing.T) {
{"y", 5, "yyy"},
{"z", 2, "zz"},
}) {
if err := e.Put(kv.Key, kv.Value); err != nil {
if err := e.PutMVCC(kv.Key, kv.Value); err != nil {
t.Fatalf("%+v", err)
}
}
Expand Down Expand Up @@ -974,7 +974,7 @@ func TestAddSSTableDisallowShadowing(t *testing.T) {
// ingesting the perfectly shadowing KVs (same ts and same value) in the
// second SST.
for _, kv := range sstKVs {
if err := e.Put(kv.Key, kv.Value); err != nil {
if err := e.PutMVCC(kv.Key, kv.Value); err != nil {
t.Fatalf("%+v", err)
}
}
Expand Down
11 changes: 8 additions & 3 deletions pkg/kv/kvserver/batcheval/cmd_clear_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,20 @@ func ClearRange(
if total := statsDelta.Total(); total < ClearRangeBytesThreshold {
log.VEventf(ctx, 2, "delta=%d < threshold=%d; using non-range clear", total, ClearRangeBytesThreshold)
if err := readWriter.MVCCIterate(from, to, storage.MVCCKeyAndIntentsIterKind, func(kv storage.MVCCKeyValue) error {
return readWriter.Clear(kv.Key)
if kv.Key.Timestamp.IsEmpty() {
// It can be an intent or an inline MVCCMetadata -- we have no idea.
// TODO(sumeer): cannot clear separated intents in this manner. Write the iteration code
// here instead of using Reader.MVCCIterate.
return readWriter.ClearUnversioned(kv.Key.Key)
}
return readWriter.ClearMVCC(kv.Key)
}); err != nil {
return result.Result{}, err
}
return pd, nil
}

if err := readWriter.ClearRange(storage.MakeMVCCMetadataKey(from),
storage.MakeMVCCMetadataKey(to)); err != nil {
if err := readWriter.ClearMVCCRangeAndIntents(from, to); err != nil {
return result.Result{}, err
}
return pd, nil
Expand Down
31 changes: 27 additions & 4 deletions pkg/kv/kvserver/batcheval/cmd_clear_range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,37 @@ type wrappedBatch struct {
clearRangeCount int
}

func (wb *wrappedBatch) Clear(key storage.MVCCKey) error {
// TODO(sbhola): narrow the calls where we increment counters to
// make this test stricter.

func (wb *wrappedBatch) ClearMVCC(key storage.MVCCKey) error {
wb.clearCount++
return wb.Batch.ClearMVCC(key)
}

func (wb *wrappedBatch) ClearUnversioned(key roachpb.Key) error {
wb.clearCount++
return wb.Batch.ClearUnversioned(key)
}

func (wb *wrappedBatch) ClearIntent(key roachpb.Key) error {
wb.clearCount++
return wb.Batch.Clear(key)
return wb.Batch.ClearIntent(key)
}

func (wb *wrappedBatch) ClearRawRange(start, end roachpb.Key) error {
wb.clearRangeCount++
return wb.Batch.ClearRawRange(start, end)
}

func (wb *wrappedBatch) ClearMVCCRangeAndIntents(start, end roachpb.Key) error {
wb.clearRangeCount++
return wb.Batch.ClearMVCCRangeAndIntents(start, end)
}

func (wb *wrappedBatch) ClearRange(start, end storage.MVCCKey) error {
func (wb *wrappedBatch) ClearMVCCRange(start, end storage.MVCCKey) error {
wb.clearRangeCount++
return wb.Batch.ClearRange(start, end)
return wb.Batch.ClearMVCCRange(start, end)
}

// TestCmdClearRangeBytesThreshold verifies that clear range resorts to
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/client_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3065,7 +3065,7 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
for _, r := range keyRanges {
sstFile := &storage.MemFile{}
sst := storage.MakeIngestionSSTWriter(sstFile)
if err := sst.ClearRange(r.Start, r.End); err != nil {
if err := sst.ClearRawRange(r.Start.Key, r.End.Key); err != nil {
return err
}

Expand Down Expand Up @@ -3098,7 +3098,7 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
sst := storage.MakeIngestionSSTWriter(sstFile)
defer sst.Close()
r := rditer.MakeRangeIDLocalKeyRange(rangeID, false /* replicatedOnly */)
if err := sst.ClearRange(r.Start, r.End); err != nil {
if err := sst.ClearRawRange(r.Start.Key, r.End.Key); err != nil {
return err
}
tombstoneKey := keys.RangeTombstoneKey(rangeID)
Expand Down
6 changes: 5 additions & 1 deletion pkg/kv/kvserver/gc/data_distribution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,11 @@ func (ds dataDistribution) setupTest(
break
}
if txn == nil {
require.NoError(t, eng.Put(kv.Key, kv.Value))
if kv.Key.Timestamp.IsEmpty() {
require.NoError(t, eng.PutUnversioned(kv.Key.Key, kv.Value))
} else {
require.NoError(t, eng.PutMVCC(kv.Key, kv.Value))
}
} else {
// TODO(ajwerner): Decide if using MVCCPut is worth it.
ts := kv.Key.Timestamp
Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/rditer/replica_data_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
)

// KeyRange is a helper struct for the ReplicaDataIterator.
// TODO(sumeer): change these to roachpb.Key since the timestamp is
// always empty.
type KeyRange struct {
Start, End storage.MVCCKey
}
Expand All @@ -34,6 +36,8 @@ type KeyRange struct {
// This is problematic as it requires of every user careful tracking of when
// to call that method; some just never call it and pull the whole replica
// into memory. Use of an allocator should be opt-in.
//
// TODO(sumeer): Should return EngineKeys, to handle separated lock table.
type ReplicaDataIterator struct {
curIndex int
ranges []KeyRange
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1693,7 +1693,7 @@ func handleTruncatedStateBelowRaft(
// NB: RangeIDPrefixBufs have sufficient capacity (32 bytes) to
// avoid allocating when constructing Raft log keys (16 bytes).
unsafeKey := prefixBuf.RaftLogKey(idx)
if err := readWriter.Clear(storage.MakeMVCCMetadataKey(unsafeKey)); err != nil {
if err := readWriter.ClearUnversioned(unsafeKey); err != nil {
return false, errors.Wrapf(err, "unable to clear truncated Raft entries for %+v", newTruncatedState)
}
}
Expand Down
Loading

0 comments on commit 70b028d

Please sign in to comment.