Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make sure GetVersion never yields #2376

Merged
merged 2 commits into from
Jan 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ public enum SdkFlag {
* Changes behavior of GetVersion to not yield if no previous call existed in history.
*/
SKIP_YIELD_ON_DEFAULT_VERSION(1),
/*
* Changes behavior of GetVersion to never yield.
*/
SKIP_YIELD_ON_VERSION(2),
UNKNOWN(Integer.MAX_VALUE);

private final int value;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,17 @@ public boolean tryUseSdkFlag(SdkFlag flag) {
}
}

/**
* @return True if this flag is set.
*/
public boolean checkSdkFlag(SdkFlag flag) {
if (!supportSdkMetadata) {
return false;
}

return sdkFlags.contains(flag);
}

/**
* @return All flags set since the last call to takeNewSdkFlags.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ void mutableSideEffect(
* @param callback used to return version
* @return True if the identifier is not present in history
*/
boolean getVersion(
Integer getVersion(
Comment on lines 285 to +287
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Docstring needs update

String changeId,
int minSupported,
int maxSupported,
Expand Down Expand Up @@ -417,6 +417,11 @@ boolean getVersion(
*/
boolean tryUseSdkFlag(SdkFlag flag);

/**
* @return true if this flag is currently set.
*/
boolean checkSdkFlag(SdkFlag flag);

/**
* @return The Build ID of the worker which executed the current Workflow Task. May be empty the
* task was completed by a worker without a Build ID. If this worker is the one executing this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,11 @@ public boolean tryUseSdkFlag(SdkFlag flag) {
return workflowStateMachines.tryUseSdkFlag(flag);
}

@Override
public boolean checkSdkFlag(SdkFlag flag) {
return workflowStateMachines.checkSdkFlag(flag);
}

@Override
public Optional<String> getCurrentBuildId() {
String curTaskBID = workflowStateMachines.getCurrentTaskBuildId();
Expand Down Expand Up @@ -324,7 +329,7 @@ public void mutableSideEffect(
}

@Override
public boolean getVersion(
public Integer getVersion(
String changeId,
int minSupported,
int maxSupported,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ private VersionStateMachine(
* @param callback used to return version
* @return True if the identifier is not present in history
*/
public boolean getVersion(
public Integer getVersion(
Comment on lines 380 to +382
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Docstring needs update here too

int minSupported, int maxSupported, Functions.Proc2<Integer, RuntimeException> callback) {
InvocationStateMachine ism = new InvocationStateMachine(minSupported, maxSupported, callback);
ism.explicitEvent(ExplicitEvent.CHECK_EXECUTION_STATE);
Expand All @@ -390,7 +390,7 @@ public boolean getVersion(
// This means either this version marker did not exist in the original execution or
// the version marker did exist, but was in an earlier WFT. If the version marker was in a
// previous WFT then the version field should have a value.
return !(ism.getState() == VersionStateMachine.State.SKIPPED_REPLAYING && version == null);
return version == null ? preloadedVersion : version;
}

public void handleNonMatchingEvent(HistoryEvent event) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ enum HandleEventStatus {
}

/** Initial set of SDK flags that will be set on all new workflow executions. */
private static final List<SdkFlag> initialFlags =
Collections.unmodifiableList(
Collections.singletonList(SdkFlag.SKIP_YIELD_ON_DEFAULT_VERSION));
@VisibleForTesting
public static List<SdkFlag> initialFlags =
Collections.unmodifiableList(Arrays.asList(SdkFlag.SKIP_YIELD_ON_DEFAULT_VERSION));

/**
* EventId of the WorkflowTaskStarted event of the Workflow Task that was picked up by a worker
Expand Down Expand Up @@ -661,6 +661,13 @@ public boolean tryUseSdkFlag(SdkFlag flag) {
return flags.tryUseSdkFlag(flag);
}

/**
* @return True if the SDK flag is set in the workflow execution
*/
public boolean checkSdkFlag(SdkFlag flag) {
return flags.checkSdkFlag(flag);
}

/**
* @return Set of all new flags set since the last call
*/
Expand Down Expand Up @@ -1074,7 +1081,7 @@ public void mutableSideEffect(
stateMachineSink);
}

public boolean getVersion(
public Integer getVersion(
String changeId,
int minSupported,
int maxSupported,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1119,7 +1119,7 @@ private <R> R mutableSideEffectImpl(
@Override
public int getVersion(String changeId, int minSupported, int maxSupported) {
CompletablePromise<Integer> result = Workflow.newPromise();
boolean markerExists =
Integer versionToUse =
replayContext.getVersion(
changeId,
minSupported,
Expand All @@ -1140,12 +1140,34 @@ public int getVersion(String changeId, int minSupported, int maxSupported) {
* because it can lead to non-deterministic scheduling.
* */
if (replayContext.isReplaying()
&& !markerExists
&& versionToUse == null
&& replayContext.tryUseSdkFlag(SdkFlag.SKIP_YIELD_ON_DEFAULT_VERSION)
&& minSupported == DEFAULT_VERSION) {
return DEFAULT_VERSION;
}

/*
* Previously the SDK would yield on the getVersion call to the scheduler. This is not ideal because it can lead to non-deterministic
* scheduling if the getVersion call was removed.
* */
if (replayContext.checkSdkFlag(SdkFlag.SKIP_YIELD_ON_VERSION)) {
// This can happen if we are replaying a workflow and encounter a getVersion call that did not
// exist on the original execution and the range does not include the default version.
if (versionToUse == null) {
versionToUse = DEFAULT_VERSION;
}
if (versionToUse < minSupported || versionToUse > maxSupported) {
throw new UnsupportedVersion(
new UnsupportedVersion.UnsupportedVersionException(
String.format(
"Version %d of changeId %s is not supported. Supported v is between %d and %d.",
versionToUse, changeId, minSupported, maxSupported)));
}
return versionToUse;
}
// Legacy behavior if SKIP_YIELD_ON_VERSION is not set. This means this thread will yield on the
// getVersion call.
// while it waits for the result.
try {
return result.get();
} catch (UnsupportedVersion.UnsupportedVersionException ex) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.workflow.versionTests;

import io.temporal.internal.common.SdkFlag;
import io.temporal.internal.statemachines.WorkflowStateMachines;
import java.util.Arrays;
import java.util.Collections;
import org.junit.Before;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public abstract class BaseVersionTest {

@Parameterized.Parameter public static boolean setVersioningFlag;

@Parameterized.Parameters()
public static Object[] data() {
return new Object[][] {{true}, {false}};
}

@Before
public void setup() {
if (setVersioningFlag) {
WorkflowStateMachines.initialFlags =
Collections.unmodifiableList(
Arrays.asList(SdkFlag.SKIP_YIELD_ON_DEFAULT_VERSION, SdkFlag.SKIP_YIELD_ON_VERSION));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.temporal.workflow.shared.TestWorkflows.TestWorkflowReturnString;
import io.temporal.workflow.unsafe.WorkflowUnsafe;
import java.time.Duration;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;

Expand All @@ -36,7 +37,7 @@
* replayed on a code version that doesn't support the {@link
* io.temporal.workflow.Workflow#DEFAULT_VERSION} anymore
*/
public class DefaultVersionNotSupportedDuringReplayTest {
public class DefaultVersionNotSupportedDuringReplayTest extends BaseVersionTest {

private static final Signal unsupportedVersionExceptionThrown = new Signal();

Expand Down Expand Up @@ -64,6 +65,9 @@ public String execute() {
try {
Workflow.getVersion("test_change", 2, 3);
} catch (UnsupportedVersion e) {
Assert.assertEquals(
"Version -1 of changeId test_change is not supported. Supported v is between 2 and 3.",
e.getMessage());
unsupportedVersionExceptionThrown.signal();
throw e;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GetVersionAddNewBeforeTest {
public class GetVersionAddNewBeforeTest extends BaseVersionTest {

private static final Logger log = LoggerFactory.getLogger(GetVersionAddNewBeforeTest.class);
private static int versionFoo;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@

/** This test reproduces a clash in cancellation scopes with getVersion described here: */
@Issue("https://github.com/temporalio/sdk-java/issues/648")
public class GetVersionAfterScopeCancellationInMainWorkflowMethodTest {
public class GetVersionAfterScopeCancellationInMainWorkflowMethodTest extends BaseVersionTest {

@Rule
public SDKTestWorkflowRule testWorkflowRule =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
* io.temporal.internal.statemachines.VersionStateMachineTest#testRecordAfterCommandCancellation}
*/
@Issue("https://github.com/temporalio/sdk-java/issues/615")
public class GetVersionAfterScopeCancellationTest {
public class GetVersionAfterScopeCancellationTest extends BaseVersionTest {

@Rule
public SDKTestWorkflowRule testWorkflowRule =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.junit.Rule;
import org.junit.Test;

public class GetVersionAndTimerTest {
public class GetVersionAndTimerTest extends BaseVersionTest {

@Rule
public SDKTestWorkflowRule testWorkflowRuleWithoutVersion =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import org.junit.Rule;
import org.junit.Test;

public class GetVersionDefaultInSignalTest {
public class GetVersionDefaultInSignalTest extends BaseVersionTest {

@Rule
public SDKTestWorkflowRule testWorkflowRule =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import org.junit.Rule;
import org.junit.Test;

public class GetVersionInSignalOnReplayTest {
public class GetVersionInSignalOnReplayTest extends BaseVersionTest {
public static boolean hasReplayedSignal;

@Rule
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.junit.Rule;
import org.junit.Test;

public class GetVersionInSignalTest {
public class GetVersionInSignalTest extends BaseVersionTest {

@Rule
public SDKTestWorkflowRule testWorkflowRule =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import org.junit.Rule;
import org.junit.Test;

public class GetVersionMultipleCallsDefaultTest {
public class GetVersionMultipleCallsDefaultTest extends BaseVersionTest {
@Rule
public SDKTestWorkflowRule testWorkflowRule =
SDKTestWorkflowRule.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.junit.Rule;
import org.junit.Test;

public class GetVersionMultipleCallsTest {
public class GetVersionMultipleCallsTest extends BaseVersionTest {
@Rule
public SDKTestWorkflowRule testWorkflowRule =
SDKTestWorkflowRule.newBuilder()
Expand Down
Loading
Loading