From 130d36eb5d9669f4c974211a917a259972d78c47 Mon Sep 17 00:00:00 2001 From: wxing1292 Date: Thu, 28 Apr 2022 11:16:20 -0700 Subject: [PATCH] Move history API logic into its own package (#2778) * Move workflow creation logic util into api package * Simplify history engine impl --- service/history/api/start_workflow_uti.go | 128 ++++++++++++++++++ service/history/historyEngine.go | 104 +------------- .../history/timerQueueActiveTaskExecutor.go | 3 +- service/history/workflowTaskHandler.go | 5 +- 4 files changed, 138 insertions(+), 102 deletions(-) create mode 100644 service/history/api/start_workflow_uti.go diff --git a/service/history/api/start_workflow_uti.go b/service/history/api/start_workflow_uti.go new file mode 100644 index 00000000000..4b968818aa3 --- /dev/null +++ b/service/history/api/start_workflow_uti.go @@ -0,0 +1,128 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package api + +import ( + commonpb "go.temporal.io/api/common/v1" + historypb "go.temporal.io/api/history/v1" + "go.temporal.io/api/workflowservice/v1" + + "go.temporal.io/server/api/historyservice/v1" + workflowspb "go.temporal.io/server/api/workflow/v1" + "go.temporal.io/server/common/definition" + "go.temporal.io/server/common/namespace" + "go.temporal.io/server/service/history/shard" + "go.temporal.io/server/service/history/workflow" +) + +func NewWorkflowWithSignal( + shard shard.Context, + namespaceEntry *namespace.Namespace, + execution commonpb.WorkflowExecution, + startRequest *historyservice.StartWorkflowExecutionRequest, + signalWithStartRequest *workflowservice.SignalWithStartWorkflowExecutionRequest, +) (workflow.Context, workflow.MutableState, error) { + newMutableState, err := CreateMutableState(shard, namespaceEntry, execution.GetRunId()) + if err != nil { + return nil, nil, err + } + + startEvent, err := newMutableState.AddWorkflowExecutionStartedEvent( + execution, + startRequest, + ) + if err != nil { + return nil, nil, err + } + + if signalWithStartRequest != nil { + if signalWithStartRequest.GetRequestId() != "" { + newMutableState.AddSignalRequested(signalWithStartRequest.GetRequestId()) + } + if _, err := newMutableState.AddWorkflowExecutionSignaled( + signalWithStartRequest.GetSignalName(), + signalWithStartRequest.GetSignalInput(), + signalWithStartRequest.GetIdentity(), + signalWithStartRequest.GetHeader(), + ); err != nil { + return nil, nil, err + } + } + + // Generate first workflow task event if not child WF and no first workflow task backoff + if err := GenerateFirstWorkflowTask( + newMutableState, + startRequest.ParentExecutionInfo, + startEvent, + ); err != nil { + return nil, nil, err + } + + newWorkflowContext := workflow.NewContext( + shard, + definition.NewWorkflowKey( + namespaceEntry.ID().String(), + execution.GetWorkflowId(), + execution.GetRunId(), + ), + shard.GetLogger(), + ) + return newWorkflowContext, newMutableState, nil +} + +func CreateMutableState( + shard shard.Context, + namespaceEntry *namespace.Namespace, + runID string, +) (workflow.MutableState, error) { + newMutableState := workflow.NewMutableState( + shard, + shard.GetEventsCache(), + shard.GetLogger(), + namespaceEntry, + shard.GetTimeSource().Now(), + ) + if err := newMutableState.SetHistoryTree(runID); err != nil { + return nil, err + } + return newMutableState, nil +} + +func GenerateFirstWorkflowTask( + mutableState workflow.MutableState, + parentInfo *workflowspb.ParentExecutionInfo, + startEvent *historypb.HistoryEvent, +) error { + + if parentInfo == nil { + // WorkflowTask is only created when it is not a Child Workflow and no backoff is needed + if err := mutableState.AddFirstWorkflowTaskScheduled( + startEvent, + ); err != nil { + return err + } + } + return nil +} diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index 3a06ae74fb5..f7734287a2d 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -49,6 +49,7 @@ import ( "go.temporal.io/server/common/persistence/serialization" "go.temporal.io/server/common/persistence/visibility/manager" "go.temporal.io/server/common/sdk" + "go.temporal.io/server/service/history/api" "go.temporal.io/server/service/history/queues" "go.temporal.io/server/service/history/tasks" @@ -58,7 +59,6 @@ import ( "go.temporal.io/server/api/matchingservice/v1" persistencespb "go.temporal.io/server/api/persistence/v1" replicationspb "go.temporal.io/server/api/replication/v1" - workflowspb "go.temporal.io/server/api/workflow/v1" "go.temporal.io/server/client/admin" "go.temporal.io/server/client/history" "go.temporal.io/server/common" @@ -464,46 +464,6 @@ func (e *historyEngineImpl) handleClusterMetadataUpdate( } } -func createMutableState( - shard shard.Context, - namespaceEntry *namespace.Namespace, - runID string, -) (workflow.MutableState, error) { - - var newMutableState workflow.MutableState - // version history applies to both local and global namespace - newMutableState = workflow.NewMutableState( - shard, - shard.GetEventsCache(), - shard.GetLogger(), - namespaceEntry, - shard.GetTimeSource().Now(), - ) - - if err := newMutableState.SetHistoryTree(runID); err != nil { - return nil, err - } - - return newMutableState, nil -} - -func (e *historyEngineImpl) generateFirstWorkflowTask( - mutableState workflow.MutableState, - parentInfo *workflowspb.ParentExecutionInfo, - startEvent *historypb.HistoryEvent, -) error { - - if parentInfo == nil { - // WorkflowTask is only created when it is not a Child Workflow and no backoff is needed - if err := mutableState.AddFirstWorkflowTaskScheduled( - startEvent, - ); err != nil { - return err - } - } - return nil -} - // StartWorkflowExecution starts a workflow execution // Consistency guarantee: always write func (e *historyEngineImpl) StartWorkflowExecution( @@ -542,7 +502,7 @@ func (e *historyEngineImpl) StartWorkflowExecution( RunId: uuid.New(), } - weContext, mutableState, err := e.newWorkflowWithSignal(namespaceEntry, execution, startRequest, nil) + weContext, mutableState, err := api.NewWorkflowWithSignal(e.shard, namespaceEntry, execution, startRequest, nil) if err != nil { return nil, err } @@ -629,7 +589,7 @@ func (e *historyEngineImpl) StartWorkflowExecution( ), prevExecutionUpdateAction, func() (workflow.Context, workflow.MutableState, error) { - return e.newWorkflowWithSignal(namespaceEntry, execution, startRequest, nil) + return api.NewWorkflowWithSignal(e.shard, namespaceEntry, execution, startRequest, nil) }, ) switch err { @@ -662,60 +622,6 @@ func (e *historyEngineImpl) StartWorkflowExecution( }, nil } -func (e *historyEngineImpl) newWorkflowWithSignal( - namespaceEntry *namespace.Namespace, - execution commonpb.WorkflowExecution, - startRequest *historyservice.StartWorkflowExecutionRequest, - signalWithStartRequest *workflowservice.SignalWithStartWorkflowExecutionRequest, -) (workflow.Context, workflow.MutableState, error) { - newMutableState, err := createMutableState(e.shard, namespaceEntry, execution.GetRunId()) - if err != nil { - return nil, nil, err - } - - startEvent, err := newMutableState.AddWorkflowExecutionStartedEvent( - execution, - startRequest, - ) - if err != nil { - return nil, nil, err - } - - if signalWithStartRequest != nil { - if signalWithStartRequest.GetRequestId() != "" { - newMutableState.AddSignalRequested(signalWithStartRequest.GetRequestId()) - } - if _, err := newMutableState.AddWorkflowExecutionSignaled( - signalWithStartRequest.GetSignalName(), - signalWithStartRequest.GetSignalInput(), - signalWithStartRequest.GetIdentity(), - signalWithStartRequest.GetHeader(), - ); err != nil { - return nil, nil, err - } - } - - // Generate first workflow task event if not child WF and no first workflow task backoff - if err := e.generateFirstWorkflowTask( - newMutableState, - startRequest.ParentExecutionInfo, - startEvent, - ); err != nil { - return nil, nil, err - } - - newWorkflowContext := workflow.NewContext( - e.shard, - definition.NewWorkflowKey( - namespaceEntry.ID().String(), - execution.GetWorkflowId(), - execution.GetRunId(), - ), - e.logger, - ) - return newWorkflowContext, newMutableState, nil -} - // GetMutableState retrieves the mutable state of the workflow execution func (e *historyEngineImpl) GetMutableState( ctx context.Context, @@ -2201,7 +2107,7 @@ func (e *historyEngineImpl) SignalWithStartWorkflowExecution( newWorkflowContext(prevContext, release, prevMutableState), prevExecutionUpdateAction, func() (workflow.Context, workflow.MutableState, error) { - return e.newWorkflowWithSignal(namespaceEntry, execution, startRequest, sRequest) + return api.NewWorkflowWithSignal(e.shard, namespaceEntry, execution, startRequest, sRequest) }, ) switch err { @@ -2219,7 +2125,7 @@ func (e *historyEngineImpl) SignalWithStartWorkflowExecution( } now := e.timeSource.Now() - context, mutableState, err := e.newWorkflowWithSignal(namespaceEntry, execution, startRequest, sRequest) + context, mutableState, err := api.NewWorkflowWithSignal(e.shard, namespaceEntry, execution, startRequest, sRequest) if err != nil { return nil, err } diff --git a/service/history/timerQueueActiveTaskExecutor.go b/service/history/timerQueueActiveTaskExecutor.go index 46293a402cc..da4da02d381 100644 --- a/service/history/timerQueueActiveTaskExecutor.go +++ b/service/history/timerQueueActiveTaskExecutor.go @@ -45,6 +45,7 @@ import ( "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/primitives/timestamp" + "go.temporal.io/server/service/history/api" "go.temporal.io/server/service/history/configs" "go.temporal.io/server/service/history/queues" "go.temporal.io/server/service/history/shard" @@ -557,7 +558,7 @@ func (t *timerQueueActiveTaskExecutor) executeWorkflowTimeoutTask( } startAttr := startEvent.GetWorkflowExecutionStartedEventAttributes() - newMutableState, err := createMutableState( + newMutableState, err := api.CreateMutableState( t.shard, mutableState.GetNamespaceEntry(), newRunID, diff --git a/service/history/workflowTaskHandler.go b/service/history/workflowTaskHandler.go index 9566849ee6c..a7a0c997b3f 100644 --- a/service/history/workflowTaskHandler.go +++ b/service/history/workflowTaskHandler.go @@ -49,6 +49,7 @@ import ( "go.temporal.io/server/common/payloads" "go.temporal.io/server/common/primitives/timestamp" "go.temporal.io/server/common/searchattribute" + "go.temporal.io/server/service/history/api" "go.temporal.io/server/service/history/configs" "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/workflow" @@ -1031,7 +1032,7 @@ func (handler *workflowTaskHandlerImpl) handleRetry( } startAttr := startEvent.GetWorkflowExecutionStartedEventAttributes() - newStateBuilder, err := createMutableState( + newStateBuilder, err := api.CreateMutableState( handler.shard, handler.mutableState.GetNamespaceEntry(), newRunID, @@ -1075,7 +1076,7 @@ func (handler *workflowTaskHandlerImpl) handleCron( lastCompletionResult = startAttr.LastCompletionResult } - newStateBuilder, err := createMutableState( + newStateBuilder, err := api.CreateMutableState( handler.shard, handler.mutableState.GetNamespaceEntry(), newRunID,