From 119dc673be0c62ad2cda741ae4a4f54b1b450fd4 Mon Sep 17 00:00:00 2001
From: Marius Posta
Date: Tue, 28 Jun 2022 17:31:10 -0400
Subject: [PATCH] sql: add bulkio.column_backfill.update_chunk_size_threshold_bytes

This commit adds a new cluster setting, defaulting to 10 MiB, which
complements the bulkio.column_backfill.batch_size setting by limiting the
byte size of the column backfiller's update batches.

Fixes #83199.

Release note (bug fix): fixes a bug where ADD/DROP COLUMN with the legacy
schema changer could fail on tables with large rows due to exceeding the
raft command max size.
---
 pkg/sql/backfill.go                           | 27 +++++--
 pkg/sql/backfill/backfill.go                  | 15 ++++
 pkg/sql/distsql_plan_backfill.go              | 18 +++--
 pkg/sql/execinfrapb/processors_bulk_io.proto  |  6 +-
 .../logictest/testdata/logic_test/alter_table | 70 +++++++++++++++++--
 pkg/sql/rowexec/backfiller.go                 |  5 +-
 pkg/sql/rowexec/columnbackfiller.go           | 12 ++--
 7 files changed, 131 insertions(+), 22 deletions(-)

diff --git a/pkg/sql/backfill.go b/pkg/sql/backfill.go
index 59b00cbaf906..b46a9f157191 100644
--- a/pkg/sql/backfill.go
+++ b/pkg/sql/backfill.go
@@ -92,6 +92,16 @@ var columnBackfillBatchSize = settings.RegisterIntSetting(
 	settings.NonNegativeInt, /* validateFn */
 )
 
+// columnBackfillUpdateChunkSizeThresholdBytes is the byte size threshold beyond which
+// an update batch is run at once when adding or removing columns.
+var columnBackfillUpdateChunkSizeThresholdBytes = settings.RegisterIntSetting(
+	settings.TenantWritable,
+	"bulkio.column_backfill.update_chunk_size_threshold_bytes",
+	"the batch size in bytes above which an update is immediately run when adding/removing columns",
+	10<<20, /* 10 MiB */
+	settings.NonNegativeInt, /* validateFn */
+)
+
 var _ sort.Interface = columnsByID{}
 var _ sort.Interface = indexesByID{}
 
@@ -1204,6 +1214,7 @@ func (sc *SchemaChanger) distColumnBackfill(
 	ctx context.Context,
 	version descpb.DescriptorVersion,
 	backfillChunkSize int64,
+	backfillUpdateChunkSizeThresholdBytes uint64,
 	filter backfill.MutationFilter,
 ) error {
 	duration := checkpointInterval
@@ -1298,7 +1309,7 @@ func (sc *SchemaChanger) distColumnBackfill(
 
 		planCtx := sc.distSQLPlanner.NewPlanningCtx(ctx, &evalCtx, nil /* planner */, txn,
 			DistributionTypeSystemTenantOnly)
-		spec, err := initColumnBackfillerSpec(*tableDesc.TableDesc(), duration, chunkSize, readAsOf)
+		spec, err := initColumnBackfillerSpec(tableDesc, duration, chunkSize, backfillUpdateChunkSizeThresholdBytes, readAsOf)
 		if err != nil {
 			return err
 		}
@@ -2191,8 +2202,12 @@ func (sc *SchemaChanger) truncateAndBackfillColumns(
 	log.Infof(ctx, "clearing and backfilling columns")
 
 	if err := sc.distColumnBackfill(
-		ctx, version, columnBackfillBatchSize.Get(&sc.settings.SV),
-		backfill.ColumnMutationFilter); err != nil {
+		ctx,
+		version,
+		columnBackfillBatchSize.Get(&sc.settings.SV),
+		uint64(columnBackfillUpdateChunkSizeThresholdBytes.Get(&sc.settings.SV)),
+		backfill.ColumnMutationFilter,
+	); err != nil {
 		return err
 	}
 	log.Info(ctx, "finished clearing and backfilling columns")
@@ -2621,9 +2636,11 @@ func columnBackfillInTxn(
 	sp := tableDesc.PrimaryIndexSpan(evalCtx.Codec)
 	for sp.Key != nil {
 		var err error
+		scanBatchSize := rowinfra.RowLimit(columnBackfillBatchSize.Get(&evalCtx.Settings.SV))
+		updateChunkSizeThresholdBytes := rowinfra.BytesLimit(columnBackfillUpdateChunkSizeThresholdBytes.Get(&evalCtx.Settings.SV))
+		const alsoCommit = false
 		sp.Key, err = backfiller.RunColumnBackfillChunk(
-			ctx, txn, tableDesc, sp, rowinfra.RowLimit(columnBackfillBatchSize.Get(&evalCtx.Settings.SV)),
-			false /*alsoCommit*/, traceKV,
+			ctx, txn, tableDesc, sp, scanBatchSize, updateChunkSizeThresholdBytes, alsoCommit, traceKV,
 		)
 		if err != nil {
 			return err
diff --git a/pkg/sql/backfill/backfill.go b/pkg/sql/backfill/backfill.go
index 9d8407e9a722..6e9fcab80720 100644
--- a/pkg/sql/backfill/backfill.go
+++ b/pkg/sql/backfill/backfill.go
@@ -275,6 +275,7 @@ func (cb *ColumnBackfiller) RunColumnBackfillChunk(
 	tableDesc catalog.TableDescriptor,
 	sp roachpb.Span,
 	chunkSize rowinfra.RowLimit,
+	updateChunkSizeThresholdBytes rowinfra.BytesLimit,
 	alsoCommit bool,
 	traceKV bool,
 ) (roachpb.Key, error) {
@@ -390,6 +391,20 @@ func (cb *ColumnBackfiller) RunColumnBackfillChunk(
 		); err != nil {
 			return roachpb.Key{}, err
 		}
+
+		// Exit early to flush if the batch byte size exceeds a predefined
+		// threshold. This can happen when table rows are more on the "fat" side,
+		// typically with large BYTES or JSONB columns.
+		//
+		// This helps prevent exceedingly large raft commands which can, for
+		// instance, cause schema changes to be unable to either proceed or
+		// roll back.
+		//
+		// The threshold is ignored when zero.
+		//
+		if updateChunkSizeThresholdBytes > 0 && b.ApproximateMutationBytes() >= int(updateChunkSizeThresholdBytes) {
+			break
+		}
 	}
 	// Write the new row values.
 	writeBatch := txn.Run
diff --git a/pkg/sql/distsql_plan_backfill.go b/pkg/sql/distsql_plan_backfill.go
index 3cb208544fd8..e1755d6524e4 100644
--- a/pkg/sql/distsql_plan_backfill.go
+++ b/pkg/sql/distsql_plan_backfill.go
@@ -16,6 +16,7 @@ import (
 	"unsafe"
 
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/sql/catalog"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
 	"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
 	"github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
@@ -26,14 +27,19 @@ import (
 )
 
 func initColumnBackfillerSpec(
-	desc descpb.TableDescriptor, duration time.Duration, chunkSize int64, readAsOf hlc.Timestamp,
+	tbl catalog.TableDescriptor,
+	duration time.Duration,
+	chunkSize int64,
+	updateChunkSizeThresholdBytes uint64,
+	readAsOf hlc.Timestamp,
 ) (execinfrapb.BackfillerSpec, error) {
 	return execinfrapb.BackfillerSpec{
-		Table:     desc,
-		Duration:  duration,
-		ChunkSize: chunkSize,
-		ReadAsOf:  readAsOf,
-		Type:      execinfrapb.BackfillerSpec_Column,
+		Table:                         *tbl.TableDesc(),
+		Duration:                      duration,
+		ChunkSize:                     chunkSize,
+		UpdateChunkSizeThresholdBytes: updateChunkSizeThresholdBytes,
+		ReadAsOf:                      readAsOf,
+		Type:                          execinfrapb.BackfillerSpec_Column,
 	}, nil
 }
diff --git a/pkg/sql/execinfrapb/processors_bulk_io.proto b/pkg/sql/execinfrapb/processors_bulk_io.proto
index c07ebf7bb322..55f9663720cd 100644
--- a/pkg/sql/execinfrapb/processors_bulk_io.proto
+++ b/pkg/sql/execinfrapb/processors_bulk_io.proto
@@ -60,6 +60,10 @@ message BackfillerSpec {
   // of entries backfilled per chunk.
   optional int64 chunk_size = 5 [(gogoproto.nullable) = false];
 
+  // The column backfiller will run an update batch immediately
+  // once its estimated byte size reaches UpdateChunkSizeThresholdBytes, if nonzero.
+  optional uint64 update_chunk_size_threshold_bytes = 14 [(gogoproto.nullable) = false];
+
   // WriteAsOf is the time that the backfill entries should be written.
   // Note: Older nodes may also use this as the read time instead of readAsOf.
   optional util.hlc.Timestamp writeAsOf = 7 [(gogoproto.nullable) = false];
@@ -84,7 +88,7 @@ message BackfillerSpec {
   // check MVCCAddSSTable before setting this option.
  optional bool write_at_batch_timestamp = 12 [(gogoproto.nullable) = false];
 
-  // NEXTID: 14.
+  // NEXTID: 15.
 }
 
 // JobProgress identifies the job to report progress on. This reporting
diff --git a/pkg/sql/logictest/testdata/logic_test/alter_table b/pkg/sql/logictest/testdata/logic_test/alter_table
index 281d5e69ace6..c3b14e73927e 100644
--- a/pkg/sql/logictest/testdata/logic_test/alter_table
+++ b/pkg/sql/logictest/testdata/logic_test/alter_table
@@ -2196,6 +2196,66 @@ SELECT * FROM multipleinstmt ORDER BY id ASC;
 2 b b false NULL true NULL
 3 c c false NULL true NULL
 
+subtest column_backfiller_update_batching
+
+let $use_decl_sc
+SHOW use_declarative_schema_changer
+
+statement ok
+SET use_declarative_schema_changer = 'off';
+
+statement ok
+BEGIN;
+CREATE TABLE tb AS SELECT 123::INT AS k FROM generate_series(1, 10);
+SET tracing = on,kv;
+ALTER TABLE tb ADD COLUMN v STRING NOT NULL DEFAULT ('abc'::STRING);
+SET tracing = off;
+
+# Check that the column backfiller batches all its Puts into one batch.
+query I
+SELECT count(*) FROM [SHOW KV TRACE FOR SESSION] WHERE message LIKE '%sending batch%' AND message LIKE '% Put to %';
+----
+1
+
+query I
+SELECT count(*) FROM tb WHERE v = 'abc';
+----
+10
+
+statement ok
+ROLLBACK;
+
+# Bring the threshold way down to force column backfiller batches to have no more than 1 Put each.
+statement ok
+SET CLUSTER SETTING bulkio.column_backfill.update_chunk_size_threshold_bytes = 1;
+
+statement ok
+BEGIN;
+CREATE TABLE tb AS SELECT 123::INT AS k FROM generate_series(1, 10);
+SET tracing = on,kv;
+ALTER TABLE tb ADD COLUMN v STRING NOT NULL DEFAULT ('abc'::STRING);
+SET tracing = off;
+
+query I
+SELECT count(*) FROM [SHOW KV TRACE FOR SESSION] WHERE message LIKE '%sending batch%' AND message LIKE '% Put to %';
+----
+10
+
+query I
+SELECT count(*) FROM tb WHERE v = 'abc';
+----
+10
+
+statement ok
+ROLLBACK;
+
+# Undo subtest side effects.
+statement ok
+RESET CLUSTER SETTING bulkio.column_backfill.update_chunk_size_threshold_bytes;
+
+statement ok
+SET use_declarative_schema_changer = $use_decl_sc;
+
 subtest storage_params
 
 statement ok
@@ -2306,8 +2366,8 @@ FROM (
 LEFT JOIN pg_catalog.pg_depend r ON l.table_id = r.objid;
 ----
 table_id name state refobjid
-205 test_serial_b_seq PUBLIC 204
-204 test_serial PUBLIC NULL
+207 test_serial_b_seq PUBLIC 206
+206 test_serial PUBLIC NULL
 
 statement ok
 DROP TABLE test_serial;
@@ -2341,8 +2401,8 @@ FROM (
 LEFT JOIN pg_catalog.pg_depend r ON l.table_id = r.objid;
 ----
 table_id name state refobjid
-207 test_serial_b_seq PUBLIC 206
-206 test_serial PUBLIC NULL
+209 test_serial_b_seq PUBLIC 208
+208 test_serial PUBLIC NULL
 
 statement ok
 ALTER TABLE test_serial DROP COLUMN b;
@@ -2357,7 +2417,7 @@ FROM (
 LEFT JOIN pg_catalog.pg_depend r ON l.table_id = r.objid;
 ----
 table_id name state refobjid
-206 test_serial PUBLIC NULL
+208 test_serial PUBLIC NULL
 
 statement ok
 DROP TABLE test_serial;
diff --git a/pkg/sql/rowexec/backfiller.go b/pkg/sql/rowexec/backfiller.go
index f47b3ab86993..fd384fbe3c3a 100644
--- a/pkg/sql/rowexec/backfiller.go
+++ b/pkg/sql/rowexec/backfiller.go
@@ -47,6 +47,7 @@ type chunkBackfiller interface {
 		ctx context.Context,
 		span roachpb.Span,
 		chunkSize rowinfra.RowLimit,
+		updateChunkSizeThresholdBytes rowinfra.BytesLimit,
 		readAsOf hlc.Timestamp,
 	) (roachpb.Key, error)
 
@@ -135,6 +136,8 @@ func (b *backfiller) mainLoop(ctx context.Context) (roachpb.Spans, error) {
 	// fill more than this amount and cause a flush, then it likely also fills
 	// a non-trivial part of the next buffer.
 	const opportunisticCheckpointThreshold = 0.8
+	chunkSize := rowinfra.RowLimit(b.spec.ChunkSize)
+	updateChunkSizeThresholdBytes := rowinfra.BytesLimit(b.spec.UpdateChunkSizeThresholdBytes)
 	start := timeutil.Now()
 	totalChunks := 0
 	totalSpans := 0
@@ -148,7 +151,7 @@ func (b *backfiller) mainLoop(ctx context.Context) (roachpb.Spans, error) {
 	for todo.Key != nil {
 		log.VEventf(ctx, 3, "%s backfiller starting chunk %d: %s", b.name, chunks, todo)
 		var err error
-		todo.Key, err = b.chunks.runChunk(ctx, todo, rowinfra.RowLimit(b.spec.ChunkSize), b.spec.ReadAsOf)
+		todo.Key, err = b.chunks.runChunk(ctx, todo, chunkSize, updateChunkSizeThresholdBytes, b.spec.ReadAsOf)
 		if err != nil {
 			return nil, err
 		}
diff --git a/pkg/sql/rowexec/columnbackfiller.go b/pkg/sql/rowexec/columnbackfiller.go
index af0e05c98f1f..60f92c792783 100644
--- a/pkg/sql/rowexec/columnbackfiller.go
+++ b/pkg/sql/rowexec/columnbackfiller.go
@@ -103,7 +103,11 @@ func (cb *columnBackfiller) CurrentBufferFill() float32 {
 
 // runChunk implements the chunkBackfiller interface.
 func (cb *columnBackfiller) runChunk(
-	ctx context.Context, sp roachpb.Span, chunkSize rowinfra.RowLimit, _ hlc.Timestamp,
+	ctx context.Context,
+	sp roachpb.Span,
+	chunkSize rowinfra.RowLimit,
+	updateChunkSizeThresholdBytes rowinfra.BytesLimit,
+	_ hlc.Timestamp,
 ) (roachpb.Key, error) {
 	var key roachpb.Key
 	var commitWaitFn func(context.Context) error
@@ -124,7 +128,6 @@ func (cb *columnBackfiller) runChunk(
 		// waiting for consistency when backfilling a column on GLOBAL tables.
 		commitWaitFn = txn.DeferCommitWait(ctx)
 
-		// TODO(knz): do KV tracing in DistSQL processors.
 		var err error
 		key, err = cb.RunColumnBackfillChunk(
 			ctx,
@@ -132,8 +135,9 @@ func (cb *columnBackfiller) runChunk(
 			cb.desc,
 			sp,
 			chunkSize,
-			true,  /*alsoCommit*/
-			false, /*traceKV*/
+			updateChunkSizeThresholdBytes,
+			true, /*alsoCommit*/
+			cb.flowCtx.TraceKV,
 		)
 		return err
 	})
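
Note on the mechanism (not part of the patch itself): the change caps each
column-backfill update batch not only at bulkio.column_backfill.batch_size
rows but also, via the new setting, at roughly
bulkio.column_backfill.update_chunk_size_threshold_bytes of accumulated
mutations. RunColumnBackfillChunk breaks out of its row loop once
b.ApproximateMutationBytes() reaches the threshold and hands back a resume
key, so the next chunk picks up where the previous one stopped and no single
update batch (and hence no single raft command) grows past the byte budget.
Below is a minimal, self-contained Go sketch of that early-flush pattern. It
is illustrative only and does not use CockroachDB APIs; the row type, the
backfillChunk function, and the flush callback are hypothetical stand-ins.

package main

import "fmt"

// row stands in for one backfilled row's worth of KV mutations.
type row struct {
	key   string
	value []byte
}

// backfillChunk buffers rows into an update batch and flushes it as soon as
// either the row-count limit (chunkSize) or the byte threshold
// (thresholdBytes) is reached. A zero threshold disables the byte-based
// early flush, mirroring the semantics of the new cluster setting.
func backfillChunk(rows []row, chunkSize, thresholdBytes int, flush func([]row)) {
	var batch []row
	batchBytes := 0
	for _, r := range rows {
		batch = append(batch, r)
		batchBytes += len(r.key) + len(r.value)
		if len(batch) >= chunkSize || (thresholdBytes > 0 && batchBytes >= thresholdBytes) {
			flush(batch)
			batch, batchBytes = nil, 0
		}
	}
	if len(batch) > 0 {
		flush(batch) // flush whatever remains at the end of the chunk
	}
}

func main() {
	// Ten rows of roughly 4 KiB each, similar to the "fat row" case the
	// patch is concerned with.
	rows := make([]row, 10)
	for i := range rows {
		rows[i] = row{key: fmt.Sprintf("k%d", i), value: make([]byte, 4<<10)}
	}
	// With a 10 KiB threshold, each flush carries only a few rows even
	// though the row-count limit (200) is never reached.
	backfillChunk(rows, 200, 10<<10, func(b []row) {
		fmt.Printf("flushing %d rows\n", len(b))
	})
}

In a running cluster the threshold can be lowered with SET CLUSTER SETTING
bulkio.column_backfill.update_chunk_size_threshold_bytes = <bytes> (the logic
test above sets it to 1 to force one Put per batch) or set to 0 to disable
the byte-based flush and rely on the row-count limit alone.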