Skip to content

Commit

Permalink
Fix flaky TestTBufferedServer test (#2348)
Browse files Browse the repository at this point in the history
* Fix flaky TestTBufferedServer test

Signed-off-by: Yuri Shkuro <[email protected]>

* Add comments

Signed-off-by: Yuri Shkuro <[email protected]>
  • Loading branch information
yurishkuro authored Jul 17, 2020
1 parent 681a1eb commit 9529bee
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 49 deletions.
16 changes: 11 additions & 5 deletions cmd/agent/app/servers/tbuffered_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,19 @@
package servers

import (
"io"
"sync"
"sync/atomic"

"github.com/apache/thrift/lib/go/thrift"
"github.com/uber/jaeger-lib/metrics"
)

// ThriftTransport is a subset of thrift.TTransport methods, for easier mocking.
type ThriftTransport interface {
io.Reader
io.Closer
}

// TBufferedServer is a custom thrift server that reads traffic using the transport provided
// and places messages into a buffered channel to be processed by the processor provided
type TBufferedServer struct {
Expand All @@ -33,7 +39,7 @@ type TBufferedServer struct {
maxPacketSize int
maxQueueSize int
serving uint32
transport thrift.TTransport
transport ThriftTransport
readBufPool *sync.Pool
metrics struct {
// Size of the current server queue
Expand All @@ -55,7 +61,7 @@ type TBufferedServer struct {

// NewTBufferedServer creates a TBufferedServer
func NewTBufferedServer(
transport thrift.TTransport,
transport ThriftTransport,
maxQueueSize int,
maxPacketSize int,
mFactory metrics.Factory,
Expand All @@ -74,7 +80,7 @@ func NewTBufferedServer(
maxPacketSize: maxPacketSize,
readBufPool: readBufPool,
}
metrics.Init(&res.metrics, mFactory, nil)
metrics.MustInit(&res.metrics, mFactory, nil)
return res, nil
}

Expand Down Expand Up @@ -114,7 +120,7 @@ func (s *TBufferedServer) IsServing() bool {
// emptied by the readers
func (s *TBufferedServer) Stop() {
atomic.StoreUint32(&s.serving, 0)
s.transport.Close()
_ = s.transport.Close()
close(s.dataChan)
}

Expand Down
146 changes: 102 additions & 44 deletions cmd/agent/app/servers/tbuffered_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@ package servers

import (
"context"
"io"
"sync"
"testing"
"time"

athrift "github.com/apache/thrift/lib/go/thrift"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uber/jaeger-lib/metrics/metricstest"
"go.uber.org/atomic"

"github.com/jaegertracing/jaeger/cmd/agent/app/customtransport"
"github.com/jaegertracing/jaeger/cmd/agent/app/servers/thriftudp"
Expand All @@ -32,27 +35,17 @@ import (
"github.com/jaegertracing/jaeger/thrift-gen/zipkincore"
)

func TestTBufferedServer(t *testing.T) {
t.Run("processed", func(t *testing.T) {
testTBufferedServer(t, 10, false)
})
t.Run("dropped", func(t *testing.T) {
testTBufferedServer(t, 1, true)
})
}

func testTBufferedServer(t *testing.T, queueSize int, testDroppedPackets bool) {
func TestTBufferedServer_SendReceive(t *testing.T) {
metricsFactory := metricstest.NewFactory(0)

transport, err := thriftudp.NewTUDPServerTransport("127.0.0.1:0")
require.NoError(t, err)

maxPacketSize := 65000
server, err := NewTBufferedServer(transport, queueSize, maxPacketSize, metricsFactory)
server, err := NewTBufferedServer(transport, 100, maxPacketSize, metricsFactory)
require.NoError(t, err)
go server.Serve()
defer server.Stop()
time.Sleep(10 * time.Millisecond) // wait for server to start serving

hostPort := transport.Addr().String()
client, clientCloser, err := testutils.NewZipkinThriftUDPClient(hostPort)
Expand All @@ -62,51 +55,116 @@ func testTBufferedServer(t *testing.T, queueSize int, testDroppedPackets bool) {
span := zipkincore.NewSpan()
span.Name = "span1"

err = client.EmitZipkinBatch(context.Background(), []*zipkincore.Span{span})
require.NoError(t, err)

if testDroppedPackets {
// because queueSize == 1 for this test, and we're not reading from data chan,
// the second packet we send will be dropped by the server
err = client.EmitZipkinBatch(context.Background(), []*zipkincore.Span{span})
for i := 0; i < 1000; i++ {
err := client.EmitZipkinBatch(context.Background(), []*zipkincore.Span{span})
require.NoError(t, err)

for i := 0; i < 50; i++ {
c, _ := metricsFactory.Snapshot()
if c["thrift.udp.server.packets.dropped"] == 1 {
return
}
time.Sleep(time.Millisecond)
select {
case readBuf := <-server.DataChan():
assert.NotEqual(t, 0, len(readBuf.GetBytes()))

inMemReporter := testutils.NewInMemoryReporter()
protoFact := athrift.NewTCompactProtocolFactory()
trans := &customtransport.TBufferedReadTransport{}
protocol := protoFact.GetProtocol(trans)

_, err = protocol.Transport().Write(readBuf.GetBytes())
require.NoError(t, err)

server.DataRecd(readBuf) // return to pool

handler := agent.NewAgentProcessor(inMemReporter)
_, err = handler.Process(context.Background(), protocol, protocol)
require.NoError(t, err)

require.Len(t, inMemReporter.ZipkinSpans(), 1)
assert.Equal(t, "span1", inMemReporter.ZipkinSpans()[0].Name)

return // exit test on successful receipt
default:
time.Sleep(10 * time.Millisecond)
}
}
t.Fatal("server did not receive packets")
}

// The fakeTransport allows the server to read two packets, one filled with 1's, another with 2's,
// then returns an error, and then blocks on the semaphore. The semaphore is only released when
// the test is exiting.
type fakeTransport struct {
packet atomic.Int64
wg sync.WaitGroup
}

func (t *fakeTransport) Read(p []byte) (n int, err error) {
packet := t.packet.Inc()
if packet > 2 {
if packet > 3 {
// return error once when packet==3, otherwise block
t.wg.Wait()
}
return 0, io.EOF
}
for i := range p {
p[i] = byte(packet)
}
return len(p), nil
}

func (t *fakeTransport) Close() error {
return nil
}

func TestTBufferedServer_Metrics(t *testing.T) {
metricsFactory := metricstest.NewFactory(0)

transport := new(fakeTransport)
transport.wg.Add(1)
defer transport.wg.Done()

maxPacketSize := 65000
server, err := NewTBufferedServer(transport, 1, maxPacketSize, metricsFactory)
require.NoError(t, err)
go server.Serve()
defer server.Stop()

// The fakeTransport will allow the server to read exactly two packets and one error.
// Since we use the server with queue size == 1, the first packet will be
// sent to channel, and the second one dropped.

packetDropped := false
for i := 0; i < 5000; i++ {
c, _ := metricsFactory.Snapshot()
assert.FailNow(t, "Dropped packets counter not incremented", "Counters: %+v", c)
if c["thrift.udp.server.packets.dropped"] == 1 {
packetDropped = true
break
}
time.Sleep(time.Millisecond)
}
require.True(t, packetDropped, "packetDropped")

inMemReporter := testutils.NewInMemoryReporter()
var readBuf *ReadBuf
select {
case readBuf := <-server.DataChan():
assert.NotEqual(t, 0, len(readBuf.GetBytes()))
protoFact := athrift.NewTCompactProtocolFactory()
trans := &customtransport.TBufferedReadTransport{}
protocol := protoFact.GetProtocol(trans)
protocol.Transport().Write(readBuf.GetBytes())
server.DataRecd(readBuf)
handler := agent.NewAgentProcessor(inMemReporter)
handler.Process(context.Background(), protocol, protocol)
case <-time.After(time.Second * 1):
t.Fatalf("Server should have received span submission")
case readBuf = <-server.DataChan():
b := readBuf.GetBytes()
assert.Len(t, b, 65000)
assert.EqualValues(t, 1, b[0], "first packet must be all 0x01's")
default:
t.Fatal("expecting a packet in the channel")
}

require.Equal(t, 1, len(inMemReporter.ZipkinSpans()))
assert.Equal(t, "span1", inMemReporter.ZipkinSpans()[0].Name)

// server must emit metrics
metricsFactory.AssertCounterMetrics(t,
metricstest.ExpectedMetric{Name: "thrift.udp.server.packets.processed", Value: 1},
metricstest.ExpectedMetric{Name: "thrift.udp.server.packets.dropped", Value: 0},
metricstest.ExpectedMetric{Name: "thrift.udp.server.packets.dropped", Value: 1},
metricstest.ExpectedMetric{Name: "thrift.udp.server.read.errors", Value: 1},
)
metricsFactory.AssertGaugeMetrics(t,
metricstest.ExpectedMetric{Name: "thrift.udp.server.packet_size", Value: 65000},
metricstest.ExpectedMetric{Name: "thrift.udp.server.queue_size", Value: 1},
)

server.DataRecd(readBuf)
metricsFactory.AssertGaugeMetrics(t,
metricstest.ExpectedMetric{Name: "thrift.udp.server.packet_size", Value: 38},
metricstest.ExpectedMetric{Name: "thrift.udp.server.queue_size", Value: 0},
)
}

0 comments on commit 9529bee

Please sign in to comment.