Skip to content

Commit

Permalink
Ignore parent close policy if child namespace is deleted (#2596)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin authored Mar 11, 2022
1 parent c04f879 commit 4c4652d
Showing 1 changed file with 23 additions and 13 deletions.
36 changes: 23 additions & 13 deletions service/history/transferQueueActiveTaskExecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ import (
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
ns "go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/persistence/versionhistory"
"go.temporal.io/server/common/primitives/timestamp"
Expand Down Expand Up @@ -1295,7 +1294,7 @@ func (t *transferQueueActiveTaskExecutor) resetWorkflow(

func (t *transferQueueActiveTaskExecutor) processParentClosePolicy(
namespaceID string,
namespace string,
namespaceName string,
parentExecution *commonpb.WorkflowExecution,
childInfos map[int64]*persistencespb.ChildExecutionInfo,
) error {
Expand All @@ -1307,22 +1306,27 @@ func (t *transferQueueActiveTaskExecutor) processParentClosePolicy(
scope := t.metricsClient.Scope(metrics.TransferActiveTaskCloseExecutionScope)

if t.shard.GetConfig().EnableParentClosePolicyWorker() &&
len(childInfos) >= t.shard.GetConfig().ParentClosePolicyThreshold(namespace) {
len(childInfos) >= t.shard.GetConfig().ParentClosePolicyThreshold(namespaceName) {

executions := make([]parentclosepolicy.RequestDetail, 0, len(childInfos))
for _, childInfo := range childInfos {
if childInfo.ParentClosePolicy == enumspb.PARENT_CLOSE_POLICY_ABANDON {
continue
}

childNamespaceId, err := t.registry.GetNamespaceID(ns.Name(childInfo.GetNamespace()))
if err != nil {
childNamespaceId, err := t.registry.GetNamespaceID(namespace.Name(childInfo.GetNamespace()))
switch err.(type) {
case nil:
case *serviceerror.NotFound:
// If child namespace is deleted there is nothing to close.
continue
default:
return err
}

executions = append(executions, parentclosepolicy.RequestDetail{
Namespace: childInfo.Namespace,
NamespaceID: string(childNamespaceId),
NamespaceID: childNamespaceId.String(),
WorkflowID: childInfo.StartedWorkflowId,
RunID: childInfo.StartedRunId,
Policy: childInfo.ParentClosePolicy,
Expand All @@ -1334,7 +1338,7 @@ func (t *transferQueueActiveTaskExecutor) processParentClosePolicy(
}

request := parentclosepolicy.Request{
Namespace: namespace,
Namespace: namespaceName,
NamespaceID: namespaceID,
ParentExecution: *parentExecution,
Executions: executions,
Expand Down Expand Up @@ -1371,12 +1375,15 @@ func (t *transferQueueActiveTaskExecutor) applyParentClosePolicy(
return nil

case enumspb.PARENT_CLOSE_POLICY_TERMINATE:
childNamespaceId, err := t.registry.GetNamespaceID(ns.Name(childInfo.GetNamespace()))
if err != nil {
childNamespaceId, err := t.registry.GetNamespaceID(namespace.Name(childInfo.GetNamespace()))
switch err.(type) {
case nil, *serviceerror.NotFound:
// If child namespace is deleted there is nothing to close.
default:
return err
}
_, err = t.historyClient.TerminateWorkflowExecution(ctx, &historyservice.TerminateWorkflowExecutionRequest{
NamespaceId: string(childNamespaceId),
NamespaceId: childNamespaceId.String(),
TerminateRequest: &workflowservice.TerminateWorkflowExecutionRequest{
Namespace: childInfo.GetNamespace(),
WorkflowExecution: &commonpb.WorkflowExecution{
Expand All @@ -1394,13 +1401,16 @@ func (t *transferQueueActiveTaskExecutor) applyParentClosePolicy(
return err

case enumspb.PARENT_CLOSE_POLICY_REQUEST_CANCEL:
nsId, err := t.registry.GetNamespaceID(ns.Name(childInfo.GetNamespace()))
if err != nil {
childNamespaceId, err := t.registry.GetNamespaceID(namespace.Name(childInfo.GetNamespace()))
switch err.(type) {
case nil, *serviceerror.NotFound:
// If child namespace is deleted there is nothing to close.
default:
return err
}

_, err = t.historyClient.RequestCancelWorkflowExecution(ctx, &historyservice.RequestCancelWorkflowExecutionRequest{
NamespaceId: string(nsId),
NamespaceId: childNamespaceId.String(),
CancelRequest: &workflowservice.RequestCancelWorkflowExecutionRequest{
Namespace: childInfo.GetNamespace(),
WorkflowExecution: &commonpb.WorkflowExecution{
Expand Down

0 comments on commit 4c4652d

Please sign in to comment.