From bd9d6a434ff90621f81fe7e8c9edb88fe2738821 Mon Sep 17 00:00:00 2001 From: Vera Kobylchak Date: Tue, 16 Mar 2021 14:39:08 -0700 Subject: [PATCH 01/15] Update service client dependencies --- temporal-serviceclient/build.gradle | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/temporal-serviceclient/build.gradle b/temporal-serviceclient/build.gradle index 047a56d98..f1462cc85 100644 --- a/temporal-serviceclient/build.gradle +++ b/temporal-serviceclient/build.gradle @@ -80,7 +80,7 @@ dependencies { api 'io.grpc:grpc-protobuf:1.36.0' api 'io.grpc:grpc-stub:1.36.0' api 'io.grpc:grpc-core:1.36.0' - api group: 'com.google.protobuf', name: 'protobuf-java-util', version: '3.15.2' + api group: 'com.google.protobuf', name: 'protobuf-java-util', version: '3.15.6' api group: 'com.uber.m3', name: 'tally-core', version: '0.6.1' api group: 'org.slf4j', name: 'slf4j-api', version: '1.7.30' api 'io.grpc:grpc-netty-shaded:1.36.0' @@ -122,7 +122,7 @@ jar { protobuf { protoc { - artifact = 'com.google.protobuf:protoc:3.15.2' + artifact = 'com.google.protobuf:protoc:3.15.6' } plugins { grpc { From acc66e4a7dcf6b91b79b3fc867469dc66112e11f Mon Sep 17 00:00:00 2001 From: Vera Kobylchak Date: Tue, 16 Mar 2021 14:57:10 -0700 Subject: [PATCH 02/15] Update temporal-sdk dependencies --- temporal-sdk/build.gradle | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/temporal-sdk/build.gradle b/temporal-sdk/build.gradle index 38b88acfa..83051ca80 100644 --- a/temporal-sdk/build.gradle +++ b/temporal-sdk/build.gradle @@ -45,8 +45,8 @@ dependencies { implementation group: 'com.google.guava', name: 'guava', version: '30.1-jre' implementation group: 'com.cronutils', name: 'cron-utils', version: '9.1.3' - implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.12.1' - implementation group: 'com.fasterxml.jackson.datatype', name: 'jackson-datatype-jsr310', version: '2.12.1' + implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.12.2' + implementation group: 'com.fasterxml.jackson.datatype', name: 'jackson-datatype-jsr310', version: '2.12.2' if (!JavaVersion.current().isJava8()) { implementation 'javax.annotation:javax.annotation-api:1.3.2' } From 7a4069c8ec1700cd1acd7a1b2228667d873e89ec Mon Sep 17 00:00:00 2001 From: Vera Kobylchak Date: Tue, 16 Mar 2021 17:35:18 -0700 Subject: [PATCH 03/15] Missed micrometer-core update --- temporal-sdk/build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/temporal-sdk/build.gradle b/temporal-sdk/build.gradle index 83051ca80..6705d32bf 100644 --- a/temporal-sdk/build.gradle +++ b/temporal-sdk/build.gradle @@ -41,7 +41,7 @@ dependencies { api project(':temporal-serviceclient') api group: 'com.google.code.gson', name: 'gson', version: '2.8.6' - api group: 'io.micrometer', name: 'micrometer-core', version: '1.6.4' + api group: 'io.micrometer', name: 'micrometer-core', version: '1.6.5' implementation group: 'com.google.guava', name: 'guava', version: '30.1-jre' implementation group: 'com.cronutils', name: 'cron-utils', version: '9.1.3' From 30dda14abb18eecf0826a48c43da6ef0625f2164 Mon Sep 17 00:00:00 2001 From: Vera Kobylchak Date: Wed, 7 Apr 2021 09:59:57 -0700 Subject: [PATCH 04/15] Added ability to pass activity method options to activityStub. Usage: Map activityMethodOptions Workflow.newActivityStub(AccountActivity.class, options, activityMethodOptions) --- .../io/temporal/activity/ActivityOptions.java | 38 +++++++++++++++++++ .../sync/ActivityInvocationHandler.java | 17 +++++++-- .../internal/sync/WorkflowInternal.java | 10 ++++- .../worker/WorkflowImplementationOptions.java | 15 ++++++++ .../java/io/temporal/workflow/Workflow.java | 24 +++++++++--- .../TestActivityEnvironmentInternal.java | 2 +- 6 files changed, 95 insertions(+), 11 deletions(-) diff --git a/temporal-sdk/src/main/java/io/temporal/activity/ActivityOptions.java b/temporal-sdk/src/main/java/io/temporal/activity/ActivityOptions.java index fe1ea95b8..00bc6ab0a 100644 --- a/temporal-sdk/src/main/java/io/temporal/activity/ActivityOptions.java +++ b/temporal-sdk/src/main/java/io/temporal/activity/ActivityOptions.java @@ -175,6 +175,44 @@ public Builder setCancellationType(ActivityCancellationType cancellationType) { return this; } + public Builder mergeMethodOptions(ActivityOptions methodOptions) { + if (methodOptions == null) { + return this; + } + if (methodOptions.taskQueue != null) { + throw new IllegalArgumentException( + "Changing task queue with activityMethodOptions is not supported."); + } + this.heartbeatTimeout = + (methodOptions.heartbeatTimeout == null) + ? this.heartbeatTimeout + : methodOptions.heartbeatTimeout; + this.retryOptions = + (methodOptions.retryOptions == null) ? this.retryOptions : methodOptions.retryOptions; + this.contextPropagators = + (methodOptions.contextPropagators == null) + ? this.contextPropagators + : methodOptions.contextPropagators; + this.scheduleToCloseTimeout = + (methodOptions.scheduleToCloseTimeout == null) + ? this.scheduleToCloseTimeout + : methodOptions.scheduleToCloseTimeout; + this.startToCloseTimeout = + (methodOptions.startToCloseTimeout == null) + ? this.startToCloseTimeout + : methodOptions.startToCloseTimeout; + this.scheduleToStartTimeout = + (methodOptions.scheduleToStartTimeout == null) + ? this.scheduleToStartTimeout + : methodOptions.scheduleToStartTimeout; + this.cancellationType = + (methodOptions.cancellationType == null) + ? this.cancellationType + : methodOptions.cancellationType; + + return this; + } + /** * Properties that are set on this builder take precedence over ones found in the annotation. */ diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/ActivityInvocationHandler.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/ActivityInvocationHandler.java index 1b506172f..7cd66f10a 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/ActivityInvocationHandler.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/ActivityInvocationHandler.java @@ -26,26 +26,32 @@ import io.temporal.workflow.ActivityStub; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; +import java.util.Map; import java.util.function.Function; @VisibleForTesting public class ActivityInvocationHandler extends ActivityInvocationHandlerBase { private final ActivityOptions options; + private final Map activityMethodOptions; private final WorkflowOutboundCallsInterceptor activityExecutor; @VisibleForTesting public static InvocationHandler newInstance( Class activityInterface, ActivityOptions options, + Map methodOptions, WorkflowOutboundCallsInterceptor activityExecutor) { - return new ActivityInvocationHandler(activityInterface, activityExecutor, options); + return new ActivityInvocationHandler( + activityInterface, activityExecutor, options, methodOptions); } private ActivityInvocationHandler( Class activityInterface, WorkflowOutboundCallsInterceptor activityExecutor, - ActivityOptions options) { + ActivityOptions options, + Map methodOptions) { this.options = options; + this.activityMethodOptions = methodOptions; this.activityExecutor = activityExecutor; init(activityInterface); } @@ -54,8 +60,13 @@ private ActivityInvocationHandler( protected Function getActivityFunc( Method method, MethodRetry methodRetry, String activityName) { Function function; + ActivityOptions methodOptions = + (activityMethodOptions == null) ? null : activityMethodOptions.get(method.getName()); ActivityOptions mergedOptions = - ActivityOptions.newBuilder(options).mergeMethodRetry(methodRetry).build(); + ActivityOptions.newBuilder(options) + .mergeMethodOptions(methodOptions) + .mergeMethodRetry(methodRetry) + .build(); ActivityStub stub = ActivityStubImpl.newInstance(mergedOptions, activityExecutor); function = diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java index 84203a69b..b69f99148 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java @@ -181,10 +181,16 @@ public static long currentTimeMillis() { * * @param activityInterface interface type implemented by activities */ - public static T newActivityStub(Class activityInterface, ActivityOptions options) { + public static T newActivityStub( + Class activityInterface, + ActivityOptions options, + Map activityMethodOptions) { InvocationHandler invocationHandler = ActivityInvocationHandler.newInstance( - activityInterface, options, WorkflowInternal.getWorkflowInterceptor()); + activityInterface, + options, + activityMethodOptions, + WorkflowInternal.getWorkflowInterceptor()); return ActivityInvocationHandlerBase.newProxy(activityInterface, invocationHandler); } diff --git a/temporal-sdk/src/main/java/io/temporal/worker/WorkflowImplementationOptions.java b/temporal-sdk/src/main/java/io/temporal/worker/WorkflowImplementationOptions.java index 3f09efb4e..7bc41485d 100644 --- a/temporal-sdk/src/main/java/io/temporal/worker/WorkflowImplementationOptions.java +++ b/temporal-sdk/src/main/java/io/temporal/worker/WorkflowImplementationOptions.java @@ -19,7 +19,9 @@ package io.temporal.worker; +import io.temporal.activity.ActivityOptions; import java.util.Arrays; +import java.util.Map; public final class WorkflowImplementationOptions { @@ -40,6 +42,8 @@ public static Builder newBuilder() { public static final class Builder { private Class[] failWorkflowExceptionTypes; + private Map activityOptions; + private Map> activityToMethodOptions; private Builder() {} @@ -64,6 +68,17 @@ public Builder setFailWorkflowExceptionTypes( return this; } + public Builder setActivityOptions(Map activityOptions) { + this.activityOptions = activityOptions; + return this; + } + + public Builder setActivityMethodOptions( + Map> activityToMethodOptions) { + this.activityToMethodOptions = activityToMethodOptions; + return this; + } + public WorkflowImplementationOptions build() { return new WorkflowImplementationOptions( failWorkflowExceptionTypes == null ? new Class[0] : failWorkflowExceptionTypes); diff --git a/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java b/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java index a7b0e9e19..974715a21 100644 --- a/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java +++ b/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java @@ -371,6 +371,15 @@ public final class Workflow { public static final int DEFAULT_VERSION = WorkflowInternal.DEFAULT_VERSION; + /** + * Creates client stub to activities that implement given interface. ` + * + * @param activityInterface interface type implemented by activities + */ + public static T newActivityStub(Class activityInterface) { + return WorkflowInternal.newActivityStub(activityInterface, null, null); + } + /** * Creates client stub to activities that implement given interface. * @@ -379,16 +388,21 @@ public final class Workflow { * io.temporal.activity.ActivityMethod} specify the activity invocation parameters. */ public static T newActivityStub(Class activityInterface, ActivityOptions options) { - return WorkflowInternal.newActivityStub(activityInterface, options); + return WorkflowInternal.newActivityStub(activityInterface, options, null); } /** - * Creates client stub to activities that implement given interface. ` + * Creates client stub to activities that implement given interface. * - * @param activityInterface interface type implemented by activities + * @param activityInterface interface type implemented by activities. + * @param options options that together with the properties of {@link + * io.temporal.activity.ActivityMethod} specify the activity invocation parameters. */ - public static T newActivityStub(Class activityInterface) { - return WorkflowInternal.newActivityStub(activityInterface, null); + public static T newActivityStub( + Class activityInterface, + ActivityOptions options, + Map perActivityMethodOptions) { + return WorkflowInternal.newActivityStub(activityInterface, options, perActivityMethodOptions); } /** diff --git a/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java b/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java index ea0f075cd..1ca039e22 100644 --- a/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java +++ b/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java @@ -185,7 +185,7 @@ public T newActivityStub(Class activityInterface) { .build(); InvocationHandler invocationHandler = ActivityInvocationHandler.newInstance( - activityInterface, options, new TestActivityExecutor()); + activityInterface, options, null, new TestActivityExecutor()); invocationHandler = new DeterministicRunnerWrapper(invocationHandler); return ActivityInvocationHandlerBase.newProxy(activityInterface, invocationHandler); } From bcb2e5e43795225c5162e827afdc8d86af2093f8 Mon Sep 17 00:00:00 2001 From: Vera Kobylchak Date: Thu, 8 Apr 2021 17:11:10 -0500 Subject: [PATCH 05/15] Removed options field in ActivityInvocationHandler and moved merging of the options to the constructor. Added ability to set taskQueue through method options --- .../io/temporal/activity/ActivityOptions.java | 38 +++++---------- .../sync/ActivityInvocationHandler.java | 46 +++++++++++++------ .../java/io/temporal/workflow/Workflow.java | 4 +- 3 files changed, 45 insertions(+), 43 deletions(-) diff --git a/temporal-sdk/src/main/java/io/temporal/activity/ActivityOptions.java b/temporal-sdk/src/main/java/io/temporal/activity/ActivityOptions.java index 00bc6ab0a..1d0f0f542 100644 --- a/temporal-sdk/src/main/java/io/temporal/activity/ActivityOptions.java +++ b/temporal-sdk/src/main/java/io/temporal/activity/ActivityOptions.java @@ -175,41 +175,27 @@ public Builder setCancellationType(ActivityCancellationType cancellationType) { return this; } - public Builder mergeMethodOptions(ActivityOptions methodOptions) { - if (methodOptions == null) { - return this; - } - if (methodOptions.taskQueue != null) { - throw new IllegalArgumentException( - "Changing task queue with activityMethodOptions is not supported."); - } + public Builder mergeMethodOptions(ActivityOptions override) { + this.taskQueue = (override.taskQueue == null) ? this.taskQueue : override.taskQueue; this.heartbeatTimeout = - (methodOptions.heartbeatTimeout == null) - ? this.heartbeatTimeout - : methodOptions.heartbeatTimeout; + (override.heartbeatTimeout == null) ? this.heartbeatTimeout : override.heartbeatTimeout; this.retryOptions = - (methodOptions.retryOptions == null) ? this.retryOptions : methodOptions.retryOptions; - this.contextPropagators = - (methodOptions.contextPropagators == null) - ? this.contextPropagators - : methodOptions.contextPropagators; + (override.retryOptions == null) ? this.retryOptions : override.retryOptions; + this.contextPropagators.addAll(override.contextPropagators); this.scheduleToCloseTimeout = - (methodOptions.scheduleToCloseTimeout == null) + (override.scheduleToCloseTimeout == null) ? this.scheduleToCloseTimeout - : methodOptions.scheduleToCloseTimeout; + : override.scheduleToCloseTimeout; this.startToCloseTimeout = - (methodOptions.startToCloseTimeout == null) + (override.startToCloseTimeout == null) ? this.startToCloseTimeout - : methodOptions.startToCloseTimeout; + : override.startToCloseTimeout; this.scheduleToStartTimeout = - (methodOptions.scheduleToStartTimeout == null) + (override.scheduleToStartTimeout == null) ? this.scheduleToStartTimeout - : methodOptions.scheduleToStartTimeout; + : override.scheduleToStartTimeout; this.cancellationType = - (methodOptions.cancellationType == null) - ? this.cancellationType - : methodOptions.cancellationType; - + (override.cancellationType == null) ? this.cancellationType : override.cancellationType; return this; } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/ActivityInvocationHandler.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/ActivityInvocationHandler.java index 7cd66f10a..9f337675f 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/ActivityInvocationHandler.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/ActivityInvocationHandler.java @@ -26,21 +26,30 @@ import io.temporal.workflow.ActivityStub; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; +import java.util.HashMap; import java.util.Map; import java.util.function.Function; +import javax.annotation.Nonnull; @VisibleForTesting public class ActivityInvocationHandler extends ActivityInvocationHandlerBase { - private final ActivityOptions options; - private final Map activityMethodOptions; + private final Map activityMethodOptions; private final WorkflowOutboundCallsInterceptor activityExecutor; @VisibleForTesting public static InvocationHandler newInstance( - Class activityInterface, - ActivityOptions options, - Map methodOptions, - WorkflowOutboundCallsInterceptor activityExecutor) { + @Nonnull Class activityInterface, + @Nonnull ActivityOptions options, + @Nonnull WorkflowOutboundCallsInterceptor activityExecutor) { + return new ActivityInvocationHandler(activityInterface, activityExecutor, options, null); + } + + @VisibleForTesting + public static InvocationHandler newInstance( + @Nonnull Class activityInterface, + @Nonnull ActivityOptions options, + @Nonnull Map methodOptions, + @Nonnull WorkflowOutboundCallsInterceptor activityExecutor) { return new ActivityInvocationHandler( activityInterface, activityExecutor, options, methodOptions); } @@ -50,8 +59,20 @@ private ActivityInvocationHandler( WorkflowOutboundCallsInterceptor activityExecutor, ActivityOptions options, Map methodOptions) { - this.options = options; - this.activityMethodOptions = methodOptions; + this.activityMethodOptions = new HashMap<>(); + if (methodOptions == null) { + for (Method method : activityInterface.getMethods()) { + this.activityMethodOptions.put(method, options); + } + } else { + for (Method method : activityInterface.getMethods()) { + ActivityOptions mergedOptions = + ActivityOptions.newBuilder(options) + .mergeMethodOptions(methodOptions.get(method.getName())) + .build(); + this.activityMethodOptions.put(method, mergedOptions); + } + } this.activityExecutor = activityExecutor; init(activityInterface); } @@ -60,15 +81,10 @@ private ActivityInvocationHandler( protected Function getActivityFunc( Method method, MethodRetry methodRetry, String activityName) { Function function; - ActivityOptions methodOptions = - (activityMethodOptions == null) ? null : activityMethodOptions.get(method.getName()); + ActivityOptions options = this.activityMethodOptions.get(method); ActivityOptions mergedOptions = - ActivityOptions.newBuilder(options) - .mergeMethodOptions(methodOptions) - .mergeMethodRetry(methodRetry) - .build(); + ActivityOptions.newBuilder(options).mergeMethodRetry(methodRetry).build(); ActivityStub stub = ActivityStubImpl.newInstance(mergedOptions, activityExecutor); - function = (a) -> stub.execute(activityName, method.getReturnType(), method.getGenericReturnType(), a); return function; diff --git a/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java b/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java index 974715a21..9e4b72753 100644 --- a/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java +++ b/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java @@ -401,8 +401,8 @@ public static T newActivityStub(Class activityInterface, ActivityOptions public static T newActivityStub( Class activityInterface, ActivityOptions options, - Map perActivityMethodOptions) { - return WorkflowInternal.newActivityStub(activityInterface, options, perActivityMethodOptions); + Map activityMethodOptions) { + return WorkflowInternal.newActivityStub(activityInterface, options, activityMethodOptions); } /** From f378c61c350ca7c244ae8079ead361abb2855232 Mon Sep 17 00:00:00 2001 From: Vera Kobylchak Date: Thu, 8 Apr 2021 17:32:28 -0500 Subject: [PATCH 06/15] This test can now use the other overloaded method. --- .../io/temporal/testing/TestActivityEnvironmentInternal.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java b/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java index 1ca039e22..ea0f075cd 100644 --- a/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java +++ b/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java @@ -185,7 +185,7 @@ public T newActivityStub(Class activityInterface) { .build(); InvocationHandler invocationHandler = ActivityInvocationHandler.newInstance( - activityInterface, options, null, new TestActivityExecutor()); + activityInterface, options, new TestActivityExecutor()); invocationHandler = new DeterministicRunnerWrapper(invocationHandler); return ActivityInvocationHandlerBase.newProxy(activityInterface, invocationHandler); } From 890269cc4a7c4b9fd4da57efd1011741c38e0c12 Mon Sep 17 00:00:00 2001 From: Vera Kobylchak Date: Fri, 9 Apr 2021 18:25:04 -0500 Subject: [PATCH 07/15] Cleanup --- .../io/temporal/activity/ActivityOptions.java | 9 ++++++++- .../sync/ActivityInvocationHandler.java | 17 ++++------------- .../worker/WorkflowImplementationOptions.java | 15 --------------- .../TestActivityEnvironmentInternal.java | 2 +- 4 files changed, 13 insertions(+), 30 deletions(-) diff --git a/temporal-sdk/src/main/java/io/temporal/activity/ActivityOptions.java b/temporal-sdk/src/main/java/io/temporal/activity/ActivityOptions.java index 1d0f0f542..ad9d8da3e 100644 --- a/temporal-sdk/src/main/java/io/temporal/activity/ActivityOptions.java +++ b/temporal-sdk/src/main/java/io/temporal/activity/ActivityOptions.java @@ -176,12 +176,14 @@ public Builder setCancellationType(ActivityCancellationType cancellationType) { } public Builder mergeMethodOptions(ActivityOptions override) { + if (override == null) { + return this; + } this.taskQueue = (override.taskQueue == null) ? this.taskQueue : override.taskQueue; this.heartbeatTimeout = (override.heartbeatTimeout == null) ? this.heartbeatTimeout : override.heartbeatTimeout; this.retryOptions = (override.retryOptions == null) ? this.retryOptions : override.retryOptions; - this.contextPropagators.addAll(override.contextPropagators); this.scheduleToCloseTimeout = (override.scheduleToCloseTimeout == null) ? this.scheduleToCloseTimeout @@ -196,6 +198,11 @@ public Builder mergeMethodOptions(ActivityOptions override) { : override.scheduleToStartTimeout; this.cancellationType = (override.cancellationType == null) ? this.cancellationType : override.cancellationType; + if (this.contextPropagators == null) { + this.contextPropagators = override.contextPropagators; + } else if (override.contextPropagators != null) { + this.contextPropagators.addAll(override.contextPropagators); + } return this; } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/ActivityInvocationHandler.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/ActivityInvocationHandler.java index 9f337675f..b9e11cc68 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/ActivityInvocationHandler.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/ActivityInvocationHandler.java @@ -29,7 +29,6 @@ import java.util.HashMap; import java.util.Map; import java.util.function.Function; -import javax.annotation.Nonnull; @VisibleForTesting public class ActivityInvocationHandler extends ActivityInvocationHandlerBase { @@ -38,18 +37,10 @@ public class ActivityInvocationHandler extends ActivityInvocationHandlerBase { @VisibleForTesting public static InvocationHandler newInstance( - @Nonnull Class activityInterface, - @Nonnull ActivityOptions options, - @Nonnull WorkflowOutboundCallsInterceptor activityExecutor) { - return new ActivityInvocationHandler(activityInterface, activityExecutor, options, null); - } - - @VisibleForTesting - public static InvocationHandler newInstance( - @Nonnull Class activityInterface, - @Nonnull ActivityOptions options, - @Nonnull Map methodOptions, - @Nonnull WorkflowOutboundCallsInterceptor activityExecutor) { + Class activityInterface, + ActivityOptions options, + Map methodOptions, + WorkflowOutboundCallsInterceptor activityExecutor) { return new ActivityInvocationHandler( activityInterface, activityExecutor, options, methodOptions); } diff --git a/temporal-sdk/src/main/java/io/temporal/worker/WorkflowImplementationOptions.java b/temporal-sdk/src/main/java/io/temporal/worker/WorkflowImplementationOptions.java index 7bc41485d..3f09efb4e 100644 --- a/temporal-sdk/src/main/java/io/temporal/worker/WorkflowImplementationOptions.java +++ b/temporal-sdk/src/main/java/io/temporal/worker/WorkflowImplementationOptions.java @@ -19,9 +19,7 @@ package io.temporal.worker; -import io.temporal.activity.ActivityOptions; import java.util.Arrays; -import java.util.Map; public final class WorkflowImplementationOptions { @@ -42,8 +40,6 @@ public static Builder newBuilder() { public static final class Builder { private Class[] failWorkflowExceptionTypes; - private Map activityOptions; - private Map> activityToMethodOptions; private Builder() {} @@ -68,17 +64,6 @@ public Builder setFailWorkflowExceptionTypes( return this; } - public Builder setActivityOptions(Map activityOptions) { - this.activityOptions = activityOptions; - return this; - } - - public Builder setActivityMethodOptions( - Map> activityToMethodOptions) { - this.activityToMethodOptions = activityToMethodOptions; - return this; - } - public WorkflowImplementationOptions build() { return new WorkflowImplementationOptions( failWorkflowExceptionTypes == null ? new Class[0] : failWorkflowExceptionTypes); diff --git a/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java b/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java index ea0f075cd..1ca039e22 100644 --- a/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java +++ b/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java @@ -185,7 +185,7 @@ public T newActivityStub(Class activityInterface) { .build(); InvocationHandler invocationHandler = ActivityInvocationHandler.newInstance( - activityInterface, options, new TestActivityExecutor()); + activityInterface, options, null, new TestActivityExecutor()); invocationHandler = new DeterministicRunnerWrapper(invocationHandler); return ActivityInvocationHandlerBase.newProxy(activityInterface, invocationHandler); } From 09e7ac39007e3b82c71d846206bd918b4ca7f6f7 Mon Sep 17 00:00:00 2001 From: Vera Kobylchak Date: Fri, 9 Apr 2021 18:40:13 -0500 Subject: [PATCH 08/15] Added LocalActivity support --- .../activity/LocalActivityOptions.java | 21 +++++++++++++++ .../sync/LocalActivityInvocationHandler.java | 27 ++++++++++++++++--- .../internal/sync/WorkflowInternal.java | 9 +++++-- .../java/io/temporal/workflow/Workflow.java | 18 ++++++++++--- 4 files changed, 66 insertions(+), 9 deletions(-) diff --git a/temporal-sdk/src/main/java/io/temporal/activity/LocalActivityOptions.java b/temporal-sdk/src/main/java/io/temporal/activity/LocalActivityOptions.java index 628d4ccd9..b670aca40 100644 --- a/temporal-sdk/src/main/java/io/temporal/activity/LocalActivityOptions.java +++ b/temporal-sdk/src/main/java/io/temporal/activity/LocalActivityOptions.java @@ -92,6 +92,27 @@ public Builder setStartToCloseTimeout(Duration timeout) { return this; } + public Builder mergeActivityOptions(LocalActivityOptions override) { + if (override == null) { + return this; + } + this.scheduleToCloseTimeout = + (override.scheduleToCloseTimeout == null) + ? this.scheduleToCloseTimeout + : override.scheduleToCloseTimeout; + this.localRetryThreshold = + (override.localRetryThreshold == null) + ? this.localRetryThreshold + : override.localRetryThreshold; + this.startToCloseTimeout = + (override.startToCloseTimeout == null) + ? this.startToCloseTimeout + : override.startToCloseTimeout; + this.retryOptions = + (override.retryOptions == null) ? this.retryOptions : override.retryOptions; + return this; + } + /** * RetryOptions that define how activity is retried in case of failure. Default is null which is * no retries. diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/LocalActivityInvocationHandler.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/LocalActivityInvocationHandler.java index 3ecee488d..edfcec256 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/LocalActivityInvocationHandler.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/LocalActivityInvocationHandler.java @@ -25,24 +25,42 @@ import io.temporal.workflow.ActivityStub; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.Map; import java.util.function.Function; class LocalActivityInvocationHandler extends ActivityInvocationHandlerBase { - private final LocalActivityOptions options; + private final Map activityMethodOptions; private final WorkflowOutboundCallsInterceptor activityExecutor; static InvocationHandler newInstance( Class activityInterface, LocalActivityOptions options, + Map methodOptions, WorkflowOutboundCallsInterceptor activityExecutor) { - return new LocalActivityInvocationHandler(activityInterface, activityExecutor, options); + return new LocalActivityInvocationHandler( + activityInterface, activityExecutor, options, methodOptions); } private LocalActivityInvocationHandler( Class activityInterface, WorkflowOutboundCallsInterceptor activityExecutor, - LocalActivityOptions options) { - this.options = options; + LocalActivityOptions options, + Map methodOptions) { + this.activityMethodOptions = new HashMap<>(); + if (methodOptions == null) { + for (Method method : activityInterface.getMethods()) { + this.activityMethodOptions.put(method.getName(), options); + } + } else { + for (Method method : activityInterface.getMethods()) { + LocalActivityOptions mergedOptions = + LocalActivityOptions.newBuilder(options) + .mergeActivityOptions(methodOptions.get(method.getName())) + .build(); + this.activityMethodOptions.put(method.getName(), mergedOptions); + } + } this.activityExecutor = activityExecutor; init(activityInterface); } @@ -51,6 +69,7 @@ private LocalActivityInvocationHandler( protected Function getActivityFunc( Method method, MethodRetry methodRetry, String activityName) { Function function; + LocalActivityOptions options = this.activityMethodOptions.get(method.getName()); LocalActivityOptions mergedOptions = LocalActivityOptions.newBuilder(options) .setMethodRetry(methodRetry) diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java index b69f99148..b9fb90ce0 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java @@ -200,10 +200,15 @@ public static T newActivityStub( * @param activityInterface interface type implemented by activities */ public static T newLocalActivityStub( - Class activityInterface, LocalActivityOptions options) { + Class activityInterface, + LocalActivityOptions options, + Map activityMethodOptions) { InvocationHandler invocationHandler = LocalActivityInvocationHandler.newInstance( - activityInterface, options, WorkflowInternal.getWorkflowInterceptor()); + activityInterface, + options, + activityMethodOptions, + WorkflowInternal.getWorkflowInterceptor()); return ActivityInvocationHandlerBase.newProxy(activityInterface, invocationHandler); } diff --git a/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java b/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java index 9e4b72753..99625b46c 100644 --- a/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java +++ b/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java @@ -414,6 +414,15 @@ public static ActivityStub newUntypedActivityStub(ActivityOptions options) { return WorkflowInternal.newUntypedActivityStub(options); } + /** + * Creates client stub to local activities that implement given interface. + * + * @param activityInterface interface type implemented by activities + */ + public static T newLocalActivityStub(Class activityInterface) { + return WorkflowInternal.newLocalActivityStub(activityInterface, null, null); + } + /** * Creates client stub to local activities that implement given interface. A local activity is * similar to a regular activity, but with some key differences: 1. Local activity is scheduled @@ -427,7 +436,7 @@ public static ActivityStub newUntypedActivityStub(ActivityOptions options) { */ public static T newLocalActivityStub( Class activityInterface, LocalActivityOptions options) { - return WorkflowInternal.newLocalActivityStub(activityInterface, options); + return WorkflowInternal.newLocalActivityStub(activityInterface, options, null); } /** @@ -435,8 +444,11 @@ public static T newLocalActivityStub( * * @param activityInterface interface type implemented by activities */ - public static T newLocalActivityStub(Class activityInterface) { - return WorkflowInternal.newLocalActivityStub(activityInterface, null); + public static T newLocalActivityStub( + Class activityInterface, + LocalActivityOptions options, + Map activityMethodOptions) { + return WorkflowInternal.newLocalActivityStub(activityInterface, options, activityMethodOptions); } /** From 31202600897b9f58f8ebc7d24a0343d8a9e1febc Mon Sep 17 00:00:00 2001 From: Vera Kobylchak Date: Mon, 12 Apr 2021 12:29:21 -0500 Subject: [PATCH 09/15] Added unit test for activity method options. --- .../io/temporal/activity/ActivityOptions.java | 2 +- .../sync/ActivityInvocationHandler.java | 7 +- .../activity/ActivityMethodOptionsTest.java | 117 ++++++++++++++++++ 3 files changed, 124 insertions(+), 2 deletions(-) create mode 100644 temporal-sdk/src/test/java/io/temporal/activity/ActivityMethodOptionsTest.java diff --git a/temporal-sdk/src/main/java/io/temporal/activity/ActivityOptions.java b/temporal-sdk/src/main/java/io/temporal/activity/ActivityOptions.java index ad9d8da3e..91dfeae71 100644 --- a/temporal-sdk/src/main/java/io/temporal/activity/ActivityOptions.java +++ b/temporal-sdk/src/main/java/io/temporal/activity/ActivityOptions.java @@ -175,7 +175,7 @@ public Builder setCancellationType(ActivityCancellationType cancellationType) { return this; } - public Builder mergeMethodOptions(ActivityOptions override) { + public Builder mergeActivityOptions(ActivityOptions override) { if (override == null) { return this; } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/ActivityInvocationHandler.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/ActivityInvocationHandler.java index b9e11cc68..c44d6a028 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/ActivityInvocationHandler.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/ActivityInvocationHandler.java @@ -59,7 +59,7 @@ private ActivityInvocationHandler( for (Method method : activityInterface.getMethods()) { ActivityOptions mergedOptions = ActivityOptions.newBuilder(options) - .mergeMethodOptions(methodOptions.get(method.getName())) + .mergeActivityOptions(methodOptions.get(method.getName())) .build(); this.activityMethodOptions.put(method, mergedOptions); } @@ -80,4 +80,9 @@ protected Function getActivityFunc( (a) -> stub.execute(activityName, method.getReturnType(), method.getGenericReturnType(), a); return function; } + + @VisibleForTesting + public Map getActivityMethodOptions() { + return this.activityMethodOptions; + } } diff --git a/temporal-sdk/src/test/java/io/temporal/activity/ActivityMethodOptionsTest.java b/temporal-sdk/src/test/java/io/temporal/activity/ActivityMethodOptionsTest.java new file mode 100644 index 000000000..667f6bf05 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/activity/ActivityMethodOptionsTest.java @@ -0,0 +1,117 @@ +/* + * Copyright (C) 2020 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.activity; + +import io.temporal.common.RetryOptions; +import io.temporal.common.metadata.POJOActivityInterfaceMetadata; +import io.temporal.common.metadata.POJOActivityMethodMetadata; +import io.temporal.internal.sync.ActivityInvocationHandler; +import java.lang.reflect.Method; +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import org.junit.Assert; +import org.junit.Test; + +public class ActivityMethodOptionsTest { + + private final ActivityOptions options = + ActivityOptions.newBuilder() + .setTaskQueue("ActivityOptions") + .setHeartbeatTimeout(Duration.ofSeconds(5)) + .setScheduleToStartTimeout(Duration.ofSeconds(1)) + .setScheduleToCloseTimeout(Duration.ofDays(5)) + .setStartToCloseTimeout(Duration.ofSeconds(1)) + .setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(1).build()) + .setCancellationType(ActivityCancellationType.WAIT_CANCELLATION_COMPLETED) + .setContextPropagators(null) + .build(); + private final ActivityOptions methodOptions = + ActivityOptions.newBuilder() + .setTaskQueue("ActivityMethodOptions") + .setHeartbeatTimeout(Duration.ofSeconds(3)) + .setScheduleToStartTimeout(Duration.ofSeconds(3)) + .setScheduleToCloseTimeout(Duration.ofDays(3)) + .setStartToCloseTimeout(Duration.ofSeconds(3)) + .setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(33).build()) + .setCancellationType(ActivityCancellationType.TRY_CANCEL) + .setContextPropagators(null) + .build(); + + @Test + public void testActivityOptionsMerge() { + // Assert no changes if no per method options + ActivityOptions merged = ActivityOptions.newBuilder(options).mergeActivityOptions(null).build(); + Assert.assertEquals(options, merged); + // Assert options were overridden with method options + merged = ActivityOptions.newBuilder(options).mergeActivityOptions(methodOptions).build(); + Assert.assertEquals(methodOptions, merged); + } + + @Test + public void testActivityRetryOptionsChange() { + Map activityMethodOptions = + new HashMap() { + { + put("method1", methodOptions); + } + }; + + // Test that Map was created + ActivityInvocationHandler invocationHandler = + (ActivityInvocationHandler) + ActivityInvocationHandler.newInstance( + TestActivity.class, options, activityMethodOptions, null); + Map methodToOptionsMap = invocationHandler.getActivityMethodOptions(); + POJOActivityInterfaceMetadata activityMetadata = + POJOActivityInterfaceMetadata.newInstance(TestActivity.class); + + for (POJOActivityMethodMetadata methodMetadata : activityMetadata.getMethodsMetadata()) { + Method method = methodMetadata.getMethod(); + if (method.getName().equals("method1")) { + Assert.assertEquals(methodOptions, methodToOptionsMap.get(method)); + } else { + Assert.assertEquals(options, methodToOptionsMap.get(method)); + } + } + } + + @ActivityInterface + public interface TestActivity { + + @ActivityMethod + void method1(); + + @ActivityMethod + void method2(); + } + + class TestActivityImpl implements TestActivity { + @Override + public void method1() { + System.out.printf("Executing method1."); + } + + @Override + public void method2() { + System.out.printf("Executing method2."); + } + } +} \ No newline at end of file From 5495c0ba79624214188affedae3fb4be6fc005b3 Mon Sep 17 00:00:00 2001 From: Vera Kobylchak Date: Mon, 12 Apr 2021 17:07:54 -0500 Subject: [PATCH 10/15] Fixed documentation. --- .../internal/sync/WorkflowInternal.java | 6 + .../java/io/temporal/workflow/Workflow.java | 14 +- .../activity/ActivityMethodOptionsTest.java | 144 +++++++++--------- ...hCancellationScopeAndCancelParentTest.java | 1 - .../workflow/signalTests/SignalTest.java | 1 - 5 files changed, 87 insertions(+), 79 deletions(-) diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java index b9fb90ce0..b48d7c818 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java @@ -180,6 +180,9 @@ public static long currentTimeMillis() { * Creates client stub to activities that implement given interface. * * @param activityInterface interface type implemented by activities + * @param options options that together with the properties of {@link + * io.temporal.activity.ActivityMethod} specify the activity invocation parameters + * @param activityMethodOptions activity method-specific invocation parameters */ public static T newActivityStub( Class activityInterface, @@ -198,6 +201,9 @@ public static T newActivityStub( * Creates client stub to local activities that implement given interface. * * @param activityInterface interface type implemented by activities + * @param options options that together with the properties of {@link + * io.temporal.activity.ActivityMethod} specify the activity invocation parameters + * @param activityMethodOptions activity method-specific invocation parameters */ public static T newLocalActivityStub( Class activityInterface, diff --git a/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java b/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java index 99625b46c..88627a54a 100644 --- a/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java +++ b/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java @@ -381,11 +381,11 @@ public static T newActivityStub(Class activityInterface) { } /** - * Creates client stub to activities that implement given interface. + * Creates client stub to activities that implement given interface * * @param activityInterface interface type implemented by activities. * @param options options that together with the properties of {@link - * io.temporal.activity.ActivityMethod} specify the activity invocation parameters. + * io.temporal.activity.ActivityMethod} specify the activity invocation parameters */ public static T newActivityStub(Class activityInterface, ActivityOptions options) { return WorkflowInternal.newActivityStub(activityInterface, options, null); @@ -394,9 +394,10 @@ public static T newActivityStub(Class activityInterface, ActivityOptions /** * Creates client stub to activities that implement given interface. * - * @param activityInterface interface type implemented by activities. + * @param activityInterface interface type implemented by activities * @param options options that together with the properties of {@link - * io.temporal.activity.ActivityMethod} specify the activity invocation parameters. + * io.temporal.activity.ActivityMethod} specify the activity invocation parameters + * @param activityMethodOptions activity method-specific invocation parameters */ public static T newActivityStub( Class activityInterface, @@ -430,7 +431,7 @@ public static T newLocalActivityStub(Class activityInterface) { * schedule activity task and does not rely on activity worker. 3. Local activity is for short * living activities (usually finishes within seconds). 4. Local activity cannot heartbeat. * - * @param activityInterface interface type implemented by activities. + * @param activityInterface interface type implemented by activities * @param options options that together with the properties of {@link * io.temporal.activity.ActivityMethod} specify the activity invocation parameters. */ @@ -443,6 +444,9 @@ public static T newLocalActivityStub( * Creates client stub to local activities that implement given interface. * * @param activityInterface interface type implemented by activities + * @param options options that together with the properties of {@link + * io.temporal.activity.ActivityMethod} specify the activity invocation parameters + * @param activityMethodOptions activity method-specific invocation parameters */ public static T newLocalActivityStub( Class activityInterface, diff --git a/temporal-sdk/src/test/java/io/temporal/activity/ActivityMethodOptionsTest.java b/temporal-sdk/src/test/java/io/temporal/activity/ActivityMethodOptionsTest.java index 667f6bf05..0b0727091 100644 --- a/temporal-sdk/src/test/java/io/temporal/activity/ActivityMethodOptionsTest.java +++ b/temporal-sdk/src/test/java/io/temporal/activity/ActivityMethodOptionsTest.java @@ -32,86 +32,86 @@ public class ActivityMethodOptionsTest { - private final ActivityOptions options = - ActivityOptions.newBuilder() - .setTaskQueue("ActivityOptions") - .setHeartbeatTimeout(Duration.ofSeconds(5)) - .setScheduleToStartTimeout(Duration.ofSeconds(1)) - .setScheduleToCloseTimeout(Duration.ofDays(5)) - .setStartToCloseTimeout(Duration.ofSeconds(1)) - .setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(1).build()) - .setCancellationType(ActivityCancellationType.WAIT_CANCELLATION_COMPLETED) - .setContextPropagators(null) - .build(); - private final ActivityOptions methodOptions = - ActivityOptions.newBuilder() - .setTaskQueue("ActivityMethodOptions") - .setHeartbeatTimeout(Duration.ofSeconds(3)) - .setScheduleToStartTimeout(Duration.ofSeconds(3)) - .setScheduleToCloseTimeout(Duration.ofDays(3)) - .setStartToCloseTimeout(Duration.ofSeconds(3)) - .setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(33).build()) - .setCancellationType(ActivityCancellationType.TRY_CANCEL) - .setContextPropagators(null) - .build(); + private final ActivityOptions options = + ActivityOptions.newBuilder() + .setTaskQueue("ActivityOptions") + .setHeartbeatTimeout(Duration.ofSeconds(5)) + .setScheduleToStartTimeout(Duration.ofSeconds(1)) + .setScheduleToCloseTimeout(Duration.ofDays(5)) + .setStartToCloseTimeout(Duration.ofSeconds(1)) + .setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(1).build()) + .setCancellationType(ActivityCancellationType.WAIT_CANCELLATION_COMPLETED) + .setContextPropagators(null) + .build(); + private final ActivityOptions methodOptions = + ActivityOptions.newBuilder() + .setTaskQueue("ActivityMethodOptions") + .setHeartbeatTimeout(Duration.ofSeconds(3)) + .setScheduleToStartTimeout(Duration.ofSeconds(3)) + .setScheduleToCloseTimeout(Duration.ofDays(3)) + .setStartToCloseTimeout(Duration.ofSeconds(3)) + .setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(33).build()) + .setCancellationType(ActivityCancellationType.TRY_CANCEL) + .setContextPropagators(null) + .build(); - @Test - public void testActivityOptionsMerge() { - // Assert no changes if no per method options - ActivityOptions merged = ActivityOptions.newBuilder(options).mergeActivityOptions(null).build(); - Assert.assertEquals(options, merged); - // Assert options were overridden with method options - merged = ActivityOptions.newBuilder(options).mergeActivityOptions(methodOptions).build(); - Assert.assertEquals(methodOptions, merged); - } + @Test + public void testActivityOptionsMerge() { + // Assert no changes if no per method options + ActivityOptions merged = ActivityOptions.newBuilder(options).mergeActivityOptions(null).build(); + Assert.assertEquals(options, merged); + // Assert options were overridden with method options + merged = ActivityOptions.newBuilder(options).mergeActivityOptions(methodOptions).build(); + Assert.assertEquals(methodOptions, merged); + } - @Test - public void testActivityRetryOptionsChange() { - Map activityMethodOptions = - new HashMap() { - { - put("method1", methodOptions); - } - }; + @Test + public void testActivityRetryOptionsChange() { + Map activityMethodOptions = + new HashMap() { + { + put("method1", methodOptions); + } + }; - // Test that Map was created - ActivityInvocationHandler invocationHandler = - (ActivityInvocationHandler) - ActivityInvocationHandler.newInstance( - TestActivity.class, options, activityMethodOptions, null); - Map methodToOptionsMap = invocationHandler.getActivityMethodOptions(); - POJOActivityInterfaceMetadata activityMetadata = - POJOActivityInterfaceMetadata.newInstance(TestActivity.class); + // Test that Map was created + ActivityInvocationHandler invocationHandler = + (ActivityInvocationHandler) + ActivityInvocationHandler.newInstance( + TestActivity.class, options, activityMethodOptions, null); + Map methodToOptionsMap = invocationHandler.getActivityMethodOptions(); + POJOActivityInterfaceMetadata activityMetadata = + POJOActivityInterfaceMetadata.newInstance(TestActivity.class); - for (POJOActivityMethodMetadata methodMetadata : activityMetadata.getMethodsMetadata()) { - Method method = methodMetadata.getMethod(); - if (method.getName().equals("method1")) { - Assert.assertEquals(methodOptions, methodToOptionsMap.get(method)); - } else { - Assert.assertEquals(options, methodToOptionsMap.get(method)); - } - } + for (POJOActivityMethodMetadata methodMetadata : activityMetadata.getMethodsMetadata()) { + Method method = methodMetadata.getMethod(); + if (method.getName().equals("method1")) { + Assert.assertEquals(methodOptions, methodToOptionsMap.get(method)); + } else { + Assert.assertEquals(options, methodToOptionsMap.get(method)); + } } + } - @ActivityInterface - public interface TestActivity { + @ActivityInterface + public interface TestActivity { - @ActivityMethod - void method1(); + @ActivityMethod + void method1(); - @ActivityMethod - void method2(); - } + @ActivityMethod + void method2(); + } - class TestActivityImpl implements TestActivity { - @Override - public void method1() { - System.out.printf("Executing method1."); - } + class TestActivityImpl implements TestActivity { + @Override + public void method1() { + System.out.printf("Executing method1."); + } - @Override - public void method2() { - System.out.printf("Executing method2."); - } + @Override + public void method2() { + System.out.printf("Executing method2."); } -} \ No newline at end of file + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/childWorkflowTests/StartChildWorkflowWithCancellationScopeAndCancelParentTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/childWorkflowTests/StartChildWorkflowWithCancellationScopeAndCancelParentTest.java index de65a3a04..8e5c3c090 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/childWorkflowTests/StartChildWorkflowWithCancellationScopeAndCancelParentTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/childWorkflowTests/StartChildWorkflowWithCancellationScopeAndCancelParentTest.java @@ -35,7 +35,6 @@ import io.temporal.workflow.shared.TestWorkflows; import java.util.ArrayList; import java.util.List; - import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/signalTests/SignalTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/signalTests/SignalTest.java index 136cf7df7..b5e5364a5 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/signalTests/SignalTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/signalTests/SignalTest.java @@ -38,7 +38,6 @@ import java.util.Optional; import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; - import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; From bd8c3d1b5f988852b3a0ac63629ae6836de37d5d Mon Sep 17 00:00:00 2001 From: Vera Kobylchak Date: Tue, 13 Apr 2021 21:09:35 -0500 Subject: [PATCH 11/15] Changed the method map to use activityTypeName as key. Fixed the unit test. --- .../sync/ActivityInvocationHandler.java | 34 ++--- .../sync/LocalActivityInvocationHandler.java | 21 +-- .../activity/ActivityMethodOptionsTest.java | 127 +++++++++++------- .../testing/TestActivityEnvironment.java | 17 +++ .../TestActivityEnvironmentInternal.java | 19 +++ 5 files changed, 133 insertions(+), 85 deletions(-) diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/ActivityInvocationHandler.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/ActivityInvocationHandler.java index c44d6a028..2124d3b2d 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/ActivityInvocationHandler.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/ActivityInvocationHandler.java @@ -32,7 +32,8 @@ @VisibleForTesting public class ActivityInvocationHandler extends ActivityInvocationHandlerBase { - private final Map activityMethodOptions; + private final ActivityOptions options; + private final Map activityMethodOptions; private final WorkflowOutboundCallsInterceptor activityExecutor; @VisibleForTesting @@ -50,20 +51,8 @@ private ActivityInvocationHandler( WorkflowOutboundCallsInterceptor activityExecutor, ActivityOptions options, Map methodOptions) { - this.activityMethodOptions = new HashMap<>(); - if (methodOptions == null) { - for (Method method : activityInterface.getMethods()) { - this.activityMethodOptions.put(method, options); - } - } else { - for (Method method : activityInterface.getMethods()) { - ActivityOptions mergedOptions = - ActivityOptions.newBuilder(options) - .mergeActivityOptions(methodOptions.get(method.getName())) - .build(); - this.activityMethodOptions.put(method, mergedOptions); - } - } + this.options = options; + this.activityMethodOptions = (methodOptions == null) ? new HashMap<>() : methodOptions; this.activityExecutor = activityExecutor; init(activityInterface); } @@ -72,17 +61,14 @@ private ActivityInvocationHandler( protected Function getActivityFunc( Method method, MethodRetry methodRetry, String activityName) { Function function; - ActivityOptions options = this.activityMethodOptions.get(method); - ActivityOptions mergedOptions = - ActivityOptions.newBuilder(options).mergeMethodRetry(methodRetry).build(); - ActivityStub stub = ActivityStubImpl.newInstance(mergedOptions, activityExecutor); + ActivityOptions merged = + ActivityOptions.newBuilder(options) + .mergeActivityOptions(this.activityMethodOptions.get(activityName)) + .mergeMethodRetry(methodRetry) + .build(); + ActivityStub stub = ActivityStubImpl.newInstance(merged, activityExecutor); function = (a) -> stub.execute(activityName, method.getReturnType(), method.getGenericReturnType(), a); return function; } - - @VisibleForTesting - public Map getActivityMethodOptions() { - return this.activityMethodOptions; - } } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/LocalActivityInvocationHandler.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/LocalActivityInvocationHandler.java index edfcec256..03c3684fa 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/LocalActivityInvocationHandler.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/LocalActivityInvocationHandler.java @@ -30,6 +30,7 @@ import java.util.function.Function; class LocalActivityInvocationHandler extends ActivityInvocationHandlerBase { + private final LocalActivityOptions options; private final Map activityMethodOptions; private final WorkflowOutboundCallsInterceptor activityExecutor; @@ -47,20 +48,8 @@ private LocalActivityInvocationHandler( WorkflowOutboundCallsInterceptor activityExecutor, LocalActivityOptions options, Map methodOptions) { - this.activityMethodOptions = new HashMap<>(); - if (methodOptions == null) { - for (Method method : activityInterface.getMethods()) { - this.activityMethodOptions.put(method.getName(), options); - } - } else { - for (Method method : activityInterface.getMethods()) { - LocalActivityOptions mergedOptions = - LocalActivityOptions.newBuilder(options) - .mergeActivityOptions(methodOptions.get(method.getName())) - .build(); - this.activityMethodOptions.put(method.getName(), mergedOptions); - } - } + this.options = options; + this.activityMethodOptions = (methodOptions == null) ? new HashMap<>() : methodOptions; this.activityExecutor = activityExecutor; init(activityInterface); } @@ -69,11 +58,11 @@ private LocalActivityInvocationHandler( protected Function getActivityFunc( Method method, MethodRetry methodRetry, String activityName) { Function function; - LocalActivityOptions options = this.activityMethodOptions.get(method.getName()); LocalActivityOptions mergedOptions = LocalActivityOptions.newBuilder(options) + .mergeActivityOptions(activityMethodOptions.get(activityName)) .setMethodRetry(methodRetry) - .validateAndBuildWithDefaults(); + .build(); ActivityStub stub = LocalActivityStubImpl.newInstance(mergedOptions, activityExecutor); function = (a) -> stub.execute(activityName, method.getReturnType(), method.getGenericReturnType(), a); diff --git a/temporal-sdk/src/test/java/io/temporal/activity/ActivityMethodOptionsTest.java b/temporal-sdk/src/test/java/io/temporal/activity/ActivityMethodOptionsTest.java index 0b0727091..5b63a20f3 100644 --- a/temporal-sdk/src/test/java/io/temporal/activity/ActivityMethodOptionsTest.java +++ b/temporal-sdk/src/test/java/io/temporal/activity/ActivityMethodOptionsTest.java @@ -20,98 +20,135 @@ package io.temporal.activity; import io.temporal.common.RetryOptions; -import io.temporal.common.metadata.POJOActivityInterfaceMetadata; -import io.temporal.common.metadata.POJOActivityMethodMetadata; -import io.temporal.internal.sync.ActivityInvocationHandler; -import java.lang.reflect.Method; +import io.temporal.testing.TestActivityEnvironment; import java.time.Duration; import java.util.HashMap; +import java.util.Hashtable; import java.util.Map; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; public class ActivityMethodOptionsTest { - private final ActivityOptions options = + private static final ActivityOptions defaultOps = ActivityOptions.newBuilder() .setTaskQueue("ActivityOptions") - .setHeartbeatTimeout(Duration.ofSeconds(5)) - .setScheduleToStartTimeout(Duration.ofSeconds(1)) - .setScheduleToCloseTimeout(Duration.ofDays(5)) - .setStartToCloseTimeout(Duration.ofSeconds(1)) + .setHeartbeatTimeout(Duration.ofSeconds(1)) + .setScheduleToStartTimeout(Duration.ofSeconds(2)) + .setScheduleToCloseTimeout(Duration.ofDays(1)) + .setStartToCloseTimeout(Duration.ofSeconds(2)) .setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(1).build()) .setCancellationType(ActivityCancellationType.WAIT_CANCELLATION_COMPLETED) .setContextPropagators(null) .build(); - private final ActivityOptions methodOptions = + private static final ActivityOptions methodOps1 = ActivityOptions.newBuilder() .setTaskQueue("ActivityMethodOptions") .setHeartbeatTimeout(Duration.ofSeconds(3)) .setScheduleToStartTimeout(Duration.ofSeconds(3)) .setScheduleToCloseTimeout(Duration.ofDays(3)) .setStartToCloseTimeout(Duration.ofSeconds(3)) - .setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(33).build()) + .setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(2).build()) .setCancellationType(ActivityCancellationType.TRY_CANCEL) .setContextPropagators(null) .build(); + private static final ActivityOptions methodOps2 = + ActivityOptions.newBuilder() + .setHeartbeatTimeout(Duration.ofSeconds(7)) + .setStartToCloseTimeout(Duration.ofSeconds(7)) + .build(); + private static final Map perMethodOptionsMap = + new HashMap() { + { + put("Method1", methodOps2); + } + }; + private TestActivityEnvironment testEnv; + + @Before + public void setUp() { + testEnv = TestActivityEnvironment.newInstance(); + } @Test public void testActivityOptionsMerge() { // Assert no changes if no per method options - ActivityOptions merged = ActivityOptions.newBuilder(options).mergeActivityOptions(null).build(); - Assert.assertEquals(options, merged); + ActivityOptions merged = + ActivityOptions.newBuilder(defaultOps).mergeActivityOptions(null).build(); + Assert.assertEquals(defaultOps, merged); // Assert options were overridden with method options - merged = ActivityOptions.newBuilder(options).mergeActivityOptions(methodOptions).build(); - Assert.assertEquals(methodOptions, merged); + merged = ActivityOptions.newBuilder(defaultOps).mergeActivityOptions(methodOps1).build(); + Assert.assertEquals(methodOps1, merged); } @Test - public void testActivityRetryOptionsChange() { - Map activityMethodOptions = - new HashMap() { - { - put("method1", methodOptions); - } - }; + public void testActivityMethodOptions() { + testEnv.registerActivitiesImplementations(new ActivityImpl()); + TestActivity activity = + testEnv.newActivityStub(TestActivity.class, defaultOps, perMethodOptionsMap); - // Test that Map was created - ActivityInvocationHandler invocationHandler = - (ActivityInvocationHandler) - ActivityInvocationHandler.newInstance( - TestActivity.class, options, activityMethodOptions, null); - Map methodToOptionsMap = invocationHandler.getActivityMethodOptions(); - POJOActivityInterfaceMetadata activityMetadata = - POJOActivityInterfaceMetadata.newInstance(TestActivity.class); + // Check that options for method1 were merged. + Map method1OpsValues = activity.method1(); + Assert.assertEquals( + methodOps2.getHeartbeatTimeout(), Duration.parse(method1OpsValues.get("HeartbeatTimeout"))); + Assert.assertEquals( + defaultOps.getScheduleToCloseTimeout(), + Duration.parse(method1OpsValues.get("ScheduleToCloseTimeout"))); + Assert.assertEquals( + methodOps2.getStartToCloseTimeout(), + Duration.parse(method1OpsValues.get("StartToCloseTimeout"))); - for (POJOActivityMethodMetadata methodMetadata : activityMetadata.getMethodsMetadata()) { - Method method = methodMetadata.getMethod(); - if (method.getName().equals("method1")) { - Assert.assertEquals(methodOptions, methodToOptionsMap.get(method)); - } else { - Assert.assertEquals(options, methodToOptionsMap.get(method)); - } - } + // Check that options for method2 were default. + Map method2OpsValues = activity.method2(); + Assert.assertEquals( + defaultOps.getHeartbeatTimeout(), Duration.parse(method2OpsValues.get("HeartbeatTimeout"))); + Assert.assertEquals( + defaultOps.getScheduleToCloseTimeout(), + Duration.parse(method2OpsValues.get("ScheduleToCloseTimeout"))); + Assert.assertEquals( + defaultOps.getStartToCloseTimeout(), + Duration.parse(method2OpsValues.get("StartToCloseTimeout"))); } @ActivityInterface public interface TestActivity { @ActivityMethod - void method1(); + Map method1(); @ActivityMethod - void method2(); + Map method2(); } - class TestActivityImpl implements TestActivity { + private static class ActivityImpl implements TestActivity { + @Override - public void method1() { - System.out.printf("Executing method1."); + public Map method1() { + ActivityInfo info = Activity.getExecutionContext().getInfo(); + Hashtable result = + new Hashtable() { + { + put("HeartbeatTimeout", info.getHeartbeatTimeout().toString()); + put("ScheduleToCloseTimeout", info.getScheduleToCloseTimeout().toString()); + put("StartToCloseTimeout", info.getStartToCloseTimeout().toString()); + } + }; + return result; } @Override - public void method2() { - System.out.printf("Executing method2."); + public Map method2() { + ActivityInfo info = Activity.getExecutionContext().getInfo(); + Hashtable result = + new Hashtable() { + { + put("HeartbeatTimeout", info.getHeartbeatTimeout().toString()); + put("ScheduleToCloseTimeout", info.getScheduleToCloseTimeout().toString()); + put("StartToCloseTimeout", info.getStartToCloseTimeout().toString()); + } + }; + return result; } } } diff --git a/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironment.java b/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironment.java index a30336654..37c7945f6 100644 --- a/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironment.java +++ b/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironment.java @@ -21,8 +21,10 @@ import com.google.common.annotations.VisibleForTesting; import io.temporal.activity.ActivityExecutionContext; +import io.temporal.activity.ActivityOptions; import io.temporal.workflow.Functions; import java.lang.reflect.Type; +import java.util.Map; /** * The helper class for unit testing activity implementations. Supports calls to {@link @@ -78,6 +80,21 @@ static TestActivityEnvironment newInstance(TestEnvironmentOptions options) { */ T newActivityStub(Class activityInterface); + /** + * Creates a stub that can be used to invoke activities registered through {@link + * #registerActivitiesImplementations(Object...)}. + * + * @param Type of the activity interface. + * @param activityInterface activity interface class that the object under test implements + * @param options options that specify the activity invocation parameters + * @param activityMethodOptions activity method-specific invocation parameters + * @return The stub that implements the activity interface. + */ + T newActivityStub( + Class activityInterface, + ActivityOptions options, + Map activityMethodOptions); + /** * Sets a listener that is called every time an activity implementation heartbeats through {@link * ActivityExecutionContext#heartbeat(Object)}. diff --git a/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java b/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java index 1ca039e22..85611e686 100644 --- a/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java +++ b/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java @@ -190,6 +190,25 @@ public T newActivityStub(Class activityInterface) { return ActivityInvocationHandlerBase.newProxy(activityInterface, invocationHandler); } + /** + * Creates client stub to activities that implement given interface. + * + * @param activityInterface interface type implemented by activities + * @param options options that specify the activity invocation parameters + * @param activityMethodOptions activity method-specific invocation parameters + */ + @Override + public T newActivityStub( + Class activityInterface, + ActivityOptions options, + Map activityMethodOptions) { + InvocationHandler invocationHandler = + ActivityInvocationHandler.newInstance( + activityInterface, options, activityMethodOptions, new TestActivityExecutor()); + invocationHandler = new DeterministicRunnerWrapper(invocationHandler); + return ActivityInvocationHandlerBase.newProxy(activityInterface, invocationHandler); + } + @Override public void requestCancelActivity() { cancellationRequested.set(true); From 5f4fa4b47578a61cf89d415758a424b8b5aa11f6 Mon Sep 17 00:00:00 2001 From: Vera Kobylchak Date: Wed, 14 Apr 2021 15:36:55 -0500 Subject: [PATCH 12/15] Changed how doNotIncludeArgumentsIntoMarker is implemented. Added method options test for Local Activities. --- .../activity/LocalActivityOptions.java | 14 +- .../sync/LocalActivityInvocationHandler.java | 10 +- .../activity/ActivityMethodOptionsTest.java | 50 +++--- .../LocalActivityMethodOptionsTest.java | 143 ++++++++++++++++++ .../testing/TestActivityEnvironment.java | 16 ++ .../TestActivityEnvironmentInternal.java | 55 ++++++- 6 files changed, 250 insertions(+), 38 deletions(-) create mode 100644 temporal-sdk/src/test/java/io/temporal/activity/LocalActivityMethodOptionsTest.java diff --git a/temporal-sdk/src/main/java/io/temporal/activity/LocalActivityOptions.java b/temporal-sdk/src/main/java/io/temporal/activity/LocalActivityOptions.java index cd9806c45..7b7cfe2c0 100644 --- a/temporal-sdk/src/main/java/io/temporal/activity/LocalActivityOptions.java +++ b/temporal-sdk/src/main/java/io/temporal/activity/LocalActivityOptions.java @@ -51,7 +51,7 @@ public static final class Builder { private Duration localRetryThreshold; private Duration startToCloseTimeout; private RetryOptions retryOptions; - private boolean doNotIncludeArgumentsIntoMarker; + private Boolean doNotIncludeArgumentsIntoMarker; /** Copy Builder fields from the options. */ private Builder(LocalActivityOptions options) { @@ -112,6 +112,10 @@ public Builder mergeActivityOptions(LocalActivityOptions override) { : override.startToCloseTimeout; this.retryOptions = (override.retryOptions == null) ? this.retryOptions : override.retryOptions; + this.doNotIncludeArgumentsIntoMarker = + (override.doNotIncludeArgumentsIntoMarker != null) + ? override.doNotIncludeArgumentsIntoMarker + : this.doNotIncludeArgumentsIntoMarker; return this; } @@ -177,14 +181,14 @@ public LocalActivityOptions validateAndBuildWithDefaults() { private final Duration localRetryThreshold; private final Duration startToCloseTimeout; private final RetryOptions retryOptions; - private boolean doNotIncludeArgumentsIntoMarker; + private Boolean doNotIncludeArgumentsIntoMarker; private LocalActivityOptions( Duration startToCloseTimeout, Duration localRetryThreshold, Duration scheduleToCloseTimeout, RetryOptions retryOptions, - boolean doNotIncludeArgumentsIntoMarker) { + Boolean doNotIncludeArgumentsIntoMarker) { this.localRetryThreshold = localRetryThreshold; this.scheduleToCloseTimeout = scheduleToCloseTimeout; this.startToCloseTimeout = startToCloseTimeout; @@ -209,7 +213,7 @@ public RetryOptions getRetryOptions() { } public boolean isDoNotIncludeArgumentsIntoMarker() { - return doNotIncludeArgumentsIntoMarker; + return (doNotIncludeArgumentsIntoMarker == null) ? false : doNotIncludeArgumentsIntoMarker; } public Builder toBuilder() { @@ -250,7 +254,7 @@ public String toString() { + ", retryOptions=" + retryOptions + ", doNotIncludeArgumentsIntoMarker=" - + doNotIncludeArgumentsIntoMarker + + doNotIncludeArgumentsIntoMarker.booleanValue() + '}'; } } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/LocalActivityInvocationHandler.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/LocalActivityInvocationHandler.java index 03c3684fa..b92d38284 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/LocalActivityInvocationHandler.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/LocalActivityInvocationHandler.java @@ -19,6 +19,7 @@ package io.temporal.internal.sync; +import com.google.common.annotations.VisibleForTesting; import io.temporal.activity.LocalActivityOptions; import io.temporal.common.MethodRetry; import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor; @@ -29,12 +30,14 @@ import java.util.Map; import java.util.function.Function; -class LocalActivityInvocationHandler extends ActivityInvocationHandlerBase { +@VisibleForTesting +public class LocalActivityInvocationHandler extends ActivityInvocationHandlerBase { private final LocalActivityOptions options; private final Map activityMethodOptions; private final WorkflowOutboundCallsInterceptor activityExecutor; - static InvocationHandler newInstance( + @VisibleForTesting + public static InvocationHandler newInstance( Class activityInterface, LocalActivityOptions options, Map methodOptions, @@ -54,8 +57,9 @@ private LocalActivityInvocationHandler( init(activityInterface); } + @VisibleForTesting @Override - protected Function getActivityFunc( + public Function getActivityFunc( Method method, MethodRetry methodRetry, String activityName) { Function function; LocalActivityOptions mergedOptions = diff --git a/temporal-sdk/src/test/java/io/temporal/activity/ActivityMethodOptionsTest.java b/temporal-sdk/src/test/java/io/temporal/activity/ActivityMethodOptionsTest.java index 5b63a20f3..3f70b871f 100644 --- a/temporal-sdk/src/test/java/io/temporal/activity/ActivityMethodOptionsTest.java +++ b/temporal-sdk/src/test/java/io/temporal/activity/ActivityMethodOptionsTest.java @@ -89,63 +89,57 @@ public void testActivityMethodOptions() { testEnv.newActivityStub(TestActivity.class, defaultOps, perMethodOptionsMap); // Check that options for method1 were merged. - Map method1OpsValues = activity.method1(); + Map method1OpsValues = activity.method1(); + Assert.assertEquals(methodOps2.getHeartbeatTimeout(), method1OpsValues.get("HeartbeatTimeout")); Assert.assertEquals( - methodOps2.getHeartbeatTimeout(), Duration.parse(method1OpsValues.get("HeartbeatTimeout"))); + defaultOps.getScheduleToCloseTimeout(), method1OpsValues.get("ScheduleToCloseTimeout")); Assert.assertEquals( - defaultOps.getScheduleToCloseTimeout(), - Duration.parse(method1OpsValues.get("ScheduleToCloseTimeout"))); - Assert.assertEquals( - methodOps2.getStartToCloseTimeout(), - Duration.parse(method1OpsValues.get("StartToCloseTimeout"))); + methodOps2.getStartToCloseTimeout(), method1OpsValues.get("StartToCloseTimeout")); // Check that options for method2 were default. - Map method2OpsValues = activity.method2(); - Assert.assertEquals( - defaultOps.getHeartbeatTimeout(), Duration.parse(method2OpsValues.get("HeartbeatTimeout"))); + Map method2OpsValues = activity.method2(); + Assert.assertEquals(defaultOps.getHeartbeatTimeout(), method2OpsValues.get("HeartbeatTimeout")); Assert.assertEquals( - defaultOps.getScheduleToCloseTimeout(), - Duration.parse(method2OpsValues.get("ScheduleToCloseTimeout"))); + defaultOps.getScheduleToCloseTimeout(), method2OpsValues.get("ScheduleToCloseTimeout")); Assert.assertEquals( - defaultOps.getStartToCloseTimeout(), - Duration.parse(method2OpsValues.get("StartToCloseTimeout"))); + defaultOps.getStartToCloseTimeout(), method2OpsValues.get("StartToCloseTimeout")); } @ActivityInterface public interface TestActivity { @ActivityMethod - Map method1(); + Map method1(); @ActivityMethod - Map method2(); + Map method2(); } private static class ActivityImpl implements TestActivity { @Override - public Map method1() { + public Map method1() { ActivityInfo info = Activity.getExecutionContext().getInfo(); - Hashtable result = - new Hashtable() { + Hashtable result = + new Hashtable() { { - put("HeartbeatTimeout", info.getHeartbeatTimeout().toString()); - put("ScheduleToCloseTimeout", info.getScheduleToCloseTimeout().toString()); - put("StartToCloseTimeout", info.getStartToCloseTimeout().toString()); + put("HeartbeatTimeout", info.getHeartbeatTimeout()); + put("ScheduleToCloseTimeout", info.getScheduleToCloseTimeout()); + put("StartToCloseTimeout", info.getStartToCloseTimeout()); } }; return result; } @Override - public Map method2() { + public Map method2() { ActivityInfo info = Activity.getExecutionContext().getInfo(); - Hashtable result = - new Hashtable() { + Hashtable result = + new Hashtable() { { - put("HeartbeatTimeout", info.getHeartbeatTimeout().toString()); - put("ScheduleToCloseTimeout", info.getScheduleToCloseTimeout().toString()); - put("StartToCloseTimeout", info.getStartToCloseTimeout().toString()); + put("HeartbeatTimeout", info.getHeartbeatTimeout()); + put("ScheduleToCloseTimeout", info.getScheduleToCloseTimeout()); + put("StartToCloseTimeout", info.getStartToCloseTimeout()); } }; return result; diff --git a/temporal-sdk/src/test/java/io/temporal/activity/LocalActivityMethodOptionsTest.java b/temporal-sdk/src/test/java/io/temporal/activity/LocalActivityMethodOptionsTest.java new file mode 100644 index 000000000..dd93f8ec5 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/activity/LocalActivityMethodOptionsTest.java @@ -0,0 +1,143 @@ +/* + * Copyright (C) 2020 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.activity; + +import io.temporal.common.RetryOptions; +import io.temporal.testing.TestActivityEnvironment; +import java.time.Duration; +import java.util.HashMap; +import java.util.Hashtable; +import java.util.Map; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class LocalActivityMethodOptionsTest { + + private static final LocalActivityOptions defaultOps = + LocalActivityOptions.newBuilder() + .setScheduleToCloseTimeout(Duration.ofDays(1)) + .setStartToCloseTimeout(Duration.ofSeconds(2)) + .setLocalRetryThreshold(Duration.ofSeconds(2)) + .setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(1).build()) + .setDoNotIncludeArgumentsIntoMarker(true) + .build(); + + private static final LocalActivityOptions methodOps1 = + LocalActivityOptions.newBuilder() + .setScheduleToCloseTimeout(Duration.ofDays(2)) + .setStartToCloseTimeout(Duration.ofSeconds(3)) + .setLocalRetryThreshold(Duration.ofSeconds(3)) + .setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(2).build()) + .setDoNotIncludeArgumentsIntoMarker(false) + .build(); + private static final LocalActivityOptions methodOps2 = + LocalActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(4)).build(); + private static final Map perMethodOptionsMap = + new HashMap() { + { + put("Method1", methodOps2); + } + }; + private TestActivityEnvironment testEnv; + + @Before + public void setUp() { + testEnv = TestActivityEnvironment.newInstance(); + } + + @Test + public void testActivityOptionsMerge() { + // Assert no changes if no per method options + LocalActivityOptions merged = + LocalActivityOptions.newBuilder(defaultOps).mergeActivityOptions(null).build(); + Assert.assertEquals(defaultOps, merged); + // Assert options were overridden with method options + merged = LocalActivityOptions.newBuilder(defaultOps).mergeActivityOptions(methodOps1).build(); + Assert.assertEquals(methodOps1, merged); + // Check that if doNotIncludeArgumentsIntoMarker is not set, it defaults to false. + Assert.assertEquals(false, methodOps2.isDoNotIncludeArgumentsIntoMarker()); + // Check that original value of doNotIncludeArgumentsIntoMarker is not overridden if it's not + // set in override. + merged = LocalActivityOptions.newBuilder(defaultOps).mergeActivityOptions(methodOps2).build(); + Assert.assertEquals( + defaultOps.isDoNotIncludeArgumentsIntoMarker(), merged.isDoNotIncludeArgumentsIntoMarker()); + } + + @Test + public void testLocalActivityMethodOptions() { + testEnv.registerActivitiesImplementations(new ActivityImpl()); + TestActivity localActivity = + testEnv.newLocalActivityStub(TestActivity.class, defaultOps, perMethodOptionsMap); + + // Check that options for method1 were merged. + Map method1OpsValues = localActivity.method1(); + Assert.assertEquals( + defaultOps.getScheduleToCloseTimeout(), method1OpsValues.get("ScheduleToCloseTimeout")); + Assert.assertEquals( + methodOps2.getStartToCloseTimeout(), method1OpsValues.get("StartToCloseTimeout")); + + // Check that options for method2 were default. + Map method2OpsValues = localActivity.method2(); + Assert.assertEquals( + defaultOps.getScheduleToCloseTimeout(), method2OpsValues.get("ScheduleToCloseTimeout")); + Assert.assertEquals( + defaultOps.getStartToCloseTimeout(), method2OpsValues.get("StartToCloseTimeout")); + } + + @ActivityInterface + public interface TestActivity { + + @ActivityMethod + Map method1(); + + @ActivityMethod + Map method2(); + } + + private static class ActivityImpl implements TestActivity { + + @Override + public Map method1() { + ActivityInfo info = Activity.getExecutionContext().getInfo(); + Hashtable result = + new Hashtable() { + { + put("ScheduleToCloseTimeout", info.getScheduleToCloseTimeout()); + put("StartToCloseTimeout", info.getStartToCloseTimeout()); + } + }; + return result; + } + + @Override + public Map method2() { + ActivityInfo info = Activity.getExecutionContext().getInfo(); + Hashtable result = + new Hashtable() { + { + put("ScheduleToCloseTimeout", info.getScheduleToCloseTimeout()); + put("StartToCloseTimeout", info.getStartToCloseTimeout()); + } + }; + return result; + } + } +} diff --git a/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironment.java b/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironment.java index 37c7945f6..e954e7a20 100644 --- a/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironment.java +++ b/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironment.java @@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting; import io.temporal.activity.ActivityExecutionContext; import io.temporal.activity.ActivityOptions; +import io.temporal.activity.LocalActivityOptions; import io.temporal.workflow.Functions; import java.lang.reflect.Type; import java.util.Map; @@ -95,6 +96,21 @@ T newActivityStub( ActivityOptions options, Map activityMethodOptions); + /** + * Creates a stub that can be used to invoke activities registered through {@link + * #registerActivitiesImplementations(Object...)}. + * + * @param Type of the activity interface. + * @param activityInterface activity interface class that the object under test implements + * @param options options that specify the activity invocation parameters + * @param activityMethodOptions activity method-specific invocation parameters + * @return The stub that implements the activity interface. + */ + T newLocalActivityStub( + Class activityInterface, + LocalActivityOptions options, + Map activityMethodOptions); + /** * Sets a listener that is called every time an activity implementation heartbeats through {@link * ActivityExecutionContext#heartbeat(Object)}. diff --git a/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java b/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java index 85611e686..6729eccb9 100644 --- a/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java +++ b/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java @@ -29,6 +29,7 @@ import io.grpc.stub.StreamObserver; import io.temporal.activity.Activity; import io.temporal.activity.ActivityOptions; +import io.temporal.activity.LocalActivityOptions; import io.temporal.api.common.v1.ActivityType; import io.temporal.api.common.v1.Payloads; import io.temporal.api.common.v1.WorkflowExecution; @@ -49,6 +50,7 @@ import io.temporal.internal.sync.ActivityInvocationHandler; import io.temporal.internal.sync.ActivityInvocationHandlerBase; import io.temporal.internal.sync.DeterministicRunnerWrapper; +import io.temporal.internal.sync.LocalActivityInvocationHandler; import io.temporal.internal.sync.POJOActivityTaskHandler; import io.temporal.internal.worker.ActivityTask; import io.temporal.internal.worker.ActivityTaskHandler; @@ -209,6 +211,25 @@ public T newActivityStub( return ActivityInvocationHandlerBase.newProxy(activityInterface, invocationHandler); } + /** + * Creates client stub to activities that implement given interface. + * + * @param activityInterface interface type implemented by activities + * @param options options that specify the activity invocation parameters + * @param activityMethodOptions activity method-specific invocation parameters + */ + @Override + public T newLocalActivityStub( + Class activityInterface, + LocalActivityOptions options, + Map activityMethodOptions) { + InvocationHandler invocationHandler = + LocalActivityInvocationHandler.newInstance( + activityInterface, options, activityMethodOptions, new TestActivityExecutor()); + invocationHandler = new DeterministicRunnerWrapper(invocationHandler); + return ActivityInvocationHandlerBase.newProxy(activityInterface, invocationHandler); + } + @Override public void requestCancelActivity() { cancellationRequested.set(true); @@ -280,8 +301,38 @@ public ActivityOutput executeActivity(ActivityInput i) { } @Override - public LocalActivityOutput executeLocalActivity(LocalActivityInput input) { - throw new UnsupportedOperationException("not implemented"); + public LocalActivityOutput executeLocalActivity(LocalActivityInput i) { + Optional payloads = + testEnvironmentOptions + .getWorkflowClientOptions() + .getDataConverter() + .toPayloads(i.getArgs()); + LocalActivityOptions options = i.getOptions(); + PollActivityTaskQueueResponse.Builder taskBuilder = + PollActivityTaskQueueResponse.newBuilder() + .setScheduleToCloseTimeout( + ProtobufTimeUtils.toProtoDuration(options.getScheduleToCloseTimeout())) + .setStartToCloseTimeout( + ProtobufTimeUtils.toProtoDuration(options.getStartToCloseTimeout())) + .setScheduledTime(ProtobufTimeUtils.getCurrentProtoTime()) + .setStartedTime(ProtobufTimeUtils.getCurrentProtoTime()) + .setTaskToken(ByteString.copyFrom("test-task-token".getBytes(StandardCharsets.UTF_8))) + .setActivityId(String.valueOf(idSequencer.incrementAndGet())) + .setWorkflowExecution( + WorkflowExecution.newBuilder() + .setWorkflowId("test-workflow-id") + .setRunId(UUID.randomUUID().toString()) + .build()) + .setActivityType(ActivityType.newBuilder().setName(i.getActivityName()).build()); + if (payloads.isPresent()) { + taskBuilder.setInput(payloads.get()); + } + PollActivityTaskQueueResponse task = taskBuilder.build(); + Result taskResult = + activityTaskHandler.handle( + new ActivityTask(task, () -> {}), testEnvironmentOptions.getMetricsScope(), false); + return new LocalActivityOutput<>( + Workflow.newPromise(getReply(task, taskResult, i.getResultClass(), i.getResultType()))); } @Override From e4cf6620238c6b9c79db5fb8aa98fc7fc72dbb7c Mon Sep 17 00:00:00 2001 From: Vera Kobylchak Date: Thu, 15 Apr 2021 14:58:49 -0500 Subject: [PATCH 13/15] Fixed NPE --- .../main/java/io/temporal/activity/LocalActivityOptions.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/temporal-sdk/src/main/java/io/temporal/activity/LocalActivityOptions.java b/temporal-sdk/src/main/java/io/temporal/activity/LocalActivityOptions.java index 7b7cfe2c0..cd0eae74a 100644 --- a/temporal-sdk/src/main/java/io/temporal/activity/LocalActivityOptions.java +++ b/temporal-sdk/src/main/java/io/temporal/activity/LocalActivityOptions.java @@ -254,7 +254,7 @@ public String toString() { + ", retryOptions=" + retryOptions + ", doNotIncludeArgumentsIntoMarker=" - + doNotIncludeArgumentsIntoMarker.booleanValue() + + isDoNotIncludeArgumentsIntoMarker() + '}'; } } From 12fa99fd139b856512f11d338dcad39a9e3fb365 Mon Sep 17 00:00:00 2001 From: Vera Kobylchak Date: Thu, 15 Apr 2021 17:22:11 -0500 Subject: [PATCH 14/15] Added documentation --- .../java/io/temporal/testing/TestActivityEnvironment.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironment.java b/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironment.java index e954e7a20..f369f0606 100644 --- a/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironment.java +++ b/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironment.java @@ -88,7 +88,9 @@ static TestActivityEnvironment newInstance(TestEnvironmentOptions options) { * @param Type of the activity interface. * @param activityInterface activity interface class that the object under test implements * @param options options that specify the activity invocation parameters - * @param activityMethodOptions activity method-specific invocation parameters + * @param activityMethodOptions a map keyed on activityType.name to its specific invocation + * parameters. By default the name of an activity type is its method name with the first + * letter capitalized. * @return The stub that implements the activity interface. */ T newActivityStub( @@ -103,7 +105,9 @@ T newActivityStub( * @param Type of the activity interface. * @param activityInterface activity interface class that the object under test implements * @param options options that specify the activity invocation parameters - * @param activityMethodOptions activity method-specific invocation parameters + * @param activityMethodOptions a map keyed on activityType.name to its specific invocation + * parameters. By default the name of an activity type is its method name with the first + * letter capitalized. * @return The stub that implements the activity interface. */ T newLocalActivityStub( From c2c7f4db2fc3ad80bfddaeb653f01232d738390a Mon Sep 17 00:00:00 2001 From: Vera Kobylchak Date: Thu, 15 Apr 2021 17:47:36 -0500 Subject: [PATCH 15/15] Clarified documentation --- .../java/io/temporal/testing/TestActivityEnvironment.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironment.java b/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironment.java index f369f0606..7d3a7f1e4 100644 --- a/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironment.java +++ b/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironment.java @@ -88,7 +88,7 @@ static TestActivityEnvironment newInstance(TestEnvironmentOptions options) { * @param Type of the activity interface. * @param activityInterface activity interface class that the object under test implements * @param options options that specify the activity invocation parameters - * @param activityMethodOptions a map keyed on activityType.name to its specific invocation + * @param activityMethodOptions a map keyed on Activity Type Name to its specific invocation * parameters. By default the name of an activity type is its method name with the first * letter capitalized. * @return The stub that implements the activity interface. @@ -105,7 +105,7 @@ T newActivityStub( * @param Type of the activity interface. * @param activityInterface activity interface class that the object under test implements * @param options options that specify the activity invocation parameters - * @param activityMethodOptions a map keyed on activityType.name to its specific invocation + * @param activityMethodOptions a map keyed on Activity Type Name to its specific invocation * parameters. By default the name of an activity type is its method name with the first * letter capitalized. * @return The stub that implements the activity interface.