Skip to content

Commit

Permalink
sql: TRUNCATE preserves split points of indexes
Browse files Browse the repository at this point in the history
Previously, TRUNCATE unconditionally blew away all index split points
when creating the replacement empty indexes. This manifested in some bad
behavior when performing TRUNCATE on tables or indexes that had heavy
load and were heavily split already, because all of a sudden all of the
traffic that was nicely dispersed across all of the ranges would
redirect to the single new range that TRUNCATE created.

The bad performance would resolve itself over time as the database re-split
the new ranges, but preserving the splits across the index swap
boundaries is a faster way to get there.

Release note (sql change): TRUNCATE is now less disruptive on tables
with a lot of concurrent traffic.
  • Loading branch information
jordanlewis committed Apr 16, 2021
1 parent 2c49249 commit db0714e
Show file tree
Hide file tree
Showing 5 changed files with 254 additions and 19 deletions.
5 changes: 4 additions & 1 deletion pkg/bench/ddl_analysis/ddl_analysis_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,10 @@ func RunRoundTripBenchmark(b *testing.B, tests []RoundTripBenchTestCase) {

res := float64(roundTrips) / float64(b.N)
if haveExp && !exp.matches(int(res)) && *rewriteFlag == "" {
b.Fatalf("got %v, expected %v. trace: \n%v", res, exp, r)
b.Fatalf(`got %v, expected %v. trace:
%v
(above trace from test %s. got %v, expected %v)
`, res, exp, r, b.Name(), res, exp)
}
b.ReportMetric(res, "roundtrips")
})
Expand Down
12 changes: 6 additions & 6 deletions pkg/bench/ddl_analysis/testdata/benchmark_expectations
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,11 @@ exp,benchmark
1,SystemDatabaseQueries/select_system.users_with_empty_database_name
1,SystemDatabaseQueries/select_system.users_with_schema_name
2,SystemDatabaseQueries/select_system.users_without_schema_name
24,Truncate/truncate_1_column_0_rows
24,Truncate/truncate_1_column_1_row
24,Truncate/truncate_1_column_2_rows
24,Truncate/truncate_2_column_0_rows
24,Truncate/truncate_2_column_1_rows
24,Truncate/truncate_2_column_2_rows
27,Truncate/truncate_1_column_0_rows
26,Truncate/truncate_1_column_1_row
26,Truncate/truncate_1_column_2_rows
27,Truncate/truncate_2_column_0_rows
26,Truncate/truncate_2_column_1_rows
26,Truncate/truncate_2_column_2_rows
1,VirtualTableQueries/select_crdb_internal.invalid_objects_with_1_fk
1,VirtualTableQueries/select_crdb_internal.tables_with_1_fk
1 change: 1 addition & 0 deletions pkg/sql/tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ go_test(
"//pkg/jobs/jobspb",
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/concurrency",
"//pkg/roachpb",
"//pkg/security",
Expand Down
69 changes: 69 additions & 0 deletions pkg/sql/tests/truncate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ package tests

import (
"context"
"fmt"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/stats"
Expand Down Expand Up @@ -352,3 +354,70 @@ func TestTruncateWithConcurrentMutations(t *testing.T) {
t.Run(tc.name, func(t *testing.T) { run(t, tc) })
}
}
// TestTruncatePreservesSplitPoints checks that TRUNCATE re-creates range
// splits for a table whose primary index and secondary index were manually
// split beforehand. It counts the table's ranges before and after the TRUNCATE
// and runs once per cluster size listed in testCases.
func TestTruncatePreservesSplitPoints(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()

testCases := []struct {
// nodes is the number of nodes in the test cluster.
nodes int
// NOTE(review): expectedSplits is never read below; the final assertion
// derives the expected count from nodes and the cluster setting instead.
expectedSplits int
}{
{
nodes: 1,
expectedSplits: 4,
},
{
nodes: 3,
expectedSplits: 12,
},
}

for _, testCase := range testCases {
t.Run(fmt.Sprintf("nodes=%d", testCase.nodes), func(t *testing.T) {
tc := testcluster.StartTestCluster(t, testCase.nodes, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
// Presumably disabled so that ranges are not merged away while the
// test is counting them — confirm.
DisableMergeQueue: true,
},
},
},
})
defer tc.Stopper().Stop(ctx)

// Create the table, fill it, and split both the primary index and the
// secondary index a_b_idx at the same 9 values.
var err error
_, err = tc.Conns[0].ExecContext(ctx, `
CREATE TABLE a(a INT PRIMARY KEY, b INT, INDEX(b));
INSERT INTO a SELECT g,g FROM generate_series(1,10000) g(g);
ALTER TABLE a SPLIT AT VALUES(1000), (2000), (3000), (4000), (5000), (6000), (7000), (8000), (9000);
ALTER INDEX a_b_idx SPLIT AT VALUES(1000), (2000), (3000), (4000), (5000), (6000), (7000), (8000), (9000);
`)
assert.NoError(t, err)

// Count the ranges that belong to the table before truncating.
row := tc.Conns[0].QueryRowContext(ctx, `
SELECT count(*) FROM crdb_internal.ranges_no_leases WHERE table_id = 'a'::regclass`)
assert.NoError(t, row.Err())
var nRanges int
assert.NoError(t, row.Scan(&nRanges))

// 9 manual split points on each of the two indexes are expected to yield
// 19 ranges for the table.
const origNRanges = 19
assert.Equal(t, origNRanges, nRanges)

_, err = tc.Conns[0].ExecContext(ctx, `TRUNCATE a`)
assert.NoError(t, err)

// Count the table's ranges again, now that the indexes have been swapped.
row = tc.Conns[0].QueryRowContext(ctx, `
SELECT count(*) FROM crdb_internal.ranges_no_leases WHERE table_id = 'a'::regclass`)
assert.NoError(t, row.Err())
assert.NoError(t, row.Scan(&nRanges))

// The expected count is the pre-truncate range count plus
// nodes * sql.PreservedSplitCountMultiple, i.e. one additional range per
// preserved split point.
// NOTE(review): the previous wording of this comment said 1 is subtracted
// from the original range count because the table's first range has no
// index ID in its prefix, but no subtraction is performed here — confirm
// which is intended.
assert.Equal(t, origNRanges+testCase.nodes*int(sql.PreservedSplitCountMultiple.Get(&tc.Servers[0].Cfg.
Settings.SV)),
nRanges)
})
}
}
186 changes: 174 additions & 12 deletions pkg/sql/truncate.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,16 @@ package sql

