Refactor the Update type to a state machine #4297
} else {
    s.NoError(err)
    require.NoError(t, err)
This block here is the loop body of a table-driven test within a Suite. Using s.Error or s.Contains here is wrong because it fails the parent test rather than the current test as we're iterating through the testcase data.
// 3rd attempt UpdateWorkflowExecution call has timed out but the
// update is still running
updRequestMsg := task.Messages[0]
s.EqualValues(9, updRequestMsg.GetEventId())
When the update was cleared/deleted at the end of the RPC handler function (with a defer), this was correct. However as part of supporting async updates, we will need to let Updates exist in the Registry longer than the RPC handler function is on the stack. Thus this modification to the test to expect that the update has not gone away at this point.
type Controller interface {
    OnAfterCommit(func(context.Context))
    OnAfterRollback(func(context.Context))
passing context here not so much for timeout/cancellation as for the possibility that there might be useful values attached - specifically otel spans. I also considered including a ControllerID in the callback so that these funcs could verify that the caller is actually the right caller. But these callbacks need to be infallible so it felt like there wasn't any point to that.
For encapsulation, I'd prefer for this Invoke function to be in the update package. Maybe in the future - not needed for this PR
RegistryStore interface {
    GetAcceptedWorkflowExecutionUpdateIDs(context.Context) []string
    GetUpdateInfo(ctx context.Context, updateID string) (*persistencespb.UpdateInfo, bool)
    GetUpdateOutcome(ctx context.Context, updateID string) (*updatepb.Outcome, error)
I split the Registry storage needs and the Event storage needs since they're used separately. In practice, they're both MutableState.
)

var _ Registry = (*RegistryImpl)(nil)
//revive:disable:unexported-return I *want* it to be unexported
I don't want external packages to be able to create their own option implementations. I get that there's a hack where it's still possible, but this makes it harder and sends a message.
How about moving to an internal package instead?
sg
Find(ctx context.Context, protocolInstanceID string) (*Update, bool)

// CreateOutgoingMessages polls each registered Update for outbound
// messages and returns them.
CreateOutgoingMessages(startedEventID int64) ([]*protocolpb.Message, error)
startedEventID probably shouldn't be here - it breaks layering - but it's fine for now.
// effectively gives us update request deduplication by update ID. If the Update
// is in stateAdmitted then it builds a protocolpb.Message that will be sent on
// ensuing calls to PollOutboundMessages until the update is accepted.
func (u *Update) onRequestMsg(
The pattern for all the on<Type>Msg functions:
- assert expected state(s)
- validate message
- update msg counter
- write event(s) if applicable
- transition to next provisional state
- register OnAfterCommit to transition from Provisional to non-Provisional state and set Future(s) if applicable
- register OnAfterRollback to undo transition into provisional
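The steps listed above can be sketched roughly as follows. This is a minimal illustration, not the code from the PR: the `update` struct, the `recorder` controller, the handler name `onAcceptanceMsg`, and the state names other than `stateAdmitted` are all hypothetical, and event writing is left out.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

type state int

const (
	stateAdmitted              state = iota
	stateProvisionallyAccepted // hypothetical name for the provisional state
	stateAccepted              // hypothetical name for the committed state
)

// controller mirrors the OnAfterCommit/OnAfterRollback shape shown in the PR.
type controller interface {
	OnAfterCommit(func(context.Context))
	OnAfterRollback(func(context.Context))
}

// recorder is a toy controller that only stores the callbacks it is given.
type recorder struct {
	commits, rollbacks []func(context.Context)
}

func (r *recorder) OnAfterCommit(f func(context.Context))   { r.commits = append(r.commits, f) }
func (r *recorder) OnAfterRollback(f func(context.Context)) { r.rollbacks = append(r.rollbacks, f) }

type update struct {
	id       string
	state    state
	msgCount int
}

// onAcceptanceMsg is a hypothetical handler following the listed pattern.
func (u *update) onAcceptanceMsg(updateID string, ctrl controller) error {
	// Assert the expected state.
	if u.state != stateAdmitted {
		return errors.New("unexpected state")
	}
	// Validate the message.
	if updateID != u.id {
		return fmt.Errorf("message is for update %q, not %q", updateID, u.id)
	}
	// Update the message counter (event writing is omitted in this sketch).
	u.msgCount++
	// Transition to the provisional state.
	prev := u.state
	u.state = stateProvisionallyAccepted
	// On commit, leave the provisional state; on rollback, undo the transition.
	ctrl.OnAfterCommit(func(context.Context) { u.state = stateAccepted })
	ctrl.OnAfterRollback(func(context.Context) { u.state = prev })
	return nil
}

// simulate runs one message through the handler and then fires either the
// commit callbacks or the rollback callbacks.
func simulate(commit bool) state {
	u := &update{id: "u-1", state: stateAdmitted}
	rec := &recorder{}
	if err := u.onAcceptanceMsg("u-1", rec); err != nil {
		panic(err)
	}
	callbacks := rec.rollbacks
	if commit {
		callbacks = rec.commits
	}
	for _, f := range callbacks {
		f(context.Background())
	}
	return u.state
}

func main() {
	fmt.Println(simulate(true) == stateAccepted)
	fmt.Println(simulate(false) == stateAdmitted)
}
```

The point of the provisional state in this sketch is that nothing externally observable happens until the commit callback runs.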
// lazyOutcome adapts a func to the future.Future[*updatepb.Outcome] interface
lazyOutcome func(context.Context) (*updatepb.Outcome, error)

instrumentation struct {
Wouldn't be the worst idea to make this type and its fields exported in common (remove the member functions) and attach an instance to every context.Context. Would save us a lot of boilerplate moving these three types around to various places.
); err != nil {
    return handler.failWorkflow(enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_UPDATE_WORKFLOW_EXECUTION_MESSAGE, err)
}
next 4 lines probably belong in a protocol utility package. TODO.
    effects.Cancel(ctx)
}
effects.Apply(ctx)
}()
This seems like the safest thing but the error handling is weird between here and workflowTaskHandler so I've also added additional calls to effects.Cancel in some key places - namely, if command/message handling fails and if the MutableState write fails.
Why? To cancel effects earlier? I think this PR comment needs to be converted into code comment.
// Buffer holds a set of effect and rollback functions that can be invoked as a
// batch with a defined order. Once either Apply or Cancel is called, all
// buffered effects are cleared. This type is not threadsafe. The zero-value of
// a Buffer is a valid state.
type Buffer struct {
    effects []func(context.Context)
    cancels []func(context.Context)
}
As noted, this type is not threadsafe. Is that not an issue?
Just means that it's up to the caller to make sure that there's no concurrent usage. Pretty common for Go types.
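For readers who want to see the shape of such a buffer, here is a minimal sketch built from the doc comment and the two struct fields shown above. The method bodies are assumptions, in particular the ordering: this version applies effects in registration order and runs rollbacks in reverse order, which may differ from what the PR does.

```go
package main

import (
	"context"
	"fmt"
)

// Buffer follows the struct shown in the diff; the methods are a sketch.
type Buffer struct {
	effects []func(context.Context)
	cancels []func(context.Context)
}

func (b *Buffer) OnAfterCommit(f func(context.Context))   { b.effects = append(b.effects, f) }
func (b *Buffer) OnAfterRollback(f func(context.Context)) { b.cancels = append(b.cancels, f) }

// Apply runs the buffered effects in registration order (assumed order)
// and clears the buffer.
func (b *Buffer) Apply(ctx context.Context) {
	effects := b.effects
	b.effects, b.cancels = nil, nil
	for _, f := range effects {
		f(ctx)
	}
}

// Cancel runs the buffered rollbacks in reverse registration order
// (assumed order) and clears the buffer.
func (b *Buffer) Cancel(ctx context.Context) {
	cancels := b.cancels
	b.effects, b.cancels = nil, nil
	for i := len(cancels) - 1; i >= 0; i-- {
		cancels[i](ctx)
	}
}

// demo registers two effect/rollback pairs, cancels, then applies.
func demo() []string {
	var log []string
	var buf Buffer // the zero value is usable
	record := func(s string) func(context.Context) {
		return func(context.Context) { log = append(log, s) }
	}
	buf.OnAfterCommit(record("a"))
	buf.OnAfterRollback(record("undo-a"))
	buf.OnAfterCommit(record("b"))
	buf.OnAfterRollback(record("undo-b"))

	ctx := context.Background()
	buf.Cancel(ctx)
	buf.Apply(ctx) // no-op: Cancel already cleared the buffer
	return log
}

func main() {
	fmt.Println(demo())
}
```

Because either call clears the buffer, calling Apply unconditionally after a conditional Cancel is harmless in this sketch. As the reply above says, the caller is responsible for ensuring there is no concurrent use.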
OnAfterCommit(func(context.Context))
OnAfterRollback(func(context.Context))
nit: how about creating a separate type for func(context.Context)?
I want for values to be able to implement this interface without referencing this package. If I use type CommitCallback func(context.Context) or similar then an implementer will have to import this pkg.
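A small sketch of that point, with hypothetical names: `callbackLog` stands in for a type that lives in some other package. Its method signatures mention only `context.Context` and the unnamed function type, so it satisfies the interface without naming anything from the package that declares it. If the interface methods took a named type such as `CommitCallback`, the signatures below would no longer match and the implementer would have to import that name.

```go
package main

import (
	"context"
	"fmt"
)

// Controller has the two-method shape shown in the diff.
type Controller interface {
	OnAfterCommit(func(context.Context))
	OnAfterRollback(func(context.Context))
}

// callbackLog is a hypothetical implementer. Nothing in its declaration
// refers to Controller or to any type declared alongside it.
type callbackLog struct {
	commits, rollbacks int
}

func (l *callbackLog) OnAfterCommit(func(context.Context))   { l.commits++ }
func (l *callbackLog) OnAfterRollback(func(context.Context)) { l.rollbacks++ }

// Compile-time check that the structural match holds.
var _ Controller = (*callbackLog)(nil)

// register is a hypothetical consumer that only knows about Controller.
func register(c Controller) {
	c.OnAfterCommit(func(context.Context) {})
	c.OnAfterRollback(func(context.Context) {})
}

// countRegistered registers twice and reports how many callbacks were seen.
func countRegistered() (commits, rollbacks int) {
	l := &callbackLog{}
	register(l)
	register(l)
	return l.commits, l.rollbacks
}

func main() {
	fmt.Println(countRegistered())
}
```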
sg
upd, duplicate, removeFn := weCtx.GetContext().UpdateRegistry().Add(req.GetRequest().GetRequest())
if removeFn != nil {
    defer removeFn()
updateID := req.GetRequest().GetRequest().GetMeta().GetUpdateId()
🤯 .GetRequest().GetRequest()
Another question: is there a possible NPE (nil-pointer exception)?
Request.Request is because the pattern used is to wrap a workflowservice request with a historyservice request. This is pretty common in our system. There's no NPE concern because there's upstream validation (in Frontend service) and because the Go codegen for protobuf types includes a nil check in the Get<Thing>() function definitions.
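To illustrate the nil check mentioned above, here is a hand-written sketch of the getter shape that the Go protobuf code generator emits. The message types below (`HistoryRequest`, `APIRequest`, `UpdateRequest`, `Meta`) are simplified, hypothetical stand-ins and not the real generated types.

```go
package main

import "fmt"

// Hypothetical stand-ins for generated message types.
type Meta struct{ UpdateId string }

type UpdateRequest struct{ Meta *Meta }

type APIRequest struct{ Request *UpdateRequest }

type HistoryRequest struct{ Request *APIRequest }

// Each getter checks its receiver for nil and returns a zero value in that
// case, which is what makes long getter chains safe to call.
func (m *Meta) GetUpdateId() string {
	if m != nil {
		return m.UpdateId
	}
	return ""
}

func (r *UpdateRequest) GetMeta() *Meta {
	if r != nil {
		return r.Meta
	}
	return nil
}

func (r *APIRequest) GetRequest() *UpdateRequest {
	if r != nil {
		return r.Request
	}
	return nil
}

func (r *HistoryRequest) GetRequest() *APIRequest {
	if r != nil {
		return r.Request
	}
	return nil
}

// updateIDOf walks the same chain as the line under review.
func updateIDOf(req *HistoryRequest) string {
	return req.GetRequest().GetRequest().GetMeta().GetUpdateId()
}

func main() {
	fmt.Printf("%q\n", updateIDOf(nil))
	full := &HistoryRequest{Request: &APIRequest{Request: &UpdateRequest{Meta: &Meta{UpdateId: "u-1"}}}}
	fmt.Printf("%q\n", updateIDOf(full))
}
```

A nil anywhere in the chain yields an empty string rather than a panic, so a missing update ID still has to be caught by validation.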
"This is pretty common in our system" is a wrong argument IMO ;)
And from the user's perspective, it looks like a bug having Request.Request.
IMO it should be something like this:
req.GetWorkflowServiceRequest()
req.GetHistoryServiceRequest()
It is a matter of preference, but WorkflowService.Request is a type. Including the type name in the field name is not good. Everyone is just used to the fact that in both the matching and history services, the inner request is the original FE request. GetOriginRequest? GetFrontendRequest? GetUserRequest? Just GetRequest!
Something smells here. You won't make me believe that having .GetRequest().GetRequest() is the correct approach.
Shouldn't WorkflowService be able to return UpdateId without knowing the internals of the n-th level dependency?
What I meant is that wrapping an api request in a history request is common
"Shouldn't WorkflowService be able to return UpdateId without knowing the internals of the n-th level dependency?"
It absolutely should. Law of Demeter and all that. Problem is that we're dealing with generated code here and Protobuf doesn't have any way to specify inline embedding of messages so you end up with GetParent().GetChild().GetFoo(). If in a proto message you could specify something like "all the fields from that message plus these extra fields" (similar to struct embedding in Go) then we could specify these messages that way and we wouldn't have this problem. But that capability doesn't exist.
I mean if you want to get down to it, I don't like the "Get" part of these method names or the fact that proto doesn't capitalize the last 'd' in UpdateId. Working with these generated types is always jarring.
Well, I am happy with Id. I think we should change all ID in our go code base to Id.
sync.RWMutex
updates map[string]*Update
store Storage
mu sync.RWMutex
nit: what is the reasoning for naming the sync.RWMutex?
I consider it an antipattern and agree with uber-go/guide#127
TIL
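The practical difference between the two forms can be shown with a short sketch. Both registry types here are hypothetical. Embedding promotes the mutex's Lock and Unlock methods onto the struct, so they become part of its method set, whereas a named field keeps locking an internal detail.

```go
package main

import (
	"fmt"
	"sync"
)

// embeddedRegistry embeds the mutex, as in the removed line of the diff.
type embeddedRegistry struct {
	sync.RWMutex
	updates map[string]struct{}
}

// namedRegistry uses a named field, as in the added line of the diff.
type namedRegistry struct {
	mu      sync.RWMutex
	updates map[string]struct{}
}

// Len shows the named field in use; callers cannot reach the lock directly.
func (r *namedRegistry) Len() int {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return len(r.updates)
}

// exposesLocker reports whether v has Lock and Unlock in its method set.
func exposesLocker(v any) bool {
	_, ok := v.(sync.Locker)
	return ok
}

func main() {
	fmt.Println(exposesLocker(&embeddedRegistry{})) // true: Lock/Unlock are promoted
	fmt.Println(exposesLocker(&namedRegistry{}))    // false: the mutex stays private
}
```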
weContext.UpdateRegistry(ctx),
&effects,
nit: the function newWorkflowTaskHandler has too many arguments.
I don't disagree. But that's going to be out of scope for this PR
I'm not asking to fix this in this PR; I'm just pointing here as you added more arguments.
This is genius. I haven't heard it!
}
}

type MutableStateWithEffects struct {
Why here? Why not mutable_state_with_effects.go?
40% lazy; 60% it's just a quick utility class to bring together two types so it seemed like util.go was reasonable. I don't subscribe to one-type == one-file.
outcome *future.FutureImpl[*updatepb.Outcome]
pendingOutcome *updatepb.Outcome
// accessed only while holding workflow lock
id string
id field in Update struct reads as UpdateID. I believe MessageID is a better name here.
This is the UpdateID though. MessageID is built just-in-time when sending the protocolpb.Message
func invalidArgf(tmpl string, args ...any) error {
    return serviceerror.NewInvalidArgument(fmt.Sprintf(tmpl, args...))
}
I am not sure about helpers like this. If we use them everywhere and build up a culture of using them, yes.
There should be one of these in common/something for each of the types in serviceerror. But that's a different PR
The moment you move it out it becomes not that attractive, because it needs to be prefixed with the package name. I would add serviceerror.NewInvalidArgumentf for every error in the serviceerror package.
@@ -468,8 +476,6 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted(
activityNotStartedCancelled bool
newMutableState workflow.MutableState
)
// hasPendingUpdates indicates if there are more pending updates (excluding those which are accepted/rejected by this workflow task).
hasPendingUpdates := weContext.UpdateRegistry().HasPending(request.GetMessages())
I am super happy to see this getting removed.
Ha. If only it didn't break 🤦
switch command.GetCommandType() {
case enumspb.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK:
    return handler.handleCommandScheduleActivity(ctx, command.GetScheduleActivityTaskCommandAttributes())

case enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION:
    if err := earlyDeliverMessages(ctx); err != nil {
        return nil, err
    }
@alexshtin this is what gets us around the problem discovered last night: early-deliver messages in this one case so that update completion is visible to the command handler.
12 WorkflowExecutionCompleted`, events)
12 WorkflowExecutionUpdateAccepted
13 WorkflowExecutionUpdateCompleted
14 WorkflowExecutionCompleted`, events)
Old expectation was based on a bug - we shouldn't be able to complete executions with incomplete updates so either we see UpdateAccepted+UpdateCompleted before the WorkflowExecutionCompleted or the history is invalid.
This is a very good PR. I learned a few things reviewing it.
This is a step in the direction of being able to process messages one at a time rather than in a batch. Individual message processing will be required to respect protocols (like update) that need message processing that is ordered with respect to other commands. Additionally we lay the groundwork for async updates and outcome polling. A summary of what changed:
- Write update.Update as a protocol state machine that encapsulates all message processing, validation, event writing, and metric emission related to an update.
- Make explicit the set of "Provisional" state transitions wherein the Update has received and reacted to a protocol message but cannot make externally observable changes like returning update outcomes to RPC callers until it knows that changes have been written to stable storage.
- The effect package is introduced as a utility to buffer and later apply or cancel a set of effects.
- With more responsibility in the Update type, UpdateRegistry is simplified.
- UpdateRegistry can restore its state (mainly in-flight Accepted updates) from MutableState.
- Update message validation is moved into the update package.
- UpdateRegistry differentiates between having incomplete updates (which prevents a workflow from closing) and having messages to send (which may trigger the creation of a new workflow task).
- Removed a chunk of processing from workflowTaskHandler that is now encapsulated in the Update state machine, allowing task handling to be more decoupled from update protocol semantics.
Return true for found, false for not found.
Closing a workflow with incomplete updates causes an error, but it may be the case that the messages that complete the incomplete updates are in the same WFTCompleted message as the CompleteWorkflowExecution command. In such cases, because we currently deliver all messages _after_ all commands, the fact that the update completed is not known at the time we attempt to handle the CompleteWorkflowExecution command, and thus the WFTCompletion fails. Here we add (temporarily) the ability to early-deliver messages for this one command type. Soon the messages will be sequenced in with the commands so that this problem goes away.
Improves ergonomics of the state type and adds state masking to avoid allocating temporary slices for every call.
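One common way to get this effect is to give each state its own bit, so that a set of states becomes a single integer mask and a membership check is one bitwise AND. The sketch below shows that idea under assumptions: only stateAdmitted appears in the diff excerpts, and the other state names, the `stateSet` type, and the `Matches` method are hypothetical.

```go
package main

import "fmt"

// state uses one bit per value so that sets of states can be OR-ed together.
type state uint32

const (
	stateAdmitted state = 1 << iota
	stateRequested // hypothetical
	stateAccepted  // hypothetical
	stateCompleted // hypothetical
)

// stateSet is a bitmask of states. Building one allocates nothing, unlike
// passing a temporary []state to each check.
type stateSet uint32

// incompleteStates is a hypothetical mask built once as a constant.
const incompleteStates = stateSet(stateAdmitted | stateRequested | stateAccepted)

// Matches reports whether s is a member of the set.
func (s state) Matches(set stateSet) bool {
	return uint32(s)&uint32(set) != 0
}

func main() {
	fmt.Println(stateAccepted.Matches(incompleteStates))
	fmt.Println(stateCompleted.Matches(incompleteStates))
}
```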
This reverts commit 1c7e895.
What changed?
Why?
This is a step in the direction of being able to process messages one at a time rather than in a batch. Individual message processing will be required to respect protocols (like update) that need message processing that is ordered with respect to other commands. Additionally we lay the groundwork for async updates and outcome polling.
How did you test it?
Potential risks
I'm not thrilled about the size of this PR but there were a number of things that could not be teased apart. Changes external to the update package were minimized.
Is hotfix candidate?
no.