Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: (temporal) add initial delay when being rate-limited by PSP #236

Merged
merged 16 commits into from
Jan 6, 2025
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const (
StackFlag = "stack"
stackPublicURLFlag = "stack-public-url"
temporalMaxConcurrentWorkflowTaskPollersFlag = "temporal-max-concurrent-workflow-task-pollers"
temporalRateLimitingRetryDelay = "temporal-rate-limiting-retry-delay"
)

func NewRootCommand() *cobra.Command {
Expand Down
4 changes: 4 additions & 0 deletions cmd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cmd

import (
"fmt"
"time"

"github.com/formancehq/go-libs/v2/service"
"github.com/formancehq/go-libs/v2/temporal"
Expand All @@ -25,6 +26,7 @@ func newWorker() *cobra.Command {
// their recommendation.
cmd.Flags().Int(temporalMaxConcurrentWorkflowTaskPollersFlag, 20, "Max concurrent workflow task pollers")
cmd.Flags().String(stackPublicURLFlag, "", "Stack public url")
cmd.Flags().Duration(temporalRateLimitingRetryDelay, 5*time.Second, "Additional delay before a rate limited request is retried by Temporal workers")
return cmd
}

Expand Down Expand Up @@ -54,13 +56,15 @@ func workerOptions(cmd *cobra.Command) (fx.Option, error) {
stack, _ := cmd.Flags().GetString(StackFlag)
stackPublicURL, _ := cmd.Flags().GetString(stackPublicURLFlag)
temporalNamespace, _ := cmd.Flags().GetString(temporal.TemporalNamespaceFlag)
temporalRateLimitingRetryDelay, _ := cmd.Flags().GetDuration(temporalRateLimitingRetryDelay)
temporalMaxConcurrentWorkflowTaskPollers, _ := cmd.Flags().GetInt(temporalMaxConcurrentWorkflowTaskPollersFlag)
return fx.Options(
worker.NewHealthCheckModule(listen, service.IsDebug(cmd)),
worker.NewModule(
stack,
stackPublicURL,
temporalNamespace,
temporalRateLimitingRetryDelay,
temporalMaxConcurrentWorkflowTaskPollers,
service.IsDebug(cmd),
),
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ require (
github.com/go-openapi/strfmt v0.23.0
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
github.com/hashicorp/go-hclog v1.6.3
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/invopop/jsonschema v0.13.0
github.com/jackc/pgx/v5 v5.7.2
Expand Down Expand Up @@ -126,6 +125,7 @@ require (
github.com/grpc-ecosystem/grpc-gateway/v2 v2.23.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/hashicorp/go-hclog v1.6.3 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-retryablehttp v0.7.7 // indirect
github.com/hashicorp/go-uuid v1.0.3 // indirect
Expand Down
1 change: 1 addition & 0 deletions internal/api/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Backend interface {
// Connectors
ConnectorsConfigs() plugins.Configs
ConnectorsConfig(ctx context.Context, connectorID models.ConnectorID) (json.RawMessage, error)
ConnectorsConfigUpdate(ctx context.Context, connector models.Connector) error
ConnectorsList(ctx context.Context, query storage.ListConnectorsQuery) (*bunpaginate.Cursor[models.Connector], error)
ConnectorsInstall(ctx context.Context, provider string, config json.RawMessage) (models.ConnectorID, error)
ConnectorsUninstall(ctx context.Context, connectorID models.ConnectorID) (models.Task, error)
Expand Down
15 changes: 15 additions & 0 deletions internal/api/backend/backend_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions internal/api/services/connectors_config_update.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package services

import (
"context"

"github.com/formancehq/payments/internal/models"
)

func (s *Service) ConnectorsConfigUpdate(ctx context.Context, connector models.Connector) error {
err := s.storage.ConnectorsConfigUpdate(ctx, connector)
if err != nil {
return newStorageError(err, "update connector")
}
return nil
}
64 changes: 64 additions & 0 deletions internal/api/v3/handler_connectors_config_update.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package v3

import (
"encoding/json"
"io"
"net/http"

"github.com/formancehq/go-libs/v2/api"
"github.com/formancehq/payments/internal/api/backend"
"github.com/formancehq/payments/internal/models"
"github.com/formancehq/payments/internal/otel"
"go.opentelemetry.io/otel/attribute"
)

func connectorsConfigUpdate(backend backend.Backend) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
ctx, span := otel.Tracer().Start(r.Context(), "v3_connectorsConfigUpdate")
defer span.End()

span.SetAttributes(attribute.String("connectorID", connectorID(r)))
connectorID, err := models.ConnectorIDFromString(connectorID(r))
if err != nil {
otel.RecordError(span, err)
api.BadRequest(w, ErrInvalidID, err)
return
}

rawConfig, err := io.ReadAll(r.Body)
if err != nil {
otel.RecordError(span, err)
api.BadRequest(w, ErrMissingOrInvalidBody, err)
return
}

span.SetAttributes(attribute.String("config", string(rawConfig)))

config := models.DefaultConfig()
if err := json.Unmarshal(rawConfig, &config); err != nil {
otel.RecordError(span, err)
api.BadRequest(w, ErrMissingOrInvalidBody, err)
return
}

if err := config.Validate(); err != nil {
otel.RecordError(span, err)
api.BadRequest(w, ErrValidation, err)
return
}

connector := models.Connector{
ID: connectorID,
Name: config.Name,
Provider: connectorID.Provider,
Config: rawConfig,
}
laouji marked this conversation as resolved.
Show resolved Hide resolved
err = backend.ConnectorsConfigUpdate(ctx, connector)
if err != nil {
otel.RecordError(span, err)
handleServiceErrors(w, r, err)
return
}
api.NoContent(w)
}
laouji marked this conversation as resolved.
Show resolved Hide resolved
}
79 changes: 79 additions & 0 deletions internal/api/v3/handler_connectors_config_update_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package v3

import (
"fmt"
"net/http"
"net/http/httptest"

"github.com/formancehq/payments/internal/api/backend"
"github.com/formancehq/payments/internal/models"
"github.com/google/uuid"
. "github.com/onsi/ginkgo/v2"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
)

var _ = Describe("API v3 Connector Config Update", func() {
var (
handlerFn http.HandlerFunc
)

Context("update a connector config", func() {
var (
w *httptest.ResponseRecorder
m *backend.MockBackend

connectorID models.ConnectorID
connector models.Connector
config models.Config
)
BeforeEach(func() {
w = httptest.NewRecorder()
ctrl := gomock.NewController(GinkgoT())
m = backend.NewMockBackend(ctrl)
handlerFn = connectorsConfigUpdate(m)
connectorID = models.ConnectorID{
Reference: uuid.New(),
Provider: "dummypay",
}
connectorName := "some-name"
connector = models.Connector{
ID: connectorID,
Name: connectorName,
Provider: connectorID.Provider,
}
config = models.DefaultConfig()
config.Name = connectorName
conf, err := config.MarshalJSON()
require.Nil(GinkgoT(), err)
connector.Config = conf
})

It("should return a validation error when name missing", func(ctx SpecContext) {
config.Name = ""
handlerFn(w, prepareJSONRequestWithQuery(http.MethodPatch, "connectorID", connectorID.String(), &config))

assertExpectedResponse(w.Result(), http.StatusBadRequest, "VALIDATION")
})

It("should return a validation error when connector ID is invalid", func(ctx SpecContext) {
handlerFn(w, prepareJSONRequestWithQuery(http.MethodPatch, "connectorID", "invalidID", &config))

assertExpectedResponse(w.Result(), http.StatusBadRequest, "INVALID_ID")
})

It("should return an internal server error when backend returns error", func(ctx SpecContext) {
m.EXPECT().ConnectorsConfigUpdate(gomock.Any(), gomock.Any()).Return(
fmt.Errorf("connector update err"),
)
handlerFn(w, prepareJSONRequestWithQuery(http.MethodPatch, "connectorID", connectorID.String(), &config))
assertExpectedResponse(w.Result(), http.StatusInternalServerError, "INTERNAL")
})

It("should return status no content on success", func(ctx SpecContext) {
m.EXPECT().ConnectorsConfigUpdate(gomock.Any(), connector).Return(nil)
handlerFn(w, prepareJSONRequestWithQuery(http.MethodPatch, "connectorID", connectorID.String(), &config))
assertExpectedResponse(w.Result(), http.StatusNoContent, "")
})
})
})
2 changes: 1 addition & 1 deletion internal/api/v3/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func assertExpectedResponse(res *http.Response, expectedStatusCode int, expected

data, err := ioutil.ReadAll(res.Body)
Expect(err).To(BeNil())
Expect(data).To(ContainSubstring(expectedBodyString))
Expect(string(data)).To(ContainSubstring(expectedBodyString))
}

func prepareJSONRequest(method string, a any) *http.Request {
Expand Down
2 changes: 1 addition & 1 deletion internal/api/v3/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,13 @@ func newRouter(backend backend.Backend, info api.ServiceInfo, a auth.Authenticat
r.Route("/{connectorID}", func(r chi.Router) {
r.Delete("/", connectorsUninstall(backend))
r.Get("/config", connectorsConfig(backend))
r.Patch("/config", connectorsConfigUpdate(backend))
r.Post("/reset", connectorsReset(backend))

r.Get("/schedules", schedulesList(backend))
r.Route("/schedules/{scheduleID}", func(r chi.Router) {
r.Get("/instances", workflowsInstancesList(backend))
})
// TODO(polo): add update config handler
})
})

Expand Down
24 changes: 19 additions & 5 deletions internal/connectors/engine/activities/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package activities

import (
"errors"
"time"

"github.com/formancehq/go-libs/v2/logging"
temporalworker "github.com/formancehq/go-libs/v2/temporal"
"github.com/formancehq/payments/internal/connectors/engine/plugins"
"github.com/formancehq/payments/internal/events"
Expand All @@ -13,10 +15,13 @@ import (
)

type Activities struct {
logger logging.Logger
storage storage.Storage
events *events.Events
temporalClient client.Client

rateLimitingRetryDelay time.Duration

plugins plugins.Plugins
}

Expand Down Expand Up @@ -320,12 +325,21 @@ func (a Activities) DefinitionSet() temporalworker.DefinitionSet {
})
}

func New(temporalClient client.Client, storage storage.Storage, events *events.Events, plugins plugins.Plugins) Activities {
func New(
logger logging.Logger,
temporalClient client.Client,
storage storage.Storage,
events *events.Events,
plugins plugins.Plugins,
rateLimitingRetryDelay time.Duration,
) Activities {
laouji marked this conversation as resolved.
Show resolved Hide resolved
return Activities{
temporalClient: temporalClient,
storage: storage,
plugins: plugins,
events: events,
logger: logger,
temporalClient: temporalClient,
storage: storage,
plugins: plugins,
events: events,
rateLimitingRetryDelay: rateLimitingRetryDelay,
}
}

Expand Down
41 changes: 40 additions & 1 deletion internal/connectors/engine/activities/errors.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,36 @@
package activities

import (
"context"
"errors"
"regexp"

"github.com/formancehq/payments/internal/connectors/plugins"
"github.com/formancehq/payments/internal/storage"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/temporal"
)

const (
ErrTypeStorage = "STORAGE"
ErrTypeDefault = "DEFAULT"
ErrTypeInvalidArgument = "INVALID_ARGUMENT"
ErrTypeRateLimited = "RATE_LIMITED"
ErrTypeUnimplemented = "UNIMPLEMENTED"
)

func temporalPluginError(err error) error {
var scheduleSuffix = regexp.MustCompile(`\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$`)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of doing all the match regexp, why not passing a boolean in the struct request of the workflow ?

It will prevent in the future some errors if the schedule suffix is changed no ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see any documents that guarantee that they will keep this naming convention, so I guess you are right. My latest commit modifies the request struct to include the periodicity.


func (a Activities) temporalPluginError(ctx context.Context, err error) error {
return a.temporalPluginErrorCheck(ctx, err, false)
}

func (a Activities) temporalPluginPollingError(ctx context.Context, err error) error {
return a.temporalPluginErrorCheck(ctx, err, true)
}

func (a Activities) temporalPluginErrorCheck(ctx context.Context, err error, isPolling bool) error {

switch {
// Do not retry the following errors
case errors.Is(err, plugins.ErrNotImplemented):
Expand All @@ -25,6 +40,30 @@ func temporalPluginError(err error) error {
case errors.Is(err, plugins.ErrCurrencyNotSupported):
return temporal.NewNonRetryableApplicationError(err.Error(), ErrTypeInvalidArgument, err)

// Potentially retry
case errors.Is(err, plugins.ErrUpstreamRatelimit):
// many polled tasks are on a schedule so we can often skip retry in case of rate-limiting
if isPolling {
info := activity.GetInfo(ctx)

// if this polling activity was triggered by a schedule, the workflow ID will be suffixed with
// YYYY-MM-DDTHH:MM:SSZ
if scheduleSuffix.MatchString(info.WorkflowExecution.ID) {
a.logger.WithFields(map[string]any{
"workflow_type": info.WorkflowType.Name,
"scheduled_time": info.ScheduledTime.String(),
"workflow_id": info.WorkflowExecution.ID,
}).Debug("disabling retry for polled activity triggered by schedule due to rate-limit")
return temporal.NewNonRetryableApplicationError(err.Error(), ErrTypeRateLimited, err)
}
}

return temporal.NewApplicationErrorWithOptions(err.Error(), ErrTypeRateLimited, temporal.ApplicationErrorOptions{
// temporal already implements a backoff strategy, but let's add an extra delay before the next retry
// https://docs.temporal.io/encyclopedia/retry-policies#per-error-next-retry-delay
NextRetryDelay: a.rateLimitingRetryDelay,
})

// Retry the following errors
case errors.Is(err, plugins.ErrNotYetInstalled):
// We want to retry in case of not installed
Expand Down
Loading
Loading