diff --git a/ttl/ttlworker/job_manager.go b/ttl/ttlworker/job_manager.go index 5f8b7bd038fc4..919cc56e7c8da 100644 --- a/ttl/ttlworker/job_manager.go +++ b/ttl/ttlworker/job_manager.go @@ -153,7 +153,7 @@ func (m *JobManager) jobLoop() error { scheduleTaskTicker := time.Tick(getTaskManagerLoopTickerInterval()) updateTaskHeartBeatTicker := time.Tick(ttlTaskHeartBeatTickerInterval) - taskCheckTicker := time.Tick(getTaskManagerLoopTickerInterval()) + taskCheckTicker := time.Tick(time.Second * 5) checkScanTaskFinishedTicker := time.Tick(getTaskManagerLoopTickerInterval()) cmdWatcher := m.cmdCli.WatchCommand(m.ctx) @@ -535,6 +535,7 @@ func (m *JobManager) couldTrySchedule(tableStatus *cache.TableStatus, table *cac // It could be nil, nil, if the table query doesn't return error but the job has been locked by other instances. func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *cache.PhysicalTable, now time.Time, ignoreScheduleInterval bool) (*ttlJob, error) { var expireTime time.Time + var jobID string err := se.RunInTxn(ctx, func() error { sql, args := cache.SelectFromTTLTableStatusWithID(table.ID) @@ -574,7 +575,7 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table * return err } - jobID := uuid.New().String() + jobID = uuid.New().String() jobExist := false if len(tableStatus.CurrentJobID) > 0 { // don't create new job if there is already one running @@ -629,7 +630,7 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table * return nil, err } - job := m.createNewJob(expireTime, now, table) + job := m.createNewJob(jobID, expireTime, now, table) // job is created, notify every scan managers to fetch new tasks err = m.notificationCli.Notify(m.ctx, scanTaskNotificationType, job.id) @@ -639,9 +640,7 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table * return job, nil } -func (m *JobManager) createNewJob(expireTime time.Time, now time.Time, table *cache.PhysicalTable) *ttlJob { - id := m.tableStatusCache.Tables[table.ID].CurrentJobID - +func (m *JobManager) createNewJob(id string, expireTime time.Time, now time.Time, table *cache.PhysicalTable) *ttlJob { return &ttlJob{ id: id, ownerID: m.id, diff --git a/ttl/ttlworker/scan.go b/ttl/ttlworker/scan.go index ac1c1cd85ab50..4cf3d919d9545 100644 --- a/ttl/ttlworker/scan.go +++ b/ttl/ttlworker/scan.go @@ -166,7 +166,7 @@ func (t *ttlScanTask) doScan(ctx context.Context, delCh chan<- *ttlDeleteTask, s zap.String("SQL", sql), zap.Int("retryTimes", retryTimes), zap.Bool("needRetry", needRetry), - zap.Error(err), + zap.Error(sqlErr), ) if !needRetry {