diff --git a/internal/connectors/engine/workflow/plugin_workflow.go b/internal/connectors/engine/workflow/plugin_workflow.go index 6f2b4e90..7deb9ce9 100644 --- a/internal/connectors/engine/workflow/plugin_workflow.go +++ b/internal/connectors/engine/workflow/plugin_workflow.go @@ -113,6 +113,9 @@ func (w Workflow) run( Every: config.PollingPeriod, }, Action: client.ScheduleWorkflowAction{ + // Use the same ID as the schedule ID, so we can identify the workflows running. + // This is useful for debugging purposes. + ID: scheduleID, Workflow: nextWorkflow, Args: []interface{}{ request, diff --git a/internal/connectors/engine/workflow/terminate_workflows.go b/internal/connectors/engine/workflow/terminate_workflows.go index cf2709a5..a593596a 100644 --- a/internal/connectors/engine/workflow/terminate_workflows.go +++ b/internal/connectors/engine/workflow/terminate_workflows.go @@ -4,16 +4,22 @@ import ( "fmt" "github.com/formancehq/payments/internal/connectors/engine/activities" + "github.com/formancehq/payments/internal/models" "go.temporal.io/api/enums/v1" "go.temporal.io/api/workflowservice/v1" "go.temporal.io/sdk/workflow" ) +type TerminateWorkflows struct { + ConnectorID models.ConnectorID + NextPageToken []byte +} + func (w Workflow) runTerminateWorkflows( ctx workflow.Context, - uninstallConnector UninstallConnector, + terminateWorkflows TerminateWorkflows, ) error { - var nextPageToken []byte + var nextPageToken []byte = terminateWorkflows.NextPageToken for { resp, err := activities.TemporalWorkflowExecutionsList( @@ -22,7 +28,7 @@ func (w Workflow) runTerminateWorkflows( Namespace: w.temporalNamespace, PageSize: 100, NextPageToken: nextPageToken, - Query: fmt.Sprintf("Stack=\"%s\" and TaskQueue=\"%s\"", w.stack, uninstallConnector.ConnectorID.String()), + Query: fmt.Sprintf("Stack=\"%s\" and TaskQueue=\"%s\"", w.stack, terminateWorkflows.ConnectorID.String()), }, ) if err != nil { @@ -65,6 +71,18 @@ func (w Workflow) runTerminateWorkflows( } nextPageToken = resp.NextPageToken + + workflowInfo := workflow.GetInfo(ctx) + if workflowInfo.GetContinueAsNewSuggested() { + // Because we can have lots and lots of workflows, sometimes, we + // will exceed the maximum history size or length of a workflow. + // When that arrive, the workflow will be forced to terminate. + // We need to continue as new to avoid this. + return workflow.NewContinueAsNewError(ctx, RunTerminateWorkflows, TerminateWorkflows{ + ConnectorID: terminateWorkflows.ConnectorID, + NextPageToken: nextPageToken, + }) + } } return nil diff --git a/internal/connectors/engine/workflow/uninstall_connector.go b/internal/connectors/engine/workflow/uninstall_connector.go index 49e488cd..998a5379 100644 --- a/internal/connectors/engine/workflow/uninstall_connector.go +++ b/internal/connectors/engine/workflow/uninstall_connector.go @@ -37,6 +37,8 @@ func (w Workflow) runUninstallConnector( return fmt.Errorf("terminate schedules: %w", err) } + // Since we can have lots of workflows running, we don't need to wait for + // them to be terminated before proceeding with the uninstallation. if err := workflow.ExecuteChildWorkflow( workflow.WithChildOptions( ctx, @@ -49,8 +51,10 @@ func (w Workflow) runUninstallConnector( }, ), RunTerminateWorkflows, - uninstallConnector, - ).Get(ctx, nil); err != nil { + TerminateWorkflows{ + ConnectorID: uninstallConnector.ConnectorID, + }, + ).GetChildWorkflowExecution().Get(ctx, nil); err != nil { return fmt.Errorf("terminate workflows: %w", err) }