From 50f9f2deaa71da8dbdb38958447b23f5802eba69 Mon Sep 17 00:00:00 2001 From: ajanikow <12255597+ajanikow@users.noreply.github.com> Date: Mon, 26 Aug 2024 12:06:09 +0000 Subject: [PATCH] [Feature] Integration Service Authentication --- CHANGELOG.md | 1 + .../shutdown/v1/{ => definition}/consts.go | 2 +- integrations/shutdown/v1/impl.go | 2 +- pkg/integrations/auth.go | 70 +++++++ pkg/integrations/auth_test.go | 132 +++++++++++++ pkg/integrations/clients/client.go | 83 ++++++++ pkg/integrations/clients/health_v1.go | 73 +++++++ pkg/integrations/clients/register.go | 98 ++++++++++ pkg/integrations/clients/shutdown_v1.go | 60 ++++++ pkg/integrations/integration.go | 14 ++ pkg/integrations/register.go | 178 ++++++++++++++++-- pkg/integrations/shutdown_v1.go | 7 +- pkg/integrations/suite_test.go | 111 +++++++++++ pkg/util/grpc.go | 71 +++++++ pkg/util/registerer.go | 3 - pkg/util/shutdown/shutdown.go | 7 - pkg/util/svc/health.go | 2 + pkg/util/tests/network.go | 67 +++++++ pkg/util/tests/tgrpc/grpc.go | 19 +- 19 files changed, 953 insertions(+), 47 deletions(-) rename integrations/shutdown/v1/{ => definition}/consts.go (97%) create mode 100644 pkg/integrations/auth.go create mode 100644 pkg/integrations/auth_test.go create mode 100644 pkg/integrations/clients/client.go create mode 100644 pkg/integrations/clients/health_v1.go create mode 100644 pkg/integrations/clients/register.go create mode 100644 pkg/integrations/clients/shutdown_v1.go create mode 100644 pkg/integrations/suite_test.go create mode 100644 pkg/util/grpc.go create mode 100644 pkg/util/tests/network.go diff --git a/CHANGELOG.md b/CHANGELOG.md index af0c001b6..81b31c60a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ - (Feature) Gateway Group for ArangoDeployment - (Feature) Gateway config loader - (Feature) ConfigV1 Integration Service +- (Feature) Integration Service Authentication ## [1.2.42](https://github.com/arangodb/kube-arangodb/tree/1.2.42) (2024-07-23) - (Maintenance) Go 1.22.4 & Kubernetes 1.29.6 libraries diff --git a/integrations/shutdown/v1/consts.go b/integrations/shutdown/v1/definition/consts.go similarity index 97% rename from integrations/shutdown/v1/consts.go rename to integrations/shutdown/v1/definition/consts.go index ac55fa72a..048827cc8 100644 --- a/integrations/shutdown/v1/consts.go +++ b/integrations/shutdown/v1/definition/consts.go @@ -18,7 +18,7 @@ // Copyright holder is ArangoDB GmbH, Cologne, Germany // -package v1 +package definition const ( Name = "shutdown.v1" diff --git a/integrations/shutdown/v1/impl.go b/integrations/shutdown/v1/impl.go index 3e369e84e..65ce4cb46 100644 --- a/integrations/shutdown/v1/impl.go +++ b/integrations/shutdown/v1/impl.go @@ -45,7 +45,7 @@ type impl struct { } func (i *impl) Name() string { - return Name + return pbShutdownV1.Name } func (i *impl) Health() svc.HealthState { diff --git a/pkg/integrations/auth.go b/pkg/integrations/auth.go new file mode 100644 index 000000000..027f1f83b --- /dev/null +++ b/pkg/integrations/auth.go @@ -0,0 +1,70 @@ +// +// DISCLAIMER +// +// Copyright 2024 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package integrations + +import ( + "context" + + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" + + "github.com/arangodb/kube-arangodb/pkg/util" +) + +func basicTokenAuthAuthorize(ctx context.Context, token string) error { + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + return status.Errorf(codes.Unauthenticated, "metadata is not provided") + } + + values := md[util.AuthorizationGRPCHeader] + if len(values) == 0 { + return status.Errorf(codes.Unauthenticated, "authorization token is not provided") + } + + if token != values[0] { + return status.Errorf(codes.Unauthenticated, "invalid token") + } + + return nil +} + +func basicTokenAuthUnaryInterceptor(token string) grpc.ServerOption { + return grpc.UnaryInterceptor(func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) { + if err := basicTokenAuthAuthorize(ctx, token); err != nil { + return nil, err + } + + return handler(ctx, req) + }) +} + +func basicTokenAuthStreamInterceptor(token string) grpc.ServerOption { + return grpc.StreamInterceptor(func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + if err := basicTokenAuthAuthorize(ss.Context(), token); err != nil { + return err + } + + return handler(srv, ss) + }) +} diff --git a/pkg/integrations/auth_test.go b/pkg/integrations/auth_test.go new file mode 100644 index 000000000..79e122960 --- /dev/null +++ b/pkg/integrations/auth_test.go @@ -0,0 +1,132 @@ +// +// DISCLAIMER +// +// Copyright 2024 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package integrations + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/require" + "google.golang.org/grpc/codes" + + "github.com/arangodb/kube-arangodb/pkg/util/shutdown" + "github.com/arangodb/kube-arangodb/pkg/util/tests/tgrpc" +) + +func Test_AuthCases(t *testing.T) { + c, health, internal, external := startService(t, + "--health.auth.type=None", + "--services.external.auth.token=test1", + "--services.external.auth.type=Token", + "--services.auth.token=test2", + "--services.auth.type=Token", + ) + defer c.Require(t) + + t.Run("Without auth", func(t *testing.T) { + t.Run("health", func(t *testing.T) { + require.NoError(t, executeSync(t, shutdown.Context(), + fmt.Sprintf("--address=127.0.0.1:%d", health), + "--token=", + "client", + "health", + "v1")) + }) + t.Run("internal", func(t *testing.T) { + tgrpc.AsGRPCError(t, executeSync(t, shutdown.Context(), + fmt.Sprintf("--address=127.0.0.1:%d", internal), + "--token=", + "client", + "health", + "v1")). + Code(t, codes.Unauthenticated). + Errorf(t, "authorization token is not provided") + }) + t.Run("external", func(t *testing.T) { + tgrpc.AsGRPCError(t, executeSync(t, shutdown.Context(), + fmt.Sprintf("--address=127.0.0.1:%d", external), + "--token=", + "client", + "health", + "v1")). + Code(t, codes.Unauthenticated). + Errorf(t, "authorization token is not provided") + }) + }) + + t.Run("With auth 1", func(t *testing.T) { + t.Run("health", func(t *testing.T) { + require.NoError(t, executeSync(t, shutdown.Context(), + fmt.Sprintf("--address=127.0.0.1:%d", health), + "--token=test1", + "client", + "health", + "v1")) + }) + t.Run("internal", func(t *testing.T) { + tgrpc.AsGRPCError(t, executeSync(t, shutdown.Context(), + fmt.Sprintf("--address=127.0.0.1:%d", internal), + "--token=test1", + "client", + "health", + "v1")). + Code(t, codes.Unauthenticated). + Errorf(t, "invalid token") + }) + t.Run("external", func(t *testing.T) { + require.NoError(t, executeSync(t, shutdown.Context(), + fmt.Sprintf("--address=127.0.0.1:%d", external), + "--token=test1", + "client", + "health", + "v1")) + }) + }) + + t.Run("With auth 2", func(t *testing.T) { + t.Run("health", func(t *testing.T) { + require.NoError(t, executeSync(t, shutdown.Context(), + fmt.Sprintf("--address=127.0.0.1:%d", health), + "--token=test2", + "client", + "health", + "v1")) + }) + t.Run("internal", func(t *testing.T) { + require.NoError(t, executeSync(t, shutdown.Context(), + fmt.Sprintf("--address=127.0.0.1:%d", internal), + "--token=test2", + "client", + "health", + "v1")) + }) + t.Run("external", func(t *testing.T) { + tgrpc.AsGRPCError(t, executeSync(t, shutdown.Context(), + fmt.Sprintf("--address=127.0.0.1:%d", external), + "--token=test2", + "client", + "health", + "v1")). + Code(t, codes.Unauthenticated). + Errorf(t, "invalid token") + }) + }) +} diff --git a/pkg/integrations/clients/client.go b/pkg/integrations/clients/client.go new file mode 100644 index 000000000..0c28a502a --- /dev/null +++ b/pkg/integrations/clients/client.go @@ -0,0 +1,83 @@ +// +// DISCLAIMER +// +// Copyright 2024 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package clients + +import ( + "context" + "io" + + "github.com/spf13/cobra" + "google.golang.org/grpc" + + "github.com/arangodb/kube-arangodb/pkg/util" + "github.com/arangodb/kube-arangodb/pkg/util/shutdown" +) + +type commandRun[T any] interface { + Register(name, desc string, in func(ctx context.Context, client T) error) commandRun[T] +} + +type commandRunImpl[T any] struct { + cmd *cobra.Command + cfg *Config + in func(cc grpc.ClientConnInterface) T +} + +func (c commandRunImpl[T]) Register(name, desc string, in func(ctx context.Context, client T) error) commandRun[T] { + c.cmd.AddCommand(&cobra.Command{ + Use: name, + Short: desc, + RunE: func(cmd *cobra.Command, args []string) error { + client, closer, err := client(shutdown.Context(), c.cfg, c.in) + if err != nil { + return err + } + + defer closer.Close() + + return in(shutdown.Context(), client) + }, + }) + return c +} + +func withCommandRun[T any](cmd *cobra.Command, cfg *Config, in func(cc grpc.ClientConnInterface) T) commandRun[T] { + return &commandRunImpl[T]{ + cmd: cmd, + cfg: cfg, + in: in, + } +} + +func client[T any](ctx context.Context, cfg *Config, in func(cc grpc.ClientConnInterface) T) (T, io.Closer, error) { + var opts []grpc.DialOption + + if token := cfg.Token; token != "" { + opts = append(opts, util.TokenAuthInterceptors(token)...) + } + + client, closer, err := util.NewGRPCClient(ctx, in, cfg.Address, opts...) + if err != nil { + return util.Default[T](), nil, err + } + + return client, closer, nil +} diff --git a/pkg/integrations/clients/health_v1.go b/pkg/integrations/clients/health_v1.go new file mode 100644 index 000000000..bd3030fb7 --- /dev/null +++ b/pkg/integrations/clients/health_v1.go @@ -0,0 +1,73 @@ +// +// DISCLAIMER +// +// Copyright 2024 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package clients + +import ( + "github.com/spf13/cobra" + pbHealth "google.golang.org/grpc/health/grpc_health_v1" + + "github.com/arangodb/kube-arangodb/pkg/util/errors" + "github.com/arangodb/kube-arangodb/pkg/util/shutdown" +) + +func init() { + registerer.MustRegister("health/v1", func(cfg *Config) Client { + return &healthV1{ + cfg: cfg, + } + }) +} + +type healthV1 struct { + cfg *Config +} + +func (s *healthV1) Name() string { + return "health" +} + +func (s *healthV1) Version() string { + return "v1" +} + +func (s *healthV1) Register(cmd *cobra.Command) error { + cmd.RunE = func(cmd *cobra.Command, args []string) error { + client, c, err := client(shutdown.Context(), s.cfg, pbHealth.NewHealthClient) + if err != nil { + return err + } + defer c.Close() + + res, err := client.Check(shutdown.Context(), &pbHealth.HealthCheckRequest{}) + if err != nil { + return err + } + + switch s := res.GetStatus(); s { + case pbHealth.HealthCheckResponse_SERVING: + println("OK") + return nil + default: + return errors.Errorf("Not healthy: %s", s.String()) + } + } + return nil +} diff --git a/pkg/integrations/clients/register.go b/pkg/integrations/clients/register.go new file mode 100644 index 000000000..95863a790 --- /dev/null +++ b/pkg/integrations/clients/register.go @@ -0,0 +1,98 @@ +// +// DISCLAIMER +// +// Copyright 2024 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package clients + +import ( + "github.com/spf13/cobra" + + "github.com/arangodb/kube-arangodb/pkg/util" +) + +var registerer = util.NewRegisterer[string, Factory]() + +type Factory func(c *Config) Client + +type Config struct { + Address string + Token string +} + +func (c *Config) Register(cmd *cobra.Command) error { + f := cmd.PersistentFlags() + + f.StringVar(&c.Address, "address", "127.0.0.1:8080", "GRPC Service Address") + f.StringVar(&c.Token, "token", "", "GRPC Token") + + return nil +} + +type Client interface { + Name() string + Version() string + + Register(cmd *cobra.Command) error +} + +func Register(cmd *cobra.Command) error { + client := &cobra.Command{Use: "client"} + cmd.AddCommand(client) + + var cfg config + + return cfg.Register(client) +} + +type config struct { + cfg Config +} + +func (c *config) Register(cmd *cobra.Command) error { + if err := c.cfg.Register(cmd); err != nil { + return err + } + + cmds := map[string]*cobra.Command{} + + for _, command := range registerer.Items() { + r := command.V(&c.cfg) + + v, ok := cmds[r.Name()] + if !ok { + v = &cobra.Command{ + Use: r.Name(), + } + cmd.AddCommand(v) + cmds[r.Name()] = v + } + + p := &cobra.Command{ + Use: r.Version(), + } + + if err := r.Register(p); err != nil { + return err + } + + v.AddCommand(p) + } + + return nil +} diff --git a/pkg/integrations/clients/shutdown_v1.go b/pkg/integrations/clients/shutdown_v1.go new file mode 100644 index 000000000..590d2f148 --- /dev/null +++ b/pkg/integrations/clients/shutdown_v1.go @@ -0,0 +1,60 @@ +// +// DISCLAIMER +// +// Copyright 2024 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package clients + +import ( + "context" + + "github.com/spf13/cobra" + + pbSharedV1 "github.com/arangodb/kube-arangodb/integrations/shared/v1/definition" + pbShutdownV1 "github.com/arangodb/kube-arangodb/integrations/shutdown/v1/definition" +) + +func init() { + registerer.MustRegister("shutdown/v1", func(cfg *Config) Client { + return &shutdownV1{ + cfg: cfg, + } + }) +} + +type shutdownV1 struct { + cfg *Config +} + +func (s *shutdownV1) Name() string { + return "shutdown" +} + +func (s *shutdownV1) Version() string { + return "v1" +} + +func (s *shutdownV1) Register(cmd *cobra.Command) error { + withCommandRun(cmd, s.cfg, pbShutdownV1.NewShutdownV1Client). + Register("shutdown", "Runs the Shutdown GRPC Call", func(ctx context.Context, client pbShutdownV1.ShutdownV1Client) error { + _, err := client.Shutdown(ctx, &pbSharedV1.Empty{}) + + return err + }) + return nil +} diff --git a/pkg/integrations/integration.go b/pkg/integrations/integration.go index 5ce9350dc..0b6543177 100644 --- a/pkg/integrations/integration.go +++ b/pkg/integrations/integration.go @@ -40,3 +40,17 @@ type Integration interface { Handler(ctx context.Context) (svc.Handler, error) } + +type IntegrationEnablement interface { + Integration + + EnabledTypes() (internal, external bool) +} + +func GetIntegrationEnablement(in Integration) (internal, external bool) { + if v, ok := in.(IntegrationEnablement); ok { + return v.EnabledTypes() + } + + return true, false +} diff --git a/pkg/integrations/register.go b/pkg/integrations/register.go index 367eff288..6827ebcf9 100644 --- a/pkg/integrations/register.go +++ b/pkg/integrations/register.go @@ -21,11 +21,17 @@ package integrations import ( + "context" "fmt" "sort" + "strings" + "sync" "github.com/spf13/cobra" + "google.golang.org/grpc" + pbImplShutdownV1 "github.com/arangodb/kube-arangodb/integrations/shutdown/v1" + "github.com/arangodb/kube-arangodb/pkg/integrations/clients" "github.com/arangodb/kube-arangodb/pkg/util" "github.com/arangodb/kube-arangodb/pkg/util/errors" "github.com/arangodb/kube-arangodb/pkg/util/shutdown" @@ -40,18 +46,60 @@ func Register(cmd *cobra.Command) error { return c.Register(cmd) } +type configurationTest struct { + ctx context.Context + cancel context.CancelFunc +} + type configuration struct { + // Only for testing + test *configurationTest + registered []Integration health struct { + serviceConfiguration shutdownEnabled bool - - config svc.Configuration } services struct { - config svc.Configuration + internal, external serviceConfiguration + } +} + +type serviceConfiguration struct { + enabled bool + + address string + + auth struct { + t string + + token string + } +} + +func (s *serviceConfiguration) Config() (svc.Configuration, error) { + var opts []grpc.ServerOption + + switch strings.ToLower(s.auth.t) { + case "none": + break + case "token": + if s.auth.token == "" { + return util.Default[svc.Configuration](), errors.Errorf("Token is empty") + } + + opts = append(opts, + basicTokenAuthUnaryInterceptor(s.auth.token), + basicTokenAuthStreamInterceptor(s.auth.token), + ) } + + return svc.Configuration{ + Options: opts, + Address: s.address, + }, nil } func (c *configuration) Register(cmd *cobra.Command) error { @@ -67,14 +115,28 @@ func (c *configuration) Register(cmd *cobra.Command) error { f := cmd.Flags() - f.StringVar(&c.health.config.Address, "health.address", "0.0.0.0:9091", "Address to expose health service") + f.StringVar(&c.health.address, "health.address", "0.0.0.0:9091", "Address to expose health service") f.BoolVar(&c.health.shutdownEnabled, "health.shutdown.enabled", true, "Determines if shutdown service should be enabled and exposed") - f.StringVar(&c.services.config.Address, "services.address", "127.0.0.1:9092", "Address to expose services") + f.StringVar(&c.health.auth.t, "health.auth.type", "None", "Auth type for health service") + f.StringVar(&c.health.auth.token, "health.auth.token", "", "Token for health service (when auth service is token)") + + f.BoolVar(&c.services.internal.enabled, "services.enabled", true, "Defines if internal access is enabled") + f.StringVar(&c.services.internal.address, "services.address", "127.0.0.1:9092", "Address to expose internal services") + f.StringVar(&c.services.internal.auth.t, "services.auth.type", "None", "Auth type for internal service") + f.StringVar(&c.services.internal.auth.token, "services.auth.token", "", "Token for internal service (when auth service is token)") + + f.BoolVar(&c.services.external.enabled, "services.external.enabled", false, "Defines if external access is enabled") + f.StringVar(&c.services.external.address, "services.external.address", "0.0.0.0:9093", "Address to expose external services") + f.StringVar(&c.services.external.auth.t, "services.external.auth.type", "None", "Auth type for external service") + f.StringVar(&c.services.external.auth.token, "services.external.auth.token", "", "Token for external service (when auth service is token)") for _, service := range c.registered { prefix := fmt.Sprintf("integration.%s", service.Name()) f.Bool(prefix, false, service.Description()) + internal, external := GetIntegrationEnablement(service) + f.Bool(fmt.Sprintf("%s.internal", prefix), internal, fmt.Sprintf("Defones if Internal access to service %s is enabled", service.Name())) + f.Bool(fmt.Sprintf("%s.external", prefix), external, fmt.Sprintf("Defones if External access to service %s is enabled", service.Name())) if err := service.Register(cmd, func(name string) string { return fmt.Sprintf("%s.%s", prefix, name) @@ -83,22 +145,65 @@ func (c *configuration) Register(cmd *cobra.Command) error { } } - return nil + return clients.Register(cmd) } func (c *configuration) run(cmd *cobra.Command, args []string) error { - handlers := make([]svc.Handler, 0, len(c.registered)) + if t := c.test; t == nil { + return c.runWithContext(shutdown.Context(), shutdown.Stop, cmd) + } else { + return c.runWithContext(t.ctx, t.cancel, cmd) + } +} + +func (c *configuration) runWithContext(ctx context.Context, cancel context.CancelFunc, cmd *cobra.Command) error { + healthConfig, err := c.health.Config() + if err != nil { + return errors.Wrapf(err, "Unable to parse health config") + } + internalConfig, err := c.services.internal.Config() + if err != nil { + return errors.Wrapf(err, "Unable to parse internal config") + } + externalConfig, err := c.services.external.Config() + if err != nil { + return errors.Wrapf(err, "Unable to parse external config") + } + + var internalHandlers, externalHandlers []svc.Handler for _, handler := range c.registered { if ok, err := cmd.Flags().GetBool(fmt.Sprintf("integration.%s", handler.Name())); err != nil { return err } else { - logger.Str("service", handler.Name()).Bool("enabled", ok).Info("Service discovered") - if ok { - if svc, err := handler.Handler(shutdown.Context()); err != nil { + internalEnabled, err := cmd.Flags().GetBool(fmt.Sprintf("integration.%s.internal", handler.Name())) + if err != nil { + return err + } + + externalEnabled, err := cmd.Flags().GetBool(fmt.Sprintf("integration.%s.external", handler.Name())) + if err != nil { + return err + } + + logger. + Str("service", handler.Name()). + Bool("enabled", ok). + Bool("internal", internalEnabled). + Bool("external", externalEnabled). + Info("Service discovered") + + if ok && (internalEnabled || externalEnabled) { + if svc, err := handler.Handler(ctx); err != nil { return err } else { - handlers = append(handlers, svc) + if internalEnabled { + internalHandlers = append(internalHandlers, svc) + } + + if externalEnabled { + externalHandlers = append(externalHandlers, svc) + } } } } @@ -107,18 +212,57 @@ func (c *configuration) run(cmd *cobra.Command, args []string) error { var healthServices []svc.Handler if c.health.shutdownEnabled { - healthServices = append(healthServices, shutdown.NewGlobalShutdownServer()) + healthServices = append(healthServices, pbImplShutdownV1.New(cancel)) } - health := svc.NewHealthService(c.health.config, svc.Readiness, healthServices...) + health := svc.NewHealthService(healthConfig, svc.Readiness, healthServices...) + + internalHandlers = append(internalHandlers, health) + externalHandlers = append(externalHandlers, health) - healthHandler := health.Start(shutdown.Context()) + healthHandler := health.Start(ctx) logger.Str("address", healthHandler.Address()).Info("Health handler started") - s := svc.NewService(c.services.config, handlers...).StartWithHealth(shutdown.Context(), health) + var wg sync.WaitGroup + + var internal, external error + + if c.services.internal.enabled { + wg.Add(1) + + go func() { + defer wg.Done() + s := svc.NewService(internalConfig, internalHandlers...).StartWithHealth(ctx, health) + + logger.Str("address", s.Address()).Str("type", "internal").Info("Service handler started") + + internal = s.Wait() + + if internal != nil { + logger.Err(internal).Str("address", s.Address()).Str("type", "internal").Error("Service handler failed") + } + }() + } + + if c.services.external.enabled { + wg.Add(1) + + go func() { + defer wg.Done() + s := svc.NewService(externalConfig, externalHandlers...).StartWithHealth(ctx, health) + + logger.Str("address", s.Address()).Str("type", "external").Info("Service handler started") + + external = s.Wait() + + if external != nil { + logger.Err(external).Str("address", s.Address()).Str("type", "external").Error("Service handler failed") + } + }() + } - logger.Str("address", s.Address()).Info("Service handler started") + wg.Wait() - return s.Wait() + return errors.Errors(internal, external) } diff --git a/pkg/integrations/shutdown_v1.go b/pkg/integrations/shutdown_v1.go index 926a66fc2..504f1931f 100644 --- a/pkg/integrations/shutdown_v1.go +++ b/pkg/integrations/shutdown_v1.go @@ -26,12 +26,13 @@ import ( "github.com/spf13/cobra" pbImplShutdownV1 "github.com/arangodb/kube-arangodb/integrations/shutdown/v1" + pbShutdownV1 "github.com/arangodb/kube-arangodb/integrations/shutdown/v1/definition" "github.com/arangodb/kube-arangodb/pkg/util/shutdown" "github.com/arangodb/kube-arangodb/pkg/util/svc" ) func init() { - registerer.Register(pbImplShutdownV1.Name, func() Integration { + registerer.Register(pbShutdownV1.Name, func() Integration { return &shutdownV1{} }) } @@ -40,11 +41,11 @@ type shutdownV1 struct { } func (s *shutdownV1) Handler(ctx context.Context) (svc.Handler, error) { - return shutdown.NewGlobalShutdownServer(), nil + return pbImplShutdownV1.New(shutdown.Stop), nil } func (s *shutdownV1) Name() string { - return pbImplShutdownV1.Name + return pbShutdownV1.Name } func (s *shutdownV1) Description() string { diff --git a/pkg/integrations/suite_test.go b/pkg/integrations/suite_test.go new file mode 100644 index 000000000..e3c903621 --- /dev/null +++ b/pkg/integrations/suite_test.go @@ -0,0 +1,111 @@ +// +// DISCLAIMER +// +// Copyright 2024 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package integrations + +import ( + "context" + "fmt" + "os" + "testing" + + "github.com/spf13/cobra" + "github.com/stretchr/testify/require" + + "github.com/arangodb/kube-arangodb/pkg/util/shutdown" + "github.com/arangodb/kube-arangodb/pkg/util/tests" +) + +type waitFunc func() error + +func (w waitFunc) Require(t *testing.T) { + require.NoError(t, w()) +} + +func executeSync(t *testing.T, ctx context.Context, args ...string) error { + var c configuration + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + go func() { + defer cancel() + <-shutdown.Channel() + }() + + c.test = &configurationTest{ + ctx: ctx, + cancel: cancel, + } + + cmd := &cobra.Command{} + + tCmd := &cobra.Command{ + Use: "test", + } + + require.NoError(t, c.Register(tCmd)) + + cmd.AddCommand(tCmd) + + cmd.SetOut(os.Stdout) + + cmd.SetArgs(append([]string{"test"}, args...)) + + return cmd.Execute() +} + +func executeAsync(t *testing.T, ctx context.Context, args ...string) waitFunc { + ctx, cancel := context.WithCancel(ctx) + + var err error + done := make(chan struct{}) + + go func() { + defer close(done) + + err = executeSync(t, ctx, args...) + }() + + return func() error { + cancel() + <-done + return err + } +} + +func startService(t *testing.T, args ...string) (waitFunc, int, int, int) { + _, health := tests.ResolveAddress(t, "127.0.0.1:0") + _, internal := tests.ResolveAddress(t, "127.0.0.1:0") + _, external := tests.ResolveAddress(t, "127.0.0.1:0") + + cancel := executeAsync(t, shutdown.Context(), append([]string{ + fmt.Sprintf("--health.address=127.0.0.1:%d", health), + fmt.Sprintf("--services.address=127.0.0.1:%d", internal), + fmt.Sprintf("--services.external.address=127.0.0.1:%d", external), + "--services.external.enabled", + }, args...)...) + + tests.WaitForAddress(t, "127.0.0.1", health) + tests.WaitForAddress(t, "127.0.0.1", internal) + tests.WaitForAddress(t, "127.0.0.1", external) + + return cancel, health, internal, external +} diff --git a/pkg/util/grpc.go b/pkg/util/grpc.go new file mode 100644 index 000000000..1bad6ea1f --- /dev/null +++ b/pkg/util/grpc.go @@ -0,0 +1,71 @@ +// +// DISCLAIMER +// +// Copyright 2024 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package util + +import ( + "context" + "io" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/metadata" +) + +const AuthorizationGRPCHeader = "adb-authorization" + +func NewGRPCClient[T any](ctx context.Context, in func(cc grpc.ClientConnInterface) T, addr string, opts ...grpc.DialOption) (T, io.Closer, error) { + con, err := NewGRPCConn(ctx, addr, opts...) + if err != nil { + return Default[T](), nil, err + } + + return in(con), con, nil +} + +func NewGRPCConn(ctx context.Context, addr string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { + var z []grpc.DialOption + + z = append(z, grpc.WithTransportCredentials(insecure.NewCredentials())) + + z = append(z, opts...) + + conn, err := grpc.DialContext(ctx, addr, z...) + if err != nil { + return nil, err + } + + return conn, nil +} + +func TokenAuthInterceptors(token string) []grpc.DialOption { + return []grpc.DialOption{ + grpc.WithUnaryInterceptor(func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + return invoker(attachTokenAuthToInterceptors(ctx, token), method, req, reply, cc, opts...) + }), + grpc.WithStreamInterceptor(func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { + return streamer(attachTokenAuthToInterceptors(ctx, token), desc, cc, method, opts...) + }), + } +} + +func attachTokenAuthToInterceptors(ctx context.Context, token string) context.Context { + return metadata.AppendToOutgoingContext(ctx, AuthorizationGRPCHeader, token) +} diff --git a/pkg/util/registerer.go b/pkg/util/registerer.go index ad9f6d01f..9eb804666 100644 --- a/pkg/util/registerer.go +++ b/pkg/util/registerer.go @@ -55,9 +55,6 @@ func (r *registerer[K, V]) Register(key K, value V) bool { } func (r *registerer[K, V]) MustRegister(key K, value V) { - r.lock.Lock() - defer r.lock.Unlock() - if !r.Register(key, value) { panic("Unable to register item") } diff --git a/pkg/util/shutdown/shutdown.go b/pkg/util/shutdown/shutdown.go index d5b9d2ff2..855f02bc3 100644 --- a/pkg/util/shutdown/shutdown.go +++ b/pkg/util/shutdown/shutdown.go @@ -25,9 +25,6 @@ import ( "os" "os/signal" "syscall" - - pbImplShutdownV1 "github.com/arangodb/kube-arangodb/integrations/shutdown/v1" - "github.com/arangodb/kube-arangodb/pkg/util/svc" ) func init() { @@ -43,10 +40,6 @@ func init() { }() } -func NewGlobalShutdownServer() svc.Handler { - return pbImplShutdownV1.New(stop) -} - var ( ctx context.Context stop context.CancelFunc diff --git a/pkg/util/svc/health.go b/pkg/util/svc/health.go index e135b9b54..50c7c175c 100644 --- a/pkg/util/svc/health.go +++ b/pkg/util/svc/health.go @@ -47,6 +47,8 @@ type Health interface { } type HealthService interface { + Handler + Service Health diff --git a/pkg/util/tests/network.go b/pkg/util/tests/network.go new file mode 100644 index 000000000..cfbf2b10e --- /dev/null +++ b/pkg/util/tests/network.go @@ -0,0 +1,67 @@ +// +// DISCLAIMER +// +// Copyright 2024 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package tests + +import ( + "fmt" + "net" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func ResolveAddress(t *testing.T, addr string) (string, int) { + ln, err := net.Listen("tcp", addr) + require.NoError(t, err) + + pr, ok := ln.Addr().(*net.TCPAddr) + require.True(t, ok) + addr = pr.IP.String() + port := pr.Port + + require.NoError(t, ln.Close()) + return addr, port +} + +func WaitForAddress(t *testing.T, addr string, port int) { + tickerT := time.NewTicker(125 * time.Millisecond) + defer tickerT.Stop() + + timerT := time.NewTimer(1 * time.Second) + defer timerT.Stop() + + for { + select { + case <-timerT.C: + require.Fail(t, "Timeouted") + case <-tickerT.C: + conn, err := net.DialTimeout("tcp", fmt.Sprintf("%s:%d", addr, port), 125*time.Millisecond) + if err != nil { + continue + } + + require.NoError(t, conn.Close()) + + return + } + } +} diff --git a/pkg/util/tests/tgrpc/grpc.go b/pkg/util/tests/tgrpc/grpc.go index dacb456a5..7145aa842 100644 --- a/pkg/util/tests/tgrpc/grpc.go +++ b/pkg/util/tests/tgrpc/grpc.go @@ -28,33 +28,22 @@ import ( "github.com/stretchr/testify/require" "google.golang.org/grpc" "google.golang.org/grpc/codes" - "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/status" + "github.com/arangodb/kube-arangodb/pkg/util" "github.com/arangodb/kube-arangodb/pkg/util/svc" ) func NewGRPCClient[T any](t *testing.T, ctx context.Context, in func(cc grpc.ClientConnInterface) T, addr string, opts ...grpc.DialOption) T { - return in(NewGRPCConn(t, ctx, addr, opts...)) -} - -func NewGRPCConn(t *testing.T, ctx context.Context, addr string, opts ...grpc.DialOption) *grpc.ClientConn { - var z []grpc.DialOption - - z = append(z, grpc.WithTransportCredentials(insecure.NewCredentials())) - - z = append(z, opts...) - - conn, err := grpc.DialContext(ctx, addr, z...) + client, closer, err := util.NewGRPCClient(ctx, in, addr, opts...) require.NoError(t, err) - go func() { <-ctx.Done() - require.NoError(t, conn.Close()) + require.NoError(t, closer.Close()) }() - return conn + return client } type ErrorStatusValidator interface {