From 129747292e89ea6f93d0dc5fd721818955c41bc5 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Juho=20M=C3=A4kinen?=
Date: Thu, 26 Sep 2024 07:47:29 +1000
Subject: [PATCH] chore: simple in memory provisioning scheduler (#2815)

Rough domain model for provisioning deployments.

- `Task` is a single executable task to bring the infra from one state to another
- `Deployment` is a list of sequentially executable tasks
- `Provisioner` is an interface for calling provisioners, wrapping calls to provisioner plugins

This is a WIP, and there are a lot of `TODO`s around, but I wanted to get this out to keep PRs small.

---------

Co-authored-by: github-actions[bot]
---
 backend/provisioner/deployment/deployment.go  | 133 ++++++++++++++
 .../provisioner/deployment/deployment_test.go |  92 ++++++++++
 backend/provisioner/deployment/provisioner.go | 171 ++++++++++++++++++
 .../{provisioner.go => service.go}            |  66 ++++++-
 .../ftl/ftltest/testdata/go/outer/go.mod      |   1 -
 .../ftl/ftltest/testdata/go/outer/go.sum      |   4 +-
 6 files changed, 461 insertions(+), 6 deletions(-)
 create mode 100644 backend/provisioner/deployment/deployment.go
 create mode 100644 backend/provisioner/deployment/deployment_test.go
 create mode 100644 backend/provisioner/deployment/provisioner.go
 rename backend/provisioner/{provisioner.go => service.go} (58%)

diff --git a/backend/provisioner/deployment/deployment.go b/backend/provisioner/deployment/deployment.go
new file mode 100644
index 000000000..e206f86ee
--- /dev/null
+++ b/backend/provisioner/deployment/deployment.go
@@ -0,0 +1,133 @@
+package deployment
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1beta1/provisioner"
+)
+
+// TaskState is the lifecycle state of a Task. The zero value is TaskStatePending.
+type TaskState string
+
+const (
+	TaskStatePending TaskState = ""
+	TaskStateRunning TaskState = "running"
+	TaskStateDone    TaskState = "done"
+	TaskStateFailed  TaskState = "failed"
+)
+
+// Task is a unit of work for a deployment
+type Task struct {
+	Handler  Provisioner
+	Module   string
+	State    TaskState
+	Desired  []*provisioner.Resource
+	Existing []*provisioner.Resource
+	// populated only when the task is done
+	Output []*provisioner.Resource
+
+	// set if the task is currently running
+	RunningToken string
+}
+
+// Start hands the desired resources to the provisioner.
+//
+// Only a pending task can be started. An empty token from the provisioner is
+// treated as "no changes": the task is marked done straight away and Output is
+// set to Desired. A provisioning error marks the task as failed.
+func (t *Task) Start(ctx context.Context) error {
+	if t.State != TaskStatePending {
+		return fmt.Errorf("task state is not pending: %s", t.State)
+	}
+	t.State = TaskStateRunning
+	token, err := t.Handler.Provision(ctx, t.Module, t.constructResourceContext(t.Desired), t.Existing)
+	if err != nil {
+		t.State = TaskStateFailed
+		return fmt.Errorf("error provisioning resources: %w", err)
+	}
+	if token == "" {
+		// no changes
+		t.State = TaskStateDone
+		t.Output = t.Desired
+	}
+	t.RunningToken = token
+	return nil
+}
+
+// constructResourceContext wraps each resource into a ResourceContext, keeping the input order.
+func (t *Task) constructResourceContext(r []*provisioner.Resource) []*provisioner.ResourceContext {
+	result := make([]*provisioner.ResourceContext, len(r))
+	for i, res := range r {
+		result[i] = &provisioner.ResourceContext{
+			Resource: res,
+			// TODO: Collect previously constructed resources from a dependency graph here
+		}
+	}
+	return result
+}
+
+// Progress polls the provisioner once for the state of a running task.
+//
+// The task moves to done (with Output populated) or to failed depending on the
+// reported state. If the poll itself errors without reporting a failure, the
+// task stays running so that it can be polled again.
+func (t *Task) Progress(ctx context.Context) error {
+	if t.State != TaskStateRunning {
+		return fmt.Errorf("task state is not running: %s", t.State)
+	}
+	state, output, err := t.Handler.State(ctx, t.RunningToken, t.Desired)
+	if state == TaskStateFailed {
+		// Record the failure, otherwise the task looks like it is still
+		// running and Deployment.State() can never report it as failed.
+		t.State = TaskStateFailed
+		if err == nil {
+			return fmt.Errorf("task failed without an error message")
+		}
+	}
+	if err != nil {
+		return fmt.Errorf("error getting state: %w", err)
+	}
+	if state == TaskStateDone {
+		t.State = TaskStateDone
+		t.Output = output
+	}
+	return nil
+}
+
+// Deployment is a single deployment of resources for a single module
+type Deployment struct {
+	Module string
+	Tasks  []*Task
+}
+
+// next returns the first task that is not done, i.e. one that is pending,
+// running or failed. Nil if all tasks are done.
+func (d *Deployment) next() *Task {
+	for _, t := range d.Tasks {
+		if t.State == TaskStatePending || t.State == TaskStateRunning || t.State == TaskStateFailed {
+			return t
+		}
+	}
+	return nil
+}
+
+// Progress the deployment. Returns true if there are still tasks running or pending.
+func (d *Deployment) Progress(ctx context.Context) (bool, error) {
+	next := d.next()
+	if next == nil {
+		return false, nil
+	}
+
+	if next.State == TaskStatePending {
+		if err := next.Start(ctx); err != nil {
+			return true, err
+		}
+		// Start finishes the task on the spot when the provisioner has
+		// nothing to change. Polling a finished task is an error, so move
+		// on to the next one instead.
+		if next.State == TaskStateDone {
+			return d.next() != nil, nil
+		}
+	}
+	err := next.Progress(ctx)
+	return d.next() != nil, err
+}
+
+// DeploymentState is a snapshot of the tasks of a deployment grouped by their state
+type DeploymentState struct {
+	Pending []*Task
+	Running *Task
+	Failed  *Task
+	Done    []*Task
+}
+
+// State groups the tasks of the deployment by their current state.
+//
+// Running and Failed hold a single task each; if several tasks were in one of
+// those states, the last one in task order would be reported.
+func (d *Deployment) State() *DeploymentState {
+	result := &DeploymentState{}
+	for _, t := range d.Tasks {
+		switch t.State {
+		case TaskStatePending:
+			result.Pending = append(result.Pending, t)
+		case TaskStateRunning:
+			result.Running = t
+		case TaskStateFailed:
+			result.Failed = t
+		case TaskStateDone:
+			result.Done = append(result.Done, t)
+		}
+	}
+	return result
+}
diff --git a/backend/provisioner/deployment/deployment_test.go b/backend/provisioner/deployment/deployment_test.go
new file mode 100644
index 000000000..fa48d2e08
--- /dev/null
+++ b/backend/provisioner/deployment/deployment_test.go
@@ -0,0 +1,92 @@
+package deployment_test
+
+import (
+	"context"
+	"testing"
+
+	"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1beta1/provisioner"
+	"github.com/TBD54566975/ftl/backend/provisioner/deployment"
+	"github.com/alecthomas/assert/v2"
+)
+
+// MockProvisioner is a mock implementation of the Provisioner interface
+type MockProvisioner struct {
+	Token      string
+	stateCalls int
+}
+
+var _ deployment.Provisioner = (*MockProvisioner)(nil)
+
+// Provision returns the configured token without doing any work.
+func (m *MockProvisioner) Provision(
+	ctx context.Context,
+	module string,
+	desired []*provisioner.ResourceContext,
+	existing []*provisioner.Resource,
+) (string, error) {
+	return m.Token, nil
+}
+
+// State reports running on the first call and done on every later call.
+func (m *MockProvisioner) State(
+	ctx context.Context,
+	token string,
+	desired []*provisioner.Resource,
+) (deployment.TaskState, []*provisioner.Resource, error) {
+	m.stateCalls++
+	if m.stateCalls <= 1 {
+		return deployment.TaskStateRunning, nil, nil
+	}
+	return deployment.TaskStateDone, desired, nil
+}
+
+func TestDeployment_Progress(t *testing.T) {
+	ctx := context.Background()
+
+	t.Run("no tasks", func(t *testing.T) {
+		deployment := &deployment.Deployment{}
+		progress, err := deployment.Progress(ctx)
+		assert.NoError(t, err)
+		assert.False(t, progress)
+	})
+
+	t.Run("completes synchronously when the provisioner has no changes", func(t *testing.T) {
+		registry := deployment.ProvisionerRegistry{}
+		// an empty token means that nothing was submitted
+		registry.Register(&MockProvisioner{Token: ""}, deployment.ResourceTypePostgres)
+
+		dpl := registry.CreateDeployment(
+			"test-module",
+			[]*provisioner.Resource{{
+				ResourceId: "a",
+				Resource:   &provisioner.Resource_Postgres{},
+			}},
+			nil,
+		)
+
+		running, err := dpl.Progress(ctx)
+		assert.NoError(t, err)
+		assert.False(t, running)
+		assert.Equal(t, 1, len(dpl.State().Done))
+	})
+
+	t.Run("progresses each provisioner in order", func(t *testing.T) {
+		registry := deployment.ProvisionerRegistry{}
+		registry.Register(&MockProvisioner{Token: "foo"}, deployment.ResourceTypePostgres)
+		registry.Register(&MockProvisioner{Token: "bar"}, deployment.ResourceTypeMysql)
+
+		dpl := registry.CreateDeployment(
+			"test-module",
+			[]*provisioner.Resource{{
+				ResourceId: "a",
+				Resource:   &provisioner.Resource_Mysql{},
+			}, {
+				ResourceId: "b",
+				Resource:   &provisioner.Resource_Postgres{},
+			}},
+			[]*provisioner.Resource{},
+		)
+
+		assert.Equal(t, 2, len(dpl.State().Pending))
+
+		_, err := dpl.Progress(ctx)
+		assert.NoError(t, err)
+		assert.Equal(t, 1, len(dpl.State().Pending))
+		assert.NotZero(t, dpl.State().Running)
+
+		_, err = dpl.Progress(ctx)
+		assert.NoError(t, err)
+		assert.Equal(t, 1, len(dpl.State().Pending))
+		assert.Zero(t, dpl.State().Running)
+		assert.Equal(t, 1, len(dpl.State().Done))
+
+		_, err = dpl.Progress(ctx)
+		assert.NoError(t, err)
+		assert.Equal(t, 0, len(dpl.State().Pending))
+		assert.NotZero(t, dpl.State().Running)
+		assert.Equal(t, 1, len(dpl.State().Done))
+
+		running, err := dpl.Progress(ctx)
+		assert.NoError(t, err)
+		assert.Equal(t, 2, len(dpl.State().Done))
+		assert.False(t, running)
+	})
+}
diff --git a/backend/provisioner/deployment/provisioner.go b/backend/provisioner/deployment/provisioner.go
new file mode 100644
index 000000000..44d64be64
--- /dev/null
+++ b/backend/provisioner/deployment/provisioner.go
@@ -0,0 +1,171 @@
+package deployment
+
+import (
+	"context"
+	"fmt"
+
+	"connectrpc.com/connect"
+
+	"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1beta1/provisioner"
+	"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1beta1/provisioner/provisionerconnect"
+	"github.com/TBD54566975/ftl/backend/schema"
+	"github.com/TBD54566975/ftl/common/plugin"
+	"github.com/TBD54566975/ftl/internal/log"
+)
+
+// ResourceType is a type of resource used to configure provisioners
+type ResourceType string
+
+const (
+	ResourceTypeUnknown  ResourceType = "unknown"
+	ResourceTypePostgres ResourceType = "postgres"
+	ResourceTypeMysql    ResourceType = "mysql"
+)
+
+// Provisioner is a runnable process to provision resources
+type Provisioner interface {
+	// Provision starts provisioning and returns a token for polling the
+	// result. An empty token means that there was nothing to change.
+	Provision(ctx context.Context, module string, desired []*provisioner.ResourceContext, existing []*provisioner.Resource) (string, error)
+	// State reports the state of the provisioning run identified by the token.
+	State(ctx context.Context, token string, desired []*provisioner.Resource) (TaskState, []*provisioner.Resource, error)
+}
+
+// provisionerConfig binds a provisioner to the resource types it is responsible for
+type provisionerConfig struct {
+	provisioner Provisioner
+	types       []ResourceType
+}
+
+// ProvisionerRegistry contains all known resource handlers in the order they should be executed
+type ProvisionerRegistry struct {
+	Provisioners []*provisionerConfig
+}
+
+// Register to the registry, to be executed after all the previously added handlers
+func (reg *ProvisionerRegistry) Register(handler Provisioner, types ...ResourceType) {
+	reg.Provisioners = append(reg.Provisioners, &provisionerConfig{
+		provisioner: handler,
+		types:       types,
+	})
+}
+
+// CreateDeployment to take the system to the desired state
+//
+// One task is created for each registered provisioner that has at least one
+// desired resource. Tasks are ordered by the registration order of their
+// provisioners.
+func (reg *ProvisionerRegistry) CreateDeployment(module string, desiredResources, existingResources []*provisioner.Resource) *Deployment {
+	var result []*Task
+
+	existingByHandler := reg.groupByProvisioner(existingResources)
+	desiredByHandler := reg.groupByProvisioner(desiredResources)
+
+	// Walk the registry rather than the map: map iteration order is random,
+	// while tasks have to run in the order the provisioners were registered.
+	for _, cfg := range reg.Provisioners {
+		desired, ok := desiredByHandler[cfg.provisioner]
+		if !ok {
+			continue
+		}
+		// a provisioner registered more than once still gets a single task
+		delete(desiredByHandler, cfg.provisioner)
+		result = append(result, &Task{
+			Handler:  cfg.provisioner,
+			Module:   module,
+			Desired:  desired,
+			Existing: existingByHandler[cfg.provisioner],
+		})
+	}
+	return &Deployment{Tasks: result, Module: module}
+}
+
+// ExtractResources from a module schema
+//
+// Only database declarations produce resources. A database of any type other
+// than "postgres" or "mysql" is an error.
+func ExtractResources(sch *schema.Module) ([]*provisioner.Resource, error) {
+	var result []*provisioner.Resource
+	for _, decl := range sch.Decls {
+		db, ok := decl.(*schema.Database)
+		if !ok {
+			continue
+		}
+		switch db.Type {
+		case "postgres":
+			result = append(result, &provisioner.Resource{
+				ResourceId: decl.GetName(),
+				Resource:   &provisioner.Resource_Postgres{},
+			})
+		case "mysql":
+			result = append(result, &provisioner.Resource{
+				ResourceId: decl.GetName(),
+				Resource:   &provisioner.Resource_Mysql{},
+			})
+		default:
+			return nil, fmt.Errorf("unknown db type: %s", db.Type)
+		}
+	}
+	return result, nil
+}
+
+// groupByProvisioner maps each provisioner to the resources whose type it was
+// registered for. A resource is handed to every provisioner registered for its
+// type, and resources without a matching provisioner are left out.
+func (reg *ProvisionerRegistry) groupByProvisioner(resources []*provisioner.Resource) map[Provisioner][]*provisioner.Resource {
+	result := map[Provisioner][]*provisioner.Resource{}
+	for _, r := range resources {
+		typed := typeOf(r)
+		for _, cfg := range reg.Provisioners {
+			for _, t := range cfg.types {
+				if t == typed {
+					result[cfg.provisioner] = append(result[cfg.provisioner], r)
+					// one match is enough, do not add the resource twice
+					break
+				}
+			}
+		}
+	}
+	return result
+}
+
+// typeOf returns the ResourceType of the resource, ResourceTypeUnknown if it is not recognised
+func typeOf(r *provisioner.Resource) ResourceType {
+	switch r.Resource.(type) {
+	case *provisioner.Resource_Mysql:
+		return ResourceTypeMysql
+	case *provisioner.Resource_Postgres:
+		return ResourceTypePostgres
+	default:
+		return ResourceTypeUnknown
+	}
+}
+
+// PluginProvisioner delegates provisioning to an external plugin
+type PluginProvisioner struct {
+	cmdCtx context.Context
+	client *plugin.Plugin[provisionerconnect.ProvisionerPluginServiceClient]
+}
+
+// NewPluginProvisioner spawns the plugin executable and wraps its client into a provisioner
+func NewPluginProvisioner(ctx context.Context, name, dir, exe string) (*PluginProvisioner, error) {
+	client, cmdCtx, err := plugin.Spawn(
+		ctx,
+		log.Debug,
+		name,
+		dir,
+		exe,
+		provisionerconnect.NewProvisionerPluginServiceClient,
+	)
+	if err != nil {
+		return nil, fmt.Errorf("error spawning plugin: %w", err)
+	}
+
+	return &PluginProvisioner{
+		cmdCtx: cmdCtx,
+		client: client,
+	}, nil
+}
+
+// Provision asks the plugin to provision the desired resources.
+//
+// Returns the provisioning token when the plugin has submitted the work, and
+// an empty token otherwise.
+func (p *PluginProvisioner) Provision(ctx context.Context, module string, desired []*provisioner.ResourceContext, existing []*provisioner.Resource) (string, error) {
+	resp, err := p.client.Client.Provision(ctx, connect.NewRequest(&provisioner.ProvisionRequest{
+		DesiredResources:  desired,
+		ExistingResources: existing,
+		FtlClusterId:      "ftl",
+		Module:            module,
+	}))
+	if err != nil {
+		return "", fmt.Errorf("error calling plugin: %w", err)
+	}
+	// Only submitted work has a token to poll. Task.Start treats an empty
+	// token as "no changes" and marks the task done.
+	if resp.Msg.Status == provisioner.ProvisionResponse_SUBMITTED {
+		return resp.Msg.ProvisioningToken, nil
+	}
+	return "", nil
+}
+
+// State asks the plugin for the status of the provisioning run behind the token.
+//
+// A failed run is returned as TaskStateFailed together with an error carrying
+// the message from the plugin.
+func (p *PluginProvisioner) State(ctx context.Context, token string, desired []*provisioner.Resource) (TaskState, []*provisioner.Resource, error) {
+	resp, err := p.client.Client.Status(ctx, connect.NewRequest(&provisioner.StatusRequest{
+		ProvisioningToken: token,
+	}))
+	if err != nil {
+		return "", nil, fmt.Errorf("error getting status from plugin: %w", err)
+	}
+	switch status := resp.Msg.Status.(type) {
+	case *provisioner.StatusResponse_Failed:
+		return TaskStateFailed, nil, fmt.Errorf("provisioning failed: %s", status.Failed.ErrorMessage)
+	case *provisioner.StatusResponse_Success:
+		return TaskStateDone, status.Success.UpdatedResources, nil
+	}
+	return TaskStateRunning, nil, nil
+}
+
+var _ Provisioner = (*PluginProvisioner)(nil)
diff --git a/backend/provisioner/provisioner.go b/backend/provisioner/service.go
similarity index 58%
rename from backend/provisioner/provisioner.go
rename to backend/provisioner/service.go
index 33c1b7a71..ad3efbb8f 100644
--- a/backend/provisioner/provisioner.go
+++ b/backend/provisioner/service.go
@@ -11,7 +11,10 @@ import (
 
 	ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
 	"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect"
+	"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1beta1/provisioner"
 	"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1beta1/provisioner/provisionerconnect"
+	"github.com/TBD54566975/ftl/backend/provisioner/deployment"
+	"github.com/TBD54566975/ftl/backend/schema"
 	"github.com/TBD54566975/ftl/internal/log"
 	"github.com/TBD54566975/ftl/internal/rpc"
 )
