Skip to content

Commit

Permalink
mysql(ticdc): consider collation when build the causality key (pingca…
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand committed Aug 29, 2023
1 parent 55b4e0f commit ac1c85b
Show file tree
Hide file tree
Showing 6 changed files with 275 additions and 155 deletions.
13 changes: 7 additions & 6 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,12 +379,13 @@ func datum2Column(
offset := tableInfo.RowColumnsOffset[colID]
rawCols[offset] = colDatums
cols[offset] = &model.Column{
Name: colName,
Type: colInfo.GetType(),
Charset: colInfo.GetCharset(),
Value: colValue,
Default: defaultValue,
Flag: tableInfo.ColumnsFlag[colID],
Name: colName,
Type: colInfo.GetType(),
Charset: colInfo.GetCharset(),
Collation: colInfo.GetCollate(),
Value: colValue,
Default: defaultValue,
Flag: tableInfo.ColumnsFlag[colID],
// ApproximateBytes = column data size + column struct size
ApproximateBytes: size + sizeOfEmptyColumn,
}
Expand Down
13 changes: 7 additions & 6 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,12 +462,13 @@ func (r *RowChangedEvent) ApproximateBytes() int {

// Column represents a column value in row changed event
type Column struct {
Name string `json:"name" msg:"name"`
Type byte `json:"type" msg:"type"`
Charset string `json:"charset" msg:"charset"`
Flag ColumnFlagType `json:"flag" msg:"-"`
Value interface{} `json:"value" msg:"-"`
Default interface{} `json:"default" msg:"-"`
Name string `json:"name" msg:"name"`
Type byte `json:"type" msg:"type"`
Charset string `json:"charset" msg:"charset"`
Collation string `json:"collation" msg:"collation"`
Flag ColumnFlagType `json:"flag" msg:"-"`
Value interface{} `json:"value" msg:"-"`
Default interface{} `json:"default" msg:"-"`

// ApproximateBytes is approximate bytes consumed by the column.
ApproximateBytes int `json:"-"`
Expand Down
35 changes: 30 additions & 5 deletions cdc/model/sink_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 26 additions & 3 deletions cdc/sink/dmlsink/txn/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ import (
"encoding/binary"
"hash/fnv"
"sort"
"strings"
"time"

"github.com/pingcap/log"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/dmlsink"
"go.uber.org/zap"
Expand Down Expand Up @@ -100,16 +102,24 @@ func genRowKeys(row *model.RowChangedEvent) [][]byte {
return keys
}

func genKeyList(columns []*model.Column, iIdx int, colIdx []int, tableID int64) []byte {
func genKeyList(
columns []*model.Column, iIdx int, colIdx []int, tableID int64,
) []byte {
var key []byte
for _, i := range colIdx {
// if a column value is null, we can ignore this index
// If the index contains a generated column, we can't use this key to detect conflicts with other DMLs,
// Because such as insert can't specified the generated value.
// Because such as insert can't specify the generated value.
if columns[i] == nil || columns[i].Value == nil || columns[i].Flag.IsGeneratedColumn() {
return nil
}
key = append(key, []byte(model.ColumnValueString(columns[i].Value))...)

val := model.ColumnValueString(columns[i].Value)
if columnNeeds2LowerCase(columns[i].Type, columns[i].Collation) {
val = strings.ToLower(val)
}

key = append(key, []byte(val)...)
key = append(key, 0)
}
if len(key) == 0 {
Expand All @@ -121,3 +131,16 @@ func genKeyList(columns []*model.Column, iIdx int, colIdx []int, tableID int64)
key = append(key, tableKey...)
return key
}

// columnNeeds2LowerCase reports whether the value of a column with the given
// MySQL type and collation should be lower-cased.
//
// Only the character and blob types listed in the switch are candidates; for
// those the decision is delegated to collationNeeds2LowerCase. Every other
// type yields false regardless of the collation.
func columnNeeds2LowerCase(mysqlType byte, collation string) bool {
	switch mysqlType {
	case mysql.TypeVarchar, mysql.TypeString, mysql.TypeVarString:
		// character types: decided by the collation below
	case mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeBlob, mysql.TypeLongBlob:
		// blob types: decided by the collation below
	default:
		return false
	}
	return collationNeeds2LowerCase(collation)
}

// collationNeeds2LowerCase reports whether the collation name ends with the
// "_ci" suffix. The match is exact and case-sensitive, so a name such as
// "utf8_unicode_CI" or an empty string yields false.
func collationNeeds2LowerCase(collation string) bool {
	const ciSuffix = "_ci"
	// TrimSuffix returns its input unchanged when the suffix is absent, so a
	// shorter result means the suffix was present.
	trimmed := strings.TrimSuffix(collation, ciSuffix)
	return len(trimmed) != len(collation)
}
205 changes: 205 additions & 0 deletions cdc/sink/dmlsink/txn/event_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package txn

import (
"sort"
"testing"

"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tiflow/cdc/model"
"github.com/stretchr/testify/require"
)

// TestGenKeyListCaseInSensitive checks that genKeyList produces the same key
// for two varchar values that differ only in letter case when the column
// uses the "utf8_unicode_ci" collation.
func TestGenKeyListCaseInSensitive(t *testing.T) {
	t.Parallel()

	// keyFor builds a single-column row holding value and returns the key
	// generated for index 0 over column 0 of table 1.
	keyFor := func(value string) []byte {
		cols := []*model.Column{
			{
				Value:     value,
				Type:      mysql.TypeVarchar,
				Collation: "utf8_unicode_ci",
			},
		}
		return genKeyList(cols, 0, []int{0}, 1)
	}

	first := keyFor("XyZ")
	second := keyFor("xYZ")

	require.Equal(t, first, second)
}

// TestGenKeys runs genTxnKeys over a table of transactions and compares the
// sorted result with the expected key hashes.
//
// Every row in the table belongs to common_1.uk_without_pk (table ID 47),
// carries only PreColumns, and has a nil column at offset 0 followed by the
// integer columns a1 and a3.
func TestGenKeys(t *testing.T) {
	t.Parallel()

	// longCol builds a mysql.TypeLong column with the given name, flag and value.
	longCol := func(name string, flag model.ColumnFlagType, value interface{}) *model.Column {
		return &model.Column{
			Name:  name,
			Type:  mysql.TypeLong,
			Flag:  flag,
			Value: value,
		}
	}

	// preRow builds a row changed event whose PreColumns are {nil, a1, a3}
	// and whose index layout is given by indexes.
	preRow := func(indexes [][]int, a1, a3 *model.Column) *model.RowChangedEvent {
		return &model.RowChangedEvent{
			StartTs:      418658114257813514,
			CommitTs:     418658114257813515,
			Table:        &model.TableName{Schema: "common_1", Table: "uk_without_pk", TableID: 47},
			PreColumns:   []*model.Column{nil, a1, a3},
			IndexColumns: indexes,
		}
	}

	testCases := []struct {
		txn      *model.SingleTableTxn
		expected []uint64
	}{
		{
			// A transaction without rows.
			txn:      &model.SingleTableTxn{},
			expected: nil,
		},
		{
			// One composite index covering both a1 and a3.
			txn: &model.SingleTableTxn{
				Rows: []*model.RowChangedEvent{
					preRow(
						[][]int{{1, 2}},
						longCol("a1", model.BinaryFlag|model.MultipleKeyFlag|model.HandleKeyFlag, 12),
						longCol("a3", model.BinaryFlag|model.MultipleKeyFlag|model.HandleKeyFlag, 1),
					),
					preRow(
						[][]int{{1, 2}},
						longCol("a1", model.BinaryFlag|model.MultipleKeyFlag|model.HandleKeyFlag, 1),
						longCol("a3", model.BinaryFlag|model.MultipleKeyFlag|model.HandleKeyFlag, 21),
					),
				},
			},
			expected: []uint64{2072713494, 3710968706},
		},
		{
			// Two single-column indexes, one on a1 and one on a3.
			txn: &model.SingleTableTxn{
				Rows: []*model.RowChangedEvent{
					preRow(
						[][]int{{1}, {2}},
						longCol("a1", model.BinaryFlag|model.HandleKeyFlag, 12),
						longCol("a3", model.BinaryFlag|model.HandleKeyFlag, 1),
					),
					preRow(
						[][]int{{1}, {2}},
						longCol("a1", model.BinaryFlag|model.HandleKeyFlag, 1),
						longCol("a3", model.BinaryFlag|model.HandleKeyFlag, 21),
					),
				},
			},
			expected: []uint64{318190470, 2109733718, 2658640457, 2989258527},
		},
		{
			// The first row holds nil values in both indexed columns.
			txn: &model.SingleTableTxn{
				Rows: []*model.RowChangedEvent{
					preRow(
						[][]int{{1}, {2}},
						longCol("a1", model.BinaryFlag|model.NullableFlag, nil),
						longCol("a3", model.BinaryFlag|model.NullableFlag, nil),
					),
					preRow(
						[][]int{{1}, {2}},
						longCol("a1", model.BinaryFlag|model.HandleKeyFlag, 1),
						longCol("a3", model.BinaryFlag|model.HandleKeyFlag, 21),
					),
				},
			},
			expected: []uint64{318190470, 2095136920, 2658640457},
		},
	}

	for _, testCase := range testCases {
		got := genTxnKeys(testCase.txn)
		// Sort ascending before comparing with the expected list.
		sort.Slice(got, func(a, b int) bool { return got[a] < got[b] })
		require.Equal(t, testCase.expected, got)
	}
}
Loading

0 comments on commit ac1c85b

Please sign in to comment.