Skip to content

Commit

Permalink
codec(ticdc): decoder set the table id directly (pingcap#11978)
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand authored Jan 8, 2025
1 parent a4552cb commit 8830dc8
Show file tree
Hide file tree
Showing 11 changed files with 328 additions and 213 deletions.
41 changes: 5 additions & 36 deletions cmd/kafka-consumer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"context"
"database/sql"
"errors"
"fmt"
"math"
"sync"
"time"
Expand All @@ -32,7 +31,6 @@ import (
"github.com/pingcap/tiflow/cdc/sink/tablesink"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/quotes"
"github.com/pingcap/tiflow/pkg/sink/codec"
"github.com/pingcap/tiflow/pkg/sink/codec/avro"
"github.com/pingcap/tiflow/pkg/sink/codec/canal"
Expand Down Expand Up @@ -117,10 +115,9 @@ func (p *partitionProgress) loadWatermark() uint64 {
type writer struct {
option *option

ddlList []*model.DDLEvent
ddlWithMaxCommitTs *model.DDLEvent
ddlSink ddlsink.Sink
fakeTableIDGenerator *fakeTableIDGenerator
ddlList []*model.DDLEvent
ddlWithMaxCommitTs *model.DDLEvent
ddlSink ddlsink.Sink

// sinkFactory is used to create table sink for each table.
sinkFactory *eventsinkfactory.SinkFactory
Expand All @@ -131,10 +128,7 @@ type writer struct {

func newWriter(ctx context.Context, o *option) *writer {
w := &writer{
option: o,
fakeTableIDGenerator: &fakeTableIDGenerator{
tableIDs: make(map[string]int64),
},
option: o,
progresses: make([]*partitionProgress, o.partitionNum),
}
var (
Expand Down Expand Up @@ -353,9 +347,8 @@ func (w *writer) WriteMessage(ctx context.Context, message *kafka.Message) bool
cachedEvents := dec.GetCachedEvents()
for _, row := range cachedEvents {
w.checkPartition(row, partition, message.TopicPartition.Offset)
tableID := row.GetTableID()
log.Info("simple protocol cached event resolved, append to the group",
zap.Int64("tableID", tableID), zap.Uint64("commitTs", row.CommitTs),
zap.Int64("tableID", row.GetTableID()), zap.Uint64("commitTs", row.CommitTs),
zap.Int32("partition", partition), zap.Any("offset", offset))
w.appendRow2Group(row, progress, offset)
}
Expand Down Expand Up @@ -384,15 +377,6 @@ func (w *writer) WriteMessage(ctx context.Context, message *kafka.Message) bool
continue
}
w.checkPartition(row, partition, message.TopicPartition.Offset)

tableID := row.GetTableID()
switch w.option.protocol {
case config.ProtocolSimple, config.ProtocolCanalJSON:
default:
tableID = w.fakeTableIDGenerator.
generateFakeTableID(row.TableInfo.GetSchemaName(), row.TableInfo.GetTableName(), tableID)
row.PhysicalTableID = tableID
}
w.appendRow2Group(row, progress, offset)
case model.MessageTypeResolved:
newWatermark, err := progress.decoder.NextResolvedEvent()
Expand Down Expand Up @@ -515,21 +499,6 @@ func (w *writer) appendRow2Group(row *model.RowChangedEvent, progress *partition
group.Append(row, offset)
}

type fakeTableIDGenerator struct {
tableIDs map[string]int64
currentTableID int64
}

func (g *fakeTableIDGenerator) generateFakeTableID(schema, table string, tableID int64) int64 {
key := fmt.Sprintf("`%s`.`%s`.`%d`", quotes.EscapeName(schema), quotes.EscapeName(table), tableID)
if tableID, ok := g.tableIDs[key]; ok {
return tableID
}
g.currentTableID++
g.tableIDs[key] = g.currentTableID
return g.currentTableID
}

func syncFlushRowChangedEvents(ctx context.Context, progress *partitionProgress, watermark uint64) {
resolvedTs := model.NewResolvedTs(watermark)
for {
Expand Down
Loading

0 comments on commit 8830dc8

Please sign in to comment.