-
Notifications
You must be signed in to change notification settings - Fork 910
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Async Workflow Update Outcome Polling #4154
Conversation
Results of completed updates are stored as pointers into the event history and can be retrieved by update ID.
The trickiness here is that workflow.Context instances can be paged in and out of a cache so you don't want your observer watching a Context that has been evicted. The work here allows observers to be reconnected to a Context when it is loaded back into cache. The Observer module makes not assumption about how observer registration or notification is performed, it only promises to call the Connect function on all appropriate Observer instances, concrete implementations of which will do the type-specific registration and notification.
future.Proxy implements a future.Future that wraps an underlying target Future. That target instance can be changed at runtime without disturbing any existing pollers (i.e. callers of Get) on the Proxy object via the Proxy.Rebind call.
Making use of workflow observers to watch in-flight updates and outcome storage in MutableState/History for completed updates.
if errors.Is(err, context.Canceled) && parentCtx.Err() == nil { | ||
// local context was canceled but not the parent context - | ||
// this only happens via a call to Rebind so here we loop back to | ||
// call Get() again on the new target. | ||
continue | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have a feeling that this can be done with channels in a more clear way. Here, call should block on select
from localTarget.Get
or channel to which Rebind
sends new future.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would very much like for this function to be cleaner but I wasn't able to find a way. Can you pseudo-code something up? If I'm understanding correctly, if the number of concurrent callers is >1 and we write the "next" future into a channel, one caller would read it from the channel but how would the others know? Or is the suggestion to have one channel per caller and write the future into all of the channels?
@@ -3202,6 +3241,7 @@ func (ms *MutableStateImpl) AddWorkflowExecutionUpdateCompletedEvent(updResp *up | |||
return nil, err | |||
} | |||
event := ms.hBuilder.AddWorkflowExecutionUpdateCompletedEvent(updResp) | |||
ms.updateOutcomes[updResp.GetMeta().GetUpdateId()] = &historyspb.EventHistoryPointer{EventId: event.GetEventId()} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This line needs to be in ReplicateWorkflowExecutionUpdateCompletedEvent
which also needs to be called from ReapplyEvents
func. This will address comment bellow.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure which you mean by "comment below" or if you're saying that this line should be in both places or just one or the other?
// so a caller should avoid doing things like calling ObserverSet.Remove | ||
// from within a call to ObserverSet.FindOrCreate. | ||
ObserverSet struct { | ||
mut sync.Mutex |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If mutex protects single field, we usually just embed it. Same for future.Proxy
. I believe it is common pattern.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That publishes the Lock/Unlock functions on ObserverSet which IMO is a non-starter. How can the type protect itself against some random external entity calling Lock()? With the mutex being a private field one can reason about the locking locally. Also publishing those functions limits any future ability to switch to something like non-locking concurrency control.
FWIW, here's a discussion on the topic uber-go/guide#127
|
||
NewCacheFn func(shard.Context, Observers) Cache | ||
|
||
disableObs int |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not struct{}
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So that DisableObservation can be const
instead of just var
if err != nil { | ||
return nil, err | ||
switch req.GetRequest().GetWaitPolicy().GetLifecycleStage() { | ||
case enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably not in this PR but we need to support case when update was accepted and completed with the same WT. In this case result should be returned right away, even if LIFECYCLE_STAGE_ACCEPTED
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed - there is a missed opportunity for an optimization here but I think we agree: not a correctness issue.
return nil | ||
} | ||
|
||
func (uo *updateObserver) AwaitOutcome(ctx context.Context) (*updatepb.Outcome, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This doesn't have to be exported.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tend to prefer UpperCase names on unexported types as a way to differentiate between functions that are expected to be called within the pkg and functions that shouldn't be called even though they can be. But it's a minor thing, I'll change this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also - I could have sworn I wrote a test for this pkg. Baffled. Must have lost it in a rebase or something. I'll try to reconstruct.
@@ -383,6 +384,7 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted( | |||
if err != nil { | |||
return nil, err | |||
} | |||
defer workflowContext.GetContext().UpdateRegistry().Prune() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe it is not correct to call Prune
here. It is race condition. What if handler code completes before API caller read update result?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
By the time this runs, I believe (but may be wrong!) that (1) updates completed in this WFT will be written to MS as EventPointers, and (2) updates completed in this WFT will deliver an updatepb.Completed message to registered update.Update objects in the update.Registry. If those two things are correct then a Poll call through the API either registers its interest before Prune() in which case it will block on the Future in update.Update, or after Prune() in which case it will "miss" the update.Update object but will then find through MutableState/History.
Have I missed something?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please update ExecutionAPICountLimitOverride to include polling API there, it is used to limit concurrent request count.
What changed?
Why?
To support polling for the outcome of async updates.
How did you test it?
Unit tests here and e2e test in features repo
Potential risks
Some tricky thread safety and memory tracking pieces in here. Proxy.Rebind and the whole Observers capability in general.
Is hotfix candidate?
Nah