diff --git a/cdc/api/v2/model.go b/cdc/api/v2/model.go index 3a2e879bec9..ae1e4627cde 100644 --- a/cdc/api/v2/model.go +++ b/cdc/api/v2/model.go @@ -266,10 +266,11 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig( var csvConfig *config.CSVConfig if c.Sink.CSVConfig != nil { csvConfig = &config.CSVConfig{ - Delimiter: c.Sink.CSVConfig.Delimiter, - Quote: c.Sink.CSVConfig.Quote, - NullString: c.Sink.CSVConfig.NullString, - IncludeCommitTs: c.Sink.CSVConfig.IncludeCommitTs, + Delimiter: c.Sink.CSVConfig.Delimiter, + Quote: c.Sink.CSVConfig.Quote, + NullString: c.Sink.CSVConfig.NullString, + IncludeCommitTs: c.Sink.CSVConfig.IncludeCommitTs, + BinaryEncodingMethod: c.Sink.CSVConfig.BinaryEncodingMethod, } } var kafkaConfig *config.KafkaConfig @@ -491,10 +492,11 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig { var csvConfig *CSVConfig if cloned.Sink.CSVConfig != nil { csvConfig = &CSVConfig{ - Delimiter: cloned.Sink.CSVConfig.Delimiter, - Quote: cloned.Sink.CSVConfig.Quote, - NullString: cloned.Sink.CSVConfig.NullString, - IncludeCommitTs: cloned.Sink.CSVConfig.IncludeCommitTs, + Delimiter: cloned.Sink.CSVConfig.Delimiter, + Quote: cloned.Sink.CSVConfig.Quote, + NullString: cloned.Sink.CSVConfig.NullString, + IncludeCommitTs: cloned.Sink.CSVConfig.IncludeCommitTs, + BinaryEncodingMethod: cloned.Sink.CSVConfig.BinaryEncodingMethod, } } var kafkaConfig *KafkaConfig @@ -766,10 +768,11 @@ type SinkConfig struct { // CSVConfig denotes the csv config // This is the same as config.CSVConfig type CSVConfig struct { - Delimiter string `json:"delimiter"` - Quote string `json:"quote"` - NullString string `json:"null"` - IncludeCommitTs bool `json:"include_commit_ts"` + Delimiter string `json:"delimiter"` + Quote string `json:"quote"` + NullString string `json:"null"` + IncludeCommitTs bool `json:"include_commit_ts"` + BinaryEncodingMethod string `json:"binary_encoding_method"` } // LargeMessageHandleConfig denotes the large message handling config diff --git a/cdc/api/v2/model_test.go b/cdc/api/v2/model_test.go index 2ce07bcb49c..bec23c97b7e 100644 --- a/cdc/api/v2/model_test.go +++ b/cdc/api/v2/model_test.go @@ -45,9 +45,10 @@ var defaultAPIConfig = &ReplicaConfig{ }, Sink: &SinkConfig{ CSVConfig: &CSVConfig{ - Quote: string(config.DoubleQuoteChar), - Delimiter: config.Comma, - NullString: config.NULL, + Quote: string(config.DoubleQuoteChar), + Delimiter: config.Comma, + NullString: config.NULL, + BinaryEncodingMethod: config.BinaryEncodingBase64, }, EncoderConcurrency: util.AddressOf(16), Terminator: util.AddressOf(config.CRLF), diff --git a/docs/swagger/docs.go b/docs/swagger/docs.go index 7f58c8fa33e..aa55e9452e5 100644 --- a/docs/swagger/docs.go +++ b/docs/swagger/docs.go @@ -1345,6 +1345,10 @@ var doc = `{ "config.CSVConfig": { "type": "object", "properties": { + "binary-encoding-method": { + "description": "encoding method of binary type", + "type": "string" + }, "delimiter": { "description": "delimiter between fields", "type": "string" @@ -1977,6 +1981,9 @@ var doc = `{ "v2.CSVConfig": { "type": "object", "properties": { + "binary_encoding_method": { + "type": "string" + }, "delimiter": { "type": "string" }, diff --git a/docs/swagger/swagger.json b/docs/swagger/swagger.json index 1da9e6c2a17..27e3a4df128 100644 --- a/docs/swagger/swagger.json +++ b/docs/swagger/swagger.json @@ -1326,6 +1326,10 @@ "config.CSVConfig": { "type": "object", "properties": { + "binary-encoding-method": { + "description": "encoding method of binary type", + "type": "string" + }, "delimiter": { "description": "delimiter between fields", "type": "string" @@ -1958,6 +1962,9 @@ "v2.CSVConfig": { "type": "object", "properties": { + "binary_encoding_method": { + "type": "string" + }, "delimiter": { "type": "string" }, diff --git a/docs/swagger/swagger.yaml b/docs/swagger/swagger.yaml index 9bdd0cecd89..cfde0bf41e1 100644 --- a/docs/swagger/swagger.yaml +++ b/docs/swagger/swagger.yaml @@ -1,6 +1,9 @@ definitions: config.CSVConfig: properties: + binary-encoding-method: + description: encoding method of binary type + type: string delimiter: description: delimiter between fields type: string @@ -435,6 +438,8 @@ definitions: type: object v2.CSVConfig: properties: + binary_encoding_method: + type: string delimiter: type: string include_commit_ts: diff --git a/pkg/cmd/util/helper_test.go b/pkg/cmd/util/helper_test.go index 2ba4f90cf8f..56700cfaf49 100644 --- a/pkg/cmd/util/helper_test.go +++ b/pkg/cmd/util/helper_test.go @@ -201,9 +201,10 @@ func TestAndWriteExampleReplicaTOML(t *testing.T) { {Matcher: []string{"test3.*", "test4.*"}, Columns: []string{"!a", "column3"}}, }, CSVConfig: &config.CSVConfig{ - Quote: string(config.DoubleQuoteChar), - Delimiter: string(config.Comma), - NullString: config.NULL, + Quote: string(config.DoubleQuoteChar), + Delimiter: string(config.Comma), + NullString: config.NULL, + BinaryEncodingMethod: config.BinaryEncodingBase64, }, Terminator: util.AddressOf("\r\n"), DateSeparator: util.AddressOf(config.DateSeparatorDay.String()), @@ -236,10 +237,11 @@ func TestAndWriteStorageSinkTOML(t *testing.T) { FileIndexWidth: util.AddressOf(config.DefaultFileIndexWidth), EnableKafkaSinkV2: util.AddressOf(false), CSVConfig: &config.CSVConfig{ - Delimiter: ",", - Quote: "\"", - NullString: "\\N", - IncludeCommitTs: false, + Delimiter: ",", + Quote: "\"", + NullString: "\\N", + IncludeCommitTs: false, + BinaryEncodingMethod: config.BinaryEncodingBase64, }, OnlyOutputUpdatedColumns: util.AddressOf(false), DeleteOnlyOutputHandleKeyColumns: util.AddressOf(false), diff --git a/pkg/config/config_test_data.go b/pkg/config/config_test_data.go index 9d0e6eea3c3..034fe031dc7 100644 --- a/pkg/config/config_test_data.go +++ b/pkg/config/config_test_data.go @@ -203,7 +203,8 @@ const ( "delimiter": ",", "quote": "\"", "null": "\\N", - "include-commit-ts": true + "include-commit-ts": true, + "binary-encoding-method":"base64" }, "date-separator": "month", "enable-partition-separator": true, @@ -334,7 +335,8 @@ const ( "delimiter": ",", "quote": "\"", "null": "\\N", - "include-commit-ts": true + "include-commit-ts": true, + "binary-encoding-method":"base64" }, "terminator": "\r\n", "transaction-atomicity": "", diff --git a/pkg/config/replica_config.go b/pkg/config/replica_config.go index d09363e7fbf..37b2b58fdad 100644 --- a/pkg/config/replica_config.go +++ b/pkg/config/replica_config.go @@ -55,9 +55,10 @@ var defaultReplicaConfig = &ReplicaConfig{ }, Sink: &SinkConfig{ CSVConfig: &CSVConfig{ - Quote: string(DoubleQuoteChar), - Delimiter: Comma, - NullString: NULL, + Quote: string(DoubleQuoteChar), + Delimiter: Comma, + NullString: NULL, + BinaryEncodingMethod: BinaryEncodingBase64, }, EncoderConcurrency: util.AddressOf(16), Terminator: util.AddressOf(CRLF), diff --git a/pkg/config/replica_config_test.go b/pkg/config/replica_config_test.go index f1dbbee3613..b6535a6417e 100644 --- a/pkg/config/replica_config_test.go +++ b/pkg/config/replica_config_test.go @@ -49,10 +49,11 @@ func TestReplicaConfigMarshal(t *testing.T) { }, } conf.Sink.CSVConfig = &CSVConfig{ - Delimiter: ",", - Quote: "\"", - NullString: `\N`, - IncludeCommitTs: true, + Delimiter: ",", + Quote: "\"", + NullString: `\N`, + IncludeCommitTs: true, + BinaryEncodingMethod: BinaryEncodingBase64, } conf.Sink.TxnAtomicity = util.AddressOf(unknownTxnAtomicity) conf.Sink.DateSeparator = util.AddressOf("month") diff --git a/pkg/config/sink.go b/pkg/config/sink.go index d6cc94d7472..47ea612f444 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go @@ -63,6 +63,11 @@ const ( MaxFileIndexWidth = 20 // enough for 2^64 files // DefaultFileIndexWidth is the default width of file index. DefaultFileIndexWidth = MaxFileIndexWidth + + // BinaryEncodingHex encodes binary data to hex string. + BinaryEncodingHex = "hex" + // BinaryEncodingBase64 encodes binary data to base64 string. + BinaryEncodingBase64 = "base64" ) // AtomicityLevel represents the atomicity level of a changefeed. @@ -164,6 +169,8 @@ type CSVConfig struct { NullString string `toml:"null" json:"null"` // whether to include commit ts IncludeCommitTs bool `toml:"include-commit-ts" json:"include-commit-ts"` + // encoding method of binary type + BinaryEncodingMethod string `toml:"binary-encoding-method" json:"binary-encoding-method"` } func (c *CSVConfig) validateAndAdjust() error { @@ -199,6 +206,14 @@ func (c *CSVConfig) validateAndAdjust() error { errors.New("csv config quote and delimiter cannot be the same")) } + // validate binary encoding method + switch c.BinaryEncodingMethod { + case BinaryEncodingHex, BinaryEncodingBase64: + default: + return cerror.WrapError(cerror.ErrSinkInvalidConfig, + errors.New("csv config binary-encoding-method can only be hex or base64")) + } + return nil } diff --git a/pkg/config/sink_test.go b/pkg/config/sink_test.go index daa893e2578..cd1a77522d8 100644 --- a/pkg/config/sink_test.go +++ b/pkg/config/sink_test.go @@ -281,8 +281,9 @@ func TestValidateAndAdjustCSVConfig(t *testing.T) { { name: "valid quote", config: &CSVConfig{ - Quote: "\"", - Delimiter: ",", + Quote: "\"", + Delimiter: ",", + BinaryEncodingMethod: BinaryEncodingBase64, }, wantErr: "", }, @@ -303,8 +304,9 @@ func TestValidateAndAdjustCSVConfig(t *testing.T) { { name: "valid delimiter1", config: &CSVConfig{ - Quote: "\"", - Delimiter: ",", + Quote: "\"", + Delimiter: ",", + BinaryEncodingMethod: BinaryEncodingHex, }, wantErr: "", }, @@ -332,6 +334,15 @@ func TestValidateAndAdjustCSVConfig(t *testing.T) { }, wantErr: "csv config quote and delimiter cannot be the same", }, + { + name: "invalid binary encoding method", + config: &CSVConfig{ + Quote: "\"", + Delimiter: ",", + BinaryEncodingMethod: "invalid", + }, + wantErr: "csv config binary-encoding-method can only be hex or base64", + }, } for _, c := range tests { tc := c diff --git a/pkg/sink/codec/common/config.go b/pkg/sink/codec/common/config.go index 42abcbe0de2..07f004e622c 100644 --- a/pkg/sink/codec/common/config.go +++ b/pkg/sink/codec/common/config.go @@ -58,11 +58,12 @@ type Config struct { AvroEnableWatermark bool // for sinking to cloud storage - Delimiter string - Quote string - NullString string - IncludeCommitTs bool - Terminator string + Delimiter string + Quote string + NullString string + IncludeCommitTs bool + Terminator string + BinaryEncodingMethod string // for open protocol OnlyOutputUpdatedColumns bool @@ -174,6 +175,7 @@ func (c *Config) Apply(sinkURI *url.URL, replicaConfig *config.ReplicaConfig) er c.Quote = replicaConfig.Sink.CSVConfig.Quote c.NullString = replicaConfig.Sink.CSVConfig.NullString c.IncludeCommitTs = replicaConfig.Sink.CSVConfig.IncludeCommitTs + c.BinaryEncodingMethod = replicaConfig.Sink.CSVConfig.BinaryEncodingMethod } if replicaConfig.Sink.KafkaConfig != nil { c.LargeMessageHandle = replicaConfig.Sink.KafkaConfig.LargeMessageHandle diff --git a/pkg/sink/codec/csv/csv_decoder.go b/pkg/sink/codec/csv/csv_decoder.go index 84e1f29d0fa..b2cecee1026 100644 --- a/pkg/sink/codec/csv/csv_decoder.go +++ b/pkg/sink/codec/csv/csv_decoder.go @@ -109,7 +109,7 @@ func (b *batchDecoder) NextRowChangedEvent() (*model.RowChangedEvent, error) { return nil, cerror.WrapError(cerror.ErrCSVDecodeFailed, errors.New("no csv row can be found")) } - e, err := csvMsg2RowChangedEvent(b.msg, b.tableInfo.Columns) + e, err := csvMsg2RowChangedEvent(b.codecConfig, b.msg, b.tableInfo.Columns) if err != nil { return nil, errors.Trace(err) } diff --git a/pkg/sink/codec/csv/csv_message.go b/pkg/sink/codec/csv/csv_message.go index be7d2600d95..e208cb36147 100644 --- a/pkg/sink/codec/csv/csv_message.go +++ b/pkg/sink/codec/csv/csv_message.go @@ -15,6 +15,7 @@ package csv import ( "encoding/base64" + "encoding/hex" "fmt" "strconv" "strings" @@ -233,7 +234,7 @@ func (c *csvMessage) formatValue(value any, strBuilder *strings.Builder) { } } -func fromCsvValToColValue(csvVal any, ft types.FieldType) (any, error) { +func fromCsvValToColValue(csvConfig *common.Config, csvVal any, ft types.FieldType) (any, error) { str, ok := csvVal.(string) if !ok { return csvVal, nil @@ -243,9 +244,18 @@ func fromCsvValToColValue(csvVal any, ft types.FieldType) (any, error) { case mysql.TypeVarchar, mysql.TypeString, mysql.TypeVarString, mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeBlob: if ft.GetCharset() == charset.CharsetBin { - blob, err := base64.StdEncoding.DecodeString(str) - return blob, err + switch csvConfig.BinaryEncodingMethod { + case config.BinaryEncodingBase64: + return base64.StdEncoding.DecodeString(str) + case config.BinaryEncodingHex: + return hex.DecodeString(str) + default: + return nil, cerror.WrapError(cerror.ErrCSVEncodeFailed, + errors.Errorf("unsupported binary encoding method %s", + csvConfig.BinaryEncodingMethod)) + } } + return []byte(str), nil case mysql.TypeFloat: val, err := strconv.ParseFloat(str, 32) @@ -269,7 +279,7 @@ func fromCsvValToColValue(csvVal any, ft types.FieldType) (any, error) { } // fromColValToCsvVal converts column from TiDB type to csv type. -func fromColValToCsvVal(col *model.Column, ft *types.FieldType) (any, error) { +func fromColValToCsvVal(csvConfig *common.Config, col *model.Column, ft *types.FieldType) (any, error) { if col.Value == nil { return nil, nil } @@ -279,7 +289,16 @@ func fromColValToCsvVal(col *model.Column, ft *types.FieldType) (any, error) { mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeBlob: if col.Flag.IsBinary() { if v, ok := col.Value.([]byte); ok { - return base64.StdEncoding.EncodeToString(v), nil + switch csvConfig.BinaryEncodingMethod { + case config.BinaryEncodingBase64: + return base64.StdEncoding.EncodeToString(v), nil + case config.BinaryEncodingHex: + return hex.EncodeToString(v), nil + default: + return nil, cerror.WrapError(cerror.ErrCSVEncodeFailed, + errors.Errorf("unsupported binary encoding method %s", + csvConfig.BinaryEncodingMethod)) + } } return col.Value, nil } @@ -323,7 +342,7 @@ func rowChangedEvent2CSVMsg(csvConfig *common.Config, e *model.RowChangedEvent) } if e.IsDelete() { csvMsg.opType = operationDelete - csvMsg.columns, err = rowChangeColumns2CSVColumns(e.PreColumns, e.ColInfos) + csvMsg.columns, err = rowChangeColumns2CSVColumns(csvConfig, e.PreColumns, e.ColInfos) if err != nil { return nil, err } @@ -334,7 +353,7 @@ func rowChangedEvent2CSVMsg(csvConfig *common.Config, e *model.RowChangedEvent) csvMsg.opType = operationUpdate } // for insert and update operation, we only record the after columns. - csvMsg.columns, err = rowChangeColumns2CSVColumns(e.Columns, e.ColInfos) + csvMsg.columns, err = rowChangeColumns2CSVColumns(csvConfig, e.Columns, e.ColInfos) if err != nil { return nil, err } @@ -342,7 +361,7 @@ func rowChangedEvent2CSVMsg(csvConfig *common.Config, e *model.RowChangedEvent) return csvMsg, nil } -func csvMsg2RowChangedEvent(csvMsg *csvMessage, ticols []*timodel.ColumnInfo) (*model.RowChangedEvent, error) { +func csvMsg2RowChangedEvent(csvConfig *common.Config, csvMsg *csvMessage, ticols []*timodel.ColumnInfo) (*model.RowChangedEvent, error) { var err error if len(csvMsg.columns) != len(ticols) { return nil, cerror.WrapError(cerror.ErrCSVDecodeFailed, @@ -357,9 +376,9 @@ func csvMsg2RowChangedEvent(csvMsg *csvMessage, ticols []*timodel.ColumnInfo) (* Table: csvMsg.tableName, } if csvMsg.opType == operationDelete { - e.PreColumns, err = csvColumns2RowChangeColumns(csvMsg.columns, ticols) + e.PreColumns, err = csvColumns2RowChangeColumns(csvConfig, csvMsg.columns, ticols) } else { - e.Columns, err = csvColumns2RowChangeColumns(csvMsg.columns, ticols) + e.Columns, err = csvColumns2RowChangeColumns(csvConfig, csvMsg.columns, ticols) } if err != nil { @@ -369,7 +388,7 @@ func csvMsg2RowChangedEvent(csvMsg *csvMessage, ticols []*timodel.ColumnInfo) (* return e, nil } -func rowChangeColumns2CSVColumns(cols []*model.Column, colInfos []rowcodec.ColInfo) ([]any, error) { +func rowChangeColumns2CSVColumns(csvConfig *common.Config, cols []*model.Column, colInfos []rowcodec.ColInfo) ([]any, error) { var csvColumns []any for i, column := range cols { // column could be nil in a condition described in @@ -378,7 +397,7 @@ func rowChangeColumns2CSVColumns(cols []*model.Column, colInfos []rowcodec.ColIn continue } - converted, err := fromColValToCsvVal(column, colInfos[i].Ft) + converted, err := fromColValToCsvVal(csvConfig, column, colInfos[i].Ft) if err != nil { return nil, errors.Trace(err) } @@ -388,7 +407,7 @@ func rowChangeColumns2CSVColumns(cols []*model.Column, colInfos []rowcodec.ColIn return csvColumns, nil } -func csvColumns2RowChangeColumns(csvCols []any, ticols []*timodel.ColumnInfo) ([]*model.Column, error) { +func csvColumns2RowChangeColumns(csvConfig *common.Config, csvCols []any, ticols []*timodel.ColumnInfo) ([]*model.Column, error) { cols := make([]*model.Column, 0, len(csvCols)) for idx, csvCol := range csvCols { col := new(model.Column) @@ -402,7 +421,7 @@ func csvColumns2RowChangeColumns(csvCols []any, ticols []*timodel.ColumnInfo) ([ col.Flag.SetIsPrimaryKey() } - val, err := fromCsvValToColValue(csvCol, ticol.FieldType) + val, err := fromCsvValToColValue(csvConfig, csvCol, ticol.FieldType) if err != nil { return cols, err } diff --git a/pkg/sink/codec/csv/csv_message_test.go b/pkg/sink/codec/csv/csv_message_test.go index a0b7f04485c..6dc680ae386 100644 --- a/pkg/sink/codec/csv/csv_message_test.go +++ b/pkg/sink/codec/csv/csv_message_test.go @@ -24,14 +24,16 @@ import ( "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/rowcodec" "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/sink/codec/common" "github.com/stretchr/testify/require" ) type csvTestColumnTuple struct { - col model.Column - colInfo rowcodec.ColInfo - want interface{} + col model.Column + colInfo rowcodec.ColInfo + want interface{} + BinaryEncodingMethod string } var csvTestColumnsGroup = [][]*csvTestColumnTuple{ @@ -45,6 +47,7 @@ var csvTestColumnsGroup = [][]*csvTestColumnTuple{ Ft: types.NewFieldType(mysql.TypeTiny), }, int64(1), + config.BinaryEncodingBase64, }, { model.Column{Name: "short", Value: int64(1), Type: mysql.TypeShort}, @@ -55,6 +58,7 @@ var csvTestColumnsGroup = [][]*csvTestColumnTuple{ Ft: types.NewFieldType(mysql.TypeShort), }, int64(1), + config.BinaryEncodingBase64, }, { model.Column{Name: "int24", Value: int64(1), Type: mysql.TypeInt24}, @@ -65,6 +69,7 @@ var csvTestColumnsGroup = [][]*csvTestColumnTuple{ Ft: types.NewFieldType(mysql.TypeInt24), }, int64(1), + config.BinaryEncodingBase64, }, { model.Column{Name: "long", Value: int64(1), Type: mysql.TypeLong}, @@ -75,6 +80,7 @@ var csvTestColumnsGroup = [][]*csvTestColumnTuple{ Ft: types.NewFieldType(mysql.TypeLong), }, int64(1), + config.BinaryEncodingBase64, }, { model.Column{Name: "longlong", Value: int64(1), Type: mysql.TypeLonglong}, @@ -85,6 +91,7 @@ var csvTestColumnsGroup = [][]*csvTestColumnTuple{ Ft: types.NewFieldType(mysql.TypeLonglong), }, int64(1), + config.BinaryEncodingBase64, }, { model.Column{ @@ -100,6 +107,7 @@ var csvTestColumnsGroup = [][]*csvTestColumnTuple{ Ft: setFlag(types.NewFieldType(mysql.TypeTiny), uint(model.UnsignedFlag)), }, uint64(1), + config.BinaryEncodingBase64, }, { model.Column{ @@ -115,6 +123,7 @@ var csvTestColumnsGroup = [][]*csvTestColumnTuple{ Ft: setFlag(types.NewFieldType(mysql.TypeShort), uint(model.UnsignedFlag)), }, uint64(1), + config.BinaryEncodingBase64, }, { model.Column{ @@ -130,6 +139,7 @@ var csvTestColumnsGroup = [][]*csvTestColumnTuple{ Ft: setFlag(types.NewFieldType(mysql.TypeInt24), uint(model.UnsignedFlag)), }, uint64(1), + config.BinaryEncodingBase64, }, { model.Column{ @@ -145,6 +155,7 @@ var csvTestColumnsGroup = [][]*csvTestColumnTuple{ Ft: setFlag(types.NewFieldType(mysql.TypeLong), uint(model.UnsignedFlag)), }, uint64(1), + config.BinaryEncodingBase64, }, { model.Column{ @@ -163,6 +174,7 @@ var csvTestColumnsGroup = [][]*csvTestColumnTuple{ ), }, uint64(1), + config.BinaryEncodingBase64, }, }, { @@ -175,6 +187,7 @@ var csvTestColumnsGroup = [][]*csvTestColumnTuple{ Ft: types.NewFieldType(mysql.TypeFloat), }, float64(3.14), + config.BinaryEncodingBase64, }, { model.Column{Name: "double", Value: float64(3.14), Type: mysql.TypeDouble}, @@ -185,6 +198,7 @@ var csvTestColumnsGroup = [][]*csvTestColumnTuple{ Ft: types.NewFieldType(mysql.TypeDouble), }, float64(3.14), + config.BinaryEncodingBase64, }, }, { @@ -197,6 +211,7 @@ var csvTestColumnsGroup = [][]*csvTestColumnTuple{ Ft: types.NewFieldType(mysql.TypeBit), }, uint64(683), + config.BinaryEncodingBase64, }, }, { @@ -209,6 +224,7 @@ var csvTestColumnsGroup = [][]*csvTestColumnTuple{ Ft: types.NewFieldType(mysql.TypeNewDecimal), }, "129012.1230000", + config.BinaryEncodingBase64, }, }, { @@ -221,6 +237,7 @@ var csvTestColumnsGroup = [][]*csvTestColumnTuple{ Ft: types.NewFieldType(mysql.TypeBlob), }, "hello world", + config.BinaryEncodingBase64, }, { model.Column{Name: "mediumtext", Value: []byte("hello world"), Type: mysql.TypeMediumBlob}, @@ -231,6 +248,7 @@ var csvTestColumnsGroup = [][]*csvTestColumnTuple{ Ft: types.NewFieldType(mysql.TypeMediumBlob), }, "hello world", + config.BinaryEncodingBase64, }, { model.Column{Name: "text", Value: []byte("hello world"), Type: mysql.TypeBlob}, @@ -241,6 +259,7 @@ var csvTestColumnsGroup = [][]*csvTestColumnTuple{ Ft: types.NewFieldType(mysql.TypeBlob), }, "hello world", + config.BinaryEncodingBase64, }, { model.Column{Name: "longtext", Value: []byte("hello world"), Type: mysql.TypeLongBlob}, @@ -251,6 +270,7 @@ var csvTestColumnsGroup = [][]*csvTestColumnTuple{ Ft: types.NewFieldType(mysql.TypeLongBlob), }, "hello world", + config.BinaryEncodingBase64, }, { model.Column{Name: "varchar", Value: []byte("hello world"), Type: mysql.TypeVarchar}, @@ -261,6 +281,7 @@ var csvTestColumnsGroup = [][]*csvTestColumnTuple{ Ft: types.NewFieldType(mysql.TypeVarchar), }, "hello world", + config.BinaryEncodingBase64, }, { model.Column{Name: "varstring", Value: []byte("hello world"), Type: mysql.TypeVarString}, @@ -271,6 +292,7 @@ var csvTestColumnsGroup = [][]*csvTestColumnTuple{ Ft: types.NewFieldType(mysql.TypeVarString), }, "hello world", + config.BinaryEncodingBase64, }, { model.Column{Name: "string", Value: []byte("hello world"), Type: mysql.TypeString}, @@ -281,6 +303,7 @@ var csvTestColumnsGroup = [][]*csvTestColumnTuple{ Ft: types.NewFieldType(mysql.TypeString), }, "hello world", + config.BinaryEncodingBase64, }, { model.Column{Name: "json", Value: `{"key": "value"}`, Type: mysql.TypeJSON}, @@ -291,6 +314,7 @@ var csvTestColumnsGroup = [][]*csvTestColumnTuple{ Ft: types.NewFieldType(mysql.TypeJSON), }, `{"key": "value"}`, + config.BinaryEncodingBase64, }, }, { @@ -308,6 +332,7 @@ var csvTestColumnsGroup = [][]*csvTestColumnTuple{ Ft: setBinChsClnFlag(types.NewFieldType(mysql.TypeTinyBlob)), }, "aGVsbG8gd29ybGQ=", + config.BinaryEncodingBase64, }, { model.Column{ @@ -323,6 +348,7 @@ var csvTestColumnsGroup = [][]*csvTestColumnTuple{ Ft: setBinChsClnFlag(types.NewFieldType(mysql.TypeMediumBlob)), }, "aGVsbG8gd29ybGQ=", + config.BinaryEncodingBase64, }, { model.Column{ @@ -338,6 +364,7 @@ var csvTestColumnsGroup = [][]*csvTestColumnTuple{ Ft: setBinChsClnFlag(types.NewFieldType(mysql.TypeBlob)), }, "aGVsbG8gd29ybGQ=", + config.BinaryEncodingBase64, }, { model.Column{ @@ -353,6 +380,7 @@ var csvTestColumnsGroup = [][]*csvTestColumnTuple{ Ft: setBinChsClnFlag(types.NewFieldType(mysql.TypeLongBlob)), }, "aGVsbG8gd29ybGQ=", + config.BinaryEncodingBase64, }, { model.Column{ @@ -368,6 +396,7 @@ var csvTestColumnsGroup = [][]*csvTestColumnTuple{ Ft: setBinChsClnFlag(types.NewFieldType(mysql.TypeVarchar)), }, "aGVsbG8gd29ybGQ=", + config.BinaryEncodingBase64, }, { model.Column{ @@ -383,6 +412,7 @@ var csvTestColumnsGroup = [][]*csvTestColumnTuple{ Ft: setBinChsClnFlag(types.NewFieldType(mysql.TypeVarString)), }, "aGVsbG8gd29ybGQ=", + config.BinaryEncodingBase64, }, { model.Column{ @@ -398,6 +428,121 @@ var csvTestColumnsGroup = [][]*csvTestColumnTuple{ Ft: setBinChsClnFlag(types.NewFieldType(mysql.TypeString)), }, "aGVsbG8gd29ybGQ=", + config.BinaryEncodingBase64, + }, + }, + { + { + model.Column{ + Name: "tinyblob", + Value: []byte("hello world"), + Type: mysql.TypeTinyBlob, + Flag: model.BinaryFlag, + }, + rowcodec.ColInfo{ + ID: 22, + IsPKHandle: false, + VirtualGenCol: false, + Ft: setBinChsClnFlag(types.NewFieldType(mysql.TypeTinyBlob)), + }, + "68656c6c6f20776f726c64", + config.BinaryEncodingHex, + }, + { + model.Column{ + Name: "mediumblob", + Value: []byte("hello world"), + Type: mysql.TypeMediumBlob, + Flag: model.BinaryFlag, + }, + rowcodec.ColInfo{ + ID: 23, + IsPKHandle: false, + VirtualGenCol: false, + Ft: setBinChsClnFlag(types.NewFieldType(mysql.TypeMediumBlob)), + }, + "68656c6c6f20776f726c64", + config.BinaryEncodingHex, + }, + { + model.Column{ + Name: "blob", + Value: []byte("hello world"), + Type: mysql.TypeBlob, + Flag: model.BinaryFlag, + }, + rowcodec.ColInfo{ + ID: 24, + IsPKHandle: false, + VirtualGenCol: false, + Ft: setBinChsClnFlag(types.NewFieldType(mysql.TypeBlob)), + }, + "68656c6c6f20776f726c64", + config.BinaryEncodingHex, + }, + { + model.Column{ + Name: "longblob", + Value: []byte("hello world"), + Type: mysql.TypeLongBlob, + Flag: model.BinaryFlag, + }, + rowcodec.ColInfo{ + ID: 25, + IsPKHandle: false, + VirtualGenCol: false, + Ft: setBinChsClnFlag(types.NewFieldType(mysql.TypeLongBlob)), + }, + "68656c6c6f20776f726c64", + config.BinaryEncodingHex, + }, + { + model.Column{ + Name: "varbinary", + Value: []byte("hello world"), + Type: mysql.TypeVarchar, + Flag: model.BinaryFlag, + }, + rowcodec.ColInfo{ + ID: 26, + IsPKHandle: false, + VirtualGenCol: false, + Ft: setBinChsClnFlag(types.NewFieldType(mysql.TypeVarchar)), + }, + "68656c6c6f20776f726c64", + config.BinaryEncodingHex, + }, + { + model.Column{ + Name: "varbinary1", + Value: []byte("hello world"), + Type: mysql.TypeVarString, + Flag: model.BinaryFlag, + }, + rowcodec.ColInfo{ + ID: 27, + IsPKHandle: false, + VirtualGenCol: false, + Ft: setBinChsClnFlag(types.NewFieldType(mysql.TypeVarString)), + }, + "68656c6c6f20776f726c64", + config.BinaryEncodingHex, + }, + { + model.Column{ + Name: "binary", + Value: []byte("hello world"), + Type: mysql.TypeString, + Flag: model.BinaryFlag, + }, + rowcodec.ColInfo{ + ID: 28, + IsPKHandle: false, + VirtualGenCol: false, + Ft: setBinChsClnFlag(types.NewFieldType(mysql.TypeString)), + }, + "68656c6c6f20776f726c64", + config.BinaryEncodingHex, }, }, { @@ -410,6 +555,7 @@ var csvTestColumnsGroup = [][]*csvTestColumnTuple{ Ft: setElems(types.NewFieldType(mysql.TypeEnum), []string{"a,", "b"}), }, "a,", + config.BinaryEncodingBase64, }, }, { @@ -422,6 +568,7 @@ var csvTestColumnsGroup = [][]*csvTestColumnTuple{ Ft: setElems(types.NewFieldType(mysql.TypeSet), []string{"a", "b", "c", "d"}), }, "a,d", + config.BinaryEncodingBase64, }, }, { @@ -434,6 +581,7 @@ var csvTestColumnsGroup = [][]*csvTestColumnTuple{ Ft: types.NewFieldType(mysql.TypeDate), }, "2000-01-01", + config.BinaryEncodingBase64, }, { model.Column{Name: "datetime", Value: "2015-12-20 23:58:58", Type: mysql.TypeDatetime}, @@ -444,6 +592,7 @@ var csvTestColumnsGroup = [][]*csvTestColumnTuple{ Ft: types.NewFieldType(mysql.TypeDatetime), }, "2015-12-20 23:58:58", + config.BinaryEncodingBase64, }, { model.Column{Name: "timestamp", Value: "1973-12-30 15:30:00", Type: mysql.TypeTimestamp}, @@ -454,6 +603,7 @@ var csvTestColumnsGroup = [][]*csvTestColumnTuple{ Ft: types.NewFieldType(mysql.TypeTimestamp), }, "1973-12-30 15:30:00", + config.BinaryEncodingBase64, }, { model.Column{Name: "time", Value: "23:59:59", Type: mysql.TypeDuration}, @@ -464,6 +614,7 @@ var csvTestColumnsGroup = [][]*csvTestColumnTuple{ Ft: types.NewFieldType(mysql.TypeDuration), }, "23:59:59", + config.BinaryEncodingBase64, }, }, { @@ -476,6 +627,7 @@ var csvTestColumnsGroup = [][]*csvTestColumnTuple{ Ft: types.NewFieldType(mysql.TypeYear), }, int64(1970), + config.BinaryEncodingBase64, }, }, } @@ -762,7 +914,9 @@ func TestCSVMessageEncode(t *testing.T) { func TestConvertToCSVType(t *testing.T) { for _, group := range csvTestColumnsGroup { for _, c := range group { - val, _ := fromColValToCsvVal(&c.col, c.colInfo.Ft) + val, _ := fromColValToCsvVal(&common.Config{ + BinaryEncodingMethod: c.BinaryEncodingMethod, + }, &c.col, c.colInfo.Ft) require.Equal(t, c.want, val, c.col.Name) } } @@ -792,11 +946,12 @@ func TestRowChangeEventConversion(t *testing.T) { row.Columns = cols } csvMsg, err := rowChangedEvent2CSVMsg(&common.Config{ - Delimiter: "\t", - Quote: "\"", - Terminator: "\n", - NullString: "\\N", - IncludeCommitTs: true, + Delimiter: "\t", + Quote: "\"", + Terminator: "\n", + NullString: "\\N", + IncludeCommitTs: true, + BinaryEncodingMethod: group[0].BinaryEncodingMethod, }, row) require.NotNil(t, csvMsg) require.Nil(t, err) @@ -815,7 +970,9 @@ func TestRowChangeEventConversion(t *testing.T) { ticols = append(ticols, ticol) } - row2, err := csvMsg2RowChangedEvent(csvMsg, ticols) + row2, err := csvMsg2RowChangedEvent(&common.Config{ + BinaryEncodingMethod: group[0].BinaryEncodingMethod, + }, csvMsg, ticols) require.Nil(t, err) require.NotNil(t, row2) }