Skip to content

Commit

Permalink
Merge #77043
Browse files Browse the repository at this point in the history
77043: changefeedccl: allow users to alter the sink URI of an existing changefeed r=sherman-grewal a=sherman-grewal

changefeedccl: allow users to alter the sink URI of
an existing changefeed

References #75895

In this PR, we introduce the capability to alter
the sink URI of an existing changefeed. This
can be achieved by executing the following
statement:

ALTER CHANGEFEED <job_id> SET sink = '<sink_uri>'

Note that the sink type cannot be altered. That is,
the sink type must be the same type that was chosen
when the changefeed was initially created.

Release note (enterprise change): Users may now alter
the sink URI of an existing changefeed. This can be
achieved by executing the following statement:

ALTER CHANGEFEED <job_id> SET sink = '<sink_uri>'

Where the sink type of the new sink must match the
sink type of the old sink that was chosen at the
creation of the changefeed.

Co-authored-by: Sherman Grewal <[email protected]>
  • Loading branch information
craig[bot] and Sherman Grewal committed Feb 26, 2022
2 parents 4077017 + 067f213 commit c7b4a53
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 8 deletions.
40 changes: 32 additions & 8 deletions pkg/ccl/changefeedccl/alter_changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package changefeedccl

import (
"context"
"net/url"

"github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupresolver"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
Expand Down Expand Up @@ -163,7 +164,7 @@ func alterChangefeedPlanHook(
delete(newDescs, desc.GetID())
}
case *tree.AlterChangefeedSetOptions:
optsFn, err := p.TypeAsStringOpts(ctx, v.Options, changefeedbase.ChangefeedOptionExpectValues)
optsFn, err := p.TypeAsStringOpts(ctx, v.Options, changefeedbase.AlterChangefeedOptionExpectValues)
if err != nil {
return err
}
Expand All @@ -174,21 +175,44 @@ func alterChangefeedPlanHook(
}

for key, value := range opts {
if _, ok := changefeedbase.ChangefeedOptionExpectValues[key]; !ok {
return pgerror.Newf(pgcode.InvalidParameterValue, `invalid option %q`, key)
}
if _, ok := changefeedbase.AlterChangefeedUnsupportedOptions[key]; ok {
return pgerror.Newf(pgcode.InvalidParameterValue, `cannot alter option %q`, key)
}
opt := tree.KVOption{Key: tree.Name(key)}
if len(value) > 0 {
opt.Value = tree.NewDString(value)
if key == changefeedbase.OptSink {
newSinkURI, err := url.Parse(value)
if err != nil {
return err
}

prevSinkURI, err := url.Parse(prevDetails.SinkURI)
if err != nil {
return err
}

if newSinkURI.Scheme != prevSinkURI.Scheme {
return pgerror.Newf(
pgcode.InvalidParameterValue,
`new sink type %q does not match original sink type %q, sink type cannot be altered`,
newSinkURI.Scheme,
prevSinkURI.Scheme,
)
}

newChangefeedStmt.SinkURI = tree.NewDString(value)
} else {
opt := tree.KVOption{Key: tree.Name(key)}
if len(value) > 0 {
opt.Value = tree.NewDString(value)
}
optionsMap[key] = opt
}
optionsMap[key] = opt
}
case *tree.AlterChangefeedUnsetOptions:
optKeys := v.Options.ToStrings()
for _, key := range optKeys {
if key == changefeedbase.OptSink {
return pgerror.Newf(pgcode.InvalidParameterValue, `cannot unset option %q`, key)
}
if _, ok := changefeedbase.ChangefeedOptionExpectValues[key]; !ok {
return pgerror.Newf(pgcode.InvalidParameterValue, `invalid option %q`, key)
}
Expand Down
69 changes: 69 additions & 0 deletions pkg/ccl/changefeedccl/alter_changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,11 @@ func TestAlterChangefeedErrors(t *testing.T) {
`pq: cannot alter option "initial_scan"`,
fmt.Sprintf(`ALTER CHANGEFEED %d UNSET initial_scan`, feed.JobID()),
)

sqlDB.ExpectErr(t,
`cannot unset option "sink"`,
fmt.Sprintf(`ALTER CHANGEFEED %d UNSET sink`, feed.JobID()),
)
}

t.Run(`kafka`, kafkaTest(testFn))
Expand Down Expand Up @@ -304,3 +309,67 @@ func TestAlterChangefeedPersistSinkURI(t *testing.T) {

require.Equal(t, details.SinkURI, `s3://fake-bucket-name/fake/path?AWS_ACCESS_KEY_ID=123&AWS_SECRET_ACCESS_KEY=456`)
}

func TestAlterChangefeedChangeSinkTypeError(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)

testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo`)
defer closeFeed(t, testFeed)

feed, ok := testFeed.(cdctest.EnterpriseTestFeed)
require.True(t, ok)

sqlDB.Exec(t, `PAUSE JOB $1`, feed.JobID())
waitForJobStatus(sqlDB, t, feed.JobID(), `paused`)

sqlDB.ExpectErr(t,
`pq: new sink type "s3" does not match original sink type "kafka", sink type cannot be altered`,
fmt.Sprintf(`ALTER CHANGEFEED %d SET sink = 's3://fake-bucket-name/fake/path?AWS_ACCESS_KEY_ID=123&AWS_SECRET_ACCESS_KEY=456'`, feed.JobID()),
)
}

t.Run(`kafka`, kafkaTest(testFn))
}

func TestAlterChangefeedChangeSinkURI(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) {
registry := f.Server().JobRegistry().(*jobs.Registry)
ctx := context.Background()

sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)

testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo`)
defer closeFeed(t, testFeed)

feed, ok := testFeed.(cdctest.EnterpriseTestFeed)
require.True(t, ok)

sqlDB.Exec(t, `PAUSE JOB $1`, feed.JobID())
waitForJobStatus(sqlDB, t, feed.JobID(), `paused`)

newSinkURI := `kafka://new_kafka_uri`

sqlDB.Exec(t, fmt.Sprintf(`ALTER CHANGEFEED %d SET sink = '%s'`, feed.JobID(), newSinkURI))

sqlDB.Exec(t, fmt.Sprintf(`RESUME JOB %d`, feed.JobID()))
waitForJobStatus(sqlDB, t, feed.JobID(), `running`)

job, err := registry.LoadJob(ctx, feed.JobID())
require.NoError(t, err)
details, ok := job.Details().(jobspb.ChangefeedDetails)
require.True(t, ok)

require.Equal(t, newSinkURI, details.SinkURI)
}

