Skip to content

Commit

Permalink
Merge #76266 #76583
Browse files Browse the repository at this point in the history
76266: pprofui: Increase concurrency for profiles r=dhartunian a=rimadeodhar

In this PR, we increase the concurrency limit while
running performance profiles (e.g. heap, CPU) from
the Advanced Debug page within DB Console. Previously,
attempting to run performance profiling in parallel
for the same node would result in race condition causing
one of the profiles to overwrite the other. This would cause
"Profile not found: profile may have expired" errors.
The occurrence of these errors was exacerbated by the new
feature enabling running profiles on any nodes as it
increased the likelihood of race conditions for profiles
being run on a node at the same time.
By allowing atleast two profile runs at the same time
decreases the likelihood of one request overwriting
the other. This does not completely eliminate
the problem but will reduce the frequency of occurrence.
This PR also updates the error message returned when
a profile is not found to provide more details on
the potential causes and remediation steps.

Release note: None

76583: changefeedccl: allow users to alter changefeed options r=sherman-grewal a=sherman-grewal

changefeedccl: allow users to alter changefeed options
with the ALTER CHANGEFEED statement

References #75895

Currently, with the ALTER CHANGEFEED statement users
can only add or drop targets from an existing
changefeed. In this PR, we would like to extend this
functionality so that an user can edit and unset
the options of an existing changefeed as well.
The syntax of this addition is the following:

ALTER CHANGEFEED <job_id> SET \<options\> UNSET <opt_list>

Note that the <options> must follow the same syntax
that is used when creating a changefeed with options.
In particular, if you would like to set an option
that requires a value, you must write

SET opt = 'value'

On the other hand, if you would like to set an
option that requires no value, you must write

SET opt

Furthermore, this PR allows users to unset options.
This can be achieved by writing

UNSET <opt_list>

Where <opt_list> is a list of options that you
would like to unset. For example, if we would like
to unset the diff and resolved options for
changefeed 123, we would achieve this by writing

ALTER CHANGEFEED 123 UNSET diff, resolved

Release note (enterprise change): Added support to
the ALTER CHANGEFEED statement so that users can edit
and unset the options of an existing changefeed. The
syntax of this addition is the following:

ALTER CHANGEFEED <job_id> SET \<options\> UNSET <opt_list>

Co-authored-by: rimadeodhar <[email protected]>
Co-authored-by: Sherman Grewal <[email protected]>
  • Loading branch information
3 people committed Feb 25, 2022
3 parents 08bbadf + 3c7d9a1 + 75dc9fe commit dab67a9
Show file tree
Hide file tree
Showing 15 changed files with 781 additions and 302 deletions.
3 changes: 3 additions & 0 deletions docs/generated/sql/bnf/stmt_block.bnf
Original file line number Diff line number Diff line change
Expand Up @@ -1258,6 +1258,7 @@ unreserved_keyword ::=
| 'UNCOMMITTED'
| 'UNKNOWN'
| 'UNLOGGED'
| 'UNSET'
| 'UNSPLIT'
| 'UNTIL'
| 'UPDATE'
Expand Down Expand Up @@ -2473,6 +2474,8 @@ alter_default_privileges_target_object ::=
alter_changefeed_cmd ::=
'ADD' changefeed_targets
| 'DROP' changefeed_targets
| 'SET' kv_option_list
| 'UNSET' name_list

