Skip to content

Commit

Permalink
storageccl: Added test to check export correctness with random input.
Browse files Browse the repository at this point in the history
Added a test which generates random KVPs and timestamps, subsequently
attempting to export the data between provided key and time ranges.
Correctness of the new export logic is checked against the old
implementation which used the now non-existent MVCCIncrementalIterator.

Release note: None
  • Loading branch information
adityamaru27 committed Jun 4, 2019
1 parent 75f1314 commit 959f577
Show file tree
Hide file tree
Showing 2 changed files with 298 additions and 41 deletions.
257 changes: 257 additions & 0 deletions pkg/ccl/storageccl/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,26 @@ package storageccl
import (
"bytes"
"context"
"fmt"
"io/ioutil"
"math"
"path/filepath"
"sort"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
)

func TestExportCmd(t *testing.T) {
Expand Down Expand Up @@ -190,3 +195,255 @@ func TestExportGCThreshold(t *testing.T) {
t.Fatalf(`expected "must be after replica GC threshold" error got: %+v`, pErr)
}
}

// exportUsingGoIterator uses the legacy implementation of export, and is used
// as an oracle to check the correctness of the new C++ implementation.
//
// It walks batch with an MVCCIncrementalIterator configured with the given
// time bounds and endKey as upper bound, starting at startKey, and adds every
// visited key/value to an SST. With MVCCFilter_Latest only one version per key
// is visited (NextKey) and, when startTime is empty, tombstones are skipped;
// with MVCCFilter_All every version is visited (Next).
//
// It returns the serialized SST, or (nil, nil) if no data was written. Any
// failure to create the SST writer, an unknown filter, an iterator error or an
// SST write error is returned to the caller.
func exportUsingGoIterator(
	filter roachpb.MVCCFilter,
	startTime, endTime hlc.Timestamp,
	startKey, endKey roachpb.Key,
	enableTimeBoundIteratorOptimization bool,
	batch engine.Reader,
) ([]byte, error) {
	sst, err := engine.MakeRocksDBSstFileWriter()
	if err != nil {
		// Propagate the failure instead of reporting an empty, successful export.
		return nil, err
	}
	defer sst.Close()

	var skipTombstones bool
	var iterFn func(*MVCCIncrementalIterator)
	switch filter {
	case roachpb.MVCCFilter_Latest:
		skipTombstones = true
		iterFn = (*MVCCIncrementalIterator).NextKey
	case roachpb.MVCCFilter_All:
		skipTombstones = false
		iterFn = (*MVCCIncrementalIterator).Next
	default:
		// An unrecognized filter must not be mistaken for "nothing to export".
		return nil, fmt.Errorf("unknown MVCC filter: %v", filter)
	}

	iter := NewMVCCIncrementalIterator(batch, IterOptions{
		StartTime:                           startTime,
		EndTime:                             endTime,
		UpperBound:                          endKey,
		EnableTimeBoundIteratorOptimization: enableTimeBoundIteratorOptimization,
	})
	defer iter.Close()
	for iter.Seek(engine.MakeMVCCMetadataKey(startKey)); ; iterFn(iter) {
		ok, err := iter.Valid()
		if err != nil {
			// The error may be a WriteIntentError. In which case, returning it will
			// cause this command to be retried.
			return nil, err
		}
		if !ok || iter.UnsafeKey().Key.Compare(endKey) >= 0 {
			break
		}

		// Skip tombstone (len=0) records when startTime is zero
		// (non-incremental) and we're not exporting all versions.
		if skipTombstones && startTime.IsEmpty() && len(iter.UnsafeValue()) == 0 {
			continue
		}

		if err := sst.Add(engine.MVCCKeyValue{Key: iter.UnsafeKey(), Value: iter.UnsafeValue()}); err != nil {
			return nil, err
		}
	}

	if sst.DataSize == 0 {
		// Let the defer Close the sstable.
		return nil, nil
	}

	sstContents, err := sst.Finish()
	if err != nil {
		return nil, err
	}

	return sstContents, nil
}

// loadSST reads the serialized SST in data and returns the key/value pairs
// that the reader's Iterate yields for the span given by start and end, in
// iteration order. Empty data yields nil without opening a reader. Any ingest
// or iteration error fails the test immediately.
func loadSST(t *testing.T, data []byte, start, end roachpb.Key) []engine.MVCCKeyValue {
	t.Helper()
	if len(data) == 0 {
		return nil
	}

	reader := engine.MakeRocksDBSstFileReader()
	defer reader.Close()

	err := reader.IngestExternalFile(data)
	if err != nil {
		t.Fatal(err)
	}

	var loaded []engine.MVCCKeyValue
	// Collect every visited entry; the callback always reports (false, nil).
	// NOTE(review): false presumably means "keep iterating" - confirm against
	// the reader's Iterate contract.
	collect := func(entry engine.MVCCKeyValue) (bool, error) {
		loaded = append(loaded, entry)
		return false, nil
	}

	from, to := engine.MVCCKey{Key: start}, engine.MVCCKey{Key: end}
	if err = reader.Iterate(from, to, collect); err != nil {
		t.Fatal(err)
	}

	return loaded
}

