Skip to content

Commit

Permalink
Merge pull request #109 from formancehq/feat/payments-v3-temporal-errors
Browse files Browse the repository at this point in the history
feat: (payments v3) Implement temporal specific error handling in connectors
  • Loading branch information
laouji authored Oct 2, 2024
2 parents fcabf2a + 5dd55f0 commit 9c1ca69
Show file tree
Hide file tree
Showing 84 changed files with 2,140 additions and 516 deletions.
2 changes: 1 addition & 1 deletion Earthfile
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ generate:
RUN apk update && apk add openjdk11
DO --pass-args core+GO_INSTALL --package=go.uber.org/mock/mockgen@latest
COPY (+sources/*) /src
WORKDIR /src/components/payments
WORKDIR /src
DO --pass-args core+GO_GENERATE
SAVE ARTIFACT internal AS LOCAL internal

Expand Down
33 changes: 33 additions & 0 deletions internal/connectors/engine/activities/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package activities

import (
"errors"

"github.com/formancehq/payments/internal/connectors/engine/plugins"
"github.com/formancehq/payments/internal/connectors/httpwrapper"
"github.com/formancehq/payments/internal/models"
"go.temporal.io/sdk/temporal"
)

var nonRetryableErrors = []error{
httpwrapper.ErrStatusCodeClientError,
models.ErrMissingFromPayloadInRequest,
models.ErrMissingAccountInMetadata,
plugins.ErrNotFound,
}

func temporalError(err error, cause string) error {
isRetryable := true

for _, candidate := range nonRetryableErrors {
if errors.Is(err, candidate) {
isRetryable = false
break
}
}

if isRetryable {
return temporal.NewApplicationErrorWithCause(err.Error(), cause, err)
}
return temporal.NewNonRetryableApplicationError(err.Error(), cause, err)
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,12 @@ type CreateBankAccountRequest struct {
func (a Activities) PluginCreateBankAccount(ctx context.Context, request CreateBankAccountRequest) (*models.CreateBankAccountResponse, error) {
plugin, err := a.plugins.Get(request.ConnectorID)
if err != nil {
return nil, err
return nil, temporalError(err, request.ConnectorID.Provider)
}

resp, err := plugin.CreateBankAccount(ctx, request.Req)
if err != nil {
// TODO(polo): temporal errors
return nil, err
return nil, temporalError(err, request.ConnectorID.Provider)
}
return &resp, nil
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package activities_test

import (
"fmt"
"net/http"
"testing"

"github.com/formancehq/go-libs/errorsutils"
"github.com/formancehq/payments/internal/connectors/engine/activities"
"github.com/formancehq/payments/internal/connectors/engine/plugins"
"github.com/formancehq/payments/internal/connectors/httpwrapper"
"github.com/formancehq/payments/internal/events"
"github.com/formancehq/payments/internal/models"
"github.com/formancehq/payments/internal/storage"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"go.temporal.io/sdk/temporal"
gomock "go.uber.org/mock/gomock"
)

func TestPlugin(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Activities Suite")
}

var _ = Describe("Plugin Create Bank Account", func() {
var (
act activities.Activities
p *plugins.MockPlugins
s *storage.MockStorage
evts *events.Events
sampleResponse models.CreateBankAccountResponse
)

BeforeEach(func() {
evts = &events.Events{}
sampleResponse = models.CreateBankAccountResponse{
RelatedAccount: models.PSPAccount{Reference: "ref"},
}
})

Context("plugin create bank account", func() {
var (
plugin *models.MockPlugin
req activities.CreateBankAccountRequest
)

BeforeEach(func() {
ctrl := gomock.NewController(GinkgoT())
p = plugins.NewMockPlugins(ctrl)
s = storage.NewMockStorage(ctrl)
plugin = models.NewMockPlugin(ctrl)
act = activities.New(s, evts, p)
req = activities.CreateBankAccountRequest{
ConnectorID: models.ConnectorID{
Provider: "some_provider",
},
}
})

It("calls underlying plugin", func(ctx SpecContext) {
p.EXPECT().Get(req.ConnectorID).Return(plugin, nil)
plugin.EXPECT().CreateBankAccount(ctx, req.Req).Return(sampleResponse, nil)
res, err := act.PluginCreateBankAccount(ctx, req)
Expect(err).To(BeNil())
Expect(res.RelatedAccount.Reference).To(Equal(sampleResponse.RelatedAccount.Reference))
})

It("returns a retryable temporal error", func(ctx SpecContext) {
p.EXPECT().Get(req.ConnectorID).Return(plugin, nil)
plugin.EXPECT().CreateBankAccount(ctx, req.Req).Return(sampleResponse, fmt.Errorf("some string"))
_, err := act.PluginCreateBankAccount(ctx, req)
Expect(err).ToNot(BeNil())
temporalErr, ok := err.(*temporal.ApplicationError)
Expect(ok).To(BeTrue())
Expect(temporalErr.NonRetryable()).To(BeFalse())
Expect(temporalErr.Type()).To(Equal(req.ConnectorID.Provider))
})

It("returns a non-retryable temporal error", func(ctx SpecContext) {
wrappedErr := fmt.Errorf("some string: %w", httpwrapper.ErrStatusCodeClientError)
newErr := errorsutils.NewErrorWithExitCode(wrappedErr, http.StatusTeapot)

p.EXPECT().Get(req.ConnectorID).Return(plugin, nil)
plugin.EXPECT().CreateBankAccount(ctx, req.Req).Return(sampleResponse, newErr)
_, err := act.PluginCreateBankAccount(ctx, req)
Expect(err).ToNot(BeNil())
temporalErr, ok := err.(*temporal.ApplicationError)
Expect(ok).To(BeTrue())
Expect(temporalErr.NonRetryable()).To(BeTrue())
Expect(temporalErr.Type()).To(Equal(req.ConnectorID.Provider))
})
})
})
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,12 @@ type CreateWebhooksRequest struct {
func (a Activities) PluginCreateWebhooks(ctx context.Context, request CreateWebhooksRequest) (*models.CreateWebhooksResponse, error) {
plugin, err := a.plugins.Get(request.ConnectorID)
if err != nil {
return nil, err
return nil, temporalError(err, request.ConnectorID.Provider)
}

resp, err := plugin.CreateWebhooks(ctx, request.Req)
if err != nil {
// TODO(polo): temporal errors
return nil, err
return nil, temporalError(err, request.ConnectorID.Provider)
}
return &resp, nil
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package activities_test

import (
"fmt"
"net/http"

"github.com/formancehq/go-libs/errorsutils"
"github.com/formancehq/payments/internal/connectors/engine/activities"
"github.com/formancehq/payments/internal/connectors/engine/plugins"
"github.com/formancehq/payments/internal/connectors/httpwrapper"
"github.com/formancehq/payments/internal/events"
"github.com/formancehq/payments/internal/models"
"github.com/formancehq/payments/internal/storage"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"go.temporal.io/sdk/temporal"
gomock "go.uber.org/mock/gomock"
)

var _ = Describe("Plugin Create Webhooks", func() {
var (
act activities.Activities
p *plugins.MockPlugins
s *storage.MockStorage
evts *events.Events
sampleResponse models.CreateWebhooksResponse
)

BeforeEach(func() {
evts = &events.Events{}
sampleResponse = models.CreateWebhooksResponse{Others: make([]models.PSPOther, 0)}
})

Context("plugin create webhook", func() {
var (
plugin *models.MockPlugin
req activities.CreateWebhooksRequest
)

BeforeEach(func() {
ctrl := gomock.NewController(GinkgoT())
p = plugins.NewMockPlugins(ctrl)
s = storage.NewMockStorage(ctrl)
plugin = models.NewMockPlugin(ctrl)
act = activities.New(s, evts, p)
req = activities.CreateWebhooksRequest{
ConnectorID: models.ConnectorID{
Provider: "some_provider",
},
}
})

It("calls underlying plugin", func(ctx SpecContext) {
p.EXPECT().Get(req.ConnectorID).Return(plugin, nil)
plugin.EXPECT().CreateWebhooks(ctx, req.Req).Return(sampleResponse, nil)
res, err := act.PluginCreateWebhooks(ctx, req)
Expect(err).To(BeNil())
Expect(res.Others).To(Equal(sampleResponse.Others))
})

It("returns a retryable temporal error", func(ctx SpecContext) {
p.EXPECT().Get(req.ConnectorID).Return(plugin, nil)
plugin.EXPECT().CreateWebhooks(ctx, req.Req).Return(sampleResponse, fmt.Errorf("some string"))
_, err := act.PluginCreateWebhooks(ctx, req)
Expect(err).ToNot(BeNil())
temporalErr, ok := err.(*temporal.ApplicationError)
Expect(ok).To(BeTrue())
Expect(temporalErr.NonRetryable()).To(BeFalse())
Expect(temporalErr.Type()).To(Equal(req.ConnectorID.Provider))
})

It("returns a non-retryable temporal error", func(ctx SpecContext) {
wrappedErr := fmt.Errorf("some string: %w", httpwrapper.ErrStatusCodeClientError)
newErr := errorsutils.NewErrorWithExitCode(wrappedErr, http.StatusTeapot)

p.EXPECT().Get(req.ConnectorID).Return(plugin, nil)
plugin.EXPECT().CreateWebhooks(ctx, req.Req).Return(sampleResponse, newErr)
_, err := act.PluginCreateWebhooks(ctx, req)
Expect(err).ToNot(BeNil())
temporalErr, ok := err.(*temporal.ApplicationError)
Expect(ok).To(BeTrue())
Expect(temporalErr.NonRetryable()).To(BeTrue())
Expect(temporalErr.Type()).To(Equal(req.ConnectorID.Provider))
})
})
})
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,12 @@ type FetchNextAccountsRequest struct {
func (a Activities) PluginFetchNextAccounts(ctx context.Context, request FetchNextAccountsRequest) (*models.FetchNextAccountsResponse, error) {
plugin, err := a.plugins.Get(request.ConnectorID)
if err != nil {
return nil, err
return nil, temporalError(err, request.ConnectorID.Provider)
}

resp, err := plugin.FetchNextAccounts(ctx, request.Req)
if err != nil {
// TODO(polo): temporal errors
return nil, err
return nil, temporalError(err, request.ConnectorID.Provider)
}
return &resp, nil
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package activities_test

import (
"fmt"
"net/http"

"github.com/formancehq/go-libs/errorsutils"
"github.com/formancehq/payments/internal/connectors/engine/activities"
"github.com/formancehq/payments/internal/connectors/engine/plugins"
"github.com/formancehq/payments/internal/connectors/httpwrapper"
"github.com/formancehq/payments/internal/events"
"github.com/formancehq/payments/internal/models"
"github.com/formancehq/payments/internal/storage"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"go.temporal.io/sdk/temporal"
gomock "go.uber.org/mock/gomock"
)

var _ = Describe("Plugin Fetch Next Accounts", func() {
var (
act activities.Activities
p *plugins.MockPlugins
s *storage.MockStorage
evts *events.Events
sampleResponse models.FetchNextAccountsResponse
)

BeforeEach(func() {
evts = &events.Events{}
sampleResponse = models.FetchNextAccountsResponse{HasMore: true}
})

Context("plugin fetch next accounts", func() {
var (
plugin *models.MockPlugin
req activities.FetchNextAccountsRequest
)

BeforeEach(func() {
ctrl := gomock.NewController(GinkgoT())
p = plugins.NewMockPlugins(ctrl)
s = storage.NewMockStorage(ctrl)
plugin = models.NewMockPlugin(ctrl)
act = activities.New(s, evts, p)
req = activities.FetchNextAccountsRequest{
ConnectorID: models.ConnectorID{
Provider: "some_provider",
},
}
})

It("calls underlying plugin", func(ctx SpecContext) {
p.EXPECT().Get(req.ConnectorID).Return(plugin, nil)
plugin.EXPECT().FetchNextAccounts(ctx, req.Req).Return(sampleResponse, nil)
res, err := act.PluginFetchNextAccounts(ctx, req)
Expect(err).To(BeNil())
Expect(res.HasMore).To(Equal(sampleResponse.HasMore))
})

It("returns a retryable temporal error", func(ctx SpecContext) {
p.EXPECT().Get(req.ConnectorID).Return(plugin, nil)
plugin.EXPECT().FetchNextAccounts(ctx, req.Req).Return(sampleResponse, fmt.Errorf("some string"))
_, err := act.PluginFetchNextAccounts(ctx, req)
Expect(err).ToNot(BeNil())
temporalErr, ok := err.(*temporal.ApplicationError)
Expect(ok).To(BeTrue())
Expect(temporalErr.NonRetryable()).To(BeFalse())
Expect(temporalErr.Type()).To(Equal(req.ConnectorID.Provider))
})

It("returns a non-retryable temporal error", func(ctx SpecContext) {
wrappedErr := fmt.Errorf("some string: %w", httpwrapper.ErrStatusCodeClientError)
newErr := errorsutils.NewErrorWithExitCode(wrappedErr, http.StatusTeapot)

p.EXPECT().Get(req.ConnectorID).Return(plugin, nil)
plugin.EXPECT().FetchNextAccounts(ctx, req.Req).Return(sampleResponse, newErr)
_, err := act.PluginFetchNextAccounts(ctx, req)
Expect(err).ToNot(BeNil())
temporalErr, ok := err.(*temporal.ApplicationError)
Expect(ok).To(BeTrue())
Expect(temporalErr.NonRetryable()).To(BeTrue())
Expect(temporalErr.Type()).To(Equal(req.ConnectorID.Provider))
})
})
})
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,12 @@ type FetchNextBalancesRequest struct {
func (a Activities) PluginFetchNextBalances(ctx context.Context, request FetchNextBalancesRequest) (*models.FetchNextBalancesResponse, error) {
plugin, err := a.plugins.Get(request.ConnectorID)
if err != nil {
return nil, err
return nil, temporalError(err, request.ConnectorID.Provider)
}

resp, err := plugin.FetchNextBalances(ctx, request.Req)
if err != nil {
// TODO(polo): temporal errors
return nil, err
return nil, temporalError(err, request.ConnectorID.Provider)
}
return &resp, nil
}
Expand Down
Loading

0 comments on commit 9c1ca69

Please sign in to comment.