diff --git a/internal/cmd/build/main.go b/internal/cmd/build/main.go index 2de4ccdce..2c69decd0 100644 --- a/internal/cmd/build/main.go +++ b/internal/cmd/build/main.go @@ -142,7 +142,7 @@ func (b *builder) integrationTest() error { devServer, err := testsuite.StartDevServer(context.Background(), testsuite.DevServerOptions{ // TODO: Use stable release once server 1.27.0 is out. CachedDownload: testsuite.CachedDownload{ - Version: "v1.3.0-versioning.0", + Version: "v1.3.0-rc.0", }, ClientOptions: &client.Options{ HostPort: "127.0.0.1:7233", diff --git a/internal/internal_worker_base.go b/internal/internal_worker_base.go index eb444a7f0..0cb593a68 100644 --- a/internal/internal_worker_base.go +++ b/internal/internal_worker_base.go @@ -232,6 +232,13 @@ type ( } ) +func (h ResultHandler) wrap(callback ResultHandler) ResultHandler { + return func(result *commonpb.Payloads, err error) { + callback(result, err) + h(result, err) + } +} + func (t *polledTask) getTask() taskForWorker { return t.task } diff --git a/internal/internal_workflow.go b/internal/internal_workflow.go index ca3084113..42f33ea2b 100644 --- a/internal/internal_workflow.go +++ b/internal/internal_workflow.go @@ -214,7 +214,10 @@ type ( WorkflowID string WaitForCancellation bool WorkflowIDReusePolicy enumspb.WorkflowIdReusePolicy + // WorkflowIDConflictPolicy and OnConflictOptions are only used in test environment for + // running Nexus operations as child workflow. WorkflowIDConflictPolicy enumspb.WorkflowIdConflictPolicy + OnConflictOptions *OnConflictOptions DataConverter converter.DataConverter RetryPolicy *commonpb.RetryPolicy CronSchedule string diff --git a/internal/internal_workflow_testsuite.go b/internal/internal_workflow_testsuite.go index cdc8bdde6..ac8a634f8 100644 --- a/internal/internal_workflow_testsuite.go +++ b/internal/internal_workflow_testsuite.go @@ -428,7 +428,11 @@ func (env *testWorkflowEnvironmentImpl) setContinuedExecutionRunID(rid string) { env.workflowInfo.ContinuedExecutionRunID = rid } -func (env *testWorkflowEnvironmentImpl) newTestWorkflowEnvironmentForChild(params *ExecuteWorkflowParams, callback ResultHandler, startedHandler func(r WorkflowExecution, e error)) (*testWorkflowEnvironmentImpl, error) { +func (env *testWorkflowEnvironmentImpl) newTestWorkflowEnvironmentForChild( + params *ExecuteWorkflowParams, + callback ResultHandler, + startedHandler func(r WorkflowExecution, e error), +) (*testWorkflowEnvironmentImpl, error) { // create a new test env childEnv := newTestWorkflowEnvironmentImpl(env.testSuite, env.registry) childEnv.parentEnv = env @@ -474,15 +478,27 @@ func (env *testWorkflowEnvironmentImpl) newTestWorkflowEnvironmentForChild(param childEnv.runTimeout = params.WorkflowRunTimeout if workflowHandler, ok := env.runningWorkflows[params.WorkflowID]; ok { + alreadyStartedErr := serviceerror.NewWorkflowExecutionAlreadyStarted( + "Workflow execution already started", + "", + childEnv.workflowInfo.WorkflowExecution.RunID, + ) // duplicate workflow ID if !workflowHandler.handled { - return nil, serviceerror.NewWorkflowExecutionAlreadyStarted("Workflow execution already started", "", "") + if params.WorkflowIDConflictPolicy == enumspb.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING { + if params.OnConflictOptions != nil && params.OnConflictOptions.AttachCompletionCallbacks { + workflowHandler.callback = workflowHandler.callback.wrap(callback) + } + startedHandler(workflowHandler.env.workflowInfo.WorkflowExecution, nil) + return nil, nil + } + return nil, alreadyStartedErr } if params.WorkflowIDReusePolicy == enumspb.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE { - return nil, serviceerror.NewWorkflowExecutionAlreadyStarted("Workflow execution already started", "", "") + return nil, alreadyStartedErr } if workflowHandler.err == nil && params.WorkflowIDReusePolicy == enumspb.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY { - return nil, serviceerror.NewWorkflowExecutionAlreadyStarted("Workflow execution already started", "", "") + return nil, alreadyStartedErr } } @@ -2380,16 +2396,20 @@ func (env *testWorkflowEnvironmentImpl) executeChildWorkflowWithDelay(delayStart childEnv, err := env.newTestWorkflowEnvironmentForChild(¶ms, callback, startedHandler) if err != nil { env.logger.Info("ExecuteChildWorkflow failed", tagError, err) - callback(nil, err) startedHandler(WorkflowExecution{}, err) + callback(nil, err) return } - env.logger.Info("ExecuteChildWorkflow", tagWorkflowType, params.WorkflowType.Name) - env.runningCount++ + // childEnv can be nil when WorkflowIDConflictPolicy is USE_EXISTING and there's already a running + // workflow. This is only possible in the test environment for running Nexus handler workflow. + if childEnv != nil { + env.logger.Info("ExecuteChildWorkflow", tagWorkflowType, params.WorkflowType.Name) + env.runningCount++ - // run child workflow in separate goroutinue - go childEnv.executeWorkflowInternal(delayStart, params.WorkflowType.Name, params.Input) + // run child workflow in separate goroutinue + go childEnv.executeWorkflowInternal(delayStart, params.WorkflowType.Name, params.Input) + } } func (env *testWorkflowEnvironmentImpl) newTestNexusTaskHandler( diff --git a/internal/nexus_operations.go b/internal/nexus_operations.go index 27b98d475..8a3a38639 100644 --- a/internal/nexus_operations.go +++ b/internal/nexus_operations.go @@ -453,6 +453,7 @@ func (t *testSuiteClientForNexusOperations) ExecuteWorkflow(ctx context.Context, } run := &testEnvWorkflowRunForNexusOperations{} + startedErrCh := make(chan error, 1) doneCh := make(chan error) var callback *commonpb.Callback @@ -476,6 +477,8 @@ func (t *testSuiteClientForNexusOperations) ExecuteWorkflow(ctx context.Context, WorkflowTaskTimeout: options.WorkflowTaskTimeout, DataConverter: t.env.dataConverter, WorkflowIDReusePolicy: options.WorkflowIDReusePolicy, + WorkflowIDConflictPolicy: options.WorkflowIDConflictPolicy, + OnConflictOptions: options.onConflictOptions, ContextPropagators: t.env.contextPropagators, SearchAttributes: options.SearchAttributes, TypedSearchAttributes: options.TypedSearchAttributes, @@ -485,6 +488,14 @@ func (t *testSuiteClientForNexusOperations) ExecuteWorkflow(ctx context.Context, RetryPolicy: convertToPBRetryPolicy(options.RetryPolicy), }, }, func(result *commonpb.Payloads, wfErr error) { + // This callback handles async completion of Nexus operations. If there was an error when + // starting the workflow, then the operation failed synchronously and this callback doesn't + // need to be executed. + startedErr := <-startedErrCh + if startedErr != nil { + return + } + ncb := callback.GetNexus() if ncb == nil { return @@ -519,6 +530,8 @@ func (t *testSuiteClientForNexusOperations) ExecuteWorkflow(ctx context.Context, } }, func(r WorkflowExecution, err error) { run.WorkflowExecution = r + startedErrCh <- err + close(startedErrCh) doneCh <- err }) }, false) diff --git a/test/nexus_test.go b/test/nexus_test.go index 2c66bc256..9faa782f5 100644 --- a/test/nexus_test.go +++ b/test/nexus_test.go @@ -39,7 +39,7 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "go.temporal.io/api/common/v1" - "go.temporal.io/api/enums/v1" + enumspb "go.temporal.io/api/enums/v1" historypb "go.temporal.io/api/history/v1" nexuspb "go.temporal.io/api/nexus/v1" "go.temporal.io/api/operatorservice/v1" @@ -59,6 +59,8 @@ import ( "go.temporal.io/sdk/workflow" ) +const defaultNexusTestTimeout = 10 * time.Second + type testContext struct { client client.Client metricsHandler *metrics.CapturingHandler @@ -228,7 +230,7 @@ var workflowOp = temporalnexus.NewWorkflowRunOperation( ) func TestNexusSyncOperation(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + ctx, cancel := context.WithTimeout(context.Background(), defaultNexusTestTimeout) defer cancel() tc := newTestContext(t, ctx) @@ -392,7 +394,7 @@ func TestNexusSyncOperation(t *testing.T) { } func TestNexusWorkflowRunOperation(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + ctx, cancel := context.WithTimeout(context.Background(), defaultNexusTestTimeout) defer cancel() tc := newTestContext(t, ctx) @@ -412,7 +414,7 @@ func TestNexusWorkflowRunOperation(t *testing.T) { RunId: "caller-run-id", Reference: &common.Link_WorkflowEvent_EventRef{ EventRef: &common.Link_WorkflowEvent_EventReference{ - EventType: enums.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED, + EventType: enumspb.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED, }, }, } @@ -436,11 +438,11 @@ func TestNexusWorkflowRunOperation(t *testing.T) { require.Equal(t, "http://localhost/test", callback.Nexus.Url) require.Subset(t, callback.Nexus.Header, map[string]string{"test": "ok"}) - iter := tc.client.GetWorkflowHistory(ctx, workflowID, "", false, enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT) + iter := tc.client.GetWorkflowHistory(ctx, workflowID, "", false, enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT) for iter.HasNext() { event, err := iter.Next() require.NoError(t, err) - if event.GetEventType() == enums.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED { + if event.GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED { require.Len(t, event.GetLinks(), 1) require.True(t, proto.Equal(link, event.GetLinks()[0].GetWorkflowEvent())) break @@ -453,7 +455,7 @@ func TestNexusWorkflowRunOperation(t *testing.T) { } func TestSyncOperationFromWorkflow(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + ctx, cancel := context.WithTimeout(context.Background(), defaultNexusTestTimeout) defer cancel() tc := newTestContext(t, ctx) @@ -667,7 +669,7 @@ func TestSyncOperationFromWorkflow(t *testing.T) { } func TestInvalidOperationInput(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + ctx, cancel := context.WithTimeout(context.Background(), defaultNexusTestTimeout) defer cancel() tc := newTestContext(t, ctx) @@ -686,7 +688,7 @@ func TestInvalidOperationInput(t *testing.T) { } func TestAsyncOperationFromWorkflow(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + ctx, cancel := context.WithTimeout(context.Background(), defaultNexusTestTimeout) defer cancel() tc := newTestContext(t, ctx) @@ -782,16 +784,16 @@ func TestAsyncOperationFromWorkflow(t *testing.T) { run.GetID(), run.GetRunID(), false, - enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT, + enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT, ) var nexusOperationScheduleEventID int64 var targetEvent *historypb.HistoryEvent for iter.HasNext() { event, err := iter.Next() require.NoError(t, err) - if event.GetEventType() == enums.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED { + if event.GetEventType() == enumspb.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED { nexusOperationScheduleEventID = event.GetEventId() - } else if event.GetEventType() == enums.EVENT_TYPE_NEXUS_OPERATION_STARTED { + } else if event.GetEventType() == enumspb.EVENT_TYPE_NEXUS_OPERATION_STARTED { targetEvent = event break } @@ -804,7 +806,7 @@ func TestAsyncOperationFromWorkflow(t *testing.T) { require.NotEmpty(t, link.GetWorkflowEvent().GetRunId()) require.True(t, proto.Equal( &common.Link_WorkflowEvent_EventReference{ - EventType: enums.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED, + EventType: enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED, }, link.GetWorkflowEvent().GetEventRef(), )) @@ -816,13 +818,13 @@ func TestAsyncOperationFromWorkflow(t *testing.T) { handlerWfID, handlerRunID, false, - enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT, + enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT, ) targetEvent = nil for iter.HasNext() { event, err := iter.Next() require.NoError(t, err) - if event.GetEventType() == enums.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED { + if event.GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED { targetEvent = event break } @@ -840,7 +842,7 @@ func TestAsyncOperationFromWorkflow(t *testing.T) { Reference: &common.Link_WorkflowEvent_EventRef{ EventRef: &common.Link_WorkflowEvent_EventReference{ EventId: nexusOperationScheduleEventID, - EventType: enums.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED, + EventType: enumspb.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED, }, }, }, @@ -919,11 +921,11 @@ func TestAsyncOperationFromWorkflow(t *testing.T) { require.ErrorAs(t, err, &canceledErr) // Verify that the operation was never scheduled. - history := tc.client.GetWorkflowHistory(ctx, run.GetID(), run.GetRunID(), false, enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT) + history := tc.client.GetWorkflowHistory(ctx, run.GetID(), run.GetRunID(), false, enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT) for history.HasNext() { event, err := history.Next() require.NoError(t, err) - require.NotEqual(t, enums.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED, event.EventType) + require.NotEqual(t, enumspb.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED, event.EventType) } }) @@ -943,6 +945,142 @@ func TestAsyncOperationFromWorkflow(t *testing.T) { }) } +func TestAsyncOperationFromWorkflow_MultipleCallers(t *testing.T) { + if os.Getenv("DISABLE_SERVER_1_27_TESTS") == "1" { + t.Skip() + } + ctx, cancel := context.WithTimeout(context.Background(), defaultNexusTestTimeout) + defer cancel() + tctx := newTestContext(t, ctx) + + handlerWorkflowID := uuid.NewString() + handlerWf := func(ctx workflow.Context, input string) (string, error) { + workflow.GetSignalChannel(ctx, "terminate").Receive(ctx, nil) + return "hello " + input, nil + } + + op := temporalnexus.NewWorkflowRunOperation( + "op", + handlerWf, + func(ctx context.Context, input string, opts nexus.StartOperationOptions) (client.StartWorkflowOptions, error) { + var conflictPolicy enumspb.WorkflowIdConflictPolicy + if input == "conflict-policy-use-existing" { + conflictPolicy = enumspb.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING + } + return client.StartWorkflowOptions{ + ID: handlerWorkflowID, + WorkflowIDConflictPolicy: conflictPolicy, + }, nil + }, + ) + + type CallerWfOutput struct { + CntOk int + CntErr int + } + + callerWf := func(ctx workflow.Context, input string, numCalls int) (CallerWfOutput, error) { + output := CallerWfOutput{} + var retError error + + wg := workflow.NewWaitGroup(ctx) + execOpCh := workflow.NewChannel(ctx) + client := workflow.NewNexusClient(tctx.endpoint, "test") + + for i := 0; i < numCalls; i++ { + wg.Add(1) + workflow.Go(ctx, func(ctx workflow.Context) { + defer wg.Done() + fut := client.ExecuteOperation(ctx, op, input, workflow.NexusOperationOptions{}) + var exec workflow.NexusOperationExecution + err := fut.GetNexusOperationExecution().Get(ctx, &exec) + execOpCh.Send(ctx, nil) + if err != nil { + output.CntErr++ + var handlerErr *nexus.HandlerError + var appErr *temporal.ApplicationError + if !errors.As(err, &handlerErr) { + retError = err + } else if !errors.As(handlerErr, &appErr) { + retError = err + } else if appErr.Type() != "WorkflowExecutionAlreadyStarted" { + retError = err + } + return + } + output.CntOk++ + var res string + err = fut.Get(ctx, &res) + if err != nil { + retError = err + } else if res != "hello "+input { + retError = fmt.Errorf("unexpected result from handler workflow: %q", res) + } + }) + } + + for i := 0; i < numCalls; i++ { + execOpCh.Receive(ctx, nil) + } + + // signal handler workflow so it will complete + workflow.SignalExternalWorkflow(ctx, handlerWorkflowID, "", "terminate", nil).Get(ctx, nil) + wg.Wait(ctx) + return output, retError + } + + w := worker.New(tctx.client, tctx.taskQueue, worker.Options{}) + service := nexus.NewService("test") + require.NoError(t, service.Register(op)) + w.RegisterNexusService(service) + w.RegisterWorkflow(handlerWf) + w.RegisterWorkflow(callerWf) + require.NoError(t, w.Start()) + t.Cleanup(w.Stop) + + testCases := []struct { + input string + checkOutput func(t *testing.T, numCalls int, res CallerWfOutput) + }{ + { + input: "conflict-policy-fail", + checkOutput: func(t *testing.T, numCalls int, res CallerWfOutput) { + require.EqualValues(t, 1, res.CntOk) + require.EqualValues(t, numCalls-1, res.CntErr) + }, + }, + { + input: "conflict-policy-use-existing", + checkOutput: func(t *testing.T, numCalls int, res CallerWfOutput) { + require.EqualValues(t, numCalls, res.CntOk) + }, + }, + } + + // number of concurrent Nexus operation calls + numCalls := 5 + for _, tc := range testCases { + t.Run(tc.input, func(t *testing.T) { + run, err := tctx.client.ExecuteWorkflow( + ctx, + client.StartWorkflowOptions{ + TaskQueue: tctx.taskQueue, + // The endpoint registry may take a bit to propagate to the history service, use a shorter + // workflow task timeout to speed up the attempts. + WorkflowTaskTimeout: time.Second, + }, + callerWf, + tc.input, + numCalls, + ) + require.NoError(t, err) + var res CallerWfOutput + require.NoError(t, run.Get(ctx, &res)) + tc.checkOutput(t, numCalls, res) + }) + } +} + type manualAsyncOp struct { nexus.UnimplementedOperation[nexus.NoValue, nexus.NoValue] } @@ -995,7 +1133,7 @@ func (o *manualAsyncOp) Start(ctx context.Context, input nexus.NoValue, options // TestAsyncOperationCompletionCustomFailureConverter tests the completion path when a failure is generated with a // custom failure converter. func TestAsyncOperationCompletionCustomFailureConverter(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + ctx, cancel := context.WithTimeout(context.Background(), defaultNexusTestTimeout) defer cancel() tc := newTestContext(t, ctx) @@ -1049,7 +1187,7 @@ func TestAsyncOperationCompletionCustomFailureConverter(t *testing.T) { } func TestNewNexusClientValidation(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + ctx, cancel := context.WithTimeout(context.Background(), defaultNexusTestTimeout) defer cancel() tc := newTestContext(t, ctx) @@ -1081,7 +1219,7 @@ func TestNewNexusClientValidation(t *testing.T) { } func TestReplay(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + ctx, cancel := context.WithTimeout(context.Background(), defaultNexusTestTimeout) defer cancel() tc := newTestContext(t, ctx) @@ -1122,7 +1260,7 @@ func TestReplay(t *testing.T) { require.NoError(t, run.Get(ctx, nil)) events := make([]*historypb.HistoryEvent, 0) - hist := tc.client.GetWorkflowHistory(ctx, run.GetID(), run.GetRunID(), false, enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT) + hist := tc.client.GetWorkflowHistory(ctx, run.GetID(), run.GetRunID(), false, enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT) for hist.HasNext() { e, err := hist.Next() require.NoError(t, err) @@ -1507,6 +1645,123 @@ func TestWorkflowTestSuite_WorkflowRunOperation_WithCancel(t *testing.T) { } } +func TestWorkflowTestSuite_WorkflowRunOperation_MultipleCallers(t *testing.T) { + handlerWorkflowID := uuid.NewString() + handlerWf := func(ctx workflow.Context, input string) (string, error) { + workflow.GetSignalChannel(ctx, "terminate").Receive(ctx, nil) + return "hello " + input, nil + } + + op := temporalnexus.NewWorkflowRunOperation( + "op", + handlerWf, + func(ctx context.Context, input string, opts nexus.StartOperationOptions) (client.StartWorkflowOptions, error) { + var conflictPolicy enumspb.WorkflowIdConflictPolicy + if input == "conflict-policy-use-existing" { + conflictPolicy = enumspb.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING + } + return client.StartWorkflowOptions{ + ID: handlerWorkflowID, + WorkflowIDConflictPolicy: conflictPolicy, + }, nil + }, + ) + + type CallerWfOutput struct { + CntOk int + CntErr int + } + + callerWf := func(ctx workflow.Context, input string, numCalls int) (CallerWfOutput, error) { + output := CallerWfOutput{} + var retError error + + wg := workflow.NewWaitGroup(ctx) + execOpCh := workflow.NewChannel(ctx) + client := workflow.NewNexusClient("endpoint", "test") + + for i := 0; i < numCalls; i++ { + wg.Add(1) + workflow.Go(ctx, func(ctx workflow.Context) { + defer wg.Done() + fut := client.ExecuteOperation(ctx, op, input, workflow.NexusOperationOptions{}) + var exec workflow.NexusOperationExecution + err := fut.GetNexusOperationExecution().Get(ctx, &exec) + execOpCh.Send(ctx, nil) + if err != nil { + output.CntErr++ + var handlerErr *nexus.HandlerError + var appErr *temporal.ApplicationError + if !errors.As(err, &handlerErr) { + retError = err + } else if !errors.As(handlerErr, &appErr) { + retError = err + } else if appErr.Type() != "WorkflowExecutionAlreadyStarted" { + retError = err + } + return + } + output.CntOk++ + var res string + err = fut.Get(ctx, &res) + if err != nil { + retError = err + } else if res != "hello "+input { + retError = fmt.Errorf("unexpected result from handler workflow: %q", res) + } + }) + } + + for i := 0; i < numCalls; i++ { + execOpCh.Receive(ctx, nil) + } + + workflow.SignalExternalWorkflow(ctx, handlerWorkflowID, "", "terminate", nil).Get(ctx, nil) + wg.Wait(ctx) + return output, retError + } + + service := nexus.NewService("test") + service.MustRegister(op) + + testCases := []struct { + input string + checkOutput func(t *testing.T, numCalls int, res CallerWfOutput) + }{ + { + input: "conflict-policy-fail", + checkOutput: func(t *testing.T, numCalls int, res CallerWfOutput) { + require.EqualValues(t, 1, res.CntOk) + require.EqualValues(t, numCalls-1, res.CntErr) + }, + }, + { + input: "conflict-policy-use-existing", + checkOutput: func(t *testing.T, numCalls int, res CallerWfOutput) { + require.EqualValues(t, numCalls, res.CntOk) + }, + }, + } + + // number of concurrent Nexus operation calls + numCalls := 5 + for _, tc := range testCases { + t.Run(tc.input, func(t *testing.T) { + suite := testsuite.WorkflowTestSuite{} + env := suite.NewTestWorkflowEnvironment() + env.RegisterWorkflow(handlerWf) + env.RegisterNexusService(service) + + env.ExecuteWorkflow(callerWf, tc.input, numCalls) + require.True(t, env.IsWorkflowCompleted()) + require.NoError(t, env.GetWorkflowError()) + var res CallerWfOutput + require.NoError(t, env.GetWorkflowResult(&res)) + tc.checkOutput(t, numCalls, res) + }) + } +} + func TestWorkflowTestSuite_NexusSyncOperation_ScheduleToCloseTimeout(t *testing.T) { sleepDuration := 500 * time.Millisecond op := nexus.NewSyncOperation(