Skip to content

Commit

Permalink
ddl(ticdc): process only JobStateDone DDL jobs and remove the need …
Browse files Browse the repository at this point in the history
…to capture the `tidb_ddl_history` (pingcap#11836)

close pingcap#11816
  • Loading branch information
wlwilliamx authored Dec 20, 2024
1 parent 086c93d commit d07fcd1
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 111 deletions.
88 changes: 12 additions & 76 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
cerror "github.com/pingcap/tiflow/pkg/errors"
pfilter "github.com/pingcap/tiflow/pkg/filter"
"github.com/pingcap/tiflow/pkg/integrity"
"github.com/pingcap/tiflow/pkg/spanz"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)
Expand All @@ -65,19 +64,6 @@ type rowKVEntry struct {
PreRowExist bool
}

// DDLTableInfo bundles the table info of the two DDL system tables,
// `tidb_ddl_job` and `tidb_ddl_history`, together with the column id of
// `job_meta` in each of them.
type DDLTableInfo struct {
// DDLJobTable is the table info used to decode rows of `tidb_ddl_job`.
DDLJobTable *model.TableInfo
// JobMetaColumnIDinJobTable holds the column id of `job_meta` in table `tidb_ddl_job`.
JobMetaColumnIDinJobTable int64
// DDLHistoryTable is the table info used to decode rows of `tidb_ddl_history`;
// only `create table` DDL jobs are taken from that table.
DDLHistoryTable *model.TableInfo
// JobMetaColumnIDinHistoryTable holds the column id of `job_meta` in table `tidb_ddl_history`.
JobMetaColumnIDinHistoryTable int64
}

