Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

txn: remove NewTxn and NewStaleTxnWithStartTS in session #35885

Merged
merged 24 commits into from
Jul 7, 2022
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 7 additions & 107 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ import (
tikverr "github.com/tikv/client-go/v2/error"
tikvstore "github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
tikvutil "github.com/tikv/client-go/v2/util"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -2462,19 +2461,6 @@ func (s *session) DropPreparedStmt(stmtID uint32) error {
return nil
}

// setTxnAssertionLevel sets assertion level of a transactin. Note that assertion level should be set only once just
// after creating a new transaction.
func setTxnAssertionLevel(txn kv.Transaction, assertionLevel variable.AssertionLevel) {
switch assertionLevel {
case variable.AssertionLevelOff:
txn.SetOption(kv.AssertionLevel, kvrpcpb.AssertionLevel_Off)
case variable.AssertionLevelFast:
txn.SetOption(kv.AssertionLevel, kvrpcpb.AssertionLevel_Fast)
case variable.AssertionLevelStrict:
txn.SetOption(kv.AssertionLevel, kvrpcpb.AssertionLevel_Strict)
}
}

func (s *session) Txn(active bool) (kv.Transaction, error) {
if !active {
return &s.txn, nil
Expand All @@ -2483,91 +2469,6 @@ func (s *session) Txn(active bool) (kv.Transaction, error) {
return &s.txn, err
}

func (s *session) NewTxn(ctx context.Context) error {
if err := s.checkBeforeNewTxn(ctx); err != nil {
return err
}
txn, err := s.store.Begin(tikv.WithTxnScope(s.sessionVars.CheckAndGetTxnScope()))
if err != nil {
return err
}
txn.SetVars(s.sessionVars.KVVars)
replicaReadType := s.GetSessionVars().GetReplicaRead()
if replicaReadType.IsFollowerRead() {
txn.SetOption(kv.ReplicaRead, replicaReadType)
}
setTxnAssertionLevel(txn, s.sessionVars.AssertionLevel)
s.txn.changeInvalidToValid(txn)
is := s.GetDomainInfoSchema()
s.sessionVars.TxnCtx = &variable.TransactionContext{
TxnCtxNoNeedToRestore: variable.TxnCtxNoNeedToRestore{
InfoSchema: is,
CreateTime: time.Now(),
StartTS: txn.StartTS(),
ShardStep: int(s.sessionVars.ShardAllocateStep),
IsStaleness: false,
TxnScope: s.sessionVars.CheckAndGetTxnScope(),
},
}
s.txn.SetOption(kv.SnapInterceptor, s.getSnapshotInterceptor())
if s.GetSessionVars().InRestrictedSQL {
s.txn.SetOption(kv.RequestSourceInternal, true)
if source := ctx.Value(kv.RequestSourceKey); source != nil {
s.txn.SetOption(kv.RequestSourceType, source.(kv.RequestSource).RequestSourceType)
}
}
return nil
}

func (s *session) checkBeforeNewTxn(ctx context.Context) error {
if s.txn.Valid() {
txnStartTS := s.txn.StartTS()
txnScope := s.GetSessionVars().TxnCtx.TxnScope
err := s.CommitTxn(ctx)
if err != nil {
return err
}
logutil.Logger(ctx).Info("Try to create a new txn inside a transaction auto commit",
zap.Int64("schemaVersion", s.GetInfoSchema().SchemaMetaVersion()),
zap.Uint64("txnStartTS", txnStartTS),
zap.String("txnScope", txnScope))
}
return nil
}

// NewStaleTxnWithStartTS create a transaction with the given StartTS.
func (s *session) NewStaleTxnWithStartTS(ctx context.Context, startTS uint64) error {
if err := s.checkBeforeNewTxn(ctx); err != nil {
return err
}
txnScope := config.GetTxnScopeFromConfig()
txn, err := s.store.Begin(tikv.WithTxnScope(txnScope), tikv.WithStartTS(startTS))
if err != nil {
return err
}
txn.SetVars(s.sessionVars.KVVars)
txn.SetOption(kv.IsStalenessReadOnly, true)
txn.SetOption(kv.TxnScope, txnScope)
setTxnAssertionLevel(txn, s.sessionVars.AssertionLevel)
s.txn.changeInvalidToValid(txn)
is, err := getSnapshotInfoSchema(s, txn.StartTS())
if err != nil {
return errors.Trace(err)
}
s.sessionVars.TxnCtx = &variable.TransactionContext{
TxnCtxNoNeedToRestore: variable.TxnCtxNoNeedToRestore{
InfoSchema: is,
CreateTime: time.Now(),
StartTS: txn.StartTS(),
ShardStep: int(s.sessionVars.ShardAllocateStep),
IsStaleness: true,
TxnScope: txnScope,
},
}
s.txn.SetOption(kv.SnapInterceptor, s.getSnapshotInterceptor())
return nil
}

func (s *session) SetValue(key fmt.Stringer, value interface{}) {
s.mu.Lock()
s.mu.values[key] = value
Expand Down Expand Up @@ -3137,7 +3038,9 @@ func (s *session) PrepareTSFuture(ctx context.Context, future oracle.Future, sco
}

failpoint.Inject("assertTSONotRequest", func() {
panic("tso shouldn't be requested")
if _, ok := future.(sessiontxn.ConstantFuture); !ok {
panic("tso shouldn't be requested")
}
})

failpoint.InjectContext(ctx, "mockGetTSFail", func() {
Expand All @@ -3152,10 +3055,11 @@ func (s *session) PrepareTSFuture(ctx context.Context, future oracle.Future, sco
return nil
}

// GetPreparedTxnFuture returns the TxnFuture
func (s *session) GetPreparedTxnFuture() sessionctx.TxnFuture {
if !s.txn.validOrPending() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why remove this? I think it is reasonable if a txn is not prepared the TxnFuture should return a nil value.

return nil
}
//if !s.txn.validOrPending() {
// return nil
//}
xhebox marked this conversation as resolved.
Show resolved Hide resolved
return &s.txn
}

Expand Down Expand Up @@ -3444,10 +3348,6 @@ func (s *session) BuiltinFunctionUsageInc(scalarFuncSigName string) {
s.functionUsageMu.builtinFunctionUsage.Inc(scalarFuncSigName)
}

func (s *session) getSnapshotInterceptor() kv.SnapshotInterceptor {
return temptable.SessionSnapshotInterceptor(s)
}

func (s *session) GetStmtStats() *stmtstats.StatementStats {
return s.stmtStats
}
Expand Down
24 changes: 8 additions & 16 deletions session/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,22 +240,6 @@ func (txn *LazyTxn) GetOption(opt int) interface{} {
return txn.Transaction.GetOption(opt)
}

func (txn *LazyTxn) changeInvalidToValid(kvTxn kv.Transaction) {
txn.Transaction = kvTxn
txn.initStmtBuf()
txn.txnFuture = nil

txn.mu.Lock()
defer txn.mu.Unlock()
txn.resetTxnInfo(
kvTxn.StartTS(),
txninfo.TxnIdle,
uint64(txn.Transaction.Len()),
uint64(txn.Transaction.Size()),
"",
nil)
}

func (txn *LazyTxn) changeToPending(future *txnFuture) {
txn.Transaction = nil
txn.txnFuture = future
Expand Down Expand Up @@ -504,6 +488,14 @@ func (txn *LazyTxn) Wait(ctx context.Context, sctx sessionctx.Context) (kv.Trans
return txn, nil
}

// GetPreparedTSFuture returns the prepared ts future
func (txn *LazyTxn) GetPreparedTSFuture() oracle.Future {
if future := txn.txnFuture; future != nil {
return future.future
}
return nil
}

func keyNeedToLock(k, v []byte, flags kv.KeyFlags) bool {
isTableKey := bytes.HasPrefix(k, tablecodec.TablePrefix())
if !isTableKey {
Expand Down
10 changes: 3 additions & 7 deletions sessionctx/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,6 @@ type SessionStatesHandler interface {
// Context is an interface for transaction and executive args environment.
type Context interface {
SessionStatesHandler
// NewTxn creates a new transaction for further execution.
// If old transaction is valid, it is committed first.
// It's used in BEGIN statement and DDL statements to commit old transaction.
NewTxn(context.Context) error
// NewStaleTxnWithStartTS initializes a staleness transaction with the given StartTS.
NewStaleTxnWithStartTS(ctx context.Context, startTS uint64) error
// SetDiskFullOpt set the disk full opt when tikv disk full happened.
SetDiskFullOpt(level kvrpcpb.DiskFullOpt)
// RollbackTxn rolls back the current transaction.
Expand Down Expand Up @@ -149,7 +143,7 @@ type Context interface {
HasLockedTables() bool
// PrepareTSFuture uses to prepare timestamp by future.
PrepareTSFuture(ctx context.Context, future oracle.Future, scope string) error
// GetPreparedTxnFuture returns the prepared ts future
// GetPreparedTxnFuture returns the TxnFuture
GetPreparedTxnFuture() TxnFuture
// StoreIndexUsage stores the index usage information.
StoreIndexUsage(tblID int64, idxID int64, rowsSelected int64)
Expand Down Expand Up @@ -178,6 +172,8 @@ type Context interface {
type TxnFuture interface {
// Wait converts pending txn to valid
Wait(ctx context.Context, sctx Context) (kv.Transaction, error)
// GetPreparedTSFuture returns the prepared ts future
GetPreparedTSFuture() oracle.Future
}

type basicCtxType int
Expand Down
33 changes: 25 additions & 8 deletions sessiontxn/isolation/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,15 @@ type baseTxnContextProvider struct {
enterNewTxnType sessiontxn.EnterNewTxnType
}

// prepareTxnNotConsiderSnapshotTS is used to prepare an oracle ts future even when SnapshotTS is set.
func (p *baseTxnContextProvider) prepareTxnNotConsiderSnapshotTS() error {
if snapshotTS := p.sctx.GetSessionVars().SnapshotTS; snapshotTS == 0 {
return nil
}
p.isTxnPrepared = false
return p.prepareTxn(false)
}

// OnInitialize is the hook that should be called when enter a new txn with this provider
func (p *baseTxnContextProvider) OnInitialize(ctx context.Context, tp sessiontxn.EnterNewTxnType) (err error) {
if p.getStmtReadTSFunc == nil || p.getStmtForUpdateTSFunc == nil {
Expand All @@ -67,12 +76,18 @@ func (p *baseTxnContextProvider) OnInitialize(ctx context.Context, tp sessiontxn
activeNow := true
switch tp {
case sessiontxn.EnterNewTxnDefault:
if err = p.sctx.NewTxn(ctx); err != nil {
if err := sessiontxn.CheckBeforeNewTxn(p.ctx, p.sctx); err != nil {
return err
}
if err := p.prepareTxnNotConsiderSnapshotTS(); err != nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it ok if we call p.prepareTxn(false) here ?

return err
}
case sessiontxn.EnterNewTxnWithBeginStmt:
if !sessiontxn.CanReuseTxnWhenExplicitBegin(p.sctx) {
if err = p.sctx.NewTxn(ctx); err != nil {
if err := sessiontxn.CheckBeforeNewTxn(p.ctx, p.sctx); err != nil {
return err
}
if err := p.prepareTxnNotConsiderSnapshotTS(); err != nil {
return err
}
}
Expand Down Expand Up @@ -102,7 +117,7 @@ func (p *baseTxnContextProvider) OnInitialize(ctx context.Context, tp sessiontxn
if err != nil {
return err
}
p.isTxnPrepared = txn.Valid() || p.sctx.GetPreparedTxnFuture() != nil
p.isTxnPrepared = txn.Valid() || p.sctx.GetPreparedTxnFuture().GetPreparedTSFuture() != nil
if activeNow {
_, err = p.ActivateTxn()
}
Expand Down Expand Up @@ -172,7 +187,7 @@ func (p *baseTxnContextProvider) ActivateTxn() (kv.Transaction, error) {
return p.txn, nil
}

if err := p.prepareTxn(); err != nil {
if err := p.prepareTxn(true); err != nil {
return nil, err
}

Expand Down Expand Up @@ -227,13 +242,15 @@ func (p *baseTxnContextProvider) ActivateTxn() (kv.Transaction, error) {
return txn, nil
}

func (p *baseTxnContextProvider) prepareTxn() error {
func (p *baseTxnContextProvider) prepareTxn(considerSnapshotTS bool) error {
lcwangchao marked this conversation as resolved.
Show resolved Hide resolved
if p.isTxnPrepared {
return nil
}

if snapshotTS := p.sctx.GetSessionVars().SnapshotTS; snapshotTS != 0 {
return p.prepareTxnWithTS(snapshotTS)
if considerSnapshotTS {
if snapshotTS := p.sctx.GetSessionVars().SnapshotTS; snapshotTS != 0 {
return p.prepareTxnWithTS(snapshotTS)
}
}

future := sessiontxn.NewOracleFuture(p.ctx, p.sctx, p.sctx.GetSessionVars().TxnCtx.TxnScope)
Expand Down Expand Up @@ -280,7 +297,7 @@ func (p *baseTxnContextProvider) AdviseWarmup() error {
// When executing `START TRANSACTION READ ONLY AS OF ...` no need to warmUp
return nil
}
return p.prepareTxn()
return p.prepareTxn(true)
}

// AdviseOptimizeWithPlan providers optimization according to the plan
Expand Down
2 changes: 1 addition & 1 deletion sessiontxn/isolation/readcommitted.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ func (p *PessimisticRCTxnContextProvider) AdviseWarmup() error {
return nil
}

if err := p.prepareTxn(); err != nil {
if err := p.prepareTxn(true); err != nil {
lcwangchao marked this conversation as resolved.
Show resolved Hide resolved
return err
}
p.prepareStmtTS()
Expand Down
50 changes: 43 additions & 7 deletions sessiontxn/staleread/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,17 @@ package staleread

import (
"context"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/table/temptable"
)

// StalenessTxnContextProvider implements sessiontxn.TxnContextProvider
Expand Down Expand Up @@ -73,18 +76,51 @@ func (p *StalenessTxnContextProvider) OnInitialize(ctx context.Context, tp sessi
}

func (p *StalenessTxnContextProvider) activateStaleTxn() error {
if err := p.sctx.NewStaleTxnWithStartTS(p.ctx, p.ts); err != nil {
var err error
if err = sessiontxn.CheckBeforeNewTxn(p.ctx, p.sctx); err != nil {
return err
}
p.is = p.sctx.GetSessionVars().TxnCtx.InfoSchema.(infoschema.InfoSchema)
if err := p.sctx.GetSessionVars().SetSystemVar(variable.TiDBSnapshot, ""); err != nil {

txnScope := config.GetTxnScopeFromConfig()
if err = p.sctx.PrepareTSFuture(p.ctx, sessiontxn.ConstantFuture(p.ts), txnScope); err != nil {
return err
}

txnCtx := p.sctx.GetSessionVars().TxnCtx
txnCtx.IsStaleness = true
txnCtx.InfoSchema = p.is
return nil
txnFuture := p.sctx.GetPreparedTxnFuture()
if txnFuture == nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ts is prepared in line 97. I think we can guarantee txnFuture is not nil here. Maybe this line is not necessary

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's unnecessary.

return errors.AddStack(kv.ErrInvalidTxn)
}

txn, err := txnFuture.Wait(p.ctx, p.sctx)
if err != nil {
return err
}

sessVars := p.sctx.GetSessionVars()
txn.SetVars(sessVars.KVVars)
txn.SetOption(kv.IsStalenessReadOnly, true)
txn.SetOption(kv.TxnScope, txnScope)
sessiontxn.SetTxnAssertionLevel(txn, sessVars.AssertionLevel)
is, err := GetSessionSnapshotInfoSchema(p.sctx, p.ts)
if err != nil {
return errors.Trace(err)
}
sessVars.TxnCtx = &variable.TransactionContext{
TxnCtxNoNeedToRestore: variable.TxnCtxNoNeedToRestore{
InfoSchema: is,
CreateTime: time.Now(),
StartTS: txn.StartTS(),
ShardStep: int(sessVars.ShardAllocateStep),
IsStaleness: true,
TxnScope: txnScope,
},
}
txn.SetOption(kv.SnapInterceptor, temptable.SessionSnapshotInterceptor(p.sctx))

p.is = is
err = p.sctx.GetSessionVars().SetSystemVar(variable.TiDBSnapshot, "")

return err
}

func (p *StalenessTxnContextProvider) enterNewStaleTxnWithReplaceProvider() error {
Expand Down
Loading