Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

txn: remove NewTxn and NewStaleTxnWithStartTS in session #35885

Merged
merged 24 commits into from
Jul 7, 2022
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 5 additions & 104 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ import (
tikverr "github.com/tikv/client-go/v2/error"
tikvstore "github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
tikvutil "github.com/tikv/client-go/v2/util"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -2460,19 +2459,6 @@ func (s *session) DropPreparedStmt(stmtID uint32) error {
return nil
}

// setTxnAssertionLevel sets assertion level of a transactin. Note that assertion level should be set only once just
// after creating a new transaction.
func setTxnAssertionLevel(txn kv.Transaction, assertionLevel variable.AssertionLevel) {
switch assertionLevel {
case variable.AssertionLevelOff:
txn.SetOption(kv.AssertionLevel, kvrpcpb.AssertionLevel_Off)
case variable.AssertionLevelFast:
txn.SetOption(kv.AssertionLevel, kvrpcpb.AssertionLevel_Fast)
case variable.AssertionLevelStrict:
txn.SetOption(kv.AssertionLevel, kvrpcpb.AssertionLevel_Strict)
}
}

func (s *session) Txn(active bool) (kv.Transaction, error) {
if !active {
return &s.txn, nil
Expand All @@ -2481,91 +2467,6 @@ func (s *session) Txn(active bool) (kv.Transaction, error) {
return &s.txn, err
}

func (s *session) NewTxn(ctx context.Context) error {
if err := s.checkBeforeNewTxn(ctx); err != nil {
return err
}
txn, err := s.store.Begin(tikv.WithTxnScope(s.sessionVars.CheckAndGetTxnScope()))
if err != nil {
return err
}
txn.SetVars(s.sessionVars.KVVars)
replicaReadType := s.GetSessionVars().GetReplicaRead()
if replicaReadType.IsFollowerRead() {
txn.SetOption(kv.ReplicaRead, replicaReadType)
}
setTxnAssertionLevel(txn, s.sessionVars.AssertionLevel)
s.txn.changeInvalidToValid(txn)
is := s.GetDomainInfoSchema()
s.sessionVars.TxnCtx = &variable.TransactionContext{
TxnCtxNoNeedToRestore: variable.TxnCtxNoNeedToRestore{
InfoSchema: is,
CreateTime: time.Now(),
StartTS: txn.StartTS(),
ShardStep: int(s.sessionVars.ShardAllocateStep),
IsStaleness: false,
TxnScope: s.sessionVars.CheckAndGetTxnScope(),
},
}
s.txn.SetOption(kv.SnapInterceptor, s.getSnapshotInterceptor())
if s.GetSessionVars().InRestrictedSQL {
s.txn.SetOption(kv.RequestSourceInternal, true)
if source := ctx.Value(kv.RequestSourceKey); source != nil {
s.txn.SetOption(kv.RequestSourceType, source.(kv.RequestSource).RequestSourceType)
}
}
return nil
}

func (s *session) checkBeforeNewTxn(ctx context.Context) error {
if s.txn.Valid() {
txnStartTS := s.txn.StartTS()
txnScope := s.GetSessionVars().TxnCtx.TxnScope
err := s.CommitTxn(ctx)
if err != nil {
return err
}
logutil.Logger(ctx).Info("Try to create a new txn inside a transaction auto commit",
zap.Int64("schemaVersion", s.GetInfoSchema().SchemaMetaVersion()),
zap.Uint64("txnStartTS", txnStartTS),
zap.String("txnScope", txnScope))
}
return nil
}

// NewStaleTxnWithStartTS create a transaction with the given StartTS.
func (s *session) NewStaleTxnWithStartTS(ctx context.Context, startTS uint64) error {
if err := s.checkBeforeNewTxn(ctx); err != nil {
return err
}
txnScope := kv.GlobalTxnScope
txn, err := s.store.Begin(tikv.WithTxnScope(txnScope), tikv.WithStartTS(startTS))
if err != nil {
return err
}
txn.SetVars(s.sessionVars.KVVars)
txn.SetOption(kv.IsStalenessReadOnly, true)
txn.SetOption(kv.TxnScope, txnScope)
setTxnAssertionLevel(txn, s.sessionVars.AssertionLevel)
s.txn.changeInvalidToValid(txn)
is, err := getSnapshotInfoSchema(s, txn.StartTS())
if err != nil {
return errors.Trace(err)
}
s.sessionVars.TxnCtx = &variable.TransactionContext{
TxnCtxNoNeedToRestore: variable.TxnCtxNoNeedToRestore{
InfoSchema: is,
CreateTime: time.Now(),
StartTS: txn.StartTS(),
ShardStep: int(s.sessionVars.ShardAllocateStep),
IsStaleness: true,
TxnScope: txnScope,
},
}
s.txn.SetOption(kv.SnapInterceptor, s.getSnapshotInterceptor())
return nil
}

func (s *session) SetValue(key fmt.Stringer, value interface{}) {
s.mu.Lock()
s.mu.values[key] = value
Expand Down Expand Up @@ -3137,7 +3038,9 @@ func (s *session) PrepareTSFuture(ctx context.Context, future oracle.Future, sco
}

failpoint.Inject("assertTSONotRequest", func() {
panic("tso shouldn't be requested")
if _, ok := future.(sessiontxn.ConstantFuture); !ok {
panic("tso shouldn't be requested")
}
})

failpoint.InjectContext(ctx, "mockGetTSFail", func() {
Expand All @@ -3152,6 +3055,8 @@ func (s *session) PrepareTSFuture(ctx context.Context, future oracle.Future, sco
return nil
}

// GetPreparedTxnFuture returns the TxnFuture if it is valid or pending.
// It returns nil otherwise.
func (s *session) GetPreparedTxnFuture() sessionctx.TxnFuture {
if !s.txn.validOrPending() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why remove this? I think it is reasonable if a txn is not prepared the TxnFuture should return a nil value.

return nil
Expand Down Expand Up @@ -3448,10 +3353,6 @@ func (s *session) BuiltinFunctionUsageInc(scalarFuncSigName string) {
s.functionUsageMu.builtinFunctionUsage.Inc(scalarFuncSigName)
}

func (s *session) getSnapshotInterceptor() kv.SnapshotInterceptor {
return temptable.SessionSnapshotInterceptor(s)
}

func (s *session) GetStmtStats() *stmtstats.StatementStats {
return s.stmtStats
}
Expand Down
16 changes: 0 additions & 16 deletions session/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,22 +240,6 @@ func (txn *LazyTxn) GetOption(opt int) interface{} {
return txn.Transaction.GetOption(opt)
}

func (txn *LazyTxn) changeInvalidToValid(kvTxn kv.Transaction) {
txn.Transaction = kvTxn
txn.initStmtBuf()
txn.txnFuture = nil

txn.mu.Lock()
defer txn.mu.Unlock()
txn.resetTxnInfo(
kvTxn.StartTS(),
txninfo.TxnIdle,
uint64(txn.Transaction.Len()),
uint64(txn.Transaction.Size()),
"",
nil)
}

func (txn *LazyTxn) changeToPending(future *txnFuture) {
txn.Transaction = nil
txn.txnFuture = future
Expand Down
9 changes: 2 additions & 7 deletions sessionctx/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,6 @@ type SessionStatesHandler interface {
// Context is an interface for transaction and executive args environment.
type Context interface {
SessionStatesHandler
// NewTxn creates a new transaction for further execution.
// If old transaction is valid, it is committed first.
// It's used in BEGIN statement and DDL statements to commit old transaction.
NewTxn(context.Context) error
// NewStaleTxnWithStartTS initializes a staleness transaction with the given StartTS.
NewStaleTxnWithStartTS(ctx context.Context, startTS uint64) error
// SetDiskFullOpt set the disk full opt when tikv disk full happened.
SetDiskFullOpt(level kvrpcpb.DiskFullOpt)
// RollbackTxn rolls back the current transaction.
Expand Down Expand Up @@ -149,7 +143,8 @@ type Context interface {
HasLockedTables() bool
// PrepareTSFuture uses to prepare timestamp by future.
PrepareTSFuture(ctx context.Context, future oracle.Future, scope string) error
// GetPreparedTxnFuture returns the prepared ts future
// GetPreparedTxnFuture returns the TxnFuture if it is valid or pending.
// It returns nil otherwise.
GetPreparedTxnFuture() TxnFuture
// StoreIndexUsage stores the index usage information.
StoreIndexUsage(tblID int64, idxID int64, rowsSelected int64)
Expand Down
45 changes: 39 additions & 6 deletions sessiontxn/isolation/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,26 @@ func (p *baseTxnContextProvider) OnInitialize(ctx context.Context, tp sessiontxn
activeNow := true
switch tp {
case sessiontxn.EnterNewTxnDefault:
if err = p.sctx.NewTxn(ctx); err != nil {
// As we will enter a new txn, we need to commit the old txn if it's still valid.
// There are two main steps here to enter a new txn:
// 1. prepareTxnWithOracleTS
// 2. ActivateTxn
if err := sessiontxn.CommitBeforeEnterNewTxn(p.ctx, p.sctx); err != nil {
return err
}
if err := p.prepareTxnWithOracleTS(); err != nil {
return err
}
case sessiontxn.EnterNewTxnWithBeginStmt:
if !sessiontxn.CanReuseTxnWhenExplicitBegin(p.sctx) {
if err = p.sctx.NewTxn(ctx); err != nil {
// As we will enter a new txn, we need to commit the old txn if it's still valid.
// There are two main steps here to enter a new txn:
// 1. prepareTxnWithOracleTS
// 2. ActivateTxn
if err := sessiontxn.CommitBeforeEnterNewTxn(p.ctx, p.sctx); err != nil {
return err
}
if err := p.prepareTxnWithOracleTS(); err != nil {
return err
}
}
Expand Down Expand Up @@ -111,17 +125,20 @@ func (p *baseTxnContextProvider) OnInitialize(ctx context.Context, tp sessiontxn
return err
}

// GetTxnInfoSchema returns the information schema used by txn
func (p *baseTxnContextProvider) GetTxnInfoSchema() infoschema.InfoSchema {
if is := p.sctx.GetSessionVars().SnapshotInfoschema; is != nil {
return is.(infoschema.InfoSchema)
}
return p.infoSchema
}

// GetTxnScope returns the current txn scope
func (p *baseTxnContextProvider) GetTxnScope() string {
return p.sctx.GetSessionVars().TxnCtx.TxnScope
}

// GetReadReplicaScope returns the read replica scope
func (p *baseTxnContextProvider) GetReadReplicaScope() string {
if txnScope := p.GetTxnScope(); txnScope != kv.GlobalTxnScope && txnScope != "" {
// In local txn, we should use txnScope as the readReplicaScope
Expand All @@ -137,6 +154,7 @@ func (p *baseTxnContextProvider) GetReadReplicaScope() string {
return kv.GlobalReplicaScope
}

//GetStmtReadTS returns the read timestamp used by select statement (not for select ... for update)
func (p *baseTxnContextProvider) GetStmtReadTS() (uint64, error) {
if _, err := p.ActivateTxn(); err != nil {
return 0, err
Expand All @@ -148,6 +166,7 @@ func (p *baseTxnContextProvider) GetStmtReadTS() (uint64, error) {
return p.getStmtReadTSFunc()
}

// GetStmtForUpdateTS returns the read timestamp used by update/insert/delete or select ... for update
func (p *baseTxnContextProvider) GetStmtForUpdateTS() (uint64, error) {
if _, err := p.ActivateTxn(); err != nil {
return 0, err
Expand All @@ -159,16 +178,19 @@ func (p *baseTxnContextProvider) GetStmtForUpdateTS() (uint64, error) {
return p.getStmtForUpdateTSFunc()
}

// OnStmtStart is the hook that should be called when a new statement started
func (p *baseTxnContextProvider) OnStmtStart(ctx context.Context, _ ast.StmtNode) error {
p.ctx = ctx
return nil
}

// OnStmtRetry is the hook that should be called when a statement is retried internally.
func (p *baseTxnContextProvider) OnStmtRetry(ctx context.Context) error {
p.ctx = ctx
return nil
}

// OnStmtErrorForNextAction is the hook that should be called when a new statement get an error
func (p *baseTxnContextProvider) OnStmtErrorForNextAction(point sessiontxn.StmtErrorHandlePoint, err error) (sessiontxn.StmtErrorAction, error) {
switch point {
case sessiontxn.StmtErrAfterPessimisticLock:
Expand All @@ -187,6 +209,7 @@ func (p *baseTxnContextProvider) getTxnStartTS() (uint64, error) {
return txn.StartTS(), nil
}

// ActivateTxn activates the transaction and set the relevant context variables.
func (p *baseTxnContextProvider) ActivateTxn() (kv.Transaction, error) {
if p.txn != nil {
return p.txn, nil
Expand All @@ -197,10 +220,6 @@ func (p *baseTxnContextProvider) ActivateTxn() (kv.Transaction, error) {
}

txnFuture := p.sctx.GetPreparedTxnFuture()
if txnFuture == nil {
return nil, errors.AddStack(kv.ErrInvalidTxn)
}

txn, err := txnFuture.Wait(p.ctx, p.sctx)
if err != nil {
return nil, err
Expand Down Expand Up @@ -247,6 +266,8 @@ func (p *baseTxnContextProvider) ActivateTxn() (kv.Transaction, error) {
return txn, nil
}

// prepareTxn prepares txn with an oracle ts future. If the snapshotTS is set,
// the txn is prepared with it.
func (p *baseTxnContextProvider) prepareTxn() error {
if p.isTxnPrepared {
return nil
Expand All @@ -260,6 +281,18 @@ func (p *baseTxnContextProvider) prepareTxn() error {
return p.replaceTxnTsFuture(future)
}

// prepareTxnWithOracleTS
// The difference between prepareTxnWithOracleTS and prepareTxn is that prepareTxnWithOracleTS
// does not consider snapshotTS
func (p *baseTxnContextProvider) prepareTxnWithOracleTS() error {
if p.isTxnPrepared {
return nil
}

future := sessiontxn.NewOracleFuture(p.ctx, p.sctx, p.sctx.GetSessionVars().TxnCtx.TxnScope)
return p.replaceTxnTsFuture(future)
}

func (p *baseTxnContextProvider) prepareTxnWithTS(ts uint64) error {
return p.replaceTxnTsFuture(sessiontxn.ConstantFuture(ts))
}
Expand Down
10 changes: 5 additions & 5 deletions sessiontxn/isolation/readcommitted.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,14 +219,14 @@ func (p *PessimisticRCTxnContextProvider) handleAfterPessimisticLockError(lockEr

// AdviseWarmup provides warmup for inner state
func (p *PessimisticRCTxnContextProvider) AdviseWarmup() error {
if p.isTidbSnapshotEnabled() {
return nil
}

if err := p.prepareTxn(); err != nil {
return err
}
p.prepareStmtTS()

if !p.isTidbSnapshotEnabled() {
p.prepareStmtTS()
}

return nil
}

Expand Down
Loading