sql: add bulkio.column_backfill.update_chunk_size_threshold_bytes #83544

Merged: 1 commit, Jul 1, 2022
27 changes: 22 additions & 5 deletions pkg/sql/backfill.go
@@ -92,6 +92,16 @@ var columnBackfillBatchSize = settings.RegisterIntSetting(
settings.NonNegativeInt, /* validateFn */
)

// columnBackfillUpdateChunkSizeThresholdBytes is the byte size threshold beyond which
// an update batch is run immediately when adding or removing columns.
var columnBackfillUpdateChunkSizeThresholdBytes = settings.RegisterIntSetting(
settings.TenantWritable,
"bulkio.column_backfill.update_chunk_size_threshold_bytes",
"the batch size in bytes above which an update is immediately run when adding/removing columns",
10<<20, /* 10 MiB */
Contributor Author:
I took this default value because that's also the default max size when scanning.

settings.NonNegativeInt, /* validateFn */
)

var _ sort.Interface = columnsByID{}
var _ sort.Interface = indexesByID{}
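As an aside on the default value (not part of the change itself): a minimal, self-contained sketch of the arithmetic, assuming the 200-row chunk size mentioned later in the review thread. With both defaults in place, the byte threshold only trims a chunk once rows average roughly 50 KiB of mutations each; below that, the row-count limit is reached first.

package main

import "fmt"

func main() {
	const thresholdBytes = 10 << 20 // 10 MiB, the default registered above
	const chunkRows = 200           // default row-count chunk size per the review thread

	// Average mutation size per row at which the byte threshold starts to
	// matter before the row-count limit does.
	fmt.Printf("%d bytes/row (~%d KiB)\n", thresholdBytes/chunkRows, thresholdBytes/chunkRows/1024)
}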

@@ -1204,6 +1214,7 @@ func (sc *SchemaChanger) distColumnBackfill(
ctx context.Context,
version descpb.DescriptorVersion,
backfillChunkSize int64,
backfillUpdateChunkSizeThresholdBytes uint64,
filter backfill.MutationFilter,
) error {
duration := checkpointInterval
@@ -1298,7 +1309,7 @@

planCtx := sc.distSQLPlanner.NewPlanningCtx(ctx, &evalCtx, nil /* planner */, txn,
DistributionTypeSystemTenantOnly)
spec, err := initColumnBackfillerSpec(*tableDesc.TableDesc(), duration, chunkSize, readAsOf)
spec, err := initColumnBackfillerSpec(tableDesc, duration, chunkSize, backfillUpdateChunkSizeThresholdBytes, readAsOf)
if err != nil {
return err
}
@@ -2191,8 +2202,12 @@ func (sc *SchemaChanger) truncateAndBackfillColumns(
log.Infof(ctx, "clearing and backfilling columns")

if err := sc.distColumnBackfill(
ctx, version, columnBackfillBatchSize.Get(&sc.settings.SV),
backfill.ColumnMutationFilter); err != nil {
ctx,
version,
columnBackfillBatchSize.Get(&sc.settings.SV),
uint64(columnBackfillUpdateChunkSizeThresholdBytes.Get(&sc.settings.SV)),
backfill.ColumnMutationFilter,
); err != nil {
return err
}
log.Info(ctx, "finished clearing and backfilling columns")
@@ -2621,9 +2636,11 @@ func columnBackfillInTxn(
sp := tableDesc.PrimaryIndexSpan(evalCtx.Codec)
for sp.Key != nil {
var err error
scanBatchSize := rowinfra.RowLimit(columnBackfillBatchSize.Get(&evalCtx.Settings.SV))
updateChunkSizeThresholdBytes := rowinfra.BytesLimit(columnBackfillUpdateChunkSizeThresholdBytes.Get(&evalCtx.Settings.SV))
const alsoCommit = false
sp.Key, err = backfiller.RunColumnBackfillChunk(
ctx, txn, tableDesc, sp, rowinfra.RowLimit(columnBackfillBatchSize.Get(&evalCtx.Settings.SV)),
false /*alsoCommit*/, traceKV,
ctx, txn, tableDesc, sp, scanBatchSize, updateChunkSizeThresholdBytes, alsoCommit, traceKV,
)
if err != nil {
return err
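For orientation, here is a minimal, self-contained sketch of the driver-loop shape used above, with simplified stand-in types instead of the real rowinfra and backfill APIs (the names runChunk, rowLimit, and bytesLimit below are illustrative, not CockroachDB's): both knobs are read once, then handed to the chunk function, which returns the key to resume from and signals completion with an empty key.

package main

import "fmt"

// rowLimit and bytesLimit are simplified stand-ins for rowinfra.RowLimit and
// rowinfra.BytesLimit.
type rowLimit int64
type bytesLimit uint64

// runChunk is an illustrative stand-in for RunColumnBackfillChunk: it
// backfills at most chunkSize rows starting at startKey and returns the key
// to resume from, or "" once the span is exhausted.
func runChunk(startKey string, chunkSize rowLimit, threshold bytesLimit) (string, error) {
	fmt.Printf("backfilling chunk at %q (<=%d rows, flush near %d bytes)\n", startKey, chunkSize, threshold)
	return "", nil // pretend the whole span fits in one chunk
}

func main() {
	// In the real code these come from the two cluster settings above.
	chunkSize := rowLimit(200)
	threshold := bytesLimit(10 << 20)

	key := "/Table/104/1" // illustrative start of the primary index span
	for key != "" {
		var err error
		if key, err = runChunk(key, chunkSize, threshold); err != nil {
			panic(err)
		}
	}
}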
15 changes: 15 additions & 0 deletions pkg/sql/backfill/backfill.go
@@ -275,6 +275,7 @@ func (cb *ColumnBackfiller) RunColumnBackfillChunk(
tableDesc catalog.TableDescriptor,
sp roachpb.Span,
chunkSize rowinfra.RowLimit,
updateChunkSizeThresholdBytes rowinfra.BytesLimit,
alsoCommit bool,
traceKV bool,
) (roachpb.Key, error) {
@@ -390,6 +391,20 @@ func (cb *ColumnBackfiller) RunColumnBackfillChunk(
); err != nil {
return roachpb.Key{}, err
}

// Exit early to flush if the batch byte size exceeds a predefined
// threshold. This can happen when table rows are more on the "fat" side,
// typically with large BYTES or JSONB columns.
//
// This helps prevent exceedingly large raft commands which can, for
// instance, leave a schema change unable to either proceed or roll back.
//
// The threshold is ignored when zero.
if updateChunkSizeThresholdBytes > 0 && b.ApproximateMutationBytes() >= int(updateChunkSizeThresholdBytes) {
break
}
Contributor Author:
I found that a threshold was more convenient than an absolute max size, both because it was easier to implement and because I wouldn't have to worry about batches with zero Puts in the case where a single Put is just that big. What would we do then? Not update the row? That didn't make sense, so I used a threshold.

Contributor:
sounds good to me

Contributor:
okay, so, in addition to the old 64 MiB, you put this inside the for loop to say that if the to-be-put batch exceeds the threshold then break and flush, right?

Contributor Author:
Almost: it's in addition to the number-of-rows limit (default 200). The 64 MiB limit we've changed in the customer support issues doesn't come into play here; that's a hard limit on the size of raft commands which, if exceeded, causes the schema change to be blocked. This new setting helps ensure we never reach that hard limit.

Contributor:
yeah, my mistake, I meant the 200-row limit. Thanks for your explanation!

}
// Write the new row values.
writeBatch := txn.Run
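To make the early-flush check above concrete, here is a minimal, self-contained sketch of the pattern under a toy batch type (only ApproximateMutationBytes mirrors the real kv.Batch method used in the diff; everything else is illustrative): Puts accumulate up to the row limit, but the loop breaks and flushes as soon as the accumulated mutation bytes reach the threshold, and a zero threshold disables the check.

package main

import "fmt"

// toyBatch is an illustrative stand-in for kv.Batch; only the byte accounting
// matters for this sketch.
type toyBatch struct {
	mutationBytes int
	puts          int
}

func (b *toyBatch) Put(key string, value []byte) {
	b.puts++
	b.mutationBytes += len(key) + len(value)
}

// ApproximateMutationBytes mirrors the method consulted in the diff above.
func (b *toyBatch) ApproximateMutationBytes() int { return b.mutationBytes }

// backfillChunk fills one update batch, stopping at chunkSize rows or, if
// thresholdBytes is nonzero, as soon as the batch grows past the threshold.
// It returns how many rows made it into this batch.
func backfillChunk(rows [][]byte, chunkSize, thresholdBytes int) int {
	b := &toyBatch{}
	n := 0
	for _, row := range rows {
		if n >= chunkSize {
			break
		}
		b.Put(fmt.Sprintf("/Table/104/1/%d", n), row)
		n++
		// A threshold, not a hard cap: the Put that crossed the line stays
		// in the batch, so even a single oversized row still gets written.
		if thresholdBytes > 0 && b.ApproximateMutationBytes() >= thresholdBytes {
			break
		}
	}
	fmt.Printf("flushing %d puts, ~%d bytes\n", b.puts, b.ApproximateMutationBytes())
	return n
}

func main() {
	fatRow := make([]byte, 1<<20) // pretend each row carries a ~1 MiB JSONB value
	rows := make([][]byte, 10)
	for i := range rows {
		rows[i] = fatRow
	}
	// With a 3 MiB threshold the ten fat rows are written in several smaller
	// batches instead of one ~10 MiB batch.
	for done := 0; done < len(rows); {
		done += backfillChunk(rows[done:], 200, 3<<20)
	}
}

As discussed in the thread above, this is a threshold rather than a cap: the Put that crosses the line stays in the batch, so a single oversized row still produces a one-Put batch instead of being skipped.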
18 changes: 12 additions & 6 deletions pkg/sql/distsql_plan_backfill.go
@@ -16,6 +16,7 @@ import (
"unsafe"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
@@ -26,14 +27,19 @@ import (
)

func initColumnBackfillerSpec(
desc descpb.TableDescriptor, duration time.Duration, chunkSize int64, readAsOf hlc.Timestamp,
tbl catalog.TableDescriptor,
duration time.Duration,
chunkSize int64,
updateChunkSizeThresholdBytes uint64,
readAsOf hlc.Timestamp,
) (execinfrapb.BackfillerSpec, error) {
return execinfrapb.BackfillerSpec{
Table: desc,
Duration: duration,
ChunkSize: chunkSize,
ReadAsOf: readAsOf,
Type: execinfrapb.BackfillerSpec_Column,
Table: *tbl.TableDesc(),
Duration: duration,
ChunkSize: chunkSize,
UpdateChunkSizeThresholdBytes: updateChunkSizeThresholdBytes,
ReadAsOf: readAsOf,
Type: execinfrapb.BackfillerSpec_Column,
}, nil
}

6 changes: 5 additions & 1 deletion pkg/sql/execinfrapb/processors_bulk_io.proto
@@ -60,6 +60,10 @@ message BackfillerSpec {
// of entries backfilled per chunk.
optional int64 chunk_size = 5 [(gogoproto.nullable) = false];

// The column backfiller will run an update batch immediately
// once its estimated byte size reaches UpdateChunkSizeThresholdBytes, if nonzero.
optional uint64 update_chunk_size_threshold_bytes = 14 [(gogoproto.nullable) = false];

// WriteAsOf is the time that the backfill entries should be written.
// Note: Older nodes may also use this as the read time instead of readAsOf.
optional util.hlc.Timestamp writeAsOf = 7 [(gogoproto.nullable) = false];
@@ -84,7 +88,7 @@
// check MVCCAddSSTable before setting this option.
optional bool write_at_batch_timestamp = 12 [(gogoproto.nullable) = false];

// NEXTID: 14.
// NEXTID: 15.
}

// JobProgress identifies the job to report progress on. This reporting
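As a hedged recap of the plumbing this field is part of (simplified local types below, not the real execinfrapb, settings, or rowinfra code): the cluster setting is read as an int64 by the schema changer, carried across the wire in the spec as a uint64, and converted to a byte-limit type where the chunk is actually run, with zero meaning "disabled".

package main

import "fmt"

// backfillerSpec is a stand-in for execinfrapb.BackfillerSpec, showing only
// the two fields relevant to this change.
type backfillerSpec struct {
	ChunkSize                     int64
	UpdateChunkSizeThresholdBytes uint64
}

// bytesLimit is a stand-in for rowinfra.BytesLimit.
type bytesLimit uint64

func main() {
	// 1. Schema changer: read the (int64) cluster settings.
	chunkSize := int64(200)
	thresholdSetting := int64(10 << 20)

	// 2. Planning: carry both values in the processor spec.
	spec := backfillerSpec{
		ChunkSize:                     chunkSize,
		UpdateChunkSizeThresholdBytes: uint64(thresholdSetting),
	}

	// 3. Processor: convert to the limit type the chunk function expects; a
	//    zero value disables the threshold check.
	threshold := bytesLimit(spec.UpdateChunkSizeThresholdBytes)
	fmt.Println(spec.ChunkSize, threshold)
}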
70 changes: 65 additions & 5 deletions pkg/sql/logictest/testdata/logic_test/alter_table
@@ -2196,6 +2196,66 @@ SELECT * FROM multipleinstmt ORDER BY id ASC;
2 b b false NULL true NULL
3 c c false NULL true NULL

subtest column_backfiller_update_batching

let $use_decl_sc
SHOW use_declarative_schema_changer

statement ok
SET use_declarative_schema_changer = 'off';

statement ok
BEGIN;
CREATE TABLE tb AS SELECT 123::INT AS k FROM generate_series(1, 10);
SET tracing = on,kv;
ALTER TABLE tb ADD COLUMN v STRING NOT NULL DEFAULT ('abc'::STRING);
SET tracing = off;

# Check that the column backfiller batches all its Puts into one batch.
query I
SELECT count(*) FROM [SHOW KV TRACE FOR SESSION] WHERE message LIKE '%sending batch%' AND message LIKE '% Put to %';
----
1

query I
SELECT count(*) FROM tb WHERE v = 'abc';
----
10

statement ok
ROLLBACK;

# Bring the threshold way down to force column backfiller batches to have no more than 1 Put each.
statement ok
SET CLUSTER SETTING bulkio.column_backfill.update_chunk_size_threshold_bytes = 1;

statement ok
BEGIN;
CREATE TABLE tb AS SELECT 123::INT AS k FROM generate_series(1, 10);
SET tracing = on,kv;
ALTER TABLE tb ADD COLUMN v STRING NOT NULL DEFAULT ('abc'::STRING);
SET tracing = off;

query I
SELECT count(*) FROM [SHOW KV TRACE FOR SESSION] WHERE message LIKE '%sending batch%' AND message LIKE '% Put to %';
----
10

query I
SELECT count(*) FROM tb WHERE v = 'abc';
----
10

statement ok
ROLLBACK;
Contributor Author:
This subtest only exercises in-txn column backfills; I couldn't figure out how to retrieve the traces in the other case. Those traces are emitted, according to the debugger at least, but I couldn't collect them. Anyway, I figured this would be enough.

Contributor:
Yeah, I like the tests you added as well.

Contributor:
Can we also have a "correctness" test? Namely, if we set this threshold to be small and add a column, we expect it to use >1 batch and to succeed, which we can verify by observing the newly added column with the correct default value.

Contributor Author:
I'm not sure it's that valuable, since we already arbitrarily break batches down into 200-row increments, but it also doesn't cost anything, so I'll add a SELECT.


# Undo subtest side effects.
statement ok
RESET CLUSTER SETTING bulkio.column_backfill.update_chunk_size_threshold_bytes;

statement ok
SET use_declarative_schema_changer = $use_decl_sc;

subtest storage_params

statement ok
@@ -2306,8 +2366,8 @@
LEFT JOIN pg_catalog.pg_depend r ON l.table_id = r.objid;
----
table_id name state refobjid
205 test_serial_b_seq PUBLIC 204
204 test_serial PUBLIC NULL
207 test_serial_b_seq PUBLIC 206
206 test_serial PUBLIC NULL
Contributor Author:
These and the others all get bumped because my subtest non-transactionally increments the descriptor ID counter twice.

Contributor:
why don't we put your subtest at the end of the file?

Contributor Author:
In a sane world I would; however, doing so often causes annoying merge conflicts when backporting.


statement ok
DROP TABLE test_serial;
@@ -2341,8 +2401,8 @@
LEFT JOIN pg_catalog.pg_depend r ON l.table_id = r.objid;
----
table_id name state refobjid
207 test_serial_b_seq PUBLIC 206
206 test_serial PUBLIC NULL
209 test_serial_b_seq PUBLIC 208
208 test_serial PUBLIC NULL

statement ok
ALTER TABLE test_serial DROP COLUMN b;
@@ -2357,7 +2417,7 @@
LEFT JOIN pg_catalog.pg_depend r ON l.table_id = r.objid;
----
table_id name state refobjid
206 test_serial PUBLIC NULL
208 test_serial PUBLIC NULL

statement ok
DROP TABLE test_serial;
5 changes: 4 additions & 1 deletion pkg/sql/rowexec/backfiller.go
@@ -47,6 +47,7 @@ type chunkBackfiller interface {
ctx context.Context,
span roachpb.Span,
chunkSize rowinfra.RowLimit,
updateChunkSizeThresholdBytes rowinfra.BytesLimit,
readAsOf hlc.Timestamp,
) (roachpb.Key, error)

@@ -135,6 +136,8 @@ func (b *backfiller) mainLoop(ctx context.Context) (roachpb.Spans, error) {
// fill more than this amount and cause a flush, then it likely also fills
// a non-trivial part of the next buffer.
const opportunisticCheckpointThreshold = 0.8
chunkSize := rowinfra.RowLimit(b.spec.ChunkSize)
updateChunkSizeThresholdBytes := rowinfra.BytesLimit(b.spec.UpdateChunkSizeThresholdBytes)
start := timeutil.Now()
totalChunks := 0
totalSpans := 0
@@ -148,7 +151,7 @@ func (b *backfiller) mainLoop(ctx context.Context) (roachpb.Spans, error) {
for todo.Key != nil {
log.VEventf(ctx, 3, "%s backfiller starting chunk %d: %s", b.name, chunks, todo)
var err error
todo.Key, err = b.chunks.runChunk(ctx, todo, rowinfra.RowLimit(b.spec.ChunkSize), b.spec.ReadAsOf)
todo.Key, err = b.chunks.runChunk(ctx, todo, chunkSize, updateChunkSizeThresholdBytes, b.spec.ReadAsOf)
if err != nil {
return nil, err
}
12 changes: 8 additions & 4 deletions pkg/sql/rowexec/columnbackfiller.go
@@ -103,7 +103,11 @@ func (cb *columnBackfiller) CurrentBufferFill() float32 {

// runChunk implements the chunkBackfiller interface.
func (cb *columnBackfiller) runChunk(
ctx context.Context, sp roachpb.Span, chunkSize rowinfra.RowLimit, _ hlc.Timestamp,
ctx context.Context,
sp roachpb.Span,
chunkSize rowinfra.RowLimit,
updateChunkSizeThresholdBytes rowinfra.BytesLimit,
_ hlc.Timestamp,
) (roachpb.Key, error) {
var key roachpb.Key
var commitWaitFn func(context.Context) error
@@ -124,16 +128,16 @@
// waiting for consistency when backfilling a column on GLOBAL tables.
commitWaitFn = txn.DeferCommitWait(ctx)

// TODO(knz): do KV tracing in DistSQL processors.
var err error
key, err = cb.RunColumnBackfillChunk(
ctx,
txn,
cb.desc,
sp,
chunkSize,
true, /*alsoCommit*/
false, /*traceKV*/
updateChunkSizeThresholdBytes,
true, /*alsoCommit*/
cb.flowCtx.TraceKV,
Contributor Author:
I figured I'd leave this change in, as the debugger assures me that this flag is set when SET tracing = on,kv.

)
return err
})