@@ -34,6 +37,9 @@ func (c *Config) SetDefaults() {
 
 type Service struct {
 	controllerClient ftlv1connect.ControllerServiceClient
+	// TODO: Store in a resource graph
+	currentResources map[string][]*provisioner.Resource
+	registry         deployment.ProvisionerRegistry
 }
 
 var _ provisionerconnect.ProvisionerServiceHandler = (*Service)(nil)
@@ -44,17 +50,71 @@ func New(ctx context.Context, config Config, controllerClient ftlv1connect.Contr
 	}, nil
 }
 
+func (s *Service) Ping(context.Context, *connect.Request[ftlv1.PingRequest]) (*connect.Response[ftlv1.PingResponse], error) {
+	return &connect.Response[ftlv1.PingResponse]{}, nil
+}
+
 func (s *Service) CreateDeployment(ctx context.Context, req *connect.Request[ftlv1.CreateDeploymentRequest]) (*connect.Response[ftlv1.CreateDeploymentResponse], error) {
-	// TODO: provision infrastructure
+	// TODO: Block deployments to make sure only one module is modified at a time
+	if req.Msg.Schema == nil {
+		// the schema comes straight from the request, do not crash on a missing one
+		return nil, fmt.Errorf("missing module schema")
+	}
+	moduleName := req.Msg.Schema.Name
+	module, err := schema.ModuleFromProto(req.Msg.Schema)
+	if err != nil {
+		return nil, fmt.Errorf("invalid module schema for module %s: %w", moduleName, err)
+	}
+
+	existingResources := s.currentResources[moduleName]
+	desiredResources, err := deployment.ExtractResources(module)
+	if err != nil {
+		return nil, fmt.Errorf("error extracting resources from schema: %w", err)
+	}
+	if err := replaceOutputs(desiredResources, existingResources); err != nil {
+		return nil, err
+	}
+
+	// named dpl so that the deployment package is not shadowed
+	dpl := s.registry.CreateDeployment(moduleName, desiredResources, existingResources)
+	// TODO: This polls the provisioners in a tight loop, add a delay between the rounds
+	running := true
+	for running {
+		running, err = dpl.Progress(ctx)
+		if err != nil {
+			// TODO: Deal with failed deployments
+			return nil, fmt.Errorf("error running a provisioner: %w", err)
+		}
+	}
+
+	// TODO: manage multiple deployments properly. Extract as a provisioner plugin
 	response, err := s.controllerClient.CreateDeployment(ctx, req)
 	if err != nil {
 		return nil, fmt.Errorf("call to ftl-controller failed: %w", err)
 	}
+
+	// The constructor is not touched by this change, so the map can still be
+	// nil here, and writing to a nil map panics.
+	if s.currentResources == nil {
+		s.currentResources = map[string][]*provisioner.Resource{}
+	}
+	s.currentResources[moduleName] = desiredResources
+
 	return response, nil
 }
 
