Skip to content

Commit

Permalink
ddl: fix flaky test TestCancel & fix panic when we failed to get job …
Browse files Browse the repository at this point in the history
…in delivery2Worker (#53887)

close #36582
  • Loading branch information
D3Hunter authored Jun 11, 2024
1 parent d3dd254 commit 7a18952
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 31 deletions.
62 changes: 39 additions & 23 deletions pkg/ddl/cancel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package ddl_test
import (
"fmt"
"strings"
"sync/atomic"
"testing"
"time"

Expand All @@ -28,17 +29,18 @@ import (
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/testkit/external"
"github.com/pingcap/tidb/pkg/testkit/testfailpoint"
"github.com/stretchr/testify/require"
atomicutil "go.uber.org/atomic"
)

type testCancelJob struct {
sql string
ok bool
cancelState any // model.SchemaState | []model.SchemaState
onJobBefore bool
onJobUpdate bool
prepareSQL []string
sql string
expectCancelled bool
cancelState any // model.SchemaState | []model.SchemaState
onJobBefore bool
onJobUpdate bool
prepareSQL []string
}

var allTestCase = []testCancelJob{
Expand Down Expand Up @@ -204,6 +206,14 @@ func cancelSuccess(rs *testkit.Result) bool {
}

func TestCancel(t *testing.T) {
var enterCnt, exitCnt atomic.Int32
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/beforeDelivery2Worker", func(job *model.Job) { enterCnt.Add(1) })
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterDelivery2Worker", func(job *model.Job) { exitCnt.Add(1) })
waitDDLWorkerExited := func() {
require.Eventually(t, func() bool {
return enterCnt.Load() == exitCnt.Load()
}, 10*time.Second, 10*time.Millisecond)
}
store, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, 100*time.Millisecond)
tk := testkit.NewTestKit(t, store)
tkCancel := testkit.NewTestKit(t, store)
Expand Down Expand Up @@ -244,28 +254,28 @@ func TestCancel(t *testing.T) {

hook := &callback.TestDDLCallback{Do: dom}
i := atomicutil.NewInt64(0)
cancel := atomicutil.NewBool(false)
canceled := atomicutil.NewBool(false)
cancelResult := atomicutil.NewBool(false)
cancelWhenReorgNotStart := atomicutil.NewBool(false)

hookFunc := func(job *model.Job) {
if testutil.TestMatchCancelState(t, job, allTestCase[i.Load()].cancelState, allTestCase[i.Load()].sql) && !cancel.Load() {
if testutil.TestMatchCancelState(t, job, allTestCase[i.Load()].cancelState, allTestCase[i.Load()].sql) && !canceled.Load() {
if !cancelWhenReorgNotStart.Load() && job.SchemaState == model.StateWriteReorganization && job.MayNeedReorg() && job.RowCount == 0 {
return
}
rs := tkCancel.MustQuery(fmt.Sprintf("admin cancel ddl jobs %d", job.ID))
cancelResult.Store(cancelSuccess(rs))
cancel.Store(true)
canceled.Store(true)
}
}
dom.DDL().SetHook(hook.Clone())

restHook := func(h *callback.TestDDLCallback) {
resetHook := func(h *callback.TestDDLCallback) {
h.OnJobRunBeforeExported = nil
h.OnJobUpdatedExported.Store(nil)
dom.DDL().SetHook(h.Clone())
}
registHook := func(h *callback.TestDDLCallback, onJobRunBefore bool) {
registerHook := func(h *callback.TestDDLCallback, onJobRunBefore bool) {
if onJobRunBefore {
h.OnJobRunBeforeExported = hookFunc
} else {
Expand All @@ -274,41 +284,47 @@ func TestCancel(t *testing.T) {
dom.DDL().SetHook(h.Clone())
}

waitDDLWorkerExited()
for j, tc := range allTestCase {
t.Logf("running test case %d: %s", j, tc.sql)
i.Store(int64(j))
msg := fmt.Sprintf("sql: %s, state: %s", tc.sql, tc.cancelState)
if tc.onJobBefore {
restHook(hook)
resetHook(hook)
for _, prepareSQL := range tc.prepareSQL {
tk.MustExec(prepareSQL)
}
cancel.Store(false)
waitDDLWorkerExited()
canceled.Store(false)
cancelWhenReorgNotStart.Store(true)
registHook(hook, true)
if tc.ok {
registerHook(hook, true)
if tc.expectCancelled {
tk.MustGetErrCode(tc.sql, errno.ErrCancelledDDLJob)
} else {
tk.MustExec(tc.sql)
}
if cancel.Load() {
require.Equal(t, tc.ok, cancelResult.Load(), msg)
waitDDLWorkerExited()
if canceled.Load() {
require.Equal(t, tc.expectCancelled, cancelResult.Load(), msg)
}
}
if tc.onJobUpdate {
restHook(hook)
resetHook(hook)
for _, prepareSQL := range tc.prepareSQL {
tk.MustExec(prepareSQL)
}
cancel.Store(false)
waitDDLWorkerExited()
canceled.Store(false)
cancelWhenReorgNotStart.Store(false)
registHook(hook, false)
if tc.ok {
registerHook(hook, false)
if tc.expectCancelled {
tk.MustGetErrCode(tc.sql, errno.ErrCancelledDDLJob)
} else {
tk.MustExec(tc.sql)
}
if cancel.Load() {
require.Equal(t, tc.ok, cancelResult.Load(), msg)
waitDDLWorkerExited()
if canceled.Load() {
require.Equal(t, tc.expectCancelled, cancelResult.Load(), msg)
}
}
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/ddl/job_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,7 @@ func (d *ddl) delivery2LocalWorker(pool *workerPool, task *limitJobTask) {

// delivery2Worker owns the worker, need to put it back to the pool in this function.
func (s *jobScheduler) delivery2Worker(wk *worker, pool *workerPool, job *model.Job) {
failpoint.InjectCall("beforeDelivery2Worker", job)
injectFailPointForGetJob(job)
jobID, involvedSchemaInfos := job.ID, job.GetInvolvingSchemaInfo()
s.runningJobs.add(jobID, involvedSchemaInfos)
Expand Down Expand Up @@ -561,18 +562,18 @@ func (s *jobScheduler) delivery2Worker(wk *worker, pool *workerPool, job *model.
// job is already moved to history.
failpoint.InjectCall("beforeRefreshJob", job)
for {
job, err = s.sysTblMgr.GetJobByID(s.schCtx, job.ID)
job, err = s.sysTblMgr.GetJobByID(s.schCtx, jobID)
failpoint.InjectCall("mockGetJobByIDFail", &err)
if err == nil {
break
}

if err == systable.ErrNotFound {
logutil.DDLLogger().Info("job not found, might already finished",
zap.Int64("job_id", job.ID), zap.Stringer("state", job.State))
zap.Int64("job_id", jobID))
return
}
logutil.DDLLogger().Error("get job failed", zap.Error(err))
logutil.DDLLogger().Error("get job failed", zap.Int64("job_id", jobID), zap.Error(err))
select {
case <-s.schCtx.Done():
return
Expand Down
1 change: 1 addition & 0 deletions pkg/ddl/tests/adminpause/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ go_test(
"//pkg/errno",
"//pkg/parser/model",
"//pkg/testkit",
"//pkg/testkit/testfailpoint",
"//pkg/testkit/testsetup",
"//pkg/util/sqlexec",
"@com_github_pingcap_failpoint//:failpoint",
Expand Down
7 changes: 4 additions & 3 deletions pkg/ddl/tests/adminpause/pause_cancel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/pkg/errno"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/testkit/testfailpoint"
"github.com/pingcap/tidb/pkg/util/sqlexec"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
Expand Down Expand Up @@ -90,7 +91,7 @@ func pauseAndCancelStmt(t *testing.T, stmtKit *testkit.TestKit, adminCommandKit
var isCancelled = &atomic.Bool{}
var cancelResultChn = make(chan []sqlexec.RecordSet, 1)
var cancelErrChn = make(chan error, 1)
var cancelFunc = func(jobType string) {
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/beforeRefreshJob", func(*model.Job) {
Logger.Debug("pauseAndCancelStmt: OnGetJobBeforeExported, ",
zap.String("Expected Schema State", stmtCase.schemaState.String()))

Expand All @@ -107,7 +108,7 @@ func pauseAndCancelStmt(t *testing.T, stmtKit *testkit.TestKit, adminCommandKit

isCancelled.CompareAndSwap(false, true) // In case that it runs into this scope again and again
}
}
})
var verifyCancelResult = func(t *testing.T, adminCommandKit *testkit.TestKit) {
require.True(t, isCancelled.Load())

Expand All @@ -130,7 +131,6 @@ func pauseAndCancelStmt(t *testing.T, stmtKit *testkit.TestKit, adminCommandKit
originalHook := dom.DDL().GetHook()

hook.OnJobRunBeforeExported = pauseFunc
hook.OnGetJobBeforeExported = cancelFunc
dom.DDL().SetHook(hook.Clone())

isPaused.Store(false)
Expand All @@ -152,6 +152,7 @@ func pauseAndCancelStmt(t *testing.T, stmtKit *testkit.TestKit, adminCommandKit

// Release the hook, so that we could run the `rollbackStmts` successfully.
dom.DDL().SetHook(originalHook)
testfailpoint.Disable(t, "github.com/pingcap/tidb/pkg/ddl/beforeRefreshJob")

for _, rollbackStmt := range stmtCase.rollbackStmts {
// no care about the result here, since the `statement` could have been cancelled OR finished successfully.
Expand Down
4 changes: 2 additions & 2 deletions pkg/ddl/testutil/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,14 @@ func TestMatchCancelState(t *testing.T, job *model.Job, cancelState any, sql str
switch v := cancelState.(type) {
case model.SchemaState:
if job.Type == model.ActionMultiSchemaChange {
msg := fmt.Sprintf("unexpected multi-schema change(sql: %s, cancel state: %s)", sql, v)
msg := fmt.Sprintf("unexpected multi-schema change(sql: %s, cancel state: %s, job: %s)", sql, v, job.String())
require.Failf(t, msg, "use []model.SchemaState as cancel states instead")
return false
}
return job.SchemaState == v
case SubStates: // For multi-schema change sub-jobs.
if job.MultiSchemaInfo == nil {
msg := fmt.Sprintf("not multi-schema change(sql: %s, cancel state: %v)", sql, v)
msg := fmt.Sprintf("not multi-schema change(sql: %s, cancel state: %v, job: %s)", sql, v, job.String())
require.Failf(t, msg, "use model.SchemaState as the cancel state instead")
return false
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/testkit/testfailpoint/failpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,8 @@ func EnableCall(t testing.TB, name string, fn any) {
require.NoError(t, failpoint.Disable(name))
})
}

// Disable turns off the fail-point registered under name. The test fails
// immediately, via require.NoError, if failpoint.Disable returns an error.
func Disable(t testing.TB, name string) {
	// Mark this wrapper as a test helper so that a failure is attributed to
	// the calling test's line instead of to this function.
	t.Helper()
	require.NoError(t, failpoint.Disable(name))
}

0 comments on commit 7a18952

Please sign in to comment.