Skip to content

Commit

Permalink
Add method to get workflow client from an activity (#2369)
Browse files Browse the repository at this point in the history
  • Loading branch information
Quinn-With-Two-Ns authored Jan 15, 2025
1 parent cf06131 commit 35e390e
Show file tree
Hide file tree
Showing 11 changed files with 169 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import com.uber.m3.tally.Scope;
import io.temporal.client.ActivityCompletionException;
import io.temporal.client.WorkflowClient;
import io.temporal.serviceclient.WorkflowServiceStubsOptions;
import io.temporal.worker.WorkerOptions;
import java.lang.reflect.Type;
Expand Down Expand Up @@ -140,4 +141,10 @@ public interface ActivityExecutionContext {
* WorkflowServiceStubsOptions.Builder#setMetricsScope(Scope)} when a worker starts up.
*/
Scope getMetricsScope();

/**
* Get a {@link WorkflowClient} that can be used to start interact with the Temporal service from
* an activity.
*/
WorkflowClient getWorkflowClient();
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.temporal.activity.ActivityInfo;
import io.temporal.activity.ManualActivityCompletionClient;
import io.temporal.client.ActivityCompletionException;
import io.temporal.client.WorkflowClient;
import java.lang.reflect.Type;
import java.util.Optional;

Expand Down Expand Up @@ -85,4 +86,9 @@ public ManualActivityCompletionClient useLocalManualCompletion() {
public Scope getMetricsScope() {
return next.getMetricsScope();
}

@Override
public WorkflowClient getWorkflowClient() {
return next.getWorkflowClient();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@
package io.temporal.internal.activity;

import com.uber.m3.tally.Scope;
import io.temporal.client.WorkflowClient;
import io.temporal.common.converter.DataConverter;
import io.temporal.internal.client.external.ManualActivityCompletionClientFactory;
import io.temporal.serviceclient.WorkflowServiceStubs;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.ScheduledExecutorService;

public class ActivityExecutionContextFactoryImpl implements ActivityExecutionContextFactory {
private final WorkflowServiceStubs service;
private final WorkflowClient client;
private final String identity;
private final String namespace;
private final Duration maxHeartbeatThrottleInterval;
Expand All @@ -39,14 +39,14 @@ public class ActivityExecutionContextFactoryImpl implements ActivityExecutionCon
private final ManualActivityCompletionClientFactory manualCompletionClientFactory;

public ActivityExecutionContextFactoryImpl(
WorkflowServiceStubs service,
WorkflowClient client,
String identity,
String namespace,
Duration maxHeartbeatThrottleInterval,
Duration defaultHeartbeatThrottleInterval,
DataConverter dataConverter,
ScheduledExecutorService heartbeatExecutor) {
this.service = Objects.requireNonNull(service);
this.client = Objects.requireNonNull(client);
this.identity = identity;
this.namespace = Objects.requireNonNull(namespace);
this.maxHeartbeatThrottleInterval = Objects.requireNonNull(maxHeartbeatThrottleInterval);
Expand All @@ -56,14 +56,14 @@ public ActivityExecutionContextFactoryImpl(
this.heartbeatExecutor = Objects.requireNonNull(heartbeatExecutor);
this.manualCompletionClientFactory =
ManualActivityCompletionClientFactory.newFactory(
service, namespace, identity, dataConverter);
client.getWorkflowServiceStubs(), namespace, identity, dataConverter);
}

@Override
public InternalActivityExecutionContext createContext(
ActivityInfoInternal info, Scope metricsScope) {
return new ActivityExecutionContextImpl(
service,
client,
namespace,
info,
dataConverter,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@
import io.temporal.activity.ActivityInfo;
import io.temporal.activity.ManualActivityCompletionClient;
import io.temporal.client.ActivityCompletionException;
import io.temporal.client.WorkflowClient;
import io.temporal.common.converter.DataConverter;
import io.temporal.internal.client.external.ManualActivityCompletionClientFactory;
import io.temporal.payload.context.ActivitySerializationContext;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.workflow.Functions;
import java.lang.reflect.Type;
import java.time.Duration;
Expand All @@ -47,6 +47,7 @@
@ThreadSafe
class ActivityExecutionContextImpl implements InternalActivityExecutionContext {
private final Lock lock = new ReentrantLock();
private final WorkflowClient client;
private final ManualActivityCompletionClientFactory manualCompletionClientFactory;
private final Functions.Proc completionHandle;
private final HeartbeatContext heartbeatContext;
Expand All @@ -58,7 +59,7 @@ class ActivityExecutionContextImpl implements InternalActivityExecutionContext {

/** Create an ActivityExecutionContextImpl with the given attributes. */
ActivityExecutionContextImpl(
WorkflowServiceStubs service,
WorkflowClient client,
String namespace,
ActivityInfo info,
DataConverter dataConverter,
Expand All @@ -69,13 +70,14 @@ class ActivityExecutionContextImpl implements InternalActivityExecutionContext {
String identity,
Duration maxHeartbeatThrottleInterval,
Duration defaultHeartbeatThrottleInterval) {
this.client = client;
this.metricsScope = metricsScope;
this.info = info;
this.completionHandle = completionHandle;
this.manualCompletionClientFactory = manualCompletionClientFactory;
this.heartbeatContext =
new HeartbeatContextImpl(
service,
client.getWorkflowServiceStubs(),
namespace,
info,
dataConverter,
Expand Down Expand Up @@ -170,4 +172,9 @@ public ActivityInfo getInfo() {
public Object getLastHeartbeatValue() {
return heartbeatContext.getLastHeartbeatDetails();
}

@Override
public WorkflowClient getWorkflowClient() {
return client;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,18 @@
package io.temporal.internal.activity;

import com.uber.m3.tally.Scope;
import io.temporal.client.WorkflowClient;

public class LocalActivityExecutionContextFactoryImpl implements ActivityExecutionContextFactory {
private final WorkflowClient client;

public LocalActivityExecutionContextFactoryImpl() {}
public LocalActivityExecutionContextFactoryImpl(WorkflowClient client) {
this.client = client;
}

@Override
public InternalActivityExecutionContext createContext(
ActivityInfoInternal info, Scope metricsScope) {
return new LocalActivityExecutionContextImpl(info, metricsScope);
return new LocalActivityExecutionContextImpl(client, info, metricsScope);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,17 @@
import io.temporal.activity.ActivityInfo;
import io.temporal.activity.ManualActivityCompletionClient;
import io.temporal.client.ActivityCompletionException;
import io.temporal.client.WorkflowClient;
import java.lang.reflect.Type;
import java.util.Optional;

class LocalActivityExecutionContextImpl implements InternalActivityExecutionContext {
private final WorkflowClient client;
private final ActivityInfo info;
private final Scope metricsScope;

LocalActivityExecutionContextImpl(ActivityInfo info, Scope metricsScope) {
LocalActivityExecutionContextImpl(WorkflowClient client, ActivityInfo info, Scope metricsScope) {
this.client = client;
this.info = info;
this.metricsScope = metricsScope;
}
Expand Down Expand Up @@ -92,4 +95,9 @@ public Scope getMetricsScope() {
public Object getLastHeartbeatValue() {
return null;
}

@Override
public WorkflowClient getWorkflowClient() {
return client;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@

package io.temporal.internal.worker;

import io.temporal.client.WorkflowClient;
import io.temporal.internal.activity.ActivityExecutionContextFactory;
import io.temporal.internal.activity.ActivityExecutionContextFactoryImpl;
import io.temporal.internal.activity.ActivityTaskHandlerImpl;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.worker.tuning.ActivitySlotInfo;
import io.temporal.worker.tuning.SlotSupplier;
import java.time.Duration;
Expand All @@ -47,7 +47,7 @@ public class SyncActivityWorker implements SuspendableWorker {
private final ActivityWorker worker;

public SyncActivityWorker(
WorkflowServiceStubs service,
WorkflowClient client,
String namespace,
String taskQueue,
double taskQueueActivitiesPerSecond,
Expand All @@ -69,7 +69,7 @@ public SyncActivityWorker(
null));
ActivityExecutionContextFactory activityExecutionContextFactory =
new ActivityExecutionContextFactoryImpl(
service,
client,
identity,
namespace,
options.getMaxHeartbeatThrottleInterval(),
Expand All @@ -86,7 +86,7 @@ public SyncActivityWorker(
options.getContextPropagators());
this.worker =
new ActivityWorker(
service,
client.getWorkflowServiceStubs(),
namespace,
taskQueue,
taskQueueActivitiesPerSecond,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@

import io.temporal.api.common.v1.Payloads;
import io.temporal.api.taskqueue.v1.TaskQueue;
import io.temporal.client.WorkflowClient;
import io.temporal.common.converter.DataConverter;
import io.temporal.internal.activity.ActivityExecutionContextFactory;
import io.temporal.internal.activity.ActivityTaskHandlerImpl;
import io.temporal.internal.activity.LocalActivityExecutionContextFactoryImpl;
import io.temporal.internal.replay.ReplayWorkflowTaskHandler;
import io.temporal.internal.sync.POJOWorkflowImplementationFactory;
import io.temporal.internal.sync.WorkflowThreadExecutor;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.worker.WorkflowImplementationOptions;
import io.temporal.worker.WorkflowTaskDispatchHandle;
import io.temporal.worker.tuning.LocalActivitySlotInfo;
Expand Down Expand Up @@ -75,7 +75,7 @@ public class SyncWorkflowWorker implements SuspendableWorker {
private boolean runningLocalActivityWorker;

public SyncWorkflowWorker(
@Nonnull WorkflowServiceStubs service,
@Nonnull WorkflowClient client,
@Nonnull String namespace,
@Nonnull String taskQueue,
@Nonnull SingleWorkerOptions singleWorkerOptions,
Expand All @@ -101,7 +101,7 @@ public SyncWorkflowWorker(
namespace);

ActivityExecutionContextFactory laActivityExecutionContextFactory =
new LocalActivityExecutionContextFactoryImpl();
new LocalActivityExecutionContextFactoryImpl(client);
laTaskHandler =
new ActivityTaskHandlerImpl(
namespace,
Expand All @@ -126,12 +126,12 @@ public SyncWorkflowWorker(
singleWorkerOptions,
stickyTaskQueue,
singleWorkerOptions.getStickyQueueScheduleToStartTimeout(),
service,
client.getWorkflowServiceStubs(),
laWorker.getLocalActivityScheduler());

workflowWorker =
new WorkflowWorker(
service,
client.getWorkflowServiceStubs(),
namespace,
taskQueue,
stickyTaskQueueName,
Expand All @@ -152,7 +152,7 @@ public SyncWorkflowWorker(
singleWorkerOptions,
null,
Duration.ZERO,
service,
client.getWorkflowServiceStubs(),
laWorker.getLocalActivityScheduler());

queryReplayHelper = new QueryReplayHelper(nonStickyReplayTaskHandler);
Expand Down
7 changes: 3 additions & 4 deletions temporal-sdk/src/main/java/io/temporal/worker/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import io.temporal.internal.sync.WorkflowThreadExecutor;
import io.temporal.internal.worker.*;
import io.temporal.serviceclient.MetricsTag;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.worker.tuning.*;
import io.temporal.workflow.Functions.Func;
import io.temporal.workflow.WorkflowMethod;
Expand Down Expand Up @@ -94,7 +93,6 @@ public final class Worker {
this.taskQueue = taskQueue;
this.options = WorkerOptions.newBuilder(options).validateAndBuildWithDefaults();
factoryOptions = WorkerFactoryOptions.newBuilder(factoryOptions).validateAndBuildWithDefaults();
WorkflowServiceStubs service = client.getWorkflowServiceStubs();
WorkflowClientOptions clientOptions = client.getOptions();
String namespace = clientOptions.getNamespace();
Map<String, String> tags =
Expand All @@ -114,7 +112,7 @@ public final class Worker {

activityWorker =
new SyncActivityWorker(
service,
client,
namespace,
taskQueue,
this.options.getMaxTaskQueueActivitiesPerSecond(),
Expand Down Expand Up @@ -161,9 +159,10 @@ public final class Worker {
? new FixedSizeSlotSupplier<>(this.options.getMaxConcurrentLocalActivityExecutionSize())
: this.options.getWorkerTuner().getLocalActivitySlotSupplier();
attachMetricsToResourceController(taggedScope, localActivitySlotSupplier);

workflowWorker =
new SyncWorkflowWorker(
service,
client,
namespace,
taskQueue,
singleWorkerOptions,
Expand Down
Loading

0 comments on commit 35e390e

Please sign in to comment.