Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for WorkflowIdConflictPolicy #1563

Merged
merged 2 commits into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -633,11 +633,16 @@ type (
// Optional: defaulted to 10 secs.
WorkflowTaskTimeout time.Duration

// WorkflowIDReusePolicy - Whether server allow reuse of workflow ID, can be useful
// for dedupe logic if set to RejectDuplicate.
// WorkflowIDReusePolicy - Specifies server behavior if a *completed* workflow with the same id exists.
// This can be useful for dedupe logic if set to RejectDuplicate
// Optional: defaulted to AllowDuplicate.
WorkflowIDReusePolicy enumspb.WorkflowIdReusePolicy

// WorkflowIDConflictPolicy - Specifies server behavior if a *running* workflow with the same id exists.
// This cannot be set if WorkflowIDReusePolicy is set to TerminateIfRunning.
// Optional: defaulted to Fail.
WorkflowIDConflictPolicy enumspb.WorkflowIdConflictPolicy

// When WorkflowExecutionErrorWhenAlreadyStarted is true, Client.ExecuteWorkflow will return an error if the
// workflow id has already been used and WorkflowIDReusePolicy would disallow a re-run. If it is set to false,
// rather than erroring a WorkflowRun instance representing the current or last run will be returned.
Expand Down
1 change: 1 addition & 0 deletions internal/internal_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ type (
WorkflowID string
WaitForCancellation bool
WorkflowIDReusePolicy enumspb.WorkflowIdReusePolicy
WorkflowIDConflictPolicy enumspb.WorkflowIdConflictPolicy
DataConverter converter.DataConverter
RetryPolicy *commonpb.RetryPolicy
CronSchedule string
Expand Down
3 changes: 3 additions & 0 deletions internal/internal_workflow_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
taskqueuepb "go.temporal.io/api/taskqueue/v1"
updatepb "go.temporal.io/api/update/v1"
"go.temporal.io/api/workflowservice/v1"

"go.temporal.io/sdk/converter"
"go.temporal.io/sdk/internal/common/metrics"
"go.temporal.io/sdk/internal/common/retry"
Expand Down Expand Up @@ -1592,6 +1593,7 @@ func (w *workflowClientInterceptor) ExecuteWorkflow(
WorkflowTaskTimeout: durationpb.New(workflowTaskTimeout),
Identity: w.client.identity,
WorkflowIdReusePolicy: in.Options.WorkflowIDReusePolicy,
WorkflowIdConflictPolicy: in.Options.WorkflowIDConflictPolicy,
RetryPolicy: convertToPBRetryPolicy(in.Options.RetryPolicy),
CronSchedule: in.Options.CronSchedule,
Memo: memo,
Expand Down Expand Up @@ -1745,6 +1747,7 @@ func (w *workflowClientInterceptor) SignalWithStartWorkflow(
Memo: memo,
SearchAttributes: searchAttr,
WorkflowIdReusePolicy: in.Options.WorkflowIDReusePolicy,
WorkflowIdConflictPolicy: in.Options.WorkflowIDConflictPolicy,
Header: header,
}

Expand Down
72 changes: 72 additions & 0 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -949,6 +949,43 @@ func (ts *IntegrationTestSuite) TestWorkflowIDReuseIgnoreDuplicateWhileRunning()
ts.NotEqual(run1.GetRunID(), run3.GetRunID())
}

func (ts *IntegrationTestSuite) TestWorkflowIDConflictPolicy() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

opts := ts.startWorkflowOptions("test-workflowidconflict-" + uuid.New())
opts.WorkflowExecutionErrorWhenAlreadyStarted = true

var alreadyStartedErr *serviceerror.WorkflowExecutionAlreadyStarted

// Start a workflow
run1, err := ts.client.ExecuteWorkflow(ctx, opts, ts.workflows.IDConflictPolicy)
ts.NoError(err)

// Confirm another fails by default
_, err = ts.client.ExecuteWorkflow(ctx, opts, ts.workflows.IDConflictPolicy)
ts.ErrorAs(err, &alreadyStartedErr)

// Confirm fails if explicitly given that option
opts.WorkflowIDConflictPolicy = enumspb.WORKFLOW_ID_CONFLICT_POLICY_FAIL
_, err = ts.client.ExecuteWorkflow(ctx, opts, ts.workflows.IDConflictPolicy)
ts.ErrorAs(err, &alreadyStartedErr)

// Confirm gives back same WorkflowRun if requested
opts.WorkflowIDConflictPolicy = enumspb.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING
run2, err := ts.client.ExecuteWorkflow(ctx, opts, ts.workflows.IDConflictPolicy)
ts.Equal(run1.GetRunID(), run2.GetRunID())

// Confirm terminates and starts new if requested
opts.WorkflowIDConflictPolicy = enumspb.WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING
run3, err := ts.client.ExecuteWorkflow(ctx, opts, ts.workflows.IDConflictPolicy)
ts.NotEqual(run1.GetRunID(), run3.GetRunID())

statusRun1, err := ts.client.DescribeWorkflowExecution(ctx, run1.GetID(), run1.GetRunID())
ts.NoError(err)
ts.Equal(statusRun1.WorkflowExecutionInfo.Status, enumspb.WORKFLOW_EXECUTION_STATUS_TERMINATED)
}

func (ts *IntegrationTestSuite) TestChildWFWithRetryPolicy_ShortLived() {
ts.testChildWFWithRetryPolicy(ts.workflows.ChildWorkflowWithRetryPolicy, 0)
}
Expand Down Expand Up @@ -1919,6 +1956,41 @@ func (ts *IntegrationTestSuite) TestStartDelaySignalWithStart() {
ts.Equal(5*time.Second, event.GetWorkflowExecutionStartedEventAttributes().GetFirstWorkflowTaskBackoff().AsDuration())
}

func (ts *IntegrationTestSuite) TestSignalWithStartIdConflictPolicy() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var invalidArgErr *serviceerror.InvalidArgument
opts := ts.startWorkflowOptions("test-signalwithstart-workflowidconflict-" + uuid.New())

// Start a workflow
run1, err := ts.client.SignalWithStartWorkflow(ctx, opts.ID, "signal", true, opts, ts.workflows.IDConflictPolicy)
ts.NoError(err)

// Confirm gives back same WorkflowRun by default
run2, err := ts.client.SignalWithStartWorkflow(ctx, opts.ID, "signal", true, opts, ts.workflows.IDConflictPolicy)
ts.Equal(run1.GetRunID(), run2.GetRunID())

// Confirm gives back same WorkflowRun if requested explicitly
opts.WorkflowIDConflictPolicy = enumspb.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING
run3, err := ts.client.SignalWithStartWorkflow(ctx, opts.ID, "signal", true, opts, ts.workflows.IDConflictPolicy)
ts.Equal(run1.GetRunID(), run3.GetRunID())

// Confirm policy to fail is invalid
opts.WorkflowIDConflictPolicy = enumspb.WORKFLOW_ID_CONFLICT_POLICY_FAIL
_, err = ts.client.SignalWithStartWorkflow(ctx, opts.ID, "signal", true, opts, ts.workflows.IDConflictPolicy)
ts.ErrorAs(err, &invalidArgErr)

// Confirm terminates and starts new if requested
opts.WorkflowIDConflictPolicy = enumspb.WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING
run4, err := ts.client.SignalWithStartWorkflow(ctx, opts.ID, "signal", true, opts, ts.workflows.IDConflictPolicy)
ts.NotEqual(run1.GetRunID(), run4.GetRunID())

statusRun1, err := ts.client.DescribeWorkflowExecution(ctx, run1.GetID(), run1.GetRunID())
ts.NoError(err)
ts.Equal(statusRun1.WorkflowExecutionInfo.Status, enumspb.WORKFLOW_EXECUTION_STATUS_TERMINATED)
}

