From 8ce93488680bad73debe1303510e4afa8b4d8ec6 Mon Sep 17 00:00:00 2001 From: lysu Date: Wed, 13 Jan 2021 00:35:51 +0800 Subject: [PATCH] executor: fix SelectForUpdate in subquery return WriteConflictError in pessimistic mode Signed-off-by: lysu --- executor/adapter.go | 42 ++++++++++------------------ executor/executor.go | 56 +++++++++++++++++++++++++++---------- session/pessimistic_test.go | 14 ++++++++++ 3 files changed, 69 insertions(+), 43 deletions(-) diff --git a/executor/adapter.go b/executor/adapter.go index a0ec99dc2d293..806ea518a6e7c 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -19,7 +19,6 @@ import ( "fmt" "math" "runtime/trace" - "strconv" "strings" "sync/atomic" "time" @@ -620,8 +619,9 @@ func UpdateForUpdateTS(seCtx sessionctx.Context, newForUpdateTS uint64) error { } // handlePessimisticLockError updates TS and rebuild executor if the err is write conflict. -func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, err error) (Executor, error) { - sessVars := a.Ctx.GetSessionVars() +func handlePessimisticLockError(ctx context.Context, sctx sessionctx.Context, err error, + retryCount *uint, retryStartTime *time.Time, execBuilder func() (Executor, error)) (Executor, error) { + sessVars := sctx.GetSessionVars() if err != nil && sessVars.IsIsolation(ast.Serializable) { return nil, err } @@ -657,30 +657,30 @@ func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, err error) (E // select for update key1 again(this time lock succ(maybe lock released by others)) // the async rollback operation rollbacked the lock just acquired if err != nil { - tsErr := UpdateForUpdateTS(a.Ctx, 0) + tsErr := UpdateForUpdateTS(sctx, 0) if tsErr != nil { logutil.Logger(ctx).Warn("UpdateForUpdateTS failed", zap.Error(tsErr)) } } return nil, err } - if a.retryCount >= config.GetGlobalConfig().PessimisticTxn.MaxRetryCount { + if *retryCount >= config.GetGlobalConfig().PessimisticTxn.MaxRetryCount { return nil, errors.New("pessimistic lock retry limit reached") } - a.retryCount++ - a.retryStartTime = time.Now() - err = UpdateForUpdateTS(a.Ctx, newForUpdateTS) + *retryCount = *retryCount + 1 + *retryStartTime = time.Now() + err = UpdateForUpdateTS(sctx, newForUpdateTS) if err != nil { return nil, err } - e, err := a.buildExecutor() + e, err := execBuilder() if err != nil { return nil, err } // Rollback the statement change before retry it. - a.Ctx.StmtRollback() - a.Ctx.GetSessionVars().StmtCtx.ResetForRetry() - a.Ctx.GetSessionVars().RetryInfo.ResetOffset() + sctx.StmtRollback() + sctx.GetSessionVars().StmtCtx.ResetForRetry() + sctx.GetSessionVars().RetryInfo.ResetOffset() if err = e.Open(ctx); err != nil { return nil, err @@ -688,22 +688,8 @@ func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, err error) (E return e, nil } -func extractConflictCommitTS(errStr string) uint64 { - strs := strings.Split(errStr, "conflictCommitTS=") - if len(strs) != 2 { - return 0 - } - tsPart := strs[1] - length := strings.IndexByte(tsPart, ',') - if length < 0 { - return 0 - } - tsStr := tsPart[:length] - ts, err := strconv.ParseUint(tsStr, 10, 64) - if err != nil { - return 0 - } - return ts +func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, err error) (Executor, error) { + return handlePessimisticLockError(ctx, a.Ctx, err, &a.retryCount, &a.retryStartTime, a.buildExecutor) } type pessimisticTxn interface { diff --git a/executor/executor.go b/executor/executor.go index 63f65f3d816b3..6d0a1d97aab8f 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -1124,6 +1124,26 @@ func (e *LimitExec) adjustRequiredRows(chk *chunk.Chunk) *chunk.Chunk { return chk.SetRequiredRows(mathutil.Min(limitTotal, limitRequired), e.maxChunkSize) } +type execCtx struct { + p plannercore.PhysicalPlan + + builder *executorBuilder + exec Executor + retryCount uint + retryStartTime time.Time + is infoschema.InfoSchema + sctx sessionctx.Context +} + +func (c *execCtx) buildExecutor() (Executor, error) { + c.builder = &executorBuilder{is: c.is, ctx: c.sctx} + c.exec = c.builder.build(c.p) + if c.builder.err != nil { + return nil, c.builder.err + } + return c.exec, nil +} + func init() { // While doing optimization in the plan package, we need to execute uncorrelated subquery, // but the plan package cannot import the executor package because of the dependency cycle. @@ -1140,28 +1160,34 @@ func init() { defer span1.Finish() ctx = opentracing.ContextWithSpan(ctx, span1) } - - e := &executorBuilder{is: is, ctx: sctx} - exec := e.build(p) - if e.err != nil { - return nil, e.err - } - err := exec.Open(ctx) - defer terror.Call(exec.Close) + c := &execCtx{sctx: sctx, is: is, p: p} + _, err := c.buildExecutor() if err != nil { return nil, err } - chk := newFirstChunk(exec) - - err = Next(ctx, exec, chk) + err = c.exec.Open(ctx) + defer terror.Call(c.exec.Close) if err != nil { return nil, err } - if chk.NumRows() == 0 { - return nil, nil + for { + chk := newFirstChunk(c.exec) + err = Next(ctx, c.exec, chk) + if err != nil { + if sctx.GetSessionVars().TxnCtx.IsPessimistic && c.builder.hasLock { + _, err = handlePessimisticLockError(ctx, sctx, err, &c.retryCount, &c.retryStartTime, c.buildExecutor) + if err != nil { + return nil, err + } + continue + } + return nil, err + } + if chk.NumRows() == 0 { + return nil, nil + } + return chk.GetRow(0).GetDatumRow(retTypes(c.exec)), nil } - row := chk.GetRow(0).GetDatumRow(retTypes(exec)) - return row, err } } diff --git a/session/pessimistic_test.go b/session/pessimistic_test.go index f8ed54d4aeab8..b75a6d549d3b4 100644 --- a/session/pessimistic_test.go +++ b/session/pessimistic_test.go @@ -1305,6 +1305,20 @@ func (s *testPessimisticSuite) TestBatchPointGetAlreadyLocked(c *C) { tk.MustExec("commit") } +func (s *testPessimisticSuite) TestSubQueryForUpdate(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("use test") + tk2 := testkit.NewTestKitWithInit(c, s.store) + tk2.MustExec("use test") + tk.MustExec("drop table if exists t1") + tk.MustExec("create table t(id int primary key, v int)") + tk.MustExec("insert into t values(1, 10), (2, 20)") + tk.MustExec("begin pessimistic") + tk.MustQuery("select v from t").Check(testkit.Rows("10", "20")) + tk2.MustExec("update t set v = v * 10") + tk.MustQuery("select (select v from t where id = 1 for update) v1, (select v from t where id = 2) from dual").Check(testkit.Rows("100 20")) +} + func (s *testPessimisticSuite) TestRollbackWakeupBlockedTxn(c *C) { tk := testkit.NewTestKitWithInit(c, s.store) tk2 := testkit.NewTestKitWithInit(c, s.store)