From cb9d0beb19605bc7bd093b9752a6d22d04e2ccd5 Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Tue, 12 Jul 2022 14:30:48 +0800 Subject: [PATCH 1/4] sink(ticdc): split maxwell.go --- cdc/sink/mq/codec/canal_encoder_test.go | 3 +- cdc/sink/mq/codec/encoder.go | 2 +- cdc/sink/mq/codec/maxwell_encoder.go | 115 +++++++++++++ ...axwell_test.go => maxwell_encoder_test.go} | 24 +-- .../codec/{maxwell.go => maxwell_message.go} | 151 ++++-------------- cdc/sink/mq/codec/maxwell_message_test.go | 40 +++++ 6 files changed, 187 insertions(+), 148 deletions(-) create mode 100644 cdc/sink/mq/codec/maxwell_encoder.go rename cdc/sink/mq/codec/{maxwell_test.go => maxwell_encoder_test.go} (77%) rename cdc/sink/mq/codec/{maxwell.go => maxwell_message.go} (65%) create mode 100644 cdc/sink/mq/codec/maxwell_message_test.go diff --git a/cdc/sink/mq/codec/canal_encoder_test.go b/cdc/sink/mq/codec/canal_encoder_test.go index bd4098adf0d..5ef25965390 100644 --- a/cdc/sink/mq/codec/canal_encoder_test.go +++ b/cdc/sink/mq/codec/canal_encoder_test.go @@ -17,9 +17,8 @@ import ( "context" "testing" - "github.com/pingcap/tiflow/cdc/model" - "github.com/golang/protobuf/proto" + "github.com/pingcap/tiflow/cdc/model" canal "github.com/pingcap/tiflow/proto/canal" "github.com/stretchr/testify/require" ) diff --git a/cdc/sink/mq/codec/encoder.go b/cdc/sink/mq/codec/encoder.go index 9ecd472a078..2eaec76a107 100644 --- a/cdc/sink/mq/codec/encoder.go +++ b/cdc/sink/mq/codec/encoder.go @@ -55,7 +55,7 @@ func NewEventBatchEncoderBuilder(ctx context.Context, c *Config) (EncoderBuilder case config.ProtocolAvro: return newAvroEventBatchEncoderBuilder(ctx, c) case config.ProtocolMaxwell: - return newMaxwellEventBatchEncoderBuilder(), nil + return newMaxwellBatchEncoderBuilder(), nil case config.ProtocolCanalJSON: return newCanalJSONBatchEncoderBuilder(c), nil case config.ProtocolCraft: diff --git a/cdc/sink/mq/codec/maxwell_encoder.go b/cdc/sink/mq/codec/maxwell_encoder.go new file mode 100644 index 00000000000..22cef99063d --- /dev/null +++ b/cdc/sink/mq/codec/maxwell_encoder.go @@ -0,0 +1,115 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package codec + +import ( + "bytes" + "context" + "encoding/binary" + + "github.com/pingcap/errors" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/config" +) + +// maxwellBatchEncoder is a maxwell format encoder implementation +type maxwellBatchEncoder struct { + keyBuf *bytes.Buffer + valueBuf *bytes.Buffer + batchSize int +} + +// EncodeCheckpointEvent implements the EventBatchEncoder interface +func (d *maxwellBatchEncoder) EncodeCheckpointEvent(ts uint64) (*MQMessage, error) { + // For maxwell now, there is no such a corresponding type to ResolvedEvent so far. + // Therefore the event is ignored. + return nil, nil +} + +// AppendRowChangedEvent implements the EventBatchEncoder interface +func (d *maxwellBatchEncoder) AppendRowChangedEvent( + _ context.Context, + _ string, + e *model.RowChangedEvent, + _ func(), +) error { + _, valueMsg := rowChangeToMaxwellMsg(e) + value, err := valueMsg.encode() + if err != nil { + return errors.Trace(err) + } + d.valueBuf.Write(value) + d.batchSize++ + return nil +} + +// EncodeDDLEvent implements the EventBatchEncoder interface +// DDL message unresolved tso +func (d *maxwellBatchEncoder) EncodeDDLEvent(e *model.DDLEvent) (*MQMessage, error) { + keyMsg, valueMsg := ddlEventToMaxwellMsg(e) + key, err := keyMsg.encode() + if err != nil { + return nil, errors.Trace(err) + } + value, err := valueMsg.encode() + if err != nil { + return nil, errors.Trace(err) + } + + return newDDLMsg(config.ProtocolMaxwell, key, value, e), nil +} + +// Build implements the EventBatchEncoder interface +func (d *maxwellBatchEncoder) Build() []*MQMessage { + if d.batchSize == 0 { + return nil + } + + ret := newMsg(config.ProtocolMaxwell, + d.keyBuf.Bytes(), d.valueBuf.Bytes(), 0, model.MessageTypeRow, nil, nil) + ret.SetRowsCount(d.batchSize) + d.reset() + return []*MQMessage{ret} +} + +// reset implements the EventBatchEncoder interface +func (d *maxwellBatchEncoder) reset() { + d.keyBuf.Reset() + d.valueBuf.Reset() + d.batchSize = 0 + var versionByte [8]byte + binary.BigEndian.PutUint64(versionByte[:], BatchVersion1) + d.keyBuf.Write(versionByte[:]) +} + +// newMaxwellBatchEncoder creates a new maxwellBatchEncoder. +func newMaxwellBatchEncoder() EventBatchEncoder { + batch := &maxwellBatchEncoder{ + keyBuf: &bytes.Buffer{}, + valueBuf: &bytes.Buffer{}, + } + batch.reset() + return batch +} + +type maxwellBatchEncoderBuilder struct{} + +func newMaxwellBatchEncoderBuilder() EncoderBuilder { + return &maxwellBatchEncoderBuilder{} +} + +// Build a `maxwellBatchEncoder` +func (b *maxwellBatchEncoderBuilder) Build() EventBatchEncoder { + return newMaxwellBatchEncoder() +} diff --git a/cdc/sink/mq/codec/maxwell_test.go b/cdc/sink/mq/codec/maxwell_encoder_test.go similarity index 77% rename from cdc/sink/mq/codec/maxwell_test.go rename to cdc/sink/mq/codec/maxwell_encoder_test.go index 6a1bb493e7a..171fba2283a 100644 --- a/cdc/sink/mq/codec/maxwell_test.go +++ b/cdc/sink/mq/codec/maxwell_encoder_test.go @@ -21,9 +21,9 @@ import ( "github.com/stretchr/testify/require" ) -func TestMaxwellEventBatchCodec(t *testing.T) { +func TestMaxwellBatchCodec(t *testing.T) { t.Parallel() - newEncoder := NewMaxwellEventBatchEncoder + newEncoder := newMaxwellBatchEncoder rowCases := [][]*model.RowChangedEvent{{{ CommitTs: 1, @@ -62,23 +62,3 @@ func TestMaxwellEventBatchCodec(t *testing.T) { } } } - -func TestMaxwellFormatCol(t *testing.T) { - t.Parallel() - row := &maxwellMessage{ - Ts: 1, - Database: "a", - Table: "b", - Type: "delete", - Xid: 1, - Xoffset: 1, - Position: "", - Gtid: "", - Data: map[string]interface{}{ - "id": "1", - }, - } - rowEncode, err := row.Encode() - require.Nil(t, err) - require.NotNil(t, rowEncode) -} diff --git a/cdc/sink/mq/codec/maxwell.go b/cdc/sink/mq/codec/maxwell_message.go similarity index 65% rename from cdc/sink/mq/codec/maxwell.go rename to cdc/sink/mq/codec/maxwell_message.go index 6d0305d0a7d..6a5e1a1ff16 100644 --- a/cdc/sink/mq/codec/maxwell.go +++ b/cdc/sink/mq/codec/maxwell_message.go @@ -1,4 +1,4 @@ -// Copyright 2020 PingCAP, Inc. +// Copyright 2022 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -14,38 +14,15 @@ package codec import ( - "bytes" - "context" - "encoding/binary" "encoding/json" - "github.com/pingcap/errors" model2 "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/tikv/pd/pkg/tsoutil" ) -type maxwellEventBatchEncoderBuilder struct{} - -func newMaxwellEventBatchEncoderBuilder() EncoderBuilder { - return &maxwellEventBatchEncoderBuilder{} -} - -// Build a `MaxwellEventBatchEncoder` -func (b *maxwellEventBatchEncoderBuilder) Build() EventBatchEncoder { - return NewMaxwellEventBatchEncoder() -} - -// MaxwellEventBatchEncoder is a maxwell format encoder implementation -type MaxwellEventBatchEncoder struct { - keyBuf *bytes.Buffer - valueBuf *bytes.Buffer - batchSize int -} - type maxwellMessage struct { Database string `json:"database"` Table string `json:"table"` @@ -60,25 +37,12 @@ type maxwellMessage struct { } // Encode encodes the message to bytes -func (m *maxwellMessage) Encode() ([]byte, error) { - data, err := json.Marshal(m) - return data, cerror.WrapError(cerror.ErrMaxwellEncodeFailed, err) -} - -// Encode encodes the message to bytes -func (m *DdlMaxwellMessage) Encode() ([]byte, error) { +func (m *maxwellMessage) encode() ([]byte, error) { data, err := json.Marshal(m) return data, cerror.WrapError(cerror.ErrMaxwellEncodeFailed, err) } -// EncodeCheckpointEvent implements the EventBatchEncoder interface -func (d *MaxwellEventBatchEncoder) EncodeCheckpointEvent(ts uint64) (*MQMessage, error) { - // For maxwell now, there is no such a corresponding type to ResolvedEvent so far. - // Therefore the event is ignored. - return nil, nil -} - -func rowEventToMaxwellMessage(e *model.RowChangedEvent) (*mqMessageKey, *maxwellMessage) { +func rowChangeToMaxwellMsg(e *model.RowChangedEvent) (*mqMessageKey, *maxwellMessage) { var partition *int64 if e.Table.IsPartition { partition = &e.Table.TableID @@ -163,25 +127,8 @@ func rowEventToMaxwellMessage(e *model.RowChangedEvent) (*mqMessageKey, *maxwell return key, value } -// AppendRowChangedEvent implements the EventBatchEncoder interface -func (d *MaxwellEventBatchEncoder) AppendRowChangedEvent( - _ context.Context, - _ string, - e *model.RowChangedEvent, - _ func(), -) error { - _, valueMsg := rowEventToMaxwellMessage(e) - value, err := valueMsg.Encode() - if err != nil { - return errors.Trace(err) - } - d.valueBuf.Write(value) - d.batchSize++ - return nil -} - -// Column represents a column in maxwell -type Column struct { +// maxwellColumn represents a column in maxwell +type maxwellColumn struct { Type string `json:"type"` Name string `json:"name"` // Do not mark the unique key temporarily @@ -190,44 +137,50 @@ type Column struct { Charset string `json:"charset,omitempty"` } -// TableStruct represents a table structure includes some table info -type TableStruct struct { - Database string `json:"database"` - Charset string `json:"charset,omitempty"` - Table string `json:"table"` - Columns []*Column `json:"columns"` +// tableStruct represents a table structure includes some table info +type tableStruct struct { + Database string `json:"database"` + Charset string `json:"charset,omitempty"` + Table string `json:"table"` + Columns []*maxwellColumn `json:"columns"` // Do not output whether it is a primary key temporarily PrimaryKey []string `json:"primary-key"` } -// DdlMaxwellMessage represents a DDL maxwell message +// ddlMaxwellMessage represents a DDL maxwell message // Old for table old schema // Def for table after ddl schema -type DdlMaxwellMessage struct { +type ddlMaxwellMessage struct { Type string `json:"type"` Database string `json:"database"` Table string `json:"table"` - Old TableStruct `json:"old,omitempty"` - Def TableStruct `json:"def,omitempty"` + Old tableStruct `json:"old,omitempty"` + Def tableStruct `json:"def,omitempty"` Ts uint64 `json:"ts"` SQL string `json:"sql"` Position string `json:"position,omitempty"` } -func ddlEventtoMaxwellMessage(e *model.DDLEvent) (*mqMessageKey, *DdlMaxwellMessage) { +// Encode encodes the message to bytes +func (m *ddlMaxwellMessage) encode() ([]byte, error) { + data, err := json.Marshal(m) + return data, cerror.WrapError(cerror.ErrMaxwellEncodeFailed, err) +} + +func ddlEventToMaxwellMsg(e *model.DDLEvent) (*mqMessageKey, *ddlMaxwellMessage) { key := &mqMessageKey{ Ts: e.CommitTs, Schema: e.TableInfo.Schema, Table: e.TableInfo.Table, Type: model.MessageTypeDDL, } - value := &DdlMaxwellMessage{ + value := &ddlMaxwellMessage{ Ts: e.CommitTs, Database: e.TableInfo.Schema, Type: "table-create", Table: e.TableInfo.Table, - Old: TableStruct{}, - Def: TableStruct{}, + Old: tableStruct{}, + Def: tableStruct{}, SQL: e.Query, } @@ -238,7 +191,7 @@ func ddlEventtoMaxwellMessage(e *model.DDLEvent) (*mqMessageKey, *DdlMaxwellMess value.Old.Table = e.PreTableInfo.Table for _, v := range e.PreTableInfo.ColumnInfo { maxwellcolumntype, _ := columnToMaxwellType(v.Type) - value.Old.Columns = append(value.Old.Columns, &Column{ + value.Old.Columns = append(value.Old.Columns, &maxwellColumn{ Name: v.Name, Type: maxwellcolumntype, }) @@ -250,12 +203,12 @@ func ddlEventtoMaxwellMessage(e *model.DDLEvent) (*mqMessageKey, *DdlMaxwellMess for _, v := range e.TableInfo.ColumnInfo { maxwellcolumntype, err := columnToMaxwellType(v.Type) if err != nil { - value.Old.Columns = append(value.Old.Columns, &Column{ + value.Old.Columns = append(value.Old.Columns, &maxwellColumn{ Name: v.Name, Type: err.Error(), }) } - value.Def.Columns = append(value.Def.Columns, &Column{ + value.Def.Columns = append(value.Def.Columns, &maxwellColumn{ Name: v.Name, Type: maxwellcolumntype, }) @@ -263,54 +216,6 @@ func ddlEventtoMaxwellMessage(e *model.DDLEvent) (*mqMessageKey, *DdlMaxwellMess return key, value } -// EncodeDDLEvent implements the EventBatchEncoder interface -// DDL message unresolved tso -func (d *MaxwellEventBatchEncoder) EncodeDDLEvent(e *model.DDLEvent) (*MQMessage, error) { - keyMsg, valueMsg := ddlEventtoMaxwellMessage(e) - key, err := keyMsg.encode() - if err != nil { - return nil, errors.Trace(err) - } - value, err := valueMsg.Encode() - if err != nil { - return nil, errors.Trace(err) - } - - return newDDLMsg(config.ProtocolMaxwell, key, value, e), nil -} - -// Build implements the EventBatchEncoder interface -func (d *MaxwellEventBatchEncoder) Build() []*MQMessage { - if d.batchSize == 0 { - return nil - } - - ret := newMsg(config.ProtocolMaxwell, d.keyBuf.Bytes(), d.valueBuf.Bytes(), 0, model.MessageTypeRow, nil, nil) - ret.SetRowsCount(d.batchSize) - d.Reset() - return []*MQMessage{ret} -} - -// Reset implements the EventBatchEncoder interface -func (d *MaxwellEventBatchEncoder) Reset() { - d.keyBuf.Reset() - d.valueBuf.Reset() - d.batchSize = 0 - var versionByte [8]byte - binary.BigEndian.PutUint64(versionByte[:], BatchVersion1) - d.keyBuf.Write(versionByte[:]) -} - -// NewMaxwellEventBatchEncoder creates a new MaxwellEventBatchEncoder. -func NewMaxwellEventBatchEncoder() EventBatchEncoder { - batch := &MaxwellEventBatchEncoder{ - keyBuf: &bytes.Buffer{}, - valueBuf: &bytes.Buffer{}, - } - batch.Reset() - return batch -} - // ddl typecode from parser/model/ddl.go func ddlToMaxwellType(ddlType model2.ActionType) string { if ddlType >= model2.ActionAddColumn && ddlType <= model2.ActionDropTablePartition { diff --git a/cdc/sink/mq/codec/maxwell_message_test.go b/cdc/sink/mq/codec/maxwell_message_test.go new file mode 100644 index 00000000000..bb6d3c6e224 --- /dev/null +++ b/cdc/sink/mq/codec/maxwell_message_test.go @@ -0,0 +1,40 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package codec + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestMaxwellFormatCol(t *testing.T) { + t.Parallel() + row := &maxwellMessage{ + Ts: 1, + Database: "a", + Table: "b", + Type: "delete", + Xid: 1, + Xoffset: 1, + Position: "", + Gtid: "", + Data: map[string]interface{}{ + "id": "1", + }, + } + rowEncode, err := row.encode() + require.Nil(t, err) + require.NotNil(t, rowEncode) +} From 633b5095007afa064b98112eacac170a87ba2f83 Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Tue, 12 Jul 2022 14:48:10 +0800 Subject: [PATCH 2/4] sink(ticdc): split craft.go --- cdc/sink/mq/codec/codec_test.go | 21 +- cdc/sink/mq/codec/craft.go | 233 ------------------ cdc/sink/mq/codec/craft_decoder.go | 142 +++++++++++ cdc/sink/mq/codec/craft_encoder.go | 117 +++++++++ .../{craft_test.go => craft_encoder_test.go} | 158 ++++++------ cdc/sink/mq/codec/encoder.go | 2 +- 6 files changed, 350 insertions(+), 323 deletions(-) delete mode 100644 cdc/sink/mq/codec/craft.go create mode 100644 cdc/sink/mq/codec/craft_decoder.go create mode 100644 cdc/sink/mq/codec/craft_encoder.go rename cdc/sink/mq/codec/{craft_test.go => craft_encoder_test.go} (81%) diff --git a/cdc/sink/mq/codec/codec_test.go b/cdc/sink/mq/codec/codec_test.go index fe49d4ad358..209013107b8 100644 --- a/cdc/sink/mq/codec/codec_test.go +++ b/cdc/sink/mq/codec/codec_test.go @@ -215,9 +215,9 @@ func TestJsonVsCraftVsPB(t *testing.T) { if len(cs) == 0 { continue } - craftEncoder := NewCraftEventBatchEncoder() - craftEncoder.(*CraftEventBatchEncoder).maxMessageBytes = 8192 - craftEncoder.(*CraftEventBatchEncoder).maxBatchSize = 64 + craftEncoder := newCraftBatchEncoder() + craftEncoder.(*craftBatchEncoder).maxMessageBytes = 8192 + craftEncoder.(*craftBatchEncoder).maxBatchSize = 64 craftMessages := encodeRowCase(t, craftEncoder, cs) jsonEncoder := newOpenProtocolBatchEncoder() @@ -366,9 +366,9 @@ func codecEncodeRowCase(encoder EventBatchEncoder, events []*model.RowChangedEve func init() { var err error - encoder := NewCraftEventBatchEncoder() - encoder.(*CraftEventBatchEncoder).maxMessageBytes = 8192 - encoder.(*CraftEventBatchEncoder).maxBatchSize = 64 + encoder := newCraftBatchEncoder() + encoder.(*craftBatchEncoder).maxMessageBytes = 8192 + encoder.(*craftBatchEncoder).maxBatchSize = 64 if codecCraftEncodedRowChanges, err = codecEncodeRowCase(encoder, codecBenchmarkRowChanges); err != nil { panic(err) } @@ -385,9 +385,9 @@ func init() { func BenchmarkCraftEncoding(b *testing.B) { allocator := craft.NewSliceAllocator(128) - encoder := NewCraftEventBatchEncoderWithAllocator(allocator) - encoder.(*CraftEventBatchEncoder).maxMessageBytes = 8192 - encoder.(*CraftEventBatchEncoder).maxBatchSize = 64 + encoder := newCraftBatchEncoderWithAllocator(allocator) + encoder.(*craftBatchEncoder).maxMessageBytes = 8192 + encoder.(*craftBatchEncoder).maxBatchSize = 64 for i := 0; i < b.N; i++ { _, _ = codecEncodeRowCase(encoder, codecBenchmarkRowChanges) } @@ -418,7 +418,8 @@ func BenchmarkCraftDecoding(b *testing.B) { allocator := craft.NewSliceAllocator(128) for i := 0; i < b.N; i++ { for _, message := range codecCraftEncodedRowChanges { - if decoder, err := NewCraftEventBatchDecoderWithAllocator(message.Value, allocator); err != nil { + if decoder, err := newCraftBatchDecoderWithAllocator( + message.Value, allocator); err != nil { panic(err) } else { for { diff --git a/cdc/sink/mq/codec/craft.go b/cdc/sink/mq/codec/craft.go deleted file mode 100644 index 6ac68afacd3..00000000000 --- a/cdc/sink/mq/codec/craft.go +++ /dev/null @@ -1,233 +0,0 @@ -// Copyright 2021 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License") -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.orglicensesLICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package codec - -import ( - "context" - - "github.com/pingcap/errors" - "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/cdc/sink/mq/codec/craft" - "github.com/pingcap/tiflow/pkg/config" - cerror "github.com/pingcap/tiflow/pkg/errors" -) - -// CraftEventBatchEncoder encodes the events into the byte of a batch into craft binary format. -type CraftEventBatchEncoder struct { - rowChangedBuffer *craft.RowChangedEventBuffer - messageBuf []*MQMessage - - // configs - maxMessageBytes int - maxBatchSize int - - allocator *craft.SliceAllocator -} - -// EncodeCheckpointEvent implements the EventBatchEncoder interface -func (e *CraftEventBatchEncoder) EncodeCheckpointEvent(ts uint64) (*MQMessage, error) { - return newResolvedMsg(config.ProtocolCraft, nil, craft.NewResolvedEventEncoder(e.allocator, ts).Encode(), ts), nil -} - -func (e *CraftEventBatchEncoder) flush() { - headers := e.rowChangedBuffer.GetHeaders() - ts := headers.GetTs(0) - schema := headers.GetSchema(0) - table := headers.GetTable(0) - rowsCnt := e.rowChangedBuffer.RowsCount() - mqMessage := newMsg(config.ProtocolCraft, nil, e.rowChangedBuffer.Encode(), ts, model.MessageTypeRow, &schema, &table) - mqMessage.SetRowsCount(rowsCnt) - e.messageBuf = append(e.messageBuf, mqMessage) -} - -// AppendRowChangedEvent implements the EventBatchEncoder interface -func (e *CraftEventBatchEncoder) AppendRowChangedEvent( - _ context.Context, - _ string, - ev *model.RowChangedEvent, - _ func(), -) error { - rows, size := e.rowChangedBuffer.AppendRowChangedEvent(ev) - if size > e.maxMessageBytes || rows >= e.maxBatchSize { - e.flush() - } - return nil -} - -// EncodeDDLEvent implements the EventBatchEncoder interface -func (e *CraftEventBatchEncoder) EncodeDDLEvent(ev *model.DDLEvent) (*MQMessage, error) { - return newDDLMsg(config.ProtocolCraft, nil, craft.NewDDLEventEncoder(e.allocator, ev).Encode(), ev), nil -} - -// Build implements the EventBatchEncoder interface -func (e *CraftEventBatchEncoder) Build() []*MQMessage { - if e.rowChangedBuffer.Size() > 0 { - // flush buffered data to message buffer - e.flush() - } - ret := e.messageBuf - e.messageBuf = make([]*MQMessage, 0, 2) - return ret -} - -// NewCraftEventBatchEncoder creates a new CraftEventBatchEncoder. -func NewCraftEventBatchEncoder() EventBatchEncoder { - // 64 is a magic number that come up with these assumptions and manual benchmark. - // 1. Most table will not have more than 64 columns - // 2. It only worth allocating slices in batch for slices that's small enough - return NewCraftEventBatchEncoderWithAllocator(craft.NewSliceAllocator(64)) -} - -type craftEventBatchEncoderBuilder struct { - config *Config -} - -// Build a CraftEventBatchEncoder -func (b *craftEventBatchEncoderBuilder) Build() EventBatchEncoder { - encoder := NewCraftEventBatchEncoder() - encoder.(*CraftEventBatchEncoder).maxMessageBytes = b.config.maxMessageBytes - encoder.(*CraftEventBatchEncoder).maxBatchSize = b.config.maxBatchSize - return encoder -} - -func newCraftEventBatchEncoderBuilder(config *Config) EncoderBuilder { - return &craftEventBatchEncoderBuilder{config: config} -} - -// NewCraftEventBatchEncoderWithAllocator creates a new CraftEventBatchEncoder with given allocator. -func NewCraftEventBatchEncoderWithAllocator(allocator *craft.SliceAllocator) EventBatchEncoder { - return &CraftEventBatchEncoder{ - allocator: allocator, - messageBuf: make([]*MQMessage, 0, 2), - rowChangedBuffer: craft.NewRowChangedEventBuffer(allocator), - } -} - -// CraftEventBatchDecoder decodes the byte of a batch into the original messages. -type CraftEventBatchDecoder struct { - headers *craft.Headers - decoder *craft.MessageDecoder - index int - - allocator *craft.SliceAllocator -} - -// HasNext implements the EventBatchDecoder interface -func (b *CraftEventBatchDecoder) HasNext() (model.MessageType, bool, error) { - if b.index >= b.headers.Count() { - return model.MessageTypeUnknown, false, nil - } - return b.headers.GetType(b.index), true, nil -} - -// NextResolvedEvent implements the EventBatchDecoder interface -func (b *CraftEventBatchDecoder) NextResolvedEvent() (uint64, error) { - ty, hasNext, err := b.HasNext() - if err != nil { - return 0, errors.Trace(err) - } - if !hasNext || ty != model.MessageTypeResolved { - return 0, cerror.ErrCraftCodecInvalidData.GenWithStack("not found resolved event message") - } - ts := b.headers.GetTs(b.index) - b.index++ - return ts, nil -} - -// NextRowChangedEvent implements the EventBatchDecoder interface -func (b *CraftEventBatchDecoder) NextRowChangedEvent() (*model.RowChangedEvent, error) { - ty, hasNext, err := b.HasNext() - if err != nil { - return nil, errors.Trace(err) - } - if !hasNext || ty != model.MessageTypeRow { - return nil, cerror.ErrCraftCodecInvalidData.GenWithStack("not found row changed event message") - } - oldValue, newValue, err := b.decoder.RowChangedEvent(b.index) - if err != nil { - return nil, errors.Trace(err) - } - ev := &model.RowChangedEvent{} - if oldValue != nil { - if ev.PreColumns, err = oldValue.ToModel(); err != nil { - return nil, errors.Trace(err) - } - } - if newValue != nil { - if ev.Columns, err = newValue.ToModel(); err != nil { - return nil, errors.Trace(err) - } - } - ev.CommitTs = b.headers.GetTs(b.index) - ev.Table = &model.TableName{ - Schema: b.headers.GetSchema(b.index), - Table: b.headers.GetTable(b.index), - } - partition := b.headers.GetPartition(b.index) - if partition >= 0 { - ev.Table.TableID = partition - ev.Table.IsPartition = true - } - b.index++ - return ev, nil -} - -// NextDDLEvent implements the EventBatchDecoder interface -func (b *CraftEventBatchDecoder) NextDDLEvent() (*model.DDLEvent, error) { - ty, hasNext, err := b.HasNext() - if err != nil { - return nil, errors.Trace(err) - } - if !hasNext || ty != model.MessageTypeDDL { - return nil, cerror.ErrCraftCodecInvalidData.GenWithStack("not found ddl event message") - } - ddlType, query, err := b.decoder.DDLEvent(b.index) - if err != nil { - return nil, errors.Trace(err) - } - event := &model.DDLEvent{ - CommitTs: b.headers.GetTs(b.index), - Query: query, - Type: ddlType, - TableInfo: &model.SimpleTableInfo{ - Schema: b.headers.GetSchema(b.index), - Table: b.headers.GetTable(b.index), - }, - } - b.index++ - return event, nil -} - -// NewCraftEventBatchDecoder creates a new CraftEventBatchDecoder. -func NewCraftEventBatchDecoder(bits []byte) (EventBatchDecoder, error) { - return NewCraftEventBatchDecoderWithAllocator(bits, craft.NewSliceAllocator(64)) -} - -// NewCraftEventBatchDecoderWithAllocator creates a new CraftEventBatchDecoder with given allocator. -func NewCraftEventBatchDecoderWithAllocator(bits []byte, allocator *craft.SliceAllocator) (EventBatchDecoder, error) { - decoder, err := craft.NewMessageDecoder(bits, allocator) - if err != nil { - return nil, errors.Trace(err) - } - headers, err := decoder.Headers() - if err != nil { - return nil, errors.Trace(err) - } - - return &CraftEventBatchDecoder{ - headers: headers, - decoder: decoder, - allocator: allocator, - }, nil -} diff --git a/cdc/sink/mq/codec/craft_decoder.go b/cdc/sink/mq/codec/craft_decoder.go new file mode 100644 index 00000000000..052df01a50d --- /dev/null +++ b/cdc/sink/mq/codec/craft_decoder.go @@ -0,0 +1,142 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License") +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.orglicensesLICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package codec + +import ( + "github.com/pingcap/errors" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/sink/mq/codec/craft" + cerror "github.com/pingcap/tiflow/pkg/errors" +) + +// craftBatchDecoder decodes the byte of a batch into the original messages. +type craftBatchDecoder struct { + headers *craft.Headers + decoder *craft.MessageDecoder + index int + + allocator *craft.SliceAllocator +} + +// HasNext implements the EventBatchDecoder interface +func (b *craftBatchDecoder) HasNext() (model.MessageType, bool, error) { + if b.index >= b.headers.Count() { + return model.MessageTypeUnknown, false, nil + } + return b.headers.GetType(b.index), true, nil +} + +// NextResolvedEvent implements the EventBatchDecoder interface +func (b *craftBatchDecoder) NextResolvedEvent() (uint64, error) { + ty, hasNext, err := b.HasNext() + if err != nil { + return 0, errors.Trace(err) + } + if !hasNext || ty != model.MessageTypeResolved { + return 0, cerror.ErrCraftCodecInvalidData.GenWithStack("not found resolved event message") + } + ts := b.headers.GetTs(b.index) + b.index++ + return ts, nil +} + +// NextRowChangedEvent implements the EventBatchDecoder interface +func (b *craftBatchDecoder) NextRowChangedEvent() (*model.RowChangedEvent, error) { + ty, hasNext, err := b.HasNext() + if err != nil { + return nil, errors.Trace(err) + } + if !hasNext || ty != model.MessageTypeRow { + return nil, + cerror.ErrCraftCodecInvalidData.GenWithStack("not found row changed event message") + } + oldValue, newValue, err := b.decoder.RowChangedEvent(b.index) + if err != nil { + return nil, errors.Trace(err) + } + ev := &model.RowChangedEvent{} + if oldValue != nil { + if ev.PreColumns, err = oldValue.ToModel(); err != nil { + return nil, errors.Trace(err) + } + } + if newValue != nil { + if ev.Columns, err = newValue.ToModel(); err != nil { + return nil, errors.Trace(err) + } + } + ev.CommitTs = b.headers.GetTs(b.index) + ev.Table = &model.TableName{ + Schema: b.headers.GetSchema(b.index), + Table: b.headers.GetTable(b.index), + } + partition := b.headers.GetPartition(b.index) + if partition >= 0 { + ev.Table.TableID = partition + ev.Table.IsPartition = true + } + b.index++ + return ev, nil +} + +// NextDDLEvent implements the EventBatchDecoder interface +func (b *craftBatchDecoder) NextDDLEvent() (*model.DDLEvent, error) { + ty, hasNext, err := b.HasNext() + if err != nil { + return nil, errors.Trace(err) + } + if !hasNext || ty != model.MessageTypeDDL { + return nil, cerror.ErrCraftCodecInvalidData.GenWithStack("not found ddl event message") + } + ddlType, query, err := b.decoder.DDLEvent(b.index) + if err != nil { + return nil, errors.Trace(err) + } + event := &model.DDLEvent{ + CommitTs: b.headers.GetTs(b.index), + Query: query, + Type: ddlType, + TableInfo: &model.SimpleTableInfo{ + Schema: b.headers.GetSchema(b.index), + Table: b.headers.GetTable(b.index), + }, + } + b.index++ + return event, nil +} + +// newCraftBatchDecoder creates a new craftBatchDecoder. +func newCraftBatchDecoder(bits []byte) (EventBatchDecoder, error) { + return newCraftBatchDecoderWithAllocator(bits, craft.NewSliceAllocator(64)) +} + +// newCraftBatchDecoderWithAllocator creates a new craftBatchDecoder with given allocator. +func newCraftBatchDecoderWithAllocator( + bits []byte, allocator *craft.SliceAllocator, +) (EventBatchDecoder, error) { + decoder, err := craft.NewMessageDecoder(bits, allocator) + if err != nil { + return nil, errors.Trace(err) + } + headers, err := decoder.Headers() + if err != nil { + return nil, errors.Trace(err) + } + + return &craftBatchDecoder{ + headers: headers, + decoder: decoder, + allocator: allocator, + }, nil +} diff --git a/cdc/sink/mq/codec/craft_encoder.go b/cdc/sink/mq/codec/craft_encoder.go new file mode 100644 index 00000000000..d6d6b290465 --- /dev/null +++ b/cdc/sink/mq/codec/craft_encoder.go @@ -0,0 +1,117 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License") +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.orglicensesLICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package codec + +import ( + "context" + + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/sink/mq/codec/craft" + "github.com/pingcap/tiflow/pkg/config" +) + +// craftBatchEncoder encodes the events into the byte of a batch into craft binary format. +type craftBatchEncoder struct { + rowChangedBuffer *craft.RowChangedEventBuffer + messageBuf []*MQMessage + + // configs + maxMessageBytes int + maxBatchSize int + + allocator *craft.SliceAllocator +} + +// EncodeCheckpointEvent implements the EventBatchEncoder interface +func (e *craftBatchEncoder) EncodeCheckpointEvent(ts uint64) (*MQMessage, error) { + return newResolvedMsg( + config.ProtocolCraft, nil, + craft.NewResolvedEventEncoder(e.allocator, ts).Encode(), ts), nil +} + +// AppendRowChangedEvent implements the EventBatchEncoder interface +func (e *craftBatchEncoder) AppendRowChangedEvent( + _ context.Context, + _ string, + ev *model.RowChangedEvent, + _ func(), +) error { + rows, size := e.rowChangedBuffer.AppendRowChangedEvent(ev) + if size > e.maxMessageBytes || rows >= e.maxBatchSize { + e.flush() + } + return nil +} + +// EncodeDDLEvent implements the EventBatchEncoder interface +func (e *craftBatchEncoder) EncodeDDLEvent(ev *model.DDLEvent) (*MQMessage, error) { + return newDDLMsg(config.ProtocolCraft, + nil, craft.NewDDLEventEncoder(e.allocator, ev).Encode(), ev), nil +} + +// Build implements the EventBatchEncoder interface +func (e *craftBatchEncoder) Build() []*MQMessage { + if e.rowChangedBuffer.Size() > 0 { + // flush buffered data to message buffer + e.flush() + } + ret := e.messageBuf + e.messageBuf = make([]*MQMessage, 0, 2) + return ret +} + +func (e *craftBatchEncoder) flush() { + headers := e.rowChangedBuffer.GetHeaders() + ts := headers.GetTs(0) + schema := headers.GetSchema(0) + table := headers.GetTable(0) + rowsCnt := e.rowChangedBuffer.RowsCount() + mqMessage := newMsg(config.ProtocolCraft, + nil, e.rowChangedBuffer.Encode(), ts, model.MessageTypeRow, &schema, &table) + mqMessage.SetRowsCount(rowsCnt) + e.messageBuf = append(e.messageBuf, mqMessage) +} + +// newCraftBatchEncoder creates a new craftBatchEncoder. +func newCraftBatchEncoder() EventBatchEncoder { + // 64 is a magic number that come up with these assumptions and manual benchmark. + // 1. Most table will not have more than 64 columns + // 2. It only worth allocating slices in batch for slices that's small enough + return newCraftBatchEncoderWithAllocator(craft.NewSliceAllocator(64)) +} + +type craftBatchEncoderBuilder struct { + config *Config +} + +// Build a craftBatchEncoder +func (b *craftBatchEncoderBuilder) Build() EventBatchEncoder { + encoder := newCraftBatchEncoder() + encoder.(*craftBatchEncoder).maxMessageBytes = b.config.maxMessageBytes + encoder.(*craftBatchEncoder).maxBatchSize = b.config.maxBatchSize + return encoder +} + +func newCraftBatchEncoderBuilder(config *Config) EncoderBuilder { + return &craftBatchEncoderBuilder{config: config} +} + +// newCraftBatchEncoderWithAllocator creates a new craftBatchEncoder with given allocator. +func newCraftBatchEncoderWithAllocator(allocator *craft.SliceAllocator) EventBatchEncoder { + return &craftBatchEncoder{ + allocator: allocator, + messageBuf: make([]*MQMessage, 0, 2), + rowChangedBuffer: craft.NewRowChangedEventBuffer(allocator), + } +} diff --git a/cdc/sink/mq/codec/craft_test.go b/cdc/sink/mq/codec/craft_encoder_test.go similarity index 81% rename from cdc/sink/mq/codec/craft_test.go rename to cdc/sink/mq/codec/craft_encoder_test.go index 571365ea4e2..e193c6d3616 100644 --- a/cdc/sink/mq/codec/craft_test.go +++ b/cdc/sink/mq/codec/craft_encoder_test.go @@ -1,4 +1,4 @@ -// Copyright 2021 PingCAP, Inc. +// Copyright 2022 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -23,6 +23,80 @@ import ( "github.com/stretchr/testify/require" ) +func TestCraftMaxMessageBytes(t *testing.T) { + t.Parallel() + cfg := NewConfig(config.ProtocolCraft).WithMaxMessageBytes(256) + encoder := newCraftBatchEncoderBuilder(cfg).Build() + + testEvent := &model.RowChangedEvent{ + CommitTs: 1, + Table: &model.TableName{Schema: "a", Table: "b"}, + Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: []byte("aa")}}, + } + + for i := 0; i < 10000; i++ { + err := encoder.AppendRowChangedEvent(context.Background(), "", testEvent, nil) + require.Nil(t, err) + } + + messages := encoder.Build() + for _, msg := range messages { + require.LessOrEqual(t, msg.Length(), 256) + } +} + +func TestCraftMaxBatchSize(t *testing.T) { + t.Parallel() + cfg := NewConfig(config.ProtocolCraft).WithMaxMessageBytes(10485760) + cfg.maxBatchSize = 64 + encoder := newCraftBatchEncoderBuilder(cfg).Build() + + testEvent := &model.RowChangedEvent{ + CommitTs: 1, + Table: &model.TableName{Schema: "a", Table: "b"}, + Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: []byte("aa")}}, + } + + for i := 0; i < 10000; i++ { + err := encoder.AppendRowChangedEvent(context.Background(), "", testEvent, nil) + require.Nil(t, err) + } + + messages := encoder.Build() + sum := 0 + for _, msg := range messages { + decoder, err := newCraftBatchDecoder(msg.Value) + require.Nil(t, err) + count := 0 + for { + v, hasNext, err := decoder.HasNext() + require.Nil(t, err) + if !hasNext { + break + } + + require.Equal(t, model.MessageTypeRow, v) + _, err = decoder.NextRowChangedEvent() + require.Nil(t, err) + count++ + } + require.LessOrEqual(t, count, 64) + sum += count + } + require.Equal(t, 10000, sum) +} + +func TestBuildCraftBatchEncoder(t *testing.T) { + t.Parallel() + cfg := NewConfig(config.ProtocolCraft) + + builder := &craftBatchEncoderBuilder{config: cfg} + encoder, ok := builder.Build().(*craftBatchEncoder) + require.True(t, ok) + require.Equal(t, cfg.maxBatchSize, encoder.maxBatchSize) + require.Equal(t, cfg.maxMessageBytes, encoder.maxMessageBytes) +} + func testBatchCodec( t *testing.T, encoderBuilder EncoderBuilder, @@ -119,82 +193,8 @@ func testBatchCodec( } } -func TestCraftMaxMessageBytes(t *testing.T) { - t.Parallel() - config := NewConfig(config.ProtocolCraft).WithMaxMessageBytes(256) - encoder := newCraftEventBatchEncoderBuilder(config).Build() - - testEvent := &model.RowChangedEvent{ - CommitTs: 1, - Table: &model.TableName{Schema: "a", Table: "b"}, - Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: []byte("aa")}}, - } - - for i := 0; i < 10000; i++ { - err := encoder.AppendRowChangedEvent(context.Background(), "", testEvent, nil) - require.Nil(t, err) - } - - messages := encoder.Build() - for _, msg := range messages { - require.LessOrEqual(t, msg.Length(), 256) - } -} - -func TestCraftMaxBatchSize(t *testing.T) { - t.Parallel() - config := NewConfig(config.ProtocolCraft).WithMaxMessageBytes(10485760) - config.maxBatchSize = 64 - encoder := newCraftEventBatchEncoderBuilder(config).Build() - - testEvent := &model.RowChangedEvent{ - CommitTs: 1, - Table: &model.TableName{Schema: "a", Table: "b"}, - Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: []byte("aa")}}, - } - - for i := 0; i < 10000; i++ { - err := encoder.AppendRowChangedEvent(context.Background(), "", testEvent, nil) - require.Nil(t, err) - } - - messages := encoder.Build() - sum := 0 - for _, msg := range messages { - decoder, err := NewCraftEventBatchDecoder(msg.Value) - require.Nil(t, err) - count := 0 - for { - v, hasNext, err := decoder.HasNext() - require.Nil(t, err) - if !hasNext { - break - } - - require.Equal(t, model.MessageTypeRow, v) - _, err = decoder.NextRowChangedEvent() - require.Nil(t, err) - count++ - } - require.LessOrEqual(t, count, 64) - sum += count - } - require.Equal(t, 10000, sum) -} - -func TestDefaultCraftEventBatchCodec(t *testing.T) { - config := NewConfig(config.ProtocolCraft).WithMaxMessageBytes(8192) - config.maxBatchSize = 64 - testBatchCodec(t, newCraftEventBatchEncoderBuilder(config), NewCraftEventBatchDecoder) -} - -func TestBuildCraftEventBatchEncoder(t *testing.T) { - t.Parallel() - config := NewConfig(config.ProtocolCraft) - - builder := &craftEventBatchEncoderBuilder{config: config} - encoder, ok := builder.Build().(*CraftEventBatchEncoder) - require.True(t, ok) - require.Equal(t, config.maxBatchSize, encoder.maxBatchSize) - require.Equal(t, config.maxMessageBytes, encoder.maxMessageBytes) +func TestDefaultCraftBatchCodec(t *testing.T) { + cfg := NewConfig(config.ProtocolCraft).WithMaxMessageBytes(8192) + cfg.maxBatchSize = 64 + testBatchCodec(t, newCraftBatchEncoderBuilder(cfg), newCraftBatchDecoder) } diff --git a/cdc/sink/mq/codec/encoder.go b/cdc/sink/mq/codec/encoder.go index 2eaec76a107..33cfb41fe6e 100644 --- a/cdc/sink/mq/codec/encoder.go +++ b/cdc/sink/mq/codec/encoder.go @@ -59,7 +59,7 @@ func NewEventBatchEncoderBuilder(ctx context.Context, c *Config) (EncoderBuilder case config.ProtocolCanalJSON: return newCanalJSONBatchEncoderBuilder(c), nil case config.ProtocolCraft: - return newCraftEventBatchEncoderBuilder(c), nil + return newCraftBatchEncoderBuilder(c), nil default: return nil, cerror.ErrMQSinkUnknownProtocol.GenWithStackByArgs(c.protocol) } From 318b4e98392a4f4e9331d6b7b6834f9003b40033 Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Tue, 12 Jul 2022 15:54:18 +0800 Subject: [PATCH 3/4] sink(ticdc): maxwell support callback --- cdc/sink/mq/codec/canal_encoder.go | 2 +- cdc/sink/mq/codec/maxwell_encoder.go | 26 ++++++--- cdc/sink/mq/codec/maxwell_encoder_test.go | 65 +++++++++++++++++++++++ 3 files changed, 86 insertions(+), 7 deletions(-) diff --git a/cdc/sink/mq/codec/canal_encoder.go b/cdc/sink/mq/codec/canal_encoder.go index bbaff9d66f6..b1212ab3dba 100644 --- a/cdc/sink/mq/codec/canal_encoder.go +++ b/cdc/sink/mq/codec/canal_encoder.go @@ -117,7 +117,7 @@ func (d *canalBatchEncoder) Build() []*MQMessage { d.messages.Reset() d.resetPacket() - if len(d.callbackBuf) > 0 { + if len(d.callbackBuf) != 0 && len(d.callbackBuf) == rowCount { callbacks := d.callbackBuf ret.Callback = func() { for _, cb := range callbacks { diff --git a/cdc/sink/mq/codec/maxwell_encoder.go b/cdc/sink/mq/codec/maxwell_encoder.go index 22cef99063d..42e0ca6ba6a 100644 --- a/cdc/sink/mq/codec/maxwell_encoder.go +++ b/cdc/sink/mq/codec/maxwell_encoder.go @@ -25,9 +25,10 @@ import ( // maxwellBatchEncoder is a maxwell format encoder implementation type maxwellBatchEncoder struct { - keyBuf *bytes.Buffer - valueBuf *bytes.Buffer - batchSize int + keyBuf *bytes.Buffer + valueBuf *bytes.Buffer + callbackBuf []func() + batchSize int } // EncodeCheckpointEvent implements the EventBatchEncoder interface @@ -42,7 +43,7 @@ func (d *maxwellBatchEncoder) AppendRowChangedEvent( _ context.Context, _ string, e *model.RowChangedEvent, - _ func(), + callback func(), ) error { _, valueMsg := rowChangeToMaxwellMsg(e) value, err := valueMsg.encode() @@ -51,6 +52,9 @@ func (d *maxwellBatchEncoder) AppendRowChangedEvent( } d.valueBuf.Write(value) d.batchSize++ + if callback != nil { + d.callbackBuf = append(d.callbackBuf, callback) + } return nil } @@ -79,6 +83,15 @@ func (d *maxwellBatchEncoder) Build() []*MQMessage { ret := newMsg(config.ProtocolMaxwell, d.keyBuf.Bytes(), d.valueBuf.Bytes(), 0, model.MessageTypeRow, nil, nil) ret.SetRowsCount(d.batchSize) + if len(d.callbackBuf) != 0 && len(d.callbackBuf) == d.batchSize { + callbacks := d.callbackBuf + ret.Callback = func() { + for _, cb := range callbacks { + cb() + } + } + d.callbackBuf = make([]func(), 0) + } d.reset() return []*MQMessage{ret} } @@ -96,8 +109,9 @@ func (d *maxwellBatchEncoder) reset() { // newMaxwellBatchEncoder creates a new maxwellBatchEncoder. func newMaxwellBatchEncoder() EventBatchEncoder { batch := &maxwellBatchEncoder{ - keyBuf: &bytes.Buffer{}, - valueBuf: &bytes.Buffer{}, + keyBuf: &bytes.Buffer{}, + valueBuf: &bytes.Buffer{}, + callbackBuf: make([]func(), 0), } batch.reset() return batch diff --git a/cdc/sink/mq/codec/maxwell_encoder_test.go b/cdc/sink/mq/codec/maxwell_encoder_test.go index 171fba2283a..3ade81e0c44 100644 --- a/cdc/sink/mq/codec/maxwell_encoder_test.go +++ b/cdc/sink/mq/codec/maxwell_encoder_test.go @@ -62,3 +62,68 @@ func TestMaxwellBatchCodec(t *testing.T) { } } } + +func TestMaxwellAppendRowChangedEventWithCallback(t *testing.T) { + encoder := newMaxwellBatchEncoder() + require.NotNil(t, encoder) + + count := 0 + + row := &model.RowChangedEvent{ + CommitTs: 1, + Table: &model.TableName{Schema: "a", Table: "b"}, + Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}}, + } + + tests := []struct { + row *model.RowChangedEvent + callback func() + }{ + { + row: row, + callback: func() { + count += 1 + }, + }, + { + row: row, + callback: func() { + count += 2 + }, + }, + { + row: row, + callback: func() { + count += 3 + }, + }, + { + row: row, + callback: func() { + count += 4 + }, + }, + { + row: row, + callback: func() { + count += 5 + }, + }, + } + + // Empty build makes sure that the callback build logic not broken. + msgs := encoder.Build() + require.Len(t, msgs, 0, "no message should be built and no panic") + + // Append the events. + for _, test := range tests { + err := encoder.AppendRowChangedEvent(context.Background(), "", test.row, test.callback) + require.Nil(t, err) + } + require.Equal(t, 0, count, "nothing should be called") + + msgs = encoder.Build() + require.Len(t, msgs, 1, "expected one message") + msgs[0].Callback() + require.Equal(t, 15, count, "expected all callbacks to be called") +} From c22ae1cfb33d0200b6ad6d1076fc066b4de82757 Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Tue, 12 Jul 2022 16:36:46 +0800 Subject: [PATCH 4/4] sink(ticdc): craft support callback --- cdc/sink/mq/codec/craft_encoder.go | 16 +++++- cdc/sink/mq/codec/craft_encoder_test.go | 72 +++++++++++++++++++++++++ 2 files changed, 87 insertions(+), 1 deletion(-) diff --git a/cdc/sink/mq/codec/craft_encoder.go b/cdc/sink/mq/codec/craft_encoder.go index d6d6b290465..89d4d12833f 100644 --- a/cdc/sink/mq/codec/craft_encoder.go +++ b/cdc/sink/mq/codec/craft_encoder.go @@ -25,6 +25,7 @@ import ( type craftBatchEncoder struct { rowChangedBuffer *craft.RowChangedEventBuffer messageBuf []*MQMessage + callbackBuf []func() // configs maxMessageBytes int @@ -45,9 +46,12 @@ func (e *craftBatchEncoder) AppendRowChangedEvent( _ context.Context, _ string, ev *model.RowChangedEvent, - _ func(), + callback func(), ) error { rows, size := e.rowChangedBuffer.AppendRowChangedEvent(ev) + if callback != nil { + e.callbackBuf = append(e.callbackBuf, callback) + } if size > e.maxMessageBytes || rows >= e.maxBatchSize { e.flush() } @@ -80,6 +84,15 @@ func (e *craftBatchEncoder) flush() { mqMessage := newMsg(config.ProtocolCraft, nil, e.rowChangedBuffer.Encode(), ts, model.MessageTypeRow, &schema, &table) mqMessage.SetRowsCount(rowsCnt) + if len(e.callbackBuf) != 0 && len(e.callbackBuf) == rowsCnt { + callbacks := e.callbackBuf + mqMessage.Callback = func() { + for _, cb := range callbacks { + cb() + } + } + e.callbackBuf = make([]func(), 0) + } e.messageBuf = append(e.messageBuf, mqMessage) } @@ -112,6 +125,7 @@ func newCraftBatchEncoderWithAllocator(allocator *craft.SliceAllocator) EventBat return &craftBatchEncoder{ allocator: allocator, messageBuf: make([]*MQMessage, 0, 2), + callbackBuf: make([]func(), 0), rowChangedBuffer: craft.NewRowChangedEventBuffer(allocator), } } diff --git a/cdc/sink/mq/codec/craft_encoder_test.go b/cdc/sink/mq/codec/craft_encoder_test.go index e193c6d3616..72d400bf8b2 100644 --- a/cdc/sink/mq/codec/craft_encoder_test.go +++ b/cdc/sink/mq/codec/craft_encoder_test.go @@ -198,3 +198,75 @@ func TestDefaultCraftBatchCodec(t *testing.T) { cfg.maxBatchSize = 64 testBatchCodec(t, newCraftBatchEncoderBuilder(cfg), newCraftBatchDecoder) } + +func TestCraftAppendRowChangedEventWithCallback(t *testing.T) { + t.Parallel() + cfg := NewConfig(config.ProtocolCraft).WithMaxMessageBytes(10485760) + cfg.maxBatchSize = 2 + encoder := newCraftBatchEncoderBuilder(cfg).Build() + require.NotNil(t, encoder) + + count := 0 + + row := &model.RowChangedEvent{ + CommitTs: 1, + Table: &model.TableName{Schema: "a", Table: "b"}, + Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: []byte("aa")}}, + } + + tests := []struct { + row *model.RowChangedEvent + callback func() + }{ + { + row: row, + callback: func() { + count += 1 + }, + }, + { + row: row, + callback: func() { + count += 2 + }, + }, + { + row: row, + callback: func() { + count += 3 + }, + }, + { + row: row, + callback: func() { + count += 4 + }, + }, + { + row: row, + callback: func() { + count += 5 + }, + }, + } + + // Empty build makes sure that the callback build logic not broken. + msgs := encoder.Build() + require.Len(t, msgs, 0, "no message should be built and no panic") + + // Append the events. + for _, test := range tests { + err := encoder.AppendRowChangedEvent(context.Background(), "", test.row, test.callback) + require.Nil(t, err) + } + require.Equal(t, 0, count, "nothing should be called") + + msgs = encoder.Build() + require.Len(t, msgs, 3, "expected 3 messages") + msgs[0].Callback() + require.Equal(t, 3, count, "expected 2 callbacks to be called") + msgs[1].Callback() + require.Equal(t, 10, count, "expected 2 callbacks to be called") + msgs[2].Callback() + require.Equal(t, 15, count, "expected one callback to be called") +}