diff --git a/backend/controller/controller.go b/backend/controller/controller.go index 684c8bae70..b44d6a322b 100644 --- a/backend/controller/controller.go +++ b/backend/controller/controller.go @@ -57,6 +57,7 @@ import ( cf "github.com/TBD54566975/ftl/common/configuration" frontend "github.com/TBD54566975/ftl/frontend" "github.com/TBD54566975/ftl/internal/cors" + "github.com/TBD54566975/ftl/internal/encryption" ftlhttp "github.com/TBD54566975/ftl/internal/http" "github.com/TBD54566975/ftl/internal/log" ftlmaps "github.com/TBD54566975/ftl/internal/maps" @@ -86,6 +87,40 @@ func (c *CommonConfig) Validate() error { return nil } +type EncryptionKeys struct { + Logs string `name:"log-key" help:"Key for sensitive log data in internal FTL tables." env:"FTL_LOG_ENCRYPTION_KEY"` + Async string `name:"async-key" help:"Key for sensitive async call data in internal FTL tables." env:"FTL_ASYNC_ENCRYPTION_KEY"` +} + +func (e EncryptionKeys) Encryptors(required bool) (*dal.Encryptors, error) { + encryptors := dal.Encryptors{} + if e.Logs != "" { + enc, err := encryption.NewForKeyOrURI(e.Logs) + if err != nil { + return nil, fmt.Errorf("could not create log encryptor: %w", err) + } + encryptors.Logs = enc + } else if required { + return nil, fmt.Errorf("FTL_LOG_ENCRYPTION_KEY is required") + } else { + encryptors.Logs = encryption.NoOpEncryptor{} + } + + if e.Async != "" { + enc, err := encryption.NewForKeyOrURI(e.Async) + if err != nil { + return nil, fmt.Errorf("could not create async calls encryptor: %w", err) + } + encryptors.Async = enc + } else if required { + return nil, fmt.Errorf("FTL_ASYNC_ENCRYPTION_KEY is required") + } else { + encryptors.Async = encryption.NoOpEncryptor{} + } + + return &encryptors, nil +} + type Config struct { Bind *url.URL `help:"Socket to bind to." default:"http://localhost:8892" env:"FTL_CONTROLLER_BIND"` IngressBind *url.URL `help:"Socket to bind to for ingress." default:"http://localhost:8891" env:"FTL_CONTROLLER_INGRESS_BIND"` @@ -99,6 +134,7 @@ type Config struct { DeploymentReservationTimeout time.Duration `help:"Deployment reservation timeout." default:"120s"` ModuleUpdateFrequency time.Duration `help:"Frequency to send module updates." default:"30s"` ArtefactChunkSize int `help:"Size of each chunk streamed to the client." default:"1048576"` + EncryptionKeys CommonConfig } @@ -112,7 +148,7 @@ func (c *Config) SetDefaults() { } // Start the Controller. Blocks until the context is cancelled. -func Start(ctx context.Context, config Config, runnerScaling scaling.RunnerScaling) error { +func Start(ctx context.Context, config Config, runnerScaling scaling.RunnerScaling, pool *pgxpool.Pool, encryptors *dal.Encryptors) error { config.SetDefaults() logger := log.FromContext(ctx) @@ -133,13 +169,7 @@ func Start(ctx context.Context, config Config, runnerScaling scaling.RunnerScali logger.Infof("Web console available at: %s", config.Bind) } - // Bring up the DB connection and DAL. - conn, err := pgxpool.New(ctx, config.DSN) - if err != nil { - return fmt.Errorf("failed to bring up DB connection: %w", err) - } - - svc, err := New(ctx, conn, config, runnerScaling) + svc, err := New(ctx, pool, config, runnerScaling, encryptors) if err != nil { return err } @@ -221,7 +251,7 @@ type Service struct { asyncCallsLock sync.Mutex } -func New(ctx context.Context, pool *pgxpool.Pool, config Config, runnerScaling scaling.RunnerScaling) (*Service, error) { +func New(ctx context.Context, pool *pgxpool.Pool, config Config, runnerScaling scaling.RunnerScaling, encryptors *dal.Encryptors) (*Service, error) { key := config.Key if config.Key.IsZero() { key = model.NewControllerKey(config.Bind.Hostname(), config.Bind.Port()) @@ -235,15 +265,15 @@ func New(ctx context.Context, pool *pgxpool.Pool, config Config, runnerScaling s config.ControllerTimeout = time.Second * 5 } - db, err := dal.New(ctx, pool) + db, err := dal.New(ctx, pool, encryptors) if err != nil { return nil, fmt.Errorf("failed to create DAL: %w", err) } svc := &Service{ tasks: scheduledtask.New(ctx, key, db), - pool: pool, dal: db, + pool: pool, key: key, deploymentLogsSink: newDeploymentLogsSink(ctx, db), clients: ttlcache.New(ttlcache.WithTTL[string, clients](time.Minute)), diff --git a/backend/controller/cronjobs/cronjobs_integration_test.go b/backend/controller/cronjobs/cronjobs_integration_test.go index a8e9005851..61c250e34a 100644 --- a/backend/controller/cronjobs/cronjobs_integration_test.go +++ b/backend/controller/cronjobs/cronjobs_integration_test.go @@ -26,7 +26,7 @@ func TestServiceWithRealDal(t *testing.T) { conn := sqltest.OpenForTesting(ctx, t) dal := db.New(conn) - parentDAL, err := parentdb.New(ctx, conn) + parentDAL, err := parentdb.New(ctx, conn, parentdb.NoOpEncryptors()) assert.NoError(t, err) // Using a real clock because real db queries use db clock diff --git a/backend/controller/cronjobs/cronjobs_test.go b/backend/controller/cronjobs/cronjobs_test.go index 775c3426e3..c77d86bc93 100644 --- a/backend/controller/cronjobs/cronjobs_test.go +++ b/backend/controller/cronjobs/cronjobs_test.go @@ -37,7 +37,7 @@ func TestServiceWithMockDal(t *testing.T) { attemptCountMap: map[string]int{}, } conn := sqltest.OpenForTesting(ctx, t) - parentDAL, err := db.New(ctx, conn) + parentDAL, err := db.New(ctx, conn, db.NoOpEncryptors()) assert.NoError(t, err) testServiceWithDal(ctx, t, mockDal, parentDAL, clk) diff --git a/backend/controller/cronjobs/sql/models.go b/backend/controller/cronjobs/sql/models.go index a8e7d0b65e..4e72c77a20 100644 --- a/backend/controller/cronjobs/sql/models.go +++ b/backend/controller/cronjobs/sql/models.go @@ -523,7 +523,7 @@ type TopicEvent struct { Key model.TopicEventKey TopicID int64 Payload []byte - Caller string + Caller optional.Option[string] } type TopicSubscriber struct { diff --git a/backend/controller/dal/async_calls.go b/backend/controller/dal/async_calls.go index 4046010fa2..347539d9ca 100644 --- a/backend/controller/dal/async_calls.go +++ b/backend/controller/dal/async_calls.go @@ -105,12 +105,19 @@ func (d *DAL) AcquireAsyncCall(ctx context.Context) (call *AsyncCall, err error) if err != nil { return nil, fmt.Errorf("failed to parse origin key %q: %w", row.Origin, err) } + + var decryptedRequest json.RawMessage + err = d.encryptors.Async.DecryptJSON(row.Request, &decryptedRequest) + if err != nil { + return nil, fmt.Errorf("failed to decrypt async call request: %w", err) + } + lease, _ := d.newLease(ctx, row.LeaseKey, row.LeaseIdempotencyKey, ttl) return &AsyncCall{ ID: row.AsyncCallID, Verb: row.Verb, Origin: origin, - Request: row.Request, + Request: decryptedRequest, Lease: lease, ScheduledAt: row.ScheduledAt, RemainingAttempts: row.RemainingAttempts, diff --git a/backend/controller/dal/async_calls_test.go b/backend/controller/dal/async_calls_test.go index d0a08562df..7aab05128f 100644 --- a/backend/controller/dal/async_calls_test.go +++ b/backend/controller/dal/async_calls_test.go @@ -13,7 +13,7 @@ import ( func TestNoCallToAcquire(t *testing.T) { ctx := log.ContextWithNewDefaultLogger(context.Background()) conn := sqltest.OpenForTesting(ctx, t) - dal, err := New(ctx, conn) + dal, err := New(ctx, conn, NoOpEncryptors()) assert.NoError(t, err) _, err = dal.AcquireAsyncCall(ctx) diff --git a/backend/controller/dal/dal.go b/backend/controller/dal/dal.go index 63788bb076..2b5f95d9db 100644 --- a/backend/controller/dal/dal.go +++ b/backend/controller/dal/dal.go @@ -20,6 +20,7 @@ import ( dalerrs "github.com/TBD54566975/ftl/backend/dal" ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" "github.com/TBD54566975/ftl/backend/schema" + "github.com/TBD54566975/ftl/internal/encryption" "github.com/TBD54566975/ftl/internal/log" "github.com/TBD54566975/ftl/internal/maps" "github.com/TBD54566975/ftl/internal/model" @@ -208,7 +209,7 @@ func WithReservation(ctx context.Context, reservation Reservation, fn func() err return reservation.Commit(ctx) } -func New(ctx context.Context, pool *pgxpool.Pool) (*DAL, error) { +func New(ctx context.Context, pool *pgxpool.Pool, encryptors *Encryptors) (*DAL, error) { _, err := pool.Acquire(ctx) if err != nil { return nil, fmt.Errorf("could not acquire connection: %w", err) @@ -216,18 +217,33 @@ func New(ctx context.Context, pool *pgxpool.Pool) (*DAL, error) { dal := &DAL{ db: sql.NewDB(pool), DeploymentChanges: pubsub.New[DeploymentNotification](), + encryptors: encryptors, } return dal, nil } type DAL struct { - db sql.DBI + db sql.DBI + encryptors *Encryptors // DeploymentChanges is a Topic that receives changes to the deployments table. DeploymentChanges *pubsub.Topic[DeploymentNotification] } +type Encryptors struct { + Logs encryption.Encryptable + Async encryption.Encryptable +} + +// NoOpEncryptors do not encrypt potentially sensitive data. +func NoOpEncryptors() *Encryptors { + return &Encryptors{ + Logs: encryption.NoOpEncryptor{}, + Async: encryption.NoOpEncryptor{}, + } +} + // Tx is DAL within a transaction. type Tx struct { *DAL @@ -274,6 +290,7 @@ func (d *DAL) Begin(ctx context.Context) (*Tx, error) { return &Tx{&DAL{ db: tx, DeploymentChanges: d.DeploymentChanges, + encryptors: d.encryptors, }}, nil } @@ -701,10 +718,16 @@ func (d *DAL) SetDeploymentReplicas(ctx context.Context, key model.DeploymentKey return dalerrs.TranslatePGError(err) } } + payload, err := d.encryptors.Logs.EncryptJSON(map[string]any{ + "prev_min_replicas": deployment.MinReplicas, + "min_replicas": minReplicas, + }) + if err != nil { + return fmt.Errorf("failed to encrypt payload for InsertDeploymentUpdatedEvent: %w", err) + } err = tx.InsertDeploymentUpdatedEvent(ctx, sql.InsertDeploymentUpdatedEventParams{ - DeploymentKey: key, - MinReplicas: int32(minReplicas), - PrevMinReplicas: deployment.MinReplicas, + DeploymentKey: key, + Payload: payload, }) if err != nil { return dalerrs.TranslatePGError(err) @@ -768,12 +791,19 @@ func (d *DAL) ReplaceDeployment(ctx context.Context, newDeploymentKey model.Depl } } + payload, err := d.encryptors.Logs.EncryptJSON(map[string]any{ + "min_replicas": int32(minReplicas), + "replaced": replacedDeploymentKey, + }) + if err != nil { + return fmt.Errorf("replace deployment failed to encrypt payload: %w", err) + } + err = tx.InsertDeploymentCreatedEvent(ctx, sql.InsertDeploymentCreatedEventParams{ DeploymentKey: newDeploymentKey, Language: newDeployment.Language, ModuleName: newDeployment.ModuleName, - MinReplicas: int32(minReplicas), - Replaced: replacedDeploymentKey, + Payload: payload, }) if err != nil { return fmt.Errorf("replace deployment failed to create event: %w", dalerrs.TranslatePGError(err)) @@ -1025,23 +1055,27 @@ func (d *DAL) ExpireRunnerClaims(ctx context.Context) (int64, error) { } func (d *DAL) InsertLogEvent(ctx context.Context, log *LogEvent) error { - attributes, err := json.Marshal(log.Attributes) - if err != nil { - return err - } var requestKey optional.Option[string] if name, ok := log.RequestKey.Get(); ok { requestKey = optional.Some(name.String()) } + + payload := map[string]any{ + "message": log.Message, + "attributes": log.Attributes, + "error": log.Error, + "stack": log.Stack, + } + encryptedPayload, err := d.encryptors.Logs.EncryptJSON(payload) + if err != nil { + return fmt.Errorf("failed to encrypt log payload: %w", err) + } return dalerrs.TranslatePGError(d.db.InsertLogEvent(ctx, sql.InsertLogEventParams{ DeploymentKey: log.DeploymentKey, RequestKey: requestKey, TimeStamp: log.Time, Level: log.Level, - Attributes: attributes, - Message: log.Message, - Error: log.Error, - Stack: log.Stack, + Payload: encryptedPayload, })) } @@ -1108,6 +1142,16 @@ func (d *DAL) InsertCallEvent(ctx context.Context, call *CallEvent) error { if rn, ok := call.RequestKey.Get(); ok { requestKey = optional.Some(rn.String()) } + payload, err := d.encryptors.Logs.EncryptJSON(map[string]any{ + "duration_ms": call.Duration.Milliseconds(), + "request": call.Request, + "response": call.Response, + "error": call.Error, + "stack": call.Stack, + }) + if err != nil { + return fmt.Errorf("failed to encrypt call payload: %w", err) + } return dalerrs.TranslatePGError(d.db.InsertCallEvent(ctx, sql.InsertCallEventParams{ DeploymentKey: call.DeploymentKey, RequestKey: requestKey, @@ -1116,11 +1160,7 @@ func (d *DAL) InsertCallEvent(ctx context.Context, call *CallEvent) error { SourceVerb: sourceVerb, DestModule: call.DestVerb.Module, DestVerb: call.DestVerb.Name, - DurationMs: call.Duration.Milliseconds(), - Request: call.Request, - Response: call.Response, - Error: call.Error, - Stack: call.Stack, + Payload: payload, })) } diff --git a/backend/controller/dal/dal_test.go b/backend/controller/dal/dal_test.go index 7f48404f84..31fcaf7d7c 100644 --- a/backend/controller/dal/dal_test.go +++ b/backend/controller/dal/dal_test.go @@ -25,7 +25,7 @@ import ( func TestDAL(t *testing.T) { ctx := log.ContextWithNewDefaultLogger(context.Background()) conn := sqltest.OpenForTesting(ctx, t) - dal, err := New(ctx, conn) + dal, err := New(ctx, conn, NoOpEncryptors()) assert.NoError(t, err) assert.NotZero(t, dal) var testContent = bytes.Repeat([]byte("sometestcontentthatislongerthanthereadbuffer"), 100) diff --git a/backend/controller/dal/events.go b/backend/controller/dal/events.go index 69fc43335c..a4301f3814 100644 --- a/backend/controller/dal/events.go +++ b/backend/controller/dal/events.go @@ -58,8 +58,8 @@ type CallEvent struct { SourceVerb optional.Option[schema.Ref] DestVerb schema.Ref Duration time.Duration - Request []byte - Response []byte + Request json.RawMessage + Response json.RawMessage Error optional.Option[string] Stack optional.Option[string] } @@ -320,14 +320,14 @@ func (d *DAL) QueryEvents(ctx context.Context, limit int, filters ...EventFilter } defer rows.Close() - events, err := transformRowsToEvents(deploymentKeys, rows) + events, err := d.transformRowsToEvents(deploymentKeys, rows) if err != nil { return nil, err } return events, nil } -func transformRowsToEvents(deploymentKeys map[int64]model.DeploymentKey, rows pgx.Rows) ([]Event, error) { +func (d *DAL) transformRowsToEvents(deploymentKeys map[int64]model.DeploymentKey, rows pgx.Rows) ([]Event, error) { var out []Event for rows.Next() { row := eventRow{} @@ -346,9 +346,10 @@ func transformRowsToEvents(deploymentKeys map[int64]model.DeploymentKey, rows pg switch row.Type { case sql.EventTypeLog: var jsonPayload eventLogJSON - if err := json.Unmarshal(row.Payload, &jsonPayload); err != nil { - return nil, err + if err := d.encryptors.Logs.DecryptJSON(row.Payload, &jsonPayload); err != nil { + return nil, fmt.Errorf("failed to decrypt log event: %w", err) } + level, err := strconv.ParseInt(row.CustomKey1.MustGet(), 10, 32) if err != nil { return nil, fmt.Errorf("invalid log level: %q: %w", row.CustomKey1.MustGet(), err) @@ -367,8 +368,8 @@ func transformRowsToEvents(deploymentKeys map[int64]model.DeploymentKey, rows pg case sql.EventTypeCall: var jsonPayload eventCallJSON - if err := json.Unmarshal(row.Payload, &jsonPayload); err != nil { - return nil, err + if err := d.encryptors.Logs.DecryptJSON(row.Payload, &jsonPayload); err != nil { + return nil, fmt.Errorf("failed to decrypt call event: %w", err) } var sourceVerb optional.Option[schema.Ref] sourceModule, smok := row.CustomKey1.Get() @@ -392,8 +393,8 @@ func transformRowsToEvents(deploymentKeys map[int64]model.DeploymentKey, rows pg case sql.EventTypeDeploymentCreated: var jsonPayload eventDeploymentCreatedJSON - if err := json.Unmarshal(row.Payload, &jsonPayload); err != nil { - return nil, err + if err := d.encryptors.Logs.DecryptJSON(row.Payload, &jsonPayload); err != nil { + return nil, fmt.Errorf("failed to decrypt call event: %w", err) } out = append(out, &DeploymentCreatedEvent{ ID: row.ID, @@ -407,8 +408,8 @@ func transformRowsToEvents(deploymentKeys map[int64]model.DeploymentKey, rows pg case sql.EventTypeDeploymentUpdated: var jsonPayload eventDeploymentUpdatedJSON - if err := json.Unmarshal(row.Payload, &jsonPayload); err != nil { - return nil, err + if err := d.encryptors.Logs.DecryptJSON(row.Payload, &jsonPayload); err != nil { + return nil, fmt.Errorf("failed to decrypt call event: %w", err) } out = append(out, &DeploymentUpdatedEvent{ ID: row.ID, diff --git a/backend/controller/dal/fsm.go b/backend/controller/dal/fsm.go index 477fafee46..6bf4b7c4e9 100644 --- a/backend/controller/dal/fsm.go +++ b/backend/controller/dal/fsm.go @@ -30,12 +30,17 @@ import ( // // Note: no validation of the FSM is performed. func (d *DAL) StartFSMTransition(ctx context.Context, fsm schema.RefKey, executionKey string, destinationState schema.RefKey, request json.RawMessage, retryParams schema.RetryParams) (err error) { + encryptedRequest, err := d.encryptors.Async.EncryptJSON(request) + if err != nil { + return fmt.Errorf("failed to encrypt FSM request: %w", err) + } + // Create an async call for the event. origin := AsyncOriginFSM{FSM: fsm, Key: executionKey} asyncCallID, err := d.db.CreateAsyncCall(ctx, sql.CreateAsyncCallParams{ Verb: destinationState, Origin: origin.String(), - Request: request, + Request: encryptedRequest, RemainingAttempts: int32(retryParams.Count), Backoff: retryParams.MinBackoff, MaxBackoff: retryParams.MaxBackoff, diff --git a/backend/controller/dal/fsm_test.go b/backend/controller/dal/fsm_test.go index 68b2622ff4..a3c781dd0e 100644 --- a/backend/controller/dal/fsm_test.go +++ b/backend/controller/dal/fsm_test.go @@ -17,7 +17,7 @@ import ( func TestSendFSMEvent(t *testing.T) { ctx := log.ContextWithNewDefaultLogger(context.Background()) conn := sqltest.OpenForTesting(ctx, t) - dal, err := New(ctx, conn) + dal, err := New(ctx, conn, NoOpEncryptors()) assert.NoError(t, err) _, err = dal.AcquireAsyncCall(ctx) diff --git a/backend/controller/dal/lease_test.go b/backend/controller/dal/lease_test.go index 72782a45a9..5b73d84bbe 100644 --- a/backend/controller/dal/lease_test.go +++ b/backend/controller/dal/lease_test.go @@ -36,7 +36,7 @@ func TestLease(t *testing.T) { } ctx := log.ContextWithNewDefaultLogger(context.Background()) conn := sqltest.OpenForTesting(ctx, t) - dal, err := New(ctx, conn) + dal, err := New(ctx, conn, NoOpEncryptors()) assert.NoError(t, err) // TTL is too short, expect an error @@ -71,7 +71,7 @@ func TestExpireLeases(t *testing.T) { } ctx := log.ContextWithNewDefaultLogger(context.Background()) conn := sqltest.OpenForTesting(ctx, t) - dal, err := New(ctx, conn) + dal, err := New(ctx, conn, NoOpEncryptors()) assert.NoError(t, err) leasei, _, err := dal.AcquireLease(ctx, leases.SystemKey("test"), time.Second*5, optional.None[any]()) diff --git a/backend/controller/dal/pubsub.go b/backend/controller/dal/pubsub.go index afefbfe69a..fbb17e0926 100644 --- a/backend/controller/dal/pubsub.go +++ b/backend/controller/dal/pubsub.go @@ -2,6 +2,7 @@ package dal import ( "context" + "encoding/json" "fmt" "strings" "time" @@ -18,12 +19,16 @@ import ( ) func (d *DAL) PublishEventForTopic(ctx context.Context, module, topic, caller string, payload []byte) error { - err := d.db.PublishEventForTopic(ctx, sql.PublishEventForTopicParams{ + encryptedPayload, err := d.encryptors.Async.EncryptJSON(json.RawMessage(payload)) + if err != nil { + return fmt.Errorf("failed to encrypt payload: %w", err) + } + err = d.db.PublishEventForTopic(ctx, sql.PublishEventForTopicParams{ Key: model.NewTopicEventKey(module, topic), Module: module, Topic: topic, Caller: caller, - Payload: payload, + Payload: encryptedPayload, }) observability.PubSub.Published(ctx, module, topic, caller, err) if err != nil { @@ -98,10 +103,11 @@ func (d *DAL) ProgressSubscriptions(ctx context.Context, eventConsumptionDelay t Name: subscription.Key.Payload.Name, }, } + _, err = tx.db.CreateAsyncCall(ctx, sql.CreateAsyncCallParams{ Verb: subscriber.Sink, Origin: origin.String(), - Request: nextCursor.Payload, + Request: nextCursor.Payload, // already encrypted RemainingAttempts: subscriber.RetryAttempts, Backoff: subscriber.Backoff, MaxBackoff: subscriber.MaxBackoff, diff --git a/backend/controller/sql/models.go b/backend/controller/sql/models.go index a8e7d0b65e..4e72c77a20 100644 --- a/backend/controller/sql/models.go +++ b/backend/controller/sql/models.go @@ -523,7 +523,7 @@ type TopicEvent struct { Key model.TopicEventKey TopicID int64 Payload []byte - Caller string + Caller optional.Option[string] } type TopicSubscriber struct { diff --git a/backend/controller/sql/queries.sql b/backend/controller/sql/queries.sql index 9d8dfb72f4..0af302d19a 100644 --- a/backend/controller/sql/queries.sql +++ b/backend/controller/sql/queries.sql @@ -271,69 +271,94 @@ SELECT COUNT(*) FROM rows; -- name: InsertLogEvent :exec -INSERT INTO events (deployment_id, request_id, time_stamp, custom_key_1, type, payload) -VALUES ((SELECT id FROM deployments d WHERE d.key = sqlc.arg('deployment_key')::deployment_key LIMIT 1), - (CASE - WHEN sqlc.narg('request_key')::TEXT IS NULL THEN NULL - ELSE (SELECT id FROM requests ir WHERE ir.key = sqlc.narg('request_key')::TEXT LIMIT 1) - END), - sqlc.arg('time_stamp')::TIMESTAMPTZ, - sqlc.arg('level')::INT, - 'log', - jsonb_build_object( - 'message', sqlc.arg('message')::TEXT, - 'attributes', sqlc.arg('attributes')::JSONB, - 'error', sqlc.narg('error')::TEXT, - 'stack', sqlc.narg('stack')::TEXT - )); +INSERT INTO events ( + deployment_id, + request_id, + time_stamp, + custom_key_1, + type, + payload +) +VALUES ( + (SELECT id FROM deployments d WHERE d.key = sqlc.arg('deployment_key')::deployment_key LIMIT 1), + ( + CASE + WHEN sqlc.narg('request_key')::TEXT IS NULL THEN NULL + ELSE (SELECT id FROM requests ir WHERE ir.key = sqlc.narg('request_key')::TEXT LIMIT 1) + END + ), + sqlc.arg('time_stamp')::TIMESTAMPTZ, + sqlc.arg('level')::INT, + 'log', + sqlc.arg('payload') +); -- name: InsertDeploymentCreatedEvent :exec -INSERT INTO events (deployment_id, type, custom_key_1, custom_key_2, payload) -VALUES ((SELECT id - FROM deployments - WHERE deployments.key = sqlc.arg('deployment_key')::deployment_key), - 'deployment_created', - sqlc.arg('language')::TEXT, - sqlc.arg('module_name')::TEXT, - jsonb_build_object( - 'min_replicas', sqlc.arg('min_replicas')::INT, - 'replaced', sqlc.narg('replaced')::deployment_key - )); +INSERT INTO events ( + deployment_id, + type, + custom_key_1, + custom_key_2, + payload +) +VALUES ( + ( + SELECT id + FROM deployments + WHERE deployments.key = sqlc.arg('deployment_key')::deployment_key + ), + 'deployment_created', + sqlc.arg('language')::TEXT, + sqlc.arg('module_name')::TEXT, + sqlc.arg('payload') +); -- name: InsertDeploymentUpdatedEvent :exec -INSERT INTO events (deployment_id, type, custom_key_1, custom_key_2, payload) -VALUES ((SELECT id - FROM deployments - WHERE deployments.key = sqlc.arg('deployment_key')::deployment_key), - 'deployment_updated', - sqlc.arg('language')::TEXT, - sqlc.arg('module_name')::TEXT, - jsonb_build_object( - 'prev_min_replicas', sqlc.arg('prev_min_replicas')::INT, - 'min_replicas', sqlc.arg('min_replicas')::INT - )); +INSERT INTO events ( + deployment_id, + type, + custom_key_1, + custom_key_2, + payload +) +VALUES ( + ( + SELECT id + FROM deployments + WHERE deployments.key = sqlc.arg('deployment_key')::deployment_key + ), + 'deployment_updated', + sqlc.arg('language')::TEXT, + sqlc.arg('module_name')::TEXT, + sqlc.arg('payload') +); -- name: InsertCallEvent :exec -INSERT INTO events (deployment_id, request_id, time_stamp, type, - custom_key_1, custom_key_2, custom_key_3, custom_key_4, payload) -VALUES ((SELECT id FROM deployments WHERE deployments.key = sqlc.arg('deployment_key')::deployment_key), - (CASE - WHEN sqlc.narg('request_key')::TEXT IS NULL THEN NULL - ELSE (SELECT id FROM requests ir WHERE ir.key = sqlc.narg('request_key')::TEXT) - END), - sqlc.arg('time_stamp')::TIMESTAMPTZ, - 'call', - sqlc.narg('source_module')::TEXT, - sqlc.narg('source_verb')::TEXT, - sqlc.arg('dest_module')::TEXT, - sqlc.arg('dest_verb')::TEXT, - jsonb_build_object( - 'duration_ms', sqlc.arg('duration_ms')::BIGINT, - 'request', sqlc.arg('request')::JSONB, - 'response', sqlc.arg('response')::JSONB, - 'error', sqlc.narg('error')::TEXT, - 'stack', sqlc.narg('stack')::TEXT - )); +INSERT INTO events ( + deployment_id, + request_id, + time_stamp, + type, + custom_key_1, + custom_key_2, + custom_key_3, + custom_key_4, + payload +) +VALUES ( + (SELECT id FROM deployments WHERE deployments.key = sqlc.arg('deployment_key')::deployment_key), + (CASE + WHEN sqlc.narg('request_key')::TEXT IS NULL THEN NULL + ELSE (SELECT id FROM requests ir WHERE ir.key = sqlc.narg('request_key')::TEXT) + END), + sqlc.arg('time_stamp')::TIMESTAMPTZ, + 'call', + sqlc.narg('source_module')::TEXT, + sqlc.narg('source_verb')::TEXT, + sqlc.arg('dest_module')::TEXT, + sqlc.arg('dest_verb')::TEXT, + sqlc.arg('payload') +); -- name: CreateRequest :exec INSERT INTO requests (origin, "key", source_addr) @@ -667,7 +692,7 @@ VALUES ( WHERE modules.name = sqlc.arg('module')::TEXT AND topics.name = sqlc.arg('topic')::TEXT ), - sqlc.arg('caller'), + sqlc.arg('caller')::TEXT, sqlc.arg('payload') ); diff --git a/backend/controller/sql/queries.sql.go b/backend/controller/sql/queries.sql.go index 89145af7d1..9fb96683dd 100644 --- a/backend/controller/sql/queries.sql.go +++ b/backend/controller/sql/queries.sql.go @@ -1740,26 +1740,31 @@ func (q *Queries) GetTopicEvent(ctx context.Context, dollar_1 int64) (TopicEvent } const insertCallEvent = `-- name: InsertCallEvent :exec -INSERT INTO events (deployment_id, request_id, time_stamp, type, - custom_key_1, custom_key_2, custom_key_3, custom_key_4, payload) -VALUES ((SELECT id FROM deployments WHERE deployments.key = $1::deployment_key), - (CASE - WHEN $2::TEXT IS NULL THEN NULL - ELSE (SELECT id FROM requests ir WHERE ir.key = $2::TEXT) - END), - $3::TIMESTAMPTZ, - 'call', - $4::TEXT, - $5::TEXT, - $6::TEXT, - $7::TEXT, - jsonb_build_object( - 'duration_ms', $8::BIGINT, - 'request', $9::JSONB, - 'response', $10::JSONB, - 'error', $11::TEXT, - 'stack', $12::TEXT - )) +INSERT INTO events ( + deployment_id, + request_id, + time_stamp, + type, + custom_key_1, + custom_key_2, + custom_key_3, + custom_key_4, + payload +) +VALUES ( + (SELECT id FROM deployments WHERE deployments.key = $1::deployment_key), + (CASE + WHEN $2::TEXT IS NULL THEN NULL + ELSE (SELECT id FROM requests ir WHERE ir.key = $2::TEXT) + END), + $3::TIMESTAMPTZ, + 'call', + $4::TEXT, + $5::TEXT, + $6::TEXT, + $7::TEXT, + $8 +) ` type InsertCallEventParams struct { @@ -1770,11 +1775,7 @@ type InsertCallEventParams struct { SourceVerb optional.Option[string] DestModule string DestVerb string - DurationMs int64 - Request []byte - Response []byte - Error optional.Option[string] - Stack optional.Option[string] + Payload json.RawMessage } func (q *Queries) InsertCallEvent(ctx context.Context, arg InsertCallEventParams) error { @@ -1786,35 +1787,37 @@ func (q *Queries) InsertCallEvent(ctx context.Context, arg InsertCallEventParams arg.SourceVerb, arg.DestModule, arg.DestVerb, - arg.DurationMs, - arg.Request, - arg.Response, - arg.Error, - arg.Stack, + arg.Payload, ) return err } const insertDeploymentCreatedEvent = `-- name: InsertDeploymentCreatedEvent :exec -INSERT INTO events (deployment_id, type, custom_key_1, custom_key_2, payload) -VALUES ((SELECT id - FROM deployments - WHERE deployments.key = $1::deployment_key), - 'deployment_created', - $2::TEXT, - $3::TEXT, - jsonb_build_object( - 'min_replicas', $4::INT, - 'replaced', $5::deployment_key - )) +INSERT INTO events ( + deployment_id, + type, + custom_key_1, + custom_key_2, + payload +) +VALUES ( + ( + SELECT id + FROM deployments + WHERE deployments.key = $1::deployment_key + ), + 'deployment_created', + $2::TEXT, + $3::TEXT, + $4 +) ` type InsertDeploymentCreatedEventParams struct { DeploymentKey model.DeploymentKey Language string ModuleName string - MinReplicas int32 - Replaced optional.Option[model.DeploymentKey] + Payload json.RawMessage } func (q *Queries) InsertDeploymentCreatedEvent(ctx context.Context, arg InsertDeploymentCreatedEventParams) error { @@ -1822,32 +1825,37 @@ func (q *Queries) InsertDeploymentCreatedEvent(ctx context.Context, arg InsertDe arg.DeploymentKey, arg.Language, arg.ModuleName, - arg.MinReplicas, - arg.Replaced, + arg.Payload, ) return err } const insertDeploymentUpdatedEvent = `-- name: InsertDeploymentUpdatedEvent :exec -INSERT INTO events (deployment_id, type, custom_key_1, custom_key_2, payload) -VALUES ((SELECT id - FROM deployments - WHERE deployments.key = $1::deployment_key), - 'deployment_updated', - $2::TEXT, - $3::TEXT, - jsonb_build_object( - 'prev_min_replicas', $4::INT, - 'min_replicas', $5::INT - )) +INSERT INTO events ( + deployment_id, + type, + custom_key_1, + custom_key_2, + payload +) +VALUES ( + ( + SELECT id + FROM deployments + WHERE deployments.key = $1::deployment_key + ), + 'deployment_updated', + $2::TEXT, + $3::TEXT, + $4 +) ` type InsertDeploymentUpdatedEventParams struct { - DeploymentKey model.DeploymentKey - Language string - ModuleName string - PrevMinReplicas int32 - MinReplicas int32 + DeploymentKey model.DeploymentKey + Language string + ModuleName string + Payload json.RawMessage } func (q *Queries) InsertDeploymentUpdatedEvent(ctx context.Context, arg InsertDeploymentUpdatedEventParams) error { @@ -1855,8 +1863,7 @@ func (q *Queries) InsertDeploymentUpdatedEvent(ctx context.Context, arg InsertDe arg.DeploymentKey, arg.Language, arg.ModuleName, - arg.PrevMinReplicas, - arg.MinReplicas, + arg.Payload, ) return err } @@ -1895,21 +1902,27 @@ func (q *Queries) InsertEvent(ctx context.Context, arg InsertEventParams) error } const insertLogEvent = `-- name: InsertLogEvent :exec -INSERT INTO events (deployment_id, request_id, time_stamp, custom_key_1, type, payload) -VALUES ((SELECT id FROM deployments d WHERE d.key = $1::deployment_key LIMIT 1), - (CASE - WHEN $2::TEXT IS NULL THEN NULL - ELSE (SELECT id FROM requests ir WHERE ir.key = $2::TEXT LIMIT 1) - END), - $3::TIMESTAMPTZ, - $4::INT, - 'log', - jsonb_build_object( - 'message', $5::TEXT, - 'attributes', $6::JSONB, - 'error', $7::TEXT, - 'stack', $8::TEXT - )) +INSERT INTO events ( + deployment_id, + request_id, + time_stamp, + custom_key_1, + type, + payload +) +VALUES ( + (SELECT id FROM deployments d WHERE d.key = $1::deployment_key LIMIT 1), + ( + CASE + WHEN $2::TEXT IS NULL THEN NULL + ELSE (SELECT id FROM requests ir WHERE ir.key = $2::TEXT LIMIT 1) + END + ), + $3::TIMESTAMPTZ, + $4::INT, + 'log', + $5 +) ` type InsertLogEventParams struct { @@ -1917,10 +1930,7 @@ type InsertLogEventParams struct { RequestKey optional.Option[string] TimeStamp time.Time Level int32 - Message string - Attributes []byte - Error optional.Option[string] - Stack optional.Option[string] + Payload json.RawMessage } func (q *Queries) InsertLogEvent(ctx context.Context, arg InsertLogEventParams) error { @@ -1929,10 +1939,7 @@ func (q *Queries) InsertLogEvent(ctx context.Context, arg InsertLogEventParams) arg.RequestKey, arg.TimeStamp, arg.Level, - arg.Message, - arg.Attributes, - arg.Error, - arg.Stack, + arg.Payload, ) return err } @@ -2091,7 +2098,7 @@ VALUES ( WHERE modules.name = $2::TEXT AND topics.name = $3::TEXT ), - $4, + $4::TEXT, $5 ) ` diff --git a/cmd/ftl-controller/main.go b/cmd/ftl-controller/main.go index 557a6f8dac..d4ef85f8a4 100644 --- a/cmd/ftl-controller/main.go +++ b/cmd/ftl-controller/main.go @@ -42,6 +42,10 @@ func main() { kong.Vars{"version": ftl.Version, "timestamp": time.Unix(t, 0).Format(time.RFC3339)}, ) cli.ControllerConfig.SetDefaults() + + encryptors, err := cli.ControllerConfig.EncryptionKeys.Encryptors(true) + kctx.FatalIfErrorf(err, "failed to create encryptors") + ctx := log.ContextWithLogger(context.Background(), log.Configure(os.Stderr, cli.LogConfig)) err = observability.Init(ctx, "ftl-controller", ftl.Version, cli.ObservabilityConfig) kctx.FatalIfErrorf(err, "failed to initialize observability") @@ -49,7 +53,7 @@ func main() { // The FTL controller currently only supports DB as a configuration provider/resolver. conn, err := pgxpool.New(ctx, cli.ControllerConfig.DSN) kctx.FatalIfErrorf(err) - dal, err := dal.New(ctx, conn) + dal, err := dal.New(ctx, conn, encryptors) kctx.FatalIfErrorf(err) configDal, err := cfdal.New(ctx, conn) @@ -70,6 +74,6 @@ func main() { kctx.FatalIfErrorf(err) ctx = cf.ContextWithSecrets(ctx, sm) - err = controller.Start(ctx, cli.ControllerConfig, scaling.NewK8sScaling()) + err = controller.Start(ctx, cli.ControllerConfig, scaling.NewK8sScaling(), conn, encryptors) kctx.FatalIfErrorf(err) } diff --git a/cmd/ftl/cmd_box_run.go b/cmd/ftl/cmd_box_run.go index 6197accdec..884f114e22 100644 --- a/cmd/ftl/cmd_box_run.go +++ b/cmd/ftl/cmd_box_run.go @@ -6,6 +6,7 @@ import ( "net/url" "time" + "github.com/jackc/pgx/v5/pgxpool" "github.com/jpillora/backoff" "golang.org/x/sync/errgroup" @@ -54,9 +55,20 @@ func (b *boxRunCmd) Run(ctx context.Context, projConfig projectconfig.Config) er if err != nil { return fmt.Errorf("failed to create runner autoscaler: %w", err) } + + // Bring up the DB connection and DAL. + pool, err := pgxpool.New(ctx, config.DSN) + if err != nil { + return fmt.Errorf("failed to bring up DB connection: %w", err) + } + encryptors, err := config.EncryptionKeys.Encryptors(false) + if err != nil { + return fmt.Errorf("failed to create encryptors: %w", err) + } + wg := errgroup.Group{} wg.Go(func() error { - return controller.Start(ctx, config, runnerScaling) + return controller.Start(ctx, config, runnerScaling, pool, encryptors) }) // Wait for the controller to come up. diff --git a/cmd/ftl/cmd_serve.go b/cmd/ftl/cmd_serve.go index 117712cd0a..69c1644859 100644 --- a/cmd/ftl/cmd_serve.go +++ b/cmd/ftl/cmd_serve.go @@ -15,6 +15,7 @@ import ( "connectrpc.com/connect" "github.com/alecthomas/types/optional" + "github.com/jackc/pgx/v5/pgxpool" "golang.org/x/sync/errgroup" "github.com/TBD54566975/ftl" @@ -145,8 +146,18 @@ func (s *serveCmd) run(ctx context.Context, projConfig projectconfig.Config, ini } controllerCtx = cf.ContextWithSecrets(controllerCtx, sm) + // Bring up the DB connection and DAL. + pool, err := pgxpool.New(ctx, config.DSN) + if err != nil { + return fmt.Errorf("failed to bring up DB connection: %w", err) + } + encryptors, err := config.EncryptionKeys.Encryptors(false) + if err != nil { + return fmt.Errorf("failed to create encryptors: %w", err) + } + wg.Go(func() error { - if err := controller.Start(controllerCtx, config, runnerScaling); err != nil { + if err := controller.Start(controllerCtx, config, runnerScaling, pool, encryptors); err != nil { logger.Errorf(err, "controller%d failed: %v", i, err) return fmt.Errorf("controller%d failed: %w", i, err) } diff --git a/common/configuration/sql/models.go b/common/configuration/sql/models.go index a8e7d0b65e..4e72c77a20 100644 --- a/common/configuration/sql/models.go +++ b/common/configuration/sql/models.go @@ -523,7 +523,7 @@ type TopicEvent struct { Key model.TopicEventKey TopicID int64 Payload []byte - Caller string + Caller optional.Option[string] } type TopicSubscriber struct { diff --git a/go.mod b/go.mod index 8f0038cd3e..494f767c52 100644 --- a/go.mod +++ b/go.mod @@ -43,6 +43,7 @@ require ( github.com/rs/cors v1.11.0 github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 github.com/swaggest/jsonschema-go v0.3.72 + github.com/tink-crypto/tink-go/v2 v2.2.0 github.com/titanous/json5 v1.0.0 github.com/tliron/commonlog v0.2.17 github.com/tliron/glsp v0.2.2 diff --git a/go.sum b/go.sum index 5becce661c..187085ef5a 100644 --- a/go.sum +++ b/go.sum @@ -240,6 +240,8 @@ github.com/swaggest/jsonschema-go v0.3.72 h1:IHaGlR1bdBUBPfhe4tfacN2TGAPKENEGiNy github.com/swaggest/jsonschema-go v0.3.72/go.mod h1:OrGyEoVqpfSFJ4Am4V/FQcQ3mlEC1vVeleA+5ggbVW4= github.com/swaggest/refl v1.3.0 h1:PEUWIku+ZznYfsoyheF97ypSduvMApYyGkYF3nabS0I= github.com/swaggest/refl v1.3.0/go.mod h1:3Ujvbmh1pfSbDYjC6JGG7nMgPvpG0ehQL4iNonnLNbg= +github.com/tink-crypto/tink-go/v2 v2.2.0 h1:L2Da0F2Udh2agtKztdr69mV/KpnY3/lGTkMgLTVIXlA= +github.com/tink-crypto/tink-go/v2 v2.2.0/go.mod h1:JJ6PomeNPF3cJpfWC0lgyTES6zpJILkAX0cJNwlS3xU= github.com/titanous/json5 v1.0.0 h1:hJf8Su1d9NuI/ffpxgxQfxh/UiBFZX7bMPid0rIL/7s= github.com/titanous/json5 v1.0.0/go.mod h1:7JH1M8/LHKc6cyP5o5g3CSaRj+mBrIimTxzpvmckH8c= github.com/tliron/commonlog v0.2.17 h1:GFVvzDZbNLkuQfT45IZeWkrR5AyqiX7Du8pWAtFuPTY= diff --git a/integration/harness.go b/integration/harness.go index 7390aaa757..da11527ae7 100644 --- a/integration/harness.go +++ b/integration/harness.go @@ -65,6 +65,16 @@ func RunWithoutController(t *testing.T, ftlConfigPath string, actions ...Action) run(t, ftlConfigPath, false, actions...) } +func RunWithEncryption(t *testing.T, ftlConfigPath string, actions ...Action) { + logKey := `{"primaryKeyId":1467957621,"key":[{"keyData":{"typeUrl":"type.googleapis.com/google.crypto.tink.AesCtrHmacStreamingKey","value":"Eg4IgIBAECAYAyIECAMQIBog7t16YRvohzTJBKt0D4WcqFpoeWH0C20Hr09v+AxbOOE=","keyMaterialType":"SYMMETRIC"},"status":"ENABLED","keyId":1467957621,"outputPrefixType":"RAW"}]}` + asyncKey := `{"primaryKeyId":2710864232,"key":[{"keyData":{"typeUrl":"type.googleapis.com/google.crypto.tink.AesCtrHmacStreamingKey","value":"Eg4IgIBAECAYAyIECAMQIBogTFCSLcJGRRazu74LrehNGL82J0sicjnjG5uNZcDyjGE=","keyMaterialType":"SYMMETRIC"},"status":"ENABLED","keyId":2710864232,"outputPrefixType":"RAW"}]}` + + t.Setenv("FTL_LOG_ENCRYPTION_KEY", logKey) + t.Setenv("FTL_ASYNC_ENCRYPTION_KEY", asyncKey) + + run(t, ftlConfigPath, true, actions...) +} + func run(t *testing.T, ftlConfigPath string, startController bool, actions ...Action) { tmpDir := t.TempDir() diff --git a/internal/encryption/encryption.go b/internal/encryption/encryption.go new file mode 100644 index 0000000000..c3abe96726 --- /dev/null +++ b/internal/encryption/encryption.go @@ -0,0 +1,143 @@ +package encryption + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "strings" + + "github.com/tink-crypto/tink-go/v2/insecurecleartextkeyset" + "github.com/tink-crypto/tink-go/v2/keyset" + "github.com/tink-crypto/tink-go/v2/streamingaead" + "github.com/tink-crypto/tink-go/v2/tink" +) + +type Encryptable interface { + EncryptJSON(input any) (json.RawMessage, error) + DecryptJSON(input json.RawMessage, output any) error +} + +func NewForKeyOrURI(keyOrURI string) (Encryptable, error) { + if len(keyOrURI) == 0 { + return NoOpEncryptor{}, nil + } + + // If keyOrUri is a JSON string, it is a clear text key set. + if strings.TrimSpace(keyOrURI)[0] == '{' { + return NewClearTextEncryptor(keyOrURI) + // Otherwise should be a URI for KMS. + // aws-kms://arn:aws:kms:[region]:[account-id]:key/[key-id] + } else if strings.HasPrefix(keyOrURI, "aws-kms://") { + return nil, fmt.Errorf("AWS KMS is not supported yet") + } + return nil, fmt.Errorf("unsupported key or uri: %s", keyOrURI) +} + +// NoOpEncryptor does not encrypt and just passes the input as is. +type NoOpEncryptor struct { +} + +func (n NoOpEncryptor) EncryptJSON(input any) (json.RawMessage, error) { + msg, err := json.Marshal(input) + if err != nil { + return nil, fmt.Errorf("failed to marshal input: %w", err) + } + + return msg, nil +} + +func (n NoOpEncryptor) DecryptJSON(input json.RawMessage, output any) error { + err := json.Unmarshal(input, output) + if err != nil { + return fmt.Errorf("failed to unmarshal input: %w", err) + } + + return nil +} + +func NewClearTextEncryptor(key string) (Encryptable, error) { + keySetHandle, err := insecurecleartextkeyset.Read( + keyset.NewJSONReader(bytes.NewBufferString(key))) + if err != nil { + return nil, fmt.Errorf("failed to read clear text keyset: %w", err) + } + + encryptor, err := NewEncryptor(*keySetHandle) + if err != nil { + return nil, fmt.Errorf("failed to create clear text encryptor: %w", err) + } + + return encryptor, nil +} + +// NewEncryptor encrypts and decrypts JSON payloads using the provided key set. +// The key set must contain a primary key that is a streaming AEAD primitive. +func NewEncryptor(keySet keyset.Handle) (Encryptable, error) { + primitive, err := streamingaead.New(&keySet) + if err != nil { + return nil, fmt.Errorf("failed to create primitive during encryption: %w", err) + } + + return Encryptor{keySetHandle: keySet, primitive: primitive}, nil +} + +type Encryptor struct { + keySetHandle keyset.Handle + primitive tink.StreamingAEAD +} + +type EncryptedPayload struct { + Encrypted []byte `json:"encrypted"` +} + +func (e Encryptor) EncryptJSON(input any) (json.RawMessage, error) { + msg, err := json.Marshal(input) + if err != nil { + return nil, fmt.Errorf("failed to marshal input: %w", err) + } + + encryptedBuffer := &bytes.Buffer{} + msgBuffer := bytes.NewBuffer(msg) + writer, err := e.primitive.NewEncryptingWriter(encryptedBuffer, nil) + if err != nil { + return nil, fmt.Errorf("failed to create encrypting writer: %w", err) + } + + if _, err := io.Copy(writer, msgBuffer); err != nil { + return nil, fmt.Errorf("failed to copy encrypted data: %w", err) + } + if err := writer.Close(); err != nil { + return nil, fmt.Errorf("failed to close encrypted writer: %w", err) + } + + out, err := json.Marshal(EncryptedPayload{Encrypted: encryptedBuffer.Bytes()}) + if err != nil { + return nil, fmt.Errorf("failed to marshal encrypted data: %w", err) + } + return out, nil +} + +func (e Encryptor) DecryptJSON(input json.RawMessage, output any) error { + var payload EncryptedPayload + if err := json.Unmarshal(input, &payload); err != nil { + return fmt.Errorf("failed to unmarshal encrypted payload: %w", err) + } + + inputBytesReader := bytes.NewReader(payload.Encrypted) + reader, err := e.primitive.NewDecryptingReader(inputBytesReader, nil) + if err != nil { + return fmt.Errorf("failed to create decrypting reader: %w", err) + } + + decryptedBuffer := &bytes.Buffer{} + if _, err := io.Copy(decryptedBuffer, reader); err != nil { + return fmt.Errorf("failed to copy decrypted data: %w", err) + } + + if err := json.Unmarshal(decryptedBuffer.Bytes(), output); err != nil { + return fmt.Errorf("failed to unmarshal decrypted data: %w", err) + } + + return nil +} diff --git a/internal/encryption/encryption_test.go b/internal/encryption/encryption_test.go new file mode 100644 index 0000000000..ba707644c2 --- /dev/null +++ b/internal/encryption/encryption_test.go @@ -0,0 +1,46 @@ +package encryption + +import ( + "encoding/json" + "fmt" + "testing" + + "github.com/alecthomas/assert/v2" +) + +const key = `{ + "primaryKeyId": 1720777699, + "key": [{ + "keyData": { + "typeUrl": "type.googleapis.com/google.crypto.tink.AesCtrHmacStreamingKey", + "keyMaterialType": "SYMMETRIC", + "value": "Eg0IgCAQIBgDIgQIAxAgGiDtesd/4gCnQdTrh+AXodwpm2b6BFJkp043n+8mqx0YGw==" + }, + "outputPrefixType": "RAW", + "keyId": 1720777699, + "status": "ENABLED" + }] + }` + +func TestNewEncryptor(t *testing.T) { + jsonInput := "\"hello\"" + + encryptor, err := NewForKeyOrURI(key) + assert.NoError(t, err) + + encrypted, err := encryptor.EncryptJSON(jsonInput) + assert.NoError(t, err) + fmt.Printf("Encrypted: %s\n", encrypted) + + var decrypted json.RawMessage + err = encryptor.DecryptJSON(encrypted, &decrypted) + assert.NoError(t, err) + fmt.Printf("Decrypted: %s\n", decrypted) + + var decryptedString string + err = json.Unmarshal(decrypted, &decryptedString) + assert.NoError(t, err) + fmt.Printf("Decrypted string: %s\n", decryptedString) + + assert.Equal(t, jsonInput, decryptedString) +} diff --git a/internal/encryption/integration_test.go b/internal/encryption/integration_test.go new file mode 100644 index 0000000000..fe956e3182 --- /dev/null +++ b/internal/encryption/integration_test.go @@ -0,0 +1,99 @@ +//go:build integration + +package encryption + +import ( + "fmt" + "testing" + "time" + + "connectrpc.com/connect" + pbconsole "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/console" + in "github.com/TBD54566975/ftl/integration" + "github.com/TBD54566975/ftl/internal/slices" + "github.com/alecthomas/assert/v2" +) + +func TestEncryptionForLogs(t *testing.T) { + in.RunWithEncryption(t, "", + in.CopyModule("encryption"), + in.Deploy("encryption"), + in.Call[map[string]interface{}, any]("encryption", "echo", map[string]interface{}{"name": "Alice"}, nil), + + // confirm that we can read an event for that call + func(t testing.TB, ic in.TestContext) { + in.Infof("Read Logs") + resp, err := ic.Console.GetEvents(ic.Context, connect.NewRequest(&pbconsole.EventsQuery{ + Limit: 10, + })) + assert.NoError(t, err, "could not get events") + _, ok := slices.Find(resp.Msg.Events, func(e *pbconsole.Event) bool { + call, ok := e.Entry.(*pbconsole.Event_Call) + if !ok { + return false + } + assert.Contains(t, call.Call.Request, "Alice", "request does not contain expected value") + + return true + }) + assert.True(t, ok, "could not find event") + }, + + // confirm that we can't find that raw request string in the table + in.QueryRow("ftl", "SELECT COUNT(*) FROM events WHERE type = 'call'", int64(1)), + func(t testing.TB, ic in.TestContext) { + values := in.GetRow(t, ic, "ftl", "SELECT payload FROM events WHERE type = 'call' LIMIT 1", 1) + payload, ok := values[0].([]byte) + assert.True(t, ok, "could not convert payload to string") + assert.Contains(t, string(payload), "encrypted", "raw request string should not be stored in the table") + assert.NotContains(t, string(payload), "Alice", "raw request string should not be stored in the table") + }, + ) +} + +func TestEncryptionForubSub(t *testing.T) { + in.RunWithEncryption(t, "", + in.CopyModule("encryption"), + in.Deploy("encryption"), + in.Call[map[string]interface{}, any]("encryption", "publish", map[string]interface{}{"name": "AliceInWonderland"}, nil), + + in.Sleep(4*time.Second), + + // check that the event was published with an encrypted request + in.QueryRow("ftl", "SELECT COUNT(*) FROM topic_events", int64(1)), + func(t testing.TB, ic in.TestContext) { + values := in.GetRow(t, ic, "ftl", "SELECT payload FROM topic_events", 1) + payload, ok := values[0].([]byte) + assert.True(t, ok, "could not convert payload to string") + assert.Contains(t, string(payload), "encrypted", "raw request string should not be stored in the table") + assert.NotContains(t, string(payload), "AliceInWonderland", "raw request string should not be stored in the table") + }, + validateAsyncCall("consume", "AliceInWonderland"), + ) +} + +func TestEncryptionForFSM(t *testing.T) { + in.RunWithEncryption(t, "", + in.CopyModule("encryption"), + in.Deploy("encryption"), + in.Call[map[string]interface{}, any]("encryption", "beginFsm", map[string]interface{}{"name": "Rosebud"}, nil), + in.Sleep(3*time.Second), + in.Call[map[string]interface{}, any]("encryption", "transitionFsm", map[string]interface{}{"name": "Rosebud"}, nil), + in.Sleep(3*time.Second), + + validateAsyncCall("created", "Rosebud"), + validateAsyncCall("paid", "Rosebud"), + ) +} + +func validateAsyncCall(verb string, sensitive string) in.Action { + return func(t testing.TB, ic in.TestContext) { + in.QueryRow("ftl", fmt.Sprintf("SELECT COUNT(*) FROM async_calls WHERE verb = 'encryption.%s' AND state = 'success'", verb), int64(1))(t, ic) + + values := in.GetRow(t, ic, "ftl", fmt.Sprintf("SELECT request FROM async_calls WHERE verb = 'encryption.%s' AND state = 'success'", verb), 1) + request, ok := values[0].([]byte) + assert.True(t, ok, "could not convert payload to string") + assert.Contains(t, string(request), "encrypted", "raw request string should not be stored in the table") + assert.NotContains(t, string(request), sensitive, "raw request string should not be stored in the table") + } +} diff --git a/internal/encryption/testdata/go/encryption/encryption.go b/internal/encryption/testdata/go/encryption/encryption.go new file mode 100644 index 0000000000..ef203df425 --- /dev/null +++ b/internal/encryption/testdata/go/encryption/encryption.go @@ -0,0 +1,99 @@ +package encryption + +import ( + "context" + "fmt" + + "github.com/TBD54566975/ftl/go-runtime/ftl" // Import the FTL SDK. +) + +// Basic call +// +// Used to test encryption of call event logs + +type EchoRequest struct { + Name ftl.Option[string] `json:"name"` +} + +type EchoResponse struct { + Message string `json:"message"` +} + +//ftl:verb +func Echo(ctx context.Context, req EchoRequest) (EchoResponse, error) { + return EchoResponse{Message: fmt.Sprintf("Hello, %s!", req.Name.Default("anonymous"))}, nil +} + +//ftl:data +type Event struct { + Name string `json:"name"` +} + +// PubSub +// +// Used to test encryption of topic_events and async_calls tables + +var Topic = ftl.Topic[Event]("topic") +var _ = ftl.Subscription(Topic, "subscription") + +//ftl:verb +func Publish(ctx context.Context, e Event) error { + fmt.Printf("Publishing event: %s\n", e.Name) + return Topic.Publish(ctx, e) +} + +//ftl:verb +//ftl:subscribe subscription +func Consume(ctx context.Context, e Event) error { + fmt.Printf("Received event: %s\n", e.Name) + if e.Name != "AliceInWonderland" { + return fmt.Errorf("Unexpected event: %s", e.Name) + } + return nil +} + +// FSM +// +// Used to test encryption of async_calls tables via FSM operations + +var fsm = ftl.FSM("payment", + ftl.Start(Created), + ftl.Start(Paid), + ftl.Transition(Created, Paid), + ftl.Transition(Paid, Completed), +) + +type OnlinePaymentCompleted struct { + Name string `json:"name"` +} +type OnlinePaymentPaid struct { + Name string `json:"name"` +} +type OnlinePaymentCreated struct { + Name string `json:"name"` +} + +//ftl:verb +func BeginFSM(ctx context.Context, req OnlinePaymentCreated) error { + return fsm.Send(ctx, "test", req) +} + +//ftl:verb +func TransitionFSM(ctx context.Context, req OnlinePaymentPaid) error { + return fsm.Send(ctx, "test", req) +} + +//ftl:verb +func Completed(ctx context.Context, in OnlinePaymentCompleted) error { + return nil +} + +//ftl:verb +func Created(ctx context.Context, in OnlinePaymentCreated) error { + return nil +} + +//ftl:verb +func Paid(ctx context.Context, in OnlinePaymentPaid) error { + return nil +} diff --git a/internal/encryption/testdata/go/encryption/ftl.toml b/internal/encryption/testdata/go/encryption/ftl.toml new file mode 100644 index 0000000000..13a6f61e5c --- /dev/null +++ b/internal/encryption/testdata/go/encryption/ftl.toml @@ -0,0 +1,2 @@ +module = "encryption" +language = "go" diff --git a/internal/encryption/testdata/go/encryption/go.mod b/internal/encryption/testdata/go/encryption/go.mod new file mode 100644 index 0000000000..0eabab575f --- /dev/null +++ b/internal/encryption/testdata/go/encryption/go.mod @@ -0,0 +1,47 @@ +module ftl/encryption + +go 1.22.5 + +require github.com/TBD54566975/ftl v1.1.5 + +require ( + connectrpc.com/connect v1.16.2 // indirect + connectrpc.com/grpcreflect v1.2.0 // indirect + connectrpc.com/otelconnect v0.7.1 // indirect + github.com/alecthomas/atomic v0.1.0-alpha2 // indirect + github.com/alecthomas/concurrency v0.0.2 // indirect + github.com/alecthomas/participle/v2 v2.1.1 // indirect + github.com/alecthomas/types v0.16.0 // indirect + github.com/alessio/shellescape v1.4.2 // indirect + github.com/benbjohnson/clock v1.3.5 // indirect + github.com/danieljoos/wincred v1.2.0 // indirect + github.com/go-logr/logr v1.4.2 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/godbus/dbus/v5 v5.1.0 // indirect + github.com/hashicorp/cronexpr v1.1.2 // indirect + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect + github.com/jackc/pgx/v5 v5.6.0 // indirect + github.com/jackc/puddle/v2 v2.2.1 // indirect + github.com/jpillora/backoff v1.0.0 // indirect + github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/multiformats/go-base36 v0.2.0 // indirect + github.com/puzpuzpuz/xsync/v3 v3.4.0 // indirect + github.com/swaggest/jsonschema-go v0.3.72 // indirect + github.com/swaggest/refl v1.3.0 // indirect + github.com/zalando/go-keyring v0.2.5 // indirect + go.opentelemetry.io/otel v1.28.0 // indirect + go.opentelemetry.io/otel/metric v1.28.0 // indirect + go.opentelemetry.io/otel/trace v1.28.0 // indirect + golang.org/x/crypto v0.25.0 // indirect + golang.org/x/exp v0.0.0-20240707233637-46b078467d37 // indirect + golang.org/x/mod v0.19.0 // indirect + golang.org/x/net v0.27.0 // indirect + golang.org/x/sync v0.7.0 // indirect + golang.org/x/sys v0.22.0 // indirect + golang.org/x/text v0.16.0 // indirect + google.golang.org/protobuf v1.34.2 // indirect +) + +replace github.com/TBD54566975/ftl => ./../../../../.. diff --git a/internal/encryption/testdata/go/encryption/go.sum b/internal/encryption/testdata/go/encryption/go.sum new file mode 100644 index 0000000000..359cfad1d6 --- /dev/null +++ b/internal/encryption/testdata/go/encryption/go.sum @@ -0,0 +1,148 @@ +connectrpc.com/connect v1.16.2 h1:ybd6y+ls7GOlb7Bh5C8+ghA6SvCBajHwxssO2CGFjqE= +connectrpc.com/connect v1.16.2/go.mod h1:n2kgwskMHXC+lVqb18wngEpF95ldBHXjZYJussz5FRc= +connectrpc.com/grpcreflect v1.2.0 h1:Q6og1S7HinmtbEuBvARLNwYmTbhEGRpHDhqrPNlmK+U= +connectrpc.com/grpcreflect v1.2.0/go.mod h1:nwSOKmE8nU5u/CidgHtPYk1PFI3U9ignz7iDMxOYkSY= +connectrpc.com/otelconnect v0.7.1 h1:scO5pOb0i4yUE66CnNrHeK1x51yq0bE0ehPg6WvzXJY= +connectrpc.com/otelconnect v0.7.1/go.mod h1:dh3bFgHBTb2bkqGCeVVOtHJreSns7uu9wwL2Tbz17ms= +github.com/TBD54566975/scaffolder v1.0.0 h1:QUFSy2wVzumLDg7IHcKC6AP+IYyqWe9Wxiu72nZn5qU= +github.com/TBD54566975/scaffolder v1.0.0/go.mod h1:auVpczIbOAdIhYDVSruIw41DanxOKB9bSvjf6MEl7Fs= +github.com/alecthomas/assert/v2 v2.10.0 h1:jjRCHsj6hBJhkmhznrCzoNpbA3zqy0fYiUcYZP/GkPY= +github.com/alecthomas/assert/v2 v2.10.0/go.mod h1:Bze95FyfUr7x34QZrjL+XP+0qgp/zg8yS+TtBj1WA3k= +github.com/alecthomas/atomic v0.1.0-alpha2 h1:dqwXmax66gXvHhsOS4pGPZKqYOlTkapELkLb3MNdlH8= +github.com/alecthomas/atomic v0.1.0-alpha2/go.mod h1:zD6QGEyw49HIq19caJDc2NMXAy8rNi9ROrxtMXATfyI= +github.com/alecthomas/concurrency v0.0.2 h1:Q3kGPtLbleMbH9lHX5OBFvJygfyFw29bXZKBg+IEVuo= +github.com/alecthomas/concurrency v0.0.2/go.mod h1:GmuQb/iHX7mbNtPlC/WDzEFxDMB0HYFer2Qda9QTs7w= +github.com/alecthomas/participle/v2 v2.1.1 h1:hrjKESvSqGHzRb4yW1ciisFJ4p3MGYih6icjJvbsmV8= +github.com/alecthomas/participle/v2 v2.1.1/go.mod h1:Y1+hAs8DHPmc3YUFzqllV+eSQ9ljPTk0ZkPMtEdAx2c= +github.com/alecthomas/repr v0.4.0 h1:GhI2A8MACjfegCPVq9f1FLvIBS+DrQ2KQBFZP1iFzXc= +github.com/alecthomas/repr v0.4.0/go.mod h1:Fr0507jx4eOXV7AlPV6AVZLYrLIuIeSOWtW57eE/O/4= +github.com/alecthomas/types v0.16.0 h1:o9+JSwCRB6DDaWDeR/Mg7v/zh3R+MlknM6DrnDyY7U0= +github.com/alecthomas/types v0.16.0/go.mod h1:Tswm0qQpjpVq8rn70OquRsUtFxbQKub/8TMyYYGI0+k= +github.com/alessio/shellescape v1.4.2 h1:MHPfaU+ddJ0/bYWpgIeUnQUqKrlJ1S7BfEYPM4uEoM0= +github.com/alessio/shellescape v1.4.2/go.mod h1:PZAiSCk0LJaZkiCSkPv8qIobYglO3FPpyFjDCtHLS30= +github.com/benbjohnson/clock v1.3.5 h1:VvXlSJBzZpA/zum6Sj74hxwYI2DIxRWuNIoXAzHZz5o= +github.com/benbjohnson/clock v1.3.5/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= +github.com/bool64/dev v0.2.35 h1:M17TLsO/pV2J7PYI/gpe3Ua26ETkzZGb+dC06eoMqlk= +github.com/bool64/dev v0.2.35/go.mod h1:iJbh1y/HkunEPhgebWRNcs8wfGq7sjvJ6W5iabL8ACg= +github.com/bool64/shared v0.1.5 h1:fp3eUhBsrSjNCQPcSdQqZxxh9bBwrYiZ+zOKFkM0/2E= +github.com/bool64/shared v0.1.5/go.mod h1:081yz68YC9jeFB3+Bbmno2RFWvGKv1lPKkMP6MHJlPs= +github.com/danieljoos/wincred v1.2.0 h1:ozqKHaLK0W/ii4KVbbvluM91W2H3Sh0BncbUNPS7jLE= +github.com/danieljoos/wincred v1.2.0/go.mod h1:FzQLLMKBFdvu+osBrnFODiv32YGwCfx0SkRa/eYHgec= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/godbus/dbus/v5 v5.1.0 h1:4KLkAxT3aOY8Li4FRJe/KvhoNFFxo0m6fNuFUO8QJUk= +github.com/godbus/dbus/v5 v5.1.0/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/cronexpr v1.1.2 h1:wG/ZYIKT+RT3QkOdgYc+xsKWVRgnxJ1OJtjjy84fJ9A= +github.com/hashicorp/cronexpr v1.1.2/go.mod h1:P4wA0KBl9C5q2hABiMO7cp6jcIg96CDh1Efb3g1PWA4= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= +github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= +github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM= +github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSow5/V2vxeg= +github.com/iancoleman/orderedmap v0.3.0 h1:5cbR2grmZR/DiVt+VJopEhtVs9YGInGIxAoMJn+Ichc= +github.com/iancoleman/orderedmap v0.3.0/go.mod h1:XuLcCUkdL5owUCQeF2Ue9uuw1EptkJDkXXS7VoV7XGE= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.6.0 h1:SWJzexBzPL5jb0GEsrPMLIsi/3jOo7RHlzTjcAeDrPY= +github.com/jackc/pgx/v5 v5.6.0/go.mod h1:DNZ/vlrUnhWCoFGxHAG8U2ljioxukquj7utPDgtQdTw= +github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= +github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA= +github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= +github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs= +github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/multiformats/go-base36 v0.2.0 h1:lFsAbNOGeKtuKozrtBsAkSVhv1p9D0/qedU9rQyccr0= +github.com/multiformats/go-base36 v0.2.0/go.mod h1:qvnKE++v+2MWCfePClUEjE78Z7P2a1UV0xHgWc0hkp4= +github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4= +github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= +github.com/otiai10/copy v1.14.0 h1:dCI/t1iTdYGtkvCuBG2BgR6KZa83PTclw4U5n2wAllU= +github.com/otiai10/copy v1.14.0/go.mod h1:ECfuL02W+/FkTWZWgQqXPWZgW9oeKCSQ5qVfSc4qc4w= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/puzpuzpuz/xsync/v3 v3.4.0 h1:DuVBAdXuGFHv8adVXjWWZ63pJq+NRXOWVXlKDBZ+mJ4= +github.com/puzpuzpuz/xsync/v3 v3.4.0/go.mod h1:VjzYrABPabuM4KyBh1Ftq6u8nhwY5tBPKP9jpmh0nnA= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= +github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 h1:lZUw3E0/J3roVtGQ+SCrUrg3ON6NgVqpn3+iol9aGu4= +github.com/santhosh-tekuri/jsonschema/v5 v5.3.1/go.mod h1:uToXkOrWAZ6/Oc07xWQrPOhJotwFIyu2bBVN41fcDUY= +github.com/sergi/go-diff v1.3.1 h1:xkr+Oxo4BOQKmkn/B9eMK0g5Kg/983T9DqqPHwYqD+8= +github.com/sergi/go-diff v1.3.1/go.mod h1:aMJSSKb2lpPvRNec0+w3fl7LP9IOFzdc9Pa4NFbPK1I= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/swaggest/assertjson v1.9.0 h1:dKu0BfJkIxv/xe//mkCrK5yZbs79jL7OVf9Ija7o2xQ= +github.com/swaggest/assertjson v1.9.0/go.mod h1:b+ZKX2VRiUjxfUIal0HDN85W0nHPAYUbYH5WkkSsFsU= +github.com/swaggest/jsonschema-go v0.3.72 h1:IHaGlR1bdBUBPfhe4tfacN2TGAPKENEGiNyNzvnVHv4= +github.com/swaggest/jsonschema-go v0.3.72/go.mod h1:OrGyEoVqpfSFJ4Am4V/FQcQ3mlEC1vVeleA+5ggbVW4= +github.com/swaggest/refl v1.3.0 h1:PEUWIku+ZznYfsoyheF97ypSduvMApYyGkYF3nabS0I= +github.com/swaggest/refl v1.3.0/go.mod h1:3Ujvbmh1pfSbDYjC6JGG7nMgPvpG0ehQL4iNonnLNbg= +github.com/yudai/gojsondiff v1.0.0 h1:27cbfqXLVEJ1o8I6v3y9lg8Ydm53EKqHXAOMxEGlCOA= +github.com/yudai/gojsondiff v1.0.0/go.mod h1:AY32+k2cwILAkW1fbgxQ5mUmMiZFgLIV+FBNExI05xg= +github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82 h1:BHyfKlQyqbsFN5p3IfnEUduWvb9is428/nNb5L3U01M= +github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82/go.mod h1:lgjkn3NuSvDfVJdfcVVdX+jpBxNmX4rDAzaS45IcYoM= +github.com/zalando/go-keyring v0.2.5 h1:Bc2HHpjALryKD62ppdEzaFG6VxL6Bc+5v0LYpN8Lba8= +github.com/zalando/go-keyring v0.2.5/go.mod h1:HL4k+OXQfJUWaMnqyuSOc0drfGPX2b51Du6K+MRgZMk= +go.opentelemetry.io/otel v1.28.0 h1:/SqNcYk+idO0CxKEUOtKQClMK/MimZihKYMruSMViUo= +go.opentelemetry.io/otel v1.28.0/go.mod h1:q68ijF8Fc8CnMHKyzqL6akLO46ePnjkgfIMIjUIX9z4= +go.opentelemetry.io/otel/metric v1.28.0 h1:f0HGvSl1KRAU1DLgLGFjrwVyismPlnuU6JD6bOeuA5Q= +go.opentelemetry.io/otel/metric v1.28.0/go.mod h1:Fb1eVBFZmLVTMb6PPohq3TO9IIhUisDsbJoL/+uQW4s= +go.opentelemetry.io/otel/sdk v1.28.0 h1:b9d7hIry8yZsgtbmM0DKyPWMMUMlK9NEKuIG4aBqWyE= +go.opentelemetry.io/otel/sdk v1.28.0/go.mod h1:oYj7ClPUA7Iw3m+r7GeEjz0qckQRJK2B8zjcZEfu7Pg= +go.opentelemetry.io/otel/sdk/metric v1.28.0 h1:OkuaKgKrgAbYrrY0t92c+cC+2F6hsFNnCQArXCKlg08= +go.opentelemetry.io/otel/sdk/metric v1.28.0/go.mod h1:cWPjykihLAPvXKi4iZc1dpER3Jdq2Z0YLse3moQUCpg= +go.opentelemetry.io/otel/trace v1.28.0 h1:GhQ9cUuQGmNDd5BTCP2dAvv75RdMxEfTmYejp+lkx9g= +go.opentelemetry.io/otel/trace v1.28.0/go.mod h1:jPyXzNPg6da9+38HEwElrQiHlVMTnVfM3/yv2OlIHaI= +golang.org/x/crypto v0.25.0 h1:ypSNr+bnYL2YhwoMt2zPxHFmbAN1KZs/njMG3hxUp30= +golang.org/x/crypto v0.25.0/go.mod h1:T+wALwcMOSE0kXgUAnPAHqTLW+XHgcELELW8VaDgm/M= +golang.org/x/exp v0.0.0-20240707233637-46b078467d37 h1:uLDX+AfeFCct3a2C7uIWBKMJIR3CJMhcgfrUAqjRK6w= +golang.org/x/exp v0.0.0-20240707233637-46b078467d37/go.mod h1:M4RDyNAINzryxdtnbRXRL/OHtkFuWGRjvuhBJpk2IlY= +golang.org/x/mod v0.19.0 h1:fEdghXQSo20giMthA7cd28ZC+jts4amQ3YMXiP5oMQ8= +golang.org/x/mod v0.19.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= +golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys= +golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE= +golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI= +golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= +golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= +google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= +google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +modernc.org/gc/v3 v3.0.0-20240107210532-573471604cb6 h1:5D53IMaUuA5InSeMu9eJtlQXS2NxAhyWQvkKEgXZhHI= +modernc.org/gc/v3 v3.0.0-20240107210532-573471604cb6/go.mod h1:Qz0X07sNOR1jWYCrJMEnbW/X55x206Q7Vt4mz6/wHp4= +modernc.org/libc v1.55.3 h1:AzcW1mhlPNrRtjS5sS+eW2ISCgSOLLNyFzRh/V3Qj/U= +modernc.org/libc v1.55.3/go.mod h1:qFXepLhz+JjFThQ4kzwzOjA/y/artDeg+pcYnY+Q83w= +modernc.org/mathutil v1.6.0 h1:fRe9+AmYlaej+64JsEEhoWuAYBkOtQiMEU7n/XgfYi4= +modernc.org/mathutil v1.6.0/go.mod h1:Ui5Q9q1TR2gFm0AQRqQUaBWFLAhQpCwNcuhBOSedWPo= +modernc.org/memory v1.8.0 h1:IqGTL6eFMaDZZhEWwcREgeMXYwmW83LYW8cROZYkg+E= +modernc.org/memory v1.8.0/go.mod h1:XPZ936zp5OMKGWPqbD3JShgd/ZoQ7899TUuQqxY+peU= +modernc.org/sqlite v1.31.1 h1:XVU0VyzxrYHlBhIs1DiEgSl0ZtdnPtbLVy8hSkzxGrs= +modernc.org/sqlite v1.31.1/go.mod h1:UqoylwmTb9F+IqXERT8bW9zzOWN8qwAIcLdzeBZs4hA= +modernc.org/strutil v1.2.0 h1:agBi9dp1I+eOnxXeiZawM8F4LawKv4NzGWSaLfyeNZA= +modernc.org/strutil v1.2.0/go.mod h1:/mdcBmfOibveCTBxUl5B5l6W+TTH1FXPLHZE6bTosX0= +modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y= +modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM=