Skip to content

Commit

Permalink
feat(KMS): derive keys for logs and async (#2338)
Browse files Browse the repository at this point in the history
Fixes #2290 
Follows #2312 
Needs work: #2346 #2348 

> [!CAUTION]
> Will wipe the existing data in the logs and async columns!

- Uses KMS via Tink, configured through `FTL_KMS_URI`; both `fake-kms://` and
`aws-kms://` URIs work. If it is omitted, data is not encrypted.
- Removes the old plaintext key environment variables (`FTL_LOG_ENCRYPTION_KEY` and `FTL_ASYNC_ENCRYPTION_KEY`).

---------

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: Matt Toohey <[email protected]>
  • Loading branch information
3 people authored Aug 14, 2024
1 parent da69372 commit 2a3edbc
Show file tree
Hide file tree
Showing 29 changed files with 300 additions and 416 deletions.
50 changes: 6 additions & 44 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ import (
cf "github.com/TBD54566975/ftl/common/configuration"
frontend "github.com/TBD54566975/ftl/frontend"
"github.com/TBD54566975/ftl/internal/cors"
"github.com/TBD54566975/ftl/internal/encryption"
ftlhttp "github.com/TBD54566975/ftl/internal/http"
"github.com/TBD54566975/ftl/internal/log"
ftlmaps "github.com/TBD54566975/ftl/internal/maps"
Expand Down Expand Up @@ -85,42 +84,6 @@ func (c *CommonConfig) Validate() error {
return nil
}

// EncryptionKeys for the controller config.
// Deprecated: Will remove this at some stage.
type EncryptionKeys struct {
Logs string `name:"log-key" help:"Key for sensitive log data in internal FTL tables." env:"FTL_LOG_ENCRYPTION_KEY"`
Async string `name:"async-key" help:"Key for sensitive async call data in internal FTL tables." env:"FTL_ASYNC_ENCRYPTION_KEY"`
}

func (e EncryptionKeys) Encryptors(required bool) (*dal.Encryptors, error) {
encryptors := dal.Encryptors{}
if e.Logs != "" {
enc, err := encryption.NewForKeyOrURI(e.Logs)
if err != nil {
return nil, fmt.Errorf("could not create log encryptor: %w", err)
}
encryptors.Logs = enc
} else if required {
return nil, fmt.Errorf("FTL_LOG_ENCRYPTION_KEY is required")
} else {
encryptors.Logs = encryption.NoOpEncryptor{}
}

if e.Async != "" {
enc, err := encryption.NewForKeyOrURI(e.Async)
if err != nil {
return nil, fmt.Errorf("could not create async calls encryptor: %w", err)
}
encryptors.Async = enc
} else if required {
return nil, fmt.Errorf("FTL_ASYNC_ENCRYPTION_KEY is required")
} else {
encryptors.Async = encryption.NoOpEncryptor{}
}

return &encryptors, nil
}

type Config struct {
Bind *url.URL `help:"Socket to bind to." default:"http://127.0.0.1:8892" env:"FTL_CONTROLLER_BIND"`
IngressBind *url.URL `help:"Socket to bind to for ingress." default:"http://127.0.0.1:8891" env:"FTL_CONTROLLER_INGRESS_BIND"`
Expand All @@ -135,8 +98,7 @@ type Config struct {
ModuleUpdateFrequency time.Duration `help:"Frequency to send module updates." default:"30s"`
EventLogRetention *time.Duration `help:"Delete call logs after this time period. 0 to disable" env:"FTL_EVENT_LOG_RETENTION" default:"24h"`
ArtefactChunkSize int `help:"Size of each chunk streamed to the client." default:"1048576"`
KMSURI *url.URL `help:"URI for KMS key e.g. aws-kms://arn:aws:kms:ap-southeast-2:12345:key/0000-1111" env:"FTL_KMS_URI"`
EncryptionKeys
KMSURI *string `help:"URI for KMS key e.g. with fake-kms:// or aws-kms://arn:aws:kms:ap-southeast-2:12345:key/0000-1111" env:"FTL_KMS_URI"`
CommonConfig
}

Expand All @@ -150,7 +112,7 @@ func (c *Config) SetDefaults() {
}

// Start the Controller. Blocks until the context is cancelled.
func Start(ctx context.Context, config Config, runnerScaling scaling.RunnerScaling, conn *sql.DB, encryptors *dal.Encryptors) error {
func Start(ctx context.Context, config Config, runnerScaling scaling.RunnerScaling, conn *sql.DB) error {
config.SetDefaults()

logger := log.FromContext(ctx)
Expand All @@ -171,7 +133,7 @@ func Start(ctx context.Context, config Config, runnerScaling scaling.RunnerScali
logger.Infof("Web console available at: %s", config.Bind)
}

svc, err := New(ctx, conn, config, runnerScaling, encryptors)
svc, err := New(ctx, conn, config, runnerScaling)
if err != nil {
return err
}
Expand Down Expand Up @@ -253,7 +215,7 @@ type Service struct {
asyncCallsLock sync.Mutex
}

func New(ctx context.Context, conn *sql.DB, config Config, runnerScaling scaling.RunnerScaling, encryptors *dal.Encryptors) (*Service, error) {
func New(ctx context.Context, conn *sql.DB, config Config, runnerScaling scaling.RunnerScaling) (*Service, error) {
key := config.Key
if config.Key.IsZero() {
key = model.NewControllerKey(config.Bind.Hostname(), config.Bind.Port())
Expand All @@ -267,7 +229,7 @@ func New(ctx context.Context, conn *sql.DB, config Config, runnerScaling scaling
config.ControllerTimeout = time.Second * 5
}

db, err := dal.New(ctx, conn, encryptors)
db, err := dal.New(ctx, conn, optional.Ptr[string](config.KMSURI))
if err != nil {
return nil, fmt.Errorf("failed to create DAL: %w", err)
}
Expand Down Expand Up @@ -1492,7 +1454,7 @@ func (s *Service) catchAsyncCall(ctx context.Context, logger *log.Logger, call *
originalResult := either.RightOf[[]byte](originalError)

request := map[string]any{
"request": call.Request,
"request": json.RawMessage(call.Request),
"error": originalError,
}
body, err := json.Marshal(request)
Expand Down
8 changes: 5 additions & 3 deletions backend/controller/cronjobs/cronjobs_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ import (
"testing"
"time"

"github.com/alecthomas/assert/v2"
"github.com/alecthomas/types/optional"
"github.com/benbjohnson/clock"

db "github.com/TBD54566975/ftl/backend/controller/cronjobs/dal"
parentdb "github.com/TBD54566975/ftl/backend/controller/dal"
"github.com/TBD54566975/ftl/backend/controller/sql/sqltest"
in "github.com/TBD54566975/ftl/integration"
"github.com/TBD54566975/ftl/internal/log"
"github.com/alecthomas/assert/v2"
"github.com/benbjohnson/clock"
)

func TestServiceWithRealDal(t *testing.T) {
Expand All @@ -26,7 +28,7 @@ func TestServiceWithRealDal(t *testing.T) {

conn := sqltest.OpenForTesting(ctx, t)
dal := db.New(conn)
parentDAL, err := parentdb.New(ctx, conn, parentdb.NoOpEncryptors())
parentDAL, err := parentdb.New(ctx, conn, optional.None[string]())
assert.NoError(t, err)

// Using a real clock because real db queries use db clock
Expand Down
2 changes: 1 addition & 1 deletion backend/controller/cronjobs/cronjobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestServiceWithMockDal(t *testing.T) {
attemptCountMap: map[string]int{},
}
conn := sqltest.OpenForTesting(ctx, t)
parentDAL, err := db.New(ctx, conn, db.NoOpEncryptors())
parentDAL, err := db.New(ctx, conn, optional.None[string]())
assert.NoError(t, err)

testServiceWithDal(ctx, t, mockDal, parentDAL, clk)
Expand Down
10 changes: 8 additions & 2 deletions backend/controller/cronjobs/sql/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 13 additions & 6 deletions backend/controller/dal/async_calls.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package dal

import (
"context"
"encoding/json"
"errors"
"fmt"
"time"
Expand All @@ -15,6 +14,7 @@ import (
"github.com/TBD54566975/ftl/backend/controller/sql/sqltypes"
dalerrs "github.com/TBD54566975/ftl/backend/dal"
"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/internal/encryption"
)

type asyncOriginParseRoot struct {
Expand Down Expand Up @@ -77,7 +77,7 @@ type AsyncCall struct {
Origin AsyncOrigin
Verb schema.RefKey
CatchVerb optional.Option[schema.RefKey]
Request json.RawMessage
Request []byte
ScheduledAt time.Time
QueueDepth int64
ParentRequestKey optional.Option[string]
Expand Down Expand Up @@ -115,8 +115,7 @@ func (d *DAL) AcquireAsyncCall(ctx context.Context) (call *AsyncCall, err error)
return nil, fmt.Errorf("failed to parse origin key %q: %w", row.Origin, err)
}

var decryptedRequest json.RawMessage
err = d.encryptors.Async.DecryptJSON(row.Request, &decryptedRequest)
decryptedRequest, err := d.decrypt(encryption.AsyncSubKey, row.Request)
if err != nil {
return nil, fmt.Errorf("failed to decrypt async call request: %w", err)
}
Expand Down Expand Up @@ -159,7 +158,11 @@ func (d *DAL) CompleteAsyncCall(ctx context.Context,
didScheduleAnotherCall = false
switch result := result.(type) {
case either.Left[[]byte, string]: // Successful response.
_, err = tx.db.SucceedAsyncCall(ctx, result.Get(), call.ID)
encryptedResult, err := d.encrypt(encryption.AsyncSubKey, result.Get())
if err != nil {
return false, fmt.Errorf("failed to encrypt async call result: %w", err)
}
_, err = tx.db.SucceedAsyncCall(ctx, encryptedResult, call.ID)
if err != nil {
return false, dalerrs.TranslatePGError(err) //nolint:wrapcheck
}
Expand Down Expand Up @@ -224,10 +227,14 @@ func (d *DAL) LoadAsyncCall(ctx context.Context, id int64) (*AsyncCall, error) {
if err != nil {
return nil, fmt.Errorf("failed to parse origin key %q: %w", row.Origin, err)
}
request, err := d.decrypt(encryption.AsyncSubKey, row.Request)
if err != nil {
return nil, fmt.Errorf("failed to decrypt async call request: %w", err)
}
return &AsyncCall{
ID: row.ID,
Verb: row.Verb,
Origin: origin,
Request: row.Request,
Request: request,
}, nil
}
4 changes: 3 additions & 1 deletion backend/controller/dal/async_calls_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"testing"

"github.com/alecthomas/types/optional"

"github.com/TBD54566975/ftl/backend/controller/sql/sqltest"
dalerrs "github.com/TBD54566975/ftl/backend/dal"
"github.com/TBD54566975/ftl/internal/log"
Expand All @@ -13,7 +15,7 @@ import (
func TestNoCallToAcquire(t *testing.T) {
ctx := log.ContextWithNewDefaultLogger(context.Background())
conn := sqltest.OpenForTesting(ctx, t)
dal, err := New(ctx, conn, NoOpEncryptors())
dal, err := New(ctx, conn, optional.None[string]())
assert.NoError(t, err)

_, err = dal.AcquireAsyncCall(ctx)
Expand Down
44 changes: 20 additions & 24 deletions backend/controller/dal/dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,35 +210,30 @@ func WithReservation(ctx context.Context, reservation Reservation, fn func() err
return reservation.Commit(ctx)
}

func New(ctx context.Context, conn *stdsql.DB, encryptors *Encryptors) (*DAL, error) {
return &DAL{
func New(ctx context.Context, conn *stdsql.DB, kmsURL optional.Option[string]) (*DAL, error) {
d := &DAL{
db: sql.NewDB(conn),
DeploymentChanges: pubsub.New[DeploymentNotification](),
encryptors: encryptors,
}, nil
kmsURL: kmsURL,
}

if err := d.setupEncryptor(ctx); err != nil {
return nil, fmt.Errorf("failed to setup encryptor: %w", err)
}

return d, nil
}

type DAL struct {
db sql.DBI
encryptors *Encryptors
db sql.DBI

kmsURL optional.Option[string]
encryptor encryption.DataEncryptor

// DeploymentChanges is a Topic that receives changes to the deployments table.
DeploymentChanges *pubsub.Topic[DeploymentNotification]
}

type Encryptors struct {
Logs encryption.Encryptable
Async encryption.Encryptable
}

// NoOpEncryptors do not encrypt potentially sensitive data.
func NoOpEncryptors() *Encryptors {
return &Encryptors{
Logs: encryption.NoOpEncryptor{},
Async: encryption.NoOpEncryptor{},
}
}

// Tx is DAL within a transaction.
type Tx struct {
*DAL
Expand Down Expand Up @@ -285,7 +280,8 @@ func (d *DAL) Begin(ctx context.Context) (*Tx, error) {
return &Tx{&DAL{
db: tx,
DeploymentChanges: d.DeploymentChanges,
encryptors: d.encryptors,
kmsURL: d.kmsURL,
encryptor: d.encryptor,
}}, nil
}

Expand Down Expand Up @@ -713,7 +709,7 @@ func (d *DAL) SetDeploymentReplicas(ctx context.Context, key model.DeploymentKey
return dalerrs.TranslatePGError(err)
}
}
payload, err := d.encryptors.Logs.EncryptJSON(map[string]any{
payload, err := d.encryptJSON(encryption.LogsSubKey, map[string]interface{}{
"prev_min_replicas": deployment.MinReplicas,
"min_replicas": minReplicas,
})
Expand Down Expand Up @@ -786,7 +782,7 @@ func (d *DAL) ReplaceDeployment(ctx context.Context, newDeploymentKey model.Depl
}
}

payload, err := d.encryptors.Logs.EncryptJSON(map[string]any{
payload, err := d.encryptJSON(encryption.LogsSubKey, map[string]any{
"min_replicas": int32(minReplicas),
"replaced": replacedDeploymentKey,
})
Expand Down Expand Up @@ -1061,7 +1057,7 @@ func (d *DAL) InsertLogEvent(ctx context.Context, log *LogEvent) error {
"error": log.Error,
"stack": log.Stack,
}
encryptedPayload, err := d.encryptors.Logs.EncryptJSON(payload)
encryptedPayload, err := d.encryptJSON(encryption.LogsSubKey, payload)
if err != nil {
return fmt.Errorf("failed to encrypt log payload: %w", err)
}
Expand Down Expand Up @@ -1141,7 +1137,7 @@ func (d *DAL) InsertCallEvent(ctx context.Context, call *CallEvent) error {
if pr, ok := call.ParentRequestKey.Get(); ok {
parentRequestKey = optional.Some(pr.String())
}
payload, err := d.encryptors.Logs.EncryptJSON(map[string]any{
payload, err := d.encryptJSON(encryption.LogsSubKey, map[string]any{
"duration_ms": call.Duration.Milliseconds(),
"request": call.Request,
"response": call.Response,
Expand Down
7 changes: 4 additions & 3 deletions backend/controller/dal/dal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
func TestDAL(t *testing.T) {
ctx := log.ContextWithNewDefaultLogger(context.Background())
conn := sqltest.OpenForTesting(ctx, t)
dal, err := New(ctx, conn, NoOpEncryptors())
dal, err := New(ctx, conn, optional.None[string]())
assert.NoError(t, err)
assert.NotZero(t, dal)
var testContent = bytes.Repeat([]byte("sometestcontentthatislongerthanthereadbuffer"), 100)
Expand Down Expand Up @@ -235,7 +235,7 @@ func TestDAL(t *testing.T) {
DeploymentKey: deploymentKey,
RequestKey: optional.Some(requestKey),
Request: []byte("{}"),
Response: []byte(`{"time": "now"}`),
Response: []byte(`{"time":"now"}`),
DestVerb: schema.Ref{Module: "time", Name: "time"},
}
t.Run("InsertCallEvent", func(t *testing.T) {
Expand Down Expand Up @@ -396,6 +396,7 @@ func normaliseEvents(events []Event) []Event {
f.Set(reflect.Zero(f.Type()))
events[i] = event
}

return events
}

Expand All @@ -407,7 +408,7 @@ func assertEventsEqual(t *testing.T, expected, actual []Event) {
func TestDeleteOldEvents(t *testing.T) {
ctx := log.ContextWithNewDefaultLogger(context.Background())
conn := sqltest.OpenForTesting(ctx, t)
dal, err := New(ctx, conn, NoOpEncryptors())
dal, err := New(ctx, conn, optional.None[string]())
assert.NoError(t, err)

var testContent = bytes.Repeat([]byte("sometestcontentthatislongerthanthereadbuffer"), 100)
Expand Down
Loading

0 comments on commit 2a3edbc

Please sign in to comment.