Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Nexus] Set OnConflictOptions for WorkflowRunOperation #1797

Merged
merged 3 commits into from
Feb 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 24 additions & 2 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -647,8 +647,11 @@ type (
// When WorkflowExecutionErrorWhenAlreadyStarted is true, Client.ExecuteWorkflow will return an error if the
// workflow id has already been used and WorkflowIDReusePolicy or WorkflowIDConflictPolicy would
// disallow a re-run. If it is set to false, rather than erroring a WorkflowRun instance representing
// the current or last run will be returned. However, when WithStartOperation is set, this field is ignored and
// the WorkflowIDConflictPolicy UseExisting must be used instead to prevent erroring.
// the current or last run will be returned. However, this field is ignored in the following cases:
// - when WithStartOperation is set;
// - in the Nexus WorkflowRunOperation.
// When this field is ignored, you must set WorkflowIDConflictPolicy to UseExisting to prevent
// erroring.
//
// Optional: defaults to false
WorkflowExecutionErrorWhenAlreadyStarted bool
Expand Down Expand Up @@ -742,6 +745,14 @@ type (
callbacks []*commonpb.Callback
// links. Only settable by the SDK - e.g. [temporalnexus.workflowRunOperation].
links []*commonpb.Link

// OnConflictOptions - Optional workflow ID conflict options used in conjunction with conflict policy
// WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING. If onConflictOptions is set and a workflow is already
// running, the options specifies the actions to be taken on the running workflow. If not set or use
// together with any other WorkflowIDConflictPolicy, this parameter is ignored.
//
// NOTE: Only settable by the SDK -- e.g. [temporalnexus.workflowRunOperation].
onConflictOptions *OnConflictOptions
}

// WithStartWorkflowOperation defines how to start a workflow when using UpdateWithStartWorkflow.
Expand Down Expand Up @@ -1195,3 +1206,14 @@ func SetCallbacksOnStartWorkflowOptions(opts *StartWorkflowOptions, callbacks []
func SetLinksOnStartWorkflowOptions(opts *StartWorkflowOptions, links []*commonpb.Link) {
opts.links = links
}

// SetOnConflictOptionsOnStartWorkflowOptions is an internal only method for setting conflict
// options on StartWorkflowOptions.
// OnConflictOptions are purposefully not exposed to users for the time being.
func SetOnConflictOptionsOnStartWorkflowOptions(opts *StartWorkflowOptions) {
opts.onConflictOptions = &OnConflictOptions{
AttachRequestID: true,
AttachCompletionCallbacks: true,
AttachLinks: true,
}
}
1 change: 1 addition & 0 deletions internal/internal_workflow_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1678,6 +1678,7 @@ func (w *workflowClientInterceptor) createStartWorkflowRequest(
CompletionCallbacks: in.Options.callbacks,
Links: in.Options.links,
VersioningOverride: versioningOverrideToProto(in.Options.VersioningOverride),
OnConflictOptions: in.Options.onConflictOptions.ToProto(),
}

startRequest.UserMetadata, err = buildUserMetadata(in.Options.StaticSummary, in.Options.StaticDetails, dataConverter)
Expand Down
21 changes: 21 additions & 0 deletions internal/internal_workflow_execution_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,16 @@ type (
// Required if behavior is [VersioningBehaviorPinned]. Must be absent if behavior is not [VersioningBehaviorPinned].
PinnedVersion string
}

// OnConflictOptions specifies the actions to be taken when using the workflow ID conflict policy
// USE_EXISTING.
//
// NOTE: Experimental
OnConflictOptions struct {
AttachRequestID bool
AttachCompletionCallbacks bool
AttachLinks bool
}
)

// Mapping WorkflowExecutionOptions field names to proto ones.
Expand Down Expand Up @@ -209,3 +219,14 @@ func (r *UpdateWorkflowExecutionOptionsRequest) validateAndConvertToProto(namesp

return requestMsg, nil
}

func (o *OnConflictOptions) ToProto() *workflowpb.OnConflictOptions {
if o == nil {
return nil
}
return &workflowpb.OnConflictOptions{
AttachRequestId: o.AttachRequestID,
AttachCompletionCallbacks: o.AttachCompletionCallbacks,
AttachLinks: o.AttachLinks,
}
}
12 changes: 12 additions & 0 deletions temporalnexus/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ type WorkflowRunOperationOptions[I, O any] struct {
// The options returned must include a workflow ID that is deterministically generated from the input in order
// for the operation to be idempotent as the request to start the operation may be retried.
// TaskQueue is optional and defaults to the current worker's task queue.
// WorkflowExecutionErrorWhenAlreadyStarted is ignored and always set to true.
// WorkflowIDConflictPolicy is by default set to fail if a workflow is already running. That is,
// if a caller executes another operation that starts the same workflow, it will fail. You can set
// it to WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING to attach the caller's callback to the existing
// running workflow. This way, all attached callers will be notified when the workflow completes.
GetOptions func(context.Context, I, nexus.StartOperationOptions) (client.StartWorkflowOptions, error)
// Handler for starting a workflow with a different input than the operation. Mutually exclusive with Workflow
// and GetOptions.
Expand Down Expand Up @@ -382,6 +387,13 @@ func ExecuteUntypedWorkflow[R any](
}
}
internal.SetLinksOnStartWorkflowOptions(&startWorkflowOptions, links)
internal.SetOnConflictOptionsOnStartWorkflowOptions(&startWorkflowOptions)

// This makes sure that ExecuteWorkflow will respect the WorkflowIDConflictPolicy, ie., if the
// conflict policy is to fail (default value), then ExecuteWorkflow will return an error if the
// workflow already running. For Nexus, this ensures that operation has only started successfully
// when the callback has been attached to the workflow (new or existing running workflow).
startWorkflowOptions.WorkflowExecutionErrorWhenAlreadyStarted = true

run, err := GetClient(ctx).ExecuteWorkflow(ctx, startWorkflowOptions, workflowType, args...)
if err != nil {
Expand Down
Loading