diff --git a/temporal-sdk/src/main/java/io/temporal/activity/ActivityExecutionContext.java b/temporal-sdk/src/main/java/io/temporal/activity/ActivityExecutionContext.java index 6c1247ca7..5eebf0770 100644 --- a/temporal-sdk/src/main/java/io/temporal/activity/ActivityExecutionContext.java +++ b/temporal-sdk/src/main/java/io/temporal/activity/ActivityExecutionContext.java @@ -22,6 +22,7 @@ import com.uber.m3.tally.Scope; import io.temporal.client.ActivityCompletionException; +import io.temporal.client.WorkflowClient; import io.temporal.serviceclient.WorkflowServiceStubsOptions; import io.temporal.worker.WorkerOptions; import java.lang.reflect.Type; @@ -140,4 +141,10 @@ public interface ActivityExecutionContext { * WorkflowServiceStubsOptions.Builder#setMetricsScope(Scope)} when a worker starts up. */ Scope getMetricsScope(); + + /** + * Get a {@link WorkflowClient} that can be used to start interact with the Temporal service from + * an activity. + */ + WorkflowClient getWorkflowClient(); } diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/ActivityExecutionContextBase.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/ActivityExecutionContextBase.java index 654a6cdf3..bb62067d2 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/interceptors/ActivityExecutionContextBase.java +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/ActivityExecutionContextBase.java @@ -25,6 +25,7 @@ import io.temporal.activity.ActivityInfo; import io.temporal.activity.ManualActivityCompletionClient; import io.temporal.client.ActivityCompletionException; +import io.temporal.client.WorkflowClient; import java.lang.reflect.Type; import java.util.Optional; @@ -85,4 +86,9 @@ public ManualActivityCompletionClient useLocalManualCompletion() { public Scope getMetricsScope() { return next.getMetricsScope(); } + + @Override + public WorkflowClient getWorkflowClient() { + return next.getWorkflowClient(); + } } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityExecutionContextFactoryImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityExecutionContextFactoryImpl.java index 00f51f8c6..49193362b 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityExecutionContextFactoryImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityExecutionContextFactoryImpl.java @@ -21,15 +21,15 @@ package io.temporal.internal.activity; import com.uber.m3.tally.Scope; +import io.temporal.client.WorkflowClient; import io.temporal.common.converter.DataConverter; import io.temporal.internal.client.external.ManualActivityCompletionClientFactory; -import io.temporal.serviceclient.WorkflowServiceStubs; import java.time.Duration; import java.util.Objects; import java.util.concurrent.ScheduledExecutorService; public class ActivityExecutionContextFactoryImpl implements ActivityExecutionContextFactory { - private final WorkflowServiceStubs service; + private final WorkflowClient client; private final String identity; private final String namespace; private final Duration maxHeartbeatThrottleInterval; @@ -39,14 +39,14 @@ public class ActivityExecutionContextFactoryImpl implements ActivityExecutionCon private final ManualActivityCompletionClientFactory manualCompletionClientFactory; public ActivityExecutionContextFactoryImpl( - WorkflowServiceStubs service, + WorkflowClient client, String identity, String namespace, Duration maxHeartbeatThrottleInterval, Duration defaultHeartbeatThrottleInterval, DataConverter dataConverter, ScheduledExecutorService heartbeatExecutor) { - this.service = Objects.requireNonNull(service); + this.client = Objects.requireNonNull(client); this.identity = identity; this.namespace = Objects.requireNonNull(namespace); this.maxHeartbeatThrottleInterval = Objects.requireNonNull(maxHeartbeatThrottleInterval); @@ -56,14 +56,14 @@ public ActivityExecutionContextFactoryImpl( this.heartbeatExecutor = Objects.requireNonNull(heartbeatExecutor); this.manualCompletionClientFactory = ManualActivityCompletionClientFactory.newFactory( - service, namespace, identity, dataConverter); + client.getWorkflowServiceStubs(), namespace, identity, dataConverter); } @Override public InternalActivityExecutionContext createContext( ActivityInfoInternal info, Scope metricsScope) { return new ActivityExecutionContextImpl( - service, + client, namespace, info, dataConverter, diff --git a/temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityExecutionContextImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityExecutionContextImpl.java index f21a8df88..bd065a86f 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityExecutionContextImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityExecutionContextImpl.java @@ -25,10 +25,10 @@ import io.temporal.activity.ActivityInfo; import io.temporal.activity.ManualActivityCompletionClient; import io.temporal.client.ActivityCompletionException; +import io.temporal.client.WorkflowClient; import io.temporal.common.converter.DataConverter; import io.temporal.internal.client.external.ManualActivityCompletionClientFactory; import io.temporal.payload.context.ActivitySerializationContext; -import io.temporal.serviceclient.WorkflowServiceStubs; import io.temporal.workflow.Functions; import java.lang.reflect.Type; import java.time.Duration; @@ -47,6 +47,7 @@ @ThreadSafe class ActivityExecutionContextImpl implements InternalActivityExecutionContext { private final Lock lock = new ReentrantLock(); + private final WorkflowClient client; private final ManualActivityCompletionClientFactory manualCompletionClientFactory; private final Functions.Proc completionHandle; private final HeartbeatContext heartbeatContext; @@ -58,7 +59,7 @@ class ActivityExecutionContextImpl implements InternalActivityExecutionContext { /** Create an ActivityExecutionContextImpl with the given attributes. */ ActivityExecutionContextImpl( - WorkflowServiceStubs service, + WorkflowClient client, String namespace, ActivityInfo info, DataConverter dataConverter, @@ -69,13 +70,14 @@ class ActivityExecutionContextImpl implements InternalActivityExecutionContext { String identity, Duration maxHeartbeatThrottleInterval, Duration defaultHeartbeatThrottleInterval) { + this.client = client; this.metricsScope = metricsScope; this.info = info; this.completionHandle = completionHandle; this.manualCompletionClientFactory = manualCompletionClientFactory; this.heartbeatContext = new HeartbeatContextImpl( - service, + client.getWorkflowServiceStubs(), namespace, info, dataConverter, @@ -170,4 +172,9 @@ public ActivityInfo getInfo() { public Object getLastHeartbeatValue() { return heartbeatContext.getLastHeartbeatDetails(); } + + @Override + public WorkflowClient getWorkflowClient() { + return client; + } } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/activity/LocalActivityExecutionContextFactoryImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/activity/LocalActivityExecutionContextFactoryImpl.java index 3305b635a..e569c48fb 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/activity/LocalActivityExecutionContextFactoryImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/activity/LocalActivityExecutionContextFactoryImpl.java @@ -21,14 +21,18 @@ package io.temporal.internal.activity; import com.uber.m3.tally.Scope; +import io.temporal.client.WorkflowClient; public class LocalActivityExecutionContextFactoryImpl implements ActivityExecutionContextFactory { + private final WorkflowClient client; - public LocalActivityExecutionContextFactoryImpl() {} + public LocalActivityExecutionContextFactoryImpl(WorkflowClient client) { + this.client = client; + } @Override public InternalActivityExecutionContext createContext( ActivityInfoInternal info, Scope metricsScope) { - return new LocalActivityExecutionContextImpl(info, metricsScope); + return new LocalActivityExecutionContextImpl(client, info, metricsScope); } } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/activity/LocalActivityExecutionContextImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/activity/LocalActivityExecutionContextImpl.java index ad9c2a9d1..006c0cfe9 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/activity/LocalActivityExecutionContextImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/activity/LocalActivityExecutionContextImpl.java @@ -24,14 +24,17 @@ import io.temporal.activity.ActivityInfo; import io.temporal.activity.ManualActivityCompletionClient; import io.temporal.client.ActivityCompletionException; +import io.temporal.client.WorkflowClient; import java.lang.reflect.Type; import java.util.Optional; class LocalActivityExecutionContextImpl implements InternalActivityExecutionContext { + private final WorkflowClient client; private final ActivityInfo info; private final Scope metricsScope; - LocalActivityExecutionContextImpl(ActivityInfo info, Scope metricsScope) { + LocalActivityExecutionContextImpl(WorkflowClient client, ActivityInfo info, Scope metricsScope) { + this.client = client; this.info = info; this.metricsScope = metricsScope; } @@ -92,4 +95,9 @@ public Scope getMetricsScope() { public Object getLastHeartbeatValue() { return null; } + + @Override + public WorkflowClient getWorkflowClient() { + return client; + } } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncActivityWorker.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncActivityWorker.java index 5fd7a96e7..6a3225ecc 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncActivityWorker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncActivityWorker.java @@ -20,10 +20,10 @@ package io.temporal.internal.worker; +import io.temporal.client.WorkflowClient; import io.temporal.internal.activity.ActivityExecutionContextFactory; import io.temporal.internal.activity.ActivityExecutionContextFactoryImpl; import io.temporal.internal.activity.ActivityTaskHandlerImpl; -import io.temporal.serviceclient.WorkflowServiceStubs; import io.temporal.worker.tuning.ActivitySlotInfo; import io.temporal.worker.tuning.SlotSupplier; import java.time.Duration; @@ -47,7 +47,7 @@ public class SyncActivityWorker implements SuspendableWorker { private final ActivityWorker worker; public SyncActivityWorker( - WorkflowServiceStubs service, + WorkflowClient client, String namespace, String taskQueue, double taskQueueActivitiesPerSecond, @@ -69,7 +69,7 @@ public SyncActivityWorker( null)); ActivityExecutionContextFactory activityExecutionContextFactory = new ActivityExecutionContextFactoryImpl( - service, + client, identity, namespace, options.getMaxHeartbeatThrottleInterval(), @@ -86,7 +86,7 @@ public SyncActivityWorker( options.getContextPropagators()); this.worker = new ActivityWorker( - service, + client.getWorkflowServiceStubs(), namespace, taskQueue, taskQueueActivitiesPerSecond, diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncWorkflowWorker.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncWorkflowWorker.java index feb1aa1ab..ff33427e4 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncWorkflowWorker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncWorkflowWorker.java @@ -24,6 +24,7 @@ import io.temporal.api.common.v1.Payloads; import io.temporal.api.taskqueue.v1.TaskQueue; +import io.temporal.client.WorkflowClient; import io.temporal.common.converter.DataConverter; import io.temporal.internal.activity.ActivityExecutionContextFactory; import io.temporal.internal.activity.ActivityTaskHandlerImpl; @@ -31,7 +32,6 @@ import io.temporal.internal.replay.ReplayWorkflowTaskHandler; import io.temporal.internal.sync.POJOWorkflowImplementationFactory; import io.temporal.internal.sync.WorkflowThreadExecutor; -import io.temporal.serviceclient.WorkflowServiceStubs; import io.temporal.worker.WorkflowImplementationOptions; import io.temporal.worker.WorkflowTaskDispatchHandle; import io.temporal.worker.tuning.LocalActivitySlotInfo; @@ -75,7 +75,7 @@ public class SyncWorkflowWorker implements SuspendableWorker { private boolean runningLocalActivityWorker; public SyncWorkflowWorker( - @Nonnull WorkflowServiceStubs service, + @Nonnull WorkflowClient client, @Nonnull String namespace, @Nonnull String taskQueue, @Nonnull SingleWorkerOptions singleWorkerOptions, @@ -101,7 +101,7 @@ public SyncWorkflowWorker( namespace); ActivityExecutionContextFactory laActivityExecutionContextFactory = - new LocalActivityExecutionContextFactoryImpl(); + new LocalActivityExecutionContextFactoryImpl(client); laTaskHandler = new ActivityTaskHandlerImpl( namespace, @@ -126,12 +126,12 @@ public SyncWorkflowWorker( singleWorkerOptions, stickyTaskQueue, singleWorkerOptions.getStickyQueueScheduleToStartTimeout(), - service, + client.getWorkflowServiceStubs(), laWorker.getLocalActivityScheduler()); workflowWorker = new WorkflowWorker( - service, + client.getWorkflowServiceStubs(), namespace, taskQueue, stickyTaskQueueName, @@ -152,7 +152,7 @@ public SyncWorkflowWorker( singleWorkerOptions, null, Duration.ZERO, - service, + client.getWorkflowServiceStubs(), laWorker.getLocalActivityScheduler()); queryReplayHelper = new QueryReplayHelper(nonStickyReplayTaskHandler); diff --git a/temporal-sdk/src/main/java/io/temporal/worker/Worker.java b/temporal-sdk/src/main/java/io/temporal/worker/Worker.java index 1e1f8a42d..61f522495 100644 --- a/temporal-sdk/src/main/java/io/temporal/worker/Worker.java +++ b/temporal-sdk/src/main/java/io/temporal/worker/Worker.java @@ -36,7 +36,6 @@ import io.temporal.internal.sync.WorkflowThreadExecutor; import io.temporal.internal.worker.*; import io.temporal.serviceclient.MetricsTag; -import io.temporal.serviceclient.WorkflowServiceStubs; import io.temporal.worker.tuning.*; import io.temporal.workflow.Functions.Func; import io.temporal.workflow.WorkflowMethod; @@ -94,7 +93,6 @@ public final class Worker { this.taskQueue = taskQueue; this.options = WorkerOptions.newBuilder(options).validateAndBuildWithDefaults(); factoryOptions = WorkerFactoryOptions.newBuilder(factoryOptions).validateAndBuildWithDefaults(); - WorkflowServiceStubs service = client.getWorkflowServiceStubs(); WorkflowClientOptions clientOptions = client.getOptions(); String namespace = clientOptions.getNamespace(); Map tags = @@ -114,7 +112,7 @@ public final class Worker { activityWorker = new SyncActivityWorker( - service, + client, namespace, taskQueue, this.options.getMaxTaskQueueActivitiesPerSecond(), @@ -161,9 +159,10 @@ public final class Worker { ? new FixedSizeSlotSupplier<>(this.options.getMaxConcurrentLocalActivityExecutionSize()) : this.options.getWorkerTuner().getLocalActivitySlotSupplier(); attachMetricsToResourceController(taggedScope, localActivitySlotSupplier); + workflowWorker = new SyncWorkflowWorker( - service, + client, namespace, taskQueue, singleWorkerOptions, diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/activityTests/ActivityClientTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/activityTests/ActivityClientTest.java new file mode 100644 index 000000000..47a4f0adb --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/workflow/activityTests/ActivityClientTest.java @@ -0,0 +1,109 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.workflow.activityTests; + +import io.temporal.activity.Activity; +import io.temporal.activity.ActivityInterface; +import io.temporal.activity.ActivityOptions; +import io.temporal.activity.LocalActivityOptions; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.workflow.Workflow; +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; +import java.time.Duration; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +public class ActivityClientTest { + + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowTypes(TestWorkflowImpl.class) + .setActivityImplementations(new ActivityClientTestActivitiesImpl()) + .build(); + + @Test + public void testActivityGetWorkflowClient() { + TestWorkflow stub = testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow.class); + Assert.assertEquals("from activity of TestWorkflow", stub.execute(false)); + } + + @Test + public void testLocalActivityGetWorkflowClient() { + TestWorkflow stub = testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow.class); + Assert.assertEquals("from local activity of TestWorkflow", stub.execute(true)); + } + + @ActivityInterface + public interface ActivityClientTestActivities { + String query(String workflowId); + } + + public static class ActivityClientTestActivitiesImpl implements ActivityClientTestActivities { + @Override + public String query(String workflowId) { + String workflowType = + Activity.getExecutionContext() + .getWorkflowClient() + .newUntypedWorkflowStub(workflowId) + .describe() + .getWorkflowType(); + if (Activity.getExecutionContext().getInfo().isLocal()) { + return "from local activity of " + workflowType; + } else { + return "from activity of " + workflowType; + } + } + } + + @WorkflowInterface + public interface TestWorkflow { + + @WorkflowMethod + String execute(boolean local); + } + + public static class TestWorkflowImpl implements TestWorkflow { + + private final ActivityClientTestActivities activities = + Workflow.newActivityStub( + ActivityClientTestActivities.class, + ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(5)).build()); + + private final ActivityClientTestActivities localActivities = + Workflow.newLocalActivityStub( + ActivityClientTestActivities.class, + LocalActivityOptions.newBuilder() + .setStartToCloseTimeout(Duration.ofSeconds(5)) + .build()); + + @Override + public String execute(boolean local) { + if (local) { + return localActivities.query(Workflow.getInfo().getWorkflowId()); + } else { + return activities.query(Workflow.getInfo().getWorkflowId()); + } + } + } +} diff --git a/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java b/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java index 1f13e16e8..2f9c2087f 100644 --- a/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java +++ b/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java @@ -39,6 +39,7 @@ import io.temporal.api.workflowservice.v1.RespondActivityTaskCompletedRequest; import io.temporal.api.workflowservice.v1.RespondActivityTaskFailedRequest; import io.temporal.api.workflowservice.v1.WorkflowServiceGrpc; +import io.temporal.client.WorkflowClient; import io.temporal.common.SearchAttributeUpdate; import io.temporal.common.converter.DataConverter; import io.temporal.common.converter.EncodedValues; @@ -123,7 +124,8 @@ public TestActivityEnvironmentInternal(@Nullable TestEnvironmentOptions options) ActivityExecutionContextFactory activityExecutionContextFactory = new ActivityExecutionContextFactoryImpl( - workflowServiceStubs, + WorkflowClient.newInstance( + this.workflowServiceStubs, testEnvironmentOptions.getWorkflowClientOptions()), testEnvironmentOptions.getWorkflowClientOptions().getIdentity(), testEnvironmentOptions.getWorkflowClientOptions().getNamespace(), WorkerOptions.getDefaultInstance().getMaxHeartbeatThrottleInterval(),