Skip to content

Commit

Permalink
Refactor how DeterministicRunnerImpl and SyncWorkflowContext handle c…
Browse files Browse the repository at this point in the history
…reation of workflow root and method threads
  • Loading branch information
Spikhalskiy committed Jul 1, 2021
1 parent 820627b commit c4be0e0
Show file tree
Hide file tree
Showing 21 changed files with 399 additions and 287 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package io.temporal.common.interceptors;

import javax.annotation.Nullable;

/**
* Intercepts calls to the workflow execution. Executes under workflow context. So all the
* restrictions on the workflow code should be obeyed.
Expand Down Expand Up @@ -128,4 +130,24 @@ public Object getResult() {

/** Called when a workflow is queried. */
QueryOutput handleQuery(QueryInput input);

/**
* Intercepts creation of the workflow main method thread
*
* @param runnable thread function to run
* @param name name of the thread, optional
* @return created workflow thread. Should be treated as a pass-through object that shouldn't be
* manipulated in any way by the interceptor code.
*/
Object newWorkflowMethodThread(Runnable runnable, @Nullable String name);

/**
* Intercepts creation of the workflow callback thread
*
* @param runnable thread function to run
* @param name name of the thread, optional
* @return created workflow thread. Should be treated as a pass-through object that shouldn't be
* manipulated in any way by the interceptor code.
*/
Object newCallbackThread(Runnable runnable, @Nullable String name);
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,14 @@ public void handleSignal(SignalInput input) {
public QueryOutput handleQuery(QueryInput input) {
return next.handleQuery(input);
}

@Override
public Object newWorkflowMethodThread(Runnable runnable, String name) {
return next.newWorkflowMethodThread(runnable, name);
}

@Override
public Object newCallbackThread(Runnable runnable, String name) {
return next.newCallbackThread(runnable, name);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,8 @@
import io.temporal.activity.ActivityOptions;
import io.temporal.activity.LocalActivityOptions;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.workflow.ChildWorkflowOptions;
import io.temporal.workflow.ContinueAsNewOptions;
import io.temporal.workflow.DynamicQueryHandler;
import io.temporal.workflow.DynamicSignalHandler;
import io.temporal.workflow.Functions;
import io.temporal.workflow.*;
import io.temporal.workflow.Functions.Func;
import io.temporal.workflow.Promise;
import java.lang.reflect.Type;
import java.time.Duration;
import java.util.List;
Expand Down Expand Up @@ -483,7 +478,18 @@ <R> R mutableSideEffect(

void upsertSearchAttributes(Map<String, Object> searchAttributes);

Object newThread(Runnable runnable, boolean detached, String name);
/**
* Intercepts creation of the workflow child thread.
*
* <p>Please note, that "workflow child thread" and "child workflow" are different and independent
* concepts.
*
* @param runnable thread function to run
* @param detached if this thread is detached from the parent {@link CancellationScope}
* @param name name of the thread
* @return created WorkflowThread
*/
Object newChildThread(Runnable runnable, boolean detached, String name);

long currentTimeMillis();
}
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ public void upsertSearchAttributes(Map<String, Object> searchAttributes) {
}

@Override
public Object newThread(Runnable runnable, boolean detached, String name) {
return next.newThread(runnable, detached, name);
public Object newChildThread(Runnable runnable, boolean detached, String name) {
return next.newChildThread(runnable, detached, name);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,15 +293,15 @@ private static <R> Promise<R> execute(boolean async, Functions.Func<R> func) {
}
} else {
CompletablePromise<R> result = Workflow.newPromise();
WorkflowInternal.newThread(
false,
WorkflowThread.newThread(
() -> {
try {
result.complete(func.apply());
} catch (Exception e) {
result.completeExceptionally(Workflow.wrap(e));
}
})
},
false)
.start();
return result;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright (C) 2020 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package io.temporal.internal.sync;

import io.temporal.common.interceptors.WorkflowInboundCallsInterceptor;
import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor;

public class BaseRootWorkflowInboundCallsInterceptor implements WorkflowInboundCallsInterceptor {
protected final SyncWorkflowContext workflowContext;

public BaseRootWorkflowInboundCallsInterceptor(SyncWorkflowContext workflowContext) {
this.workflowContext = workflowContext;
}

@Override
public void init(WorkflowOutboundCallsInterceptor outboundCalls) {
workflowContext.initHeadOutboundCallsInterceptor(outboundCalls);
}

@Override
public WorkflowOutput execute(WorkflowInput input) {
throw new UnsupportedOperationException();
}

@Override
public void handleSignal(SignalInput input) {
workflowContext.handleInterceptedSignal(input);
}

@Override
public QueryOutput handleQuery(QueryInput input) {
return workflowContext.handleInterceptedQuery(input);
}

@Override
public Object newWorkflowMethodThread(Runnable runnable, String name) {
return workflowContext.newWorkflowMethodThreadIntercepted(runnable, name);
}

@Override
public Object newCallbackThread(Runnable runnable, String name) {
return workflowContext.newWorkflowCallbackThreadIntercepted(runnable, name);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@

package io.temporal.internal.sync;

import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor;
import io.temporal.internal.replay.WorkflowExecutorCache;
import io.temporal.workflow.CancellationScope;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
import javax.annotation.Nullable;

/**
* Executes code passed to {@link #newRunner(Runnable)} as well as threads created from it using
* {@link WorkflowInternal#newThread(boolean, Runnable)} deterministically. Requires use of provided
* {@link WorkflowThread#newThread(Runnable, boolean)} deterministically. Requires use of provided
* wrappers for synchronization and notification instead of native ones.
*/
interface DeterministicRunner {
Expand Down Expand Up @@ -118,9 +118,11 @@ static DeterministicRunner newRunner(
void executeInWorkflowThread(String name, Runnable r);

/**
* Creates a new instance of a workflow thread. To be called only from another workflow thread.
* Creates a new instance of a workflow child thread. To be called only from another workflow
* thread.
*/
WorkflowThread newThread(Runnable runnable, boolean detached, String name);
WorkflowThread newWorkflowThread(Runnable runnable, boolean detached, @Nullable String name);

void setInterceptorHead(WorkflowOutboundCallsInterceptor interceptorHead);
/** Creates a new instance of a workflow callback thread. */
WorkflowThread newCallbackThread(Runnable runnable, @Nullable String name);
}
Loading

0 comments on commit c4be0e0

Please sign in to comment.