Skip to content

Commit

Permalink
Merge pull request #386 from nats-io/fix_flappers
Browse files Browse the repository at this point in the history
Fixing some flapping tests
  • Loading branch information
kozlovic authored Aug 24, 2018
2 parents 510d9e6 + 6d6e0c9 commit 4efd79a
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 59 deletions.
52 changes: 29 additions & 23 deletions test/basic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package test

import (
"bytes"
"fmt"
"math"
"regexp"
"runtime"
Expand Down Expand Up @@ -43,12 +44,14 @@ func TestCloseLeakingGoRoutines(t *testing.T) {

// Give time for things to settle before capturing the number of
// go routines
time.Sleep(500 * time.Millisecond)
waitFor(t, 2*time.Second, 100*time.Millisecond, func() error {
delta := (runtime.NumGoroutine() - base)
if delta > 0 {
return fmt.Errorf("%d Go routines still exist post Close()", delta)
}
return nil
})

delta := (runtime.NumGoroutine() - base)
if delta > 0 {
t.Fatalf("%d Go routines still exist post Close()", delta)
}
// Make sure we can call Close() multiple times
nc.Close()
}
Expand All @@ -68,12 +71,13 @@ func TestLeakingGoRoutinesOnFailedConnect(t *testing.T) {

// Give time for things to settle before capturing the number of
// go routines
time.Sleep(500 * time.Millisecond)

delta := (runtime.NumGoroutine() - base)
if delta > 0 {
t.Fatalf("%d Go routines still exist post Close()", delta)
}
waitFor(t, 2*time.Second, 100*time.Millisecond, func() error {
delta := (runtime.NumGoroutine() - base)
if delta > 0 {
return fmt.Errorf("%d Go routines still exist post Close()", delta)
}
return nil
})
}

func TestConnectedServer(t *testing.T) {
Expand Down Expand Up @@ -258,12 +262,13 @@ func TestAsyncSubscribeRoutineLeakOnUnsubscribe(t *testing.T) {

// Give time for things to settle before capturing the number of
// go routines
time.Sleep(500 * time.Millisecond)

delta := (runtime.NumGoroutine() - base)
if delta > 0 {
t.Fatalf("%d Go routines still exist post Unsubscribe()", delta)
}
waitFor(t, 2*time.Second, 100*time.Millisecond, func() error {
delta := (runtime.NumGoroutine() - base)
if delta > 0 {
return fmt.Errorf("%d Go routines still exist post Unsubscribe()", delta)
}
return nil
})
}

func TestAsyncSubscribeRoutineLeakOnClose(t *testing.T) {
Expand Down Expand Up @@ -304,12 +309,13 @@ func TestAsyncSubscribeRoutineLeakOnClose(t *testing.T) {

// Give time for things to settle before capturing the number of
// go routines
time.Sleep(500 * time.Millisecond)

delta := (runtime.NumGoroutine() - base)
if delta > 0 {
t.Fatalf("%d Go routines still exist post Close()", delta)
}
waitFor(t, 2*time.Second, 100*time.Millisecond, func() error {
delta := (runtime.NumGoroutine() - base)
if delta > 0 {
return fmt.Errorf("%d Go routines still exist post Close()", delta)
}
return nil
})
}

func TestSyncSubscribe(t *testing.T) {
Expand Down
6 changes: 3 additions & 3 deletions test/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1771,9 +1771,9 @@ func TestBarrier(t *testing.T) {
if err := nc.Publish("bar", []byte("hello")); err != nil {
t.Fatalf("Error on publish: %v", err)
}
if err := nc.Flush(); err != nil {
t.Fatalf("Error on flush: %v", err)
}
// This could fail if the connection is closed before we get
// here.
nc.Flush()
if err := Wait(ch); err != nil {
t.Fatal("Barrier function was not invoked")
}
Expand Down
30 changes: 15 additions & 15 deletions test/netchan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package test

import (
"fmt"
"runtime"
"testing"
"time"
Expand Down Expand Up @@ -265,13 +266,13 @@ func TestRecvChanAsyncLeakGoRoutines(t *testing.T) {
ec.Publish("foo", 22)
ec.Flush()

time.Sleep(100 * time.Millisecond)

delta := (runtime.NumGoroutine() - before)

if delta > 0 {
t.Fatalf("Leaked Go routine(s) : %d, closing channel should have closed them\n", delta)
}
waitFor(t, 2*time.Second, 100*time.Millisecond, func() error {
delta := (runtime.NumGoroutine() - before)
if delta > 0 {
return fmt.Errorf("Leaked Go routine(s) : %d, closing channel should have closed them", delta)
}
return nil
})
}

func TestRecvChanLeakGoRoutines(t *testing.T) {
Expand All @@ -294,14 +295,13 @@ func TestRecvChanLeakGoRoutines(t *testing.T) {
}
sub.Unsubscribe()

// Sleep a bit to wait for the Go routine to exit.
time.Sleep(500 * time.Millisecond)

delta := (runtime.NumGoroutine() - before)

if delta > 0 {
t.Fatalf("Leaked Go routine(s) : %d, closing channel should have closed them\n", delta)
}
waitFor(t, 2*time.Second, 100*time.Millisecond, func() error {
delta := (runtime.NumGoroutine() - before)
if delta > 0 {
return fmt.Errorf("Leaked Go routine(s) : %d, closing channel should have closed them", delta)
}
return nil
})
}

func TestRecvChanMultipleMessages(t *testing.T) {
Expand Down
41 changes: 23 additions & 18 deletions test/sub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,13 @@ func TestServerAutoUnsub(t *testing.T) {
if err := sub.AutoUnsubscribe(10); err == nil {
t.Fatal("Calling AutoUnsubscribe() on closed subscription should fail")
}
delta := (runtime.NumGoroutine() - base)
if delta > 0 {
t.Fatalf("%d Go routines still exist post max subscriptions hit", delta)
}
waitFor(t, 2*time.Second, 100*time.Millisecond, func() error {
delta := (runtime.NumGoroutine() - base)
if delta > 0 {
return fmt.Errorf("%d Go routines still exist post max subscriptions hit", delta)
}
return nil
})
}

func TestClientSyncAutoUnsub(t *testing.T) {
Expand Down Expand Up @@ -307,13 +310,14 @@ func TestAutoUnsubscribeFromCallback(t *testing.T) {
nc.Publish("foo", msg)
nc.Flush()

time.Sleep(100 * time.Millisecond)

recv := atomic.LoadInt64(&received)
if recv != resetUnsubMark {
t.Fatalf("Wrong number of received messages. Original max was %v reset to %v, actual received: %v",
max, resetUnsubMark, recv)
}
waitFor(t, time.Second, 100*time.Millisecond, func() error {
recv := atomic.LoadInt64(&received)
if recv != resetUnsubMark {
return fmt.Errorf("Wrong number of received messages. Original max was %v reset to %v, actual received: %v",
max, resetUnsubMark, recv)
}
return nil
})

// Now check with AutoUnsubscribe with higher value than original
received = int64(0)
Expand Down Expand Up @@ -341,13 +345,14 @@ func TestAutoUnsubscribeFromCallback(t *testing.T) {
nc.Publish("foo", msg)
nc.Flush()

time.Sleep(100 * time.Millisecond)

recv = atomic.LoadInt64(&received)
if recv != newMax {
t.Fatalf("Wrong number of received messages. Original max was %v reset to %v, actual received: %v",
max, newMax, recv)
}
waitFor(t, time.Second, 100*time.Millisecond, func() error {
recv := atomic.LoadInt64(&received)
if recv != newMax {
return fmt.Errorf("Wrong number of received messages. Original max was %v reset to %v, actual received: %v",
max, newMax, recv)
}
return nil
})
}

func TestCloseSubRelease(t *testing.T) {
Expand Down

0 comments on commit 4efd79a

Please sign in to comment.