Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support conflict options for starting Nexus operations in test framework #1828

Merged
merged 7 commits into from
Feb 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/cmd/build/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (b *builder) integrationTest() error {
devServer, err := testsuite.StartDevServer(context.Background(), testsuite.DevServerOptions{
// TODO: Use stable release once server 1.27.0 is out.
CachedDownload: testsuite.CachedDownload{
Version: "v1.3.0-versioning.0",
Version: "v1.3.0-rc.0",
},
ClientOptions: &client.Options{
HostPort: "127.0.0.1:7233",
Expand Down
7 changes: 7 additions & 0 deletions internal/internal_worker_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,13 @@ type (
}
)

func (h ResultHandler) wrap(callback ResultHandler) ResultHandler {
return func(result *commonpb.Payloads, err error) {
callback(result, err)
h(result, err)
}
}

func (t *polledTask) getTask() taskForWorker {
return t.task
}
Expand Down
3 changes: 3 additions & 0 deletions internal/internal_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,10 @@ type (
WorkflowID string
WaitForCancellation bool
WorkflowIDReusePolicy enumspb.WorkflowIdReusePolicy
// WorkflowIDConflictPolicy and OnConflictOptions are only used in test environment for
// running Nexus operations as child workflow.
WorkflowIDConflictPolicy enumspb.WorkflowIdConflictPolicy
OnConflictOptions *OnConflictOptions
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm adding this WorkflowOptions, but not sure if I should. I need because the Nexus handler workflow is executed in the test framework as a child workflow, and I need this to resolve workflow ID conflict, ie., starting the same operation multiple times.

I can see this is used in certain operations. For example, I saw WorkflowOptions is built for ExecuteChildWorkflow, and OnConflictOptions is not an options for executing child workflow.

Btw, WorkflowIDConflictPolicy was added here a while ago, but I didn't see any usage of this field anywhere... I'm using it now here though.

cc: @Quinn-With-Two-Ns @cretz

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WorkflowIDConflictPolicy is not supported for child workflows temporalio/temporal#6799 so yeah it probably shouldn't have been added here.

Is it reasonable to refactor the Nexus handler workflow to not be a "child" in the test framework?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, we shouldn't add an unused field here (even if there happens to be another unused field here)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think Rodrigo is saying it is no longer an unused field, but I guess it is unused when executing outside the test enviorment?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, WorkflowIDConflictPolicy is unused since it was added in #1563, but I need it now with this PR.

As for not being a child workflow, I'm not sure how we would be able to run the Nexus workflow without it since the test environment only allows to run one main/root workflow at a time. cc: @bergundy

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, my mistake, I thought this was user-facing child workflow options or something. If this is an internal only thing I have no opinion/preference, will defer to Quinn.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WorkflowOptions is somewhat user facing, isn't it? The user could get it from the workflow context.
The two fields I mentioned, it's only used in the test environment. Maybe I could just make them unexported?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How can users get WorkflowOptions from the workflow context?

Copy link
Contributor Author

@rodrigozhou rodrigozhou Feb 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, it's not exported, but technically, the user could write directly ctx.Value("wfEnvOptions").
Otherwise, it seems it's only for internal.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I think the only reason the type is even exported anyway is for the nexus test environment and maybe the PHP SDK

DataConverter converter.DataConverter
RetryPolicy *commonpb.RetryPolicy
CronSchedule string
Expand Down
38 changes: 29 additions & 9 deletions internal/internal_workflow_testsuite.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,11 @@ func (env *testWorkflowEnvironmentImpl) setContinuedExecutionRunID(rid string) {
env.workflowInfo.ContinuedExecutionRunID = rid
}

func (env *testWorkflowEnvironmentImpl) newTestWorkflowEnvironmentForChild(params *ExecuteWorkflowParams, callback ResultHandler, startedHandler func(r WorkflowExecution, e error)) (*testWorkflowEnvironmentImpl, error) {
func (env *testWorkflowEnvironmentImpl) newTestWorkflowEnvironmentForChild(
params *ExecuteWorkflowParams,
callback ResultHandler,
startedHandler func(r WorkflowExecution, e error),
) (*testWorkflowEnvironmentImpl, error) {
// create a new test env
childEnv := newTestWorkflowEnvironmentImpl(env.testSuite, env.registry)
childEnv.parentEnv = env
Expand Down Expand Up @@ -474,15 +478,27 @@ func (env *testWorkflowEnvironmentImpl) newTestWorkflowEnvironmentForChild(param

childEnv.runTimeout = params.WorkflowRunTimeout
if workflowHandler, ok := env.runningWorkflows[params.WorkflowID]; ok {
alreadyStartedErr := serviceerror.NewWorkflowExecutionAlreadyStarted(
"Workflow execution already started",
"",
childEnv.workflowInfo.WorkflowExecution.RunID,
)
// duplicate workflow ID
if !workflowHandler.handled {
return nil, serviceerror.NewWorkflowExecutionAlreadyStarted("Workflow execution already started", "", "")
if params.WorkflowIDConflictPolicy == enumspb.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING {
if params.OnConflictOptions != nil && params.OnConflictOptions.AttachCompletionCallbacks {
workflowHandler.callback = workflowHandler.callback.wrap(callback)
}
startedHandler(workflowHandler.env.workflowInfo.WorkflowExecution, nil)
return nil, nil
}
return nil, alreadyStartedErr
}
if params.WorkflowIDReusePolicy == enumspb.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE {
return nil, serviceerror.NewWorkflowExecutionAlreadyStarted("Workflow execution already started", "", "")
return nil, alreadyStartedErr
}
if workflowHandler.err == nil && params.WorkflowIDReusePolicy == enumspb.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY {
return nil, serviceerror.NewWorkflowExecutionAlreadyStarted("Workflow execution already started", "", "")
return nil, alreadyStartedErr
}
}

Expand Down Expand Up @@ -2380,16 +2396,20 @@ func (env *testWorkflowEnvironmentImpl) executeChildWorkflowWithDelay(delayStart
childEnv, err := env.newTestWorkflowEnvironmentForChild(&params, callback, startedHandler)
if err != nil {
env.logger.Info("ExecuteChildWorkflow failed", tagError, err)
callback(nil, err)
startedHandler(WorkflowExecution{}, err)
callback(nil, err)
return
}

env.logger.Info("ExecuteChildWorkflow", tagWorkflowType, params.WorkflowType.Name)
env.runningCount++
// childEnv can be nil when WorkflowIDConflictPolicy is USE_EXISTING and there's already a running
// workflow. This is only possible in the test environment for running Nexus handler workflow.
if childEnv != nil {
env.logger.Info("ExecuteChildWorkflow", tagWorkflowType, params.WorkflowType.Name)
env.runningCount++

// run child workflow in separate goroutinue
go childEnv.executeWorkflowInternal(delayStart, params.WorkflowType.Name, params.Input)
// run child workflow in separate goroutinue
go childEnv.executeWorkflowInternal(delayStart, params.WorkflowType.Name, params.Input)
}
}

func (env *testWorkflowEnvironmentImpl) newTestNexusTaskHandler(
Expand Down
13 changes: 13 additions & 0 deletions internal/nexus_operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,7 @@ func (t *testSuiteClientForNexusOperations) ExecuteWorkflow(ctx context.Context,
}

run := &testEnvWorkflowRunForNexusOperations{}
startedErrCh := make(chan error, 1)
doneCh := make(chan error)

var callback *commonpb.Callback
Expand All @@ -476,6 +477,8 @@ func (t *testSuiteClientForNexusOperations) ExecuteWorkflow(ctx context.Context,
WorkflowTaskTimeout: options.WorkflowTaskTimeout,
DataConverter: t.env.dataConverter,
WorkflowIDReusePolicy: options.WorkflowIDReusePolicy,
WorkflowIDConflictPolicy: options.WorkflowIDConflictPolicy,
OnConflictOptions: options.onConflictOptions,
ContextPropagators: t.env.contextPropagators,
SearchAttributes: options.SearchAttributes,
TypedSearchAttributes: options.TypedSearchAttributes,
Expand All @@ -485,6 +488,14 @@ func (t *testSuiteClientForNexusOperations) ExecuteWorkflow(ctx context.Context,
RetryPolicy: convertToPBRetryPolicy(options.RetryPolicy),
},
}, func(result *commonpb.Payloads, wfErr error) {
// This callback handles async completion of Nexus operations. If there was an error when
// starting the workflow, then the operation failed synchronously and this callback doesn't
// need to be executed.
startedErr := <-startedErrCh
if startedErr != nil {
return
}

ncb := callback.GetNexus()
if ncb == nil {
return
Expand Down Expand Up @@ -519,6 +530,8 @@ func (t *testSuiteClientForNexusOperations) ExecuteWorkflow(ctx context.Context,
}
}, func(r WorkflowExecution, err error) {
run.WorkflowExecution = r
startedErrCh <- err
close(startedErrCh)
doneCh <- err
})
}, false)
Expand Down
Loading
Loading