Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't return update handles until desired stage reached #2066

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ final class LazyUpdateHandleImpl<T> implements UpdateHandle<T> {
private final WorkflowExecution execution;
private final Class<T> resultClass;
private final Type resultType;
private WorkflowClientCallsInterceptor.PollWorkflowUpdateOutput<T> previousPollCall;

LazyUpdateHandleImpl(
WorkflowClientCallsInterceptor workflowClientInvoker,
Expand Down Expand Up @@ -72,12 +73,15 @@ public String getId() {

@Override
public CompletableFuture<T> getResultAsync(long timeout, TimeUnit unit) {
WorkflowClientCallsInterceptor.PollWorkflowUpdateOutput output =
workflowClientInvoker.pollWorkflowUpdate(
new WorkflowClientCallsInterceptor.PollWorkflowUpdateInput<>(
execution, updateName, id, resultClass, resultType, timeout, unit));
WorkflowClientCallsInterceptor.PollWorkflowUpdateOutput<T> pollCall;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, i have read this a few times and I am not sure logic trying to accomplish?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since before the handle is returned from start when user says complete, there might be a result from polling already and if there is we want to use that, otherwise try it - but then we need to wipe that result in case getResult gets called again

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hm this seems racy, if you have two concurrent calls isn't is possible for pollCall=null if two threads interleave in the right way?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes it is, I didn't think about this being called concurrently, too used to Rust.

Copy link
Member Author

@Sushisource Sushisource May 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Simplified this (works better as a cache now too)

if (previousPollCall == null) {
pollUntilComplete(timeout, unit);
}
pollCall = previousPollCall;
// Get result may be called more than once for non-complete wait policies, so reset it here.
previousPollCall = null;

return output
return pollCall
.getResult()
.exceptionally(
failure -> {
Expand Down Expand Up @@ -109,4 +113,11 @@ public CompletableFuture<T> getResultAsync(long timeout, TimeUnit unit) {
public CompletableFuture<T> getResultAsync() {
return this.getResultAsync(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
}

void pollUntilComplete(long timeout, TimeUnit unit) {
previousPollCall =
workflowClientInvoker.pollWorkflowUpdate(
new WorkflowClientCallsInterceptor.PollWorkflowUpdateInput<>(
execution, updateName, id, resultClass, resultType, timeout, unit));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,19 @@
import io.temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage;

public enum UpdateWaitPolicy {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm in python looks like we changed the name to WorkflowUpdateStage I think we should do the same here because this enum will also be used when describing an updates stage.

Copy link
Member

@cretz cretz May 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 And the docs here about what each enum means only apply to starting an update and maybe should move to there (but maybe not).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

K, I'm down to change the name

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Docs wise, weirdly, even the proto APIs mention nothing about what the stages mean beyond as input to requests. That would be good to fix.

/** Update request waits for the update to be accepted by the workflow */
/**
* Update request waits for the update to be until the update request has been admitted by the
* server - it may be the case that due to a considerations like load or resource limits that an
* update is made to wait before the server will indicate that it has been received and will be
* processed. This value does not wait for any sort of acknowledgement from a worker.
*/
ADMITTED(
UpdateWorkflowExecutionLifecycleStage.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ADMITTED),

/**
* Update request waits for the update to be accepted (and validated, if there is a validator) by
* the workflow
*/
ACCEPTED(
UpdateWorkflowExecutionLifecycleStage.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED),

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ static <T> WorkflowStub fromTyped(T typed) {

/**
* Asynchronously update a workflow execution by invoking its update handler and returning a
* handle to the update request.
* handle to the update request. If {@link UpdateWaitPolicy#COMPLETED} is specified, in the
* options, the handle will not be returned until the update is completed.
*
* @param options options that will be used to configure and start a new update request.
* @param args update method arguments
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,14 +351,20 @@ public <R> UpdateHandle<R> startUpdate(UpdateOptions<R> options, Object... args)
result.getReference().getWorkflowExecution(),
result.getResult());
} else {
return new LazyUpdateHandleImpl<>(
workflowClientInvoker,
workflowType.orElse(null),
options.getUpdateName(),
result.getReference().getUpdateId(),
result.getReference().getWorkflowExecution(),
options.getResultClass(),
options.getResultType());
LazyUpdateHandleImpl<R> handle =
new LazyUpdateHandleImpl<>(
workflowClientInvoker,
workflowType.orElse(null),
options.getUpdateName(),
result.getReference().getUpdateId(),
result.getReference().getWorkflowExecution(),
options.getResultClass(),
options.getResultType());
if (options.getWaitPolicy() == UpdateWaitPolicy.COMPLETED) {
// Don't return the handle until completed, since that's what's been asked for
handle.pollUntilComplete(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
}
return handle;
}
} catch (Exception e) {
Throwable throwable = throwAsWorkflowFailureException(e, targetExecution);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,8 +334,17 @@ public <R> StartUpdateOutput<R> startUpdate(StartUpdateInput<R> input) {
.setRequest(request)
.build();
Deadline pollTimeoutDeadline = Deadline.after(POLL_UPDATE_TIMEOUT_S, TimeUnit.SECONDS);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't the deadline be in the loop?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Arguably it doesn't need to be set at all

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I unset this in the most recent commit - but, I'm not sure having a super long timeout by default is what we want to do? OTOH I don't have a firm reason why not I suppose.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have no strong opinion so long as it's always longer than server's by enough to let server return an empty response on its timeout

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should treat start update as a long poll, hence the long timeout

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved

UpdateWorkflowExecutionResponse result =
genericClient.update(updateRequest, pollTimeoutDeadline);

// Re-attempt the update until it is at least accepted, or passes the lifecycle stage specified
// by the user.
UpdateWorkflowExecutionResponse result;
do {
result = genericClient.update(updateRequest, pollTimeoutDeadline);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you make sure down below in pollWorkflowUpdateHelper that you remove the logic that retries on gRPC deadline exceeded error? That should no longer occur, we should just be bubbling all errors out

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait, isn't the Python one doing that when it passes retry=True to the service client? Or, if that doesn't retry timeouts, then where is that happening? Because https://github.com/temporalio/sdk-python/blob/1a2acd59634a3b1d694937b8a8433c0014247370/temporalio/client.py#L4303 says it will, but there's no explicit handling of timeouts here: https://github.com/temporalio/sdk-python/blob/1a2acd59634a3b1d694937b8a8433c0014247370/temporalio/client.py#L4359

Copy link
Member Author

@Sushisource Sushisource May 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's easy to change Java to not do this and just default to max timeout for getResult calls, but, not sure that's the right thing to do.

(I committed it so we can see what I mean - works fine, but, seems like maybe not right? At minimum what python is saying the doc vs. what it does is either inconsistent, or the loop is not needed, or not the same as what I've just done here)

Copy link
Member

@cretz cretz May 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I need to update that Python doc to remove that last sentence (I fixed logic but forgot about docs). We are no longer using timeout/exceptions to drive the loop.

Just need to remove the idea that deadline exceeded means something special in the start/poll loop. Let all RPC exceptions bubble out as they always would and change the code to only care about the successful result instead of the whenComplete today that cares about either result or failure (not sure what the combinator is for success-only).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that's done now. All it's doing is just interpreting the failure code into the right exception type which makes sense to me.

} while (result.getStage().getNumber() < input.getWaitPolicy().getLifecycleStage().getNumber()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we set a default for input.getWaitPolicy()?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per @drewhoskins-temporal's latest requirements, we want wait-for-stage to be a required field for start. Also, we should call it "wait-for-stage" IMO to match Python and future SDKs (or if we don't like that term, we should call it something else and be consistent across SDKs with what it is called).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the latest requirements for start were to, if the wait stage is COMPLETED, after ACCEPTED you switched to polling for response before returning from the start call. Can you confirm at least from the user perspective that occurs?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 Sorry I missed that

&& result.getStage().getNumber()
< UpdateWorkflowExecutionLifecycleStage
.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED
.getNumber());

if (result.hasOutcome()) {
switch (result.getOutcome().getValueCase()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,16 +177,6 @@ public interface QueryableWorkflow {
void mySignal(String value);
}

@WorkflowInterface
public interface SimpleWorkflowWithUpdate {
Comment on lines -180 to -181
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was unused


@WorkflowMethod
String execute();

@UpdateMethod
String update(String value);
}

@WorkflowInterface
public interface WorkflowWithUpdate {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
Expand All @@ -53,7 +56,7 @@ public class UpdateTest {
public SDKTestWorkflowRule testWorkflowRule =
SDKTestWorkflowRule.newBuilder()
.setWorkerOptions(WorkerOptions.newBuilder().build())
.setWorkflowTypes(TestUpdateWorkflowImpl.class)
.setWorkflowTypes(TestUpdateWorkflowImpl.class, TestWaitingUpdate.class)
.setActivityImplementations(new ActivityImpl())
.build();

Expand Down Expand Up @@ -107,15 +110,15 @@ public void testUpdateUntyped() throws ExecutionException, InterruptedException
workflowType,
SDKTestOptions.newWorkflowOptionsWithTimeouts(testWorkflowRule.getTaskQueue()));

WorkflowExecution execution = workflowStub.start();
workflowStub.start();

SDKTestWorkflowRule.waitForOKQuery(workflowStub);
assertEquals("initial", workflowStub.query("getState", String.class));

// send an update through the sync path
assertEquals("Execute-Hello", workflowStub.update("update", String.class, 0, "Hello"));
// send an update through the async path
UpdateHandle updateRef = workflowStub.startUpdate("update", String.class, 0, "World");
UpdateHandle<String> updateRef = workflowStub.startUpdate("update", String.class, 0, "World");
assertEquals("Execute-World", updateRef.getResultAsync().get());
// send a bad update that will be rejected through the sync path
assertThrows(
Expand All @@ -137,6 +140,48 @@ public void testUpdateUntyped() throws ExecutionException, InterruptedException
assertEquals("Execute-Hello Execute-World", workflowStub.getResult(String.class));
}

@Test
public void testUpdateHandleNotReturnedUntilCompleteWhenAsked()
throws ExecutionException, InterruptedException {
WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient();
String workflowType = TestWorkflows.WorkflowWithUpdateAndSignal.class.getSimpleName();
WorkflowStub workflowStub =
workflowClient.newUntypedWorkflowStub(
workflowType,
SDKTestOptions.newWorkflowOptionsWithTimeouts(testWorkflowRule.getTaskQueue()));

workflowStub.start();

SDKTestWorkflowRule.waitForOKQuery(workflowStub);
assertEquals("initial", workflowStub.query("getState", String.class));

// Start the update but verify it does not return the handle until the update is complete
AtomicBoolean updateCompletedLast = new AtomicBoolean(false);
Future<?> asyncUpdate =
Executors.newSingleThreadExecutor()
.submit(
() -> {
UpdateHandle<String> handle =
workflowStub.startUpdate(
UpdateOptions.newBuilder(String.class).setUpdateName("update").build(),
"Enchi");
updateCompletedLast.set(true);
try {
assertEquals("Enchi", handle.getResultAsync().get());
} catch (Exception e) {
throw new RuntimeException(e);
}
});

workflowStub.signal("signal", "whatever");
updateCompletedLast.set(false);

asyncUpdate.get();
assertTrue(updateCompletedLast.get());
workflowStub.update("complete", void.class);
workflowStub.getResult(List.class);
}

public static class TestUpdateWorkflowImpl implements WorkflowWithUpdate {
String state = "initial";
List<String> updates = new ArrayList<>();
Expand Down Expand Up @@ -203,4 +248,45 @@ public String execute(String input) {
return Activity.getExecutionContext().getInfo().getActivityType() + "-" + input;
}
}

public static class TestWaitingUpdate implements TestWorkflows.WorkflowWithUpdateAndSignal {
String state = "initial";
List<String> updates = new ArrayList<>();
CompletablePromise<Void> signalled = Workflow.newPromise();
CompletablePromise<Void> promise = Workflow.newPromise();
private final TestActivities.TestActivity1 activity =
Workflow.newActivityStub(
TestActivities.TestActivity1.class,
ActivityOptions.newBuilder().setScheduleToCloseTimeout(Duration.ofHours(1)).build());

@Override
public List<String> execute() {
promise.get();
return updates;
}

@Override
public String getState() {
return state;
}

@Override
public void signal(String value) {
signalled.complete(null);
}

@Override
public String update(String value) {
Workflow.await(() -> signalled.isCompleted());
return value;
}

@Override
public void validator(String value) {}

@Override
public void complete() {
promise.complete(null);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,7 @@
import io.temporal.api.common.v1.Payloads;
import io.temporal.api.common.v1.RetryPolicy;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.enums.v1.CancelExternalWorkflowExecutionFailedCause;
import io.temporal.api.enums.v1.EventType;
import io.temporal.api.enums.v1.RetryState;
import io.temporal.api.enums.v1.SignalExternalWorkflowExecutionFailedCause;
import io.temporal.api.enums.v1.StartChildWorkflowExecutionFailedCause;
import io.temporal.api.enums.v1.TimeoutType;
import io.temporal.api.enums.v1.*;
import io.temporal.api.errordetails.v1.QueryFailedFailure;
import io.temporal.api.failure.v1.ApplicationFailureInfo;
import io.temporal.api.failure.v1.Failure;
Expand Down Expand Up @@ -1810,6 +1805,9 @@ private static void acceptUpdate(
UpdateRef.newBuilder()
.setWorkflowExecution(ctx.getExecution())
.setUpdateId(data.id))
.setStage(
UpdateWorkflowExecutionLifecycleStage
.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED)
.build();

data.acceptance.complete(response);
Expand Down Expand Up @@ -1852,6 +1850,9 @@ private static void completeUpdate(
.setWorkflowExecution(ctx.getExecution())
.setUpdateId(data.id))
.setOutcome(response.getOutcome())
.setStage(
UpdateWorkflowExecutionLifecycleStage
.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED)
.build();

data.complete.complete(updateResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1625,6 +1625,9 @@ private void processRejectionMessage(
.setUpdateId(rejection.getRejectedRequest().getMeta().getUpdateId())
.setWorkflowExecution(ctx.getExecution()))
.setOutcome(Outcome.newBuilder().setFailure(rejection.getFailure()).build())
.setStage(
UpdateWorkflowExecutionLifecycleStage
.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED)
.build();
u.getAcceptance().complete(response);
}
Expand Down Expand Up @@ -2157,7 +2160,13 @@ public PollWorkflowExecutionUpdateResponse pollUpdateWorkflowExecution(
.setOutcome(completionResponse.getOutcome())
.build();
} catch (TimeoutException e) {
return PollWorkflowExecutionUpdateResponse.getDefaultInstance();
PollWorkflowExecutionUpdateResponse resp =
PollWorkflowExecutionUpdateResponse.getDefaultInstance();
return resp.toBuilder()
.setStage(
UpdateWorkflowExecutionLifecycleStage
.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ADMITTED)
.build();
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof StatusRuntimeException) {
Expand Down
Loading