Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Per Activity Method Options (Part I) #431

Merged
merged 28 commits into from
Apr 15, 2021
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
bd9d6a4
Update service client dependencies
Mar 16, 2021
acc66e4
Update temporal-sdk dependencies
Mar 16, 2021
7a4069c
Missed micrometer-core update
Mar 17, 2021
af82446
Merge remote-tracking branch 'upstream/master'
Mar 17, 2021
b00b312
Merge remote-tracking branch 'upstream/master'
Mar 29, 2021
e1ada5b
Merge remote-tracking branch 'upstream/master'
Mar 31, 2021
c6a874f
Merge remote-tracking branch 'upstream/master'
Apr 1, 2021
073cc38
Merge remote-tracking branch 'upstream/master'
Apr 4, 2021
5317b60
Merge remote-tracking branch 'upstream/master'
Apr 6, 2021
ff848ed
Merge remote-tracking branch 'upstream/master'
Apr 6, 2021
30dda14
Added ability to pass activity method options to activityStub. Usage:
Apr 7, 2021
1044ed8
Merge branch 'master' into per-activity-method-ops
vkoby Apr 7, 2021
bcb2e5e
Removed options field in ActivityInvocationHandler and moved merging …
Apr 8, 2021
f378c61
This test can now use the other overloaded method.
Apr 8, 2021
ece026f
Merge branch 'master' into per-activity-method-ops
vkoby Apr 9, 2021
131d237
Merge branch 'master' into per-activity-method-ops
vkoby Apr 9, 2021
890269c
Cleanup
Apr 9, 2021
09e7ac3
Added LocalActivity support
Apr 9, 2021
3120260
Added unit test for activity method options.
Apr 12, 2021
5495c0b
Fixed documentation.
Apr 12, 2021
9119db0
Merge branch 'master' into per-activity-method-ops
vkoby Apr 12, 2021
bd8c3d1
Changed the method map to use activityTypeName as key. Fixed the unit…
Apr 14, 2021
626a48d
Merge branch 'master' into per-activity-method-ops
vkoby Apr 14, 2021
b1de99b
Merge branch 'master' into per-activity-method-ops
vkoby Apr 14, 2021
5f4fa4b
Changed how doNotIncludeArgumentsIntoMarker is implemented.
Apr 14, 2021
e4cf662
Fixed NPE
Apr 15, 2021
12fa99f
Added documentation
Apr 15, 2021
c2c7f4d
Clarified documentation
Apr 15, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,37 @@ public Builder setCancellationType(ActivityCancellationType cancellationType) {
return this;
}

public Builder mergeActivityOptions(ActivityOptions override) {
if (override == null) {
return this;
}
this.taskQueue = (override.taskQueue == null) ? this.taskQueue : override.taskQueue;
this.heartbeatTimeout =
(override.heartbeatTimeout == null) ? this.heartbeatTimeout : override.heartbeatTimeout;
this.retryOptions =
(override.retryOptions == null) ? this.retryOptions : override.retryOptions;
this.scheduleToCloseTimeout =
(override.scheduleToCloseTimeout == null)
? this.scheduleToCloseTimeout
: override.scheduleToCloseTimeout;
this.startToCloseTimeout =
(override.startToCloseTimeout == null)
? this.startToCloseTimeout
: override.startToCloseTimeout;
this.scheduleToStartTimeout =
(override.scheduleToStartTimeout == null)
? this.scheduleToStartTimeout
: override.scheduleToStartTimeout;
this.cancellationType =
(override.cancellationType == null) ? this.cancellationType : override.cancellationType;
if (this.contextPropagators == null) {
this.contextPropagators = override.contextPropagators;
} else if (override.contextPropagators != null) {
this.contextPropagators.addAll(override.contextPropagators);
}
return this;
}

