Skip to content

Commit

Permalink
feat: Encrypt sensitive internal table columns (#2248)
Browse files Browse the repository at this point in the history
- `ftl-controller` requires encryption keys, whereas `ftl dev`, `ftl
serve` and `ftl box` can optionally use them
- The keys are read from the environment variables `FTL_LOG_ENCRYPTION_KEY` and `FTL_ASYNC_ENCRYPTION_KEY`
- `encryption` package handles encryption, with `Encryptable` interface
exposing `EncryptJSON` and `DecryptJSON`
    - `NoOpEncryptor` is used when no keys are provided
    - `Encryptor` is used when keys are provided

---------

Co-authored-by: gak <[email protected]>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored Aug 2, 2024
1 parent 7be3ded commit 85b08ae
Show file tree
Hide file tree
Showing 30 changed files with 952 additions and 207 deletions.
52 changes: 41 additions & 11 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ import (
cf "github.com/TBD54566975/ftl/common/configuration"
frontend "github.com/TBD54566975/ftl/frontend"
"github.com/TBD54566975/ftl/internal/cors"
"github.com/TBD54566975/ftl/internal/encryption"
ftlhttp "github.com/TBD54566975/ftl/internal/http"
"github.com/TBD54566975/ftl/internal/log"
ftlmaps "github.com/TBD54566975/ftl/internal/maps"
Expand Down Expand Up @@ -86,6 +87,40 @@ func (c *CommonConfig) Validate() error {
return nil
}

// EncryptionKeys holds the raw key settings used to protect sensitive data
// stored in internal FTL tables. The struct tags give each key its option
// name, help text and the environment variable it is read from. An empty
// string means the key was not provided; see Encryptors for how that case
// is handled.
type EncryptionKeys struct {
// Logs is the key for sensitive log data (env FTL_LOG_ENCRYPTION_KEY).
Logs string `name:"log-key" help:"Key for sensitive log data in internal FTL tables." env:"FTL_LOG_ENCRYPTION_KEY"`
// Async is the key for sensitive async call data (env FTL_ASYNC_ENCRYPTION_KEY).
Async string `name:"async-key" help:"Key for sensitive async call data in internal FTL tables." env:"FTL_ASYNC_ENCRYPTION_KEY"`
}

// Encryptors builds the DAL encryptors for the configured keys.
//
// The log key and the async key are handled the same way, in that order:
//   - a non-empty key is handed to encryption.NewForKeyOrURI, and any error
//     from it is wrapped and returned;
//   - an empty key is an error when required is true;
//   - otherwise an encryption.NoOpEncryptor is used for that slot.
//
// On error the returned *dal.Encryptors is nil.
func (e EncryptionKeys) Encryptors(required bool) (*dal.Encryptors, error) {
	var result dal.Encryptors

	// Log data encryptor.
	switch {
	case e.Logs != "":
		logEnc, err := encryption.NewForKeyOrURI(e.Logs)
		if err != nil {
			return nil, fmt.Errorf("could not create log encryptor: %w", err)
		}
		result.Logs = logEnc
	case required:
		return nil, fmt.Errorf("FTL_LOG_ENCRYPTION_KEY is required")
	default:
		result.Logs = encryption.NoOpEncryptor{}
	}

	// Async call data encryptor.
	switch {
	case e.Async != "":
		asyncEnc, err := encryption.NewForKeyOrURI(e.Async)
		if err != nil {
			return nil, fmt.Errorf("could not create async calls encryptor: %w", err)
		}
		result.Async = asyncEnc
	case required:
		return nil, fmt.Errorf("FTL_ASYNC_ENCRYPTION_KEY is required")
	default:
		result.Async = encryption.NoOpEncryptor{}
	}

	return &result, nil
}

type Config struct {
Bind *url.URL `help:"Socket to bind to." default:"http://localhost:8892" env:"FTL_CONTROLLER_BIND"`
IngressBind *url.URL `help:"Socket to bind to for ingress." default:"http://localhost:8891" env:"FTL_CONTROLLER_INGRESS_BIND"`
Expand All @@ -99,6 +134,7 @@ type Config struct {
DeploymentReservationTimeout time.Duration `help:"Deployment reservation timeout." default:"120s"`
ModuleUpdateFrequency time.Duration `help:"Frequency to send module updates." default:"30s"`
ArtefactChunkSize int `help:"Size of each chunk streamed to the client." default:"1048576"`
EncryptionKeys
CommonConfig
}

Expand All @@ -112,7 +148,7 @@ func (c *Config) SetDefaults() {
}

// Start the Controller. Blocks until the context is cancelled.
func Start(ctx context.Context, config Config, runnerScaling scaling.RunnerScaling) error {
func Start(ctx context.Context, config Config, runnerScaling scaling.RunnerScaling, pool *pgxpool.Pool, encryptors *dal.Encryptors) error {
config.SetDefaults()

logger := log.FromContext(ctx)
Expand All @@ -133,13 +169,7 @@ func Start(ctx context.Context, config Config, runnerScaling scaling.RunnerScali
logger.Infof("Web console available at: %s", config.Bind)
}

// Bring up the DB connection and DAL.
conn, err := pgxpool.New(ctx, config.DSN)
if err != nil {
return fmt.Errorf("failed to bring up DB connection: %w", err)
}

svc, err := New(ctx, conn, config, runnerScaling)
svc, err := New(ctx, pool, config, runnerScaling, encryptors)
if err != nil {
return err
}
Expand Down Expand Up @@ -221,7 +251,7 @@ type Service struct {
asyncCallsLock sync.Mutex
}

func New(ctx context.Context, pool *pgxpool.Pool, config Config, runnerScaling scaling.RunnerScaling) (*Service, error) {
func New(ctx context.Context, pool *pgxpool.Pool, config Config, runnerScaling scaling.RunnerScaling, encryptors *dal.Encryptors) (*Service, error) {
key := config.Key
if config.Key.IsZero() {
key = model.NewControllerKey(config.Bind.Hostname(), config.Bind.Port())
Expand All @@ -235,15 +265,15 @@ func New(ctx context.Context, pool *pgxpool.Pool, config Config, runnerScaling s
config.ControllerTimeout = time.Second * 5
}

db, err := dal.New(ctx, pool)
db, err := dal.New(ctx, pool, encryptors)
if err != nil {
return nil, fmt.Errorf("failed to create DAL: %w", err)
}

svc := &Service{
tasks: scheduledtask.New(ctx, key, db),
pool: pool,
dal: db,
pool: pool,
key: key,
deploymentLogsSink: newDeploymentLogsSink(ctx, db),
clients: ttlcache.New(ttlcache.WithTTL[string, clients](time.Minute)),
Expand Down
2 changes: 1 addition & 1 deletion backend/controller/cronjobs/cronjobs_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestServiceWithRealDal(t *testing.T) {

conn := sqltest.OpenForTesting(ctx, t)
dal := db.New(conn)
parentDAL, err := parentdb.New(ctx, conn)
parentDAL, err := parentdb.New(ctx, conn, parentdb.NoOpEncryptors())
assert.NoError(t, err)

// Using a real clock because real db queries use db clock
Expand Down
2 changes: 1 addition & 1 deletion backend/controller/cronjobs/cronjobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestServiceWithMockDal(t *testing.T) {
attemptCountMap: map[string]int{},
}
conn := sqltest.OpenForTesting(ctx, t)
parentDAL, err := db.New(ctx, conn)
parentDAL, err := db.New(ctx, conn, db.NoOpEncryptors())
assert.NoError(t, err)

testServiceWithDal(ctx, t, mockDal, parentDAL, clk)
Expand Down
2 changes: 1 addition & 1 deletion backend/controller/cronjobs/sql/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion backend/controller/dal/async_calls.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,19 @@ func (d *DAL) AcquireAsyncCall(ctx context.Context) (call *AsyncCall, err error)
if err != nil {
return nil, fmt.Errorf("failed to parse origin key %q: %w", row.Origin, err)
}

var decryptedRequest json.RawMessage
err = d.encryptors.Async.DecryptJSON(row.Request, &decryptedRequest)
if err != nil {
return nil, fmt.Errorf("failed to decrypt async call request: %w", err)
}

lease, _ := d.newLease(ctx, row.LeaseKey, row.LeaseIdempotencyKey, ttl)
return &AsyncCall{
ID: row.AsyncCallID,
Verb: row.Verb,
Origin: origin,
Request: row.Request,
Request: decryptedRequest,
Lease: lease,
ScheduledAt: row.ScheduledAt,
RemainingAttempts: row.RemainingAttempts,
Expand Down
2 changes: 1 addition & 1 deletion backend/controller/dal/async_calls_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
func TestNoCallToAcquire(t *testing.T) {
ctx := log.ContextWithNewDefaultLogger(context.Background())
conn := sqltest.OpenForTesting(ctx, t)
dal, err := New(ctx, conn)
dal, err := New(ctx, conn, NoOpEncryptors())
assert.NoError(t, err)

_, err = dal.AcquireAsyncCall(ctx)
Expand Down
80 changes: 60 additions & 20 deletions backend/controller/dal/dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
dalerrs "github.com/TBD54566975/ftl/backend/dal"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/internal/encryption"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/maps"
"github.com/TBD54566975/ftl/internal/model"
Expand Down Expand Up @@ -208,26 +209,41 @@ func WithReservation(ctx context.Context, reservation Reservation, fn func() err
return reservation.Commit(ctx)
}

func New(ctx context.Context, pool *pgxpool.Pool) (*DAL, error) {
// New creates a DAL backed by pool that uses encryptors for the sensitive
// columns it reads and writes.
//
// A connection is acquired from the pool up front purely as a connectivity
// check, so an unreachable database is reported here rather than on first
// use. The connection is released straight away; holding on to it would
// permanently remove one connection from the pool for every DAL created.
//
// It returns an error if a connection cannot be acquired.
func New(ctx context.Context, pool *pgxpool.Pool, encryptors *Encryptors) (*DAL, error) {
	conn, err := pool.Acquire(ctx)
	if err != nil {
		return nil, fmt.Errorf("could not acquire connection: %w", err)
	}
	// Only the success of Acquire matters; hand the connection back to the pool.
	conn.Release()

	dal := &DAL{
		db:                sql.NewDB(pool),
		DeploymentChanges: pubsub.New[DeploymentNotification](),
		encryptors:        encryptors,
	}

	return dal, nil
}

type DAL struct {
db sql.DBI
db sql.DBI
encryptors *Encryptors

// DeploymentChanges is a Topic that receives changes to the deployments table.
DeploymentChanges *pubsub.Topic[DeploymentNotification]
}

// Encryptors groups the encryptors the DAL applies to sensitive columns.
type Encryptors struct {
// Logs encrypts the JSON payloads of log, call and deployment events.
Logs encryption.Encryptable
// Async decrypts the request of an async call when it is acquired.
Async encryption.Encryptable
}

// NoOpEncryptors returns an Encryptors whose Logs and Async slots are both
// encryption.NoOpEncryptor, i.e. potentially sensitive data is not encrypted.
func NoOpEncryptors() *Encryptors {
	noop := encryption.NoOpEncryptor{}
	result := Encryptors{}
	result.Logs = noop
	result.Async = noop
	return &result
}

// Tx is DAL within a transaction.
type Tx struct {
*DAL
Expand Down Expand Up @@ -274,6 +290,7 @@ func (d *DAL) Begin(ctx context.Context) (*Tx, error) {
return &Tx{&DAL{
db: tx,
DeploymentChanges: d.DeploymentChanges,
encryptors: d.encryptors,
}}, nil
}

Expand Down Expand Up @@ -701,10 +718,16 @@ func (d *DAL) SetDeploymentReplicas(ctx context.Context, key model.DeploymentKey
return dalerrs.TranslatePGError(err)
}
}
payload, err := d.encryptors.Logs.EncryptJSON(map[string]any{
"prev_min_replicas": deployment.MinReplicas,
"min_replicas": minReplicas,
})
if err != nil {
return fmt.Errorf("failed to encrypt payload for InsertDeploymentUpdatedEvent: %w", err)
}
err = tx.InsertDeploymentUpdatedEvent(ctx, sql.InsertDeploymentUpdatedEventParams{
DeploymentKey: key,
MinReplicas: int32(minReplicas),
PrevMinReplicas: deployment.MinReplicas,
DeploymentKey: key,
Payload: payload,
})
if err != nil {
return dalerrs.TranslatePGError(err)
Expand Down Expand Up @@ -768,12 +791,19 @@ func (d *DAL) ReplaceDeployment(ctx context.Context, newDeploymentKey model.Depl
}
}

payload, err := d.encryptors.Logs.EncryptJSON(map[string]any{
"min_replicas": int32(minReplicas),
"replaced": replacedDeploymentKey,
})
if err != nil {
return fmt.Errorf("replace deployment failed to encrypt payload: %w", err)
}

err = tx.InsertDeploymentCreatedEvent(ctx, sql.InsertDeploymentCreatedEventParams{
DeploymentKey: newDeploymentKey,
Language: newDeployment.Language,
ModuleName: newDeployment.ModuleName,
MinReplicas: int32(minReplicas),
Replaced: replacedDeploymentKey,
Payload: payload,
})
if err != nil {
return fmt.Errorf("replace deployment failed to create event: %w", dalerrs.TranslatePGError(err))
Expand Down Expand Up @@ -1025,23 +1055,27 @@ func (d *DAL) ExpireRunnerClaims(ctx context.Context) (int64, error) {
}

func (d *DAL) InsertLogEvent(ctx context.Context, log *LogEvent) error {
attributes, err := json.Marshal(log.Attributes)
if err != nil {
return err
}
var requestKey optional.Option[string]
if name, ok := log.RequestKey.Get(); ok {
requestKey = optional.Some(name.String())
}

payload := map[string]any{
"message": log.Message,
"attributes": log.Attributes,
"error": log.Error,
"stack": log.Stack,
}
encryptedPayload, err := d.encryptors.Logs.EncryptJSON(payload)
if err != nil {
return fmt.Errorf("failed to encrypt log payload: %w", err)
}
return dalerrs.TranslatePGError(d.db.InsertLogEvent(ctx, sql.InsertLogEventParams{
DeploymentKey: log.DeploymentKey,
RequestKey: requestKey,
TimeStamp: log.Time,
Level: log.Level,
Attributes: attributes,
Message: log.Message,
Error: log.Error,
Stack: log.Stack,
Payload: encryptedPayload,
}))
}

Expand Down Expand Up @@ -1108,6 +1142,16 @@ func (d *DAL) InsertCallEvent(ctx context.Context, call *CallEvent) error {
if rn, ok := call.RequestKey.Get(); ok {
requestKey = optional.Some(rn.String())
}
payload, err := d.encryptors.Logs.EncryptJSON(map[string]any{
"duration_ms": call.Duration.Milliseconds(),
"request": call.Request,
"response": call.Response,
"error": call.Error,
"stack": call.Stack,
})
if err != nil {
return fmt.Errorf("failed to encrypt call payload: %w", err)
}
return dalerrs.TranslatePGError(d.db.InsertCallEvent(ctx, sql.InsertCallEventParams{
DeploymentKey: call.DeploymentKey,
RequestKey: requestKey,
Expand All @@ -1116,11 +1160,7 @@ func (d *DAL) InsertCallEvent(ctx context.Context, call *CallEvent) error {
SourceVerb: sourceVerb,
DestModule: call.DestVerb.Module,
DestVerb: call.DestVerb.Name,
DurationMs: call.Duration.Milliseconds(),
Request: call.Request,
Response: call.Response,
Error: call.Error,
Stack: call.Stack,
Payload: payload,
}))
}

Expand Down
2 changes: 1 addition & 1 deletion backend/controller/dal/dal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
func TestDAL(t *testing.T) {
ctx := log.ContextWithNewDefaultLogger(context.Background())
conn := sqltest.OpenForTesting(ctx, t)
dal, err := New(ctx, conn)
dal, err := New(ctx, conn, NoOpEncryptors())
assert.NoError(t, err)
assert.NotZero(t, dal)
var testContent = bytes.Repeat([]byte("sometestcontentthatislongerthanthereadbuffer"), 100)
Expand Down
Loading

0 comments on commit 85b08ae

Please sign in to comment.