Skip to content

Commit

Permalink
etcd (ticdc): fix a data race in unit test (#4551) (#4576)
Browse files Browse the repository at this point in the history
close #4549
  • Loading branch information
ti-chi-bot authored Jun 22, 2022
1 parent 0eb48ae commit 601781b
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 19 deletions.
33 changes: 17 additions & 16 deletions pkg/etcd/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"io/ioutil"
"os"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -51,23 +52,23 @@ func (m *mockClient) Txn(ctx context.Context) clientv3.Txn {
type mockWatcher struct {
clientv3.Watcher
watchCh chan clientv3.WatchResponse
resetCount *int
requestCount *int
resetCount *int32
requestCount *int32
rev *int64
}

func (m mockWatcher) Watch(ctx context.Context, key string, opts ...clientv3.OpOption) clientv3.WatchChan {
*m.resetCount++
atomic.AddInt32(m.resetCount, 1)
op := &clientv3.Op{}
for _, opt := range opts {
opt(op)
}
*m.rev = op.Rev()
atomic.StoreInt64(m.rev, op.Rev())
return m.watchCh
}

func (m mockWatcher) RequestProgress(ctx context.Context) error {
*m.requestCount++
atomic.AddInt32(m.requestCount, 1)
return nil
}

Expand Down Expand Up @@ -153,8 +154,8 @@ func TestDelegateLease(t *testing.T) {
// test no data lost when WatchCh blocked
func TestWatchChBlocked(t *testing.T) {
cli := clientv3.NewCtxClient(context.TODO())
resetCount := 0
requestCount := 0
resetCount := int32(0)
requestCount := int32(0)
rev := int64(0)
watchCh := make(chan clientv3.WatchResponse, 1)
watcher := mockWatcher{watchCh: watchCh, resetCount: &resetCount, requestCount: &requestCount, rev: &rev}
Expand Down Expand Up @@ -204,18 +205,18 @@ func TestWatchChBlocked(t *testing.T) {

require.Equal(t, sentRes, receivedRes)
// make sure watchCh has been reset since timeout
require.True(t, *watcher.resetCount > 1)
require.True(t, atomic.LoadInt32(watcher.resetCount) > 1)
// make sure RequestProgress has been call since timeout
require.True(t, *watcher.requestCount > 1)
require.True(t, atomic.LoadInt32(watcher.requestCount) > 1)
// make sure etcdRequestProgressDuration is less than etcdWatchChTimeoutDuration
require.Less(t, etcdRequestProgressDuration, etcdWatchChTimeoutDuration)
}

// test no data lost when OutCh blocked
func TestOutChBlocked(t *testing.T) {
cli := clientv3.NewCtxClient(context.TODO())
resetCount := 0
requestCount := 0
resetCount := int32(0)
requestCount := int32(0)
rev := int64(0)
watchCh := make(chan clientv3.WatchResponse, 1)
watcher := mockWatcher{watchCh: watchCh, resetCount: &resetCount, requestCount: &requestCount, rev: &rev}
Expand Down Expand Up @@ -266,8 +267,8 @@ func TestOutChBlocked(t *testing.T) {
func TestRevisionNotFallBack(t *testing.T) {
cli := clientv3.NewCtxClient(context.TODO())

resetCount := 0
requestCount := 0
resetCount := int32(0)
requestCount := int32(0)
rev := int64(0)
watchCh := make(chan clientv3.WatchResponse, 1)
watcher := mockWatcher{watchCh: watchCh, resetCount: &resetCount, requestCount: &requestCount, rev: &rev}
Expand Down Expand Up @@ -301,11 +302,11 @@ func TestRevisionNotFallBack(t *testing.T) {
// move time forward
mockClock.Add(time.Second * 30)
// make sure watchCh has been reset since timeout
require.True(t, *watcher.resetCount > 1)
// make suer revision in WatchWitchChan does not fall back
require.True(t, atomic.LoadInt32(watcher.resetCount) > 1)
// make sure revision in WatchWitchChan does not fall back
// even if there has not any response been received from WatchCh
// while WatchCh was reset
require.Equal(t, *watcher.rev, revision)
require.Equal(t, atomic.LoadInt64(watcher.rev), revision)
}

type mockTxn struct {
Expand Down
5 changes: 2 additions & 3 deletions pkg/etcd/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,13 @@ import (
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/util"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/clientv3/concurrency"
"go.etcd.io/etcd/embed"
"go.etcd.io/etcd/pkg/logutil"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/sync/errgroup"

"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/embed"
)

type Captures []*model.CaptureInfo
Expand Down

0 comments on commit 601781b

Please sign in to comment.