diff --git a/internal/api/backend/backend.go b/internal/api/backend/backend.go index 72bbde94..f10ac941 100644 --- a/internal/api/backend/backend.go +++ b/internal/api/backend/backend.go @@ -15,6 +15,7 @@ import ( //go:generate mockgen -source backend.go -destination backend_generated.go -package backend . Backend type Backend interface { // Accounts + AccountsCreate(ctx context.Context, account models.Account) error AccountsList(ctx context.Context, query storage.ListAccountsQuery) (*bunpaginate.Cursor[models.Account], error) AccountsGet(ctx context.Context, id models.AccountID) (*models.Account, error) BankAccountsList(ctx context.Context, query storage.ListBankAccountsQuery) (*bunpaginate.Cursor[models.BankAccount], error) diff --git a/internal/api/module.go b/internal/api/module.go index 2fcbfba3..570717d5 100644 --- a/internal/api/module.go +++ b/internal/api/module.go @@ -8,6 +8,7 @@ import ( "github.com/formancehq/payments/internal/api/backend" "github.com/formancehq/payments/internal/api/services" "github.com/formancehq/payments/internal/connectors/engine" + "github.com/formancehq/payments/internal/events" "github.com/formancehq/payments/internal/storage" "github.com/go-chi/chi/v5" "go.uber.org/fx" @@ -31,8 +32,8 @@ func NewModule(bind string, debug bool) fx.Option { ) *chi.Mux { return NewRouter(backend, info, healthController, a, debug, versions...) }, fx.ParamTags(``, ``, ``, ``, `group:"apiVersions"`))), - fx.Provide(func(storage storage.Storage, engine engine.Engine) backend.Backend { - return services.New(storage, engine) + fx.Provide(func(storage storage.Storage, engine engine.Engine, events *events.Events) backend.Backend { + return services.New(storage, engine, events) }), ) } diff --git a/internal/api/services/accounts_create.go b/internal/api/services/accounts_create.go new file mode 100644 index 00000000..d0ab1888 --- /dev/null +++ b/internal/api/services/accounts_create.go @@ -0,0 +1,11 @@ +package services + +import ( + "context" + + "github.com/formancehq/payments/internal/models" +) + +func (s *Service) AccountsCreate(ctx context.Context, account models.Account) error { + return handleEngineErrors(s.engine.CreateFormanceAccount(ctx, account)) +} diff --git a/internal/api/services/services.go b/internal/api/services/services.go index 88b34eec..590d9911 100644 --- a/internal/api/services/services.go +++ b/internal/api/services/services.go @@ -2,6 +2,7 @@ package services import ( "github.com/formancehq/payments/internal/connectors/engine" + "github.com/formancehq/payments/internal/events" "github.com/formancehq/payments/internal/storage" ) @@ -9,11 +10,13 @@ type Service struct { storage storage.Storage engine engine.Engine + events *events.Events } -func New(storage storage.Storage, engine engine.Engine) *Service { +func New(storage storage.Storage, engine engine.Engine, events *events.Events) *Service { return &Service{ storage: storage, engine: engine, + events: events, } } diff --git a/internal/api/v2/handler_accounts_create.go b/internal/api/v2/handler_accounts_create.go new file mode 100644 index 00000000..3c027515 --- /dev/null +++ b/internal/api/v2/handler_accounts_create.go @@ -0,0 +1,140 @@ +package v2 + +import ( + "encoding/json" + "errors" + "net/http" + "time" + + "github.com/formancehq/go-libs/api" + "github.com/formancehq/payments/internal/api/backend" + "github.com/formancehq/payments/internal/models" + "github.com/formancehq/payments/internal/otel" +) + +type createAccountRequest struct { + Reference string `json:"reference"` + ConnectorID string `json:"connectorID"` + CreatedAt time.Time `json:"createdAt"` + DefaultAsset string `json:"defaultAsset"` + AccountName string `json:"accountName"` + Type string `json:"type"` + Metadata map[string]string `json:"metadata"` +} + +func (r *createAccountRequest) validate() error { + if r.Reference == "" { + return errors.New("reference is required") + } + + if r.ConnectorID == "" { + return errors.New("connectorID is required") + } + + if r.CreatedAt.IsZero() || r.CreatedAt.After(time.Now()) { + return errors.New("createdAt is empty or in the future") + } + + if r.AccountName == "" { + return errors.New("accountName is required") + } + + if r.Type == "" { + return errors.New("type is required") + } + + _, err := models.ConnectorIDFromString(r.ConnectorID) + if err != nil { + return errors.New("connectorID is invalid") + } + + switch r.Type { + case string(models.ACCOUNT_TYPE_EXTERNAL): + case string(models.ACCOUNT_TYPE_INTERNAL): + default: + return errors.New("type is invalid") + } + + return nil +} + +func accountsCreate(backend backend.Backend) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + ctx, span := otel.Tracer().Start(r.Context(), "v2_accountsCreate") + defer span.End() + + var req createAccountRequest + err := json.NewDecoder(r.Body).Decode(&req) + if err != nil { + otel.RecordError(span, err) + api.BadRequest(w, ErrMissingOrInvalidBody, err) + return + } + + if err := req.validate(); err != nil { + otel.RecordError(span, err) + api.BadRequest(w, ErrValidation, err) + return + } + + connectorID := models.MustConnectorIDFromString(req.ConnectorID) + raw, err := json.Marshal(req) + if err != nil { + otel.RecordError(span, err) + api.InternalServerError(w, r, err) + return + } + + account := models.Account{ + ID: models.AccountID{ + Reference: req.Reference, + ConnectorID: connectorID, + }, + ConnectorID: connectorID, + Reference: req.Reference, + CreatedAt: req.CreatedAt, + Type: models.AccountType(req.Type), + Name: &req.AccountName, + DefaultAsset: &req.DefaultAsset, + Metadata: req.Metadata, + Raw: raw, + } + + err = backend.AccountsCreate(ctx, account) + if err != nil { + otel.RecordError(span, err) + handleServiceErrors(w, r, err) + return + } + + // Compatibility with old API + res := accountResponse{ + ID: account.ID.String(), + Reference: account.Reference, + CreatedAt: account.CreatedAt, + ConnectorID: account.ConnectorID.String(), + Provider: account.ConnectorID.Provider, + Type: string(account.Type), + Metadata: account.Metadata, + Raw: account.Raw, + } + + if account.DefaultAsset != nil { + res.DefaultCurrency = *account.DefaultAsset + res.DefaultAsset = *account.DefaultAsset + } + + if account.Name != nil { + res.AccountName = *account.Name + } + + err = json.NewEncoder(w).Encode(api.BaseResponse[accountResponse]{ + Data: &res, + }) + if err != nil { + otel.RecordError(span, err) + api.InternalServerError(w, r, err) + return + } + } +} diff --git a/internal/api/v2/router.go b/internal/api/v2/router.go index cf0eadcf..5bb4b17d 100644 --- a/internal/api/v2/router.go +++ b/internal/api/v2/router.go @@ -27,11 +27,11 @@ func newRouter(backend backend.Backend, a auth.Authenticator, debug bool) *chi.M // Accounts r.Route("/accounts", func(r chi.Router) { r.Get("/", accountsList(backend)) + r.Post("/", accountsCreate(backend)) r.Route("/{accountID}", func(r chi.Router) { r.Get("/", accountsGet(backend)) r.Get("/balances", accountsBalances(backend)) - // TODO(polo): add create account handler }) }) diff --git a/internal/api/v3/handler_accounts_create.go b/internal/api/v3/handler_accounts_create.go new file mode 100644 index 00000000..b89de7d6 --- /dev/null +++ b/internal/api/v3/handler_accounts_create.go @@ -0,0 +1,112 @@ +package v3 + +import ( + "encoding/json" + "errors" + "net/http" + "time" + + "github.com/formancehq/go-libs/api" + "github.com/formancehq/payments/internal/api/backend" + "github.com/formancehq/payments/internal/models" + "github.com/formancehq/payments/internal/otel" +) + +type createAccountRequest struct { + Reference string `json:"reference"` + ConnectorID string `json:"connectorID"` + CreatedAt time.Time `json:"createdAt"` + DefaultAsset string `json:"defaultAsset"` + AccountName string `json:"accountName"` + Type string `json:"type"` + Metadata map[string]string `json:"metadata"` +} + +func (r *createAccountRequest) validate() error { + if r.Reference == "" { + return errors.New("reference is required") + } + + if r.ConnectorID == "" { + return errors.New("connectorID is required") + } + + if r.CreatedAt.IsZero() || r.CreatedAt.After(time.Now()) { + return errors.New("createdAt is empty or in the future") + } + + if r.AccountName == "" { + return errors.New("accountName is required") + } + + if r.Type == "" { + return errors.New("type is required") + } + + _, err := models.ConnectorIDFromString(r.ConnectorID) + if err != nil { + return errors.New("connectorID is invalid") + } + + switch r.Type { + case string(models.ACCOUNT_TYPE_EXTERNAL): + case string(models.ACCOUNT_TYPE_INTERNAL): + default: + return errors.New("type is invalid") + } + + return nil +} + +func accountsCreate(backend backend.Backend) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + ctx, span := otel.Tracer().Start(r.Context(), "v3_accountsCreate") + defer span.End() + + var req createAccountRequest + err := json.NewDecoder(r.Body).Decode(&req) + if err != nil { + otel.RecordError(span, err) + api.BadRequest(w, ErrMissingOrInvalidBody, err) + return + } + + if err := req.validate(); err != nil { + otel.RecordError(span, err) + api.BadRequest(w, ErrValidation, err) + return + } + + connectorID := models.MustConnectorIDFromString(req.ConnectorID) + raw, err := json.Marshal(req) + if err != nil { + otel.RecordError(span, err) + api.InternalServerError(w, r, err) + return + } + + account := models.Account{ + ID: models.AccountID{ + Reference: req.Reference, + ConnectorID: connectorID, + }, + ConnectorID: connectorID, + Reference: req.Reference, + CreatedAt: req.CreatedAt, + Type: models.AccountType(req.Type), + Name: &req.AccountName, + DefaultAsset: &req.DefaultAsset, + Metadata: req.Metadata, + Raw: raw, + } + + err = backend.AccountsCreate(ctx, account) + if err != nil { + otel.RecordError(span, err) + handleServiceErrors(w, r, err) + return + } + + api.Created(w, account) + } +} diff --git a/internal/api/v3/router.go b/internal/api/v3/router.go index 55913efa..398c061e 100644 --- a/internal/api/v3/router.go +++ b/internal/api/v3/router.go @@ -28,11 +28,11 @@ func newRouter(backend backend.Backend, a auth.Authenticator, debug bool) *chi.M // Accounts r.Route("/accounts", func(r chi.Router) { r.Get("/", accountsList(backend)) + r.Post("/", accountsCreate(backend)) r.Route("/{accountID}", func(r chi.Router) { r.Get("/", accountsGet(backend)) r.Get("/balances", accountsBalances(backend)) - // TODO(polo): add create account handler }) }) diff --git a/internal/connectors/engine/engine.go b/internal/connectors/engine/engine.go index debb1d2c..67a06152 100644 --- a/internal/connectors/engine/engine.go +++ b/internal/connectors/engine/engine.go @@ -22,19 +22,41 @@ import ( ) type Engine interface { + // Install a connector with the given provider and configuration. InstallConnector(ctx context.Context, provider string, rawConfig json.RawMessage) (models.ConnectorID, error) + // Uninstall a connector with the given ID. UninstallConnector(ctx context.Context, connectorID models.ConnectorID) error + // Reset a connector with the given ID, by uninstalling and reinstalling it. ResetConnector(ctx context.Context, connectorID models.ConnectorID) error + + // Create a Formance account, no call to the plugin, just a creation + // of an account in the database related to the provided connector id. + CreateFormanceAccount(ctx context.Context, account models.Account) error + + // Forward a bank account to the given connector, which will create it + // in the external system (PSP). ForwardBankAccount(ctx context.Context, bankAccountID uuid.UUID, connectorID models.ConnectorID) (*models.BankAccount, error) + // Create a transfer between two accounts on the given connector (PSP). CreateTransfer(ctx context.Context, piID models.PaymentInitiationID, attempt int) error + // Create a payout on the given connector (PSP). CreatePayout(ctx context.Context, piID models.PaymentInitiationID, attempt int) error + + // We received a webhook, handle it by calling the corresponding plugin to + // translate it to a formance object and store it. HandleWebhook(ctx context.Context, urlPath string, webhook models.Webhook) error + + // Create a Formance pool composed of accounts. CreatePool(ctx context.Context, pool models.Pool) error + // Add an account to a Formance pool. AddAccountToPool(ctx context.Context, id uuid.UUID, accountID models.AccountID) error + // Remove an account from a Formance pool. RemoveAccountFromPool(ctx context.Context, id uuid.UUID, accountID models.AccountID) error + // Delete a Formance pool. DeletePool(ctx context.Context, poolID uuid.UUID) error + // Called when the engine is starting, to start all the plugins. OnStart(ctx context.Context) error + // Called when the engine is stopping, to stop all the plugins. OnStop(ctx context.Context) } @@ -223,6 +245,35 @@ func (e *engine) ResetConnector(ctx context.Context, connectorID models.Connecto return nil } +func (e *engine) CreateFormanceAccount(ctx context.Context, account models.Account) error { + run, err := e.temporalClient.ExecuteWorkflow( + ctx, + client.StartWorkflowOptions{ + ID: fmt.Sprintf("create-formance-account-%s-%s", account.ConnectorID.String(), account.Reference), + TaskQueue: account.ConnectorID.String(), + WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE, + WorkflowExecutionErrorWhenAlreadyStarted: false, + SearchAttributes: map[string]interface{}{ + workflow.SearchAttributeStack: e.stack, + }, + }, + workflow.RunCreateFormanceAccount, + workflow.CreateFormanceAccount{ + Account: account, + }, + ) + if err != nil { + return err + } + + // Wait for bank account creation to complete + if err := run.Get(ctx, nil); err != nil { + return err + } + + return nil +} + func (e *engine) ForwardBankAccount(ctx context.Context, bankAccountID uuid.UUID, connectorID models.ConnectorID) (*models.BankAccount, error) { run, err := e.temporalClient.ExecuteWorkflow( ctx, diff --git a/internal/connectors/engine/workflow/create_formance_account.go b/internal/connectors/engine/workflow/create_formance_account.go new file mode 100644 index 00000000..ad6b7e0d --- /dev/null +++ b/internal/connectors/engine/workflow/create_formance_account.go @@ -0,0 +1,52 @@ +package workflow + +import ( + "github.com/formancehq/payments/internal/connectors/engine/activities" + "github.com/formancehq/payments/internal/models" + "go.temporal.io/api/enums/v1" + "go.temporal.io/sdk/workflow" +) + +type CreateFormanceAccount struct { + Account models.Account +} + +func (w Workflow) runCreateFormanceAccount( + ctx workflow.Context, + createFormanceAccount CreateFormanceAccount, +) error { + err := activities.StorageAccountsStore( + infiniteRetryContext(ctx), + []models.Account{createFormanceAccount.Account}, + ) + if err != nil { + return err + } + + if err := workflow.ExecuteChildWorkflow( + workflow.WithChildOptions( + ctx, + workflow.ChildWorkflowOptions{ + TaskQueue: createFormanceAccount.Account.ConnectorID.String(), + ParentClosePolicy: enums.PARENT_CLOSE_POLICY_ABANDON, + SearchAttributes: map[string]interface{}{ + SearchAttributeStack: w.stack, + }, + }, + ), + RunSendEvents, + SendEvents{ + Account: &createFormanceAccount.Account, + }, + ).Get(ctx, nil); err != nil { + return err + } + + return nil +} + +var RunCreateFormanceAccount any + +func init() { + RunCreateFormanceAccount = Workflow{}.runCreateFormanceAccount +} diff --git a/internal/connectors/engine/workflow/workflow.go b/internal/connectors/engine/workflow/workflow.go index 7eec7feb..04bffaaa 100644 --- a/internal/connectors/engine/workflow/workflow.go +++ b/internal/connectors/engine/workflow/workflow.go @@ -97,6 +97,10 @@ func (w Workflow) DefinitionSet() temporalworker.DefinitionSet { Name: "Run", Func: w.run, }). + Append(temporalworker.Definition{ + Name: "RunCreateFormanceAccount", + Func: w.runCreateFormanceAccount, + }). Append(temporalworker.Definition{ Name: "RunCreateWebhooks", Func: w.runCreateWebhooks,