Skip to content

Commit

Permalink
sink(ticdc): add hex encode to csv (#9366)
Browse files: browse the repository at this point in the history
close #9373
  • Loading branch information
CharlesCheung96 authored Jul 12, 2023
1 parent 5ac4bda commit 73a9694
Show file tree
Hide file tree
Showing 15 changed files with 298 additions and 65 deletions.
27 changes: 15 additions & 12 deletions cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,10 +266,11 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
var csvConfig *config.CSVConfig
if c.Sink.CSVConfig != nil {
csvConfig = &config.CSVConfig{
Delimiter: c.Sink.CSVConfig.Delimiter,
Quote: c.Sink.CSVConfig.Quote,
NullString: c.Sink.CSVConfig.NullString,
IncludeCommitTs: c.Sink.CSVConfig.IncludeCommitTs,
Delimiter: c.Sink.CSVConfig.Delimiter,
Quote: c.Sink.CSVConfig.Quote,
NullString: c.Sink.CSVConfig.NullString,
IncludeCommitTs: c.Sink.CSVConfig.IncludeCommitTs,
BinaryEncodingMethod: c.Sink.CSVConfig.BinaryEncodingMethod,
}
}
var kafkaConfig *config.KafkaConfig
Expand Down Expand Up @@ -491,10 +492,11 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
var csvConfig *CSVConfig
if cloned.Sink.CSVConfig != nil {
csvConfig = &CSVConfig{
Delimiter: cloned.Sink.CSVConfig.Delimiter,
Quote: cloned.Sink.CSVConfig.Quote,
NullString: cloned.Sink.CSVConfig.NullString,
IncludeCommitTs: cloned.Sink.CSVConfig.IncludeCommitTs,
Delimiter: cloned.Sink.CSVConfig.Delimiter,
Quote: cloned.Sink.CSVConfig.Quote,
NullString: cloned.Sink.CSVConfig.NullString,
IncludeCommitTs: cloned.Sink.CSVConfig.IncludeCommitTs,
BinaryEncodingMethod: cloned.Sink.CSVConfig.BinaryEncodingMethod,
}
}
var kafkaConfig *KafkaConfig
Expand Down Expand Up @@ -766,10 +768,11 @@ type SinkConfig struct {
// CSVConfig denotes the csv config
// This is the same as config.CSVConfig
type CSVConfig struct {
Delimiter string `json:"delimiter"`
Quote string `json:"quote"`
NullString string `json:"null"`
IncludeCommitTs bool `json:"include_commit_ts"`
Delimiter string `json:"delimiter"`
Quote string `json:"quote"`
NullString string `json:"null"`
IncludeCommitTs bool `json:"include_commit_ts"`
BinaryEncodingMethod string `json:"binary_encoding_method"`
}

// LargeMessageHandleConfig denotes the large message handling config
Expand Down
7 changes: 4 additions & 3 deletions cdc/api/v2/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,10 @@ var defaultAPIConfig = &ReplicaConfig{
},
Sink: &SinkConfig{
CSVConfig: &CSVConfig{
Quote: string(config.DoubleQuoteChar),
Delimiter: config.Comma,
NullString: config.NULL,
Quote: string(config.DoubleQuoteChar),
Delimiter: config.Comma,
NullString: config.NULL,
BinaryEncodingMethod: config.BinaryEncodingBase64,
},
EncoderConcurrency: util.AddressOf(16),
Terminator: util.AddressOf(config.CRLF),
Expand Down
7 changes: 7 additions & 0 deletions docs/swagger/docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1345,6 +1345,10 @@ var doc = `{
"config.CSVConfig": {
"type": "object",
"properties": {
"binary-encoding-method": {
"description": "encoding method of binary type",
"type": "string"
},
"delimiter": {
"description": "delimiter between fields",
"type": "string"
Expand Down Expand Up @@ -1977,6 +1981,9 @@ var doc = `{
"v2.CSVConfig": {
"type": "object",
"properties": {
"binary_encoding_method": {
"type": "string"
},
"delimiter": {
"type": "string"
},
Expand Down
7 changes: 7 additions & 0 deletions docs/swagger/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -1326,6 +1326,10 @@
"config.CSVConfig": {
"type": "object",
"properties": {
"binary-encoding-method": {
"description": "encoding method of binary type",
"type": "string"
},
"delimiter": {
"description": "delimiter between fields",
"type": "string"
Expand Down Expand Up @@ -1958,6 +1962,9 @@
"v2.CSVConfig": {
"type": "object",
"properties": {
"binary_encoding_method": {
"type": "string"
},
"delimiter": {
"type": "string"
},
Expand Down
5 changes: 5 additions & 0 deletions docs/swagger/swagger.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
definitions:
config.CSVConfig:
properties:
binary-encoding-method:
description: encoding method of binary type
type: string
delimiter:
description: delimiter between fields
type: string
Expand Down Expand Up @@ -435,6 +438,8 @@ definitions:
type: object
v2.CSVConfig:
properties:
binary_encoding_method:
type: string
delimiter:
type: string
include_commit_ts:
Expand Down
16 changes: 9 additions & 7 deletions pkg/cmd/util/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,9 +201,10 @@ func TestAndWriteExampleReplicaTOML(t *testing.T) {
{Matcher: []string{"test3.*", "test4.*"}, Columns: []string{"!a", "column3"}},
},
CSVConfig: &config.CSVConfig{
Quote: string(config.DoubleQuoteChar),
Delimiter: string(config.Comma),
NullString: config.NULL,
Quote: string(config.DoubleQuoteChar),
Delimiter: string(config.Comma),
NullString: config.NULL,
BinaryEncodingMethod: config.BinaryEncodingBase64,
},
Terminator: util.AddressOf("\r\n"),
DateSeparator: util.AddressOf(config.DateSeparatorDay.String()),
Expand Down Expand Up @@ -236,10 +237,11 @@ func TestAndWriteStorageSinkTOML(t *testing.T) {
FileIndexWidth: util.AddressOf(config.DefaultFileIndexWidth),
EnableKafkaSinkV2: util.AddressOf(false),
CSVConfig: &config.CSVConfig{
Delimiter: ",",
Quote: "\"",
NullString: "\\N",
IncludeCommitTs: false,
Delimiter: ",",
Quote: "\"",
NullString: "\\N",
IncludeCommitTs: false,
BinaryEncodingMethod: config.BinaryEncodingBase64,
},
OnlyOutputUpdatedColumns: util.AddressOf(false),
DeleteOnlyOutputHandleKeyColumns: util.AddressOf(false),
Expand Down
6 changes: 4 additions & 2 deletions pkg/config/config_test_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,8 @@ const (
"delimiter": ",",
"quote": "\"",
"null": "\\N",
"include-commit-ts": true
"include-commit-ts": true,
"binary-encoding-method":"base64"
},
"date-separator": "month",
"enable-partition-separator": true,
Expand Down Expand Up @@ -334,7 +335,8 @@ const (
"delimiter": ",",
"quote": "\"",
"null": "\\N",
"include-commit-ts": true
"include-commit-ts": true,
"binary-encoding-method":"base64"
},
"terminator": "\r\n",
"transaction-atomicity": "",
Expand Down
7 changes: 4 additions & 3 deletions pkg/config/replica_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,10 @@ var defaultReplicaConfig = &ReplicaConfig{
},
Sink: &SinkConfig{
CSVConfig: &CSVConfig{
Quote: string(DoubleQuoteChar),
Delimiter: Comma,
NullString: NULL,
Quote: string(DoubleQuoteChar),
Delimiter: Comma,
NullString: NULL,
BinaryEncodingMethod: BinaryEncodingBase64,
},
EncoderConcurrency: util.AddressOf(16),
Terminator: util.AddressOf(CRLF),
Expand Down
9 changes: 5 additions & 4 deletions pkg/config/replica_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,11 @@ func TestReplicaConfigMarshal(t *testing.T) {
},
}
conf.Sink.CSVConfig = &CSVConfig{
Delimiter: ",",
Quote: "\"",
NullString: `\N`,
IncludeCommitTs: true,
Delimiter: ",",
Quote: "\"",
NullString: `\N`,
IncludeCommitTs: true,
BinaryEncodingMethod: BinaryEncodingBase64,
}
conf.Sink.TxnAtomicity = util.AddressOf(unknownTxnAtomicity)
conf.Sink.DateSeparator = util.AddressOf("month")
Expand Down
15 changes: 15 additions & 0 deletions pkg/config/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ const (
MaxFileIndexWidth = 20 // enough for 2^64 files
// DefaultFileIndexWidth is the default width of file index.
DefaultFileIndexWidth = MaxFileIndexWidth

// BinaryEncodingHex encodes binary data to hex string.
BinaryEncodingHex = "hex"
// BinaryEncodingBase64 encodes binary data to base64 string.
BinaryEncodingBase64 = "base64"
)

// AtomicityLevel represents the atomicity level of a changefeed.
Expand Down Expand Up @@ -164,6 +169,8 @@ type CSVConfig struct {
NullString string `toml:"null" json:"null"`
// whether to include commit ts
IncludeCommitTs bool `toml:"include-commit-ts" json:"include-commit-ts"`
// encoding method of binary type
BinaryEncodingMethod string `toml:"binary-encoding-method" json:"binary-encoding-method"`
}

func (c *CSVConfig) validateAndAdjust() error {
Expand Down Expand Up @@ -199,6 +206,14 @@ func (c *CSVConfig) validateAndAdjust() error {
errors.New("csv config quote and delimiter cannot be the same"))
}

// validate binary encoding method
switch c.BinaryEncodingMethod {
case BinaryEncodingHex, BinaryEncodingBase64:
default:
return cerror.WrapError(cerror.ErrSinkInvalidConfig,
errors.New("csv config binary-encoding-method can only be hex or base64"))
}

return nil
}

Expand Down
19 changes: 15 additions & 4 deletions pkg/config/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,8 +281,9 @@ func TestValidateAndAdjustCSVConfig(t *testing.T) {
{
name: "valid quote",
config: &CSVConfig{
Quote: "\"",
Delimiter: ",",
Quote: "\"",
Delimiter: ",",
BinaryEncodingMethod: BinaryEncodingBase64,
},
wantErr: "",
},
Expand All @@ -303,8 +304,9 @@ func TestValidateAndAdjustCSVConfig(t *testing.T) {
{
name: "valid delimiter1",
config: &CSVConfig{
Quote: "\"",
Delimiter: ",",
Quote: "\"",
Delimiter: ",",
BinaryEncodingMethod: BinaryEncodingHex,
},
wantErr: "",
},
Expand Down Expand Up @@ -332,6 +334,15 @@ func TestValidateAndAdjustCSVConfig(t *testing.T) {
},
wantErr: "csv config quote and delimiter cannot be the same",
},
{
name: "invalid binary encoding method",
config: &CSVConfig{
Quote: "\"",
Delimiter: ",",
BinaryEncodingMethod: "invalid",
},
wantErr: "csv config binary-encoding-method can only be hex or base64",
},
}
for _, c := range tests {
tc := c
Expand Down
12 changes: 7 additions & 5 deletions pkg/sink/codec/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,12 @@ type Config struct {
AvroEnableWatermark bool

// for sinking to cloud storage
Delimiter string
Quote string
NullString string
IncludeCommitTs bool
Terminator string
Delimiter string
Quote string
NullString string
IncludeCommitTs bool
Terminator string
BinaryEncodingMethod string

// for open protocol
OnlyOutputUpdatedColumns bool
Expand Down Expand Up @@ -174,6 +175,7 @@ func (c *Config) Apply(sinkURI *url.URL, replicaConfig *config.ReplicaConfig) er
c.Quote = replicaConfig.Sink.CSVConfig.Quote
c.NullString = replicaConfig.Sink.CSVConfig.NullString
c.IncludeCommitTs = replicaConfig.Sink.CSVConfig.IncludeCommitTs
c.BinaryEncodingMethod = replicaConfig.Sink.CSVConfig.BinaryEncodingMethod
}
if replicaConfig.Sink.KafkaConfig != nil {
c.LargeMessageHandle = replicaConfig.Sink.KafkaConfig.LargeMessageHandle
Expand Down
2 changes: 1 addition & 1 deletion pkg/sink/codec/csv/csv_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (b *batchDecoder) NextRowChangedEvent() (*model.RowChangedEvent, error) {
return nil, cerror.WrapError(cerror.ErrCSVDecodeFailed, errors.New("no csv row can be found"))
}

e, err := csvMsg2RowChangedEvent(b.msg, b.tableInfo.Columns)
e, err := csvMsg2RowChangedEvent(b.codecConfig, b.msg, b.tableInfo.Columns)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
Loading

0 comments on commit 73a9694

Please sign in to comment.