ddl, parser: Implement the write-reorg state split task related functions, and the related interfaces of backfill worker #39982

Merged: 13 commits merged on Jan 4, 2023
Merge branch 'master' into zimuxia/backfill-split
zimulala committed Jan 3, 2023
commit 12c8f72d4772565230e9f6850809f49fa91aef5d
153 changes: 0 additions & 153 deletions ddl/ddl_worker.go
@@ -641,13 +641,6 @@ func isDependencyJobDone(t *meta.Meta, job *model.Job) (bool, error) {
return true, nil
}

func newMetaWithQueueTp(txn kv.Transaction, tp workerType) *meta.Meta {
if tp == addIdxWorker {
return meta.NewMeta(txn, meta.AddIndexJobListKey)
}
return meta.NewMeta(txn)
}

func (w *JobContext) setDDLLabelForTopSQL(jobQuery string) {
if !topsqlstate.TopSQLEnabled() || jobQuery == "" {
return
@@ -838,152 +831,6 @@ func (w *JobContext) ddlJobSourceType() string {
return w.tp
}

// handleDDLJobQueue handles DDL jobs in DDL Job queue.
func (w *worker) handleDDLJobQueue(d *ddlCtx) error {
once := true
waitDependencyJobCnt := 0
for {
if isChanClosed(w.ctx.Done()) {
return nil
}

var (
job *model.Job
schemaVer int64
runJobErr error
)
waitTime := 2 * d.lease
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL)
err := kv.RunInNewTxn(ctx, d.store, false, func(ctx context.Context, txn kv.Transaction) error {
d.runningJobs.Lock()
// We are not owner, return and retry checking later.
if !d.isOwner() || variable.EnableConcurrentDDL.Load() || d.waiting.Load() {
d.runningJobs.Unlock()
return nil
}

var err error
t := newMetaWithQueueTp(txn, w.tp)

// We become the owner. Get the first job and run it.
job, err = w.getFirstDDLJob(t)
if job == nil || err != nil {
d.runningJobs.Unlock()
return errors.Trace(err)
}
d.runningJobs.ids[job.ID] = struct{}{}
d.runningJobs.Unlock()

defer d.deleteRunningDDLJobMap(job.ID)

// Only general DDLs are allowed to execute when the TiKV disk is full.
if w.tp == addIdxWorker && job.IsRunning() {
txn.SetDiskFullOpt(kvrpcpb.DiskFullOpt_NotAllowedOnFull)
}

w.setDDLLabelForTopSQL(job.ID, job.Query)
w.setDDLSourceForDiagnosis(job)
jobContext := w.jobContext(job.ID)
if tagger := w.getResourceGroupTaggerForTopSQL(job.ID); tagger != nil {
txn.SetOption(kv.ResourceGroupTagger, tagger)
}
if isDone, err1 := isDependencyJobDone(t, job); err1 != nil || !isDone {
return errors.Trace(err1)
}

if once {
err = waitSchemaSynced(d, job, waitTime)
if err == nil {
once = false
}
return err
}

if job.IsDone() || job.IsRollbackDone() {
if !job.IsRollbackDone() {
job.State = model.JobStateSynced
}
err = w.finishDDLJob(t, job)
return errors.Trace(err)
}

d.mu.RLock()
d.mu.hook.OnJobRunBefore(job)
d.mu.RUnlock()

// set request source type to DDL type
txn.SetOption(kv.RequestSourceType, jobContext.ddlJobSourceType())
// If the running job meets an error, we save the error in the job's Error field
// and retry later if the job is not cancelled.
schemaVer, runJobErr = w.runDDLJob(d, t, job)
if job.IsCancelled() {
txn.Reset()
err = w.finishDDLJob(t, job)
return errors.Trace(err)
}
if runJobErr != nil && !job.IsRollingback() && !job.IsRollbackDone() {
// If the running job meets an error and its state is rolling back, we have already handled this error.
// Some DDL jobs (such as adding indexes) may need to update the table info and the schema version,
// so we shouldn't discard the KV modification.
// If the job state is rollback done, the job has already finished, so we shouldn't discard it either.
// Otherwise, we should discard the KV modification made while running the job.
txn.Reset()
// If an error happens after updateSchemaVersion(), the schemaVer has been updated,
// so the retry duration is up to 2 * lease.
schemaVer = 0
}
err = w.updateDDLJob(t, job, runJobErr != nil)
if err = w.handleUpdateJobError(t, job, err); err != nil {
return errors.Trace(err)
}
writeBinlog(d.binlogCli, txn, job)
return nil
})

if runJobErr != nil {
// wait a while to retry again. If we don't wait here, DDL will retry this job immediately,
// which may act like a deadlock.
logutil.Logger(w.logCtx).Info("[ddl] run DDL job failed, sleeps a while then retries it.",
zap.Duration("waitTime", GetWaitTimeWhenErrorOccurred()), zap.Error(runJobErr))
time.Sleep(GetWaitTimeWhenErrorOccurred())
}
if job != nil {
d.unlockSchemaVersion(job.ID)
}

if err != nil {
w.unlockSeqNum(err)
return errors.Trace(err)
} else if job == nil {
// No job now, return and retry getting later.
return nil
}
w.unlockSeqNum(err)
w.waitDependencyJobFinished(job, &waitDependencyJobCnt)

// At this point the job enters another state (delete only, write only, public, etc.) or is cancelled.
// If the job is done, still running, or rolling back, we wait 2 * lease to guarantee that other servers
// have updated to the newest schema.
waitSchemaChanged(context.Background(), d, waitTime, schemaVer, job)

if RunInGoTest {
// d.mu.hook is initialized from the domain / test callback, which forces the owner host to update the schema diff synchronously.
d.mu.RLock()
d.mu.hook.OnSchemaStateChanged(schemaVer)
d.mu.RUnlock()
}

d.mu.RLock()
d.mu.hook.OnJobUpdated(job)
d.mu.RUnlock()

if job.IsSynced() || job.IsCancelled() || job.IsRollbackDone() {
asyncNotify(d.ddlJobDoneCh)
}
}
}

func skipWriteBinlog(job *model.Job) bool {
switch job.Type {
// ActionUpdateTiFlashReplicaStatus is a TiDB internal DDL,
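
The removed handleDDLJobQueue loop relies on two small channel helpers, isChanClosed and asyncNotify, whose definitions are not part of this condensed diff. A minimal sketch of how such helpers are commonly written in Go (a non-blocking select) is shown below; the exact TiDB implementations may differ.

// isChanClosed reports whether the done channel has been closed,
// without blocking when it is still open.
func isChanClosed(ch <-chan struct{}) bool {
	select {
	case <-ch:
		return true
	default:
		return false
	}
}

// asyncNotify sends a notification if a receiver is ready,
// and otherwise drops it instead of blocking the DDL worker.
func asyncNotify(ch chan struct{}) {
	select {
	case ch <- struct{}{}:
	default:
	}
}
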
2 changes: 1 addition & 1 deletion ddl/partition.go
@@ -1756,7 +1756,7 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (
elements = append(elements, &meta.Element{ID: idxInfo.ID, TypeKey: meta.IndexElementKey})
}
}
rh := newReorgHandler(t, w.sess, w.concurrentDDL)
rh := newReorgHandler(t, w.sess)
reorgInfo, err := getReorgInfoFromPartitions(d.jobContext(job.ID), d, rh, job, dbInfo, tbl, physicalTableIDs, elements)

if err != nil || reorgInfo.first {
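
This hunk drops the concurrentDDL flag from the newReorgHandler call, so the reorg handler no longer distinguishes the queue-based path from the concurrent DDL path. Assuming the constructor was reduced to the two remaining arguments, it would look roughly like the sketch below; the field names are illustrative, not the exact TiDB definitions.

// newReorgHandler builds a handler for reorganization jobs. With the
// concurrentDDL flag removed, only the meta accessor and the session are needed.
// (Field names here are illustrative placeholders within the ddl package.)
func newReorgHandler(t *meta.Meta, sess *session) *reorgHandler {
	return &reorgHandler{m: t, s: sess}
}
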