-func (s *Service) Ping(context.Context, *connect.Request[ftlv1.PingRequest]) (*connect.Response[ftlv1.PingResponse], error) {
-	return &connect.Response[ftlv1.PingResponse]{}, nil
+// replaceOutputs copies the outputs of previously provisioned resources to the
+// desired resources with the same resource ID and the same resource type.
+//
+// Returns an error if a desired resource with a previous version is of a type
+// that is not handled here.
+func replaceOutputs(to []*provisioner.Resource, from []*provisioner.Resource) error {
+	byID := make(map[string]*provisioner.Resource, len(from))
+	for _, r := range from {
+		byID[r.ResourceId] = r
+	}
+	for _, r := range to {
+		existing := byID[r.ResourceId]
+		if existing == nil {
+			continue
+		}
+		switch target := r.Resource.(type) {
+		case *provisioner.Resource_Mysql:
+			// Resources extracted from a schema have no payload yet, so both
+			// sides have to be checked before touching the output.
+			if source, ok := existing.Resource.(*provisioner.Resource_Mysql); ok && source.Mysql != nil {
+				orInit(&target.Mysql).Output = source.Mysql.Output
+			}
+		case *provisioner.Resource_Postgres:
+			if source, ok := existing.Resource.(*provisioner.Resource_Postgres); ok && source.Postgres != nil {
+				orInit(&target.Postgres).Output = source.Postgres.Output
+			}
+		default:
+			// report the concrete resource type, r itself is always a *provisioner.Resource
+			return fmt.Errorf("can not replace outputs for an unknown resource type %T", r.Resource)
+		}
+	}
+	return nil
+}
+
+// orInit returns *p, pointing it to a new zero value first if it is nil
+func orInit[T any](p **T) *T {
+	if *p == nil {
+		*p = new(T)
+	}
+	return *p
 }
 
 // Start the Provisioner. Blocks until the context is cancelled.
diff --git a/go-runtime/ftl/ftltest/testdata/go/outer/go.mod b/go-runtime/ftl/ftltest/testdata/go/outer/go.mod index fbd835f42..601c0759d 100644 --- a/go-runtime/ftl/ftltest/testdata/go/outer/go.mod +++ b/go-runtime/ftl/ftltest/testdata/go/outer/go.mod @@ -16,7 +16,6 @@ require ( github.com/deckarep/golang-set/v2 v2.6.0 // indirect github.com/hashicorp/cronexpr v1.1.2 // indirect github.com/hexops/gotextdiff v1.0.3 // indirect - github.com/stretchr/testify v1.9.0 // indirect github.com/swaggest/jsonschema-go v0.3.72 // indirect github.com/swaggest/refl v1.3.0 // indirect golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 // indirect diff --git a/go-runtime/ftl/ftltest/testdata/go/outer/go.sum b/go-runtime/ftl/ftltest/testdata/go/outer/go.sum index 65c48f067..8adb24eab 100644 --- a/go-runtime/ftl/ftltest/testdata/go/outer/go.sum +++ b/go-runtime/ftl/ftltest/testdata/go/outer/go.sum @@ -114,8 +114,8 @@ github.com/sergi/go-diff v1.3.1 h1:xkr+Oxo4BOQKmkn/B9eMK0g5Kg/983T9DqqPHwYqD+8= github.com/sergi/go-diff v1.3.1/go.mod h1:aMJSSKb2lpPvRNec0+w3fl7LP9IOFzdc9Pa4NFbPK1I= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= -github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= -github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= +github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/swaggest/assertjson v1.9.0 h1:dKu0BfJkIxv/xe//mkCrK5yZbs79jL7OVf9Ija7o2xQ= github.com/swaggest/assertjson v1.9.0/go.mod h1:b+ZKX2VRiUjxfUIal0HDN85W0nHPAYUbYH5WkkSsFsU= github.com/swaggest/jsonschema-go v0.3.72 h1:IHaGlR1bdBUBPfhe4tfacN2TGAPKENEGiNyNzvnVHv4=