/**
* Properties that are set on this builder take precedence over ones found in the annotation.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public static final class Builder {
private Duration localRetryThreshold;
private Duration startToCloseTimeout;
private RetryOptions retryOptions;
private boolean doNotIncludeArgumentsIntoMarker;
private Boolean doNotIncludeArgumentsIntoMarker;

/** Copy Builder fields from the options. */
private Builder(LocalActivityOptions options) {
Expand Down Expand Up @@ -94,6 +94,31 @@ public Builder setStartToCloseTimeout(Duration timeout) {
return this;
}

public Builder mergeActivityOptions(LocalActivityOptions override) {
if (override == null) {
return this;
}
this.scheduleToCloseTimeout =
(override.scheduleToCloseTimeout == null)
? this.scheduleToCloseTimeout
: override.scheduleToCloseTimeout;
this.localRetryThreshold =
(override.localRetryThreshold == null)
? this.localRetryThreshold
: override.localRetryThreshold;
this.startToCloseTimeout =
(override.startToCloseTimeout == null)
? this.startToCloseTimeout
: override.startToCloseTimeout;
this.retryOptions =
(override.retryOptions == null) ? this.retryOptions : override.retryOptions;
this.doNotIncludeArgumentsIntoMarker =
(override.doNotIncludeArgumentsIntoMarker != null)
? override.doNotIncludeArgumentsIntoMarker
: this.doNotIncludeArgumentsIntoMarker;
return this;
}

/**
* RetryOptions that define how activity is retried in case of failure. Default is null which is
* no retries.
Expand Down Expand Up @@ -156,14 +181,14 @@ public LocalActivityOptions validateAndBuildWithDefaults() {
private final Duration localRetryThreshold;
private final Duration startToCloseTimeout;
private final RetryOptions retryOptions;
private boolean doNotIncludeArgumentsIntoMarker;
private Boolean doNotIncludeArgumentsIntoMarker;

private LocalActivityOptions(
Duration startToCloseTimeout,
Duration localRetryThreshold,
Duration scheduleToCloseTimeout,
RetryOptions retryOptions,
boolean doNotIncludeArgumentsIntoMarker) {
Boolean doNotIncludeArgumentsIntoMarker) {
this.localRetryThreshold = localRetryThreshold;
this.scheduleToCloseTimeout = scheduleToCloseTimeout;
this.startToCloseTimeout = startToCloseTimeout;
Expand All @@ -188,7 +213,7 @@ public RetryOptions getRetryOptions() {
}

public boolean isDoNotIncludeArgumentsIntoMarker() {
return doNotIncludeArgumentsIntoMarker;
return (doNotIncludeArgumentsIntoMarker == null) ? false : doNotIncludeArgumentsIntoMarker;
}

public Builder toBuilder() {
Expand Down Expand Up @@ -229,7 +254,7 @@ public String toString() {
+ ", retryOptions="
+ retryOptions
+ ", doNotIncludeArgumentsIntoMarker="
+ doNotIncludeArgumentsIntoMarker
+ isDoNotIncludeArgumentsIntoMarker()
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,26 +26,33 @@
import io.temporal.workflow.ActivityStub;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

@VisibleForTesting
public class ActivityInvocationHandler extends ActivityInvocationHandlerBase {
private final ActivityOptions options;
private final Map<String, ActivityOptions> activityMethodOptions;
private final WorkflowOutboundCallsInterceptor activityExecutor;

@VisibleForTesting
public static InvocationHandler newInstance(
Class<?> activityInterface,
ActivityOptions options,
Map<String, ActivityOptions> methodOptions,
WorkflowOutboundCallsInterceptor activityExecutor) {
return new ActivityInvocationHandler(activityInterface, activityExecutor, options);
return new ActivityInvocationHandler(
activityInterface, activityExecutor, options, methodOptions);
}

private ActivityInvocationHandler(
Class<?> activityInterface,
WorkflowOutboundCallsInterceptor activityExecutor,
ActivityOptions options) {
ActivityOptions options,
Map<String, ActivityOptions> methodOptions) {
this.options = options;
this.activityMethodOptions = (methodOptions == null) ? new HashMap<>() : methodOptions;
this.activityExecutor = activityExecutor;
init(activityInterface);
}
Expand All @@ -54,10 +61,12 @@ private ActivityInvocationHandler(
protected Function<Object[], Object> getActivityFunc(
Method method, MethodRetry methodRetry, String activityName) {
Function<Object[], Object> function;
ActivityOptions mergedOptions =
ActivityOptions.newBuilder(options).mergeMethodRetry(methodRetry).build();
ActivityStub stub = ActivityStubImpl.newInstance(mergedOptions, activityExecutor);

ActivityOptions merged =
ActivityOptions.newBuilder(options)
.mergeActivityOptions(this.activityMethodOptions.get(activityName))
.mergeMethodRetry(methodRetry)
.build();
ActivityStub stub = ActivityStubImpl.newInstance(merged, activityExecutor);
function =
(a) -> stub.execute(activityName, method.getReturnType(), method.getGenericReturnType(), a);
return function;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,42 +19,54 @@

package io.temporal.internal.sync;

import com.google.common.annotations.VisibleForTesting;
import io.temporal.activity.LocalActivityOptions;
import io.temporal.common.MethodRetry;
import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor;
import io.temporal.workflow.ActivityStub;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

class LocalActivityInvocationHandler extends ActivityInvocationHandlerBase {
@VisibleForTesting
public class LocalActivityInvocationHandler extends ActivityInvocationHandlerBase {
private final LocalActivityOptions options;
private final Map<String, LocalActivityOptions> activityMethodOptions;
private final WorkflowOutboundCallsInterceptor activityExecutor;

static InvocationHandler newInstance(
@VisibleForTesting
public static InvocationHandler newInstance(
Class<?> activityInterface,
LocalActivityOptions options,
Map<String, LocalActivityOptions> methodOptions,
WorkflowOutboundCallsInterceptor activityExecutor) {
return new LocalActivityInvocationHandler(activityInterface, activityExecutor, options);
return new LocalActivityInvocationHandler(
activityInterface, activityExecutor, options, methodOptions);
}

private LocalActivityInvocationHandler(
Class<?> activityInterface,
WorkflowOutboundCallsInterceptor activityExecutor,
LocalActivityOptions options) {
LocalActivityOptions options,
Map<String, LocalActivityOptions> methodOptions) {
this.options = options;
this.activityMethodOptions = (methodOptions == null) ? new HashMap<>() : methodOptions;
this.activityExecutor = activityExecutor;
init(activityInterface);
}

@VisibleForTesting
@Override
protected Function<Object[], Object> getActivityFunc(
public Function<Object[], Object> getActivityFunc(
Method method, MethodRetry methodRetry, String activityName) {
Function<Object[], Object> function;
LocalActivityOptions mergedOptions =
LocalActivityOptions.newBuilder(options)
.mergeActivityOptions(activityMethodOptions.get(activityName))
.setMethodRetry(methodRetry)
.validateAndBuildWithDefaults();
.build();
ActivityStub stub = LocalActivityStubImpl.newInstance(mergedOptions, activityExecutor);
function =
(a) -> stub.execute(activityName, method.getReturnType(), method.getGenericReturnType(), a);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,24 +180,41 @@ public static long currentTimeMillis() {
* Creates client stub to activities that implement given interface.
*
* @param activityInterface interface type implemented by activities
* @param options options that together with the properties of {@link
* io.temporal.activity.ActivityMethod} specify the activity invocation parameters
* @param activityMethodOptions activity method-specific invocation parameters
*/
public static <T> T newActivityStub(Class<T> activityInterface, ActivityOptions options) {
public static <T> T newActivityStub(
Class<T> activityInterface,
ActivityOptions options,
Map<String, ActivityOptions> activityMethodOptions) {
InvocationHandler invocationHandler =
ActivityInvocationHandler.newInstance(
activityInterface, options, WorkflowInternal.getWorkflowInterceptor());
activityInterface,
options,
activityMethodOptions,
WorkflowInternal.getWorkflowInterceptor());
return ActivityInvocationHandlerBase.newProxy(activityInterface, invocationHandler);
}

/**
* Creates client stub to local activities that implement given interface.
*
* @param activityInterface interface type implemented by activities
* @param options options that together with the properties of {@link
* io.temporal.activity.ActivityMethod} specify the activity invocation parameters
* @param activityMethodOptions activity method-specific invocation parameters
*/
public static <T> T newLocalActivityStub(
Class<T> activityInterface, LocalActivityOptions options) {
Class<T> activityInterface,
LocalActivityOptions options,
Map<String, LocalActivityOptions> activityMethodOptions) {
InvocationHandler invocationHandler =
LocalActivityInvocationHandler.newInstance(
activityInterface, options, WorkflowInternal.getWorkflowInterceptor());
activityInterface,
options,
activityMethodOptions,
WorkflowInternal.getWorkflowInterceptor());
return ActivityInvocationHandlerBase.newProxy(activityInterface, invocationHandler);
}

