From 75dc9fec4beb142142a195614ed5d75a7aa1dfdb Mon Sep 17 00:00:00 2001 From: Sherman Grewal Date: Thu, 24 Feb 2022 14:51:58 -0500 Subject: [PATCH 1/2] changefeedccl: allow users to alter changefeed options with the ALTER CHANGEFEED statement Currently, the ALTER CHANGEFEED statement only lets users add or drop targets from an existing changefeed. In this PR, we extend this functionality so that a user can set and unset the options of an existing changefeed as well. The syntax of this addition is the following: ALTER CHANGEFEED <job_id> SET <options> UNSET <options>. Note that the <options> must follow the same syntax that is used when creating a changefeed with options. In particular, if you would like to set an option that requires a value, you must write SET opt = 'value'. On the other hand, if you would like to set an option that requires no value, you must write SET opt. Furthermore, this PR allows users to unset options. This can be achieved by writing UNSET <options>, where <options> is a list of options that you would like to unset. For example, if we would like to unset the diff and resolved options for changefeed 123, we would achieve this by writing ALTER CHANGEFEED 123 UNSET diff, resolved. Release note (enterprise change): Added support to the ALTER CHANGEFEED statement so that users can set and unset the options of an existing changefeed. The syntax of this addition is the following: ALTER CHANGEFEED <job_id> SET <options> UNSET <options> --- docs/generated/sql/bnf/stmt_block.bnf | 3 + pkg/ccl/changefeedccl/BUILD.bazel | 1 - .../changefeedccl/alter_changefeed_stmt.go | 246 +++++++---- .../changefeedccl/alter_changefeed_test.go | 149 +++++++ pkg/ccl/changefeedccl/changefeed_stmt.go | 388 ++++++++++-------- .../changefeedccl/changefeedbase/options.go | 4 + .../show_changefeed_jobs_test.go | 88 ++-- pkg/sql/parser/sql.y | 17 +- pkg/sql/parser/testdata/alter_changefeed | 73 ++++ pkg/sql/sem/tree/alter_changefeed.go | 30 +- 10 files changed, 699 insertions(+), 300 deletions(-) diff --git a/docs/generated/sql/bnf/stmt_block.bnf b/docs/generated/sql/bnf/stmt_block.bnf index ba1b215c4f38..c38491dddcba 100644 --- a/docs/generated/sql/bnf/stmt_block.bnf +++ b/docs/generated/sql/bnf/stmt_block.bnf @@ -1258,6 +1258,7 @@ unreserved_keyword ::= | 'UNCOMMITTED' | 'UNKNOWN' | 'UNLOGGED' + | 'UNSET' | 'UNSPLIT' | 'UNTIL' | 'UPDATE' @@ -2473,6 +2474,8 @@ alter_default_privileges_target_object ::= alter_changefeed_cmd ::= 'ADD' changefeed_targets | 'DROP' changefeed_targets + | 'SET' kv_option_list + | 'UNSET' name_list alter_backup_cmd ::= 'ADD' backup_kms diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index d57d623636d2..5072615c1412 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -66,7 +66,6 @@ go_library( "//pkg/sql/execinfra", "//pkg/sql/execinfrapb", "//pkg/sql/flowinfra", - "//pkg/sql/parser", "//pkg/sql/pgwire/pgcode", "//pkg/sql/pgwire/pgerror", "//pkg/sql/pgwire/pgnotice", diff --git a/pkg/ccl/changefeedccl/alter_changefeed_stmt.go b/pkg/ccl/changefeedccl/alter_changefeed_stmt.go index 7baba9cd3605..55c540da9a1f 100644 --- a/pkg/ccl/changefeedccl/alter_changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/alter_changefeed_stmt.go @@ -11,15 +11,17 @@ package changefeedccl import ( "context" + "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupresolver" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/sql" - "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" - "github.com/cockroachdb/cockroach/pkg/sql/parser" - "github.com/cockroachdb/cockroach/pkg/sql/privilege" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/resolver" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -30,11 +32,6 @@ func init() { sql.AddPlanHook("alter changefeed", alterChangefeedPlanHook) } -type alterChangefeedOpts struct { - AddTargets []tree.TargetList - DropTargets []tree.TargetList -} - // alterChangefeedPlanHook implements sql.PlanHookFn. func alterChangefeedPlanHook( ctx context.Context, stmt tree.Statement, p sql.PlanHookState, @@ -67,7 +64,7 @@ func alterChangefeedPlanHook( return err } - details, ok := job.Details().(jobspb.ChangefeedDetails) + prevDetails, ok := job.Details().(jobspb.ChangefeedDetails) if !ok { return errors.Errorf(`job %d is not changefeed job`, jobID) } @@ -76,116 +73,188 @@ func alterChangefeedPlanHook( return errors.Errorf(`job %d is not paused`, jobID) } - var opts alterChangefeedOpts - for _, cmd := range alterChangefeedStmt.Cmds { - switch v := cmd.(type) { - case *tree.AlterChangefeedAddTarget: - opts.AddTargets = append(opts.AddTargets, v.Targets) - case *tree.AlterChangefeedDropTarget: - opts.DropTargets = append(opts.DropTargets, v.Targets) + // this CREATE CHANGEFEED node will be used to update the existing changefeed + newChangefeedStmt := &tree.CreateChangefeed{ + SinkURI: tree.NewDString(prevDetails.SinkURI), + } + + optionsMap := make(map[string]tree.KVOption, len(prevDetails.Opts)) + + // pull the options that are set for the existing changefeed + for key, value := range prevDetails.Opts { + // There are some options (e.g. topics) that we set during the creation of + // a changefeed, but we do not allow these options to be set by the user. + // Hence, we can not include these options in our new CREATE CHANGEFEED + // statement. + if _, ok := changefeedbase.ChangefeedOptionExpectValues[key]; !ok { + continue + } + existingOpt := tree.KVOption{Key: tree.Name(key)} + if len(value) > 0 { + existingOpt.Value = tree.NewDString(value) } + optionsMap[key] = existingOpt } - var initialHighWater hlc.Timestamp statementTime := hlc.Timestamp{ WallTime: p.ExtendedEvalContext().GetStmtTimestamp().UnixNano(), } - if opts.AddTargets != nil { - var targetDescs []catalog.Descriptor - - for _, targetList := range opts.AddTargets { - descs, err := getTableDescriptors(ctx, p, &targetList, statementTime, initialHighWater) - if err != nil { - return err - } - targetDescs = append(targetDescs, descs...) 
- } + allDescs, err := backupresolver.LoadAllDescs(ctx, p.ExecCfg(), statementTime) + if err != nil { + return err + } + descResolver, err := backupresolver.NewDescriptorResolver(allDescs) + if err != nil { + return err + } - newTargets, newTables, err := getTargetsAndTables(ctx, p, targetDescs, details.Opts) - if err != nil { - return err - } - // add old targets - for id, table := range details.Tables { - newTables[id] = table - } - details.Tables = newTables - details.TargetSpecifications = append(details.TargetSpecifications, newTargets...) + newDescs := make(map[descpb.ID]*tree.UnresolvedName) + for _, target := range AllTargets(prevDetails) { + desc := descResolver.DescByID[target.TableID] + newDescs[target.TableID] = tree.NewUnresolvedName(desc.GetName()) } - if opts.DropTargets != nil { - var targetDescs []catalog.Descriptor + for _, cmd := range alterChangefeedStmt.Cmds { + switch v := cmd.(type) { + case *tree.AlterChangefeedAddTarget: + for _, targetPattern := range v.Targets.Tables { + targetName, err := getTargetName(targetPattern) + if err != nil { + return err + } + found, _, desc, err := resolver.ResolveExisting( + ctx, + targetName.ToUnresolvedObjectName(), + descResolver, + tree.ObjectLookupFlags{}, + p.CurrentDatabase(), + p.CurrentSearchPath(), + ) + if err != nil { + return err + } + if !found { + return pgerror.Newf(pgcode.InvalidParameterValue, `target %q does not exist`, tree.ErrString(targetPattern)) + } + newDescs[desc.GetID()] = tree.NewUnresolvedName(desc.GetName()) + } + case *tree.AlterChangefeedDropTarget: + for _, targetPattern := range v.Targets.Tables { + targetName, err := getTargetName(targetPattern) + if err != nil { + return err + } + found, _, desc, err := resolver.ResolveExisting( + ctx, + targetName.ToUnresolvedObjectName(), + descResolver, + tree.ObjectLookupFlags{}, + p.CurrentDatabase(), + p.CurrentSearchPath(), + ) + if err != nil { + return err + } + if !found { + return pgerror.Newf(pgcode.InvalidParameterValue, `target %q does not exist`, tree.ErrString(targetPattern)) + } + delete(newDescs, desc.GetID()) + } + case *tree.AlterChangefeedSetOptions: + optsFn, err := p.TypeAsStringOpts(ctx, v.Options, changefeedbase.ChangefeedOptionExpectValues) + if err != nil { + return err + } - for _, targetList := range opts.DropTargets { - descs, err := getTableDescriptors(ctx, p, &targetList, statementTime, initialHighWater) + opts, err := optsFn() if err != nil { return err } - targetDescs = append(targetDescs, descs...) 
- } - for _, desc := range targetDescs { - if table, isTable := desc.(catalog.TableDescriptor); isTable { - if err := p.CheckPrivilege(ctx, desc, privilege.SELECT); err != nil { - return err + for key, value := range opts { + if _, ok := changefeedbase.ChangefeedOptionExpectValues[key]; !ok { + return pgerror.Newf(pgcode.InvalidParameterValue, `invalid option %q`, key) + } + if _, ok := changefeedbase.AlterChangefeedUnsupportedOptions[key]; ok { + return pgerror.Newf(pgcode.InvalidParameterValue, `cannot alter option %q`, key) } - delete(details.Tables, table.GetID()) + opt := tree.KVOption{Key: tree.Name(key)} + if len(value) > 0 { + opt.Value = tree.NewDString(value) + } + optionsMap[key] = opt } - } - - newTargetSpecifications := make([]jobspb.ChangefeedTargetSpecification, len(details.TargetSpecifications)-len(opts.DropTargets)) - for _, ts := range details.TargetSpecifications { - if _, stillThere := details.Tables[ts.TableID]; stillThere { - newTargetSpecifications = append(newTargetSpecifications, ts) + case *tree.AlterChangefeedUnsetOptions: + optKeys := v.Options.ToStrings() + for _, key := range optKeys { + if _, ok := changefeedbase.ChangefeedOptionExpectValues[key]; !ok { + return pgerror.Newf(pgcode.InvalidParameterValue, `invalid option %q`, key) + } + if _, ok := changefeedbase.AlterChangefeedUnsupportedOptions[key]; ok { + return pgerror.Newf(pgcode.InvalidParameterValue, `cannot alter option %q`, key) + } + delete(optionsMap, key) } } - details.TargetSpecifications = newTargetSpecifications + } + if len(newDescs) == 0 { + return pgerror.Newf(pgcode.InvalidParameterValue, "cannot drop all targets for changefeed job %d", jobID) } - if len(details.Tables) == 0 { - return errors.Errorf("cannot drop all targets for changefeed job %d", jobID) + for _, targetName := range newDescs { + newChangefeedStmt.Targets.Tables = append(newChangefeedStmt.Targets.Tables, targetName) } - if err := validateSink(ctx, p, jobID, details, details.Opts); err != nil { - return err + for _, val := range optionsMap { + newChangefeedStmt.Options = append(newChangefeedStmt.Options, val) } - oldStmt, err := parser.ParseOne(job.Payload().Description) + sinkURIFn, err := p.TypeAsString(ctx, newChangefeedStmt.SinkURI, `ALTER CHANGEFEED`) if err != nil { return err } - oldChangefeedStmt, ok := oldStmt.AST.(*tree.CreateChangefeed) - if !ok { - return errors.Errorf(`could not parse create changefeed statement for job %d`, jobID) - } - var targets tree.TargetList - for _, target := range details.Tables { - targetName := tree.MakeTableNameFromPrefix(tree.ObjectNamePrefix{}, tree.Name(target.StatementTimeName)) - targets.Tables = append(targets.Tables, &targetName) + optsFn, err := p.TypeAsStringOpts(ctx, newChangefeedStmt.Options, changefeedbase.ChangefeedOptionExpectValues) + if err != nil { + return err } - oldChangefeedStmt.Targets = targets - jobDescription := tree.AsString(oldChangefeedStmt) - - newPayload := job.Payload() - newPayload.Description = jobDescription - newPayload.Details = jobspb.WrapPayloadDetails(details) + sinkURI, err := sinkURIFn() + if err != nil { + return err + } - finalDescs, err := getTableDescriptors(ctx, p, &targets, statementTime, initialHighWater) + opts, err := optsFn() if err != nil { return err } - newPayload.DescriptorIDs = func() (sqlDescIDs []descpb.ID) { - for _, desc := range finalDescs { - sqlDescIDs = append(sqlDescIDs, desc.GetID()) - } - return sqlDescIDs - }() + jobRecord, err := createChangefeedJobRecord( + ctx, + p, + newChangefeedStmt, + sinkURI, + opts, + jobID, 
+ ``, + ) + if err != nil { + return errors.Wrap(err, `failed to alter changefeed`) + } + + newDetails := jobRecord.Details.(jobspb.ChangefeedDetails) + + // We need to persist the statement time that was generated during the + // creation of the changefeed + newDetails.StatementTime = prevDetails.StatementTime + + newPayload := job.Payload() + newPayload.Details = jobspb.WrapPayloadDetails(newDetails) + newPayload.Description = jobRecord.Description + newPayload.DescriptorIDs = jobRecord.DescriptorIDs err = p.ExecCfg().JobRegistry.UpdateJobWithTxn(ctx, jobID, p.ExtendedEvalContext().Txn, lockForUpdate, func( txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater, @@ -203,7 +272,7 @@ func alterChangefeedPlanHook( return ctx.Err() case resultsCh <- tree.Datums{ tree.NewDInt(tree.DInt(jobID)), - tree.NewDString(jobDescription), + tree.NewDString(jobRecord.Description), }: return nil } @@ -211,3 +280,16 @@ func alterChangefeedPlanHook( return fn, header, nil, false, nil } + +func getTargetName(targetPattern tree.TablePattern) (*tree.TableName, error) { + pattern, err := targetPattern.NormalizeTablePattern() + if err != nil { + return nil, err + } + targetName, ok := pattern.(*tree.TableName) + if !ok { + return nil, errors.Errorf(`CHANGEFEED cannot target %q`, tree.AsString(targetPattern)) + } + + return targetName, nil +} diff --git a/pkg/ccl/changefeedccl/alter_changefeed_test.go b/pkg/ccl/changefeedccl/alter_changefeed_test.go index fc1a7a56fa85..f5ea7b580928 100644 --- a/pkg/ccl/changefeedccl/alter_changefeed_test.go +++ b/pkg/ccl/changefeedccl/alter_changefeed_test.go @@ -9,12 +9,16 @@ package changefeedccl import ( + "context" gosql "database/sql" "fmt" "testing" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest" + "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/sql/tests" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -93,6 +97,68 @@ func TestAlterChangefeedDropTarget(t *testing.T) { t.Run(`kafka`, kafkaTest(testFn)) } +func TestAlterChangefeedSetDiffOption(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) { + sqlDB := sqlutils.MakeSQLRunner(db) + sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`) + + testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo`) + defer closeFeed(t, testFeed) + + feed, ok := testFeed.(cdctest.EnterpriseTestFeed) + require.True(t, ok) + + sqlDB.Exec(t, `PAUSE JOB $1`, feed.JobID()) + waitForJobStatus(sqlDB, t, feed.JobID(), `paused`) + + sqlDB.Exec(t, fmt.Sprintf(`ALTER CHANGEFEED %d SET diff`, feed.JobID())) + + sqlDB.Exec(t, fmt.Sprintf(`RESUME JOB %d`, feed.JobID())) + waitForJobStatus(sqlDB, t, feed.JobID(), `running`) + + sqlDB.Exec(t, `INSERT INTO foo VALUES (0, 'initial')`) + assertPayloads(t, testFeed, []string{ + `foo: [0]->{"after": {"a": 0, "b": "initial"}, "before": null}`, + }) + } + + t.Run(`kafka`, kafkaTest(testFn)) +} + +func TestAlterChangefeedUnsetDiffOption(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) { + sqlDB := sqlutils.MakeSQLRunner(db) + sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY 
KEY, b STRING)`) + + testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo WITH diff`) + defer closeFeed(t, testFeed) + + feed, ok := testFeed.(cdctest.EnterpriseTestFeed) + require.True(t, ok) + + sqlDB.Exec(t, `PAUSE JOB $1`, feed.JobID()) + waitForJobStatus(sqlDB, t, feed.JobID(), `paused`) + + sqlDB.Exec(t, fmt.Sprintf(`ALTER CHANGEFEED %d UNSET diff`, feed.JobID())) + + sqlDB.Exec(t, fmt.Sprintf(`RESUME JOB %d`, feed.JobID())) + waitForJobStatus(sqlDB, t, feed.JobID(), `running`) + + sqlDB.Exec(t, `INSERT INTO foo VALUES (0, 'initial')`) + assertPayloads(t, testFeed, []string{ + `foo: [0]->{"after": {"a": 0, "b": "initial"}}`, + }) + } + + t.Run(`kafka`, kafkaTest(testFn)) +} + func TestAlterChangefeedErrors(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -124,6 +190,34 @@ func TestAlterChangefeedErrors(t *testing.T) { fmt.Sprintf(`job %d is not paused`, feed.JobID()), fmt.Sprintf(`ALTER CHANGEFEED %d ADD bar`, feed.JobID()), ) + + sqlDB.Exec(t, `PAUSE JOB $1`, feed.JobID()) + waitForJobStatus(sqlDB, t, feed.JobID(), `paused`) + + sqlDB.ExpectErr(t, + `pq: target "baz" does not exist`, + fmt.Sprintf(`ALTER CHANGEFEED %d ADD baz`, feed.JobID()), + ) + sqlDB.ExpectErr(t, + `pq: target "baz" does not exist`, + fmt.Sprintf(`ALTER CHANGEFEED %d DROP baz`, feed.JobID()), + ) + sqlDB.ExpectErr(t, + `pq: invalid option "qux"`, + fmt.Sprintf(`ALTER CHANGEFEED %d SET qux`, feed.JobID()), + ) + sqlDB.ExpectErr(t, + `pq: cannot alter option "initial_scan"`, + fmt.Sprintf(`ALTER CHANGEFEED %d SET initial_scan`, feed.JobID()), + ) + sqlDB.ExpectErr(t, + `pq: invalid option "qux"`, + fmt.Sprintf(`ALTER CHANGEFEED %d UNSET qux`, feed.JobID()), + ) + sqlDB.ExpectErr(t, + `pq: cannot alter option "initial_scan"`, + fmt.Sprintf(`ALTER CHANGEFEED %d UNSET initial_scan`, feed.JobID()), + ) } t.Run(`kafka`, kafkaTest(testFn)) @@ -155,3 +249,58 @@ func TestAlterChangefeedDropAllTargetsError(t *testing.T) { t.Run(`kafka`, kafkaTest(testFn)) } + +// The purpose of this test is to ensure that the ALTER CHANGEFEED statement +// does not accidentally redact secret keys in the changefeed details +func TestAlterChangefeedPersistSinkURI(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + params, _ := tests.CreateTestServerParams() + s, rawSQLDB, _ := serverutils.StartServer(t, params) + sqlDB := sqlutils.MakeSQLRunner(rawSQLDB) + registry := s.JobRegistry().(*jobs.Registry) + ctx := context.Background() + defer s.Stopper().Stop(ctx) + + query := `CREATE TABLE foo (a string)` + sqlDB.Exec(t, query) + + query = `CREATE TABLE bar (b string)` + sqlDB.Exec(t, query) + + query = `SET CLUSTER SETTING kv.rangefeed.enabled = true` + sqlDB.Exec(t, query) + + var changefeedID jobspb.JobID + + doneCh := make(chan struct{}) + defer close(doneCh) + registry.TestingResumerCreationKnobs = map[jobspb.Type]func(raw jobs.Resumer) jobs.Resumer{ + jobspb.TypeChangefeed: func(raw jobs.Resumer) jobs.Resumer { + r := fakeResumer{ + done: doneCh, + } + return &r + }, + } + + query = `CREATE CHANGEFEED FOR TABLE foo, bar INTO + 's3://fake-bucket-name/fake/path?AWS_ACCESS_KEY_ID=123&AWS_SECRET_ACCESS_KEY=456'` + sqlDB.QueryRow(t, query).Scan(&changefeedID) + + sqlDB.Exec(t, `PAUSE JOB $1`, changefeedID) + waitForJobStatus(sqlDB, t, changefeedID, `paused`) + + sqlDB.Exec(t, fmt.Sprintf(`ALTER CHANGEFEED %d SET diff`, changefeedID)) + + sqlDB.Exec(t, fmt.Sprintf(`RESUME JOB %d`, changefeedID)) + waitForJobStatus(sqlDB, t, changefeedID, `running`) + + job, err := registry.LoadJob(ctx, 
changefeedID) + require.NoError(t, err) + details, ok := job.Details().(jobspb.ChangefeedDetails) + require.True(t, ok) + + require.Equal(t, details.SinkURI, `s3://fake-bucket-name/fake/path?AWS_ACCESS_KEY_ID=123&AWS_SECRET_ACCESS_KEY=456`) +} diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go index 745d350deaa5..23012b33e9c1 100644 --- a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -129,6 +129,7 @@ func changefeedPlanHook( if err != nil { return err } + if !unspecifiedSink && sinkURI == `` { // Error if someone specifies an INTO with the empty string. We've // already sent the wrong result column headers. @@ -140,59 +141,20 @@ func changefeedPlanHook( return err } - for key, value := range opts { - // if option is case insensitive then convert its value to lower case - if _, ok := changefeedbase.CaseInsensitiveOpts[key]; ok { - opts[key] = strings.ToLower(value) - } - } - - if newFormat, ok := changefeedbase.NoLongerExperimental[opts[changefeedbase.OptFormat]]; ok { - p.BufferClientNotice(ctx, pgnotice.Newf( - `%[1]s is no longer experimental, use %[2]s=%[1]s`, - newFormat, changefeedbase.OptFormat), - ) - // Still serialize the experimental_ form for backwards compatibility - } - - jobDescription, err := changefeedJobDescription(p, changefeedStmt, sinkURI, opts) - if err != nil { - return err - } - - statementTime := hlc.Timestamp{ - WallTime: p.ExtendedEvalContext().GetStmtTimestamp().UnixNano(), - } - var initialHighWater hlc.Timestamp - if cursor, ok := opts[changefeedbase.OptCursor]; ok { - asOfClause := tree.AsOfClause{Expr: tree.NewStrVal(cursor)} - var err error - asOf, err := p.EvalAsOfTimestamp(ctx, asOfClause) - if err != nil { - return err - } - initialHighWater = asOf.Timestamp - statementTime = initialHighWater - } - - // This grabs table descriptors once to get their ids. - targetDescs, err := getTableDescriptors(ctx, p, &changefeedStmt.Targets, statementTime, initialHighWater) - if err != nil { - return err - } - - targets, tables, err := getTargetsAndTables(ctx, p, targetDescs, opts) + jr, err := createChangefeedJobRecord( + ctx, + p, + changefeedStmt, + sinkURI, + opts, + jobspb.InvalidJobID, + `changefeed.create`, + ) if err != nil { return err } - details := jobspb.ChangefeedDetails{ - Tables: tables, - Opts: opts, - SinkURI: sinkURI, - StatementTime: statementTime, - TargetSpecifications: targets, - } + details := jr.Details.(jobspb.ChangefeedDetails) progress := jobspb.Progress{ Progress: &jobspb.Progress_HighWater{}, Details: &jobspb.Progress_Changefeed{ @@ -200,97 +162,6 @@ func changefeedPlanHook( }, } - // TODO(dan): In an attempt to present the most helpful error message to the - // user, the ordering requirements between all these usage validations have - // become extremely fragile and non-obvious. - // - // - `validateDetails` has to run first to fill in defaults for `envelope` - // and `format` if the user didn't specify them. - // - Then `getEncoder` is run to return any configuration errors. - // - Then the changefeed is opted in to `OptKeyInValue` for any cloud - // storage sink or webhook sink. Kafka etc have a key and value field in - // each message but cloud storage sinks and webhook sinks don't have - // anywhere to put the key. So if the key is not in the value, then for - // DELETEs there is no way to recover which key was deleted. 
We could make - // the user explicitly pass this option for every cloud storage sink/ - // webhook sink and error if they don't, but that seems user-hostile for - // insufficient reason. We can't do this any earlier, because we might - // return errors about `key_in_value` being incompatible which is - // confusing when the user didn't type that option. - // This is the same for the topic and webhook sink, which uses - // `topic_in_value` to embed the topic in the value by default, since it - // has no other avenue to express the topic. - // - Finally, we create a "canary" sink to test sink configuration and - // connectivity. This has to go last because it is strange to return sink - // connectivity errors before we've finished validating all the other - // options. We should probably split sink configuration checking and sink - // connectivity checking into separate methods. - // - // The only upside in all this nonsense is the tests are decent. I've tuned - // this particular order simply by rearranging stuff until the changefeedccl - // tests all pass. - parsedSink, err := url.Parse(sinkURI) - if err != nil { - return err - } - if newScheme, ok := changefeedbase.NoLongerExperimental[parsedSink.Scheme]; ok { - parsedSink.Scheme = newScheme // This gets munged anyway when building the sink - p.BufferClientNotice(ctx, pgnotice.Newf(`%[1]s is no longer experimental, use %[1]s://`, - newScheme), - ) - } - - if details, err = validateDetails(details); err != nil { - return err - } - - if _, err := getEncoder(details.Opts, AllTargets(details)); err != nil { - return err - } - - if isCloudStorageSink(parsedSink) || isWebhookSink(parsedSink) { - details.Opts[changefeedbase.OptKeyInValue] = `` - } - if isWebhookSink(parsedSink) { - details.Opts[changefeedbase.OptTopicInValue] = `` - } - - if !unspecifiedSink && p.ExecCfg().ExternalIODirConfig.DisableOutbound { - return errors.Errorf("Outbound IO is disabled by configuration, cannot create changefeed into %s", parsedSink.Scheme) - } - - if _, shouldProtect := details.Opts[changefeedbase.OptProtectDataFromGCOnPause]; shouldProtect && !p.ExecCfg().Codec.ForSystemTenant() { - return errorutil.UnsupportedWithMultiTenancy(67271) - } - - // Feature telemetry - telemetrySink := parsedSink.Scheme - if telemetrySink == `` { - telemetrySink = `sinkless` - } - telemetry.Count(`changefeed.create.sink.` + telemetrySink) - telemetry.Count(`changefeed.create.format.` + details.Opts[changefeedbase.OptFormat]) - telemetry.CountBucketed(`changefeed.create.num_tables`, int64(len(tables))) - - if scope, ok := opts[changefeedbase.OptMetricsScope]; ok { - if err := utilccl.CheckEnterpriseEnabled( - p.ExecCfg().Settings, p.ExecCfg().ClusterID(), p.ExecCfg().Organization(), "CHANGEFEED", - ); err != nil { - return errors.Wrapf(err, - "use of %q option requires enterprise license.", changefeedbase.OptMetricsScope) - } - - if scope == defaultSLIScope { - return pgerror.Newf(pgcode.InvalidParameterValue, - "%[1]q=%[2]q is the default metrics scope which keeps track of statistics "+ - "across all changefeeds without explicit label. "+ - "If this is an intended behavior, please re-run the statement "+ - "without specifying %[1]q parameter. 
"+ - "Otherwise, please re-run with a different %[1]q value.", - changefeedbase.OptMetricsScope, defaultSLIScope) - } - } - if details.SinkURI == `` { telemetry.Count(`changefeed.create.core`) err := distChangefeedFlow(ctx, p, 0 /* jobID */, details, progress, resultsCh) @@ -300,23 +171,6 @@ func changefeedPlanHook( return changefeedbase.MaybeStripRetryableErrorMarker(err) } - if err := utilccl.CheckEnterpriseEnabled( - p.ExecCfg().Settings, p.ExecCfg().ClusterID(), p.ExecCfg().Organization(), "CHANGEFEED", - ); err != nil { - return err - } - - telemetry.Count(`changefeed.create.enterprise`) - - // In the case where a user is executing a CREATE CHANGEFEED and is still - // waiting for the statement to return, we take the opportunity to ensure - // that the user has not made any obvious errors when specifying the sink in - // the CREATE CHANGEFEED statement. To do this, we create a "canary" sink, - // which will be immediately closed, only to check for errors. - if err := validateSink(ctx, p, jobspb.InvalidJobID, details, opts); err != nil { - return err - } - // The below block creates the job and protects the data required for the // changefeed to function from being garbage collected even if the // changefeed lags behind the gcttl. We protect the data here rather than in @@ -333,25 +187,14 @@ func changefeedPlanHook( var protectedTimestampID uuid.UUID codec := p.ExecCfg().Codec if shouldProtectTimestamps(codec) { - ptr = createProtectedTimestampRecord(ctx, codec, jobID, AllTargets(details), statementTime, progress.GetChangefeed()) + ptr = createProtectedTimestampRecord(ctx, codec, jobID, AllTargets(details), details.StatementTime, progress.GetChangefeed()) protectedTimestampID = ptr.ID.GetUUID() } - jr := jobs.Record{ - Description: jobDescription, - Username: p.User(), - DescriptorIDs: func() (sqlDescIDs []descpb.ID) { - for _, desc := range targetDescs { - sqlDescIDs = append(sqlDescIDs, desc.GetID()) - } - return sqlDescIDs - }(), - Details: details, - Progress: *progress.GetChangefeed(), - } + jr.Progress = *progress.GetChangefeed() if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - if err := p.ExecCfg().JobRegistry.CreateStartableJobWithTxn(ctx, &sj, jobID, txn, jr); err != nil { + if err := p.ExecCfg().JobRegistry.CreateStartableJobWithTxn(ctx, &sj, jobID, txn, *jr); err != nil { return err } if ptr != nil { @@ -399,6 +242,209 @@ func changefeedPlanHook( return fn, header, nil, avoidBuffering, nil } +func createChangefeedJobRecord( + ctx context.Context, + p sql.PlanHookState, + changefeedStmt *tree.CreateChangefeed, + sinkURI string, + opts map[string]string, + jobID jobspb.JobID, + telemetryPath string, +) (*jobs.Record, error) { + unspecifiedSink := changefeedStmt.SinkURI == nil + + for key, value := range opts { + // if option is case insensitive then convert its value to lower case + if _, ok := changefeedbase.CaseInsensitiveOpts[key]; ok { + opts[key] = strings.ToLower(value) + } + } + + if newFormat, ok := changefeedbase.NoLongerExperimental[opts[changefeedbase.OptFormat]]; ok { + p.BufferClientNotice(ctx, pgnotice.Newf( + `%[1]s is no longer experimental, use %[2]s=%[1]s`, + newFormat, changefeedbase.OptFormat), + ) + // Still serialize the experimental_ form for backwards compatibility + } + + jobDescription, err := changefeedJobDescription(p, changefeedStmt, sinkURI, opts) + if err != nil { + return nil, err + } + + statementTime := hlc.Timestamp{ + WallTime: p.ExtendedEvalContext().GetStmtTimestamp().UnixNano(), + } + var initialHighWater 
hlc.Timestamp + if cursor, ok := opts[changefeedbase.OptCursor]; ok { + asOfClause := tree.AsOfClause{Expr: tree.NewStrVal(cursor)} + var err error + asOf, err := p.EvalAsOfTimestamp(ctx, asOfClause) + if err != nil { + return nil, err + } + initialHighWater = asOf.Timestamp + statementTime = initialHighWater + } + + // This grabs table descriptors once to get their ids. + targetDescs, err := getTableDescriptors(ctx, p, &changefeedStmt.Targets, statementTime, initialHighWater) + if err != nil { + return nil, err + } + + targets, tables, err := getTargetsAndTables(ctx, p, targetDescs, opts) + if err != nil { + return nil, err + } + + details := jobspb.ChangefeedDetails{ + Tables: tables, + Opts: opts, + SinkURI: sinkURI, + StatementTime: statementTime, + TargetSpecifications: targets, + } + + // TODO(dan): In an attempt to present the most helpful error message to the + // user, the ordering requirements between all these usage validations have + // become extremely fragile and non-obvious. + // + // - `validateDetails` has to run first to fill in defaults for `envelope` + // and `format` if the user didn't specify them. + // - Then `getEncoder` is run to return any configuration errors. + // - Then the changefeed is opted in to `OptKeyInValue` for any cloud + // storage sink or webhook sink. Kafka etc have a key and value field in + // each message but cloud storage sinks and webhook sinks don't have + // anywhere to put the key. So if the key is not in the value, then for + // DELETEs there is no way to recover which key was deleted. We could make + // the user explicitly pass this option for every cloud storage sink/ + // webhook sink and error if they don't, but that seems user-hostile for + // insufficient reason. We can't do this any earlier, because we might + // return errors about `key_in_value` being incompatible which is + // confusing when the user didn't type that option. + // This is the same for the topic and webhook sink, which uses + // `topic_in_value` to embed the topic in the value by default, since it + // has no other avenue to express the topic. + // - Finally, we create a "canary" sink to test sink configuration and + // connectivity. This has to go last because it is strange to return sink + // connectivity errors before we've finished validating all the other + // options. We should probably split sink configuration checking and sink + // connectivity checking into separate methods. + // + // The only upside in all this nonsense is the tests are decent. I've tuned + // this particular order simply by rearranging stuff until the changefeedccl + // tests all pass. 
+ parsedSink, err := url.Parse(sinkURI) + if err != nil { + return nil, err + } + if newScheme, ok := changefeedbase.NoLongerExperimental[parsedSink.Scheme]; ok { + parsedSink.Scheme = newScheme // This gets munged anyway when building the sink + p.BufferClientNotice(ctx, pgnotice.Newf(`%[1]s is no longer experimental, use %[1]s://`, + newScheme), + ) + } + + if details, err = validateDetails(details); err != nil { + return nil, err + } + + if _, err := getEncoder(details.Opts, AllTargets(details)); err != nil { + return nil, err + } + + if isCloudStorageSink(parsedSink) || isWebhookSink(parsedSink) { + details.Opts[changefeedbase.OptKeyInValue] = `` + } + if isWebhookSink(parsedSink) { + details.Opts[changefeedbase.OptTopicInValue] = `` + } + + if !unspecifiedSink && p.ExecCfg().ExternalIODirConfig.DisableOutbound { + return nil, errors.Errorf("Outbound IO is disabled by configuration, cannot create changefeed into %s", parsedSink.Scheme) + } + + if _, shouldProtect := details.Opts[changefeedbase.OptProtectDataFromGCOnPause]; shouldProtect && !p.ExecCfg().Codec.ForSystemTenant() { + return nil, errorutil.UnsupportedWithMultiTenancy(67271) + } + + if telemetryPath != `` { + // Feature telemetry + telemetrySink := parsedSink.Scheme + if telemetrySink == `` { + telemetrySink = `sinkless` + } + telemetry.Count(telemetryPath + `.sink.` + telemetrySink) + telemetry.Count(telemetryPath + `.format.` + details.Opts[changefeedbase.OptFormat]) + telemetry.CountBucketed(telemetryPath+`.num_tables`, int64(len(tables))) + } + + if scope, ok := opts[changefeedbase.OptMetricsScope]; ok { + if err := utilccl.CheckEnterpriseEnabled( + p.ExecCfg().Settings, p.ExecCfg().ClusterID(), p.ExecCfg().Organization(), "CHANGEFEED", + ); err != nil { + return nil, errors.Wrapf(err, + "use of %q option requires enterprise license.", changefeedbase.OptMetricsScope) + } + + if scope == defaultSLIScope { + return nil, pgerror.Newf(pgcode.InvalidParameterValue, + "%[1]q=%[2]q is the default metrics scope which keeps track of statistics "+ + "across all changefeeds without explicit label. "+ + "If this is an intended behavior, please re-run the statement "+ + "without specifying %[1]q parameter. "+ + "Otherwise, please re-run with a different %[1]q value.", + changefeedbase.OptMetricsScope, defaultSLIScope) + } + } + + if details.SinkURI == `` { + // Jobs should not be created for sinkless changefeeds. However, note that + // we create and return a job record for sinkless changefeeds below. This is + // because we need the details field to create our sinkless changefeed. + // After this job record is returned, we create our forever running sinkless + // changefeed, thus ensuring that no job is created for this changefeed as + // desired. + sinklessRecord := &jobs.Record{ + Details: details, + } + return sinklessRecord, nil + } + + if err := utilccl.CheckEnterpriseEnabled( + p.ExecCfg().Settings, p.ExecCfg().ClusterID(), p.ExecCfg().Organization(), "CHANGEFEED", + ); err != nil { + return nil, err + } + + if telemetryPath != `` { + telemetry.Count(telemetryPath + `.enterprise`) + } + + // In the case where a user is executing a CREATE CHANGEFEED and is still + // waiting for the statement to return, we take the opportunity to ensure + // that the user has not made any obvious errors when specifying the sink in + // the CREATE CHANGEFEED statement. To do this, we create a "canary" sink, + // which will be immediately closed, only to check for errors. 
+ err = validateSink(ctx, p, jobID, details, opts) + + jr := &jobs.Record{ + Description: jobDescription, + Username: p.User(), + DescriptorIDs: func() (sqlDescIDs []descpb.ID) { + for _, desc := range targetDescs { + sqlDescIDs = append(sqlDescIDs, desc.GetID()) + } + return sqlDescIDs + }(), + Details: details, + } + + return jr, err +} + func validateSettings(ctx context.Context, p sql.PlanHookState) error { if err := featureflag.CheckEnabled( ctx, diff --git a/pkg/ccl/changefeedccl/changefeedbase/options.go b/pkg/ccl/changefeedccl/changefeedbase/options.go index 543a042d3f04..50d506927af3 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/options.go +++ b/pkg/ccl/changefeedccl/changefeedbase/options.go @@ -233,3 +233,7 @@ var NoLongerExperimental = map[string]string{ DeprecatedSinkSchemeCloudStorageNodelocal: SinkSchemeCloudStorageNodelocal, DeprecatedSinkSchemeCloudStorageS3: SinkSchemeCloudStorageS3, } + +// AlterChangefeedUnsupportedOptions are changefeed options that we do not allow +// users to alter +var AlterChangefeedUnsupportedOptions = makeStringSet(OptCursor, OptInitialScan, OptNoInitialScan) diff --git a/pkg/ccl/changefeedccl/show_changefeed_jobs_test.go b/pkg/ccl/changefeedccl/show_changefeed_jobs_test.go index b358cef8ed18..33ecca26a33c 100644 --- a/pkg/ccl/changefeedccl/show_changefeed_jobs_test.go +++ b/pkg/ccl/changefeedccl/show_changefeed_jobs_test.go @@ -362,41 +362,51 @@ func TestShowChangefeedJobsAlterChangefeed(t *testing.T) { feed, ok := foo.(cdctest.EnterpriseTestFeed) require.True(t, ok) - sqlDB.Exec(t, `PAUSE JOB $1`, feed.JobID()) - waitForJobStatus(sqlDB, t, feed.JobID(), `paused`) - - sqlDB.Exec(t, fmt.Sprintf(`ALTER CHANGEFEED %d ADD bar`, feed.JobID())) + jobID := feed.JobID() + details, err := feed.Details() + require.NoError(t, err) + sinkURI := details.SinkURI type row struct { id jobspb.JobID + description string SinkURI string FullTableNames []uint8 format string topics string } - var out row - - query := `SELECT job_id, sink_uri, full_table_names, format, IFNULL(topics, '') FROM [SHOW CHANGEFEED JOBS] ORDER BY sink_uri` - rowResults := sqlDB.Query(t, query) - - if !rowResults.Next() { - err := rowResults.Err() + obtainJobRowFn := func() row { + var out row + + query := fmt.Sprintf( + `SELECT job_id, description, sink_uri, full_table_names, format, IFNULL(topics, '') FROM [SHOW CHANGEFEED JOB %d]`, + jobID, + ) + + rowResults := sqlDB.Query(t, query) + if !rowResults.Next() { + err := rowResults.Err() + if err != nil { + t.Fatalf("Error encountered while querying the next row: %v", err) + } else { + t.Fatalf("Expected more rows when querying and none found for query: %s", query) + } + } + err := rowResults.Scan(&out.id, &out.description, &out.SinkURI, &out.FullTableNames, &out.format, &out.topics) if err != nil { - t.Fatalf("Error encountered while querying the next row: %v", err) - } else { - t.Fatalf("Expected more rows when querying and none found for query: %s", query) + t.Fatal(err) } - } - err := rowResults.Scan(&out.id, &out.SinkURI, &out.FullTableNames, &out.format, &out.topics) - if err != nil { - t.Fatal(err) + + return out } - details, err := feed.Details() - require.NoError(t, err) - sinkURI := details.SinkURI - jobID := feed.JobID() + sqlDB.Exec(t, `PAUSE JOB $1`, jobID) + waitForJobStatus(sqlDB, t, jobID, `paused`) + + sqlDB.Exec(t, fmt.Sprintf(`ALTER CHANGEFEED %d ADD bar`, jobID)) + + out := obtainJobRowFn() topicsArr := strings.Split(out.topics, ",") sort.Strings(topicsArr) @@ -408,32 +418,26 @@ func 
TestShowChangefeedJobsAlterChangefeed(t *testing.T) { require.Equal(t, "json", out.format, "Expected format:%s but found format:%s", "json", out.format) sqlDB.Exec(t, fmt.Sprintf(`ALTER CHANGEFEED %d DROP foo`, feed.JobID())) - rowResults = sqlDB.Query(t, query) - - if !rowResults.Next() { - err := rowResults.Err() - if err != nil { - t.Fatalf("Error encountered while querying the next row: %v", err) - } else { - t.Fatalf("Expected more rows when querying and none found for query: %s", query) - } - } - err = rowResults.Scan(&out.id, &out.SinkURI, &out.FullTableNames, &out.format, &out.topics) - if err != nil { - t.Fatal(err) - } - details, err = feed.Details() - require.NoError(t, err) - sinkURI = details.SinkURI - jobID = feed.JobID() + out = obtainJobRowFn() require.Equal(t, jobID, out.id, "Expected id:%d but found id:%d", jobID, out.id) + require.Equal(t, "CREATE CHANGEFEED FOR TABLE bar INTO 'kafka://does.not.matter/' WITH envelope = 'wrapped', format = 'json', on_error = 'fail', schema_change_events = 'default', schema_change_policy = 'backfill', virtual_columns = 'omitted'", out.description, "Expected description:%s but found description:%s", "CREATE CHANGEFEED FOR TABLE bar INTO 'kafka://does.not.matter/'", out.description) require.Equal(t, sinkURI, out.SinkURI, "Expected sinkUri:%s but found sinkUri:%s", sinkURI, out.SinkURI) - require.Equal(t, "bar", out.topics, "Expected topics:%s but found topics:%s", "bar,foo", sortedTopics) - require.Equal(t, "{d.public.bar}", string(out.FullTableNames), "Expected fullTableNames:%s but found fullTableNames:%s", "{d.public.foo,d.public.bar}", string(out.FullTableNames)) + require.Equal(t, "bar", out.topics, "Expected topics:%s but found topics:%s", "bar", sortedTopics) + require.Equal(t, "{d.public.bar}", string(out.FullTableNames), "Expected fullTableNames:%s but found fullTableNames:%s", "{d.public.bar}", string(out.FullTableNames)) require.Equal(t, "json", out.format, "Expected format:%s but found format:%s", "json", out.format) + sqlDB.Exec(t, fmt.Sprintf(`ALTER CHANGEFEED %d SET resolved = '5s'`, feed.JobID())) + + out = obtainJobRowFn() + + require.Equal(t, jobID, out.id, "Expected id:%d but found id:%d", jobID, out.id) + require.Equal(t, "CREATE CHANGEFEED FOR TABLE bar INTO 'kafka://does.not.matter/' WITH envelope = 'wrapped', format = 'json', on_error = 'fail', resolved = '5s', schema_change_events = 'default', schema_change_policy = 'backfill', virtual_columns = 'omitted'", out.description, "Expected description:%s but found description:%s", "CREATE CHANGEFEED FOR TABLE bar INTO 'kafka://does.not.matter/'", out.description) + require.Equal(t, sinkURI, out.SinkURI, "Expected sinkUri:%s but found sinkUri:%s", sinkURI, out.SinkURI) + require.Equal(t, "bar", out.topics, "Expected topics:%s but found topics:%s", "bar", sortedTopics) + require.Equal(t, "{d.public.bar}", string(out.FullTableNames), "Expected fullTableNames:%s but found fullTableNames:%s", "{d.public.bar}", string(out.FullTableNames)) + require.Equal(t, "json", out.format, "Expected format:%s but found format:%s", "json", out.format) } t.Run(`kafka`, kafkaTest(testFn)) diff --git a/pkg/sql/parser/sql.y b/pkg/sql/parser/sql.y index cdef75d7c5b5..a562b8fec17e 100644 --- a/pkg/sql/parser/sql.y +++ b/pkg/sql/parser/sql.y @@ -887,7 +887,7 @@ func (u *sqlSymUnion) fetchCursor() *tree.FetchCursor { %token TRACING %token UNBOUNDED UNCOMMITTED UNION UNIQUE UNKNOWN UNLOGGED UNSPLIT -%token UPDATE UPSERT UNTIL USE USER USERS USING UUID +%token UPDATE UPSERT UNSET UNTIL USE USER USERS 
USING UUID %token VALID VALIDATE VALUE VALUES VARBIT VARCHAR VARIADIC VIEW VARYING VIEWACTIVITY VIEWACTIVITYREDACTED %token VIEWCLUSTERSETTING VIRTUAL VISIBLE VOTERS @@ -4376,7 +4376,7 @@ explain_option_list: // %Help: ALTER CHANGEFEED - alter an existing changefeed // %Category: CCL // %Text: -// ALTER CHANGEFEED {{ADD|DROP} }... +// ALTER CHANGEFEED {{ADD|DROP } | SET }... alter_changefeed_stmt: ALTER CHANGEFEED a_expr alter_changefeed_cmds { @@ -4412,6 +4412,18 @@ alter_changefeed_cmd: Targets: $2.targetList(), } } +| SET kv_option_list + { + $$.val = &tree.AlterChangefeedSetOptions{ + Options: $2.kvOptions(), + } + } +| UNSET name_list + { + $$.val = &tree.AlterChangefeedUnsetOptions{ + Options: $2.nameList(), + } + } // %Help: ALTER BACKUP - alter an existing backup's encryption keys // %Category: CCL @@ -14110,6 +14122,7 @@ unreserved_keyword: | UNCOMMITTED | UNKNOWN | UNLOGGED +| UNSET | UNSPLIT | UNTIL | UPDATE diff --git a/pkg/sql/parser/testdata/alter_changefeed b/pkg/sql/parser/testdata/alter_changefeed index 388715f7fea6..db4c276804c3 100644 --- a/pkg/sql/parser/testdata/alter_changefeed +++ b/pkg/sql/parser/testdata/alter_changefeed @@ -56,3 +56,76 @@ ALTER CHANGEFEED 123 ADD foo DROP bar ADD baz, qux DROP quux -- normalized! ALTER CHANGEFEED (123) ADD (foo) DROP (bar) ADD (baz), (qux) DROP (quux) -- fully parenthesized ALTER CHANGEFEED _ ADD foo DROP bar ADD baz, qux DROP quux -- literals removed ALTER CHANGEFEED 123 ADD _ DROP _ ADD _, _ DROP _ -- identifiers removed + +parse +ALTER CHANGEFEED 123 SET foo = 'bar' +---- +ALTER CHANGEFEED 123 SET foo = 'bar' +ALTER CHANGEFEED (123) SET foo = ('bar') -- fully parenthesized +ALTER CHANGEFEED _ SET foo = '_' -- literals removed +ALTER CHANGEFEED 123 SET _ = 'bar' -- identifiers removed + + +parse +ALTER CHANGEFEED 123 ADD foo SET bar = 'baz', qux = 'quux' +---- +ALTER CHANGEFEED 123 ADD foo SET bar = 'baz', qux = 'quux' -- normalized! +ALTER CHANGEFEED (123) ADD (foo) SET bar = ('baz'), qux = ('quux') -- fully parenthesized +ALTER CHANGEFEED _ ADD foo SET bar = '_', qux = '_' -- literals removed +ALTER CHANGEFEED 123 ADD _ SET _ = 'baz', _ = 'quux' -- identifiers removed + +parse +ALTER CHANGEFEED 123 DROP foo SET bar = 'baz', qux = 'quux' +---- +ALTER CHANGEFEED 123 DROP foo SET bar = 'baz', qux = 'quux' -- normalized! +ALTER CHANGEFEED (123) DROP (foo) SET bar = ('baz'), qux = ('quux') -- fully parenthesized +ALTER CHANGEFEED _ DROP foo SET bar = '_', qux = '_' -- literals removed +ALTER CHANGEFEED 123 DROP _ SET _ = 'baz', _ = 'quux' -- identifiers removed + +parse +ALTER CHANGEFEED 123 SET foo = 'bar' ADD baz DROP qux +---- +ALTER CHANGEFEED 123 SET foo = 'bar' ADD baz DROP qux -- normalized! +ALTER CHANGEFEED (123) SET foo = ('bar') ADD (baz) DROP (qux) -- fully parenthesized +ALTER CHANGEFEED _ SET foo = '_' ADD baz DROP qux -- literals removed +ALTER CHANGEFEED 123 SET _ = 'bar' ADD _ DROP _ -- identifiers removed + +parse +ALTER CHANGEFEED 123 ADD foo SET bar = 'baz', qux = 'quux' DROP corge +---- +ALTER CHANGEFEED 123 ADD foo SET bar = 'baz', qux = 'quux' DROP corge -- normalized! 
+ALTER CHANGEFEED (123) ADD (foo) SET bar = ('baz'), qux = ('quux') DROP (corge) -- fully parenthesized +ALTER CHANGEFEED _ ADD foo SET bar = '_', qux = '_' DROP corge -- literals removed +ALTER CHANGEFEED 123 ADD _ SET _ = 'baz', _ = 'quux' DROP _ -- identifiers removed + +parse +ALTER CHANGEFEED 123 UNSET foo +---- +ALTER CHANGEFEED 123 UNSET foo +ALTER CHANGEFEED (123) UNSET foo -- fully parenthesized +ALTER CHANGEFEED _ UNSET foo -- literals removed +ALTER CHANGEFEED 123 UNSET _ -- identifiers removed + +parse +ALTER CHANGEFEED 123 ADD foo UNSET bar, baz +---- +ALTER CHANGEFEED 123 ADD foo UNSET bar, baz -- normalized! +ALTER CHANGEFEED (123) ADD (foo) UNSET bar, baz -- fully parenthesized +ALTER CHANGEFEED _ ADD foo UNSET bar, baz -- literals removed +ALTER CHANGEFEED 123 ADD _ UNSET _, _ -- identifiers removed + +parse +ALTER CHANGEFEED 123 UNSET foo, bar ADD baz DROP qux +---- +ALTER CHANGEFEED 123 UNSET foo, bar ADD baz DROP qux -- normalized! +ALTER CHANGEFEED (123) UNSET foo, bar ADD (baz) DROP (qux) -- fully parenthesized +ALTER CHANGEFEED _ UNSET foo, bar ADD baz DROP qux -- literals removed +ALTER CHANGEFEED 123 UNSET _, _ ADD _ DROP _ -- identifiers removed + +parse +ALTER CHANGEFEED 123 ADD foo DROP bar SET baz = 'qux' UNSET quux, corge +---- +ALTER CHANGEFEED 123 ADD foo DROP bar SET baz = 'qux' UNSET quux, corge -- normalized! +ALTER CHANGEFEED (123) ADD (foo) DROP (bar) SET baz = ('qux') UNSET quux, corge -- fully parenthesized +ALTER CHANGEFEED _ ADD foo DROP bar SET baz = '_' UNSET quux, corge -- literals removed +ALTER CHANGEFEED 123 ADD _ DROP _ SET _ = 'qux' UNSET _, _ -- identifiers removed diff --git a/pkg/sql/sem/tree/alter_changefeed.go b/pkg/sql/sem/tree/alter_changefeed.go index c4caeae5b931..885c0f54b0d0 100644 --- a/pkg/sql/sem/tree/alter_changefeed.go +++ b/pkg/sql/sem/tree/alter_changefeed.go @@ -46,11 +46,15 @@ type AlterChangefeedCmd interface { alterChangefeedCmd() } -func (*AlterChangefeedAddTarget) alterChangefeedCmd() {} -func (*AlterChangefeedDropTarget) alterChangefeedCmd() {} +func (*AlterChangefeedAddTarget) alterChangefeedCmd() {} +func (*AlterChangefeedDropTarget) alterChangefeedCmd() {} +func (*AlterChangefeedSetOptions) alterChangefeedCmd() {} +func (*AlterChangefeedUnsetOptions) alterChangefeedCmd() {} var _ AlterChangefeedCmd = &AlterChangefeedAddTarget{} var _ AlterChangefeedCmd = &AlterChangefeedDropTarget{} +var _ AlterChangefeedCmd = &AlterChangefeedSetOptions{} +var _ AlterChangefeedCmd = &AlterChangefeedUnsetOptions{} // AlterChangefeedAddTarget represents an ADD command type AlterChangefeedAddTarget struct { @@ -73,3 +77,25 @@ func (node *AlterChangefeedDropTarget) Format(ctx *FmtCtx) { ctx.WriteString(" DROP ") ctx.FormatNode(&node.Targets.Tables) } + +// AlterChangefeedSetOptions represents an SET command +type AlterChangefeedSetOptions struct { + Options KVOptions +} + +// Format implements the NodeFormatter interface. +func (node *AlterChangefeedSetOptions) Format(ctx *FmtCtx) { + ctx.WriteString(" SET ") + ctx.FormatNode(&node.Options) +} + +// AlterChangefeedUnsetOptions represents an UNSET command +type AlterChangefeedUnsetOptions struct { + Options NameList +} + +// Format implements the NodeFormatter interface. 
+func (node *AlterChangefeedUnsetOptions) Format(ctx *FmtCtx) { + ctx.WriteString(" UNSET ") + ctx.FormatNode(&node.Options) +} From 3c7d9a18d4d711ff2a7e06e1b36cdd5c7795ba24 Mon Sep 17 00:00:00 2001 From: rimadeodhar Date: Mon, 7 Feb 2022 15:15:22 -0800 Subject: [PATCH 2/2] pprofui: Increase concurrency for profiles In this PR, we increase the concurrency limit for running performance profiles (e.g. heap, CPU) from the Advanced Debug page within DB Console. Previously, attempting to run performance profiling in parallel for the same node would result in a race condition, causing one of the profiles to overwrite the other. This would cause "Profile not found: profile may have expired" errors. The occurrence of these errors was exacerbated by the new feature enabling profiles to be run on any node, as it increased the likelihood of race conditions when profiles are run on the same node at the same time. Allowing at least two profile runs at the same time decreases the likelihood of one request overwriting the other. This does not completely eliminate the problem but will reduce the frequency of occurrence. This PR also updates the error message returned when a profile is not found to provide more details on the potential causes and remediation steps. Release note (bug fix): Running concurrent profiles now works up to a concurrency limit of two. This removes the occurrence of "profile id not found" errors while running up to two profiles concurrently. When a profile is not found, the error message has been updated to suggest remediation steps in order to unblock the user. --- pkg/server/debug/pprofui/BUILD.bazel | 1 + pkg/server/debug/pprofui/server.go | 16 +++++++ pkg/server/debug/pprofui/server_test.go | 60 +++++++++++++++++++++++++ pkg/server/debug/pprofui/storage_mem.go | 5 ++- pkg/server/debug/server.go | 2 +- 5 files changed, 82 insertions(+), 2 deletions(-) diff --git a/pkg/server/debug/pprofui/BUILD.bazel b/pkg/server/debug/pprofui/BUILD.bazel index aec52ad2320d..0e81935122c3 100644 --- a/pkg/server/debug/pprofui/BUILD.bazel +++ b/pkg/server/debug/pprofui/BUILD.bazel @@ -35,6 +35,7 @@ go_test( "//pkg/build/bazel", "//pkg/server/serverpb", "//pkg/testutils", + "//pkg/testutils/skip", "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/server/debug/pprofui/server.go b/pkg/server/debug/pprofui/server.go index 508da9c8e314..525d89f5b4a3 100644 --- a/pkg/server/debug/pprofui/server.go +++ b/pkg/server/debug/pprofui/server.go @@ -40,6 +40,22 @@ type Profiler interface { ) (*serverpb.JSONResponse, error) } +const ( + // ProfileConcurrency governs how many concurrent profiles can be collected. + // This impacts the maximum number of profiles stored in memory at any given point + // in time. Increasing this number further will increase the number of profile + // requests that can be served concurrently but will also increase + // storage requirements and should be done with caution. + ProfileConcurrency = 2 + + // ProfileExpiry governs how long a profile is retained in memory during concurrent + // profile requests. A profile is considered expired once its profile expiry duration + // is met. However, expired profiles are only cleaned up from memory when a new profile + // is requested. So ProfileExpiry can be considered as a soft expiry which impacts + // duration for which a profile is stored only when other profile requests are received. + ProfileExpiry = 2 * time.Second +) + // A Server serves up the pprof web ui.
A request to / // generates a profile of the desired type and redirects to the UI for // it at //. Valid profile types at the time of diff --git a/pkg/server/debug/pprofui/server_test.go b/pkg/server/debug/pprofui/server_test.go index c2942fc67aef..54acd24503fa 100644 --- a/pkg/server/debug/pprofui/server_test.go +++ b/pkg/server/debug/pprofui/server_test.go @@ -14,14 +14,18 @@ import ( "context" "fmt" "io/ioutil" + "math/rand" "net/http" "net/http/httptest" "os" + "sync" "testing" + "time" "github.com/cockroachdb/cockroach/pkg/build/bazel" "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/stretchr/testify/require" ) @@ -95,3 +99,59 @@ func TestServer(t *testing.T) { require.Equal(t, "/heap/4/flamegraph?node=3", loc) }) } + +func TestServerConcurrentAccess(t *testing.T) { + expectedNodeID := "local" + skip.UnderRace(t, "test fails under race due to known race condition with profiles") + const ( + runsPerWorker = 1 + workers = ProfileConcurrency + ) + mockProfile := func(ctx context.Context, req *serverpb.ProfileRequest) (*serverpb.JSONResponse, error) { + require.Equal(t, expectedNodeID, req.NodeId) + fileName := "heap.profile" + if req.Type == serverpb.ProfileRequest_CPU { + fileName = "cpu.profile" + } + b, err := ioutil.ReadFile(testutils.TestDataPath(t, fileName)) + require.NoError(t, err) + return &serverpb.JSONResponse{Data: b}, nil + } + + s := NewServer(NewMemStorage(ProfileConcurrency, ProfileExpiry), ProfilerFunc(mockProfile)) + getProfile := func(profile string, t *testing.T) { + t.Helper() + + r := httptest.NewRequest("GET", "/heap/", nil) + w := httptest.NewRecorder() + s.ServeHTTP(w, r) + + require.Equal(t, http.StatusTemporaryRedirect, w.Code) + + loc := w.Result().Header.Get("Location") + + r = httptest.NewRequest("GET", loc, nil) + w = httptest.NewRecorder() + + s.ServeHTTP(w, r) + + require.Equal(t, http.StatusOK, w.Code) + require.Contains(t, w.Body.String(), "pprof") + } + var wg sync.WaitGroup + profiles := [2]string{"/heap/", "/cpu"} + runWorker := func() { + defer wg.Done() + for i := 0; i < runsPerWorker; i++ { + time.Sleep(time.Microsecond) + profileID := rand.Intn(len(profiles)) + getProfile(profiles[profileID], t) + } + } + // Run the workers. 
+ for i := 0; i < workers; i++ { + wg.Add(1) + go runWorker() + } + wg.Wait() +} diff --git a/pkg/server/debug/pprofui/storage_mem.go b/pkg/server/debug/pprofui/storage_mem.go index d21e39ebb28a..f60c5c1adc2b 100644 --- a/pkg/server/debug/pprofui/storage_mem.go +++ b/pkg/server/debug/pprofui/storage_mem.go @@ -102,5 +102,8 @@ func (s *MemStorage) Get(id string, read func(io.Reader) error) error { return read(bytes.NewReader(v.b)) } } - return errors.Errorf("profile not found; it may have expired") + return errors.Errorf("profile not found; it may have expired, please regenerate the profile.\n" + + "To generate profile for a node, use the profile generation link from the Advanced Debug page.\n" + + "Attempting to generate a profile by modifying the node query parameter in the URL will not work.", + ) } diff --git a/pkg/server/debug/server.go b/pkg/server/debug/server.go index 15cdef61d07d..4f21ee6a7ea3 100644 --- a/pkg/server/debug/server.go +++ b/pkg/server/debug/server.go @@ -125,7 +125,7 @@ func NewServer( } mux.HandleFunc("/debug/logspy", spy.handleDebugLogSpy) - ps := pprofui.NewServer(pprofui.NewMemStorage(1, 0), profiler) + ps := pprofui.NewServer(pprofui.NewMemStorage(pprofui.ProfileConcurrency, pprofui.ProfileExpiry), profiler) mux.Handle("/debug/pprof/ui/", http.StripPrefix("/debug/pprof/ui", ps)) mux.HandleFunc("/debug/pprof/goroutineui/", func(w http.ResponseWriter, req *http.Request) {