Skip to content

Commit

Permalink
Merge pull request #126 from formancehq/feat/update-pi-from-payment
Browse files Browse the repository at this point in the history
feat(payments): update payment initiation from payment
  • Loading branch information
paul-nicolas authored Oct 16, 2024
2 parents b01b0c3 + 8c92441 commit 517d15e
Show file tree
Hide file tree
Showing 17 changed files with 239 additions and 17 deletions.
8 changes: 6 additions & 2 deletions internal/connectors/engine/activities/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,12 @@ func (a Activities) DefinitionSet() temporalworker.DefinitionSet {
Func: a.StoragePaymentInitiationsRelatedPaymentsStore,
}).
Append(temporalworker.Definition{
Name: "PaymentInitiationsAdjustmentsStore",
Func: a.PaymentInitiationsAdjustmentsStore,
Name: "StoragePaymentInitiationsAdjustmentsStore",
Func: a.StoragePaymentInitiationsAdjustmentsStore,
}).
Append(temporalworker.Definition{
Name: "StoragePaymentInitiationIDsListFromPaymentID",
Func: a.StoragePaymentInitiationIDsListFromPaymentID,
}).
Append(temporalworker.Definition{
Name: "EventsSendAccount",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ import (
"go.temporal.io/sdk/workflow"
)

func (a Activities) PaymentInitiationsAdjustmentsStore(ctx context.Context, adj models.PaymentInitiationAdjustment) error {
func (a Activities) StoragePaymentInitiationsAdjustmentsStore(ctx context.Context, adj models.PaymentInitiationAdjustment) error {
return a.storage.PaymentInitiationAdjustmentsUpsert(ctx, adj)
}

var PaymentInitiationsAdjustmentsStoreActivity = Activities{}.PaymentInitiationsAdjustmentsStore
var StoragePaymentInitiationsAdjustmentsStoreActivity = Activities{}.StoragePaymentInitiationsAdjustmentsStore

func PaymentInitiationsAdjustmentsStore(ctx workflow.Context, adj models.PaymentInitiationAdjustment) error {
return executeActivity(ctx, PaymentInitiationsAdjustmentsStoreActivity, nil, adj)
func StoragePaymentInitiationsAdjustmentsStore(ctx workflow.Context, adj models.PaymentInitiationAdjustment) error {
return executeActivity(ctx, StoragePaymentInitiationsAdjustmentsStoreActivity, nil, adj)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package activities

import (
"context"

"github.com/formancehq/payments/internal/models"
"go.temporal.io/sdk/workflow"
)

func (a Activities) StoragePaymentInitiationIDsListFromPaymentID(ctx context.Context, paymentID models.PaymentID) ([]models.PaymentInitiationID, error) {
return a.storage.PaymentInitiationIDsListFromPaymentID(ctx, paymentID)
}

var StoragePaymentInitiationIDsListFromPaymentIDActivity = Activities{}.StoragePaymentInitiationIDsListFromPaymentID

func StoragePaymentInitiationIDsListFromPaymentID(ctx workflow.Context, paymentID models.PaymentID) ([]models.PaymentInitiationID, error) {
ret := []models.PaymentInitiationID{}
if err := executeActivity(ctx, StoragePaymentInitiationIDsListFromPaymentIDActivity, &ret, paymentID); err != nil {
return nil, err
}
return ret, nil
}
2 changes: 1 addition & 1 deletion internal/connectors/engine/workflow/create_transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (w Workflow) addPIAdjustment(
Metadata: metadata,
}

return activities.PaymentInitiationsAdjustmentsStore(
return activities.StoragePaymentInitiationsAdjustmentsStore(
infiniteRetryContext(ctx),
adj,
)
Expand Down
30 changes: 29 additions & 1 deletion internal/connectors/engine/workflow/fetch_payments.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,41 @@ func (w Workflow) fetchNextPayments(
}

wg := workflow.NewWaitGroup(ctx)
errChan := make(chan error, len(paymentsResponse.Payments)*2)
errChan := make(chan error, len(paymentsResponse.Payments)*3)
for _, payment := range payments {
p := payment

wg.Add(1)
workflow.Go(ctx, func(ctx workflow.Context) {
defer wg.Done()

// We want to update the payment initiation from the payment
// if it exists
if err := workflow.ExecuteChildWorkflow(
workflow.WithChildOptions(
ctx,
workflow.ChildWorkflowOptions{
TaskQueue: fetchNextPayments.ConnectorID.String(),
ParentClosePolicy: enums.PARENT_CLOSE_POLICY_ABANDON,
SearchAttributes: map[string]interface{}{
SearchAttributeStack: w.stack,
},
},
),
RunUpdatePaymentInitiationFromPayment,
UpdatePaymentInitiationFromPayment{
Payment: &p,
},
).Get(ctx, nil); err != nil {
errChan <- errors.Wrap(err, "sending events")
}
})

wg.Add(1)
workflow.Go(ctx, func(ctx workflow.Context) {
defer wg.Done()

// Send the payment event
if err := workflow.ExecuteChildWorkflow(
workflow.WithChildOptions(
ctx,
Expand Down Expand Up @@ -118,6 +145,7 @@ func (w Workflow) fetchNextPayments(
errChan <- errors.Wrap(err, "marshalling payment")
}

// Run next tasks
if err := workflow.ExecuteChildWorkflow(
workflow.WithChildOptions(
ctx,
Expand Down
2 changes: 1 addition & 1 deletion internal/connectors/engine/workflow/handle_webhooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (w Workflow) runHandleWebhooks(
return err
}
} else {
fmt.Errorf("storing webhook translation: %w", err)
return fmt.Errorf("storing webhook translation: %w", err)
}
}
}
Expand Down
9 changes: 2 additions & 7 deletions internal/connectors/engine/workflow/terminate_schedules.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,19 @@ import (
"github.com/formancehq/go-libs/bun/bunpaginate"
"github.com/formancehq/go-libs/query"
"github.com/formancehq/payments/internal/connectors/engine/activities"
"github.com/formancehq/payments/internal/models"
"github.com/formancehq/payments/internal/storage"
"go.temporal.io/sdk/workflow"
)

type TerminateSchedules struct {
ConnectorID models.ConnectorID
}

func (w Workflow) runTerminateSchedules(
ctx workflow.Context,
terminateSchedules TerminateSchedules,
uninstallConnector UninstallConnector,
) error {
query := storage.NewListSchedulesQuery(
bunpaginate.NewPaginatedQueryOptions(storage.ScheduleQuery{}).
WithPageSize(100).
WithQueryBuilder(
query.Match("connector_id", terminateSchedules.ConnectorID.String()),
query.Match("connector_id", uninstallConnector.ConnectorID.String()),
),
)
for {
Expand Down
2 changes: 1 addition & 1 deletion internal/connectors/engine/workflow/uninstall_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (w Workflow) runUninstallConnector(
},
),
RunTerminateSchedules,
TerminateSchedules{
UninstallConnector{
ConnectorID: uninstallConnector.ConnectorID,
},
).Get(ctx, nil); err != nil {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package workflow

import (
"github.com/formancehq/payments/internal/connectors/engine/activities"
"github.com/formancehq/payments/internal/models"
"go.temporal.io/sdk/workflow"
)

type UpdatePaymentInitiationFromPayment struct {
Payment *models.Payment
}

func (w Workflow) runUpdatePaymentInitiationFromPayment(
ctx workflow.Context,
updatePaymentInitiationFromPayment UpdatePaymentInitiationFromPayment,
) error {
piIDs, err := activities.StoragePaymentInitiationIDsListFromPaymentID(
infiniteRetryContext(ctx),
updatePaymentInitiationFromPayment.Payment.ID,
)
if err != nil {
return err
}

if len(piIDs) == 0 {
// Nothing to do here
return nil
}

for _, piID := range piIDs {
adjustment := models.FromPaymentToPaymentInitiationAdjustment(
updatePaymentInitiationFromPayment.Payment,
piID,
)

if adjustment == nil {
continue
}

if err := activities.StoragePaymentInitiationsAdjustmentsStore(
infiniteRetryContext(ctx),
*adjustment,
); err != nil {
return err
}
}

return nil
}

var RunUpdatePaymentInitiationFromPayment any

func init() {
RunUpdatePaymentInitiationFromPayment = Workflow{}.runUpdatePaymentInitiationFromPayment
}
4 changes: 4 additions & 0 deletions internal/connectors/engine/workflow/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,5 +120,9 @@ func (w Workflow) DefinitionSet() temporalworker.DefinitionSet {
Append(temporalworker.Definition{
Name: "RunSendEvents",
Func: w.runSendEvents,
}).
Append(temporalworker.Definition{
Name: "RunUpdatePaymentInitiationFromPayment",
Func: w.runUpdatePaymentInitiationFromPayment,
})
}
6 changes: 6 additions & 0 deletions internal/models/capabilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ func (t Capability) String() string {
return "FETCH_PAYMENTS"
case CAPABILITY_FETCH_OTHERS:
return "FETCH_OTHERS"
case CAPABILITY_FETCH_BALANCES:
return "FETCH_BALANCES"
case CAPABILITY_WEBHOOKS:
return "WEBHOOKS"
case CAPABILITY_CREATION_BANK_ACCOUNT:
Expand All @@ -47,6 +49,8 @@ func (t Capability) Value() (driver.Value, error) {
return "FETCH_ACCOUNTS", nil
case CAPABILITY_FETCH_EXTERNAL_ACCOUNTS:
return "FETCH_EXTERNAL_ACCOUNTS", nil
case CAPABILITY_FETCH_BALANCES:
return "FETCH_BALANCES", nil
case CAPABILITY_FETCH_PAYMENTS:
return "FETCH_PAYMENTS", nil
case CAPABILITY_FETCH_OTHERS:
Expand Down Expand Up @@ -84,6 +88,8 @@ func (t *Capability) Scan(value interface{}) error {
*t = CAPABILITY_FETCH_EXTERNAL_ACCOUNTS
case "FETCH_PAYMENTS":
*t = CAPABILITY_FETCH_PAYMENTS
case "FETCH_BALANCES":
*t = CAPABILITY_FETCH_BALANCES
case "FETCH_OTHERS":
*t = CAPABILITY_FETCH_OTHERS
case "WEBHOOKS":
Expand Down
46 changes: 46 additions & 0 deletions internal/models/payments_initiation_adjusments_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,3 +133,49 @@ func (t *PaymentInitiationAdjustmentStatus) Scan(value interface{}) error {

return nil
}

func FromPaymentToPaymentInitiationAdjustment(from *Payment, piID PaymentInitiationID) *PaymentInitiationAdjustment {
var status PaymentInitiationAdjustmentStatus
var err error

switch from.Status {
case PAYMENT_STATUS_AMOUNT_ADJUSTEMENT, PAYMENT_STATUS_UNKNOWN:
// No need to add an adjustment for this payment initiation
return nil
case PAYMENT_STATUS_PENDING, PAYMENT_STATUS_AUTHORISATION:
status = PAYMENT_INITIATION_ADJUSTMENT_STATUS_PROCESSING
case PAYMENT_STATUS_SUCCEEDED,
PAYMENT_STATUS_CAPTURE,
PAYMENT_STATUS_REFUND_REVERSED,
PAYMENT_STATUS_DISPUTE_WON:
status = PAYMENT_INITIATION_ADJUSTMENT_STATUS_PROCESSED
case PAYMENT_STATUS_CANCELLED,
PAYMENT_STATUS_CAPTURE_FAILED,
PAYMENT_STATUS_EXPIRED,
PAYMENT_STATUS_FAILED,
PAYMENT_STATUS_DISPUTE_LOST:
status = PAYMENT_INITIATION_ADJUSTMENT_STATUS_FAILED
err = errors.New("payment failed")
case PAYMENT_STATUS_DISPUTE:
status = PAYMENT_INITIATION_ADJUSTMENT_STATUS_UNKNOWN
case PAYMENT_STATUS_REFUNDED:
status = PAYMENT_INITIATION_ADJUSTMENT_STATUS_REVERSED
case PAYMENT_STATUS_REFUNDED_FAILURE:
status = PAYMENT_INITIATION_ADJUSTMENT_STATUS_REVERSE_FAILED
err = errors.New("payment refund failed")
default:
return nil
}

return &PaymentInitiationAdjustment{
ID: PaymentInitiationAdjustmentID{
PaymentInitiationID: piID,
CreatedAt: from.CreatedAt,
Status: status,
},
PaymentInitiationID: piID,
CreatedAt: from.CreatedAt,
Status: status,
Error: err,
}
}
19 changes: 19 additions & 0 deletions internal/storage/payment_initiations.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,25 @@ func (s *store) PaymentInitiationRelatedPaymentsUpsert(ctx context.Context, piID
return nil
}

func (s *store) PaymentInitiationIDsListFromPaymentID(ctx context.Context, id models.PaymentID) ([]models.PaymentInitiationID, error) {
var paymentInitiationRelatedPayments []paymentInitiationRelatedPayment
err := s.db.NewSelect().
Model(&paymentInitiationRelatedPayments).
Column("payment_initiation_id").
Where("payment_id = ?", id).
Scan(ctx)
if err != nil {
return nil, e("failed to get payment initiation related payments", err)
}

ids := make([]models.PaymentInitiationID, 0, len(paymentInitiationRelatedPayments))
for _, pi := range paymentInitiationRelatedPayments {
ids = append(ids, pi.PaymentInitiationID)
}

return ids, nil
}

type PaymentInitiationRelatedPaymentsQuery struct{}

type ListPaymentInitiationRelatedPaymentsQuery bunpaginate.OffsetPaginatedQuery[bunpaginate.PaginatedQueryOptions[PaymentInitiationRelatedPaymentsQuery]]
Expand Down
27 changes: 27 additions & 0 deletions internal/storage/payment_initiations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -657,6 +657,33 @@ func TestPaymentInitiationsRelatedPaymentUpsert(t *testing.T) {
})
}

func TestPaymentInitiationIDsFromPaymentID(t *testing.T) {
t.Parallel()

ctx := logging.TestingContext()
store := newStore(t)

upsertConnector(t, ctx, store, defaultConnector)
upsertAccounts(t, ctx, store, defaultAccounts)
upsertPayments(t, ctx, store, defaultPayments)
upsertPaymentInitiations(t, ctx, store, defaultPaymentInitiations)
upsertPaymentInitiationRelatedPayments(t, ctx, store)

t.Run("unknown payment id", func(t *testing.T) {
ids, err := store.PaymentInitiationIDsListFromPaymentID(ctx, models.PaymentID{})
require.NoError(t, err)
require.Len(t, ids, 0)
})

t.Run("known payment id", func(t *testing.T) {
ids, err := store.PaymentInitiationIDsListFromPaymentID(ctx, defaultPayments[0].ID)
require.NoError(t, err)
require.Len(t, ids, 2)
require.Contains(t, ids, piID1)
require.Contains(t, ids, piID2)
})
}

func TestPaymentInitiationRelatedPaymentsList(t *testing.T) {
t.Parallel()

Expand Down
1 change: 1 addition & 0 deletions internal/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type Storage interface {
PaymentInitiationsDelete(ctx context.Context, piID models.PaymentInitiationID) error
PaymentInitiationsDeleteFromConnectorID(ctx context.Context, connectorID models.ConnectorID) error
PaymentInitiationsList(ctx context.Context, q ListPaymentInitiationsQuery) (*bunpaginate.Cursor[models.PaymentInitiation], error)
PaymentInitiationIDsListFromPaymentID(ctx context.Context, id models.PaymentID) ([]models.PaymentInitiationID, error)

// Payment Initiation Adjustments
PaymentInitiationAdjustmentsUpsert(ctx context.Context, adj models.PaymentInitiationAdjustment) error
Expand Down
15 changes: 15 additions & 0 deletions internal/storage/storage_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 517d15e

Please sign in to comment.