Skip to content

Commit

Permalink
ttl: fix ttl job manager will panic if the status cache doesn't conta…
Browse files Browse the repository at this point in the history
…in table (#41069) (#43552) (#43561)
  • Loading branch information
lcwangchao authored May 5, 2023
1 parent 58fcf7b commit dd60929
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 12 deletions.
3 changes: 3 additions & 0 deletions ttl/ttlworker/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ go_library(
"//util/logutil",
"//util/sqlexec",
"//util/timeutil",
"@com_github_google_uuid//:uuid",
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@org_golang_x_time//rate",
"@org_uber_go_multierr//:multierr",
"@org_uber_go_zap//:zap",
Expand Down Expand Up @@ -66,6 +68,7 @@ go_test(
"//util/logutil",
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@org_golang_x_time//rate",
Expand Down
22 changes: 14 additions & 8 deletions ttl/ttlworker/job_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import (
"context"
"time"

"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/ttl/cache"
Expand All @@ -32,7 +34,7 @@ import (

const insertNewTableIntoStatusTemplate = "INSERT INTO mysql.tidb_ttl_table_status (table_id,parent_table_id) VALUES (%?, %?)"
const setTableStatusOwnerTemplate = `UPDATE mysql.tidb_ttl_table_status
SET current_job_id = UUID(),
SET current_job_id = %?,
current_job_owner_id = %?,
current_job_start_time = %?,
current_job_status = 'waiting',
Expand All @@ -48,8 +50,8 @@ func insertNewTableIntoStatusSQL(tableID int64, parentTableID int64) (string, []
return insertNewTableIntoStatusTemplate, []interface{}{tableID, parentTableID}
}

func setTableStatusOwnerSQL(tableID int64, now time.Time, currentJobTTLExpire time.Time, id string) (string, []interface{}) {
return setTableStatusOwnerTemplate, []interface{}{id, now.Format(timeFormat), now.Format(timeFormat), currentJobTTLExpire.Format(timeFormat), now.Format(timeFormat), tableID}
func setTableStatusOwnerSQL(jobID string, tableID int64, now time.Time, currentJobTTLExpire time.Time, id string) (string, []interface{}) {
return setTableStatusOwnerTemplate, []interface{}{jobID, id, now.Format(timeFormat), now.Format(timeFormat), currentJobTTLExpire.Format(timeFormat), now.Format(timeFormat), tableID}
}

func updateHeartBeatSQL(tableID int64, now time.Time, id string) (string, []interface{}) {
Expand Down Expand Up @@ -508,6 +510,7 @@ func (m *JobManager) couldTrySchedule(table *cache.TableStatus, now time.Time) b
// It could be nil, nil, if the table query doesn't return error but the job has been locked by other instances.
func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *cache.PhysicalTable, now time.Time) (*ttlJob, error) {
var expireTime time.Time
var jobID string

err := se.RunInTxn(ctx, func() error {
sql, args := cache.SelectFromTTLTableStatusWithID(table.ID)
Expand Down Expand Up @@ -544,7 +547,12 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *
return err
}

sql, args = setTableStatusOwnerSQL(table.ID, now, expireTime, m.id)
jobID = uuid.New().String()
failpoint.Inject("set-job-uuid", func(val failpoint.Value) {
jobID = val.(string)
})

sql, args = setTableStatusOwnerSQL(jobID, table.ID, now, expireTime, m.id)
_, err = se.ExecuteSQL(ctx, sql, args...)
return errors.Wrapf(err, "execute sql: %s", sql)
})
Expand All @@ -561,12 +569,10 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *
if err != nil {
return nil, err
}
return m.createNewJob(expireTime, now, table)
return m.createNewJob(jobID, expireTime, now, table)
}

func (m *JobManager) createNewJob(expireTime time.Time, now time.Time, table *cache.PhysicalTable) (*ttlJob, error) {
id := m.tableStatusCache.Tables[table.ID].CurrentJobID

func (m *JobManager) createNewJob(id string, expireTime time.Time, now time.Time, table *cache.PhysicalTable) (*ttlJob, error) {
statistics := &ttlStatistics{}

ranges, err := table.SplitScanRanges(m.ctx, m.store, splitScanCount)
Expand Down
11 changes: 8 additions & 3 deletions ttl/ttlworker/job_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/sessionctx/variable"
Expand Down Expand Up @@ -230,6 +231,10 @@ func TestLockNewTable(t *testing.T) {
args,
}
}

failpoint.Enable("github.com/pingcap/tidb/ttl/ttlworker/set-job-uuid", `return("test-job-id")`)
defer failpoint.Disable("github.com/pingcap/tidb/ttl/ttlworker/set-job-uuid")

type sqlExecute struct {
executeInfo

Expand All @@ -249,7 +254,7 @@ func TestLockNewTable(t *testing.T) {
newTTLTableStatusRows(&cache.TableStatus{TableID: 1}), nil,
},
{
getExecuteInfo(setTableStatusOwnerSQL(1, now, expireTime, "test-id")),
getExecuteInfo(setTableStatusOwnerSQL("test-job-id", 1, now, expireTime, "test-id")),
nil, nil,
},
{
Expand All @@ -271,7 +276,7 @@ func TestLockNewTable(t *testing.T) {
newTTLTableStatusRows(&cache.TableStatus{TableID: 1}), nil,
},
{
getExecuteInfo(setTableStatusOwnerSQL(1, now, expireTime, "test-id")),
getExecuteInfo(setTableStatusOwnerSQL("test-job-id", 1, now, expireTime, "test-id")),
nil, nil,
},
{
Expand All @@ -285,7 +290,7 @@ func TestLockNewTable(t *testing.T) {
newTTLTableStatusRows(&cache.TableStatus{TableID: 1}), nil,
},
{
getExecuteInfo(setTableStatusOwnerSQL(1, now, expireTime, "test-id")),
getExecuteInfo(setTableStatusOwnerSQL("test-job-id", 1, now, expireTime, "test-id")),
nil, errors.New("test error message"),
},
}, false, true},
Expand Down
2 changes: 1 addition & 1 deletion ttl/ttlworker/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func (t *ttlScanTask) doScan(ctx context.Context, delCh chan<- *ttlDeleteTask, s
zap.String("SQL", sql),
zap.Int("retryTimes", retryTimes),
zap.Bool("needRetry", needRetry),
zap.Error(err),
zap.Error(sqlErr),
)

if !needRetry {
Expand Down

0 comments on commit dd60929

Please sign in to comment.