From aaf1ca4cba36bb4e5f520e399d5128a59d1875bd Mon Sep 17 00:00:00 2001 From: Yevgeniy Miretskiy Date: Thu, 18 May 2023 13:30:03 -0400 Subject: [PATCH 1/2] server: Revert `server.shutdown.jobs_wait` to 0 Revert default setting for `server.shutdown.jobs_wait` to 0 to ensure that shutdown dows not wait for active jobs. Release note: None --- docs/generated/settings/settings-for-tenants.txt | 2 +- docs/generated/settings/settings.html | 2 +- pkg/server/drain.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index 85e010b56825..d8ed12a2e239 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -74,7 +74,7 @@ server.oidc_authentication.scopes string openid sets OIDC scopes to include with server.rangelog.ttl duration 720h0m0s if nonzero, entries in system.rangelog older than this duration are periodically purged tenant-rw server.shutdown.connection_wait duration 0s the maximum amount of time a server waits for all SQL connections to be closed before proceeding with a drain. (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting) tenant-rw server.shutdown.drain_wait duration 0s the amount of time a server waits in an unready state before proceeding with a drain (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting. --drain-wait is to specify the duration of the whole draining process, while server.shutdown.drain_wait is to set the wait time for health probes to notice that the node is not ready.) tenant-rw -server.shutdown.jobs_wait duration 10s the maximum amount of time a server waits for all currently executing jobs to notice drain request and to perform orderly shutdown tenant-rw +server.shutdown.jobs_wait duration 0s the maximum amount of time a server waits for all currently executing jobs to notice drain request and to perform orderly shutdown tenant-rw server.shutdown.query_wait duration 10s the timeout for waiting for active queries to finish during a drain (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting) tenant-rw server.time_until_store_dead duration 5m0s the time after which if there is no new gossiped information about a store, it is considered dead tenant-rw server.user_login.cert_password_method.auto_scram_promotion.enabled boolean true whether to automatically promote cert-password authentication to use SCRAM tenant-rw diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 686268b50177..ced4a1b65947 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -104,7 +104,7 @@
server.secondary_tenants.redact_trace.enabled
booleantruecontrols if server side traces are redacted for tenant operationsDedicated/Self-Hosted
server.shutdown.connection_wait
duration0sthe maximum amount of time a server waits for all SQL connections to be closed before proceeding with a drain. (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting)Serverless/Dedicated/Self-Hosted
server.shutdown.drain_wait
duration0sthe amount of time a server waits in an unready state before proceeding with a drain (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting. --drain-wait is to specify the duration of the whole draining process, while server.shutdown.drain_wait is to set the wait time for health probes to notice that the node is not ready.)Serverless/Dedicated/Self-Hosted -
server.shutdown.jobs_wait
duration10sthe maximum amount of time a server waits for all currently executing jobs to notice drain request and to perform orderly shutdownServerless/Dedicated/Self-Hosted +
server.shutdown.jobs_wait
duration0sthe maximum amount of time a server waits for all currently executing jobs to notice drain request and to perform orderly shutdownServerless/Dedicated/Self-Hosted
server.shutdown.lease_transfer_wait
duration5sthe timeout for a single iteration of the range lease transfer phase of draining (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting)Dedicated/Self-Hosted
server.shutdown.query_wait
duration10sthe timeout for waiting for active queries to finish during a drain (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting)Serverless/Dedicated/Self-Hosted
server.time_until_store_dead
duration5m0sthe time after which if there is no new gossiped information about a store, it is considered deadServerless/Dedicated/Self-Hosted diff --git a/pkg/server/drain.go b/pkg/server/drain.go index c2c382a18a8d..0b1a6c9ebffe 100644 --- a/pkg/server/drain.go +++ b/pkg/server/drain.go @@ -72,7 +72,7 @@ var ( "server.shutdown.jobs_wait", "the maximum amount of time a server waits for all currently executing jobs "+ "to notice drain request and to perform orderly shutdown", - 10*time.Second, + 0*time.Second, settings.NonNegativeDurationWithMaximum(10*time.Hour), ).WithPublic() ) From c637746399e55dc796b2c65a2bac23b61c48ebbb Mon Sep 17 00:00:00 2001 From: Yevgeniy Miretskiy Date: Wed, 17 May 2023 15:32:16 -0400 Subject: [PATCH 2/2] changefeedccl: Improve protected timestamp handling Changefeeds utilize protected timestamp (PTS) in order to ensure that the data is not garbage collected (GC) prematurely. This subsystem underwent through many rounds of changes, resulting in an unintuitive, and potentially dangerous behavior. This PR updates and improves PTS handling as follows. PR #97148 introduce capability to cancel jobs that hold on to stale PTS records. This PR expands this functionality to apply to all jobs -- not just paused jobs. This is necessary because due to #90810, changefeeds will retry almost every error -- and that means that if a running changefeed jobs fails to make progress for very long time, it is possible that a PTS record will protect GC collection for many days, weeks, or even months. To guard against this case, introduce a new cluster setting `changefeed.protect_timestamp.max_age`, which defaults to generous 4 days, to make sure that even if the explicit changefeed option `gc_protect_expires_after` was not specified, the changefeed will fail after `changefeed.protect_timestamp.max_age` if no progress is made. This setting only applies to newly created changefeeds. Use `ALTER CHANGEFEED` statement to set `gc_protect_expires_after` option for existing changefeeds to enable PTS expiration. The fail safe can be disabled by setting `changefeed.protect_timestamp.max_age` to 0; Note, however, that doing so could result in stability issues once stale PTS record released. In addition, this PR deprecates `protect_data_from_gc_on_pause` option. This option is not needed since we now employ "active protected timestamp" management (meaning: there is always a PTS record when running changefeed jobs), and the handling of this record is consistent for both running and paused jobs. Fixes #103464 Release note (enterprise change): Introduce a new `changefeed.protect_timestamp.max_age` setting (default 4 days), which will cancel running changefeed jobs if they fail to make forward progress for much time. This setting is used if the explicit `gc_protect_expires_after` option was not set. In addition, deprecate `protect_data_from_gc_on_pause` option. This option is no longer needed since changefeed jobs always protect data. --- .../settings/settings-for-tenants.txt | 1 + docs/generated/settings/settings.html | 1 + pkg/ccl/changefeedccl/BUILD.bazel | 3 - pkg/ccl/changefeedccl/changefeed_stmt.go | 72 +++------ pkg/ccl/changefeedccl/changefeed_test.go | 80 ---------- .../changefeedccl/changefeedbase/options.go | 138 ++++++++++-------- .../changefeedccl/changefeedbase/settings.go | 9 ++ pkg/ccl/changefeedccl/sink.go | 3 + pkg/ccl/changefeedccl/testfeed_test.go | 8 - pkg/jobs/metricspoller/job_statistics.go | 8 +- 10 files changed, 109 insertions(+), 214 deletions(-) diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index 85e010b56825..9f38602528c7 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -17,6 +17,7 @@ changefeed.event_consumer_worker_queue_size integer 16 if changefeed.event_consu changefeed.event_consumer_workers integer 0 the number of workers to use when processing events: <0 disables, 0 assigns a reasonable default, >0 assigns the setting value. for experimental/core changefeeds and changefeeds using parquet format, this is disabled tenant-rw changefeed.fast_gzip.enabled boolean true use fast gzip implementation tenant-rw changefeed.node_throttle_config string specifies node level throttling configuration for all changefeeeds tenant-rw +changefeed.protect_timestamp.max_age duration 96h0m0s fail the changefeed if the protected timestamp age exceeds this threshold; 0 disables expiration tenant-rw changefeed.schema_feed.read_with_priority_after duration 1m0s retry with high priority if we were not able to read descriptors for too long; 0 disables tenant-rw changefeed.sink_io_workers integer 0 the number of workers used by changefeeds when sending requests to the sink (currently webhook only): <0 disables, 0 assigns a reasonable default, >0 assigns the setting value. tenant-rw cloudstorage.azure.concurrent_upload_buffers integer 1 controls the number of concurrent buffers that will be used by the Azure client when uploading chunks.Each buffer can buffer up to cloudstorage.write_chunk.size of memory during an upload tenant-rw diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 686268b50177..1ee712f2a192 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -23,6 +23,7 @@
changefeed.event_consumer_workers
integer0the number of workers to use when processing events: <0 disables, 0 assigns a reasonable default, >0 assigns the setting value. for experimental/core changefeeds and changefeeds using parquet format, this is disabledServerless/Dedicated/Self-Hosted
changefeed.fast_gzip.enabled
booleantrueuse fast gzip implementationServerless/Dedicated/Self-Hosted
changefeed.node_throttle_config
stringspecifies node level throttling configuration for all changefeeedsServerless/Dedicated/Self-Hosted +
changefeed.protect_timestamp.max_age
duration96h0m0sfail the changefeed if the protected timestamp age exceeds this threshold; 0 disables expirationServerless/Dedicated/Self-Hosted
changefeed.schema_feed.read_with_priority_after
duration1m0sretry with high priority if we were not able to read descriptors for too long; 0 disablesServerless/Dedicated/Self-Hosted
changefeed.sink_io_workers
integer0the number of workers used by changefeeds when sending requests to the sink (currently webhook only): <0 disables, 0 assigns a reasonable default, >0 assigns the setting value.Serverless/Dedicated/Self-Hosted
cloudstorage.azure.concurrent_upload_buffers
integer1controls the number of concurrent buffers that will be used by the Azure client when uploading chunks.Each buffer can buffer up to cloudstorage.write_chunk.size of memory during an uploadServerless/Dedicated/Self-Hosted diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index 9286954617c9..4c34e8c4d0a7 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -239,7 +239,6 @@ go_test( "//pkg/kv/kvserver", "//pkg/kv/kvserver/kvserverbase", "//pkg/kv/kvserver/protectedts", - "//pkg/kv/kvserver/protectedts/ptstorage", "//pkg/roachpb", "//pkg/scheduledjobs", "//pkg/scheduledjobs/schedulebase", @@ -269,7 +268,6 @@ go_test( "//pkg/sql/execinfrapb", "//pkg/sql/flowinfra", "//pkg/sql/importer", - "//pkg/sql/isql", "//pkg/sql/parser", "//pkg/sql/pgwire/pgcode", "//pkg/sql/pgwire/pgerror", @@ -309,7 +307,6 @@ go_test( "//pkg/util/syncutil", "//pkg/util/timeutil", "//pkg/util/timeutil/pgdate", - "//pkg/util/uuid", "//pkg/workload/bank", "//pkg/workload/ledger", "//pkg/workload/workloadsql", diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go index 68b2d76e46d4..5d6ec8fff403 100644 --- a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -714,17 +714,28 @@ func createChangefeedJobRecord( return nil, err } + useDefaultExpiration := ptsExpiration == 0 + if useDefaultExpiration { + ptsExpiration = changefeedbase.MaxProtectedTimestampAge.Get(&p.ExecCfg().Settings.SV) + } + if ptsExpiration > 0 && ptsExpiration < time.Hour { // This threshold is rather arbitrary. But we want to warn users about // the potential impact of keeping this setting too low. - p.BufferClientNotice(ctx, pgnotice.Newf( - `the value of %s for changefeed option %s might be too low. Having a low - value for this option should not have adverse effect as long as changefeed - is running. However, should the changefeed be paused, it will need to be - resumed before expiration time. The value of this setting should reflect - how much time the changefeed may remain paused, before it is canceled. - Few hours to a few days range are appropriate values for this option. -`, ptsExpiration, changefeedbase.OptExpirePTSAfter, ptsExpiration)) + const explainer = `Having a low protected timestamp expiration value should not have adverse effect +as long as changefeed is running. However, should the changefeed be paused, it +will need to be resumed before expiration time. The value of this setting should +reflect how much time he changefeed may remain paused, before it is canceled. +Few hours to a few days range are appropriate values for this option.` + if useDefaultExpiration { + p.BufferClientNotice(ctx, pgnotice.Newf( + `the value of %s for changefeed.protect_timestamp.max_age setting might be too low. %s`, + ptsExpiration, changefeedbase.OptExpirePTSAfter, explainer)) + } else { + p.BufferClientNotice(ctx, pgnotice.Newf( + `the value of %s for changefeed option %s might be too low. %s`, + ptsExpiration, changefeedbase.OptExpirePTSAfter, explainer)) + } } jr := &jobs.Record{ @@ -1164,10 +1175,6 @@ func (b *changefeedResumer) handleChangefeedError( changefeedbase.OptOnError, changefeedbase.OptOnErrorPause) return b.job.NoTxn().PauseRequestedWithFunc(ctx, func(ctx context.Context, planHookState interface{}, txn isql.Txn, progress *jobspb.Progress) error { - err := b.OnPauseRequest(ctx, jobExec, txn, progress) - if err != nil { - return err - } // directly update running status to avoid the running/reverted job status check progress.RunningStatus = errorMessage log.Warningf(ctx, errorFmt, changefeedErr, changefeedbase.OptOnError, changefeedbase.OptOnErrorPause) @@ -1409,47 +1416,6 @@ func (b *changefeedResumer) maybeCleanUpProtectedTimestamp( } } -var _ jobs.PauseRequester = (*changefeedResumer)(nil) - -// OnPauseRequest implements jobs.PauseRequester. If this changefeed is being -// paused, we may want to clear the protected timestamp record. -func (b *changefeedResumer) OnPauseRequest( - ctx context.Context, jobExec interface{}, txn isql.Txn, progress *jobspb.Progress, -) error { - details := b.job.Details().(jobspb.ChangefeedDetails) - - cp := progress.GetChangefeed() - execCfg := jobExec.(sql.JobExecContext).ExecCfg() - - if _, shouldProtect := details.Opts[changefeedbase.OptProtectDataFromGCOnPause]; !shouldProtect { - // Release existing pts record to avoid a single changefeed left on pause - // resulting in storage issues - if cp.ProtectedTimestampRecord != uuid.Nil { - pts := execCfg.ProtectedTimestampProvider.WithTxn(txn) - if err := pts.Release(ctx, cp.ProtectedTimestampRecord); err != nil { - log.Warningf(ctx, "failed to release protected timestamp %v: %v", cp.ProtectedTimestampRecord, err) - } else { - cp.ProtectedTimestampRecord = uuid.Nil - } - } - return nil - } - - if cp.ProtectedTimestampRecord == uuid.Nil { - resolved := progress.GetHighWater() - if resolved == nil { - return nil - } - pts := execCfg.ProtectedTimestampProvider.WithTxn(txn) - ptr := createProtectedTimestampRecord( - ctx, execCfg.Codec, b.job.ID(), AllTargets(details), *resolved, cp, - ) - return pts.Protect(ctx, ptr) - } - - return nil -} - // getQualifiedTableName returns the database-qualified name of the table // or view represented by the provided descriptor. func getQualifiedTableName( diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index e5d5bb078add..709d89486e43 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -52,7 +52,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptstorage" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/server" @@ -95,7 +94,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" - "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" "github.com/dustin/go-humanize" "github.com/lib/pq" @@ -5626,84 +5624,6 @@ func TestChangefeedProtectedTimestamps(t *testing.T) { })) } -func TestChangefeedProtectedTimestampOnPause(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - testFn := func(shouldPause bool) cdcTestFn { - return func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { - sqlDB := sqlutils.MakeSQLRunner(s.DB) - sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`) - sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'a'), (2, 'b'), (4, 'c'), (7, 'd'), (8, 'e')`) - - var tableID int - sqlDB.QueryRow(t, `SELECT table_id FROM crdb_internal.tables `+ - `WHERE name = 'foo' AND database_name = current_database()`). - Scan(&tableID) - stmt := `CREATE CHANGEFEED FOR foo WITH resolved` - if shouldPause { - stmt += ", " + changefeedbase.OptProtectDataFromGCOnPause - } - foo := feed(t, f, stmt) - defer closeFeed(t, foo) - assertPayloads(t, foo, []string{ - `foo: [1]->{"after": {"a": 1, "b": "a"}}`, - `foo: [2]->{"after": {"a": 2, "b": "b"}}`, - `foo: [4]->{"after": {"a": 4, "b": "c"}}`, - `foo: [7]->{"after": {"a": 7, "b": "d"}}`, - `foo: [8]->{"after": {"a": 8, "b": "e"}}`, - }) - expectResolvedTimestamp(t, foo) - - // Pause the job then ensure that it has a reasonable protected timestamp. - - ctx := context.Background() - serverCfg := s.Server.DistSQLServer().(*distsql.ServerImpl).ServerConfig - jr := serverCfg.JobRegistry - pts := ptstorage.WithDatabase( - serverCfg.ProtectedTimestampProvider, serverCfg.DB, - ) - - feedJob := foo.(cdctest.EnterpriseTestFeed) - require.NoError(t, feedJob.Pause()) - { - j, err := jr.LoadJob(ctx, feedJob.JobID()) - require.NoError(t, err) - progress := j.Progress() - details := progress.Details.(*jobspb.Progress_Changefeed).Changefeed - if shouldPause { - require.NotEqual(t, uuid.Nil, details.ProtectedTimestampRecord) - r, err := pts.GetRecord(ctx, details.ProtectedTimestampRecord) - require.NoError(t, err) - require.True(t, r.Timestamp.LessEq(*progress.GetHighWater())) - } else { - require.Equal(t, uuid.Nil, details.ProtectedTimestampRecord) - } - } - - // Resume the job and ensure that the protected timestamp is removed once - // the changefeed has caught up. - require.NoError(t, feedJob.Resume()) - testutils.SucceedsSoon(t, func() error { - resolvedTs, _ := expectResolvedTimestamp(t, foo) - j, err := jr.LoadJob(ctx, feedJob.JobID()) - require.NoError(t, err) - details := j.Progress().Details.(*jobspb.Progress_Changefeed).Changefeed - r, err := pts.GetRecord(ctx, details.ProtectedTimestampRecord) - if err != nil || r.Timestamp.Less(resolvedTs) { - return fmt.Errorf("expected protected timestamp record %v to have timestamp greater than %v", r, resolvedTs) - } - return nil - }) - } - } - - testutils.RunTrueAndFalse(t, "protect_on_pause", func(t *testing.T, shouldPause bool) { - cdcTest(t, testFn(shouldPause), feedTestEnterpriseSinks) - }) - -} - func TestManyChangefeedsOneTable(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) diff --git a/pkg/ccl/changefeedccl/changefeedbase/options.go b/pkg/ccl/changefeedccl/changefeedbase/options.go index 413e824286f8..41d44bbf413a 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/options.go +++ b/pkg/ccl/changefeedccl/changefeedbase/options.go @@ -73,34 +73,33 @@ const ( // Constants for the options. const ( - OptAvroSchemaPrefix = `avro_schema_prefix` - OptConfluentSchemaRegistry = `confluent_schema_registry` - OptCursor = `cursor` - OptCustomKeyColumn = `key_column` - OptEndTime = `end_time` - OptEnvelope = `envelope` - OptFormat = `format` - OptFullTableName = `full_table_name` - OptKeyInValue = `key_in_value` - OptTopicInValue = `topic_in_value` - OptResolvedTimestamps = `resolved` - OptMinCheckpointFrequency = `min_checkpoint_frequency` - OptUpdatedTimestamps = `updated` - OptMVCCTimestamps = `mvcc_timestamp` - OptDiff = `diff` - OptCompression = `compression` - OptSchemaChangeEvents = `schema_change_events` - OptSchemaChangePolicy = `schema_change_policy` - OptSplitColumnFamilies = `split_column_families` - OptProtectDataFromGCOnPause = `protect_data_from_gc_on_pause` - OptExpirePTSAfter = `gc_protect_expires_after` - OptWebhookAuthHeader = `webhook_auth_header` - OptWebhookClientTimeout = `webhook_client_timeout` - OptOnError = `on_error` - OptMetricsScope = `metrics_label` - OptUnordered = `unordered` - OptVirtualColumns = `virtual_columns` - OptExecutionLocality = `execution_locality` + OptAvroSchemaPrefix = `avro_schema_prefix` + OptConfluentSchemaRegistry = `confluent_schema_registry` + OptCursor = `cursor` + OptCustomKeyColumn = `key_column` + OptEndTime = `end_time` + OptEnvelope = `envelope` + OptFormat = `format` + OptFullTableName = `full_table_name` + OptKeyInValue = `key_in_value` + OptTopicInValue = `topic_in_value` + OptResolvedTimestamps = `resolved` + OptMinCheckpointFrequency = `min_checkpoint_frequency` + OptUpdatedTimestamps = `updated` + OptMVCCTimestamps = `mvcc_timestamp` + OptDiff = `diff` + OptCompression = `compression` + OptSchemaChangeEvents = `schema_change_events` + OptSchemaChangePolicy = `schema_change_policy` + OptSplitColumnFamilies = `split_column_families` + OptExpirePTSAfter = `gc_protect_expires_after` + OptWebhookAuthHeader = `webhook_auth_header` + OptWebhookClientTimeout = `webhook_client_timeout` + OptOnError = `on_error` + OptMetricsScope = `metrics_label` + OptUnordered = `unordered` + OptVirtualColumns = `virtual_columns` + OptExecutionLocality = `execution_locality` OptVirtualColumnsOmitted VirtualColumnVisibility = `omitted` OptVirtualColumnsNull VirtualColumnVisibility = `null` @@ -172,6 +171,9 @@ const ( // Note that this option is only allowed for alter changefeed statements. OptSink = `sink` + // Deprecated options. + DeprecatedOptProtectDataFromGCOnPause = `protect_data_from_gc_on_pause` + SinkParamCACert = `ca_cert` SinkParamClientCert = `client_cert` SinkParamClientKey = `client_key` @@ -309,40 +311,40 @@ var jsonOption = OptionPermittedValues{Type: OptionTypeJSON} // ChangefeedOptionExpectValues is used to parse changefeed options using // PlanHookState.TypeAsStringOpts(). var ChangefeedOptionExpectValues = map[string]OptionPermittedValues{ - OptAvroSchemaPrefix: stringOption, - OptConfluentSchemaRegistry: stringOption, - OptCursor: timestampOption, - OptCustomKeyColumn: stringOption, - OptEndTime: timestampOption, - OptEnvelope: enum("row", "key_only", "wrapped", "deprecated_row", "bare"), - OptFormat: enum("json", "avro", "csv", "experimental_avro", "parquet"), - OptFullTableName: flagOption, - OptKeyInValue: flagOption, - OptTopicInValue: flagOption, - OptResolvedTimestamps: durationOption.thatCanBeZero().orEmptyMeans("0"), - OptMinCheckpointFrequency: durationOption.thatCanBeZero(), - OptUpdatedTimestamps: flagOption, - OptMVCCTimestamps: flagOption, - OptDiff: flagOption, - OptCompression: enum("gzip", "zstd"), - OptSchemaChangeEvents: enum("column_changes", "default"), - OptSchemaChangePolicy: enum("backfill", "nobackfill", "stop", "ignore"), - OptSplitColumnFamilies: flagOption, - OptInitialScan: enum("yes", "no", "only").orEmptyMeans("yes"), - OptNoInitialScan: flagOption, - OptInitialScanOnly: flagOption, - OptProtectDataFromGCOnPause: flagOption, - OptExpirePTSAfter: durationOption.thatCanBeZero(), - OptKafkaSinkConfig: jsonOption, - OptPubsubSinkConfig: jsonOption, - OptWebhookSinkConfig: jsonOption, - OptWebhookAuthHeader: stringOption, - OptWebhookClientTimeout: durationOption, - OptOnError: enum("pause", "fail"), - OptMetricsScope: stringOption, - OptUnordered: flagOption, - OptVirtualColumns: enum("omitted", "null"), - OptExecutionLocality: stringOption, + OptAvroSchemaPrefix: stringOption, + OptConfluentSchemaRegistry: stringOption, + OptCursor: timestampOption, + OptCustomKeyColumn: stringOption, + OptEndTime: timestampOption, + OptEnvelope: enum("row", "key_only", "wrapped", "deprecated_row", "bare"), + OptFormat: enum("json", "avro", "csv", "experimental_avro", "parquet"), + OptFullTableName: flagOption, + OptKeyInValue: flagOption, + OptTopicInValue: flagOption, + OptResolvedTimestamps: durationOption.thatCanBeZero().orEmptyMeans("0"), + OptMinCheckpointFrequency: durationOption.thatCanBeZero(), + OptUpdatedTimestamps: flagOption, + OptMVCCTimestamps: flagOption, + OptDiff: flagOption, + OptCompression: enum("gzip", "zstd"), + OptSchemaChangeEvents: enum("column_changes", "default"), + OptSchemaChangePolicy: enum("backfill", "nobackfill", "stop", "ignore"), + OptSplitColumnFamilies: flagOption, + OptInitialScan: enum("yes", "no", "only").orEmptyMeans("yes"), + OptNoInitialScan: flagOption, + OptInitialScanOnly: flagOption, + DeprecatedOptProtectDataFromGCOnPause: flagOption, + OptExpirePTSAfter: durationOption.thatCanBeZero(), + OptKafkaSinkConfig: jsonOption, + OptPubsubSinkConfig: jsonOption, + OptWebhookSinkConfig: jsonOption, + OptWebhookAuthHeader: stringOption, + OptWebhookClientTimeout: durationOption, + OptOnError: enum("pause", "fail"), + OptMetricsScope: stringOption, + OptUnordered: flagOption, + OptVirtualColumns: enum("omitted", "null"), + OptExecutionLocality: stringOption, } // CommonOptions is options common to all sinks @@ -352,7 +354,7 @@ var CommonOptions = makeStringSet(OptCursor, OptEndTime, OptEnvelope, OptResolvedTimestamps, OptUpdatedTimestamps, OptMVCCTimestamps, OptDiff, OptSplitColumnFamilies, OptSchemaChangeEvents, OptSchemaChangePolicy, - OptProtectDataFromGCOnPause, OptOnError, + OptOnError, OptInitialScan, OptNoInitialScan, OptInitialScanOnly, OptUnordered, OptCustomKeyColumn, OptMinCheckpointFrequency, OptMetricsScope, OptVirtualColumns, Topics, OptExpirePTSAfter, OptExecutionLocality, @@ -385,6 +387,9 @@ var ExternalConnectionValidOptions = unionStringSets(SQLValidOptions, KafkaValid var CaseInsensitiveOpts = makeStringSet(OptFormat, OptEnvelope, OptCompression, OptSchemaChangeEvents, OptSchemaChangePolicy, OptOnError, OptInitialScan) +// RetiredOptions are the options which are no longer active. +var RetiredOptions = makeStringSet(DeprecatedOptProtectDataFromGCOnPause) + // redactionFunc is a function applied to a string option which returns its redacted value. type redactionFunc func(string) (string, error) @@ -529,12 +534,17 @@ func (s StatementOptions) IsSet(key string) bool { } // DeprecationWarnings checks for options in forms we still support and serialize, -// but should be replaced with a new form. Currently hardcoded to just check format. +// but should be replaced with a new form. func (s StatementOptions) DeprecationWarnings() []string { if newFormat, ok := NoLongerExperimental[s.m[OptFormat]]; ok { return []string{fmt.Sprintf(`%[1]s is no longer experimental, use %[2]s=%[1]s`, newFormat, OptFormat)} } + for retiredOpt := range RetiredOptions { + if _, isSet := s.m[retiredOpt]; isSet { + return []string{fmt.Sprintf("%s option is no longer needed", retiredOpt)} + } + } return []string{} } diff --git a/pkg/ccl/changefeedccl/changefeedbase/settings.go b/pkg/ccl/changefeedccl/changefeedbase/settings.go index 1f17af7e5ac7..82974564b1ba 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/settings.go +++ b/pkg/ccl/changefeedccl/changefeedbase/settings.go @@ -211,6 +211,15 @@ var ProtectTimestampInterval = settings.RegisterDurationSetting( settings.PositiveDuration, ) +// MaxProtectedTimestampAge controls the frequency of protected timestamp record updates +var MaxProtectedTimestampAge = settings.RegisterDurationSetting( + settings.TenantWritable, + "changefeed.protect_timestamp.max_age", + "fail the changefeed if the protected timestamp age exceeds this threshold; 0 disables expiration", + 4*24*time.Hour, + settings.NonNegativeDuration, +).WithPublic() + // BatchReductionRetryEnabled enables the temporary reduction of batch sizes upon kafka message too large errors var BatchReductionRetryEnabled = settings.RegisterBoolSetting( settings.TenantWritable, diff --git a/pkg/ccl/changefeedccl/sink.go b/pkg/ccl/changefeedccl/sink.go index 5c15ceca655d..1d19ee8a5e30 100644 --- a/pkg/ccl/changefeedccl/sink.go +++ b/pkg/ccl/changefeedccl/sink.go @@ -309,6 +309,9 @@ func validateSinkOptions(opts map[string]string, sinkSpecificOpts map[string]str if _, ok := changefeedbase.CommonOptions[opt]; ok { continue } + if _, retired := changefeedbase.RetiredOptions[opt]; retired { + continue + } if sinkSpecificOpts != nil { if _, ok := sinkSpecificOpts[opt]; ok { continue diff --git a/pkg/ccl/changefeedccl/testfeed_test.go b/pkg/ccl/changefeedccl/testfeed_test.go index 2c83d846bafc..4b29f4cd967e 100644 --- a/pkg/ccl/changefeedccl/testfeed_test.go +++ b/pkg/ccl/changefeedccl/testfeed_test.go @@ -42,7 +42,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/distsql" - "github.com/cockroachdb/cockroach/pkg/sql/isql" "github.com/cockroachdb/cockroach/pkg/sql/parser" "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -351,13 +350,6 @@ func (r *reportErrorResumer) OnFailOrCancel( return r.wrapped.OnFailOrCancel(ctx, execCtx, jobErr) } -// OnPauseRequest implements PauseRequester interface. -func (r *reportErrorResumer) OnPauseRequest( - ctx context.Context, execCtx interface{}, txn isql.Txn, details *jobspb.Progress, -) error { - return r.wrapped.(*changefeedResumer).OnPauseRequest(ctx, execCtx, txn, details) -} - type wrapSinkFn func(sink Sink) Sink // jobFeed indicates that the feed is an "enterprise feed" -- that is, diff --git a/pkg/jobs/metricspoller/job_statistics.go b/pkg/jobs/metricspoller/job_statistics.go index b69b90eb2199..b4cc61606db4 100644 --- a/pkg/jobs/metricspoller/job_statistics.go +++ b/pkg/jobs/metricspoller/job_statistics.go @@ -170,12 +170,8 @@ func processJobPTSRecord( } // If MaximumPTSAge is set on the job payload, verify if PTS record - // timestamp is fresh enough. Note: we only look at paused jobs. - // If the running job wants to enforce an invariant wrt to PTS age, - // it can do so itself. This check here is a safety mechanism to detect - // paused jobs that own protected timestamp records. - if j.Status() == jobs.StatusPaused && - p.MaximumPTSAge > 0 && + // timestamp is fresh enough. + if p.MaximumPTSAge > 0 && rec.Timestamp.GoTime().Add(p.MaximumPTSAge).Before(timeutil.Now()) { stats.expired++ ptsExpired := errors.Newf(