Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: fix SelectForUpdate in decorrelated subquery under pessimistic mode #22372

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 14 additions & 28 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"fmt"
"math"
"runtime/trace"
"strconv"
"strings"
"sync/atomic"
"time"
Expand Down Expand Up @@ -620,8 +619,9 @@ func UpdateForUpdateTS(seCtx sessionctx.Context, newForUpdateTS uint64) error {
}

// handlePessimisticLockError updates TS and rebuild executor if the err is write conflict.
func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, err error) (Executor, error) {
sessVars := a.Ctx.GetSessionVars()
func handlePessimisticLockError(ctx context.Context, sctx sessionctx.Context, err error,
retryCount *uint, retryStartTime *time.Time, execBuilder func() (Executor, error)) (Executor, error) {
sessVars := sctx.GetSessionVars()
if err != nil && sessVars.IsIsolation(ast.Serializable) {
return nil, err
}
Expand Down Expand Up @@ -657,53 +657,39 @@ func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, err error) (E
// select for update key1 again(this time lock succ(maybe lock released by others))
// the async rollback operation rollbacked the lock just acquired
if err != nil {
tsErr := UpdateForUpdateTS(a.Ctx, 0)
tsErr := UpdateForUpdateTS(sctx, 0)
if tsErr != nil {
logutil.Logger(ctx).Warn("UpdateForUpdateTS failed", zap.Error(tsErr))
}
}
return nil, err
}
if a.retryCount >= config.GetGlobalConfig().PessimisticTxn.MaxRetryCount {
if *retryCount >= config.GetGlobalConfig().PessimisticTxn.MaxRetryCount {
return nil, errors.New("pessimistic lock retry limit reached")
}
a.retryCount++
a.retryStartTime = time.Now()
err = UpdateForUpdateTS(a.Ctx, newForUpdateTS)
*retryCount = *retryCount + 1
*retryStartTime = time.Now()
err = UpdateForUpdateTS(sctx, newForUpdateTS)
if err != nil {
return nil, err
}
e, err := a.buildExecutor()
e, err := execBuilder()
if err != nil {
return nil, err
}
// Rollback the statement change before retry it.
a.Ctx.StmtRollback()
a.Ctx.GetSessionVars().StmtCtx.ResetForRetry()
a.Ctx.GetSessionVars().RetryInfo.ResetOffset()
sctx.StmtRollback()
sctx.GetSessionVars().StmtCtx.ResetForRetry()
sctx.GetSessionVars().RetryInfo.ResetOffset()

if err = e.Open(ctx); err != nil {
return nil, err
}
return e, nil
}

func extractConflictCommitTS(errStr string) uint64 {
strs := strings.Split(errStr, "conflictCommitTS=")
if len(strs) != 2 {
return 0
}
tsPart := strs[1]
length := strings.IndexByte(tsPart, ',')
if length < 0 {
return 0
}
tsStr := tsPart[:length]
ts, err := strconv.ParseUint(tsStr, 10, 64)
if err != nil {
return 0
}
return ts
func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, err error) (Executor, error) {
return handlePessimisticLockError(ctx, a.Ctx, err, &a.retryCount, &a.retryStartTime, a.buildExecutor)
}

type pessimisticTxn interface {
Expand Down
56 changes: 41 additions & 15 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1124,6 +1124,26 @@ func (e *LimitExec) adjustRequiredRows(chk *chunk.Chunk) *chunk.Chunk {
return chk.SetRequiredRows(mathutil.Min(limitTotal, limitRequired), e.maxChunkSize)
}

type execCtx struct {
p plannercore.PhysicalPlan

builder *executorBuilder
exec Executor
retryCount uint
retryStartTime time.Time
is infoschema.InfoSchema
sctx sessionctx.Context
}

func (c *execCtx) buildExecutor() (Executor, error) {
c.builder = &executorBuilder{is: c.is, ctx: c.sctx}
c.exec = c.builder.build(c.p)
if c.builder.err != nil {
return nil, c.builder.err
}
return c.exec, nil
}

func init() {
// While doing optimization in the plan package, we need to execute uncorrelated subquery,
// but the plan package cannot import the executor package because of the dependency cycle.
Expand All @@ -1140,28 +1160,34 @@ func init() {
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}

e := &executorBuilder{is: is, ctx: sctx}
exec := e.build(p)
if e.err != nil {
return nil, e.err
}
err := exec.Open(ctx)
defer terror.Call(exec.Close)
c := &execCtx{sctx: sctx, is: is, p: p}
_, err := c.buildExecutor()
if err != nil {
return nil, err
}
chk := newFirstChunk(exec)

err = Next(ctx, exec, chk)
err = c.exec.Open(ctx)
defer terror.Call(c.exec.Close)
if err != nil {
return nil, err
}
if chk.NumRows() == 0 {
return nil, nil
for {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it execute the whole query inside EvalSubqueryFirstRow?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, it only execute subplan subquery for example:

select v from t where id = 1 for update of select (select v from t where id = 1 for update) v1, (select v from t where id = 2) from dual

chk := newFirstChunk(c.exec)
err = Next(ctx, c.exec, chk)
if err != nil {
if sctx.GetSessionVars().TxnCtx.IsPessimistic && c.builder.hasLock {
_, err = handlePessimisticLockError(ctx, sctx, err, &c.retryCount, &c.retryStartTime, c.buildExecutor)
if err != nil {
return nil, err
}
continue
}
return nil, err
}
if chk.NumRows() == 0 {
return nil, nil
}
return chk.GetRow(0).GetDatumRow(retTypes(c.exec)), nil
}
row := chk.GetRow(0).GetDatumRow(retTypes(exec))
return row, err
}
}

Expand Down
14 changes: 14 additions & 0 deletions session/pessimistic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1305,6 +1305,20 @@ func (s *testPessimisticSuite) TestBatchPointGetAlreadyLocked(c *C) {
tk.MustExec("commit")
}

func (s *testPessimisticSuite) TestSubQueryForUpdate(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("use test")
tk2 := testkit.NewTestKitWithInit(c, s.store)
tk2.MustExec("use test")
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t(id int primary key, v int)")
tk.MustExec("insert into t values(1, 10), (2, 20)")
tk.MustExec("begin pessimistic")
tk.MustQuery("select v from t").Check(testkit.Rows("10", "20"))
tk2.MustExec("update t set v = v * 10")
tk.MustQuery("select (select v from t where id = 1 for update) v1, (select v from t where id = 2) from dual").Check(testkit.Rows("100 20"))
}

func (s *testPessimisticSuite) TestRollbackWakeupBlockedTxn(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk2 := testkit.NewTestKitWithInit(c, s.store)
Expand Down