Skip to content

Commit

Permalink
Validate workflow task start time when complete
Browse files Browse the repository at this point in the history
  • Loading branch information
yiminc committed Jul 21, 2023
1 parent fdb94a5 commit 742f66f
Show file tree
Hide file tree
Showing 8 changed files with 214 additions and 104 deletions.
172 changes: 124 additions & 48 deletions api/token/v1/message.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions common/tasktoken/token.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
package tasktoken

import (
"time"

v11 "go.temporal.io/server/api/clock/v1"
tokenspb "go.temporal.io/server/api/token/v1"
)
Expand All @@ -35,6 +37,7 @@ func NewWorkflowTaskToken(
runID string,
scheduledEventID int64,
startedEventId int64,
startedTime *time.Time,
attempt int32,
clock *v11.VectorClock,
version int64,
Expand All @@ -45,6 +48,7 @@ func NewWorkflowTaskToken(
RunId: runID,
ScheduledEventId: scheduledEventID,
StartedEventId: startedEventId,
StartedTime: startedTime,
Attempt: attempt,
Clock: clock,
Version: version,
Expand Down
3 changes: 3 additions & 0 deletions proto/internal/temporal/server/api/token/v1/message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ package temporal.server.api.token.v1;

option go_package = "go.temporal.io/server/api/token/v1;token";

import "google/protobuf/timestamp.proto";
import "dependencies/gogoproto/gogo.proto";
import "temporal/server/api/clock/v1/message.proto";
import "temporal/server/api/history/v1/message.proto";

Expand Down Expand Up @@ -63,6 +65,7 @@ message Task {
temporal.server.api.clock.v1.VectorClock clock = 9;
int64 started_event_id = 10;
int64 version = 11;
google.protobuf.Timestamp started_time = 12 [(gogoproto.stdtime) = true];
}

message QueryTask {
Expand Down
1 change: 1 addition & 0 deletions service/frontend/workflow_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -968,6 +968,7 @@ func (wh *WorkflowHandler) RespondWorkflowTaskCompleted(
taskToken.GetRunId(),
histResp.StartedResponse.GetScheduledEventId(),
histResp.StartedResponse.GetStartedEventId(),
histResp.StartedResponse.GetStartedTime(),
histResp.StartedResponse.GetAttempt(),
histResp.StartedResponse.GetClock(),
histResp.StartedResponse.GetVersion(),
Expand Down
3 changes: 2 additions & 1 deletion service/history/api/startworkflow/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ import (
historypb "go.temporal.io/api/history/v1"
"go.temporal.io/api/serviceerror"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/server/api/historyservice/v1"

"go.temporal.io/server/common/tasktoken"

"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/metrics"
Expand Down Expand Up @@ -543,6 +543,7 @@ func (s *Starter) generateResponse(
runID,
workflowTaskInfo.ScheduledEventID,
workflowTaskInfo.StartedEventID,
workflowTaskInfo.StartedTime,
workflowTaskInfo.Attempt,
clock,
workflowTaskInfo.Version,
Expand Down
32 changes: 19 additions & 13 deletions service/history/workflow_task_handler_callbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,8 +325,11 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskFailed(
if workflowTask == nil ||
workflowTask.StartedEventID == common.EmptyEventID ||
(token.StartedEventId != common.EmptyEventID && token.StartedEventId != workflowTask.StartedEventID) ||
(token.StartedTime != nil && workflowTask.StartedTime != nil && !token.StartedTime.Equal(*workflowTask.StartedTime)) ||
workflowTask.Attempt != token.Attempt ||
(workflowTask.Version != common.EmptyVersion && token.Version != workflowTask.Version) {
// we have not alter mutable state yet, so release with it with nil to avoid clear MS.
workflowContext.GetReleaseFn()(nil)
return nil, serviceerror.NewNotFound("Workflow task not found.")
}

Expand Down Expand Up @@ -393,6 +396,22 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted(
if err != nil {
return nil, err
}
weContext := workflowContext.GetContext()
ms := workflowContext.GetMutableState()

currentWorkflowTask := ms.GetWorkflowTaskByID(token.GetScheduledEventId())
if !ms.IsWorkflowExecutionRunning() ||
currentWorkflowTask == nil ||
currentWorkflowTask.StartedEventID == common.EmptyEventID ||
(token.StartedEventId != common.EmptyEventID && token.StartedEventId != currentWorkflowTask.StartedEventID) ||
(token.StartedTime != nil && currentWorkflowTask.StartedTime != nil && !token.StartedTime.Equal(*currentWorkflowTask.StartedTime)) ||
currentWorkflowTask.Attempt != token.Attempt ||
(token.Version != common.EmptyVersion && token.Version != currentWorkflowTask.Version) {
// we have not alter mutable state yet, so release with it with nil to avoid clear MS.
workflowContext.GetReleaseFn()(nil)
return nil, serviceerror.NewNotFound("Workflow task not found.")
}

defer func() { workflowContext.GetReleaseFn()(retError) }()

var effects effect.Buffer
Expand All @@ -410,19 +429,6 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted(
effects.Apply(ctx)
}()

weContext := workflowContext.GetContext()
ms := workflowContext.GetMutableState()

currentWorkflowTask := ms.GetWorkflowTaskByID(token.GetScheduledEventId())
if !ms.IsWorkflowExecutionRunning() ||
currentWorkflowTask == nil ||
currentWorkflowTask.StartedEventID == common.EmptyEventID ||
(token.StartedEventId != common.EmptyEventID && token.StartedEventId != currentWorkflowTask.StartedEventID) ||
currentWorkflowTask.Attempt != token.Attempt ||
(token.Version != common.EmptyVersion && token.Version != currentWorkflowTask.Version) {
return nil, serviceerror.NewNotFound("Workflow task not found.")
}

// It's an error if the workflow has used versioning in the past but this task has no versioning info.
if ms.GetWorkerVersionStamp().GetUseVersioning() && !request.GetWorkerVersionStamp().GetUseVersioning() {
return nil, serviceerror.NewInvalidArgument("Workflow using versioning must continue to use versioning.")
Expand Down
Loading

0 comments on commit 742f66f

Please sign in to comment.