Skip to content

Commit

Permalink
codec(ticdc): simple protocol set table id by using the physical tabl…
Browse files Browse the repository at this point in the history
…e id (#11845) (#11878)

close #11846
  • Loading branch information
ti-chi-bot authored Dec 16, 2024
1 parent 9ccca55 commit 9d7482a
Show file tree
Hide file tree
Showing 7 changed files with 276 additions and 4 deletions.
14 changes: 12 additions & 2 deletions cdc/entry/schema_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,13 +175,23 @@ func (s *SchemaTestHelper) DDL2Jobs(ddl string, jobCnt int) []*timodel.Job {
// DML2Event execute the dml and return the corresponding row changed event.
// caution: it does not support `delete` since the key value cannot be found
// after the query executed.
func (s *SchemaTestHelper) DML2Event(dml string, schema, table string) *model.RowChangedEvent {
func (s *SchemaTestHelper) DML2Event(dml string, schema, table string, partitionID ...string) *model.RowChangedEvent {
s.tk.MustExec(dml)

tableInfo, ok := s.schemaStorage.GetLastSnapshot().TableByName(schema, table)
require.True(s.t, ok)

key, value := s.getLastKeyValue(tableInfo.ID)
tableID := tableInfo.ID

var partitionTableID int64 = -1
if len(partitionID) == 1 {
partitionTableID = tableInfo.TableInfo.GetPartitionInfo().GetPartitionIDByName(partitionID[0])
}
if partitionTableID != -1 {
tableID = partitionTableID
}

key, value := s.getLastKeyValue(tableID)
ts := s.schemaStorage.GetLastSnapshot().CurrentTs()
rawKV := &model.RawKVEntry{
OpType: model.OpTypePut,
Expand Down
5 changes: 5 additions & 0 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,11 @@ func (e txnRows) Swap(i, j int) {
e[i], e[j] = e[j], e[i]
}

// GetTableID returns the table ID of the event.
func (r *RowChangedEvent) GetTableID() int64 {
return r.PhysicalTableID
}

// GetCommitTs returns the commit timestamp of this event.
func (r *RowChangedEvent) GetCommitTs() uint64 {
return r.CommitTs
Expand Down
52 changes: 52 additions & 0 deletions pkg/sink/codec/canal/canal_json_row_event_encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,58 @@ func TestCanalJSONContentCompatibleE2E(t *testing.T) {
}
}

func TestE2EPartitionTable(t *testing.T) {
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

ctx := context.Background()
codecConfig := common.NewConfig(config.ProtocolCanalJSON)

builder, err := NewJSONRowEventEncoderBuilder(ctx, codecConfig)
require.NoError(t, err)
encoder := builder.Build()

decoder, err := NewBatchDecoder(ctx, codecConfig, nil)
require.NoError(t, err)

helper.Tk().MustExec("use test")

createPartitionTableDDL := helper.DDL2Event(`create table test.t(a int primary key, b int) partition by range (a) (
partition p0 values less than (10),
partition p1 values less than (20),
partition p2 values less than MAXVALUE)`)
require.NotNil(t, createPartitionTableDDL)

insertEvent := helper.DML2Event(`insert into test.t values (1, 1)`, "test", "t", "p0")
require.NotNil(t, insertEvent)

insertEvent1 := helper.DML2Event(`insert into test.t values (11, 11)`, "test", "t", "p1")
require.NotNil(t, insertEvent1)

insertEvent2 := helper.DML2Event(`insert into test.t values (21, 21)`, "test", "t", "p2")
require.NotNil(t, insertEvent2)

events := []*model.RowChangedEvent{insertEvent, insertEvent1, insertEvent2}

for _, event := range events {
err = encoder.AppendRowChangedEvent(ctx, "", event, nil)
require.NoError(t, err)
message := encoder.Build()[0]

err = decoder.AddKeyValue(message.Key, message.Value)
require.NoError(t, err)
tp, hasNext, err := decoder.HasNext()
require.NoError(t, err)
require.True(t, hasNext)
require.Equal(t, model.MessageTypeRow, tp)

decodedEvent, err := decoder.NextRowChangedEvent()
require.NoError(t, err)
// canal-json does not support encode the table id, so it's 0
require.Equal(t, decodedEvent.GetTableID(), int64(0))
}
}

func TestNewCanalJSONBatchDecoder4RowMessage(t *testing.T) {
_, insertEvent, _, _ := utils.NewLargeEvent4Test(t, config.GetDefaultReplicaConfig())
ctx := context.Background()
Expand Down
53 changes: 53 additions & 0 deletions pkg/sink/codec/open/open_protocol_encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package open
import (
"context"
"database/sql"
"github.com/pingcap/tiflow/cdc/entry"
"testing"

timodel "github.com/pingcap/tidb/pkg/parser/model"
Expand Down Expand Up @@ -439,6 +440,58 @@ func TestEncodeDecodeE2E(t *testing.T) {
}
}

func TestE2EPartitionTable(t *testing.T) {
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

ctx := context.Background()
codecConfig := common.NewConfig(config.ProtocolOpen)

builder, err := NewBatchEncoderBuilder(ctx, codecConfig)
require.NoError(t, err)
encoder := builder.Build()

decoder, err := NewBatchDecoder(ctx, codecConfig, nil)
require.NoError(t, err)

helper.Tk().MustExec("use test")

createPartitionTableDDL := helper.DDL2Event(`create table test.t(a int primary key, b int) partition by range (a) (
partition p0 values less than (10),
partition p1 values less than (20),
partition p2 values less than MAXVALUE)`)
require.NotNil(t, createPartitionTableDDL)

insertEvent := helper.DML2Event(`insert into test.t values (1, 1)`, "test", "t", "p0")
require.NotNil(t, insertEvent)

insertEvent1 := helper.DML2Event(`insert into test.t values (11, 11)`, "test", "t", "p1")
require.NotNil(t, insertEvent1)

insertEvent2 := helper.DML2Event(`insert into test.t values (21, 21)`, "test", "t", "p2")
require.NotNil(t, insertEvent2)

events := []*model.RowChangedEvent{insertEvent, insertEvent1, insertEvent2}

for _, event := range events {
err = encoder.AppendRowChangedEvent(ctx, "", event, nil)
require.NoError(t, err)
message := encoder.Build()[0]

err = decoder.AddKeyValue(message.Key, message.Value)
require.NoError(t, err)
tp, hasNext, err := decoder.HasNext()
require.NoError(t, err)
require.True(t, hasNext)
require.Equal(t, model.MessageTypeRow, tp)

decodedEvent, err := decoder.NextRowChangedEvent()
require.NoError(t, err)
// table id should be set to the partition table id, the PhysicalTableID
require.Equal(t, decodedEvent.GetTableID(), event.GetTableID())
}
}

func TestE2EDDLCompression(t *testing.T) {
t.Parallel()

Expand Down
2 changes: 1 addition & 1 deletion pkg/sink/codec/simple/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func (a *avroMarshaller) newDMLMessageMap(
"version": defaultVersion,
"database": event.TableInfo.GetSchemaName(),
"table": event.TableInfo.GetTableName(),
"tableID": event.TableInfo.ID,
"tableID": event.GetTableID(),
"commitTs": int64(event.CommitTs),
"buildTs": time.Now().UnixMilli(),
"schemaVersion": int64(event.TableInfo.UpdateTS),
Expand Down
152 changes: 152 additions & 0 deletions pkg/sink/codec/simple/encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,78 @@ func TestEncodeDMLEnableChecksum(t *testing.T) {
require.Nil(t, decodedRow)
}

func TestE2EPartitionTable(t *testing.T) {
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

helper.Tk().MustExec("use test")

createPartitionTableDDL := helper.DDL2Event(`create table test.t(a int primary key, b int) partition by range (a) (
partition p0 values less than (10),
partition p1 values less than (20),
partition p2 values less than MAXVALUE)`)
require.NotNil(t, createPartitionTableDDL)

insertEvent := helper.DML2Event(`insert into test.t values (1, 1)`, "test", "t", "p0")
require.NotNil(t, insertEvent)

insertEvent1 := helper.DML2Event(`insert into test.t values (11, 11)`, "test", "t", "p1")
require.NotNil(t, insertEvent1)

insertEvent2 := helper.DML2Event(`insert into test.t values (21, 21)`, "test", "t", "p2")
require.NotNil(t, insertEvent2)

events := []*model.RowChangedEvent{insertEvent, insertEvent1, insertEvent2}

ctx := context.Background()
codecConfig := common.NewConfig(config.ProtocolSimple)

for _, format := range []common.EncodingFormatType{
common.EncodingFormatAvro,
common.EncodingFormatJSON,
} {
codecConfig.EncodingFormat = format
builder, err := NewBuilder(ctx, codecConfig)
require.NoError(t, err)
enc := builder.Build()

decoder, err := NewDecoder(ctx, codecConfig, nil)
require.NoError(t, err)

message, err := enc.EncodeDDLEvent(createPartitionTableDDL)
require.NoError(t, err)

err = decoder.AddKeyValue(message.Key, message.Value)
require.NoError(t, err)
tp, hasNext, err := decoder.HasNext()
require.NoError(t, err)
require.True(t, hasNext)
require.Equal(t, model.MessageTypeDDL, tp)

decodedDDL, err := decoder.NextDDLEvent()
require.NoError(t, err)
require.NotNil(t, decodedDDL)

for _, event := range events {
err = enc.AppendRowChangedEvent(ctx, "", event, nil)
require.NoError(t, err)
message := enc.Build()[0]

err = decoder.AddKeyValue(message.Key, message.Value)
require.NoError(t, err)
tp, hasNext, err := decoder.HasNext()
require.NoError(t, err)
require.True(t, hasNext)
require.Equal(t, model.MessageTypeRow, tp)

decodedEvent, err := decoder.NextRowChangedEvent()
require.NoError(t, err)
// table id should be set to the partition table id, the PhysicalTableID
require.Equal(t, decodedEvent.GetTableID(), event.GetTableID())
}
}
}

func TestEncodeDDLSequence(t *testing.T) {
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()
Expand Down Expand Up @@ -1082,6 +1154,86 @@ func TestEncoderOtherTypes(t *testing.T) {
}
}

func TestE2EPartitionTableDMLBeforeDDL(t *testing.T) {
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

helper.Tk().MustExec("use test")

createPartitionTableDDL := helper.DDL2Event(`create table test.t(a int primary key, b int) partition by range (a) (
partition p0 values less than (10),
partition p1 values less than (20),
partition p2 values less than MAXVALUE)`)
require.NotNil(t, createPartitionTableDDL)

insertEvent := helper.DML2Event(`insert into test.t values (1, 1)`, "test", "t", "p0")
require.NotNil(t, insertEvent)

insertEvent1 := helper.DML2Event(`insert into test.t values (11, 11)`, "test", "t", "p1")
require.NotNil(t, insertEvent1)

insertEvent2 := helper.DML2Event(`insert into test.t values (21, 21)`, "test", "t", "p2")
require.NotNil(t, insertEvent2)

events := []*model.RowChangedEvent{insertEvent, insertEvent1, insertEvent2}

ctx := context.Background()
codecConfig := common.NewConfig(config.ProtocolSimple)

for _, format := range []common.EncodingFormatType{
common.EncodingFormatAvro,
common.EncodingFormatJSON,
} {
codecConfig.EncodingFormat = format
builder, err := NewBuilder(ctx, codecConfig)
require.NoError(t, err)

enc := builder.Build()

decoder, err := NewDecoder(ctx, codecConfig, nil)
require.NoError(t, err)

codecConfig.EncodingFormat = format
for _, event := range events {
err = enc.AppendRowChangedEvent(ctx, "", event, nil)
require.NoError(t, err)
message := enc.Build()[0]

err = decoder.AddKeyValue(message.Key, message.Value)
require.NoError(t, err)
tp, hasNext, err := decoder.HasNext()
require.NoError(t, err)
require.True(t, hasNext)
require.Equal(t, model.MessageTypeRow, tp)

decodedEvent, err := decoder.NextRowChangedEvent()
require.NoError(t, err)
require.Nil(t, decodedEvent)
}

message, err := enc.EncodeDDLEvent(createPartitionTableDDL)
require.NoError(t, err)

err = decoder.AddKeyValue(message.Key, message.Value)
require.NoError(t, err)
tp, hasNext, err := decoder.HasNext()
require.NoError(t, err)
require.True(t, hasNext)
require.Equal(t, model.MessageTypeDDL, tp)

decodedDDL, err := decoder.NextDDLEvent()
require.NoError(t, err)
require.NotNil(t, decodedDDL)

cachedEvents := decoder.GetCachedEvents()
for idx, decodedRow := range cachedEvents {
require.NotNil(t, decodedRow)
require.NotNil(t, decodedRow.TableInfo)
require.Equal(t, decodedRow.GetTableID(), events[idx].GetTableID())
}
}
}

func TestEncodeDMLBeforeDDL(t *testing.T) {
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()
Expand Down
2 changes: 1 addition & 1 deletion pkg/sink/codec/simple/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,7 @@ func (a *jsonMarshaller) newDMLMessage(
Version: defaultVersion,
Schema: event.TableInfo.GetSchemaName(),
Table: event.TableInfo.GetTableName(),
TableID: event.TableInfo.ID,
TableID: event.GetTableID(),
CommitTs: event.CommitTs,
BuildTs: time.Now().UnixMilli(),
SchemaVersion: event.TableInfo.UpdateTS,
Expand Down

0 comments on commit 9d7482a

Please sign in to comment.