// assertEqualKVs returns a subtest function that exports the given key and
// time range from e twice - once with the legacy Go iterator
// (exportUsingGoIterator, the oracle) and once with engine.ExportToSst - and
// fails the test unless both SSTs contain the same keys and values in the same
// order.
//
// exportAllRevisions selects MVCCFilter_All (true) or MVCCFilter_Latest
// (false) for the oracle and is passed through to ExportToSst.
// enableTimeBoundIteratorOptimization is forwarded to the oracle and, for the
// new implementation, sets the Min/MaxTimestampHint iterator options.
func assertEqualKVs(
	ctx context.Context,
	e engine.Engine,
	startKey, endKey roachpb.Key,
	startTime, endTime hlc.Timestamp,
	exportAllRevisions bool,
	enableTimeBoundIteratorOptimization bool,
) func(*testing.T) {
	return func(t *testing.T) {
		t.Helper()

		var filter roachpb.MVCCFilter
		if exportAllRevisions {
			filter = roachpb.MVCCFilter_All
		} else {
			filter = roachpb.MVCCFilter_Latest
		}

		// Run oracle (go implementation of the IncrementalIterator).
		expected, err := exportUsingGoIterator(filter, startTime, endTime,
			startKey, endKey, enableTimeBoundIteratorOptimization, e)
		if err != nil {
			// Include the underlying error so the failure is diagnosable.
			t.Fatalf("Oracle failed to export provided key range: %+v", err)
		}

		// Run new C++ implementation of IncrementalIterator.
		start := engine.MVCCKey{Key: startKey, Timestamp: startTime}
		end := engine.MVCCKey{Key: endKey, Timestamp: endTime}
		iterOpts := engine.IterOptions{
			UpperBound: endKey,
		}
		if enableTimeBoundIteratorOptimization {
			iterOpts.MaxTimestampHint = endTime
			iterOpts.MinTimestampHint = startTime
		}
		sst, _, err := engine.ExportToSst(ctx, e, start, end, exportAllRevisions, iterOpts)
		if err != nil {
			t.Fatal(err)
		}

		// Compare new C++ implementation against the oracle.
		expectedKVS := loadSST(t, expected, startKey, endKey)
		kvs := loadSST(t, sst, startKey, endKey)

		if len(kvs) != len(expectedKVS) {
			// Report the decoded KVs and their count, not the raw SST bytes.
			t.Fatalf("got %d kvs (%+v) but expected %d (%+v)",
				len(kvs), kvs, len(expectedKVS), expectedKVS)
		}

		for i := range kvs {
			if !kvs[i].Key.Equal(expectedKVS[i].Key) {
				t.Fatalf("%d key: got %v but expected %v", i, kvs[i].Key, expectedKVS[i].Key)
			}
			if !bytes.Equal(kvs[i].Value, expectedKVS[i].Value) {
				t.Fatalf("%d value: got %x but expected %x", i, kvs[i].Value, expectedKVS[i].Value)
			}
		}
	}
}
// TestRandomKeyAndTimestampExport writes randomly generated keys, values and
// timestamps to a fresh RocksDB instance and then checks, via assertEqualKVs,
// that the new export implementation matches the legacy Go oracle over:
//   - the full key and time range,
//   - a random key sub-range, and
//   - a random time sub-range,
// each with latest/all revisions and with/without time-bound hints.
func TestRandomKeyAndTimestampExport(t *testing.T) {
	defer leaktest.AfterTest(t)()

	ctx := context.Background()
	dir, cleanup := testutils.TempDir(t)
	defer cleanup()

	rocksdb, err := engine.NewRocksDB(
		engine.RocksDBConfig{
			Settings: cluster.MakeTestingClusterSettings(),
			Dir:      dir,
		},
		engine.RocksDBCache{},
	)
	if err != nil {
		t.Fatal(err)
	}
	defer rocksdb.Close()

	// Log the seed so that a failing run can be reproduced.
	rnd, seed := randutil.NewPseudoRand()
	t.Logf("random seed: %d", seed)

	var (
		keyMin = roachpb.KeyMin
		keyMax = roachpb.KeyMax

		tsMin = hlc.Timestamp{WallTime: 0, Logical: 0}
		tsMax = hlc.Timestamp{WallTime: math.MaxInt64, Logical: 0}
	)

	// Store generated keys and timestamps.
	var keys []roachpb.Key
	var timestamps []hlc.Timestamp

	// Set to > 1 to prevent random key test from panicking.
	var numKeys = 5000
	var curWallTime = 0
	var curLogical = 0

	for i := 0; i < numKeys; i++ {
		// Pick a random timestamp for this key. The collected timestamps are
		// not generated in order; they are sorted after the loop.
		curWallTime = randutil.RandIntInRange(rnd, 0, math.MaxInt64-1)
		curLogical = randutil.RandIntInRange(rnd, 0, math.MaxInt32-1)
		ts := hlc.Timestamp{WallTime: int64(curWallTime), Logical: int32(curLogical)}
		timestamps = append(timestamps, ts)

		// Make keys unique by prefixing the loop index. The prefix is not
		// zero-padded, so generation order is not lexicographic order; the
		// keys are sorted after the loop.
		key := roachpb.Key(randutil.RandBytes(rnd, 100))
		key = append([]byte(fmt.Sprintf("#%d", i)), key...)
		keys = append(keys, key)

		value := roachpb.MakeValueFromBytes(randutil.RandBytes(rnd, 200))
		value.InitChecksum(key)
		if err := engine.MVCCPut(ctx, rocksdb, nil, key, ts, value, nil); err != nil {
			t.Fatal(err)
		}

		// Randomly decide whether to add a newer version of the same key to test
		// MVCC_Filter_All.
		if randutil.RandIntInRange(rnd, 0, math.MaxInt64)%2 == 0 {
			curWallTime++
			ts = hlc.Timestamp{WallTime: int64(curWallTime), Logical: int32(curLogical)}
			value = roachpb.MakeValueFromBytes(randutil.RandBytes(rnd, 200))
			value.InitChecksum(key)
			if err := engine.MVCCPut(ctx, rocksdb, nil, key, ts, value, nil); err != nil {
				t.Fatal(err)
			}
		}
	}

	// Sort the keys so that keys[lowerBound] < keys[upperBound] holds for any
	// lowerBound < upperBound. Without this, "#10..." sorts before "#2..." and
	// the random key range below could be inverted, making both exports
	// trivially empty.
	sort.Slice(keys, func(i, j int) bool {
		return keys[i].Compare(keys[j]) < 0
	})

	sort.Slice(timestamps, func(i, j int) bool {
		return (timestamps[i].WallTime < timestamps[j].WallTime) ||
			(timestamps[i].WallTime == timestamps[j].WallTime &&
				timestamps[i].Logical < timestamps[j].Logical)
	})

	// Exercise min to max time and key ranges.
	t.Run("ts (0-∞], latest, nontimebound", assertEqualKVs(ctx, rocksdb, keyMin, keyMax, tsMin, tsMax, false, false))
	t.Run("ts (0-∞], all, nontimebound", assertEqualKVs(ctx, rocksdb, keyMin, keyMax, tsMin, tsMax, true, false))
	t.Run("ts (0-∞], latest, timebound", assertEqualKVs(ctx, rocksdb, keyMin, keyMax, tsMin, tsMax, false, true))
	t.Run("ts (0-∞], all, timebound", assertEqualKVs(ctx, rocksdb, keyMin, keyMax, tsMin, tsMax, true, true))

	upperBound := randutil.RandIntInRange(rnd, 1, numKeys)
	lowerBound := rnd.Intn(upperBound)

	// Exercise random key ranges.
	t.Run("kv [randLower, randUpper), latest, nontimebound", assertEqualKVs(ctx, rocksdb, keys[lowerBound], keys[upperBound], tsMin, tsMax, false, false))
	t.Run("kv [randLower, randUpper), all, nontimebound", assertEqualKVs(ctx, rocksdb, keys[lowerBound], keys[upperBound], tsMin, tsMax, true, false))
	t.Run("kv [randLower, randUpper), latest, timebound", assertEqualKVs(ctx, rocksdb, keys[lowerBound], keys[upperBound], tsMin, tsMax, false, true))
	t.Run("kv [randLower, randUpper), all, timebound", assertEqualKVs(ctx, rocksdb, keys[lowerBound], keys[upperBound], tsMin, tsMax, true, true))

	upperBound = randutil.RandIntInRange(rnd, 1, numKeys)
	lowerBound = rnd.Intn(upperBound)

	// Exercise random timestamps.
	t.Run("kv (randLowerTime, randUpperTime], latest, nontimebound", assertEqualKVs(ctx, rocksdb, keyMin, keyMax, timestamps[lowerBound], timestamps[upperBound], false, false))
	t.Run("kv (randLowerTime, randUpperTime], all, nontimebound", assertEqualKVs(ctx, rocksdb, keyMin, keyMax, timestamps[lowerBound], timestamps[upperBound], true, false))
	t.Run("kv (randLowerTime, randUpperTime], latest, timebound", assertEqualKVs(ctx, rocksdb, keyMin, keyMax, timestamps[lowerBound], timestamps[upperBound], false, true))
	t.Run("kv (randLowerTime, randUpperTime], all, timebound", assertEqualKVs(ctx, rocksdb, keyMin, keyMax, timestamps[lowerBound], timestamps[upperBound], true, true))
}
Loading

0 comments on commit 959f577

Please sign in to comment.