Skip to content

Commit

Permalink
chore: pull out admin service
Browse files Browse the repository at this point in the history
  • Loading branch information
worstell committed Dec 11, 2024
1 parent b05fa0a commit 50b356a
Show file tree
Hide file tree
Showing 42 changed files with 1,083 additions and 210 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ EXPOSE 8893
ENV FTL_ENDPOINT="http://host.docker.internal:8892"
ENV FTL_BIND=http://0.0.0.0:8892
ENV FTL_ADVERTISE=http://127.0.0.1:8892
ENV FTL_DSN="postgres://host.docker.internal/ftl?sslmode=disable&user=postgres&password=secret"

# Controller-specific configurations
ENV FTL_CONTROLLER_CONSOLE_URL="*"
ENV FTL_CONTROLLER_DSN="postgres://host.docker.internal/ftl?sslmode=disable&user=postgres&password=secret"

# Provisioner-specific configurations
ENV FTL_PROVISIONER_PLUGIN_CONFIG_FILE="/root/ftl-provisioner-config.toml"
Expand Down
3 changes: 2 additions & 1 deletion Justfile
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ DOCKER_IMAGES := '''
"runner": {},
"runner-jvm": {},
"timeline": {},
"lease": {}
"lease": {},
"admin": {}
}
'''

Expand Down
File renamed without changes.
12 changes: 10 additions & 2 deletions backend/controller/admin/client.go → backend/admin/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,20 @@ type Client interface {

// Unset a secret.
SecretUnset(ctx context.Context, req *connect.Request[ftlv1.SecretUnsetRequest]) (*connect.Response[ftlv1.SecretUnsetResponse], error)

// MapConfigsForModule combines all configuration values visible to the module.
// Local values take precedence.
MapConfigsForModule(ctx context.Context, req *connect.Request[ftlv1.MapConfigsForModuleRequest]) (*connect.Response[ftlv1.MapConfigsForModuleResponse], error)

// MapSecretsForModule combines all secrets visible to the module.
// Local values take precedence.
MapSecretsForModule(ctx context.Context, req *connect.Request[ftlv1.MapSecretsForModuleRequest]) (*connect.Response[ftlv1.MapSecretsForModuleResponse], error)
}

