kvserver/gc: replace oldGC-based tests with dedicated test
Previously, gc tests used the old gc implementation as a source of
truth to validate gc behaviour. This approach is hard to maintain
as we need to add new classes of objects to it (range keys).
This diff adds a new test that uses assertions on key history
to validate the outcome of GC.

Release note: None
aliher1911 committed Jun 27, 2022
1 parent 4ee236c commit ccb8ef1
Showing 5 changed files with 1,068 additions and 64 deletions.
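
The new assertion-based test itself sits in the collapsed portion of this diff and is not reproduced here. Purely as an illustration of the approach the commit message describes — declaring which versions of each key must survive GC, rather than diffing against the old implementation — a minimal, self-contained sketch could look like the following. All names (keyHistory, survivorsAfterGC, TestGCKeepsReadableHistory) are hypothetical and use plain Go types instead of the real storage and hlc packages.

package gcsketch

import (
	"reflect"
	"testing"
)

// keyHistory is a hypothetical stand-in for a key's MVCC history:
// version timestamps in ascending order, expressed in seconds.
type keyHistory []int64

// survivorsAfterGC returns the versions a TTL-based GC run at time `now`
// is expected to keep: every version newer than the threshold, plus the
// newest version at or below it, so that reads at the threshold still
// see a value. Deletion tombstones are ignored in this simplified model.
func survivorsAfterGC(h keyHistory, now, ttlSec int64) keyHistory {
	threshold := now - ttlSec
	var out keyHistory
	newestBelow := int64(-1)
	for _, ts := range h {
		if ts > threshold {
			out = append(out, ts)
		} else if ts > newestBelow {
			newestBelow = ts
		}
	}
	if newestBelow >= 0 {
		out = append(keyHistory{newestBelow}, out...)
	}
	return out
}

// TestGCKeepsReadableHistory asserts directly on per-key history,
// mirroring the idea of the new test: state what must remain after GC
// instead of comparing against another GC implementation.
func TestGCKeepsReadableHistory(t *testing.T) {
	now, ttl := int64(100), int64(30) // GC threshold = 70
	histories := map[string]keyHistory{
		"a": {10, 20, 90}, // 10 is shadowed by 20 below the threshold
		"b": {5},          // a single old version must stay readable
	}
	want := map[string]keyHistory{
		"a": {20, 90},
		"b": {5},
	}
	for k, h := range histories {
		if got := survivorsAfterGC(h, now, ttl); !reflect.DeepEqual(got, want[k]) {
			t.Errorf("key %q: got %v, want %v", k, got, want[k])
		}
	}
}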
170 changes: 142 additions & 28 deletions pkg/kv/kvserver/gc/data_distribution_test.go
@@ -13,6 +13,7 @@ package gc
import (
"context"
"fmt"
"math"
"math/rand"
"sort"
"testing"
@@ -25,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/stretchr/testify/require"
)
@@ -65,7 +67,8 @@ func (ds dataDistribution) setupTest(
}
err := storage.MVCCPut(ctx, eng, &ms, kv.Key.Key, ts,
hlc.ClockTimestamp{}, roachpb.Value{RawBytes: kv.Value}, txn)
require.NoError(t, err)
require.NoError(t, err, "failed to insert value for key %s, value length=%d",
kv.Key.String(), len(kv.Value))
}
if !kv.Key.Timestamp.Less(maxTs) {
maxTs = kv.Key.Timestamp
@@ -79,62 +82,134 @@ func (ds dataDistribution) setupTest(
return ms
}

// maxRetriesAllowed limits how many times we may retry when generating
// keys and timestamps for objects that are restricted by some criteria (e.g.
// keys must be unique, timestamps shouldn't repeat within a key's history,
// intents should be newer than range tombstones). If the distribution spec is
// too restrictive, it may cap the number of objects that can be generated and
// generation would loop forever. Once this threshold is reached, the generator
// panics and stops the test or benchmark with a meaningful message instead of
// a timeout.
const maxRetriesAllowed = 1000

// newDataDistribution constructs a dataDistribution from various underlying
// distributions.
func newDataDistribution(
tsDist func() hlc.Timestamp,
minIntentTs, maxOldIntentTs hlc.Timestamp,
keyDist func() roachpb.Key,
valueDist func() roachpb.Value,
versionsPerKey func() int,
intentFrac float64,
oldIntentFrac float64, // within intents(!)
totalKeys int,
rng *rand.Rand,
) dataDistribution {
// TODO(ajwerner): provide a mechanism to control the rate of expired intents
// or the intent age. Such a knob would likely require decoupling intents from
// other keys.
var (
remaining = totalKeys
key roachpb.Key
seen = map[string]struct{}{}
// Remaining values (all versions of all keys together with intents).
remaining = totalKeys
// Key for the versions currently being emitted (if timestamps is non-empty).
key roachpb.Key
// Set of key.String() to avoid generating data for the same key multiple
// times.
seen = map[string]struct{}{}
// Pending timestamps for current key sorted in ascending order.
timestamps []hlc.Timestamp
haveIntent bool
// Whether the newest version of the current key should be an intent.
hasIntent bool
)
return func() (storage.MVCCKeyValue, *roachpb.Transaction, bool) {
if remaining == 0 {
return storage.MVCCKeyValue{}, nil, false
}
defer func() { remaining-- }()
for len(timestamps) == 0 {

generatePointKey := func() (nextKey roachpb.Key, keyTimestamps []hlc.Timestamp, hasIntent bool) {
hasIntent = rng.Float64() < intentFrac
oldIntent := hasIntent && rng.Float64() < oldIntentFrac
for retries := 0; len(keyTimestamps) == 0; retries++ {
if retries > maxRetriesAllowed {
panic("generation rules are too restrictive, can't generate more data")
}
versions := versionsPerKey()
if versions == 0 {
continue
}
if versions > remaining {
versions = remaining
}
timestamps = make([]hlc.Timestamp, 0, versions)
keyTimestamps = make([]hlc.Timestamp, 0, versions)
for i := 0; i < versions; i++ {
timestamps = append(timestamps, tsDist())
keyTimestamps = append(keyTimestamps, tsDist())
}
sort.Slice(timestamps, func(i, j int) bool {
return timestamps[i].Less(timestamps[j])
sort.Slice(keyTimestamps, func(i, j int) bool {
return keyTimestamps[i].Less(keyTimestamps[j])
})
for {
key = keyDist()
sk := string(key)
prevTs := hlc.Timestamp{WallTime: math.MaxInt64}
duplicate := false
for _, ts := range keyTimestamps {
if ts.Equal(prevTs) {
duplicate = true
break
}
prevTs = ts
}
if duplicate {
keyTimestamps = nil
continue
}
lastTs := keyTimestamps[len(keyTimestamps)-1]
if hasIntent {
// Last value (intent) is older than min intent threshold.
if lastTs.LessEq(minIntentTs) {
keyTimestamps = nil
continue
}
// Intent ts is higher than max allowed old intent.
if oldIntent && maxOldIntentTs.Less(lastTs) {
keyTimestamps = nil
continue
}
// Intent ts is lower than min allowed for non-pushed intents.
if !oldIntent && lastTs.LessEq(maxOldIntentTs) {
keyTimestamps = nil
continue
}
}
for ; ; retries++ {
if retries > maxRetriesAllowed {
panic("generation rules are too restrictive, can't generate more data")
}
nextKey = keyDist()
sk := string(nextKey)
if _, ok := seen[sk]; ok {
continue
}
seen[sk] = struct{}{}
break
}
haveIntent = rng.Float64() < intentFrac
retries = 0
}
return nextKey, keyTimestamps, hasIntent
}

return func() (storage.MVCCKeyValue, *roachpb.Transaction, bool) {
if remaining == 0 {
// Throw away temp key data because we reached the end of the sequence.
seen = nil
return storage.MVCCKeyValue{}, nil, false
}
defer func() { remaining-- }()

if len(timestamps) == 0 {
// Loop because we can generate duplicate keys or unacceptable values, in
// which case we retry the key from scratch.
for len(timestamps) == 0 {
key, timestamps, hasIntent = generatePointKey()
}
seen[string(key)] = struct{}{}
}
ts := timestamps[0]
timestamps = timestamps[1:]
var txn *roachpb.Transaction
if len(timestamps) == 0 && haveIntent {
// On the last version, we generate a transaction as needed.
if len(timestamps) == 0 && hasIntent {
txn = &roachpb.Transaction{
Status: roachpb.PENDING,
ReadTimestamp: ts,
Expand All @@ -161,23 +236,26 @@ type distSpec interface {
// uniformDistSpec is a distSpec which represents uniform distributions over its
// various dimensions.
type uniformDistSpec struct {
tsFrom, tsTo int64 // seconds
tsSecFrom, tsSecTo int64 // seconds
tsSecMinIntent, tsSecOldIntentTo int64
keySuffixMin, keySuffixMax int
valueLenMin, valueLenMax int
deleteFrac float64
keysPerValueMin, keysPerValueMax int
intentFrac float64
intentFrac, oldIntentFrac float64
}

var _ distSpec = uniformDistSpec{}

func (ds uniformDistSpec) dist(maxRows int, rng *rand.Rand) dataDistribution {
return newDataDistribution(
uniformTimestampDistribution(ds.tsFrom*time.Second.Nanoseconds(), ds.tsTo*time.Second.Nanoseconds(), rng),
uniformTableKeyDistribution(ds.desc().StartKey.AsRawKey(), ds.keySuffixMin, ds.keySuffixMax, rng),
uniformValueDistribution(ds.valueLenMin, ds.valueLenMax, ds.deleteFrac, rng),
uniformTimestampDistribution(ds.tsSecFrom*time.Second.Nanoseconds(), ds.tsSecTo*time.Second.Nanoseconds(), rng),
hlc.Timestamp{WallTime: ds.tsSecMinIntent * time.Second.Nanoseconds()},
hlc.Timestamp{WallTime: ds.tsSecOldIntentTo * time.Second.Nanoseconds()},
uniformTableStringKeyDistribution(ds.desc().StartKey.AsRawKey(), ds.keySuffixMin, ds.keySuffixMax, rng),
uniformValueStringDistribution(ds.valueLenMin, ds.valueLenMax, ds.deleteFrac, rng),
uniformValuesPerKey(ds.keysPerValueMin, ds.keysPerValueMax, rng),
ds.intentFrac,
ds.intentFrac, ds.oldIntentFrac,
maxRows,
rng,
)
@@ -198,7 +276,7 @@ func (ds uniformDistSpec) String() string {
"valueLen=[%d,%d],"+
"keysPerValue=[%d,%d],"+
"deleteFrac=%f,intentFrac=%f",
ds.tsFrom, ds.tsTo,
ds.tsSecFrom, ds.tsSecTo,
ds.keySuffixMin, ds.keySuffixMax,
ds.valueLenMin, ds.valueLenMax,
ds.keysPerValueMin, ds.keysPerValueMax,
@@ -239,6 +317,24 @@ func uniformValueDistribution(
}
}

// uniformValueStringDistribution returns a random value distribution with
// uniformly distributed lengths, using printable characters.
func uniformValueStringDistribution(
min, max int, deleteFrac float64, rng *rand.Rand,
) func() roachpb.Value {
if min > max {
panic(fmt.Errorf("min (%d) > max (%d)", min, max))
}
n := (max - min) + 1
return func() roachpb.Value {
if rng.Float64() < deleteFrac {
return roachpb.Value{}
}
var v roachpb.Value
v.SetString(randutil.RandString(rng, min+rng.Intn(n), randutil.PrintableKeyAlphabet))
return v
}
}

func uniformValuesPerKey(valuesPerKeyMin, valuesPerKeyMax int, rng *rand.Rand) func() int {
if valuesPerKeyMin > valuesPerKeyMax {
panic(fmt.Errorf("min (%d) > max (%d)", valuesPerKeyMin, valuesPerKeyMax))
@@ -260,3 +356,21 @@ func uniformTableKeyDistribution(
return encoding.EncodeBytesAscending(prefix[0:len(prefix):len(prefix)], randData)
}
}

// TODO(oleg): Suppress lint for now; check how the reduced byte choice
// affects the performance of tests.
var _ = uniformTableKeyDistribution

func uniformTableStringKeyDistribution(
prefix roachpb.Key, suffixMin, suffixMax int, rng *rand.Rand,
) func() roachpb.Key {
if suffixMin > suffixMax {
panic(fmt.Errorf("suffixMin (%d) > suffixMax (%d)", suffixMin, suffixMax))
}
n := (suffixMax - suffixMin) + 1
return func() roachpb.Key {
lenSuffix := suffixMin + rng.Intn(n)
key := randutil.RandString(rng, lenSuffix, randutil.PrintableKeyAlphabet)
return encoding.EncodeBytesAscending(prefix[0:len(prefix):len(prefix)], []byte(key))
}
}
11 changes: 6 additions & 5 deletions pkg/kv/kvserver/gc/gc_old_test.go
@@ -58,7 +58,7 @@ func runGCOld(
intentExp := now.Add(-options.IntentAgeThreshold.Nanoseconds(), 0)
txnExp := now.Add(-kvserverbase.TxnCleanupThreshold.Nanoseconds(), 0)

gc := MakeGarbageCollector(now, gcTTL)
gc := makeGarbageCollector(now, gcTTL)

if err := gcer.SetGCThreshold(ctx, Threshold{
Key: gc.Threshold,
@@ -248,9 +248,9 @@ type GarbageCollector struct {
ttl time.Duration
}

// MakeGarbageCollector allocates and returns a new GC, with expiration
// makeGarbageCollector allocates and returns a new GC, with expiration
// computed based on current time and the gc TTL.
func MakeGarbageCollector(now hlc.Timestamp, gcTTL time.Duration) GarbageCollector {
func makeGarbageCollector(now hlc.Timestamp, gcTTL time.Duration) GarbageCollector {
return GarbageCollector{
Threshold: CalculateThreshold(now, gcTTL),
ttl: gcTTL,
@@ -326,10 +326,11 @@ var (

// TestGarbageCollectorFilter verifies the filter policies for
// different sorts of MVCC keys.
// TODO(oleg): Remove once we don't need old GC.
func TestGarbageCollectorFilter(t *testing.T) {
defer leaktest.AfterTest(t)()
gcA := MakeGarbageCollector(hlc.Timestamp{WallTime: 0, Logical: 0}, time.Second)
gcB := MakeGarbageCollector(hlc.Timestamp{WallTime: 0, Logical: 0}, 2*time.Second)
gcA := makeGarbageCollector(hlc.Timestamp{WallTime: 0, Logical: 0}, time.Second)
gcB := makeGarbageCollector(hlc.Timestamp{WallTime: 0, Logical: 0}, 2*time.Second)
n := []byte("data")
d := []byte(nil)
testData := []struct {