From 6c9f7b399173dd5769dbc4e8e366e78f05cead85 Mon Sep 17 00:00:00 2001 From: Andrey Smirnov Date: Wed, 27 Nov 2019 20:17:02 +0300 Subject: [PATCH] fix: allow mode to be set for each request being proxied This is critical for us to allow single proxy instance to proxy across the nodes and down to filesocket listeners. Signed-off-by: Andrey Smirnov --- proxy/director.go | 2 +- proxy/examples_test.go | 11 +++++------ proxy/handler.go | 5 ++--- proxy/handler_one2many_test.go | 12 ++++++------ proxy/handler_one2one_test.go | 8 ++++---- proxy/proxy.go | 9 --------- 6 files changed, 18 insertions(+), 29 deletions(-) diff --git a/proxy/director.go b/proxy/director.go index 47b808f..655e30f 100644 --- a/proxy/director.go +++ b/proxy/director.go @@ -99,4 +99,4 @@ func (sb *SingleBackend) BuildError(err error) ([]byte, error) { // are invoked. So decisions around authorization, monitoring etc. are better to be handled there. // // See the rather rich example. -type StreamDirector func(ctx context.Context, fullMethodName string) ([]Backend, error) +type StreamDirector func(ctx context.Context, fullMethodName string) (Mode, []Backend, error) diff --git a/proxy/examples_test.go b/proxy/examples_test.go index 9803262..3d53bfd 100644 --- a/proxy/examples_test.go +++ b/proxy/examples_test.go @@ -25,7 +25,6 @@ func ExampleRegisterService() { // Register a TestService with 4 of its methods explicitly. proxy.RegisterService(server, director, "talos.testproto.TestService", - proxy.WithMode(proxy.One2Many), proxy.WithMethodNames("PingEmpty", "Ping", "PingError", "PingList"), proxy.WithStreamedMethodNames("PingList"), ) @@ -55,21 +54,21 @@ func ExampleStreamDirector() { } } - director = func(ctx context.Context, fullMethodName string) ([]proxy.Backend, error) { + director = func(ctx context.Context, fullMethodName string) (proxy.Mode, []proxy.Backend, error) { // Make sure we never forward internal services. if strings.HasPrefix(fullMethodName, "/com.example.internal.") { - return nil, status.Errorf(codes.Unimplemented, "Unknown method") + return proxy.One2One, nil, status.Errorf(codes.Unimplemented, "Unknown method") } md, ok := metadata.FromIncomingContext(ctx) if ok { // Decide on which backend to dial if val, exists := md[":authority"]; exists && val[0] == "staging.api.example.com" { - return []proxy.Backend{simpleBackendGen("api-service.staging.svc.local")}, nil + return proxy.One2One, []proxy.Backend{simpleBackendGen("api-service.staging.svc.local")}, nil } else if val, exists := md[":authority"]; exists && val[0] == "api.example.com" { - return []proxy.Backend{simpleBackendGen("api-service.prod.svc.local")}, nil + return proxy.One2One, []proxy.Backend{simpleBackendGen("api-service.prod.svc.local")}, nil } } - return nil, status.Errorf(codes.Unimplemented, "Unknown method") + return proxy.One2One, nil, status.Errorf(codes.Unimplemented, "Unknown method") } } diff --git a/proxy/handler.go b/proxy/handler.go index 9fff9a9..c5d7dee 100644 --- a/proxy/handler.go +++ b/proxy/handler.go @@ -20,7 +20,6 @@ var ( ) type handlerOptions struct { - mode Mode serviceName string methodNames []string streamedMethods map[string]struct{} @@ -51,7 +50,7 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error return status.Errorf(codes.Internal, "lowLevelServerStream not exists in context") } - backends, err := s.director(serverStream.Context(), fullMethodName) + mode, backends, err := s.director(serverStream.Context(), fullMethodName) if err != nil { return err } @@ -80,7 +79,7 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error } } - switch s.options.mode { + switch mode { case One2One: if len(backendConnections) != 1 { return status.Errorf(codes.Internal, "one2one proxying can't should have exactly one connection (got %d)", len(backendConnections)) diff --git a/proxy/handler_one2many_test.go b/proxy/handler_one2many_test.go index 1f0f043..a54fbce 100644 --- a/proxy/handler_one2many_test.go +++ b/proxy/handler_one2many_test.go @@ -250,6 +250,7 @@ func (s *ProxyOne2ManySuite) TestPingEmptyTargets() { {"1", "2"}, {"3", "2", "1"}, {"0", "4"}, + {"3"}, } { md := metadata.Pairs(clientMdKey, "true") md.Set("targets", targets...) @@ -550,20 +551,20 @@ func (s *ProxyOne2ManySuite) SetupSuite() { } // Setup of the proxy's Director. - director := func(ctx context.Context, fullName string) ([]proxy.Backend, error) { + director := func(ctx context.Context, fullName string) (proxy.Mode, []proxy.Backend, error) { var targets []int md, ok := metadata.FromIncomingContext(ctx) if ok { if _, exists := md[rejectingMdKey]; exists { - return nil, status.Errorf(codes.PermissionDenied, "testing rejection") + return proxy.One2Many, nil, status.Errorf(codes.PermissionDenied, "testing rejection") } if mdTargets, exists := md["targets"]; exists { for _, strTarget := range mdTargets { t, err := strconv.Atoi(strTarget) if err != nil { - return nil, err + return proxy.One2Many, nil, err } targets = append(targets, t) @@ -587,17 +588,16 @@ func (s *ProxyOne2ManySuite) SetupSuite() { } } - return result, nil + return proxy.One2Many, result, nil } s.proxy = grpc.NewServer( grpc.CustomCodec(proxy.Codec()), - grpc.UnknownServiceHandler(proxy.TransparentHandler(director, proxy.WithMode(proxy.One2Many))), + grpc.UnknownServiceHandler(proxy.TransparentHandler(director)), ) // Ping handler is handled as an explicit registration and not as a TransparentHandler. proxy.RegisterService(s.proxy, director, "talos.testproto.MultiService", - proxy.WithMode(proxy.One2Many), proxy.WithMethodNames("Ping", "PingStream", "PingStreamError"), proxy.WithStreamedMethodNames("PingStream", "PingStreamError"), ) diff --git a/proxy/handler_one2one_test.go b/proxy/handler_one2one_test.go index bc8ca20..e2e05fd 100644 --- a/proxy/handler_one2one_test.go +++ b/proxy/handler_one2one_test.go @@ -204,15 +204,15 @@ func (s *ProxyOne2OneSuite) SetupSuite() { // Setup of the proxy's Director. s.serverClientConn, err = grpc.Dial(s.serverListener.Addr().String(), grpc.WithInsecure(), grpc.WithCodec(proxy.Codec())) // nolint: staticcheck require.NoError(s.T(), err, "must not error on deferred client Dial") - director := func(ctx context.Context, fullName string) ([]proxy.Backend, error) { + director := func(ctx context.Context, fullName string) (proxy.Mode, []proxy.Backend, error) { md, ok := metadata.FromIncomingContext(ctx) if ok { if _, exists := md[rejectingMdKey]; exists { - return nil, status.Errorf(codes.PermissionDenied, "testing rejection") + return proxy.One2One, nil, status.Errorf(codes.PermissionDenied, "testing rejection") } } - return []proxy.Backend{ + return proxy.One2One, []proxy.Backend{ &proxy.SingleBackend{ GetConn: func(ctx context.Context) (context.Context, *grpc.ClientConn, error) { md, _ := metadata.FromIncomingContext(ctx) @@ -225,7 +225,7 @@ func (s *ProxyOne2OneSuite) SetupSuite() { } s.proxy = grpc.NewServer( grpc.CustomCodec(proxy.Codec()), - grpc.UnknownServiceHandler(proxy.TransparentHandler(director, proxy.WithMode(proxy.One2One))), + grpc.UnknownServiceHandler(proxy.TransparentHandler(director)), ) // Ping handler is handled as an explicit registration and not as a TransparentHandler. proxy.RegisterService(s.proxy, director, diff --git a/proxy/proxy.go b/proxy/proxy.go index 1f13a98..acdc612 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -57,15 +57,6 @@ func WithStreamedDetector(detector StreamedDetectorFunc) Option { } } -// WithMode sets proxying mode: One2One or One2Many. -// -// Default mode is One2One. -func WithMode(mode Mode) Option { - return func(o *handlerOptions) { - o.mode = mode - } -} - // RegisterService sets up a proxy handler for a particular gRPC service and method. // The behavior is the same as if you were registering a handler method, e.g. from a codegenerated pb.go file. //