func (ts *IntegrationTestSuite) TestResetWorkflowExecution() {
var originalResult []string
err := ts.executeWorkflow("basic-reset-workflow-execution", ts.workflows.Basic, &originalResult)
Expand Down
9 changes: 9 additions & 0 deletions test/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (

commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"

"go.temporal.io/sdk/converter"
"go.temporal.io/sdk/internal"
"go.temporal.io/sdk/temporal"
Expand Down Expand Up @@ -644,6 +645,13 @@ func (w *Workflows) IDReusePolicy(
return ans1 + ans2, nil
}

func (w *Workflows) IDConflictPolicy(
ctx workflow.Context,
) error {
workflow.Await(ctx, func() bool { return false })
return nil
}

func (w *Workflows) ChildWorkflowWithRetryPolicy(ctx workflow.Context, expectedMaximumAttempts int, iterations int) error {
return w.childWorkflowWithRetryPolicy(ctx, w.childWithRetryPolicy, expectedMaximumAttempts, iterations)
}
Expand Down Expand Up @@ -3063,6 +3071,7 @@ func (w *Workflows) register(worker worker.Worker) {
worker.RegisterWorkflow(w.ContinueAsNewWithRetryPolicy)
worker.RegisterWorkflow(w.ContinueAsNewWithChildWF)
worker.RegisterWorkflow(w.IDReusePolicy)
worker.RegisterWorkflow(w.IDConflictPolicy)
worker.RegisterWorkflow(w.InspectActivityInfo)
worker.RegisterWorkflow(w.InspectLocalActivityInfo)
worker.RegisterWorkflow(w.LargeQueryResultWorkflow)
Expand Down
Loading