Skip to content

Commit

Permalink
changefeedccl: Expire protected timestamps
Browse files Browse the repository at this point in the history
Changefeeds utilize protected timestamp system (PTS)
to ensure that the data targeted by changefeed is not
garbage collected prematurely.  PTS record is managed
by running changefeed by periodically updating
PTS record timestamp, so that the data older than
the that timestamp may be GCed.  However, if the
changefeed stops running when it is paused (either due
to operator action, or due to `on_error=pause` option,
the PTS record remains so that the changefeed can
be resumed at a later time. However, it is also possible
that operator may not notice that the job is paused for
too long, thus causing buildup of garbage data.

Excessive buildup of GC work is not great since it
impacts overall cluster performance, and, once GC can resume,
its cost is proportional to how much GC work needs to be done.
This PR introduces a new changefeed option
`gc_protect_expires_after` to automatically expire PTS records that
are too old.  This automatic expiration is a safety mechanism
in case changefeed job gets paused by an operator or due to
an error, while holding onto PTS record due to `protect_gc_on_pause`
option.
The operator is still expected to monitor changefeed jobs,
and to restart paused changefeeds expediently.  If the changefeed
job remains paused, and the underlying PTS records expires, then
the changefeed job will be canceled to prevent build up of GC data.

Epic: CRDB-21953
Informs #84598

Release note (enterprise change): Changefeed will automatically
expire PTS records for paused jobs if changefeed is configured
with `gc_protect_expires_after` option.
  • Loading branch information
Yevgeniy Miretskiy committed Feb 23, 2023
1 parent a0d6c19 commit 16d17ff
Show file tree
Hide file tree
Showing 15 changed files with 199 additions and 24 deletions.
7 changes: 6 additions & 1 deletion pkg/ccl/changefeedccl/alter_changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,11 @@ func alterChangefeedPlanHook(
newPayload.Details = jobspb.WrapPayloadDetails(newDetails)
newPayload.Description = jobRecord.Description
newPayload.DescriptorIDs = jobRecord.DescriptorIDs

newExpiration, err := newOptions.GetPTSExpiration()
if err != nil {
return err
}
newPayload.MaximumPTSAge = newExpiration
j, err := p.ExecCfg().JobRegistry.LoadJobWithTxn(ctx, jobID, p.InternalSQLTxn())
if err != nil {
return err
Expand All @@ -204,6 +208,7 @@ func alterChangefeedPlanHook(
if newProgress != nil {
ju.UpdateProgress(newProgress)
}

return nil
}); err != nil {
return err
Expand Down
4 changes: 3 additions & 1 deletion pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -1369,7 +1369,9 @@ func (cf *changeFrontier) manageProtectedTimestamps(

recordID := progress.ProtectedTimestampRecord
if recordID == uuid.Nil {
ptr := createProtectedTimestampRecord(ctx, cf.flowCtx.Codec(), cf.spec.JobID, AllTargets(cf.spec.Feed), highWater, progress)
ptr := createProtectedTimestampRecord(
ctx, cf.flowCtx.Codec(), cf.spec.JobID, AllTargets(cf.spec.Feed), highWater, progress,
)
if err := pts.Protect(ctx, ptr); err != nil {
return err
}
Expand Down
33 changes: 28 additions & 5 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,9 +650,29 @@ func createChangefeedJobRecord(
// TODO: Ideally those option validations would happen in validateDetails()
// earlier, like the others.
err = validateSink(ctx, p, jobID, details, opts)

if err != nil {
return nil, err
}
details.Opts = opts.AsMap()

ptsExpiration, err := opts.GetPTSExpiration()
if err != nil {
return nil, err
}

if ptsExpiration > 0 && ptsExpiration < time.Hour {
// This threshold is rather arbitrary. But we want to warn users about
// the potential impact of keeping this setting too low.
p.BufferClientNotice(ctx, pgnotice.Newf(
`the value of %s for changefeed option %s might be too low. Having a low
value for this option should not have adverse effect as long as changefeed
is running. However, should the changefeed be paused, it will need to be
resumed before expiration time. The value of this setting should reflect
how much time the changefeed may remain paused, before it is canceled.
Few hours to a few days range are appropriate values for this option.
`, ptsExpiration, changefeedbase.OptExpirePTSAfter, ptsExpiration))
}

jr := &jobs.Record{
Description: jobDescription,
Username: p.User(),
Expand All @@ -662,11 +682,12 @@ func createChangefeedJobRecord(
}
return sqlDescIDs
}(),
Details: details,
CreatedBy: changefeedStmt.CreatedByInfo,
Details: details,
CreatedBy: changefeedStmt.CreatedByInfo,
MaximumPTSAge: ptsExpiration,
}

return jr, err
return jr, nil
}

func validateSettings(ctx context.Context, p sql.PlanHookState) error {
Expand Down Expand Up @@ -1206,7 +1227,9 @@ func (b *changefeedResumer) OnPauseRequest(
return nil
}
pts := execCfg.ProtectedTimestampProvider.WithTxn(txn)
ptr := createProtectedTimestampRecord(ctx, execCfg.Codec, b.job.ID(), AllTargets(details), *resolved, cp)
ptr := createProtectedTimestampRecord(
ctx, execCfg.Codec, b.job.ID(), AllTargets(details), *resolved, cp,
)
return pts.Protect(ctx, ptr)
}

Expand Down
43 changes: 43 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4166,6 +4166,49 @@ func TestChangefeedDataTTL(t *testing.T) {
cdcTestWithSystem(t, testFn, feedTestForceSink("sinkless"), feedTestNoTenants)
}

// TestChangefeedCanceledWhenPTSIsOld verifies paused changefeed job which holds PTS
// record gets canceled if paused for too long.
func TestChangefeedCanceledWhenPTSIsOld(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)

sqlDB.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms';`)
// Create the data table; it will only contain a
// single row with multiple versions.
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b INT)`)

feed, err := f.Feed("CREATE CHANGEFEED FOR TABLE foo WITH protect_data_from_gc_on_pause, gc_protect_expires_after='24h'")
require.NoError(t, err)
defer func() {
closeFeed(t, feed)
}()

jobFeed := feed.(cdctest.EnterpriseTestFeed)
require.NoError(t, jobFeed.Pause())

// While the job is paused, take opportunity to test that alter changefeed
// works when setting gc_protect_expires_after option.

// Verify we can set it to 0 -- i.e. disable.
sqlDB.Exec(t, fmt.Sprintf("ALTER CHANGEFEED %d SET gc_protect_expires_after = '0s'", jobFeed.JobID()))
// Now, set it to something very small.
sqlDB.Exec(t, fmt.Sprintf("ALTER CHANGEFEED %d SET gc_protect_expires_after = '250ms'", jobFeed.JobID()))

// Stale PTS record should trigger job cancellation.
require.NoError(t, jobFeed.WaitForStatus(func(s jobs.Status) bool {
return s == jobs.StatusCanceled
}))
}

// Ensure metrics poller loop runs fast.
st := cluster.MakeTestingClusterSettings()
jobs.PollJobsMetricsInterval.Override(context.Background(), &st.SV, 100*time.Millisecond)
cdcTest(t, testFn, feedTestEnterpriseSinks, withSettings(st))
}

// TestChangefeedSchemaTTL ensures that changefeeds fail with an error in the case
// where the feed has fallen behind the GC TTL of the table's schema.
func TestChangefeedSchemaTTL(t *testing.T) {
Expand Down
17 changes: 16 additions & 1 deletion pkg/ccl/changefeedccl/changefeedbase/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ const (
OptSchemaChangePolicy = `schema_change_policy`
OptSplitColumnFamilies = `split_column_families`
OptProtectDataFromGCOnPause = `protect_data_from_gc_on_pause`
OptExpirePTSAfter = `gc_protect_expires_after`
OptWebhookAuthHeader = `webhook_auth_header`
OptWebhookClientTimeout = `webhook_client_timeout`
OptOnError = `on_error`
Expand Down Expand Up @@ -320,6 +321,7 @@ var ChangefeedOptionExpectValues = map[string]OptionPermittedValues{
OptNoInitialScan: flagOption,
OptInitialScanOnly: flagOption,
OptProtectDataFromGCOnPause: flagOption,
OptExpirePTSAfter: durationOption.thatCanBeZero(),
OptKafkaSinkConfig: jsonOption,
OptWebhookSinkConfig: jsonOption,
OptWebhookAuthHeader: stringOption,
Expand All @@ -339,7 +341,7 @@ var CommonOptions = makeStringSet(OptCursor, OptEndTime, OptEnvelope,
OptSchemaChangeEvents, OptSchemaChangePolicy,
OptProtectDataFromGCOnPause, OptOnError,
OptInitialScan, OptNoInitialScan, OptInitialScanOnly, OptUnordered,
OptMinCheckpointFrequency, OptMetricsScope, OptVirtualColumns, Topics)
OptMinCheckpointFrequency, OptMetricsScope, OptVirtualColumns, Topics, OptExpirePTSAfter)

// SQLValidOptions is options exclusive to SQL sink
var SQLValidOptions map[string]struct{} = nil
Expand Down Expand Up @@ -883,6 +885,19 @@ func (s StatementOptions) GetMinCheckpointFrequency() (*time.Duration, error) {
return s.getDurationValue(OptMinCheckpointFrequency)
}

// GetPTSExpiration returns the maximum age of the protected timestamp record.
// Changefeeds that fail to update their records in time will be canceled.
func (s StatementOptions) GetPTSExpiration() (time.Duration, error) {
exp, err := s.getDurationValue(OptExpirePTSAfter)
if err != nil {
return 0, err
}
if exp == nil {
return 0, nil
}
return *exp, nil
}

// ForceKeyInValue sets the encoding option KeyInValue to true and then validates the
// resoluting encoding options.
func (s StatementOptions) ForceKeyInValue() error {
Expand Down
13 changes: 12 additions & 1 deletion pkg/ccl/changefeedccl/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
Expand Down Expand Up @@ -395,12 +396,12 @@ func startTestFullServer(
DisableDefaultTestTenant: true,
UseDatabase: `d`,
ExternalIODir: options.externalIODir,
Settings: options.settings,
}

if options.argsFn != nil {
options.argsFn(&args)
}

resetRetry := testingUseFastRetry()
resetFlushFrequency := changefeedbase.TestingSetDefaultMinCheckpointFrequency(testSinkFlushFrequency)
s, db, _ := serverutils.StartServer(t, args)
Expand Down Expand Up @@ -505,6 +506,7 @@ func startTestTenant(
UseDatabase: `d`,
TestingKnobs: knobs,
ExternalIODir: options.externalIODir,
Settings: options.settings,
}

tenantServer, tenantDB := serverutils.StartTenant(t, systemServer, tenantArgs)
Expand Down Expand Up @@ -534,6 +536,7 @@ type feedTestOptions struct {
allowedSinkTypes []string
disabledSinkTypes []string
disableSyntheticTimestamps bool
settings *cluster.Settings
}

type feedTestOption func(opts *feedTestOptions)
Expand Down Expand Up @@ -583,6 +586,14 @@ func withArgsFn(fn updateArgsFn) feedTestOption {
return func(opts *feedTestOptions) { opts.argsFn = fn }
}

// withSettingsFn arranges for a feed option to set the settings for
// both system and test tenant.
func withSettings(st *cluster.Settings) feedTestOption {
return func(opts *feedTestOptions) {
opts.settings = st
}
}

// withKnobsFn is a feedTestOption that allows the caller to modify
// the testing knobs used by the test server. For multi-tenant
// testing, these knobs are applied to both the kv and sql nodes.
Expand Down
5 changes: 4 additions & 1 deletion pkg/jobs/adopt.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,10 @@ func (r *Registry) servePauseAndCancelRequests(ctx context.Context, s sqllivenes
// update.
r.clearLeaseForJobID(id, txn, txn.KV())
}
md.Payload.Error = errJobCanceled.Error()
if md.Payload.Error == "" {
// Set default cancellation reason.
md.Payload.Error = errJobCanceled.Error()
}
encodedErr := errors.EncodeError(ctx, errJobCanceled)
md.Payload.FinalResumeError = &encodedErr
ju.UpdatePayload(md.Payload)
Expand Down
18 changes: 18 additions & 0 deletions pkg/jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"fmt"
"reflect"
"sync/atomic"
"time"

"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
Expand Down Expand Up @@ -114,6 +115,9 @@ type Record struct {
// CreatedBy, if set, annotates this record with the information on
// this job creator.
CreatedBy *CreatedByInfo
// MaximumPTSAge specifies the maximum age of PTS record held by a job.
// 0 means no limit.
MaximumPTSAge time.Duration
}

// AppendDescription appends description to this records Description with a
Expand Down Expand Up @@ -423,6 +427,16 @@ func (u Updater) Unpaused(ctx context.Context) error {
// that it is in state StatusCancelRequested and will move it to state
// StatusReverting.
func (u Updater) CancelRequested(ctx context.Context) error {
return u.CancelRequestedWithReason(ctx, errJobCanceled)
}

// CancelRequestedWithReason sets the status of the tracked job to cancel-requested. It
// does not directly cancel the job; like job.Paused, it expects the job to call
// job.Progressed soon, observe a "job is cancel-requested" error, and abort.
// Further the node the runs the job will actively cancel it when it notices
// that it is in state StatusCancelRequested and will move it to state
// StatusReverting.
func (u Updater) CancelRequestedWithReason(ctx context.Context, reason error) error {
return u.Update(ctx, func(txn isql.Txn, md JobMetadata, ju *JobUpdater) error {
if md.Payload.Noncancelable {
return errors.Newf("job %d: not cancelable", md.ID)
Expand All @@ -438,6 +452,10 @@ func (u Updater) CancelRequested(ctx context.Context) error {
return errors.Wrapf(decodedErr, "job %d is paused and has non-nil FinalResumeError "+
"hence cannot be canceled and should be reverted", md.ID)
}
if !errors.Is(reason, errJobCanceled) {
md.Payload.Error = reason.Error()
ju.UpdatePayload(md.Payload)
}
ju.UpdateStatus(StatusCancelRequested)
return nil
})
Expand Down
6 changes: 6 additions & 0 deletions pkg/jobs/jobspb/jobs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1238,6 +1238,12 @@ message Payload {
// cluster version, in case a job resuming later needs to use this information
// to migrate or update the job.
roachpb.Version creation_cluster_version = 36 [(gogoproto.nullable) = false];

// If a job lays protected timestamp records, this optional field
// specifies how old such record could get before this job is canceled.
int64 maximum_pts_age = 40 [(gogoproto.casttype) = "time.Duration", (gogoproto.customname) = "MaximumPTSAge"];

// NEXT ID: 41
}

message Progress {
Expand Down
16 changes: 14 additions & 2 deletions pkg/jobs/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type JobTypeMetrics struct {
FailOrCancelFailed *metric.Counter

NumJobsWithPTS *metric.Gauge
ExpiredPTS *metric.Counter
ProtectedAge *metric.Gauge
}

Expand Down Expand Up @@ -181,8 +182,8 @@ func makeMetaProtectedCount(typeStr string) metric.Metadata {
return metric.Metadata{
Name: fmt.Sprintf("jobs.%s.protected_record_count", typeStr),
Help: fmt.Sprintf("Number of protected timestamp records held by %s jobs", typeStr),
Measurement: "bytes",
Unit: metric.Unit_BYTES,
Measurement: "records",
Unit: metric.Unit_COUNT,
MetricType: io_prometheus_client.MetricType_GAUGE,
}
}
Expand All @@ -197,6 +198,16 @@ func makeMetaProtectedAge(typeStr string) metric.Metadata {
}
}

func makeMetaExpiredPTS(typeStr string) metric.Metadata {
return metric.Metadata{
Name: fmt.Sprintf("jobs.%s.expired_pts_records", typeStr),
Help: fmt.Sprintf("Number of expired protected timestamp records owned by %s jobs", typeStr),
Measurement: "records",
Unit: metric.Unit_COUNT,
MetricType: io_prometheus_client.MetricType_COUNTER,
}
}

var (
metaAdoptIterations = metric.Metadata{
Name: "jobs.adopt_iterations",
Expand Down Expand Up @@ -268,6 +279,7 @@ func (m *Metrics) init(histogramWindowInterval time.Duration) {
FailOrCancelRetryError: metric.NewCounter(makeMetaFailOrCancelRetryError(typeStr)),
FailOrCancelFailed: metric.NewCounter(makeMetaFailOrCancelFailed(typeStr)),
NumJobsWithPTS: metric.NewGauge(makeMetaProtectedCount(typeStr)),
ExpiredPTS: metric.NewCounter(makeMetaExpiredPTS(typeStr)),
ProtectedAge: metric.NewGauge(makeMetaProtectedAge(typeStr)),
}
if opts, ok := options[jt]; ok && opts.metrics != nil {
Expand Down
Loading

0 comments on commit 16d17ff

Please sign in to comment.