Skip to content

Commit

Permalink
Fix the failpoints
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <[email protected]>
  • Loading branch information
JmPotato committed Nov 25, 2024
1 parent 7766d48 commit 88e113e
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 22 deletions.
24 changes: 12 additions & 12 deletions client/clients/tso/tso_dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,11 +272,11 @@ func (s *testTSODispatcherSuite) testStaticConcurrencyImpl(concurrency int) {
}

func (s *testTSODispatcherSuite) TestConcurrentRPC() {
s.re.NoError(failpoint.Enable("github.com/tikv/pd/client/tsoDispatcherConcurrentModeNoDelay", "return"))
s.re.NoError(failpoint.Enable("github.com/tikv/pd/client/tsoDispatcherAlwaysCheckConcurrency", "return"))
s.re.NoError(failpoint.Enable("github.com/tikv/pd/client/clients/tso/tsoDispatcherConcurrentModeNoDelay", "return"))
s.re.NoError(failpoint.Enable("github.com/tikv/pd/client/clients/tso/tsoDispatcherAlwaysCheckConcurrency", "return"))
defer func() {
s.re.NoError(failpoint.Disable("github.com/tikv/pd/client/tsoDispatcherConcurrentModeNoDelay"))
s.re.NoError(failpoint.Disable("github.com/tikv/pd/client/tsoDispatcherAlwaysCheckConcurrency"))
s.re.NoError(failpoint.Disable("github.com/tikv/pd/client/clients/tso/tsoDispatcherConcurrentModeNoDelay"))
s.re.NoError(failpoint.Disable("github.com/tikv/pd/client/clients/tso/tsoDispatcherAlwaysCheckConcurrency"))
}()

s.testStaticConcurrencyImpl(1)
Expand All @@ -289,11 +289,11 @@ func (s *testTSODispatcherSuite) TestBatchDelaying() {
ctx := context.Background()
s.option.SetTSOClientRPCConcurrency(2)

s.re.NoError(failpoint.Enable("github.com/tikv/pd/client/tsoDispatcherConcurrentModeNoDelay", "return"))
s.re.NoError(failpoint.Enable("github.com/tikv/pd/client/tsoStreamSimulateEstimatedRPCLatency", `return("12ms")`))
s.re.NoError(failpoint.Enable("github.com/tikv/pd/client/clients/tso/tsoDispatcherConcurrentModeNoDelay", "return"))
s.re.NoError(failpoint.Enable("github.com/tikv/pd/client/clients/tso/tsoStreamSimulateEstimatedRPCLatency", `return("12ms")`))
defer func() {
s.re.NoError(failpoint.Disable("github.com/tikv/pd/client/tsoDispatcherConcurrentModeNoDelay"))
s.re.NoError(failpoint.Disable("github.com/tikv/pd/client/tsoStreamSimulateEstimatedRPCLatency"))
s.re.NoError(failpoint.Disable("github.com/tikv/pd/client/clients/tso/tsoDispatcherConcurrentModeNoDelay"))
s.re.NoError(failpoint.Disable("github.com/tikv/pd/client/clients/tso/tsoStreamSimulateEstimatedRPCLatency"))
}()

// Make sure concurrency option takes effect.
Expand All @@ -302,23 +302,23 @@ func (s *testTSODispatcherSuite) TestBatchDelaying() {
s.reqMustReady(req)

// Trigger the check.
s.re.NoError(failpoint.Enable("github.com/tikv/pd/client/tsoDispatcherConcurrentModeAssertDelayDuration", `return("6ms")`))
s.re.NoError(failpoint.Enable("github.com/tikv/pd/client/clients/tso/tsoDispatcherConcurrentModeAssertDelayDuration", `return("6ms")`))
defer func() {
s.re.NoError(failpoint.Disable("github.com/tikv/pd/client/tsoDispatcherConcurrentModeAssertDelayDuration"))
s.re.NoError(failpoint.Disable("github.com/tikv/pd/client/clients/tso/tsoDispatcherConcurrentModeAssertDelayDuration"))
}()
req = s.sendReq(ctx)
s.streamInner.generateNext()
s.reqMustReady(req)

// Try other concurrency.
s.option.SetTSOClientRPCConcurrency(3)
s.re.NoError(failpoint.Enable("github.com/tikv/pd/client/tsoDispatcherConcurrentModeAssertDelayDuration", `return("4ms")`))
s.re.NoError(failpoint.Enable("github.com/tikv/pd/client/clients/tso/tsoDispatcherConcurrentModeAssertDelayDuration", `return("4ms")`))
req = s.sendReq(ctx)
s.streamInner.generateNext()
s.reqMustReady(req)

s.option.SetTSOClientRPCConcurrency(4)
s.re.NoError(failpoint.Enable("github.com/tikv/pd/client/tsoDispatcherConcurrentModeAssertDelayDuration", `return("3ms")`))
s.re.NoError(failpoint.Enable("github.com/tikv/pd/client/clients/tso/tsoDispatcherConcurrentModeAssertDelayDuration", `return("3ms")`))
req = s.sendReq(ctx)
s.streamInner.generateNext()
s.reqMustReady(req)
Expand Down
16 changes: 8 additions & 8 deletions tests/integrations/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,10 +404,10 @@ func TestUnavailableTimeAfterLeaderIsReady(t *testing.T) {
go func() {
defer wg.Done()
leader := cluster.GetLeaderServer()
re.NoError(failpoint.Enable("github.com/tikv/pd/client/unreachableNetwork", "return(true)"))
re.NoError(failpoint.Enable("github.com/tikv/pd/client/clients/tso/unreachableNetwork", "return(true)"))
leader.Stop()
re.NotEmpty(cluster.WaitLeader())
re.NoError(failpoint.Disable("github.com/tikv/pd/client/unreachableNetwork"))
re.NoError(failpoint.Disable("github.com/tikv/pd/client/clients/tso/unreachableNetwork"))
leaderReadyTime = time.Now()
}()
wg.Wait()
Expand Down Expand Up @@ -519,7 +519,7 @@ func (suite *followerForwardAndHandleTestSuite) TestGetTsoByFollowerForwarding1(
cli := setupCli(ctx, re, suite.endpoints, opt.WithForwardingOption(true))
defer cli.Close()

re.NoError(failpoint.Enable("github.com/tikv/pd/client/unreachableNetwork", "return(true)"))
re.NoError(failpoint.Enable("github.com/tikv/pd/client/clients/tso/unreachableNetwork", "return(true)"))
var lastTS uint64
testutil.Eventually(re, func() bool {
physical, logical, err := cli.GetTS(context.TODO())
Expand All @@ -532,7 +532,7 @@ func (suite *followerForwardAndHandleTestSuite) TestGetTsoByFollowerForwarding1(
})

lastTS = checkTS(re, cli, lastTS)
re.NoError(failpoint.Disable("github.com/tikv/pd/client/unreachableNetwork"))
re.NoError(failpoint.Disable("github.com/tikv/pd/client/clients/tso/unreachableNetwork"))
time.Sleep(2 * time.Second)
checkTS(re, cli, lastTS)

Expand All @@ -554,7 +554,7 @@ func (suite *followerForwardAndHandleTestSuite) TestGetTsoByFollowerForwarding2(
cli := setupCli(ctx, re, suite.endpoints, opt.WithForwardingOption(true))
defer cli.Close()

re.NoError(failpoint.Enable("github.com/tikv/pd/client/unreachableNetwork", "return(true)"))
re.NoError(failpoint.Enable("github.com/tikv/pd/client/clients/tso/unreachableNetwork", "return(true)"))
var lastTS uint64
testutil.Eventually(re, func() bool {
physical, logical, err := cli.GetTS(context.TODO())
Expand All @@ -571,7 +571,7 @@ func (suite *followerForwardAndHandleTestSuite) TestGetTsoByFollowerForwarding2(
re.NotEmpty(suite.cluster.WaitLeader())
lastTS = checkTS(re, cli, lastTS)

re.NoError(failpoint.Disable("github.com/tikv/pd/client/unreachableNetwork"))
re.NoError(failpoint.Disable("github.com/tikv/pd/client/clients/tso/unreachableNetwork"))
time.Sleep(5 * time.Second)
checkTS(re, cli, lastTS)
}
Expand Down Expand Up @@ -783,7 +783,7 @@ func (suite *followerForwardAndHandleTestSuite) TestGetTSFuture() {
ctx, cancel := context.WithCancel(suite.ctx)
defer cancel()

re.NoError(failpoint.Enable("github.com/tikv/pd/client/shortDispatcherChannel", "return(true)"))
re.NoError(failpoint.Enable("github.com/tikv/pd/client/clients/tso/shortDispatcherChannel", "return(true)"))

cli := setupCli(ctx, re, suite.endpoints)

Expand Down Expand Up @@ -820,7 +820,7 @@ func (suite *followerForwardAndHandleTestSuite) TestGetTSFuture() {
wg2.Wait()
wg3.Wait()
re.Less(time.Since(start), time.Second*2)
re.NoError(failpoint.Disable("github.com/tikv/pd/client/shortDispatcherChannel"))
re.NoError(failpoint.Disable("github.com/tikv/pd/client/clients/tso/shortDispatcherChannel"))
}

func checkTS(re *require.Assertions, cli pd.Client, lastTS uint64) uint64 {
Expand Down
4 changes: 2 additions & 2 deletions tests/integrations/tso/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ func (suite *tsoClientTestSuite) TestRandomShutdown() {

func (suite *tsoClientTestSuite) TestGetTSWhileResettingTSOClient() {
re := suite.Require()
re.NoError(failpoint.Enable("github.com/tikv/pd/client/delayDispatchTSORequest", "return(true)"))
re.NoError(failpoint.Enable("github.com/tikv/pd/client/clients/tso/delayDispatchTSORequest", "return(true)"))
var (
stopSignal atomic.Bool
wg sync.WaitGroup
Expand Down Expand Up @@ -481,7 +481,7 @@ func (suite *tsoClientTestSuite) TestGetTSWhileResettingTSOClient() {
}
stopSignal.Store(true)
wg.Wait()
re.NoError(failpoint.Disable("github.com/tikv/pd/client/delayDispatchTSORequest"))
re.NoError(failpoint.Disable("github.com/tikv/pd/client/clients/tso/delayDispatchTSORequest"))
}

// When we upgrade the PD cluster, there may be a period of time that the old and new PDs are running at the same time.
Expand Down

0 comments on commit 88e113e

Please sign in to comment.