Expand Down
50 changes: 40 additions & 10 deletions temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java
Original file line number Diff line number Diff line change
Expand Up @@ -372,23 +372,38 @@ public final class Workflow {
public static final int DEFAULT_VERSION = WorkflowInternal.DEFAULT_VERSION;

/**
* Creates client stub to activities that implement given interface.
* Creates client stub to activities that implement given interface. `
*
* @param activityInterface interface type implemented by activities
*/
public static <T> T newActivityStub(Class<T> activityInterface) {
return WorkflowInternal.newActivityStub(activityInterface, null, null);
}

/**
* Creates client stub to activities that implement given interface
*
* @param activityInterface interface type implemented by activities.
* @param options options that together with the properties of {@link
* io.temporal.activity.ActivityMethod} specify the activity invocation parameters.
* io.temporal.activity.ActivityMethod} specify the activity invocation parameters
*/
public static <T> T newActivityStub(Class<T> activityInterface, ActivityOptions options) {
return WorkflowInternal.newActivityStub(activityInterface, options);
return WorkflowInternal.newActivityStub(activityInterface, options, null);
}

/**
* Creates client stub to activities that implement given interface. `
* Creates client stub to activities that implement given interface.
*
* @param activityInterface interface type implemented by activities
* @param options options that together with the properties of {@link
* io.temporal.activity.ActivityMethod} specify the activity invocation parameters
* @param activityMethodOptions activity method-specific invocation parameters
*/
public static <T> T newActivityStub(Class<T> activityInterface) {
return WorkflowInternal.newActivityStub(activityInterface, null);
public static <T> T newActivityStub(
Class<T> activityInterface,
ActivityOptions options,
Map<String, ActivityOptions> activityMethodOptions) {
return WorkflowInternal.newActivityStub(activityInterface, options, activityMethodOptions);
}

/**
Expand All @@ -400,29 +415,44 @@ public static ActivityStub newUntypedActivityStub(ActivityOptions options) {
return WorkflowInternal.newUntypedActivityStub(options);
}

/**
* Creates client stub to local activities that implement given interface.
*
* @param activityInterface interface type implemented by activities
*/
public static <T> T newLocalActivityStub(Class<T> activityInterface) {
return WorkflowInternal.newLocalActivityStub(activityInterface, null, null);
}

/**
* Creates client stub to local activities that implement given interface. A local activity is
* similar to a regular activity, but with some key differences: 1. Local activity is scheduled
* and run by the workflow worker locally. 2. Local activity does not need Temporal server to
* schedule activity task and does not rely on activity worker. 3. Local activity is for short
* living activities (usually finishes within seconds). 4. Local activity cannot heartbeat.
*
* @param activityInterface interface type implemented by activities.
* @param activityInterface interface type implemented by activities
* @param options options that together with the properties of {@link
* io.temporal.activity.ActivityMethod} specify the activity invocation parameters.
*/
public static <T> T newLocalActivityStub(
Class<T> activityInterface, LocalActivityOptions options) {
return WorkflowInternal.newLocalActivityStub(activityInterface, options);
return WorkflowInternal.newLocalActivityStub(activityInterface, options, null);
}

/**
* Creates client stub to local activities that implement given interface.
*
* @param activityInterface interface type implemented by activities
* @param options options that together with the properties of {@link
* io.temporal.activity.ActivityMethod} specify the activity invocation parameters
* @param activityMethodOptions activity method-specific invocation parameters
*/
public static <T> T newLocalActivityStub(Class<T> activityInterface) {
return WorkflowInternal.newLocalActivityStub(activityInterface, null);
public static <T> T newLocalActivityStub(
Class<T> activityInterface,
LocalActivityOptions options,
Map<String, LocalActivityOptions> activityMethodOptions) {
return WorkflowInternal.newLocalActivityStub(activityInterface, options, activityMethodOptions);
}

/**
Expand Down
Loading