From 9d7482aa137d991e57c94ddb4b6c801e552cf0de Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Mon, 16 Dec 2024 17:44:33 +0800 Subject: [PATCH] codec(ticdc): simple protocol set table id by using the physical table id (#11845) (#11878) close pingcap/tiflow#11846 --- cdc/entry/schema_test_helper.go | 14 +- cdc/model/sink.go | 5 + .../canal_json_row_event_encoder_test.go | 52 ++++++ .../codec/open/open_protocol_encoder_test.go | 53 ++++++ pkg/sink/codec/simple/avro.go | 2 +- pkg/sink/codec/simple/encoder_test.go | 152 ++++++++++++++++++ pkg/sink/codec/simple/message.go | 2 +- 7 files changed, 276 insertions(+), 4 deletions(-) diff --git a/cdc/entry/schema_test_helper.go b/cdc/entry/schema_test_helper.go index 4d7474188fb..0e4bc50ab43 100644 --- a/cdc/entry/schema_test_helper.go +++ b/cdc/entry/schema_test_helper.go @@ -175,13 +175,23 @@ func (s *SchemaTestHelper) DDL2Jobs(ddl string, jobCnt int) []*timodel.Job { // DML2Event execute the dml and return the corresponding row changed event. // caution: it does not support `delete` since the key value cannot be found // after the query executed. -func (s *SchemaTestHelper) DML2Event(dml string, schema, table string) *model.RowChangedEvent { +func (s *SchemaTestHelper) DML2Event(dml string, schema, table string, partitionID ...string) *model.RowChangedEvent { s.tk.MustExec(dml) tableInfo, ok := s.schemaStorage.GetLastSnapshot().TableByName(schema, table) require.True(s.t, ok) - key, value := s.getLastKeyValue(tableInfo.ID) + tableID := tableInfo.ID + + var partitionTableID int64 = -1 + if len(partitionID) == 1 { + partitionTableID = tableInfo.TableInfo.GetPartitionInfo().GetPartitionIDByName(partitionID[0]) + } + if partitionTableID != -1 { + tableID = partitionTableID + } + + key, value := s.getLastKeyValue(tableID) ts := s.schemaStorage.GetLastSnapshot().CurrentTs() rawKV := &model.RawKVEntry{ OpType: model.OpTypePut, diff --git a/cdc/model/sink.go b/cdc/model/sink.go index 806e941a023..50151d40e75 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -433,6 +433,11 @@ func (e txnRows) Swap(i, j int) { e[i], e[j] = e[j], e[i] } +// GetTableID returns the table ID of the event. +func (r *RowChangedEvent) GetTableID() int64 { + return r.PhysicalTableID +} + // GetCommitTs returns the commit timestamp of this event. func (r *RowChangedEvent) GetCommitTs() uint64 { return r.CommitTs diff --git a/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go b/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go index 25fae69a1b5..17e3673a4b2 100644 --- a/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go +++ b/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go @@ -667,6 +667,58 @@ func TestCanalJSONContentCompatibleE2E(t *testing.T) { } } +func TestE2EPartitionTable(t *testing.T) { + helper := entry.NewSchemaTestHelper(t) + defer helper.Close() + + ctx := context.Background() + codecConfig := common.NewConfig(config.ProtocolCanalJSON) + + builder, err := NewJSONRowEventEncoderBuilder(ctx, codecConfig) + require.NoError(t, err) + encoder := builder.Build() + + decoder, err := NewBatchDecoder(ctx, codecConfig, nil) + require.NoError(t, err) + + helper.Tk().MustExec("use test") + + createPartitionTableDDL := helper.DDL2Event(`create table test.t(a int primary key, b int) partition by range (a) ( + partition p0 values less than (10), + partition p1 values less than (20), + partition p2 values less than MAXVALUE)`) + require.NotNil(t, createPartitionTableDDL) + + insertEvent := helper.DML2Event(`insert into test.t values (1, 1)`, "test", "t", "p0") + require.NotNil(t, insertEvent) + + insertEvent1 := helper.DML2Event(`insert into test.t values (11, 11)`, "test", "t", "p1") + require.NotNil(t, insertEvent1) + + insertEvent2 := helper.DML2Event(`insert into test.t values (21, 21)`, "test", "t", "p2") + require.NotNil(t, insertEvent2) + + events := []*model.RowChangedEvent{insertEvent, insertEvent1, insertEvent2} + + for _, event := range events { + err = encoder.AppendRowChangedEvent(ctx, "", event, nil) + require.NoError(t, err) + message := encoder.Build()[0] + + err = decoder.AddKeyValue(message.Key, message.Value) + require.NoError(t, err) + tp, hasNext, err := decoder.HasNext() + require.NoError(t, err) + require.True(t, hasNext) + require.Equal(t, model.MessageTypeRow, tp) + + decodedEvent, err := decoder.NextRowChangedEvent() + require.NoError(t, err) + // canal-json does not support encode the table id, so it's 0 + require.Equal(t, decodedEvent.GetTableID(), int64(0)) + } +} + func TestNewCanalJSONBatchDecoder4RowMessage(t *testing.T) { _, insertEvent, _, _ := utils.NewLargeEvent4Test(t, config.GetDefaultReplicaConfig()) ctx := context.Background() diff --git a/pkg/sink/codec/open/open_protocol_encoder_test.go b/pkg/sink/codec/open/open_protocol_encoder_test.go index 514f8228956..b201484a28c 100644 --- a/pkg/sink/codec/open/open_protocol_encoder_test.go +++ b/pkg/sink/codec/open/open_protocol_encoder_test.go @@ -16,6 +16,7 @@ package open import ( "context" "database/sql" + "github.com/pingcap/tiflow/cdc/entry" "testing" timodel "github.com/pingcap/tidb/pkg/parser/model" @@ -439,6 +440,58 @@ func TestEncodeDecodeE2E(t *testing.T) { } } +func TestE2EPartitionTable(t *testing.T) { + helper := entry.NewSchemaTestHelper(t) + defer helper.Close() + + ctx := context.Background() + codecConfig := common.NewConfig(config.ProtocolOpen) + + builder, err := NewBatchEncoderBuilder(ctx, codecConfig) + require.NoError(t, err) + encoder := builder.Build() + + decoder, err := NewBatchDecoder(ctx, codecConfig, nil) + require.NoError(t, err) + + helper.Tk().MustExec("use test") + + createPartitionTableDDL := helper.DDL2Event(`create table test.t(a int primary key, b int) partition by range (a) ( + partition p0 values less than (10), + partition p1 values less than (20), + partition p2 values less than MAXVALUE)`) + require.NotNil(t, createPartitionTableDDL) + + insertEvent := helper.DML2Event(`insert into test.t values (1, 1)`, "test", "t", "p0") + require.NotNil(t, insertEvent) + + insertEvent1 := helper.DML2Event(`insert into test.t values (11, 11)`, "test", "t", "p1") + require.NotNil(t, insertEvent1) + + insertEvent2 := helper.DML2Event(`insert into test.t values (21, 21)`, "test", "t", "p2") + require.NotNil(t, insertEvent2) + + events := []*model.RowChangedEvent{insertEvent, insertEvent1, insertEvent2} + + for _, event := range events { + err = encoder.AppendRowChangedEvent(ctx, "", event, nil) + require.NoError(t, err) + message := encoder.Build()[0] + + err = decoder.AddKeyValue(message.Key, message.Value) + require.NoError(t, err) + tp, hasNext, err := decoder.HasNext() + require.NoError(t, err) + require.True(t, hasNext) + require.Equal(t, model.MessageTypeRow, tp) + + decodedEvent, err := decoder.NextRowChangedEvent() + require.NoError(t, err) + // table id should be set to the partition table id, the PhysicalTableID + require.Equal(t, decodedEvent.GetTableID(), event.GetTableID()) + } +} + func TestE2EDDLCompression(t *testing.T) { t.Parallel() diff --git a/pkg/sink/codec/simple/avro.go b/pkg/sink/codec/simple/avro.go index da572495f9a..9ed492aba49 100644 --- a/pkg/sink/codec/simple/avro.go +++ b/pkg/sink/codec/simple/avro.go @@ -243,7 +243,7 @@ func (a *avroMarshaller) newDMLMessageMap( "version": defaultVersion, "database": event.TableInfo.GetSchemaName(), "table": event.TableInfo.GetTableName(), - "tableID": event.TableInfo.ID, + "tableID": event.GetTableID(), "commitTs": int64(event.CommitTs), "buildTs": time.Now().UnixMilli(), "schemaVersion": int64(event.TableInfo.UpdateTS), diff --git a/pkg/sink/codec/simple/encoder_test.go b/pkg/sink/codec/simple/encoder_test.go index 0ad3b8153e6..bd8eaa55086 100644 --- a/pkg/sink/codec/simple/encoder_test.go +++ b/pkg/sink/codec/simple/encoder_test.go @@ -201,6 +201,78 @@ func TestEncodeDMLEnableChecksum(t *testing.T) { require.Nil(t, decodedRow) } +func TestE2EPartitionTable(t *testing.T) { + helper := entry.NewSchemaTestHelper(t) + defer helper.Close() + + helper.Tk().MustExec("use test") + + createPartitionTableDDL := helper.DDL2Event(`create table test.t(a int primary key, b int) partition by range (a) ( + partition p0 values less than (10), + partition p1 values less than (20), + partition p2 values less than MAXVALUE)`) + require.NotNil(t, createPartitionTableDDL) + + insertEvent := helper.DML2Event(`insert into test.t values (1, 1)`, "test", "t", "p0") + require.NotNil(t, insertEvent) + + insertEvent1 := helper.DML2Event(`insert into test.t values (11, 11)`, "test", "t", "p1") + require.NotNil(t, insertEvent1) + + insertEvent2 := helper.DML2Event(`insert into test.t values (21, 21)`, "test", "t", "p2") + require.NotNil(t, insertEvent2) + + events := []*model.RowChangedEvent{insertEvent, insertEvent1, insertEvent2} + + ctx := context.Background() + codecConfig := common.NewConfig(config.ProtocolSimple) + + for _, format := range []common.EncodingFormatType{ + common.EncodingFormatAvro, + common.EncodingFormatJSON, + } { + codecConfig.EncodingFormat = format + builder, err := NewBuilder(ctx, codecConfig) + require.NoError(t, err) + enc := builder.Build() + + decoder, err := NewDecoder(ctx, codecConfig, nil) + require.NoError(t, err) + + message, err := enc.EncodeDDLEvent(createPartitionTableDDL) + require.NoError(t, err) + + err = decoder.AddKeyValue(message.Key, message.Value) + require.NoError(t, err) + tp, hasNext, err := decoder.HasNext() + require.NoError(t, err) + require.True(t, hasNext) + require.Equal(t, model.MessageTypeDDL, tp) + + decodedDDL, err := decoder.NextDDLEvent() + require.NoError(t, err) + require.NotNil(t, decodedDDL) + + for _, event := range events { + err = enc.AppendRowChangedEvent(ctx, "", event, nil) + require.NoError(t, err) + message := enc.Build()[0] + + err = decoder.AddKeyValue(message.Key, message.Value) + require.NoError(t, err) + tp, hasNext, err := decoder.HasNext() + require.NoError(t, err) + require.True(t, hasNext) + require.Equal(t, model.MessageTypeRow, tp) + + decodedEvent, err := decoder.NextRowChangedEvent() + require.NoError(t, err) + // table id should be set to the partition table id, the PhysicalTableID + require.Equal(t, decodedEvent.GetTableID(), event.GetTableID()) + } + } +} + func TestEncodeDDLSequence(t *testing.T) { helper := entry.NewSchemaTestHelper(t) defer helper.Close() @@ -1082,6 +1154,86 @@ func TestEncoderOtherTypes(t *testing.T) { } } +func TestE2EPartitionTableDMLBeforeDDL(t *testing.T) { + helper := entry.NewSchemaTestHelper(t) + defer helper.Close() + + helper.Tk().MustExec("use test") + + createPartitionTableDDL := helper.DDL2Event(`create table test.t(a int primary key, b int) partition by range (a) ( + partition p0 values less than (10), + partition p1 values less than (20), + partition p2 values less than MAXVALUE)`) + require.NotNil(t, createPartitionTableDDL) + + insertEvent := helper.DML2Event(`insert into test.t values (1, 1)`, "test", "t", "p0") + require.NotNil(t, insertEvent) + + insertEvent1 := helper.DML2Event(`insert into test.t values (11, 11)`, "test", "t", "p1") + require.NotNil(t, insertEvent1) + + insertEvent2 := helper.DML2Event(`insert into test.t values (21, 21)`, "test", "t", "p2") + require.NotNil(t, insertEvent2) + + events := []*model.RowChangedEvent{insertEvent, insertEvent1, insertEvent2} + + ctx := context.Background() + codecConfig := common.NewConfig(config.ProtocolSimple) + + for _, format := range []common.EncodingFormatType{ + common.EncodingFormatAvro, + common.EncodingFormatJSON, + } { + codecConfig.EncodingFormat = format + builder, err := NewBuilder(ctx, codecConfig) + require.NoError(t, err) + + enc := builder.Build() + + decoder, err := NewDecoder(ctx, codecConfig, nil) + require.NoError(t, err) + + codecConfig.EncodingFormat = format + for _, event := range events { + err = enc.AppendRowChangedEvent(ctx, "", event, nil) + require.NoError(t, err) + message := enc.Build()[0] + + err = decoder.AddKeyValue(message.Key, message.Value) + require.NoError(t, err) + tp, hasNext, err := decoder.HasNext() + require.NoError(t, err) + require.True(t, hasNext) + require.Equal(t, model.MessageTypeRow, tp) + + decodedEvent, err := decoder.NextRowChangedEvent() + require.NoError(t, err) + require.Nil(t, decodedEvent) + } + + message, err := enc.EncodeDDLEvent(createPartitionTableDDL) + require.NoError(t, err) + + err = decoder.AddKeyValue(message.Key, message.Value) + require.NoError(t, err) + tp, hasNext, err := decoder.HasNext() + require.NoError(t, err) + require.True(t, hasNext) + require.Equal(t, model.MessageTypeDDL, tp) + + decodedDDL, err := decoder.NextDDLEvent() + require.NoError(t, err) + require.NotNil(t, decodedDDL) + + cachedEvents := decoder.GetCachedEvents() + for idx, decodedRow := range cachedEvents { + require.NotNil(t, decodedRow) + require.NotNil(t, decodedRow.TableInfo) + require.Equal(t, decodedRow.GetTableID(), events[idx].GetTableID()) + } + } +} + func TestEncodeDMLBeforeDDL(t *testing.T) { helper := entry.NewSchemaTestHelper(t) defer helper.Close() diff --git a/pkg/sink/codec/simple/message.go b/pkg/sink/codec/simple/message.go index 532f658851e..1fa0ec2e174 100644 --- a/pkg/sink/codec/simple/message.go +++ b/pkg/sink/codec/simple/message.go @@ -589,7 +589,7 @@ func (a *jsonMarshaller) newDMLMessage( Version: defaultVersion, Schema: event.TableInfo.GetSchemaName(), Table: event.TableInfo.GetTableName(), - TableID: event.TableInfo.ID, + TableID: event.GetTableID(), CommitTs: event.CommitTs, BuildTs: time.Now().UnixMilli(), SchemaVersion: event.TableInfo.UpdateTS,