Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changefeedccl: Expire protected timestamps #97148

Merged
merged 3 commits into from
Feb 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -1145,6 +1145,7 @@ GO_TARGETS = [
"//pkg/jobs/jobsprotectedts:jobsprotectedts",
"//pkg/jobs/jobsprotectedts:jobsprotectedts_test",
"//pkg/jobs/jobstest:jobstest",
"//pkg/jobs/metricspoller:metricspoller",
"//pkg/jobs:jobs",
"//pkg/jobs:jobs_test",
"//pkg/keys:keys",
Expand Down Expand Up @@ -2597,6 +2598,7 @@ GET_X_DATA_TARGETS = [
"//pkg/jobs/jobspb:get_x_data",
"//pkg/jobs/jobsprotectedts:get_x_data",
"//pkg/jobs/jobstest:get_x_data",
"//pkg/jobs/metricspoller:get_x_data",
"//pkg/keys:get_x_data",
"//pkg/keysbase:get_x_data",
"//pkg/keyvisualizer:get_x_data",
Expand Down
7 changes: 6 additions & 1 deletion pkg/ccl/changefeedccl/alter_changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,11 @@ func alterChangefeedPlanHook(
newPayload.Details = jobspb.WrapPayloadDetails(newDetails)
newPayload.Description = jobRecord.Description
newPayload.DescriptorIDs = jobRecord.DescriptorIDs

newExpiration, err := newOptions.GetPTSExpiration()
if err != nil {
return err
}
newPayload.MaximumPTSAge = newExpiration
j, err := p.ExecCfg().JobRegistry.LoadJobWithTxn(ctx, jobID, p.InternalSQLTxn())
if err != nil {
return err
Expand All @@ -204,6 +208,7 @@ func alterChangefeedPlanHook(
if newProgress != nil {
ju.UpdateProgress(newProgress)
}

return nil
}); err != nil {
return err
Expand Down
4 changes: 3 additions & 1 deletion pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -1369,7 +1369,9 @@ func (cf *changeFrontier) manageProtectedTimestamps(

recordID := progress.ProtectedTimestampRecord
if recordID == uuid.Nil {
ptr := createProtectedTimestampRecord(ctx, cf.flowCtx.Codec(), cf.spec.JobID, AllTargets(cf.spec.Feed), highWater, progress)
ptr := createProtectedTimestampRecord(
ctx, cf.flowCtx.Codec(), cf.spec.JobID, AllTargets(cf.spec.Feed), highWater, progress,
)
if err := pts.Protect(ctx, ptr); err != nil {
return err
}
Expand Down
33 changes: 28 additions & 5 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,9 +650,29 @@ func createChangefeedJobRecord(
// TODO: Ideally those option validations would happen in validateDetails()
// earlier, like the others.
err = validateSink(ctx, p, jobID, details, opts)

if err != nil {
return nil, err
}
details.Opts = opts.AsMap()

ptsExpiration, err := opts.GetPTSExpiration()
if err != nil {
return nil, err
}

if ptsExpiration > 0 && ptsExpiration < time.Hour {
// This threshold is rather arbitrary. But we want to warn users about
// the potential impact of keeping this setting too low.
p.BufferClientNotice(ctx, pgnotice.Newf(
`the value of %s for changefeed option %s might be too low. Having a low
value for this option should not have adverse effect as long as changefeed
is running. However, should the changefeed be paused, it will need to be
resumed before expiration time. The value of this setting should reflect
how much time the changefeed may remain paused, before it is canceled.
Few hours to a few days range are appropriate values for this option.
`, ptsExpiration, changefeedbase.OptExpirePTSAfter, ptsExpiration))
}

jr := &jobs.Record{
Description: jobDescription,
Username: p.User(),
Expand All @@ -662,11 +682,12 @@ func createChangefeedJobRecord(
}
return sqlDescIDs
}(),
Details: details,
CreatedBy: changefeedStmt.CreatedByInfo,
Details: details,
CreatedBy: changefeedStmt.CreatedByInfo,
MaximumPTSAge: ptsExpiration,
}

return jr, err
return jr, nil
}

func validateSettings(ctx context.Context, p sql.PlanHookState) error {
Expand Down Expand Up @@ -1206,7 +1227,9 @@ func (b *changefeedResumer) OnPauseRequest(
return nil
}
pts := execCfg.ProtectedTimestampProvider.WithTxn(txn)
ptr := createProtectedTimestampRecord(ctx, execCfg.Codec, b.job.ID(), AllTargets(details), *resolved, cp)
ptr := createProtectedTimestampRecord(
ctx, execCfg.Codec, b.job.ID(), AllTargets(details), *resolved, cp,
)
return pts.Protect(ctx, ptr)
}

Expand Down
43 changes: 43 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4166,6 +4166,49 @@ func TestChangefeedDataTTL(t *testing.T) {
cdcTestWithSystem(t, testFn, feedTestForceSink("sinkless"), feedTestNoTenants)
}

// TestChangefeedCanceledWhenPTSIsOld verifies paused changefeed job which holds PTS
// record gets canceled if paused for too long.
func TestChangefeedCanceledWhenPTSIsOld(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)

sqlDB.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms';`)
// Create the data table; it will only contain a
// single row with multiple versions.
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b INT)`)

feed, err := f.Feed("CREATE CHANGEFEED FOR TABLE foo WITH protect_data_from_gc_on_pause, gc_protect_expires_after='24h'")
require.NoError(t, err)
defer func() {
closeFeed(t, feed)
}()

jobFeed := feed.(cdctest.EnterpriseTestFeed)
require.NoError(t, jobFeed.Pause())

// While the job is paused, take opportunity to test that alter changefeed
// works when setting gc_protect_expires_after option.

// Verify we can set it to 0 -- i.e. disable.
sqlDB.Exec(t, fmt.Sprintf("ALTER CHANGEFEED %d SET gc_protect_expires_after = '0s'", jobFeed.JobID()))
// Now, set it to something very small.
sqlDB.Exec(t, fmt.Sprintf("ALTER CHANGEFEED %d SET gc_protect_expires_after = '250ms'", jobFeed.JobID()))

// Stale PTS record should trigger job cancellation.
require.NoError(t, jobFeed.WaitForStatus(func(s jobs.Status) bool {
return s == jobs.StatusCanceled
}))
}

// Ensure metrics poller loop runs fast.
st := cluster.MakeTestingClusterSettings()
jobs.PollJobsMetricsInterval.Override(context.Background(), &st.SV, 100*time.Millisecond)
cdcTest(t, testFn, feedTestEnterpriseSinks, withSettings(st))
}

// TestChangefeedSchemaTTL ensures that changefeeds fail with an error in the case
// where the feed has fallen behind the GC TTL of the table's schema.
func TestChangefeedSchemaTTL(t *testing.T) {
Expand Down
17 changes: 16 additions & 1 deletion pkg/ccl/changefeedccl/changefeedbase/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ const (
OptSchemaChangePolicy = `schema_change_policy`
OptSplitColumnFamilies = `split_column_families`
OptProtectDataFromGCOnPause = `protect_data_from_gc_on_pause`
OptExpirePTSAfter = `gc_protect_expires_after`
OptWebhookAuthHeader = `webhook_auth_header`
OptWebhookClientTimeout = `webhook_client_timeout`
OptOnError = `on_error`
Expand Down Expand Up @@ -320,6 +321,7 @@ var ChangefeedOptionExpectValues = map[string]OptionPermittedValues{
OptNoInitialScan: flagOption,
OptInitialScanOnly: flagOption,
OptProtectDataFromGCOnPause: flagOption,
OptExpirePTSAfter: durationOption.thatCanBeZero(),
OptKafkaSinkConfig: jsonOption,
OptWebhookSinkConfig: jsonOption,
OptWebhookAuthHeader: stringOption,
Expand All @@ -339,7 +341,7 @@ var CommonOptions = makeStringSet(OptCursor, OptEndTime, OptEnvelope,
OptSchemaChangeEvents, OptSchemaChangePolicy,
OptProtectDataFromGCOnPause, OptOnError,
OptInitialScan, OptNoInitialScan, OptInitialScanOnly, OptUnordered,
OptMinCheckpointFrequency, OptMetricsScope, OptVirtualColumns, Topics)
OptMinCheckpointFrequency, OptMetricsScope, OptVirtualColumns, Topics, OptExpirePTSAfter)

// SQLValidOptions is options exclusive to SQL sink
var SQLValidOptions map[string]struct{} = nil
Expand Down Expand Up @@ -883,6 +885,19 @@ func (s StatementOptions) GetMinCheckpointFrequency() (*time.Duration, error) {
return s.getDurationValue(OptMinCheckpointFrequency)
}

// GetPTSExpiration returns the maximum age of the protected timestamp record.
// Changefeeds that fail to update their records in time will be canceled.
func (s StatementOptions) GetPTSExpiration() (time.Duration, error) {
exp, err := s.getDurationValue(OptExpirePTSAfter)
if err != nil {
return 0, err
}
if exp == nil {
return 0, nil
}
return *exp, nil
}

// ForceKeyInValue sets the encoding option KeyInValue to true and then validates the
// resoluting encoding options.
func (s StatementOptions) ForceKeyInValue() error {
Expand Down
13 changes: 12 additions & 1 deletion pkg/ccl/changefeedccl/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
Expand Down Expand Up @@ -395,12 +396,12 @@ func startTestFullServer(
DisableDefaultTestTenant: true,
UseDatabase: `d`,
ExternalIODir: options.externalIODir,
Settings: options.settings,
}

if options.argsFn != nil {
options.argsFn(&args)
}

resetRetry := testingUseFastRetry()
resetFlushFrequency := changefeedbase.TestingSetDefaultMinCheckpointFrequency(testSinkFlushFrequency)
s, db, _ := serverutils.StartServer(t, args)
Expand Down Expand Up @@ -505,6 +506,7 @@ func startTestTenant(
UseDatabase: `d`,
TestingKnobs: knobs,
ExternalIODir: options.externalIODir,
Settings: options.settings,
}

tenantServer, tenantDB := serverutils.StartTenant(t, systemServer, tenantArgs)
Expand Down Expand Up @@ -534,6 +536,7 @@ type feedTestOptions struct {
allowedSinkTypes []string
disabledSinkTypes []string
disableSyntheticTimestamps bool
settings *cluster.Settings
}

type feedTestOption func(opts *feedTestOptions)
Expand Down Expand Up @@ -583,6 +586,14 @@ func withArgsFn(fn updateArgsFn) feedTestOption {
return func(opts *feedTestOptions) { opts.argsFn = fn }
}

// withSettingsFn arranges for a feed option to set the settings for
// both system and test tenant.
func withSettings(st *cluster.Settings) feedTestOption {
return func(opts *feedTestOptions) {
opts.settings = st
}
}

// withKnobsFn is a feedTestOption that allows the caller to modify
// the testing knobs used by the test server. For multi-tenant
// testing, these knobs are applied to both the kv and sql nodes.
Expand Down
3 changes: 3 additions & 0 deletions pkg/jobs/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,14 @@ go_test(
"//pkg/base",
"//pkg/clusterversion",
"//pkg/jobs/jobspb",
"//pkg/jobs/jobsprotectedts",
"//pkg/jobs/jobstest",
"//pkg/keys",
"//pkg/keyvisualizer",
"//pkg/kv",
"//pkg/kv/kvpb",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/protectedts/ptpb",
"//pkg/roachpb",
"//pkg/scheduledjobs",
"//pkg/security/securityassets",
Expand Down Expand Up @@ -155,6 +157,7 @@ go_test(
"@com_github_gogo_protobuf//types",
"@com_github_google_go_cmp//cmp",
"@com_github_kr_pretty//:pretty",
"@com_github_prometheus_client_model//go",
"@com_github_robfig_cron_v3//:cron",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
Expand Down
5 changes: 4 additions & 1 deletion pkg/jobs/adopt.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,10 @@ func (r *Registry) servePauseAndCancelRequests(ctx context.Context, s sqllivenes
// update.
r.clearLeaseForJobID(id, txn, txn.KV())
}
md.Payload.Error = errJobCanceled.Error()
if md.Payload.Error == "" {
// Set default cancellation reason.
md.Payload.Error = errJobCanceled.Error()
}
encodedErr := errors.EncodeError(ctx, errJobCanceled)
md.Payload.FinalResumeError = &encodedErr
ju.UpdatePayload(md.Payload)
Expand Down
2 changes: 1 addition & 1 deletion pkg/jobs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ const (

// defaultPollForMetricsInterval is the default interval to poll the jobs
// table for metrics.
defaultPollForMetricsInterval = 10 * time.Second
defaultPollForMetricsInterval = 30 * time.Second
)

var (
Expand Down
18 changes: 18 additions & 0 deletions pkg/jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"fmt"
"reflect"
"sync/atomic"
"time"

"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
Expand Down Expand Up @@ -114,6 +115,9 @@ type Record struct {
// CreatedBy, if set, annotates this record with the information on
// this job creator.
CreatedBy *CreatedByInfo
// MaximumPTSAge specifies the maximum age of PTS record held by a job.
// 0 means no limit.
MaximumPTSAge time.Duration
}

// AppendDescription appends description to this records Description with a
Expand Down Expand Up @@ -423,6 +427,16 @@ func (u Updater) Unpaused(ctx context.Context) error {
// that it is in state StatusCancelRequested and will move it to state
// StatusReverting.
func (u Updater) CancelRequested(ctx context.Context) error {
return u.CancelRequestedWithReason(ctx, errJobCanceled)
}

// CancelRequestedWithReason sets the status of the tracked job to cancel-requested. It
// does not directly cancel the job; like job.Paused, it expects the job to call
// job.Progressed soon, observe a "job is cancel-requested" error, and abort.
// Further the node the runs the job will actively cancel it when it notices
// that it is in state StatusCancelRequested and will move it to state
// StatusReverting.
func (u Updater) CancelRequestedWithReason(ctx context.Context, reason error) error {
return u.Update(ctx, func(txn isql.Txn, md JobMetadata, ju *JobUpdater) error {
if md.Payload.Noncancelable {
return errors.Newf("job %d: not cancelable", md.ID)
Expand All @@ -438,6 +452,10 @@ func (u Updater) CancelRequested(ctx context.Context) error {
return errors.Wrapf(decodedErr, "job %d is paused and has non-nil FinalResumeError "+
"hence cannot be canceled and should be reverted", md.ID)
}
if !errors.Is(reason, errJobCanceled) {
md.Payload.Error = reason.Error()
ju.UpdatePayload(md.Payload)
}
ju.UpdateStatus(StatusCancelRequested)
return nil
})
Expand Down
Loading