t.Run(`kafka`, kafkaTest(testFn))
}
15 changes: 15 additions & 0 deletions pkg/ccl/changefeedccl/changefeedbase/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ const (
OptKafkaSinkConfig = `kafka_sink_config`
OptWebhookSinkConfig = `webhook_sink_config`

// OptSink allows users to alter the Sink URI of an existing changefeed.
// Note that this option is only allowed for alter changefeed statements.
OptSink = `sink`

SinkParamCACert = `ca_cert`
SinkParamClientCert = `client_cert`
SinkParamClientKey = `client_key`
Expand Down Expand Up @@ -237,3 +241,14 @@ var NoLongerExperimental = map[string]string{
// AlterChangefeedUnsupportedOptions are changefeed options that we do not allow
// users to alter
var AlterChangefeedUnsupportedOptions = makeStringSet(OptCursor, OptInitialScan, OptNoInitialScan)

// AlterChangefeedOptionExpectValues is used to parse alter changefeed options
// using PlanHookState.TypeAsStringOpts().
var AlterChangefeedOptionExpectValues = func() map[string]sql.KVStringOptValidate {
alterChangefeedOptions := make(map[string]sql.KVStringOptValidate, len(ChangefeedOptionExpectValues)+1)
for key, value := range ChangefeedOptionExpectValues {
alterChangefeedOptions[key] = value
}
alterChangefeedOptions[OptSink] = sql.KVStringOptRequireValue
return alterChangefeedOptions
}()

0 comments on commit c7b4a53

Please sign in to comment.