diff --git a/cmd/ftl-provisioner-cloudformation/postgres.go b/cmd/ftl-provisioner-cloudformation/postgres.go new file mode 100644 index 0000000000..d867e2a0f4 --- /dev/null +++ b/cmd/ftl-provisioner-cloudformation/postgres.go @@ -0,0 +1,53 @@ +package main + +import ( + goformation "github.com/awslabs/goformation/v7/cloudformation" + "github.com/awslabs/goformation/v7/cloudformation/rds" +) + +type PostgresTemplater struct { + resourceID string + cluster string + module string + config *Config +} + +var _ ResourceTemplater = (*PostgresTemplater)(nil) + +func (p *PostgresTemplater) AddToTemplate(template *goformation.Template) error { + clusterID := cloudformationResourceID(p.resourceID, "cluster") + instanceID := cloudformationResourceID(p.resourceID, "instance") + template.Resources[clusterID] = &rds.DBCluster{ + Engine: ptr("aurora-postgresql"), + MasterUsername: ptr("root"), + ManageMasterUserPassword: ptr(true), + DBSubnetGroupName: ptr(p.config.DatabaseSubnetGroupARN), + VpcSecurityGroupIds: []string{p.config.DatabaseSecurityGroup}, + EngineMode: ptr("provisioned"), + Port: ptr(5432), + ServerlessV2ScalingConfiguration: &rds.DBCluster_ServerlessV2ScalingConfiguration{ + MinCapacity: ptr(0.5), + MaxCapacity: ptr(10.0), + }, + Tags: ftlTags(p.cluster, p.module), + } + template.Resources[instanceID] = &rds.DBInstance{ + Engine: ptr("aurora-postgresql"), + DBInstanceClass: ptr("db.serverless"), + DBClusterIdentifier: ptr(goformation.Ref(clusterID)), + Tags: ftlTags(p.cluster, p.module), + } + addOutput(template.Outputs, goformation.GetAtt(clusterID, "Endpoint.Address"), &CloudformationOutputKey{ + ResourceID: p.resourceID, + PropertyName: PropertyPsqlWriteEndpoint, + }) + addOutput(template.Outputs, goformation.GetAtt(clusterID, "ReadEndpoint.Address"), &CloudformationOutputKey{ + ResourceID: p.resourceID, + PropertyName: PropertyPsqlReadEndpoint, + }) + addOutput(template.Outputs, goformation.GetAtt(clusterID, "MasterUserSecret.SecretArn"), &CloudformationOutputKey{ + ResourceID: p.resourceID, + PropertyName: PropertyPsqlMasterUserARN, + }) + return nil +} diff --git a/cmd/ftl-provisioner-cloudformation/provisioner.go b/cmd/ftl-provisioner-cloudformation/provisioner.go index c91e11e268..ee0581d191 100644 --- a/cmd/ftl-provisioner-cloudformation/provisioner.go +++ b/cmd/ftl-provisioner-cloudformation/provisioner.go @@ -3,7 +3,6 @@ package main import ( "bytes" "context" - "errors" "fmt" "strconv" "time" @@ -13,8 +12,8 @@ import ( "github.com/aws/aws-sdk-go-v2/service/secretsmanager" goformation "github.com/awslabs/goformation/v7/cloudformation" cf "github.com/awslabs/goformation/v7/cloudformation/cloudformation" - "github.com/awslabs/goformation/v7/cloudformation/rds" "github.com/awslabs/goformation/v7/cloudformation/tags" + "github.com/puzpuzpuz/xsync/v3" "golang.org/x/text/cases" "golang.org/x/text/language" @@ -25,9 +24,9 @@ import ( ) const ( - PropertyDBReadEndpoint = "db:read_endpoint" - PropertyDBWriteEndpoint = "db:write_endpoint" - PropertyMasterUserARN = "db:master_user_secret_arn" + PropertyPsqlReadEndpoint = "psql:read_endpoint" + PropertyPsqlWriteEndpoint = "psql:write_endpoint" + PropertyPsqlMasterUserARN = "psql:master_user_secret_arn" ) type Config struct { @@ -40,6 +39,8 @@ type CloudformationProvisioner struct { client *cloudformation.Client secrets *secretsmanager.Client confg *Config + + running *xsync.MapOf[string, *task] } var _ provisionerconnect.ProvisionerPluginServiceHandler = (*CloudformationProvisioner)(nil) @@ -54,7 +55,12 @@ func NewCloudformationProvisioner(ctx context.Context, config Config) (context.C return nil, nil, fmt.Errorf("failed to create secretsmanager client: %w", err) } - return ctx, &CloudformationProvisioner{client: client, secrets: secrets, confg: &config}, nil + return ctx, &CloudformationProvisioner{ + client: client, + secrets: secrets, + confg: &config, + running: xsync.NewMapOf[string, *task](), + }, nil } func (c *CloudformationProvisioner) Ping(context.Context, *connect.Request[ftlv1.PingRequest]) (*connect.Response[ftlv1.PingResponse], error) { @@ -66,24 +72,25 @@ func (c *CloudformationProvisioner) Provision(ctx context.Context, req *connect. if err != nil { return nil, err } + token := *res.StackId + changeSetID := *res.Id + if !updated { return connect.NewResponse(&provisioner.ProvisionResponse{ // even if there are no changes, return the stack id so that any resource outputs can be populated Status: provisioner.ProvisionResponse_SUBMITTED, - ProvisioningToken: *res.StackId, + ProvisioningToken: token, }), nil } - _, err = c.client.ExecuteChangeSet(ctx, &cloudformation.ExecuteChangeSetInput{ - ChangeSetName: res.Id, - StackName: res.StackId, - }) - if err != nil { - return nil, fmt.Errorf("failed to execute change-set: %w", err) - } + task := &task{stackID: token} + if _, ok := c.running.LoadOrStore(token, task); ok { + return nil, fmt.Errorf("provisioner already running: %s", token) + } + task.Start(ctx, c.client, c.secrets, changeSetID) return connect.NewResponse(&provisioner.ProvisionResponse{ Status: provisioner.ProvisionResponse_SUBMITTED, - ProvisioningToken: *res.StackId, + ProvisioningToken: token, }), nil } @@ -124,8 +131,18 @@ func generateChangeSetName(stack string) string { func (c *CloudformationProvisioner) createTemplate(req *provisioner.ProvisionRequest) (string, error) { template := goformation.NewTemplate() for _, resourceCtx := range req.DesiredResources { - if err := c.resourceToCF(req.FtlClusterId, req.Module, template, resourceCtx.Resource); err != nil { - return "", err + var templater ResourceTemplater + if _, ok := resourceCtx.Resource.Resource.(*provisioner.Resource_Postgres); ok { + templater = &PostgresTemplater{ + resourceID: resourceCtx.Resource.ResourceId, + cluster: req.FtlClusterId, + module: req.Module, + config: c.confg, + } + } + + if err := templater.AddToTemplate(template); err != nil { + return "", fmt.Errorf("failed to add resource to template: %w", err) } } // Stack can not be empty, insert a null resource to keep the stack around @@ -140,45 +157,9 @@ func (c *CloudformationProvisioner) createTemplate(req *provisioner.ProvisionReq return string(bytes), nil } -func (c *CloudformationProvisioner) resourceToCF(cluster, module string, template *goformation.Template, resource *provisioner.Resource) error { - if _, ok := resource.Resource.(*provisioner.Resource_Postgres); ok { - clusterID := cloudformationResourceID(resource.ResourceId, "cluster") - instanceID := cloudformationResourceID(resource.ResourceId, "instance") - template.Resources[clusterID] = &rds.DBCluster{ - Engine: ptr("aurora-postgresql"), - MasterUsername: ptr("root"), - ManageMasterUserPassword: ptr(true), - DBSubnetGroupName: ptr(c.confg.DatabaseSubnetGroupARN), - VpcSecurityGroupIds: []string{c.confg.DatabaseSecurityGroup}, - EngineMode: ptr("provisioned"), - Port: ptr(5432), - ServerlessV2ScalingConfiguration: &rds.DBCluster_ServerlessV2ScalingConfiguration{ - MinCapacity: ptr(0.5), - MaxCapacity: ptr(10.0), - }, - Tags: ftlTags(cluster, module), - } - template.Resources[instanceID] = &rds.DBInstance{ - Engine: ptr("aurora-postgresql"), - DBInstanceClass: ptr("db.serverless"), - DBClusterIdentifier: ptr(goformation.Ref(clusterID)), - Tags: ftlTags(cluster, module), - } - addOutput(template.Outputs, goformation.GetAtt(clusterID, "Endpoint.Address"), &CloudformationOutputKey{ - ResourceID: resource.ResourceId, - PropertyName: PropertyDBWriteEndpoint, - }) - addOutput(template.Outputs, goformation.GetAtt(clusterID, "ReadEndpoint.Address"), &CloudformationOutputKey{ - ResourceID: resource.ResourceId, - PropertyName: PropertyDBReadEndpoint, - }) - addOutput(template.Outputs, goformation.GetAtt(clusterID, "MasterUserSecret.SecretArn"), &CloudformationOutputKey{ - ResourceID: resource.ResourceId, - PropertyName: PropertyMasterUserARN, - }) - return nil - } - return errors.New("unsupported resource type") +// ResourceTemplater interface for different resource types +type ResourceTemplater interface { + AddToTemplate(tmpl *goformation.Template) error } func ftlTags(cluster, module string) []tags.Tag { diff --git a/cmd/ftl-provisioner-cloudformation/status.go b/cmd/ftl-provisioner-cloudformation/status.go index 8242f0f437..7409eecd30 100644 --- a/cmd/ftl-provisioner-cloudformation/status.go +++ b/cmd/ftl-provisioner-cloudformation/status.go @@ -2,85 +2,43 @@ package main import ( "context" - "database/sql" - "encoding/json" - "errors" "fmt" "net/url" - "strings" "connectrpc.com/connect" - "github.com/aws/aws-sdk-go-v2/service/cloudformation" "github.com/aws/aws-sdk-go-v2/service/cloudformation/types" - "github.com/aws/aws-sdk-go-v2/service/secretsmanager" _ "github.com/jackc/pgx/v5/stdlib" // SQL driver "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1beta1/provisioner" ) func (c *CloudformationProvisioner) Status(ctx context.Context, req *connect.Request[provisioner.StatusRequest]) (*connect.Response[provisioner.StatusResponse], error) { - client, err := createClient(ctx) - if err != nil { - return nil, fmt.Errorf("failed to create cloudformation client: %w", err) + token := req.Msg.ProvisioningToken + // if the task is not in the map, it means that the provisioner has crashed since starting the task + // in that case, we start a new task to query the existing stack + task, _ := c.running.LoadOrStore(token, &task{stackID: token}) + + if task.err.Load() != nil { + c.running.Delete(token) + return nil, connect.NewError(connect.CodeUnknown, task.err.Load()) } - desc, err := client.DescribeStacks(ctx, &cloudformation.DescribeStacksInput{ - StackName: &req.Msg.ProvisioningToken, - }) - if err != nil { - return nil, fmt.Errorf("failed to describe stack: %w", err) - } - stack := desc.Stacks[0] - - switch stack.StackStatus { - case types.StackStatusCreateInProgress: - return running() - case types.StackStatusCreateFailed: - return failure(&stack) - case types.StackStatusCreateComplete: - return c.success(ctx, &stack, req.Msg.DesiredResources) - case types.StackStatusRollbackInProgress: - return failure(&stack) - case types.StackStatusRollbackFailed: - return failure(&stack) - case types.StackStatusRollbackComplete: - return failure(&stack) - case types.StackStatusDeleteInProgress: - return running() - case types.StackStatusDeleteFailed: - return failure(&stack) - case types.StackStatusDeleteComplete: - return c.success(ctx, &stack, req.Msg.DesiredResources) - case types.StackStatusUpdateInProgress: - return running() - case types.StackStatusUpdateCompleteCleanupInProgress: - return running() - case types.StackStatusUpdateComplete: - return c.success(ctx, &stack, req.Msg.DesiredResources) - case types.StackStatusUpdateFailed: - return failure(&stack) - case types.StackStatusUpdateRollbackInProgress: - return running() - default: - return nil, errors.New("unsupported Cloudformation status code: " + string(desc.Stacks[0].StackStatus)) - } -} + if task.outputs.Load() != nil { + c.running.Delete(token) -func (c *CloudformationProvisioner) success(ctx context.Context, stack *types.Stack, resources []*provisioner.Resource) (*connect.Response[provisioner.StatusResponse], error) { - err := c.updateResources(ctx, stack.Outputs, resources) - if err != nil { - return nil, err - } - return connect.NewResponse(&provisioner.StatusResponse{ - Status: &provisioner.StatusResponse_Success{ - Success: &provisioner.StatusResponse_ProvisioningSuccess{ - UpdatedResources: resources, + resources := req.Msg.DesiredResources + if err := c.updateResources(ctx, task.outputs.Load(), resources); err != nil { + return nil, err + } + return connect.NewResponse(&provisioner.StatusResponse{ + Status: &provisioner.StatusResponse_Success{ + Success: &provisioner.StatusResponse_ProvisioningSuccess{ + UpdatedResources: resources, + }, }, - }, - }), nil -} + }), nil + } -func running() (*connect.Response[provisioner.StatusResponse], error) { return connect.NewResponse(&provisioner.StatusResponse{ Status: &provisioner.StatusResponse_Running{ Running: &provisioner.StatusResponse_ProvisioningRunning{}, @@ -88,10 +46,6 @@ func running() (*connect.Response[provisioner.StatusResponse], error) { }), nil } -func failure(stack *types.Stack) (*connect.Response[provisioner.StatusResponse], error) { - return nil, connect.NewError(connect.CodeUnknown, errors.New(*stack.StackStatusReason)) -} - func outputsByResourceID(outputs []types.Output) (map[string][]types.Output, error) { m := make(map[string][]types.Output) for _, output := range outputs { @@ -147,31 +101,15 @@ func (c *CloudformationProvisioner) updatePostgresOutputs(ctx context.Context, t return fmt.Errorf("failed to group outputs by property name: %w", err) } - // TODO: Move to provisioner workflow - secretARN := *byName[PropertyMasterUserARN].OutputValue - username, password, err := c.secretARNToUsernamePassword(ctx, secretARN) + // TODO: mind the secret rotation + secretARN := *byName[PropertyPsqlMasterUserARN].OutputValue + username, password, err := secretARNToUsernamePassword(ctx, c.secrets, secretARN) if err != nil { return fmt.Errorf("failed to get username and password from secret ARN: %w", err) } - to.ReadDsn = endpointToDSN(byName[PropertyDBReadEndpoint].OutputValue, resourceID, 5432, username, password) - to.WriteDsn = endpointToDSN(byName[PropertyDBWriteEndpoint].OutputValue, resourceID, 5432, username, password) - adminEndpoint := endpointToDSN(byName[PropertyDBReadEndpoint].OutputValue, "postgres", 5432, username, password) - - // Connect to postgres without a specific database to create the new one - db, err := sql.Open("pgx", adminEndpoint) - if err != nil { - return fmt.Errorf("failed to connect to postgres: %w", err) - } - defer db.Close() - - // Create the database if it doesn't exist - if _, err := db.ExecContext(ctx, "CREATE DATABASE "+resourceID); err != nil { - // Ignore if database already exists - if !strings.Contains(err.Error(), "already exists") { - return fmt.Errorf("failed to create database: %w", err) - } - } + to.ReadDsn = endpointToDSN(byName[PropertyPsqlReadEndpoint].OutputValue, resourceID, 5432, username, password) + to.WriteDsn = endpointToDSN(byName[PropertyPsqlWriteEndpoint].OutputValue, resourceID, 5432, username, password) return nil } @@ -190,20 +128,3 @@ func endpointToDSN(endpoint *string, database string, port int, username, passwo return url.String() } - -func (c *CloudformationProvisioner) secretARNToUsernamePassword(ctx context.Context, secretARN string) (string, string, error) { - secret, err := c.secrets.GetSecretValue(ctx, &secretsmanager.GetSecretValueInput{ - SecretId: &secretARN, - }) - if err != nil { - return "", "", fmt.Errorf("failed to get secret value: %w", err) - } - secretString := *secret.SecretString - - var secretData map[string]string - if err := json.Unmarshal([]byte(secretString), &secretData); err != nil { - return "", "", fmt.Errorf("failed to unmarshal secret data: %w", err) - } - - return secretData["username"], secretData["password"], nil -} diff --git a/cmd/ftl-provisioner-cloudformation/task.go b/cmd/ftl-provisioner-cloudformation/task.go new file mode 100644 index 0000000000..a0cc6a55df --- /dev/null +++ b/cmd/ftl-provisioner-cloudformation/task.go @@ -0,0 +1,159 @@ +package main + +import ( + "context" + "database/sql" + "encoding/json" + "fmt" + "strings" + "time" + + "github.com/alecthomas/atomic" + "github.com/aws/aws-sdk-go-v2/service/cloudformation" + "github.com/aws/aws-sdk-go-v2/service/cloudformation/types" + "github.com/aws/aws-sdk-go-v2/service/secretsmanager" + "github.com/jpillora/backoff" +) + +type task struct { + stackID string + + err atomic.Value[error] + outputs atomic.Value[[]types.Output] +} + +func (t *task) updateStack(ctx context.Context, client *cloudformation.Client, changeSetID string) ([]types.Output, error) { + _, err := client.ExecuteChangeSet(ctx, &cloudformation.ExecuteChangeSetInput{ + ChangeSetName: &changeSetID, + StackName: &t.stackID, + }) + if err != nil { + return nil, fmt.Errorf("failed to execute change-set: %w", err) + } + + retry := backoff.Backoff{ + Min: 100 * time.Millisecond, + Max: 5 * time.Second, + Factor: 2, + } + for { + desc, err := client.DescribeStacks(ctx, &cloudformation.DescribeStacksInput{ + StackName: &t.stackID, + }) + if err != nil { + return nil, fmt.Errorf("failed to describe stack: %w", err) + } + stack := desc.Stacks[0] + + switch stack.StackStatus { + // noop while running + case types.StackStatusCreateInProgress: + case types.StackStatusUpdateInProgress: + case types.StackStatusUpdateCompleteCleanupInProgress: + case types.StackStatusUpdateRollbackInProgress: + + // success + case types.StackStatusCreateComplete: + return stack.Outputs, nil + case types.StackStatusDeleteComplete: + return stack.Outputs, nil + case types.StackStatusUpdateComplete: + return stack.Outputs, nil + + // failures + case types.StackStatusCreateFailed: + return nil, fmt.Errorf("stack creation failed: %s", *stack.StackStatusReason) + case types.StackStatusRollbackInProgress: + return nil, fmt.Errorf("stack rollback in progress: %s", *stack.StackStatusReason) + case types.StackStatusRollbackFailed: + return nil, fmt.Errorf("stack rollback failed: %s", *stack.StackStatusReason) + case types.StackStatusRollbackComplete: + return nil, fmt.Errorf("stack rollback complete: %s", *stack.StackStatusReason) + case types.StackStatusDeleteInProgress: + case types.StackStatusDeleteFailed: + return nil, fmt.Errorf("stack deletion failed: %s", *stack.StackStatusReason) + case types.StackStatusUpdateFailed: + return nil, fmt.Errorf("stack update failed: %s", *stack.StackStatusReason) + default: + return nil, fmt.Errorf("unsupported Cloudformation status code: %s", string(stack.StackStatus)) + } + + time.Sleep(retry.Duration()) + } +} + +func (t *task) postUpdate(ctx context.Context, secrets *secretsmanager.Client, outputs []types.Output) error { + byResourceID, err := outputsByResourceID(outputs) + if err != nil { + return fmt.Errorf("failed to group outputs by resource ID: %w", err) + } + + for resourceID, outputs := range byResourceID { + byName, err := outputsByPropertyName(outputs) + if err != nil { + return fmt.Errorf("failed to group outputs by property name: %w", err) + } + + if write, ok := byName[PropertyPsqlWriteEndpoint]; ok { + if secret, ok := byName[PropertyPsqlMasterUserARN]; ok { + secretARN := *secret.OutputValue + username, password, err := secretARNToUsernamePassword(ctx, secrets, secretARN) + if err != nil { + return fmt.Errorf("failed to get username and password from secret ARN: %w", err) + } + + adminEndpoint := endpointToDSN(write.OutputValue, "postgres", 5432, username, password) + + // Connect to postgres without a specific database to create the new one + db, err := sql.Open("pgx", adminEndpoint) + if err != nil { + return fmt.Errorf("failed to connect to postgres: %w", err) + } + defer db.Close() + + // Create the database if it doesn't exist + if _, err := db.ExecContext(ctx, "CREATE DATABASE "+resourceID); err != nil { + // Ignore if database already exists + if !strings.Contains(err.Error(), "already exists") { + return fmt.Errorf("failed to create database: %w", err) + } + } + } + } + } + + return nil +} + +func (t *task) Start(oldCtx context.Context, client *cloudformation.Client, secrets *secretsmanager.Client, changeSetID string) { + ctx := context.WithoutCancel(oldCtx) + go func() { + outputs, err := t.updateStack(ctx, client, changeSetID) + if err != nil { + t.err.Store(err) + return + } + if err := t.postUpdate(ctx, secrets, outputs); err != nil { + t.err.Store(err) + return + } + t.outputs.Store(outputs) + }() +} + +func secretARNToUsernamePassword(ctx context.Context, secrets *secretsmanager.Client, secretARN string) (string, string, error) { + secret, err := secrets.GetSecretValue(ctx, &secretsmanager.GetSecretValueInput{ + SecretId: &secretARN, + }) + if err != nil { + return "", "", fmt.Errorf("failed to get secret value: %w", err) + } + secretString := *secret.SecretString + + var secretData map[string]string + if err := json.Unmarshal([]byte(secretString), &secretData); err != nil { + return "", "", fmt.Errorf("failed to unmarshal secret data: %w", err) + } + + return secretData["username"], secretData["password"], nil +}