From 2df07d2eb0eb6e42102de6bbd2fab684c1725fd2 Mon Sep 17 00:00:00 2001 From: SpadeA-Tang Date: Fri, 1 Jul 2022 18:14:07 +0800 Subject: [PATCH 01/16] remove newtxn Signed-off-by: SpadeA-Tang --- sessionctx/context.go | 4 ---- sessiontxn/isolation/base.go | 4 ++-- sessiontxn/txn.go | 24 ++++++++++++++++++++++++ 3 files changed, 26 insertions(+), 6 deletions(-) diff --git a/sessionctx/context.go b/sessionctx/context.go index 3a320fe9078ef..3c338a2a9411f 100644 --- a/sessionctx/context.go +++ b/sessionctx/context.go @@ -53,10 +53,6 @@ type SessionStatesHandler interface { // Context is an interface for transaction and executive args environment. type Context interface { SessionStatesHandler - // NewTxn creates a new transaction for further execution. - // If old transaction is valid, it is committed first. - // It's used in BEGIN statement and DDL statements to commit old transaction. - NewTxn(context.Context) error // NewStaleTxnWithStartTS initializes a staleness transaction with the given StartTS. NewStaleTxnWithStartTS(ctx context.Context, startTS uint64) error // SetDiskFullOpt set the disk full opt when tikv disk full happened. diff --git a/sessiontxn/isolation/base.go b/sessiontxn/isolation/base.go index 11c543a17bcf4..07cc6857a537d 100644 --- a/sessiontxn/isolation/base.go +++ b/sessiontxn/isolation/base.go @@ -67,12 +67,12 @@ func (p *baseTxnContextProvider) OnInitialize(ctx context.Context, tp sessiontxn activeNow := true switch tp { case sessiontxn.EnterNewTxnDefault: - if err = p.sctx.NewTxn(ctx); err != nil { + if err := sessiontxn.CheckBeforeNewTxn(p.ctx, p.sctx); err != nil { return err } case sessiontxn.EnterNewTxnWithBeginStmt: if !sessiontxn.CanReuseTxnWhenExplicitBegin(p.sctx) { - if err = p.sctx.NewTxn(ctx); err != nil { + if err := sessiontxn.CheckBeforeNewTxn(p.ctx, p.sctx); err != nil { return err } } diff --git a/sessiontxn/txn.go b/sessiontxn/txn.go index 890c17590939c..42e00def782ba 100644 --- a/sessiontxn/txn.go +++ b/sessiontxn/txn.go @@ -16,6 +16,8 @@ package sessiontxn import ( "context" + "github.com/pingcap/tidb/util/logutil" + "go.uber.org/zap" "github.com/opentracing/opentracing-go" "github.com/pingcap/kvproto/pkg/kvrpcpb" @@ -71,6 +73,28 @@ func CanReuseTxnWhenExplicitBegin(sctx sessionctx.Context) bool { return txnCtx.History == nil && !txnCtx.IsStaleness && sessVars.SnapshotTS == 0 } +// CheckBeforeNewTxn is called before entering a new transaction. It checks whether the old +// txn is valid in which case we should commit it first. +func CheckBeforeNewTxn(ctx context.Context, sctx sessionctx.Context) error { + txn, err := sctx.Txn(false) + if err != nil { + return err + } + if txn.Valid() { + txnStartTS := txn.StartTS() + txnScope := sctx.GetSessionVars().TxnCtx.TxnScope + err = sctx.CommitTxn(ctx) + if err != nil { + return err + } + logutil.Logger(ctx).Info("Try to create a new txn inside a transaction auto commit", + zap.Int64("schemaVersion", sctx.GetInfoSchema().SchemaMetaVersion()), + zap.Uint64("txnStartTS", txnStartTS), + zap.String("txnScope", txnScope)) + } + return nil +} + // SetTxnAssertionLevel sets assertion level of a transactin. Note that assertion level should be set only once just // after creating a new transaction. func SetTxnAssertionLevel(txn kv.Transaction, assertionLevel variable.AssertionLevel) { From 71775f05318dbed3e58d3c500d08103e140f1e65 Mon Sep 17 00:00:00 2001 From: SpadeA-Tang Date: Fri, 1 Jul 2022 18:46:41 +0800 Subject: [PATCH 02/16] remove newtxnwithstalets Signed-off-by: SpadeA-Tang --- session/session.go | 33 ---------------------- sessionctx/context.go | 2 -- sessiontxn/staleread/provider.go | 48 ++++++++++++++++++++++++++++---- 3 files changed, 43 insertions(+), 40 deletions(-) diff --git a/session/session.go b/session/session.go index ec72be7ec4001..8f4c4f5e0ca36 100644 --- a/session/session.go +++ b/session/session.go @@ -2517,39 +2517,6 @@ func (s *session) checkBeforeNewTxn(ctx context.Context) error { return nil } -// NewStaleTxnWithStartTS create a transaction with the given StartTS. -func (s *session) NewStaleTxnWithStartTS(ctx context.Context, startTS uint64) error { - if err := s.checkBeforeNewTxn(ctx); err != nil { - return err - } - txnScope := config.GetTxnScopeFromConfig() - txn, err := s.store.Begin(tikv.WithTxnScope(txnScope), tikv.WithStartTS(startTS)) - if err != nil { - return err - } - txn.SetVars(s.sessionVars.KVVars) - txn.SetOption(kv.IsStalenessReadOnly, true) - txn.SetOption(kv.TxnScope, txnScope) - setTxnAssertionLevel(txn, s.sessionVars.AssertionLevel) - s.txn.changeInvalidToValid(txn) - is, err := getSnapshotInfoSchema(s, txn.StartTS()) - if err != nil { - return errors.Trace(err) - } - s.sessionVars.TxnCtx = &variable.TransactionContext{ - TxnCtxNoNeedToRestore: variable.TxnCtxNoNeedToRestore{ - InfoSchema: is, - CreateTime: time.Now(), - StartTS: txn.StartTS(), - ShardStep: int(s.sessionVars.ShardAllocateStep), - IsStaleness: true, - TxnScope: txnScope, - }, - } - s.txn.SetOption(kv.SnapInterceptor, s.getSnapshotInterceptor()) - return nil -} - func (s *session) SetValue(key fmt.Stringer, value interface{}) { s.mu.Lock() s.mu.values[key] = value diff --git a/sessionctx/context.go b/sessionctx/context.go index 3c338a2a9411f..6f734f2313c36 100644 --- a/sessionctx/context.go +++ b/sessionctx/context.go @@ -53,8 +53,6 @@ type SessionStatesHandler interface { // Context is an interface for transaction and executive args environment. type Context interface { SessionStatesHandler - // NewStaleTxnWithStartTS initializes a staleness transaction with the given StartTS. - NewStaleTxnWithStartTS(ctx context.Context, startTS uint64) error // SetDiskFullOpt set the disk full opt when tikv disk full happened. SetDiskFullOpt(level kvrpcpb.DiskFullOpt) // RollbackTxn rolls back the current transaction. diff --git a/sessiontxn/staleread/provider.go b/sessiontxn/staleread/provider.go index 34c8ec985371d..a30542ed2eb94 100644 --- a/sessiontxn/staleread/provider.go +++ b/sessiontxn/staleread/provider.go @@ -16,14 +16,17 @@ package staleread import ( "context" + "time" "github.com/pingcap/errors" + "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/sessiontxn" + "github.com/pingcap/tidb/table/temptable" ) // StalenessTxnContextProvider implements sessiontxn.TxnContextProvider @@ -73,17 +76,52 @@ func (p *StalenessTxnContextProvider) OnInitialize(ctx context.Context, tp sessi } func (p *StalenessTxnContextProvider) activateStaleTxn() error { - if err := p.sctx.NewStaleTxnWithStartTS(p.ctx, p.ts); err != nil { + var err error + if err = sessiontxn.CheckBeforeNewTxn(p.ctx, p.sctx); err != nil { return err } - p.is = p.sctx.GetSessionVars().TxnCtx.InfoSchema.(infoschema.InfoSchema) + + txnScope := config.GetTxnScopeFromConfig() + if err = p.sctx.PrepareTSFuture(p.ctx, sessiontxn.ConstantFuture(p.ts), txnScope); err != nil { + return err + } + + txnFuture := p.sctx.GetPreparedTxnFuture() + if txnFuture == nil { + return errors.AddStack(kv.ErrInvalidTxn) + } + + txn, err := txnFuture.Wait(p.ctx, p.sctx) + if err != nil { + return err + } + + sessVars := p.sctx.GetSessionVars() + txn.SetVars(sessVars.KVVars) + txn.SetOption(kv.IsStalenessReadOnly, true) + txn.SetOption(kv.TxnScope, txnScope) + sessiontxn.SetTxnAssertionLevel(txn, sessVars.AssertionLevel) + is, err := GetSessionSnapshotInfoSchema(p.sctx, p.ts) + if err != nil { + return errors.Trace(err) + } + sessVars.TxnCtx = &variable.TransactionContext{ + TxnCtxNoNeedToRestore: variable.TxnCtxNoNeedToRestore{ + InfoSchema: is, + CreateTime: time.Now(), + StartTS: txn.StartTS(), + ShardStep: int(sessVars.ShardAllocateStep), + IsStaleness: true, + TxnScope: txnScope, + }, + } + txn.SetOption(kv.SnapInterceptor, temptable.SessionSnapshotInterceptor(p.sctx)) + + p.is = is if err := p.sctx.GetSessionVars().SetSystemVar(variable.TiDBSnapshot, ""); err != nil { return err } - txnCtx := p.sctx.GetSessionVars().TxnCtx - txnCtx.IsStaleness = true - txnCtx.InfoSchema = p.is return nil } From f128c8939bbb1baa171a98aad7321ce9c1ee87bf Mon Sep 17 00:00:00 2001 From: SpadeA-Tang Date: Fri, 1 Jul 2022 18:46:48 +0800 Subject: [PATCH 03/16] update Signed-off-by: SpadeA-Tang --- session/session.go | 31 ------------------------------- session/txn.go | 16 ---------------- 2 files changed, 47 deletions(-) diff --git a/session/session.go b/session/session.go index 8f4c4f5e0ca36..ab467098403ce 100644 --- a/session/session.go +++ b/session/session.go @@ -103,7 +103,6 @@ import ( "github.com/pingcap/tidb/util/timeutil" tikvstore "github.com/tikv/client-go/v2/kv" "github.com/tikv/client-go/v2/oracle" - "github.com/tikv/client-go/v2/tikv" tikvutil "github.com/tikv/client-go/v2/util" ) @@ -2471,36 +2470,6 @@ func (s *session) Txn(active bool) (kv.Transaction, error) { return &s.txn, err } -func (s *session) NewTxn(ctx context.Context) error { - if err := s.checkBeforeNewTxn(ctx); err != nil { - return err - } - txn, err := s.store.Begin(tikv.WithTxnScope(s.sessionVars.CheckAndGetTxnScope())) - if err != nil { - return err - } - txn.SetVars(s.sessionVars.KVVars) - replicaReadType := s.GetSessionVars().GetReplicaRead() - if replicaReadType.IsFollowerRead() { - txn.SetOption(kv.ReplicaRead, replicaReadType) - } - setTxnAssertionLevel(txn, s.sessionVars.AssertionLevel) - s.txn.changeInvalidToValid(txn) - is := s.GetDomainInfoSchema() - s.sessionVars.TxnCtx = &variable.TransactionContext{ - TxnCtxNoNeedToRestore: variable.TxnCtxNoNeedToRestore{ - InfoSchema: is, - CreateTime: time.Now(), - StartTS: txn.StartTS(), - ShardStep: int(s.sessionVars.ShardAllocateStep), - IsStaleness: false, - TxnScope: s.sessionVars.CheckAndGetTxnScope(), - }, - } - s.txn.SetOption(kv.SnapInterceptor, s.getSnapshotInterceptor()) - return nil -} - func (s *session) checkBeforeNewTxn(ctx context.Context) error { if s.txn.Valid() { txnStartTS := s.txn.StartTS() diff --git a/session/txn.go b/session/txn.go index 1b0b3a07620d7..48e7e582696ef 100644 --- a/session/txn.go +++ b/session/txn.go @@ -211,22 +211,6 @@ func (txn *LazyTxn) GetOption(opt int) interface{} { return txn.Transaction.GetOption(opt) } -func (txn *LazyTxn) changeInvalidToValid(kvTxn kv.Transaction) { - txn.Transaction = kvTxn - txn.initStmtBuf() - txn.txnFuture = nil - - txn.mu.Lock() - defer txn.mu.Unlock() - txn.resetTxnInfo( - kvTxn.StartTS(), - txninfo.TxnIdle, - uint64(txn.Transaction.Len()), - uint64(txn.Transaction.Size()), - "", - nil) -} - func (txn *LazyTxn) changeToPending(future *txnFuture) { txn.Transaction = nil txn.txnFuture = future From c7ab65c8de3587c22490ae9f03a56ebb74370a25 Mon Sep 17 00:00:00 2001 From: SpadeA-Tang Date: Mon, 4 Jul 2022 10:45:55 +0800 Subject: [PATCH 04/16] fmt Signed-off-by: SpadeA-Tang --- session/session.go | 33 -------------------------------- sessiontxn/staleread/provider.go | 6 ++---- sessiontxn/txn.go | 5 +++-- 3 files changed, 5 insertions(+), 39 deletions(-) diff --git a/session/session.go b/session/session.go index 2008ed05ae9d2..3470ec94272d4 100644 --- a/session/session.go +++ b/session/session.go @@ -2449,19 +2449,6 @@ func (s *session) DropPreparedStmt(stmtID uint32) error { return nil } -// setTxnAssertionLevel sets assertion level of a transactin. Note that assertion level should be set only once just -// after creating a new transaction. -func setTxnAssertionLevel(txn kv.Transaction, assertionLevel variable.AssertionLevel) { - switch assertionLevel { - case variable.AssertionLevelOff: - txn.SetOption(kv.AssertionLevel, kvrpcpb.AssertionLevel_Off) - case variable.AssertionLevelFast: - txn.SetOption(kv.AssertionLevel, kvrpcpb.AssertionLevel_Fast) - case variable.AssertionLevelStrict: - txn.SetOption(kv.AssertionLevel, kvrpcpb.AssertionLevel_Strict) - } -} - func (s *session) Txn(active bool) (kv.Transaction, error) { if !active { return &s.txn, nil @@ -2470,22 +2457,6 @@ func (s *session) Txn(active bool) (kv.Transaction, error) { return &s.txn, err } -func (s *session) checkBeforeNewTxn(ctx context.Context) error { - if s.txn.Valid() { - txnStartTS := s.txn.StartTS() - txnScope := s.GetSessionVars().TxnCtx.TxnScope - err := s.CommitTxn(ctx) - if err != nil { - return err - } - logutil.Logger(ctx).Info("Try to create a new txn inside a transaction auto commit", - zap.Int64("schemaVersion", s.GetInfoSchema().SchemaMetaVersion()), - zap.Uint64("txnStartTS", txnStartTS), - zap.String("txnScope", txnScope)) - } - return nil -} - func (s *session) SetValue(key fmt.Stringer, value interface{}) { s.mu.Lock() s.mu.values[key] = value @@ -3359,10 +3330,6 @@ func (s *session) BuiltinFunctionUsageInc(scalarFuncSigName string) { s.functionUsageMu.builtinFunctionUsage.Inc(scalarFuncSigName) } -func (s *session) getSnapshotInterceptor() kv.SnapshotInterceptor { - return temptable.SessionSnapshotInterceptor(s) -} - func (s *session) GetStmtStats() *stmtstats.StatementStats { return s.stmtStats } diff --git a/sessiontxn/staleread/provider.go b/sessiontxn/staleread/provider.go index 689d0a2a2224b..05f67bae8eac9 100644 --- a/sessiontxn/staleread/provider.go +++ b/sessiontxn/staleread/provider.go @@ -118,11 +118,9 @@ func (p *StalenessTxnContextProvider) activateStaleTxn() error { txn.SetOption(kv.SnapInterceptor, temptable.SessionSnapshotInterceptor(p.sctx)) p.is = is - if err := p.sctx.GetSessionVars().SetSystemVar(variable.TiDBSnapshot, ""); err != nil { - return err - } + err = p.sctx.GetSessionVars().SetSystemVar(variable.TiDBSnapshot, "") - return nil + return err } func (p *StalenessTxnContextProvider) enterNewStaleTxnWithReplaceProvider() error { diff --git a/sessiontxn/txn.go b/sessiontxn/txn.go index cbd51faf16f4d..07f46d4ddb5ee 100644 --- a/sessiontxn/txn.go +++ b/sessiontxn/txn.go @@ -16,8 +16,6 @@ package sessiontxn import ( "context" - "github.com/pingcap/tidb/util/logutil" - "go.uber.org/zap" "github.com/opentracing/opentracing-go" "github.com/pingcap/kvproto/pkg/kvrpcpb" @@ -25,7 +23,9 @@ import ( "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/table/temptable" + "github.com/pingcap/tidb/util/logutil" "github.com/tikv/client-go/v2/oracle" + "go.uber.org/zap" ) // ConstantFuture implements oracle.Future @@ -94,6 +94,7 @@ func CheckBeforeNewTxn(ctx context.Context, sctx sessionctx.Context) error { zap.String("txnScope", txnScope)) } return nil +} // GetSnapshotWithTS returns a snapshot with ts. func GetSnapshotWithTS(s sessionctx.Context, ts uint64) kv.Snapshot { From 3de64fa8fc4270e4147776d0012a80999fde3799 Mon Sep 17 00:00:00 2001 From: SpadeA-Tang Date: Mon, 4 Jul 2022 15:18:42 +0800 Subject: [PATCH 05/16] update Signed-off-by: SpadeA-Tang --- session/session.go | 7 ++++--- session/txn.go | 8 ++++++++ sessionctx/context.go | 4 +++- sessiontxn/isolation/base.go | 22 ++++++++++++++++------ sessiontxn/isolation/readcommitted.go | 2 +- util/mock/context.go | 18 ++++++++++++++++-- 6 files changed, 48 insertions(+), 13 deletions(-) diff --git a/session/session.go b/session/session.go index 3470ec94272d4..855abdfe90856 100644 --- a/session/session.go +++ b/session/session.go @@ -3038,10 +3038,11 @@ func (s *session) PrepareTSFuture(ctx context.Context, future oracle.Future, sco return nil } +// GetPreparedTxnFuture returns the TxnFuture func (s *session) GetPreparedTxnFuture() sessionctx.TxnFuture { - if !s.txn.validOrPending() { - return nil - } + //if !s.txn.validOrPending() { + // return nil + //} return &s.txn } diff --git a/session/txn.go b/session/txn.go index cdb1339ddac43..69d7532ead686 100644 --- a/session/txn.go +++ b/session/txn.go @@ -488,6 +488,14 @@ func (txn *LazyTxn) Wait(ctx context.Context, sctx sessionctx.Context) (kv.Trans return txn, nil } +// GetPreparedTSFuture returns the prepared ts future +func (txn *LazyTxn) GetPreparedTSFuture() oracle.Future { + if future := txn.txnFuture; future != nil { + return future.future + } + return nil +} + func keyNeedToLock(k, v []byte, flags kv.KeyFlags) bool { isTableKey := bytes.HasPrefix(k, tablecodec.TablePrefix()) if !isTableKey { diff --git a/sessionctx/context.go b/sessionctx/context.go index 3ceb9aeb42085..a8539fa8d8efe 100644 --- a/sessionctx/context.go +++ b/sessionctx/context.go @@ -143,7 +143,7 @@ type Context interface { HasLockedTables() bool // PrepareTSFuture uses to prepare timestamp by future. PrepareTSFuture(ctx context.Context, future oracle.Future, scope string) error - // GetPreparedTxnFuture returns the prepared ts future + // GetPreparedTxnFuture returns the TxnFuture GetPreparedTxnFuture() TxnFuture // StoreIndexUsage stores the index usage information. StoreIndexUsage(tblID int64, idxID int64, rowsSelected int64) @@ -172,6 +172,8 @@ type Context interface { type TxnFuture interface { // Wait converts pending txn to valid Wait(ctx context.Context, sctx Context) (kv.Transaction, error) + // GetPreparedTSFuture returns the prepared ts future + GetPreparedTSFuture() oracle.Future } type basicCtxType int diff --git a/sessiontxn/isolation/base.go b/sessiontxn/isolation/base.go index bac23d50b3b09..0a3112a0617dc 100644 --- a/sessiontxn/isolation/base.go +++ b/sessiontxn/isolation/base.go @@ -70,11 +70,19 @@ func (p *baseTxnContextProvider) OnInitialize(ctx context.Context, tp sessiontxn if err := sessiontxn.CheckBeforeNewTxn(p.ctx, p.sctx); err != nil { return err } + p.isTxnPrepared = false + if err := p.prepareTxn(false); err != nil { + return err + } case sessiontxn.EnterNewTxnWithBeginStmt: if !sessiontxn.CanReuseTxnWhenExplicitBegin(p.sctx) { if err := sessiontxn.CheckBeforeNewTxn(p.ctx, p.sctx); err != nil { return err } + p.isTxnPrepared = false + if err := p.prepareTxn(false); err != nil { + return err + } } sessVars.SetInTxn(true) case sessiontxn.EnterNewTxnBeforeStmt: @@ -102,7 +110,7 @@ func (p *baseTxnContextProvider) OnInitialize(ctx context.Context, tp sessiontxn if err != nil { return err } - p.isTxnPrepared = txn.Valid() || p.sctx.GetPreparedTxnFuture() != nil + p.isTxnPrepared = txn.Valid() || p.sctx.GetPreparedTxnFuture().GetPreparedTSFuture() != nil if activeNow { _, err = p.ActivateTxn() } @@ -172,7 +180,7 @@ func (p *baseTxnContextProvider) ActivateTxn() (kv.Transaction, error) { return p.txn, nil } - if err := p.prepareTxn(); err != nil { + if err := p.prepareTxn(true); err != nil { return nil, err } @@ -219,13 +227,15 @@ func (p *baseTxnContextProvider) ActivateTxn() (kv.Transaction, error) { return txn, nil } -func (p *baseTxnContextProvider) prepareTxn() error { +func (p *baseTxnContextProvider) prepareTxn(considerSnapshotTS bool) error { if p.isTxnPrepared { return nil } - if snapshotTS := p.sctx.GetSessionVars().SnapshotTS; snapshotTS != 0 { - return p.prepareTxnWithTS(snapshotTS) + if considerSnapshotTS { + if snapshotTS := p.sctx.GetSessionVars().SnapshotTS; snapshotTS != 0 { + return p.prepareTxnWithTS(snapshotTS) + } } future := sessiontxn.NewOracleFuture(p.ctx, p.sctx, p.sctx.GetSessionVars().TxnCtx.TxnScope) @@ -272,7 +282,7 @@ func (p *baseTxnContextProvider) AdviseWarmup() error { // When executing `START TRANSACTION READ ONLY AS OF ...` no need to warmUp return nil } - return p.prepareTxn() + return p.prepareTxn(true) } // AdviseOptimizeWithPlan providers optimization according to the plan diff --git a/sessiontxn/isolation/readcommitted.go b/sessiontxn/isolation/readcommitted.go index d2afad4c2ea26..c264d3404f447 100644 --- a/sessiontxn/isolation/readcommitted.go +++ b/sessiontxn/isolation/readcommitted.go @@ -223,7 +223,7 @@ func (p *PessimisticRCTxnContextProvider) AdviseWarmup() error { return nil } - if err := p.prepareTxn(); err != nil { + if err := p.prepareTxn(true); err != nil { return err } p.prepareStmtTS() diff --git a/util/mock/context.go b/util/mock/context.go index b8be5b9ddaaf9..4d916854534a9 100644 --- a/util/mock/context.go +++ b/util/mock/context.go @@ -63,10 +63,24 @@ type wrapTxn struct { kv.Transaction } -func (txn *wrapTxn) Wait(_ context.Context, _ sessionctx.Context) (kv.Transaction, error) { +// Wait converts pending txn to valid +func (txn *wrapTxn) Wait(_ context.Context, sctx sessionctx.Context) (kv.Transaction, error) { + if txn.Transaction != nil { + return txn, nil + } + kvTxn, err := sctx.GetStore().Begin() + if err != nil { + return nil, errors.Trace(err) + } + txn.Transaction = kvTxn return txn, nil } +// GetPreparedTSFuture returns the prepared ts future +func (txn *wrapTxn) GetPreparedTSFuture() oracle.Future { + return nil +} + func (txn *wrapTxn) Valid() bool { return txn.Transaction != nil && txn.Transaction.Valid() } @@ -369,7 +383,7 @@ func (c *Context) PrepareTSFuture(ctx context.Context, future oracle.Future, sco return nil } -// GetPreparedTxnFuture returns the prepared ts future +// GetPreparedTxnFuture returns the TxnFuture func (c *Context) GetPreparedTxnFuture() sessionctx.TxnFuture { return &c.txn } From 8bdff9032059b746ccc5e4243561c398a7636e1c Mon Sep 17 00:00:00 2001 From: SpadeA-Tang Date: Mon, 4 Jul 2022 16:20:15 +0800 Subject: [PATCH 06/16] update Signed-off-by: SpadeA-Tang --- session/session.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/session/session.go b/session/session.go index e164944e6aa9f..12a898980b9d1 100644 --- a/session/session.go +++ b/session/session.go @@ -3038,7 +3038,9 @@ func (s *session) PrepareTSFuture(ctx context.Context, future oracle.Future, sco } failpoint.Inject("assertTSONotRequest", func() { - panic("tso shouldn't be requested") + if _, ok := future.(sessiontxn.ConstantFuture); !ok { + panic("tso shouldn't be requested") + } }) failpoint.InjectContext(ctx, "mockGetTSFail", func() { From 3c9c1ecfc95dd22729c75e6e2b9fd5bad55438da Mon Sep 17 00:00:00 2001 From: SpadeA-Tang Date: Mon, 4 Jul 2022 16:51:19 +0800 Subject: [PATCH 07/16] update Signed-off-by: SpadeA-Tang --- sessiontxn/isolation/base.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/sessiontxn/isolation/base.go b/sessiontxn/isolation/base.go index 70acf49f6c041..0292a29e21a3b 100644 --- a/sessiontxn/isolation/base.go +++ b/sessiontxn/isolation/base.go @@ -56,6 +56,15 @@ type baseTxnContextProvider struct { enterNewTxnType sessiontxn.EnterNewTxnType } +// prepareTxnNotConsiderSnapshotTS is used to prepare an oracle ts future even when SnapshotTS is set. +func (p *baseTxnContextProvider) prepareTxnNotConsiderSnapshotTS() error { + if snapshotTS := p.sctx.GetSessionVars().SnapshotTS; snapshotTS == 0 { + return nil + } + p.isTxnPrepared = false + return p.prepareTxn(false) +} + // OnInitialize is the hook that should be called when enter a new txn with this provider func (p *baseTxnContextProvider) OnInitialize(ctx context.Context, tp sessiontxn.EnterNewTxnType) (err error) { if p.getStmtReadTSFunc == nil || p.getStmtForUpdateTSFunc == nil { @@ -70,8 +79,7 @@ func (p *baseTxnContextProvider) OnInitialize(ctx context.Context, tp sessiontxn if err := sessiontxn.CheckBeforeNewTxn(p.ctx, p.sctx); err != nil { return err } - p.isTxnPrepared = false - if err := p.prepareTxn(false); err != nil { + if err := p.prepareTxnNotConsiderSnapshotTS(); err != nil { return err } case sessiontxn.EnterNewTxnWithBeginStmt: @@ -79,8 +87,7 @@ func (p *baseTxnContextProvider) OnInitialize(ctx context.Context, tp sessiontxn if err := sessiontxn.CheckBeforeNewTxn(p.ctx, p.sctx); err != nil { return err } - p.isTxnPrepared = false - if err := p.prepareTxn(false); err != nil { + if err := p.prepareTxnNotConsiderSnapshotTS(); err != nil { return err } } From 77d942c330c2a3466df0fade3320fda5652129db Mon Sep 17 00:00:00 2001 From: SpadeA-Tang Date: Mon, 4 Jul 2022 17:30:34 +0800 Subject: [PATCH 08/16] remove unnecessary comment Signed-off-by: SpadeA-Tang --- session/session.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/session/session.go b/session/session.go index 12a898980b9d1..0585f67a66f27 100644 --- a/session/session.go +++ b/session/session.go @@ -3057,9 +3057,6 @@ func (s *session) PrepareTSFuture(ctx context.Context, future oracle.Future, sco // GetPreparedTxnFuture returns the TxnFuture func (s *session) GetPreparedTxnFuture() sessionctx.TxnFuture { - //if !s.txn.validOrPending() { - // return nil - //} return &s.txn } From a7186f2529f7e3f1e257e72939341e960cb9a646 Mon Sep 17 00:00:00 2001 From: SpadeA-Tang Date: Tue, 5 Jul 2022 15:34:35 +0800 Subject: [PATCH 09/16] update comments Signed-off-by: SpadeA-Tang --- sessiontxn/isolation/base.go | 10 ++++++++++ sessiontxn/staleread/provider.go | 2 ++ 2 files changed, 12 insertions(+) diff --git a/sessiontxn/isolation/base.go b/sessiontxn/isolation/base.go index d946ec5881476..d117b5f27eadd 100644 --- a/sessiontxn/isolation/base.go +++ b/sessiontxn/isolation/base.go @@ -126,6 +126,7 @@ func (p *baseTxnContextProvider) OnInitialize(ctx context.Context, tp sessiontxn return err } +// GetTxnInfoSchema returns the information schema used by txn func (p *baseTxnContextProvider) GetTxnInfoSchema() infoschema.InfoSchema { if is := p.sctx.GetSessionVars().SnapshotInfoschema; is != nil { return is.(infoschema.InfoSchema) @@ -133,10 +134,12 @@ func (p *baseTxnContextProvider) GetTxnInfoSchema() infoschema.InfoSchema { return p.infoSchema } +// GetTxnScope returns the current txn scope func (p *baseTxnContextProvider) GetTxnScope() string { return p.sctx.GetSessionVars().TxnCtx.TxnScope } +// GetReadReplicaScope returns the read replica scope func (p *baseTxnContextProvider) GetReadReplicaScope() string { if txnScope := p.GetTxnScope(); txnScope != kv.GlobalTxnScope && txnScope != "" { // In local txn, we should use txnScope as the readReplicaScope @@ -152,6 +155,7 @@ func (p *baseTxnContextProvider) GetReadReplicaScope() string { return kv.GlobalReplicaScope } +//GetStmtReadTS returns the read timestamp used by select statement (not for select ... for update) func (p *baseTxnContextProvider) GetStmtReadTS() (uint64, error) { if _, err := p.ActivateTxn(); err != nil { return 0, err @@ -163,6 +167,7 @@ func (p *baseTxnContextProvider) GetStmtReadTS() (uint64, error) { return p.getStmtReadTSFunc() } +// GetStmtForUpdateTS returns the read timestamp used by update/insert/delete or select ... for update func (p *baseTxnContextProvider) GetStmtForUpdateTS() (uint64, error) { if _, err := p.ActivateTxn(); err != nil { return 0, err @@ -174,16 +179,19 @@ func (p *baseTxnContextProvider) GetStmtForUpdateTS() (uint64, error) { return p.getStmtForUpdateTSFunc() } +// OnStmtStart is the hook that should be called when a new statement started func (p *baseTxnContextProvider) OnStmtStart(ctx context.Context, _ ast.StmtNode) error { p.ctx = ctx return nil } +// OnStmtRetry is the hook that should be called when a statement is retried internally. func (p *baseTxnContextProvider) OnStmtRetry(ctx context.Context) error { p.ctx = ctx return nil } +// OnStmtErrorForNextAction is the hook that should be called when a new statement get an error func (p *baseTxnContextProvider) OnStmtErrorForNextAction(point sessiontxn.StmtErrorHandlePoint, err error) (sessiontxn.StmtErrorAction, error) { switch point { case sessiontxn.StmtErrAfterPessimisticLock: @@ -202,6 +210,7 @@ func (p *baseTxnContextProvider) getTxnStartTS() (uint64, error) { return txn.StartTS(), nil } +// ActivateTxn activates the transaction and set the relevant context variables. func (p *baseTxnContextProvider) ActivateTxn() (kv.Transaction, error) { if p.txn != nil { return p.txn, nil @@ -267,6 +276,7 @@ func (p *baseTxnContextProvider) prepareTxn(considerSnapshotTS bool) error { return nil } + // Sometimes, we need to prepare a latest oracle ts future even the SnapshotTS is set. if considerSnapshotTS { if snapshotTS := p.sctx.GetSessionVars().SnapshotTS; snapshotTS != 0 { return p.prepareTxnWithTS(snapshotTS) diff --git a/sessiontxn/staleread/provider.go b/sessiontxn/staleread/provider.go index 25db71cfa13b6..e68356c9c87ce 100644 --- a/sessiontxn/staleread/provider.go +++ b/sessiontxn/staleread/provider.go @@ -85,6 +85,8 @@ func (p *StalenessTxnContextProvider) OnInitialize(ctx context.Context, tp sessi } } +// activateStaleTxn first commit old transaction if needed, and then prepare and activate a transaction +// with the staleness snapshot ts. After that, it sets the relevant context variables. func (p *StalenessTxnContextProvider) activateStaleTxn() error { var err error if err = sessiontxn.CheckBeforeNewTxn(p.ctx, p.sctx); err != nil { From 929c91d4c3a94b4b9dbb423899e79027d263d927 Mon Sep 17 00:00:00 2001 From: SpadeA-Tang Date: Tue, 5 Jul 2022 15:39:40 +0800 Subject: [PATCH 10/16] update Signed-off-by: SpadeA-Tang --- sessiontxn/isolation/base.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sessiontxn/isolation/base.go b/sessiontxn/isolation/base.go index d117b5f27eadd..4a6b31477205b 100644 --- a/sessiontxn/isolation/base.go +++ b/sessiontxn/isolation/base.go @@ -276,7 +276,7 @@ func (p *baseTxnContextProvider) prepareTxn(considerSnapshotTS bool) error { return nil } - // Sometimes, we need to prepare a latest oracle ts future even the SnapshotTS is set. + // Sometimes, we need to prepare the latest oracle ts future even the SnapshotTS is set. if considerSnapshotTS { if snapshotTS := p.sctx.GetSessionVars().SnapshotTS; snapshotTS != 0 { return p.prepareTxnWithTS(snapshotTS) From 3ab0e7853cbb443f22e569a64a04b96524cb58cd Mon Sep 17 00:00:00 2001 From: SpadeA-Tang Date: Wed, 6 Jul 2022 11:18:20 +0800 Subject: [PATCH 11/16] update Signed-off-by: SpadeA-Tang --- sessiontxn/staleread/provider.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sessiontxn/staleread/provider.go b/sessiontxn/staleread/provider.go index e68356c9c87ce..bd571c4f3df4b 100644 --- a/sessiontxn/staleread/provider.go +++ b/sessiontxn/staleread/provider.go @@ -93,7 +93,7 @@ func (p *StalenessTxnContextProvider) activateStaleTxn() error { return err } - txnScope := config.GetTxnScopeFromConfig() + txnScope := kv.GlobalTxnScope if err = p.sctx.PrepareTSFuture(p.ctx, sessiontxn.ConstantFuture(p.ts), txnScope); err != nil { return err } From a3c6a561567f6ffc9de43494545f86d70a07cc64 Mon Sep 17 00:00:00 2001 From: SpadeA-Tang Date: Thu, 7 Jul 2022 12:30:46 +0800 Subject: [PATCH 12/16] update Signed-off-by: SpadeA-Tang --- sessiontxn/isolation/base.go | 4 ---- sessiontxn/staleread/provider.go | 4 ---- 2 files changed, 8 deletions(-) diff --git a/sessiontxn/isolation/base.go b/sessiontxn/isolation/base.go index 4a6b31477205b..a3228b3ec514e 100644 --- a/sessiontxn/isolation/base.go +++ b/sessiontxn/isolation/base.go @@ -221,10 +221,6 @@ func (p *baseTxnContextProvider) ActivateTxn() (kv.Transaction, error) { } txnFuture := p.sctx.GetPreparedTxnFuture() - if txnFuture == nil { - return nil, errors.AddStack(kv.ErrInvalidTxn) - } - txn, err := txnFuture.Wait(p.ctx, p.sctx) if err != nil { return nil, err diff --git a/sessiontxn/staleread/provider.go b/sessiontxn/staleread/provider.go index bd571c4f3df4b..25fe76912133f 100644 --- a/sessiontxn/staleread/provider.go +++ b/sessiontxn/staleread/provider.go @@ -99,10 +99,6 @@ func (p *StalenessTxnContextProvider) activateStaleTxn() error { } txnFuture := p.sctx.GetPreparedTxnFuture() - if txnFuture == nil { - return errors.AddStack(kv.ErrInvalidTxn) - } - txn, err := txnFuture.Wait(p.ctx, p.sctx) if err != nil { return err From cd26fd4e0ae27d2e4bbd8b307267bcf9352910b5 Mon Sep 17 00:00:00 2001 From: SpadeA-Tang Date: Thu, 7 Jul 2022 12:53:35 +0800 Subject: [PATCH 13/16] update Signed-off-by: SpadeA-Tang --- sessiontxn/isolation/base.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/sessiontxn/isolation/base.go b/sessiontxn/isolation/base.go index a3228b3ec514e..5ff1ca639abc0 100644 --- a/sessiontxn/isolation/base.go +++ b/sessiontxn/isolation/base.go @@ -57,9 +57,12 @@ type baseTxnContextProvider struct { enterNewTxnType sessiontxn.EnterNewTxnType } -// prepareTxnNotConsiderSnapshotTS is used to prepare an oracle ts future even when SnapshotTS is set. -func (p *baseTxnContextProvider) prepareTxnNotConsiderSnapshotTS() error { +// prepareTxnWhenSnapshotTSSet is an optimization for just calling p.prepareTxn(false). +// If warmup is called, it has already prepared txn. In case of snapshotTS being set, we cannot use that preparation as +// we need the latest ts in some cases. Otherwise, we reuse the txn preparation and do nothing here. +func (p *baseTxnContextProvider) prepareTxnWhenSnapshotTSSet() error { if snapshotTS := p.sctx.GetSessionVars().SnapshotTS; snapshotTS == 0 { + // If snapshotTS is not set, we can reuse the return nil } p.isTxnPrepared = false @@ -80,7 +83,7 @@ func (p *baseTxnContextProvider) OnInitialize(ctx context.Context, tp sessiontxn if err := sessiontxn.CheckBeforeNewTxn(p.ctx, p.sctx); err != nil { return err } - if err := p.prepareTxnNotConsiderSnapshotTS(); err != nil { + if err := p.prepareTxn(false); err != nil { return err } case sessiontxn.EnterNewTxnWithBeginStmt: @@ -88,7 +91,7 @@ func (p *baseTxnContextProvider) OnInitialize(ctx context.Context, tp sessiontxn if err := sessiontxn.CheckBeforeNewTxn(p.ctx, p.sctx); err != nil { return err } - if err := p.prepareTxnNotConsiderSnapshotTS(); err != nil { + if err := p.prepareTxnWhenSnapshotTSSet(); err != nil { return err } } From 59fa3bddf44ac1dde66b502108461338d4f224d2 Mon Sep 17 00:00:00 2001 From: SpadeA-Tang Date: Thu, 7 Jul 2022 14:19:50 +0800 Subject: [PATCH 14/16] update Signed-off-by: SpadeA-Tang --- session/session.go | 6 +++++- session/txn.go | 8 -------- sessionctx/context.go | 5 ++--- sessiontxn/isolation/base.go | 2 +- util/mock/context.go | 15 ++++++++------- 5 files changed, 16 insertions(+), 20 deletions(-) diff --git a/session/session.go b/session/session.go index a4657fb706e55..12f4213236c5e 100644 --- a/session/session.go +++ b/session/session.go @@ -3055,8 +3055,12 @@ func (s *session) PrepareTSFuture(ctx context.Context, future oracle.Future, sco return nil } -// GetPreparedTxnFuture returns the TxnFuture +// GetPreparedTxnFuture returns the TxnFuture if it is valid or pending. +// It returns nil otherwise. func (s *session) GetPreparedTxnFuture() sessionctx.TxnFuture { + if !s.txn.validOrPending() { + return nil + } return &s.txn } diff --git a/session/txn.go b/session/txn.go index 69d7532ead686..cdb1339ddac43 100644 --- a/session/txn.go +++ b/session/txn.go @@ -488,14 +488,6 @@ func (txn *LazyTxn) Wait(ctx context.Context, sctx sessionctx.Context) (kv.Trans return txn, nil } -// GetPreparedTSFuture returns the prepared ts future -func (txn *LazyTxn) GetPreparedTSFuture() oracle.Future { - if future := txn.txnFuture; future != nil { - return future.future - } - return nil -} - func keyNeedToLock(k, v []byte, flags kv.KeyFlags) bool { isTableKey := bytes.HasPrefix(k, tablecodec.TablePrefix()) if !isTableKey { diff --git a/sessionctx/context.go b/sessionctx/context.go index a8539fa8d8efe..4ab11f7da5a47 100644 --- a/sessionctx/context.go +++ b/sessionctx/context.go @@ -143,7 +143,8 @@ type Context interface { HasLockedTables() bool // PrepareTSFuture uses to prepare timestamp by future. PrepareTSFuture(ctx context.Context, future oracle.Future, scope string) error - // GetPreparedTxnFuture returns the TxnFuture + // GetPreparedTxnFuture returns the TxnFuture if it is valid or pending. + // It returns nil otherwise. GetPreparedTxnFuture() TxnFuture // StoreIndexUsage stores the index usage information. StoreIndexUsage(tblID int64, idxID int64, rowsSelected int64) @@ -172,8 +173,6 @@ type Context interface { type TxnFuture interface { // Wait converts pending txn to valid Wait(ctx context.Context, sctx Context) (kv.Transaction, error) - // GetPreparedTSFuture returns the prepared ts future - GetPreparedTSFuture() oracle.Future } type basicCtxType int diff --git a/sessiontxn/isolation/base.go b/sessiontxn/isolation/base.go index 5ff1ca639abc0..2d2332fe160ad 100644 --- a/sessiontxn/isolation/base.go +++ b/sessiontxn/isolation/base.go @@ -121,7 +121,7 @@ func (p *baseTxnContextProvider) OnInitialize(ctx context.Context, tp sessiontxn if err != nil { return err } - p.isTxnPrepared = txn.Valid() || p.sctx.GetPreparedTxnFuture().GetPreparedTSFuture() != nil + p.isTxnPrepared = txn.Valid() || p.sctx.GetPreparedTxnFuture() != nil if activeNow { _, err = p.ActivateTxn() } diff --git a/util/mock/context.go b/util/mock/context.go index 339a1cf55773c..c7b9fe66d9d66 100644 --- a/util/mock/context.go +++ b/util/mock/context.go @@ -61,9 +61,10 @@ type Context struct { type wrapTxn struct { kv.Transaction + prepared bool } -// Wait converts pending txn to valid +// Wait creates a new kvTransaction func (txn *wrapTxn) Wait(_ context.Context, sctx sessionctx.Context) (kv.Transaction, error) { kvTxn, err := sctx.GetStore().Begin() if err != nil { @@ -73,11 +74,6 @@ func (txn *wrapTxn) Wait(_ context.Context, sctx sessionctx.Context) (kv.Transac return txn, nil } -// GetPreparedTSFuture returns the prepared ts future -func (txn *wrapTxn) GetPreparedTSFuture() oracle.Future { - return nil -} - func (txn *wrapTxn) Valid() bool { return txn.Transaction != nil && txn.Transaction.Valid() } @@ -377,11 +373,16 @@ func (c *Context) HasLockedTables() bool { // PrepareTSFuture implements the sessionctx.Context interface. func (c *Context) PrepareTSFuture(ctx context.Context, future oracle.Future, scope string) error { + c.txn.prepared = true return nil } -// GetPreparedTxnFuture returns the TxnFuture +// GetPreparedTxnFuture returns the TxnFuture if it is prepared. +// It returns nil otherwise. func (c *Context) GetPreparedTxnFuture() sessionctx.TxnFuture { + if !c.txn.prepared { + return nil + } return &c.txn } From 98ffff8164c56f03361b8aed93793884bd198e58 Mon Sep 17 00:00:00 2001 From: SpadeA-Tang Date: Thu, 7 Jul 2022 16:59:52 +0800 Subject: [PATCH 15/16] update Signed-off-by: SpadeA-Tang --- sessiontxn/isolation/base.go | 55 +++++++++++++++------------ sessiontxn/isolation/readcommitted.go | 10 ++--- sessiontxn/staleread/provider.go | 2 +- sessiontxn/txn.go | 4 +- util/mock/context.go | 32 ++++++++++++---- 5 files changed, 64 insertions(+), 39 deletions(-) diff --git a/sessiontxn/isolation/base.go b/sessiontxn/isolation/base.go index 2d2332fe160ad..e8f18bf75059c 100644 --- a/sessiontxn/isolation/base.go +++ b/sessiontxn/isolation/base.go @@ -57,18 +57,6 @@ type baseTxnContextProvider struct { enterNewTxnType sessiontxn.EnterNewTxnType } -// prepareTxnWhenSnapshotTSSet is an optimization for just calling p.prepareTxn(false). -// If warmup is called, it has already prepared txn. In case of snapshotTS being set, we cannot use that preparation as -// we need the latest ts in some cases. Otherwise, we reuse the txn preparation and do nothing here. -func (p *baseTxnContextProvider) prepareTxnWhenSnapshotTSSet() error { - if snapshotTS := p.sctx.GetSessionVars().SnapshotTS; snapshotTS == 0 { - // If snapshotTS is not set, we can reuse the - return nil - } - p.isTxnPrepared = false - return p.prepareTxn(false) -} - // OnInitialize is the hook that should be called when enter a new txn with this provider func (p *baseTxnContextProvider) OnInitialize(ctx context.Context, tp sessiontxn.EnterNewTxnType) (err error) { if p.getStmtReadTSFunc == nil || p.getStmtForUpdateTSFunc == nil { @@ -80,18 +68,26 @@ func (p *baseTxnContextProvider) OnInitialize(ctx context.Context, tp sessiontxn activeNow := true switch tp { case sessiontxn.EnterNewTxnDefault: - if err := sessiontxn.CheckBeforeNewTxn(p.ctx, p.sctx); err != nil { + // As we will enter a new txn, we need to commit the old txn if it's still valid. + // There are two main steps here to enter a new txn: + // 1. prepareTxnWithOracleTS + // 2. ActivateTxn + if err := sessiontxn.CommitBeforeEnterNewTxn(p.ctx, p.sctx); err != nil { return err } - if err := p.prepareTxn(false); err != nil { + if err := p.prepareTxnWithOracleTS(); err != nil { return err } case sessiontxn.EnterNewTxnWithBeginStmt: if !sessiontxn.CanReuseTxnWhenExplicitBegin(p.sctx) { - if err := sessiontxn.CheckBeforeNewTxn(p.ctx, p.sctx); err != nil { + // As we will enter a new txn, we need to commit the old txn if it's still valid. + // There are two main steps here to enter a new txn: + // 1. prepareTxnWithOracleTS + // 2. ActivateTxn + if err := sessiontxn.CommitBeforeEnterNewTxn(p.ctx, p.sctx); err != nil { return err } - if err := p.prepareTxnWhenSnapshotTSSet(); err != nil { + if err := p.prepareTxnWithOracleTS(); err != nil { return err } } @@ -219,7 +215,7 @@ func (p *baseTxnContextProvider) ActivateTxn() (kv.Transaction, error) { return p.txn, nil } - if err := p.prepareTxn(true); err != nil { + if err := p.prepareTxn(); err != nil { return nil, err } @@ -270,16 +266,27 @@ func (p *baseTxnContextProvider) ActivateTxn() (kv.Transaction, error) { return txn, nil } -func (p *baseTxnContextProvider) prepareTxn(considerSnapshotTS bool) error { +// prepareTxn prepares txn with an oracle ts future. If the snapshotTS is set, +// the txn is prepared with it. +func (p *baseTxnContextProvider) prepareTxn() error { if p.isTxnPrepared { return nil } - // Sometimes, we need to prepare the latest oracle ts future even the SnapshotTS is set. - if considerSnapshotTS { - if snapshotTS := p.sctx.GetSessionVars().SnapshotTS; snapshotTS != 0 { - return p.prepareTxnWithTS(snapshotTS) - } + if snapshotTS := p.sctx.GetSessionVars().SnapshotTS; snapshotTS != 0 { + return p.prepareTxnWithTS(snapshotTS) + } + + future := sessiontxn.NewOracleFuture(p.ctx, p.sctx, p.sctx.GetSessionVars().TxnCtx.TxnScope) + return p.replaceTxnTsFuture(future) +} + +// prepareTxnWithOracleTS +// The difference between prepareTxnWithOracleTS and prepareTxn is that prepareTxnWithOracleTS +// does not consider snapshotTS +func (p *baseTxnContextProvider) prepareTxnWithOracleTS() error { + if p.isTxnPrepared { + return nil } future := sessiontxn.NewOracleFuture(p.ctx, p.sctx, p.sctx.GetSessionVars().TxnCtx.TxnScope) @@ -326,7 +333,7 @@ func (p *baseTxnContextProvider) AdviseWarmup() error { // When executing `START TRANSACTION READ ONLY AS OF ...` no need to warmUp return nil } - return p.prepareTxn(true) + return p.prepareTxn() } // AdviseOptimizeWithPlan providers optimization according to the plan diff --git a/sessiontxn/isolation/readcommitted.go b/sessiontxn/isolation/readcommitted.go index cd0ebad3f027f..c24062a39f20e 100644 --- a/sessiontxn/isolation/readcommitted.go +++ b/sessiontxn/isolation/readcommitted.go @@ -219,14 +219,14 @@ func (p *PessimisticRCTxnContextProvider) handleAfterPessimisticLockError(lockEr // AdviseWarmup provides warmup for inner state func (p *PessimisticRCTxnContextProvider) AdviseWarmup() error { - if p.isTidbSnapshotEnabled() { - return nil + if err := p.prepareTxn(); err != nil { + return err } - if err := p.prepareTxn(true); err != nil { - return err + if !p.isTidbSnapshotEnabled() { + p.prepareStmtTS() } - p.prepareStmtTS() + return nil } diff --git a/sessiontxn/staleread/provider.go b/sessiontxn/staleread/provider.go index 25fe76912133f..31d02726526ae 100644 --- a/sessiontxn/staleread/provider.go +++ b/sessiontxn/staleread/provider.go @@ -89,7 +89,7 @@ func (p *StalenessTxnContextProvider) OnInitialize(ctx context.Context, tp sessi // with the staleness snapshot ts. After that, it sets the relevant context variables. func (p *StalenessTxnContextProvider) activateStaleTxn() error { var err error - if err = sessiontxn.CheckBeforeNewTxn(p.ctx, p.sctx); err != nil { + if err = sessiontxn.CommitBeforeEnterNewTxn(p.ctx, p.sctx); err != nil { return err } diff --git a/sessiontxn/txn.go b/sessiontxn/txn.go index 8992f3d28b54e..00ca355228ca9 100644 --- a/sessiontxn/txn.go +++ b/sessiontxn/txn.go @@ -74,9 +74,9 @@ func CanReuseTxnWhenExplicitBegin(sctx sessionctx.Context) bool { return txnCtx.History == nil && !txnCtx.IsStaleness && sessVars.SnapshotTS == 0 } -// CheckBeforeNewTxn is called before entering a new transaction. It checks whether the old +// CommitBeforeEnterNewTxn is called before entering a new transaction. It checks whether the old // txn is valid in which case we should commit it first. -func CheckBeforeNewTxn(ctx context.Context, sctx sessionctx.Context) error { +func CommitBeforeEnterNewTxn(ctx context.Context, sctx sessionctx.Context) error { txn, err := sctx.Txn(false) if err != nil { return err diff --git a/util/mock/context.go b/util/mock/context.go index c7b9fe66d9d66..33ba7338d2dbd 100644 --- a/util/mock/context.go +++ b/util/mock/context.go @@ -18,6 +18,7 @@ package mock import ( "context" "fmt" + "github.com/tikv/client-go/v2/tikv" "time" "github.com/pingcap/errors" @@ -61,16 +62,33 @@ type Context struct { type wrapTxn struct { kv.Transaction - prepared bool + tsFuture oracle.Future +} + +func (txn *wrapTxn) validOrPending() bool { + return txn.tsFuture != nil || txn.Transaction.Valid() +} + +func (txn *wrapTxn) pending() bool { + return txn.Transaction == nil && txn.tsFuture != nil } // Wait creates a new kvTransaction func (txn *wrapTxn) Wait(_ context.Context, sctx sessionctx.Context) (kv.Transaction, error) { - kvTxn, err := sctx.GetStore().Begin() - if err != nil { - return nil, errors.Trace(err) + if !txn.validOrPending() { + return txn, errors.AddStack(kv.ErrInvalidTxn) + } + if txn.pending() { + ts, err := txn.tsFuture.Wait() + if err != nil { + return nil, err + } + kvTxn, err := sctx.GetStore().Begin(tikv.WithStartTS(ts)) + if err != nil { + return nil, errors.Trace(err) + } + txn.Transaction = kvTxn } - txn.Transaction = kvTxn return txn, nil } @@ -373,14 +391,14 @@ func (c *Context) HasLockedTables() bool { // PrepareTSFuture implements the sessionctx.Context interface. func (c *Context) PrepareTSFuture(ctx context.Context, future oracle.Future, scope string) error { - c.txn.prepared = true + c.txn.tsFuture = future return nil } // GetPreparedTxnFuture returns the TxnFuture if it is prepared. // It returns nil otherwise. func (c *Context) GetPreparedTxnFuture() sessionctx.TxnFuture { - if !c.txn.prepared { + if !c.txn.validOrPending() { return nil } return &c.txn From 7fea97ca90a137ad966dda8c6d4a6bac3153d9cb Mon Sep 17 00:00:00 2001 From: SpadeA-Tang Date: Thu, 7 Jul 2022 17:58:09 +0800 Subject: [PATCH 16/16] update Signed-off-by: SpadeA-Tang --- util/mock/context.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/util/mock/context.go b/util/mock/context.go index 33ba7338d2dbd..d45c80a7af787 100644 --- a/util/mock/context.go +++ b/util/mock/context.go @@ -18,7 +18,6 @@ package mock import ( "context" "fmt" - "github.com/tikv/client-go/v2/tikv" "time" "github.com/pingcap/errors" @@ -39,6 +38,7 @@ import ( "github.com/pingcap/tidb/util/topsql/stmtstats" "github.com/pingcap/tipb/go-binlog" "github.com/tikv/client-go/v2/oracle" + "github.com/tikv/client-go/v2/tikv" ) var ( @@ -391,6 +391,7 @@ func (c *Context) HasLockedTables() bool { // PrepareTSFuture implements the sessionctx.Context interface. func (c *Context) PrepareTSFuture(ctx context.Context, future oracle.Future, scope string) error { + c.txn.Transaction = nil c.txn.tsFuture = future return nil }