Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unify error handling of CRUD operation of workflow within shard context #2662

Merged
merged 2 commits into from
Mar 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -2504,7 +2504,7 @@ func (e *historyEngineImpl) ResetWorkflowExecution(
}

// also load the current run of the workflow, it can be different from the base runID
resp, err := e.executionManager.GetCurrentExecution(ctx, &persistence.GetCurrentExecutionRequest{
resp, err := e.shard.GetCurrentExecution(ctx, &persistence.GetCurrentExecutionRequest{
ShardID: e.shard.GetShardID(),
NamespaceID: namespaceID.String(),
WorkflowID: request.WorkflowExecution.GetWorkflowId(),
Expand Down Expand Up @@ -3596,7 +3596,7 @@ func (e *historyEngineImpl) loadWorkflow(
}

// workflow not running, need to check current record
resp, err := e.shard.GetExecutionManager().GetCurrentExecution(
resp, err := e.shard.GetCurrentExecution(
ctx,
&persistence.GetCurrentExecutionRequest{
ShardID: e.shard.GetShardID(),
Expand Down
4 changes: 2 additions & 2 deletions service/history/nDCTransactionMgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ func (r *nDCTransactionMgrImpl) checkWorkflowExists(
runID string,
) (bool, error) {

_, err := r.shard.GetExecutionManager().GetWorkflowExecution(
_, err := r.shard.GetWorkflowExecution(
ctx,
&persistence.GetWorkflowExecutionRequest{
ShardID: r.shard.GetShardID(),
Expand All @@ -424,7 +424,7 @@ func (r *nDCTransactionMgrImpl) getCurrentWorkflowRunID(
workflowID string,
) (string, error) {

resp, err := r.shard.GetExecutionManager().GetCurrentExecution(
resp, err := r.shard.GetCurrentExecution(
ctx,
&persistence.GetCurrentExecutionRequest{
ShardID: r.shard.GetShardID(),
Expand Down
2 changes: 2 additions & 0 deletions service/history/shard/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ type (
UpdateWorkflowExecution(ctx context.Context, request *persistence.UpdateWorkflowExecutionRequest) (*persistence.UpdateWorkflowExecutionResponse, error)
ConflictResolveWorkflowExecution(ctx context.Context, request *persistence.ConflictResolveWorkflowExecutionRequest) (*persistence.ConflictResolveWorkflowExecutionResponse, error)
SetWorkflowExecution(ctx context.Context, request *persistence.SetWorkflowExecutionRequest) (*persistence.SetWorkflowExecutionResponse, error)
GetCurrentExecution(ctx context.Context, request *persistence.GetCurrentExecutionRequest) (*persistence.GetCurrentExecutionResponse, error)
GetWorkflowExecution(ctx context.Context, request *persistence.GetWorkflowExecutionRequest) (*persistence.GetWorkflowExecutionResponse, error)
// DeleteWorkflowExecution deletes workflow execution, current workflow execution, and add task to delete visibility.
// If branchToken != nil, then delete history also, otherwise leave history.
DeleteWorkflowExecution(ctx context.Context, workflowKey definition.WorkflowKey, branchToken []byte, version int64, startTime *time.Time, closeTime *time.Time) error
Expand Down
62 changes: 50 additions & 12 deletions service/history/shard/context_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,14 @@ import (
"go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"

"go.temporal.io/server/api/adminservice/v1"
"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/client"
"go.temporal.io/server/common/archiver"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/persistence/serialization"
"go.temporal.io/server/common/searchattribute"

"go.temporal.io/server/api/adminservice/v1"
"go.temporal.io/server/api/historyservice/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/backoff"
Expand Down Expand Up @@ -599,7 +599,7 @@ func (s *ContextImpl) CreateWorkflowExecution(
currentRangeID := s.getRangeIDLocked()
request.RangeID = currentRangeID
resp, err := s.executionManager.CreateWorkflowExecution(ctx, request)
if err = s.handleErrorAndUpdateMaxReadLevelLocked(err, transferMaxReadLevel); err != nil {
if err = s.handleWriteErrorAndUpdateMaxReadLevelLocked(err, transferMaxReadLevel); err != nil {
return nil, err
}
return resp, nil
Expand Down Expand Up @@ -647,7 +647,7 @@ func (s *ContextImpl) UpdateWorkflowExecution(
currentRangeID := s.getRangeIDLocked()
request.RangeID = currentRangeID
resp, err := s.executionManager.UpdateWorkflowExecution(ctx, request)
if err = s.handleErrorAndUpdateMaxReadLevelLocked(err, transferMaxReadLevel); err != nil {
if err = s.handleWriteErrorAndUpdateMaxReadLevelLocked(err, transferMaxReadLevel); err != nil {
return nil, err
}
return resp, nil
Expand Down Expand Up @@ -705,7 +705,7 @@ func (s *ContextImpl) ConflictResolveWorkflowExecution(
currentRangeID := s.getRangeIDLocked()
request.RangeID = currentRangeID
resp, err := s.executionManager.ConflictResolveWorkflowExecution(ctx, request)
if err = s.handleErrorAndUpdateMaxReadLevelLocked(err, transferMaxReadLevel); err != nil {
if err = s.handleWriteErrorAndUpdateMaxReadLevelLocked(err, transferMaxReadLevel); err != nil {
return nil, err
}
return resp, nil
Expand Down Expand Up @@ -743,7 +743,29 @@ func (s *ContextImpl) SetWorkflowExecution(
currentRangeID := s.getRangeIDLocked()
request.RangeID = currentRangeID
resp, err := s.executionManager.SetWorkflowExecution(ctx, request)
if err = s.handleErrorAndUpdateMaxReadLevelLocked(err, transferMaxReadLevel); err != nil {
if err = s.handleWriteErrorAndUpdateMaxReadLevelLocked(err, transferMaxReadLevel); err != nil {
return nil, err
}
return resp, nil
}

func (s *ContextImpl) GetCurrentExecution(
ctx context.Context,
request *persistence.GetCurrentExecutionRequest,
) (*persistence.GetCurrentExecutionResponse, error) {
resp, err := s.executionManager.GetCurrentExecution(ctx, request)
if err = s.handleReadError(err); err != nil {
return nil, err
}
return resp, nil
}

func (s *ContextImpl) GetWorkflowExecution(
ctx context.Context,
request *persistence.GetWorkflowExecutionRequest,
) (*persistence.GetWorkflowExecutionResponse, error) {
resp, err := s.executionManager.GetWorkflowExecution(ctx, request)
if err = s.handleReadError(err); err != nil {
return nil, err
}
return resp, nil
Expand All @@ -766,7 +788,7 @@ func (s *ContextImpl) addTasksLocked(

request.RangeID = s.getRangeIDLocked()
err := s.executionManager.AddHistoryTasks(ctx, request)
if err = s.handleErrorAndUpdateMaxReadLevelLocked(err, transferMaxReadLevel); err != nil {
if err = s.handleWriteErrorAndUpdateMaxReadLevelLocked(err, transferMaxReadLevel); err != nil {
return err
}
s.engine.NotifyNewTasks(namespaceEntry.ActiveClusterName(), request.Tasks)
Expand Down Expand Up @@ -1006,7 +1028,7 @@ func (s *ContextImpl) renewRangeLocked(isStealing bool) error {
tag.ShardRangeID(updatedShardInfo.GetRangeId()),
tag.PreviousShardRangeID(s.shardInfo.GetRangeId()),
)
return s.handleErrorLocked(err)
return s.handleWriteErrorLocked(err)
}

// Range is successfully updated in cassandra now update shard context to reflect new range
Expand Down Expand Up @@ -1050,7 +1072,7 @@ func (s *ContextImpl) updateShardInfoLocked() error {
PreviousRangeID: s.shardInfo.GetRangeId(),
})
if err != nil {
return s.handleErrorLocked(err)
return s.handleWriteErrorLocked(err)
}

s.lastUpdated = now
Expand Down Expand Up @@ -1212,12 +1234,28 @@ func (s *ContextImpl) GetLastUpdatedTime() time.Time {
return s.lastUpdated
}

func (s *ContextImpl) handleErrorLocked(err error) error {
func (s *ContextImpl) handleReadError(err error) error {
switch err.(type) {
case nil:
return nil

case *persistence.ShardOwnershipLostError:
// Shard is stolen, trigger shutdown of history engine.
// Handling of max read level doesn't matter here.
s.Unload()
return err

default:
return err
}
}

func (s *ContextImpl) handleWriteErrorLocked(err error) error {
// We can use 0 here since updateMaxReadLevelLocked ensures that the read level never goes backwards.
return s.handleErrorAndUpdateMaxReadLevelLocked(err, 0)
return s.handleWriteErrorAndUpdateMaxReadLevelLocked(err, 0)
}

func (s *ContextImpl) handleErrorAndUpdateMaxReadLevelLocked(err error, newMaxReadLevel int64) error {
func (s *ContextImpl) handleWriteErrorAndUpdateMaxReadLevelLocked(err error, newMaxReadLevel int64) error {
switch err.(type) {
case nil:
// Persistence success: update max read level
Expand Down
30 changes: 30 additions & 0 deletions service/history/shard/context_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion service/history/transferQueueActiveTaskExecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -740,7 +740,7 @@ func (t *transferQueueActiveTaskExecutor) processResetWorkflow(
if !currentMutableState.IsWorkflowExecutionRunning() {
// it means this this might not be current anymore, we need to check
var resp *persistence.GetCurrentExecutionResponse
resp, err = t.shard.GetExecutionManager().GetCurrentExecution(ctx, &persistence.GetCurrentExecutionRequest{
resp, err = t.shard.GetCurrentExecution(ctx, &persistence.GetCurrentExecutionRequest{
ShardID: t.shard.GetShardID(),
NamespaceID: task.NamespaceID,
WorkflowID: task.WorkflowID,
Expand Down
22 changes: 10 additions & 12 deletions service/history/workflow/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,10 @@ type (

CacheImpl struct {
cache.Cache
shard shard.Context
executionManager persistence.ExecutionManager
logger log.Logger
metricsClient metrics.Client
config *configs.Config
shard shard.Context
logger log.Logger
metricsClient metrics.Client
config *configs.Config
}

NewCacheFn func(shard shard.Context) Cache
Expand All @@ -94,12 +93,11 @@ func NewCache(shard shard.Context) Cache {
opts.Pin = true

return &CacheImpl{
Cache: cache.New(config.HistoryCacheMaxSize(), opts),
shard: shard,
executionManager: shard.GetExecutionManager(),
logger: log.With(shard.GetLogger(), tag.ComponentHistoryCache),
metricsClient: shard.GetMetricsClient(),
config: config,
Cache: cache.New(config.HistoryCacheMaxSize(), opts),
shard: shard,
logger: log.With(shard.GetLogger(), tag.ComponentHistoryCache),
metricsClient: shard.GetMetricsClient(),
config: config,
}
}

Expand Down Expand Up @@ -268,7 +266,7 @@ func (c *CacheImpl) getCurrentExecutionWithRetry(
var response *persistence.GetCurrentExecutionResponse
op := func() error {
var err error
response, err = c.executionManager.GetCurrentExecution(ctx, request)
response, err = c.shard.GetCurrentExecution(ctx, request)

return err
}
Expand Down
2 changes: 1 addition & 1 deletion service/history/workflow/transaction_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ func getWorkflowExecutionWithRetry(
var resp *persistence.GetWorkflowExecutionResponse
op := func() error {
var err error
resp, err = shard.GetExecutionManager().GetWorkflowExecution(context.TODO(), request)
resp, err = shard.GetWorkflowExecution(context.TODO(), request)

return err
}
Expand Down
4 changes: 1 addition & 3 deletions service/history/workflowRebuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ type (
workflowRebuilderImpl struct {
shard shard.Context
workflowCache workflow.Cache
executionMgr persistence.ExecutionManager
newStateRebuilder nDCStateRebuilderProvider
transaction workflow.Transaction
logger log.Logger
Expand All @@ -73,7 +72,6 @@ func NewWorkflowRebuilder(
return &workflowRebuilderImpl{
shard: shard,
workflowCache: workflowCache,
executionMgr: shard.GetExecutionManager(),
newStateRebuilder: func() nDCStateRebuilder {
return newNDCStateRebuilder(shard, logger)
},
Expand Down Expand Up @@ -195,7 +193,7 @@ func (r *workflowRebuilderImpl) getMutableState(
ctx context.Context,
workflowKey definition.WorkflowKey,
) (*persistencespb.WorkflowMutableState, int64, error) {
record, err := r.executionMgr.GetWorkflowExecution(ctx, &persistence.GetWorkflowExecutionRequest{
record, err := r.shard.GetWorkflowExecution(ctx, &persistence.GetWorkflowExecutionRequest{
ShardID: r.shard.GetShardID(),
NamespaceID: workflowKey.NamespaceID,
WorkflowID: workflowKey.WorkflowID,
Expand Down