Skip to content

Commit

Permalink
*: Txn() function signature refactor and remove ActivePendingTxn() (p…
Browse files Browse the repository at this point in the history
  • Loading branch information
tiancaiamao committed Dec 4, 2018
1 parent 51c2e66 commit 1648ea2
Show file tree
Hide file tree
Showing 35 changed files with 155 additions and 182 deletions.
6 changes: 3 additions & 3 deletions ddl/column_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (s *testColumnChangeSuite) TestColumnChange(c *C) {
row := types.MakeDatums(1, 2)
h, err := originTable.AddRecord(ctx, row, false)
c.Assert(err, IsNil)
err = ctx.Txn().Commit(context.Background())
err = ctx.Txn(true).Commit(context.Background())
c.Assert(err, IsNil)

var mu sync.Mutex
Expand Down Expand Up @@ -125,7 +125,7 @@ func (s *testColumnChangeSuite) TestColumnChange(c *C) {
}
mu.Unlock()
}
err = hookCtx.Txn().Commit(context.Background())
err = hookCtx.Txn(true).Commit(context.Background())
if err != nil {
checkErr = errors.Trace(err)
}
Expand Down Expand Up @@ -177,7 +177,7 @@ func (s *testColumnChangeSuite) testAddColumnNoDefault(c *C, ctx sessionctx.Cont
checkErr = errors.Trace(err)
}
}
err = hookCtx.Txn().Commit(context.TODO())
err = hookCtx.Txn(true).Commit(context.TODO())
if err != nil {
checkErr = errors.Trace(err)
}
Expand Down
12 changes: 6 additions & 6 deletions ddl/column_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,9 +262,9 @@ func (s *testColumnSuite) checkColumnKVExist(ctx sessionctx.Context, t table.Tab
if err != nil {
return errors.Trace(err)
}
defer ctx.Txn().Commit(context.Background())
defer ctx.Txn(true).Commit(context.Background())
key := t.RecordKey(handle)
data, err := ctx.Txn().Get(key)
data, err := ctx.Txn(true).Get(key)
if !isExist {
if terror.ErrorEqual(err, kv.ErrNotExist) {
return nil
Expand Down Expand Up @@ -751,7 +751,7 @@ func (s *testColumnSuite) TestAddColumn(c *C) {
handle, err := t.AddRecord(ctx, oldRow, false)
c.Assert(err, IsNil)

err = ctx.Txn().Commit(context.Background())
err = ctx.Txn(true).Commit(context.Background())
c.Assert(err, IsNil)

newColName := "c4"
Expand Down Expand Up @@ -814,7 +814,7 @@ func (s *testColumnSuite) TestAddColumn(c *C) {
job = testDropTable(c, ctx, d, s.dbInfo, tblInfo)
testCheckJobDone(c, d, job, false)

err = ctx.Txn().Commit(context.Background())
err = ctx.Txn(true).Commit(context.Background())
c.Assert(err, IsNil)

d.Stop()
Expand All @@ -838,7 +838,7 @@ func (s *testColumnSuite) TestDropColumn(c *C) {
_, err = t.AddRecord(ctx, append(row, types.NewDatum(defaultColValue)), false)
c.Assert(err, IsNil)

err = ctx.Txn().Commit(context.Background())
err = ctx.Txn(true).Commit(context.Background())
c.Assert(err, IsNil)

checkOK := false
Expand Down Expand Up @@ -887,7 +887,7 @@ func (s *testColumnSuite) TestDropColumn(c *C) {
job = testDropTable(c, ctx, d, s.dbInfo, tblInfo)
testCheckJobDone(c, d, job, false)

err = ctx.Txn().Commit(context.Background())
err = ctx.Txn(true).Commit(context.Background())
c.Assert(err, IsNil)

d.Stop()
Expand Down
16 changes: 8 additions & 8 deletions ddl/ddl_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ func (s *testDBSuite) TestCancelAddIndex(c *C) {
return
}
jobIDs := []int64{job.ID}
errs, err := admin.CancelJobs(hookCtx.Txn(), jobIDs)
errs, err := admin.CancelJobs(hookCtx.Txn(true), jobIDs)
if err != nil {
checkErr = errors.Trace(err)
return
Expand All @@ -391,7 +391,7 @@ func (s *testDBSuite) TestCancelAddIndex(c *C) {
checkErr = errors.Trace(errs[0])
return
}
err = hookCtx.Txn().Commit(context.Background())
err = hookCtx.Txn(true).Commit(context.Background())
if err != nil {
checkErr = errors.Trace(err)
}
Expand Down Expand Up @@ -653,12 +653,12 @@ LOOP:
// Make sure there is index with name c3_index.
c.Assert(nidx, NotNil)
c.Assert(nidx.Meta().ID, Greater, int64(0))
ctx.Txn().Rollback()
ctx.Txn(true).Rollback()

c.Assert(ctx.NewTxn(), IsNil)
defer ctx.Txn().Rollback()
defer ctx.Txn(true).Rollback()

it, err := nidx.SeekFirst(ctx.Txn())
it, err := nidx.SeekFirst(ctx.Txn(true))
c.Assert(err, IsNil)
defer it.Close()

Expand Down Expand Up @@ -754,9 +754,9 @@ func checkDelRangeDone(c *C, ctx sessionctx.Context, idx table.Index) {
handles := make(map[int64]struct{})

c.Assert(ctx.NewTxn(), IsNil)
defer ctx.Txn().Rollback()
defer ctx.Txn(true).Rollback()

it, err := idx.SeekFirst(ctx.Txn())
it, err := idx.SeekFirst(ctx.Txn(true))
c.Assert(err, IsNil)
defer it.Close()

Expand Down Expand Up @@ -982,7 +982,7 @@ LOOP:
i := 0
j := 0
ctx.NewTxn()
defer ctx.Txn().Rollback()
defer ctx.Txn(true).Rollback()
err = t.IterRecords(ctx, t.FirstKey(), t.Cols(),
func(h int64, data []types.Datum, cols []*table.Column) (bool, error) {
i++
Expand Down
4 changes: 2 additions & 2 deletions ddl/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func testNewDDL(ctx context.Context, etcdCli *clientv3.Client, store kv.Storage,
func getSchemaVer(c *C, ctx sessionctx.Context) int64 {
err := ctx.NewTxn()
c.Assert(err, IsNil)
m := meta.NewMeta(ctx.Txn())
m := meta.NewMeta(ctx.Txn(true))
ver, err := m.GetSchemaVersion()
c.Assert(err, IsNil)
return ver
Expand Down Expand Up @@ -91,7 +91,7 @@ func checkHistoryJob(c *C, job *model.Job) {
}

func checkHistoryJobArgs(c *C, ctx sessionctx.Context, id int64, args *historyJobArgs) {
t := meta.NewMeta(ctx.Txn())
t := meta.NewMeta(ctx.Txn(true))
historyJob, err := t.GetHistoryDDLJob(id)
c.Assert(err, IsNil)
c.Assert(historyJob.BinlogInfo.FinishedTS, Greater, uint64(0))
Expand Down
8 changes: 4 additions & 4 deletions ddl/ddl_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ func (s *testDDLSuite) TestCancelJob(c *C) {
row := types.MakeDatums(1, 2)
_, err = originTable.AddRecord(ctx, row, false)
c.Assert(err, IsNil)
err = ctx.Txn().Commit(context.Background())
err = ctx.Txn(true).Commit(context.Background())
c.Assert(err, IsNil)

tc := &TestDDLCallback{}
Expand All @@ -530,8 +530,8 @@ func (s *testDDLSuite) TestCancelJob(c *C) {
checkErr = errors.Trace(err)
return
}
checkCancelState(hookCtx.Txn(), job, test)
err = hookCtx.Txn().Commit(context.Background())
checkCancelState(hookCtx.Txn(true), job, test)
err = hookCtx.Txn(true).Commit(context.Background())
if err != nil {
checkErr = errors.Trace(err)
return
Expand Down Expand Up @@ -559,7 +559,7 @@ func (s *testDDLSuite) TestCancelJob(c *C) {
test = &tests[3]
testCreateIndex(c, ctx, d, dbInfo, tblInfo, false, "idx", "c2")
c.Check(errors.ErrorStack(checkErr), Equals, "")
c.Assert(ctx.Txn().Commit(context.Background()), IsNil)
c.Assert(ctx.Txn(true).Commit(context.Background()), IsNil)

// for dropping index
idxName := []interface{}{model.NewCIStr("idx")}
Expand Down
2 changes: 1 addition & 1 deletion ddl/fail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (s *testColumnChangeSuite) TestFailBeforeDecodeArgs(c *C) {
row := types.MakeDatums(1, 2)
_, err = originTable.AddRecord(ctx, row, false)
c.Assert(err, IsNil)
err = ctx.Txn().Commit(context.Background())
err = ctx.Txn(true).Commit(context.Background())
c.Assert(err, IsNil)

tc := &TestDDLCallback{}
Expand Down
6 changes: 3 additions & 3 deletions ddl/foreign_key_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (s *testForeighKeySuite) TestForeignKey(c *C) {

testCreateTable(c, ctx, d, s.dbInfo, tblInfo)

err = ctx.Txn().Commit(context.Background())
err = ctx.Txn(true).Commit(context.Background())
c.Assert(err, IsNil)

// fix data race
Expand Down Expand Up @@ -160,7 +160,7 @@ func (s *testForeighKeySuite) TestForeignKey(c *C) {

job := s.testCreateForeignKey(c, tblInfo, "c1_fk", []string{"c1"}, "t2", []string{"c1"}, ast.ReferOptionCascade, ast.ReferOptionSetNull)
testCheckJobDone(c, d, job, true)
err = ctx.Txn().Commit(context.Background())
err = ctx.Txn(true).Commit(context.Background())
c.Assert(err, IsNil)
mu.Lock()
hErr := hookErr
Expand Down Expand Up @@ -218,6 +218,6 @@ func (s *testForeighKeySuite) TestForeignKey(c *C) {
job = testDropTable(c, ctx, d, s.dbInfo, tblInfo)
testCheckJobDone(c, d, job, false)

err = ctx.Txn().Commit(context.Background())
err = ctx.Txn(true).Commit(context.Background())
c.Assert(err, IsNil)
}
12 changes: 6 additions & 6 deletions ddl/index_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (s *testIndexChangeSuite) TestIndexChange(c *C) {
_, err = originTable.AddRecord(ctx, types.MakeDatums(3, 3), false)
c.Assert(err, IsNil)

err = ctx.Txn().Commit(context.Background())
err = ctx.Txn(true).Commit(context.Background())
c.Assert(err, IsNil)

tc := &TestDDLCallback{}
Expand Down Expand Up @@ -125,7 +125,7 @@ func (s *testIndexChangeSuite) TestIndexChange(c *C) {
d.SetHook(tc)
testCreateIndex(c, ctx, d, s.dbInfo, originTable.Meta(), false, "c2", "c2")
c.Check(errors.ErrorStack(checkErr), Equals, "")
c.Assert(ctx.Txn().Commit(context.Background()), IsNil)
c.Assert(ctx.Txn(true).Commit(context.Background()), IsNil)
d.Stop()
prevState = model.StateNone
var noneTable table.Table
Expand Down Expand Up @@ -172,7 +172,7 @@ func (s *testIndexChangeSuite) TestIndexChange(c *C) {

func checkIndexExists(ctx sessionctx.Context, tbl table.Table, indexValue interface{}, handle int64, exists bool) error {
idx := tbl.Indices()[0]
doesExist, _, err := idx.Exist(ctx.GetSessionVars().StmtCtx, ctx.Txn(), types.MakeDatums(indexValue), handle)
doesExist, _, err := idx.Exist(ctx.GetSessionVars().StmtCtx, ctx.Txn(true), types.MakeDatums(indexValue), handle)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -322,7 +322,7 @@ func (s *testIndexChangeSuite) checkAddPublic(d *ddl, ctx sessionctx.Context, wr
return errors.Trace(err)
}
}
return ctx.Txn().Commit(context.Background())
return ctx.Txn(true).Commit(context.Background())
}

func (s *testIndexChangeSuite) checkDropWriteOnly(d *ddl, ctx sessionctx.Context, publicTbl, writeTbl table.Table) error {
Expand Down Expand Up @@ -362,7 +362,7 @@ func (s *testIndexChangeSuite) checkDropWriteOnly(d *ddl, ctx sessionctx.Context
if err != nil {
return errors.Trace(err)
}
return ctx.Txn().Commit(context.Background())
return ctx.Txn(true).Commit(context.Background())
}

func (s *testIndexChangeSuite) checkDropDeleteOnly(d *ddl, ctx sessionctx.Context, writeTbl, delTbl table.Table) error {
Expand Down Expand Up @@ -407,5 +407,5 @@ func (s *testIndexChangeSuite) checkDropDeleteOnly(d *ddl, ctx sessionctx.Contex
if err != nil {
return errors.Trace(err)
}
return ctx.Txn().Commit(context.Background())
return ctx.Txn(true).Commit(context.Background())
}
18 changes: 9 additions & 9 deletions ddl/reorg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,14 @@ func (s *testDDLSuite) TestReorg(c *C) {

err := ctx.NewTxn()
c.Assert(err, IsNil)
ctx.Txn().Set([]byte("a"), []byte("b"))
err = ctx.Txn().Rollback()
ctx.Txn(true).Set([]byte("a"), []byte("b"))
err = ctx.Txn(true).Rollback()
c.Assert(err, IsNil)

err = ctx.NewTxn()
c.Assert(err, IsNil)
ctx.Txn().Set([]byte("a"), []byte("b"))
err = ctx.Txn().Commit(context.Background())
ctx.Txn(true).Set([]byte("a"), []byte("b"))
err = ctx.Txn(true).Commit(context.Background())
c.Assert(err, IsNil)

rowCount := int64(10)
Expand All @@ -73,7 +73,7 @@ func (s *testDDLSuite) TestReorg(c *C) {
}
err = ctx.NewTxn()
c.Assert(err, IsNil)
m := meta.NewMeta(ctx.Txn())
m := meta.NewMeta(ctx.Txn(true))
rInfo := &reorgInfo{
Job: job,
}
Expand All @@ -89,12 +89,12 @@ func (s *testDDLSuite) TestReorg(c *C) {
c.Assert(d.reorgCtx.rowCount, Equals, int64(0))

// Test whether reorgInfo's Handle is update.
err = ctx.Txn().Commit(context.Background())
err = ctx.Txn(true).Commit(context.Background())
c.Assert(err, IsNil)
err = ctx.NewTxn()
c.Assert(err, IsNil)

m = meta.NewMeta(ctx.Txn())
m = meta.NewMeta(ctx.Txn(true))
info, err1 := d.getReorgInfo(m, job, nil)
c.Assert(err1, IsNil)
c.Assert(info.Handle, Equals, handle)
Expand All @@ -110,7 +110,7 @@ func (s *testDDLSuite) TestReorg(c *C) {
return nil
})
c.Assert(err, NotNil)
err = ctx.Txn().Commit(context.Background())
err = ctx.Txn(true).Commit(context.Background())
c.Assert(err, IsNil)

d.start(context.Background())
Expand Down Expand Up @@ -172,7 +172,7 @@ func (s *testDDLSuite) TestReorgOwner(c *C) {
c.Assert(err, IsNil)
}

err := ctx.Txn().Commit(context.Background())
err := ctx.Txn(true).Commit(context.Background())
c.Assert(err, IsNil)

tc := &TestDDLCallback{}
Expand Down
4 changes: 2 additions & 2 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func (a *ExecStmt) Exec(ctx context.Context) (ast.RecordSet, error) {

var txnStartTS uint64
if sctx.Txn(false).Valid() {
txnStartTS = sctx.Txn().StartTS()
txnStartTS = sctx.Txn(true).StartTS()
}
return &recordSet{
executor: e,
Expand Down Expand Up @@ -265,7 +265,7 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, sctx sessionctx.Co
txnTS := uint64(0)
// Don't active pending txn here.
if sctx.Txn(false).Valid() {
txnTS = sctx.Txn().StartTS()
txnTS = sctx.Txn(true).StartTS()
}
a.LogSlowQuery(txnTS, err == nil)
}()
Expand Down
2 changes: 1 addition & 1 deletion executor/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (e *CheckIndexRangeExec) Open(ctx context.Context) error {

func (e *CheckIndexRangeExec) buildDAGPB() (*tipb.DAGRequest, error) {
dagReq := &tipb.DAGRequest{}
dagReq.StartTs = e.ctx.Txn().StartTS()
dagReq.StartTs = e.ctx.Txn(true).StartTS()
dagReq.TimeZoneOffset = timeZoneOffset(e.ctx)
sc := e.ctx.GetSessionVars().StmtCtx
dagReq.Flags = statementContextToFlags(sc)
Expand Down
Loading

0 comments on commit 1648ea2

Please sign in to comment.