diff --git a/config/config.toml.example b/config/config.toml.example index befcc7a88ebf7..3e93a13c3f94d 100644 --- a/config/config.toml.example +++ b/config/config.toml.example @@ -122,8 +122,10 @@ metrics-interval = 15 [performance] # Max CPUs to use, 0 use number of CPUs in the machine. max-procs = 0 + # Max memory size to use, 0 use the total usable memory in the machine. max-memory = 0 + # StmtCountLimit limits the max count of statement inside a transaction. stmt-count-limit = 5000 diff --git a/executor/set_test.go b/executor/set_test.go index 35c61825ac494..e5d72c5b8e2ba 100644 --- a/executor/set_test.go +++ b/executor/set_test.go @@ -265,6 +265,15 @@ func (s *testSuite) TestSetVar(c *C) { tk.MustQuery("select @@session.tidb_query_log_max_len;").Check(testkit.Rows("20")) _, err = tk.Exec("set global tidb_query_log_max_len = 20") c.Assert(err, NotNil) + + tk.MustExec("set tidb_batch_commit = 0") + tk.MustQuery("select @@session.tidb_batch_commit;").Check(testkit.Rows("0")) + tk.MustExec("set tidb_batch_commit = 1") + tk.MustQuery("select @@session.tidb_batch_commit;").Check(testkit.Rows("1")) + _, err = tk.Exec("set global tidb_batch_commit = 0") + c.Assert(err, NotNil) + _, err = tk.Exec("set global tidb_batch_commit = 2") + c.Assert(err, NotNil) } func (s *testSuite) TestSetCharset(c *C) { diff --git a/session/session_test.go b/session/session_test.go index 64070baed0a1c..aa1e67a5b603e 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -2030,6 +2030,63 @@ func (s *testSessionSuite) TestStatementCountLimit(c *C) { c.Assert(err, NotNil) } +func (s *testSessionSuite) TestBatchCommit(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("set tidb_batch_commit = 1") + tk.MustExec("create table t (id int)") + saved := config.GetGlobalConfig().Performance + config.GetGlobalConfig().Performance.StmtCountLimit = 3 + defer func() { + config.GetGlobalConfig().Performance = saved + }() + tk1 := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("SET SESSION autocommit = 1") + tk.MustExec("begin") + tk.MustExec("insert into t values (1)") + tk1.MustQuery("select * from t").Check(testkit.Rows()) + tk.MustExec("insert into t values (2)") + tk1.MustQuery("select * from t").Check(testkit.Rows()) + tk.MustExec("rollback") + tk1.MustQuery("select * from t").Check(testkit.Rows()) + + // The above rollback will not make the session in transaction. + tk.MustExec("insert into t values (1)") + tk1.MustQuery("select * from t").Check(testkit.Rows("1")) + tk.MustExec("delete from t") + + tk.MustExec("begin") + tk.MustExec("insert into t values (5)") + tk1.MustQuery("select * from t").Check(testkit.Rows()) + tk.MustExec("insert into t values (6)") + tk1.MustQuery("select * from t").Check(testkit.Rows()) + tk.MustExec("insert into t values (7)") + tk1.MustQuery("select * from t").Check(testkit.Rows("5", "6", "7")) + + // The session is still in transaction. + tk.MustExec("insert into t values (8)") + tk1.MustQuery("select * from t").Check(testkit.Rows("5", "6", "7")) + tk.MustExec("insert into t values (9)") + tk1.MustQuery("select * from t").Check(testkit.Rows("5", "6", "7")) + tk.MustExec("insert into t values (10)") + tk1.MustQuery("select * from t").Check(testkit.Rows("5", "6", "7")) + tk.MustExec("commit") + tk1.MustQuery("select * from t").Check(testkit.Rows("5", "6", "7", "8", "9", "10")) + + // The above commit will not make the session in transaction. + tk.MustExec("insert into t values (11)") + tk1.MustQuery("select * from t").Check(testkit.Rows("5", "6", "7", "8", "9", "10", "11")) + + tk.MustExec("delete from t") + tk.MustExec("SET SESSION autocommit = 0") + tk.MustExec("insert into t values (1)") + tk.MustExec("insert into t values (2)") + tk.MustExec("insert into t values (3)") + tk.MustExec("rollback") + tk1.MustExec("insert into t values (4)") + tk1.MustExec("insert into t values (5)") + tk.MustQuery("select * from t").Check(testkit.Rows("4", "5")) +} + func (s *testSessionSuite) TestCastTimeToDate(c *C) { tk := testkit.NewTestKitWithInit(c, s.store) tk.MustExec("set time_zone = '-8:00'") diff --git a/session/tidb.go b/session/tidb.go index e16d3d878b631..de9dacbd3a8dd 100644 --- a/session/tidb.go +++ b/session/tidb.go @@ -153,8 +153,9 @@ func runStmt(ctx context.Context, sctx sessionctx.Context, s sqlexec.Statement) var rs sqlexec.RecordSet se := sctx.(*session) rs, err = s.Exec(ctx) + sessVars := se.GetSessionVars() // All the history should be added here. - se.GetSessionVars().TxnCtx.StatementCount++ + sessVars.TxnCtx.StatementCount++ if !s.IsReadOnly() { if err == nil { GetHistory(sctx).Add(0, s, se.sessionVars.StmtCtx) @@ -167,7 +168,7 @@ func runStmt(ctx context.Context, sctx sessionctx.Context, s sqlexec.Statement) } } } - if !se.sessionVars.InTxn() { + if !sessVars.InTxn() { if err != nil { log.Info("RollbackTxn for ddl/autocommit error.") err1 := se.RollbackTxn(ctx) @@ -180,10 +181,18 @@ func runStmt(ctx context.Context, sctx sessionctx.Context, s sqlexec.Statement) // So we limit the statement count in a transaction here. history := GetHistory(sctx) if history.Count() > int(config.GetGlobalConfig().Performance.StmtCountLimit) { - err1 := se.RollbackTxn(ctx) - terror.Log(errors.Trace(err1)) - return rs, errors.Errorf("statement count %d exceeds the transaction limitation, autocommit = %t", - history.Count(), sctx.GetSessionVars().IsAutocommit()) + if !sessVars.BatchCommit { + err1 := se.RollbackTxn(ctx) + terror.Log(errors.Trace(err1)) + return rs, errors.Errorf("statement count %d exceeds the transaction limitation, autocommit = %t", + history.Count(), sctx.GetSessionVars().IsAutocommit()) + } + err = se.NewTxn(ctx) + // The transaction does not committed yet, we need to keep it in transaction. + // The last history could not be "commit"/"rollback" statement. + // It means it is impossible to start a new transaction at the end of the transaction. + // Because after the server executed "commit"/"rollback" statement, the session is out of the transaction. + se.sessionVars.SetStatusFlag(mysql.ServerStatusInTrans, true) } } if se.txn.pending() { diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 7fbb0da3a25f0..86fd36b5837d2 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -292,6 +292,9 @@ type SessionVars struct { // BatchDelete indicates if we should split delete data into multiple batches. BatchDelete bool + // BatchCommit indicates if we should split the transaction into multiple batches. + BatchCommit bool + // IDAllocator is provided by kvEncoder, if it is provided, we will use it to alloc auto id instead of using // Table.alloc. IDAllocator autoid.Allocator @@ -633,6 +636,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error { s.BatchInsert = TiDBOptOn(val) case TiDBBatchDelete: s.BatchDelete = TiDBOptOn(val) + case TiDBBatchCommit: + s.BatchCommit = TiDBOptOn(val) case TiDBDMLBatchSize: s.DMLBatchSize = tidbOptPositiveInt32(val, DefDMLBatchSize) case TiDBCurrentTS, TiDBConfig: diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index 26a03c098a550..eae4fe4b8f284 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -640,6 +640,7 @@ var defaultSysVars = []*SysVar{ {ScopeGlobal | ScopeSession, TiDBSkipUTF8Check, boolToIntStr(DefSkipUTF8Check)}, {ScopeSession, TiDBBatchInsert, boolToIntStr(DefBatchInsert)}, {ScopeSession, TiDBBatchDelete, boolToIntStr(DefBatchDelete)}, + {ScopeSession, TiDBBatchCommit, boolToIntStr(DefBatchCommit)}, {ScopeSession, TiDBDMLBatchSize, strconv.Itoa(DefDMLBatchSize)}, {ScopeSession, TiDBCurrentTS, strconv.Itoa(DefCurretTS)}, {ScopeGlobal | ScopeSession, TiDBMaxChunkSize, strconv.Itoa(DefMaxChunkSize)}, diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index cbc69ee8e2142..4b04b48e1707d 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -71,6 +71,10 @@ const ( // split data into multiple batches and use a single txn for each batch. This will be helpful when deleting large data. TiDBBatchDelete = "tidb_batch_delete" + // tidb_batch_commit is used to enable/disable auto-split the transaction. + // If set this option on, the transaction will be committed when it reaches stmt-count-limit and starts a new transaction. + TiDBBatchCommit = "tidb_batch_commit" + // tidb_dml_batch_size is used to split the insert/delete data into small batches. // It only takes effort when tidb_batch_insert/tidb_batch_delete is on. // Its default value is 20000. When the row size is large, 20k rows could be larger than 100MB. @@ -243,6 +247,7 @@ const ( DefOptInSubqToJoinAndAgg = true DefBatchInsert = false DefBatchDelete = false + DefBatchCommit = false DefCurretTS = 0 DefMaxChunkSize = 32 DefDMLBatchSize = 20000 diff --git a/sessionctx/variable/varsutil.go b/sessionctx/variable/varsutil.go index c5f3aa3c1e1d9..f25df3a3938d5 100644 --- a/sessionctx/variable/varsutil.go +++ b/sessionctx/variable/varsutil.go @@ -331,7 +331,7 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string, case AutocommitVar, TiDBSkipUTF8Check, TiDBOptAggPushDown, TiDBOptInSubqToJoinAndAgg, TiDBBatchInsert, TiDBDisableTxnAutoRetry, TiDBEnableStreaming, - TiDBBatchDelete, TiDBEnableCascadesPlanner, TiDBEnableWindowFunction: + TiDBBatchDelete, TiDBBatchCommit, TiDBEnableCascadesPlanner, TiDBEnableWindowFunction: if strings.EqualFold(value, "ON") || value == "1" || strings.EqualFold(value, "OFF") || value == "0" { return value, nil }