diff --git a/executor/ddl.go b/executor/ddl.go
index 28365e2495a74..f516f530c605a 100644
--- a/executor/ddl.go
+++ b/executor/ddl.go
@@ -743,21 +743,21 @@ func (e *DDLExec) executeAlterPlacementPolicy(s *ast.AlterPlacementPolicyStmt) e
 }
 
 func (e *DDLExec) executeCreateResourceGroup(s *ast.CreateResourceGroupStmt) error {
-	if !variable.EnableResourceControl.Load() {
+	if !variable.EnableResourceControl.Load() && !e.ctx.GetSessionVars().InRestrictedSQL {
 		return infoschema.ErrResourceGroupSupportDisabled
 	}
 	return domain.GetDomain(e.ctx).DDL().AddResourceGroup(e.ctx, s)
 }
 
 func (e *DDLExec) executeAlterResourceGroup(s *ast.AlterResourceGroupStmt) error {
-	if !variable.EnableResourceControl.Load() {
+	if !variable.EnableResourceControl.Load() && !e.ctx.GetSessionVars().InRestrictedSQL {
 		return infoschema.ErrResourceGroupSupportDisabled
 	}
 	return domain.GetDomain(e.ctx).DDL().AlterResourceGroup(e.ctx, s)
 }
 
 func (e *DDLExec) executeDropResourceGroup(s *ast.DropResourceGroupStmt) error {
-	if !variable.EnableResourceControl.Load() {
+	if !variable.EnableResourceControl.Load() && !e.ctx.GetSessionVars().InRestrictedSQL {
 		return infoschema.ErrResourceGroupSupportDisabled
 	}
 	return domain.GetDomain(e.ctx).DDL().DropResourceGroup(e.ctx, s)
diff --git a/session/bootstrap.go b/session/bootstrap.go
index eedf26e9efbc8..53d43e69eee8f 100644
--- a/session/bootstrap.go
+++ b/session/bootstrap.go
@@ -549,6 +549,10 @@ const (
 		key(parent_table_id, create_time),
 		key(create_time)
 	);`
+
+	// CreateTTLResourceGroup creates the resource group for TTL.
+	// TODO: set a more proper value for RU_PER_SEC
+	CreateTTLResourceGroup = `CREATE RESOURCE GROUP IF NOT EXISTS tidb_ttl RU_PER_SEC=10000000`
 )
 
 // bootstrap initiates system DB for a store.
@@ -804,11 +808,13 @@ const (
 	//   - tidb_enable_foreign_key: off -> on
 	//   - tidb_store_batch_size: 0 -> 4
 	version134 = 134
+	// version135 adds the tidb_ttl resource group
+	version135 = 135
 )
 
 // currentBootstrapVersion is defined as a variable, so we can modify its value for testing.
 // please make sure this is the largest version
-var currentBootstrapVersion int64 = version134
+var currentBootstrapVersion int64 = version135
 
 // DDL owner key's expired time is ManagerSessionTTL seconds, we should wait the time and give more time to have a chance to finish it.
 var internalSQLTimeout = owner.ManagerSessionTTL + 15
@@ -930,6 +936,7 @@ var (
 		upgradeToVer132,
 		upgradeToVer133,
 		upgradeToVer134,
+		upgradeToVer135,
 	}
 )
 
@@ -2316,6 +2323,13 @@ func upgradeToVer134(s Session, ver int64) {
 	mustExecute(s, "UPDATE HIGH_PRIORITY %n.%n SET VARIABLE_VALUE = %? WHERE VARIABLE_NAME = %? AND VARIABLE_VALUE = %?;", mysql.SystemDB, mysql.GlobalVariablesTable, "4", variable.TiDBStoreBatchSize, "0")
 }
 
+func upgradeToVer135(s Session, ver int64) {
+	if ver >= version135 {
+		return
+	}
+	doReentrantDDL(s, CreateTTLResourceGroup)
+}
+
 func writeOOMAction(s Session) {
 	comment := "oom-action is `log` by default in v3.0.x, `cancel` by default in v4.0.11+"
 	mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES (%?, %?, %?) ON DUPLICATE KEY UPDATE VARIABLE_VALUE= %?`,
@@ -2426,6 +2440,8 @@ func doDDLWorks(s Session) {
 	mustExecute(s, CreateTTLTask)
 	// Create tidb_ttl_job_history table
 	mustExecute(s, CreateTTLJobHistory)
+	// Create tidb_ttl resource group
+	mustExecute(s, CreateTTLResourceGroup)
 }
 
 // doBootstrapSQLFile executes SQL commands in a file as the last stage of bootstrap.
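
The three executor hunks above share one change: resource group DDL issued from an internal (restricted-SQL) session now bypasses the tidb_enable_resource_control feature gate. That is what lets the bootstrap/upgrade path run CREATE RESOURCE GROUP for tidb_ttl even while the switch is still off. A minimal sketch of the gate, using illustrative names (featureEnabled, inRestrictedSQL) rather than TiDB's real API:

    package sketch

    import "errors"

    var errResourceGroupSupportDisabled = errors.New("resource control feature is disabled")

    // checkResourceGroupGate mirrors the pattern in executor/ddl.go: resource
    // group DDL is rejected only when the feature is off AND the statement
    // comes from a normal user session; internal (restricted) SQL always
    // passes, so bootstrap can create the tidb_ttl group unconditionally.
    func checkResourceGroupGate(featureEnabled, inRestrictedSQL bool) error {
        if !featureEnabled && !inRestrictedSQL {
            return errResourceGroupSupportDisabled
        }
        return nil
    }

The upgrade function itself follows the usual bootstrap convention: it is a no-op when the store is already at version135 or later, and it uses doReentrantDDL so a crashed or restarted upgrade can safely run the same CREATE RESOURCE GROUP IF NOT EXISTS again.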
diff --git a/telemetry/data_feature_usage_test.go b/telemetry/data_feature_usage_test.go
index 9a00cf596bded..4df2728a68f02 100644
--- a/telemetry/data_feature_usage_test.go
+++ b/telemetry/data_feature_usage_test.go
@@ -356,12 +356,13 @@ func TestPlacementPolicies(t *testing.T) {
 }
 
 func TestResourceGroups(t *testing.T) {
 	store := testkit.CreateMockStore(t)
+	systemResourceGroupCount := 1
 	tk := testkit.NewTestKit(t, store)
 
 	usage, err := telemetry.GetFeatureUsage(tk.Session())
 	require.NoError(t, err)
-	require.Equal(t, uint64(0), usage.ResourceControlUsage.NumResourceGroups)
+	require.Equal(t, uint64(systemResourceGroupCount), usage.ResourceControlUsage.NumResourceGroups)
 	require.Equal(t, false, usage.ResourceControlUsage.Enabled)
 
 	tk.MustExec("set global tidb_enable_resource_control = 'ON'")
@@ -369,27 +370,27 @@ func TestResourceGroups(t *testing.T) {
 	usage, err = telemetry.GetFeatureUsage(tk.Session())
 	require.NoError(t, err)
 	require.Equal(t, true, usage.ResourceControlUsage.Enabled)
-	require.Equal(t, uint64(1), usage.ResourceControlUsage.NumResourceGroups)
+	require.Equal(t, uint64(systemResourceGroupCount+1), usage.ResourceControlUsage.NumResourceGroups)
 
 	tk.MustExec("create resource group y ru_per_sec=100")
 	usage, err = telemetry.GetFeatureUsage(tk.Session())
 	require.NoError(t, err)
-	require.Equal(t, uint64(2), usage.ResourceControlUsage.NumResourceGroups)
+	require.Equal(t, uint64(systemResourceGroupCount+2), usage.ResourceControlUsage.NumResourceGroups)
 
 	tk.MustExec("alter resource group y ru_per_sec=200")
 	usage, err = telemetry.GetFeatureUsage(tk.Session())
 	require.NoError(t, err)
-	require.Equal(t, uint64(2), usage.ResourceControlUsage.NumResourceGroups)
+	require.Equal(t, uint64(systemResourceGroupCount+2), usage.ResourceControlUsage.NumResourceGroups)
 
 	tk.MustExec("drop resource group y")
 	usage, err = telemetry.GetFeatureUsage(tk.Session())
 	require.NoError(t, err)
-	require.Equal(t, uint64(1), usage.ResourceControlUsage.NumResourceGroups)
+	require.Equal(t, uint64(systemResourceGroupCount+1), usage.ResourceControlUsage.NumResourceGroups)
 
 	tk.MustExec("set global tidb_enable_resource_control = 'OFF'")
 	usage, err = telemetry.GetFeatureUsage(tk.Session())
 	require.NoError(t, err)
-	require.Equal(t, uint64(1), usage.ResourceControlUsage.NumResourceGroups)
+	require.Equal(t, uint64(systemResourceGroupCount+1), usage.ResourceControlUsage.NumResourceGroups)
 	require.Equal(t, false, usage.ResourceControlUsage.Enabled)
 }
diff --git a/ttl/ttlworker/scan.go b/ttl/ttlworker/scan.go
index 4cf3d919d9545..e8c84e720f8a7 100644
--- a/ttl/ttlworker/scan.go
+++ b/ttl/ttlworker/scan.go
@@ -18,6 +18,7 @@ import (
 	"context"
 	"fmt"
 	"strconv"
+	"strings"
 	"sync/atomic"
 	"time"
 
@@ -173,7 +174,12 @@ func (t *ttlScanTask) doScan(ctx context.Context, delCh chan<- *ttlDeleteTask, s
 			return t.result(sqlErr)
 		}
 		retrySQL = sql
-		retryTimes++
+
+		// If the SQL returns this error, it means we have reached the resource group limit (and have already waited for 1 second).
+		// TODO: throttle the traffic rather than retrying at a fixed interval, to make it easier to pass the limiter.
+		if !strings.Contains(sqlErr.Error(), "[resource group controller] limiter has no enough token or needs wait too long") {
+			retryTimes++
+		}
 
 		tracer.EnterPhase(metrics.PhaseWaitRetry)
 		select {
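
The doScan change turns the resource-group limiter error into pure back-pressure: the scan still waits out the retry interval, but the error no longer consumes one of its retry attempts, so a throttled TTL scan keeps retrying instead of failing. A sketch of that retry shape, with execute standing in for the real SQL call and maxRetry/retryInterval as illustrative constants (none of these are TiDB's API):

    package sketch

    import (
        "strings"
        "time"
    )

    const (
        maxRetry      = 5
        retryInterval = 10 * time.Second
        limiterErrMsg = "[resource group controller] limiter has no enough token or needs wait too long"
    )

    // runWithRetry mirrors the doScan loop: a limiter error waits without
    // consuming a retry, while any other error burns one of maxRetry attempts.
    func runWithRetry(execute func() error) error {
        retryTimes := 0
        for {
            err := execute()
            if err == nil {
                return nil
            }
            if retryTimes >= maxRetry {
                return err
            }
            // Hitting the resource group limit is expected back-pressure,
            // not a failure, so it does not advance the retry counter.
            if !strings.Contains(err.Error(), limiterErrMsg) {
                retryTimes++
            }
            time.Sleep(retryInterval)
        }
    }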
diff --git a/ttl/ttlworker/scan_test.go b/ttl/ttlworker/scan_test.go
index 3659ce0843617..b148b836279ee 100644
--- a/ttl/ttlworker/scan_test.go
+++ b/ttl/ttlworker/scan_test.go
@@ -211,6 +211,7 @@ type mockScanTask struct {
 	tbl      *cache.PhysicalTable
 	sessPool *mockSessionPool
 	sqlRetry []int
+	sqlErr   map[int]error
 
 	delCh   chan *ttlDeleteTask
 	prevSQL string
@@ -237,6 +238,7 @@ func newMockScanTask(t *testing.T, sqlCnt int) *mockScanTask {
 		delCh:           make(chan *ttlDeleteTask, sqlCnt*(scanTaskExecuteSQLMaxRetry+1)),
 		sessPool:        newMockSessionPool(t),
 		sqlRetry:        make([]int, sqlCnt),
+		sqlErr:          make(map[int]error),
 		schemaChangeIdx: -1,
 	}
 	task.sessPool.se.executeSQL = task.execSQL
@@ -379,6 +381,9 @@ func (t *mockScanTask) execSQL(_ context.Context, sql string, _ ...interface{})
 	}
 
 	if curRetry < t.sqlRetry[i] {
+		if err := t.sqlErr[i]; err != nil {
+			return nil, err
+		}
 		return nil, errors.New("mockErr")
 	}
 
@@ -407,3 +412,13 @@ func TestScanTaskDoScan(t *testing.T) {
 	task.schemaChangeInRetry = 2
 	task.runDoScanForTest(1, "table 'test.t1' meta changed, should abort current job: [schema:1146]Table 'test.t1' doesn't exist")
 }
+
+func TestScanTaskRetryWithLimit(t *testing.T) {
+	task := newMockScanTask(t, 3)
+	task.sqlRetry[1] = scanTaskExecuteSQLMaxRetry + 1
+	task.runDoScanForTest(1, "mockErr")
+
+	task.sqlRetry[1] = scanTaskExecuteSQLMaxRetry + 1
+	task.sqlErr[1] = fmt.Errorf("[resource group controller] limiter has no enough token or needs wait too long")
+	task.runDoScanForTest(3, "")
+}
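
The new sqlErr map lets the mock script a specific error for the i-th statement instead of the generic mockErr, which is how TestScanTaskRetryWithLimit demonstrates the free retry: the same sqlRetry budget that fails with mockErr completes successfully once the injected error carries the limiter message. The injection pattern in isolation, with hypothetical names rather than the ttlworker mock:

    package sketch

    import "errors"

    // mockExecutor returns a scripted result per statement index: an injected
    // error if one is registered, the generic mockErr while retries remain,
    // and success otherwise.
    type mockExecutor struct {
        retriesLeft map[int]int
        injectedErr map[int]error
    }

    func newMockExecutor(n int) *mockExecutor {
        return &mockExecutor{
            retriesLeft: make(map[int]int, n),
            injectedErr: make(map[int]error, n),
        }
    }

    func (m *mockExecutor) exec(i int) error {
        if m.retriesLeft[i] > 0 {
            m.retriesLeft[i]--
            if err := m.injectedErr[i]; err != nil {
                return err // scripted error, e.g. the limiter message
            }
            return errors.New("mockErr")
        }
        return nil
    }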
diff --git a/ttl/ttlworker/session.go b/ttl/ttlworker/session.go
index d3e0c940e78c7..b9ca50df43ace 100644
--- a/ttl/ttlworker/session.go
+++ b/ttl/ttlworker/session.go
@@ -16,21 +16,17 @@ package ttlworker
 
 import (
 	"context"
-	"fmt"
 	"time"
 
 	"github.com/ngaut/pools"
 	"github.com/pingcap/errors"
-	"github.com/pingcap/tidb/parser/terror"
 	"github.com/pingcap/tidb/sessionctx"
 	"github.com/pingcap/tidb/sessionctx/variable"
 	"github.com/pingcap/tidb/ttl/cache"
 	"github.com/pingcap/tidb/ttl/metrics"
 	"github.com/pingcap/tidb/ttl/session"
 	"github.com/pingcap/tidb/util/chunk"
-	"github.com/pingcap/tidb/util/logutil"
 	"github.com/pingcap/tidb/util/sqlexec"
-	"go.uber.org/zap"
 )
 
 // The following two functions are using `sqlexec.SQLExecutor` to represent session
@@ -85,21 +81,12 @@ func getSession(pool sessionPool) (session.Session, error) {
 	originalRetryLimit := sctx.GetSessionVars().RetryLimit
 	originalEnable1PC := sctx.GetSessionVars().Enable1PC
 	originalEnableAsyncCommit := sctx.GetSessionVars().EnableAsyncCommit
+	originalResourceGroupName := sctx.GetSessionVars().ResourceGroupName
 	se := session.NewSession(sctx, exec, func(se session.Session) {
-		_, err = se.ExecuteSQL(context.Background(), fmt.Sprintf("set tidb_retry_limit=%d", originalRetryLimit))
-		if err != nil {
-			logutil.BgLogger().Error("fail to reset tidb_retry_limit", zap.Int64("originalRetryLimit", originalRetryLimit), zap.Error(err))
-		}
-
-		if !originalEnable1PC {
-			_, err = se.ExecuteSQL(context.Background(), "set tidb_enable_1pc=OFF")
-			terror.Log(err)
-		}
-
-		if !originalEnableAsyncCommit {
-			_, err = se.ExecuteSQL(context.Background(), "set tidb_enable_async_commit=OFF")
-			terror.Log(err)
-		}
+		sctx.GetSessionVars().RetryLimit = originalRetryLimit
+		sctx.GetSessionVars().Enable1PC = originalEnable1PC
+		sctx.GetSessionVars().EnableAsyncCommit = originalEnableAsyncCommit
+		sctx.GetSessionVars().ResourceGroupName = originalResourceGroupName
 
 		DetachStatsCollector(exec)
 
@@ -109,25 +96,13 @@ func getSession(pool sessionPool) (session.Session, error) {
 	exec = AttachStatsCollector(exec)
 
 	// store and set the retry limit to 0
-	_, err = se.ExecuteSQL(context.Background(), "set tidb_retry_limit=0")
-	if err != nil {
-		se.Close()
-		return nil, err
-	}
+	sctx.GetSessionVars().RetryLimit = 0
 
 	// set enable 1pc to ON
-	_, err = se.ExecuteSQL(context.Background(), "set tidb_enable_1pc=ON")
-	if err != nil {
-		se.Close()
-		return nil, err
-	}
+	sctx.GetSessionVars().Enable1PC = true
 
 	// set enable async commit to ON
-	_, err = se.ExecuteSQL(context.Background(), "set tidb_enable_async_commit=ON")
-	if err != nil {
-		se.Close()
-		return nil, err
-	}
+	sctx.GetSessionVars().EnableAsyncCommit = true
 
 	// Force rollback the session to guarantee the session is not in any explicit transaction
 	if _, err = se.ExecuteSQL(context.Background(), "ROLLBACK"); err != nil {
@@ -135,6 +110,8 @@ func getSession(pool sessionPool) (session.Session, error) {
 		return nil, err
 	}
 
+	sctx.GetSessionVars().ResourceGroupName = "tidb_ttl"
+
 	return se, nil
 }
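
The session.go rewrite replaces the SET statement round-trips with direct writes to the session variables, saves the originals (now including ResourceGroupName), and restores them in the NewSession closer when the session is returned to the pool; every TTL worker session is then pinned to the tidb_ttl resource group. The save/override/restore shape, reduced to a sketch over a hypothetical sessionVars struct rather than sessionctx's real one:

    package sketch

    // sessionVars stands in for sessionctx's variable set; only the fields
    // the TTL worker touches are modeled here.
    type sessionVars struct {
        RetryLimit        int64
        Enable1PC         bool
        EnableAsyncCommit bool
        ResourceGroupName string
    }

    // withTTLOverrides snapshots the current values, applies the TTL-friendly
    // settings (no retries, 1PC and async commit on, the tidb_ttl group), and
    // returns a restore func to run when the session goes back to the pool.
    func withTTLOverrides(vars *sessionVars) (restore func()) {
        orig := *vars // snapshot the originals
        vars.RetryLimit = 0
        vars.Enable1PC = true
        vars.EnableAsyncCommit = true
        vars.ResourceGroupName = "tidb_ttl"
        return func() { *vars = orig }
    }

Setting the fields in memory also sidesteps the failure handling the old code needed: a SET statement can fail and forced the error-and-Close paths, while a direct assignment cannot.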