Skip to content

Commit

Permalink
Merge #89975
Browse files Browse the repository at this point in the history
89975: changefeedccl: cleanup deprecated pts handling r=samiskin a=samiskin

Resolves #89450

Going forward we no longer want customers to be able to turn off active protected timestamps, as we always want to remain resilient to low ttl tables.

Release note (enterprise change):
The changefeed.active_protected_timestamps.enabled cluster setting has been removed and is always treated as if it was true.

Co-authored-by: Shiranka Miskin <[email protected]>
  • Loading branch information
craig[bot] and samiskin committed Nov 7, 2022
2 parents f783b08 + 365a8b1 commit d9abb80
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 71 deletions.
50 changes: 1 addition & 49 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand Down Expand Up @@ -1243,13 +1242,7 @@ func (cf *changeFrontier) checkpointJobProgress(
changefeedProgress := progress.Details.(*jobspb.Progress_Changefeed).Changefeed
changefeedProgress.Checkpoint = &checkpoint

timestampManager := cf.manageProtectedTimestamps
// TODO(samiskin): Remove this conditional and the associated deprecated
// methods once we're confident in ActiveProtectedTimestampsEnabled
if !changefeedbase.ActiveProtectedTimestampsEnabled.Get(&cf.flowCtx.Cfg.Settings.SV) {
timestampManager = cf.deprecatedManageProtectedTimestamps
}
if err := timestampManager(cf.Ctx, txn, changefeedProgress); err != nil {
if err := cf.manageProtectedTimestamps(cf.Ctx, txn, changefeedProgress); err != nil {
log.Warningf(cf.Ctx, "error managing protected timestamp record: %v", err)
return err
}
Expand Down Expand Up @@ -1327,47 +1320,6 @@ func (cf *changeFrontier) manageProtectedTimestamps(
return nil
}

// deprecatedManageProtectedTimestamps only sets a protected timestamp when the
// changefeed is in a backfill or the highwater is lagging behind to a
// sufficient degree after a backfill. This was deprecated in favor of always
// maintaining a timestamp record to avoid issues with a low gcttl setting.
func (cf *changeFrontier) deprecatedManageProtectedTimestamps(
ctx context.Context, txn *kv.Txn, progress *jobspb.ChangefeedProgress,
) error {
pts := cf.flowCtx.Cfg.ProtectedTimestampProvider
if err := cf.deprecatedMaybeReleaseProtectedTimestamp(ctx, progress, pts, txn); err != nil {
return err
}

schemaChangePolicy := changefeedbase.SchemaChangePolicy(cf.spec.Feed.Opts[changefeedbase.OptSchemaChangePolicy])
shouldProtectBoundaries := schemaChangePolicy == changefeedbase.OptSchemaChangePolicyBackfill
if cf.frontier.schemaChangeBoundaryReached() && shouldProtectBoundaries {
highWater := cf.frontier.Frontier()
ptr := createProtectedTimestampRecord(ctx, cf.flowCtx.Codec(), cf.spec.JobID, AllTargets(cf.spec.Feed), highWater, progress)
return pts.Protect(ctx, txn, ptr)
}
return nil
}

func (cf *changeFrontier) deprecatedMaybeReleaseProtectedTimestamp(
ctx context.Context, progress *jobspb.ChangefeedProgress, pts protectedts.Storage, txn *kv.Txn,
) error {
if progress.ProtectedTimestampRecord == uuid.Nil {
return nil
}
if !cf.frontier.schemaChangeBoundaryReached() && cf.isBehind() {
log.VEventf(ctx, 2, "not releasing protected timestamp because changefeed is behind")
return nil
}
log.VEventf(ctx, 2, "releasing protected timestamp %v",
progress.ProtectedTimestampRecord)
if err := pts.Release(ctx, txn, progress.ProtectedTimestampRecord); err != nil {
return err
}
progress.ProtectedTimestampRecord = uuid.Nil
return nil
}

func (cf *changeFrontier) maybeEmitResolved(newResolved hlc.Timestamp) error {
if cf.freqEmitResolved == emitNoResolved || newResolved.IsEmpty() {
return nil
Expand Down
18 changes: 8 additions & 10 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,16 +261,14 @@ func changefeedPlanHook(
{
var ptr *ptpb.Record
codec := p.ExecCfg().Codec

activeTimestampProtection := changefeedbase.ActiveProtectedTimestampsEnabled.Get(&p.ExecCfg().Settings.SV)
initialScanType, err := opts.GetInitialScanType()
if err != nil {
return err
}
shouldProtectTimestamp := activeTimestampProtection || (initialScanType != changefeedbase.NoInitialScan)
if shouldProtectTimestamp {
ptr = createProtectedTimestampRecord(ctx, codec, jobID, AllTargets(details), details.StatementTime, progress.GetChangefeed())
}
ptr = createProtectedTimestampRecord(
ctx,
codec,
jobID,
AllTargets(details),
details.StatementTime,
progress.GetChangefeed(),
)

jr.Progress = *progress.GetChangefeed()

Expand Down
9 changes: 0 additions & 9 deletions pkg/ccl/changefeedccl/changefeedbase/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,15 +211,6 @@ var ProtectTimestampInterval = settings.RegisterDurationSetting(
settings.PositiveDuration,
)

// ActiveProtectedTimestampsEnabled enables always having protected timestamps
// laid down that are periodically advanced to the highwater mark.
var ActiveProtectedTimestampsEnabled = settings.RegisterBoolSetting(
settings.TenantWritable,
"changefeed.active_protected_timestamps.enabled",
"if set, rather than only protecting changefeed targets from garbage collection during backfills, data will always be protected up to the changefeed's frontier",
true,
)

// BatchReductionRetryEnabled enables the temporary reduction of batch sizes upon kafka message too large errors
var BatchReductionRetryEnabled = settings.RegisterBoolSetting(
settings.TenantWritable,
Expand Down
7 changes: 4 additions & 3 deletions pkg/settings/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,10 @@ var retiredSettings = map[string]struct{}{
"sql.ttl.default_range_concurrency": {},

// removed as of 23.1.
"sql.catalog.descs.validate_on_write.enabled": {},
"sql.distsql.max_running_flows": {},
"sql.distsql.flow_scheduler_queueing.enabled": {},
"sql.catalog.descs.validate_on_write.enabled": {},
"sql.distsql.max_running_flows": {},
"sql.distsql.flow_scheduler_queueing.enabled": {},
"changefeed.active_protected_timestamps.enabled": {},
}

// sqlDefaultSettings is the list of "grandfathered" existing sql.defaults
Expand Down

0 comments on commit d9abb80

Please sign in to comment.