From f7ca7b185675d98b385c1cb71835a2bb817774a2 Mon Sep 17 00:00:00 2001 From: Sherman Grewal Date: Tue, 26 Apr 2022 11:39:26 -0400 Subject: [PATCH] changefeedccl: support a CSV format for changefeeds In this PR, we introduce a new CSV format for changefeeds. Note that this format is only supported with the initial_scan='only' option. For instance, one can now execute: CREATE CHANGEFEED FOR foo WITH format=csv, initial_scan='only' Release note (enterprise change): Support a CSV format for changefeeds. Only works with initial_scan='only', and does not work with diff/resolved options. --- pkg/ccl/changefeedccl/BUILD.bazel | 4 + pkg/ccl/changefeedccl/changefeed_stmt.go | 30 +- pkg/ccl/changefeedccl/changefeed_test.go | 172 +++++- .../changefeedccl/changefeedbase/options.go | 6 + pkg/ccl/changefeedccl/encoder.go | 578 +----------------- pkg/ccl/changefeedccl/encoder_avro.go | 328 ++++++++++ pkg/ccl/changefeedccl/encoder_csv.go | 137 +++++ pkg/ccl/changefeedccl/encoder_json.go | 289 +++++++++ pkg/ccl/changefeedccl/sink_cloudstorage.go | 5 + pkg/ccl/changefeedccl/sink_pubsub.go | 39 +- pkg/ccl/changefeedccl/sink_webhook.go | 49 +- pkg/ccl/changefeedccl/testfeed_test.go | 112 ++-- pkg/util/encoding/csv/writer.go | 20 +- 13 files changed, 1121 insertions(+), 648 deletions(-) create mode 100644 pkg/ccl/changefeedccl/encoder_avro.go create mode 100644 pkg/ccl/changefeedccl/encoder_csv.go create mode 100644 pkg/ccl/changefeedccl/encoder_json.go diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index 652853d0ff1d..a761edd258d4 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -11,6 +11,9 @@ go_library( "changefeed_stmt.go", "doc.go", "encoder.go", + "encoder_avro.go", + "encoder_csv.go", + "encoder_json.go", "metrics.go", "name.go", "rowfetcher_cache.go", @@ -87,6 +90,7 @@ go_library( "//pkg/util/ctxgroup", "//pkg/util/duration", "//pkg/util/encoding", + "//pkg/util/encoding/csv", "//pkg/util/envutil", "//pkg/util/hlc", "//pkg/util/httputil", diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go index ab1c3ee1e2c3..57c01154e61a 100644 --- a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -759,6 +759,10 @@ func validateDetails(details jobspb.ChangefeedDetails) (jobspb.ChangefeedDetails // the job gets restarted. details.Opts = map[string]string{} } + initialScanType, err := initialScanTypeFromOpts(details.Opts) + if err != nil { + return jobspb.ChangefeedDetails{}, err + } { const opt = changefeedbase.OptResolvedTimestamps if o, ok := details.Opts[opt]; ok && o != `` { @@ -820,6 +824,13 @@ func validateDetails(details jobspb.ChangefeedDetails) (jobspb.ChangefeedDetails switch v := changefeedbase.FormatType(details.Opts[opt]); v { case ``, changefeedbase.OptFormatJSON: details.Opts[opt] = string(changefeedbase.OptFormatJSON) + case changefeedbase.OptFormatCSV: + if initialScanType != changefeedbase.OnlyInitialScan { + return jobspb.ChangefeedDetails{}, errors.Errorf( + `%s=%s is only usable with %s='only'`, + changefeedbase.OptFormat, changefeedbase.OptFormatCSV, changefeedbase.OptInitialScan) + } + details.Opts[opt] = string(changefeedbase.OptFormatCSV) case changefeedbase.OptFormatAvro, changefeedbase.DeprecatedOptFormatAvro: // No-op. 
default: @@ -854,18 +865,13 @@ func validateDetails(details jobspb.ChangefeedDetails) (jobspb.ChangefeedDetails } } { - initialScanType := details.Opts[changefeedbase.OptInitialScan] - _, onlyInitialScan := details.Opts[changefeedbase.OptInitialScanOnly] - _, endTime := details.Opts[changefeedbase.OptEndTime] - if endTime && onlyInitialScan { - return jobspb.ChangefeedDetails{}, errors.Errorf( - `cannot specify both %s and %s`, changefeedbase.OptInitialScanOnly, - changefeedbase.OptEndTime) - } - - if strings.ToLower(initialScanType) == `only` && endTime { - return jobspb.ChangefeedDetails{}, errors.Errorf( - `cannot specify both %s='only' and %s`, changefeedbase.OptInitialScan, changefeedbase.OptEndTime) + if initialScanType == changefeedbase.OnlyInitialScan { + for opt := range changefeedbase.InitialScanOnlyUnsupportedOptions { + if _, ok := details.Opts[opt]; ok { + return jobspb.ChangefeedDetails{}, errors.Errorf( + `cannot specify both %s='only' and %s`, changefeedbase.OptInitialScan, opt) + } + } } if !details.EndTime.IsEmpty() && details.EndTime.Less(details.StatementTime) { diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 37a68162e94a..d4e820744446 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -3775,11 +3775,11 @@ func TestChangefeedErrors(t *testing.T) { // WITH only_initial_scan and end_time disallowed sqlDB.ExpectErr( - t, `cannot specify both initial_scan_only and end_time`, + t, `cannot specify both initial_scan='only' and end_time`, `CREATE CHANGEFEED FOR foo INTO $1 WITH initial_scan_only, end_time = '1'`, `kafka://nope`, ) sqlDB.ExpectErr( - t, `cannot specify both initial_scan_only and end_time`, + t, `cannot specify both initial_scan='only' and end_time`, `CREATE CHANGEFEED FOR foo INTO $1 WITH end_time = '1', initial_scan_only`, `kafka://nope`, ) @@ -3792,6 +3792,26 @@ func TestChangefeedErrors(t *testing.T) { `CREATE CHANGEFEED FOR foo INTO $1 WITH initial_scan = 'only', end_time = '1'`, `kafka://nope`, ) + sqlDB.ExpectErr( + t, `cannot specify both initial_scan='only' and resolved`, + `CREATE CHANGEFEED FOR foo INTO $1 WITH resolved, initial_scan = 'only'`, `kafka://nope`, + ) + + sqlDB.ExpectErr( + t, `cannot specify both initial_scan='only' and diff`, + `CREATE CHANGEFEED FOR foo INTO $1 WITH diff, initial_scan = 'only'`, `kafka://nope`, + ) + + sqlDB.ExpectErr( + t, `cannot specify both initial_scan='only' and mvcc_timestamp`, + `CREATE CHANGEFEED FOR foo INTO $1 WITH mvcc_timestamp, initial_scan = 'only'`, `kafka://nope`, + ) + + sqlDB.ExpectErr( + t, `cannot specify both initial_scan='only' and updated`, + `CREATE CHANGEFEED FOR foo INTO $1 WITH updated, initial_scan = 'only'`, `kafka://nope`, + ) + sqlDB.ExpectErr( t, `unknown initial_scan: foo`, `CREATE CHANGEFEED FOR foo INTO $1 WITH initial_scan = 'foo'`, `kafka://nope`, @@ -3805,6 +3825,11 @@ func TestChangefeedErrors(t *testing.T) { `CREATE CHANGEFEED FOR foo INTO $1 WITH initial_scan = 'no', initial_scan_only`, `kafka://nope`, ) + sqlDB.ExpectErr( + t, `format=csv is only usable with initial_scan='only'`, + `CREATE CHANGEFEED FOR foo INTO $1 WITH format = csv`, `kafka://nope`, + ) + var tsCurrent string sqlDB.QueryRow(t, `SELECT cluster_logical_timestamp()`).Scan(&tsCurrent) @@ -5835,6 +5860,149 @@ func TestChangefeedOnlyInitialScan(t *testing.T) { t.Run(`kafka`, kafkaTest(testFn)) } +func TestChangefeedOnlyInitialScanCSV(t *testing.T) { + defer leaktest.AfterTest(t)() + defer 
log.Scope(t).Close(t) + + tests := map[string]struct { + changefeedStmt string + expectedPayload []string + }{ + `initial scan only with csv`: { + changefeedStmt: `CREATE CHANGEFEED FOR foo WITH initial_scan_only, format = csv`, + expectedPayload: []string{ + `1,'Alice'`, + `2,'Bob'`, + `3,'Carol'`, + }, + }, + `initial backfill only with csv`: { + changefeedStmt: `CREATE CHANGEFEED FOR foo WITH initial_scan = 'only', format = csv`, + expectedPayload: []string{ + `1,'Alice'`, + `2,'Bob'`, + `3,'Carol'`, + }, + }, + `initial backfill only with csv multiple tables`: { + changefeedStmt: `CREATE CHANGEFEED FOR foo, bar WITH initial_scan = 'only', format = csv`, + expectedPayload: []string{ + `1,'a'`, + `2,'b'`, + `3,'c'`, + `1,'Alice'`, + `2,'Bob'`, + `3,'Carol'`, + }, + }, + } + + testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) { + sqlDB := sqlutils.MakeSQLRunner(db) + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + sqlDB.Exec(t, "CREATE TABLE foo (id INT PRIMARY KEY, name STRING)") + sqlDB.Exec(t, "CREATE TABLE bar (id INT PRIMARY KEY, name STRING)") + + sqlDB.Exec(t, "INSERT INTO foo VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Carol')") + sqlDB.Exec(t, "INSERT INTO bar VALUES (1, 'a'), (2, 'b'), (3, 'c')") + + feed := feed(t, f, testData.changefeedStmt) + + sqlDB.Exec(t, "INSERT INTO foo VALUES (4, 'Doug'), (5, 'Elaine'), (6, 'Fred')") + sqlDB.Exec(t, "INSERT INTO bar VALUES (4, 'd'), (5, 'e'), (6, 'f')") + + var actualMessages []string + g := ctxgroup.WithContext(context.Background()) + g.Go(func() error { + for { + m, err := feed.Next() + if err != nil { + return err + } + actualMessages = append(actualMessages, string(m.Value)) + } + }) + defer func() { + closeFeed(t, feed) + sqlDB.Exec(t, `DROP TABLE foo`) + sqlDB.Exec(t, `DROP TABLE bar`) + _ = g.Wait() + require.Equal(t, len(testData.expectedPayload), len(actualMessages)) + sort.Strings(testData.expectedPayload) + sort.Strings(actualMessages) + for i := range testData.expectedPayload { + require.Equal(t, testData.expectedPayload[i], actualMessages[i]) + } + }() + + jobFeed := feed.(cdctest.EnterpriseTestFeed) + require.NoError(t, jobFeed.WaitForStatus(func(s jobs.Status) bool { + return s == jobs.StatusSucceeded + })) + }) + } + } + t.Run(`enterprise`, enterpriseTest(testFn)) + t.Run(`cloudstorage`, cloudStorageTest(testFn)) + t.Run(`kafka`, kafkaTest(testFn)) + t.Run(`webhook`, webhookTest(testFn)) + t.Run(`pubsub`, pubsubTest(testFn)) +} + +func TestChangefeedOnlyInitialScanCSVSinkless(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + initialScanOnlyCSVTests := map[string]string{ + `initial scan only with csv`: `CREATE CHANGEFEED FOR foo WITH initial_scan_only, format = csv`, + `initial backfill only with csv`: `CREATE CHANGEFEED FOR foo WITH initial_scan = 'only', format = csv`, + } + + testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) { + sqlDB := sqlutils.MakeSQLRunner(db) + + for testName, changefeedStmt := range initialScanOnlyCSVTests { + t.Run(testName, func(t *testing.T) { + sqlDB.Exec(t, "CREATE TABLE foo (id INT PRIMARY KEY, name STRING)") + sqlDB.Exec(t, "INSERT INTO foo VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Carol')") + + feed := feed(t, f, changefeedStmt) + + sqlDB.Exec(t, "INSERT INTO foo VALUES (4, 'Doug'), (5, 'Elaine'), (6, 'Fred')") + + expectedMessages := []string{ + `1,'Alice'`, + `2,'Bob'`, + `3,'Carol'`, + } + var actualMessages []string + + defer func() { + closeFeed(t, feed) + sqlDB.Exec(t, `DROP TABLE foo`) 
+ require.Equal(t, len(expectedMessages), len(actualMessages)) + sort.Strings(expectedMessages) + sort.Strings(actualMessages) + for i := range expectedMessages { + require.Equal(t, expectedMessages[i], actualMessages[i]) + } + }() + + for { + m, err := feed.Next() + if err != nil || m == nil { + break + } + actualMessages = append(actualMessages, string(m.Value)) + } + }) + } + } + t.Run(`sinkless`, sinklessTest(testFn)) +} + func startMonitorWithBudget(budget int64) *mon.BytesMonitor { mm := mon.NewMonitorWithLimit( "test-mm", mon.MemoryResource, budget, diff --git a/pkg/ccl/changefeedccl/changefeedbase/options.go b/pkg/ccl/changefeedccl/changefeedbase/options.go index 4905a39abe78..5f758a152b3c 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/options.go +++ b/pkg/ccl/changefeedccl/changefeedbase/options.go @@ -118,6 +118,7 @@ const ( OptFormatJSON FormatType = `json` OptFormatAvro FormatType = `avro` + OptFormatCSV FormatType = `csv` OptFormatNative FormatType = `native` @@ -259,6 +260,11 @@ var NoLongerExperimental = map[string]string{ DeprecatedSinkSchemeCloudStorageS3: SinkSchemeCloudStorageS3, } +// InitialScanOnlyUnsupportedOptions is options that are not supported with the +// initial scan only option +var InitialScanOnlyUnsupportedOptions = makeStringSet(OptEndTime, OptResolvedTimestamps, OptDiff, + OptMVCCTimestamps, OptUpdatedTimestamps) + // AlterChangefeedUnsupportedOptions are changefeed options that we do not allow // users to alter. // TODO(sherman): At the moment we disallow altering both the initial_scan_only diff --git a/pkg/ccl/changefeedccl/encoder.go b/pkg/ccl/changefeedccl/encoder.go index 749bb0b05741..5c046c59d445 100644 --- a/pkg/ccl/changefeedccl/encoder.go +++ b/pkg/ccl/changefeedccl/encoder.go @@ -9,32 +9,18 @@ package changefeedccl import ( - "bytes" "context" - "encoding/binary" - gojson "encoding/json" - "fmt" - "time" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" - "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" - "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" - "github.com/cockroachdb/cockroach/pkg/util/cache" "github.com/cockroachdb/cockroach/pkg/util/hlc" - "github.com/cockroachdb/cockroach/pkg/util/json" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/errors" ) -const ( - confluentSubjectSuffixKey = `-key` - confluentSubjectSuffixValue = `-value` -) - // encodeRow holds all the pieces necessary to encode a row change into a key or // value. type encodeRow struct { @@ -99,573 +85,13 @@ func getEncoder( return newConfluentAvroEncoder(opts, targets) case changefeedbase.OptFormatNative: return &nativeEncoder{}, nil + case changefeedbase.OptFormatCSV: + return newCSVEncoder(opts), nil default: return nil, errors.Errorf(`unknown %s: %s`, changefeedbase.OptFormat, opts[changefeedbase.OptFormat]) } } -// jsonEncoder encodes changefeed entries as JSON. Keys are the primary key -// columns in a JSON array. Values are a JSON object mapping every column name -// to its value. Updated timestamps in rows and resolved timestamp payloads are -// stored in a sub-object under the `__crdb__` key in the top-level JSON object. 
-type jsonEncoder struct { - updatedField, mvccTimestampField, beforeField, wrapped, keyOnly, keyInValue, topicInValue bool - - targets []jobspb.ChangefeedTargetSpecification - alloc tree.DatumAlloc - buf bytes.Buffer - virtualColumnVisibility string - - // columnMapCache caches the TableColMap for the latest version of the - // table descriptor thus far seen. It avoids the need to recompute the - // map per row, which, prior to the change introducing this cache, could - // amount for 10% of row processing time. - columnMapCache map[descpb.ID]*tableColumnMapCacheEntry -} - -// tableColumnMapCacheEntry stores a TableColMap for a given descriptor version. -type tableColumnMapCacheEntry struct { - version descpb.DescriptorVersion - catalog.TableColMap -} - -var _ Encoder = &jsonEncoder{} - -func makeJSONEncoder( - opts map[string]string, targets []jobspb.ChangefeedTargetSpecification, -) (*jsonEncoder, error) { - e := &jsonEncoder{ - targets: targets, - keyOnly: changefeedbase.EnvelopeType(opts[changefeedbase.OptEnvelope]) == changefeedbase.OptEnvelopeKeyOnly, - wrapped: changefeedbase.EnvelopeType(opts[changefeedbase.OptEnvelope]) == changefeedbase.OptEnvelopeWrapped, - virtualColumnVisibility: opts[changefeedbase.OptVirtualColumns], - columnMapCache: map[descpb.ID]*tableColumnMapCacheEntry{}, - } - _, e.updatedField = opts[changefeedbase.OptUpdatedTimestamps] - _, e.mvccTimestampField = opts[changefeedbase.OptMVCCTimestamps] - _, e.beforeField = opts[changefeedbase.OptDiff] - if e.beforeField && !e.wrapped { - return nil, errors.Errorf(`%s is only usable with %s=%s`, - changefeedbase.OptDiff, changefeedbase.OptEnvelope, changefeedbase.OptEnvelopeWrapped) - } - _, e.keyInValue = opts[changefeedbase.OptKeyInValue] - if e.keyInValue && !e.wrapped { - return nil, errors.Errorf(`%s is only usable with %s=%s`, - changefeedbase.OptKeyInValue, changefeedbase.OptEnvelope, changefeedbase.OptEnvelopeWrapped) - } - _, e.topicInValue = opts[changefeedbase.OptTopicInValue] - if e.topicInValue && !e.wrapped { - return nil, errors.Errorf(`%s is only usable with %s=%s`, - changefeedbase.OptTopicInValue, changefeedbase.OptEnvelope, changefeedbase.OptEnvelopeWrapped) - } - return e, nil -} - -// EncodeKey implements the Encoder interface. -func (e *jsonEncoder) EncodeKey(_ context.Context, row encodeRow) ([]byte, error) { - jsonEntries, err := e.encodeKeyRaw(row) - if err != nil { - return nil, err - } - j, err := json.MakeJSON(jsonEntries) - if err != nil { - return nil, err - } - e.buf.Reset() - j.Format(&e.buf) - return e.buf.Bytes(), nil -} - -func (e *jsonEncoder) encodeKeyRaw(row encodeRow) ([]interface{}, error) { - colIdxByID := e.getTableColMap(row.tableDesc) - primaryIndex := row.tableDesc.GetPrimaryIndex() - jsonEntries := make([]interface{}, primaryIndex.NumKeyColumns()) - for i := 0; i < primaryIndex.NumKeyColumns(); i++ { - colID := primaryIndex.GetKeyColumnID(i) - idx, ok := colIdxByID.Get(colID) - if !ok { - return nil, errors.Errorf(`unknown column id: %d`, colID) - } - datum, col := row.datums[idx], row.tableDesc.PublicColumns()[idx] - if err := datum.EnsureDecoded(col.GetType(), &e.alloc); err != nil { - return nil, err - } - var err error - jsonEntries[i], err = tree.AsJSON( - datum.Datum, - sessiondatapb.DataConversionConfig{}, - time.UTC, - ) - if err != nil { - return nil, err - } - } - return jsonEntries, nil -} - -// EncodeValue implements the Encoder interface. 
-func (e *jsonEncoder) EncodeValue(_ context.Context, row encodeRow) ([]byte, error) { - if e.keyOnly || (!e.wrapped && row.deleted) { - return nil, nil - } - - var after map[string]interface{} - if !row.deleted { - family, err := row.tableDesc.FindFamilyByID(row.familyID) - if err != nil { - return nil, err - } - include := make(map[descpb.ColumnID]struct{}, len(family.ColumnIDs)) - var yes struct{} - for _, colID := range family.ColumnIDs { - include[colID] = yes - } - after = make(map[string]interface{}) - for i, col := range row.tableDesc.PublicColumns() { - _, inFamily := include[col.GetID()] - virtual := col.IsVirtual() && e.virtualColumnVisibility == string(changefeedbase.OptVirtualColumnsNull) - if inFamily || virtual { - datum := row.datums[i] - if err := datum.EnsureDecoded(col.GetType(), &e.alloc); err != nil { - return nil, err - } - after[col.GetName()], err = tree.AsJSON( - datum.Datum, - sessiondatapb.DataConversionConfig{}, - time.UTC, - ) - if err != nil { - return nil, err - } - } - } - } - - var before map[string]interface{} - if row.prevDatums != nil && !row.prevDeleted { - family, err := row.prevTableDesc.FindFamilyByID(row.prevFamilyID) - if err != nil { - return nil, err - } - include := make(map[descpb.ColumnID]struct{}) - var yes struct{} - for _, colID := range family.ColumnIDs { - include[colID] = yes - } - before = make(map[string]interface{}) - for i, col := range row.prevTableDesc.PublicColumns() { - _, inFamily := include[col.GetID()] - virtual := col.IsVirtual() && e.virtualColumnVisibility == string(changefeedbase.OptVirtualColumnsNull) - if inFamily || virtual { - datum := row.prevDatums[i] - if err := datum.EnsureDecoded(col.GetType(), &e.alloc); err != nil { - return nil, err - } - before[col.GetName()], err = tree.AsJSON( - datum.Datum, - sessiondatapb.DataConversionConfig{}, - time.UTC, - ) - if err != nil { - return nil, err - } - } - } - } - - var jsonEntries map[string]interface{} - if e.wrapped { - if after != nil { - jsonEntries = map[string]interface{}{`after`: after} - } else { - jsonEntries = map[string]interface{}{`after`: nil} - } - if e.beforeField { - if before != nil { - jsonEntries[`before`] = before - } else { - jsonEntries[`before`] = nil - } - } - if e.keyInValue { - keyEntries, err := e.encodeKeyRaw(row) - if err != nil { - return nil, err - } - jsonEntries[`key`] = keyEntries - } - if e.topicInValue { - jsonEntries[`topic`] = row.topic - } - } else { - jsonEntries = after - } - - if e.updatedField || e.mvccTimestampField { - var meta map[string]interface{} - if e.wrapped { - meta = jsonEntries - } else { - meta = make(map[string]interface{}, 1) - jsonEntries[jsonMetaSentinel] = meta - } - if e.updatedField { - meta[`updated`] = row.updated.AsOfSystemTime() - } - if e.mvccTimestampField { - meta[`mvcc_timestamp`] = row.mvccTimestamp.AsOfSystemTime() - } - } - - j, err := json.MakeJSON(jsonEntries) - if err != nil { - return nil, err - } - e.buf.Reset() - j.Format(&e.buf) - return e.buf.Bytes(), nil -} - -// EncodeResolvedTimestamp implements the Encoder interface. 
-func (e *jsonEncoder) EncodeResolvedTimestamp( - _ context.Context, _ string, resolved hlc.Timestamp, -) ([]byte, error) { - meta := map[string]interface{}{ - `resolved`: tree.TimestampToDecimalDatum(resolved).Decimal.String(), - } - var jsonEntries interface{} - if e.wrapped { - jsonEntries = meta - } else { - jsonEntries = map[string]interface{}{ - jsonMetaSentinel: meta, - } - } - return gojson.Marshal(jsonEntries) -} - -// getTableColMap gets the TableColMap for the provided table descriptor, -// optionally consulting its cache. -func (e *jsonEncoder) getTableColMap(desc catalog.TableDescriptor) catalog.TableColMap { - ce, exists := e.columnMapCache[desc.GetID()] - if exists { - switch { - case ce.version == desc.GetVersion(): - return ce.TableColMap - case ce.version > desc.GetVersion(): - return catalog.ColumnIDToOrdinalMap(desc.PublicColumns()) - default: - // Construct a new entry. - delete(e.columnMapCache, desc.GetID()) - } - } - ce = &tableColumnMapCacheEntry{ - version: desc.GetVersion(), - TableColMap: catalog.ColumnIDToOrdinalMap(desc.PublicColumns()), - } - e.columnMapCache[desc.GetID()] = ce - return ce.TableColMap -} - -// confluentAvroEncoder encodes changefeed entries as Avro's binary or textual -// JSON format. Keys are the primary key columns in a record. Values are all -// columns in a record. -type confluentAvroEncoder struct { - schemaRegistry schemaRegistry - schemaPrefix string - updatedField, beforeField, keyOnly bool - virtualColumnVisibility string - targets []jobspb.ChangefeedTargetSpecification - - keyCache *cache.UnorderedCache // [tableIDAndVersion]confluentRegisteredKeySchema - valueCache *cache.UnorderedCache // [tableIDAndVersionPair]confluentRegisteredEnvelopeSchema - - // resolvedCache doesn't need to be bounded like the other caches because the number of topics - // is fixed per changefeed. - resolvedCache map[string]confluentRegisteredEnvelopeSchema -} - -type tableIDAndVersion struct { - tableID descpb.ID - version descpb.DescriptorVersion - familyID descpb.FamilyID -} -type tableIDAndVersionPair [2]tableIDAndVersion // [before, after] - -type confluentRegisteredKeySchema struct { - schema *avroDataRecord - registryID int32 -} - -type confluentRegisteredEnvelopeSchema struct { - schema *avroEnvelopeRecord - registryID int32 -} - -var _ Encoder = &confluentAvroEncoder{} - -var encoderCacheConfig = cache.Config{ - Policy: cache.CacheFIFO, - // TODO: If we find ourselves thrashing here in changefeeds on many tables, - // we can improve performance by eagerly evicting versions using Resolved notifications. - // An old version with a timestamp entirely before a notification can be safely evicted. 
- ShouldEvict: func(size int, _ interface{}, _ interface{}) bool { return size > 1024 }, -} - -func newConfluentAvroEncoder( - opts map[string]string, targets []jobspb.ChangefeedTargetSpecification, -) (*confluentAvroEncoder, error) { - e := &confluentAvroEncoder{ - schemaPrefix: opts[changefeedbase.OptAvroSchemaPrefix], - targets: targets, - virtualColumnVisibility: opts[changefeedbase.OptVirtualColumns], - } - - switch opts[changefeedbase.OptEnvelope] { - case string(changefeedbase.OptEnvelopeKeyOnly): - e.keyOnly = true - case string(changefeedbase.OptEnvelopeWrapped): - default: - return nil, errors.Errorf(`%s=%s is not supported with %s=%s`, - changefeedbase.OptEnvelope, opts[changefeedbase.OptEnvelope], changefeedbase.OptFormat, changefeedbase.OptFormatAvro) - } - _, e.updatedField = opts[changefeedbase.OptUpdatedTimestamps] - if e.updatedField && e.keyOnly { - return nil, errors.Errorf(`%s is only usable with %s=%s`, - changefeedbase.OptUpdatedTimestamps, changefeedbase.OptEnvelope, changefeedbase.OptEnvelopeWrapped) - } - _, e.beforeField = opts[changefeedbase.OptDiff] - if e.beforeField && e.keyOnly { - return nil, errors.Errorf(`%s is only usable with %s=%s`, - changefeedbase.OptDiff, changefeedbase.OptEnvelope, changefeedbase.OptEnvelopeWrapped) - } - - if _, ok := opts[changefeedbase.OptKeyInValue]; ok { - return nil, errors.Errorf(`%s is not supported with %s=%s`, - changefeedbase.OptKeyInValue, changefeedbase.OptFormat, changefeedbase.OptFormatAvro) - } - if _, ok := opts[changefeedbase.OptTopicInValue]; ok { - return nil, errors.Errorf(`%s is not supported with %s=%s`, - changefeedbase.OptTopicInValue, changefeedbase.OptFormat, changefeedbase.OptFormatAvro) - } - if len(opts[changefeedbase.OptConfluentSchemaRegistry]) == 0 { - return nil, errors.Errorf(`WITH option %s is required for %s=%s`, - changefeedbase.OptConfluentSchemaRegistry, changefeedbase.OptFormat, changefeedbase.OptFormatAvro) - } - - reg, err := newConfluentSchemaRegistry(opts[changefeedbase.OptConfluentSchemaRegistry]) - if err != nil { - return nil, err - } - - e.schemaRegistry = reg - e.keyCache = cache.NewUnorderedCache(encoderCacheConfig) - e.valueCache = cache.NewUnorderedCache(encoderCacheConfig) - e.resolvedCache = make(map[string]confluentRegisteredEnvelopeSchema) - return e, nil -} - -// Get the raw SQL-formatted string for a table name -// and apply full_table_name and avro_schema_prefix options -func (e *confluentAvroEncoder) rawTableName( - desc catalog.TableDescriptor, familyID descpb.FamilyID, -) (string, error) { - for _, target := range e.targets { - if target.TableID == desc.GetID() { - switch target.Type { - case jobspb.ChangefeedTargetSpecification_PRIMARY_FAMILY_ONLY: - return e.schemaPrefix + target.StatementTimeName, nil - case jobspb.ChangefeedTargetSpecification_EACH_FAMILY: - family, err := desc.FindFamilyByID(familyID) - if err != nil { - return "", err - } - return fmt.Sprintf("%s%s.%s", e.schemaPrefix, target.StatementTimeName, family.Name), nil - case jobspb.ChangefeedTargetSpecification_COLUMN_FAMILY: - family, err := desc.FindFamilyByID(familyID) - if err != nil { - return "", err - } - if family.Name != target.FamilyName { - // Not the right target specification for this family - continue - } - return fmt.Sprintf("%s%s.%s", e.schemaPrefix, target.StatementTimeName, target.FamilyName), nil - default: - // fall through to error - } - return "", errors.AssertionFailedf("Found a matching target with unimplemented type %s", target.Type) - } - } - return desc.GetName(), 
errors.Newf("Could not find TargetSpecification for descriptor %v", desc) -} - -// EncodeKey implements the Encoder interface. -func (e *confluentAvroEncoder) EncodeKey(ctx context.Context, row encodeRow) ([]byte, error) { - // No familyID in the cache key for keys because it's the same schema for all families - cacheKey := tableIDAndVersion{tableID: row.tableDesc.GetID(), version: row.tableDesc.GetVersion()} - - var registered confluentRegisteredKeySchema - v, ok := e.keyCache.Get(cacheKey) - if ok { - registered = v.(confluentRegisteredKeySchema) - registered.schema.refreshTypeMetadata(row.tableDesc) - } else { - var err error - tableName, err := e.rawTableName(row.tableDesc, row.familyID) - if err != nil { - return nil, err - } - registered.schema, err = indexToAvroSchema(row.tableDesc, row.tableDesc.GetPrimaryIndex(), tableName, e.schemaPrefix) - if err != nil { - return nil, err - } - - // NB: This uses the kafka name escaper because it has to match the name - // of the kafka topic. - subject := SQLNameToKafkaName(tableName) + confluentSubjectSuffixKey - registered.registryID, err = e.register(ctx, ®istered.schema.avroRecord, subject) - if err != nil { - return nil, err - } - e.keyCache.Add(cacheKey, registered) - } - - // https://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html#wire-format - header := []byte{ - changefeedbase.ConfluentAvroWireFormatMagic, - 0, 0, 0, 0, // Placeholder for the ID. - } - binary.BigEndian.PutUint32(header[1:5], uint32(registered.registryID)) - return registered.schema.BinaryFromRow(header, row.datums) -} - -// EncodeValue implements the Encoder interface. -func (e *confluentAvroEncoder) EncodeValue(ctx context.Context, row encodeRow) ([]byte, error) { - if e.keyOnly { - return nil, nil - } - - var cacheKey tableIDAndVersionPair - if e.beforeField && row.prevTableDesc != nil { - cacheKey[0] = tableIDAndVersion{ - tableID: row.prevTableDesc.GetID(), version: row.prevTableDesc.GetVersion(), familyID: row.prevFamilyID, - } - } - cacheKey[1] = tableIDAndVersion{ - tableID: row.tableDesc.GetID(), version: row.tableDesc.GetVersion(), familyID: row.familyID, - } - - var registered confluentRegisteredEnvelopeSchema - v, ok := e.valueCache.Get(cacheKey) - if ok { - registered = v.(confluentRegisteredEnvelopeSchema) - registered.schema.after.refreshTypeMetadata(row.tableDesc) - if row.prevTableDesc != nil && registered.schema.before != nil { - registered.schema.before.refreshTypeMetadata(row.prevTableDesc) - } - } else { - var beforeDataSchema *avroDataRecord - if e.beforeField && row.prevTableDesc != nil { - var err error - beforeDataSchema, err = tableToAvroSchema(row.prevTableDesc, row.prevFamilyID, `before`, e.schemaPrefix, e.virtualColumnVisibility) - if err != nil { - return nil, err - } - } - - afterDataSchema, err := tableToAvroSchema(row.tableDesc, row.familyID, avroSchemaNoSuffix, e.schemaPrefix, e.virtualColumnVisibility) - if err != nil { - return nil, err - } - - opts := avroEnvelopeOpts{afterField: true, beforeField: e.beforeField, updatedField: e.updatedField} - name, err := e.rawTableName(row.tableDesc, row.familyID) - if err != nil { - return nil, err - } - registered.schema, err = envelopeToAvroSchema(name, opts, beforeDataSchema, afterDataSchema, e.schemaPrefix) - - if err != nil { - return nil, err - } - - // NB: This uses the kafka name escaper because it has to match the name - // of the kafka topic. 
- subject := SQLNameToKafkaName(name) + confluentSubjectSuffixValue - registered.registryID, err = e.register(ctx, ®istered.schema.avroRecord, subject) - if err != nil { - return nil, err - } - e.valueCache.Add(cacheKey, registered) - } - - var meta avroMetadata - if registered.schema.opts.updatedField { - meta = map[string]interface{}{ - `updated`: row.updated, - } - } - var beforeDatums, afterDatums rowenc.EncDatumRow - if row.prevDatums != nil && !row.prevDeleted { - beforeDatums = row.prevDatums - } - if !row.deleted { - afterDatums = row.datums - } - // https://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html#wire-format - header := []byte{ - changefeedbase.ConfluentAvroWireFormatMagic, - 0, 0, 0, 0, // Placeholder for the ID. - } - binary.BigEndian.PutUint32(header[1:5], uint32(registered.registryID)) - return registered.schema.BinaryFromRow(header, meta, beforeDatums, afterDatums) -} - -// EncodeResolvedTimestamp implements the Encoder interface. -func (e *confluentAvroEncoder) EncodeResolvedTimestamp( - ctx context.Context, topic string, resolved hlc.Timestamp, -) ([]byte, error) { - registered, ok := e.resolvedCache[topic] - if !ok { - opts := avroEnvelopeOpts{resolvedField: true} - var err error - registered.schema, err = envelopeToAvroSchema(topic, opts, nil /* before */, nil /* after */, e.schemaPrefix /* namespace */) - if err != nil { - return nil, err - } - - // NB: This uses the kafka name escaper because it has to match the name - // of the kafka topic. - subject := SQLNameToKafkaName(topic) + confluentSubjectSuffixValue - registered.registryID, err = e.register(ctx, ®istered.schema.avroRecord, subject) - if err != nil { - return nil, err - } - - e.resolvedCache[topic] = registered - } - var meta avroMetadata - if registered.schema.opts.resolvedField { - meta = map[string]interface{}{ - `resolved`: resolved, - } - } - // https://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html#wire-format - header := []byte{ - changefeedbase.ConfluentAvroWireFormatMagic, - 0, 0, 0, 0, // Placeholder for the ID. - } - binary.BigEndian.PutUint32(header[1:5], uint32(registered.registryID)) - return registered.schema.BinaryFromRow(header, meta, nil /* beforeRow */, nil /* afterRow */) -} - -func (e *confluentAvroEncoder) register( - ctx context.Context, schema *avroRecord, subject string, -) (int32, error) { - return e.schemaRegistry.RegisterSchemaForSubject(ctx, subject, schema.codec.Schema()) -} - // nativeEncoder only implements EncodeResolvedTimestamp. // Unfortunately, the encoder assumes that it operates with encodeRow -- something // that's just not the case when emitting raw KVs. diff --git a/pkg/ccl/changefeedccl/encoder_avro.go b/pkg/ccl/changefeedccl/encoder_avro.go new file mode 100644 index 000000000000..2ef12ff2f875 --- /dev/null +++ b/pkg/ccl/changefeedccl/encoder_avro.go @@ -0,0 +1,328 @@ +// Copyright 2022 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package changefeedccl + +import ( + "context" + "encoding/binary" + "fmt" + + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/rowenc" + "github.com/cockroachdb/cockroach/pkg/util/cache" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/errors" +) + +const ( + confluentSubjectSuffixKey = `-key` + confluentSubjectSuffixValue = `-value` +) + +// confluentAvroEncoder encodes changefeed entries as Avro's binary or textual +// JSON format. Keys are the primary key columns in a record. Values are all +// columns in a record. +type confluentAvroEncoder struct { + schemaRegistry schemaRegistry + schemaPrefix string + updatedField, beforeField, keyOnly bool + virtualColumnVisibility string + targets []jobspb.ChangefeedTargetSpecification + + keyCache *cache.UnorderedCache // [tableIDAndVersion]confluentRegisteredKeySchema + valueCache *cache.UnorderedCache // [tableIDAndVersionPair]confluentRegisteredEnvelopeSchema + + // resolvedCache doesn't need to be bounded like the other caches because the number of topics + // is fixed per changefeed. + resolvedCache map[string]confluentRegisteredEnvelopeSchema +} + +type tableIDAndVersion struct { + tableID descpb.ID + version descpb.DescriptorVersion + familyID descpb.FamilyID +} +type tableIDAndVersionPair [2]tableIDAndVersion // [before, after] + +type confluentRegisteredKeySchema struct { + schema *avroDataRecord + registryID int32 +} + +type confluentRegisteredEnvelopeSchema struct { + schema *avroEnvelopeRecord + registryID int32 +} + +var _ Encoder = &confluentAvroEncoder{} + +var encoderCacheConfig = cache.Config{ + Policy: cache.CacheFIFO, + // TODO: If we find ourselves thrashing here in changefeeds on many tables, + // we can improve performance by eagerly evicting versions using Resolved notifications. + // An old version with a timestamp entirely before a notification can be safely evicted. 
+ ShouldEvict: func(size int, _ interface{}, _ interface{}) bool { return size > 1024 }, +} + +func newConfluentAvroEncoder( + opts map[string]string, targets []jobspb.ChangefeedTargetSpecification, +) (*confluentAvroEncoder, error) { + e := &confluentAvroEncoder{ + schemaPrefix: opts[changefeedbase.OptAvroSchemaPrefix], + targets: targets, + virtualColumnVisibility: opts[changefeedbase.OptVirtualColumns], + } + + switch opts[changefeedbase.OptEnvelope] { + case string(changefeedbase.OptEnvelopeKeyOnly): + e.keyOnly = true + case string(changefeedbase.OptEnvelopeWrapped): + default: + return nil, errors.Errorf(`%s=%s is not supported with %s=%s`, + changefeedbase.OptEnvelope, opts[changefeedbase.OptEnvelope], changefeedbase.OptFormat, changefeedbase.OptFormatAvro) + } + _, e.updatedField = opts[changefeedbase.OptUpdatedTimestamps] + if e.updatedField && e.keyOnly { + return nil, errors.Errorf(`%s is only usable with %s=%s`, + changefeedbase.OptUpdatedTimestamps, changefeedbase.OptEnvelope, changefeedbase.OptEnvelopeWrapped) + } + _, e.beforeField = opts[changefeedbase.OptDiff] + if e.beforeField && e.keyOnly { + return nil, errors.Errorf(`%s is only usable with %s=%s`, + changefeedbase.OptDiff, changefeedbase.OptEnvelope, changefeedbase.OptEnvelopeWrapped) + } + + if _, ok := opts[changefeedbase.OptKeyInValue]; ok { + return nil, errors.Errorf(`%s is not supported with %s=%s`, + changefeedbase.OptKeyInValue, changefeedbase.OptFormat, changefeedbase.OptFormatAvro) + } + if _, ok := opts[changefeedbase.OptTopicInValue]; ok { + return nil, errors.Errorf(`%s is not supported with %s=%s`, + changefeedbase.OptTopicInValue, changefeedbase.OptFormat, changefeedbase.OptFormatAvro) + } + if len(opts[changefeedbase.OptConfluentSchemaRegistry]) == 0 { + return nil, errors.Errorf(`WITH option %s is required for %s=%s`, + changefeedbase.OptConfluentSchemaRegistry, changefeedbase.OptFormat, changefeedbase.OptFormatAvro) + } + + reg, err := newConfluentSchemaRegistry(opts[changefeedbase.OptConfluentSchemaRegistry]) + if err != nil { + return nil, err + } + + e.schemaRegistry = reg + e.keyCache = cache.NewUnorderedCache(encoderCacheConfig) + e.valueCache = cache.NewUnorderedCache(encoderCacheConfig) + e.resolvedCache = make(map[string]confluentRegisteredEnvelopeSchema) + return e, nil +} + +// Get the raw SQL-formatted string for a table name +// and apply full_table_name and avro_schema_prefix options +func (e *confluentAvroEncoder) rawTableName( + desc catalog.TableDescriptor, familyID descpb.FamilyID, +) (string, error) { + for _, target := range e.targets { + if target.TableID == desc.GetID() { + switch target.Type { + case jobspb.ChangefeedTargetSpecification_PRIMARY_FAMILY_ONLY: + return e.schemaPrefix + target.StatementTimeName, nil + case jobspb.ChangefeedTargetSpecification_EACH_FAMILY: + family, err := desc.FindFamilyByID(familyID) + if err != nil { + return "", err + } + return fmt.Sprintf("%s%s.%s", e.schemaPrefix, target.StatementTimeName, family.Name), nil + case jobspb.ChangefeedTargetSpecification_COLUMN_FAMILY: + family, err := desc.FindFamilyByID(familyID) + if err != nil { + return "", err + } + if family.Name != target.FamilyName { + // Not the right target specification for this family + continue + } + return fmt.Sprintf("%s%s.%s", e.schemaPrefix, target.StatementTimeName, target.FamilyName), nil + default: + // fall through to error + } + return "", errors.AssertionFailedf("Found a matching target with unimplemented type %s", target.Type) + } + } + return desc.GetName(), 
errors.Newf("Could not find TargetSpecification for descriptor %v", desc) +} + +// EncodeKey implements the Encoder interface. +func (e *confluentAvroEncoder) EncodeKey(ctx context.Context, row encodeRow) ([]byte, error) { + // No familyID in the cache key for keys because it's the same schema for all families + cacheKey := tableIDAndVersion{tableID: row.tableDesc.GetID(), version: row.tableDesc.GetVersion()} + + var registered confluentRegisteredKeySchema + v, ok := e.keyCache.Get(cacheKey) + if ok { + registered = v.(confluentRegisteredKeySchema) + registered.schema.refreshTypeMetadata(row.tableDesc) + } else { + var err error + tableName, err := e.rawTableName(row.tableDesc, row.familyID) + if err != nil { + return nil, err + } + registered.schema, err = indexToAvroSchema(row.tableDesc, row.tableDesc.GetPrimaryIndex(), tableName, e.schemaPrefix) + if err != nil { + return nil, err + } + + // NB: This uses the kafka name escaper because it has to match the name + // of the kafka topic. + subject := SQLNameToKafkaName(tableName) + confluentSubjectSuffixKey + registered.registryID, err = e.register(ctx, ®istered.schema.avroRecord, subject) + if err != nil { + return nil, err + } + e.keyCache.Add(cacheKey, registered) + } + + // https://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html#wire-format + header := []byte{ + changefeedbase.ConfluentAvroWireFormatMagic, + 0, 0, 0, 0, // Placeholder for the ID. + } + binary.BigEndian.PutUint32(header[1:5], uint32(registered.registryID)) + return registered.schema.BinaryFromRow(header, row.datums) +} + +// EncodeValue implements the Encoder interface. +func (e *confluentAvroEncoder) EncodeValue(ctx context.Context, row encodeRow) ([]byte, error) { + if e.keyOnly { + return nil, nil + } + + var cacheKey tableIDAndVersionPair + if e.beforeField && row.prevTableDesc != nil { + cacheKey[0] = tableIDAndVersion{ + tableID: row.prevTableDesc.GetID(), version: row.prevTableDesc.GetVersion(), familyID: row.prevFamilyID, + } + } + cacheKey[1] = tableIDAndVersion{ + tableID: row.tableDesc.GetID(), version: row.tableDesc.GetVersion(), familyID: row.familyID, + } + + var registered confluentRegisteredEnvelopeSchema + v, ok := e.valueCache.Get(cacheKey) + if ok { + registered = v.(confluentRegisteredEnvelopeSchema) + registered.schema.after.refreshTypeMetadata(row.tableDesc) + if row.prevTableDesc != nil && registered.schema.before != nil { + registered.schema.before.refreshTypeMetadata(row.prevTableDesc) + } + } else { + var beforeDataSchema *avroDataRecord + if e.beforeField && row.prevTableDesc != nil { + var err error + beforeDataSchema, err = tableToAvroSchema(row.prevTableDesc, row.prevFamilyID, `before`, e.schemaPrefix, e.virtualColumnVisibility) + if err != nil { + return nil, err + } + } + + afterDataSchema, err := tableToAvroSchema(row.tableDesc, row.familyID, avroSchemaNoSuffix, e.schemaPrefix, e.virtualColumnVisibility) + if err != nil { + return nil, err + } + + opts := avroEnvelopeOpts{afterField: true, beforeField: e.beforeField, updatedField: e.updatedField} + name, err := e.rawTableName(row.tableDesc, row.familyID) + if err != nil { + return nil, err + } + registered.schema, err = envelopeToAvroSchema(name, opts, beforeDataSchema, afterDataSchema, e.schemaPrefix) + + if err != nil { + return nil, err + } + + // NB: This uses the kafka name escaper because it has to match the name + // of the kafka topic. 
+ subject := SQLNameToKafkaName(name) + confluentSubjectSuffixValue + registered.registryID, err = e.register(ctx, ®istered.schema.avroRecord, subject) + if err != nil { + return nil, err + } + e.valueCache.Add(cacheKey, registered) + } + + var meta avroMetadata + if registered.schema.opts.updatedField { + meta = map[string]interface{}{ + `updated`: row.updated, + } + } + var beforeDatums, afterDatums rowenc.EncDatumRow + if row.prevDatums != nil && !row.prevDeleted { + beforeDatums = row.prevDatums + } + if !row.deleted { + afterDatums = row.datums + } + // https://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html#wire-format + header := []byte{ + changefeedbase.ConfluentAvroWireFormatMagic, + 0, 0, 0, 0, // Placeholder for the ID. + } + binary.BigEndian.PutUint32(header[1:5], uint32(registered.registryID)) + return registered.schema.BinaryFromRow(header, meta, beforeDatums, afterDatums) +} + +// EncodeResolvedTimestamp implements the Encoder interface. +func (e *confluentAvroEncoder) EncodeResolvedTimestamp( + ctx context.Context, topic string, resolved hlc.Timestamp, +) ([]byte, error) { + registered, ok := e.resolvedCache[topic] + if !ok { + opts := avroEnvelopeOpts{resolvedField: true} + var err error + registered.schema, err = envelopeToAvroSchema(topic, opts, nil /* before */, nil /* after */, e.schemaPrefix /* namespace */) + if err != nil { + return nil, err + } + + // NB: This uses the kafka name escaper because it has to match the name + // of the kafka topic. + subject := SQLNameToKafkaName(topic) + confluentSubjectSuffixValue + registered.registryID, err = e.register(ctx, ®istered.schema.avroRecord, subject) + if err != nil { + return nil, err + } + + e.resolvedCache[topic] = registered + } + var meta avroMetadata + if registered.schema.opts.resolvedField { + meta = map[string]interface{}{ + `resolved`: resolved, + } + } + // https://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html#wire-format + header := []byte{ + changefeedbase.ConfluentAvroWireFormatMagic, + 0, 0, 0, 0, // Placeholder for the ID. + } + binary.BigEndian.PutUint32(header[1:5], uint32(registered.registryID)) + return registered.schema.BinaryFromRow(header, meta, nil /* beforeRow */, nil /* afterRow */) +} + +func (e *confluentAvroEncoder) register( + ctx context.Context, schema *avroRecord, subject string, +) (int32, error) { + return e.schemaRegistry.RegisterSchemaForSubject(ctx, subject, schema.codec.Schema()) +} diff --git a/pkg/ccl/changefeedccl/encoder_csv.go b/pkg/ccl/changefeedccl/encoder_csv.go new file mode 100644 index 000000000000..e330f4a177a6 --- /dev/null +++ b/pkg/ccl/changefeedccl/encoder_csv.go @@ -0,0 +1,137 @@ +// Copyright 2022 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package changefeedccl + +import ( + "bytes" + "context" + + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/util/encoding/csv" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/errors" +) + +type columnEntry struct { + column catalog.Column + idx int +} + +type tableEntry struct { + csvRow []string + columns []columnEntry +} + +type csvEncoder struct { + alloc tree.DatumAlloc + virtualColumnVisibility string + + buf *bytes.Buffer + writer *csv.Writer + + tableCache map[descpb.DescriptorVersion]tableEntry +} + +var _ Encoder = &csvEncoder{} + +func newCSVEncoder(opts map[string]string) *csvEncoder { + newBuf := bytes.NewBuffer([]byte{}) + newEncoder := &csvEncoder{ + virtualColumnVisibility: opts[changefeedbase.OptVirtualColumns], + buf: newBuf, + writer: csv.NewWriter(newBuf), + } + newEncoder.writer.SkipNewline = true + newEncoder.tableCache = make(map[descpb.DescriptorVersion]tableEntry) + return newEncoder +} + +func (e *csvEncoder) buildTableCacheEntry(row encodeRow) (tableEntry, error) { + family, err := row.tableDesc.FindFamilyByID(row.familyID) + if err != nil { + return tableEntry{}, err + } + + include := make(map[descpb.ColumnID]struct{}, len(family.ColumnIDs)) + var yes struct{} + for _, colID := range family.ColumnIDs { + include[colID] = yes + } + + var columnCache []columnEntry + + for i, col := range row.tableDesc.PublicColumns() { + _, inFamily := include[col.GetID()] + virtual := col.IsVirtual() && e.virtualColumnVisibility == string(changefeedbase.OptVirtualColumnsNull) + if inFamily || virtual { + columnCache = append(columnCache, columnEntry{ + column: col, + idx: i, + }) + } + } + + tableCSVRow := make([]string, 0, len(columnCache)) + + entry := tableEntry{ + csvRow: tableCSVRow, + columns: columnCache, + } + + return entry, nil +} + +// EncodeKey implements the Encoder interface. +func (e *csvEncoder) EncodeKey(_ context.Context, row encodeRow) ([]byte, error) { + return nil, nil +} + +// EncodeValue implements the Encoder interface. +func (e *csvEncoder) EncodeValue(_ context.Context, row encodeRow) ([]byte, error) { + if row.deleted { + return nil, errors.Errorf(`cannot encode deleted rows into CSV format`) + } + + var err error + rowVersion := row.tableDesc.GetVersion() + if _, ok := e.tableCache[rowVersion]; !ok { + e.tableCache[rowVersion], err = e.buildTableCacheEntry(row) + if err != nil { + return nil, err + } + } + + entry := e.tableCache[rowVersion] + entry.csvRow = entry.csvRow[:0] + + for _, colEntry := range entry.columns { + datum := row.datums[colEntry.idx] + if err := datum.EnsureDecoded(colEntry.column.GetType(), &e.alloc); err != nil { + return nil, err + } + entry.csvRow = append(entry.csvRow, tree.AsString(datum.Datum)) + } + + e.buf.Reset() + if err := e.writer.Write(entry.csvRow); err != nil { + return nil, err + } + e.writer.Flush() + return e.buf.Bytes(), nil +} + +// EncodeResolvedTimestamp implements the Encoder interface. 
+func (e *csvEncoder) EncodeResolvedTimestamp( + _ context.Context, _ string, resolved hlc.Timestamp, +) ([]byte, error) { + return nil, errors.New("EncodeResolvedTimestamp is not supported with the CSV encoder") +} diff --git a/pkg/ccl/changefeedccl/encoder_json.go b/pkg/ccl/changefeedccl/encoder_json.go new file mode 100644 index 000000000000..f36e3adce9cc --- /dev/null +++ b/pkg/ccl/changefeedccl/encoder_json.go @@ -0,0 +1,289 @@ +// Copyright 2022 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package changefeedccl + +import ( + "bytes" + "context" + gojson "encoding/json" + "time" + + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/json" + "github.com/cockroachdb/errors" +) + +// jsonEncoder encodes changefeed entries as JSON. Keys are the primary key +// columns in a JSON array. Values are a JSON object mapping every column name +// to its value. Updated timestamps in rows and resolved timestamp payloads are +// stored in a sub-object under the `__crdb__` key in the top-level JSON object. +type jsonEncoder struct { + updatedField, mvccTimestampField, beforeField, wrapped, keyOnly, keyInValue, topicInValue bool + + targets []jobspb.ChangefeedTargetSpecification + alloc tree.DatumAlloc + buf bytes.Buffer + virtualColumnVisibility string + + // columnMapCache caches the TableColMap for the latest version of the + // table descriptor thus far seen. It avoids the need to recompute the + // map per row, which, prior to the change introducing this cache, could + // amount for 10% of row processing time. + columnMapCache map[descpb.ID]*tableColumnMapCacheEntry +} + +// tableColumnMapCacheEntry stores a TableColMap for a given descriptor version. 
+type tableColumnMapCacheEntry struct { + version descpb.DescriptorVersion + catalog.TableColMap +} + +var _ Encoder = &jsonEncoder{} + +func makeJSONEncoder( + opts map[string]string, targets []jobspb.ChangefeedTargetSpecification, +) (*jsonEncoder, error) { + e := &jsonEncoder{ + targets: targets, + keyOnly: changefeedbase.EnvelopeType(opts[changefeedbase.OptEnvelope]) == changefeedbase.OptEnvelopeKeyOnly, + wrapped: changefeedbase.EnvelopeType(opts[changefeedbase.OptEnvelope]) == changefeedbase.OptEnvelopeWrapped, + virtualColumnVisibility: opts[changefeedbase.OptVirtualColumns], + columnMapCache: map[descpb.ID]*tableColumnMapCacheEntry{}, + } + _, e.updatedField = opts[changefeedbase.OptUpdatedTimestamps] + _, e.mvccTimestampField = opts[changefeedbase.OptMVCCTimestamps] + _, e.beforeField = opts[changefeedbase.OptDiff] + if e.beforeField && !e.wrapped { + return nil, errors.Errorf(`%s is only usable with %s=%s`, + changefeedbase.OptDiff, changefeedbase.OptEnvelope, changefeedbase.OptEnvelopeWrapped) + } + _, e.keyInValue = opts[changefeedbase.OptKeyInValue] + if e.keyInValue && !e.wrapped { + return nil, errors.Errorf(`%s is only usable with %s=%s`, + changefeedbase.OptKeyInValue, changefeedbase.OptEnvelope, changefeedbase.OptEnvelopeWrapped) + } + _, e.topicInValue = opts[changefeedbase.OptTopicInValue] + if e.topicInValue && !e.wrapped { + return nil, errors.Errorf(`%s is only usable with %s=%s`, + changefeedbase.OptTopicInValue, changefeedbase.OptEnvelope, changefeedbase.OptEnvelopeWrapped) + } + return e, nil +} + +// EncodeKey implements the Encoder interface. +func (e *jsonEncoder) EncodeKey(_ context.Context, row encodeRow) ([]byte, error) { + jsonEntries, err := e.encodeKeyRaw(row) + if err != nil { + return nil, err + } + j, err := json.MakeJSON(jsonEntries) + if err != nil { + return nil, err + } + e.buf.Reset() + j.Format(&e.buf) + return e.buf.Bytes(), nil +} + +func (e *jsonEncoder) encodeKeyRaw(row encodeRow) ([]interface{}, error) { + colIdxByID := e.getTableColMap(row.tableDesc) + primaryIndex := row.tableDesc.GetPrimaryIndex() + jsonEntries := make([]interface{}, primaryIndex.NumKeyColumns()) + for i := 0; i < primaryIndex.NumKeyColumns(); i++ { + colID := primaryIndex.GetKeyColumnID(i) + idx, ok := colIdxByID.Get(colID) + if !ok { + return nil, errors.Errorf(`unknown column id: %d`, colID) + } + datum, col := row.datums[idx], row.tableDesc.PublicColumns()[idx] + if err := datum.EnsureDecoded(col.GetType(), &e.alloc); err != nil { + return nil, err + } + var err error + jsonEntries[i], err = tree.AsJSON( + datum.Datum, + sessiondatapb.DataConversionConfig{}, + time.UTC, + ) + if err != nil { + return nil, err + } + } + return jsonEntries, nil +} + +// EncodeValue implements the Encoder interface. 
+func (e *jsonEncoder) EncodeValue(_ context.Context, row encodeRow) ([]byte, error) { + if e.keyOnly || (!e.wrapped && row.deleted) { + return nil, nil + } + + var after map[string]interface{} + if !row.deleted { + family, err := row.tableDesc.FindFamilyByID(row.familyID) + if err != nil { + return nil, err + } + include := make(map[descpb.ColumnID]struct{}, len(family.ColumnIDs)) + var yes struct{} + for _, colID := range family.ColumnIDs { + include[colID] = yes + } + after = make(map[string]interface{}) + for i, col := range row.tableDesc.PublicColumns() { + _, inFamily := include[col.GetID()] + virtual := col.IsVirtual() && e.virtualColumnVisibility == string(changefeedbase.OptVirtualColumnsNull) + if inFamily || virtual { + datum := row.datums[i] + if err := datum.EnsureDecoded(col.GetType(), &e.alloc); err != nil { + return nil, err + } + after[col.GetName()], err = tree.AsJSON( + datum.Datum, + sessiondatapb.DataConversionConfig{}, + time.UTC, + ) + if err != nil { + return nil, err + } + } + } + } + + var before map[string]interface{} + if row.prevDatums != nil && !row.prevDeleted { + family, err := row.prevTableDesc.FindFamilyByID(row.prevFamilyID) + if err != nil { + return nil, err + } + include := make(map[descpb.ColumnID]struct{}) + var yes struct{} + for _, colID := range family.ColumnIDs { + include[colID] = yes + } + before = make(map[string]interface{}) + for i, col := range row.prevTableDesc.PublicColumns() { + _, inFamily := include[col.GetID()] + virtual := col.IsVirtual() && e.virtualColumnVisibility == string(changefeedbase.OptVirtualColumnsNull) + if inFamily || virtual { + datum := row.prevDatums[i] + if err := datum.EnsureDecoded(col.GetType(), &e.alloc); err != nil { + return nil, err + } + before[col.GetName()], err = tree.AsJSON( + datum.Datum, + sessiondatapb.DataConversionConfig{}, + time.UTC, + ) + if err != nil { + return nil, err + } + } + } + } + + var jsonEntries map[string]interface{} + if e.wrapped { + if after != nil { + jsonEntries = map[string]interface{}{`after`: after} + } else { + jsonEntries = map[string]interface{}{`after`: nil} + } + if e.beforeField { + if before != nil { + jsonEntries[`before`] = before + } else { + jsonEntries[`before`] = nil + } + } + if e.keyInValue { + keyEntries, err := e.encodeKeyRaw(row) + if err != nil { + return nil, err + } + jsonEntries[`key`] = keyEntries + } + if e.topicInValue { + jsonEntries[`topic`] = row.topic + } + } else { + jsonEntries = after + } + + if e.updatedField || e.mvccTimestampField { + var meta map[string]interface{} + if e.wrapped { + meta = jsonEntries + } else { + meta = make(map[string]interface{}, 1) + jsonEntries[jsonMetaSentinel] = meta + } + if e.updatedField { + meta[`updated`] = row.updated.AsOfSystemTime() + } + if e.mvccTimestampField { + meta[`mvcc_timestamp`] = row.mvccTimestamp.AsOfSystemTime() + } + } + + j, err := json.MakeJSON(jsonEntries) + if err != nil { + return nil, err + } + e.buf.Reset() + j.Format(&e.buf) + return e.buf.Bytes(), nil +} + +// EncodeResolvedTimestamp implements the Encoder interface. 
+func (e *jsonEncoder) EncodeResolvedTimestamp( + _ context.Context, _ string, resolved hlc.Timestamp, +) ([]byte, error) { + meta := map[string]interface{}{ + `resolved`: tree.TimestampToDecimalDatum(resolved).Decimal.String(), + } + var jsonEntries interface{} + if e.wrapped { + jsonEntries = meta + } else { + jsonEntries = map[string]interface{}{ + jsonMetaSentinel: meta, + } + } + return gojson.Marshal(jsonEntries) +} + +// getTableColMap gets the TableColMap for the provided table descriptor, +// optionally consulting its cache. +func (e *jsonEncoder) getTableColMap(desc catalog.TableDescriptor) catalog.TableColMap { + ce, exists := e.columnMapCache[desc.GetID()] + if exists { + switch { + case ce.version == desc.GetVersion(): + return ce.TableColMap + case ce.version > desc.GetVersion(): + return catalog.ColumnIDToOrdinalMap(desc.PublicColumns()) + default: + // Construct a new entry. + delete(e.columnMapCache, desc.GetID()) + } + } + ce = &tableColumnMapCacheEntry{ + version: desc.GetVersion(), + TableColMap: catalog.ColumnIDToOrdinalMap(desc.PublicColumns()), + } + e.columnMapCache[desc.GetID()] = ce + return ce.TableColMap +} diff --git a/pkg/ccl/changefeedccl/sink_cloudstorage.go b/pkg/ccl/changefeedccl/sink_cloudstorage.go index 93cca6872162..4e377747cafb 100644 --- a/pkg/ccl/changefeedccl/sink_cloudstorage.go +++ b/pkg/ccl/changefeedccl/sink_cloudstorage.go @@ -392,6 +392,11 @@ func makeCloudStorageSink( // would require a bit of refactoring. s.ext = `.ndjson` s.rowDelimiter = []byte{'\n'} + case changefeedbase.OptFormatCSV: + // TODO(dan): It seems like these should be on the encoder, but that + // would require a bit of refactoring. + s.ext = `.csv` + s.rowDelimiter = []byte{'\n'} default: return nil, errors.Errorf(`this sink is incompatible with %s=%s`, changefeedbase.OptFormat, opts[changefeedbase.OptFormat]) diff --git a/pkg/ccl/changefeedccl/sink_pubsub.go b/pkg/ccl/changefeedccl/sink_pubsub.go index 47c451f1d9c2..6df4efbebaaf 100644 --- a/pkg/ccl/changefeedccl/sink_pubsub.go +++ b/pkg/ccl/changefeedccl/sink_pubsub.go @@ -50,13 +50,19 @@ type pubsubClient interface { sendMessageToAllTopics(content []byte) error } -// payload struct is sent to the sink -type payload struct { +type jsonPayload struct { Key json.RawMessage `json:"key"` Value json.RawMessage `json:"value"` Topic string `json:"topic"` } +// payload struct is sent to the sink +type payload struct { + Key []byte + Value []byte + Topic string +} + // pubsubMessage is sent to worker channels for workers to consume type pubsubMessage struct { alloc kvevent.Alloc @@ -91,6 +97,8 @@ type pubsubSink struct { client pubsubClient topicNamer *TopicNamer + + format changefeedbase.FormatType } // TODO: unify gcp credentials code with gcp cloud storage credentials code @@ -142,8 +150,12 @@ func MakePubsubSink( pubsubURL := sinkURL{URL: u, q: u.Query()} pubsubTopicName := pubsubURL.consumeParam(changefeedbase.SinkParamTopicName) + var formatType changefeedbase.FormatType switch changefeedbase.FormatType(opts[changefeedbase.OptFormat]) { case changefeedbase.OptFormatJSON: + formatType = changefeedbase.OptFormatJSON + case changefeedbase.OptFormatCSV: + formatType = changefeedbase.OptFormatCSV default: return nil, errors.Errorf(`this sink is incompatible with %s=%s`, changefeedbase.OptFormat, opts[changefeedbase.OptFormat]) @@ -161,6 +173,7 @@ func MakePubsubSink( workerCtx: ctx, numWorkers: numOfWorkers, exitWorkers: cancel, + format: formatType, } // creates custom pubsub object based on scheme @@ -341,12 +354,24 @@ func (p 
*pubsubSink) workerLoop(workerIndex int) { // Signals a flush request, makes sure that the messages in eventsChans are finished sending continue } - m := msg.message - b, err := json.Marshal(m) - if err != nil { - p.exitWorkersWithError(err) + + var content []byte + var err error + switch p.format { + case changefeedbase.OptFormatJSON: + content, err = json.Marshal(jsonPayload{ + Key: msg.message.Key, + Value: msg.message.Value, + Topic: msg.message.Topic, + }) + if err != nil { + p.exitWorkersWithError(err) + } + case changefeedbase.OptFormatCSV: + content = msg.message.Value } - err = p.client.sendMessage(b, msg.message.Topic, string(msg.message.Key)) + + err = p.client.sendMessage(content, msg.message.Topic, string(msg.message.Key)) if err != nil { p.exitWorkersWithError(err) } diff --git a/pkg/ccl/changefeedccl/sink_webhook.go b/pkg/ccl/changefeedccl/sink_webhook.go index 5e07c45fee76..acfa34c4c466 100644 --- a/pkg/ccl/changefeedccl/sink_webhook.go +++ b/pkg/ccl/changefeedccl/sink_webhook.go @@ -38,6 +38,7 @@ import ( const ( applicationTypeJSON = `application/json` + applicationTypeCSV = `text/csv` authorizationHeader = `Authorization` defaultConnTimeout = 3 * time.Second ) @@ -58,6 +59,7 @@ type webhookSink struct { retryCfg retry.Options batchCfg batchConfig ts timeutil.TimeSource + format changefeedbase.FormatType // Webhook destination. url sinkURL @@ -95,7 +97,7 @@ type encodedPayload struct { mvcc hlc.Timestamp } -func encodePayloadWebhook(messages []messagePayload) (encodedPayload, error) { +func encodePayloadJSONWebhook(messages []messagePayload) (encodedPayload, error) { result := encodedPayload{ emitTime: timeutil.Now(), } @@ -124,6 +126,27 @@ func encodePayloadWebhook(messages []messagePayload) (encodedPayload, error) { return result, err } +func encodePayloadCSVWebhook(messages []messagePayload) (encodedPayload, error) { + result := encodedPayload{ + emitTime: timeutil.Now(), + } + + var mergedMsgs []byte + for _, m := range messages { + result.alloc.Merge(&m.alloc) + mergedMsgs = append(mergedMsgs, m.val...) + if m.emitTime.Before(result.emitTime) { + result.emitTime = m.emitTime + } + if result.mvcc.IsEmpty() || m.mvcc.Less(result.mvcc) { + result.mvcc = m.mvcc + } + } + + result.data = mergedMsgs + return result, nil +} + type messagePayload struct { // Payload message fields. 
key []byte @@ -256,9 +279,13 @@ func makeWebhookSink( } u.Scheme = strings.TrimPrefix(u.Scheme, `webhook-`) + var formatType changefeedbase.FormatType switch changefeedbase.FormatType(opts[changefeedbase.OptFormat]) { + // only JSON and CSV supported at this time for webhook sink case changefeedbase.OptFormatJSON: - // only JSON supported at this time for webhook sink + formatType = changefeedbase.OptFormatJSON + case changefeedbase.OptFormatCSV: + formatType = changefeedbase.OptFormatCSV default: return nil, errors.Errorf(`this sink is incompatible with %s=%s`, changefeedbase.OptFormat, opts[changefeedbase.OptFormat]) @@ -297,6 +324,7 @@ func makeWebhookSink( sink := &webhookSink{ workerCtx: ctx, authHeader: opts[changefeedbase.OptWebhookAuthHeader], + format: formatType, exitWorkers: cancel, parallelism: parallelism, ts: source, @@ -571,7 +599,14 @@ func (s *webhookSink) workerLoop(workerIndex int) { continue } - encoded, err := encodePayloadWebhook(msgs) + var encoded encodedPayload + var err error + switch s.format { + case changefeedbase.OptFormatJSON: + encoded, err = encodePayloadJSONWebhook(msgs) + case changefeedbase.OptFormatCSV: + encoded, err = encodePayloadCSVWebhook(msgs) + } if err != nil { s.exitWorkersWithError(err) return @@ -599,7 +634,13 @@ func (s *webhookSink) sendMessage(ctx context.Context, reqBody []byte) error { if err != nil { return err } - req.Header.Set("Content-Type", applicationTypeJSON) + switch s.format { + case changefeedbase.OptFormatJSON: + req.Header.Set("Content-Type", applicationTypeJSON) + case changefeedbase.OptFormatCSV: + req.Header.Set("Content-Type", applicationTypeCSV) + } + if s.authHeader != "" { req.Header.Set(authorizationHeader, s.authHeader) } diff --git a/pkg/ccl/changefeedccl/testfeed_test.go b/pkg/ccl/changefeedccl/testfeed_test.go index 154f16111c7e..02355f1acc5c 100644 --- a/pkg/ccl/changefeedccl/testfeed_test.go +++ b/pkg/ccl/changefeedccl/testfeed_test.go @@ -885,23 +885,34 @@ func (c *cloudFeed) Next() (*cdctest.TestFeedMessage, error) { // The other TestFeed impls check both key and value here, but cloudFeeds // don't have keys. if len(m.Value) > 0 { - // Cloud storage sinks default the `WITH key_in_value` option so that - // the key is recoverable. Extract it out of the value (also removing it - // so the output matches the other sinks). Note that this assumes the - // format is json, this will have to be fixed once we add format=avro - // support to cloud storage. - // - // TODO(dan): Leave the key in the value if the TestFeed user - // specifically requested it. - var err error - if m.Key, m.Value, err = extractKeyFromJSONValue(m.Value); err != nil { + details, err := c.Details() + if err != nil { return nil, err } - if isNew := c.markSeen(m); !isNew { - continue + + switch v := changefeedbase.FormatType(details.Opts[changefeedbase.OptFormat]); v { + case ``, changefeedbase.OptFormatJSON: + // Cloud storage sinks default the `WITH key_in_value` option so that + // the key is recoverable. Extract it out of the value (also removing it + // so the output matches the other sinks). Note that this assumes the + // format is json, this will have to be fixed once we add format=avro + // support to cloud storage. + // + // TODO(dan): Leave the key in the value if the TestFeed user + // specifically requested it. 
+ if m.Key, m.Value, err = extractKeyFromJSONValue(m.Value); err != nil { + return nil, err + } + if isNew := c.markSeen(m); !isNew { + continue + } + m.Resolved = nil + return m, nil + case changefeedbase.OptFormatCSV: + return m, nil + default: + return nil, errors.Errorf(`unknown %s: %s`, changefeedbase.OptFormat, v) } - m.Resolved = nil - return m, nil } m.Key, m.Value = nil, nil return m, nil @@ -1449,28 +1460,38 @@ func (f *webhookFeed) Next() (*cdctest.TestFeedMessage, error) { if msg != "" { m := &cdctest.TestFeedMessage{} if msg != "" { - var err error - var resolved bool - resolved, err = isResolvedTimestamp([]byte(msg)) + details, err := f.Details() if err != nil { return nil, err } - if resolved { - m.Resolved = []byte(msg) - } else { - wrappedValue, err := extractValueFromJSONMessage([]byte(msg)) + switch v := changefeedbase.FormatType(details.Opts[changefeedbase.OptFormat]); v { + case ``, changefeedbase.OptFormatJSON: + resolved, err := isResolvedTimestamp([]byte(msg)) if err != nil { return nil, err } - if m.Key, m.Value, err = extractKeyFromJSONValue(wrappedValue); err != nil { - return nil, err - } - if m.Topic, m.Value, err = extractTopicFromJSONValue(m.Value); err != nil { - return nil, err - } - if isNew := f.markSeen(m); !isNew { - continue + if resolved { + m.Resolved = []byte(msg) + } else { + wrappedValue, err := extractValueFromJSONMessage([]byte(msg)) + if err != nil { + return nil, err + } + if m.Key, m.Value, err = extractKeyFromJSONValue(wrappedValue); err != nil { + return nil, err + } + if m.Topic, m.Value, err = extractTopicFromJSONValue(m.Value); err != nil { + return nil, err + } + if isNew := f.markSeen(m); !isNew { + continue + } } + case changefeedbase.OptFormatCSV: + m.Value = []byte(msg) + return m, nil + default: + return nil, errors.Errorf(`unknown %s: %s`, changefeedbase.OptFormat, v) } return m, nil } @@ -1660,7 +1681,7 @@ func (p *pubsubFeed) Partitions() []string { // extractJSONMessagePubsub extracts the value, key, and topic from a pubsub message func extractJSONMessagePubsub(wrapped []byte) (value []byte, key []byte, topic string, err error) { - parsed := payload{} + parsed := jsonPayload{} err = gojson.Unmarshal(wrapped, &parsed) if err != nil { return @@ -1685,23 +1706,36 @@ func (p *pubsubFeed) Next() (*cdctest.TestFeedMessage, error) { for { msg := p.client.buffer.pop() if msg != nil { - m := &cdctest.TestFeedMessage{} - resolved, err := isResolvedTimestamp([]byte(msg.data)) + details, err := p.Details() if err != nil { return nil, err } - msgBytes := []byte(msg.data) - if resolved { - m.Resolved = msgBytes - } else { - m.Value, m.Key, m.Topic, err = extractJSONMessagePubsub(msgBytes) + + m := &cdctest.TestFeedMessage{} + switch v := changefeedbase.FormatType(details.Opts[changefeedbase.OptFormat]); v { + case ``, changefeedbase.OptFormatJSON: + resolved, err := isResolvedTimestamp([]byte(msg.data)) if err != nil { return nil, err } - if isNew := p.markSeen(m); !isNew { - continue + msgBytes := []byte(msg.data) + if resolved { + m.Resolved = msgBytes + } else { + m.Value, m.Key, m.Topic, err = extractJSONMessagePubsub(msgBytes) + if err != nil { + return nil, err + } + if isNew := p.markSeen(m); !isNew { + continue + } } + case changefeedbase.OptFormatCSV: + m.Value = []byte(msg.data) + default: + return nil, errors.Errorf(`unknown %s: %s`, changefeedbase.OptFormat, v) } + return m, nil } select { diff --git a/pkg/util/encoding/csv/writer.go b/pkg/util/encoding/csv/writer.go index 868991dce6cd..ea6664b28d6a 100644 --- 
a/pkg/util/encoding/csv/writer.go
+++ b/pkg/util/encoding/csv/writer.go
@@ -32,10 +32,11 @@ import (
 //
 // If UseCRLF is true, the Writer ends each record with \r\n instead of \n.
 type Writer struct {
-	Comma   rune // Field delimiter (set to ',' by NewWriter)
-	Escape  rune
-	UseCRLF bool // True to use \r\n as the line terminator
-	w       *bufio.Writer
+	Comma       rune // Field delimiter (set to ',' by NewWriter)
+	Escape      rune
+	UseCRLF     bool // True to use \r\n as the line terminator
+	SkipNewline bool // True to skip writing the line terminator (\n or \r\n)
+	w           *bufio.Writer
 }
 
 // NewWriter returns a new Writer that writes to w.
@@ -103,10 +104,13 @@ func (w *Writer) Write(record []string) error {
 		}
 	}
 	var err error
-	if w.UseCRLF {
-		_, err = w.w.WriteString("\r\n")
-	} else {
-		err = w.w.WriteByte('\n')
+
+	if !w.SkipNewline {
+		if w.UseCRLF {
+			_, err = w.w.WriteString("\r\n")
+		} else {
+			err = w.w.WriteByte('\n')
+		}
 	}
 	return err
 }
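
Usage note (not part of the patch): a minimal sketch of how a caller, such as the CSV encoder added here, might drive the new SkipNewline knob so that the sink's own rowDelimiter is the only record separator. The full import path and the Flush call are assumptions based on the stdlib-derived csv fork; the record values are purely illustrative.

package main

import (
	"bytes"
	"fmt"

	// Assumed import path for the forked CSV writer patched above.
	"github.com/cockroachdb/cockroach/pkg/util/encoding/csv"
)

func main() {
	var buf bytes.Buffer
	w := csv.NewWriter(&buf)
	// Suppress the trailing terminator so the caller can append its own
	// row delimiter (the cloud storage sink appends '\n', for instance).
	w.SkipNewline = true
	if err := w.Write([]string{"1", "Bob", "cat"}); err != nil {
		panic(err)
	}
	w.Flush() // assumed to exist, as in the stdlib encoding/csv
	fmt.Printf("%q\n", buf.String()) // "1,Bob,cat" with no trailing newline
}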