Skip to content

Commit

Permalink
Support for one2many streaming calls, tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
smira committed Nov 22, 2019
1 parent 817b035 commit 2d37ba4
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 10 deletions.
3 changes: 2 additions & 1 deletion proxy/examples_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ func ExampleRegisterService() {
// Register a TestService with 4 of its methods explicitly.
proxy.RegisterService(server, director,
"smira.testproto.TestService",
"PingEmpty", "Ping", "PingError", "PingList")
[]string{"PingEmpty", "Ping", "PingError", "PingList"},
[]string{"PingList"})
}

func ExampleTransparentHandler() {
Expand Down
24 changes: 19 additions & 5 deletions proxy/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,18 @@ var (
// The behaviour is the same as if you were registering a handler method, e.g. from a codegenerated pb.go file.
//
// This can *only* be used if the `server` also uses grpcproxy.CodecForServer() ServerOption.
func RegisterService(server *grpc.Server, director StreamDirector, serviceName string, methodNames ...string) {
streamer := &handler{director}
//
// streamedMethodNames is only important for one-2-many proxying.
func RegisterService(server *grpc.Server, director StreamDirector, serviceName string, methodNames []string, streamedMethodNames []string) {
streamer := &handler{
director: director,
streamedMethods: map[string]struct{}{},
}

for _, methodName := range streamedMethodNames {
streamer.streamedMethods["/"+serviceName+"/"+methodName] = struct{}{}
}

fakeDesc := &grpc.ServiceDesc{
ServiceName: serviceName,
HandlerType: (*interface{})(nil),
Expand All @@ -48,12 +58,16 @@ func RegisterService(server *grpc.Server, director StreamDirector, serviceName s
//
// This can *only* be used if the `server` also uses grpcproxy.CodecForServer() ServerOption.
func TransparentHandler(director StreamDirector) grpc.StreamHandler {
streamer := &handler{director}
streamer := &handler{
director: director,
streamedMethods: map[string]struct{}{},
}
return streamer.handler
}

type handler struct {
director StreamDirector
director StreamDirector
streamedMethods map[string]struct{}
}

type backendConnection struct {
Expand Down Expand Up @@ -112,7 +126,7 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error
}

if len(backendConnections) != 1 {
return s.handlerMulti(serverStream, backendConnections)
return s.handlerMulti(fullMethodName, serverStream, backendConnections)
}

// case of proxying one to one:
Expand Down
14 changes: 12 additions & 2 deletions proxy/handler_multi.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ import (
"google.golang.org/grpc/status"
)

func (s *handler) handlerMulti(serverStream grpc.ServerStream, backendConnections []backendConnection) error {
func (s *handler) handlerMulti(fullMethodName string, serverStream grpc.ServerStream, backendConnections []backendConnection) error {
// wrap the stream for safe concurrent access
serverStream = &ServerStreamWrapper{ServerStream: serverStream}

s2cErrChan := s.forwardServerToClientsMulti(serverStream, backendConnections)
var c2sErrChan chan error

if true { // TODO: if unary
if _, streamed := s.streamedMethods[fullMethodName]; !streamed {
c2sErrChan = s.forwardClientsToServerMultiUnary(backendConnections, serverStream)
} else {
c2sErrChan = s.forwardClientsToServerMultiStreaming(backendConnections, serverStream)
Expand Down Expand Up @@ -99,6 +99,16 @@ func (s *handler) forwardClientsToServerMultiUnary(sources []backendConnection,
for i := 0; i < len(sources); i++ {
go func(src *backendConnection) {
errCh <- func() error {
if src.connError != nil {
payload, err := s.formatError(src, src.connError)
if err != nil {
return err
}

payloadCh <- payload
return nil
}

f := &frame{}
for j := 0; ; j++ {
if err := src.clientStream.RecvMsg(f); err != nil {
Expand Down
45 changes: 44 additions & 1 deletion proxy/handler_multi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,48 @@ func (s *MultiServiceSuite) TestDirectorErrorIsPropagated() {
assert.Equal(s.T(), "testing rejection", status.Convert(err).Message())
}

func (s *MultiServiceSuite) TestPingStream_FullDuplexWorks() {
stream, err := s.testClient.PingStream(s.ctx)
require.NoError(s.T(), err, "PingStream request should be successful.")

for i := 0; i < countListResponses; i++ {
ping := &pb.PingRequest{Value: fmt.Sprintf("foo:%d", i)}
require.NoError(s.T(), stream.Send(ping), "sending to PingStream must not fail")

expectedUpstreams := map[string]struct{}{}
for j := 0; j < numUpstreams; j++ {
expectedUpstreams[fmt.Sprintf("server%d", j)] = struct{}{}
}

// each upstream should send back response
for j := 0; j < numUpstreams; j++ {
resp, err := stream.Recv()
s.Require().NoError(err)

s.Assert().Len(resp.Response, 1)
s.Assert().EqualValues(i, resp.Response[0].Counter, "ping roundtrip must succeed with the correct id")
s.Assert().EqualValues(resp.Response[0].Metadata.Hostname, resp.Response[0].Server)

delete(expectedUpstreams, resp.Response[0].Metadata.Hostname)
}

s.Require().Empty(expectedUpstreams)

if i == 0 {
// Check that the header arrives before all entries.
headerMd, err := stream.Header()
require.NoError(s.T(), err, "PingStream headers should not error.")
assert.Contains(s.T(), headerMd, serverHeaderMdKey, "PingStream response headers user contain metadata")
}
}
require.NoError(s.T(), stream.CloseSend(), "no error on close send")
_, err = stream.Recv()
require.Equal(s.T(), io.EOF, err, "stream should close with io.EOF, meaning OK")
// Check that the trailer headers are here.
trailerMd := stream.Trailer()
assert.Len(s.T(), trailerMd, 1, "PingList trailer headers user contain metadata")
}

func (s *MultiServiceSuite) SetupTest() {
s.ctx, s.ctxCancel = context.WithTimeout(context.TODO(), 120*time.Second)
}
Expand Down Expand Up @@ -352,7 +394,8 @@ func (s *MultiServiceSuite) SetupSuite() {
// Ping handler is handled as an explicit registration and not as a TransparentHandler.
proxy.RegisterService(s.proxy, director,
"smira.testproto.MultiService",
"Ping")
[]string{"Ping", "PingStream"},
[]string{"PingStream"})

// Start the serving loops.
for i := range s.servers {
Expand Down
3 changes: 2 additions & 1 deletion proxy/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,8 @@ func (s *ProxyHappySuite) SetupSuite() {
// Ping handler is handled as an explicit registration and not as a TransparentHandler.
proxy.RegisterService(s.proxy, director,
"smira.testproto.TestService",
"Ping")
[]string{"Ping"},
nil)

// Start the serving loops.
s.T().Logf("starting grpc.Server at: %v", s.serverListener.Addr().String())
Expand Down

0 comments on commit 2d37ba4

Please sign in to comment.