// Mounter is used to parse SQL events from KV events
type Mounter interface {
// DecodeEvent accepts `model.PolymorphicEvent` with `RawKVEntry` filled and
Expand Down Expand Up @@ -306,89 +292,39 @@ func IsLegacyFormatJob(rawKV *model.RawKVEntry) bool {
return bytes.HasPrefix(rawKV.Key, metaPrefix)
}

// ParseDDLJob parses the job from the raw KV entry.
func ParseDDLJob(rawKV *model.RawKVEntry, ddlTableInfo *DDLTableInfo) (*timodel.Job, error) {
// ParseDDLJob parses the job from the raw KV entry. id is the column id of `job_meta`.
func ParseDDLJob(tblInfo *model.TableInfo, rawKV *model.RawKVEntry, id int64) (*timodel.Job, error) {
var v []byte
var datum types.Datum

// for test case only
if bytes.HasPrefix(rawKV.Key, metaPrefix) {
// old queue-based job.
v = rawKV.Value
job, err := parseJob(v, rawKV.StartTs, rawKV.CRTs, false)
if err != nil || job == nil {
job, err = parseJob(v, rawKV.StartTs, rawKV.CRTs, true)
}
return job, err
}

recordID, err := tablecodec.DecodeRowKey(rawKV.Key)
if err != nil {
return nil, errors.Trace(err)
}

tableID := tablecodec.DecodeTableID(rawKV.Key)

// parse it with tidb_ddl_job
if tableID == spanz.JobTableID {
row, err := decodeRow(rawKV.Value, recordID, ddlTableInfo.DDLJobTable, time.UTC)
} else {
// DDL job comes from `tidb_ddl_job` table after we support concurrent DDL. We should decode the job from the column.
recordID, err := tablecodec.DecodeRowKey(rawKV.Key)
if err != nil {
return nil, errors.Trace(err)
}
datum = row[ddlTableInfo.JobMetaColumnIDinJobTable]
v = datum.GetBytes()

return parseJob(v, rawKV.StartTs, rawKV.CRTs, false)
} else if tableID == spanz.JobHistoryID {
// parse it with tidb_ddl_history
row, err := decodeRow(rawKV.Value, recordID, ddlTableInfo.DDLHistoryTable, time.UTC)
row, err := decodeRow(rawKV.Value, recordID, tblInfo, time.UTC)
if err != nil {
return nil, errors.Trace(err)
}
datum = row[ddlTableInfo.JobMetaColumnIDinHistoryTable]
datum := row[id]
v = datum.GetBytes()

return parseJob(v, rawKV.StartTs, rawKV.CRTs, true)
}

return nil, fmt.Errorf("Unvalid tableID %v in rawKV.Key", tableID)
return parseJob(v, rawKV.StartTs, rawKV.CRTs)
}

// parseJob unmarshal the job from "v".
// fromHistoryTable is used to distinguish whether the job is from tidb_ddl_job or tidb_ddl_history.
// We need to be compatible with the two modes, enable_fast_create_table=on and enable_fast_create_table=off
// When enable_fast_create_table=on, `create table` will only be inserted into tidb_ddl_history after being executed successfully.
// When enable_fast_create_table=off, `create table` just like other ddls will be firstly inserted to tidb_ddl_job,
// and being inserted into tidb_ddl_history after being executed successfully.
// In both two modes, other ddls are all firstly inserted into tidb_ddl_job, and then inserted into tidb_ddl_history after being executed successfully.
//
// To be compatible with these two modes, we will get `create table` ddl from tidb_ddl_history, and all ddls from tidb_ddl_job.
// When enable_fast_create_table=off, we will get each `create table` ddl twice (once from tidb_ddl_history, once from tidb_ddl_job).
// Because `handleJob` skips repeated ddls, it's ok for us to get `create table` twice.
// Besides, the `create table` from tidb_ddl_job always has an earlier commitTs than the one from tidb_ddl_history.
// Therefore, we always use the commitTs of ddl from `tidb_ddl_job` as StartTs, which ensures we can get all the dmls.
func parseJob(v []byte, startTs, CRTs uint64, fromHistoryTable bool) (*timodel.Job, error) {
func parseJob(v []byte, startTs, CRTs uint64) (*timodel.Job, error) {
var job timodel.Job
err := json.Unmarshal(v, &job)
if err != nil {
return nil, errors.Trace(err)
}

if fromHistoryTable {
// We only want to get `create table` and `create tables` ddls from tidb_ddl_history, so we just throw out other ddls.
// We only want the job with `JobStateSynced`, which means the ddl job is done successfully.
// Besides, to satisfy the subsequent processing,
// we need to set the job state to Done so that it will be replayed in schemaStorage.
if (job.Type != timodel.ActionCreateTable && job.Type != timodel.ActionCreateTables) || job.State != timodel.JobStateSynced {
return nil, nil
}
job.State = timodel.JobStateDone
} else {
// we need to get all ddl jobs that are done from tidb_ddl_job
if !job.IsDone() {
return nil, nil
}
if !job.IsDone() {
return nil, nil
}

// FinishedTS is only set when the job is synced,
// but we can use the entry's ts here
job.StartTS = startTs
Expand Down
38 changes: 11 additions & 27 deletions cdc/puller/ddl_puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,11 @@ type ddlJobPullerImpl struct {
schemaStorage entry.SchemaStorage
resolvedTs uint64
filter filter.Filter
// ddlTableInfo is initialized when receiving the first concurrent DDL job.
ddlTableInfo *entry.DDLTableInfo
// ddlJobsTable is initialized when receiving the first concurrent DDL job.
// It holds the info of table `tidb_ddl_job` of upstream TiDB.
ddlJobsTable *model.TableInfo
// jobMetaColumnID holds the column id of `job_meta` in table `tidb_ddl_job`.
jobMetaColumnID int64
// outputCh sends the DDL job entries to the caller.
outputCh chan *model.DDLJobEntry
}
Expand Down Expand Up @@ -236,14 +239,13 @@ func (p *ddlJobPullerImpl) unmarshalDDL(ctx context.Context, rawKV *model.RawKVE
if rawKV.OpType != model.OpTypePut {
return nil, nil
}
if p.ddlTableInfo == nil && !entry.IsLegacyFormatJob(rawKV) {
err := p.initDDLTableInfo(ctx)
if p.ddlJobsTable == nil && !entry.IsLegacyFormatJob(rawKV) {
err := p.initJobTableMeta(ctx)
if err != nil {
return nil, errors.Trace(err)
}
}

return entry.ParseDDLJob(rawKV, p.ddlTableInfo)
return entry.ParseDDLJob(p.ddlJobsTable, rawKV, p.jobMetaColumnID)
}

func (p *ddlJobPullerImpl) getResolvedTs() uint64 {
Expand All @@ -254,7 +256,7 @@ func (p *ddlJobPullerImpl) setResolvedTs(ts uint64) {
atomic.StoreUint64(&p.resolvedTs, ts)
}

func (p *ddlJobPullerImpl) initDDLTableInfo(ctx context.Context) error {
func (p *ddlJobPullerImpl) initJobTableMeta(ctx context.Context) error {
version, err := p.kvStorage.CurrentVersion(tidbkv.GlobalTxnScope)
if err != nil {
return errors.Trace(err)
Expand All @@ -275,8 +277,6 @@ func (p *ddlJobPullerImpl) initDDLTableInfo(ctx context.Context) error {
if err != nil {
return errors.Trace(err)
}

// for tidb_ddl_job
tableInfo, err := findTableByName(tbls, "tidb_ddl_job")
if err != nil {
return errors.Trace(err)
Expand All @@ -287,24 +287,8 @@ func (p *ddlJobPullerImpl) initDDLTableInfo(ctx context.Context) error {
return errors.Trace(err)
}

p.ddlTableInfo = &entry.DDLTableInfo{}
p.ddlTableInfo.DDLJobTable = model.WrapTableInfo(db.ID, db.Name.L, 0, tableInfo)
p.ddlTableInfo.JobMetaColumnIDinJobTable = col.ID

// for tidb_ddl_history
historyTableInfo, err := findTableByName(tbls, "tidb_ddl_history")
if err != nil {
return errors.Trace(err)
}

historyTableCol, err := findColumnByName(historyTableInfo.Columns, "job_meta")
if err != nil {
return errors.Trace(err)
}

p.ddlTableInfo.DDLHistoryTable = model.WrapTableInfo(db.ID, db.Name.L, 0, historyTableInfo)
p.ddlTableInfo.JobMetaColumnIDinHistoryTable = historyTableCol.ID

p.ddlJobsTable = model.WrapTableInfo(db.ID, db.Name.L, 0, tableInfo)
p.jobMetaColumnID = col.ID
return nil
}

Expand Down
9 changes: 1 addition & 8 deletions pkg/spanz/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ import (
const (
// JobTableID is the id of `tidb_ddl_job`.
JobTableID = ddl.JobTableID
// JobHistoryID is the id of `tidb_ddl_history`
JobHistoryID = ddl.HistoryTableID
)

// UpperBoundKey represents the maximum value.
Expand Down Expand Up @@ -64,17 +62,12 @@ func GetTableRange(tableID int64) (startKey, endKey []byte) {

// GetAllDDLSpan return all cdc interested spans for DDL.
func GetAllDDLSpan() []tablepb.Span {
spans := make([]tablepb.Span, 0, 2)
spans := make([]tablepb.Span, 0, 1)
start, end := GetTableRange(JobTableID)
spans = append(spans, tablepb.Span{
StartKey: ToComparableKey(start),
EndKey: ToComparableKey(end),
})
start, end = GetTableRange(JobHistoryID)
spans = append(spans, tablepb.Span{
StartKey: ToComparableKey(start),
EndKey: ToComparableKey(end),
})
return spans
}

Expand Down

0 comments on commit d07fcd1

Please sign in to comment.