Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move workflow update polling inside of interceptor #2159

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.Tracer;
import io.temporal.client.UpdateHandle;
import io.temporal.common.interceptors.WorkflowClientCallsInterceptor;
import io.temporal.common.interceptors.WorkflowClientCallsInterceptorBase;
import io.temporal.opentracing.OpenTracingOptions;
Expand Down Expand Up @@ -119,7 +120,7 @@ public <R> QueryOutput<R> query(QueryInput<R> input) {
}

@Override
public <R> StartUpdateOutput<R> startUpdate(StartUpdateInput<R> input) {
public <R> UpdateHandle<R> startUpdate(StartUpdateInput<R> input) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm I wonder if we would ever need to return more then UpdateHandle here? I think other SDKs just return UpdateHandle so if we do will have to address in all SDKs

Span workflowStartUpdateSpan =
contextAccessor.writeSpanContextToHeader(
() ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.temporal.common.interceptors.Header;
import io.temporal.common.interceptors.WorkflowClientCallsInterceptor;
import io.temporal.failure.CanceledFailure;
import io.temporal.internal.client.LazyUpdateHandleImpl;
import io.temporal.serviceclient.CheckedExceptionWrapper;
import io.temporal.serviceclient.StatusUtils;
import java.lang.reflect.Type;
Expand Down Expand Up @@ -331,42 +332,20 @@ public <R> UpdateHandle<R> startUpdate(UpdateOptions<R> options, Object... args)
options.validate();
WorkflowExecution targetExecution = execution.get();
try {
WorkflowClientCallsInterceptor.StartUpdateOutput<R> result =
workflowClientInvoker.startUpdate(
new WorkflowClientCallsInterceptor.StartUpdateInput<>(
targetExecution,
options.getUpdateName(),
Header.empty(),
options.getUpdateId(),
args,
options.getResultClass(),
options.getResultType(),
options.getFirstExecutionRunId(),
WaitPolicy.newBuilder()
.setLifecycleStage(options.getWaitForStage().getProto())
.build()));

if (result.hasResult()) {
return new CompletedUpdateHandleImpl<>(
result.getReference().getUpdateId(),
result.getReference().getWorkflowExecution(),
result.getResult());
} else {
LazyUpdateHandleImpl<R> handle =
new LazyUpdateHandleImpl<>(
workflowClientInvoker,
workflowType.orElse(null),
options.getUpdateName(),
result.getReference().getUpdateId(),
result.getReference().getWorkflowExecution(),
options.getResultClass(),
options.getResultType());
if (options.getWaitForStage() == WorkflowUpdateStage.COMPLETED) {
// Don't return the handle until completed, since that's what's been asked for
handle.waitCompleted();
}
return handle;
}
return workflowClientInvoker.startUpdate(
new WorkflowClientCallsInterceptor.StartUpdateInput<>(
targetExecution,
workflowType,
options.getUpdateName(),
Header.empty(),
options.getUpdateId(),
args,
options.getResultClass(),
options.getResultType(),
options.getFirstExecutionRunId(),
WaitPolicy.newBuilder()
.setLifecycleStage(options.getWaitForStage().getProto())
.build()));
} catch (Exception e) {
Throwable throwable = throwAsWorkflowFailureException(e, targetExecution);
throw new WorkflowServiceException(targetExecution, workflowType.orElse(null), throwable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@

import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.enums.v1.WorkflowExecutionStatus;
import io.temporal.api.update.v1.UpdateRef;
import io.temporal.api.update.v1.WaitPolicy;
import io.temporal.client.UpdateHandle;
import io.temporal.client.WorkflowOptions;
import io.temporal.common.Experimental;
import java.lang.reflect.Type;
Expand Down Expand Up @@ -78,7 +78,7 @@ public interface WorkflowClientCallsInterceptor {
<R> QueryOutput<R> query(QueryInput<R> input);

@Experimental
<R> StartUpdateOutput<R> startUpdate(StartUpdateInput<R> input);
<R> UpdateHandle<R> startUpdate(StartUpdateInput<R> input);

@Experimental
<R> PollWorkflowUpdateOutput<R> pollWorkflowUpdate(PollWorkflowUpdateInput<R> input);
Expand Down Expand Up @@ -383,6 +383,7 @@ public WorkflowExecution getWorkflowExecution() {
@Experimental
final class StartUpdateInput<R> {
private final WorkflowExecution workflowExecution;
private final Optional<String> workflowType;
private final String updateName;
private final Header header;
private final Object[] arguments;
Expand All @@ -394,6 +395,7 @@ final class StartUpdateInput<R> {

public StartUpdateInput(
WorkflowExecution workflowExecution,
Optional<String> workflowType,
String updateName,
Header header,
String updateId,
Expand All @@ -403,6 +405,7 @@ public StartUpdateInput(
String firstExecutionRunId,
WaitPolicy waitPolicy) {
this.workflowExecution = workflowExecution;
this.workflowType = workflowType;
this.header = header;
this.updateId = updateId;
this.updateName = updateName;
Expand All @@ -417,6 +420,10 @@ public WorkflowExecution getWorkflowExecution() {
return workflowExecution;
}

public Optional<String> getWorkflowType() {
return workflowType;
}

public String getUpdateName() {
return updateName;
}
Expand Down Expand Up @@ -450,44 +457,6 @@ public WaitPolicy getWaitPolicy() {
}
}

@Experimental
final class UpdateOutput<R> {
private final R result;

public UpdateOutput(R result) {
this.result = result;
}

public R getResult() {
return result;
}
}

@Experimental
final class StartUpdateOutput<R> {
private final UpdateRef reference;
private final R result;
private final boolean hasResult;

public StartUpdateOutput(UpdateRef reference, boolean hasResult, R result) {
this.reference = reference;
this.result = result;
this.hasResult = hasResult;
}

public UpdateRef getReference() {
return reference;
}

public boolean hasResult() {
return hasResult;
}

public R getResult() {
return result;
}
}

@Experimental
final class PollWorkflowUpdateInput<R> {
private final WorkflowExecution workflowExecution;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

package io.temporal.common.interceptors;

import io.temporal.client.UpdateHandle;
import java.util.concurrent.TimeoutException;

/** Convenience base class for {@link WorkflowClientCallsInterceptor} implementations. */
Expand Down Expand Up @@ -62,7 +63,7 @@ public <R> QueryOutput<R> query(QueryInput<R> input) {
}

@Override
public <R> StartUpdateOutput<R> startUpdate(StartUpdateInput<R> input) {
public <R> UpdateHandle<R> startUpdate(StartUpdateInput<R> input) {
return next.startUpdate(input);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,22 @@
* limitations under the License.
*/

package io.temporal.client;
package io.temporal.internal.client;

import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.client.UpdateHandle;
import io.temporal.common.Experimental;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

@Experimental
final class CompletedUpdateHandleImpl<T> implements UpdateHandle<T> {
public final class CompletedUpdateHandleImpl<T> implements UpdateHandle<T> {

private final String id;
private final WorkflowExecution execution;
private final T result;

CompletedUpdateHandleImpl(String id, WorkflowExecution execution, T result) {
public CompletedUpdateHandleImpl(String id, WorkflowExecution execution, T result) {
this.id = id;
this.execution = execution;
this.result = result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@
* limitations under the License.
*/

package io.temporal.client;
package io.temporal.internal.client;

import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.client.UpdateHandle;
import io.temporal.client.WorkflowException;
import io.temporal.client.WorkflowServiceException;
import io.temporal.common.Experimental;
import io.temporal.common.interceptors.WorkflowClientCallsInterceptor;
import io.temporal.serviceclient.CheckedExceptionWrapper;
Expand All @@ -33,7 +36,7 @@
import java.util.concurrent.TimeoutException;

@Experimental
final class LazyUpdateHandleImpl<T> implements UpdateHandle<T> {
public final class LazyUpdateHandleImpl<T> implements UpdateHandle<T> {

private final WorkflowClientCallsInterceptor workflowClientInvoker;
private final String workflowType;
Expand All @@ -44,7 +47,7 @@ final class LazyUpdateHandleImpl<T> implements UpdateHandle<T> {
private final Type resultType;
private WorkflowClientCallsInterceptor.PollWorkflowUpdateOutput<T> waitCompletedPollCall;

LazyUpdateHandleImpl(
public LazyUpdateHandleImpl(
WorkflowClientCallsInterceptor workflowClientInvoker,
String workflowType,
String updateName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@
import io.temporal.api.query.v1.WorkflowQuery;
import io.temporal.api.update.v1.*;
import io.temporal.api.workflowservice.v1.*;
import io.temporal.client.WorkflowClientOptions;
import io.temporal.client.WorkflowUpdateException;
import io.temporal.client.*;
import io.temporal.common.converter.DataConverter;
import io.temporal.common.interceptors.WorkflowClientCallsInterceptor;
import io.temporal.internal.client.external.GenericWorkflowClient;
Expand Down Expand Up @@ -298,7 +297,7 @@ public <R> QueryOutput<R> query(QueryInput<R> input) {
}

@Override
public <R> StartUpdateOutput<R> startUpdate(StartUpdateInput<R> input) {
public <R> UpdateHandle<R> startUpdate(StartUpdateInput<R> input) {
DataConverter dataConverterWithWorkflowContext =
clientOptions
.getDataConverter()
Expand Down Expand Up @@ -337,10 +336,11 @@ public <R> StartUpdateOutput<R> startUpdate(StartUpdateInput<R> input) {
// Re-attempt the update until it is at least accepted, or passes the lifecycle stage specified
// by the user.
UpdateWorkflowExecutionResponse result;
UpdateWorkflowExecutionLifecycleStage waitForStage = input.getWaitPolicy().getLifecycleStage();
do {
Deadline pollTimeoutDeadline = Deadline.after(POLL_UPDATE_TIMEOUT_S, TimeUnit.SECONDS);
result = genericClient.update(updateRequest, pollTimeoutDeadline);
} while (result.getStage().getNumber() < input.getWaitPolicy().getLifecycleStage().getNumber()
} while (result.getStage().getNumber() < waitForStage.getNumber()
&& result.getStage().getNumber()
< UpdateWorkflowExecutionLifecycleStage
.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED
Expand All @@ -356,7 +356,10 @@ public <R> StartUpdateOutput<R> startUpdate(StartUpdateInput<R> input) {
input.getResultClass(),
input.getResultType(),
dataConverterWithWorkflowContext);
return new StartUpdateOutput<R>(result.getUpdateRef(), true, resultValue);
return new CompletedUpdateHandleImpl<>(
result.getUpdateRef().getUpdateId(),
result.getUpdateRef().getWorkflowExecution(),
resultValue);
case FAILURE:
throw new WorkflowUpdateException(
result.getUpdateRef().getWorkflowExecution(),
Expand All @@ -370,7 +373,20 @@ public <R> StartUpdateOutput<R> startUpdate(StartUpdateInput<R> input) {
+ result.getOutcome().getValueCase());
}
} else {
return new StartUpdateOutput<R>(result.getUpdateRef(), false, null);
LazyUpdateHandleImpl<R> handle =
new LazyUpdateHandleImpl<>(
this,
input.getWorkflowType().orElse(null),
input.getUpdateName(),
result.getUpdateRef().getUpdateId(),
result.getUpdateRef().getWorkflowExecution(),
input.getResultClass(),
input.getResultType());
if (waitForStage == WorkflowUpdateStage.COMPLETED.getProto()) {
// Don't return the handle until completed, since that's what's been asked for
handle.waitCompleted();
}
return handle;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@
import io.temporal.api.workflowservice.v1.ResetWorkflowExecutionRequest;
import io.temporal.api.workflowservice.v1.ResetWorkflowExecutionResponse;
import io.temporal.client.*;
import io.temporal.common.interceptors.WorkflowClientCallsInterceptor;
import io.temporal.common.interceptors.WorkflowClientCallsInterceptorBase;
import io.temporal.common.interceptors.WorkflowClientInterceptorBase;
import io.temporal.failure.ApplicationFailure;
import io.temporal.internal.client.CompletedUpdateHandleImpl;
import io.temporal.testing.internal.SDKTestOptions;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.worker.WorkerOptions;
Expand Down Expand Up @@ -103,6 +107,55 @@ public void testUpdate() {
assertEquals("Execute-Hello Update Execute-Hello Update 2", result);
}

private static class FakesResultUpdateInterceptor extends WorkflowClientInterceptorBase {
@Override
public WorkflowClientCallsInterceptor workflowClientCallsInterceptor(
WorkflowClientCallsInterceptor next) {
return new WorkflowClientCallsInterceptorBase(next) {
@Override
public <R> UpdateHandle<R> startUpdate(StartUpdateInput<R> input) {
super.startUpdate(input);
Comment on lines +116 to +117
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Side note: Why don't we have something like InputType.fromExisting(input) and then some way to relatively easily modify the input? It's a bit annoying to have to re-pass every field through to a new constructor if you want to modify inputs.

I get why they're not directly mutable, but, might be nice to have a convenient way to do that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Modify inputs is not really an encouraged pattern. Not that some users don't but I it is a advanced path to take.

return new CompletedUpdateHandleImpl<>(
"someid", input.getWorkflowExecution(), (R) "fake");
}
};
}
}

@Test
public void testUpdateIntercepted() {
String workflowId = UUID.randomUUID().toString();
WorkflowClient workflowClient =
WorkflowClient.newInstance(
testWorkflowRule.getWorkflowServiceStubs(),
WorkflowClientOptions.newBuilder(testWorkflowRule.getWorkflowClient().getOptions())
.setInterceptors(new FakesResultUpdateInterceptor())
.validateAndBuildWithDefaults());
WorkflowOptions options =
SDKTestOptions.newWorkflowOptionsWithTimeouts(testWorkflowRule.getTaskQueue()).toBuilder()
.setWorkflowId(workflowId)
.build();
WorkflowWithUpdate workflow = workflowClient.newWorkflowStub(WorkflowWithUpdate.class, options);
// To execute workflow client.execute() would do. But we want to start workflow and immediately
// return.
WorkflowExecution execution = WorkflowClient.start(workflow::execute);

SDKTestWorkflowRule.waitForOKQuery(workflow);
assertEquals("initial", workflow.getState());
assertEquals(workflowId, execution.getWorkflowId());

assertEquals("fake", workflow.update(0, "Hello Update"));
assertEquals("fake", workflow.update(1, "Hello Update 2"));
workflow.complete();

String result =
testWorkflowRule
.getWorkflowClient()
.newUntypedWorkflowStub(execution, Optional.empty())
.getResult(String.class);
assertEquals("Execute-Hello Update Execute-Hello Update 2", result);
}

@Test
public void testUpdateUntyped() throws ExecutionException, InterruptedException {
WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient();
Expand Down
Loading