changefeedccl: support a CSV format for changefeeds

In this PR, we introduce a new CSV format for changefeeds.
Note that this format is only supported with the
initial_scan='only' option. For instance, one can now
execute:

CREATE CHANGEFEED FOR foo WITH format=csv, initial_scan='only'

Release note (enterprise change): Changefeeds now support a
CSV format. This format works only with initial_scan='only'
and cannot be combined with the diff or resolved options.
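For concreteness, this is the output shape implied by the test
expectations added below (the tests compare message values only, and
strings render single-quoted):

    CREATE CHANGEFEED FOR foo WITH format=csv, initial_scan='only'
    -- with foo holding (1, 'Alice'), (2, 'Bob'), (3, 'Carol'), this emits:
    1,'Alice'
    2,'Bob'
    3,'Carol'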
Sherman Grewal committed Apr 26, 2022
1 parent a43afc6 commit f7ca7b1
Showing 13 changed files with 1,121 additions and 648 deletions.
4 changes: 4 additions & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
@@ -11,6 +11,9 @@ go_library(
"changefeed_stmt.go",
"doc.go",
"encoder.go",
"encoder_avro.go",
"encoder_csv.go",
"encoder_json.go",
"metrics.go",
"name.go",
"rowfetcher_cache.go",
@@ -87,6 +90,7 @@ go_library(
"//pkg/util/ctxgroup",
"//pkg/util/duration",
"//pkg/util/encoding",
"//pkg/util/encoding/csv",
"//pkg/util/envutil",
"//pkg/util/hlc",
"//pkg/util/httputil",
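The addition of encoder_csv.go alongside encoder_avro.go and
encoder_json.go, plus the new //pkg/util/encoding/csv dependency, points
at a dedicated per-format CSV encoder. Below is a minimal sketch of how
such an encoder might drive that csv writer; every type and method name
in it is an illustrative assumption, not the committed implementation:

    package changefeedccl

    import (
    	"bytes"

    	"github.com/cockroachdb/cockroach/pkg/util/encoding/csv"
    )

    // csvEncoder renders one table row per CSV record. There is no key,
    // topic, or timestamp envelope, which is why this format only pairs
    // with initial_scan='only'.
    type csvEncoder struct {
    	buf    bytes.Buffer
    	writer *csv.Writer
    }

    func newCSVEncoder() *csvEncoder {
    	e := &csvEncoder{}
    	e.writer = csv.NewWriter(&e.buf)
    	return e
    }

    // encodeRow turns already-stringified datums into a single CSV line.
    func (e *csvEncoder) encodeRow(datums []string) ([]byte, error) {
    	e.buf.Reset()
    	if err := e.writer.Write(datums); err != nil {
    		return nil, err
    	}
    	e.writer.Flush()
    	if err := e.writer.Error(); err != nil {
    		return nil, err
    	}
    	// Copy the bytes out: buf is reused for the next row.
    	return append([]byte(nil), e.buf.Bytes()...), nil
    }

Reusing a single buffer and copying the encoded bytes out keeps per-row
allocations low, which matters when an initial scan emits every row in
the table.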
30 changes: 18 additions & 12 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
@@ -759,6 +759,10 @@ func validateDetails(details jobspb.ChangefeedDetails) (jobspb.ChangefeedDetails
// the job gets restarted.
details.Opts = map[string]string{}
}
initialScanType, err := initialScanTypeFromOpts(details.Opts)
if err != nil {
return jobspb.ChangefeedDetails{}, err
}
{
const opt = changefeedbase.OptResolvedTimestamps
if o, ok := details.Opts[opt]; ok && o != `` {
@@ -820,6 +824,13 @@ func validateDetails(details jobspb.ChangefeedDetails) (jobspb.ChangefeedDetails
switch v := changefeedbase.FormatType(details.Opts[opt]); v {
case ``, changefeedbase.OptFormatJSON:
details.Opts[opt] = string(changefeedbase.OptFormatJSON)
case changefeedbase.OptFormatCSV:
if initialScanType != changefeedbase.OnlyInitialScan {
return jobspb.ChangefeedDetails{}, errors.Errorf(
`%s=%s is only usable with %s='only'`,
changefeedbase.OptFormat, changefeedbase.OptFormatCSV, changefeedbase.OptInitialScan)
}
details.Opts[opt] = string(changefeedbase.OptFormatCSV)
case changefeedbase.OptFormatAvro, changefeedbase.DeprecatedOptFormatAvro:
// No-op.
default:
@@ -854,18 +865,13 @@ func validateDetails(details jobspb.ChangefeedDetails) (jobspb.ChangefeedDetails
}
}
{
initialScanType := details.Opts[changefeedbase.OptInitialScan]
_, onlyInitialScan := details.Opts[changefeedbase.OptInitialScanOnly]
_, endTime := details.Opts[changefeedbase.OptEndTime]
if endTime && onlyInitialScan {
return jobspb.ChangefeedDetails{}, errors.Errorf(
`cannot specify both %s and %s`, changefeedbase.OptInitialScanOnly,
changefeedbase.OptEndTime)
}

if strings.ToLower(initialScanType) == `only` && endTime {
return jobspb.ChangefeedDetails{}, errors.Errorf(
`cannot specify both %s='only' and %s`, changefeedbase.OptInitialScan, changefeedbase.OptEndTime)
if initialScanType == changefeedbase.OnlyInitialScan {
for opt := range changefeedbase.InitialScanOnlyUnsupportedOptions {
if _, ok := details.Opts[opt]; ok {
return jobspb.ChangefeedDetails{}, errors.Errorf(
`cannot specify both %s='only' and %s`, changefeedbase.OptInitialScan, opt)
}
}
}

if !details.EndTime.IsEmpty() && details.EndTime.Less(details.StatementTime) {
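The new validation path funnels every spelling of the initial-scan
options through initialScanTypeFromOpts, a helper whose body lies
outside this diff. A hedged sketch of the shape its call sites imply;
the constants other than OnlyInitialScan and the exact case handling
are assumptions:

    // initialScanTypeFromOpts normalizes the spellings of the
    // initial-scan options into a single InitialScanType value.
    func initialScanTypeFromOpts(
    	opts map[string]string,
    ) (changefeedbase.InitialScanType, error) {
    	// initial_scan_only is shorthand for initial_scan='only'. The real
    	// helper also rejects contradictory combinations such as
    	// initial_scan='no' together with initial_scan_only (see the tests
    	// below); that handling is elided here.
    	if _, ok := opts[changefeedbase.OptInitialScanOnly]; ok {
    		return changefeedbase.OnlyInitialScan, nil
    	}
    	switch strings.ToLower(opts[changefeedbase.OptInitialScan]) {
    	case `only`:
    		return changefeedbase.OnlyInitialScan, nil
    	case `yes`, ``:
    		return changefeedbase.InitialScan, nil
    	case `no`:
    		return changefeedbase.NoInitialScan, nil
    	default:
    		return changefeedbase.InitialScan, errors.Errorf(
    			`unknown %s: %s`, changefeedbase.OptInitialScan,
    			opts[changefeedbase.OptInitialScan])
    	}
    }

Centralizing this parsing lets the format=csv check and the
unsupported-options loop above share one notion of "initial scan only"
instead of re-inspecting the raw option strings.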
172 changes: 170 additions & 2 deletions pkg/ccl/changefeedccl/changefeed_test.go
@@ -3775,11 +3775,11 @@ func TestChangefeedErrors(t *testing.T) {

// WITH only_initial_scan and end_time disallowed
sqlDB.ExpectErr(
t, `cannot specify both initial_scan_only and end_time`,
t, `cannot specify both initial_scan='only' and end_time`,
`CREATE CHANGEFEED FOR foo INTO $1 WITH initial_scan_only, end_time = '1'`, `kafka://nope`,
)
sqlDB.ExpectErr(
t, `cannot specify both initial_scan_only and end_time`,
t, `cannot specify both initial_scan='only' and end_time`,
`CREATE CHANGEFEED FOR foo INTO $1 WITH end_time = '1', initial_scan_only`, `kafka://nope`,
)

@@ -3792,6 +3792,26 @@ func TestChangefeedErrors(t *testing.T) {
`CREATE CHANGEFEED FOR foo INTO $1 WITH initial_scan = 'only', end_time = '1'`, `kafka://nope`,
)

sqlDB.ExpectErr(
t, `cannot specify both initial_scan='only' and resolved`,
`CREATE CHANGEFEED FOR foo INTO $1 WITH resolved, initial_scan = 'only'`, `kafka://nope`,
)

sqlDB.ExpectErr(
t, `cannot specify both initial_scan='only' and diff`,
`CREATE CHANGEFEED FOR foo INTO $1 WITH diff, initial_scan = 'only'`, `kafka://nope`,
)

sqlDB.ExpectErr(
t, `cannot specify both initial_scan='only' and mvcc_timestamp`,
`CREATE CHANGEFEED FOR foo INTO $1 WITH mvcc_timestamp, initial_scan = 'only'`, `kafka://nope`,
)

sqlDB.ExpectErr(
t, `cannot specify both initial_scan='only' and updated`,
`CREATE CHANGEFEED FOR foo INTO $1 WITH updated, initial_scan = 'only'`, `kafka://nope`,
)

sqlDB.ExpectErr(
t, `unknown initial_scan: foo`,
`CREATE CHANGEFEED FOR foo INTO $1 WITH initial_scan = 'foo'`, `kafka://nope`,
@@ -3805,6 +3825,11 @@ func TestChangefeedErrors(t *testing.T) {
`CREATE CHANGEFEED FOR foo INTO $1 WITH initial_scan = 'no', initial_scan_only`, `kafka://nope`,
)

sqlDB.ExpectErr(
t, `format=csv is only usable with initial_scan='only'`,
`CREATE CHANGEFEED FOR foo INTO $1 WITH format = csv`, `kafka://nope`,
)

var tsCurrent string
sqlDB.QueryRow(t, `SELECT cluster_logical_timestamp()`).Scan(&tsCurrent)

@@ -5835,6 +5860,149 @@ func TestChangefeedOnlyInitialScan(t *testing.T) {
t.Run(`kafka`, kafkaTest(testFn))
}

func TestChangefeedOnlyInitialScanCSV(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

tests := map[string]struct {
changefeedStmt string
expectedPayload []string
}{
`initial scan only with csv`: {
changefeedStmt: `CREATE CHANGEFEED FOR foo WITH initial_scan_only, format = csv`,
expectedPayload: []string{
`1,'Alice'`,
`2,'Bob'`,
`3,'Carol'`,
},
},
`initial backfill only with csv`: {
changefeedStmt: `CREATE CHANGEFEED FOR foo WITH initial_scan = 'only', format = csv`,
expectedPayload: []string{
`1,'Alice'`,
`2,'Bob'`,
`3,'Carol'`,
},
},
`initial backfill only with csv multiple tables`: {
changefeedStmt: `CREATE CHANGEFEED FOR foo, bar WITH initial_scan = 'only', format = csv`,
expectedPayload: []string{
`1,'a'`,
`2,'b'`,
`3,'c'`,
`1,'Alice'`,
`2,'Bob'`,
`3,'Carol'`,
},
},
}

testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(db)

for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
sqlDB.Exec(t, "CREATE TABLE foo (id INT PRIMARY KEY, name STRING)")
sqlDB.Exec(t, "CREATE TABLE bar (id INT PRIMARY KEY, name STRING)")

sqlDB.Exec(t, "INSERT INTO foo VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Carol')")
sqlDB.Exec(t, "INSERT INTO bar VALUES (1, 'a'), (2, 'b'), (3, 'c')")

feed := feed(t, f, testData.changefeedStmt)

sqlDB.Exec(t, "INSERT INTO foo VALUES (4, 'Doug'), (5, 'Elaine'), (6, 'Fred')")
sqlDB.Exec(t, "INSERT INTO bar VALUES (4, 'd'), (5, 'e'), (6, 'f')")

var actualMessages []string
g := ctxgroup.WithContext(context.Background())
g.Go(func() error {
for {
m, err := feed.Next()
if err != nil {
return err
}
actualMessages = append(actualMessages, string(m.Value))
}
})
defer func() {
closeFeed(t, feed)
sqlDB.Exec(t, `DROP TABLE foo`)
sqlDB.Exec(t, `DROP TABLE bar`)
_ = g.Wait()
require.Equal(t, len(testData.expectedPayload), len(actualMessages))
sort.Strings(testData.expectedPayload)
sort.Strings(actualMessages)
for i := range testData.expectedPayload {
require.Equal(t, testData.expectedPayload[i], actualMessages[i])
}
}()

jobFeed := feed.(cdctest.EnterpriseTestFeed)
require.NoError(t, jobFeed.WaitForStatus(func(s jobs.Status) bool {
return s == jobs.StatusSucceeded
}))
})
}
}
t.Run(`enterprise`, enterpriseTest(testFn))
t.Run(`cloudstorage`, cloudStorageTest(testFn))
t.Run(`kafka`, kafkaTest(testFn))
t.Run(`webhook`, webhookTest(testFn))
t.Run(`pubsub`, pubsubTest(testFn))
}

func TestChangefeedOnlyInitialScanCSVSinkless(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

initialScanOnlyCSVTests := map[string]string{
`initial scan only with csv`: `CREATE CHANGEFEED FOR foo WITH initial_scan_only, format = csv`,
`initial backfill only with csv`: `CREATE CHANGEFEED FOR foo WITH initial_scan = 'only', format = csv`,
}

testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(db)

for testName, changefeedStmt := range initialScanOnlyCSVTests {
t.Run(testName, func(t *testing.T) {
sqlDB.Exec(t, "CREATE TABLE foo (id INT PRIMARY KEY, name STRING)")
sqlDB.Exec(t, "INSERT INTO foo VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Carol')")

feed := feed(t, f, changefeedStmt)

sqlDB.Exec(t, "INSERT INTO foo VALUES (4, 'Doug'), (5, 'Elaine'), (6, 'Fred')")

expectedMessages := []string{
`1,'Alice'`,
`2,'Bob'`,
`3,'Carol'`,
}
var actualMessages []string

defer func() {
closeFeed(t, feed)
sqlDB.Exec(t, `DROP TABLE foo`)
require.Equal(t, len(expectedMessages), len(actualMessages))
sort.Strings(expectedMessages)
sort.Strings(actualMessages)
for i := range expectedMessages {
require.Equal(t, expectedMessages[i], actualMessages[i])
}
}()

for {
m, err := feed.Next()
if err != nil || m == nil {
break
}
actualMessages = append(actualMessages, string(m.Value))
}
})
}
}
t.Run(`sinkless`, sinklessTest(testFn))
}

func startMonitorWithBudget(budget int64) *mon.BytesMonitor {
mm := mon.NewMonitorWithLimit(
"test-mm", mon.MemoryResource, budget,
6 changes: 6 additions & 0 deletions pkg/ccl/changefeedccl/changefeedbase/options.go
@@ -118,6 +118,7 @@ const (

OptFormatJSON FormatType = `json`
OptFormatAvro FormatType = `avro`
OptFormatCSV FormatType = `csv`

OptFormatNative FormatType = `native`

@@ -259,6 +260,11 @@ var NoLongerExperimental = map[string]string{
DeprecatedSinkSchemeCloudStorageS3: SinkSchemeCloudStorageS3,
}

// InitialScanOnlyUnsupportedOptions is the set of options that cannot be
// used together with the initial_scan='only' option.
var InitialScanOnlyUnsupportedOptions = makeStringSet(OptEndTime, OptResolvedTimestamps, OptDiff,
OptMVCCTimestamps, OptUpdatedTimestamps)

// AlterChangefeedUnsupportedOptions are changefeed options that we do not allow
// users to alter.
// TODO(sherman): At the moment we disallow altering both the initial_scan_only
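InitialScanOnlyUnsupportedOptions is built with makeStringSet, an
existing helper in this package that the diff does not show. Given that
validateDetails ranges over the result and tests membership by key, it
is presumably the usual string-set constructor, roughly:

    // makeStringSet builds a set of option names keyed for O(1)
    // membership checks.
    func makeStringSet(opts ...string) map[string]struct{} {
    	set := make(map[string]struct{}, len(opts))
    	for _, opt := range opts {
    		set[opt] = struct{}{}
    	}
    	return set
    }

Keeping the unsupported options in a set makes the compatibility check
in validateDetails a single loop and leaves the list easy to extend.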
