From e04b346e95c2977369a70131921726cc12c69875 Mon Sep 17 00:00:00 2001 From: Ling Jin <7138436+3AceShowHand@users.noreply.github.com> Date: Thu, 10 Aug 2023 18:36:26 +0800 Subject: [PATCH] mysql(ticdc): consider collation when build the causality key (#9534) close pingcap/tiflow#9533 --- cdc/entry/mounter.go | 13 +- cdc/model/sink.go | 13 +- cdc/model/sink_gen.go | 35 +++- cdc/sink/dmlsink/txn/event.go | 29 ++- cdc/sink/dmlsink/txn/event_test.go | 205 ++++++++++++++++++++++ cdc/sink/dmlsink/txn/txn_dml_sink_test.go | 135 -------------- 6 files changed, 275 insertions(+), 155 deletions(-) create mode 100644 cdc/sink/dmlsink/txn/event_test.go diff --git a/cdc/entry/mounter.go b/cdc/entry/mounter.go index ed411e6165d..f24886ce3e9 100644 --- a/cdc/entry/mounter.go +++ b/cdc/entry/mounter.go @@ -379,12 +379,13 @@ func datum2Column( offset := tableInfo.RowColumnsOffset[colID] rawCols[offset] = colDatums cols[offset] = &model.Column{ - Name: colName, - Type: colInfo.GetType(), - Charset: colInfo.GetCharset(), - Value: colValue, - Default: defaultValue, - Flag: tableInfo.ColumnsFlag[colID], + Name: colName, + Type: colInfo.GetType(), + Charset: colInfo.GetCharset(), + Collation: colInfo.GetCollate(), + Value: colValue, + Default: defaultValue, + Flag: tableInfo.ColumnsFlag[colID], // ApproximateBytes = column data size + column struct size ApproximateBytes: size + sizeOfEmptyColumn, } diff --git a/cdc/model/sink.go b/cdc/model/sink.go index 1e1abf58725..bfc48bda2ee 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -462,12 +462,13 @@ func (r *RowChangedEvent) ApproximateBytes() int { // Column represents a column value in row changed event type Column struct { - Name string `json:"name" msg:"name"` - Type byte `json:"type" msg:"type"` - Charset string `json:"charset" msg:"charset"` - Flag ColumnFlagType `json:"flag" msg:"-"` - Value interface{} `json:"value" msg:"-"` - Default interface{} `json:"default" msg:"-"` + Name string `json:"name" msg:"name"` + Type byte `json:"type" msg:"type"` + Charset string `json:"charset" msg:"charset"` + Collation string `json:"collation" msg:"collation"` + Flag ColumnFlagType `json:"flag" msg:"-"` + Value interface{} `json:"value" msg:"-"` + Default interface{} `json:"default" msg:"-"` // ApproximateBytes is approximate bytes consumed by the column. ApproximateBytes int `json:"-"` diff --git a/cdc/model/sink_gen.go b/cdc/model/sink_gen.go index 98a616f1772..8876f43e88f 100644 --- a/cdc/model/sink_gen.go +++ b/cdc/model/sink_gen.go @@ -42,6 +42,12 @@ func (z *Column) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err, "Charset") return } + case "collation": + z.Collation, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Collation") + return + } case "ApproximateBytes": z.ApproximateBytes, err = dc.ReadInt() if err != nil { @@ -61,9 +67,9 @@ func (z *Column) DecodeMsg(dc *msgp.Reader) (err error) { // EncodeMsg implements msgp.Encodable func (z *Column) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 4 + // map header, size 5 // write "name" - err = en.Append(0x84, 0xa4, 0x6e, 0x61, 0x6d, 0x65) + err = en.Append(0x85, 0xa4, 0x6e, 0x61, 0x6d, 0x65) if err != nil { return } @@ -92,6 +98,16 @@ func (z *Column) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "Charset") return } + // write "collation" + err = en.Append(0xa9, 0x63, 0x6f, 0x6c, 0x6c, 0x61, 0x74, 0x69, 0x6f, 0x6e) + if err != nil { + return + } + err = en.WriteString(z.Collation) + if err != nil { + err = msgp.WrapError(err, "Collation") + return + } // write "ApproximateBytes" err = en.Append(0xb0, 0x41, 0x70, 0x70, 0x72, 0x6f, 0x78, 0x69, 0x6d, 0x61, 0x74, 0x65, 0x42, 0x79, 0x74, 0x65, 0x73) if err != nil { @@ -108,9 +124,9 @@ func (z *Column) EncodeMsg(en *msgp.Writer) (err error) { // MarshalMsg implements msgp.Marshaler func (z *Column) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - // map header, size 4 + // map header, size 5 // string "name" - o = append(o, 0x84, 0xa4, 0x6e, 0x61, 0x6d, 0x65) + o = append(o, 0x85, 0xa4, 0x6e, 0x61, 0x6d, 0x65) o = msgp.AppendString(o, z.Name) // string "type" o = append(o, 0xa4, 0x74, 0x79, 0x70, 0x65) @@ -118,6 +134,9 @@ func (z *Column) MarshalMsg(b []byte) (o []byte, err error) { // string "charset" o = append(o, 0xa7, 0x63, 0x68, 0x61, 0x72, 0x73, 0x65, 0x74) o = msgp.AppendString(o, z.Charset) + // string "collation" + o = append(o, 0xa9, 0x63, 0x6f, 0x6c, 0x6c, 0x61, 0x74, 0x69, 0x6f, 0x6e) + o = msgp.AppendString(o, z.Collation) // string "ApproximateBytes" o = append(o, 0xb0, 0x41, 0x70, 0x70, 0x72, 0x6f, 0x78, 0x69, 0x6d, 0x61, 0x74, 0x65, 0x42, 0x79, 0x74, 0x65, 0x73) o = msgp.AppendInt(o, z.ApproximateBytes) @@ -160,6 +179,12 @@ func (z *Column) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err, "Charset") return } + case "collation": + z.Collation, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Collation") + return + } case "ApproximateBytes": z.ApproximateBytes, bts, err = msgp.ReadIntBytes(bts) if err != nil { @@ -180,7 +205,7 @@ func (z *Column) UnmarshalMsg(bts []byte) (o []byte, err error) { // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message func (z *Column) Msgsize() (s int) { - s = 1 + 5 + msgp.StringPrefixSize + len(z.Name) + 5 + msgp.ByteSize + 8 + msgp.StringPrefixSize + len(z.Charset) + 17 + msgp.IntSize + s = 1 + 5 + msgp.StringPrefixSize + len(z.Name) + 5 + msgp.ByteSize + 8 + msgp.StringPrefixSize + len(z.Charset) + 10 + msgp.StringPrefixSize + len(z.Collation) + 17 + msgp.IntSize return } diff --git a/cdc/sink/dmlsink/txn/event.go b/cdc/sink/dmlsink/txn/event.go index 441f1eb8ab8..18ea2090f8a 100644 --- a/cdc/sink/dmlsink/txn/event.go +++ b/cdc/sink/dmlsink/txn/event.go @@ -17,9 +17,11 @@ import ( "encoding/binary" "hash/fnv" "sort" + "strings" "time" "github.com/pingcap/log" + "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sink/dmlsink" "go.uber.org/zap" @@ -100,16 +102,24 @@ func genRowKeys(row *model.RowChangedEvent) [][]byte { return keys } -func genKeyList(columns []*model.Column, iIdx int, colIdx []int, tableID int64) []byte { +func genKeyList( + columns []*model.Column, iIdx int, colIdx []int, tableID int64, +) []byte { var key []byte for _, i := range colIdx { // if a column value is null, we can ignore this index // If the index contain generated column, we can't use this key to detect conflict with other DML, - // Because such as insert can't specified the generated value. + // Because such as insert can't specify the generated value. if columns[i] == nil || columns[i].Value == nil || columns[i].Flag.IsGeneratedColumn() { return nil } - key = append(key, []byte(model.ColumnValueString(columns[i].Value))...) + + val := model.ColumnValueString(columns[i].Value) + if columnNeeds2LowerCase(columns[i].Type, columns[i].Collation) { + val = strings.ToLower(val) + } + + key = append(key, []byte(val)...) key = append(key, 0) } if len(key) == 0 { @@ -121,3 +131,16 @@ func genKeyList(columns []*model.Column, iIdx int, colIdx []int, tableID int64) key = append(key, tableKey...) return key } + +func columnNeeds2LowerCase(mysqlType byte, collation string) bool { + switch mysqlType { + case mysql.TypeVarchar, mysql.TypeString, mysql.TypeVarString, mysql.TypeTinyBlob, + mysql.TypeMediumBlob, mysql.TypeBlob, mysql.TypeLongBlob: + return collationNeeds2LowerCase(collation) + } + return false +} + +func collationNeeds2LowerCase(collation string) bool { + return strings.HasSuffix(collation, "_ci") +} diff --git a/cdc/sink/dmlsink/txn/event_test.go b/cdc/sink/dmlsink/txn/event_test.go new file mode 100644 index 00000000000..c0dfb00492b --- /dev/null +++ b/cdc/sink/dmlsink/txn/event_test.go @@ -0,0 +1,205 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package txn + +import ( + "sort" + "testing" + + "github.com/pingcap/tidb/parser/mysql" + "github.com/pingcap/tiflow/cdc/model" + "github.com/stretchr/testify/require" +) + +func TestGenKeyListCaseInSensitive(t *testing.T) { + t.Parallel() + + columns := []*model.Column{ + { + Value: "XyZ", + Type: mysql.TypeVarchar, + Collation: "utf8_unicode_ci", + }, + } + + first := genKeyList(columns, 0, []int{0}, 1) + + columns = []*model.Column{ + { + Value: "xYZ", + Type: mysql.TypeVarchar, + Collation: "utf8_unicode_ci", + }, + } + second := genKeyList(columns, 0, []int{0}, 1) + + require.Equal(t, first, second) +} + +func TestGenKeys(t *testing.T) { + t.Parallel() + testCases := []struct { + txn *model.SingleTableTxn + expected []uint64 + }{{ + txn: &model.SingleTableTxn{}, + expected: nil, + }, { + txn: &model.SingleTableTxn{ + Rows: []*model.RowChangedEvent{ + { + StartTs: 418658114257813514, + CommitTs: 418658114257813515, + Table: &model.TableName{Schema: "common_1", Table: "uk_without_pk", TableID: 47}, + PreColumns: []*model.Column{ + nil, + { + Name: "a1", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag, + Value: 12, + }, + { + Name: "a3", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag, + Value: 1, + }, + }, + IndexColumns: [][]int{{1, 2}}, + }, { + StartTs: 418658114257813514, + CommitTs: 418658114257813515, + Table: &model.TableName{Schema: "common_1", Table: "uk_without_pk", TableID: 47}, + PreColumns: []*model.Column{ + nil, + { + Name: "a1", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag, + Value: 1, + }, + { + Name: "a3", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag, + Value: 21, + }, + }, + IndexColumns: [][]int{{1, 2}}, + }, + }, + }, + expected: []uint64{2072713494, 3710968706}, + }, { + txn: &model.SingleTableTxn{ + Rows: []*model.RowChangedEvent{ + { + StartTs: 418658114257813514, + CommitTs: 418658114257813515, + Table: &model.TableName{Schema: "common_1", Table: "uk_without_pk", TableID: 47}, + PreColumns: []*model.Column{ + nil, + { + Name: "a1", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.HandleKeyFlag, + Value: 12, + }, + { + Name: "a3", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.HandleKeyFlag, + Value: 1, + }, + }, + IndexColumns: [][]int{{1}, {2}}, + }, { + StartTs: 418658114257813514, + CommitTs: 418658114257813515, + Table: &model.TableName{Schema: "common_1", Table: "uk_without_pk", TableID: 47}, + PreColumns: []*model.Column{ + nil, + { + Name: "a1", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.HandleKeyFlag, + Value: 1, + }, + { + Name: "a3", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.HandleKeyFlag, + Value: 21, + }, + }, + IndexColumns: [][]int{{1}, {2}}, + }, + }, + }, + expected: []uint64{318190470, 2109733718, 2658640457, 2989258527}, + }, { + txn: &model.SingleTableTxn{ + Rows: []*model.RowChangedEvent{ + { + StartTs: 418658114257813514, + CommitTs: 418658114257813515, + Table: &model.TableName{Schema: "common_1", Table: "uk_without_pk", TableID: 47}, + PreColumns: []*model.Column{ + nil, + { + Name: "a1", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.NullableFlag, + Value: nil, + }, + { + Name: "a3", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.NullableFlag, + Value: nil, + }, + }, + IndexColumns: [][]int{{1}, {2}}, + }, { + StartTs: 418658114257813514, + CommitTs: 418658114257813515, + Table: &model.TableName{Schema: "common_1", Table: "uk_without_pk", TableID: 47}, + PreColumns: []*model.Column{ + nil, + { + Name: "a1", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.HandleKeyFlag, + Value: 1, + }, + { + Name: "a3", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.HandleKeyFlag, + Value: 21, + }, + }, + IndexColumns: [][]int{{1}, {2}}, + }, + }, + }, + expected: []uint64{318190470, 2095136920, 2658640457}, + }} + for _, tc := range testCases { + keys := genTxnKeys(tc.txn) + sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] }) + require.Equal(t, tc.expected, keys) + } +} diff --git a/cdc/sink/dmlsink/txn/txn_dml_sink_test.go b/cdc/sink/dmlsink/txn/txn_dml_sink_test.go index ac4762d8cef..68269123231 100644 --- a/cdc/sink/dmlsink/txn/txn_dml_sink_test.go +++ b/cdc/sink/dmlsink/txn/txn_dml_sink_test.go @@ -15,12 +15,10 @@ package txn import ( "context" - "sort" "sync/atomic" "testing" "time" - "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sink/dmlsink" "github.com/pingcap/tiflow/cdc/sink/tablesink/state" @@ -105,136 +103,3 @@ func TestTxnSinkNolocking(t *testing.T) { require.Equal(t, uint32(100), atomic.LoadUint32(&handled)) sink.Close() } - -func TestGenKeys(t *testing.T) { - t.Parallel() - testCases := []struct { - txn *model.SingleTableTxn - expected []uint64 - }{{ - txn: &model.SingleTableTxn{}, - expected: nil, - }, { - txn: &model.SingleTableTxn{ - Rows: []*model.RowChangedEvent{ - { - StartTs: 418658114257813514, - CommitTs: 418658114257813515, - Table: &model.TableName{Schema: "common_1", Table: "uk_without_pk", TableID: 47}, - PreColumns: []*model.Column{nil, { - Name: "a1", - Type: mysql.TypeLong, - Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag, - Value: 12, - }, { - Name: "a3", - Type: mysql.TypeLong, - Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag, - Value: 1, - }}, - IndexColumns: [][]int{{1, 2}}, - }, { - StartTs: 418658114257813514, - CommitTs: 418658114257813515, - Table: &model.TableName{Schema: "common_1", Table: "uk_without_pk", TableID: 47}, - PreColumns: []*model.Column{nil, { - Name: "a1", - Type: mysql.TypeLong, - Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag, - Value: 1, - }, { - Name: "a3", - Type: mysql.TypeLong, - Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag, - Value: 21, - }}, - IndexColumns: [][]int{{1, 2}}, - }, - }, - }, - expected: []uint64{2072713494, 3710968706}, - }, { - txn: &model.SingleTableTxn{ - Rows: []*model.RowChangedEvent{ - { - StartTs: 418658114257813514, - CommitTs: 418658114257813515, - Table: &model.TableName{Schema: "common_1", Table: "uk_without_pk", TableID: 47}, - PreColumns: []*model.Column{nil, { - Name: "a1", - Type: mysql.TypeLong, - Flag: model.BinaryFlag | model.HandleKeyFlag, - Value: 12, - }, { - Name: "a3", - Type: mysql.TypeLong, - Flag: model.BinaryFlag | model.HandleKeyFlag, - Value: 1, - }}, - IndexColumns: [][]int{{1}, {2}}, - }, { - StartTs: 418658114257813514, - CommitTs: 418658114257813515, - Table: &model.TableName{Schema: "common_1", Table: "uk_without_pk", TableID: 47}, - PreColumns: []*model.Column{nil, { - Name: "a1", - Type: mysql.TypeLong, - Flag: model.BinaryFlag | model.HandleKeyFlag, - Value: 1, - }, { - Name: "a3", - Type: mysql.TypeLong, - Flag: model.BinaryFlag | model.HandleKeyFlag, - Value: 21, - }}, - IndexColumns: [][]int{{1}, {2}}, - }, - }, - }, - expected: []uint64{318190470, 2109733718, 2658640457, 2989258527}, - }, { - txn: &model.SingleTableTxn{ - Rows: []*model.RowChangedEvent{ - { - StartTs: 418658114257813514, - CommitTs: 418658114257813515, - Table: &model.TableName{Schema: "common_1", Table: "uk_without_pk", TableID: 47}, - PreColumns: []*model.Column{nil, { - Name: "a1", - Type: mysql.TypeLong, - Flag: model.BinaryFlag | model.NullableFlag, - Value: nil, - }, { - Name: "a3", - Type: mysql.TypeLong, - Flag: model.BinaryFlag | model.NullableFlag, - Value: nil, - }}, - IndexColumns: [][]int{{1}, {2}}, - }, { - StartTs: 418658114257813514, - CommitTs: 418658114257813515, - Table: &model.TableName{Schema: "common_1", Table: "uk_without_pk", TableID: 47}, - PreColumns: []*model.Column{nil, { - Name: "a1", - Type: mysql.TypeLong, - Flag: model.BinaryFlag | model.HandleKeyFlag, - Value: 1, - }, { - Name: "a3", - Type: mysql.TypeLong, - Flag: model.BinaryFlag | model.HandleKeyFlag, - Value: 21, - }}, - IndexColumns: [][]int{{1}, {2}}, - }, - }, - }, - expected: []uint64{318190470, 2095136920, 2658640457}, - }} - for _, tc := range testCases { - keys := genTxnKeys(tc.txn) - sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] }) - require.Equal(t, tc.expected, keys) - } -}