Skip to content

Commit

Permalink
chore: have separate read and write connectors in db runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
jvmakine committed Nov 26, 2024
1 parent 5a1f065 commit a7237f8
Show file tree
Hide file tree
Showing 17 changed files with 1,351 additions and 1,150 deletions.
1,984 changes: 1,026 additions & 958 deletions backend/protos/xyz/block/ftl/v1/schema/schema.pb.go

Large diffs are not rendered by default.

11 changes: 8 additions & 3 deletions backend/protos/xyz/block/ftl/v1/schema/schema.proto
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ message Config {
Type type = 4;
}

message DSNDatabaseRuntime {
message DSNDatabaseConnector {
optional Position pos = 1;
string dsn = 2;
}
Expand All @@ -60,12 +60,17 @@ message Database {
repeated Metadata metadata = 5;
}

message DatabaseRuntime {
message DatabaseConnector {
oneof value {
DSNDatabaseRuntime dsn_database_runtime = 1;
DSNDatabaseConnector dsn_database_connector = 1;
}
}

message DatabaseRuntime {
DatabaseConnector read_connector = 1;
DatabaseConnector write_connector = 2;
}

message Decl {
oneof value {
Config config = 6;
Expand Down
8 changes: 5 additions & 3 deletions backend/provisioner/deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,11 @@ func TestDeployment_Progress(t *testing.T) {
}
if psql.Postgres.Output == nil {
psql.Postgres.Output = &schemapb.DatabaseRuntime{
Value: &schemapb.DatabaseRuntime_DsnDatabaseRuntime{
DsnDatabaseRuntime: &schemapb.DSNDatabaseRuntime{
Dsn: "postgres://localhost:5432/foo",
WriteConnector: &schemapb.DatabaseConnector{
Value: &schemapb.DatabaseConnector_DsnDatabaseConnector{
DsnDatabaseConnector: &schemapb.DSNDatabaseConnector{
Dsn: "postgres://localhost:5432/foo",
},
},
},
}
Expand Down
34 changes: 26 additions & 8 deletions backend/provisioner/dev_provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,18 @@ func establishMySQLDB(ctx context.Context, rc *provisioner.ResourceContext, mysq
}
dsn := dsn.MySQLDSN(dbName, dsn.Port(mysqlPort))
mysql.Mysql.Output = &schemapb.DatabaseRuntime{
Value: &schemapb.DatabaseRuntime_DsnDatabaseRuntime{
DsnDatabaseRuntime: &schemapb.DSNDatabaseRuntime{
Dsn: dsn,
WriteConnector: &schemapb.DatabaseConnector{
Value: &schemapb.DatabaseConnector_DsnDatabaseConnector{
DsnDatabaseConnector: &schemapb.DSNDatabaseConnector{
Dsn: dsn,
},
},
},
ReadConnector: &schemapb.DatabaseConnector{
Value: &schemapb.DatabaseConnector_DsnDatabaseConnector{
DsnDatabaseConnector: &schemapb.DSNDatabaseConnector{
Dsn: dsn,
},
},
},
}
Expand All @@ -117,7 +126,7 @@ func ProvisionPostgresForTest(ctx context.Context, module string, id string) (st
return "", err
}

return res.GetPostgres().GetOutput().GetDsnDatabaseRuntime().GetDsn(), nil
return res.GetPostgres().GetOutput().WriteConnector.GetDsnDatabaseConnector().GetDsn(), nil
}

func ProvisionMySQLForTest(ctx context.Context, module string, id string) (string, error) {
Expand All @@ -129,7 +138,7 @@ func ProvisionMySQLForTest(ctx context.Context, module string, id string) (strin
if err != nil {
return "", err
}
return res.GetMysql().GetOutput().GetDsnDatabaseRuntime().GetDsn(), nil
return res.GetMysql().GetOutput().WriteConnector.GetDsnDatabaseConnector().GetDsn(), nil
}

func provisionPostgres(postgresPort int) func(ctx context.Context, rc *provisioner.ResourceContext, module string, id string) (*provisioner.Resource, error) {
Expand Down Expand Up @@ -184,9 +193,18 @@ func provisionPostgres(postgresPort int) func(ctx context.Context, rc *provision
}
dsn := dsn.PostgresDSN(dbName, dsn.Port(postgresPort))
pg.Postgres.Output = &schemapb.DatabaseRuntime{
Value: &schemapb.DatabaseRuntime_DsnDatabaseRuntime{
DsnDatabaseRuntime: &schemapb.DSNDatabaseRuntime{
Dsn: dsn,
WriteConnector: &schemapb.DatabaseConnector{
Value: &schemapb.DatabaseConnector_DsnDatabaseConnector{
DsnDatabaseConnector: &schemapb.DSNDatabaseConnector{
Dsn: dsn,
},
},
},
ReadConnector: &schemapb.DatabaseConnector{
Value: &schemapb.DatabaseConnector_DsnDatabaseConnector{
DsnDatabaseConnector: &schemapb.DSNDatabaseConnector{
Dsn: dsn,
},
},
},
}
Expand Down
8 changes: 5 additions & 3 deletions backend/provisioner/resource_equality_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,11 @@ func TestResourceEqual(t *testing.T) {
Resource: &provisioner.Resource_Postgres{
Postgres: &provisioner.PostgresResource{
Output: &schemapb.DatabaseRuntime{
Value: &schemapb.DatabaseRuntime_DsnDatabaseRuntime{
DsnDatabaseRuntime: &schemapb.DSNDatabaseRuntime{
Dsn: "foo",
WriteConnector: &schemapb.DatabaseConnector{
Value: &schemapb.DatabaseConnector_DsnDatabaseConnector{
DsnDatabaseConnector: &schemapb.DSNDatabaseConnector{
Dsn: "foo",
},
},
},
},
Expand Down
6 changes: 4 additions & 2 deletions backend/provisioner/sql_migration_provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/TBD54566975/ftl/internal/errors"
"github.com/TBD54566975/ftl/internal/infra"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/schema"
"github.com/TBD54566975/ftl/internal/sha256"
)

Expand Down Expand Up @@ -63,15 +64,16 @@ func provisionSQLMigration(registryConfig artefacts.RegistryConfig) func(ctx con
resource := rc.Dependencies[0].Resource
switch res := resource.(type) {
case *provisioner.Resource_Postgres:
dsn, err = infra.ResolvePostgresDSN(ctx, res.Postgres.GetOutput())
dsn, err = infra.ResolvePostgresDSN(ctx, schema.DatabaseConnectorFromProto(res.Postgres.GetOutput().WriteConnector))
if err != nil {
return nil, fmt.Errorf("failed to resolve postgres DSN: %w", err)
}
case *provisioner.Resource_Mysql:
dsn, err = infra.ResolveMySQLDSN(ctx, res.Mysql.GetOutput())
dsn, err = infra.ResolveMySQLDSN(ctx, schema.DatabaseConnectorFromProto(res.Mysql.GetOutput().WriteConnector))
if err != nil {
return nil, fmt.Errorf("failed to resolve mysql DSN: %w", err)
}
dsn = "mysql://" + dsn
// strip the tcp part
exp := regexp.MustCompile(`tcp\((.*?)\)`)
dsn = exp.ReplaceAllString(dsn, "$1")
Expand Down
14 changes: 8 additions & 6 deletions backend/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/TBD54566975/ftl/internal/download"
"github.com/TBD54566975/ftl/internal/exec"
"github.com/TBD54566975/ftl/internal/identity"
"github.com/TBD54566975/ftl/internal/infra"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
ftlobservability "github.com/TBD54566975/ftl/internal/observability"
Expand Down Expand Up @@ -640,12 +641,13 @@ func (s *Service) startPgProxy(ctx context.Context, module *schema.Module, start
return "", fmt.Errorf("database %s not found", params["database"])
}

if dsn, ok := db.Runtime.(*schema.DSNDatabaseRuntime); ok {
logger.Debugf("Resolved DSN (%s): %s", params["database"], dsn.DSN)
return dsn.DSN, nil
dsn, err := infra.ResolvePostgresDSN(ctx, db.Runtime.WriteConnector)
if err != nil {
return "", fmt.Errorf("failed to resolve postgres DSN: %w", err)
}

return "", fmt.Errorf("unknown database runtime type: %T", db.Runtime)
logger.Debugf("Resolved DSN (%s): %s", params["database"], dsn)
return dsn, nil
}).Start(ctx, channel); err != nil {
started.Done()
return fmt.Errorf("failed to start pgproxy: %w", err)
Expand Down Expand Up @@ -675,8 +677,8 @@ func (s *Service) startMySQLProxy(ctx context.Context, module *schema.Module, la
errorC := make(chan error)
databaseRuntime := decl.Runtime
var proxy *mysql.Proxy
switch db := databaseRuntime.(type) {
case *schema.DSNDatabaseRuntime:
switch db := databaseRuntime.WriteConnector.(type) {
case *schema.DSNDatabaseConnector:
proxy = mysql.NewProxy("localhost", 0, db.DSN, &mysqlLogger{logger: logger}, portC)
default:
return fmt.Errorf("unknown database runtime type: %T", databaseRuntime)
Expand Down
15 changes: 12 additions & 3 deletions cmd/ftl-provisioner-cloudformation/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,18 @@ func (c *CloudformationProvisioner) updatePostgresOutputs(ctx context.Context, t
return fmt.Errorf("failed to get username and password from secret ARN: %w", err)
}

to.Value = &schemapb.DatabaseRuntime_DsnDatabaseRuntime{
DsnDatabaseRuntime: &schemapb.DSNDatabaseRuntime{
Dsn: endpointToDSN(byName[PropertyMySQLWriteEndpoint].OutputValue, resourceID, 5432, username, password),
to.WriteConnector = &schemapb.DatabaseConnector{
Value: &schemapb.DatabaseConnector_DsnDatabaseConnector{
DsnDatabaseConnector: &schemapb.DSNDatabaseConnector{
Dsn: endpointToDSN(byName[PropertyMySQLWriteEndpoint].OutputValue, resourceID, 5432, username, password),
},
},
}
to.ReadConnector = &schemapb.DatabaseConnector{
Value: &schemapb.DatabaseConnector_DsnDatabaseConnector{
DsnDatabaseConnector: &schemapb.DSNDatabaseConnector{
Dsn: endpointToDSN(byName[PropertyMySQLReadEndpoint].OutputValue, resourceID, 5432, username, password),
},
},
}

Expand Down
81 changes: 62 additions & 19 deletions frontend/console/src/protos/xyz/block/ftl/v1/schema/schema_pb.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 9 additions & 9 deletions internal/infra/dsn_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,21 @@ import (
"context"
"fmt"

schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema"
"github.com/TBD54566975/ftl/internal/schema"
)

func ResolvePostgresDSN(ctx context.Context, schema *schemapb.DatabaseRuntime) (string, error) {
dsnRuntime, ok := schema.Value.(*schemapb.DatabaseRuntime_DsnDatabaseRuntime)
func ResolvePostgresDSN(ctx context.Context, connector schema.DatabaseConnector) (string, error) {
dsnRuntime, ok := connector.(*schema.DSNDatabaseConnector)
if !ok {
return "", fmt.Errorf("unexpected database runtime type: %T", schema.Value)
return "", fmt.Errorf("unexpected database connector type: %T", connector)
}
return dsnRuntime.DsnDatabaseRuntime.Dsn, nil
return dsnRuntime.DSN, nil
}

func ResolveMySQLDSN(ctx context.Context, schema *schemapb.DatabaseRuntime) (string, error) {
dsnRuntime, ok := schema.Value.(*schemapb.DatabaseRuntime_DsnDatabaseRuntime)
func ResolveMySQLDSN(ctx context.Context, connector schema.DatabaseConnector) (string, error) {
dsnRuntime, ok := connector.(*schema.DSNDatabaseConnector)
if !ok {
return "", fmt.Errorf("unexpected database runtime type: %T", schema.Value)
return "", fmt.Errorf("unexpected database connector type: %T", connector)
}
return dsnRuntime.DsnDatabaseRuntime.Dsn, nil
return dsnRuntime.DSN, nil
}
3 changes: 2 additions & 1 deletion internal/schema/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,8 @@ func (d *Data) Monomorphise(ref *Ref) (*Data, error) {
*Schema, *String, *Time, Type, *TypeParameter, *Unit, *Verb, *Enum,
*EnumVariant, Value, *IntValue, *StringValue, *TypeValue, Symbol,
Named, *TypeAlias, *Topic, *Subscription, *MetadataSubscriber, *MetadataTypeMap,
*MetadataEncoding, *MetadataPublisher, *MetadataSQLMigration, *DSNDatabaseRuntime, DatabaseRuntime:
*MetadataEncoding, *MetadataPublisher, *MetadataSQLMigration, *DSNDatabaseConnector, *DatabaseRuntime,
DatabaseConnector:
}
return next()
})
Expand Down
Loading

0 comments on commit a7237f8

Please sign in to comment.