Skip to content

Commit

Permalink
Split up & use more appropriate timeouts for parallel LAs test
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource committed Jan 22, 2025
1 parent b471e13 commit 4b50c59
Showing 1 changed file with 31 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

package io.temporal.workflow.activityTests;

import io.temporal.activity.LocalActivityOptions;
import io.temporal.client.WorkflowOptions;
import io.temporal.testing.internal.SDKTestOptions;
import io.temporal.testing.internal.SDKTestWorkflowRule;
Expand All @@ -38,10 +39,20 @@
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class ParallelLocalActivitiesTest {

private final TestActivitiesImpl activitiesImpl = new TestActivitiesImpl();
static final int TOTAL_LOCAL_ACT_COUNT = 100;
@Parameterized.Parameter public int maxLocalActivityExecutionSize;

@Parameterized.Parameters
public static Object[] data() {
return new Object[] {50, TOTAL_LOCAL_ACT_COUNT};
}

@Rule
public SDKTestWorkflowRule testWorkflowRule =
Expand All @@ -52,7 +63,9 @@ public class ParallelLocalActivitiesTest {
// Use a number lower than the number of concurrent activities to ensure that the
// queueing of LAs when task executor is full works
.setWorkerOptions(
WorkerOptions.newBuilder().setMaxConcurrentLocalActivityExecutionSize(50).build())
WorkerOptions.newBuilder()
.setMaxConcurrentLocalActivityExecutionSize(maxLocalActivityExecutionSize)
.build())
.build();

@Test
Expand All @@ -66,16 +79,18 @@ public void testParallelLocalActivities() {

TestWorkflow1 workflowStub =
testWorkflowRule.getWorkflowClient().newWorkflowStub(TestWorkflow1.class, options);
String result = workflowStub.execute(testWorkflowRule.getTaskQueue());
String willQueue = maxLocalActivityExecutionSize < TOTAL_LOCAL_ACT_COUNT ? "yes" : "";
String result = workflowStub.execute(willQueue);
Assert.assertEquals("done", result);
Assert.assertEquals(activitiesImpl.toString(), 100, activitiesImpl.invocations.size());
List<String> expected = new ArrayList<String>();
Assert.assertEquals(
activitiesImpl.toString(), TOTAL_LOCAL_ACT_COUNT, activitiesImpl.invocations.size());
List<String> expected = new ArrayList<>();
expected.add("interceptExecuteWorkflow " + SDKTestWorkflowRule.UUID_REGEXP);
expected.add("newThread workflow-method");
for (int i = 0; i < TestParallelLocalActivitiesWorkflowImpl.COUNT; i++) {
for (int i = 0; i < TOTAL_LOCAL_ACT_COUNT; i++) {
expected.add("executeLocalActivity SleepActivity");
}
for (int i = 0; i < TestParallelLocalActivitiesWorkflowImpl.COUNT; i++) {
for (int i = 0; i < TOTAL_LOCAL_ACT_COUNT; i++) {
expected.add("local activity SleepActivity");
}
testWorkflowRule
Expand All @@ -84,16 +99,21 @@ public void testParallelLocalActivities() {
}

public static class TestParallelLocalActivitiesWorkflowImpl implements TestWorkflow1 {
static final int COUNT = 100;

@Override
public String execute(String taskQueue) {
public String execute(String willQueue) {
LocalActivityOptions laOptions = SDKTestOptions.newLocalActivityOptions();
// For the case where LAs will be forced to queue, we want to use start-to-close rather
// than schedule-to-start timeouts.
if (!willQueue.isEmpty()) {
laOptions =
LocalActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(5)).build();
}
VariousTestActivities localActivities =
Workflow.newLocalActivityStub(
VariousTestActivities.class, SDKTestOptions.newLocalActivityOptions());
Workflow.newLocalActivityStub(VariousTestActivities.class, laOptions);
List<Promise<String>> laResults = new ArrayList<>();
Random r = Workflow.newRandom();
for (int i = 0; i < COUNT; i++) {
for (int i = 0; i < TOTAL_LOCAL_ACT_COUNT; i++) {
laResults.add(Async.function(localActivities::sleepActivity, (long) r.nextInt(3000), i));
}
Promise.allOf(laResults).get();
Expand Down

0 comments on commit 4b50c59

Please sign in to comment.