// ShouldUseLocalClient returns whether a local admin client should be used based on the admin service client and the endpoint.
//
// If the controller is not present AND endpoint is local, then a local client should be used
// so that the user does not need to spin up a controller just to run the `ftl config/secret` commands.
// If the service is not present AND endpoint is local, then a local client should be used
// so that the user does not need to spin up a cluster just to run the `ftl config/secret` commands.
//
// If true is returned, use NewLocalClient() to create a local client after setting up config and secret managers for the context.
func ShouldUseLocalClient(ctx context.Context, adminClient ftlv1connect.AdminServiceClient, endpoint *url.URL) (bool, error) {
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
82 changes: 76 additions & 6 deletions backend/controller/admin/admin.go → backend/admin/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,52 @@ package admin

import (
"context"
"database/sql"
"encoding/json"
"fmt"
"net/url"

"connectrpc.com/connect"
"github.com/alecthomas/kong"

ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect"
"github.com/TBD54566975/ftl/go-runtime/encoding"
"github.com/TBD54566975/ftl/internal/configuration"
"github.com/TBD54566975/ftl/internal/configuration/manager"
"github.com/TBD54566975/ftl/internal/configuration/providers"
"github.com/TBD54566975/ftl/internal/dsn"
"github.com/TBD54566975/ftl/internal/log"
internalobservability "github.com/TBD54566975/ftl/internal/observability"
"github.com/TBD54566975/ftl/internal/rpc"
"github.com/TBD54566975/ftl/internal/schema"
"github.com/TBD54566975/ftl/internal/schema/schemaeventsource"
_ "github.com/jackc/pgx/v5/stdlib"
)

type Config struct {
Bind *url.URL `help:"Socket to bind to." default:"http://127.0.0.1:8896" env:"FTL_BIND"`
DSN string `help:"DAL DSN." default:"${dsn}" env:"FTL_DSN"`
MaxOpenDBConnections int `help:"Maximum number of database connections." default:"20" env:"FTL_MAX_OPEN_DB_CONNECTIONS"`
MaxIdleDBConnections int `help:"Maximum number of idle database connections." default:"20" env:"FTL_MAX_IDLE_DB_CONNECTIONS"`
}

func (c *Config) SetDefaults() {
if err := kong.ApplyDefaults(c, kong.Vars{"dsn": dsn.PostgresDSN("ftl")}); err != nil {
panic(err)
}
}

func (c *Config) OpenDBAndInstrument() (*sql.DB, error) {
conn, err := internalobservability.OpenDBAndInstrument(c.DSN)
if err != nil {
return nil, fmt.Errorf("failed to open DB connection: %w", err)
}
conn.SetMaxIdleConns(c.MaxIdleDBConnections)
conn.SetMaxOpenConns(c.MaxOpenDBConnections)
return conn, nil
}

type AdminService struct {
schr SchemaRetriever
cm *manager.Manager[configuration.Configuration]
Expand All @@ -31,6 +61,12 @@ type SchemaRetriever interface {
GetActiveSchema(ctx context.Context) (*schema.Schema, error)
}

func NewSchemaRetreiver(source schemaeventsource.EventSource) SchemaRetriever {
return &streamSchemaRetriever{
source: source,
}
}

type streamSchemaRetriever struct {
source schemaeventsource.EventSource
}
Expand All @@ -50,6 +86,28 @@ func NewAdminService(cm *manager.Manager[configuration.Configuration], sm *manag
}
}

func Start(
ctx context.Context,
config Config,
cm *manager.Manager[configuration.Configuration],
sm *manager.Manager[configuration.Secrets],
schr SchemaRetriever,
) error {
config.SetDefaults()

logger := log.FromContext(ctx).Scope("admin")
svc := NewAdminService(cm, sm, schr)

logger.Debugf("Admin service listening on: %s", config.Bind)
err := rpc.Serve(ctx, config.Bind,
rpc.GRPC(ftlv1connect.NewAdminServiceHandler, svc),
)
if err != nil {
return fmt.Errorf("admin service stopped serving: %w", err)
}
return nil
}

func (s *AdminService) Ping(ctx context.Context, req *connect.Request[ftlv1.PingRequest]) (*connect.Response[ftlv1.PingResponse], error) {
return connect.NewResponse(&ftlv1.PingResponse{}), nil
}
Expand Down Expand Up @@ -219,6 +277,24 @@ func (s *AdminService) SecretUnset(ctx context.Context, req *connect.Request[ftl
return connect.NewResponse(&ftlv1.SecretUnsetResponse{}), nil
}

// MapConfigsForModule combines all configuration values visible to the module.
func (s *AdminService) MapConfigsForModule(ctx context.Context, req *connect.Request[ftlv1.MapConfigsForModuleRequest]) (*connect.Response[ftlv1.MapConfigsForModuleResponse], error) {
values, err := s.cm.MapForModule(ctx, req.Msg.Module)
if err != nil {
return nil, fmt.Errorf("failed to map configs for module: %w", err)
}
return connect.NewResponse(&ftlv1.MapConfigsForModuleResponse{Values: values}), nil
}

// MapSecretsForModule combines all secrets visible to the module.
func (s *AdminService) MapSecretsForModule(ctx context.Context, req *connect.Request[ftlv1.MapSecretsForModuleRequest]) (*connect.Response[ftlv1.MapSecretsForModuleResponse], error) {
values, err := s.sm.MapForModule(ctx, req.Msg.Module)
if err != nil {
return nil, fmt.Errorf("failed to map secrets for module: %w", err)
}
return connect.NewResponse(&ftlv1.MapSecretsForModuleResponse{Values: values}), nil
}

func refFromConfigRef(cr *ftlv1.ConfigRef) configuration.Ref {
return configuration.NewRef(cr.GetModule(), cr.GetName())
}
Expand Down Expand Up @@ -273,9 +349,3 @@ func (s *AdminService) validateAgainstSchema(ctx context.Context, isSecret bool,

return nil
}

func NewSchemaRetreiver(source schemaeventsource.EventSource) SchemaRetriever {
return &streamSchemaRetriever{
source: source,
}
}
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,4 @@ require (
google.golang.org/protobuf v1.35.2 // indirect
)

replace github.com/TBD54566975/ftl => ./../../../../../..
replace github.com/TBD54566975/ftl => ./../../../../..
File renamed without changes.
2 changes: 1 addition & 1 deletion backend/controller/console/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

"connectrpc.com/connect"

"github.com/TBD54566975/ftl/backend/controller/admin"
"github.com/TBD54566975/ftl/backend/admin"
consolepb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/console/v1"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/console/v1/pbconsoleconnect"
schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/schema/v1"
Expand Down
35 changes: 13 additions & 22 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"google.golang.org/protobuf/proto"

"github.com/TBD54566975/ftl"
"github.com/TBD54566975/ftl/backend/controller/admin"
"github.com/TBD54566975/ftl/backend/controller/artefacts"
"github.com/TBD54566975/ftl/backend/controller/leases"
"github.com/TBD54566975/ftl/backend/controller/observability"
Expand All @@ -42,8 +41,6 @@ import (
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect"
"github.com/TBD54566975/ftl/backend/timeline"
"github.com/TBD54566975/ftl/internal/configuration"
cf "github.com/TBD54566975/ftl/internal/configuration/manager"
"github.com/TBD54566975/ftl/internal/deploymentcontext"
"github.com/TBD54566975/ftl/internal/dsn"
"github.com/TBD54566975/ftl/internal/log"
Expand All @@ -69,7 +66,7 @@ type CommonConfig struct {
type Config struct {
Bind *url.URL `help:"Socket to bind to." default:"http://127.0.0.1:8892" env:"FTL_BIND"`
Key model.ControllerKey `help:"Controller key (auto)." placeholder:"KEY"`
DSN string `help:"DAL DSN." default:"${dsn}" env:"FTL_CONTROLLER_DSN"`
DSN string `help:"DAL DSN." default:"${dsn}" env:"FTL_DSN"`
Advertise *url.URL `help:"Endpoint the Controller should advertise (must be unique across the cluster, defaults to --bind if omitted)." env:"FTL_ADVERTISE"`
RunnerTimeout time.Duration `help:"Runner heartbeat timeout." default:"10s"`
ControllerTimeout time.Duration `help:"Controller heartbeat timeout." default:"10s"`
Expand Down Expand Up @@ -105,25 +102,22 @@ func Start(
ctx context.Context,
config Config,
storage *artefacts.OCIArtefactService,
cm *cf.Manager[configuration.Configuration],
sm *cf.Manager[configuration.Secrets],
conn *sql.DB,
devel bool,
adminClient ftlv1connect.AdminServiceClient,
) error {
config.SetDefaults()

logger := log.FromContext(ctx)
logger.Debugf("Starting FTL controller")

svc, err := New(ctx, conn, cm, sm, storage, config, devel)
svc, err := New(ctx, conn, storage, config, devel, adminClient)
if err != nil {
return err
}
logger.Debugf("Listening on %s", config.Bind)
logger.Debugf("Advertising as %s", config.Advertise)

admin := admin.NewAdminService(cm, sm, admin.NewSchemaRetreiver(schemaeventsource.New(ctx, rpc.ClientFromContext[ftlv1connect.SchemaServiceClient](ctx))))

g, ctx := errgroup.WithContext(ctx)

g.Go(func() error {
Expand All @@ -132,7 +126,6 @@ func Start(
rpc.GRPC(deploymentconnect.NewDeploymentServiceHandler, svc),
rpc.GRPC(ftlv1connect.NewControllerServiceHandler, svc),
rpc.GRPC(ftlv1connect.NewSchemaServiceHandler, svc),
rpc.GRPC(ftlv1connect.NewAdminServiceHandler, admin),
rpc.GRPC(ftlv1connect2.NewLegacyPubsubServiceHandler, svc.pubSub),
rpc.PProf(),
)
Expand All @@ -152,9 +145,7 @@ type Service struct {
leaser leases.Leaser
key model.ControllerKey
deploymentLogsSink *deploymentLogsSink

cm *cf.Manager[configuration.Configuration]
sm *cf.Manager[configuration.Secrets]
adminClient ftlv1connect.AdminServiceClient

tasks *scheduledtask.Scheduler
pubSub *pubsub.Service
Expand All @@ -173,11 +164,10 @@ type Service struct {
func New(
ctx context.Context,
conn *sql.DB,
cm *cf.Manager[configuration.Configuration],
sm *cf.Manager[configuration.Secrets],
storage *artefacts.OCIArtefactService,
config Config,
devel bool,
adminClient ftlv1connect.AdminServiceClient,
) (*Service, error) {
key := config.Key
if config.Key.IsZero() {
Expand All @@ -197,8 +187,6 @@ func New(
routingTable := routing.New(ctx, schemaeventsource.New(ctx, rpc.ClientFromContext[ftlv1connect.SchemaServiceClient](ctx)))

svc := &Service{
cm: cm,
sm: sm,
tasks: scheduler,
leaser: ldb,
key: key,
Expand All @@ -207,6 +195,7 @@ func New(
routeTable: routingTable,
storage: storage,
controllerState: state.NewInMemoryState(),
adminClient: adminClient,
}

pubSub := pubsub.New(ctx, conn, routingTable, svc.controllerState)
Expand Down Expand Up @@ -721,7 +710,11 @@ func (s *Service) GetDeploymentContext(ctx context.Context, req *connect.Request
h := sha.New()

routeView := s.routeTable.Current()
configs, err := s.cm.MapForModule(ctx, module)
configsResp, err := s.adminClient.MapConfigsForModule(ctx, &connect.Request[ftlv1.MapConfigsForModuleRequest]{Msg: &ftlv1.MapConfigsForModuleRequest{Module: module}})
if err != nil {
return connect.NewError(connect.CodeInternal, fmt.Errorf("could not get configs: %w", err))
}
configs := configsResp.Msg.Values
routeTable := map[string]string{}
for _, module := range callableModuleNames {
deployment, ok := routeView.GetDeployment(module).Get()
Expand All @@ -736,13 +729,11 @@ func (s *Service) GetDeploymentContext(ctx context.Context, req *connect.Request
routeTable[module] = deployment.Schema.Runtime.Deployment.Endpoint
}

if err != nil {
return connect.NewError(connect.CodeInternal, fmt.Errorf("could not get configs: %w", err))
}
secrets, err := s.sm.MapForModule(ctx, module)
secretsResp, err := s.adminClient.MapSecretsForModule(ctx, &connect.Request[ftlv1.MapSecretsForModuleRequest]{Msg: &ftlv1.MapSecretsForModuleRequest{Module: module}})
if err != nil {
return connect.NewError(connect.CodeInternal, fmt.Errorf("could not get secrets: %w", err))
}
secrets := secretsResp.Msg.Values

if err := hashConfigurationMap(h, configs); err != nil {
return connect.NewError(connect.CodeInternal, fmt.Errorf("could not detect change on configs: %w", err))
Expand Down
Loading

0 comments on commit 50b356a

Please sign in to comment.