Skip to content

Commit

Permalink
Merge pull request #11 from mwitkow/fix-close-bug
Browse files Browse the repository at this point in the history
Fix a channel closing bug
  • Loading branch information
Michal Witkowski authored Mar 27, 2017
2 parents af55d61 + 3fcbd37 commit 97396d9
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 11 deletions.
5 changes: 3 additions & 2 deletions proxy/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,11 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error
if err != nil {
return err
}
// Explicitly *do not close* s2cErrChan and c2sErrChan, otherwise the select below will not terminate.
// Channels do not have to be closed, it is just a control flow mechanism, see
// https://groups.google.com/forum/#!msg/golang-nuts/pZwdYRGxCIk/qpbHxRRPJdUJ
s2cErrChan := s.forwardServerToClient(serverStream, clientStream)
defer close(s2cErrChan)
c2sErrChan := s.forwardClientToServer(clientStream, serverStream)
defer close(c2sErrChan)
// We don't know which side is going to stop sending first, so we need a select between the two.
for i := 0; i < 2; i++ {
select {
Expand Down
36 changes: 27 additions & 9 deletions proxy/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,11 @@ func (s *assertingService) PingStream(stream pb.TestService_PingStreamServer) er
type ProxyHappySuite struct {
suite.Suite

serverListener net.Listener
server *grpc.Server
proxyListener net.Listener
proxy *grpc.Server
serverListener net.Listener
server *grpc.Server
proxyListener net.Listener
proxy *grpc.Server
serverClientConn *grpc.ClientConn

client *grpc.ClientConn
testClient pb.TestServiceClient
Expand All @@ -121,6 +122,12 @@ func (s *ProxyHappySuite) TestPingEmptyCarriesClientMetadata() {
require.Equal(s.T(), &pb.PingResponse{Value: pingDefaultValue, Counter: 42}, out)
}

func (s *ProxyHappySuite) TestPingEmpty_StressTest() {
for i := 0; i < 50; i++ {
s.TestPingEmptyCarriesClientMetadata()
}
}

func (s *ProxyHappySuite) TestPingCarriesServerHeadersAndTrailers() {
headerMd := make(metadata.MD)
trailerMd := make(metadata.MD)
Expand Down Expand Up @@ -175,6 +182,12 @@ func (s *ProxyHappySuite) TestPingStream_FullDuplexWorks() {
assert.Len(s.T(), trailerMd, 1, "PingList trailer headers user contain metadata")
}

func (s *ProxyHappySuite) TestPingStream_StressTest() {
for i := 0; i < 50; i++ {
s.TestPingStream_FullDuplexWorks()
}
}

func (s *ProxyHappySuite) SetupSuite() {
var err error

Expand All @@ -189,7 +202,7 @@ func (s *ProxyHappySuite) SetupSuite() {
pb.RegisterTestServiceServer(s.server, &assertingService{t: s.T()})

// Setup of the proxy's Director.
proxyClientConn, err := grpc.Dial(s.serverListener.Addr().String(), grpc.WithInsecure(), grpc.WithCodec(proxy.Codec()))
s.serverClientConn, err = grpc.Dial(s.serverListener.Addr().String(), grpc.WithInsecure(), grpc.WithCodec(proxy.Codec()))
require.NoError(s.T(), err, "must not error on deferred client Dial")
director := func(ctx context.Context, fullName string) (*grpc.ClientConn, error) {
md, ok := metadata.FromContext(ctx)
Expand All @@ -198,7 +211,7 @@ func (s *ProxyHappySuite) SetupSuite() {
return nil, grpc.Errorf(codes.PermissionDenied, "testing rejection")
}
}
return proxyClientConn, nil
return s.serverClientConn, nil
}
s.proxy = grpc.NewServer(
grpc.CustomCodec(proxy.Codec()),
Expand All @@ -225,6 +238,14 @@ func (s *ProxyHappySuite) SetupSuite() {
}

func (s *ProxyHappySuite) TearDownSuite() {
if s.client != nil {
s.client.Close()
}
if s.serverClientConn != nil {
s.serverClientConn.Close()
}
// Close all transports so the logs don't get spammy.
time.Sleep(10 * time.Millisecond)
if s.proxy != nil {
s.proxy.Stop()
s.proxyListener.Close()
Expand All @@ -233,9 +254,6 @@ func (s *ProxyHappySuite) TearDownSuite() {
s.server.Stop()
s.serverListener.Close()
}
if s.client != nil {
s.client.Close()
}
}

func TestProxyHappySuite(t *testing.T) {
Expand Down

0 comments on commit 97396d9

Please sign in to comment.