Skip to content

Commit

Permalink
roachtest: mvcc_gc disable lease rebalancing for stability
Browse files Browse the repository at this point in the history
Test sometimes flakes if lease is moved from under GC and cleanup is
not performed in time. This commit disables rebalance as it is not
required to test GC functionality.
This commit also improves reliability of test by preventing a race
between workload stop and table data deletion which could cause
test to fail if workload doesn't terminate before clear range is
performed.

Release note: None
  • Loading branch information
aliher1911 committed Mar 10, 2023
1 parent a95ffcd commit 27f5166
Showing 1 changed file with 101 additions and 54 deletions.
155 changes: 101 additions & 54 deletions pkg/cmd/roachtest/tests/mvcc_gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,21 @@ func runMVCCGC(ctx context.Context, t test.Test, c cluster.Cluster) {
execSQLOrFail(fmt.Sprintf(`SET CLUSTER SETTING %s = $1`, name), value)
}

// Explicitly enable range tombstones. Can be removed once ranges tombstones
// are enabled by default.
setClusterSetting("storage.mvcc.range_tombstones.enabled", true)
// Protected timestamps prevent GC from collecting data, even with low ttl
// we need to wait for protected ts to be moved. By reducing this interval
// we ensure that data will always be collectable after ttl + 5s.
setClusterSetting("kv.protectedts.poll_interval", "5s")
// Disable mvcc_gc queue throttling, we always manually enqueue replicas as
// fast as possible.
setClusterSetting("kv.mvcc_gc.queue_interval", "0s")
// Load based lease balancing could move leaseholders from under the GC. If
// that happens gc will end up running on non lease-holder store and its
// requests would be rejected. This is causing test to flake. Disabling
// rebalancing is better than increasing wait time further.
setClusterSetting("kv.allocator.load_based_lease_rebalancing.enabled", false)

if err := WaitFor3XReplication(ctx, t, conn); err != nil {
t.Fatalf("failed to up-replicate cluster: %s", err)
Expand Down Expand Up @@ -135,8 +147,7 @@ func runMVCCGC(ctx context.Context, t test.Test, c cluster.Cluster) {
t.L().Printf("partially deleted some data using tombstones")

assertRangesWithGCRetry(ctx, t, c, gcRetryTimeout, m, func() error {
totals, rangeCount := collectTableMVCCStatsOrFatal(t, conn, m)
return checkRangesHaveNoRangeTombstones(totals, rangeCount)
return assertTableMVCCStatsOrFatal(t, conn, m, checkRangesHaveNoRangeTombstones)
})

t.L().Printf("all range tombstones were garbage collected")
Expand All @@ -152,21 +163,38 @@ func runMVCCGC(ctx context.Context, t test.Test, c cluster.Cluster) {
wlCancel()

if err := retry.WithMaxAttempts(ctx, retry.Options{}, 3, func() error {
return deleteAllTableDataWithOverlappingTombstones(ctx, t, c, conn, rng, m, 5)
err := deleteAllTableDataWithOverlappingTombstones(ctx, t, c, conn, rng, m, 5)
if err != nil {
return err
}
// We must check if table is empty because workload termination is only
// cancelling context and termination will happen in the background.
return checkTableIsEmpty(t, conn, m)
}); err != nil {
t.Fatal(err)
}

t.L().Printf("deleted all table data using tombstones")

assertRangesWithGCRetry(ctx, t, c, gcRetryTimeout, m, func() error {
totals, details := collectStatsAndConsistencyOrFail(t, conn, m)
return checkRangesConsistentAndHaveNoData(totals, details)
return assertStatsAndConsistencyOrFatal(t, conn, m, checkRangesConsistentAndHaveNoData)
})

return nil
})
m.Wait()
}

func checkTableIsEmpty(t test.Test, conn *gosql.DB, m tableMetadata) error {
t.Helper()
var rows int
queryRowOrFatal(t, conn, fmt.Sprintf("select count(*) from %s.%s", m.databaseName, m.tableName), nil, []any{&rows})
if rows > 0 {
return errors.Newf("table still has %d rows", rows)
}
return nil
}

