Skip to content

Commit

Permalink
ttl: fix ttl job manager will panic if the status cache doesn't conta…
Browse files Browse the repository at this point in the history
…in table (#41069)

close #41067, close #41068
  • Loading branch information
YangKeao authored Feb 6, 2023
1 parent 1e54ee4 commit e245a93
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 7 deletions.
11 changes: 5 additions & 6 deletions ttl/ttlworker/job_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (m *JobManager) jobLoop() error {

scheduleTaskTicker := time.Tick(getTaskManagerLoopTickerInterval())
updateTaskHeartBeatTicker := time.Tick(ttlTaskHeartBeatTickerInterval)
taskCheckTicker := time.Tick(getTaskManagerLoopTickerInterval())
taskCheckTicker := time.Tick(time.Second * 5)
checkScanTaskFinishedTicker := time.Tick(getTaskManagerLoopTickerInterval())

cmdWatcher := m.cmdCli.WatchCommand(m.ctx)
Expand Down Expand Up @@ -535,6 +535,7 @@ func (m *JobManager) couldTrySchedule(tableStatus *cache.TableStatus, table *cac
// It could be nil, nil, if the table query doesn't return error but the job has been locked by other instances.
func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *cache.PhysicalTable, now time.Time, ignoreScheduleInterval bool) (*ttlJob, error) {
var expireTime time.Time
var jobID string

err := se.RunInTxn(ctx, func() error {
sql, args := cache.SelectFromTTLTableStatusWithID(table.ID)
Expand Down Expand Up @@ -574,7 +575,7 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *
return err
}

jobID := uuid.New().String()
jobID = uuid.New().String()
jobExist := false
if len(tableStatus.CurrentJobID) > 0 {
// don't create new job if there is already one running
Expand Down Expand Up @@ -629,7 +630,7 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *
return nil, err
}

job := m.createNewJob(expireTime, now, table)
job := m.createNewJob(jobID, expireTime, now, table)

// job is created, notify every scan managers to fetch new tasks
err = m.notificationCli.Notify(m.ctx, scanTaskNotificationType, job.id)
Expand All @@ -639,9 +640,7 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *
return job, nil
}

func (m *JobManager) createNewJob(expireTime time.Time, now time.Time, table *cache.PhysicalTable) *ttlJob {
id := m.tableStatusCache.Tables[table.ID].CurrentJobID

func (m *JobManager) createNewJob(id string, expireTime time.Time, now time.Time, table *cache.PhysicalTable) *ttlJob {
return &ttlJob{
id: id,
ownerID: m.id,
Expand Down
2 changes: 1 addition & 1 deletion ttl/ttlworker/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func (t *ttlScanTask) doScan(ctx context.Context, delCh chan<- *ttlDeleteTask, s
zap.String("SQL", sql),
zap.Int("retryTimes", retryTimes),
zap.Bool("needRetry", needRetry),
zap.Error(err),
zap.Error(sqlErr),
)

if !needRetry {
Expand Down

0 comments on commit e245a93

Please sign in to comment.