Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sink(ticdc): add hex encode to csv (#9366) #9380

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 15 additions & 12 deletions cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,10 +264,11 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
var csvConfig *config.CSVConfig
if c.Sink.CSVConfig != nil {
csvConfig = &config.CSVConfig{
Delimiter: c.Sink.CSVConfig.Delimiter,
Quote: c.Sink.CSVConfig.Quote,
NullString: c.Sink.CSVConfig.NullString,
IncludeCommitTs: c.Sink.CSVConfig.IncludeCommitTs,
Delimiter: c.Sink.CSVConfig.Delimiter,
Quote: c.Sink.CSVConfig.Quote,
NullString: c.Sink.CSVConfig.NullString,
IncludeCommitTs: c.Sink.CSVConfig.IncludeCommitTs,
BinaryEncodingMethod: c.Sink.CSVConfig.BinaryEncodingMethod,
}
}
var kafkaConfig *config.KafkaConfig
Expand Down Expand Up @@ -466,10 +467,11 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
var csvConfig *CSVConfig
if cloned.Sink.CSVConfig != nil {
csvConfig = &CSVConfig{
Delimiter: cloned.Sink.CSVConfig.Delimiter,
Quote: cloned.Sink.CSVConfig.Quote,
NullString: cloned.Sink.CSVConfig.NullString,
IncludeCommitTs: cloned.Sink.CSVConfig.IncludeCommitTs,
Delimiter: cloned.Sink.CSVConfig.Delimiter,
Quote: cloned.Sink.CSVConfig.Quote,
NullString: cloned.Sink.CSVConfig.NullString,
IncludeCommitTs: cloned.Sink.CSVConfig.IncludeCommitTs,
BinaryEncodingMethod: cloned.Sink.CSVConfig.BinaryEncodingMethod,
}
}
var kafkaConfig *KafkaConfig
Expand Down Expand Up @@ -725,10 +727,11 @@ type SinkConfig struct {
// CSVConfig denotes the csv config
// This is the same as config.CSVConfig
type CSVConfig struct {
Delimiter string `json:"delimiter"`
Quote string `json:"quote"`
NullString string `json:"null"`
IncludeCommitTs bool `json:"include_commit_ts"`
Delimiter string `json:"delimiter"`
Quote string `json:"quote"`
NullString string `json:"null"`
IncludeCommitTs bool `json:"include_commit_ts"`
BinaryEncodingMethod string `json:"binary_encoding_method"`
}

// DispatchRule represents partition rule for a table
Expand Down
7 changes: 4 additions & 3 deletions cdc/api/v2/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,10 @@ var defaultAPIConfig = &ReplicaConfig{
},
Sink: &SinkConfig{
CSVConfig: &CSVConfig{
Quote: string(config.DoubleQuoteChar),
Delimiter: config.Comma,
NullString: config.NULL,
Quote: string(config.DoubleQuoteChar),
Delimiter: config.Comma,
NullString: config.NULL,
BinaryEncodingMethod: config.BinaryEncodingBase64,
},
EncoderConcurrency: 16,
Terminator: config.CRLF,
Expand Down
7 changes: 7 additions & 0 deletions docs/swagger/docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1303,6 +1303,10 @@ var doc = `{
"config.CSVConfig": {
"type": "object",
"properties": {
"binary-encoding-method": {
"description": "encoding method of binary type",
"type": "string"
},
"delimiter": {
"description": "delimiter between fields",
"type": "string"
Expand Down Expand Up @@ -1905,6 +1909,9 @@ var doc = `{
"v2.CSVConfig": {
"type": "object",
"properties": {
"binary_encoding_method": {
"type": "string"
},
"delimiter": {
"type": "string"
},
Expand Down
7 changes: 7 additions & 0 deletions docs/swagger/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -1284,6 +1284,10 @@
"config.CSVConfig": {
"type": "object",
"properties": {
"binary-encoding-method": {
"description": "encoding method of binary type",
"type": "string"
},
"delimiter": {
"description": "delimiter between fields",
"type": "string"
Expand Down Expand Up @@ -1886,6 +1890,9 @@
"v2.CSVConfig": {
"type": "object",
"properties": {
"binary_encoding_method": {
"type": "string"
},
"delimiter": {
"type": "string"
},
Expand Down
5 changes: 5 additions & 0 deletions docs/swagger/swagger.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
definitions:
config.CSVConfig:
properties:
binary-encoding-method:
description: encoding method of binary type
type: string
delimiter:
description: delimiter between fields
type: string
Expand Down Expand Up @@ -405,6 +408,8 @@ definitions:
type: object
v2.CSVConfig:
properties:
binary_encoding_method:
type: string
delimiter:
type: string
include_commit_ts:
Expand Down
16 changes: 9 additions & 7 deletions pkg/cmd/util/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,10 @@ func TestAndWriteExampleReplicaTOML(t *testing.T) {
{Matcher: []string{"test3.*", "test4.*"}, Columns: []string{"!a", "column3"}},
},
CSVConfig: &config.CSVConfig{
Quote: string(config.DoubleQuoteChar),
Delimiter: string(config.Comma),
NullString: config.NULL,
Quote: string(config.DoubleQuoteChar),
Delimiter: string(config.Comma),
NullString: config.NULL,
BinaryEncodingMethod: config.BinaryEncodingBase64,
},
Terminator: "\r\n",
DateSeparator: config.DateSeparatorDay.String(),
Expand Down Expand Up @@ -230,10 +231,11 @@ func TestAndWriteStorageSinkTOML(t *testing.T) {
EnablePartitionSeparator: true,
FileIndexWidth: config.DefaultFileIndexWidth,
CSVConfig: &config.CSVConfig{
Delimiter: ",",
Quote: "\"",
NullString: "\\N",
IncludeCommitTs: false,
Delimiter: ",",
Quote: "\"",
NullString: "\\N",
IncludeCommitTs: false,
BinaryEncodingMethod: config.BinaryEncodingBase64,
},
}, cfg.Sink)
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/config/config_test_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,8 @@ const (
"delimiter": ",",
"quote": "\"",
"null": "\\N",
"include-commit-ts": true
"include-commit-ts": true,
"binary-encoding-method":"base64"
},
"transaction-atomicity": "",
"terminator": "",
Expand Down Expand Up @@ -318,7 +319,8 @@ const (
"delimiter": ",",
"quote": "\"",
"null": "\\N",
"include-commit-ts": true
"include-commit-ts": true,
"binary-encoding-method":"base64"
},
"terminator": "",
"date-separator": "month",
Expand Down
7 changes: 4 additions & 3 deletions pkg/config/replica_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,10 @@ var defaultReplicaConfig = &ReplicaConfig{
},
Sink: &SinkConfig{
CSVConfig: &CSVConfig{
Quote: string(DoubleQuoteChar),
Delimiter: Comma,
NullString: NULL,
Quote: string(DoubleQuoteChar),
Delimiter: Comma,
NullString: NULL,
BinaryEncodingMethod: BinaryEncodingBase64,
},
EncoderConcurrency: 16,
Terminator: CRLF,
Expand Down
9 changes: 5 additions & 4 deletions pkg/config/replica_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,11 @@ func TestReplicaConfigMarshal(t *testing.T) {
},
}
conf.Sink.CSVConfig = &CSVConfig{
Delimiter: ",",
Quote: "\"",
NullString: `\N`,
IncludeCommitTs: true,
Delimiter: ",",
Quote: "\"",
NullString: `\N`,
IncludeCommitTs: true,
BinaryEncodingMethod: BinaryEncodingBase64,
}
conf.Sink.Terminator = ""
conf.Sink.DateSeparator = "month"
Expand Down
15 changes: 15 additions & 0 deletions pkg/config/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ const (
MaxFileIndexWidth = 20 // enough for 2^64 files
// DefaultFileIndexWidth is the default width of file index.
DefaultFileIndexWidth = MaxFileIndexWidth

// BinaryEncodingHex encodes binary data to hex string.
BinaryEncodingHex = "hex"
// BinaryEncodingBase64 encodes binary data to base64 string.
BinaryEncodingBase64 = "base64"
)

// AtomicityLevel represents the atomicity level of a changefeed.
Expand Down Expand Up @@ -147,6 +152,8 @@ type CSVConfig struct {
NullString string `toml:"null" json:"null"`
// whether to include commit ts
IncludeCommitTs bool `toml:"include-commit-ts" json:"include-commit-ts"`
// encoding method of binary type
BinaryEncodingMethod string `toml:"binary-encoding-method" json:"binary-encoding-method"`
}

func (c *CSVConfig) validateAndAdjust() error {
Expand Down Expand Up @@ -182,6 +189,14 @@ func (c *CSVConfig) validateAndAdjust() error {
errors.New("csv config quote and delimiter cannot be the same"))
}

// validate binary encoding method
switch c.BinaryEncodingMethod {
case BinaryEncodingHex, BinaryEncodingBase64:
default:
return cerror.WrapError(cerror.ErrSinkInvalidConfig,
errors.New("csv config binary-encoding-method can only be hex or base64"))
}

return nil
}

Expand Down
19 changes: 15 additions & 4 deletions pkg/config/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,8 +280,9 @@ func TestValidateAndAdjustCSVConfig(t *testing.T) {
{
name: "valid quote",
config: &CSVConfig{
Quote: "\"",
Delimiter: ",",
Quote: "\"",
Delimiter: ",",
BinaryEncodingMethod: BinaryEncodingBase64,
},
wantErr: "",
},
Expand All @@ -302,8 +303,9 @@ func TestValidateAndAdjustCSVConfig(t *testing.T) {
{
name: "valid delimiter1",
config: &CSVConfig{
Quote: "\"",
Delimiter: ",",
Quote: "\"",
Delimiter: ",",
BinaryEncodingMethod: BinaryEncodingHex,
},
wantErr: "",
},
Expand Down Expand Up @@ -331,6 +333,15 @@ func TestValidateAndAdjustCSVConfig(t *testing.T) {
},
wantErr: "csv config quote and delimiter cannot be the same",
},
{
name: "invalid binary encoding method",
config: &CSVConfig{
Quote: "\"",
Delimiter: ",",
BinaryEncodingMethod: "invalid",
},
wantErr: "csv config binary-encoding-method can only be hex or base64",
},
}
for _, c := range tests {
tc := c
Expand Down
12 changes: 7 additions & 5 deletions pkg/sink/codec/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,12 @@ type Config struct {
AvroBigintUnsignedHandlingMode string

// for sinking to cloud storage
Delimiter string
Quote string
NullString string
IncludeCommitTs bool
Terminator string
Delimiter string
Quote string
NullString string
IncludeCommitTs bool
Terminator string
BinaryEncodingMethod string

// for open protocol
OnlyOutputUpdatedColumns bool
Expand Down Expand Up @@ -152,6 +153,7 @@ func (c *Config) Apply(sinkURI *url.URL, replicaConfig *config.ReplicaConfig) er
c.Quote = replicaConfig.Sink.CSVConfig.Quote
c.NullString = replicaConfig.Sink.CSVConfig.NullString
c.IncludeCommitTs = replicaConfig.Sink.CSVConfig.IncludeCommitTs
c.BinaryEncodingMethod = replicaConfig.Sink.CSVConfig.BinaryEncodingMethod
}
}
if urlParameter.OnlyOutputUpdatedColumns != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sink/codec/csv/csv_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (b *batchDecoder) NextRowChangedEvent() (*model.RowChangedEvent, error) {
return nil, cerror.WrapError(cerror.ErrCSVDecodeFailed, errors.New("no csv row can be found"))
}

e, err := csvMsg2RowChangedEvent(b.msg, b.tableInfo.Columns)
e, err := csvMsg2RowChangedEvent(b.codecConfig, b.msg, b.tableInfo.Columns)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
Loading