From dd60929c507a427bd1f3045c50c5c10c72b22db7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E8=B6=85?= Date: Fri, 5 May 2023 19:35:56 +0800 Subject: [PATCH] ttl: fix ttl job manager will panic if the status cache doesn't contain table (#41069) (#43552) (#43561) --- ttl/ttlworker/BUILD.bazel | 3 +++ ttl/ttlworker/job_manager.go | 22 ++++++++++++++-------- ttl/ttlworker/job_manager_test.go | 11 ++++++++--- ttl/ttlworker/scan.go | 2 +- 4 files changed, 26 insertions(+), 12 deletions(-) diff --git a/ttl/ttlworker/BUILD.bazel b/ttl/ttlworker/BUILD.bazel index 527c782ed691e..347bb566834f2 100644 --- a/ttl/ttlworker/BUILD.bazel +++ b/ttl/ttlworker/BUILD.bazel @@ -29,8 +29,10 @@ go_library( "//util/logutil", "//util/sqlexec", "//util/timeutil", + "@com_github_google_uuid//:uuid", "@com_github_ngaut_pools//:pools", "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_failpoint//:failpoint", "@org_golang_x_time//rate", "@org_uber_go_multierr//:multierr", "@org_uber_go_zap//:zap", @@ -66,6 +68,7 @@ go_test( "//util/logutil", "@com_github_ngaut_pools//:pools", "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_failpoint//:failpoint", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", "@org_golang_x_time//rate", diff --git a/ttl/ttlworker/job_manager.go b/ttl/ttlworker/job_manager.go index 25af41e46ca58..7edc1a89c7042 100644 --- a/ttl/ttlworker/job_manager.go +++ b/ttl/ttlworker/job_manager.go @@ -18,7 +18,9 @@ import ( "context" "time" + "github.com/google/uuid" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/ttl/cache" @@ -32,7 +34,7 @@ import ( const insertNewTableIntoStatusTemplate = "INSERT INTO mysql.tidb_ttl_table_status (table_id,parent_table_id) VALUES (%?, %?)" const setTableStatusOwnerTemplate = `UPDATE mysql.tidb_ttl_table_status - SET current_job_id = UUID(), + SET current_job_id = %?, current_job_owner_id = %?, current_job_start_time = %?, current_job_status = 'waiting', @@ -48,8 +50,8 @@ func insertNewTableIntoStatusSQL(tableID int64, parentTableID int64) (string, [] return insertNewTableIntoStatusTemplate, []interface{}{tableID, parentTableID} } -func setTableStatusOwnerSQL(tableID int64, now time.Time, currentJobTTLExpire time.Time, id string) (string, []interface{}) { - return setTableStatusOwnerTemplate, []interface{}{id, now.Format(timeFormat), now.Format(timeFormat), currentJobTTLExpire.Format(timeFormat), now.Format(timeFormat), tableID} +func setTableStatusOwnerSQL(jobID string, tableID int64, now time.Time, currentJobTTLExpire time.Time, id string) (string, []interface{}) { + return setTableStatusOwnerTemplate, []interface{}{jobID, id, now.Format(timeFormat), now.Format(timeFormat), currentJobTTLExpire.Format(timeFormat), now.Format(timeFormat), tableID} } func updateHeartBeatSQL(tableID int64, now time.Time, id string) (string, []interface{}) { @@ -508,6 +510,7 @@ func (m *JobManager) couldTrySchedule(table *cache.TableStatus, now time.Time) b // It could be nil, nil, if the table query doesn't return error but the job has been locked by other instances. func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *cache.PhysicalTable, now time.Time) (*ttlJob, error) { var expireTime time.Time + var jobID string err := se.RunInTxn(ctx, func() error { sql, args := cache.SelectFromTTLTableStatusWithID(table.ID) @@ -544,7 +547,12 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table * return err } - sql, args = setTableStatusOwnerSQL(table.ID, now, expireTime, m.id) + jobID = uuid.New().String() + failpoint.Inject("set-job-uuid", func(val failpoint.Value) { + jobID = val.(string) + }) + + sql, args = setTableStatusOwnerSQL(jobID, table.ID, now, expireTime, m.id) _, err = se.ExecuteSQL(ctx, sql, args...) return errors.Wrapf(err, "execute sql: %s", sql) }) @@ -561,12 +569,10 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table * if err != nil { return nil, err } - return m.createNewJob(expireTime, now, table) + return m.createNewJob(jobID, expireTime, now, table) } -func (m *JobManager) createNewJob(expireTime time.Time, now time.Time, table *cache.PhysicalTable) (*ttlJob, error) { - id := m.tableStatusCache.Tables[table.ID].CurrentJobID - +func (m *JobManager) createNewJob(id string, expireTime time.Time, now time.Time, table *cache.PhysicalTable) (*ttlJob, error) { statistics := &ttlStatistics{} ranges, err := table.SplitScanRanges(m.ctx, m.store, splitScanCount) diff --git a/ttl/ttlworker/job_manager_test.go b/ttl/ttlworker/job_manager_test.go index 1f649abb065af..f791c7d6c3ded 100644 --- a/ttl/ttlworker/job_manager_test.go +++ b/ttl/ttlworker/job_manager_test.go @@ -20,6 +20,7 @@ import ( "time" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/sessionctx/variable" @@ -230,6 +231,10 @@ func TestLockNewTable(t *testing.T) { args, } } + + failpoint.Enable("github.com/pingcap/tidb/ttl/ttlworker/set-job-uuid", `return("test-job-id")`) + defer failpoint.Disable("github.com/pingcap/tidb/ttl/ttlworker/set-job-uuid") + type sqlExecute struct { executeInfo @@ -249,7 +254,7 @@ func TestLockNewTable(t *testing.T) { newTTLTableStatusRows(&cache.TableStatus{TableID: 1}), nil, }, { - getExecuteInfo(setTableStatusOwnerSQL(1, now, expireTime, "test-id")), + getExecuteInfo(setTableStatusOwnerSQL("test-job-id", 1, now, expireTime, "test-id")), nil, nil, }, { @@ -271,7 +276,7 @@ func TestLockNewTable(t *testing.T) { newTTLTableStatusRows(&cache.TableStatus{TableID: 1}), nil, }, { - getExecuteInfo(setTableStatusOwnerSQL(1, now, expireTime, "test-id")), + getExecuteInfo(setTableStatusOwnerSQL("test-job-id", 1, now, expireTime, "test-id")), nil, nil, }, { @@ -285,7 +290,7 @@ func TestLockNewTable(t *testing.T) { newTTLTableStatusRows(&cache.TableStatus{TableID: 1}), nil, }, { - getExecuteInfo(setTableStatusOwnerSQL(1, now, expireTime, "test-id")), + getExecuteInfo(setTableStatusOwnerSQL("test-job-id", 1, now, expireTime, "test-id")), nil, errors.New("test error message"), }, }, false, true}, diff --git a/ttl/ttlworker/scan.go b/ttl/ttlworker/scan.go index 38a4fd544535d..01935eba27fbf 100644 --- a/ttl/ttlworker/scan.go +++ b/ttl/ttlworker/scan.go @@ -166,7 +166,7 @@ func (t *ttlScanTask) doScan(ctx context.Context, delCh chan<- *ttlDeleteTask, s zap.String("SQL", sql), zap.Int("retryTimes", retryTimes), zap.Bool("needRetry", needRetry), - zap.Error(err), + zap.Error(sqlErr), ) if !needRetry {