Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tests: use AtomicInt32 instead of int to fix races #8696

Merged
merged 2 commits into from
Aug 27, 2021
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 14 additions & 16 deletions go/vt/vtgate/vstream_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"testing"
"time"

"vitess.io/vitess/go/sync2"

"vitess.io/vitess/go/vt/topo"

vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
Expand Down Expand Up @@ -199,7 +201,6 @@ func TestVStreamEvents(t *testing.T) {
// TestVStreamChunks ensures that a transaction that's broken
// into chunks is sent together.
func TestVStreamChunks(t *testing.T) {
t.Skip("flaky test")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand All @@ -222,8 +223,9 @@ func TestVStreamChunks(t *testing.T) {

rowEncountered := false
doneCounting := false
rowCount := 0
ddlCount := 0
var rowCount, ddlCount sync2.AtomicInt32
rowCount.Set(0)
ddlCount.Set(0)
vgtid := &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{{
Keyspace: ks,
Expand All @@ -243,7 +245,7 @@ func TestVStreamChunks(t *testing.T) {
return fmt.Errorf("unexpected event: %v", events[0])
}
rowEncountered = true
rowCount++
rowCount.Add(1)
case binlogdatapb.VEventType_COMMIT:
if !rowEncountered {
t.Errorf("Unexpected event, COMMIT after non-rows: %v", events[0])
Expand All @@ -255,22 +257,18 @@ func TestVStreamChunks(t *testing.T) {
t.Errorf("Unexpected event, DDL during ROW events: %v", events[0])
return fmt.Errorf("unexpected event: %v", events[0])
}
ddlCount++
ddlCount.Add(1)
default:
t.Errorf("Unexpected event: %v", events[0])
return fmt.Errorf("unexpected event: %v", events[0])
}
if rowCount == 100 && ddlCount == 100 {
if rowCount.Get() == int32(100) && ddlCount.Get() == int32(100) {
cancel()
}
return nil
})
if rowCount != 100 {
t.Errorf("rowCount: %d, want 100", rowCount)
}
if ddlCount != 100 {
t.Errorf("ddlCount: %d, want 100", ddlCount)
}
assert.Equal(t, int32(100), rowCount.Get())
assert.Equal(t, int32(100), ddlCount.Get())
}

func TestVStreamMulti(t *testing.T) {
Expand Down Expand Up @@ -336,7 +334,6 @@ func TestVStreamMulti(t *testing.T) {
}

func TestVStreamRetry(t *testing.T) {
t.Skip()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand All @@ -358,7 +355,8 @@ func TestVStreamRetry(t *testing.T) {
sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "bb"))
sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "cc"))
sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "final error"))
count := 0
var count sync2.AtomicInt32
count.Set(0)
vgtid := &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{{
Keyspace: ks,
Expand All @@ -367,15 +365,15 @@ func TestVStreamRetry(t *testing.T) {
}},
}
err := vsm.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, nil, &vtgatepb.VStreamFlags{}, func(events []*binlogdatapb.VEvent) error {
count++
count.Add(1)
return nil
})
wantErr := "final error"
if err == nil || !strings.Contains(err.Error(), wantErr) {
t.Errorf("vstream end: %v, must contain %v", err.Error(), wantErr)
}
time.Sleep(100 * time.Millisecond) // wait for goroutine within VStream to finish
assert.Equal(t, 2, count)
assert.Equal(t, int32(2), count.Get())
}

func TestVStreamShouldNotSendSourceHeartbeats(t *testing.T) {
Expand Down