diff --git a/tools/functional-tester/etcd-runner/command/election_command.go b/tools/functional-tester/etcd-runner/command/election_command.go index e45c9ec1195..24610187d90 100644 --- a/tools/functional-tester/etcd-runner/command/election_command.go +++ b/tools/functional-tester/etcd-runner/command/election_command.go @@ -89,14 +89,14 @@ func runElectionFunc(cmd *cobra.Command, args []string) { } }() err = e.Campaign(ctx, v) + cancel() + <-donec if err == nil { observedLeader = v } if observedLeader == v { validateWaiters = len(rcs) } - cancel() - <-donec select { case <-ctx.Done(): return nil @@ -129,8 +129,10 @@ func runElectionFunc(cmd *cobra.Command, args []string) { return err } if observedLeader == v { - close(nextc) + oldNextc := nextc nextc = make(chan struct{}) + close(oldNextc) + } <-rcNextc observedLeader = "" diff --git a/tools/functional-tester/etcd-runner/command/global.go b/tools/functional-tester/etcd-runner/command/global.go index 1d26f2047ed..4b7ac6eed9f 100644 --- a/tools/functional-tester/etcd-runner/command/global.go +++ b/tools/functional-tester/etcd-runner/command/global.go @@ -56,7 +56,6 @@ func newClient(eps []string, timeout time.Duration) *clientv3.Client { } func doRounds(rcs []roundClient, rounds int, requests int) { - var mu sync.Mutex var wg sync.WaitGroup wg.Add(len(rcs)) @@ -73,22 +72,16 @@ func doRounds(rcs []roundClient, rounds int, requests int) { for rc.acquire() != nil { /* spin */ } - mu.Lock() if err := rc.validate(); err != nil { log.Fatal(err) } - mu.Unlock() time.Sleep(10 * time.Millisecond) rc.progress++ finished <- struct{}{} - mu.Lock() for rc.release() != nil { /* spin */ - mu.Unlock() - mu.Lock() } - mu.Unlock() } }(&rcs[i]) } diff --git a/tools/functional-tester/etcd-runner/command/lock_racer_command.go b/tools/functional-tester/etcd-runner/command/lock_racer_command.go index b0ec491f431..1560362282d 100644 --- a/tools/functional-tester/etcd-runner/command/lock_racer_command.go +++ b/tools/functional-tester/etcd-runner/command/lock_racer_command.go @@ -18,6 +18,7 @@ import ( "context" "errors" "fmt" + "sync" "github.com/coreos/etcd/clientv3/concurrency" @@ -47,6 +48,8 @@ func runRacerFunc(cmd *cobra.Command, args []string) { rcs := make([]roundClient, totalClientConnections) ctx := context.Background() + // mu ensures validate and release funcs are atomic. + var mu sync.Mutex cnt := 0 eps := endpointsFromFlag(cmd) @@ -69,12 +72,16 @@ func runRacerFunc(cmd *cobra.Command, args []string) { m := concurrency.NewMutex(s, racers) rcs[i].acquire = func() error { return m.Lock(ctx) } rcs[i].validate = func() error { + mu.Lock() + defer mu.Unlock() if cnt++; cnt != 1 { return fmt.Errorf("bad lock; count: %d", cnt) } return nil } rcs[i].release = func() error { + mu.Lock() + defer mu.Unlock() if err := m.Unlock(ctx); err != nil { return err }