alter_backup_cmd ::=
'ADD' backup_kms
Expand Down
1 change: 0 additions & 1 deletion pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ go_library(
"//pkg/sql/execinfra",
"//pkg/sql/execinfrapb",
"//pkg/sql/flowinfra",
"//pkg/sql/parser",
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/pgwire/pgnotice",
Expand Down
246 changes: 164 additions & 82 deletions pkg/ccl/changefeedccl/alter_changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,17 @@ package changefeedccl
import (
"context"

"github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupresolver"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/resolver"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand All @@ -30,11 +32,6 @@ func init() {
sql.AddPlanHook("alter changefeed", alterChangefeedPlanHook)
}

type alterChangefeedOpts struct {
AddTargets []tree.TargetList
DropTargets []tree.TargetList
}

// alterChangefeedPlanHook implements sql.PlanHookFn.
func alterChangefeedPlanHook(
ctx context.Context, stmt tree.Statement, p sql.PlanHookState,
Expand Down Expand Up @@ -67,7 +64,7 @@ func alterChangefeedPlanHook(
return err
}

details, ok := job.Details().(jobspb.ChangefeedDetails)
prevDetails, ok := job.Details().(jobspb.ChangefeedDetails)
if !ok {
return errors.Errorf(`job %d is not changefeed job`, jobID)
}
Expand All @@ -76,116 +73,188 @@ func alterChangefeedPlanHook(
return errors.Errorf(`job %d is not paused`, jobID)
}

var opts alterChangefeedOpts
for _, cmd := range alterChangefeedStmt.Cmds {
switch v := cmd.(type) {
case *tree.AlterChangefeedAddTarget:
opts.AddTargets = append(opts.AddTargets, v.Targets)
case *tree.AlterChangefeedDropTarget:
opts.DropTargets = append(opts.DropTargets, v.Targets)
// this CREATE CHANGEFEED node will be used to update the existing changefeed
newChangefeedStmt := &tree.CreateChangefeed{
SinkURI: tree.NewDString(prevDetails.SinkURI),
}

optionsMap := make(map[string]tree.KVOption, len(prevDetails.Opts))

// pull the options that are set for the existing changefeed
for key, value := range prevDetails.Opts {
// There are some options (e.g. topics) that we set during the creation of
// a changefeed, but we do not allow these options to be set by the user.
// Hence, we can not include these options in our new CREATE CHANGEFEED
// statement.
if _, ok := changefeedbase.ChangefeedOptionExpectValues[key]; !ok {
continue
}
existingOpt := tree.KVOption{Key: tree.Name(key)}
if len(value) > 0 {
existingOpt.Value = tree.NewDString(value)
}
optionsMap[key] = existingOpt
}

var initialHighWater hlc.Timestamp
statementTime := hlc.Timestamp{
WallTime: p.ExtendedEvalContext().GetStmtTimestamp().UnixNano(),
}

if opts.AddTargets != nil {
var targetDescs []catalog.Descriptor

for _, targetList := range opts.AddTargets {
descs, err := getTableDescriptors(ctx, p, &targetList, statementTime, initialHighWater)
if err != nil {
return err
}
targetDescs = append(targetDescs, descs...)
}
allDescs, err := backupresolver.LoadAllDescs(ctx, p.ExecCfg(), statementTime)
if err != nil {
return err
}
descResolver, err := backupresolver.NewDescriptorResolver(allDescs)
if err != nil {
return err
}

newTargets, newTables, err := getTargetsAndTables(ctx, p, targetDescs, details.Opts)
if err != nil {
return err
}
// add old targets
for id, table := range details.Tables {
newTables[id] = table
}
details.Tables = newTables
details.TargetSpecifications = append(details.TargetSpecifications, newTargets...)
newDescs := make(map[descpb.ID]*tree.UnresolvedName)

for _, target := range AllTargets(prevDetails) {
desc := descResolver.DescByID[target.TableID]
newDescs[target.TableID] = tree.NewUnresolvedName(desc.GetName())
}

if opts.DropTargets != nil {
var targetDescs []catalog.Descriptor
for _, cmd := range alterChangefeedStmt.Cmds {
switch v := cmd.(type) {
case *tree.AlterChangefeedAddTarget:
for _, targetPattern := range v.Targets.Tables {
targetName, err := getTargetName(targetPattern)
if err != nil {
return err
}
found, _, desc, err := resolver.ResolveExisting(
ctx,
targetName.ToUnresolvedObjectName(),
descResolver,
tree.ObjectLookupFlags{},
p.CurrentDatabase(),
p.CurrentSearchPath(),
)
if err != nil {
return err
}
if !found {
return pgerror.Newf(pgcode.InvalidParameterValue, `target %q does not exist`, tree.ErrString(targetPattern))
}
newDescs[desc.GetID()] = tree.NewUnresolvedName(desc.GetName())
}
case *tree.AlterChangefeedDropTarget:
for _, targetPattern := range v.Targets.Tables {
targetName, err := getTargetName(targetPattern)
if err != nil {
return err
}
found, _, desc, err := resolver.ResolveExisting(
ctx,
targetName.ToUnresolvedObjectName(),
descResolver,
tree.ObjectLookupFlags{},
p.CurrentDatabase(),
p.CurrentSearchPath(),
)
if err != nil {
return err
}
if !found {
return pgerror.Newf(pgcode.InvalidParameterValue, `target %q does not exist`, tree.ErrString(targetPattern))
}
delete(newDescs, desc.GetID())
}
case *tree.AlterChangefeedSetOptions:
optsFn, err := p.TypeAsStringOpts(ctx, v.Options, changefeedbase.ChangefeedOptionExpectValues)
if err != nil {
return err
}

for _, targetList := range opts.DropTargets {
descs, err := getTableDescriptors(ctx, p, &targetList, statementTime, initialHighWater)
opts, err := optsFn()
if err != nil {
return err
}
targetDescs = append(targetDescs, descs...)
}

for _, desc := range targetDescs {
if table, isTable := desc.(catalog.TableDescriptor); isTable {
if err := p.CheckPrivilege(ctx, desc, privilege.SELECT); err != nil {
return err
for key, value := range opts {
if _, ok := changefeedbase.ChangefeedOptionExpectValues[key]; !ok {
return pgerror.Newf(pgcode.InvalidParameterValue, `invalid option %q`, key)
}
if _, ok := changefeedbase.AlterChangefeedUnsupportedOptions[key]; ok {
return pgerror.Newf(pgcode.InvalidParameterValue, `cannot alter option %q`, key)
}
delete(details.Tables, table.GetID())
opt := tree.KVOption{Key: tree.Name(key)}
if len(value) > 0 {
opt.Value = tree.NewDString(value)
}
optionsMap[key] = opt
}
}

newTargetSpecifications := make([]jobspb.ChangefeedTargetSpecification, len(details.TargetSpecifications)-len(opts.DropTargets))
for _, ts := range details.TargetSpecifications {
if _, stillThere := details.Tables[ts.TableID]; stillThere {
newTargetSpecifications = append(newTargetSpecifications, ts)
case *tree.AlterChangefeedUnsetOptions:
optKeys := v.Options.ToStrings()
for _, key := range optKeys {
if _, ok := changefeedbase.ChangefeedOptionExpectValues[key]; !ok {
return pgerror.Newf(pgcode.InvalidParameterValue, `invalid option %q`, key)
}
if _, ok := changefeedbase.AlterChangefeedUnsupportedOptions[key]; ok {
return pgerror.Newf(pgcode.InvalidParameterValue, `cannot alter option %q`, key)
}
delete(optionsMap, key)
}
}
details.TargetSpecifications = newTargetSpecifications
}

if len(newDescs) == 0 {
return pgerror.Newf(pgcode.InvalidParameterValue, "cannot drop all targets for changefeed job %d", jobID)
}

if len(details.Tables) == 0 {
return errors.Errorf("cannot drop all targets for changefeed job %d", jobID)
for _, targetName := range newDescs {
newChangefeedStmt.Targets.Tables = append(newChangefeedStmt.Targets.Tables, targetName)
}

if err := validateSink(ctx, p, jobID, details, details.Opts); err != nil {
return err
for _, val := range optionsMap {
newChangefeedStmt.Options = append(newChangefeedStmt.Options, val)
}

oldStmt, err := parser.ParseOne(job.Payload().Description)
sinkURIFn, err := p.TypeAsString(ctx, newChangefeedStmt.SinkURI, `ALTER CHANGEFEED`)
if err != nil {
return err
}
oldChangefeedStmt, ok := oldStmt.AST.(*tree.CreateChangefeed)
if !ok {
return errors.Errorf(`could not parse create changefeed statement for job %d`, jobID)
}

var targets tree.TargetList
for _, target := range details.Tables {
targetName := tree.MakeTableNameFromPrefix(tree.ObjectNamePrefix{}, tree.Name(target.StatementTimeName))
targets.Tables = append(targets.Tables, &targetName)
optsFn, err := p.TypeAsStringOpts(ctx, newChangefeedStmt.Options, changefeedbase.ChangefeedOptionExpectValues)
if err != nil {
return err
}

oldChangefeedStmt.Targets = targets
jobDescription := tree.AsString(oldChangefeedStmt)

newPayload := job.Payload()
newPayload.Description = jobDescription
newPayload.Details = jobspb.WrapPayloadDetails(details)
sinkURI, err := sinkURIFn()
if err != nil {
return err
}

finalDescs, err := getTableDescriptors(ctx, p, &targets, statementTime, initialHighWater)
opts, err := optsFn()
if err != nil {
return err
}

newPayload.DescriptorIDs = func() (sqlDescIDs []descpb.ID) {
for _, desc := range finalDescs {
sqlDescIDs = append(sqlDescIDs, desc.GetID())
}
return sqlDescIDs
}()
jobRecord, err := createChangefeedJobRecord(
ctx,
p,
newChangefeedStmt,
sinkURI,
opts,
jobID,
``,
)
if err != nil {
return errors.Wrap(err, `failed to alter changefeed`)
}

newDetails := jobRecord.Details.(jobspb.ChangefeedDetails)

// We need to persist the statement time that was generated during the
// creation of the changefeed
newDetails.StatementTime = prevDetails.StatementTime

newPayload := job.Payload()
newPayload.Details = jobspb.WrapPayloadDetails(newDetails)
newPayload.Description = jobRecord.Description
newPayload.DescriptorIDs = jobRecord.DescriptorIDs

err = p.ExecCfg().JobRegistry.UpdateJobWithTxn(ctx, jobID, p.ExtendedEvalContext().Txn, lockForUpdate, func(
txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater,
Expand All @@ -203,11 +272,24 @@ func alterChangefeedPlanHook(
return ctx.Err()
case resultsCh <- tree.Datums{
tree.NewDInt(tree.DInt(jobID)),
tree.NewDString(jobDescription),
tree.NewDString(jobRecord.Description),
}:
return nil
}
}

return fn, header, nil, false, nil
}

func getTargetName(targetPattern tree.TablePattern) (*tree.TableName, error) {
pattern, err := targetPattern.NormalizeTablePattern()
if err != nil {
return nil, err
}
targetName, ok := pattern.(*tree.TableName)
if !ok {
return nil, errors.Errorf(`CHANGEFEED cannot target %q`, tree.AsString(targetPattern))
}

return targetName, nil
}
Loading

0 comments on commit dab67a9

Please sign in to comment.