Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Repeated version #2248

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,8 @@ <R> R mutableSideEffect(

int getVersion(String changeId, int minSupported, int maxSupported);

int getVersion(String seriesId, String iterationId, int minSupported, int maxSupported);

void continueAsNew(ContinueAsNewInput input);

void registerQuery(RegisterQueryInput input);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ public int getVersion(String changeId, int minSupported, int maxSupported) {
return next.getVersion(changeId, minSupported, maxSupported);
}

@Override
public int getVersion(String seriesId, String iterationId, int minSupported, int maxSupported) {
return next.getVersion(seriesId, iterationId, minSupported, maxSupported);
}

@Override
public void continueAsNew(ContinueAsNewInput input) {
next.continueAsNew(input);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ final class SyncWorkflowContext implements WorkflowContext, WorkflowOutboundCall
private Map<String, UpdateHandlerInfo> runningUpdateHandlers = new HashMap<>();
// Map of all running signal handlers. Key is the event Id of the signal event.
private Map<Long, SignalHandlerInfo> runningSignalHandlers = new HashMap<>();
// Current versions for the getVersion call that supports iterationId.
private final Map<String, Integer> currentVersions = new HashMap<>();

public SyncWorkflowContext(
@Nonnull String namespace,
Expand Down Expand Up @@ -991,6 +993,46 @@ public int getVersion(String changeId, int minSupported, int maxSupported) {
}
}

@Override
public int getVersion(String seriesId, String iterationId, int minSupported, int maxSupported) {
Integer currentVersion = currentVersions.get(seriesId);
// When replaying check if there is a marker (by calling getVersion) for each iteration.
if (isReplaying()) {
int iVersion = getVersion(seriesId + "/" + iterationId, minSupported, maxSupported);
if (currentVersion != null) {
if (iVersion < currentVersion) {
throw new IllegalArgumentException(
"getVersion for changeId '"
+ seriesId
+ "/"
+ iterationId
+ "' returned "
+ iVersion
+ " which is smaller than previously found version of "
+ currentVersion);
}
if (iVersion != DEFAULT_VERSION) {
currentVersions.put(seriesId, iVersion);
return iVersion;
}
return currentVersion;
}
return iVersion;
} else {
// When not replaying, only insert a marker (by calling getVersion) if the maxSupported is
// larger than the already recorded one.
if (currentVersion == null || (currentVersion != null && maxSupported > currentVersion)) {
int iVersion = getVersion(seriesId + "/" + iterationId, minSupported, maxSupported);
if (iVersion != maxSupported) {
throw new RuntimeException("getVersion returned wrong version: " + iVersion);
}
currentVersions.put(seriesId, iVersion);
return iVersion;
}
return currentVersion;
}
}

@Override
public void registerQuery(RegisterQueryInput request) {
queryDispatcher.registerQueryHandlers(request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,13 @@ public static int getVersion(String changeId, int minSupported, int maxSupported
return getWorkflowOutboundInterceptor().getVersion(changeId, minSupported, maxSupported);
}

public static int getVersion(
String seriesId, String iterationId, int minSupported, int maxSupported) {
assertNotReadOnly("get version");
return getWorkflowOutboundInterceptor()
.getVersion(seriesId, iterationId, minSupported, maxSupported);
}

public static <V> Promise<Void> promiseAllOf(Iterable<Promise<V>> promises) {
return new AllOfPromise(promises);
}
Expand Down
86 changes: 86 additions & 0 deletions temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java
Original file line number Diff line number Diff line change
Expand Up @@ -1000,6 +1000,92 @@ public static int getVersion(String changeId, int minSupported, int maxSupported
return WorkflowInternal.getVersion(changeId, minSupported, maxSupported);
}

/**
* Used to perform workflow safe code changes in code that is called repeatedly like body of a
* loop or a signal handler.
*
* <p>Consider the following example:
*
* <pre>
* for (int i=0; i<100; i++) {
* if (getVersion("fix1", DEFAULT_VERSION, 1) == DEFAULT_VERSION) {
* // OLD CODE
* } else {
* // NEW CODE
* }
* }
* </pre>
*
* If the change is introduced after the loop starts executing, all iterations are going to use
* the default version, even if most of them happen after the change. This happens because the *
* getVersion call returns the same version for all calls that share a changeId. The same issue *
* arises when changing code in callbacks like signal or update handlers. Frequently, there is a
* need for a new version used for newer iterations (or signal handler invocations).
*
* <p>The following solution supports updating the version of each iteration separately, as it
* uses a different changeId for each iteration:
*
* <pre>
* for (int i=0; i<100; i++) {
* if (getVersion("fix1-" + i, DEFAULT_VERSION, 1) == DEFAULT_VERSION) {
* // OLD CODE
* } else {
* // NEW CODE
* }
* }
* </pre>
*
* The drawback is a marker event as well as a search attribute update for each iteration. So, it
* is not practical for the large number of iterations.
*
* <p>This method provides an efficient alternative to the solution that uses a different changeId
* for each iteration. It only inserts a marker when a version changes.
*
* <p>Here is how it could be used:
*
* <pre>
* for (int i=0; i<100; i++) {
* if (getVersion("fix1", String.valueOf(i), DEFAULT_VERSION, 1) == DEFAULT_VERSION) {
* // OLD CODE
* } else {
* // NEW CODE
* }
* }
* </pre>
*
* Adding more branches later is OK, assuming that the series stays the same. During replay, the
* iterationId should return the same values as in the original execution.
*
* <pre>
* for (int i=0; i<100; i++) {
* int v = getVersion("fix1", String.valueOf(i), DEFAULT_VERSION, 2);
* if (v == DEFAULT_VERSION) {
* // OLD CODE
* } else if (v == 1) {
* // CODE FOR THE FIRST CHANGE
* } else {
* // CODE FOR THE LAST CHANGE
* }
* }
* </pre>
*
* All calls with the same seriesId and iterationId argument return the same value. But only if
* they follow each other. The moment a call with a different iteration is made, the version
* changes.
*
* @param seriesId identifier of a series of changes.
* @param iterationId identifier of each iteration over the changed code.
* @param minSupported min version supported for the change
* @param maxSupported max version supported for the change, this version is used as the current
* one during the original execution.
* @return {@code maxSupported} when is originally executed. Original version recorded in the
* history on replays.
*/
public static int getVersion(
String seriesId, String iterationId, int minSupported, int maxSupported) {
return WorkflowInternal.getVersion(seriesId, iterationId, minSupported, maxSupported);
}

/**
* Get scope for reporting business metrics in workflow logic. This should be used instead of
* creating new metrics scopes as it is able to dedupe metrics during replay.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.workflow.versionTests;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import io.temporal.api.enums.v1.EventType;
import io.temporal.api.history.v1.HistoryEvent;
import io.temporal.client.WorkflowStub;
import io.temporal.testing.internal.SDKTestOptions;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.worker.WorkerOptions;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.shared.TestActivities.TestActivitiesImpl;
import io.temporal.workflow.shared.TestActivities.VariousTestActivities;
import io.temporal.workflow.shared.TestWorkflows.TestWorkflow1;
import io.temporal.workflow.unsafe.WorkflowUnsafe;
import java.time.Duration;
import java.util.List;
import org.junit.Rule;
import org.junit.Test;

public class GetVersionSeriesTest {

private static boolean hasReplayed;

@Rule
public SDKTestWorkflowRule testWorkflowRule =
SDKTestWorkflowRule.newBuilder()
.setWorkflowTypes(TestGetVersionSeriesWorkflowImpl.class)
.setActivityImplementations(new TestActivitiesImpl())
// Forcing a replay. Full history arrived from a normal queue causing a replay.
.setWorkerOptions(
WorkerOptions.newBuilder()
.setStickyQueueScheduleToStartTimeout(Duration.ZERO)
.build())
.build();

@Test
public void testGetVersion() {
TestWorkflow1 workflowStub =
testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class);
String result = workflowStub.execute(testWorkflowRule.getTaskQueue());
assertTrue(hasReplayed);
assertEquals("foo", result);
WorkflowStub untyped = WorkflowStub.fromTyped(workflowStub);
List<HistoryEvent> markers =
testWorkflowRule.getHistoryEvents(
untyped.getExecution().getWorkflowId(), EventType.EVENT_TYPE_MARKER_RECORDED);
assertEquals(10, markers.size());
}

public static class TestGetVersionSeriesWorkflowImpl implements TestWorkflow1 {

@Override
public String execute(String taskQueue) {
VariousTestActivities testActivities =
Workflow.newActivityStub(
VariousTestActivities.class,
SDKTestOptions.newActivityOptionsForTaskQueue(taskQueue));

for (int i = 0; i < 20; i++) {
// Test adding a version check in non-replay code.
int maxSupported = i / 2 + 1;
int version =
Workflow.getVersion(
"s1", String.valueOf(maxSupported), Workflow.DEFAULT_VERSION, maxSupported);
assertEquals(version, maxSupported);
testActivities.activity2("activity2", 2);
}

// Test adding a version check in replay code.
if (WorkflowUnsafe.isReplaying()) {
hasReplayed = true;
}
// Force replay
Workflow.sleep(1000);
return "foo";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,6 @@ public class GetVersionTest {
public void testGetVersion() {
TestWorkflow1 workflowStub =
testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class);
String result = workflowStub.execute(testWorkflowRule.getTaskQueue());
assertTrue(hasReplayed);
assertEquals("activity22activity1activity1activity1", result);
testWorkflowRule
.getInterceptor(TracingWorkerInterceptor.class)
.setExpected(
Expand All @@ -77,6 +74,10 @@ public void testGetVersion() {
"getVersion",
"executeActivity customActivity1",
"activity customActivity1");

String result = workflowStub.execute(testWorkflowRule.getTaskQueue());
assertTrue(hasReplayed);
assertEquals("activity22activity1activity1activity1", result);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,11 @@ public int getVersion(String changeId, int minSupported, int maxSupported) {
throw new UnsupportedOperationException("not implemented");
}

@Override
public int getVersion(String seriesId, String iterationId, int minSupported, int maxSupported) {
throw new UnsupportedOperationException("not implemented");
}

@Override
public void continueAsNew(ContinueAsNewInput input) {
throw new UnsupportedOperationException("not implemented");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,14 @@ public int getVersion(String changeId, int minSupported, int maxSupported) {
return next.getVersion(changeId, minSupported, maxSupported);
}

@Override
public int getVersion(String seriesId, String iterationId, int minSupported, int maxSupported) {
if (!WorkflowUnsafe.isReplaying()) {
trace.add("getVersionSeries");
}
return next.getVersion(seriesId, iterationId, minSupported, maxSupported);
}

@Override
public void continueAsNew(ContinueAsNewInput input) {
if (!WorkflowUnsafe.isReplaying()) {
Expand Down
Loading