Skip to content

Commit

Permalink
feat(connector): fix reset connector api
Browse files Browse the repository at this point in the history
  • Loading branch information
paul-nicolas committed Jan 2, 2025
1 parent 6e6f68d commit 36ea334
Show file tree
Hide file tree
Showing 12 changed files with 283 additions and 49 deletions.
2 changes: 1 addition & 1 deletion internal/api/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type Backend interface {
ConnectorsList(ctx context.Context, query storage.ListConnectorsQuery) (*bunpaginate.Cursor[models.Connector], error)
ConnectorsInstall(ctx context.Context, provider string, config json.RawMessage) (models.ConnectorID, error)
ConnectorsUninstall(ctx context.Context, connectorID models.ConnectorID) (models.Task, error)
ConnectorsReset(ctx context.Context, connectorID models.ConnectorID) error
ConnectorsReset(ctx context.Context, connectorID models.ConnectorID) (models.Task, error)

// Payments
PaymentsCreate(ctx context.Context, payment models.Payment) error
Expand Down
7 changes: 4 additions & 3 deletions internal/api/backend/backend_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 6 additions & 3 deletions internal/api/services/connectors_reset.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ import (
"github.com/formancehq/payments/internal/models"
)

func (s *Service) ConnectorsReset(ctx context.Context, connectorID models.ConnectorID) error {
err := s.engine.ResetConnector(ctx, connectorID)
return handleEngineErrors(err)
func (s *Service) ConnectorsReset(ctx context.Context, connectorID models.ConnectorID) (models.Task, error) {
task, err := s.engine.ResetConnector(ctx, connectorID)
if err != nil {
return models.Task{}, handleEngineErrors(err)
}
return task, nil
}
2 changes: 1 addition & 1 deletion internal/api/v2/handler_connectors_reset.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func connectorsReset(backend backend.Backend) http.HandlerFunc {
return
}

if err := backend.ConnectorsReset(ctx, connectorID); err != nil {
if _, err := backend.ConnectorsReset(ctx, connectorID); err != nil {
otel.RecordError(span, err)
handleServiceErrors(w, r, err)
return
Expand Down
11 changes: 9 additions & 2 deletions internal/api/v3/handler_connectors_reset.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ import (
"go.opentelemetry.io/otel/attribute"
)

type ConnectorResetResponse struct {
TaskID string `json:"taskID"`
}

func connectorsReset(backend backend.Backend) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
ctx, span := otel.Tracer().Start(r.Context(), "v3_connectorsReset")
Expand All @@ -23,12 +27,15 @@ func connectorsReset(backend backend.Backend) http.HandlerFunc {
return
}

if err := backend.ConnectorsReset(ctx, connectorID); err != nil {
task, err := backend.ConnectorsReset(ctx, connectorID)
if err != nil {
otel.RecordError(span, err)
handleServiceErrors(w, r, err)
return
}

api.NoContent(w)
api.Accepted(w, ConnectorResetResponse{
TaskID: task.ID.String(),
})
}
}
8 changes: 8 additions & 0 deletions internal/connectors/engine/activities/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,18 @@ func (a Activities) DefinitionSet() temporalworker.DefinitionSet {
Name: "StorageConnectorsStore",
Func: a.StorageConnectorsStore,
}).
Append(temporalworker.Definition{
Name: "StorageConnectorsGet",
Func: a.StorageConnectorsGet,
}).
Append(temporalworker.Definition{
Name: "StorageConnectorsDelete",
Func: a.StorageConnectorsDelete,
}).
Append(temporalworker.Definition{
Name: "StorageConnectorsScheduleForDeletion",
Func: a.StorageConnectorsScheduleForDeletion,
}).
Append(temporalworker.Definition{
Name: "StorageSchedulesStore",
Func: a.StorageSchedulesStore,
Expand Down
27 changes: 27 additions & 0 deletions internal/connectors/engine/activities/storage_connectors_get.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package activities

import (
"context"

"github.com/formancehq/payments/internal/models"
"go.temporal.io/sdk/workflow"
)

func (a Activities) StorageConnectorsGet(ctx context.Context, connectorID models.ConnectorID) (*models.Connector, error) {
connector, err := a.storage.ConnectorsGet(ctx, connectorID)
if err != nil {
return nil, temporalStorageError(err)
}
return connector, nil
}

var StorageConnectorsGetActivity = Activities{}.StorageConnectorsGet

func StorageConnectorsGet(ctx workflow.Context, connectorID models.ConnectorID) (*models.Connector, error) {
var connector models.Connector
err := executeActivity(ctx, StorageConnectorsGetActivity, &connector, connectorID)
if err != nil {
return nil, err
}
return &connector, err
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package activities

import (
"context"

"github.com/formancehq/payments/internal/models"
"go.temporal.io/sdk/workflow"
)

func (a Activities) StorageConnectorsScheduleForDeletion(ctx context.Context, connectorID models.ConnectorID) error {
return temporalStorageError(a.storage.ConnectorsScheduleForDeletion(ctx, connectorID))
}

var StorageConnectorsScheduleForDeletionActivity = Activities{}.StorageConnectorsScheduleForDeletion

func StorageConnectorsScheduleForDeletion(ctx workflow.Context, connectorID models.ConnectorID) error {
return executeActivity(ctx, StorageConnectorsScheduleForDeletionActivity, nil, connectorID)
}
65 changes: 35 additions & 30 deletions internal/connectors/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type Engine interface {
// Uninstall a connector with the given ID.
UninstallConnector(ctx context.Context, connectorID models.ConnectorID) (models.Task, error)
// Reset a connector with the given ID, by uninstalling and reinstalling it.
ResetConnector(ctx context.Context, connectorID models.ConnectorID) error
ResetConnector(ctx context.Context, connectorID models.ConnectorID) (models.Task, error)

// Create a Formance account, no call to the plugin, just a creation
// of an account in the database related to the provided connector id.
Expand Down Expand Up @@ -212,7 +212,7 @@ func (e *engine) UninstallConnector(ctx context.Context, connectorID models.Conn
workflow.UninstallConnector{
ConnectorID: connectorID,
DefaultWorkerName: getDefaultTaskQueue(e.stack),
TaskID: task.ID,
TaskID: &task.ID,
},
)
if err != nil {
Expand All @@ -229,60 +229,65 @@ func (e *engine) UninstallConnector(ctx context.Context, connectorID models.Conn
return task, nil
}

func (e *engine) ResetConnector(ctx context.Context, connectorID models.ConnectorID) error {
func (e *engine) ResetConnector(ctx context.Context, connectorID models.ConnectorID) (models.Task, error) {
ctx, span := otel.Tracer().Start(ctx, "engine.ResetConnector")
defer span.End()

connector, err := e.storage.ConnectorsGet(ctx, connectorID)
if err != nil {
otel.RecordError(span, err)
return err
}

// Detached the context to avoid being in a weird state if request is
// cancelled in the middle of the operation.
detachedCtx := context.WithoutCancel(ctx)
// Since we detached the context, we need to wait for the operation to finish
// even if the app is shutting down gracefully.
e.wg.Add(1)
defer e.wg.Done()

if _, err := e.UninstallConnector(detachedCtx, connectorID); err != nil {
otel.RecordError(span, err)
return err
now := time.Now()
id := fmt.Sprintf("reset-%s-%s", e.stack, connectorID.String())
task := models.Task{
// Do not fill the connector ID as it will be deleted
ID: models.TaskID{
Reference: id,
ConnectorID: connectorID,
},
Status: models.TASK_STATUS_PROCESSING,
CreatedAt: now,
UpdatedAt: now,
}

_, err = e.InstallConnector(detachedCtx, connectorID.Provider, connector.Config)
if err != nil {
if err := e.storage.TasksUpsert(ctx, task); err != nil {
otel.RecordError(span, err)
return err
return models.Task{}, err
}

run, err := e.temporalClient.ExecuteWorkflow(
// Launch the uninstallation in background
_, err := e.temporalClient.ExecuteWorkflow(
detachedCtx,
client.StartWorkflowOptions{
ID: fmt.Sprintf("reset-%s-%s", e.stack, connectorID.String()),
TaskQueue: getConnectorTaskQueue(e.stack, connectorID),
WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY,
ID: id,
TaskQueue: getDefaultTaskQueue(e.stack),
WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY,
WorkflowExecutionErrorWhenAlreadyStarted: false,
SearchAttributes: map[string]interface{}{
workflow.SearchAttributeStack: e.stack,
},
},
workflow.RunSendEvents,
workflow.SendEvents{
ConnectorReset: &connectorID,
workflow.RunResetConnector,
workflow.ResetConnector{
ConnectorID: connectorID,
DefaultWorkerName: getDefaultTaskQueue(e.stack),
TaskID: task.ID,
},
)
if err != nil {
otel.RecordError(span, err)
return err
}
task.Status = models.TASK_STATUS_FAILED
task.UpdatedAt = time.Now()
if err := e.storage.TasksUpsert(ctx, task); err != nil {
e.logger.Errorf("failed to update task status to failed: %v", err)
}

if err := run.Get(ctx, nil); err != nil {
otel.RecordError(span, err)
return err
return models.Task{}, err
}
return nil

return task, nil
}

func (e *engine) CreateFormanceAccount(ctx context.Context, account models.Account) error {
Expand Down
Loading

0 comments on commit 36ea334

Please sign in to comment.