Skip to content

Commit

Permalink
Always schedule first workflow task for started abandoned child (#2414)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt committed May 5, 2022
1 parent 91b099b commit ae84776
Show file tree
Hide file tree
Showing 5 changed files with 185 additions and 38 deletions.
15 changes: 11 additions & 4 deletions service/history/historyEngine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5254,10 +5254,16 @@ func addSignaledEvent(builder workflow.MutableState, initiatedID int64, namespac
return event
}

func addStartChildWorkflowExecutionInitiatedEvent(builder workflow.MutableState, workflowTaskCompletedID int64,
createRequestID string, namespace namespace.Name, workflowID, workflowType, taskQueue string, input *commonpb.Payloads,
executionTimeout, runTimeout, taskTimeout time.Duration) (*historypb.HistoryEvent,
*persistencespb.ChildExecutionInfo) {
func addStartChildWorkflowExecutionInitiatedEvent(
builder workflow.MutableState,
workflowTaskCompletedID int64,
createRequestID string,
namespace namespace.Name,
workflowID, workflowType, taskQueue string,
input *commonpb.Payloads,
executionTimeout, runTimeout, taskTimeout time.Duration,
parentClosePolicy enumspb.ParentClosePolicy,
) (*historypb.HistoryEvent, *persistencespb.ChildExecutionInfo) {

event, cei, _ := builder.AddStartChildWorkflowExecutionInitiatedEvent(workflowTaskCompletedID, createRequestID,
&commandpb.StartChildWorkflowExecutionCommandAttributes{
Expand All @@ -5270,6 +5276,7 @@ func addStartChildWorkflowExecutionInitiatedEvent(builder workflow.MutableState,
WorkflowRunTimeout: &runTimeout,
WorkflowTaskTimeout: &taskTimeout,
Control: "",
ParentClosePolicy: parentClosePolicy,
})
return event, cei
}
Expand Down
93 changes: 63 additions & 30 deletions service/history/transferQueueActiveTaskExecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -609,34 +609,10 @@ func (t *transferQueueActiveTaskExecutor) processStartChildExecution(
if err != nil {
return err
}
if mutableState == nil || !mutableState.IsWorkflowExecutionRunning() {
if mutableState == nil {
return nil
}

// Get parent namespace name
var parentNamespaceName namespace.Name
if namespaceEntry, err := t.shard.GetNamespaceRegistry().GetNamespaceByID(namespace.ID(task.NamespaceID)); err != nil {
if _, ok := err.(*serviceerror.NotFound); !ok {
return err
}
// it is possible that the namespace got deleted. Use namespaceID instead as this is only needed for the history event
parentNamespaceName = namespace.Name(task.NamespaceID)
} else {
parentNamespaceName = namespaceEntry.Name()
}

// Get target namespace name
var targetNamespaceName namespace.Name
if namespaceEntry, err := t.shard.GetNamespaceRegistry().GetNamespaceByID(namespace.ID(task.TargetNamespaceID)); err != nil {
if _, ok := err.(*serviceerror.NotFound); !ok {
return err
}
// it is possible that the namespace got deleted. Use namespaceID instead as this is only needed for the history event
targetNamespaceName = namespace.Name(task.TargetNamespaceID)
} else {
targetNamespaceName = namespaceEntry.Name()
}

childInfo, ok := mutableState.GetChildExecutionInfo(task.InitiatedID)
if !ok {
return nil
Expand All @@ -646,14 +622,39 @@ func (t *transferQueueActiveTaskExecutor) processStartChildExecution(
return nil
}

initiatedEvent, err := mutableState.GetChildExecutionInitiatedEvent(ctx, task.InitiatedID)
if err != nil {
return err
// workflow running or not, child started or not, parent close policy is abandon or not
// 8 cases in total
workflowRunning := mutableState.IsWorkflowExecutionRunning()
childStarted := childInfo.StartedId != common.EmptyEventID
if !workflowRunning && (!childStarted || childInfo.ParentClosePolicy != enumspb.PARENT_CLOSE_POLICY_ABANDON) {
// three cases here:
// case 1: workflow not running, child started, parent close policy is not abandon
// case 2: workflow not running, child not started, parent close policy is not abandon
// case 3: workflow not running, child not started, parent close policy is abandon
//
// NOTE: ideally for case 3, we should continue to start child. However, with current start child
// and standby start child verification logic, we can't do that because:
// 1. Once workflow is closed, we can't update mutable state or record child started event.
// If the RPC call for scheduling the first workflow task times out but the call actually succeeds on the child workflow,
// then the child workflow can run and complete, and another unrelated workflow can reuse this workflowID.
// Now when the start child task retries, we can't rely on requestID to dedup the start child call. (We can use runID instead of requestID to dedup)
// 2. No update to mutable state and child started event means we are not able to replicate the information
// to the standby cluster, so standby start child logic won't be able to verify the child has started.
// To resolve the issue above, we need to
// 1. Start child workflow and schedule the first workflow task in one transaction. Use runID to perform deduplication
// 2. Standby start child logic needs to verify if the child workflow actually started instead of relying on the information
// in parent mutable state.
return nil
}
attributes := initiatedEvent.GetStartChildWorkflowExecutionInitiatedEventAttributes()

// ChildExecution already started, just create WorkflowTask and complete transfer task
if childInfo.StartedId != common.EmptyEventID {
// If parent already closed, since child workflow started event already written to history,
// still schedule the workflowTask if the parent close policy is Abandon.
// If parent close policy cancel or terminate, parent close policy will be applied in another
// transfer task.
// case 4, 5: workflow running, child started, parent close policy is or is not abandon
// case 6: workflow closed, child started, parent close policy is abandon
if childStarted {
childExecution := &commonpb.WorkflowExecution{
WorkflowId: childInfo.StartedWorkflowId,
RunId: childInfo.StartedRunId,
Expand All @@ -665,6 +666,38 @@ func (t *transferQueueActiveTaskExecutor) processStartChildExecution(
return t.createFirstWorkflowTask(task.TargetNamespaceID, childExecution)
}

// remaining 2 cases:
// case 7, 8: workflow running, child not started, parent close policy is or is not abandon

initiatedEvent, err := mutableState.GetChildExecutionInitiatedEvent(ctx, task.InitiatedID)
if err != nil {
return err
}
attributes := initiatedEvent.GetStartChildWorkflowExecutionInitiatedEventAttributes()

var parentNamespaceName namespace.Name
if namespaceEntry, err := t.shard.GetNamespaceRegistry().GetNamespaceByID(namespace.ID(task.NamespaceID)); err != nil {
if _, ok := err.(*serviceerror.NotFound); !ok {
return err
}
// it is possible that the namespace got deleted. Use namespaceID instead as this is only needed for the history event
parentNamespaceName = namespace.Name(task.NamespaceID)
} else {
parentNamespaceName = namespaceEntry.Name()
}

// Get target namespace name
var targetNamespaceName namespace.Name
if namespaceEntry, err := t.shard.GetNamespaceRegistry().GetNamespaceByID(namespace.ID(task.TargetNamespaceID)); err != nil {
if _, ok := err.(*serviceerror.NotFound); !ok {
return err
}
// it is possible that the namespace got deleted. Use namespaceID instead as this is only needed for the history event
targetNamespaceName = namespace.Name(task.TargetNamespaceID)
} else {
targetNamespaceName = namespaceEntry.Name()
}

childRunID, err := t.startWorkflowWithRetry(
task,
parentNamespaceName,
Expand Down
110 changes: 108 additions & 2 deletions service/history/transferQueueActiveTaskExecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1689,8 +1689,20 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessStartChildExecution_Su

taskID := int64(59)

event, ci := addStartChildWorkflowExecutionInitiatedEvent(mutableState, event.GetEventId(), uuid.New(),
s.childNamespace, childWorkflowID, childWorkflowType, childTaskQueueName, nil, 1*time.Second, 1*time.Second, 1*time.Second)
event, ci := addStartChildWorkflowExecutionInitiatedEvent(
mutableState,
event.GetEventId(),
uuid.New(),
s.childNamespace,
childWorkflowID,
childWorkflowType,
childTaskQueueName,
nil,
1*time.Second,
1*time.Second,
1*time.Second,
enumspb.PARENT_CLOSE_POLICY_TERMINATE,
)

transferTask := &tasks.StartChildExecutionTask{
WorkflowKey: definition.NewWorkflowKey(
Expand Down Expand Up @@ -1779,6 +1791,7 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessStartChildExecution_Fa
1*time.Second,
1*time.Second,
1*time.Second,
enumspb.PARENT_CLOSE_POLICY_TERMINATE,
)

transferTask := &tasks.StartChildExecutionTask{
Expand Down Expand Up @@ -1860,6 +1873,7 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessStartChildExecution_Su
1*time.Second,
1*time.Second,
1*time.Second,
enumspb.PARENT_CLOSE_POLICY_TERMINATE,
)

transferTask := &tasks.StartChildExecutionTask{
Expand Down Expand Up @@ -1947,6 +1961,7 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessStartChildExecution_Du
1*time.Second,
1*time.Second,
1*time.Second,
enumspb.PARENT_CLOSE_POLICY_TERMINATE,
)

transferTask := &tasks.StartChildExecutionTask{
Expand Down Expand Up @@ -1979,6 +1994,97 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessStartChildExecution_Du
s.Nil(err)
}

// TestProcessorStartChildExecution_ChildStarted_ParentClosed builds a parent
// workflow whose child (parent close policy ABANDON) has a recorded started
// event and whose own execution has since completed. It then executes the
// start-child transfer task and expects exactly one ScheduleWorkflowTask call
// for the child, flagged as the first workflow task, and no error.
func (s *transferQueueActiveTaskExecutorSuite) TestProcessorStartChildExecution_ChildStarted_ParentClosed() {
	const (
		randomIdentity = "some random identity"
		childTimeout   = 1 * time.Second
	)

	parentExecution := commonpb.WorkflowExecution{
		WorkflowId: "some random workflow ID",
		RunId:      uuid.New(),
	}
	parentWorkflowType := "some random workflow type"
	parentTaskQueue := "some random task queue"

	childExecution := commonpb.WorkflowExecution{
		WorkflowId: "some random child workflow ID",
		RunId:      uuid.New(),
	}
	childWorkflowType := "some random child workflow type"
	childTaskQueue := "some random child task queue"

	mutableState := workflow.TestGlobalMutableState(
		s.mockShard,
		s.mockShard.GetEventsCache(),
		s.logger,
		s.version,
		parentExecution.GetRunId(),
	)

	// Start the parent workflow.
	parentStartRequest := &historyservice.StartWorkflowExecutionRequest{
		Attempt:     1,
		NamespaceId: s.namespaceID.String(),
		StartRequest: &workflowservice.StartWorkflowExecutionRequest{
			WorkflowType:             &commonpb.WorkflowType{Name: parentWorkflowType},
			TaskQueue:                &taskqueuepb.TaskQueue{Name: parentTaskQueue},
			WorkflowExecutionTimeout: timestamp.DurationPtr(2 * time.Second),
			WorkflowTaskTimeout:      timestamp.DurationPtr(1 * time.Second),
		},
	}
	_, err := mutableState.AddWorkflowExecutionStartedEvent(parentExecution, parentStartRequest)
	s.Nil(err)

	// First workflow task: scheduled, started, completed.
	workflowTask := addWorkflowTaskScheduledEvent(mutableState)
	lastEvent := addWorkflowTaskStartedEvent(mutableState, workflowTask.ScheduleID, parentTaskQueue, uuid.New())
	workflowTask.StartedID = lastEvent.GetEventId()
	lastEvent = addWorkflowTaskCompletedEvent(mutableState, workflowTask.ScheduleID, workflowTask.StartedID, randomIdentity)

	// Child initiated with parent close policy ABANDON.
	initiatedEvent, childInfo := addStartChildWorkflowExecutionInitiatedEvent(
		mutableState,
		lastEvent.GetEventId(),
		uuid.New(),
		s.childNamespace,
		childExecution.GetWorkflowId(),
		childWorkflowType,
		childTaskQueue,
		nil,
		childTimeout,
		childTimeout,
		childTimeout,
		enumspb.PARENT_CLOSE_POLICY_ABANDON,
	)

	transferTask := &tasks.StartChildExecutionTask{
		WorkflowKey: definition.NewWorkflowKey(
			s.namespaceID.String(),
			parentExecution.GetWorkflowId(),
			parentExecution.GetRunId(),
		),
		Version:             s.version,
		TargetNamespaceID:   tests.ChildNamespaceID.String(),
		TargetWorkflowID:    childExecution.GetWorkflowId(),
		TaskID:              int64(59),
		InitiatedID:         initiatedEvent.GetEventId(),
		VisibilityTimestamp: time.Now().UTC(),
	}

	// Record the child as started on the parent.
	startedEvent := addChildWorkflowExecutionStartedEvent(
		mutableState,
		initiatedEvent.GetEventId(),
		tests.ChildNamespace,
		childExecution.GetWorkflowId(),
		childExecution.GetRunId(),
		childWorkflowType,
	)
	childInfo.StartedId = startedEvent.GetEventId()

	// Second workflow task, which ends with the parent completing.
	workflowTask = addWorkflowTaskScheduledEvent(mutableState)
	lastEvent = addWorkflowTaskStartedEvent(mutableState, workflowTask.ScheduleID, parentTaskQueue, randomIdentity)
	workflowTask.StartedID = lastEvent.GetEventId()
	lastEvent = addWorkflowTaskCompletedEvent(mutableState, workflowTask.ScheduleID, workflowTask.StartedID, randomIdentity)
	lastEvent = addCompleteWorkflowEvent(mutableState, lastEvent.EventId, nil)
	// Flush buffered events so real IDs get assigned
	mutableState.FlushBufferedEvents()

	persistedState := s.createPersistenceMutableState(mutableState, lastEvent.GetEventId(), lastEvent.GetVersion())
	s.mockExecutionMgr.EXPECT().
		GetWorkflowExecution(gomock.Any(), gomock.Any()).
		Return(&persistence.GetWorkflowExecutionResponse{State: persistedState}, nil)

	wantScheduleRequest := &historyservice.ScheduleWorkflowTaskRequest{
		NamespaceId: s.childNamespaceID.String(),
		WorkflowExecution: &commonpb.WorkflowExecution{
			WorkflowId: childExecution.WorkflowId,
			RunId:      childExecution.RunId,
		},
		IsFirstWorkflowTask: true,
	}
	s.mockHistoryClient.EXPECT().
		ScheduleWorkflowTask(gomock.Any(), wantScheduleRequest).
		Return(&historyservice.ScheduleWorkflowTaskResponse{}, nil).
		Times(1)

	err = s.transferQueueActiveTaskExecutor.execute(context.Background(), transferTask, true)
	s.Nil(err)
}

func (s *transferQueueActiveTaskExecutorSuite) TestCopySearchAttributes() {
var input map[string]*commonpb.Payload
s.Nil(copySearchAttributes(input))
Expand Down
1 change: 1 addition & 0 deletions service/history/transferQueueStandbyTaskExecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,7 @@ func (t *transferQueueStandbyTaskExecutor) processStartChildExecution(
if childWorkflowInfo.StartedId != common.EmptyEventID {
return nil, nil
}
// TODO: standby logic should verify if first workflow task is scheduled or not as well?

return getHistoryResendInfo(mutableState)
}
Expand Down
4 changes: 2 additions & 2 deletions service/history/transferQueueStandbyTaskExecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -989,7 +989,7 @@ func (s *transferQueueStandbyTaskExecutorSuite) TestProcessStartChildExecution_P

taskID := int64(59)
event, _ = addStartChildWorkflowExecutionInitiatedEvent(mutableState, event.GetEventId(), uuid.New(),
tests.ChildNamespace, childWorkflowID, childWorkflowType, childTaskQueueName, nil, 1*time.Second, 1*time.Second, 1*time.Second)
tests.ChildNamespace, childWorkflowID, childWorkflowType, childTaskQueueName, nil, 1*time.Second, 1*time.Second, 1*time.Second, enumspb.PARENT_CLOSE_POLICY_ABANDON)
nextEventID := event.GetEventId()

now := time.Now().UTC()
Expand Down Expand Up @@ -1076,7 +1076,7 @@ func (s *transferQueueStandbyTaskExecutorSuite) TestProcessStartChildExecution_S

taskID := int64(59)
event, childInfo := addStartChildWorkflowExecutionInitiatedEvent(mutableState, event.GetEventId(), uuid.New(),
tests.ChildNamespace, childWorkflowID, childWorkflowType, childTaskQueueName, nil, 1*time.Second, 1*time.Second, 1*time.Second)
tests.ChildNamespace, childWorkflowID, childWorkflowType, childTaskQueueName, nil, 1*time.Second, 1*time.Second, 1*time.Second, enumspb.PARENT_CLOSE_POLICY_ABANDON)

now := time.Now().UTC()
transferTask := &tasks.StartChildExecutionTask{
Expand Down

0 comments on commit ae84776

Please sign in to comment.