func assertRangesWithGCRetry(
ctx context.Context,
t test.Test,
Expand All @@ -178,17 +206,26 @@ func assertRangesWithGCRetry(
t.Helper()
deadline := timeutil.Now().Add(timeout)
gcRetryTime := timeutil.Now()
// Track number of ranges in the table to handle cases when merges run
// concurrently with GC and doesn't allow it to collect all data.
var rangeCount int
lastMergeDeadline := timeutil.Now()
for {
if timeutil.Now().After(gcRetryTime) {
enqueueAllTableRangesForGC(ctx, t, c, m)
t.L().Printf("enqueued ranges for GC")
rc := enqueueAllTableRangesForGC(ctx, t, c, m)
t.L().Printf("enqueued %d ranges for GC", rc)
gcRetryTime = timeutil.Now().Add(2 * time.Minute)
if rc != rangeCount {
// Wait at least one more GC run if number of ranges changed.
lastMergeDeadline = gcRetryTime.Add(time.Minute)
rangeCount = rc
}
}
err := assertion()
if err == nil {
return
}
if timeutil.Now().After(deadline) {
if now := timeutil.Now(); now.After(deadline) && now.After(lastMergeDeadline) {
t.Fatalf("assertion still failing after %s: %s", timeout, err)
}
select {
Expand All @@ -200,12 +237,9 @@ func assertRangesWithGCRetry(
}
}

func checkRangesHaveNoRangeTombstones(totals enginepb.MVCCStats, c int) error {
if c == 0 {
return errors.New("failed to find any ranges for table")
}
if totals.RangeKeyCount > 0 || totals.RangeKeyBytes > 0 {
return errors.Errorf("table ranges contain range tombstones %s", totals.String())
func checkRangesHaveNoRangeTombstones(rangeStats enginepb.MVCCStats) error {
if rangeStats.RangeKeyCount > 0 || rangeStats.RangeKeyBytes > 0 {
return errors.Errorf("range contain range tombstones %s", rangeStats.String())
}
return nil
}
Expand Down Expand Up @@ -238,12 +272,7 @@ func getSplitKey(index, fragments int) int64 {
return math.MinInt64 + int64(getSplitPoint(index, fragments))
}

func checkRangesConsistentAndHaveNoData(
totals enginepb.MVCCStats, details map[int]rangeDetails,
) error {
if len(details) == 0 {
return errors.New("failed to find any ranges for table")
}
func checkRangesConsistentAndHaveNoData(totals enginepb.MVCCStats, details rangeDetails) error {
if totals.RangeKeyCount > 0 || totals.RangeKeyBytes > 0 {
return errors.Errorf("table ranges contain range tombstones %s", totals.String())
}
Expand All @@ -254,21 +283,19 @@ func checkRangesConsistentAndHaveNoData(
totals.IntentBytes > 0 || totals.IntentCount > 0 || totals.SeparatedIntentCount > 0 {
return errors.Errorf("table ranges contain live data %s", totals.String())
}
for id, d := range details {
if d.status != kvpb.CheckConsistencyResponse_RANGE_CONSISTENT.String() {
return errors.Errorf("consistency check failed for r%d: %s detail: %s", id, d.status,
d.detail)
}
if details.status != kvpb.CheckConsistencyResponse_RANGE_CONSISTENT.String() {
return errors.Errorf("consistency check failed %s detail: %s", details.status,
details.detail)
}
return nil
}

func collectTableMVCCStatsOrFatal(
t test.Test, conn *gosql.DB, m tableMetadata,
) (enginepb.MVCCStats, int) {
func assertTableMVCCStatsOrFatal(
t test.Test, conn *gosql.DB, m tableMetadata, assert func(enginepb.MVCCStats) error,
) error {
t.Helper()
rows, err := conn.Query(fmt.Sprintf(`
SELECT range_id, raw_start_key, crdb_internal.range_stats(raw_start_key)
SELECT range_id, raw_start_key, raw_end_key, crdb_internal.range_stats(raw_start_key)
FROM [SHOW RANGES FROM TABLE %s.%s WITH KEYS]
ORDER BY start_key`,
tree.NameString(m.databaseName), tree.NameString(m.tableName)))
Expand All @@ -278,25 +305,31 @@ ORDER BY start_key`,
defer rows.Close()

jsonpb := protoutil.JSONPb{}
var tableStats enginepb.MVCCStats
var rangeCount int
for rows.Next() {
var (
rangeID int
rangeStartKey roachpb.Key
rangeEndKey roachpb.Key
jsonb []byte
rangeStats enginepb.MVCCStats
)
if err := rows.Scan(&rangeID, &rangeStartKey, &jsonb); err != nil {
if err := rows.Scan(&rangeID, &rangeStartKey, &rangeEndKey, &jsonb); err != nil {
t.Fatalf("failed to run consistency check query on table: %s", err)
}
if err := jsonpb.Unmarshal(jsonb, &rangeStats); err != nil {
t.Fatalf("failed to unmarshal json %s stats: %s", string(jsonb), err)
}
tableStats.Add(rangeStats)
if err := assert(rangeStats); err != nil {
return errors.Wrapf(err, "assertion failed on range r%d %s", rangeID,
roachpb.Span{Key: rangeStartKey, EndKey: rangeEndKey})
}
rangeCount++
}
return tableStats, rangeCount
if rangeCount == 0 {
return errors.New("failed to find any ranges for table")
}
return nil
}

// rangeDetails contains results of check_consistency for a single range.
Expand All @@ -306,9 +339,9 @@ type rangeDetails struct {
detail string
}

func collectStatsAndConsistencyOrFail(
t test.Test, conn *gosql.DB, m tableMetadata,
) (enginepb.MVCCStats, map[int]rangeDetails) {
func assertStatsAndConsistencyOrFatal(
t test.Test, conn *gosql.DB, m tableMetadata, assert func(enginepb.MVCCStats, rangeDetails) error,
) error {
t.Helper()
var startKey, endKey roachpb.Key
queryRowOrFatal(t, conn,
Expand All @@ -323,9 +356,8 @@ FROM [SHOW RANGES FROM TABLE %s.%s WITH KEYS]`,
}
defer rows.Close()

var details = make(map[int]rangeDetails)
jsonpb := protoutil.JSONPb{}
var tableStats enginepb.MVCCStats
var tableRanges int
for rows.Next() {
var (
rangeID int
Expand All @@ -341,14 +373,19 @@ FROM [SHOW RANGES FROM TABLE %s.%s WITH KEYS]`,
if err := jsonpb.Unmarshal(jsonb, &rangeStats); err != nil {
t.Fatalf("failed to unmarshal json %s stats: %s", string(jsonb), err)
}
tableStats.Add(rangeStats)
details[rangeID] = rangeDetails{
if err := assert(rangeStats, rangeDetails{
stats: rangeStats,
status: status,
detail: detail,
}); err != nil {
return errors.Wrapf(err, "assertion failed on range r%d %s", rangeID, rangeStartKey)
}
tableRanges++
}
if tableRanges == 0 {
return errors.New("failed to find any ranges for table")
}
return tableStats, details
return nil
}

type tableMetadata struct {
Expand Down Expand Up @@ -380,44 +417,54 @@ func queryTableMetaOrFatal(

func enqueueAllTableRangesForGC(
ctx context.Context, t test.Test, c cluster.Cluster, m tableMetadata,
) {
) int {
t.Helper()
var conns []*gosql.DB
for _, node := range c.All() {
conns = append(conns, c.Conn(ctx, t.L(), node))
}
if err := visitTableRanges(ctx, t, conns[0], m, func(rangeID int) error {
var tableRanges int
if err := visitTableRanges(ctx, t, conns[0], m, func(info rangeInfo) error {
t.L().Printf("enqueuing range r%d [%s,%s) for GC", info.id, info.startKey, info.endKey)
for _, c := range conns {
_, _ = c.ExecContext(ctx, `select crdb_internal.kv_enqueue_replica($1, 'mvccGC', true)`, rangeID)
_, _ = c.ExecContext(ctx, `select crdb_internal.kv_enqueue_replica($1, 'mvccGC', true)`, info.id)
}
tableRanges++
return nil
}); err != nil {
t.Fatalf("failed to enqueue all ranges for GC: %s", err)
t.Fatalf("failed to enqueue table ranges for GC: %s", err)
}
return tableRanges
}

type rangeInfo struct {
id int
startKey, endKey string
}

func visitTableRanges(
ctx context.Context, t test.Test, conn *gosql.DB, m tableMetadata, f func(rangeID int) error,
ctx context.Context, t test.Test, conn *gosql.DB, m tableMetadata, f func(info rangeInfo) error,
) error {
t.Helper()
rows, err := conn.QueryContext(
ctx, fmt.Sprintf(`SELECT range_id FROM [ SHOW RANGES FROM TABLE %s.%s ]`, m.databaseName, m.tableName),
ctx, fmt.Sprintf(`SELECT range_id, start_key, end_key FROM [ SHOW RANGES FROM TABLE %s.%s ]`, m.databaseName, m.tableName),
)
if err != nil {
t.Fatalf("failed to run consistency check query on table: %s", err)
t.Fatalf("failed to query ranges for table: %s", err)
}
defer rows.Close()
var rangeIDs []int

var ranges []rangeInfo
for rows.Next() {
var rangeID int
if err := rows.Scan(&rangeID); err != nil {
t.Fatalf("failed to run consistency check query on table: %s", err)
var info rangeInfo
if err := rows.Scan(&info.id, &info.startKey, &info.endKey); err != nil {
t.Fatalf("failed to query ranges for table: %s", err)
}
rangeIDs = append(rangeIDs, rangeID)
ranges = append(ranges, info)
}
rows.Close()

for _, id := range rangeIDs {
for _, id := range ranges {
if err := f(id); err != nil {
return err
}
Expand Down

0 comments on commit 27f5166

Please sign in to comment.