Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changefeedccl: Improve protected timestamp handling #103539

Merged
merged 1 commit into from
May 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ changefeed.event_consumer_worker_queue_size integer 16 if changefeed.event_consu
changefeed.event_consumer_workers integer 0 the number of workers to use when processing events: <0 disables, 0 assigns a reasonable default, >0 assigns the setting value. for experimental/core changefeeds and changefeeds using parquet format, this is disabled tenant-rw
changefeed.fast_gzip.enabled boolean true use fast gzip implementation tenant-rw
changefeed.node_throttle_config string specifies node level throttling configuration for all changefeeeds tenant-rw
changefeed.protect_timestamp.max_age duration 96h0m0s fail the changefeed if the protected timestamp age exceeds this threshold; 0 disables expiration tenant-rw
changefeed.schema_feed.read_with_priority_after duration 1m0s retry with high priority if we were not able to read descriptors for too long; 0 disables tenant-rw
changefeed.sink_io_workers integer 0 the number of workers used by changefeeds when sending requests to the sink (currently webhook only): <0 disables, 0 assigns a reasonable default, >0 assigns the setting value. tenant-rw
cloudstorage.azure.concurrent_upload_buffers integer 1 controls the number of concurrent buffers that will be used by the Azure client when uploading chunks.Each buffer can buffer up to cloudstorage.write_chunk.size of memory during an upload tenant-rw
Expand Down
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
<tr><td><div id="setting-changefeed-event-consumer-workers" class="anchored"><code>changefeed.event_consumer_workers</code></div></td><td>integer</td><td><code>0</code></td><td>the number of workers to use when processing events: &lt;0 disables, 0 assigns a reasonable default, &gt;0 assigns the setting value. for experimental/core changefeeds and changefeeds using parquet format, this is disabled</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-fast-gzip-enabled" class="anchored"><code>changefeed.fast_gzip.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>use fast gzip implementation</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-node-throttle-config" class="anchored"><code>changefeed.node_throttle_config</code></div></td><td>string</td><td><code></code></td><td>specifies node level throttling configuration for all changefeeeds</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-protect-timestamp-max-age" class="anchored"><code>changefeed.protect_timestamp.max_age</code></div></td><td>duration</td><td><code>96h0m0s</code></td><td>fail the changefeed if the protected timestamp age exceeds this threshold; 0 disables expiration</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-schema-feed-read-with-priority-after" class="anchored"><code>changefeed.schema_feed.read_with_priority_after</code></div></td><td>duration</td><td><code>1m0s</code></td><td>retry with high priority if we were not able to read descriptors for too long; 0 disables</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-sink-io-workers" class="anchored"><code>changefeed.sink_io_workers</code></div></td><td>integer</td><td><code>0</code></td><td>the number of workers used by changefeeds when sending requests to the sink (currently webhook only): &lt;0 disables, 0 assigns a reasonable default, &gt;0 assigns the setting value.</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-cloudstorage-azure-concurrent-upload-buffers" class="anchored"><code>cloudstorage.azure.concurrent_upload_buffers</code></div></td><td>integer</td><td><code>1</code></td><td>controls the number of concurrent buffers that will be used by the Azure client when uploading chunks.Each buffer can buffer up to cloudstorage.write_chunk.size of memory during an upload</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
Expand Down
3 changes: 0 additions & 3 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,6 @@ go_test(
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/kvserverbase",
"//pkg/kv/kvserver/protectedts",
"//pkg/kv/kvserver/protectedts/ptstorage",
"//pkg/roachpb",
"//pkg/scheduledjobs",
"//pkg/scheduledjobs/schedulebase",
Expand Down Expand Up @@ -269,7 +268,6 @@ go_test(
"//pkg/sql/execinfrapb",
"//pkg/sql/flowinfra",
"//pkg/sql/importer",
"//pkg/sql/isql",
"//pkg/sql/parser",
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
Expand Down Expand Up @@ -309,7 +307,6 @@ go_test(
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/timeutil/pgdate",
"//pkg/util/uuid",
"//pkg/workload/bank",
"//pkg/workload/ledger",
"//pkg/workload/workloadsql",
Expand Down
72 changes: 19 additions & 53 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -714,17 +714,28 @@ func createChangefeedJobRecord(
return nil, err
}

useDefaultExpiration := ptsExpiration == 0
if useDefaultExpiration {
ptsExpiration = changefeedbase.MaxProtectedTimestampAge.Get(&p.ExecCfg().Settings.SV)
}

if ptsExpiration > 0 && ptsExpiration < time.Hour {
// This threshold is rather arbitrary. But we want to warn users about
// the potential impact of keeping this setting too low.
p.BufferClientNotice(ctx, pgnotice.Newf(
`the value of %s for changefeed option %s might be too low. Having a low
value for this option should not have adverse effect as long as changefeed
is running. However, should the changefeed be paused, it will need to be
resumed before expiration time. The value of this setting should reflect
how much time the changefeed may remain paused, before it is canceled.
Few hours to a few days range are appropriate values for this option.
`, ptsExpiration, changefeedbase.OptExpirePTSAfter, ptsExpiration))
const explainer = `Having a low protected timestamp expiration value should not have adverse effect
as long as changefeed is running. However, should the changefeed be paused, it
will need to be resumed before expiration time. The value of this setting should
reflect how much time he changefeed may remain paused, before it is canceled.
Few hours to a few days range are appropriate values for this option.`
if useDefaultExpiration {
p.BufferClientNotice(ctx, pgnotice.Newf(
`the value of %s for changefeed.protect_timestamp.max_age setting might be too low. %s`,
ptsExpiration, changefeedbase.OptExpirePTSAfter, explainer))
} else {
p.BufferClientNotice(ctx, pgnotice.Newf(
`the value of %s for changefeed option %s might be too low. %s`,
ptsExpiration, changefeedbase.OptExpirePTSAfter, explainer))
}
}

jr := &jobs.Record{
Expand Down Expand Up @@ -1164,10 +1175,6 @@ func (b *changefeedResumer) handleChangefeedError(
changefeedbase.OptOnError, changefeedbase.OptOnErrorPause)
return b.job.NoTxn().PauseRequestedWithFunc(ctx, func(ctx context.Context,
planHookState interface{}, txn isql.Txn, progress *jobspb.Progress) error {
err := b.OnPauseRequest(ctx, jobExec, txn, progress)
if err != nil {
return err
}
// directly update running status to avoid the running/reverted job status check
progress.RunningStatus = errorMessage
log.Warningf(ctx, errorFmt, changefeedErr, changefeedbase.OptOnError, changefeedbase.OptOnErrorPause)
Expand Down Expand Up @@ -1409,47 +1416,6 @@ func (b *changefeedResumer) maybeCleanUpProtectedTimestamp(
}
}

var _ jobs.PauseRequester = (*changefeedResumer)(nil)

// OnPauseRequest implements jobs.PauseRequester. If this changefeed is being
// paused, we may want to clear the protected timestamp record.
func (b *changefeedResumer) OnPauseRequest(
ctx context.Context, jobExec interface{}, txn isql.Txn, progress *jobspb.Progress,
) error {
details := b.job.Details().(jobspb.ChangefeedDetails)

cp := progress.GetChangefeed()
execCfg := jobExec.(sql.JobExecContext).ExecCfg()

if _, shouldProtect := details.Opts[changefeedbase.OptProtectDataFromGCOnPause]; !shouldProtect {
// Release existing pts record to avoid a single changefeed left on pause
// resulting in storage issues
if cp.ProtectedTimestampRecord != uuid.Nil {
pts := execCfg.ProtectedTimestampProvider.WithTxn(txn)
if err := pts.Release(ctx, cp.ProtectedTimestampRecord); err != nil {
log.Warningf(ctx, "failed to release protected timestamp %v: %v", cp.ProtectedTimestampRecord, err)
} else {
cp.ProtectedTimestampRecord = uuid.Nil
}
}
return nil
}

if cp.ProtectedTimestampRecord == uuid.Nil {
resolved := progress.GetHighWater()
if resolved == nil {
return nil
}
pts := execCfg.ProtectedTimestampProvider.WithTxn(txn)
ptr := createProtectedTimestampRecord(
ctx, execCfg.Codec, b.job.ID(), AllTargets(details), *resolved, cp,
)
return pts.Protect(ctx, ptr)
}

return nil
}

// getQualifiedTableName returns the database-qualified name of the table
// or view represented by the provided descriptor.
func getQualifiedTableName(
Expand Down
80 changes: 0 additions & 80 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptstorage"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/server"
Expand Down Expand Up @@ -95,7 +94,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/dustin/go-humanize"
"github.com/lib/pq"
Expand Down Expand Up @@ -5626,84 +5624,6 @@ func TestChangefeedProtectedTimestamps(t *testing.T) {
}))
}

func TestChangefeedProtectedTimestampOnPause(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testFn := func(shouldPause bool) cdcTestFn {
return func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)
sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'a'), (2, 'b'), (4, 'c'), (7, 'd'), (8, 'e')`)

var tableID int
sqlDB.QueryRow(t, `SELECT table_id FROM crdb_internal.tables `+
`WHERE name = 'foo' AND database_name = current_database()`).
Scan(&tableID)
stmt := `CREATE CHANGEFEED FOR foo WITH resolved`
if shouldPause {
stmt += ", " + changefeedbase.OptProtectDataFromGCOnPause
}
foo := feed(t, f, stmt)
defer closeFeed(t, foo)
assertPayloads(t, foo, []string{
`foo: [1]->{"after": {"a": 1, "b": "a"}}`,
`foo: [2]->{"after": {"a": 2, "b": "b"}}`,
`foo: [4]->{"after": {"a": 4, "b": "c"}}`,
`foo: [7]->{"after": {"a": 7, "b": "d"}}`,
`foo: [8]->{"after": {"a": 8, "b": "e"}}`,
})
expectResolvedTimestamp(t, foo)

// Pause the job then ensure that it has a reasonable protected timestamp.

ctx := context.Background()
serverCfg := s.Server.DistSQLServer().(*distsql.ServerImpl).ServerConfig
jr := serverCfg.JobRegistry
pts := ptstorage.WithDatabase(
serverCfg.ProtectedTimestampProvider, serverCfg.DB,
)

feedJob := foo.(cdctest.EnterpriseTestFeed)
require.NoError(t, feedJob.Pause())
{
j, err := jr.LoadJob(ctx, feedJob.JobID())
require.NoError(t, err)
progress := j.Progress()
details := progress.Details.(*jobspb.Progress_Changefeed).Changefeed
if shouldPause {
require.NotEqual(t, uuid.Nil, details.ProtectedTimestampRecord)
r, err := pts.GetRecord(ctx, details.ProtectedTimestampRecord)
require.NoError(t, err)
require.True(t, r.Timestamp.LessEq(*progress.GetHighWater()))
} else {
require.Equal(t, uuid.Nil, details.ProtectedTimestampRecord)
}
}

// Resume the job and ensure that the protected timestamp is removed once
// the changefeed has caught up.
require.NoError(t, feedJob.Resume())
testutils.SucceedsSoon(t, func() error {
resolvedTs, _ := expectResolvedTimestamp(t, foo)
j, err := jr.LoadJob(ctx, feedJob.JobID())
require.NoError(t, err)
details := j.Progress().Details.(*jobspb.Progress_Changefeed).Changefeed
r, err := pts.GetRecord(ctx, details.ProtectedTimestampRecord)
if err != nil || r.Timestamp.Less(resolvedTs) {
return fmt.Errorf("expected protected timestamp record %v to have timestamp greater than %v", r, resolvedTs)
}
return nil
})
}
}

testutils.RunTrueAndFalse(t, "protect_on_pause", func(t *testing.T, shouldPause bool) {
cdcTest(t, testFn(shouldPause), feedTestEnterpriseSinks)
})

}

func TestManyChangefeedsOneTable(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
Loading