Skip to content

Commit

Permalink
session: add a batch commit session variable for the large transaction (
Browse files Browse the repository at this point in the history
  • Loading branch information
jackysp authored Dec 10, 2018
1 parent 97b569c commit e3f3ac2
Show file tree
Hide file tree
Showing 8 changed files with 95 additions and 7 deletions.
2 changes: 2 additions & 0 deletions config/config.toml.example
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,10 @@ metrics-interval = 15
[performance]
# Max CPUs to use, 0 use number of CPUs in the machine.
max-procs = 0

# Max memory size to use, 0 use the total usable memory in the machine.
max-memory = 0

# StmtCountLimit limits the max count of statement inside a transaction.
stmt-count-limit = 5000

Expand Down
9 changes: 9 additions & 0 deletions executor/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,15 @@ func (s *testSuite) TestSetVar(c *C) {
tk.MustQuery("select @@session.tidb_query_log_max_len;").Check(testkit.Rows("20"))
_, err = tk.Exec("set global tidb_query_log_max_len = 20")
c.Assert(err, NotNil)

tk.MustExec("set tidb_batch_commit = 0")
tk.MustQuery("select @@session.tidb_batch_commit;").Check(testkit.Rows("0"))
tk.MustExec("set tidb_batch_commit = 1")
tk.MustQuery("select @@session.tidb_batch_commit;").Check(testkit.Rows("1"))
_, err = tk.Exec("set global tidb_batch_commit = 0")
c.Assert(err, NotNil)
_, err = tk.Exec("set global tidb_batch_commit = 2")
c.Assert(err, NotNil)
}

func (s *testSuite) TestSetCharset(c *C) {
Expand Down
57 changes: 57 additions & 0 deletions session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2030,6 +2030,63 @@ func (s *testSessionSuite) TestStatementCountLimit(c *C) {
c.Assert(err, NotNil)
}

func (s *testSessionSuite) TestBatchCommit(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("set tidb_batch_commit = 1")
tk.MustExec("create table t (id int)")
saved := config.GetGlobalConfig().Performance
config.GetGlobalConfig().Performance.StmtCountLimit = 3
defer func() {
config.GetGlobalConfig().Performance = saved
}()
tk1 := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("SET SESSION autocommit = 1")
tk.MustExec("begin")
tk.MustExec("insert into t values (1)")
tk1.MustQuery("select * from t").Check(testkit.Rows())
tk.MustExec("insert into t values (2)")
tk1.MustQuery("select * from t").Check(testkit.Rows())
tk.MustExec("rollback")
tk1.MustQuery("select * from t").Check(testkit.Rows())

// The above rollback will not make the session in transaction.
tk.MustExec("insert into t values (1)")
tk1.MustQuery("select * from t").Check(testkit.Rows("1"))
tk.MustExec("delete from t")

tk.MustExec("begin")
tk.MustExec("insert into t values (5)")
tk1.MustQuery("select * from t").Check(testkit.Rows())
tk.MustExec("insert into t values (6)")
tk1.MustQuery("select * from t").Check(testkit.Rows())
tk.MustExec("insert into t values (7)")
tk1.MustQuery("select * from t").Check(testkit.Rows("5", "6", "7"))

// The session is still in transaction.
tk.MustExec("insert into t values (8)")
tk1.MustQuery("select * from t").Check(testkit.Rows("5", "6", "7"))
tk.MustExec("insert into t values (9)")
tk1.MustQuery("select * from t").Check(testkit.Rows("5", "6", "7"))
tk.MustExec("insert into t values (10)")
tk1.MustQuery("select * from t").Check(testkit.Rows("5", "6", "7"))
tk.MustExec("commit")
tk1.MustQuery("select * from t").Check(testkit.Rows("5", "6", "7", "8", "9", "10"))

// The above commit will not make the session in transaction.
tk.MustExec("insert into t values (11)")
tk1.MustQuery("select * from t").Check(testkit.Rows("5", "6", "7", "8", "9", "10", "11"))

tk.MustExec("delete from t")
tk.MustExec("SET SESSION autocommit = 0")
tk.MustExec("insert into t values (1)")
tk.MustExec("insert into t values (2)")
tk.MustExec("insert into t values (3)")
tk.MustExec("rollback")
tk1.MustExec("insert into t values (4)")
tk1.MustExec("insert into t values (5)")
tk.MustQuery("select * from t").Check(testkit.Rows("4", "5"))
}

func (s *testSessionSuite) TestCastTimeToDate(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("set time_zone = '-8:00'")
Expand Down
21 changes: 15 additions & 6 deletions session/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,9 @@ func runStmt(ctx context.Context, sctx sessionctx.Context, s sqlexec.Statement)
var rs sqlexec.RecordSet
se := sctx.(*session)
rs, err = s.Exec(ctx)
sessVars := se.GetSessionVars()
// All the history should be added here.
se.GetSessionVars().TxnCtx.StatementCount++
sessVars.TxnCtx.StatementCount++
if !s.IsReadOnly() {
if err == nil {
GetHistory(sctx).Add(0, s, se.sessionVars.StmtCtx)
Expand All @@ -167,7 +168,7 @@ func runStmt(ctx context.Context, sctx sessionctx.Context, s sqlexec.Statement)
}
}
}
if !se.sessionVars.InTxn() {
if !sessVars.InTxn() {
if err != nil {
log.Info("RollbackTxn for ddl/autocommit error.")
err1 := se.RollbackTxn(ctx)
Expand All @@ -180,10 +181,18 @@ func runStmt(ctx context.Context, sctx sessionctx.Context, s sqlexec.Statement)
// So we limit the statement count in a transaction here.
history := GetHistory(sctx)
if history.Count() > int(config.GetGlobalConfig().Performance.StmtCountLimit) {
err1 := se.RollbackTxn(ctx)
terror.Log(errors.Trace(err1))
return rs, errors.Errorf("statement count %d exceeds the transaction limitation, autocommit = %t",
history.Count(), sctx.GetSessionVars().IsAutocommit())
if !sessVars.BatchCommit {
err1 := se.RollbackTxn(ctx)
terror.Log(errors.Trace(err1))
return rs, errors.Errorf("statement count %d exceeds the transaction limitation, autocommit = %t",
history.Count(), sctx.GetSessionVars().IsAutocommit())
}
err = se.NewTxn(ctx)
// The transaction does not committed yet, we need to keep it in transaction.
// The last history could not be "commit"/"rollback" statement.
// It means it is impossible to start a new transaction at the end of the transaction.
// Because after the server executed "commit"/"rollback" statement, the session is out of the transaction.
se.sessionVars.SetStatusFlag(mysql.ServerStatusInTrans, true)
}
}
if se.txn.pending() {
Expand Down
5 changes: 5 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,9 @@ type SessionVars struct {
// BatchDelete indicates if we should split delete data into multiple batches.
BatchDelete bool

// BatchCommit indicates if we should split the transaction into multiple batches.
BatchCommit bool

// IDAllocator is provided by kvEncoder, if it is provided, we will use it to alloc auto id instead of using
// Table.alloc.
IDAllocator autoid.Allocator
Expand Down Expand Up @@ -633,6 +636,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
s.BatchInsert = TiDBOptOn(val)
case TiDBBatchDelete:
s.BatchDelete = TiDBOptOn(val)
case TiDBBatchCommit:
s.BatchCommit = TiDBOptOn(val)
case TiDBDMLBatchSize:
s.DMLBatchSize = tidbOptPositiveInt32(val, DefDMLBatchSize)
case TiDBCurrentTS, TiDBConfig:
Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,7 @@ var defaultSysVars = []*SysVar{
{ScopeGlobal | ScopeSession, TiDBSkipUTF8Check, boolToIntStr(DefSkipUTF8Check)},
{ScopeSession, TiDBBatchInsert, boolToIntStr(DefBatchInsert)},
{ScopeSession, TiDBBatchDelete, boolToIntStr(DefBatchDelete)},
{ScopeSession, TiDBBatchCommit, boolToIntStr(DefBatchCommit)},
{ScopeSession, TiDBDMLBatchSize, strconv.Itoa(DefDMLBatchSize)},
{ScopeSession, TiDBCurrentTS, strconv.Itoa(DefCurretTS)},
{ScopeGlobal | ScopeSession, TiDBMaxChunkSize, strconv.Itoa(DefMaxChunkSize)},
Expand Down
5 changes: 5 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ const (
// split data into multiple batches and use a single txn for each batch. This will be helpful when deleting large data.
TiDBBatchDelete = "tidb_batch_delete"

// tidb_batch_commit is used to enable/disable auto-split the transaction.
// If set this option on, the transaction will be committed when it reaches stmt-count-limit and starts a new transaction.
TiDBBatchCommit = "tidb_batch_commit"

// tidb_dml_batch_size is used to split the insert/delete data into small batches.
// It only takes effort when tidb_batch_insert/tidb_batch_delete is on.
// Its default value is 20000. When the row size is large, 20k rows could be larger than 100MB.
Expand Down Expand Up @@ -243,6 +247,7 @@ const (
DefOptInSubqToJoinAndAgg = true
DefBatchInsert = false
DefBatchDelete = false
DefBatchCommit = false
DefCurretTS = 0
DefMaxChunkSize = 32
DefDMLBatchSize = 20000
Expand Down
2 changes: 1 addition & 1 deletion sessionctx/variable/varsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string,
case AutocommitVar, TiDBSkipUTF8Check, TiDBOptAggPushDown,
TiDBOptInSubqToJoinAndAgg,
TiDBBatchInsert, TiDBDisableTxnAutoRetry, TiDBEnableStreaming,
TiDBBatchDelete, TiDBEnableCascadesPlanner, TiDBEnableWindowFunction:
TiDBBatchDelete, TiDBBatchCommit, TiDBEnableCascadesPlanner, TiDBEnableWindowFunction:
if strings.EqualFold(value, "ON") || value == "1" || strings.EqualFold(value, "OFF") || value == "0" {
return value, nil
}
Expand Down

0 comments on commit e3f3ac2

Please sign in to comment.