diff --git a/testbed/testbed/load_generator.go b/testbed/testbed/load_generator.go index 4b800da2f7a..717108573e8 100644 --- a/testbed/testbed/load_generator.go +++ b/testbed/testbed/load_generator.go @@ -30,8 +30,8 @@ import ( type LoadGenerator struct { exporter *jaeger.Exporter - TracesSent uint64 - SpansSent uint64 + tracesSent uint64 + spansSent uint64 stopOnce sync.Once stopWait sync.WaitGroup @@ -109,11 +109,11 @@ func (lg *LoadGenerator) Stop() { // GetStats returns the stats as a printable string. func (lg *LoadGenerator) GetStats() string { - return fmt.Sprintf("Sent:%5d spans", atomic.LoadUint64(&lg.SpansSent)) + return fmt.Sprintf("Sent:%5d spans", atomic.LoadUint64(&lg.spansSent)) } -func (lg *LoadGenerator) GetSpansSent() uint64 { - return atomic.LoadUint64(&lg.SpansSent) +func (lg *LoadGenerator) SpansSent() uint64 { + return atomic.LoadUint64(&lg.spansSent) } func (lg *LoadGenerator) generate() { @@ -142,12 +142,12 @@ func (lg *LoadGenerator) generate() { func (lg *LoadGenerator) generateTrace() { - traceID := atomic.AddUint64(&lg.TracesSent, 1) + traceID := atomic.AddUint64(&lg.tracesSent, 1) for i := uint(0); i < lg.options.SpansPerTrace; i++ { startTime := time.Now() - spanID := atomic.AddUint64(&lg.SpansSent, 1) + spanID := atomic.AddUint64(&lg.spansSent, 1) // Create a span. span := &trace.SpanData{ diff --git a/testbed/testbed/mock_backend_test.go b/testbed/testbed/mock_backend_test.go index d3f69553055..7720a5c7feb 100644 --- a/testbed/testbed/mock_backend_test.go +++ b/testbed/testbed/mock_backend_test.go @@ -36,21 +36,21 @@ func TestGeneratorAndBackend(t *testing.T) { lg, err := NewLoadGenerator() require.NoError(t, err, "Cannot start load generator") - assert.EqualValues(t, 0, lg.SpansSent) + assert.EqualValues(t, 0, lg.spansSent) // Generate at 1000 SPS lg.Start(LoadOptions{SpansPerSecond: 1000}) // Wait until at least 50 spans are sent - WaitFor(t, func() bool { return lg.GetSpansSent() > 50 }, "SpansSent > 50") + WaitFor(t, func() bool { return lg.SpansSent() > 50 }, "SpansSent > 50") lg.Stop() // The backend should receive everything generated. - assert.Equal(t, lg.SpansSent, mb.SpansReceived()) + assert.Equal(t, lg.SpansSent(), mb.SpansReceived()) } -// WaitFor the specific condition for up to 5 seconds. Records a test error +// WaitFor the specific condition for up to 10 seconds. Records a test error // if condition does not become true. func WaitFor(t *testing.T, cond func() bool, errMsg ...interface{}) bool { startTime := time.Now() @@ -70,7 +70,7 @@ func WaitFor(t *testing.T, cond func() bool, errMsg ...interface{}) bool { return true } - if time.Since(startTime) > time.Second*5 { + if time.Since(startTime) > time.Second*10 { // Waited too long t.Error("Time out waiting for", errMsg) return false diff --git a/testbed/testbed/test_case.go b/testbed/testbed/test_case.go index cba059e77e8..6922a9e2060 100644 --- a/testbed/testbed/test_case.go +++ b/testbed/testbed/test_case.go @@ -206,7 +206,7 @@ func (tc *TestCase) Stop() { testName: tc.t.Name(), result: result, receivedSpanCount: tc.MockBackend.SpansReceived(), - sentSpanCount: tc.LoadGenerator.SpansSent, + sentSpanCount: tc.LoadGenerator.SpansSent(), duration: time.Since(tc.startTime), cpuPercentageAvg: rc.CPUPercentAvg, cpuPercentageMax: rc.CPUPercentMax, @@ -218,12 +218,61 @@ func (tc *TestCase) Stop() { // ValidateData validates data by comparing the number of spans sent by load generator // and number of spans received by mock backend. func (tc *TestCase) ValidateData() { - if assert.EqualValues(tc.t, tc.LoadGenerator.SpansSent, tc.MockBackend.SpansReceived(), + select { + case <-tc.ErrorSignal: + // Error is already signaled and recorded. Validating data is pointless. + return + default: + } + + if assert.EqualValues(tc.t, tc.LoadGenerator.SpansSent(), tc.MockBackend.SpansReceived(), "Received and sent span counters do not match.") { log.Printf("Sent and received data matches.") } } +// Sleep for specified duration or until error is signalled. +func (tc *TestCase) Sleep(d time.Duration) { + select { + case <-time.After(d): + case <-tc.ErrorSignal: + } +} + +// WaitFor the specific condition for up to 10 seconds. Records a test error +// if time is out and condition does not become true. If error is signalled +// while waiting the function will return false, but will not record additional +// test error (we assume that signalled error is already recorded in indicateError()). +func (tc *TestCase) WaitFor(cond func() bool, errMsg ...interface{}) bool { + startTime := time.Now() + + // Start with 5 ms waiting interval between condition re-evaluation. + waitInterval := time.Millisecond * 5 + + for { + if cond() { + return true + } + + select { + case <-time.After(waitInterval): + case <-tc.ErrorSignal: + return false + } + + // Increase waiting interval exponentially up to 500 ms. + if waitInterval < time.Millisecond*500 { + waitInterval = waitInterval * 2 + } + + if time.Since(startTime) > time.Second*10 { + // Waited too long + tc.t.Error("Time out waiting for", errMsg) + return false + } + } +} + func (tc *TestCase) indicateError(err error) { // Print to log for visibility log.Print(err.Error()) diff --git a/testbed/tests/perf_test.go b/testbed/tests/perf_test.go index e709d214dc5..53c214ed943 100644 --- a/testbed/tests/perf_test.go +++ b/testbed/tests/perf_test.go @@ -61,10 +61,7 @@ func TestIdleMode(t *testing.T) { tc.StartAgent() - select { - case <-time.After(10 * time.Second): - case <-tc.ErrorSignal: - } + tc.Sleep(10 * time.Second) } func Test10kSPS(t *testing.T) { @@ -78,16 +75,12 @@ func Test10kSPS(t *testing.T) { tc.StartAgent() tc.StartLoad(testbed.LoadOptions{SpansPerSecond: 10000}) - select { - case <-time.After(15 * time.Second): - case <-tc.ErrorSignal: - } + tc.Sleep(15 * time.Second) tc.StopLoad() - select { - case <-time.After(1 * time.Second): - case <-tc.ErrorSignal: - } + + tc.WaitFor(func() bool { return tc.LoadGenerator.SpansSent() == tc.MockBackend.SpansReceived() }, + "all spans received") tc.StopAgent() @@ -104,10 +97,7 @@ func TestNoBackend10kSPS(t *testing.T) { tc.StartAgent() tc.StartLoad(testbed.LoadOptions{SpansPerSecond: 10000}) - select { - case <-time.After(10 * time.Second): - case <-tc.ErrorSignal: - } + tc.Sleep(10 * time.Second) } func Test1000SPSWithAttributes(t *testing.T) { @@ -170,17 +160,11 @@ func Test1000SPSWithAttributes(t *testing.T) { } tc.StartLoad(options) - - select { - case <-time.After(10 * time.Second): - case <-tc.ErrorSignal: - } - + tc.Sleep(10 * time.Second) tc.StopLoad() - select { - case <-time.After(1 * time.Second): - case <-tc.ErrorSignal: - } + + tc.WaitFor(func() bool { return tc.LoadGenerator.SpansSent() == tc.MockBackend.SpansReceived() }, + "all spans received") tc.StopAgent()