Skip to content

Commit

Permalink
add a basic implementation of canal flat json decoder.
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand committed Nov 8, 2021
1 parent c3fb12b commit a492461
Showing 1 changed file with 72 additions and 76 deletions.
148 changes: 72 additions & 76 deletions cdc/sink/codec/canal_flat.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,96 +368,92 @@ func (c *CanalFlatEventBatchEncoder) SetParams(params map[string]string) error {

// CanalFlatEventBatchDecoder decodes the byte of a batch into the original messages.
type CanalFlatEventBatchDecoder struct {
// headers *craft.Headers
// decoder *craft.MessageDecoder
// index int
//
// allocator *craft.SliceAllocator
data []byte
msg *MQMessage
decoded bool
}

// HasNext implements the EventBatchDecoder interface
func (b *CanalFlatEventBatchDecoder) HasNext() (model.MqMessageType, bool, error) {
// if b.index >= b.headers.Count() {
// return model.MqMessageTypeUnknown, false, nil
// }
// return b.headers.GetType(b.index), true, nil
return model.MqMessageTypeUnknown, false, nil
if b.decoded {
return b.msg.Type, true, nil
}
if len(b.data) == 0 {
return model.MqMessageTypeUnknown, false, nil
}
if err := json.Unmarshal(b.data, b.msg); err != nil {
return model.MqMessageTypeUnknown, false, err
}
b.decoded = true
return b.msg.Type, true, nil
}

// NextRowChangedEvent implements the EventBatchDecoder interface
func (b *CanalFlatEventBatchDecoder) NextRowChangedEvent() (*model.RowChangedEvent, error) {
// ty, hasNext, err := b.HasNext()
// if err != nil {
// return nil, errors.Trace(err)
// }
// if !hasNext || ty != model.MqMessageTypeRow {
// return nil, cerror.ErrCraftCodecInvalidData.GenWithStack("not found row changed event message")
// }
// oldValue, newValue, err := b.decoder.RowChangedEvent(b.index)
// if err != nil {
// return nil, errors.Trace(err)
// }
// ev := &model.RowChangedEvent{}
// if oldValue != nil {
// if ev.PreColumns, err = oldValue.ToModel(); err != nil {
// return nil, errors.Trace(err)
// }
// }
// if newValue != nil {
// if ev.Columns, err = newValue.ToModel(); err != nil {
// return nil, errors.Trace(err)
// }
// }
// ev.CommitTs = b.headers.GetTs(b.index)
// ev.Table = &model.TableName{
// Schema: b.headers.GetSchema(b.index),
// Table: b.headers.GetTable(b.index),
// }
// partition := b.headers.GetPartition(b.index)
// if partition >= 0 {
// ev.Table.TableID = partition
// ev.Table.IsPartition = true
// }
// b.index++
// return ev, nil
return nil, nil
ty, hasNext, err := b.HasNext()
if err != nil {
return nil, errors.Trace(err)
}
if !hasNext || ty != model.MqMessageTypeRow {
return nil, cerrors.ErrCanalDecodeFailed.GenWithStack("not found row changed event message")
}
if b.msg == nil {
log.Panic("Cannot find message, bug?", zap.Any("decoder", b))
}
ev := &model.RowChangedEvent{}
if err := json.Unmarshal(b.msg.Value, ev); err != nil {
return nil, errors.Trace(err)
}
b.data = nil
return ev, nil
}

// NextDDLEvent implements the EventBatchDecoder interface
func (b *CanalFlatEventBatchDecoder) NextDDLEvent() (*model.DDLEvent, error) {
// if b.nextKey == nil {
// if err := b.decodeNextKey(); err != nil {
// return nil, err
// }
// }
// b.keyBytes = b.keyBytes[b.nextKeyLen+8:]
// if b.nextKey.Type != model.MqMessageTypeDDL {
// return nil, cerror.ErrJSONCodecInvalidData.GenWithStack("not found ddl event message")
// }
// valueLen := binary.BigEndian.Uint64(b.valueBytes[:8])
// value := b.valueBytes[8 : valueLen+8]
// b.valueBytes = b.valueBytes[valueLen+8:]
// ddlMsg := new(mqMessageDDL)
// if err := ddlMsg.Decode(value); err != nil {
// return nil, errors.Trace(err)
// }
// ddlEvent := mqMessageToDDLEvent(b.nextKey, ddlMsg)
// b.nextKey = nil
// return ddlEvent, nil
return nil, nil
ty, hasNext, err := b.HasNext()
if err != nil {
return nil, errors.Trace(err)
}
if !hasNext || ty != model.MqMessageTypeDDL {
return nil, cerrors.ErrCanalDecodeFailed.GenWithStack("not found ddl event message")
}
if b.msg == nil {
log.Panic("Cannot find message, bug?", zap.Any("decoder", b))
}
msg := &model.DDLEvent{}
if err := json.Unmarshal(b.msg.Value, msg); err != nil {
return nil, errors.Trace(err)
}
b.data = nil
return msg, nil
}

// NextResolvedEvent implements the EventBatchDecoder interface
func (b *CanalFlatEventBatchDecoder) NextResolvedEvent() (uint64, error) {
// ty, hasNext, err := b.HasNext()
// if err != nil {
// return 0, errors.Trace(err)
// }
// if !hasNext || ty != model.MqMessageTypeResolved {
// return 0, cerror.ErrCraftCodecInvalidData.GenWithStack("not found resolved event message")
// }
// ts := b.headers.GetTs(b.index)
// b.index++
// return ts, nil
return 0, nil
ty, hasNext, err := b.HasNext()
if err != nil {
return 0, errors.Trace(err)
}
if !hasNext || ty != model.MqMessageTypeResolved {
return 0, cerrors.ErrCanalDecodeFailed.GenWithStack("not found resolved event message")
}
if b.msg == nil {
log.Panic("Cannot find message, bug?", zap.Any("decoder", b))
}

msg := &canalFlatMessage{}
if err := json.Unmarshal(b.msg.Value, msg); err != nil {
return 0, errors.Trace(err)
}

b.data = nil
return msg.CheckpointTs, nil
}

func NewCanalFlatEventBatchDecoder(data []byte) (EventBatchDecoder, error) {
return &CanalFlatEventBatchDecoder{
data: data,
msg: nil,
decoded: false,
}, nil
}

0 comments on commit a492461

Please sign in to comment.