Skip to content

Commit

Permalink
chore: split DAL into two packages, with moduleconfig queries moved t…
Browse files Browse the repository at this point in the history
…o common/configuration (#1840)

Fixes #1696

Approach:

* Duplicate `- engine` block of `sqlc.yaml` to support multiple `out`
locations
* Refactor common error handling vars + funcs into separate `dalerrs`
package. This now lives under the new top level `db` dir, where we can
add additional common DB logic as we split the controller dal further.
This change incurred a lot of NOOP lines changed. :/
* Meat of the change: split moduleconfig CRUD queries out of
`backend/controller/sql` and into `common/configuration/sql`.
Correspondingly, split the moduleconfig DAL functions out of
`backend/controller/dal` and into `common/configuration/dal`.
* Questionable choice: Copied `backend/controller/sql/conn.go` to
`common/configuration/sql/conn.go` with the unnecessary logic stripped
out (`Tx`...). That means that what remains (`sql.DBI` and `sql.NewDB`),
though a small amount of code, is duplicated with the original
`conn.go`. I could refactor this to use a shared interface instead, but
that could also make this messier for readability and take things in a
worse direction overall, so pending better ideas, we can rethink this
when splitting the controller DAL.
deniseli authored Jun 20, 2024

Verified

This commit was signed with the committer’s verified signature.
Exirel Florian Strzelecki
1 parent 3eb84f8 commit 027b28d
Showing 32 changed files with 1,087 additions and 401 deletions.
2 changes: 1 addition & 1 deletion Justfile
Original file line number Diff line number Diff line change
@@ -66,7 +66,7 @@ init-db:

# Regenerate SQLC code (requires init-db to be run first)
build-sqlc:
@mk backend/controller/sql/{db.go,models.go,querier.go,queries.sql.go} : backend/controller/sql/queries.sql backend/controller/sql/schema sqlc.yaml -- "just init-db && sqlc generate"
@mk backend/controller/sql/{db.go,models.go,querier.go,queries.sql.go} common/configuration/sql/{db.go,models.go,querier.go,queries.sql.go} : backend/controller/sql/queries.sql common/configuration/sql/queries.sql backend/controller/sql/schema sqlc.yaml -- "just init-db && sqlc generate"

# Build the ZIP files that are embedded in the FTL release binaries
build-zips: build-kt-runtime
17 changes: 9 additions & 8 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
@@ -47,6 +47,7 @@ import (
schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema"
"github.com/TBD54566975/ftl/backend/schema"
cf "github.com/TBD54566975/ftl/common/configuration"
"github.com/TBD54566975/ftl/db/dalerrs"
frontend "github.com/TBD54566975/ftl/frontend"
"github.com/TBD54566975/ftl/internal/cors"
ftlhttp "github.com/TBD54566975/ftl/internal/http"
@@ -282,7 +283,7 @@ func New(ctx context.Context, db *dal.DAL, config Config, runnerScaling scaling.
func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
routes, err := s.dal.GetIngressRoutes(r.Context(), r.Method)
if err != nil {
if errors.Is(err, dal.ErrNotFound) {
if errors.Is(err, dalerrs.ErrNotFound) {
http.NotFound(w, r)
return
}
@@ -481,7 +482,7 @@ func (s *Service) UpdateDeploy(ctx context.Context, req *connect.Request[ftlv1.U

err = s.dal.SetDeploymentReplicas(ctx, deploymentKey, int(req.Msg.MinReplicas))
if err != nil {
if errors.Is(err, dal.ErrNotFound) {
if errors.Is(err, dalerrs.ErrNotFound) {
logger.Errorf(err, "Deployment not found: %s", deploymentKey)
return nil, connect.NewError(connect.CodeNotFound, errors.New("deployment not found"))
}
@@ -503,10 +504,10 @@ func (s *Service) ReplaceDeploy(ctx context.Context, c *connect.Request[ftlv1.Re

err = s.dal.ReplaceDeployment(ctx, newDeploymentKey, int(c.Msg.MinReplicas))
if err != nil {
if errors.Is(err, dal.ErrNotFound) {
if errors.Is(err, dalerrs.ErrNotFound) {
logger.Errorf(err, "Deployment not found: %s", newDeploymentKey)
return nil, connect.NewError(connect.CodeNotFound, errors.New("deployment not found"))
} else if errors.Is(err, dal.ErrConflict) {
} else if errors.Is(err, dalerrs.ErrConflict) {
logger.Infof("Reusing deployment: %s", newDeploymentKey)
} else {
logger.Errorf(err, "Could not replace deployment: %s", newDeploymentKey)
@@ -566,14 +567,14 @@ func (s *Service) RegisterRunner(ctx context.Context, stream *connect.ClientStre
Deployment: maybeDeployment,
Labels: msg.Labels.AsMap(),
})
if errors.Is(err, dal.ErrConflict) {
if errors.Is(err, dalerrs.ErrConflict) {
return nil, connect.NewError(connect.CodeAlreadyExists, err)
} else if err != nil {
return nil, err
}

routes, err := s.dal.GetRoutingTable(ctx, nil)
if errors.Is(err, dal.ErrNotFound) {
if errors.Is(err, dalerrs.ErrNotFound) {
routes = map[string][]dal.Route{}
} else if err != nil {
return nil, err
@@ -1220,7 +1221,7 @@ func (s *Service) executeAsyncCalls(ctx context.Context) (time.Duration, error)
logger.Tracef("Acquiring async call")

call, err := s.dal.AcquireAsyncCall(ctx)
if errors.Is(err, dal.ErrNotFound) {
if errors.Is(err, dalerrs.ErrNotFound) {
logger.Tracef("No async calls to execute")
return time.Second * 2, nil
} else if err != nil {
@@ -1555,7 +1556,7 @@ func (s *Service) getDeploymentLogger(ctx context.Context, deploymentKey model.D
// Periodically sync the routing table from the DB.
func (s *Service) syncRoutes(ctx context.Context) (time.Duration, error) {
routes, err := s.dal.GetRoutingTable(ctx, nil)
if errors.Is(err, dal.ErrNotFound) {
if errors.Is(err, dalerrs.ErrNotFound) {
routes = map[string][]dal.Route{}
} else if err != nil {
return 0, err
17 changes: 9 additions & 8 deletions backend/controller/dal/async_calls.go
Original file line number Diff line number Diff line change
@@ -12,6 +12,7 @@ import (

"github.com/TBD54566975/ftl/backend/controller/sql"
"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/db/dalerrs"
)

type asyncOriginParseRoot struct {
@@ -94,10 +95,10 @@ func (d *DAL) AcquireAsyncCall(ctx context.Context) (call *AsyncCall, err error)
ttl := time.Second * 5
row, err := tx.db.AcquireAsyncCall(ctx, ttl)
if err != nil {
err = translatePGError(err)
err = dalerrs.TranslatePGError(err)
// We get a NULL constraint violation if there are no async calls to acquire, so translate it to ErrNotFound.
if errors.Is(err, ErrConstraint) {
return nil, fmt.Errorf("no pending async calls: %w", ErrNotFound)
if errors.Is(err, dalerrs.ErrConstraint) {
return nil, fmt.Errorf("no pending async calls: %w", dalerrs.ErrNotFound)
}
return nil, fmt.Errorf("failed to acquire async call: %w", err)
}
@@ -126,15 +127,15 @@ func (d *DAL) AcquireAsyncCall(ctx context.Context) (call *AsyncCall, err error)
func (d *DAL) CompleteAsyncCall(ctx context.Context, call *AsyncCall, result either.Either[[]byte, string], finalise func(tx *Tx) error) (err error) {
tx, err := d.Begin(ctx)
if err != nil {
return translatePGError(err)
return dalerrs.TranslatePGError(err)
}
defer tx.CommitOrRollback(ctx, &err)

switch result := result.(type) {
case either.Left[[]byte, string]: // Successful response.
_, err = tx.db.SucceedAsyncCall(ctx, result.Get(), call.ID)
if err != nil {
return translatePGError(err)
return dalerrs.TranslatePGError(err)
}

case either.Right[[]byte, string]: // Failure message.
@@ -148,12 +149,12 @@ func (d *DAL) CompleteAsyncCall(ctx context.Context, call *AsyncCall, result eit
ScheduledAt: time.Now().Add(call.Backoff),
})
if err != nil {
return translatePGError(err)
return dalerrs.TranslatePGError(err)
}
} else {
_, err = tx.db.FailAsyncCall(ctx, result.Get(), call.ID)
if err != nil {
return translatePGError(err)
return dalerrs.TranslatePGError(err)
}
}
}
@@ -164,7 +165,7 @@ func (d *DAL) CompleteAsyncCall(ctx context.Context, call *AsyncCall, result eit
func (d *DAL) LoadAsyncCall(ctx context.Context, id int64) (*AsyncCall, error) {
row, err := d.db.LoadAsyncCall(ctx, id)
if err != nil {
return nil, translatePGError(err)
return nil, dalerrs.TranslatePGError(err)
}
origin, err := ParseAsyncOrigin(row.Origin)
if err != nil {
218 changes: 75 additions & 143 deletions backend/controller/dal/dal.go

Large diffs are not rendered by default.

106 changes: 8 additions & 98 deletions backend/controller/dal/dal_test.go
Original file line number Diff line number Diff line change
@@ -12,10 +12,10 @@ import (
"github.com/alecthomas/types/optional"
"golang.org/x/sync/errgroup"

"github.com/TBD54566975/ftl/backend/controller/sql"
"github.com/TBD54566975/ftl/backend/controller/sql/sqltest"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/db/dalerrs"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/sha256"
@@ -88,7 +88,7 @@ func TestDAL(t *testing.T) {

t.Run("GetMissingDeployment", func(t *testing.T) {
_, err := dal.GetDeployment(ctx, model.NewDeploymentKey("invalid"))
assert.IsError(t, err, ErrNotFound)
assert.IsError(t, err, dalerrs.ErrNotFound)
})

t.Run("GetMissingArtefacts", func(t *testing.T) {
@@ -119,7 +119,7 @@ func TestDAL(t *testing.T) {
State: RunnerStateIdle,
})
assert.Error(t, err)
assert.IsError(t, err, ErrConflict)
assert.IsError(t, err, dalerrs.ErrConflict)
})

t.Run("GetIdleRunnersForLanguage", func(t *testing.T) {
@@ -168,7 +168,7 @@ func TestDAL(t *testing.T) {
t.Run("ReserveRunnerForInvalidDeployment", func(t *testing.T) {
_, err := dal.ReserveRunnerForDeployment(ctx, model.NewDeploymentKey("invalid"), time.Second, labels)
assert.Error(t, err)
assert.IsError(t, err, ErrNotFound)
assert.IsError(t, err, dalerrs.ErrNotFound)
assert.EqualError(t, err, "deployment: not found")
})

@@ -192,7 +192,7 @@ func TestDAL(t *testing.T) {

t.Run("ReserveRunnerForDeploymentFailsOnInvalidDeployment", func(t *testing.T) {
_, err = dal.ReserveRunnerForDeployment(ctx, model.NewDeploymentKey("test"), time.Second, labels)
assert.IsError(t, err, ErrNotFound)
assert.IsError(t, err, dalerrs.ErrNotFound)
})

t.Run("UpdateRunnerAssigned", func(t *testing.T) {
@@ -319,7 +319,7 @@ func TestDAL(t *testing.T) {
Deployment: optional.Some(model.NewDeploymentKey("test")),
})
assert.Error(t, err)
assert.IsError(t, err, ErrNotFound)
assert.IsError(t, err, dalerrs.ErrNotFound)
})

t.Run("ReleaseRunnerReservation", func(t *testing.T) {
@@ -342,7 +342,7 @@ func TestDAL(t *testing.T) {

t.Run("GetRoutingTable", func(t *testing.T) {
_, err := dal.GetRoutingTable(ctx, []string{deployment.Module})
assert.IsError(t, err, ErrNotFound)
assert.IsError(t, err, dalerrs.ErrNotFound)
})

t.Run("DeregisterRunner", func(t *testing.T) {
@@ -352,7 +352,7 @@ func TestDAL(t *testing.T) {

t.Run("DeregisterRunnerFailsOnMissing", func(t *testing.T) {
err = dal.DeregisterRunner(ctx, model.NewRunnerKey("localhost", "8080"))
assert.IsError(t, err, ErrNotFound)
assert.IsError(t, err, dalerrs.ErrNotFound)
})

t.Run("VerifyDeploymentNotifications", func(t *testing.T) {
@@ -402,93 +402,3 @@ func assertEventsEqual(t *testing.T, expected, actual []Event) {
t.Helper()
assert.Equal(t, normaliseEvents(expected), normaliseEvents(actual))
}

func TestModuleConfiguration(t *testing.T) {
ctx := log.ContextWithNewDefaultLogger(context.Background())
conn := sqltest.OpenForTesting(ctx, t)
dal, err := New(ctx, conn)
assert.NoError(t, err)
assert.NotZero(t, dal)

tests := []struct {
TestName string
ModuleSet optional.Option[string]
ModuleGet optional.Option[string]
PresetGlobal bool
}{
{
"SetModuleGetModule",
optional.Some("echo"),
optional.Some("echo"),
false,
},
{
"SetGlobalGetGlobal",
optional.None[string](),
optional.None[string](),
false,
},
{
"SetGlobalGetModule",
optional.None[string](),
optional.Some("echo"),
false,
},
{
"SetModuleOverridesGlobal",
optional.Some("echo"),
optional.Some("echo"),
true,
},
}

b := []byte(`"asdf"`)
for _, test := range tests {
t.Run(test.TestName, func(t *testing.T) {
if test.PresetGlobal {
err := dal.SetModuleConfiguration(ctx, optional.None[string](), "configname", []byte(`"qwerty"`))
assert.NoError(t, err)
}
err := dal.SetModuleConfiguration(ctx, test.ModuleSet, "configname", b)
assert.NoError(t, err)
gotBytes, err := dal.GetModuleConfiguration(ctx, test.ModuleGet, "configname")
assert.NoError(t, err)
assert.Equal(t, b, gotBytes)
err = dal.UnsetModuleConfiguration(ctx, test.ModuleGet, "configname")
assert.NoError(t, err)
})
}

t.Run("List", func(t *testing.T) {
sortedList := []sql.ModuleConfiguration{
{
Module: optional.Some("echo"),
Name: "a",
},
{
Module: optional.Some("echo"),
Name: "b",
},
{
Module: optional.None[string](),
Name: "a",
},
}

// Insert entries in a separate order from how they should be returned to
// test sorting logic in the SQL query
err := dal.SetModuleConfiguration(ctx, sortedList[1].Module, sortedList[1].Name, []byte(`""`))
assert.NoError(t, err)
err = dal.SetModuleConfiguration(ctx, sortedList[2].Module, sortedList[2].Name, []byte(`""`))
assert.NoError(t, err)
err = dal.SetModuleConfiguration(ctx, sortedList[0].Module, sortedList[0].Name, []byte(`""`))
assert.NoError(t, err)

gotList, err := dal.ListModuleConfiguration(ctx)
assert.NoError(t, err)
for i := range sortedList {
assert.Equal(t, sortedList[i].Module, gotList[i].Module)
assert.Equal(t, sortedList[i].Name, gotList[i].Name)
}
})
}
5 changes: 3 additions & 2 deletions backend/controller/dal/events.go
Original file line number Diff line number Diff line change
@@ -12,6 +12,7 @@ import (

"github.com/TBD54566975/ftl/backend/controller/sql"
"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/db/dalerrs"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
)
@@ -260,7 +261,7 @@ func (d *DAL) QueryEvents(ctx context.Context, limit int, filters ...EventFilter
}
rows, err := d.db.Conn().Query(ctx, deploymentQuery, deploymentArgs...)
if err != nil {
return nil, translatePGError(err)
return nil, dalerrs.TranslatePGError(err)
}
deploymentIDs := []int64{}
for rows.Next() {
@@ -315,7 +316,7 @@ func (d *DAL) QueryEvents(ctx context.Context, limit int, filters ...EventFilter
// Issue query.
rows, err = d.db.Conn().Query(ctx, q, args...)
if err != nil {
return nil, fmt.Errorf("%s: %w", q, translatePGError(err))
return nil, fmt.Errorf("%s: %w", q, dalerrs.TranslatePGError(err))
}
defer rows.Close()

19 changes: 10 additions & 9 deletions backend/controller/dal/fsm.go
Original file line number Diff line number Diff line change
@@ -12,6 +12,7 @@ import (
"github.com/TBD54566975/ftl/backend/controller/leases"
"github.com/TBD54566975/ftl/backend/controller/sql"
"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/db/dalerrs"
)

// StartFSMTransition sends an event to an executing instance of an FSM.
@@ -39,7 +40,7 @@ func (d *DAL) StartFSMTransition(ctx context.Context, fsm schema.RefKey, executi
MaxBackoff: retryParams.MaxBackoff,
})
if err != nil {
return fmt.Errorf("failed to create FSM async call: %w", translatePGError(err))
return fmt.Errorf("failed to create FSM async call: %w", dalerrs.TranslatePGError(err))
}

// Start a transition.
@@ -50,9 +51,9 @@ func (d *DAL) StartFSMTransition(ctx context.Context, fsm schema.RefKey, executi
AsyncCallID: asyncCallID,
})
if err != nil {
err = translatePGError(err)
if errors.Is(err, ErrNotFound) {
return fmt.Errorf("transition already executing: %w", ErrConflict)
err = dalerrs.TranslatePGError(err)
if errors.Is(err, dalerrs.ErrNotFound) {
return fmt.Errorf("transition already executing: %w", dalerrs.ErrConflict)
}
return fmt.Errorf("failed to start FSM transition: %w", err)
}
@@ -61,17 +62,17 @@ func (d *DAL) StartFSMTransition(ctx context.Context, fsm schema.RefKey, executi

func (d *DAL) FinishFSMTransition(ctx context.Context, fsm schema.RefKey, instanceKey string) error {
_, err := d.db.FinishFSMTransition(ctx, fsm, instanceKey)
return translatePGError(err)
return dalerrs.TranslatePGError(err)
}

func (d *DAL) FailFSMInstance(ctx context.Context, fsm schema.RefKey, instanceKey string) error {
_, err := d.db.FailFSMInstance(ctx, fsm, instanceKey)
return translatePGError(err)
return dalerrs.TranslatePGError(err)
}

func (d *DAL) SucceedFSMInstance(ctx context.Context, fsm schema.RefKey, instanceKey string) error {
_, err := d.db.SucceedFSMInstance(ctx, fsm, instanceKey)
return translatePGError(err)
return dalerrs.TranslatePGError(err)
}

type FSMStatus = sql.FsmStatus
@@ -103,8 +104,8 @@ func (d *DAL) AcquireFSMInstance(ctx context.Context, fsm schema.RefKey, instanc
}
row, err := d.db.GetFSMInstance(ctx, fsm, instanceKey)
if err != nil {
err = translatePGError(err)
if !errors.Is(err, ErrNotFound) {
err = dalerrs.TranslatePGError(err)
if !errors.Is(err, dalerrs.ErrNotFound) {
return nil, err
}
row.Status = sql.FsmStatusRunning
5 changes: 3 additions & 2 deletions backend/controller/dal/fsm_test.go
Original file line number Diff line number Diff line change
@@ -10,6 +10,7 @@ import (

"github.com/TBD54566975/ftl/backend/controller/sql/sqltest"
"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/db/dalerrs"
"github.com/TBD54566975/ftl/internal/log"
)

@@ -20,14 +21,14 @@ func TestSendFSMEvent(t *testing.T) {
assert.NoError(t, err)

_, err = dal.AcquireAsyncCall(ctx)
assert.IsError(t, err, ErrNotFound)
assert.IsError(t, err, dalerrs.ErrNotFound)

ref := schema.RefKey{Module: "module", Name: "verb"}
err = dal.StartFSMTransition(ctx, schema.RefKey{Module: "test", Name: "test"}, "invoiceID", ref, []byte(`{}`), schema.RetryParams{})
assert.NoError(t, err)

err = dal.StartFSMTransition(ctx, schema.RefKey{Module: "test", Name: "test"}, "invoiceID", ref, []byte(`{}`), schema.RetryParams{})
assert.IsError(t, err, ErrConflict)
assert.IsError(t, err, dalerrs.ErrConflict)
assert.EqualError(t, err, "transition already executing: conflict")

call, err := dal.AcquireAsyncCall(ctx)
13 changes: 7 additions & 6 deletions backend/controller/dal/lease.go
Original file line number Diff line number Diff line change
@@ -12,6 +12,7 @@ import (

"github.com/TBD54566975/ftl/backend/controller/leases"
"github.com/TBD54566975/ftl/backend/controller/sql"
"github.com/TBD54566975/ftl/db/dalerrs"
"github.com/TBD54566975/ftl/internal/log"
)

@@ -47,8 +48,8 @@ func (l *Lease) renew(ctx context.Context, cancelCtx context.CancelFunc) {
cancel()

if err != nil {
err = translatePGError(err)
if errors.Is(err, ErrNotFound) {
err = dalerrs.TranslatePGError(err)
if errors.Is(err, dalerrs.ErrNotFound) {
logger.Warnf("Lease expired")
} else {
logger.Errorf(err, "Failed to renew lease %s", l.key)
@@ -64,7 +65,7 @@ func (l *Lease) renew(ctx context.Context, cancelCtx context.CancelFunc) {
}
logger.Debugf("Releasing lease")
_, err := l.db.ReleaseLease(ctx, l.idempotencyKey, l.key)
l.errch <- translatePGError(err)
l.errch <- dalerrs.TranslatePGError(err)
cancelCtx()
return
}
@@ -95,7 +96,7 @@ func (d *DAL) AcquireLease(ctx context.Context, key leases.Key, ttl time.Duratio
}
idempotencyKey, err := d.db.NewLease(ctx, key, ttl, metadataBytes)
if err != nil {
return nil, nil, translatePGError(err)
return nil, nil, dalerrs.TranslatePGError(err)
}
leaseCtx, lease := d.newLease(ctx, key, idempotencyKey, ttl)
return leaseCtx, lease, nil
@@ -121,7 +122,7 @@ func (d *DAL) newLease(ctx context.Context, key leases.Key, idempotencyKey uuid.
func (d *DAL) GetLeaseInfo(ctx context.Context, key leases.Key, metadata any) (expiry time.Time, err error) {
l, err := d.db.GetLeaseInfo(ctx, key)
if err != nil {
return expiry, translatePGError(err)
return expiry, dalerrs.TranslatePGError(err)
}
if err := json.Unmarshal(l.Metadata, metadata); err != nil {
return expiry, fmt.Errorf("could not unmarshal lease metadata: %w", err)
@@ -136,5 +137,5 @@ func (d *DAL) ExpireLeases(ctx context.Context) error {
if count > 0 {
log.FromContext(ctx).Warnf("Expired %d leases", count)
}
return translatePGError(err)
return dalerrs.TranslatePGError(err)
}
7 changes: 4 additions & 3 deletions backend/controller/dal/lease_test.go
Original file line number Diff line number Diff line change
@@ -13,16 +13,17 @@ import (
"github.com/TBD54566975/ftl/backend/controller/leases"
"github.com/TBD54566975/ftl/backend/controller/sql"
"github.com/TBD54566975/ftl/backend/controller/sql/sqltest"
"github.com/TBD54566975/ftl/db/dalerrs"
"github.com/TBD54566975/ftl/internal/log"
)

func leaseExists(t *testing.T, conn sql.ConnI, idempotencyKey uuid.UUID, key leases.Key) bool {
t.Helper()
var count int
err := translatePGError(conn.
err := dalerrs.TranslatePGError(conn.
QueryRow(context.Background(), "SELECT COUNT(*) FROM leases WHERE idempotency_key = $1 AND key = $2", idempotencyKey, key).
Scan(&count))
if errors.Is(err, ErrNotFound) {
if errors.Is(err, dalerrs.ErrNotFound) {
return false
}
assert.NoError(t, err)
@@ -49,7 +50,7 @@ func TestLease(t *testing.T) {

// Try to acquire the same lease again, which should fail.
_, _, err = dal.AcquireLease(ctx, leases.SystemKey("test"), time.Second*5, optional.None[any]())
assert.IsError(t, err, ErrConflict)
assert.IsError(t, err, dalerrs.ErrConflict)

time.Sleep(time.Second * 6)

3 changes: 2 additions & 1 deletion backend/controller/dal/notify.go
Original file line number Diff line number Diff line change
@@ -12,6 +12,7 @@ import (
"github.com/jackc/pgx/v5"
"github.com/jpillora/backoff"

"github.com/TBD54566975/ftl/db/dalerrs"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
)
@@ -102,7 +103,7 @@ func (d *DAL) publishNotification(ctx context.Context, notification event, logge
deployment, err := decodeNotification(notification, func(key model.DeploymentKey) (Deployment, optional.Option[model.DeploymentKey], error) {
row, err := d.db.GetDeployment(ctx, key)
if err != nil {
return Deployment{}, optional.None[model.DeploymentKey](), translatePGError(err)
return Deployment{}, optional.None[model.DeploymentKey](), dalerrs.TranslatePGError(err)
}
return Deployment{
CreatedAt: row.Deployment.CreatedAt,
25 changes: 13 additions & 12 deletions backend/controller/dal/pubsub.go
Original file line number Diff line number Diff line change
@@ -7,6 +7,7 @@ import (

"github.com/TBD54566975/ftl/backend/controller/sql"
"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/db/dalerrs"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/slices"
@@ -20,15 +21,15 @@ func (d *DAL) PublishEventForTopic(ctx context.Context, module, topic string, pa
Payload: payload,
})
if err != nil {
return translatePGError(err)
return dalerrs.TranslatePGError(err)
}
return nil
}

func (d *DAL) GetSubscriptionsNeedingUpdate(ctx context.Context) ([]model.Subscription, error) {
rows, err := d.db.GetSubscriptionsNeedingUpdate(ctx)
if err != nil {
return nil, translatePGError(err)
return nil, dalerrs.TranslatePGError(err)
}
return slices.Map(rows, func(row sql.GetSubscriptionsNeedingUpdateRow) model.Subscription {
return model.Subscription{
@@ -53,18 +54,18 @@ func (d *DAL) ProgressSubscriptions(ctx context.Context, eventConsumptionDelay t
// also gets a lock on the subscription, and skips any subscriptions locked by others
subs, err := tx.db.GetSubscriptionsNeedingUpdate(ctx)
if err != nil {
return 0, fmt.Errorf("could not get subscriptions to progress: %w", translatePGError(err))
return 0, fmt.Errorf("could not get subscriptions to progress: %w", dalerrs.TranslatePGError(err))
}

successful := 0
for _, subscription := range subs {
nextCursor, err := tx.db.GetNextEventForSubscription(ctx, eventConsumptionDelay, subscription.Topic, subscription.Cursor)
if err != nil {
return 0, fmt.Errorf("failed to get next cursor: %w", translatePGError(err))
return 0, fmt.Errorf("failed to get next cursor: %w", dalerrs.TranslatePGError(err))
}
nextCursorKey, ok := nextCursor.Event.Get()
if !ok {
return 0, fmt.Errorf("could not find event to progress subscription: %w", translatePGError(err))
return 0, fmt.Errorf("could not find event to progress subscription: %w", dalerrs.TranslatePGError(err))
}
if !nextCursor.Ready {
logger.Tracef("Skipping subscription %s because event is too new", subscription.Key)
@@ -79,7 +80,7 @@ func (d *DAL) ProgressSubscriptions(ctx context.Context, eventConsumptionDelay t

err = tx.db.BeginConsumingTopicEvent(ctx, subscription.Key, nextCursorKey)
if err != nil {
return 0, fmt.Errorf("failed to progress subscription: %w", translatePGError(err))
return 0, fmt.Errorf("failed to progress subscription: %w", dalerrs.TranslatePGError(err))
}

origin := AsyncOriginPubSub{
@@ -97,7 +98,7 @@ func (d *DAL) ProgressSubscriptions(ctx context.Context, eventConsumptionDelay t
MaxBackoff: subscriber.MaxBackoff,
})
if err != nil {
return 0, fmt.Errorf("failed to schedule async task for subscription: %w", translatePGError(err))
return 0, fmt.Errorf("failed to schedule async task for subscription: %w", dalerrs.TranslatePGError(err))
}
successful++
}
@@ -107,7 +108,7 @@ func (d *DAL) ProgressSubscriptions(ctx context.Context, eventConsumptionDelay t
func (d *DAL) CompleteEventForSubscription(ctx context.Context, module, name string) error {
err := d.db.CompleteEventForSubscription(ctx, name, module)
if err != nil {
return fmt.Errorf("failed to complete event for subscription: %w", translatePGError(err))
return fmt.Errorf("failed to complete event for subscription: %w", dalerrs.TranslatePGError(err))
}
return nil
}
@@ -134,7 +135,7 @@ func (d *DAL) createSubscriptions(ctx context.Context, tx *sql.Tx, key model.Dep
TopicName: s.Topic.Name,
Name: s.Name,
}); err != nil {
return fmt.Errorf("could not insert subscription: %w", translatePGError(err))
return fmt.Errorf("could not insert subscription: %w", dalerrs.TranslatePGError(err))
}
}
return nil
@@ -193,7 +194,7 @@ func (d *DAL) createSubscribers(ctx context.Context, tx *sql.Tx, key model.Deplo
MaxBackoff: retryParams.MaxBackoff,
})
if err != nil {
return fmt.Errorf("could not insert subscriber: %w", translatePGError(err))
return fmt.Errorf("could not insert subscriber: %w", dalerrs.TranslatePGError(err))
}
}
}
@@ -202,10 +203,10 @@ func (d *DAL) createSubscribers(ctx context.Context, tx *sql.Tx, key model.Deplo

func (d *DAL) removeSubscriptionsAndSubscribers(ctx context.Context, tx *sql.Tx, key model.DeploymentKey) error {
if err := tx.DeleteSubscriptions(ctx, key); err != nil {
return fmt.Errorf("could not delete old subscriptions: %w", translatePGError(err))
return fmt.Errorf("could not delete old subscriptions: %w", dalerrs.TranslatePGError(err))
}
if err := tx.DeleteSubscribers(ctx, key); err != nil {
return fmt.Errorf("could not delete old subscribers: %w", translatePGError(err))
return fmt.Errorf("could not delete old subscribers: %w", dalerrs.TranslatePGError(err))
}
return nil
}
3 changes: 2 additions & 1 deletion backend/controller/ingress/handler.go
Original file line number Diff line number Diff line change
@@ -13,6 +13,7 @@ import (
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema"
"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/db/dalerrs"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
)
@@ -30,7 +31,7 @@ func Handle(
logger.Debugf("%s %s", r.Method, r.URL.Path)
route, err := GetIngressRoute(routes, r.Method, r.URL.Path)
if err != nil {
if errors.Is(err, dal.ErrNotFound) {
if errors.Is(err, dalerrs.ErrNotFound) {
http.NotFound(w, r)
return
}
3 changes: 2 additions & 1 deletion backend/controller/ingress/ingress.go
Original file line number Diff line number Diff line change
@@ -12,6 +12,7 @@ import (

"github.com/TBD54566975/ftl/backend/controller/dal"
"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/db/dalerrs"
"github.com/TBD54566975/ftl/internal/slices"
)

@@ -27,7 +28,7 @@ func GetIngressRoute(routes []dal.IngressRoute, method string, path string) (*da
})

if len(matchedRoutes) == 0 {
return nil, dal.ErrNotFound
return nil, dalerrs.ErrNotFound
}

// TODO: add load balancing at some point
6 changes: 3 additions & 3 deletions backend/controller/leader/leader.go
Original file line number Diff line number Diff line change
@@ -20,8 +20,8 @@ import (
"sync"
"time"

"github.com/TBD54566975/ftl/backend/controller/dal"
"github.com/TBD54566975/ftl/backend/controller/leases"
"github.com/TBD54566975/ftl/db/dalerrs"
"github.com/TBD54566975/ftl/internal/log"
"github.com/alecthomas/types/optional"
)
@@ -132,7 +132,7 @@ func (c *Coordinator[P]) Get() (leaderOrFollower P, err error) {
logger.Tracef("new leader for %s: %s", c.key, c.advertise)
return l, nil
}
if !errors.Is(leaseErr, dal.ErrConflict) {
if !errors.Is(leaseErr, dalerrs.ErrConflict) {
return leaderOrFollower, fmt.Errorf("could not acquire lease for %s: %w", c.key, leaseErr)
}
// lease already held
@@ -155,7 +155,7 @@ func (c *Coordinator[P]) createFollower() (out P, err error) {
var urlString string
expiry, err := c.leaser.GetLeaseInfo(c.ctx, c.key, &urlString)
if err != nil {
if errors.Is(err, dal.ErrNotFound) {
if errors.Is(err, dalerrs.ErrNotFound) {
return out, fmt.Errorf("could not acquire or find lease for %s", c.key)
}
return out, fmt.Errorf("could not get lease for %s: %w", c.key, err)
4 changes: 0 additions & 4 deletions backend/controller/sql/querier.go
22 changes: 0 additions & 22 deletions backend/controller/sql/queries.sql
Original file line number Diff line number Diff line change
@@ -798,25 +798,3 @@ UPDATE topic_subscriptions
SET state = 'idle'
WHERE name = @name::TEXT
AND module_id = (SELECT id FROM module);

-- name: GetModuleConfiguration :one
SELECT value
FROM module_configuration
WHERE
(module IS NULL OR module = @module)
AND name = @name
ORDER BY module NULLS LAST
LIMIT 1;

-- name: ListModuleConfiguration :many
SELECT *
FROM module_configuration
ORDER BY module, name;

-- name: SetModuleConfiguration :exec
INSERT INTO module_configuration (module, name, value)
VALUES ($1, $2, $3);

-- name: UnsetModuleConfiguration :exec
DELETE FROM module_configuration
WHERE module = @module AND name = @name;
69 changes: 0 additions & 69 deletions backend/controller/sql/queries.sql.go
4 changes: 2 additions & 2 deletions cmd/ftl-controller/main.go
Original file line number Diff line number Diff line change
@@ -14,9 +14,9 @@ import (

"github.com/TBD54566975/ftl"
"github.com/TBD54566975/ftl/backend/controller"
"github.com/TBD54566975/ftl/backend/controller/dal"
"github.com/TBD54566975/ftl/backend/controller/scaling"
cf "github.com/TBD54566975/ftl/common/configuration"
cfdal "github.com/TBD54566975/ftl/common/configuration/dal"
_ "github.com/TBD54566975/ftl/internal/automaxprocs" // Set GOMAXPROCS to match Linux container CPU quota.
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/observability"
@@ -47,7 +47,7 @@ func main() {
// The FTL controller currently only supports DB as a configuration provider/resolver.
conn, err := pgxpool.New(ctx, cli.ControllerConfig.DSN)
kctx.FatalIfErrorf(err)
dal, err := dal.New(ctx, conn)
dal, err := cfdal.New(ctx, conn)
kctx.FatalIfErrorf(err)
configProviders := []cf.Provider[cf.Configuration]{cf.NewDBConfigProvider(dal)}
configResolver := cf.NewDBConfigResolver(dal)
47 changes: 47 additions & 0 deletions common/configuration/dal/dal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Package dal provides a data abstraction layer for managing module configurations
package dal

import (
"context"

"github.com/alecthomas/types/optional"
"github.com/jackc/pgx/v5/pgxpool"

"github.com/TBD54566975/ftl/common/configuration/sql"
"github.com/TBD54566975/ftl/db/dalerrs"
)

type DAL struct {
db sql.DBI
}

func New(ctx context.Context, pool *pgxpool.Pool) (*DAL, error) {
dal := &DAL{db: sql.NewDB(pool)}
return dal, nil
}

func (d *DAL) GetModuleConfiguration(ctx context.Context, module optional.Option[string], name string) ([]byte, error) {
b, err := d.db.GetModuleConfiguration(ctx, module, name)
if err != nil {
return nil, dalerrs.TranslatePGError(err)
}
return b, nil
}

func (d *DAL) SetModuleConfiguration(ctx context.Context, module optional.Option[string], name string, value []byte) error {
err := d.db.SetModuleConfiguration(ctx, module, name, value)
return dalerrs.TranslatePGError(err)
}

func (d *DAL) UnsetModuleConfiguration(ctx context.Context, module optional.Option[string], name string) error {
err := d.db.UnsetModuleConfiguration(ctx, module, name)
return dalerrs.TranslatePGError(err)
}

func (d *DAL) ListModuleConfiguration(ctx context.Context) ([]sql.ModuleConfiguration, error) {
l, err := d.db.ListModuleConfiguration(ctx)
if err != nil {
return nil, dalerrs.TranslatePGError(err)
}
return l, nil
}
103 changes: 103 additions & 0 deletions common/configuration/dal/dal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package dal

import (
"context"
"testing"

"github.com/alecthomas/assert/v2"
"github.com/alecthomas/types/optional"

"github.com/TBD54566975/ftl/backend/controller/sql"
"github.com/TBD54566975/ftl/backend/controller/sql/sqltest"
"github.com/TBD54566975/ftl/internal/log"
)

func TestModuleConfiguration(t *testing.T) {
ctx := log.ContextWithNewDefaultLogger(context.Background())
conn := sqltest.OpenForTesting(ctx, t)
dal, err := New(ctx, conn)
assert.NoError(t, err)
assert.NotZero(t, dal)

tests := []struct {
TestName string
ModuleSet optional.Option[string]
ModuleGet optional.Option[string]
PresetGlobal bool
}{
{
"SetModuleGetModule",
optional.Some("echo"),
optional.Some("echo"),
false,
},
{
"SetGlobalGetGlobal",
optional.None[string](),
optional.None[string](),
false,
},
{
"SetGlobalGetModule",
optional.None[string](),
optional.Some("echo"),
false,
},
{
"SetModuleOverridesGlobal",
optional.Some("echo"),
optional.Some("echo"),
true,
},
}

b := []byte(`"asdf"`)
for _, test := range tests {
t.Run(test.TestName, func(t *testing.T) {
if test.PresetGlobal {
err := dal.SetModuleConfiguration(ctx, optional.None[string](), "configname", []byte(`"qwerty"`))
assert.NoError(t, err)
}
err := dal.SetModuleConfiguration(ctx, test.ModuleSet, "configname", b)
assert.NoError(t, err)
gotBytes, err := dal.GetModuleConfiguration(ctx, test.ModuleGet, "configname")
assert.NoError(t, err)
assert.Equal(t, b, gotBytes)
err = dal.UnsetModuleConfiguration(ctx, test.ModuleGet, "configname")
assert.NoError(t, err)
})
}

t.Run("List", func(t *testing.T) {
sortedList := []sql.ModuleConfiguration{
{
Module: optional.Some("echo"),
Name: "a",
},
{
Module: optional.Some("echo"),
Name: "b",
},
{
Module: optional.None[string](),
Name: "a",
},
}

// Insert entries in a separate order from how they should be returned to
// test sorting logic in the SQL query
err := dal.SetModuleConfiguration(ctx, sortedList[1].Module, sortedList[1].Name, []byte(`""`))
assert.NoError(t, err)
err = dal.SetModuleConfiguration(ctx, sortedList[2].Module, sortedList[2].Name, []byte(`""`))
assert.NoError(t, err)
err = dal.SetModuleConfiguration(ctx, sortedList[0].Module, sortedList[0].Name, []byte(`""`))
assert.NoError(t, err)

gotList, err := dal.ListModuleConfiguration(ctx)
assert.NoError(t, err)
for i := range sortedList {
assert.Equal(t, sortedList[i].Module, gotList[i].Module)
assert.Equal(t, sortedList[i].Name, gotList[i].Name)
}
})
}
4 changes: 2 additions & 2 deletions common/configuration/db_config_provider.go
Original file line number Diff line number Diff line change
@@ -4,7 +4,7 @@ import (
"context"
"net/url"

"github.com/TBD54566975/ftl/backend/controller/dal"
"github.com/TBD54566975/ftl/db/dalerrs"
"github.com/alecthomas/types/optional"
)

@@ -31,7 +31,7 @@ func (DBConfigProvider) Key() string { return "db" }
func (d DBConfigProvider) Load(ctx context.Context, ref Ref, key *url.URL) ([]byte, error) {
value, err := d.dal.GetModuleConfiguration(ctx, ref.Module, ref.Name)
if err != nil {
return nil, dal.ErrNotFound
return nil, dalerrs.ErrNotFound
}
return value, nil
}
2 changes: 1 addition & 1 deletion common/configuration/db_config_resolver.go
Original file line number Diff line number Diff line change
@@ -4,7 +4,7 @@ import (
"context"
"net/url"

"github.com/TBD54566975/ftl/backend/controller/sql"
"github.com/TBD54566975/ftl/common/configuration/sql"
"github.com/TBD54566975/ftl/internal/slices"
)

2 changes: 1 addition & 1 deletion common/configuration/db_config_resolver_test.go
Original file line number Diff line number Diff line change
@@ -4,7 +4,7 @@ import (
"context"
"testing"

"github.com/TBD54566975/ftl/backend/controller/sql"
"github.com/TBD54566975/ftl/common/configuration/sql"
"github.com/alecthomas/assert/v2"
)

21 changes: 21 additions & 0 deletions common/configuration/sql/conn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package sql

type DBI interface {
Querier
Conn() ConnI
}

type ConnI interface {
DBTX
}

type DB struct {
conn ConnI
*Queries
}

func NewDB(conn ConnI) *DB {
return &DB{conn: conn, Queries: New(conn)}
}

func (d *DB) Conn() ConnI { return d.conn }
32 changes: 32 additions & 0 deletions common/configuration/sql/db.go
541 changes: 541 additions & 0 deletions common/configuration/sql/models.go

Large diffs are not rendered by default.

20 changes: 20 additions & 0 deletions common/configuration/sql/querier.go
21 changes: 21 additions & 0 deletions common/configuration/sql/queries.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
-- name: GetModuleConfiguration :one
SELECT value
FROM module_configuration
WHERE
(module IS NULL OR module = @module)
AND name = @name
ORDER BY module NULLS LAST
LIMIT 1;

-- name: ListModuleConfiguration :many
SELECT *
FROM module_configuration
ORDER BY module, name;

-- name: SetModuleConfiguration :exec
INSERT INTO module_configuration (module, name, value)
VALUES ($1, $2, $3);

-- name: UnsetModuleConfiguration :exec
DELETE FROM module_configuration
WHERE module = @module AND name = @name;
81 changes: 81 additions & 0 deletions common/configuration/sql/queries.sql.go
55 changes: 55 additions & 0 deletions db/dalerrs/dalerrs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Package dalerrs provides common error handling utilities for all domain-specific DALs,
// e.g. controller DAL and configuration DAL, which all connect to the same underlying DB
// and maintain the same interface guarantees
package dalerrs

import (
stdsql "database/sql"
"errors"
"fmt"
"strings"

"github.com/jackc/pgerrcode"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
)

var (
// ErrConflict is returned by select methods in the DAL when a resource already exists.
//
// Its use will be documented in the corresponding methods.
ErrConflict = errors.New("conflict")
// ErrNotFound is returned by select methods in the DAL when no results are found.
ErrNotFound = errors.New("not found")
// ErrConstraint is returned by select methods in the DAL when a constraint is violated.
ErrConstraint = errors.New("constraint violation")
)

func IsNotFound(err error) bool {
return errors.Is(err, stdsql.ErrNoRows) || errors.Is(err, pgx.ErrNoRows)
}

func TranslatePGError(err error) error {
if err == nil {
return nil
}
var pgErr *pgconn.PgError
if errors.As(err, &pgErr) {
switch pgErr.Code {
case pgerrcode.ForeignKeyViolation:
return fmt.Errorf("%s: %w", strings.TrimSuffix(strings.TrimPrefix(pgErr.ConstraintName, pgErr.TableName+"_"), "_id_fkey"), ErrNotFound)
case pgerrcode.UniqueViolation:
return fmt.Errorf("%s: %w", pgErr.Message, ErrConflict)
case pgerrcode.IntegrityConstraintViolation,
pgerrcode.RestrictViolation,
pgerrcode.NotNullViolation,
pgerrcode.CheckViolation,
pgerrcode.ExclusionViolation:
return fmt.Errorf("%s: %w", pgErr.Message, ErrConstraint)
default:
}
} else if IsNotFound(err) {
return ErrNotFound
}
return err
}
11 changes: 9 additions & 2 deletions sqlc.yaml
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
version: "2"
sql:
- engine: "postgresql"
- &daldir
engine: "postgresql"
queries: "backend/controller/sql/queries.sql"
schema: "backend/controller/sql/schema"
database:
uri: postgres://localhost:15432/ftl?sslmode=disable&user=postgres&password=secret
gen:
go:
go: &gengo
package: "sql"
sql_package: "pgx/v5"
out: "backend/controller/sql"
@@ -137,6 +138,12 @@ sql:
- sqlc/db-prepare
# - postgresql-query-too-costly
- postgresql-no-seq-scan
- <<: *daldir
queries: "common/configuration/sql/queries.sql"
gen:
go:
<<: *gengo
out: "common/configuration/sql"
rules:
- name: postgresql-query-too-costly
message: "Query cost estimate is too high"

0 comments on commit 027b28d

Please sign in to comment.