sql: add bulkio.column_backfill.update_chunk_size_threshold_bytes
This commit adds the bulkio.column_backfill.update_chunk_size_threshold_bytes
cluster setting, defaulting to 10 MiB, which complements the
bulkio.column_backfill.batch_size setting by limiting the byte size of the
column backfiller's update batches.

Fixes #83199.

Release note (bug fix): fixes a bug where ADD/DROP COLUMN with the legacy
schema changer could fail on tables with large rows due to exceeding the
maximum raft command size.
Marius Posta committed Jun 30, 2022
1 parent ccd6f76 commit 119dc67
Showing 7 changed files with 131 additions and 22 deletions.
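For orientation, the sketch below distills the batching pattern this commit introduces into a small, self-contained Go program. It is illustrative only, not CockroachDB code: the names rowLimit, thresholdBytes, approxRowBytes, and flush are hypothetical stand-ins for bulkio.column_backfill.batch_size, the new bulkio.column_backfill.update_chunk_size_threshold_bytes setting, the per-row mutation size, and the batch write. The point is that each update batch is now bounded by bytes as well as by row count, so a handful of very wide rows can no longer accumulate into a single oversized raft command.

package main

import "fmt"

func main() {
	const (
		rowLimit       = 200      // stand-in for bulkio.column_backfill.batch_size
		thresholdBytes = 10 << 20 // stand-in for ...update_chunk_size_threshold_bytes (10 MiB)
		approxRowBytes = 3 << 20  // pretend each backfilled row adds ~3 MiB of mutations
	)

	batchRows, batchBytes, flushes := 0, 0, 0
	flush := func() {
		flushes++
		fmt.Printf("flush #%d: %d rows, ~%d bytes\n", flushes, batchRows, batchBytes)
		batchRows, batchBytes = 0, 0
	}

	for row := 0; row < 1000; row++ {
		batchRows++
		batchBytes += approxRowBytes
		// Without the byte threshold, only the row limit would trigger a
		// flush, and 200 "fat" rows could exceed the raft command max size.
		if batchRows >= rowLimit || (thresholdBytes > 0 && batchBytes >= thresholdBytes) {
			flush()
		}
	}
	if batchRows > 0 {
		flush()
	}
}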
27 changes: 22 additions & 5 deletions pkg/sql/backfill.go
@@ -92,6 +92,16 @@ var columnBackfillBatchSize = settings.RegisterIntSetting(
settings.NonNegativeInt, /* validateFn */
)

// columnBackfillUpdateChunkSizeThresholdBytes is the byte size threshold beyond which
// an update batch is run immediately when adding or removing columns.
var columnBackfillUpdateChunkSizeThresholdBytes = settings.RegisterIntSetting(
settings.TenantWritable,
"bulkio.column_backfill.update_chunk_size_threshold_bytes",
"the batch size in bytes above which an update is immediately run when adding/removing columns",
10<<20, /* 10 MiB */
settings.NonNegativeInt, /* validateFn */
)

var _ sort.Interface = columnsByID{}
var _ sort.Interface = indexesByID{}

@@ -1204,6 +1214,7 @@ func (sc *SchemaChanger) distColumnBackfill(
ctx context.Context,
version descpb.DescriptorVersion,
backfillChunkSize int64,
backfillUpdateChunkSizeThresholdBytes uint64,
filter backfill.MutationFilter,
) error {
duration := checkpointInterval
@@ -1298,7 +1309,7 @@

planCtx := sc.distSQLPlanner.NewPlanningCtx(ctx, &evalCtx, nil /* planner */, txn,
DistributionTypeSystemTenantOnly)
spec, err := initColumnBackfillerSpec(*tableDesc.TableDesc(), duration, chunkSize, readAsOf)
spec, err := initColumnBackfillerSpec(tableDesc, duration, chunkSize, backfillUpdateChunkSizeThresholdBytes, readAsOf)
if err != nil {
return err
}
@@ -2191,8 +2202,12 @@ func (sc *SchemaChanger) truncateAndBackfillColumns(
log.Infof(ctx, "clearing and backfilling columns")

if err := sc.distColumnBackfill(
ctx, version, columnBackfillBatchSize.Get(&sc.settings.SV),
backfill.ColumnMutationFilter); err != nil {
ctx,
version,
columnBackfillBatchSize.Get(&sc.settings.SV),
uint64(columnBackfillUpdateChunkSizeThresholdBytes.Get(&sc.settings.SV)),
backfill.ColumnMutationFilter,
); err != nil {
return err
}
log.Info(ctx, "finished clearing and backfilling columns")
@@ -2621,9 +2636,11 @@ func columnBackfillInTxn(
sp := tableDesc.PrimaryIndexSpan(evalCtx.Codec)
for sp.Key != nil {
var err error
scanBatchSize := rowinfra.RowLimit(columnBackfillBatchSize.Get(&evalCtx.Settings.SV))
updateChunkSizeThresholdBytes := rowinfra.BytesLimit(columnBackfillUpdateChunkSizeThresholdBytes.Get(&evalCtx.Settings.SV))
const alsoCommit = false
sp.Key, err = backfiller.RunColumnBackfillChunk(
ctx, txn, tableDesc, sp, rowinfra.RowLimit(columnBackfillBatchSize.Get(&evalCtx.Settings.SV)),
false /*alsoCommit*/, traceKV,
ctx, txn, tableDesc, sp, scanBatchSize, updateChunkSizeThresholdBytes, alsoCommit, traceKV,
)
if err != nil {
return err
15 changes: 15 additions & 0 deletions pkg/sql/backfill/backfill.go
@@ -275,6 +275,7 @@ func (cb *ColumnBackfiller) RunColumnBackfillChunk(
tableDesc catalog.TableDescriptor,
sp roachpb.Span,
chunkSize rowinfra.RowLimit,
updateChunkSizeThresholdBytes rowinfra.BytesLimit,
alsoCommit bool,
traceKV bool,
) (roachpb.Key, error) {
@@ -390,6 +391,20 @@ func (cb *ColumnBackfiller) RunColumnBackfillChunk(
); err != nil {
return roachpb.Key{}, err
}

// Exit early to flush if the batch byte size exceeds a predefined
// threshold. This can happen when table rows are more on the "fat" side,
// typically with large BYTES or JSONB columns.
//
// This helps prevent exceedingly large raft commands, which would, for
// instance, leave a schema change unable to either proceed or roll back.
//
// The threshold is ignored when zero.
if updateChunkSizeThresholdBytes > 0 && b.ApproximateMutationBytes() >= int(updateChunkSizeThresholdBytes) {
break
}
}
// Write the new row values.
writeBatch := txn.Run
18 changes: 12 additions & 6 deletions pkg/sql/distsql_plan_backfill.go
@@ -16,6 +16,7 @@
"unsafe"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
@@ -26,14 +27,19 @@
)

func initColumnBackfillerSpec(
desc descpb.TableDescriptor, duration time.Duration, chunkSize int64, readAsOf hlc.Timestamp,
tbl catalog.TableDescriptor,
duration time.Duration,
chunkSize int64,
updateChunkSizeThresholdBytes uint64,
readAsOf hlc.Timestamp,
) (execinfrapb.BackfillerSpec, error) {
return execinfrapb.BackfillerSpec{
Table: desc,
Duration: duration,
ChunkSize: chunkSize,
ReadAsOf: readAsOf,
Type: execinfrapb.BackfillerSpec_Column,
Table: *tbl.TableDesc(),
Duration: duration,
ChunkSize: chunkSize,
UpdateChunkSizeThresholdBytes: updateChunkSizeThresholdBytes,
ReadAsOf: readAsOf,
Type: execinfrapb.BackfillerSpec_Column,
}, nil
}

6 changes: 5 additions & 1 deletion pkg/sql/execinfrapb/processors_bulk_io.proto
@@ -60,6 +60,10 @@ message BackfillerSpec {
// of entries backfilled per chunk.
optional int64 chunk_size = 5 [(gogoproto.nullable) = false];

// The column backfiller will run an update batch immediately
// once its estimated byte size reaches UpdateChunkSizeThresholdBytes, if nonzero.
optional uint64 update_chunk_size_threshold_bytes = 14 [(gogoproto.nullable) = false];

// WriteAsOf is the time that the backfill entries should be written.
// Note: Older nodes may also use this as the read time instead of readAsOf.
optional util.hlc.Timestamp writeAsOf = 7 [(gogoproto.nullable) = false];
@@ -84,7 +88,7 @@
// check MVCCAddSSTable before setting this option.
optional bool write_at_batch_timestamp = 12 [(gogoproto.nullable) = false];

// NEXTID: 14.
// NEXTID: 15.
}

// JobProgress identifies the job to report progress on. This reporting
70 changes: 65 additions & 5 deletions pkg/sql/logictest/testdata/logic_test/alter_table
@@ -2196,6 +2196,66 @@ SELECT * FROM multipleinstmt ORDER BY id ASC;
2 b b false NULL true NULL
3 c c false NULL true NULL

subtest column_backfiller_update_batching

let $use_decl_sc
SHOW use_declarative_schema_changer

statement ok
SET use_declarative_schema_changer = 'off';

statement ok
BEGIN;
CREATE TABLE tb AS SELECT 123::INT AS k FROM generate_series(1, 10);
SET tracing = on,kv;
ALTER TABLE tb ADD COLUMN v STRING NOT NULL DEFAULT ('abc'::STRING);
SET tracing = off;

# Check that the column backfiller batches all its Puts into one batch.
query I
SELECT count(*) FROM [SHOW KV TRACE FOR SESSION] WHERE message LIKE '%sending batch%' AND message LIKE '% Put to %';
----
1

query I
SELECT count(*) FROM tb WHERE v = 'abc';
----
10

statement ok
ROLLBACK;

# Bring the threshold way down to force column backfiller batches to have no more than one Put each.
statement ok
SET CLUSTER SETTING bulkio.column_backfill.update_chunk_size_threshold_bytes = 1;

statement ok
BEGIN;
CREATE TABLE tb AS SELECT 123::INT AS k FROM generate_series(1, 10);
SET tracing = on,kv;
ALTER TABLE tb ADD COLUMN v STRING NOT NULL DEFAULT ('abc'::STRING);
SET tracing = off;

query I
SELECT count(*) FROM [SHOW KV TRACE FOR SESSION] WHERE message LIKE '%sending batch%' AND message LIKE '% Put to %';
----
10

query I
SELECT count(*) FROM tb WHERE v = 'abc';
----
10

statement ok
ROLLBACK;

# Undo subtest side effects.
statement ok
RESET CLUSTER SETTING bulkio.column_backfill.update_chunk_size_threshold_bytes;

statement ok
SET use_declarative_schema_changer = $use_decl_sc;

subtest storage_params

statement ok
@@ -2306,8 +2366,8 @@ FROM (
LEFT JOIN pg_catalog.pg_depend r ON l.table_id = r.objid;
----
table_id name state refobjid
205 test_serial_b_seq PUBLIC 204
204 test_serial PUBLIC NULL
207 test_serial_b_seq PUBLIC 206
206 test_serial PUBLIC NULL

statement ok
DROP TABLE test_serial;
@@ -2341,8 +2401,8 @@ FROM (
LEFT JOIN pg_catalog.pg_depend r ON l.table_id = r.objid;
----
table_id name state refobjid
207 test_serial_b_seq PUBLIC 206
206 test_serial PUBLIC NULL
209 test_serial_b_seq PUBLIC 208
208 test_serial PUBLIC NULL

statement ok
ALTER TABLE test_serial DROP COLUMN b;
@@ -2357,7 +2417,7 @@ FROM (
LEFT JOIN pg_catalog.pg_depend r ON l.table_id = r.objid;
----
table_id name state refobjid
206 test_serial PUBLIC NULL
208 test_serial PUBLIC NULL

statement ok
DROP TABLE test_serial;
5 changes: 4 additions & 1 deletion pkg/sql/rowexec/backfiller.go
@@ -47,6 +47,7 @@ type chunkBackfiller interface {
ctx context.Context,
span roachpb.Span,
chunkSize rowinfra.RowLimit,
updateChunkSizeThresholdBytes rowinfra.BytesLimit,
readAsOf hlc.Timestamp,
) (roachpb.Key, error)

@@ -135,6 +136,8 @@ func (b *backfiller) mainLoop(ctx context.Context) (roachpb.Spans, error) {
// fill more than this amount and cause a flush, then it likely also fills
// a non-trivial part of the next buffer.
const opportunisticCheckpointThreshold = 0.8
chunkSize := rowinfra.RowLimit(b.spec.ChunkSize)
updateChunkSizeThresholdBytes := rowinfra.BytesLimit(b.spec.UpdateChunkSizeThresholdBytes)
start := timeutil.Now()
totalChunks := 0
totalSpans := 0
@@ -148,7 +151,7 @@
for todo.Key != nil {
log.VEventf(ctx, 3, "%s backfiller starting chunk %d: %s", b.name, chunks, todo)
var err error
todo.Key, err = b.chunks.runChunk(ctx, todo, rowinfra.RowLimit(b.spec.ChunkSize), b.spec.ReadAsOf)
todo.Key, err = b.chunks.runChunk(ctx, todo, chunkSize, updateChunkSizeThresholdBytes, b.spec.ReadAsOf)
if err != nil {
return nil, err
}
12 changes: 8 additions & 4 deletions pkg/sql/rowexec/columnbackfiller.go
@@ -103,7 +103,11 @@ func (cb *columnBackfiller) CurrentBufferFill() float32 {

// runChunk implements the chunkBackfiller interface.
func (cb *columnBackfiller) runChunk(
ctx context.Context, sp roachpb.Span, chunkSize rowinfra.RowLimit, _ hlc.Timestamp,
ctx context.Context,
sp roachpb.Span,
chunkSize rowinfra.RowLimit,
updateChunkSizeThresholdBytes rowinfra.BytesLimit,
_ hlc.Timestamp,
) (roachpb.Key, error) {
var key roachpb.Key
var commitWaitFn func(context.Context) error
@@ -124,16 +128,16 @@
// waiting for consistency when backfilling a column on GLOBAL tables.
commitWaitFn = txn.DeferCommitWait(ctx)

// TODO(knz): do KV tracing in DistSQL processors.
var err error
key, err = cb.RunColumnBackfillChunk(
ctx,
txn,
cb.desc,
sp,
chunkSize,
true, /*alsoCommit*/
false, /*traceKV*/
updateChunkSizeThresholdBytes,
true, /*alsoCommit*/
cb.flowCtx.TraceKV,
)
return err
})
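Taken together, the diffs above thread the new threshold through three representations: an int64 cluster setting, a uint64 field on the BackfillerSpec proto, and a rowinfra.BytesLimit handed to each backfill chunk. The self-contained Go sketch below mirrors that flow under stated assumptions; BytesLimit and backfillerSpec are stand-in types for illustration, not the CockroachDB definitions.

package main

import "fmt"

// BytesLimit is a stand-in for rowinfra.BytesLimit.
type BytesLimit uint64

// backfillerSpec mirrors only the two BackfillerSpec fields relevant here.
type backfillerSpec struct {
	ChunkSize                     int64
	UpdateChunkSizeThresholdBytes uint64
}

func main() {
	// Values as read from the int64 cluster settings.
	batchSize := int64(200)
	thresholdBytes := int64(10 << 20)

	// The schema changer widens the threshold to uint64 when building the
	// DistSQL processor spec (see initColumnBackfillerSpec above).
	spec := backfillerSpec{
		ChunkSize:                     batchSize,
		UpdateChunkSizeThresholdBytes: uint64(thresholdBytes),
	}

	// The backfiller processor converts the spec fields into the limit types
	// passed to each chunk (see mainLoop in rowexec/backfiller.go above).
	chunkSize := spec.ChunkSize
	limit := BytesLimit(spec.UpdateChunkSizeThresholdBytes)

	fmt.Println(chunkSize, limit) // 200 10485760
}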
