Skip to content

Commit

Permalink
changefeedccl: allow users to alter the sink URI of
Browse files Browse the repository at this point in the history
an existing changefeed

In this PR, we introduce the capability to alter
the sink URI of an existing changefeed. This
can be achieved by executing the following
statement:

ALTER CHANGEFEED <job_id> SET sink = '<sink_uri>'

Note that the sink type cannot be altered. That is,
the sink type must be the same type that was chosen
when the changefeed was initially created.

Release note (enterprise change): Users may now alter
the sink URI of an existing changefeed. This can be
achieved by executing the following statement:

ALTER CHANGEFEED <job_id> SET sink = '<sink_uri>'

Where the sink type of the new sink must match the
sink type of the old sink that was chosen at the
creation of the changefeed.
  • Loading branch information
Sherman Grewal committed Feb 25, 2022
1 parent 0679c4c commit 067f213
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 8 deletions.
40 changes: 32 additions & 8 deletions pkg/ccl/changefeedccl/alter_changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package changefeedccl

import (
"context"
"net/url"

"github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupresolver"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
Expand Down Expand Up @@ -163,7 +164,7 @@ func alterChangefeedPlanHook(
delete(newDescs, desc.GetID())
}
case *tree.AlterChangefeedSetOptions:
optsFn, err := p.TypeAsStringOpts(ctx, v.Options, changefeedbase.ChangefeedOptionExpectValues)
optsFn, err := p.TypeAsStringOpts(ctx, v.Options, changefeedbase.AlterChangefeedOptionExpectValues)
if err != nil {
return err
}
Expand All @@ -174,21 +175,44 @@ func alterChangefeedPlanHook(
}

for key, value := range opts {
if _, ok := changefeedbase.ChangefeedOptionExpectValues[key]; !ok {
return pgerror.Newf(pgcode.InvalidParameterValue, `invalid option %q`, key)
}
if _, ok := changefeedbase.AlterChangefeedUnsupportedOptions[key]; ok {
return pgerror.Newf(pgcode.InvalidParameterValue, `cannot alter option %q`, key)
}
opt := tree.KVOption{Key: tree.Name(key)}
if len(value) > 0 {
opt.Value = tree.NewDString(value)
if key == changefeedbase.OptSink {
newSinkURI, err := url.Parse(value)
if err != nil {
return err
}

prevSinkURI, err := url.Parse(prevDetails.SinkURI)
if err != nil {
return err
}

if newSinkURI.Scheme != prevSinkURI.Scheme {
return pgerror.Newf(
pgcode.InvalidParameterValue,
`new sink type %q does not match original sink type %q, sink type cannot be altered`,
newSinkURI.Scheme,
prevSinkURI.Scheme,
)
}

newChangefeedStmt.SinkURI = tree.NewDString(value)
} else {
opt := tree.KVOption{Key: tree.Name(key)}
if len(value) > 0 {
opt.Value = tree.NewDString(value)
}
optionsMap[key] = opt
}
optionsMap[key] = opt
}
case *tree.AlterChangefeedUnsetOptions:
optKeys := v.Options.ToStrings()
for _, key := range optKeys {
if key == changefeedbase.OptSink {
return pgerror.Newf(pgcode.InvalidParameterValue, `cannot unset option %q`, key)
}
if _, ok := changefeedbase.ChangefeedOptionExpectValues[key]; !ok {
return pgerror.Newf(pgcode.InvalidParameterValue, `invalid option %q`, key)
}
Expand Down
69 changes: 69 additions & 0 deletions pkg/ccl/changefeedccl/alter_changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,11 @@ func TestAlterChangefeedErrors(t *testing.T) {
`pq: cannot alter option "initial_scan"`,
fmt.Sprintf(`ALTER CHANGEFEED %d UNSET initial_scan`, feed.JobID()),
)

sqlDB.ExpectErr(t,
`cannot unset option "sink"`,
fmt.Sprintf(`ALTER CHANGEFEED %d UNSET sink`, feed.JobID()),
)
}

t.Run(`kafka`, kafkaTest(testFn))
Expand Down Expand Up @@ -304,3 +309,67 @@ func TestAlterChangefeedPersistSinkURI(t *testing.T) {

require.Equal(t, details.SinkURI, `s3://fake-bucket-name/fake/path?AWS_ACCESS_KEY_ID=123&AWS_SECRET_ACCESS_KEY=456`)
}

func TestAlterChangefeedChangeSinkTypeError(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)

testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo`)
defer closeFeed(t, testFeed)

feed, ok := testFeed.(cdctest.EnterpriseTestFeed)
require.True(t, ok)

sqlDB.Exec(t, `PAUSE JOB $1`, feed.JobID())
waitForJobStatus(sqlDB, t, feed.JobID(), `paused`)

sqlDB.ExpectErr(t,
`pq: new sink type "s3" does not match original sink type "kafka", sink type cannot be altered`,
fmt.Sprintf(`ALTER CHANGEFEED %d SET sink = 's3://fake-bucket-name/fake/path?AWS_ACCESS_KEY_ID=123&AWS_SECRET_ACCESS_KEY=456'`, feed.JobID()),
)
}

t.Run(`kafka`, kafkaTest(testFn))
}

func TestAlterChangefeedChangeSinkURI(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) {
registry := f.Server().JobRegistry().(*jobs.Registry)
ctx := context.Background()

sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)

testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo`)
defer closeFeed(t, testFeed)

feed, ok := testFeed.(cdctest.EnterpriseTestFeed)
require.True(t, ok)

sqlDB.Exec(t, `PAUSE JOB $1`, feed.JobID())
waitForJobStatus(sqlDB, t, feed.JobID(), `paused`)

newSinkURI := `kafka://new_kafka_uri`

sqlDB.Exec(t, fmt.Sprintf(`ALTER CHANGEFEED %d SET sink = '%s'`, feed.JobID(), newSinkURI))

sqlDB.Exec(t, fmt.Sprintf(`RESUME JOB %d`, feed.JobID()))
waitForJobStatus(sqlDB, t, feed.JobID(), `running`)

job, err := registry.LoadJob(ctx, feed.JobID())
require.NoError(t, err)
details, ok := job.Details().(jobspb.ChangefeedDetails)
require.True(t, ok)

require.Equal(t, newSinkURI, details.SinkURI)
}

