Skip to content

Commit

Permalink
support resource limit for TTL
Browse files Browse the repository at this point in the history
Signed-off-by: YangKeao <[email protected]>
  • Loading branch information
YangKeao committed Feb 21, 2023
1 parent 5cbf6eb commit 0d16e84
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 44 deletions.
6 changes: 3 additions & 3 deletions executor/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -743,21 +743,21 @@ func (e *DDLExec) executeAlterPlacementPolicy(s *ast.AlterPlacementPolicyStmt) e
}

func (e *DDLExec) executeCreateResourceGroup(s *ast.CreateResourceGroupStmt) error {
if !variable.EnableResourceControl.Load() {
if !variable.EnableResourceControl.Load() && !e.ctx.GetSessionVars().InRestrictedSQL {
return infoschema.ErrResourceGroupSupportDisabled
}
return domain.GetDomain(e.ctx).DDL().AddResourceGroup(e.ctx, s)
}

func (e *DDLExec) executeAlterResourceGroup(s *ast.AlterResourceGroupStmt) error {
if !variable.EnableResourceControl.Load() {
if !variable.EnableResourceControl.Load() && !e.ctx.GetSessionVars().InRestrictedSQL {
return infoschema.ErrResourceGroupSupportDisabled
}
return domain.GetDomain(e.ctx).DDL().AlterResourceGroup(e.ctx, s)
}

func (e *DDLExec) executeDropResourceGroup(s *ast.DropResourceGroupStmt) error {
if !variable.EnableResourceControl.Load() {
if !variable.EnableResourceControl.Load() && !e.ctx.GetSessionVars().InRestrictedSQL {
return infoschema.ErrResourceGroupSupportDisabled
}
return domain.GetDomain(e.ctx).DDL().DropResourceGroup(e.ctx, s)
Expand Down
18 changes: 17 additions & 1 deletion session/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,10 @@ const (
key(parent_table_id, create_time),
key(create_time)
);`

// CreateTTLResourceGroup creates the resource group for ttl
// TODO: choose a more appropriate value for RU_PER_SEC
CreateTTLResourceGroup = `CREATE RESOURCE GROUP IF NOT EXISTS tidb_ttl RU_PER_SEC=10000000`
)

// bootstrap initiates system DB for a store.
Expand Down Expand Up @@ -804,11 +808,13 @@ const (
// - tidb_enable_foreign_key: off -> on
// - tidb_store_batch_size: 0 -> 4
version134 = 134
// version135 adds the tidb_ttl resource group
version135 = 135
)

// currentBootstrapVersion is defined as a variable, so we can modify its value for testing.
// please make sure this is the largest version
var currentBootstrapVersion int64 = version134
var currentBootstrapVersion int64 = version135

// The DDL owner key expires after ManagerSessionTTL seconds, so we wait that long plus some extra time to give the operation a chance to finish.
var internalSQLTimeout = owner.ManagerSessionTTL + 15
Expand Down Expand Up @@ -930,6 +936,7 @@ var (
upgradeToVer132,
upgradeToVer133,
upgradeToVer134,
upgradeToVer135,
}
)

Expand Down Expand Up @@ -2316,6 +2323,13 @@ func upgradeToVer134(s Session, ver int64) {
mustExecute(s, "UPDATE HIGH_PRIORITY %n.%n SET VARIABLE_VALUE = %? WHERE VARIABLE_NAME = %? AND VARIABLE_VALUE = %?;", mysql.SystemDB, mysql.GlobalVariablesTable, "4", variable.TiDBStoreBatchSize, "0")
}

// upgradeToVer135 runs the CreateTTLResourceGroup statement through
// doReentrantDDL when the stored bootstrap version ver is older than
// version135. Clusters already at or past version135 are left untouched.
func upgradeToVer135(s Session, ver int64) {
	if ver < version135 {
		doReentrantDDL(s, CreateTTLResourceGroup)
	}
}

func writeOOMAction(s Session) {
comment := "oom-action is `log` by default in v3.0.x, `cancel` by default in v4.0.11+"
mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES (%?, %?, %?) ON DUPLICATE KEY UPDATE VARIABLE_VALUE= %?`,
Expand Down Expand Up @@ -2426,6 +2440,8 @@ func doDDLWorks(s Session) {
mustExecute(s, CreateTTLTask)
// Create tidb_ttl_job_history table
mustExecute(s, CreateTTLJobHistory)
// Create tidb_ttl resource group
mustExecute(s, CreateTTLResourceGroup)
}

// doBootstrapSQLFile executes SQL commands in a file as the last stage of bootstrap.
Expand Down
13 changes: 7 additions & 6 deletions telemetry/data_feature_usage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,40 +356,41 @@ func TestPlacementPolicies(t *testing.T) {

func TestResourceGroups(t *testing.T) {
store := testkit.CreateMockStore(t)
systemResourceGroupCount := 1

tk := testkit.NewTestKit(t, store)

usage, err := telemetry.GetFeatureUsage(tk.Session())
require.NoError(t, err)
require.Equal(t, uint64(0), usage.ResourceControlUsage.NumResourceGroups)
require.Equal(t, uint64(systemResourceGroupCount), usage.ResourceControlUsage.NumResourceGroups)
require.Equal(t, false, usage.ResourceControlUsage.Enabled)

tk.MustExec("set global tidb_enable_resource_control = 'ON'")
tk.MustExec("create resource group x ru_per_sec=100")
usage, err = telemetry.GetFeatureUsage(tk.Session())
require.NoError(t, err)
require.Equal(t, true, usage.ResourceControlUsage.Enabled)
require.Equal(t, uint64(1), usage.ResourceControlUsage.NumResourceGroups)
require.Equal(t, uint64(systemResourceGroupCount+1), usage.ResourceControlUsage.NumResourceGroups)

tk.MustExec("create resource group y ru_per_sec=100")
usage, err = telemetry.GetFeatureUsage(tk.Session())
require.NoError(t, err)
require.Equal(t, uint64(2), usage.ResourceControlUsage.NumResourceGroups)
require.Equal(t, uint64(systemResourceGroupCount+2), usage.ResourceControlUsage.NumResourceGroups)

tk.MustExec("alter resource group y ru_per_sec=200")
usage, err = telemetry.GetFeatureUsage(tk.Session())
require.NoError(t, err)
require.Equal(t, uint64(2), usage.ResourceControlUsage.NumResourceGroups)
require.Equal(t, uint64(systemResourceGroupCount+2), usage.ResourceControlUsage.NumResourceGroups)

tk.MustExec("drop resource group y")
usage, err = telemetry.GetFeatureUsage(tk.Session())
require.NoError(t, err)
require.Equal(t, uint64(1), usage.ResourceControlUsage.NumResourceGroups)
require.Equal(t, uint64(systemResourceGroupCount+1), usage.ResourceControlUsage.NumResourceGroups)

tk.MustExec("set global tidb_enable_resource_control = 'OFF'")
usage, err = telemetry.GetFeatureUsage(tk.Session())
require.NoError(t, err)
require.Equal(t, uint64(1), usage.ResourceControlUsage.NumResourceGroups)
require.Equal(t, uint64(systemResourceGroupCount+1), usage.ResourceControlUsage.NumResourceGroups)
require.Equal(t, false, usage.ResourceControlUsage.Enabled)
}

Expand Down
8 changes: 7 additions & 1 deletion ttl/ttlworker/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"strconv"
"strings"
"sync/atomic"
"time"

Expand Down Expand Up @@ -173,7 +174,12 @@ func (t *ttlScanTask) doScan(ctx context.Context, delCh chan<- *ttlDeleteTask, s
return t.result(sqlErr)
}
retrySQL = sql
retryTimes++

// If the SQL returns this error, we have reached the limit of the resource group (and have already waited for 1 second).
// TODO: throttle the traffic instead of retrying at a fixed interval, so that requests pass the limiter more easily.
if !strings.Contains(sqlErr.Error(), "[resource group controller] limiter has no enough token or needs wait too long") {
retryTimes++
}

tracer.EnterPhase(metrics.PhaseWaitRetry)
select {
Expand Down
15 changes: 15 additions & 0 deletions ttl/ttlworker/scan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ type mockScanTask struct {
tbl *cache.PhysicalTable
sessPool *mockSessionPool
sqlRetry []int
sqlErr map[int]error

delCh chan *ttlDeleteTask
prevSQL string
Expand All @@ -237,6 +238,7 @@ func newMockScanTask(t *testing.T, sqlCnt int) *mockScanTask {
delCh: make(chan *ttlDeleteTask, sqlCnt*(scanTaskExecuteSQLMaxRetry+1)),
sessPool: newMockSessionPool(t),
sqlRetry: make([]int, sqlCnt),
sqlErr: make(map[int]error),
schemaChangeIdx: -1,
}
task.sessPool.se.executeSQL = task.execSQL
Expand Down Expand Up @@ -379,6 +381,9 @@ func (t *mockScanTask) execSQL(_ context.Context, sql string, _ ...interface{})
}

if curRetry < t.sqlRetry[i] {
if err := t.sqlErr[i]; err != nil {
return nil, err
}
return nil, errors.New("mockErr")
}

Expand Down Expand Up @@ -407,3 +412,13 @@ func TestScanTaskDoScan(t *testing.T) {
task.schemaChangeInRetry = 2
task.runDoScanForTest(1, "table 'test.t1' meta changed, should abort current job: [schema:1146]Table 'test.t1' doesn't exist")
}

// TestScanTaskRetryWithLimit checks that doScan treats an ordinary SQL error
// and a resource-group limiter error differently when retrying.
// NOTE(review): runDoScanForTest presumably takes the expected number of
// delete tasks and the expected error text — confirm against its definition.
func TestScanTaskRetryWithLimit(t *testing.T) {
	task := newMockScanTask(t, 3)

	// The second SQL fails with the default "mockErr" one more time than the
	// retry budget, so the scan is expected to end with that error.
	task.sqlRetry[1] = scanTaskExecuteSQLMaxRetry + 1
	task.runDoScanForTest(1, "mockErr")

	// Same number of failures, but reported as the resource-group limiter
	// error: these failures do not count against the retry budget, so the
	// scan is expected to finish without an error.
	task.sqlRetry[1] = scanTaskExecuteSQLMaxRetry + 1
	// errors.New rather than fmt.Errorf: the message is a constant with no
	// format verbs, matching how the mock builds "mockErr".
	task.sqlErr[1] = errors.New("[resource group controller] limiter has no enough token or needs wait too long")
	task.runDoScanForTest(3, "")
}
43 changes: 10 additions & 33 deletions ttl/ttlworker/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,17 @@ package ttlworker

import (
"context"
"fmt"
"time"

"github.com/ngaut/pools"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/ttl/cache"
"github.com/pingcap/tidb/ttl/metrics"
"github.com/pingcap/tidb/ttl/session"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/sqlexec"
"go.uber.org/zap"
)

// The following two functions are using `sqlexec.SQLExecutor` to represent session
Expand Down Expand Up @@ -85,21 +81,12 @@ func getSession(pool sessionPool) (session.Session, error) {
originalRetryLimit := sctx.GetSessionVars().RetryLimit
originalEnable1PC := sctx.GetSessionVars().Enable1PC
originalEnableAsyncCommit := sctx.GetSessionVars().EnableAsyncCommit
originalResourceGroupName := sctx.GetSessionVars().ResourceGroupName
se := session.NewSession(sctx, exec, func(se session.Session) {
_, err = se.ExecuteSQL(context.Background(), fmt.Sprintf("set tidb_retry_limit=%d", originalRetryLimit))
if err != nil {
logutil.BgLogger().Error("fail to reset tidb_retry_limit", zap.Int64("originalRetryLimit", originalRetryLimit), zap.Error(err))
}

if !originalEnable1PC {
_, err = se.ExecuteSQL(context.Background(), "set tidb_enable_1pc=OFF")
terror.Log(err)
}

if !originalEnableAsyncCommit {
_, err = se.ExecuteSQL(context.Background(), "set tidb_enable_async_commit=OFF")
terror.Log(err)
}
sctx.GetSessionVars().RetryLimit = originalRetryLimit
sctx.GetSessionVars().Enable1PC = originalEnable1PC
sctx.GetSessionVars().EnableAsyncCommit = originalEnableAsyncCommit
sctx.GetSessionVars().ResourceGroupName = originalResourceGroupName

DetachStatsCollector(exec)

Expand All @@ -109,32 +96,22 @@ func getSession(pool sessionPool) (session.Session, error) {
exec = AttachStatsCollector(exec)

// store and set the retry limit to 0
_, err = se.ExecuteSQL(context.Background(), "set tidb_retry_limit=0")
if err != nil {
se.Close()
return nil, err
}
sctx.GetSessionVars().RetryLimit = 0

// set enable 1pc to ON
_, err = se.ExecuteSQL(context.Background(), "set tidb_enable_1pc=ON")
if err != nil {
se.Close()
return nil, err
}
sctx.GetSessionVars().Enable1PC = true

// set enable async commit to ON
_, err = se.ExecuteSQL(context.Background(), "set tidb_enable_async_commit=ON")
if err != nil {
se.Close()
return nil, err
}
sctx.GetSessionVars().EnableAsyncCommit = true

// Force rollback the session to guarantee the session is not in any explicit transaction
if _, err = se.ExecuteSQL(context.Background(), "ROLLBACK"); err != nil {
se.Close()
return nil, err
}

sctx.GetSessionVars().ResourceGroupName = "tidb_ttl"

return se, nil
}

Expand Down

0 comments on commit 0d16e84

Please sign in to comment.