From cc91c09782824e261bf1c861961a272aedb2b123 Mon Sep 17 00:00:00 2001
From: Andrey Smirnov
Date: Wed, 27 Nov 2019 00:48:41 +0300
Subject: [PATCH] refactor: provide better public API, enforce proxying mode

Provide a clearer public API with options, and enforce the proxying mode
explicitly via options (instead of inferring it from the number of
backends). One2one and one2many have some differences: one2one is
transparent, while one2many might inject additional metadata, wrap errors
and responses, etc., so if a client expects the one2many format, it
should get it even with a single upstream.

Also provide a way to detect streamed methods via a function, so that any
policy can be implemented (inspecting the gRPC server description,
looking at a method name prefix/suffix, etc.)

No functional changes, just shuffling code around.

Signed-off-by: Andrey Smirnov
---
 .github/workflows/go.yml                      |   1 +
 README.md                                     |   5 +-
 proxy/DOC.md                                  | 140 ++++++++++++++-
 proxy/examples_test.go                        |   6 +-
 proxy/handler.go                              | 170 +++---------------
 .../{handler_multi.go => handler_one2many.go} |   8 +-
 ...multi_test.go => handler_one2many_test.go} |  48 ++---
 proxy/handler_one2one.go                      | 113 ++++++++++++
 ...andler_test.go => handler_one2one_test.go} |  36 ++--
 proxy/proxy.go                                | 116 ++++++++++++
 10 files changed, 440 insertions(+), 203 deletions(-)
 rename proxy/{handler_multi.go => handler_one2many.go} (97%)
 rename proxy/{handler_multi_test.go => handler_one2many_test.go} (93%)
 create mode 100644 proxy/handler_one2one.go
 rename proxy/{handler_test.go => handler_one2one_test.go} (90%)
 create mode 100644 proxy/proxy.go

diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml
index f1f2441..12bff73 100644
--- a/.github/workflows/go.yml
+++ b/.github/workflows/go.yml
@@ -37,6 +37,7 @@ jobs:
         with:
           token: ${{secrets.CODECOV_TOKEN}}
           file: ./coverage.txt
+        if: github.event_name == 'push'
 
   lint:
     name: Lint
diff --git a/README.md b/README.md
index 2863960..686574b 100644
--- a/README.md
+++ b/README.md
@@ -74,7 +74,10 @@ locally:
 ```go
 server := grpc.NewServer(
     grpc.CustomCodec(proxy.Codec()),
-    grpc.UnknownServiceHandler(proxy.TransparentHandler(director)))
+    grpc.UnknownServiceHandler(proxy.TransparentHandler(
+        director,
+        proxy.WithMode(proxy.One2One),
+    )))
 
 pb_test.RegisterTestServiceServer(server, &testImpl{})
 ```
diff --git a/proxy/DOC.md b/proxy/DOC.md
index 39ef251..cd9ab7f 100644
--- a/proxy/DOC.md
+++ b/proxy/DOC.md
@@ -46,26 +46,26 @@ nolint: staticcheck
 #### func RegisterService
 
 ```go
-func RegisterService(server *grpc.Server, director StreamDirector, serviceName string, methodNames ...string)
+func RegisterService(server *grpc.Server, director StreamDirector, serviceName string, options ...Option)
 ```
 
 RegisterService sets up a proxy handler for a particular gRPC service and
-method. The behaviour is the same as if you were registering a handler method,
+method. The behavior is the same as if you were registering a handler method,
 e.g. from a codegenerated pb.go file.
 
-This can *only* be used if the `server` also uses grpcproxy.CodecForServer()
+This can *only* be used if the `server` also uses grpc.CustomCodec()
 ServerOption.
 
 #### func TransparentHandler
 
 ```go
-func TransparentHandler(director StreamDirector) grpc.StreamHandler
+func TransparentHandler(director StreamDirector, options ...Option) grpc.StreamHandler
 ```
 
 TransparentHandler returns a handler that attempts to proxy all requests that
 are not registered in the server. The indented use here is as a transparent
 proxy, where the server doesn't know about the services implemented by the
 backends.
It should be used as a `grpc.UnknownServiceHandler`.
 
-This can *only* be used if the `server` also uses grpcproxy.CodecForServer()
+This can *only* be used if the `server` also uses grpc.CustomCodec()
 ServerOption.
 
 #### type Backend
@@ -115,6 +115,127 @@ When proxying one-to-many and aggregating results, Backend might be used to
 append additional fields to upstream response to support more complicated
 proxying.
 
+#### type Mode
+
+```go
+type Mode int
+```
+
+Mode specifies proxying mode: one2one (transparent) or one2many (aggregation,
+error wrapping).
+
+```go
+const (
+	One2One Mode = iota
+	One2Many
+)
+```
+Mode constants.
+
+#### type Option
+
+```go
+type Option func(*handlerOptions)
+```
+
+Option configures the gRPC proxy.
+
+#### func WithMethodNames
+
+```go
+func WithMethodNames(methodNames ...string) Option
+```
+WithMethodNames configures the list of method names to proxy for the
+non-transparent handler.
+
+#### func WithMode
+
+```go
+func WithMode(mode Mode) Option
+```
+WithMode sets proxying mode: One2One or One2Many.
+
+Default mode is One2One.
+
+#### func WithStreamedDetector
+
+```go
+func WithStreamedDetector(detector StreamedDetectorFunc) Option
+```
+WithStreamedDetector configures a function to detect streamed methods.
+
+This is only important for one2many proxying.
+
+#### func WithStreamedMethodNames
+
+```go
+func WithStreamedMethodNames(streamedMethodNames ...string) Option
+```
+WithStreamedMethodNames configures the list of streamed method names.
+
+This is only important for one2many proxying. This option can't be used with
+TransparentHandler.
+
+#### type ServerStreamWrapper
+
+```go
+type ServerStreamWrapper struct {
+	grpc.ServerStream
+}
+```
+
+ServerStreamWrapper wraps grpc.ServerStream and adds locking to the send path.
+
+#### func (*ServerStreamWrapper) SendHeader
+
+```go
+func (wrapper *ServerStreamWrapper) SendHeader(md metadata.MD) error
+```
+SendHeader sends the header metadata. The provided md and headers set by
+SetHeader() will be sent. It fails if called multiple times.
+
+#### func (*ServerStreamWrapper) SendMsg
+
+```go
+func (wrapper *ServerStreamWrapper) SendMsg(m interface{}) error
+```
+SendMsg sends a message. On error, SendMsg aborts the stream and the error is
+returned directly.
+
+SendMsg blocks until:
+
+    - There is sufficient flow control to schedule m with the transport, or
+    - The stream is done, or
+    - The stream breaks.
+
+SendMsg does not wait until the message is received by the client. An untimely
+stream closure may result in lost messages.
+
+It is safe to have a goroutine calling SendMsg and another goroutine calling
+RecvMsg on the same stream at the same time, but it is not safe to call SendMsg
+on the same stream in different goroutines.
+
+#### func (*ServerStreamWrapper) SetHeader
+
+```go
+func (wrapper *ServerStreamWrapper) SetHeader(md metadata.MD) error
+```
+SetHeader sets the header metadata. It may be called multiple times. When
+called multiple times, all the provided metadata will be merged. All the
+metadata will be sent out when one of the following happens:
+
+    - ServerStream.SendHeader() is called;
+    - The first response is sent out;
+    - An RPC status is sent out (error or success).
+
+#### func (*ServerStreamWrapper) SetTrailer
+
+```go
+func (wrapper *ServerStreamWrapper) SetTrailer(md metadata.MD)
+```
+SetTrailer sets the trailer metadata which will be sent with the RPC status.
+When called more than once, all the provided metadata will be merged.
+
 #### type SingleBackend
 
 ```go
 type SingleBackend struct {
 }
 ```
@@ -186,3 +307,12 @@ stream interceptors are invoked. So decisions around authorization, monitoring
 etc. are better to be handled there.
 
 See the rather rich example.
+
+#### type StreamedDetectorFunc
+
+```go
+type StreamedDetectorFunc func(fullMethodName string) bool
+```
+
+StreamedDetectorFunc reports whether a gRPC method is streamed (only used for
+one2many proxying).
diff --git a/proxy/examples_test.go b/proxy/examples_test.go
index f6698fe..9803262 100644
--- a/proxy/examples_test.go
+++ b/proxy/examples_test.go
@@ -25,8 +25,10 @@ func ExampleRegisterService() {
 	// Register a TestService with 4 of its methods explicitly.
 	proxy.RegisterService(server, director,
 		"talos.testproto.TestService",
-		[]string{"PingEmpty", "Ping", "PingError", "PingList"},
-		[]string{"PingList"})
+		proxy.WithMode(proxy.One2Many),
+		proxy.WithMethodNames("PingEmpty", "Ping", "PingError", "PingList"),
+		proxy.WithStreamedMethodNames("PingList"),
+	)
 }
 
 func ExampleTransparentHandler() {
diff --git a/proxy/handler.go b/proxy/handler.go
index 4fae205..9fff9a9 100644
--- a/proxy/handler.go
+++ b/proxy/handler.go
@@ -6,7 +6,6 @@ package proxy
 
 import (
 	"context"
-	"io"
 
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
@@ -20,54 +19,17 @@ var (
 	}
 )
 
-// RegisterService sets up a proxy handler for a particular gRPC service and method.
-// The behaviour is the same as if you were registering a handler method, e.g. from a codegenerated pb.go file.
-//
-// This can *only* be used if the `server` also uses grpcproxy.CodecForServer() ServerOption.
-//
-// streamedMethodNames is only important for one-2-many proxying.
-func RegisterService(server *grpc.Server, director StreamDirector, serviceName string, methodNames []string, streamedMethodNames []string) {
-	streamer := &handler{
-		director:        director,
-		streamedMethods: map[string]struct{}{},
-	}
-
-	for _, methodName := range streamedMethodNames {
-		streamer.streamedMethods["/"+serviceName+"/"+methodName] = struct{}{}
-	}
-
-	fakeDesc := &grpc.ServiceDesc{
-		ServiceName: serviceName,
-		HandlerType: (*interface{})(nil),
-	}
-	for _, m := range methodNames {
-		streamDesc := grpc.StreamDesc{
-			StreamName:    m,
-			Handler:       streamer.handler,
-			ServerStreams: true,
-			ClientStreams: true,
-		}
-		fakeDesc.Streams = append(fakeDesc.Streams, streamDesc)
-	}
-	server.RegisterService(fakeDesc, streamer)
-}
-
-// TransparentHandler returns a handler that attempts to proxy all requests that are not registered in the server.
-// The indented use here is as a transparent proxy, where the server doesn't know about the services implemented by the
-// backends. It should be used as a `grpc.UnknownServiceHandler`.
-//
-// This can *only* be used if the `server` also uses grpcproxy.CodecForServer() ServerOption.
-func TransparentHandler(director StreamDirector) grpc.StreamHandler { - streamer := &handler{ - director: director, - streamedMethods: map[string]struct{}{}, - } - return streamer.handler +type handlerOptions struct { + mode Mode + serviceName string + methodNames []string + streamedMethods map[string]struct{} + streamedDetector StreamedDetectorFunc } type handler struct { - director StreamDirector - streamedMethods map[string]struct{} + director StreamDirector + options handlerOptions } type backendConnection struct { @@ -94,11 +56,6 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error return err } - if len(backends) == 0 { - return status.Errorf(codes.Unavailable, "no backend connections for proxying") - } - - var establishedConnections int backendConnections := make([]backendConnection, len(backends)) clientCtx, clientCancel := context.WithCancel(serverStream.Context()) @@ -121,109 +78,22 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error if backendConnections[i].connError != nil { continue } - - establishedConnections++ - } - - if len(backendConnections) != 1 { - return s.handlerMulti(fullMethodName, serverStream, backendConnections) - } - - // case of proxying one to one: - if backendConnections[0].connError != nil { - return backendConnections[0].connError } - // Explicitly *do not close* s2cErrChan and c2sErrChan, otherwise the select below will not terminate. - // Channels do not have to be closed, it is just a control flow mechanism, see - // https://groups.google.com/forum/#!msg/golang-nuts/pZwdYRGxCIk/qpbHxRRPJdUJ - s2cErrChan := s.forwardServerToClient(serverStream, &backendConnections[0]) - c2sErrChan := s.forwardClientToServer(&backendConnections[0], serverStream) - // We don't know which side is going to stop sending first, so we need a select between the two. - for i := 0; i < 2; i++ { - select { - case s2cErr := <-s2cErrChan: - if s2cErr == io.EOF { - // this is the happy case where the sender has encountered io.EOF, and won't be sending anymore./ - // the clientStream>serverStream may continue pumping though. - //nolint: errcheck - backendConnections[0].clientStream.CloseSend() - break - } else { - // however, we may have gotten a receive error (stream disconnected, a read error etc) in which case we need - // to cancel the clientStream to the backend, let all of its goroutines be freed up by the CancelFunc and - // exit with an error to the stack - return status.Errorf(codes.Internal, "failed proxying s2c: %v", s2cErr) - } - case c2sErr := <-c2sErrChan: - // This happens when the clientStream has nothing else to offer (io.EOF), returned a gRPC error. In those two - // cases we may have received Trailers as part of the call. In case of other errors (stream closed) the trailers - // will be nil. - serverStream.SetTrailer(backendConnections[0].clientStream.Trailer()) - // c2sErr will contain RPC error from client code. If not io.EOF return the RPC error as server stream error. 
-			if c2sErr != io.EOF {
-				return c2sErr
-			}
-			return nil
+	switch s.options.mode {
+	case One2One:
+		if len(backendConnections) != 1 {
+			return status.Errorf(codes.Internal, "one2one proxying must have exactly one connection (got %d)", len(backendConnections))
 		}
-	}
-	return status.Errorf(codes.Internal, "gRPC proxying should never reach this stage.")
-}
 
-func (s *handler) forwardClientToServer(src *backendConnection, dst grpc.ServerStream) chan error {
-	ret := make(chan error, 1)
-	go func() {
-		f := &frame{}
-		for i := 0; ; i++ {
-			if err := src.clientStream.RecvMsg(f); err != nil {
-				ret <- err // this can be io.EOF which is happy case
-				break
-			}
-
-			var err error
-			f.payload, err = src.backend.AppendInfo(f.payload)
-			if err != nil {
-				ret <- err
-				break
-			}
-
-			if i == 0 {
-				// This is a bit of a hack, but client to server headers are only readable after first client msg is
-				// received but must be written to server stream before the first msg is flushed.
-				// This is the only place to do it nicely.
-				md, err := src.clientStream.Header()
-				if err != nil {
-					ret <- err
-					break
-				}
-				if err := dst.SendHeader(md); err != nil {
-					ret <- err
-					break
-				}
-			}
-			if err := dst.SendMsg(f); err != nil {
-				ret <- err
-				break
-			}
+		return s.handlerOne2One(fullMethodName, serverStream, backendConnections)
+	case One2Many:
+		if len(backendConnections) == 0 {
+			return status.Errorf(codes.Unavailable, "no backend connections for proxying")
 		}
-	}()
-	return ret
-}
 
-func (s *handler) forwardServerToClient(src grpc.ServerStream, dst *backendConnection) chan error {
-	ret := make(chan error, 1)
-	go func() {
-		f := &frame{}
-		for i := 0; ; i++ {
-			if err := src.RecvMsg(f); err != nil {
-				ret <- err // this can be io.EOF which is happy case
-				break
-			}
-			if err := dst.clientStream.SendMsg(f); err != nil {
-				ret <- err
-				break
-			}
-		}
-	}()
-	return ret
+		return s.handlerOne2Many(fullMethodName, serverStream, backendConnections)
+
+	default:
+		return status.Errorf(codes.Internal, "unsupported proxy mode")
+	}
 }
diff --git a/proxy/handler_multi.go b/proxy/handler_one2many.go
similarity index 97%
rename from proxy/handler_multi.go
rename to proxy/handler_one2many.go
index 33e4e17..343f3ce 100644
--- a/proxy/handler_multi.go
+++ b/proxy/handler_one2many.go
@@ -15,17 +15,17 @@ import (
 	"google.golang.org/grpc/status"
 )
 
-func (s *handler) handlerMulti(fullMethodName string, serverStream grpc.ServerStream, backendConnections []backendConnection) error {
+func (s *handler) handlerOne2Many(fullMethodName string, serverStream grpc.ServerStream, backendConnections []backendConnection) error {
 	// wrap the stream for safe concurrent access
 	serverStream = &ServerStreamWrapper{ServerStream: serverStream}
 
 	s2cErrChan := s.forwardServerToClientsMulti(serverStream, backendConnections)
 
 	var c2sErrChan chan error
 
-	if _, streamed := s.streamedMethods[fullMethodName]; !streamed {
-		c2sErrChan = s.forwardClientsToServerMultiUnary(backendConnections, serverStream)
-	} else {
+	if s.options.streamedDetector != nil && s.options.streamedDetector(fullMethodName) {
 		c2sErrChan = s.forwardClientsToServerMultiStreaming(backendConnections, serverStream)
+	} else {
+		c2sErrChan = s.forwardClientsToServerMultiUnary(backendConnections, serverStream)
 	}
 
 	for i := 0; i < 2; i++ {
diff --git a/proxy/handler_multi_test.go b/proxy/handler_one2many_test.go
similarity index 93%
rename from proxy/handler_multi_test.go
rename to proxy/handler_one2many_test.go
index c847bbc..1f0f043 100644
--- a/proxy/handler_multi_test.go
+++ 
b/proxy/handler_one2many_test.go @@ -28,8 +28,8 @@ import ( "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" - pb "github.com/talos-systems/grpc-proxy/testservice" "github.com/talos-systems/grpc-proxy/proxy" + pb "github.com/talos-systems/grpc-proxy/testservice" ) const ( @@ -199,7 +199,7 @@ func (b *assertingBackend) BuildError(err error) ([]byte, error) { }) } -type MultiServiceSuite struct { +type ProxyOne2ManySuite struct { suite.Suite serverListeners []net.Listener @@ -215,7 +215,7 @@ type MultiServiceSuite struct { ctxCancel context.CancelFunc } -func (s *MultiServiceSuite) TestPingEmptyCarriesClientMetadata() { +func (s *ProxyOne2ManySuite) TestPingEmptyCarriesClientMetadata() { ctx := metadata.NewOutgoingContext(s.ctx, metadata.Pairs(clientMdKey, "true")) out, err := s.testClient.PingEmpty(ctx, &pb.Empty{}) require.NoError(s.T(), err, "PingEmpty should succeed without errors") @@ -239,13 +239,13 @@ func (s *MultiServiceSuite) TestPingEmptyCarriesClientMetadata() { s.Require().Empty(expectedUpstreams) } -func (s *MultiServiceSuite) TestPingEmpty_StressTest() { +func (s *ProxyOne2ManySuite) TestPingEmpty_StressTest() { for i := 0; i < 50; i++ { s.TestPingEmptyCarriesClientMetadata() } } -func (s *MultiServiceSuite) TestPingEmptyTargets() { +func (s *ProxyOne2ManySuite) TestPingEmptyTargets() { for _, targets := range [][]string{ {"1", "2"}, {"3", "2", "1"}, @@ -277,7 +277,7 @@ func (s *MultiServiceSuite) TestPingEmptyTargets() { s.Require().Empty(expectedUpstreams) } } -func (s *MultiServiceSuite) TestPingEmptyConnError() { +func (s *ProxyOne2ManySuite) TestPingEmptyConnError() { targets := []string{"0", "-1", "2"} md := metadata.Pairs(clientMdKey, "true") md.Set("targets", targets...) @@ -309,7 +309,7 @@ func (s *MultiServiceSuite) TestPingEmptyConnError() { s.Require().Empty(expectedUpstreams) } -func (s *MultiServiceSuite) TestPingCarriesServerHeadersAndTrailers() { +func (s *ProxyOne2ManySuite) TestPingCarriesServerHeadersAndTrailers() { headerMd := make(metadata.MD) trailerMd := make(metadata.MD) // This is an awkward calling convention... but meh. @@ -329,7 +329,7 @@ func (s *MultiServiceSuite) TestPingCarriesServerHeadersAndTrailers() { assert.Len(s.T(), trailerMd, 1, "server response trailers must contain server data") } -func (s *MultiServiceSuite) TestPingErrorPropagatesAppError() { +func (s *ProxyOne2ManySuite) TestPingErrorPropagatesAppError() { out, err := s.testClient.PingError(s.ctx, &pb.PingRequest{Value: "foo"}) s.Require().NoError(err, "error should be encapsulated in the response") @@ -341,7 +341,7 @@ func (s *MultiServiceSuite) TestPingErrorPropagatesAppError() { } } -func (s *MultiServiceSuite) TestPingStreamErrorPropagatesAppError() { +func (s *ProxyOne2ManySuite) TestPingStreamErrorPropagatesAppError() { stream, err := s.testClient.PingStreamError(s.ctx) s.Require().NoError(err, "error should be encapsulated in the response") @@ -358,7 +358,7 @@ func (s *MultiServiceSuite) TestPingStreamErrorPropagatesAppError() { require.Equal(s.T(), io.EOF, err, "stream should close with io.EOF, meaning OK") } -func (s *MultiServiceSuite) TestPingStreamConnError() { +func (s *ProxyOne2ManySuite) TestPingStreamConnError() { targets := []string{"0", "-1", "2"} md := metadata.Pairs(clientMdKey, "true") md.Set("targets", targets...) 
@@ -379,7 +379,7 @@ func (s *MultiServiceSuite) TestPingStreamConnError() { require.Equal(s.T(), io.EOF, err, "stream should close with io.EOF, meaning OK") } -func (s *MultiServiceSuite) TestDirectorErrorIsPropagated() { +func (s *ProxyOne2ManySuite) TestDirectorErrorIsPropagated() { // See SetupSuite where the StreamDirector has a special case. ctx := metadata.NewOutgoingContext(s.ctx, metadata.Pairs(rejectingMdKey, "true")) _, err := s.testClient.Ping(ctx, &pb.PingRequest{Value: "foo"}) @@ -388,7 +388,7 @@ func (s *MultiServiceSuite) TestDirectorErrorIsPropagated() { assert.Equal(s.T(), "testing rejection", status.Convert(err).Message()) } -func (s *MultiServiceSuite) TestPingStream_FullDuplexWorks() { +func (s *ProxyOne2ManySuite) TestPingStream_FullDuplexWorks() { stream, err := s.testClient.PingStream(s.ctx) require.NoError(s.T(), err, "PingStream request should be successful.") @@ -430,7 +430,7 @@ func (s *MultiServiceSuite) TestPingStream_FullDuplexWorks() { assert.Len(s.T(), trailerMd, 1, "PingList trailer headers user contain metadata") } -func (s *MultiServiceSuite) TestPingStream_FullDuplexConcurrent() { +func (s *ProxyOne2ManySuite) TestPingStream_FullDuplexConcurrent() { stream, err := s.testClient.PingStream(s.ctx) require.NoError(s.T(), err, "PingStream request should be successful.") @@ -497,21 +497,21 @@ func (s *MultiServiceSuite) TestPingStream_FullDuplexConcurrent() { assert.Len(s.T(), trailerMd, 1, "PingList trailer headers user contain metadata") } -func (s *MultiServiceSuite) TestPingStream_StressTest() { +func (s *ProxyOne2ManySuite) TestPingStream_StressTest() { for i := 0; i < 50; i++ { s.TestPingStream_FullDuplexWorks() } } -func (s *MultiServiceSuite) SetupTest() { +func (s *ProxyOne2ManySuite) SetupTest() { s.ctx, s.ctxCancel = context.WithTimeout(context.TODO(), 120*time.Second) } -func (s *MultiServiceSuite) TearDownTest() { +func (s *ProxyOne2ManySuite) TearDownTest() { s.ctxCancel() } -func (s *MultiServiceSuite) SetupSuite() { +func (s *ProxyOne2ManySuite) SetupSuite() { var err error s.proxyListener, err = net.Listen("tcp", "127.0.0.1:0") @@ -592,13 +592,15 @@ func (s *MultiServiceSuite) SetupSuite() { s.proxy = grpc.NewServer( grpc.CustomCodec(proxy.Codec()), - grpc.UnknownServiceHandler(proxy.TransparentHandler(director)), + grpc.UnknownServiceHandler(proxy.TransparentHandler(director, proxy.WithMode(proxy.One2Many))), ) // Ping handler is handled as an explicit registration and not as a TransparentHandler. proxy.RegisterService(s.proxy, director, "talos.testproto.MultiService", - []string{"Ping", "PingStream", "PingStreamError"}, - []string{"PingStream", "PingStreamError"}) + proxy.WithMode(proxy.One2Many), + proxy.WithMethodNames("Ping", "PingStream", "PingStreamError"), + proxy.WithStreamedMethodNames("PingStream", "PingStreamError"), + ) // Start the serving loops. 
for i := range s.servers {
@@ -619,7 +621,7 @@
 	s.testClient = pb.NewMultiServiceClient(clientConn)
 }
 
-func (s *MultiServiceSuite) TearDownSuite() {
+func (s *ProxyOne2ManySuite) TearDownSuite() {
 	if s.client != nil {
 		s.client.Close()
 	}
@@ -646,8 +648,8 @@
 		}
 	}
 }
 
-func TestMultiServiceSuite(t *testing.T) {
-	suite.Run(t, &MultiServiceSuite{})
+func TestProxyOne2ManySuite(t *testing.T) {
+	suite.Run(t, &ProxyOne2ManySuite{})
 }
 
 func init() {
diff --git a/proxy/handler_one2one.go b/proxy/handler_one2one.go
new file mode 100644
index 0000000..28adadd
--- /dev/null
+++ b/proxy/handler_one2one.go
@@ -0,0 +1,113 @@
+// Copyright 2017 Michal Witkowski. All Rights Reserved.
+// Copyright 2019 Andrey Smirnov. All Rights Reserved.
+// See LICENSE for licensing terms.
+
+package proxy
+
+import (
+	"io"
+
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+)
+
+func (s *handler) handlerOne2One(fullMethodName string, serverStream grpc.ServerStream, backendConnections []backendConnection) error {
+	// case of proxying one to one:
+	if backendConnections[0].connError != nil {
+		return backendConnections[0].connError
+	}
+
+	// Explicitly *do not close* s2cErrChan and c2sErrChan, otherwise the select below will not terminate.
+	// Channels do not have to be closed, it is just a control flow mechanism, see
+	// https://groups.google.com/forum/#!msg/golang-nuts/pZwdYRGxCIk/qpbHxRRPJdUJ
+	s2cErrChan := s.forwardServerToClient(serverStream, &backendConnections[0])
+	c2sErrChan := s.forwardClientToServer(&backendConnections[0], serverStream)
+
+	// We don't know which side is going to stop sending first, so we need a select between the two.
+	for i := 0; i < 2; i++ {
+		select {
+		case s2cErr := <-s2cErrChan:
+			if s2cErr == io.EOF {
+				// this is the happy case where the sender has encountered io.EOF, and won't be sending anymore.
+				// the clientStream->serverStream may continue pumping though.
+				//nolint: errcheck
+				backendConnections[0].clientStream.CloseSend()
+				break
+			} else {
+				// however, we may have gotten a receive error (stream disconnected, a read error etc) in which case we need
+				// to cancel the clientStream to the backend, let all of its goroutines be freed up by the CancelFunc and
+				// exit with an error to the stack
+				return status.Errorf(codes.Internal, "failed proxying s2c: %v", s2cErr)
+			}
+		case c2sErr := <-c2sErrChan:
+			// This happens when the clientStream has nothing else to offer (io.EOF) or returned a gRPC error. In those two
+			// cases we may have received Trailers as part of the call. In case of other errors (stream closed) the trailers
+			// will be nil.
+			serverStream.SetTrailer(backendConnections[0].clientStream.Trailer())
+			// c2sErr will contain RPC error from client code. If not io.EOF, return the RPC error as server stream error.
+			if c2sErr != io.EOF {
+				return c2sErr
+			}
+			return nil
+		}
+	}
+	return status.Errorf(codes.Internal, "gRPC proxying should never reach this stage.")
+}
+
+func (s *handler) forwardClientToServer(src *backendConnection, dst grpc.ServerStream) chan error {
+	ret := make(chan error, 1)
+	go func() {
+		f := &frame{}
+		for i := 0; ; i++ {
+			if err := src.clientStream.RecvMsg(f); err != nil {
+				ret <- err // this can be io.EOF, which is the happy case
+				break
+			}
+
+			var err error
+			f.payload, err = src.backend.AppendInfo(f.payload)
+			if err != nil {
+				ret <- err
+				break
+			}
+
+			if i == 0 {
+				// This is a bit of a hack, but client-to-server headers are only readable after the first client msg
+				// is received, yet must be written to the server stream before the first msg is flushed.
+				// This is the only place to do it nicely.
+				md, err := src.clientStream.Header()
+				if err != nil {
+					ret <- err
+					break
+				}
+				if err := dst.SendHeader(md); err != nil {
+					ret <- err
+					break
+				}
+			}
+			if err := dst.SendMsg(f); err != nil {
+				ret <- err
+				break
+			}
+		}
+	}()
+	return ret
+}
+
+func (s *handler) forwardServerToClient(src grpc.ServerStream, dst *backendConnection) chan error {
+	ret := make(chan error, 1)
+	go func() {
+		f := &frame{}
+		for i := 0; ; i++ {
+			if err := src.RecvMsg(f); err != nil {
+				ret <- err // this can be io.EOF, which is the happy case
+				break
+			}
+			if err := dst.clientStream.SendMsg(f); err != nil {
+				ret <- err
+				break
+			}
+		}
+	}()
+	return ret
+}
diff --git a/proxy/handler_test.go b/proxy/handler_one2one_test.go
similarity index 90%
rename from proxy/handler_test.go
rename to proxy/handler_one2one_test.go
index b34cac4..bc8ca20 100644
--- a/proxy/handler_test.go
+++ b/proxy/handler_one2one_test.go
@@ -92,8 +92,8 @@ func (s *assertingService) PingStream(stream pb.TestService_PingStreamServer) error {
 	return nil
 }
 
-// ProxyHappySuite tests the "happy" path of handling: that everything works in absence of connection issues.
-type ProxyHappySuite struct {
+// ProxyOne2OneSuite tests the "happy" path of handling: that everything works in absence of connection issues.
+type ProxyOne2OneSuite struct {
 	suite.Suite
 
 	serverListener net.Listener
@@ -109,28 +109,28 @@ type ProxyHappySuite struct {
 	ctxCancel context.CancelFunc
 }
 
-func (s *ProxyHappySuite) SetupTest() {
+func (s *ProxyOne2OneSuite) SetupTest() {
 	s.ctx, s.ctxCancel = context.WithTimeout(context.TODO(), 120*time.Second)
 }
 
-func (s *ProxyHappySuite) TearDownTest() {
+func (s *ProxyOne2OneSuite) TearDownTest() {
 	s.ctxCancel()
 }
 
-func (s *ProxyHappySuite) TestPingEmptyCarriesClientMetadata() {
+func (s *ProxyOne2OneSuite) TestPingEmptyCarriesClientMetadata() {
 	ctx := metadata.NewOutgoingContext(s.ctx, metadata.Pairs(clientMdKey, "true"))
 	out, err := s.testClient.PingEmpty(ctx, &pb.Empty{})
 	require.NoError(s.T(), err, "PingEmpty should succeed without errors")
 	require.Equal(s.T(), &pb.PingResponse{Value: pingDefaultValue, Counter: 42}, out)
 }
 
-func (s *ProxyHappySuite) TestPingEmpty_StressTest() {
+func (s *ProxyOne2OneSuite) TestPingEmpty_StressTest() {
 	for i := 0; i < 50; i++ {
 		s.TestPingEmptyCarriesClientMetadata()
 	}
 }
 
-func (s *ProxyHappySuite) TestPingCarriesServerHeadersAndTrailers() {
+func (s *ProxyOne2OneSuite) TestPingCarriesServerHeadersAndTrailers() {
 	headerMd := make(metadata.MD)
 	trailerMd := make(metadata.MD)
 	// This is an awkward calling convention... but meh.
@@ -141,14 +141,14 @@ func (s *ProxyHappySuite) TestPingCarriesServerHeadersAndTrailers() {
 	assert.Len(s.T(), trailerMd, 1, "server response trailers must contain server data")
 }
 
-func (s *ProxyHappySuite) TestPingErrorPropagatesAppError() {
+func (s *ProxyOne2OneSuite) TestPingErrorPropagatesAppError() {
 	_, err := s.testClient.PingError(s.ctx, &pb.PingRequest{Value: "foo"})
 	require.Error(s.T(), err, "PingError should never succeed")
 	assert.Equal(s.T(), codes.FailedPrecondition, status.Code(err))
 	assert.Equal(s.T(), "Userspace error.", status.Convert(err).Message())
 }
 
-func (s *ProxyHappySuite) TestDirectorErrorIsPropagated() {
+func (s *ProxyOne2OneSuite) TestDirectorErrorIsPropagated() {
 	// See SetupSuite where the StreamDirector has a special case.
 	ctx := metadata.NewOutgoingContext(s.ctx, metadata.Pairs(rejectingMdKey, "true"))
 	_, err := s.testClient.Ping(ctx, &pb.PingRequest{Value: "foo"})
@@ -157,7 +157,7 @@
 	assert.Equal(s.T(), "testing rejection", status.Convert(err).Message())
 }
 
-func (s *ProxyHappySuite) TestPingStream_FullDuplexWorks() {
+func (s *ProxyOne2OneSuite) TestPingStream_FullDuplexWorks() {
 	stream, err := s.testClient.PingStream(s.ctx)
 	require.NoError(s.T(), err, "PingStream request should be successful.")
 
@@ -184,13 +184,13 @@
 	assert.Len(s.T(), trailerMd, 1, "PingList trailer headers user contain metadata")
 }
 
-func (s *ProxyHappySuite) TestPingStream_StressTest() {
+func (s *ProxyOne2OneSuite) TestPingStream_StressTest() {
 	for i := 0; i < 50; i++ {
 		s.TestPingStream_FullDuplexWorks()
 	}
 }
 
-func (s *ProxyHappySuite) SetupSuite() {
+func (s *ProxyOne2OneSuite) SetupSuite() {
 	var err error
 
 	s.proxyListener, err = net.Listen("tcp", "127.0.0.1:0")
@@ -225,13 +225,13 @@
 	}
 	s.proxy = grpc.NewServer(
 		grpc.CustomCodec(proxy.Codec()),
-		grpc.UnknownServiceHandler(proxy.TransparentHandler(director)),
+		grpc.UnknownServiceHandler(proxy.TransparentHandler(director, proxy.WithMode(proxy.One2One))),
 	)
 
 	// Ping handler is handled as an explicit registration and not as a TransparentHandler.
 	proxy.RegisterService(s.proxy, director,
 		"talos.testproto.TestService",
-		[]string{"Ping"},
-		nil)
+		proxy.WithMethodNames("Ping"),
+	)
 
 	// Start the serving loops.
 	s.T().Logf("starting grpc.Server at: %v", s.serverListener.Addr().String())
@@ -250,7 +250,7 @@
 	s.testClient = pb.NewTestServiceClient(clientConn)
 }
 
-func (s *ProxyHappySuite) TearDownSuite() {
+func (s *ProxyOne2OneSuite) TearDownSuite() {
 	if s.client != nil {
 		s.client.Close()
 	}
@@ -269,6 +269,6 @@
 	}
 }
 
-func TestProxyHappySuite(t *testing.T) {
-	suite.Run(t, &ProxyHappySuite{})
+func TestProxyOne2OneSuite(t *testing.T) {
+	suite.Run(t, &ProxyOne2OneSuite{})
 }
diff --git a/proxy/proxy.go b/proxy/proxy.go
new file mode 100644
index 0000000..1f13a98
--- /dev/null
+++ b/proxy/proxy.go
@@ -0,0 +1,116 @@
+// Copyright 2017 Michal Witkowski. All Rights Reserved.
+// Copyright 2019 Andrey Smirnov. All Rights Reserved.
+// See LICENSE for licensing terms.
+
+package proxy
+
+import "google.golang.org/grpc"
+
+// Mode specifies proxying mode: one2one (transparent) or one2many (aggregation, error wrapping).
+type Mode int
+
+// Mode constants.
+const (
+	One2One Mode = iota
+	One2Many
+)
+
+// StreamedDetectorFunc reports whether a gRPC method is streamed (only used for one2many proxying).
+type StreamedDetectorFunc func(fullMethodName string) bool
+
+// Option configures the gRPC proxy.
+type Option func(*handlerOptions)
+
+// WithMethodNames configures the list of method names to proxy for the non-transparent handler.
+func WithMethodNames(methodNames ...string) Option {
+	return func(o *handlerOptions) {
+		o.methodNames = append([]string(nil), methodNames...)
+	}
+}
+
+// WithStreamedMethodNames configures the list of streamed method names.
+//
+// This is only important for one2many proxying.
+// This option can't be used with TransparentHandler.
+func WithStreamedMethodNames(streamedMethodNames ...string) Option {
+	return func(o *handlerOptions) {
+		o.streamedMethods = map[string]struct{}{}
+
+		for _, methodName := range streamedMethodNames {
+			o.streamedMethods["/"+o.serviceName+"/"+methodName] = struct{}{}
+		}
+
+		o.streamedDetector = func(fullMethodName string) bool {
+			_, exists := o.streamedMethods[fullMethodName]
+
+			return exists
+		}
+	}
+}
+
+// WithStreamedDetector configures a function to detect streamed methods.
+//
+// This is only important for one2many proxying.
+func WithStreamedDetector(detector StreamedDetectorFunc) Option {
+	return func(o *handlerOptions) {
+		o.streamedDetector = detector
+	}
+}
+
+// WithMode sets proxying mode: One2One or One2Many.
+//
+// Default mode is One2One.
+func WithMode(mode Mode) Option {
+	return func(o *handlerOptions) {
+		o.mode = mode
+	}
+}
+
+// RegisterService sets up a proxy handler for a particular gRPC service and method.
+// The behavior is the same as if you were registering a handler method, e.g. from a codegenerated pb.go file.
+//
+// This can *only* be used if the `server` also uses grpc.CustomCodec() ServerOption.
+func RegisterService(server *grpc.Server, director StreamDirector, serviceName string, options ...Option) {
+	streamer := &handler{
+		director: director,
+		options: handlerOptions{
+			serviceName: serviceName,
+		},
+	}
+
+	for _, o := range options {
+		o(&streamer.options)
+	}
+
+	fakeDesc := &grpc.ServiceDesc{
+		ServiceName: serviceName,
+		HandlerType: (*interface{})(nil),
+	}
+	for _, m := range streamer.options.methodNames {
+		streamDesc := grpc.StreamDesc{
+			StreamName:    m,
+			Handler:       streamer.handler,
+			ServerStreams: true,
+			ClientStreams: true,
+		}
+		fakeDesc.Streams = append(fakeDesc.Streams, streamDesc)
+	}
+	server.RegisterService(fakeDesc, streamer)
+}
+
+// TransparentHandler returns a handler that attempts to proxy all requests that are not registered in the server.
+// The intended use here is as a transparent proxy, where the server doesn't know about the services implemented by the
+// backends. It should be used as a `grpc.UnknownServiceHandler`.
+//
+// This can *only* be used if the `server` also uses grpc.CustomCodec() ServerOption.
+func TransparentHandler(director StreamDirector, options ...Option) grpc.StreamHandler {
+	streamer := &handler{
+		director: director,
+	}
+
+	for _, o := range options {
+		o(&streamer.options)
+	}
+
+	return streamer.handler
+}
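
To see the refactored API end to end, here is a minimal sketch (not part of the patch) of a transparent one2many proxy that detects streamed methods by a method-name suffix, one of the policies the commit message mentions. The import path matches the test files above; the listener address, the suffix convention, and the elided StreamDirector are assumptions for illustration.

```go
package main

import (
	"log"
	"net"
	"strings"

	"google.golang.org/grpc"

	"github.com/talos-systems/grpc-proxy/proxy"
)

func main() {
	// A real StreamDirector must be supplied; it is elided here because its
	// construction is independent of the options API shown below.
	var director proxy.StreamDirector

	server := grpc.NewServer(
		grpc.CustomCodec(proxy.Codec()),
		grpc.UnknownServiceHandler(proxy.TransparentHandler(
			director,
			proxy.WithMode(proxy.One2Many),
			// Suffix-based policy (an assumed naming convention): any method
			// whose full name ends in "Stream" is treated as streamed.
			proxy.WithStreamedDetector(func(fullMethodName string) bool {
				return strings.HasSuffix(fullMethodName, "Stream")
			}),
		)),
	)

	listener, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		log.Fatal(err)
	}

	log.Fatal(server.Serve(listener))
}
```

With the mode now explicit, a director that returns a single upstream still gets one2many framing (aggregated responses, wrapped errors) when the client expects it, instead of silently degrading to transparent one2one behavior.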
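For explicit (non-transparent) registration, the same options apply; this sketch reuses the talos.testproto.TestService fixture from examples_test.go above. It also shows why WithStreamedMethodNames cannot be used with TransparentHandler: the option builds "/<service>/<method>" lookup keys from the serviceName passed to RegisterService, and the transparent handler has no service name to prepend; use WithStreamedDetector there instead.

```go
package proxyexample

import (
	"google.golang.org/grpc"

	"github.com/talos-systems/grpc-proxy/proxy"
)

// registerTestService mirrors ExampleRegisterService from this patch: four
// methods proxied explicitly, one of them streamed.
func registerTestService(server *grpc.Server, director proxy.StreamDirector) {
	proxy.RegisterService(server, director,
		"talos.testproto.TestService",
		proxy.WithMode(proxy.One2Many),
		proxy.WithMethodNames("PingEmpty", "Ping", "PingError", "PingList"),
		// Internally this option registers the detector key
		// "/talos.testproto.TestService/PingList", built from the
		// serviceName argument above.
		proxy.WithStreamedMethodNames("PingList"),
	)
}
```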