-
Notifications
You must be signed in to change notification settings - Fork 152
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Don't return update handles until desired stage reached #2066
Changes from all commits
f03eeb9
8085c83
d085d54
95dc2f3
ca23272
e66446c
6e9cb0c
13c3bb9
0a7499e
17a0460
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -333,9 +333,18 @@ public <R> StartUpdateOutput<R> startUpdate(StartUpdateInput<R> input) { | |
.setFirstExecutionRunId(input.getFirstExecutionRunId()) | ||
.setRequest(request) | ||
.build(); | ||
Deadline pollTimeoutDeadline = Deadline.after(POLL_UPDATE_TIMEOUT_S, TimeUnit.SECONDS); | ||
UpdateWorkflowExecutionResponse result = | ||
genericClient.update(updateRequest, pollTimeoutDeadline); | ||
|
||
// Re-attempt the update until it is at least accepted, or passes the lifecycle stage specified | ||
// by the user. | ||
UpdateWorkflowExecutionResponse result; | ||
do { | ||
Deadline pollTimeoutDeadline = Deadline.after(POLL_UPDATE_TIMEOUT_S, TimeUnit.SECONDS); | ||
result = genericClient.update(updateRequest, pollTimeoutDeadline); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you make sure down below in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wait, isn't the Python one doing that when it passes There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's easy to change Java to not do this and just default to max timeout for getResult calls, but, not sure that's the right thing to do. (I committed it so we can see what I mean - works fine, but, seems like maybe not right? At minimum what python is saying the doc vs. what it does is either inconsistent, or the loop is not needed, or not the same as what I've just done here) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, I need to update that Python doc to remove that last sentence (I fixed logic but forgot about docs). We are no longer using timeout/exceptions to drive the loop. Just need to remove the idea that deadline exceeded means something special in the start/poll loop. Let all RPC exceptions bubble out as they always would and change the code to only care about the successful result instead of the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, that's done now. All it's doing is just interpreting the failure code into the right exception type which makes sense to me. |
||
} while (result.getStage().getNumber() < input.getWaitPolicy().getLifecycleStage().getNumber() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we set a default for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Per @drewhoskins-temporal's latest requirements, we want wait-for-stage to be a required field for start. Also, we should call it "wait-for-stage" IMO to match Python and future SDKs (or if we don't like that term, we should call it something else and be consistent across SDKs with what it is called). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the latest requirements for start were to, if the wait stage is COMPLETED, after ACCEPTED you switched to polling for response before returning from the start call. Can you confirm at least from the user perspective that occurs? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 Sorry I missed that |
||
&& result.getStage().getNumber() | ||
< UpdateWorkflowExecutionLifecycleStage | ||
.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED | ||
.getNumber()); | ||
|
||
if (result.hasOutcome()) { | ||
switch (result.getOutcome().getValueCase()) { | ||
|
@@ -399,20 +408,18 @@ public <R> PollWorkflowUpdateOutput<R> pollWorkflowUpdate(PollWorkflowUpdateInpu | |
|
||
Deadline pollTimeoutDeadline = Deadline.after(input.getTimeout(), input.getTimeoutUnit()); | ||
pollWorkflowUpdateHelper(future, pollUpdateRequest, pollTimeoutDeadline); | ||
return new PollWorkflowUpdateOutput( | ||
return new PollWorkflowUpdateOutput<>( | ||
future.thenApply( | ||
(result) -> { | ||
if (result.hasOutcome()) { | ||
switch (result.getOutcome().getValueCase()) { | ||
case SUCCESS: | ||
Optional<Payloads> updateResult = Optional.of(result.getOutcome().getSuccess()); | ||
R resultValue = | ||
convertResultPayloads( | ||
updateResult, | ||
input.getResultClass(), | ||
input.getResultType(), | ||
dataConverterWithWorkflowContext); | ||
return resultValue; | ||
return convertResultPayloads( | ||
updateResult, | ||
input.getResultClass(), | ||
input.getResultType(), | ||
dataConverterWithWorkflowContext); | ||
case FAILURE: | ||
throw new WorkflowUpdateException( | ||
input.getWorkflowExecution(), | ||
|
@@ -434,31 +441,26 @@ private void pollWorkflowUpdateHelper( | |
CompletableFuture<PollWorkflowExecutionUpdateResponse> resultCF, | ||
PollWorkflowExecutionUpdateRequest request, | ||
Deadline deadline) { | ||
|
||
Deadline pollTimeoutDeadline = | ||
Deadline.after(POLL_UPDATE_TIMEOUT_S, TimeUnit.SECONDS).minimum(deadline); | ||
genericClient | ||
.pollUpdateAsync(request, pollTimeoutDeadline) | ||
.pollUpdateAsync(request, deadline) | ||
.whenComplete( | ||
(r, e) -> { | ||
if (e == null && !r.hasOutcome()) { | ||
pollWorkflowUpdateHelper(resultCF, request, deadline); | ||
return; | ||
} | ||
if ((e instanceof StatusRuntimeException | ||
&& ((StatusRuntimeException) e).getStatus().getCode() | ||
== Status.Code.DEADLINE_EXCEEDED) | ||
|| pollTimeoutDeadline.isExpired() | ||
|| (e == null && !r.hasOutcome())) { | ||
// if the request has timed out, stop retrying | ||
if (!deadline.isExpired()) { | ||
pollWorkflowUpdateHelper(resultCF, request, deadline); | ||
} else { | ||
resultCF.completeExceptionally( | ||
new TimeoutException( | ||
"WorkflowId=" | ||
+ request.getUpdateRef().getWorkflowExecution().getWorkflowId() | ||
+ ", runId=" | ||
+ request.getUpdateRef().getWorkflowExecution().getRunId() | ||
+ ", updateId=" | ||
+ request.getUpdateRef().getUpdateId())); | ||
} | ||
|| deadline.isExpired()) { | ||
resultCF.completeExceptionally( | ||
new TimeoutException( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note this may be changing shortly with #2069 |
||
"WorkflowId=" | ||
+ request.getUpdateRef().getWorkflowExecution().getWorkflowId() | ||
+ ", runId=" | ||
+ request.getUpdateRef().getWorkflowExecution().getRunId() | ||
+ ", updateId=" | ||
+ request.getUpdateRef().getUpdateId())); | ||
} else if (e != null) { | ||
resultCF.completeExceptionally(e); | ||
} else { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -177,16 +177,6 @@ public interface QueryableWorkflow { | |
void mySignal(String value); | ||
} | ||
|
||
@WorkflowInterface | ||
public interface SimpleWorkflowWithUpdate { | ||
Comment on lines
-180
to
-181
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This was unused |
||
|
||
@WorkflowMethod | ||
String execute(); | ||
|
||
@UpdateMethod | ||
String update(String value); | ||
} | ||
|
||
@WorkflowInterface | ||
public interface WorkflowWithUpdate { | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
setFromWaitCompleted
is never changed totrue
. I think the intention was to do that here?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah this looks like a bug
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Java
StartUpdate
code path is different from the other SDKs in a few subtle ways as well i'll try to align it with other SDKs as well