diff --git a/cdc/entry/mounter.go b/cdc/entry/mounter.go index d7ed71229a5..e255c890cda 100644 --- a/cdc/entry/mounter.go +++ b/cdc/entry/mounter.go @@ -38,7 +38,6 @@ import ( cerror "github.com/pingcap/tiflow/pkg/errors" pfilter "github.com/pingcap/tiflow/pkg/filter" "github.com/pingcap/tiflow/pkg/integrity" - "github.com/pingcap/tiflow/pkg/spanz" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" ) @@ -65,19 +64,6 @@ type rowKVEntry struct { PreRowExist bool } -// DDLTableInfo contains the tableInfo about tidb_ddl_job and tidb_ddl_history -// and the column id of `job_meta` in these two tables. -type DDLTableInfo struct { - // ddlJobsTable use to parse all ddl jobs except `create table` - DDLJobTable *model.TableInfo - // It holds the column id of `job_meta` in table `tidb_ddl_jobs`. - JobMetaColumnIDinJobTable int64 - // ddlHistoryTable only use to parse `create table` ddl job - DDLHistoryTable *model.TableInfo - // It holds the column id of `job_meta` in table `tidb_ddl_history`. - JobMetaColumnIDinHistoryTable int64 -} - // Mounter is used to parse SQL events from KV events type Mounter interface { // DecodeEvent accepts `model.PolymorphicEvent` with `RawKVEntry` filled and @@ -306,89 +292,39 @@ func IsLegacyFormatJob(rawKV *model.RawKVEntry) bool { return bytes.HasPrefix(rawKV.Key, metaPrefix) } -// ParseDDLJob parses the job from the raw KV entry. -func ParseDDLJob(rawKV *model.RawKVEntry, ddlTableInfo *DDLTableInfo) (*timodel.Job, error) { +// ParseDDLJob parses the job from the raw KV entry. id is the column id of `job_meta`. +func ParseDDLJob(tblInfo *model.TableInfo, rawKV *model.RawKVEntry, id int64) (*timodel.Job, error) { var v []byte - var datum types.Datum - - // for test case only if bytes.HasPrefix(rawKV.Key, metaPrefix) { + // old queue base job. v = rawKV.Value - job, err := parseJob(v, rawKV.StartTs, rawKV.CRTs, false) - if err != nil || job == nil { - job, err = parseJob(v, rawKV.StartTs, rawKV.CRTs, true) - } - return job, err - } - - recordID, err := tablecodec.DecodeRowKey(rawKV.Key) - if err != nil { - return nil, errors.Trace(err) - } - - tableID := tablecodec.DecodeTableID(rawKV.Key) - - // parse it with tidb_ddl_job - if tableID == spanz.JobTableID { - row, err := decodeRow(rawKV.Value, recordID, ddlTableInfo.DDLJobTable, time.UTC) + } else { + // DDL job comes from `tidb_ddl_job` table after we support concurrent DDL. We should decode the job from the column. + recordID, err := tablecodec.DecodeRowKey(rawKV.Key) if err != nil { return nil, errors.Trace(err) } - datum = row[ddlTableInfo.JobMetaColumnIDinJobTable] - v = datum.GetBytes() - - return parseJob(v, rawKV.StartTs, rawKV.CRTs, false) - } else if tableID == spanz.JobHistoryID { - // parse it with tidb_ddl_history - row, err := decodeRow(rawKV.Value, recordID, ddlTableInfo.DDLHistoryTable, time.UTC) + row, err := decodeRow(rawKV.Value, recordID, tblInfo, time.UTC) if err != nil { return nil, errors.Trace(err) } - datum = row[ddlTableInfo.JobMetaColumnIDinHistoryTable] + datum := row[id] v = datum.GetBytes() - - return parseJob(v, rawKV.StartTs, rawKV.CRTs, true) } - return nil, fmt.Errorf("Unvalid tableID %v in rawKV.Key", tableID) + return parseJob(v, rawKV.StartTs, rawKV.CRTs) } // parseJob unmarshal the job from "v". -// fromHistoryTable is used to distinguish the job is from tidb_dd_job or tidb_ddl_history -// We need to be compatible with the two modes, enable_fast_create_table=on and enable_fast_create_table=off -// When enable_fast_create_table=on, `create table` will only be inserted into tidb_ddl_history after being executed successfully. -// When enable_fast_create_table=off, `create table` just like other ddls will be firstly inserted to tidb_ddl_job, -// and being inserted into tidb_ddl_history after being executed successfully. -// In both two modes, other ddls are all firstly inserted into tidb_ddl_job, and then inserted into tidb_ddl_history after being executed successfully. -// -// To be compatible with these two modes, we will get `create table` ddl from tidb_ddl_history, and all ddls from tidb_ddl_job. -// When enable_fast_create_table=off, for each `create table` ddl we will get twice(once from tidb_ddl_history, once from tidb_ddl_job) -// Because in `handleJob` we will skip the repeated ddls, thus it's ok for us to get `create table` twice. -// Besides, the `create table` from tidb_ddl_job always have a earlier commitTs than from tidb_ddl_history. -// Therefore, we always use the commitTs of ddl from `tidb_ddl_job` as StartTs, which ensures we can get all the dmls. -func parseJob(v []byte, startTs, CRTs uint64, fromHistoryTable bool) (*timodel.Job, error) { +func parseJob(v []byte, startTs, CRTs uint64) (*timodel.Job, error) { var job timodel.Job err := json.Unmarshal(v, &job) if err != nil { return nil, errors.Trace(err) } - - if fromHistoryTable { - // we only want to get `create table` and `create tables` ddl from tidb_ddl_history, so we just throw out others ddls. - // We only want the job with `JobStateSynced`, which is means the ddl job is done successfully. - // Besides, to satisfy the subsequent processing, - // We need to set the job to be Done to make it will replay in schemaStorage - if (job.Type != timodel.ActionCreateTable && job.Type != timodel.ActionCreateTables) || job.State != timodel.JobStateSynced { - return nil, nil - } - job.State = timodel.JobStateDone - } else { - // we need to get all ddl job which is done from tidb_ddl_job - if !job.IsDone() { - return nil, nil - } + if !job.IsDone() { + return nil, nil } - // FinishedTS is only set when the job is synced, // but we can use the entry's ts here job.StartTS = startTs diff --git a/cdc/puller/ddl_puller.go b/cdc/puller/ddl_puller.go index e552bb2ba78..daf8eb2c936 100644 --- a/cdc/puller/ddl_puller.go +++ b/cdc/puller/ddl_puller.go @@ -75,8 +75,11 @@ type ddlJobPullerImpl struct { schemaStorage entry.SchemaStorage resolvedTs uint64 filter filter.Filter - // ddlTableInfo is initialized when receive the first concurrent DDL job. - ddlTableInfo *entry.DDLTableInfo + // ddlJobsTable is initialized when receive the first concurrent DDL job. + // It holds the info of table `tidb_ddl_jobs` of upstream TiDB. + ddlJobsTable *model.TableInfo + // It holds the column id of `job_meta` in table `tidb_ddl_jobs`. + jobMetaColumnID int64 // outputCh sends the DDL job entries to the caller. outputCh chan *model.DDLJobEntry } @@ -236,14 +239,13 @@ func (p *ddlJobPullerImpl) unmarshalDDL(ctx context.Context, rawKV *model.RawKVE if rawKV.OpType != model.OpTypePut { return nil, nil } - if p.ddlTableInfo == nil && !entry.IsLegacyFormatJob(rawKV) { - err := p.initDDLTableInfo(ctx) + if p.ddlJobsTable == nil && !entry.IsLegacyFormatJob(rawKV) { + err := p.initJobTableMeta(ctx) if err != nil { return nil, errors.Trace(err) } } - - return entry.ParseDDLJob(rawKV, p.ddlTableInfo) + return entry.ParseDDLJob(p.ddlJobsTable, rawKV, p.jobMetaColumnID) } func (p *ddlJobPullerImpl) getResolvedTs() uint64 { @@ -254,7 +256,7 @@ func (p *ddlJobPullerImpl) setResolvedTs(ts uint64) { atomic.StoreUint64(&p.resolvedTs, ts) } -func (p *ddlJobPullerImpl) initDDLTableInfo(ctx context.Context) error { +func (p *ddlJobPullerImpl) initJobTableMeta(ctx context.Context) error { version, err := p.kvStorage.CurrentVersion(tidbkv.GlobalTxnScope) if err != nil { return errors.Trace(err) @@ -275,8 +277,6 @@ func (p *ddlJobPullerImpl) initDDLTableInfo(ctx context.Context) error { if err != nil { return errors.Trace(err) } - - // for tidb_ddl_job tableInfo, err := findTableByName(tbls, "tidb_ddl_job") if err != nil { return errors.Trace(err) @@ -287,24 +287,8 @@ func (p *ddlJobPullerImpl) initDDLTableInfo(ctx context.Context) error { return errors.Trace(err) } - p.ddlTableInfo = &entry.DDLTableInfo{} - p.ddlTableInfo.DDLJobTable = model.WrapTableInfo(db.ID, db.Name.L, 0, tableInfo) - p.ddlTableInfo.JobMetaColumnIDinJobTable = col.ID - - // for tidb_ddl_history - historyTableInfo, err := findTableByName(tbls, "tidb_ddl_history") - if err != nil { - return errors.Trace(err) - } - - historyTableCol, err := findColumnByName(historyTableInfo.Columns, "job_meta") - if err != nil { - return errors.Trace(err) - } - - p.ddlTableInfo.DDLHistoryTable = model.WrapTableInfo(db.ID, db.Name.L, 0, historyTableInfo) - p.ddlTableInfo.JobMetaColumnIDinHistoryTable = historyTableCol.ID - + p.ddlJobsTable = model.WrapTableInfo(db.ID, db.Name.L, 0, tableInfo) + p.jobMetaColumnID = col.ID return nil } diff --git a/pkg/spanz/span.go b/pkg/spanz/span.go index 43b715c1a93..1eed64e1a4c 100644 --- a/pkg/spanz/span.go +++ b/pkg/spanz/span.go @@ -29,8 +29,6 @@ import ( const ( // JobTableID is the id of `tidb_ddl_job`. JobTableID = ddl.JobTableID - // JobHistoryID is the id of `tidb_ddl_history` - JobHistoryID = ddl.HistoryTableID ) // UpperBoundKey represents the maximum value. @@ -64,17 +62,12 @@ func GetTableRange(tableID int64) (startKey, endKey []byte) { // GetAllDDLSpan return all cdc interested spans for DDL. func GetAllDDLSpan() []tablepb.Span { - spans := make([]tablepb.Span, 0, 2) + spans := make([]tablepb.Span, 0, 1) start, end := GetTableRange(JobTableID) spans = append(spans, tablepb.Span{ StartKey: ToComparableKey(start), EndKey: ToComparableKey(end), }) - start, end = GetTableRange(JobHistoryID) - spans = append(spans, tablepb.Span{ - StartKey: ToComparableKey(start), - EndKey: ToComparableKey(end), - }) return spans }