From 365a8b180e40fc696eb9475e6e91d07ec2619397 Mon Sep 17 00:00:00 2001 From: Shiranka Miskin Date: Fri, 14 Oct 2022 10:03:36 -0400 Subject: [PATCH] changefeedccl: cleanup deprecated pts handling Going forward we no longer want customers to be able to turn off active protected timestamps, as we always want to remain resilient to low ttl tables. Release note (enterprise change): The changefeed.active_protected_timestamps.enabled cluster setting has been removed and is always treated as if it was true. --- .../changefeedccl/changefeed_processors.go | 50 +------------------ pkg/ccl/changefeedccl/changefeed_stmt.go | 18 +++---- .../changefeedccl/changefeedbase/settings.go | 9 ---- pkg/settings/registry.go | 7 +-- 4 files changed, 13 insertions(+), 71 deletions(-) diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index 3c5698025c7a..291402451bec 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -22,7 +22,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -1248,13 +1247,7 @@ func (cf *changeFrontier) checkpointJobProgress( changefeedProgress := progress.Details.(*jobspb.Progress_Changefeed).Changefeed changefeedProgress.Checkpoint = &checkpoint - timestampManager := cf.manageProtectedTimestamps - // TODO(samiskin): Remove this conditional and the associated deprecated - // methods once we're confident in ActiveProtectedTimestampsEnabled - if !changefeedbase.ActiveProtectedTimestampsEnabled.Get(&cf.flowCtx.Cfg.Settings.SV) { - timestampManager = cf.deprecatedManageProtectedTimestamps - } - if err := timestampManager(cf.Ctx, txn, changefeedProgress); err != nil { + if err := cf.manageProtectedTimestamps(cf.Ctx, txn, changefeedProgress); err != nil { log.Warningf(cf.Ctx, "error managing protected timestamp record: %v", err) return err } @@ -1332,47 +1325,6 @@ func (cf *changeFrontier) manageProtectedTimestamps( return nil } -// deprecatedManageProtectedTimestamps only sets a protected timestamp when the -// changefeed is in a backfill or the highwater is lagging behind to a -// sufficient degree after a backfill. This was deprecated in favor of always -// maintaining a timestamp record to avoid issues with a low gcttl setting. -func (cf *changeFrontier) deprecatedManageProtectedTimestamps( - ctx context.Context, txn *kv.Txn, progress *jobspb.ChangefeedProgress, -) error { - pts := cf.flowCtx.Cfg.ProtectedTimestampProvider - if err := cf.deprecatedMaybeReleaseProtectedTimestamp(ctx, progress, pts, txn); err != nil { - return err - } - - schemaChangePolicy := changefeedbase.SchemaChangePolicy(cf.spec.Feed.Opts[changefeedbase.OptSchemaChangePolicy]) - shouldProtectBoundaries := schemaChangePolicy == changefeedbase.OptSchemaChangePolicyBackfill - if cf.frontier.schemaChangeBoundaryReached() && shouldProtectBoundaries { - highWater := cf.frontier.Frontier() - ptr := createProtectedTimestampRecord(ctx, cf.flowCtx.Codec(), cf.spec.JobID, AllTargets(cf.spec.Feed), highWater, progress) - return pts.Protect(ctx, txn, ptr) - } - return nil -} - -func (cf *changeFrontier) deprecatedMaybeReleaseProtectedTimestamp( - ctx context.Context, progress *jobspb.ChangefeedProgress, pts protectedts.Storage, txn *kv.Txn, -) error { - if progress.ProtectedTimestampRecord == uuid.Nil { - return nil - } - if !cf.frontier.schemaChangeBoundaryReached() && cf.isBehind() { - log.VEventf(ctx, 2, "not releasing protected timestamp because changefeed is behind") - return nil - } - log.VEventf(ctx, 2, "releasing protected timestamp %v", - progress.ProtectedTimestampRecord) - if err := pts.Release(ctx, txn, progress.ProtectedTimestampRecord); err != nil { - return err - } - progress.ProtectedTimestampRecord = uuid.Nil - return nil -} - func (cf *changeFrontier) maybeEmitResolved(newResolved hlc.Timestamp) error { if cf.freqEmitResolved == emitNoResolved || newResolved.IsEmpty() { return nil diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go index 18a813be66bf..53f9d53fbefd 100644 --- a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -267,16 +267,14 @@ func changefeedPlanHook( { var ptr *ptpb.Record codec := p.ExecCfg().Codec - - activeTimestampProtection := changefeedbase.ActiveProtectedTimestampsEnabled.Get(&p.ExecCfg().Settings.SV) - initialScanType, err := opts.GetInitialScanType() - if err != nil { - return err - } - shouldProtectTimestamp := activeTimestampProtection || (initialScanType != changefeedbase.NoInitialScan) - if shouldProtectTimestamp { - ptr = createProtectedTimestampRecord(ctx, codec, jobID, AllTargets(details), details.StatementTime, progress.GetChangefeed()) - } + ptr = createProtectedTimestampRecord( + ctx, + codec, + jobID, + AllTargets(details), + details.StatementTime, + progress.GetChangefeed(), + ) jr.Progress = *progress.GetChangefeed() diff --git a/pkg/ccl/changefeedccl/changefeedbase/settings.go b/pkg/ccl/changefeedccl/changefeedbase/settings.go index 22841234c7e1..4c09ad9a6d38 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/settings.go +++ b/pkg/ccl/changefeedccl/changefeedbase/settings.go @@ -211,15 +211,6 @@ var ProtectTimestampInterval = settings.RegisterDurationSetting( settings.PositiveDuration, ) -// ActiveProtectedTimestampsEnabled enables always having protected timestamps -// laid down that are periodically advanced to the highwater mark. -var ActiveProtectedTimestampsEnabled = settings.RegisterBoolSetting( - settings.TenantWritable, - "changefeed.active_protected_timestamps.enabled", - "if set, rather than only protecting changefeed targets from garbage collection during backfills, data will always be protected up to the changefeed's frontier", - true, -) - // BatchReductionRetryEnabled enables the temporary reduction of batch sizes upon kafka message too large errors var BatchReductionRetryEnabled = settings.RegisterBoolSetting( settings.TenantWritable, diff --git a/pkg/settings/registry.go b/pkg/settings/registry.go index e50664666bdc..f75659d42535 100644 --- a/pkg/settings/registry.go +++ b/pkg/settings/registry.go @@ -150,9 +150,10 @@ var retiredSettings = map[string]struct{}{ "sql.ttl.default_range_concurrency": {}, // removed as of 23.1. - "sql.catalog.descs.validate_on_write.enabled": {}, - "sql.distsql.max_running_flows": {}, - "sql.distsql.flow_scheduler_queueing.enabled": {}, + "sql.catalog.descs.validate_on_write.enabled": {}, + "sql.distsql.max_running_flows": {}, + "sql.distsql.flow_scheduler_queueing.enabled": {}, + "changefeed.active_protected_timestamps.enabled": {}, } // sqlDefaultSettings is the list of "grandfathered" existing sql.defaults