import (
"context"
"math/rand"
"time"

"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
Expand Down Expand Up @@ -170,6 +174,17 @@ func (t *truncateNode) Next(runParams) (bool, error) { return false, nil }
func (t *truncateNode) Values() tree.Datums { return tree.Datums{} }
func (t *truncateNode) Close(context.Context) {}

// PreservedSplitCountMultiple is the cluster setting that controls how many
// range split points TRUNCATE re-creates on a table's replacement indexes. The
// value is a per-node multiple: it is multiplied by the number of nodes in the
// cluster to obtain the number of splits to preserve. Values <= 0 disable
// split preservation.
var PreservedSplitCountMultiple = settings.RegisterIntSetting(
	"sql.truncate.preserved_split_count_multiple",
	"set to non-zero to cause TRUNCATE to preserve range splits from the table's indexes. "+
		"The multiple given will be multiplied with the number of nodes in the cluster "+
		"to produce the number of preserved range splits. "+
		"This can improve performance when truncating a table with significant write traffic.",
	4,
)

// truncateTable truncates the data of a table in a single transaction. It does
// so by dropping all existing indexes on the table and creating new ones without
// backfilling any data into the new indexes. The old indexes are cleaned up
Expand Down Expand Up @@ -270,6 +285,24 @@ func (p *planner) truncateTable(
return err
}

oldIndexIDs := make([]descpb.IndexID, len(oldIndexes))
for i := range oldIndexIDs {
oldIndexIDs[i] = oldIndexes[i].ID
}
newIndexIDs := make([]descpb.IndexID, len(tableDesc.ActiveIndexes()))
newIndexes := tableDesc.ActiveIndexes()
for i := range newIndexIDs {
newIndexIDs[i] = newIndexes[i].GetID()
}

// Move existing range split points in the pre-truncated table's indexes to
// the new indexes we're creating, to avoid a thundering herd effect where
// any existing traffic on the table will slam into a single range after the
// truncate is completed.
if err := p.copySplitPointsToNewIndexes(ctx, id, oldIndexIDs, newIndexIDs); err != nil {
return err
}

// Reassign any referenced index ID's from other tables.
if err := p.reassignInterleaveIndexReferences(ctx, allRefs, tableDesc.ID, indexIDMapping); err != nil {
return err
Expand All @@ -285,19 +318,11 @@ func (p *planner) truncateTable(
}

// Move any zone configs on indexes over to the new set of indexes.
oldIndexIDs := make([]descpb.IndexID, len(oldIndexes)-1)
for i := range oldIndexIDs {
oldIndexIDs[i] = oldIndexes[i+1].ID
}
newIndexIDs := make([]descpb.IndexID, len(tableDesc.PublicNonPrimaryIndexes()))
for i := range newIndexIDs {
newIndexIDs[i] = tableDesc.PublicNonPrimaryIndexes()[i].GetID()
}
swapInfo := &descpb.PrimaryKeySwap{
OldPrimaryIndexId: oldIndexes[0].ID,
OldIndexes: oldIndexIDs,
NewPrimaryIndexId: tableDesc.GetPrimaryIndexID(),
NewIndexes: newIndexIDs,
OldPrimaryIndexId: oldIndexIDs[0],
OldIndexes: oldIndexIDs[1:],
NewPrimaryIndexId: newIndexIDs[0],
NewIndexes: newIndexIDs[1:],
}
if err := maybeUpdateZoneConfigsForPKChange(ctx, p.txn, p.ExecCfg(), tableDesc, swapInfo); err != nil {
return err
Expand Down Expand Up @@ -456,6 +481,143 @@ func (p *planner) findAllReferencingInterleaves(
return tables, nil
}

// copySplitPointsToNewIndexes copies any range split points from the indexes
// given by the oldIndexIDs slice to the indexes given by the newIndexIDs slice
// on the table given by the tableID.
// oldIndexIDs and newIndexIDs must be in the same order and be the same length.
//
// The number of split points carried over is capped at
// PreservedSplitCountMultiple times the number of nodes reported by
// crdb_internal.kv_node_status; when the table has more translatable split
// points than that, an evenly spaced subset is used. Each split is issued with
// an expiration of 10 minutes jittered by up to 20%, and the span covering the
// new indexes is scattered afterwards.
//
// The function is a no-op (returns nil) for non-system tenants, when the
// setting is <= 0, and when no existing split point can be translated.
func (p *planner) copySplitPointsToNewIndexes(
	ctx context.Context,
	tableID descpb.ID,
	oldIndexIDs []descpb.IndexID,
	newIndexIDs []descpb.IndexID,
) error {
	if !p.EvalContext().Codec.ForSystemTenant() {
		// Can't do any of this direct manipulation of ranges in multi-tenant mode.
		return nil
	}

	preservedSplitsMultiple := int(PreservedSplitCountMultiple.Get(p.execCfg.SV()))
	if preservedSplitsMultiple <= 0 {
		return nil
	}
	row, err := p.EvalContext().InternalExecutor.QueryRow(
		ctx, "count-active-nodes", nil, "SELECT count(*) FROM crdb_internal.kv_node_status")
	if err != nil || row == nil {
		// Either the query failed, or it produced no row (err is nil then) and
		// there is no node count to scale by.
		return err
	}
	nNodes := int(tree.MustBeDInt(row[0]))
	nSplits := preservedSplitsMultiple * nNodes

	log.Infof(ctx, "making %d new truncate split points (%d * %d)", nSplits, preservedSplitsMultiple, nNodes)

	// Re-split the new set of indexes along the same split points as the old
	// indexes.
	var b kv.Batch
	tablePrefix := p.execCfg.Codec.TablePrefix(uint32(tableID))

	// Fetch all of the range descriptors for this table.
	ranges, err := kvclient.ScanMetaKVs(ctx, p.txn, roachpb.Span{
		Key:    tablePrefix,
		EndKey: tablePrefix.PrefixEnd(),
	})
	if err != nil {
		return err
	}

	// Shift the range split points from the old keyspace into the new keyspace,
	// filtering out any ranges that we can't translate.
	var desc roachpb.RangeDescriptor
	splitPoints := make([][]byte, 0, len(ranges))
	for i := range ranges {
		if err := ranges[i].ValueProto(&desc); err != nil {
			return err
		}
		// For every range's start key, translate the start key into the keyspace
		// of the replacement index. We'll split the replacement index along this
		// same boundary later.
		startKey := desc.StartKey

		restOfKey, foundTable, foundIndex, err := p.execCfg.Codec.DecodeIndexPrefix(roachpb.Key(startKey))
		if err != nil {
			// If we get an error here, it means that either our key didn't contain
			// an index ID (because it was the first range in a table) or the key
			// didn't contain a table ID (because it's still the first range in the
			// system that hasn't split off yet).
			// In this case, we can't translate this range into the new keyspace,
			// so we just have to continue along.
			continue
		}
		if foundTable != uint32(tableID) {
			// We found a split point that started somewhere else in the database,
			// so we can't translate it to the new keyspace. Don't bother with this
			// range.
			continue
		}
		var newIndexID descpb.IndexID
		var found bool
		for k := range oldIndexIDs {
			if oldIndexIDs[k] == descpb.IndexID(foundIndex) {
				newIndexID = newIndexIDs[k]
				found = true
				// Index IDs are matched positionally; the first match is the
				// replacement, so there is no need to keep scanning.
				break
			}
		}
		if !found {
			// We found a split point that is on an index that no longer exists.
			// This can happen if the table was truncated more than once in a row,
			// and there are old split points sitting around in the ranges. In this
			// case, we can't translate the range into the new keyspace, so we don't
			// bother with this range.
			continue
		}

		newStartKey := append(p.execCfg.Codec.IndexPrefix(uint32(tableID), uint32(newIndexID)), restOfKey...)
		splitPoints = append(splitPoints, newStartKey)
	}

	if len(splitPoints) == 0 {
		// No split points to carry over. We can leave early.
		return nil
	}

	// Finally, downsample the split points - choose just nSplits of them to keep.
	step := float64(len(splitPoints)) / float64(nSplits)
	if step < 1 {
		step = 1
	}
	baseExpiration := 10 * time.Minute.Nanoseconds()
	// Jitter the expiration time by 20% up or down from the default.
	maxJitter := baseExpiration / 5
	for i := 0; i < nSplits; i++ {
		// Evenly space out the ranges that we select from the ranges that are
		// returned.
		idx := int(step * float64(i))
		if idx >= len(splitPoints) {
			// There are fewer translatable split points than nSplits (step was
			// clamped to 1 above), so every one of them has already been used.
			break
		}
		sp := splitPoints[idx]

		expirationTime := baseExpiration + rand.Int63n(maxJitter*2) - maxJitter

		log.Infof(ctx, "truncate sending split request for key %s", sp)
		b.AddRawRequest(&roachpb.AdminSplitRequest{
			RequestHeader: roachpb.RequestHeader{
				Key: sp,
			},
			SplitKey:       sp,
			ExpirationTime: p.execCfg.Clock.Now().Add(expirationTime, 0),
		})
	}

	b.AddRawRequest(&roachpb.AdminScatterRequest{
		// Scatter all of the data between the start key of the first new index, and
		// the PrefixEnd of the last new index.
		RequestHeader: roachpb.RequestHeader{
			Key:    p.execCfg.Codec.IndexPrefix(uint32(tableID), uint32(newIndexIDs[0])),
			EndKey: p.execCfg.Codec.IndexPrefix(uint32(tableID), uint32(newIndexIDs[len(newIndexIDs)-1])).PrefixEnd(),
		},
		RandomizeLeases: true,
	})

	return p.txn.DB().Run(ctx, &b)
}

// reassignInterleaveIndexReferences reassigns all index ID's present in
// interleave descriptor references according to indexIDMapping.
func (p *planner) reassignInterleaveIndexReferences(
Expand Down

0 comments on commit db0714e

Please sign in to comment.