t.Run(`kafka`, kafkaTest(testFn))
}
15 changes: 15 additions & 0 deletions pkg/ccl/changefeedccl/changefeedbase/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ const (
OptKafkaSinkConfig = `kafka_sink_config`
OptWebhookSinkConfig = `webhook_sink_config`

// OptSink allows users to alter the Sink URI of an existing changefeed.
// Note that this option is only allowed for alter changefeed statements.
OptSink = `sink`

SinkParamCACert = `ca_cert`
SinkParamClientCert = `client_cert`
SinkParamClientKey = `client_key`
Expand Down Expand Up @@ -237,3 +241,14 @@ var NoLongerExperimental = map[string]string{
// AlterChangefeedUnsupportedOptions are changefeed options that we do not allow
// users to alter
var AlterChangefeedUnsupportedOptions = makeStringSet(OptCursor, OptInitialScan, OptNoInitialScan)

// AlterChangefeedOptionExpectValues is used to parse alter changefeed options
// using PlanHookState.TypeAsStringOpts().
var AlterChangefeedOptionExpectValues = func() map[string]sql.KVStringOptValidate {
alterChangefeedOptions := make(map[string]sql.KVStringOptValidate, len(ChangefeedOptionExpectValues)+1)
for key, value := range ChangefeedOptionExpectValues {
alterChangefeedOptions[key] = value
}
alterChangefeedOptions[OptSink] = sql.KVStringOptRequireValue
return alterChangefeedOptions
}()

0 comments on commit 067f213

Please sign in to comment.