From ccb8ef1db11b62828c00f7f7db5c9f567ad3076a Mon Sep 17 00:00:00 2001 From: Oleg Afanasyev Date: Tue, 31 May 2022 22:04:52 +0100 Subject: [PATCH] kvserver/gc: replace oldGC based tests with dedicated Previously, gc tests used old gc implementation as a source of truth to validate gc behaviour. This approach is hard to maintain as we need to add new classes of objects to it (range keys). This diff adds a new test that uses assertions on key history to validate outcome of GC. Release note: None --- pkg/kv/kvserver/gc/data_distribution_test.go | 170 +++++- pkg/kv/kvserver/gc/gc_old_test.go | 11 +- pkg/kv/kvserver/gc/gc_random_test.go | 336 ++++++++++- pkg/kv/kvserver/gc/gc_test.go | 598 ++++++++++++++++++- pkg/util/randutil/rand.go | 17 + 5 files changed, 1068 insertions(+), 64 deletions(-) diff --git a/pkg/kv/kvserver/gc/data_distribution_test.go b/pkg/kv/kvserver/gc/data_distribution_test.go index ac4945c42969..e5b1e5738315 100644 --- a/pkg/kv/kvserver/gc/data_distribution_test.go +++ b/pkg/kv/kvserver/gc/data_distribution_test.go @@ -13,6 +13,7 @@ package gc import ( "context" "fmt" + "math" "math/rand" "sort" "testing" @@ -25,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/stretchr/testify/require" ) @@ -65,7 +67,8 @@ func (ds dataDistribution) setupTest( } err := storage.MVCCPut(ctx, eng, &ms, kv.Key.Key, ts, hlc.ClockTimestamp{}, roachpb.Value{RawBytes: kv.Value}, txn) - require.NoError(t, err) + require.NoError(t, err, "failed to insert value for key %s, value length=%d", + kv.Key.String(), len(kv.Value)) } if !kv.Key.Timestamp.Less(maxTs) { maxTs = kv.Key.Timestamp @@ -79,14 +82,25 @@ func (ds dataDistribution) setupTest( return ms } +// maxRetriesAllowed is limiting how many times we could retry when generating +// keys and timestamps for objects that are restricted by some criteria (e.g. +// keys are unique, timestamps shouldn't be duplicate in history, intents +// should be newer than range tombstones). If distribution spec is too +// restrictive it may limit maximum permissive objects and generation would loop +// infinitely. Once this threshold is reached, generator will panic and stop +// test or benchmark with meaningful message instead of timeout. +const maxRetriesAllowed = 1000 + // newDataDistribution constructs a dataDistribution from various underlying // distributions. func newDataDistribution( tsDist func() hlc.Timestamp, + minIntentTs, maxOldIntentTs hlc.Timestamp, keyDist func() roachpb.Key, valueDist func() roachpb.Value, versionsPerKey func() int, intentFrac float64, + oldIntentFrac float64, // within intents(!) totalKeys int, rng *rand.Rand, ) dataDistribution { @@ -94,18 +108,26 @@ func newDataDistribution( // or the intent age. Such a knob would likely require decoupling intents from // other keys. var ( - remaining = totalKeys - key roachpb.Key - seen = map[string]struct{}{} + // Remaining values (all versions of all keys together with intents). + remaining = totalKeys + // Key for the objects currently emitted (if versions are not empty). + key roachpb.Key + // Set of key.String() to avoid generating data for the same key multiple + // times. + seen = map[string]struct{}{} + // Pending timestamps for current key sorted in ascending order. 
timestamps []hlc.Timestamp - haveIntent bool + // If we should have an intent at the start of history. + hasIntent bool ) - return func() (storage.MVCCKeyValue, *roachpb.Transaction, bool) { - if remaining == 0 { - return storage.MVCCKeyValue{}, nil, false - } - defer func() { remaining-- }() - for len(timestamps) == 0 { + + generatePointKey := func() (nextKey roachpb.Key, keyTimestamps []hlc.Timestamp, hasIntent bool) { + hasIntent = rng.Float64() < intentFrac + oldIntent := hasIntent && rng.Float64() < oldIntentFrac + for retries := 0; len(keyTimestamps) == 0; retries++ { + if retries > maxRetriesAllowed { + panic("generation rules are too restrictive, can't generate more data") + } versions := versionsPerKey() if versions == 0 { continue @@ -113,28 +135,81 @@ func newDataDistribution( if versions > remaining { versions = remaining } - timestamps = make([]hlc.Timestamp, 0, versions) + keyTimestamps = make([]hlc.Timestamp, 0, versions) for i := 0; i < versions; i++ { - timestamps = append(timestamps, tsDist()) + keyTimestamps = append(keyTimestamps, tsDist()) } - sort.Slice(timestamps, func(i, j int) bool { - return timestamps[i].Less(timestamps[j]) + sort.Slice(keyTimestamps, func(i, j int) bool { + return keyTimestamps[i].Less(keyTimestamps[j]) }) - for { - key = keyDist() - sk := string(key) + prevTs := hlc.Timestamp{WallTime: math.MaxInt64} + duplicate := false + for _, ts := range keyTimestamps { + if ts.Equal(prevTs) { + duplicate = true + break + } + prevTs = ts + } + if duplicate { + keyTimestamps = nil + continue + } + lastTs := keyTimestamps[len(keyTimestamps)-1] + if hasIntent { + // Last value (intent) is older than min intent threshold. + if lastTs.LessEq(minIntentTs) { + keyTimestamps = nil + continue + } + // Intent ts is higher than max allowed old intent. + if oldIntent && maxOldIntentTs.Less(lastTs) { + keyTimestamps = nil + continue + } + // Intent ts is lower than min allowed for non-pushed intents. + if !oldIntent && lastTs.LessEq(maxOldIntentTs) { + keyTimestamps = nil + continue + } + } + for ; ; retries++ { + if retries > maxRetriesAllowed { + panic("generation rules are too restrictive, can't generate more data") + } + nextKey = keyDist() + sk := string(nextKey) if _, ok := seen[sk]; ok { continue } - seen[sk] = struct{}{} break } - haveIntent = rng.Float64() < intentFrac + retries = 0 + } + return nextKey, keyTimestamps, hasIntent + } + + return func() (storage.MVCCKeyValue, *roachpb.Transaction, bool) { + if remaining == 0 { + // Throw away temp key data, because we reached the end of sequence. + seen = nil + return storage.MVCCKeyValue{}, nil, false + } + defer func() { remaining-- }() + + if len(timestamps) == 0 { + // Loop because we can have duplicate keys or unacceptable values, in that + // case we retry key from scratch. + for len(timestamps) == 0 { + key, timestamps, hasIntent = generatePointKey() + } + seen[string(key)] = struct{}{} } ts := timestamps[0] timestamps = timestamps[1:] var txn *roachpb.Transaction - if len(timestamps) == 0 && haveIntent { + // On the last version, we generate a transaction as needed. + if len(timestamps) == 0 && hasIntent { txn = &roachpb.Transaction{ Status: roachpb.PENDING, ReadTimestamp: ts, @@ -161,23 +236,26 @@ type distSpec interface { // uniformDistSpec is a distSpec which represents uniform distributions over its // various dimensions. 
type uniformDistSpec struct { - tsFrom, tsTo int64 // seconds + tsSecFrom, tsSecTo int64 // seconds + tsSecMinIntent, tsSecOldIntentTo int64 keySuffixMin, keySuffixMax int valueLenMin, valueLenMax int deleteFrac float64 keysPerValueMin, keysPerValueMax int - intentFrac float64 + intentFrac, oldIntentFrac float64 } var _ distSpec = uniformDistSpec{} func (ds uniformDistSpec) dist(maxRows int, rng *rand.Rand) dataDistribution { return newDataDistribution( - uniformTimestampDistribution(ds.tsFrom*time.Second.Nanoseconds(), ds.tsTo*time.Second.Nanoseconds(), rng), - uniformTableKeyDistribution(ds.desc().StartKey.AsRawKey(), ds.keySuffixMin, ds.keySuffixMax, rng), - uniformValueDistribution(ds.valueLenMin, ds.valueLenMax, ds.deleteFrac, rng), + uniformTimestampDistribution(ds.tsSecFrom*time.Second.Nanoseconds(), ds.tsSecTo*time.Second.Nanoseconds(), rng), + hlc.Timestamp{WallTime: ds.tsSecMinIntent * time.Second.Nanoseconds()}, + hlc.Timestamp{WallTime: ds.tsSecOldIntentTo * time.Second.Nanoseconds()}, + uniformTableStringKeyDistribution(ds.desc().StartKey.AsRawKey(), ds.keySuffixMin, ds.keySuffixMax, rng), + uniformValueStringDistribution(ds.valueLenMin, ds.valueLenMax, ds.deleteFrac, rng), uniformValuesPerKey(ds.keysPerValueMin, ds.keysPerValueMax, rng), - ds.intentFrac, + ds.intentFrac, ds.oldIntentFrac, maxRows, rng, ) @@ -198,7 +276,7 @@ func (ds uniformDistSpec) String() string { "valueLen=[%d,%d],"+ "keysPerValue=[%d,%d],"+ "deleteFrac=%f,intentFrac=%f", - ds.tsFrom, ds.tsTo, + ds.tsSecFrom, ds.tsSecTo, ds.keySuffixMin, ds.keySuffixMax, ds.valueLenMin, ds.valueLenMax, ds.keysPerValueMin, ds.keysPerValueMax, @@ -239,6 +317,24 @@ func uniformValueDistribution( } } +// returns a uniform length random value distribution. +func uniformValueStringDistribution( + min, max int, deleteFrac float64, rng *rand.Rand, +) func() roachpb.Value { + if min > max { + panic(fmt.Errorf("min (%d) > max (%d)", min, max)) + } + n := (max - min) + 1 + return func() roachpb.Value { + if rng.Float64() < deleteFrac { + return roachpb.Value{} + } + var v roachpb.Value + v.SetString(randutil.RandString(rng, min+rng.Intn(n), randutil.PrintableKeyAlphabet)) + return v + } +} + func uniformValuesPerKey(valuesPerKeyMin, valuesPerKeyMax int, rng *rand.Rand) func() int { if valuesPerKeyMin > valuesPerKeyMax { panic(fmt.Errorf("min (%d) > max (%d)", valuesPerKeyMin, valuesPerKeyMax)) @@ -260,3 +356,21 @@ func uniformTableKeyDistribution( return encoding.EncodeBytesAscending(prefix[0:len(prefix):len(prefix)], randData) } } + +// TODO(oleg): Suppress lint for now, check how reduced byte choice affects +// performance of tests. 
+var _ = uniformTableKeyDistribution + +func uniformTableStringKeyDistribution( + prefix roachpb.Key, suffixMin, suffixMax int, rng *rand.Rand, +) func() roachpb.Key { + if suffixMin > suffixMax { + panic(fmt.Errorf("suffixMin (%d) > suffixMax (%d)", suffixMin, suffixMax)) + } + n := (suffixMax - suffixMin) + 1 + return func() roachpb.Key { + lenSuffix := suffixMin + rng.Intn(n) + key := randutil.RandString(rng, lenSuffix, randutil.PrintableKeyAlphabet) + return encoding.EncodeBytesAscending(prefix[0:len(prefix):len(prefix)], []byte(key)) + } +} diff --git a/pkg/kv/kvserver/gc/gc_old_test.go b/pkg/kv/kvserver/gc/gc_old_test.go index b60e5f530485..8c5ab7133ad7 100644 --- a/pkg/kv/kvserver/gc/gc_old_test.go +++ b/pkg/kv/kvserver/gc/gc_old_test.go @@ -58,7 +58,7 @@ func runGCOld( intentExp := now.Add(-options.IntentAgeThreshold.Nanoseconds(), 0) txnExp := now.Add(-kvserverbase.TxnCleanupThreshold.Nanoseconds(), 0) - gc := MakeGarbageCollector(now, gcTTL) + gc := makeGarbageCollector(now, gcTTL) if err := gcer.SetGCThreshold(ctx, Threshold{ Key: gc.Threshold, @@ -248,9 +248,9 @@ type GarbageCollector struct { ttl time.Duration } -// MakeGarbageCollector allocates and returns a new GC, with expiration +// makeGarbageCollector allocates and returns a new GC, with expiration // computed based on current time and the gc TTL. -func MakeGarbageCollector(now hlc.Timestamp, gcTTL time.Duration) GarbageCollector { +func makeGarbageCollector(now hlc.Timestamp, gcTTL time.Duration) GarbageCollector { return GarbageCollector{ Threshold: CalculateThreshold(now, gcTTL), ttl: gcTTL, @@ -326,10 +326,11 @@ var ( // TestGarbageCollectorFilter verifies the filter policies for // different sorts of MVCC keys. +// TODO(oleg): Remove once we don't need old GC. func TestGarbageCollectorFilter(t *testing.T) { defer leaktest.AfterTest(t)() - gcA := MakeGarbageCollector(hlc.Timestamp{WallTime: 0, Logical: 0}, time.Second) - gcB := MakeGarbageCollector(hlc.Timestamp{WallTime: 0, Logical: 0}, 2*time.Second) + gcA := makeGarbageCollector(hlc.Timestamp{WallTime: 0, Logical: 0}, time.Second) + gcB := makeGarbageCollector(hlc.Timestamp{WallTime: 0, Logical: 0}, 2*time.Second) n := []byte("data") d := []byte(nil) testData := []struct { diff --git a/pkg/kv/kvserver/gc/gc_random_test.go b/pkg/kv/kvserver/gc/gc_random_test.go index 0bb630d395b8..218392005146 100644 --- a/pkg/kv/kvserver/gc/gc_random_test.go +++ b/pkg/kv/kvserver/gc/gc_random_test.go @@ -15,34 +15,40 @@ import ( "fmt" "math/rand" "sort" + "strings" "testing" "time" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -// randomRunGCTestSpec specifies a distribution for to create random data for -// testing Run +// randomRunGCTestSpec specifies a distribution to create random data for +// running randomized GC tests as well as benchmarking new code vs preserved +// legacy GC. 
type randomRunGCTestSpec struct { - ds distSpec - now hlc.Timestamp - ttl int32 // seconds + ds distSpec + now hlc.Timestamp + ttlSec int32 + intentAgeSec int32 } var ( fewVersionsTinyRows = uniformDistSpec{ - tsFrom: 0, tsTo: 100, - keySuffixMin: 2, keySuffixMax: 3, + tsSecFrom: 1, tsSecTo: 100, + keySuffixMin: 2, keySuffixMax: 6, valueLenMin: 1, valueLenMax: 1, deleteFrac: 0, keysPerValueMin: 1, keysPerValueMax: 2, intentFrac: .1, } someVersionsMidSizeRows = uniformDistSpec{ - tsFrom: 0, tsTo: 100, + tsSecFrom: 1, tsSecTo: 100, keySuffixMin: 8, keySuffixMax: 8, valueLenMin: 8, valueLenMax: 16, deleteFrac: .1, @@ -50,13 +56,23 @@ var ( intentFrac: .1, } lotsOfVersionsMidSizeRows = uniformDistSpec{ - tsFrom: 0, tsTo: 100, + tsSecFrom: 1, tsSecTo: 100, keySuffixMin: 8, keySuffixMax: 8, valueLenMin: 8, valueLenMax: 16, deleteFrac: .1, keysPerValueMin: 1000, keysPerValueMax: 1000000, intentFrac: .1, } + // This spec is identical to someVersionsMidSizeRows except for number of + // intents. + someVersionsMidSizeRowsLotsOfIntents = uniformDistSpec{ + tsSecFrom: 1, tsSecTo: 100, + keySuffixMin: 8, keySuffixMax: 8, + valueLenMin: 8, valueLenMax: 16, + deleteFrac: .1, + keysPerValueMin: 1, keysPerValueMax: 100, + intentFrac: 1, + } ) const intentAgeThreshold = 2 * time.Hour @@ -69,8 +85,6 @@ func TestRunNewVsOld(t *testing.T) { ctx := context.Background() const N = 100000 - someVersionsMidSizeRowsLotsOfIntents := someVersionsMidSizeRows - someVersionsMidSizeRowsLotsOfIntents.intentFrac = 1 for _, tc := range []randomRunGCTestSpec{ { ds: someVersionsMidSizeRowsLotsOfIntents, @@ -79,17 +93,17 @@ func TestRunNewVsOld(t *testing.T) { WallTime: (intentAgeThreshold + 100*time.Second).Nanoseconds(), }, // GC everything beyond intent resolution threshold - ttl: int32(intentAgeThreshold.Seconds()), + ttlSec: int32(intentAgeThreshold.Seconds()), }, { ds: someVersionsMidSizeRows, now: hlc.Timestamp{ WallTime: 100 * time.Second.Nanoseconds(), }, - ttl: 1, + ttlSec: 1, }, } { - t.Run(fmt.Sprintf("%v@%v,ttl=%v", tc.ds, tc.now, tc.ttl), func(t *testing.T) { + t.Run(fmt.Sprintf("%v@%v,ttlSec=%v", tc.ds, tc.now, tc.ttlSec), func(t *testing.T) { eng := storage.NewDefaultInMemForTesting() defer eng.Close() @@ -97,7 +111,7 @@ func TestRunNewVsOld(t *testing.T) { snap := eng.NewSnapshot() oldGCer := makeFakeGCer() - ttl := time.Duration(tc.ttl) * time.Second + ttl := time.Duration(tc.ttlSec) * time.Second newThreshold := CalculateThreshold(tc.now, ttl) gcInfoOld, err := runGCOld(ctx, tc.ds.desc(), snap, tc.now, newThreshold, RunOptions{IntentAgeThreshold: intentAgeThreshold}, ttl, @@ -133,9 +147,13 @@ func BenchmarkRun(b *testing.B) { runGCFunc = runGCOld } snap := eng.NewSnapshot() - ttl := time.Duration(spec.ttl) * time.Second + ttl := time.Duration(spec.ttlSec) * time.Second + intentThreshold := intentAgeThreshold + if spec.intentAgeSec > 0 { + intentThreshold = time.Duration(spec.intentAgeSec) * time.Second + } return runGCFunc(ctx, spec.ds.desc(), snap, spec.now, - CalculateThreshold(spec.now, ttl), RunOptions{IntentAgeThreshold: intentAgeThreshold}, + CalculateThreshold(spec.now, ttl), RunOptions{IntentAgeThreshold: intentThreshold}, ttl, NoopGCer{}, func(ctx context.Context, intents []roachpb.Intent) error { @@ -162,9 +180,9 @@ func BenchmarkRun(b *testing.B) { ) (specs []randomRunGCTestSpec) { for _, ttl := range ttls { specs = append(specs, randomRunGCTestSpec{ - ds: ds, - now: now, - ttl: ttl, + ds: ds, + now: now, + ttlSec: ttl, }) } return specs @@ -183,6 +201,276 @@ func BenchmarkRun(b *testing.B) { } } 
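// Editor's note: a minimal illustrative sketch (not part of this change) of
// the dataDistribution contract that the specs above produce via
// distSpec.dist. Each call to the closure yields the next MVCCKeyValue, a
// non-nil *roachpb.Transaction when that version is an intent, and ok=false
// once totalKeys values have been emitted; setupTest in
// data_distribution_test.go drains it the same way with storage.MVCCPut.
// The helper name collectDistribution is hypothetical.
func collectDistribution(ds dataDistribution) (kvs []storage.MVCCKeyValue, intents int) {
	for {
		kv, txn, ok := ds()
		if !ok {
			// Generator is exhausted; no more versions will be produced.
			return kvs, intents
		}
		if txn != nil {
			// Only the newest version of a key selected by intentFrac carries a txn.
			intents++
		}
		kvs = append(kvs, kv)
	}
}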
+func TestNewVsInvariants(t *testing.T) { + ctx := context.Background() + const N = 100000 + + for _, tc := range []randomRunGCTestSpec{ + { + ds: someVersionsMidSizeRowsLotsOfIntents, + // Current time in the future enough for intents to get resolved + now: hlc.Timestamp{ + WallTime: (intentAgeThreshold + 100*time.Second).Nanoseconds(), + }, + // GC everything beyond intent resolution threshold + ttlSec: int32(intentAgeThreshold.Seconds()), + }, + { + ds: someVersionsMidSizeRows, + now: hlc.Timestamp{ + WallTime: 100 * time.Second.Nanoseconds(), + }, + ttlSec: 1, + }, + } { + t.Run(fmt.Sprintf("%v@%v,ttl=%vsec", tc.ds, tc.now, tc.ttlSec), func(t *testing.T) { + rng := rand.New(rand.NewSource(1)) + eng := storage.NewDefaultInMemForTesting() + defer eng.Close() + + tc.ds.dist(N, rng).setupTest(t, eng, *tc.ds.desc()) + beforeGC := eng.NewSnapshot() + + // Run GCer over snapshot. + ttl := time.Duration(tc.ttlSec) * time.Second + gcThreshold := CalculateThreshold(tc.now, ttl) + intentThreshold := tc.now.Add(-intentAgeThreshold.Nanoseconds(), 0) + + gcer := makeFakeGCer() + gcInfoNew, err := Run(ctx, tc.ds.desc(), beforeGC, tc.now, + gcThreshold, RunOptions{IntentAgeThreshold: intentAgeThreshold}, ttl, + &gcer, + gcer.resolveIntents, + gcer.resolveIntentsAsync) + require.NoError(t, err) + + // Handle GC + resolve intents. + var stats enginepb.MVCCStats + require.NoError(t, + storage.MVCCGarbageCollect(ctx, eng, &stats, gcer.requests(), gcThreshold)) + for _, i := range gcer.intents { + l := roachpb.LockUpdate{ + Span: roachpb.Span{Key: i.Key}, + Txn: i.Txn, + Status: roachpb.ABORTED, + } + _, err := storage.MVCCResolveWriteIntent(ctx, eng, &stats, l) + require.NoError(t, err, "failed to resolve intent") + } + + assertLiveData(t, eng, beforeGC, *tc.ds.desc(), tc.now, gcThreshold, intentThreshold, ttl, + gcInfoNew) + }) + } +} + +// assertLiveData will create a stream of expected values based on full data +// set contained in provided "before" reader and compare it with the "after" +// reader that contains data after applying GC request. +// Generated expected values are produces by simulating GC in a naive way where +// each value is considered live if: +// - it is a value or tombstone and its timestamp is higher than gc threshold +// - it is a first value at or below gc threshold and there are no deletions +// between gc threshold and the value +func assertLiveData( + t *testing.T, + after, before storage.Reader, + desc roachpb.RangeDescriptor, + now, gcThreshold, intentThreshold hlc.Timestamp, + gcTTL time.Duration, + gcInfo Info, +) { + failureDetails := func(key storage.MVCCKey) string { + return fmt.Sprintf("key=%s, GC time=%s, intent time=%s, key history=%s", key.String(), + gcThreshold.String(), intentThreshold.String(), getKeyHistory(t, before, key.Key)) + } + + // Validation works on original data applying simple GC eligibility to key + // history and only returning remaining elements and accounting for remaining + // ones as gc.Info. 
+ expInfo := Info{ + Now: now, + GCTTL: gcTTL, + Threshold: gcThreshold, + } + pointIt := before.NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, + storage.IterOptions{ + LowerBound: desc.StartKey.AsRawKey(), + UpperBound: desc.EndKey.AsRawKey(), + KeyTypes: storage.IterKeyTypePointsOnly, + }) + defer pointIt.Close() + pointIt.SeekGE(storage.MVCCKey{Key: desc.StartKey.AsRawKey()}) + pointExpectationsGenerator := getExpectationsGenerator(t, pointIt, gcThreshold, intentThreshold, + &expInfo) + + // Loop over engine data after applying GCer requests and compare with + // expected ranges. + itAfter := after.NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{ + LowerBound: desc.StartKey.AsRawKey(), + UpperBound: desc.EndKey.AsRawKey(), + KeyTypes: storage.IterKeyTypePointsOnly, + }) + defer itAfter.Close() + + itAfter.SeekGE(storage.MVCCKey{Key: desc.StartKey.AsRawKey()}) + eKV, dataOk := pointExpectationsGenerator() + for { + ok, err := itAfter.Valid() + require.NoError(t, err, "failed to iterate engine after GC") + if !ok && !dataOk { + break + } + if !ok { + require.Failf(t, "reached end of GC'd engine data, but expect more", "missing key: %s", + failureDetails(eKV.Key)) + } + if !dataOk { + require.Failf(t, "reached end of expected data bug engine contains more", "ungc'd key: %s", + failureDetails(itAfter.UnsafeKey())) + } + + switch eKV.Key.Compare(itAfter.UnsafeKey()) { + case 1: + assert.Failf(t, "key was not collected", failureDetails(itAfter.UnsafeKey())) + itAfter.Next() + case -1: + assert.Failf(t, "key was collected by mistake", failureDetails(eKV.Key)) + eKV, dataOk = pointExpectationsGenerator() + default: + itAfter.Next() + eKV, dataOk = pointExpectationsGenerator() + } + } + + require.EqualValues(t, expInfo, gcInfo, "collected gc info mismatch") +} + +func getExpectationsGenerator( + t *testing.T, it storage.MVCCIterator, gcThreshold, intentThreshold hlc.Timestamp, expInfo *Info, +) func() (storage.MVCCKeyValue, bool) { + var pending []storage.MVCCKeyValue + return func() (storage.MVCCKeyValue, bool) { + for { + // First return all pending history for the previous key. + if len(pending) > 0 { + defer func() { + pending = pending[1:] + }() + return pending[0], true + } + + // For new key, collect intent and all versions from highest to lowest + // to make a decision. + var baseKey roachpb.Key + var history []storage.MVCCKeyValue + for { + ok, err := it.Valid() + require.NoError(t, err, "failed to read data from unmodified engine") + if !ok { + break + } + k := it.Key() + v := it.Value() + if len(baseKey) == 0 { + baseKey = k.Key + } else if !baseKey.Equal(k.Key) { + break + } + history = append(history, storage.MVCCKeyValue{Key: k, Value: v}) + it.Next() + } + if len(history) == 0 { + return storage.MVCCKeyValue{}, false + } + + // Process key history slice by first filtering intents as needed and then + // applying invariant that values on or above gc threshold should remain, + // deletions above threshold should remain. + // All eligible elements are copied to pending for emitting. + stop := false + i := 0 + for i < len(history) && !stop { + if history[i].Key.Timestamp.IsEmpty() { + // Intent, need to see if its TS is too old or not. + // We need to emit intents if they are above threshold as they would be ignored by + // resolver. + var meta enginepb.MVCCMetadata + require.NoError(t, protoutil.Unmarshal(history[i].Value, &meta), + "failed to unmarshal txn metadata") + if meta.Timestamp.ToTimestamp().Less(intentThreshold) { + // This is an old intent. 
Skip intent with proposed value and continue. + expInfo.IntentsConsidered++ + // We always use a new transaction for each intent and consider + // operations successful in testGCer. + expInfo.IntentTxns++ + expInfo.PushTxn++ + expInfo.ResolveTotal++ + } else { + // Intent is not considered as a part of GC removal cycle so we keep + // it intact if it doesn't satisfy push age check. + pending = append(pending, history[i]) + pending = append(pending, history[i+1]) + } + i += 2 + continue + } + + // Apply GC checks to produce expected state. + switch { + case gcThreshold.Less(history[i].Key.Timestamp): + // Any value above threshold including intents that have no timestamp. + pending = append(pending, history[i]) + i++ + case history[i].Key.Timestamp.LessEq(gcThreshold) && len(history[i].Value) > 0: + // First value on or under threshold should be preserved, but the rest + // of history should be skipped. + pending = append(pending, history[i]) + i++ + stop = true + default: + // This is ts <= threshold and v == nil + stop = true + } + } + + // Remaining part of the history is removed, so accumulate it as gc stats. + if i < len(history) { + expInfo.NumKeysAffected++ + for ; i < len(history); i++ { + expInfo.AffectedVersionsKeyBytes += int64(history[i].Key.EncodedSize()) + expInfo.AffectedVersionsValBytes += int64(len(history[i].Value)) + } + } + } + } +} + +func getKeyHistory(t *testing.T, r storage.Reader, key roachpb.Key) string { + var result []string + + it := r.NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{ + LowerBound: key, + UpperBound: key.Next(), + KeyTypes: storage.IterKeyTypePointsOnly, + RangeKeyMaskingBelow: hlc.Timestamp{}, + }) + defer it.Close() + + it.SeekGE(storage.MVCCKey{Key: key}) + for { + ok, err := it.Valid() + require.NoError(t, err, "failed to read engine iterator") + if !ok || !it.UnsafeKey().Key.Equal(key) { + break + } + result = append(result, fmt.Sprintf("P:%s(%d)", it.UnsafeKey().String(), len(it.UnsafeValue()))) + it.Next() + } + + return strings.Join(result, ", ") +} + type fakeGCer struct { gcKeys map[string]roachpb.GCRequest_GCKey threshold Threshold @@ -236,6 +524,14 @@ func (f *fakeGCer) normalize() { f.batches = nil } +func (f *fakeGCer) requests() []roachpb.GCRequest_GCKey { + var reqs []roachpb.GCRequest_GCKey + for _, r := range f.gcKeys { + reqs = append(reqs, r) + } + return reqs +} + func intentLess(a, b *roachpb.Intent) bool { cmp := a.Key.Compare(b.Key) switch { diff --git a/pkg/kv/kvserver/gc/gc_test.go b/pkg/kv/kvserver/gc/gc_test.go index 4cb02651962d..5aadec7e0265 100644 --- a/pkg/kv/kvserver/gc/gc_test.go +++ b/pkg/kv/kvserver/gc/gc_test.go @@ -14,15 +14,22 @@ import ( "bytes" "context" "fmt" + "math" "math/rand" + "regexp" + "sort" + "strconv" + "strings" "testing" "time" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" @@ -136,13 +143,15 @@ func TestIntentAgeThresholdSetting(t *testing.T) { fakeGCer := makeFakeGCer() // Test GC desired behavior. 
- info, err := Run(ctx, &desc, snap, nowTs, nowTs, RunOptions{IntentAgeThreshold: intentLongThreshold}, gcTTL, &fakeGCer, fakeGCer.resolveIntents, + info, err := Run(ctx, &desc, snap, nowTs, nowTs, + RunOptions{IntentAgeThreshold: intentLongThreshold}, gcTTL, &fakeGCer, fakeGCer.resolveIntents, fakeGCer.resolveIntentsAsync) require.NoError(t, err, "GC Run shouldn't fail") assert.Zero(t, info.IntentsConsidered, "Expected no intents considered by GC with default threshold") - info, err = Run(ctx, &desc, snap, nowTs, nowTs, RunOptions{IntentAgeThreshold: intentShortThreshold}, gcTTL, &fakeGCer, fakeGCer.resolveIntents, + info, err = Run(ctx, &desc, snap, nowTs, nowTs, + RunOptions{IntentAgeThreshold: intentShortThreshold}, gcTTL, &fakeGCer, fakeGCer.resolveIntents, fakeGCer.resolveIntentsAsync) require.NoError(t, err, "GC Run shouldn't fail") assert.Equal(t, 1, info.IntentsConsidered, @@ -189,7 +198,8 @@ func TestIntentCleanupBatching(t *testing.T) { // Base GCer will cleanup all intents in one go and its result is used as a baseline // to compare batched runs for checking completeness. baseGCer := makeFakeGCer() - _, err := Run(ctx, &desc, snap, nowTs, nowTs, RunOptions{IntentAgeThreshold: intentAgeThreshold}, gcTTL, &baseGCer, baseGCer.resolveIntents, + _, err := Run(ctx, &desc, snap, nowTs, nowTs, RunOptions{IntentAgeThreshold: intentAgeThreshold}, + gcTTL, &baseGCer, baseGCer.resolveIntents, baseGCer.resolveIntentsAsync) if err != nil { t.Fatal("Can't prepare test fixture. Non batched GC run fails.") @@ -199,7 +209,8 @@ func TestIntentCleanupBatching(t *testing.T) { var batchSize int64 = 7 fakeGCer := makeFakeGCer() info, err := Run(ctx, &desc, snap, nowTs, nowTs, - RunOptions{IntentAgeThreshold: intentAgeThreshold, MaxIntentsPerIntentCleanupBatch: batchSize}, gcTTL, + RunOptions{IntentAgeThreshold: intentAgeThreshold, MaxIntentsPerIntentCleanupBatch: batchSize}, + gcTTL, &fakeGCer, fakeGCer.resolveIntents, fakeGCer.resolveIntentsAsync) require.NoError(t, err, "GC Run shouldn't fail") maxIntents := 0 @@ -244,8 +255,10 @@ func (r *testResolver) assertInvariants(t *testing.T, opts intentBatcherOptions) } // Last key could overspill over limit, but that's ok. if opts.maxIntentKeyBytesPerIntentCleanupBatch > 0 { - require.Less(t, int64(totalKeyBytes-len(batch[len(batch)-1].Key)), opts.maxIntentKeyBytesPerIntentCleanupBatch, - fmt.Sprintf("Byte limit was exceeded for more than the last key in batch %d/%d", i+1, len(*r))) + require.Less(t, int64(totalKeyBytes-len(batch[len(batch)-1].Key)), + opts.maxIntentKeyBytesPerIntentCleanupBatch, + fmt.Sprintf("Byte limit was exceeded for more than the last key in batch %d/%d", i+1, + len(*r))) } if opts.maxTxnsPerIntentCleanupBatch > 0 { require.LessOrEqual(t, int64(len(txnMap)), opts.maxTxnsPerIntentCleanupBatch, @@ -273,8 +286,10 @@ func generateScattered(total int, txns int, maxKeySize int, random *rand.Rand) [ var intents []testIntent for len(intents) < total { intents = append(intents, - testIntent{randomLengthKey(random, maxKeySize), - &enginepb.MVCCMetadata{Txn: &enginepb.TxnMeta{ID: txnIds[random.Intn(len(txnIds))]}}}) + testIntent{ + randomLengthKey(random, maxKeySize), + &enginepb.MVCCMetadata{Txn: &enginepb.TxnMeta{ID: txnIds[random.Intn(len(txnIds))]}}, + }) } return intents } @@ -298,8 +313,10 @@ func generateSequential(total int, maxTxnSize int, maxKeySize int, random *rand. 
txnUUID = uuid.FastMakeV4() } intents = append(intents, - testIntent{randomLengthKey(random, maxKeySize), - &enginepb.MVCCMetadata{Txn: &enginepb.TxnMeta{ID: txnUUID}}}) + testIntent{ + randomLengthKey(random, maxKeySize), + &enginepb.MVCCMetadata{Txn: &enginepb.TxnMeta{ID: txnUUID}}, + }) } return intents } @@ -320,7 +337,8 @@ func TestGCIntentBatcher(t *testing.T) { for _, batchSize := range []int64{0, 1, 10, 100, 1000} { for _, byteCount := range []int64{0, 1, 10, 100, 1000} { for _, txnCount := range []int64{0, 1, 10, 100, 1000} { - t.Run(fmt.Sprintf("batch=%d,bytes=%d,txns=%d,txn_intents=%s", batchSize, byteCount, txnCount, s.name), func(t *testing.T) { + t.Run(fmt.Sprintf("batch=%d,bytes=%d,txns=%d,txn_intents=%s", batchSize, byteCount, + txnCount, s.name), func(t *testing.T) { info := Info{} opts := intentBatcherOptions{ maxIntentsPerIntentCleanupBatch: batchSize, @@ -379,3 +397,561 @@ func TestGCIntentBatcherErrorHandling(t *testing.T) { cancel() require.Error(t, batcher.maybeFlushPendingIntents(ctx)) } + +/* +Table data format: +(see data below for examples referenced in brackets after each explanation) + +Top row contains keys separated by one or more spaces [a to j]. + +Data rows start with timestamp followed by values aligned with the first +character of a key in top row. + +Timestamp could be prefixed with '>' to set GC timestamp [gc threshold is 5]. +Intersection of key and timestamp defines value for the key [a@9 == 'A', +a@2 == 'c']. +If value contains any upper case characters it is expected that its key should +not be garbage collected [A, B, D, E]. + +Special value of . means a tombstone that should be collected [a@4 should be +garbage collected]. +Special value of * means a tombstone that should not be collected [tombstone c@8 +should not be garbage collected]. + +Values prefixed with ! are intents and follow the same expectation rules as +other values. + +Empty lines and horizontal and vertical axis separators are ignored. + +var data = ` + | a b c d e f g h i j +---+---------------------- + 9 | A !E + 8 | * + 7 | + 6 | B D +>5 | + 4 | . F + 3 | + 2 | c + 1 | +` +*/ + +var singleValueData = ` + | a b c d e f g h i j +---+---------------------- + 9 | + 8 | + 7 | + 6 | C +>5 | B + 4 | A + 3 | + 2 | + 1 | +` + +var multipleValuesNewerData = ` + | a b c d e f g h i j +---+---------------------- + 9 | + 8 | A C E + 7 | + 6 | F +>5 | D + 4 | B + 3 | + 2 | + 1 | +` + +var multipleValuesOlderData = ` + | a b c d e f g h i j +---+---------------------- + 9 | + 8 | + 7 | + 6 | E +>5 | C + 4 | A + 3 | + 2 | b d F + 1 | +` + +var deleteData = ` + | a b c d e f g h i j +---+---------------------- + 9 | + 8 | + 7 | + 6 | * +>5 | . + 4 | . + 3 | + 2 | a b C + 1 | +` + +var deleteWithNewerData = ` + | a b c d e f g h i j +---+---------------------- + 9 | + 8 | A C E + 7 | + 6 | * +>5 | . + 4 | . + 3 | + 2 | b d F + 1 | +` + +var multipleValuesData = ` + | a b c d e f g h i j +---+---------------------- + 9 | + 8 | A F * + 7 | + 6 | B * J +>5 | + 4 | C G K + 3 | d h . + 2 | e i m + 1 | +` + +var intents = ` + | a b c d e f g h i j +---+---------------------- + 9 | + 8 | + 7 | + 6 | !C +>5 | !B + 4 | !A + 3 | + 2 | + 1 | +` + +var intentsAfterData = ` + | a b c d e f g h i j +---+---------------------- + 9 | + 8 | !A !C !E + 7 | + 6 | F +>5 | D + 4 | B + 3 | + 2 | + 1 | +` + +var intentsAfterDelete = ` + | a b c d e f g h i j +---+---------------------- + 9 | + 8 | !A !C !E + 7 | + 6 | * +>5 | . + 4 | . 
+ 3 | + 2 | b d F + 1 | +` + +func TestGC(t *testing.T) { + for _, d := range []struct { + name string + data string + }{ + {"single", singleValueData}, + {"multiple_newer", multipleValuesNewerData}, + {"multiple_older", multipleValuesOlderData}, + {"delete", deleteData}, + {"delete_with_newer", deleteWithNewerData}, + {"multiple_values", multipleValuesData}, + {"intents", intents}, + {"intents_after_data", intentsAfterData}, + {"intents_after_delete", intentsAfterDelete}, + } { + t.Run(d.name, func(t *testing.T) { + runTest(t, d.data) + }) + } +} + +func runTest(t *testing.T, data string) { + ctx := context.Background() + tablePrefix := keys.SystemSQLCodec.TablePrefix(42) + desc := roachpb.RangeDescriptor{ + StartKey: roachpb.RKey(tablePrefix), + EndKey: roachpb.RKey(tablePrefix.PrefixEnd()), + } + + eng := storage.NewDefaultInMemForTesting() + defer eng.Close() + + dataItems, gcTS, now := readTableData(t, desc.StartKey.AsRawKey(), data) + ds := dataItems.fullDistribution() + stats := ds.setupTest(t, eng, desc) + snap := eng.NewSnapshot() + defer snap.Close() + + gcer := makeFakeGCer() + _, err := Run(ctx, &desc, snap, now, gcTS, + RunOptions{IntentAgeThreshold: time.Nanosecond * time.Duration(now.WallTime)}, time.Second, + &gcer, + gcer.resolveIntents, gcer.resolveIntentsAsync) + require.NoError(t, err) + require.Empty(t, gcer.intents, "expecting no intents") + require.NoError(t, storage.MVCCGarbageCollect(ctx, eng, &stats, gcer.requests(), gcTS)) + + ctrlEng := storage.NewDefaultInMemForTesting() + defer eng.Close() + expectedStats := dataItems.liveDistribution().setupTest(t, ctrlEng, desc) + + if log.V(1) { + log.Info(ctx, "Expected data:") + for _, l := range formatTable(engineData(t, ctrlEng, desc), tablePrefix) { + log.Infof(ctx, "%s", l) + } + + log.Info(ctx, "Actual data:") + for _, l := range formatTable(engineData(t, eng, desc), tablePrefix) { + log.Infof(ctx, "%s", l) + } + } + + requireEqualReaders(t, ctrlEng, eng, desc) + require.Equal(t, expectedStats, stats, "mvcc stats don't match the data") +} + +// requireEqualReaders compares data in two readers +func requireEqualReaders( + t *testing.T, exected storage.Reader, actual storage.Reader, desc roachpb.RangeDescriptor, +) { + itExp := exected.NewMVCCIterator(storage.MVCCKeyIterKind, storage.IterOptions{ + LowerBound: desc.StartKey.AsRawKey(), + UpperBound: desc.EndKey.AsRawKey(), + KeyTypes: storage.IterKeyTypePointsOnly, + RangeKeyMaskingBelow: hlc.Timestamp{}, + }) + defer itExp.Close() + + itActual := actual.NewMVCCIterator(storage.MVCCKeyIterKind, storage.IterOptions{ + LowerBound: desc.StartKey.AsRawKey(), + UpperBound: desc.EndKey.AsRawKey(), + KeyTypes: storage.IterKeyTypePointsOnly, + RangeKeyMaskingBelow: hlc.Timestamp{}, + }) + defer itActual.Close() + itExp.SeekGE(storage.MVCCKey{Key: desc.StartKey.AsRawKey()}) + itActual.SeekGE(storage.MVCCKey{Key: desc.StartKey.AsRawKey()}) + for { + okExp, err := itExp.Valid() + require.NoError(t, err, "failed to iterate values") + okAct, err := itActual.Valid() + require.NoError(t, err, "failed to iterate values") + if !okExp && !okAct { + break + } + + require.Equal(t, okExp, okAct, "iterators have different number of elements") + require.True(t, itExp.UnsafeKey().Equal(itActual.UnsafeKey()), + "expected key not equal to actual (expected %s, found %s)", itExp.UnsafeKey(), + itActual.UnsafeKey()) + require.True(t, bytes.Equal(itExp.UnsafeValue(), itActual.UnsafeValue()), + "expected value not equal to actual for key %s", itExp.UnsafeKey()) + + itExp.Next() + itActual.Next() + } 
+} + +// dataItem is element read from test table containing mvcc key value along with +// metadata needed for filtering. +type dataItem struct { + value storage.MVCCKeyValue + txn *roachpb.Transaction + live bool +} + +type tableData []dataItem + +// readTableData reads all table data and returns data slice of items to +// initialize engine, gc timestamp, and now timestamp which is above any +// timestamp in items. items are sorted in ascending order. +func readTableData( + t *testing.T, prefix roachpb.Key, data string, +) (tableData, hlc.Timestamp, hlc.Timestamp) { + lines := strings.Split(data, "\n") + var items []dataItem + + var columnPositions []int + var columnKeys []roachpb.Key + var minLen int + + parseHeader := func(l string) { + // Find keys and their positions from the first line. + fs := strings.Fields(l) + lastPos := 0 + for _, key := range fs { + pos := lastPos + strings.Index(l[lastPos:], key) + lastPos = pos + len(key) + if key == "|" && len(columnKeys) == 0 { + continue + } + columnPositions = append(columnPositions, pos) + var mvccKey roachpb.Key + mvccKey = append(mvccKey, prefix...) + mvccKey = append(mvccKey, key...) + columnKeys = append(columnKeys, mvccKey) + } + minLen = columnPositions[len(columnPositions)-1] + 1 + columnPositions = append(columnPositions, 0) + } + + parsePoint := func(val string, i int, ts hlc.Timestamp) int { + val = strings.TrimSpace(val) + if len(val) > 0 { + value := []byte(val) + // Special meaning for deletions. + if val == "*" || val == "." { + value = nil + } + var txn *roachpb.Transaction + if val[0] == '!' { + value = value[1:] + txn = &roachpb.Transaction{ + Status: roachpb.PENDING, + ReadTimestamp: ts, + GlobalUncertaintyLimit: ts.Next().Next(), + } + txn.ID = uuid.MakeV4() + txn.WriteTimestamp = ts + txn.Key = txn.ID.GetBytes() + } + var v roachpb.Value + if value != nil { + v.SetBytes(value) + } + kv := storage.MVCCKeyValue{ + Key: storage.MVCCKey{ + Key: columnKeys[i], + Timestamp: ts, + }, + Value: v.RawBytes, + } + live := strings.ToLower(val) != val || val == "*" + items = append(items, dataItem{value: kv, txn: txn, live: live}) + } + return i + 1 + } + + var gcTS hlc.Timestamp + var lastTs int64 = math.MaxInt64 + parseTS := func(l string) (ts hlc.Timestamp) { + fs := strings.Fields(l) + tss := fs[0] + // Timestamp starting with '>' is a gc marker. + if tss[0] == '>' { + tss = tss[1:] + defer func() { + gcTS = ts + }() + } + tsInt, err := strconv.ParseInt(tss, 10, 64) + require.NoError(t, err, "Failed to parse timestamp from %s", l) + require.Less(t, tsInt, lastTs, "Timestamps should be decreasing") + lastTs = tsInt + return hlc.Timestamp{WallTime: tsInt} + } + + for _, l := range lines { + if len(l) == 0 || l[0] == '-' { + // Ignore empty lines and table separators. + continue + } + if len(columnPositions) == 0 { + parseHeader(l) + continue + } + + // We extend a line to always have at least characters to read values by + // index without caring to get beyond slice. + if shortBy := minLen - len(l); shortBy > 0 { + l = l + strings.Repeat(" ", shortBy) + } + // Add length to the end of keys to eliminate extra boundary checks. + columnPositions[len(columnPositions)-1] = len(l) + + ts := parseTS(l) + for i := 0; i < len(columnKeys); { + val := l[columnPositions[i]:columnPositions[i+1]] + i = parsePoint(val, i, ts) + } + } + + // Reverse from oldest to newest. 
+ for i, j := 0, len(items)-1; i < j; i, j = i+1, j-1 { + items[i], items[j] = items[j], items[i] + } + + return items, gcTS, items[len(items)-1].value.Key.Timestamp.Add(1, 0) +} + +// fullDistribution creates a data distribution that contains all data read from +// table. +func (d tableData) fullDistribution() dataDistribution { + items := d + return func() (storage.MVCCKeyValue, *roachpb.Transaction, bool) { + if len(items) == 0 { + return storage.MVCCKeyValue{}, nil, false + } + defer func() { items = items[1:] }() + return items[0].value, items[0].txn, true + } +} + +// liveDistribution creates a data distribution from the table data is was only +// marked as live (see table data format above). +func (d tableData) liveDistribution() dataDistribution { + items := d + return func() (storage.MVCCKeyValue, *roachpb.Transaction, bool) { + for { + if len(items) == 0 { + return storage.MVCCKeyValue{}, nil, false + } + if items[0].live { + break + } + items = items[1:] + } + defer func() { items = items[1:] }() + return items[0].value, items[0].txn, true + } +} + +func engineData(t *testing.T, r storage.Reader, desc roachpb.RangeDescriptor) []tableCell { + var result []tableCell + it := r.NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{ + LowerBound: desc.StartKey.AsRawKey(), + UpperBound: desc.EndKey.AsRawKey(), + KeyTypes: storage.IterKeyTypePointsOnly, + RangeKeyMaskingBelow: hlc.Timestamp{}, + }) + defer it.Close() + it.SeekGE(storage.MVCCKey{Key: desc.StartKey.AsRawKey()}) + prefix := "" + for { + okAct, err := it.Valid() + require.NoError(t, err, "failed to iterate values") + if !okAct { + break + } + if !it.UnsafeKey().IsValue() { + prefix = "!" + } else { + v := "." + if len(it.UnsafeValue()) > 0 { + val := roachpb.Value{ + RawBytes: it.UnsafeValue(), + } + b, err := val.GetBytes() + require.NoError(t, err, "failed to read byte value for cell") + v = prefix + string(b) + } + result = append(result, tableCell{ + key: it.Key(), + value: v, + }) + prefix = "" + } + it.Next() + } + return result +} + +type tableCell struct { + key storage.MVCCKey + value string +} + +type columnInfo struct { + key string + maxValueLength int +} + +// formatTable renders table with data. expecting data to be sorted naturally: +// keys ascending, timestamps descending. +// prefix if provided defines start of the key, that would be stripped from the +// keys to avoid clutter. 
+func formatTable(data []tableCell, prefix roachpb.Key) []string { + prefixStr := "" + if prefix != nil { + prefixStr = prefix.String() + } + keyRe := regexp.MustCompile(`^/"(.*)"$`) + var foundKeys []columnInfo + var lastKey roachpb.Key + rowData := make(map[int64][]string) + for _, c := range data { + ts := c.key.Timestamp.WallTime + key := c.key.Key.String() + if strings.Index(key, prefixStr) == 0 { + key = key[len(prefixStr):] + if keyRe.FindSubmatch([]byte(key)) != nil { + key = key[2 : len(key)-1] + } + } + if !c.key.Key.Equal(lastKey) { + foundKeys = append(foundKeys, columnInfo{ + key: key, + maxValueLength: len(key), + }) + lastKey = c.key.Key + } + row := rowData[ts] + for len(row) < len(foundKeys)-1 { + row = append(row, "") + } + rowData[ts] = append(row, c.value) + valueLen := len(c.value) + if i := len(foundKeys) - 1; valueLen > foundKeys[i].maxValueLength { + foundKeys[i].maxValueLength = valueLen + } + } + var tss []int64 + for ts := range rowData { + tss = append(tss, ts) + } + sort.Slice(tss, func(i, j int) bool { + return tss[i] > tss[j] + }) + + lsLen := len(fmt.Sprintf("%d", tss[0])) + rowPrefixFmt := fmt.Sprintf(" %%%dd | ", lsLen) + + var result []string + + firstRow := fmt.Sprintf(" %s | ", strings.Repeat(" ", lsLen)) + for _, colInfo := range foundKeys { + firstRow += fmt.Sprintf(fmt.Sprintf("%%%ds ", colInfo.maxValueLength), colInfo.key) + } + result = append(result, firstRow) + result = append(result, strings.Repeat("-", len(firstRow))) + for _, ts := range tss { + row := rowData[ts] + rowStr := fmt.Sprintf(rowPrefixFmt, ts) + for i, v := range row { + rowStr += fmt.Sprintf(fmt.Sprintf("%%%ds ", foundKeys[i].maxValueLength), v) + } + result = append(result, rowStr) + } + return result +} diff --git a/pkg/util/randutil/rand.go b/pkg/util/randutil/rand.go index b39e4ed4ce95..e6985ed54274 100644 --- a/pkg/util/randutil/rand.go +++ b/pkg/util/randutil/rand.go @@ -139,6 +139,23 @@ func ReadTestdataBytes(r *rand.Rand, arr []byte) { } } +// PrintableKeyAlphabet to use with random string generation to produce strings +// that doesn't need to be escaped when found as a part of a key and is +// generally human printable. +const PrintableKeyAlphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" + +// RandString generates a random string of the desired length from the +// input alphabet. It is useful when you want to generate keys that would +// be printable without further escaping if alphabet is restricted to +// alphanumeric chars. +func RandString(rng *rand.Rand, length int, alphabet string) string { + buf := make([]byte, length) + for i := range buf { + buf[i] = alphabet[rng.Intn(len(alphabet))] + } + return string(buf) +} + // SeedForTests seeds the random number generator and prints the seed // value used. This function should be called from TestMain; individual tests // should not touch the seed of the global random number generator.
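// Editor's note: a small usage sketch (not part of the patch) for the
// RandString helper and PrintableKeyAlphabet added above, mirroring how
// uniformTableStringKeyDistribution builds human-readable key suffixes in
// data_distribution_test.go. The fixed seed and suffix lengths are
// illustrative only.

package main

import (
	"fmt"
	"math/rand"

	"github.com/cockroachdb/cockroach/pkg/util/randutil"
)

func main() {
	rng := rand.New(rand.NewSource(1))
	// Print a few printable suffixes between 2 and 6 characters long, the same
	// range fewVersionsTinyRows uses for keySuffixMin/keySuffixMax.
	for i := 0; i < 5; i++ {
		fmt.Println(randutil.RandString(rng, 2+rng.Intn(5), randutil.PrintableKeyAlphabet))
	}
}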